Compare commits

..

7 Commits

Author SHA1 Message Date
Tyler Goodlet 4fbd469c33 Update `._entry` actor status log
Log-report the different types of actor exit conditions, including cancellation
via KBI, error, or normal return, at varying log levels depending on the case.

Also, start proto-ing out this weird ascii-syntax idea for describing
concurrent-system states and implement the first bit in a `nest_from_op()`
log-message formatter that joins and indents an object's `repr()` under
a tree-like `'>)\n|_'` header.
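
A rough usage sketch of that formatter idea (illustrative values only; the
actual impl lands in the `._entry` diff below):

    import textwrap

    def nest_from_op(
        input_op: str,
        tree_str: str,
        back_from_op: int = 0,
    ) -> str:
        # emit the op "header" then indent the tree repr so its
        # `|_` lines nest under the op's last char(s)
        return (
            f'{input_op}\n'
            + textwrap.indent(
                tree_str,
                prefix=(len(input_op) - (back_from_op + 1)) * ' ',
            )
        )

    print(nest_from_op(input_op='>(', tree_str='|_<Actor: example>\n'))
    # >(
    #  |_<Actor: example>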
2024-06-28 18:45:52 -04:00
Tyler Goodlet cb90f3e6ba Update `MsgTypeError` content matching to latest 2024-06-28 14:46:29 -04:00
Tyler Goodlet 5e009a8229 Further formalize `greenback` integration
Since we more or less require it for `tractor.pause_from_sync()`, this
refines the enable toggles and their relay down the actor tree, as well as
adding more explicit logging around init and activation.

Tweaks summary:
- `.info()` report the module if discovered during root boot.
- use a `._state._runtime_vars['use_greenback']: bool` activation flag
  inside `Actor._from_parent()` to determine if the sub should try to
  use it and set to `False` if mod-loading fails / not installed.
- expose `maybe_init_greenback()` from the `.devx` subpkg.
- comment out RTE in `._pause()` for now since we already have it in
  `.pause_from_sync()`.
- always `.exception()` on `maybe_init_greenback()` import errors to
  clarify the underlying failure details.
- always explicitly report if `._state._runtime_vars['use_greenback']`
  was NOT set when `.pause_from_sync()` is called.

Other `._runtime.async_main()` adjustments:
- combine the "internal error call ur parents" message and the failed
  registry contact status into one new `err_report: str`.
- drop the final exception handler's call to
  `Actor.lifetime_stack.close()` since we're already doing it in the
  `finally:` block and the earlier call has no currently known benefit.
- only report on the `.lifetime_stack()` callbacks if any are detected
  as registered.
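
A minimal sketch of the activation-flag relay described above (hypothetical
helper name; the real logic lives in `Actor._from_parent()` and `.devx`):

    import logging
    log = logging.getLogger(__name__)

    async def maybe_activate_greenback(runtime_vars: dict) -> bool:
        # only attempt init when the parent relayed the toggle
        if not runtime_vars.get('use_greenback', False):
            return False
        try:
            import greenback
            await greenback.ensure_portal()
            return True
        except BaseException:
            # downgrade the flag so `.pause_from_sync()` can report it
            runtime_vars['use_greenback'] = False
            log.exception('Failed to init `greenback`..')
            return False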
2024-06-28 14:45:45 -04:00
Tyler Goodlet b72a025d0f Always reset `._state._ctxvar_Context` to prior
Not sure how I forgot this, but obviously it's correct context-var
semantics to revert the current IPC `Context` (set in the latest
`.open_context()` block) such that any prior instance is restored.

This ensures the sanity `assert`s pass inside
`.msg._ops.maybe_limit_plds()` and just in general ensures for any task
that the last opened `Context` is the one returned from
`current_ipc_ctx()`.
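
The fix boils down to the standard `contextvars` token pattern (a minimal
sketch, not the actual `tractor` code):

    from contextvars import ContextVar, Token

    _ctxvar_Context: ContextVar = ContextVar('ipc_ctx', default=None)

    def run_with_ctx(ctx, body) -> None:
        prior_tok: Token = _ctxvar_Context.set(ctx)  # install the new current ctx
        try:
            body()
        finally:
            # revert so any previously-set instance becomes current again
            _ctxvar_Context.reset(prior_tok)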
2024-06-28 12:59:31 -04:00
Tyler Goodlet 5739e79645 Use `delay=0` in pump loop..
Turns out it does work XD

Prior presumption was from before I had the future poll-loop, so it makes
sense we needed more than one sched-tick's worth of context switching vs.
now we can just keep looping-n-pumping as fast as possible until the
guest-run's main task completes.

Also,
- minimize the preface commentary (as per todo) now that we have tests
  codifying all the edge cases :finger_crossed:
- parameterize the pump-loop-cycle delay and default it to 0 (sketch below).
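
In sketch form (illustrative names only) the pump loop now amounts to:

    import asyncio

    async def pump_until_done(
        trio_done_fut: asyncio.Future,
        delay: float = 0,  # parameterized, defaults to 0
    ) -> None:
        # yield to the `asyncio` loop each cycle so the `trio`
        # guest-run can finish its final scheduler ticks
        while not trio_done_fut.done():
            await asyncio.sleep(delay)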
2024-06-27 19:27:59 -04:00
Tyler Goodlet 2ac999cc3c Prep for legacy RPC API factor-n-remove
This change adds commentary about the upcoming API removal and
simplification of nursery + portal internals; no actual code changes are
included.

The plan is to (re)move the old RPC methods:
- `ActorNursery.run_in_actor()`
- `Portal.run()`
- `Portal.run_from_ns()`

and any related impl internals out of each concurrency-primitive and
into something like a `.hilevel.rpc` set of APIs which are then all
implemented using the newer and more low-level `Context`/`MsgStream`
primitives instead Bo

Further,
- formally deprecate the `Portal.result()` method in favor of
  `.wait_for_result()` (see the sketch below).
- only `log.info()` about runtime shutdown in the implicit root case.
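
The deprecation itself is just a thin warn-and-proxy shim, roughly (sketch
only; the real method is on `Portal` and warns via the runtime logger):

    import logging
    log = logging.getLogger(__name__)

    class Portal:
        async def wait_for_result(self, *args, **kwargs):
            ...  # the new/renamed impl

        async def result(self, *args, **kwargs):
            typname: str = type(self).__name__
            log.warning(
                f'`{typname}.result()` is DEPRECATED!\n'
                f'Use `{typname}.wait_for_result()` instead!\n'
            )
            return await self.wait_for_result(*args, **kwargs)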
2024-06-27 16:25:46 -04:00
Tyler Goodlet 9f9b0b17dc Add a `Context.portal`, more cancel tooling
Might as well add a public maybe-getter for use on the "parent" side
since it can be handy to check out-of-band cancellation conditions (like
from `Portal.cancel_actor()`).

Buncha bitty tweaks for more easily debugging cancel conditions:
- add a `@.cancel_called.setter` for hooking into `.cancel_called = True`
  being set in hard to decipher "who cancelled us" scenarios.
- use a new `self_ctxc: bool` var in `.cancel()` to capture the output
  state from `._is_self_cancelled(remote_error)` at call time so it can
  be compared against the measured value at crash-time (when REPL-ing it
  can often have already changed due to runtime teardown sequencing vs.
  the crash handler hook entry).
- proxy `hide_tb` to `.drain_to_final_msg()` from `.wait_for_result()`.
- use `remote_error.sender` attr directly instead of through
  `RAE.msgdata: dict` lookup.
- change var name `our_uid` -> `peer_uid`; it's not "ours"..

Other various docs/comment updates:
- extend the main class doc to include some other name ideas.
- change over all remaining `.result()` refs to `.wait_for_result()`.
- doc more details on how we want the eventual `.outcome` signature to look.
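
Condensed, the parent-side getter plus the debug-friendly setter added here
look roughly like (simplified sketch of the `Context` additions):

    from dataclasses import dataclass

    @dataclass
    class Context:
        _portal: object | None = None
        _cancel_called: bool = False

        @property
        def portal(self):
            # parent-side only: the wrapping `Portal`, else `None`
            return self._portal

        @property
        def cancel_called(self) -> bool:
            return self._cancel_called

        @cancel_called.setter
        def cancel_called(self, val: bool) -> None:
            # handy spot for a breakpoint in hard-to-decipher
            # "who cancelled us" scenarios
            self._cancel_called = val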
2024-06-26 16:26:18 -04:00
12 changed files with 543 additions and 224 deletions

View File

@ -285,14 +285,14 @@ def test_basic_payload_spec(
if invalid_started: if invalid_started:
msg_type_str: str = 'Started' msg_type_str: str = 'Started'
bad_value_str: str = '10' bad_value: int = 10
elif invalid_return: elif invalid_return:
msg_type_str: str = 'Return' msg_type_str: str = 'Return'
bad_value_str: str = "'yo'" bad_value: str = 'yo'
else: else:
# XXX but should never be used below then.. # XXX but should never be used below then..
msg_type_str: str = '' msg_type_str: str = ''
bad_value_str: str = '' bad_value: str = ''
maybe_mte: MsgTypeError|None = None maybe_mte: MsgTypeError|None = None
should_raise: Exception|None = ( should_raise: Exception|None = (
@ -307,8 +307,10 @@ def test_basic_payload_spec(
raises=should_raise, raises=should_raise,
ensure_in_message=[ ensure_in_message=[
f"invalid `{msg_type_str}` msg payload", f"invalid `{msg_type_str}` msg payload",
f"value: `{bad_value_str}` does not " f'{bad_value}',
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`", f'has type {type(bad_value)!r}',
'not match type-spec',
f'`{msg_type_str}.pld: PldMsg|NoneType`',
], ],
# only for debug # only for debug
# post_mortem=True, # post_mortem=True,

View File

@ -38,6 +38,7 @@ from collections import deque
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from contextvars import Token
from dataclasses import ( from dataclasses import (
dataclass, dataclass,
field, field,
@ -121,10 +122,19 @@ class Unresolved:
@dataclass @dataclass
class Context: class Context:
''' '''
An inter-actor, SC transitive, `Task` communication context. An inter-actor, SC transitive, `trio.Task` (pair)
communication context.
NB: This class should **never be instatiated directly**, it is allocated (We've also considered other names and ideas:
by the runtime in 2 ways: - "communicating tasks scope": cts
- "distributed task scope": dts
- "communicating tasks context": ctc
**Got a better idea for naming? Make an issue dawg!**
)
NB: This class should **never be instatiated directly**, it is
allocated by the runtime in 2 ways:
- by entering `Portal.open_context()` which is the primary - by entering `Portal.open_context()` which is the primary
public API for any "parent" task or, public API for any "parent" task or,
- by the RPC machinery's `._rpc._invoke()` as a `ctx` arg - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg
@ -210,6 +220,16 @@ class Context:
# more the the `Context` is needed? # more the the `Context` is needed?
_portal: Portal | None = None _portal: Portal | None = None
@property
def portal(self) -> Portal|None:
'''
Return any wrapping memory-`Portal` if this is
a 'parent'-side task which called `Portal.open_context()`,
otherwise `None`.
'''
return self._portal
# NOTE: each side of the context has its own cancel scope # NOTE: each side of the context has its own cancel scope
# which is exactly the primitive that allows for # which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC. # cross-actor-task-supervision and thus SC.
@ -299,6 +319,8 @@ class Context:
# boxed exception. NOW, it's used for spawning overrun queuing # boxed exception. NOW, it's used for spawning overrun queuing
# tasks when `.allow_overruns == True` !!! # tasks when `.allow_overruns == True` !!!
_scope_nursery: trio.Nursery|None = None _scope_nursery: trio.Nursery|None = None
# ^-TODO-^ change name?
# -> `._scope_tn` "scope task nursery"
# streaming overrun state tracking # streaming overrun state tracking
_in_overrun: bool = False _in_overrun: bool = False
@ -408,10 +430,23 @@ class Context:
''' '''
return self._cancel_called return self._cancel_called
@cancel_called.setter
def cancel_called(self, val: bool) -> None:
'''
Set the self-cancelled request `bool` value.
'''
# to debug who frickin sets it..
# if val:
# from .devx import pause_from_sync
# pause_from_sync()
self._cancel_called = val
@property @property
def canceller(self) -> tuple[str, str]|None: def canceller(self) -> tuple[str, str]|None:
''' '''
``Actor.uid: tuple[str, str]`` of the (remote) `Actor.uid: tuple[str, str]` of the (remote)
actor-process who's task was cancelled thus causing this actor-process who's task was cancelled thus causing this
(side of the) context to also be cancelled. (side of the) context to also be cancelled.
@ -515,7 +550,7 @@ class Context:
# the local scope was never cancelled # the local scope was never cancelled
# and instead likely we received a remote side # and instead likely we received a remote side
# # cancellation that was raised inside `.result()` # # cancellation that was raised inside `.wait_for_result()`
# or ( # or (
# (se := self._local_error) # (se := self._local_error)
# and se is re # and se is re
@ -585,6 +620,10 @@ class Context:
self, self,
error: BaseException, error: BaseException,
# TODO: manual toggle for cases where we wouldn't normally
# mark ourselves cancelled but want to?
# set_cancel_called: bool = False,
) -> None: ) -> None:
''' '''
(Maybe) cancel this local scope due to a received remote (Maybe) cancel this local scope due to a received remote
@ -603,7 +642,7 @@ class Context:
- `Portal.open_context()` - `Portal.open_context()`
- `Portal.result()` - `Portal.result()`
- `Context.open_stream()` - `Context.open_stream()`
- `Context.result()` - `Context.wait_for_result()`
when called/closed by actor local task(s). when called/closed by actor local task(s).
@ -729,7 +768,7 @@ class Context:
# Cancel the local `._scope`, catch that # Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error # `._scope.cancelled_caught` and re-raise any remote error
# once exiting (or manually calling `.result()`) the # once exiting (or manually calling `.wait_for_result()`) the
# `.open_context()` block. # `.open_context()` block.
cs: trio.CancelScope = self._scope cs: trio.CancelScope = self._scope
if ( if (
@ -764,8 +803,9 @@ class Context:
# `trio.Cancelled` subtype here ;) # `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n' message: str = 'Cancelling `Context._scope` !\n\n'
# from .devx import pause_from_sync
# pause_from_sync()
self._scope.cancel() self._scope.cancel()
else: else:
message: str = 'NOT cancelling `Context._scope` !\n\n' message: str = 'NOT cancelling `Context._scope` !\n\n'
# from .devx import mk_pdb # from .devx import mk_pdb
@ -845,15 +885,15 @@ class Context:
@property @property
def repr_api(self) -> str: def repr_api(self) -> str:
return 'Portal.open_context()'
# TODO: use `.dev._frame_stack` scanning to find caller!
# ci: CallerInfo|None = self._caller_info # ci: CallerInfo|None = self._caller_info
# if ci: # if ci:
# return ( # return (
# f'{ci.api_nsp}()\n' # f'{ci.api_nsp}()\n'
# ) # )
# TODO: use `.dev._frame_stack` scanning to find caller!
return 'Portal.open_context()'
async def cancel( async def cancel(
self, self,
timeout: float = 0.616, timeout: float = 0.616,
@ -889,7 +929,8 @@ class Context:
''' '''
side: str = self.side side: str = self.side
self._cancel_called: bool = True # XXX for debug via the `@.setter`
self.cancel_called = True
header: str = ( header: str = (
f'Cancelling ctx with peer from {side.upper()} side\n\n' f'Cancelling ctx with peer from {side.upper()} side\n\n'
@ -912,7 +953,7 @@ class Context:
# `._scope.cancel()` since we expect the eventual # `._scope.cancel()` since we expect the eventual
# `ContextCancelled` from the other side to trigger this # `ContextCancelled` from the other side to trigger this
# when the runtime finally receives it during teardown # when the runtime finally receives it during teardown
# (normally in `.result()` called from # (normally in `.wait_for_result()` called from
# `Portal.open_context().__aexit__()`) # `Portal.open_context().__aexit__()`)
if side == 'parent': if side == 'parent':
if not self._portal: if not self._portal:
@ -1025,10 +1066,10 @@ class Context:
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
our_uid: tuple = self.chan.uid peer_uid: tuple = self.chan.uid
# XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
# for "graceful cancellation" case: # for "graceful cancellation" case(s):
# #
# Whenever a "side" of a context (a `Task` running in # Whenever a "side" of a context (a `Task` running in
# an actor) **is** the side which requested ctx # an actor) **is** the side which requested ctx
@ -1045,9 +1086,11 @@ class Context:
# set to the `Actor.uid` of THIS task (i.e. the # set to the `Actor.uid` of THIS task (i.e. the
# cancellation requesting task's actor is the actor # cancellation requesting task's actor is the actor
# checking whether it should absorb the ctxc). # checking whether it should absorb the ctxc).
self_ctxc: bool = self._is_self_cancelled(remote_error)
if ( if (
self_ctxc
and
not raise_ctxc_from_self_call not raise_ctxc_from_self_call
and self._is_self_cancelled(remote_error)
# TODO: ?potentially it is useful to emit certain # TODO: ?potentially it is useful to emit certain
# warning/cancel logs for the cases where the # warning/cancel logs for the cases where the
@ -1077,8 +1120,8 @@ class Context:
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.boxed_type is StreamOverrun and remote_error.boxed_type is StreamOverrun
# and tuple(remote_error.msgdata['sender']) == our_uid # and tuple(remote_error.msgdata['sender']) == peer_uid
and tuple(remote_error.sender) == our_uid and tuple(remote_error.sender) == peer_uid
): ):
# NOTE: we set the local scope error to any "self # NOTE: we set the local scope error to any "self
# cancellation" error-response thus "absorbing" # cancellation" error-response thus "absorbing"
@ -1140,9 +1183,9 @@ class Context:
of the remote cancellation. of the remote cancellation.
''' '''
__tracebackhide__ = hide_tb __tracebackhide__: bool = hide_tb
assert self._portal, ( assert self._portal, (
"Context.result() can not be called from callee side!" '`Context.wait_for_result()` can not be called from callee side!'
) )
if self._final_result_is_set(): if self._final_result_is_set():
return self._result return self._result
@ -1197,10 +1240,11 @@ class Context:
# raising something we know might happen # raising something we know might happen
# during cancellation ;) # during cancellation ;)
(not self._cancel_called) (not self._cancel_called)
) ),
hide_tb=hide_tb,
) )
# TODO: eventually make `.outcome: Outcome` and thus return # TODO: eventually make `.outcome: Outcome` and thus return
# `self.outcome.unwrap()` here! # `self.outcome.unwrap()` here?
return self.outcome return self.outcome
# TODO: switch this with above! # TODO: switch this with above!
@ -1284,17 +1328,24 @@ class Context:
Any| Any|
RemoteActorError| RemoteActorError|
ContextCancelled ContextCancelled
# TODO: make this a `outcome.Outcome`!
): ):
''' '''
The final "outcome" from an IPC context which can either be Return the "final outcome" (state) of the far end peer task
some Value returned from the target `@context`-decorated non-blocking. If the remote task has not completed then this
remote task-as-func, or an `Error` wrapping an exception field always resolves to the module defined `Unresolved`
raised from an RPC task fault or cancellation. handle.
Note that if the remote task has not terminated then this ------ - ------
field always resolves to the module defined `Unresolved` handle. TODO->( this is doc-driven-dev content not yet actual ;P )
TODO: implement this using `outcome.Outcome` types? The final "outcome" from an IPC context which can be any of:
- some `outcome.Value` which boxes the returned output from the peer task's
`@context`-decorated remote task-as-func, or
- an `outcome.Error` wrapping an exception raised that same RPC task
after a fault or cancellation, or
- an unresolved `outcome.Outcome` when the peer task is still
executing and has not yet completed.
''' '''
return ( return (
@ -1583,7 +1634,7 @@ class Context:
- NEVER `return` early before delivering the msg! - NEVER `return` early before delivering the msg!
bc if the error is a ctxc and there is a task waiting on bc if the error is a ctxc and there is a task waiting on
`.result()` we need the msg to be `.wait_for_result()` we need the msg to be
`send_chan.send_nowait()`-ed over the `._rx_chan` so `send_chan.send_nowait()`-ed over the `._rx_chan` so
that the error is relayed to that waiter task and thus that the error is relayed to that waiter task and thus
raised in user code! raised in user code!
@ -1828,7 +1879,7 @@ async def open_context_from_portal(
When the "callee" (side that is "called"/started by a call When the "callee" (side that is "called"/started by a call
to *this* method) returns, the caller side (this) unblocks to *this* method) returns, the caller side (this) unblocks
and any final value delivered from the other end can be and any final value delivered from the other end can be
retrieved using the `Contex.result()` api. retrieved using the `Contex.wait_for_result()` api.
The yielded ``Context`` instance further allows for opening The yielded ``Context`` instance further allows for opening
bidirectional streams, explicit cancellation and bidirectional streams, explicit cancellation and
@ -1893,7 +1944,7 @@ async def open_context_from_portal(
) )
assert ctx._remote_func_type == 'context' assert ctx._remote_func_type == 'context'
assert ctx._caller_info assert ctx._caller_info
_ctxvar_Context.set(ctx) prior_ctx_tok: Token = _ctxvar_Context.set(ctx)
# placeholder for any exception raised in the runtime # placeholder for any exception raised in the runtime
# or by user tasks which cause this context's closure. # or by user tasks which cause this context's closure.
@ -1965,14 +2016,14 @@ async def open_context_from_portal(
yield ctx, first yield ctx, first
# ??TODO??: do we still want to consider this or is # ??TODO??: do we still want to consider this or is
# the `else:` block handling via a `.result()` # the `else:` block handling via a `.wait_for_result()`
# call below enough?? # call below enough??
# #
# -[ ] pretty sure `.result()` internals do the # -[ ] pretty sure `.wait_for_result()` internals do the
# same as our ctxc handler below so it ended up # same as our ctxc handler below so it ended up
# being same (repeated?) behaviour, but ideally we # being same (repeated?) behaviour, but ideally we
# wouldn't have that duplication either by somehow # wouldn't have that duplication either by somehow
# factoring the `.result()` handler impl in a way # factoring the `.wait_for_result()` handler impl in a way
# that we can re-use it around the `yield` ^ here # that we can re-use it around the `yield` ^ here
# or vice versa? # or vice versa?
# #
@ -2110,7 +2161,7 @@ async def open_context_from_portal(
# AND a group-exc is only raised if there was > 1 # AND a group-exc is only raised if there was > 1
# tasks started *here* in the "caller" / opener # tasks started *here* in the "caller" / opener
# block. If any one of those tasks calls # block. If any one of those tasks calls
# `.result()` or `MsgStream.receive()` # `.wait_for_result()` or `MsgStream.receive()`
# `._maybe_raise_remote_err()` will be transitively # `._maybe_raise_remote_err()` will be transitively
# called and the remote error raised causing all # called and the remote error raised causing all
# tasks to be cancelled. # tasks to be cancelled.
@ -2180,7 +2231,7 @@ async def open_context_from_portal(
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
) )
# XXX NOTE XXX: the below call to # XXX NOTE XXX: the below call to
# `Context.result()` will ALWAYS raise # `Context.wait_for_result()` will ALWAYS raise
# a `ContextCancelled` (via an embedded call to # a `ContextCancelled` (via an embedded call to
# `Context._maybe_raise_remote_err()`) IFF # `Context._maybe_raise_remote_err()`) IFF
# a `Context._remote_error` was set by the runtime # a `Context._remote_error` was set by the runtime
@ -2190,10 +2241,10 @@ async def open_context_from_portal(
# ALWAYS SET any time "callee" side fails and causes "caller # ALWAYS SET any time "callee" side fails and causes "caller
# side" cancellation via a `ContextCancelled` here. # side" cancellation via a `ContextCancelled` here.
try: try:
result_or_err: Exception|Any = await ctx.result() result_or_err: Exception|Any = await ctx.wait_for_result()
except BaseException as berr: except BaseException as berr:
# on normal teardown, if we get some error # on normal teardown, if we get some error
# raised in `Context.result()` we still want to # raised in `Context.wait_for_result()` we still want to
# save that error on the ctx's state to # save that error on the ctx's state to
# determine things like `.cancelled_caught` for # determine things like `.cancelled_caught` for
# cases where there was remote cancellation but # cases where there was remote cancellation but
@ -2344,6 +2395,9 @@ async def open_context_from_portal(
None, None,
) )
# XXX revert to prior IPC-task-ctx scope
_ctxvar_Context.reset(prior_ctx_tok)
def mk_context( def mk_context(
chan: Channel, chan: Channel,

View File

@ -20,7 +20,8 @@ Sub-process entry points.
""" """
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
# import textwrap import os
import textwrap
from typing import ( from typing import (
Any, Any,
TYPE_CHECKING, TYPE_CHECKING,
@ -58,7 +59,7 @@ def _mp_main(
) -> None: ) -> None:
''' '''
The routine called *after fork* which invokes a fresh ``trio.run`` The routine called *after fork* which invokes a fresh `trio.run()`
''' '''
actor._forkserver_info = forkserver_info actor._forkserver_info = forkserver_info
@ -96,6 +97,107 @@ def _mp_main(
log.info(f"Subactor {actor.uid} terminated") log.info(f"Subactor {actor.uid} terminated")
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually
# as we work out our multi-domain state-flow-syntax!
def nest_from_op(
input_op: str,
#
# ?TODO? an idea for a syntax to the state of concurrent systems
# as a "3-domain" (execution, scope, storage) model and using
# a minimal ascii/utf-8 operator-set.
#
# try not to take any of this seriously yet XD
#
# > is a "play operator" indicating (CPU bound)
# exec/work/ops required at the "lowest level computing"
#
# execution primititves (tasks, threads, actors..) denote their
# lifetime with '(' and ')' since parentheses normally are used
# in many langs to denote function calls.
#
# starting = (
# >( opening/starting; beginning of the thread-of-exec (toe?)
# (> opened/started, (finished spawning toe)
# |_<Task: blah blah..> repr of toe, in py these look like <objs>
#
# >) closing/exiting/stopping,
# )> closed/exited/stopped,
# |_<Task: blah blah..>
# [OR <), )< ?? ]
#
# ending = )
# >c) cancelling to close/exit
# c)> cancelled (caused close), OR?
# |_<Actor: ..>
# OR maybe "<c)" which better indicates the cancel being
# "delivered/returned" / returned" to LHS?
#
# >x) erroring to eventuall exit
# x)> errored and terminated
# |_<Actor: ...>
#
# scopes: supers/nurseries, IPC-ctxs, sessions, perms, etc.
# >{ opening
# {> opened
# }> closed
# >} closing
#
# storage: like queues, shm-buffers, files, etc..
# >[ opening
# [> opened
# |_<FileObj: ..>
#
# >] closing
# ]> closed
# IPC ops: channels, transports, msging
# => req msg
# <= resp msg
# <=> 2-way streaming (of msgs)
# <- recv 1 msg
# -> send 1 msg
#
# TODO: still not sure on R/L-HS approach..?
# =>( send-req to exec start (task, actor, thread..)
# (<= recv-req to ^
#
# (<= recv-req ^
# <=( recv-resp opened remote exec primitive
# <=) recv-resp closed
#
# )<=c req to stop due to cancel
# c=>) req to stop due to cancel
#
# =>{ recv-req to open
# <={ send-status that it closed
tree_str: str,
# NOTE: so move back-from-the-left of the `input_op` by
# this amount.
back_from_op: int = 0,
) -> str:
'''
Depth-increment the input (presumably hierarchy/supervision)
input "tree string" below the provided `input_op` execution
operator, so injecting a `"\n|_{input_op}\n"`and indenting the
`tree_str` to nest content aligned with the ops last char.
'''
return (
f'{input_op}\n'
+
textwrap.indent(
tree_str,
prefix=(
len(input_op)
-
(back_from_op + 1)
) * ' ',
)
)
def _trio_main( def _trio_main(
actor: Actor, actor: Actor,
*, *,
@ -107,7 +209,6 @@ def _trio_main(
Entry point for a `trio_run_in_process` subactor. Entry point for a `trio_run_in_process` subactor.
''' '''
# __tracebackhide__: bool = True
_debug.hide_runtime_frames() _debug.hide_runtime_frames()
_state._current_actor = actor _state._current_actor = actor
@ -119,7 +220,6 @@ def _trio_main(
if actor.loglevel is not None: if actor.loglevel is not None:
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
import os
actor_info: str = ( actor_info: str = (
f'|_{actor}\n' f'|_{actor}\n'
f' uid: {actor.uid}\n' f' uid: {actor.uid}\n'
@ -128,13 +228,23 @@ def _trio_main(
f' loglevel: {actor.loglevel}\n' f' loglevel: {actor.loglevel}\n'
) )
log.info( log.info(
'Started new trio subactor:\n' 'Starting new `trio` subactor:\n'
+ +
'>\n' # like a "started/play"-icon from super perspective nest_from_op(
+ input_op='>(', # see syntax ideas above
actor_info, tree_str=actor_info,
back_from_op=1,
)
)
logmeth = log.info
exit_status: str = (
'Subactor exited\n'
+
nest_from_op(
input_op=')>', # like a "closed-to-play"-icon from super perspective
tree_str=actor_info,
)
) )
try: try:
if infect_asyncio: if infect_asyncio:
actor._infected_aio = True actor._infected_aio = True
@ -143,16 +253,28 @@ def _trio_main(
trio.run(trio_main) trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
log.cancel( logmeth = log.cancel
'Actor received KBI\n' exit_status: str = (
'Actor received KBI (aka an OS-cancel)\n'
+ +
actor_info nest_from_op(
input_op='c)>', # closed due to cancel (see above)
tree_str=actor_info,
) )
)
except BaseException as err:
logmeth = log.error
exit_status: str = (
'Main actor task crashed during exit?\n'
+
nest_from_op(
input_op='x)>', # closed by error
tree_str=actor_info,
)
)
# NOTE since we raise a tb will already be shown on the
# console, thus we do NOT use `.exception()` above.
raise err
finally: finally:
log.info( logmeth(exit_status)
'Subactor terminated\n'
+
'x\n' # like a "crossed-out/killed" from super perspective
+
actor_info
)

View File

@ -121,7 +121,8 @@ class Portal:
) )
return self.chan return self.chan
# TODO: factor this out into an `ActorNursery` wrapper # TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def _submit_for_result( async def _submit_for_result(
self, self,
ns: str, ns: str,
@ -141,13 +142,22 @@ class Portal:
portal=self, portal=self,
) )
# TODO: we should deprecate this API right? since if we remove
# `.run_in_actor()` (and instead move it to a `.highlevel`
# wrapper api (around a single `.open_context()` call) we don't
# really have any notion of a "main" remote task any more?
#
# @api_frame # @api_frame
async def result(self) -> Any: async def wait_for_result(
self,
hide_tb: bool = True,
) -> Any:
''' '''
Return the result(s) from the remote actor's "main" task. Return the final result delivered by a `Return`-msg from the
remote peer actor's "main" task's `return` statement.
''' '''
__tracebackhide__ = True __tracebackhide__: bool = hide_tb
# Check for non-rpc errors slapped on the # Check for non-rpc errors slapped on the
# channel for which we always raise # channel for which we always raise
exc = self.channel._exc exc = self.channel._exc
@ -182,6 +192,23 @@ class Portal:
return self._final_result_pld return self._final_result_pld
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def result(
self,
*args,
**kwargs,
) -> Any|Exception:
typname: str = type(self).__name__
log.warning(
f'`{typname}.result()` is DEPRECATED!\n'
f'Use `{typname}.wait_for_result()` instead!\n'
)
return await self.wait_for_result(
*args,
**kwargs,
)
async def _cancel_streams(self): async def _cancel_streams(self):
# terminate all locally running async generator # terminate all locally running async generator
# IPC calls # IPC calls
@ -240,6 +267,7 @@ class Portal:
f'{reminfo}' f'{reminfo}'
) )
# XXX the one spot we set it?
self.channel._cancel_called: bool = True self.channel._cancel_called: bool = True
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
@ -279,6 +307,8 @@ class Portal:
) )
return False return False
# TODO: do we still need this for low level `Actor`-runtime
# method calls or can we also remove it?
async def run_from_ns( async def run_from_ns(
self, self,
namespace_path: str, namespace_path: str,
@ -316,6 +346,8 @@ class Portal:
expect_msg=Return, expect_msg=Return,
) )
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
async def run( async def run(
self, self,
func: str, func: str,
@ -370,6 +402,8 @@ class Portal:
expect_msg=Return, expect_msg=Return,
) )
# TODO: factor this out into a `.highlevel` API-wrapper that uses
# a single `.open_context()` call underneath.
@acm @acm
async def open_stream_from( async def open_stream_from(
self, self,

View File

@ -21,6 +21,7 @@ Root actor runtime ignition(s).
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import importlib import importlib
import inspect
import logging import logging
import os import os
import signal import signal
@ -115,10 +116,16 @@ async def open_root_actor(
if ( if (
debug_mode debug_mode
and maybe_enable_greenback and maybe_enable_greenback
and await _debug.maybe_init_greenback( and (
maybe_mod := await _debug.maybe_init_greenback(
raise_not_found=False, raise_not_found=False,
) )
)
): ):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = ( os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug._sync_pause_from_builtin' 'tractor.devx._debug._sync_pause_from_builtin'
) )
@ -264,7 +271,9 @@ async def open_root_actor(
except OSError: except OSError:
# TODO: make this a "discovery" log level? # TODO: make this a "discovery" log level?
logger.warning(f'No actor registry found @ {addr}') logger.info(
f'No actor registry found @ {addr}\n'
)
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
for addr in registry_addrs: for addr in registry_addrs:
@ -278,7 +287,6 @@ async def open_root_actor(
# Create a new local root-actor instance which IS NOT THE # Create a new local root-actor instance which IS NOT THE
# REGISTRAR # REGISTRAR
if ponged_addrs: if ponged_addrs:
if ensure_registry: if ensure_registry:
raise RuntimeError( raise RuntimeError(
f'Failed to open `{name}`@{ponged_addrs}: ' f'Failed to open `{name}`@{ponged_addrs}: '
@ -365,23 +373,25 @@ async def open_root_actor(
) )
try: try:
yield actor yield actor
except ( except (
Exception, Exception,
BaseExceptionGroup, BaseExceptionGroup,
) as err: ) as err:
# XXX NOTE XXX see equiv note inside
import inspect # `._runtime.Actor._stream_handler()` where in the
# non-root or root-that-opened-this-mahually case we
# wait for the local actor-nursery to exit before
# exiting the transport channel handler.
entered: bool = await _debug._maybe_enter_pm( entered: bool = await _debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
) )
if ( if (
not entered not entered
and not is_multi_cancelled(err) and
not is_multi_cancelled(err)
): ):
logger.exception('Root actor crashed:\n') logger.exception('Root actor crashed\n')
# ALWAYS re-raise any error bubbled up from the # ALWAYS re-raise any error bubbled up from the
# runtime! # runtime!

View File

@ -1046,6 +1046,10 @@ class Actor:
# TODO: another `Struct` for rtvs.. # TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars rvs: dict[str, Any] = spawnspec._runtime_vars
if rvs['_debug_mode']: if rvs['_debug_mode']:
from .devx import (
enable_stack_on_sig,
maybe_init_greenback,
)
try: try:
# TODO: maybe return some status msgs upward # TODO: maybe return some status msgs upward
# to that we can emit them in `con_status` # to that we can emit them in `con_status`
@ -1053,13 +1057,27 @@ class Actor:
log.devx( log.devx(
'Enabling `stackscope` traces on SIGUSR1' 'Enabling `stackscope` traces on SIGUSR1'
) )
from .devx import enable_stack_on_sig
enable_stack_on_sig() enable_stack_on_sig()
except ImportError: except ImportError:
log.warning( log.warning(
'`stackscope` not installed for use in debug mode!' '`stackscope` not installed for use in debug mode!'
) )
if rvs.get('use_greenback', False):
maybe_mod: ModuleType|None = await maybe_init_greenback()
if maybe_mod:
log.devx(
'Activated `greenback` '
'for `tractor.pause_from_sync()` support!'
)
else:
rvs['use_greenback'] = False
log.warning(
'`greenback` not installed for use in debug mode!\n'
'`tractor.pause_from_sync()` not available!'
)
rvs['_is_root'] = False rvs['_is_root'] = False
_state._runtime_vars.update(rvs) _state._runtime_vars.update(rvs)
@ -1717,8 +1735,8 @@ async def async_main(
# Register with the arbiter if we're told its addr # Register with the arbiter if we're told its addr
log.runtime( log.runtime(
f'Registering `{actor.name}` ->\n' f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
f'{pformat(accept_addrs)}' # ^-TODO-^ we should instead show the maddr here^^
) )
# TODO: ideally we don't fan out to all registrars # TODO: ideally we don't fan out to all registrars
@ -1776,57 +1794,90 @@ async def async_main(
# Blocks here as expected until the root nursery is # Blocks here as expected until the root nursery is
# killed (i.e. this actor is cancelled or signalled by the parent) # killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err: except Exception as internal_err:
log.runtime("Closing all actor lifetime contexts")
actor.lifetime_stack.close()
if not is_registered: if not is_registered:
err_report: str = (
'\n'
"Actor runtime (internally) failed BEFORE contacting the registry?\n"
f'registrars -> {actor.reg_addrs} ?!?!\n\n'
'^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n'
'\t>> CALMLY CANCEL YOUR CHILDREN AND CALL YOUR PARENTS <<\n\n'
'\tIf this is a sub-actor hopefully its parent will keep running '
'and cancel/reap this sub-process..\n'
'(well, presuming this error was propagated upward)\n\n'
'\t---------------------------------------------\n'
'\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT @ ' # oneline
'https://github.com/goodboy/tractor/issues\n'
'\t---------------------------------------------\n'
)
# TODO: I guess we could try to connect back # TODO: I guess we could try to connect back
# to the parent through a channel and engage a debugger # to the parent through a channel and engage a debugger
# once we have that all working with std streams locking? # once we have that all working with std streams locking?
log.exception( log.exception(err_report)
f"Actor errored and failed to register with arbiter "
f"@ {actor.reg_addrs[0]}?")
log.error(
"\n\n\t^^^ THIS IS PROBABLY AN INTERNAL `tractor` BUG! ^^^\n\n"
"\t>> CALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN <<\n\n"
"\tIf this is a sub-actor hopefully its parent will keep running "
"correctly presuming this error was safely ignored..\n\n"
"\tPLEASE REPORT THIS TRACEBACK IN A BUG REPORT: "
"https://github.com/goodboy/tractor/issues\n"
)
if actor._parent_chan: if actor._parent_chan:
await try_ship_error_to_remote( await try_ship_error_to_remote(
actor._parent_chan, actor._parent_chan,
err, internal_err,
) )
# always! # always!
match err: match internal_err:
case ContextCancelled(): case ContextCancelled():
log.cancel( log.cancel(
f'Actor: {actor.uid} was task-context-cancelled with,\n' f'Actor: {actor.uid} was task-context-cancelled with,\n'
f'str(err)' f'str(internal_err)'
) )
case _: case _:
log.exception("Actor errored:") log.exception(
raise 'Main actor-runtime task errored\n'
f'<x)\n'
f' |_{actor}\n'
)
raise internal_err
finally: finally:
log.runtime( teardown_report: str = (
'Runtime nursery complete' 'Main actor-runtime task completed\n'
'-> Closing all actor lifetime contexts..'
) )
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
actor.lifetime_stack.close()
# TODO: we can't actually do this bc the debugger # ?TODO? should this be in `._entry`/`._root` mods instead?
# uses the _service_n to spawn the lock task, BUT, #
# in theory if we had the root nursery surround this finally # teardown any actor-lifetime-bound contexts
# block it might be actually possible to debug THIS ls: ExitStack = actor.lifetime_stack
# machinery in the same way as user task code? # only report if there are any registered
cbs: list[Callable] = [
repr(tup[1].__wrapped__)
for tup in ls._exit_callbacks
]
if cbs:
cbs_str: str = '\n'.join(cbs)
teardown_report += (
'-> Closing actor-lifetime-bound callbacks\n\n'
f'}}>\n'
f' |_{ls}\n'
f' |_{cbs_str}\n'
)
# XXX NOTE XXX this will cause an error which
# prevents any `infected_aio` actor from continuing
# and any callbacks in the `ls` here WILL NOT be
# called!!
# await _debug.pause(shield=True)
ls.close()
# XXX TODO but hard XXX
# we can't actually do this bc the debugger uses the
# _service_n to spawn the lock task, BUT, in theory if we had
# the root nursery surround this finally block it might be
# actually possible to debug THIS machinery in the same way
# as user task code?
#
# if actor.name == 'brokerd.ib': # if actor.name == 'brokerd.ib':
# with CancelScope(shield=True): # with CancelScope(shield=True):
# await _debug.breakpoint() # await _debug.breakpoint()
@ -1856,9 +1907,9 @@ async def async_main(
failed = True failed = True
if failed: if failed:
log.warning( teardown_report += (
f'Failed to unregister {actor.name} from ' f'-> Failed to unregister {actor.name} from '
f'registar @ {addr}' f'registar @ {addr}\n'
) )
# Ensure all peers (actors connected to us as clients) are finished # Ensure all peers (actors connected to us as clients) are finished
@ -1866,13 +1917,17 @@ async def async_main(
if any( if any(
chan.connected() for chan in chain(*actor._peers.values()) chan.connected() for chan in chain(*actor._peers.values())
): ):
log.runtime( teardown_report += (
f"Waiting for remaining peers {actor._peers} to clear") f'-> Waiting for remaining peers {actor._peers} to clear..\n'
)
log.runtime(teardown_report)
with CancelScope(shield=True): with CancelScope(shield=True):
await actor._no_more_peers.wait() await actor._no_more_peers.wait()
log.runtime("All peer channels are complete")
log.runtime("Runtime completed") teardown_report += ('-> All peer channels are complete\n')
teardown_report += ('Actor runtime exited')
log.info(teardown_report)
# TODO: rename to `Registry` and move to `._discovery`! # TODO: rename to `Registry` and move to `._discovery`!

View File

@ -149,7 +149,7 @@ async def exhaust_portal(
# XXX: streams should never be reaped here since they should # XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api # always be established and shutdown using a context manager api
final: Any = await portal.result() final: Any = await portal.wait_for_result()
except ( except (
Exception, Exception,
@ -223,8 +223,8 @@ async def cancel_on_completion(
async def hard_kill( async def hard_kill(
proc: trio.Process, proc: trio.Process,
terminate_after: int = 1.6,
terminate_after: int = 1.6,
# NOTE: for mucking with `.pause()`-ing inside the runtime # NOTE: for mucking with `.pause()`-ing inside the runtime
# whilst also hacking on it XD # whilst also hacking on it XD
# terminate_after: int = 99999, # terminate_after: int = 99999,

View File

@ -44,7 +44,7 @@ _runtime_vars: dict[str, Any] = {
'_root_mailbox': (None, None), '_root_mailbox': (None, None),
'_registry_addrs': [], '_registry_addrs': [],
# for `breakpoint()` support # for `tractor.pause_from_sync()` & `breakpoint()` support
'use_greenback': False, 'use_greenback': False,
} }

View File

@ -80,6 +80,7 @@ class ActorNursery:
''' '''
def __init__( def __init__(
self, self,
# TODO: maybe def these as fields of a struct looking type?
actor: Actor, actor: Actor,
ria_nursery: trio.Nursery, ria_nursery: trio.Nursery,
da_nursery: trio.Nursery, da_nursery: trio.Nursery,
@ -88,8 +89,10 @@ class ActorNursery:
) -> None: ) -> None:
# self.supervisor = supervisor # TODO # self.supervisor = supervisor # TODO
self._actor: Actor = actor self._actor: Actor = actor
self._ria_nursery = ria_nursery
# TODO: rename to `._tn` for our conventional "task-nursery"
self._da_nursery = da_nursery self._da_nursery = da_nursery
self._children: dict[ self._children: dict[
tuple[str, str], tuple[str, str],
tuple[ tuple[
@ -98,15 +101,13 @@ class ActorNursery:
Portal | None, Portal | None,
] ]
] = {} ] = {}
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False self.cancelled: bool = False
self._join_procs = trio.Event() self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False self._at_least_one_child_in_debug: bool = False
self.errors = errors self.errors = errors
self.exited = trio.Event()
self._scope_error: BaseException|None = None self._scope_error: BaseException|None = None
self.exited = trio.Event()
# NOTE: when no explicit call is made to # NOTE: when no explicit call is made to
# `.open_root_actor()` by application code, # `.open_root_actor()` by application code,
@ -116,6 +117,13 @@ class ActorNursery:
# and syncing purposes to any actor opened nurseries. # and syncing purposes to any actor opened nurseries.
self._implicit_runtime_started: bool = False self._implicit_runtime_started: bool = False
# TODO: remove the `.run_in_actor()` API and thus this 2ndary
# nursery when that API get's moved outside this primitive!
self._ria_nursery = ria_nursery
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
async def start_actor( async def start_actor(
self, self,
name: str, name: str,
@ -126,10 +134,14 @@ class ActorNursery:
rpc_module_paths: list[str]|None = None, rpc_module_paths: list[str]|None = None,
enable_modules: list[str]|None = None, enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor loglevel: str|None = None, # set log level per subactor
nursery: trio.Nursery|None = None,
debug_mode: bool|None = None, debug_mode: bool|None = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,
# TODO: ideally we can rm this once we no longer have
# a `._ria_nursery` since the dependent APIs have been
# removed!
nursery: trio.Nursery|None = None,
) -> Portal: ) -> Portal:
''' '''
Start a (daemon) actor: an process that has no designated Start a (daemon) actor: an process that has no designated
@ -200,6 +212,7 @@ class ActorNursery:
# |_ dynamic @context decoration on child side # |_ dynamic @context decoration on child side
# |_ implicit `Portal.open_context() as (ctx, first):` # |_ implicit `Portal.open_context() as (ctx, first):`
# and `return first` on parent side. # and `return first` on parent side.
# |_ mention how it's similar to `trio-parallel` API?
# -[ ] use @api_frame on the wrapper # -[ ] use @api_frame on the wrapper
async def run_in_actor( async def run_in_actor(
self, self,
@ -269,11 +282,14 @@ class ActorNursery:
) -> None: ) -> None:
''' '''
Cancel this nursery by instructing each subactor to cancel Cancel this actor-nursery by instructing each subactor's
itself and wait for all subactors to terminate. runtime to cancel and wait for all underlying sub-processes
to terminate.
If ``hard_killl`` is set to ``True`` then kill the processes If `hard_kill` is set then kill the processes directly using
directly without any far end graceful ``trio`` cancellation. the spawning-backend's API/OS-machinery without any attempt
at (graceful) `trio`-style cancellation using our
`Actor.cancel()`.
''' '''
__runtimeframe__: int = 1 # noqa __runtimeframe__: int = 1 # noqa
@ -629,8 +645,12 @@ async def open_nursery(
f'|_{an}\n' f'|_{an}\n'
) )
# shutdown runtime if it was started
if implicit_runtime: if implicit_runtime:
# shutdown runtime if it was started and report noisly
# that we're did so.
msg += '=> Shutting down actor runtime <=\n' msg += '=> Shutting down actor runtime <=\n'
log.info(msg) log.info(msg)
else:
# keep noise low during std operation.
log.runtime(msg)

View File

@ -29,6 +29,7 @@ from ._debug import (
shield_sigint_handler as shield_sigint_handler, shield_sigint_handler as shield_sigint_handler,
open_crash_handler as open_crash_handler, open_crash_handler as open_crash_handler,
maybe_open_crash_handler as maybe_open_crash_handler, maybe_open_crash_handler as maybe_open_crash_handler,
maybe_init_greenback as maybe_init_greenback,
post_mortem as post_mortem, post_mortem as post_mortem,
mk_pdb as mk_pdb, mk_pdb as mk_pdb,
) )

View File

@ -69,6 +69,7 @@ from trio import (
import tractor import tractor
from tractor.log import get_logger from tractor.log import get_logger
from tractor._context import Context from tractor._context import Context
from tractor import _state
from tractor._state import ( from tractor._state import (
current_actor, current_actor,
is_root_process, is_root_process,
@ -87,9 +88,6 @@ if TYPE_CHECKING:
from tractor._runtime import ( from tractor._runtime import (
Actor, Actor,
) )
from tractor.msg import (
_codec,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -1599,12 +1597,16 @@ async def _pause(
try: try:
task: Task = current_task() task: Task = current_task()
except RuntimeError as rte: except RuntimeError as rte:
log.exception('Failed to get current task?') __tracebackhide__: bool = False
if actor.is_infected_aio(): log.exception(
raise RuntimeError( 'Failed to get current `trio`-task?'
'`tractor.pause[_from_sync]()` not yet supported ' )
'for infected `asyncio` mode!' # if actor.is_infected_aio():
) from rte # mk_pdb().set_trace()
# raise RuntimeError(
# '`tractor.pause[_from_sync]()` not yet supported '
# 'directly (infected) `asyncio` tasks!'
# ) from rte
raise raise
@ -2163,10 +2165,8 @@ def maybe_import_greenback(
return False return False
async def maybe_init_greenback( async def maybe_init_greenback(**kwargs) -> None|ModuleType:
**kwargs, try:
) -> None|ModuleType:
if mod := maybe_import_greenback(**kwargs): if mod := maybe_import_greenback(**kwargs):
await mod.ensure_portal() await mod.ensure_portal()
log.devx( log.devx(
@ -2174,11 +2174,13 @@ async def maybe_init_greenback(
'Sync debug support activated!\n' 'Sync debug support activated!\n'
) )
return mod return mod
except BaseException:
log.exception('Failed to init `greenback`..')
raise
return None return None
async def _pause_from_bg_root_thread( async def _pause_from_bg_root_thread(
behalf_of_thread: Thread, behalf_of_thread: Thread,
repl: PdbREPL, repl: PdbREPL,
@ -2324,6 +2326,12 @@ def pause_from_sync(
# TODO: once supported, remove this AND the one # TODO: once supported, remove this AND the one
# inside `._pause()`! # inside `._pause()`!
# outstanding impl fixes:
# -[ ] need to make `.shield_sigint()` below work here!
# -[ ] how to handle `asyncio`'s new SIGINT-handler
# injection?
# -[ ] should `breakpoint()` work and what does it normally
# do in `asyncio` ctxs?
if actor.is_infected_aio(): if actor.is_infected_aio():
raise RuntimeError( raise RuntimeError(
'`tractor.pause[_from_sync]()` not yet supported ' '`tractor.pause[_from_sync]()` not yet supported '
@ -2399,9 +2407,16 @@ def pause_from_sync(
else: # we are presumably the `trio.run()` + main thread else: # we are presumably the `trio.run()` + main thread
# raises on not-found by default # raises on not-found by default
greenback: ModuleType = maybe_import_greenback() greenback: ModuleType = maybe_import_greenback()
# TODO: how to ensure this is either dynamically (if
# needed) called here (in some bg tn??) or that the
# subactor always already called it?
# greenback: ModuleType = await maybe_init_greenback()
message += f'-> imported {greenback}\n' message += f'-> imported {greenback}\n'
repl_owner: Task = current_task() repl_owner: Task = current_task()
message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n' message += '-> calling `greenback.await_(_pause(debug_func=None))` from sync caller..\n'
try:
out = greenback.await_( out = greenback.await_(
_pause( _pause(
debug_func=None, debug_func=None,
@ -2411,6 +2426,18 @@ def pause_from_sync(
**_pause_kwargs, **_pause_kwargs,
) )
) )
except RuntimeError as rte:
if not _state._runtime_vars.get(
'use_greenback',
False,
):
raise RuntimeError(
'`greenback` was never initialized in this actor!?\n\n'
f'{_state._runtime_vars}\n'
) from rte
raise
if out: if out:
bg_task, repl = out bg_task, repl = out
assert repl is repl assert repl is repl
@ -2801,10 +2828,10 @@ def open_crash_handler(
`trio.run()`. `trio.run()`.
''' '''
err: BaseException
try: try:
yield yield
except tuple(catch) as err: except tuple(catch) as err:
if type(err) not in ignore: if type(err) not in ignore:
pdbp.xpm() pdbp.xpm()

View File

@ -558,6 +558,8 @@ def run_as_asyncio_guest(
# normally `Actor._async_main()` as is passed by some boostrap # normally `Actor._async_main()` as is passed by some boostrap
# entrypoint like `._entry._trio_main()`. # entrypoint like `._entry._trio_main()`.
_sigint_loop_pump_delay: float = 0,
) -> None: ) -> None:
# ^-TODO-^ technically whatever `trio_main` returns.. we should # ^-TODO-^ technically whatever `trio_main` returns.. we should
# try to use func-typevar-params at leaast by 3.13! # try to use func-typevar-params at leaast by 3.13!
@ -598,7 +600,7 @@ def run_as_asyncio_guest(
''' '''
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
trio_done_fut = asyncio.Future() trio_done_fute = asyncio.Future()
startup_msg: str = ( startup_msg: str = (
'Starting `asyncio` guest-loop-run\n' 'Starting `asyncio` guest-loop-run\n'
'-> got running loop\n' '-> got running loop\n'
@ -633,13 +635,13 @@ def run_as_asyncio_guest(
f'{error}\n\n' f'{error}\n\n'
f'{tb_str}\n' f'{tb_str}\n'
) )
trio_done_fut.set_exception(error) trio_done_fute.set_exception(error)
# raise inline # raise inline
main_outcome.unwrap() main_outcome.unwrap()
else: else:
trio_done_fut.set_result(main_outcome) trio_done_fute.set_result(main_outcome)
startup_msg += ( startup_msg += (
f'-> created {trio_done_callback!r}\n' f'-> created {trio_done_callback!r}\n'
@ -660,7 +662,7 @@ def run_as_asyncio_guest(
) )
fute_err: BaseException|None = None fute_err: BaseException|None = None
try: try:
out: Outcome = await asyncio.shield(trio_done_fut) out: Outcome = await asyncio.shield(trio_done_fute)
# NOTE will raise (via `Error.unwrap()`) from any # NOTE will raise (via `Error.unwrap()`) from any
# exception packed into the guest-run's `main_outcome`. # exception packed into the guest-run's `main_outcome`.
@ -697,83 +699,75 @@ def run_as_asyncio_guest(
f' |_{actor}.cancel_soon()\n' f' |_{actor}.cancel_soon()\n'
) )
# TODO: reduce this comment bloc since abandon issues are # XXX WARNING XXX the next LOCs are super important, since
# now solved? # without them, we can get guest-run abandonment cases
# where `asyncio` will not schedule or wait on the `trio`
# guest-run task before final shutdown! This is
# particularly true if the `trio` side has tasks doing
# shielded work when a SIGINT condition occurs.
# #
# XXX NOTE XXX the next LOC is super important!!! # We now have the
# => without it, we can get a guest-run abandonment case
# where asyncio will not trigger `trio` in a final event
# loop cycle!
#
# our test,
# `test_infected_asyncio.test_sigint_closes_lifetime_stack()` # `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
# demonstrates how if when we raise a SIGINT-signal in an infected # suite to ensure we do not suffer this issues
# child we get a variable race condition outcome where # (hopefully) ever again.
# either of the following can indeterminately happen,
# #
# - "silent-abandon": `asyncio` abandons the `trio` # The original abandonment issue surfaced as 2 different
# guest-run task silently and no `trio`-guest-run or # race-condition dependent types scenarios all to do with
# `tractor`-actor-runtime teardown happens whatsoever.. # `asyncio` handling SIGINT from the system:
# this is the WORST (race) case outcome.
# #
# - OR, "loud-abandon": the guest run get's abaondoned "loudly" with # - "silent-abandon" (WORST CASE):
# `trio` reporting a console traceback and further tbs of all # `asyncio` abandons the `trio` guest-run task silently
# the failed shutdown routines also show on console.. # and no `trio`-guest-run or `tractor`-actor-runtime
# teardown happens whatsoever..
# #
# our test can thus fail and (has been parametrized for) # - "loud-abandon" (BEST-ish CASE):
# the 2 cases: # the guest run get's abaondoned "loudly" with `trio`
# reporting a console traceback and further tbs of all
# the (failed) GC-triggered shutdown routines which
# thankfully does get dumped to console..
# #
# - when the parent raises a KBI just after # The abandonment is most easily reproduced if the `trio`
# signalling the child, # side has tasks doing shielded work where those tasks
# |_silent-abandon => the `Actor.lifetime_stack` will # ignore the normal `Cancelled` condition and continue to
# never be closed thus leaking a resource! # run, but obviously `asyncio` isn't aware of this and at
# -> FAIL! # some point bails on the guest-run unless we take manual
# |_loud-abandon => despite the abandonment at least the # intervention..
# stack will be closed out..
# -> PASS
# #
# - when the parent instead simply waits on `ctx.wait_for_result()` # To repeat, *WITHOUT THIS* stuff below the guest-run can
# (i.e. DOES not raise a KBI itself), # get race-conditionally abandoned!!
# |_silent-abandon => test will just hang and thus the ctx #
# and actor will never be closed/cancelled/shutdown # XXX SOLUTION XXX
# resulting in leaking a (file) resource since the # ------ - ------
# `trio`/`tractor` runtime never relays a ctxc back to # XXX FIRST PART:
# the parent; the test's timeout will trigger.. # ------ - ------
# -> FAIL! # the obvious fix to the "silent-abandon" case is to
# |_loud-abandon => this case seems to never happen?? # explicitly cancel the actor runtime such that no
# runtime tasks are even left unaware that the guest-run
# should be terminated due to OS cancellation.
# #
# XXX FIRST PART XXX, SO, this is a fix to the
# "silent-abandon" case, NOT the `trio`-guest-run
# abandonment issue in general, for which the NEXT LOC
# is apparently a working fix!
actor.cancel_soon() actor.cancel_soon()
# XXX NOTE XXX pump the `asyncio` event-loop to allow # ------ - ------
# XXX SECOND PART:
# ------ - ------
# Pump the `asyncio` event-loop to allow
# `trio`-side to `trio`-guest-run to complete and # `trio`-side to `trio`-guest-run to complete and
# teardown !! # teardown !!
# #
# *WITHOUT THIS* the guest-run can get race-conditionally abandoned!! # oh `asyncio`, how i don't miss you at all XD
# XD while not trio_done_fute.done():
#
await asyncio.sleep(.1) # `delay` can't be 0 either XD
while not trio_done_fut.done():
log.runtime( log.runtime(
'Waiting on main guest-run `asyncio` task to complete..\n' 'Waiting on main guest-run `asyncio` task to complete..\n'
f'|_trio_done_fut: {trio_done_fut}\n' f'|_trio_done_fut: {trio_done_fute}\n'
) )
await asyncio.sleep(.1) await asyncio.sleep(_sigint_loop_pump_delay)
# XXX: don't actually need the shield.. seems to # XXX is there any alt API/approach like the internal
# make no difference (??) and we know it spawns an # call below but that doesn't block indefinitely..?
# internal task..
# await asyncio.shield(asyncio.sleep(.1))
# XXX alt approach but can block indefinitely..
# so don't use?
# loop._run_once() # loop._run_once()
try: try:
return trio_done_fut.result() return trio_done_fute.result()
except asyncio.exceptions.InvalidStateError as state_err: except asyncio.exceptions.InvalidStateError as state_err:
# XXX be super dupere noisy about abandonment issues! # XXX be super dupere noisy about abandonment issues!