diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index fee14c4..5808065 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -15,7 +15,10 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 """
-Actor primitives and helpers
+The fundamental core machinery implementing every "actor" including
+the process-local (python-interpreter global) `Actor` state-type
+primitive(s), RPC-in-task scheduling, and IPC connectivity and
+low-level transport msg handling.
 
 """
 from __future__ import annotations
@@ -41,8 +44,14 @@ import warnings
 
 from async_generator import aclosing
 from exceptiongroup import BaseExceptionGroup
-import trio  # type: ignore
-from trio_typing import TaskStatus
+import trio
+from trio import (
+    CancelScope,
+)
+from trio_typing import (
+    Nursery,
+    TaskStatus,
+)
 
 from ._ipc import Channel
 from ._context import (
@@ -90,10 +99,9 @@ async def _invoke(
     connected IPC channel.
 
     This is the core "RPC" `trio.Task` scheduling machinery used to start every
-    remotely invoked function, normally in `Actor._service_n: trio.Nursery`.
+    remotely invoked function, normally in `Actor._service_n: Nursery`.
 
     '''
-    __tracebackhide__: bool = True
     treat_as_gen: bool = False
     failed_resp: bool = False
 
@@ -110,9 +118,9 @@
     # possibly a traceback (not sure what typing is for this..)
     tb = None
 
-    cancel_scope = trio.CancelScope()
+    cancel_scope = CancelScope()
     # activated cancel scope ref
-    cs: trio.CancelScope | None = None
+    cs: CancelScope | None = None
 
     ctx = actor.get_context(
         chan,
@@ -124,6 +132,7 @@
     )
     context: bool = False
 
+    # TODO: deprecate this style..
     if getattr(func, '_tractor_stream_function', False):
         # handle decorated ``@tractor.stream`` async functions
         sig = inspect.signature(func)
@@ -165,6 +174,7 @@
         except TypeError:
             raise
 
+        # TODO: can we unify this with the `context=True` impl below?
         if inspect.isasyncgen(coro):
             await chan.send({'functype': 'asyncgen', 'cid': cid})
             # XXX: massive gotcha! If the containing scope
@@ -195,6 +205,7 @@
                 await chan.send({'stop': True, 'cid': cid})
 
         # one way @stream func that gets treated like an async gen
+        # TODO: can we unify this with the `context=True` impl below?
        elif treat_as_gen:
             await chan.send({'functype': 'asyncgen', 'cid': cid})
             # XXX: the async-func may spawn further tasks which push
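
The two new "can we unify this with the `context=True` impl below?" TODOs (plus the "deprecate this style.." note on `_tractor_stream_function`) all point at the same end-state: every functype gets scheduled through the ctx-linked path. For reference, that target shape is tractor's documented `@tractor.context` endpoint style; the sketch below follows the public API, though the `streamer()` endpoint itself is invented here:

import tractor


@tractor.context
async def streamer(
    ctx: tractor.Context,
    limit: int = 10,
) -> None:
    # sync with the caller first; per the TODO above, a small
    # wrapper could do this implicitly for one-shot async funcs.
    await ctx.started()

    # stream over the ctx-dedicated msg channel; legacy
    # async-gen / `treat_as_gen` endpoints would be wrapped in
    # an equivalent `open_stream()` closure.
    async with ctx.open_stream() as stream:
        for i in range(limit):
            await stream.send(i)
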
@@ -211,8 +222,20 @@
                 # far end async gen to tear down
                 await chan.send({'stop': True, 'cid': cid})
 
+        # our most general case: a remote SC-transitive,
+        # IPC-linked, cross-actor-task "context"
+        # ------ - ------
         # TODO: every other "func type" should be implemented from
-        # a special case of a context eventually!
+        # a special case of this impl eventually!
+        # -[ ] streaming funcs should instead of being async-for
+        #   handled directly here wrapped in
+        #   a async-with-open_stream() closure that does the
+        #   normal thing you'd expect a far end streaming context
+        #   to (if written by the app-dev).
+        # -[ ] one off async funcs can literally just be called
+        #   here and awaited directly, possibly just with a small
+        #   wrapper that calls `Context.started()` and then does
+        #   the `await coro()`?
         elif context:
             # context func with support for bi-dir streaming
             await chan.send({'functype': 'context', 'cid': cid})
@@ -273,11 +296,12 @@ async def _invoke(
                 ctx._maybe_raise_remote_err(re)
 
             fname: str = func.__name__
-            cs: trio.CancelScope = ctx._scope
+            cs: CancelScope = ctx._scope
             if cs.cancel_called:
+                our_uid: tuple = actor.uid
                 canceller: tuple = ctx.canceller
                 msg: str = (
-                    f'`{fname}()`@{actor.uid} cancelled by '
+                    f'`{fname}()`@{our_uid} cancelled by '
                 )
 
                 # NOTE / TODO: if we end up having
@@ -286,6 +310,8 @@
                 # need to change this logic branch since it
                 # will always enter..
                 if ctx._cancel_called:
+                    # TODO: test for this!!!!!
+                    canceller: tuple = our_uid
                     msg += 'itself '
 
                 # if the channel which spawned the ctx is the
@@ -318,40 +344,76 @@
                     canceller=canceller,
                 )
 
-        # regular async function
+        # regular async function/method
+        # XXX: possibly just a scheduled `Actor._cancel_task()`
+        # from a remote request to cancel some `Context`.
+        # ------ - ------
+        # TODO: ideally we unify this with the above `context=True`
+        # block such that for any remote invocation ftype, we
+        # always invoke the far end RPC task scheduling the same
+        # way: using the linked IPC context machinery.
         else:
             try:
                 await chan.send({
                     'functype': 'asyncfunc',
                     'cid': cid
                 })
-            except trio.BrokenResourceError:
+            except (
+                trio.ClosedResourceError,
+                trio.BrokenResourceError,
+                BrokenPipeError,
+            ) as ipc_err:
                 failed_resp = True
                 if is_rpc:
                     raise
                 else:
+                    # TODO: should this be an `.exception()` call?
                     log.warning(
-                        f'Failed to respond to non-rpc request: {func}'
+                        f'Failed to respond to non-rpc request: {func}\n'
+                        f'{ipc_err}'
                     )
 
             with cancel_scope as cs:
-                ctx._scope = cs
+                ctx._scope: CancelScope = cs
                 task_status.started(ctx)
                 result = await coro
                 fname: str = func.__name__
                 log.runtime(f'{fname}() result: {result}')
-                if not failed_resp:
-                    # only send result if we know IPC isn't down
-                    await chan.send(
-                        {'return': result,
-                         'cid': cid}
-                    )
+
+                # NOTE: only send result if we know IPC isn't down
+                if (
+                    not failed_resp
+                    and chan.connected()
+                ):
+                    try:
+                        await chan.send(
+                            {'return': result,
+                             'cid': cid}
+                        )
+                    except (
+                        BrokenPipeError,
+                        trio.BrokenResourceError,
+                    ):
+                        log.warning(
+                            'Failed to return result:\n'
+                            f'{func}@{actor.uid}\n'
+                            f'remote chan: {chan.uid}'
+                        )
 
     except (
         Exception,
         BaseExceptionGroup,
     ) as err:
 
+        # always hide this frame from debug REPL if the crash
+        # originated from an rpc task and we DID NOT fail
+        # due to an IPC transport error!
+        if (
+            is_rpc
+            and chan.connected()
+        ):
+            __tracebackhide__: bool = True
+
         if not is_multi_cancelled(err):
 
             # TODO: maybe we'll want different "levels" of debugging
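
Both new send paths above follow the same discipline: gate on `chan.connected()` but still expect the send itself to race against transport teardown (`BrokenPipeError`, `trio.BrokenResourceError`, `trio.ClosedResourceError`), since the peer can vanish between the check and the write. Distilled into a standalone helper (hypothetical, not part of this patch):

import trio


async def maybe_send(chan, msg: dict) -> bool:
    # best-effort send over a possibly-dying IPC channel
    if not chan.connected():
        return False
    try:
        await chan.send(msg)
        return True
    except (
        BrokenPipeError,
        trio.BrokenResourceError,
        trio.ClosedResourceError,
    ):
        # far end died mid-send; the caller decides if that's fatal
        return False
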
@@ -385,24 +447,31 @@
             log.exception("Actor crashed:")
 
         # always ship errors back to caller
-        err_msg = pack_error(err, tb=tb)
+        err_msg: dict[str, dict] = pack_error(
+            err,
+            tb=tb,
+        )
         err_msg['cid'] = cid
 
-        try:
-            await chan.send(err_msg)
+        if is_rpc:
+            try:
+                await chan.send(err_msg)
 
-        # TODO: tests for this scenario:
-        # - RPC caller closes connection before getting a response
-        # should **not** crash this actor..
-        except (
-            trio.ClosedResourceError,
-            trio.BrokenResourceError,
-            BrokenPipeError,
-        ):
-            # if we can't propagate the error that's a big boo boo
-            log.exception(
-                f"Failed to ship error to caller @ {chan.uid} !?"
-            )
+            # TODO: tests for this scenario:
+            # - RPC caller closes connection before getting a response
+            # should **not** crash this actor..
+            except (
+                trio.ClosedResourceError,
+                trio.BrokenResourceError,
+                BrokenPipeError,
+            ) as ipc_err:
+
+                # if we can't propagate the error that's a big boo boo
+                log.exception(
+                    f"Failed to ship error to caller @ {chan.uid} !?\n"
+                    f'{ipc_err}'
+
+                )
 
         # error is probably from above coro running code *not from the
         # underlyingn rpc invocation* since a scope was never allocated
@@ -428,7 +497,11 @@
                 log.warning(
                     f"Task {func} likely errored or cancelled before start")
         else:
-            log.cancel(f'{func.__name__}({kwargs}) failed?')
+            log.cancel(
+                'Failed to de-alloc internal task!?\n'
+                f'cid: {cid}\n'
+                f'{func.__name__}({kwargs})'
+            )
 
     finally:
         if not actor._rpc_tasks:
@@ -445,7 +518,7 @@ async def try_ship_error_to_parent(
     err: Exception | BaseExceptionGroup,
 
 ) -> None:
-    with trio.CancelScope(shield=True):
+    with CancelScope(shield=True):
         try:
             # internal error so ship to parent without cid
             await channel.send(pack_error(err))
@@ -497,13 +570,13 @@ class Actor:
     msg_buffer_size: int = 2**6
 
     # nursery placeholders filled in by `async_main()` after fork
-    _root_n: trio.Nursery | None = None
-    _service_n: trio.Nursery | None = None
-    _server_n: trio.Nursery | None = None
+    _root_n: Nursery | None = None
+    _service_n: Nursery | None = None
+    _server_n: Nursery | None = None
 
     # Information about `__main__` from parent
     _parent_main_data: dict[str, str]
-    _parent_chan_cs: trio.CancelScope | None = None
+    _parent_chan_cs: CancelScope | None = None
 
     # syncs for setup/teardown sequences
     _server_down: trio.Event | None = None
@@ -1096,12 +1169,12 @@
 
     async def _serve_forever(
         self,
-        handler_nursery: trio.Nursery,
+        handler_nursery: Nursery,
         *,
         # (host, port) to bind for channel server
        listen_sockaddrs: list[tuple[str, int]] | None = None,
 
-        task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
+        task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
     ) -> None:
         '''
         Start the channel server, begin listening for new connections.
@@ -1188,7 +1261,7 @@
         self._cancel_called = True
 
         # cancel all ongoing rpc tasks
-        with trio.CancelScope(shield=True):
+        with CancelScope(shield=True):
 
             # kill any debugger request task to avoid deadlock
             # with the root actor in this tree
@@ -1248,7 +1321,7 @@
             # this ctx based lookup ensures the requested task to
             # be cancelled was indeed spawned by a request from this channel
             ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
-            scope: trio.CancelScope = ctx._scope
+            scope: CancelScope = ctx._scope
         except KeyError:
             log.cancel(f"{cid} has already completed/terminated?")
             return True
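
All the `CancelScope(shield=True)` blocks above share one rationale: the awaiting task is usually already being cancelled, and in `trio` every checkpoint inside a cancelled scope raises `Cancelled` unless shielded, so e.g. an unshielded `channel.send()` in `try_ship_error_to_parent()` would never get the error out. A minimal runnable sketch of that semantic, with a memory channel standing in for the IPC `Channel`:

import trio


async def main() -> None:
    send_chan, recv_chan = trio.open_memory_channel(1)

    with trio.CancelScope() as cs:
        cs.cancel()
        # all checkpoints below are now poisoned; without the
        # shield the `.send()` would raise `trio.Cancelled`
        # before the msg ever left.
        with trio.CancelScope(shield=True):
            await send_chan.send({'error': 'actor crashed'})

    # the "parent" side still received the final error msg
    assert recv_chan.receive_nowait() == {'error': 'actor crashed'}


trio.run(main)

The same reasoning applies to `Actor.cancel()`'s shielded rpc-task teardown: the cancel request frequently arrives over the very runtime machinery being torn down.
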
@@ -1613,7 +1686,7 @@
         # block it might be actually possible to debug THIS
         # machinery in the same way as user task code?
         # if actor.name == 'brokerd.ib':
-        #     with trio.CancelScope(shield=True):
+        #     with CancelScope(shield=True):
         #         await _debug.breakpoint()
 
         actor.lifetime_stack.close()
@@ -1655,7 +1728,7 @@
         ):
             log.runtime(
                 f"Waiting for remaining peers {actor._peers} to clear")
-            with trio.CancelScope(shield=True):
+            with CancelScope(shield=True):
                 await actor._no_more_peers.wait()
         log.runtime("All peer channels are complete")
 
@@ -1666,7 +1739,7 @@ async def process_messages(
     actor: Actor,
     chan: Channel,
     shield: bool = False,
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+    task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
 
 ) -> bool:
     '''
@@ -1684,7 +1757,7 @@
 
     log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
     try:
-        with trio.CancelScope(shield=shield) as loop_cs:
+        with CancelScope(shield=shield) as loop_cs:
             # this internal scope allows for keeping this message
             # loop running despite the current task having been
             # cancelled (eg. `open_portal()` may call this method from
@@ -1746,18 +1819,18 @@
 
                     if ns == 'self':
                         if funcname == 'cancel':
-                            func = actor.cancel
+                            func: Callable = actor.cancel
                             kwargs['requesting_uid'] = chan.uid
 
                             # don't start entire actor runtime cancellation
                             # if this actor is currently in debug mode!
-                            pdb_complete = _debug.Lock.local_pdb_complete
+                            pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete
                             if pdb_complete:
                                 await pdb_complete.wait()
 
                             # we immediately start the runtime machinery
                             # shutdown
-                            with trio.CancelScope(shield=True):
+                            with CancelScope(shield=True):
                                 # actor.cancel() was called so kill this
                                 # msg loop and break out into
                                 # ``async_main()``
@@ -1785,7 +1858,7 @@
 
                             # we immediately start the runtime machinery
                             # shutdown
-                            # with trio.CancelScope(shield=True):
+                            # with CancelScope(shield=True):
                             kwargs['chan'] = chan
                             target_cid = kwargs['cid']
                             kwargs['requesting_uid'] = chan.uid
@@ -1810,7 +1883,7 @@
                         else:
                             # normally registry methods, eg.
                             # ``.register_actor()`` etc.
-                            func = getattr(actor, funcname)
+                            func: Callable = getattr(actor, funcname)
 
                     else:
                         # complain to client about restricted modules
@@ -1900,9 +1973,10 @@
         Exception,
         BaseExceptionGroup,
     ) as err:
+
         if nursery_cancelled_before_task:
-            sn = actor._service_n
-            assert sn and sn.cancel_scope.cancel_called
+            sn: Nursery = actor._service_n
+            assert sn and sn.cancel_scope.cancel_called  # sanity
             log.cancel(
                 f'Service nursery cancelled before it handled {funcname}'
             )
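
Finally, the newly annotated `func: Callable` lookups make the `ns == 'self'` dispatch in `process_messages()` explicit: runtime-internal endpoints (`.cancel()`, `._cancel_task()`, registry methods like `.register_actor()`) are plain methods resolved by name on the local actor instance. A toy reduction of that shape (not tractor code):

from typing import Any, Callable

import trio


class FakeActor:
    # hypothetical stand-in for `Actor`; only the dispatch shape matters
    async def cancel(self, requesting_uid: tuple) -> bool:
        print(f'cancel requested by {requesting_uid}')
        return True


async def dispatch(
    actor: FakeActor,
    ns: str,
    funcname: str,
    kwargs: dict[str, Any],
) -> Any:
    if ns == 'self':
        # runtime-internal: resolve by name on the actor itself
        func: Callable = getattr(actor, funcname)
        return await func(**kwargs)
    # ..otherwise `funcname` must come from an explicitly
    # exposed/imported module namespace
    raise LookupError(f'namespace {ns!r} is not exposed')


trio.run(dispatch, FakeActor(), 'self', 'cancel', {'requesting_uid': ('root', '1')})
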