From 250275d98d52e8076d2797b8c3fabf5f8098d124 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 2 Jan 2024 09:08:39 -0500 Subject: [PATCH] Guarding for IPC failures in `._runtime._invoke()` Took me longer then i wanted to figure out the source of a failed-response to a remote-cancellation (in this case in `modden` where a client was cancelling a workspace layer.. but disconnects before receiving the ack msg) that was triggering an IPC error when sending the error msg for the cancellation of a `Actor._cancel_task()`, but since this (non-rpc) `._invoke()` task was trying to send to a now disconnected canceller it was resulting in a `BrokenPipeError` (or similar) error. Now, we except for such IPC errors and only raise them when, 1. the transport `Channel` is for sure up (bc ow what's the point of trying to send an error on the thing that caused it..) 2. it's definitely for handling an RPC task Similarly if the entire main invoke `try:` excepts, - we only hide the call-stack frame from the debugger (with `__tracebackhide__: bool`) if it's an RPC task that has a connected channel since we always want to see the frame when debugging internal task or IPC failures. - we don't bother trying to send errors to the context caller (actor) when it's a non-RPC request since failures on actor-runtime-internal tasks shouldn't really ever be reported remotely, only maybe raised locally. Also some other tidying, - this properly corrects for the self-cancel case where an RPC context is cancelled due to a local (runtime) task calling a method like `Actor.cancel_soon()`. We now set our own `.uid` as the `ContextCancelled.canceller` value so that other-end tasks know that the cancellation was due to a self-cancellation by the actor itself. We still need to properly test for this though! - add a more detailed module doc-str. - more explicit imports for `trio` core types throughout. --- tractor/_runtime.py | 186 +++++++++++++++++++++++++++++++------------- 1 file changed, 130 insertions(+), 56 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index fee14c4..5808065 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -15,7 +15,10 @@ # along with this program. If not, see . """ -Actor primitives and helpers +The fundamental core machinery implementing every "actor" including +the process-local (python-interpreter global) `Actor` state-type +primitive(s), RPC-in-task scheduling, and IPC connectivity and +low-level transport msg handling. """ from __future__ import annotations @@ -41,8 +44,14 @@ import warnings from async_generator import aclosing from exceptiongroup import BaseExceptionGroup -import trio # type: ignore -from trio_typing import TaskStatus +import trio +from trio import ( + CancelScope, +) +from trio_typing import ( + Nursery, + TaskStatus, +) from ._ipc import Channel from ._context import ( @@ -90,10 +99,9 @@ async def _invoke( connected IPC channel. This is the core "RPC" `trio.Task` scheduling machinery used to start every - remotely invoked function, normally in `Actor._service_n: trio.Nursery`. + remotely invoked function, normally in `Actor._service_n: Nursery`. ''' - __tracebackhide__: bool = True treat_as_gen: bool = False failed_resp: bool = False @@ -110,9 +118,9 @@ async def _invoke( # possibly a traceback (not sure what typing is for this..) tb = None - cancel_scope = trio.CancelScope() + cancel_scope = CancelScope() # activated cancel scope ref - cs: trio.CancelScope | None = None + cs: CancelScope | None = None ctx = actor.get_context( chan, @@ -124,6 +132,7 @@ async def _invoke( ) context: bool = False + # TODO: deprecate this style.. if getattr(func, '_tractor_stream_function', False): # handle decorated ``@tractor.stream`` async functions sig = inspect.signature(func) @@ -165,6 +174,7 @@ async def _invoke( except TypeError: raise + # TODO: can we unify this with the `context=True` impl below? if inspect.isasyncgen(coro): await chan.send({'functype': 'asyncgen', 'cid': cid}) # XXX: massive gotcha! If the containing scope @@ -195,6 +205,7 @@ async def _invoke( await chan.send({'stop': True, 'cid': cid}) # one way @stream func that gets treated like an async gen + # TODO: can we unify this with the `context=True` impl below? elif treat_as_gen: await chan.send({'functype': 'asyncgen', 'cid': cid}) # XXX: the async-func may spawn further tasks which push @@ -211,8 +222,20 @@ async def _invoke( # far end async gen to tear down await chan.send({'stop': True, 'cid': cid}) + # our most general case: a remote SC-transitive, + # IPC-linked, cross-actor-task "context" + # ------ - ------ # TODO: every other "func type" should be implemented from - # a special case of a context eventually! + # a special case of this impl eventually! + # -[ ] streaming funcs should instead of being async-for + # handled directly here wrapped in + # a async-with-open_stream() closure that does the + # normal thing you'd expect a far end streaming context + # to (if written by the app-dev). + # -[ ] one off async funcs can literally just be called + # here and awaited directly, possibly just with a small + # wrapper that calls `Context.started()` and then does + # the `await coro()`? elif context: # context func with support for bi-dir streaming await chan.send({'functype': 'context', 'cid': cid}) @@ -273,11 +296,12 @@ async def _invoke( ctx._maybe_raise_remote_err(re) fname: str = func.__name__ - cs: trio.CancelScope = ctx._scope + cs: CancelScope = ctx._scope if cs.cancel_called: + our_uid: tuple = actor.uid canceller: tuple = ctx.canceller msg: str = ( - f'`{fname}()`@{actor.uid} cancelled by ' + f'`{fname}()`@{our_uid} cancelled by ' ) # NOTE / TODO: if we end up having @@ -286,6 +310,8 @@ async def _invoke( # need to change this logic branch since it # will always enter.. if ctx._cancel_called: + # TODO: test for this!!!!! + canceller: tuple = our_uid msg += 'itself ' # if the channel which spawned the ctx is the @@ -318,40 +344,76 @@ async def _invoke( canceller=canceller, ) - # regular async function + # regular async function/method + # XXX: possibly just a scheduled `Actor._cancel_task()` + # from a remote request to cancel some `Context`. + # ------ - ------ + # TODO: ideally we unify this with the above `context=True` + # block such that for any remote invocation ftype, we + # always invoke the far end RPC task scheduling the same + # way: using the linked IPC context machinery. else: try: await chan.send({ 'functype': 'asyncfunc', 'cid': cid }) - except trio.BrokenResourceError: + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + BrokenPipeError, + ) as ipc_err: failed_resp = True if is_rpc: raise else: + # TODO: should this be an `.exception()` call? log.warning( - f'Failed to respond to non-rpc request: {func}' + f'Failed to respond to non-rpc request: {func}\n' + f'{ipc_err}' ) with cancel_scope as cs: - ctx._scope = cs + ctx._scope: CancelScope = cs task_status.started(ctx) result = await coro fname: str = func.__name__ log.runtime(f'{fname}() result: {result}') - if not failed_resp: - # only send result if we know IPC isn't down - await chan.send( - {'return': result, - 'cid': cid} - ) + + # NOTE: only send result if we know IPC isn't down + if ( + not failed_resp + and chan.connected() + ): + try: + await chan.send( + {'return': result, + 'cid': cid} + ) + except ( + BrokenPipeError, + trio.BrokenResourceError, + ): + log.warning( + 'Failed to return result:\n' + f'{func}@{actor.uid}\n' + f'remote chan: {chan.uid}' + ) except ( Exception, BaseExceptionGroup, ) as err: + # always hide this frame from debug REPL if the crash + # originated from an rpc task and we DID NOT fail + # due to an IPC transport error! + if ( + is_rpc + and chan.connected() + ): + __tracebackhide__: bool = True + if not is_multi_cancelled(err): # TODO: maybe we'll want different "levels" of debugging @@ -385,24 +447,31 @@ async def _invoke( log.exception("Actor crashed:") # always ship errors back to caller - err_msg = pack_error(err, tb=tb) + err_msg: dict[str, dict] = pack_error( + err, + tb=tb, + ) err_msg['cid'] = cid - try: - await chan.send(err_msg) + if is_rpc: + try: + await chan.send(err_msg) - # TODO: tests for this scenario: - # - RPC caller closes connection before getting a response - # should **not** crash this actor.. - except ( - trio.ClosedResourceError, - trio.BrokenResourceError, - BrokenPipeError, - ): - # if we can't propagate the error that's a big boo boo - log.exception( - f"Failed to ship error to caller @ {chan.uid} !?" - ) + # TODO: tests for this scenario: + # - RPC caller closes connection before getting a response + # should **not** crash this actor.. + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + BrokenPipeError, + ) as ipc_err: + + # if we can't propagate the error that's a big boo boo + log.exception( + f"Failed to ship error to caller @ {chan.uid} !?\n" + f'{ipc_err}' + + ) # error is probably from above coro running code *not from the # underlyingn rpc invocation* since a scope was never allocated @@ -428,7 +497,11 @@ async def _invoke( log.warning( f"Task {func} likely errored or cancelled before start") else: - log.cancel(f'{func.__name__}({kwargs}) failed?') + log.cancel( + 'Failed to de-alloc internal task!?\n' + f'cid: {cid}\n' + f'{func.__name__}({kwargs})' + ) finally: if not actor._rpc_tasks: @@ -445,7 +518,7 @@ async def try_ship_error_to_parent( err: Exception | BaseExceptionGroup, ) -> None: - with trio.CancelScope(shield=True): + with CancelScope(shield=True): try: # internal error so ship to parent without cid await channel.send(pack_error(err)) @@ -497,13 +570,13 @@ class Actor: msg_buffer_size: int = 2**6 # nursery placeholders filled in by `async_main()` after fork - _root_n: trio.Nursery | None = None - _service_n: trio.Nursery | None = None - _server_n: trio.Nursery | None = None + _root_n: Nursery | None = None + _service_n: Nursery | None = None + _server_n: Nursery | None = None # Information about `__main__` from parent _parent_main_data: dict[str, str] - _parent_chan_cs: trio.CancelScope | None = None + _parent_chan_cs: CancelScope | None = None # syncs for setup/teardown sequences _server_down: trio.Event | None = None @@ -1096,12 +1169,12 @@ class Actor: async def _serve_forever( self, - handler_nursery: trio.Nursery, + handler_nursery: Nursery, *, # (host, port) to bind for channel server listen_sockaddrs: list[tuple[str, int]] | None = None, - task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: ''' Start the channel server, begin listening for new connections. @@ -1188,7 +1261,7 @@ class Actor: self._cancel_called = True # cancel all ongoing rpc tasks - with trio.CancelScope(shield=True): + with CancelScope(shield=True): # kill any debugger request task to avoid deadlock # with the root actor in this tree @@ -1248,7 +1321,7 @@ class Actor: # this ctx based lookup ensures the requested task to # be cancelled was indeed spawned by a request from this channel ctx, func, is_complete = self._rpc_tasks[(chan, cid)] - scope: trio.CancelScope = ctx._scope + scope: CancelScope = ctx._scope except KeyError: log.cancel(f"{cid} has already completed/terminated?") return True @@ -1613,7 +1686,7 @@ async def async_main( # block it might be actually possible to debug THIS # machinery in the same way as user task code? # if actor.name == 'brokerd.ib': - # with trio.CancelScope(shield=True): + # with CancelScope(shield=True): # await _debug.breakpoint() actor.lifetime_stack.close() @@ -1655,7 +1728,7 @@ async def async_main( ): log.runtime( f"Waiting for remaining peers {actor._peers} to clear") - with trio.CancelScope(shield=True): + with CancelScope(shield=True): await actor._no_more_peers.wait() log.runtime("All peer channels are complete") @@ -1666,7 +1739,7 @@ async def process_messages( actor: Actor, chan: Channel, shield: bool = False, - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED, ) -> bool: ''' @@ -1684,7 +1757,7 @@ async def process_messages( log.runtime(f"Entering msg loop for {chan} from {chan.uid}") try: - with trio.CancelScope(shield=shield) as loop_cs: + with CancelScope(shield=shield) as loop_cs: # this internal scope allows for keeping this message # loop running despite the current task having been # cancelled (eg. `open_portal()` may call this method from @@ -1746,18 +1819,18 @@ async def process_messages( if ns == 'self': if funcname == 'cancel': - func = actor.cancel + func: Callable = actor.cancel kwargs['requesting_uid'] = chan.uid # don't start entire actor runtime cancellation # if this actor is currently in debug mode! - pdb_complete = _debug.Lock.local_pdb_complete + pdb_complete: trio.Event | None = _debug.Lock.local_pdb_complete if pdb_complete: await pdb_complete.wait() # we immediately start the runtime machinery # shutdown - with trio.CancelScope(shield=True): + with CancelScope(shield=True): # actor.cancel() was called so kill this # msg loop and break out into # ``async_main()`` @@ -1785,7 +1858,7 @@ async def process_messages( # we immediately start the runtime machinery # shutdown - # with trio.CancelScope(shield=True): + # with CancelScope(shield=True): kwargs['chan'] = chan target_cid = kwargs['cid'] kwargs['requesting_uid'] = chan.uid @@ -1810,7 +1883,7 @@ async def process_messages( else: # normally registry methods, eg. # ``.register_actor()`` etc. - func = getattr(actor, funcname) + func: Callable = getattr(actor, funcname) else: # complain to client about restricted modules @@ -1900,9 +1973,10 @@ async def process_messages( Exception, BaseExceptionGroup, ) as err: + if nursery_cancelled_before_task: - sn = actor._service_n - assert sn and sn.cancel_scope.cancel_called + sn: Nursery = actor._service_n + assert sn and sn.cancel_scope.cancel_called # sanity log.cancel( f'Service nursery cancelled before it handled {funcname}' )