From 60aa16adf655ac954c88e222b054f5d843fc317b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 14 Apr 2024 19:31:50 -0400 Subject: [PATCH] Pass a `use_greenback: bool` runtime var to subs Such that the top level `maybe_enable_greenback` from `open_root_actor()` can toggle the entire actor tree's usage. Read the rtv in `._rpc` tasks and only enable if set. Also, rigor up the `._rpc.process_messages()` loop to handle `Error()` and `case _:` separately such that we now raise an explicit rte for unknown / invalid msgs. Use "parent" / "child" for side descriptions in loop comments and put a fat comment before the `StartAck` in `_invoke()`. --- tractor/_root.py | 12 +++++++-- tractor/_rpc.py | 68 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/tractor/_root.py b/tractor/_root.py index 9ce470f..1964a06 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -116,6 +116,8 @@ async def open_root_actor( os.environ['PYTHONBREAKPOINT'] = ( 'tractor.devx._debug.pause_from_sync' ) + _state._runtime_vars['use_greenback'] = True + else: # TODO: disable `breakpoint()` by default (without # `greenback`) since it will break any multi-actor @@ -386,14 +388,20 @@ async def open_root_actor( _state._last_actor_terminated = actor # restore built-in `breakpoint()` hook state - if debug_mode: + if ( + debug_mode + and + maybe_enable_greenback + ): if builtin_bp_handler is not None: sys.breakpointhook = builtin_bp_handler + if orig_bp_path is not None: os.environ['PYTHONBREAKPOINT'] = orig_bp_path + else: # clear env back to having no entry - os.environ.pop('PYTHONBREAKPOINT') + os.environ.pop('PYTHONBREAKPOINT', None) logger.runtime("Root actor terminated") diff --git a/tractor/_rpc.py b/tractor/_rpc.py index a95dbfe..86c3e27 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -41,7 +41,6 @@ from trio import ( TaskStatus, ) -from .msg import NamespacePath from ._ipc import Channel from ._context import ( Context, @@ -61,6 +60,11 @@ from .devx import ( ) from . import _state from .log import get_logger +from .msg import ( + current_codec, + MsgCodec, + NamespacePath, +) from tractor.msg.types import ( CancelAck, Error, @@ -98,6 +102,7 @@ async def _invoke_non_context( Context | BaseException ] = trio.TASK_STATUS_IGNORED, ): + __tracebackhide__: bool = True # TODO: can we unify this with the `context=True` impl below? if inspect.isasyncgen(coro): @@ -398,7 +403,11 @@ async def _invoke( __tracebackhide__: bool = hide_tb treat_as_gen: bool = False - if _state.debug_mode(): + if ( + _state.debug_mode() + and + _state._runtime_vars['use_greenback'] + ): # XXX for .pause_from_sync()` usage we need to make sure # `greenback` is boostrapped in the subactor! await _debug.maybe_init_greenback() @@ -512,10 +521,22 @@ async def _invoke( # wrapper that calls `Context.started()` and then does # the `await coro()`? - # a "context" endpoint type is the most general and - # "least sugary" type of RPC ep with support for + # ------ - ------ + # a "context" endpoint is the most general and + # "least sugary" type of RPC with support for # bi-dir streaming B) - # StartAck + # + # the concurrency relation is simlar to a task nursery + # wherein a "parent" task (the one that enters + # `trio.open_nursery()` in some actor "opens" (via + # `Portal.open_context()`) an IPC ctx to another peer + # (which is maybe a sub-) actor who then schedules (aka + # `trio.Nursery.start()`s) a new "child" task to execute + # the `@context` annotated func; that is this func we're + # running directly below! + # ------ - ------ + # + # StartAck: respond immediately with endpoint info await chan.send( StartAck( cid=cid, @@ -524,11 +545,11 @@ async def _invoke( ) # TODO: should we also use an `.open_context()` equiv - # for this callee side by factoring the impl from + # for this child side by factoring the impl from # `Portal.open_context()` into a common helper? # # NOTE: there are many different ctx state details - # in a callee side instance according to current impl: + # in a child side instance according to current impl: # - `.cancelled_caught` can never be `True`. # -> the below scope is never exposed to the # `@context` marked RPC function. @@ -554,7 +575,7 @@ async def _invoke( # NOTE: this happens IFF `ctx._scope.cancel()` is # called by any of, - # - *this* callee task manually calling `ctx.cancel()`. + # - *this* child task manually calling `ctx.cancel()`. # - the runtime calling `ctx._deliver_msg()` which # itself calls `ctx._maybe_cancel_and_set_remote_error()` # which cancels the scope presuming the input error @@ -631,10 +652,11 @@ async def _invoke( # f' |_{ctx}' ) - # task-contex was either cancelled by request using - # ``Portal.cancel_actor()`` or ``Context.cancel()`` - # on the far end, or it was cancelled by the local - # (callee) task, so relay this cancel signal to the + # task-contex was either cancelled by request + # using ``Portal.cancel_actor()`` or + # ``Context.cancel()`` on the far end, or it + # was cancelled by the local child (or callee) + # task, so relay this cancel signal to the # other side. ctxc = ContextCancelled( message=msg, @@ -655,7 +677,7 @@ async def _invoke( ) as scope_error: - # always set this (callee) side's exception as the + # always set this (child) side's exception as the # local error on the context ctx._local_error: BaseException = scope_error @@ -1024,9 +1046,8 @@ async def process_messages( trio.Event(), ) - # XXX remote (runtime scoped) error or uknown - # msg (type). - case Error() | _: + # runtime-scoped remote error (since no `.cid`) + case Error(): # NOTE: this is the non-rpc error case, # that is, an error **not** raised inside # a call to ``_invoke()`` (i.e. no cid was @@ -1034,10 +1055,6 @@ async def process_messages( # this error to all local channel # consumers (normally portals) by marking # the channel as errored - log.exception( - f'Unhandled IPC msg:\n\n' - f'{msg}\n' - ) # assert chan.uid chan._exc: Exception = unpack_error( msg, @@ -1045,6 +1062,17 @@ async def process_messages( ) raise chan._exc + # unknown/invalid msg type? + case _: + codec: MsgCodec = current_codec() + message: str = ( + f'Unhandled IPC msg for codec?\n\n' + f'|_{codec}\n\n' + f'{msg}\n' + ) + log.exception(message) + raise RuntimeError(message) + log.runtime( 'Waiting on next IPC msg from\n' f'peer: {chan.uid}\n'