diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 576e988..de975a9 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -181,12 +181,11 @@ async def _invoke_non_context( # way: using the linked IPC context machinery. failed_resp: bool = False try: - await chan.send( - StartAck( - cid=cid, - functype='asyncfunc', - ) + ack = StartAck( + cid=cid, + functype='asyncfunc', ) + await chan.send(ack) except ( trio.ClosedResourceError, trio.BrokenResourceError, @@ -194,12 +193,11 @@ async def _invoke_non_context( ) as ipc_err: failed_resp = True if is_rpc: - raise + raise ipc_err else: - # TODO: should this be an `.exception()` call? - log.warning( - f'Failed to respond to non-rpc request: {func}\n' - f'{ipc_err}' + log.exception( + f'Failed to respond to runtime RPC request for\n\n' + f'{ack}\n' ) with cancel_scope as cs: @@ -220,20 +218,19 @@ async def _invoke_non_context( and chan.connected() ): try: - await chan.send( - return_msg( - cid=cid, - pld=result, - ) + ret_msg = return_msg( + cid=cid, + pld=result, ) + await chan.send(ret_msg) except ( BrokenPipeError, trio.BrokenResourceError, ): log.warning( - 'Failed to return result:\n' - f'{func}@{actor.uid}\n' - f'remote chan: {chan.uid}' + 'Failed to send RPC result?\n' + f'|_{func}@{actor.uid}() -> {ret_msg}\n\n' + f'x=> peer: {chan.uid}\n' ) @acm @@ -250,7 +247,7 @@ async def _errors_relayed_via_ipc( ] = trio.TASK_STATUS_IGNORED, ) -> None: - __tracebackhide__: bool = hide_tb # TODO: use hide_tb here? + __tracebackhide__: bool = hide_tb try: yield # run RPC invoke body @@ -262,23 +259,19 @@ async def _errors_relayed_via_ipc( KeyboardInterrupt, ) as err: - # always hide this frame from debug REPL if the crash - # originated from an rpc task and we DID NOT fail due to - # an IPC transport error! + # NOTE: always hide this frame from debug REPL call stack + # if the crash originated from an RPC task and we DID NOT + # fail due to an IPC transport error! if ( is_rpc - and chan.connected() + and + chan.connected() ): __tracebackhide__: bool = hide_tb + # TODO: maybe we'll want different "levels" of debugging + # eventualy such as ('app', 'supervisory', 'runtime') ? if not is_multi_cancelled(err): - - # TODO: maybe we'll want different "levels" of debugging - # eventualy such as ('app', 'supervisory', 'runtime') ? - - # if not isinstance(err, trio.ClosedResourceError) and ( - # if not is_multi_cancelled(err) and ( - entered_debug: bool = False if ( ( @@ -310,19 +303,18 @@ async def _errors_relayed_via_ipc( # strange bug in our transport layer itself? Going # to keep this open ended for now. entered_debug = await _debug._maybe_enter_pm(err) - if not entered_debug: log.exception( 'RPC task crashed\n' f'|_{ctx}' ) - # always (try to) ship RPC errors back to caller + # ALWAYS try to ship RPC errors back to parent/caller task if is_rpc: - # + # TODO: tests for this scenario: # - RPC caller closes connection before getting a response - # should **not** crash this actor.. + # should **not** crash this actor.. await try_ship_error_to_remote( chan, err, @@ -331,33 +323,41 @@ async def _errors_relayed_via_ipc( hide_tb=hide_tb, ) - # error is probably from above coro running code *not from - # the target rpc invocation since a scope was never - # allocated around the coroutine await. + # if the ctx cs is NOT allocated, the error is likely from + # above `coro` invocation machinery NOT from inside the + # `coro` itself, i.e. err is NOT a user application error. if ctx._scope is None: # we don't ever raise directly here to allow the # msg-loop-scheduler to continue running for this # channel. task_status.started(err) - # always reraise KBIs so they propagate at the sys-process - # level. + # always reraise KBIs so they propagate at the sys-process level. if isinstance(err, KeyboardInterrupt): raise - - # RPC task bookeeping + # RPC task bookeeping. + # since RPC tasks are scheduled inside a flat + # `Actor._service_n`, we add "handles" to each such that + # they can be individually ccancelled. finally: try: - ctx, func, is_complete = actor._rpc_tasks.pop( + ctx: Context + func: Callable + is_complete: trio.Event + ( + ctx, + func, + is_complete, + ) = actor._rpc_tasks.pop( (chan, ctx.cid) ) is_complete.set() except KeyError: + # If we're cancelled before the task returns then the + # cancel scope will not have been inserted yet if is_rpc: - # If we're cancelled before the task returns then the - # cancel scope will not have been inserted yet log.warning( 'RPC task likely errored or cancelled before start?' f'|_{ctx._task}\n' @@ -372,7 +372,7 @@ async def _errors_relayed_via_ipc( finally: if not actor._rpc_tasks: - log.runtime("All RPC tasks have completed") + log.runtime('All RPC tasks have completed') actor._ongoing_rpc_tasks.set() @@ -414,19 +414,16 @@ async def _invoke( # TODO: possibly a specially formatted traceback # (not sure what typing is for this..)? - # tb = None + # tb: TracebackType = None cancel_scope = CancelScope() - # activated cancel scope ref - cs: CancelScope|None = None - + cs: CancelScope|None = None # ref when activated ctx = actor.get_context( chan=chan, cid=cid, nsf=NamespacePath.from_ref(func), - # TODO: if we wanted to get cray and support it? - # side='callee', + # NOTE: no portal passed bc this is the "child"-side # We shouldn't ever need to pass this through right? # it's up to the soon-to-be called rpc task to @@ -459,8 +456,8 @@ async def _invoke( kwargs['stream'] = ctx + # handle decorated ``@tractor.context`` async function elif getattr(func, '_tractor_context_function', False): - # handle decorated ``@tractor.context`` async function kwargs['ctx'] = ctx context = True @@ -474,7 +471,8 @@ async def _invoke( task_status=task_status, ): if not ( - inspect.isasyncgenfunction(func) or + inspect.isasyncgenfunction(func) + or inspect.iscoroutinefunction(func) ): raise TypeError(f'{func} must be an async function!') @@ -486,8 +484,7 @@ async def _invoke( except TypeError: raise - # TODO: implement all these cases in terms of the - # `Context` one! + # TODO: impl all these cases in terms of the `Context` one! if not context: await _invoke_non_context( actor, @@ -503,7 +500,7 @@ async def _invoke( return_msg, task_status, ) - # below is only for `@context` funcs + # XXX below fallthrough is ONLY for `@context` eps return # our most general case: a remote SC-transitive, @@ -580,9 +577,6 @@ async def _invoke( # itself calls `ctx._maybe_cancel_and_set_remote_error()` # which cancels the scope presuming the input error # is not a `.cancel_acked` pleaser. - # - currently a never-should-happen-fallthrough case - # inside ._context._drain_to_final_msg()`.. - # # TODO: remove this ^ right? if ctx._scope.cancelled_caught: our_uid: tuple = actor.uid @@ -598,9 +592,7 @@ async def _invoke( if cs.cancel_called: canceller: tuple = ctx.canceller - msg: str = ( - 'actor was cancelled by ' - ) + msg: str = 'actor was cancelled by ' # NOTE / TODO: if we end up having # ``Actor._cancel_task()`` call @@ -623,6 +615,8 @@ async def _invoke( else: msg += 'a remote peer' + # TODO: move this "div centering" into + # a helper for use elsewhere! div_chars: str = '------ - ------' div_offset: int = ( round(len(msg)/2)+1 @@ -702,11 +696,9 @@ async def _invoke( ctx: Context = actor._contexts.pop(( chan.uid, cid, - # ctx.side, )) merr: Exception|None = ctx.maybe_error - ( res_type_str, res_str, @@ -720,7 +712,7 @@ async def _invoke( ) log.runtime( f'IPC context terminated with a final {res_type_str}\n\n' - f'{ctx}\n' + f'{ctx}' ) @@ -806,13 +798,19 @@ async def process_messages( and `Actor.cancel()` process-wide-runtime-shutdown requests (as utilized inside `Portal.cancel_actor()` ). - ''' assert actor._service_n # state sanity # TODO: once `trio` get's an "obvious way" for req/resp we # should use it? - # https://github.com/python-trio/trio/issues/467 + # -[ ] existing GH https://github.com/python-trio/trio/issues/467 + # -[ ] for other transports (like QUIC) we can possibly just + # entirely avoid the feeder mem-chans since each msg will be + # delivered with a ctx-id already? + # + # |_ for ex, from `aioquic` which exposed "stream ids": + # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175 + # - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659 log.runtime( 'Entering RPC msg loop:\n' f'peer: {chan.uid}\n' @@ -850,7 +848,7 @@ async def process_messages( | Return(cid=cid) | CancelAck(cid=cid) - # `.cid` means RPC-ctx-task specific + # `.cid` indicates RPC-ctx-task scoped | Error(cid=cid) # recv-side `MsgType` decode violation @@ -1046,16 +1044,16 @@ async def process_messages( trio.Event(), ) - # runtime-scoped remote error (since no `.cid`) + # runtime-scoped remote (internal) error + # (^- bc no `Error.cid` -^) + # + # NOTE: this is the non-rpc error case, that + # is, an error NOT raised inside a call to + # `_invoke()` (i.e. no cid was provided in the + # msg - see above). Raise error inline and + # mark the channel as "globally errored" for + # all downstream consuming primitives. case Error(): - # NOTE: this is the non-rpc error case, - # that is, an error **not** raised inside - # a call to ``_invoke()`` (i.e. no cid was - # provided in the msg - see above). Push - # this error to all local channel - # consumers (normally portals) by marking - # the channel as errored - # assert chan.uid chan._exc: Exception = unpack_error( msg, chan=chan, @@ -1111,7 +1109,7 @@ async def process_messages( f'|_{chan.raddr}\n' ) - # transport **was** disconnected + # transport **WAS** disconnected return True except ( @@ -1150,12 +1148,11 @@ async def process_messages( finally: # msg debugging for when he machinery is brokey log.runtime( - 'Exiting IPC msg loop with\n' - f'peer: {chan.uid}\n' + 'Exiting IPC msg loop with final msg\n\n' + f'<= peer: {chan.uid}\n' f'|_{chan}\n\n' - 'final msg:\n' - f'{pformat(msg)}\n' + f'{pformat(msg)}\n\n' ) - # transport **was not** disconnected + # transport **WAS NOT** disconnected return False