From fbc21a1dec09eb2a270020758dbbf799e591ff87 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 May 2024 09:20:43 -0400 Subject: [PATCH] Add a "current IPC `Context`" `ContextVar` Expose it from `._state.current_ipc_ctx()` and set it inside `._rpc._invoke()` for child and inside `Portal.open_context()` for parent. Still need to write a few more tests (particularly demonstrating usage throughout multiple nested nurseries on each side) but this suffices as a proto for testing with some debugger request-from-subactor stuff. Other, - use new `.devx.pformat.add_div()` for ctxc messages. - add a block to always traceback dump on corrupted cs stacks. - better handle non-RAEs exception output-formatting in context termination summary log message. - use a summary for `start_status` for msg logging in RPC loop. --- tests/test_context_stream_semantics.py | 4 + tractor/_rpc.py | 129 +++++++++++++++---------- tractor/_state.py | 23 +++++ 3 files changed, 106 insertions(+), 50 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index cedddf7..8edea51 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -25,6 +25,7 @@ from tractor._exceptions import ( StreamOverrun, ContextCancelled, ) +from tractor._state import current_ipc_ctx from tractor._testing import ( tractor_test, @@ -144,6 +145,8 @@ async def simple_setup_teardown( global _state _state = True + assert current_ipc_ctx() is ctx + # signal to parent that we're up await ctx.started(data + 1) @@ -204,6 +207,7 @@ def test_simple_context( block_forever=callee_blocks_forever, ) as (ctx, sent), ): + assert current_ipc_ctx() is ctx assert sent == 11 if callee_blocks_forever: diff --git a/tractor/_rpc.py b/tractor/_rpc.py index ee3151d..eed4790 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -57,6 +57,7 @@ from ._exceptions import ( from .devx import ( maybe_wait_for_debugger, _debug, + add_div, ) from . import _state from .log import get_logger @@ -250,6 +251,9 @@ async def _errors_relayed_via_ipc( ) -> None: __tracebackhide__: bool = hide_tb + # TODO: a debug nursery when in debug mode! + # async with maybe_open_debugger_nursery() as debug_tn: + # => see matching comment in side `._debug._pause()` try: yield # run RPC invoke body @@ -273,6 +277,8 @@ async def _errors_relayed_via_ipc( # TODO: maybe we'll want different "levels" of debugging # eventualy such as ('app', 'supervisory', 'runtime') ? + # + # -[ ] this if check is duplicate with `._maybe_enter_pm()`.. if not is_multi_cancelled(err): entered_debug: bool = False if ( @@ -296,7 +302,6 @@ async def _errors_relayed_via_ipc( ) ) ): - # await _debug.pause() # XXX QUESTION XXX: is there any case where we'll # want to debug IPC disconnects as a default? # => I can't think of a reason that inspecting this @@ -304,7 +309,14 @@ async def _errors_relayed_via_ipc( # recovery logic - the only case is some kind of # strange bug in our transport layer itself? Going # to keep this open ended for now. - entered_debug = await _debug._maybe_enter_pm(err) + log.debug( + 'RPC task crashed, attempting to enter debugger\n' + f'|_{ctx}' + ) + entered_debug = await _debug._maybe_enter_pm( + err, + api_frame=inspect.currentframe(), + ) if not entered_debug: log.exception( 'RPC task crashed\n' @@ -434,6 +446,8 @@ async def _invoke( ) context: bool = False + assert not _state._ctxvar_Context.get() + # TODO: deprecate this style.. 
if getattr(func, '_tractor_stream_function', False): # handle decorated ``@tractor.stream`` async functions @@ -557,6 +571,7 @@ async def _invoke( async with trio.open_nursery() as tn: ctx._scope_nursery = tn ctx._scope = tn.cancel_scope + _state._ctxvar_Context.set(ctx) task_status.started(ctx) # TODO: should would be nice to have our @@ -592,7 +607,6 @@ async def _invoke( cs: CancelScope = ctx._scope if cs.cancel_called: - canceller: tuple = ctx.canceller explain: str = f'{ctx.side!r}-side task was cancelled by ' @@ -621,23 +635,9 @@ async def _invoke( else: explain += 'a remote peer' - # TODO: move this "div centering" into - # a helper for use elsewhere! - div_chars: str = '------ - ------' - div_offset: int = ( - round(len(explain)/2)+1 - + - round(len(div_chars)/2)+1 - ) - div_str: str = ( - '\n' - + - ' '*div_offset - + - f'{div_chars}\n' - ) explain += ( - div_str + + add_div(message=explain) + + f'<= canceller: {canceller}\n' f'=> cancellee: {our_uid}\n' # TODO: better repr for ctx tasks.. @@ -664,10 +664,10 @@ async def _invoke( boxed_type=trio.Cancelled, canceller=canceller, ) - # assign local error so that the `.outcome` - # resolves to an error for both reporting and - # state checks. - ctx._local_error = ctxc + # does this matter other then for + # consistentcy/testing? |_ no user code should be + # in this scope at this point.. + # ctx._local_error = ctxc raise ctxc # XXX: do we ever trigger this block any more? @@ -677,6 +677,13 @@ async def _invoke( BaseException, ) as scope_error: + if ( + isinstance(scope_error, RuntimeError) + and scope_error.args + and 'Cancel scope stack corrupted' in scope_error.args[0] + ): + log.exception('Cancel scope stack corrupted!?\n') + # _debug.mk_pdb().set_trace() # always set this (child) side's exception as the # local error on the context @@ -710,17 +717,32 @@ async def _invoke( res_type_str, res_str, ) = ( - ('error', f'{type(merr)}',) - if merr + ('error', f'{type(merr)}',) if merr else ( 'result', f'`{repr(ctx.outcome)}`', ) ) - log.runtime( + message: str = ( f'IPC context terminated with a final {res_type_str}\n\n' f'{ctx}' ) + if merr: + from tractor import RemoteActorError + if not isinstance(merr, RemoteActorError): + fmt_merr: str = ( + f'\n{merr!r}\n' + # f'{merr.args[0]!r}\n' + ) + else: + fmt_merr = f'\n{merr!r}' + log.error( + message + + + fmt_merr + ) + else: + log.runtime(message) async def try_ship_error_to_remote( @@ -955,12 +977,19 @@ async def process_messages( kwargs=kwargs, # type-spec this? see `msg.types` uid=actorid, ): - log.runtime( + start_status: str = ( 'Handling RPC `Start` request\n' - f'<= peer: {actorid}\n' - f' |_{ns}.{funcname}({kwargs})\n\n' + f'<= peer: {actorid}\n\n' + f' |_{chan}\n' + f' |_cid: {cid}\n\n' + # f' |_{ns}.{funcname}({kwargs})\n' + f'>> {actor.uid}\n' + f' |_{actor}\n' + f' -> nsp: `{ns}.{funcname}({kwargs})`\n' - f'{pretty_struct.pformat(msg)}\n' + # f' |_{ns}.{funcname}({kwargs})\n\n' + + # f'{pretty_struct.pformat(msg)}\n' ) # runtime-internal endpoint: `Actor.` @@ -989,6 +1018,10 @@ async def process_messages( await chan.send(err_msg) continue + start_status += ( + f' -> func: {func}\n' + ) + # schedule a task for the requested RPC function # in the actor's main "service nursery". # @@ -996,18 +1029,8 @@ async def process_messages( # supervision isolation? would avoid having to # manage RPC tasks individually in `._rpc_tasks` # table? - log.runtime( - f'Spawning task for RPC request\n' - f'<= caller: {chan.uid}\n' - f' |_{chan}\n\n' - # ^-TODO-^ maddr style repr? 
- # f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' - # f'cid="{cid[-16:]} .."\n\n' - - f'=> {actor}\n' - f' |_cid: {cid}\n' - f' |>> {func}()\n' - ) + start_status += ' -> scheduling new task..\n' + log.runtime(start_status) try: ctx: Context = await actor._service_n.start( partial( @@ -1035,8 +1058,9 @@ async def process_messages( # scoped exception from ``_invoke()`` itself. if isinstance(err := ctx, Exception): log.warning( - 'Task for RPC failed?' - f'|_ {func}()\n\n' + start_status + + + ' -> task for RPC failed?\n\n' f'{err}' ) continue @@ -1155,12 +1179,17 @@ async def process_messages( finally: # msg debugging for when he machinery is brokey - log.runtime( - 'Exiting IPC msg loop with final msg\n\n' - f'<= peer: {chan.uid}\n' - f' |_{chan}\n\n' - f'{pretty_struct.pformat(msg)}' - ) + if msg is None: + message: str = 'Exiting IPC msg loop without receiving a msg?' + else: + message: str = ( + 'Exiting IPC msg loop with final msg\n\n' + f'<= peer: {chan.uid}\n' + f' |_{chan}\n\n' + f'{pretty_struct.pformat(msg)}' + ) + + log.runtime(message) # transport **WAS NOT** disconnected return (False, msg) diff --git a/tractor/_state.py b/tractor/_state.py index 30346a6..a372983 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -19,13 +19,19 @@ Per process state """ from __future__ import annotations +from contextvars import ( + ContextVar, +) from typing import ( Any, TYPE_CHECKING, ) +from trio.lowlevel import current_task + if TYPE_CHECKING: from ._runtime import Actor + from ._context import Context _current_actor: Actor|None = None # type: ignore # noqa @@ -110,3 +116,20 @@ def debug_mode() -> bool: def is_root_process() -> bool: return _runtime_vars['_is_root'] + + +_ctxvar_Context: ContextVar[Context] = ContextVar( + 'ipc_context', + default=None, +) + + +def current_ipc_ctx() -> Context: + ctx: Context = _ctxvar_Context.get() + if not ctx: + from ._exceptions import InternalError + raise InternalError( + 'No IPC context has been allocated for this task yet?\n' + f'|_{current_task()}\n' + ) + return ctx
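
A minimal usage sketch of the new `current_ipc_ctx()` accessor, mirroring the
assertions added to `tests/test_context_stream_semantics.py` above. The actor
and function names here are illustrative only, and the parent-side
`Portal.open_context()` change mentioned in the commit message is assumed to
be in place even though its hunk is not shown in this patch.

    import trio
    import tractor
    from tractor._state import current_ipc_ctx


    @tractor.context
    async def child(
        ctx: tractor.Context,
    ) -> None:
        # child side: `._rpc._invoke()` sets the ctxvar before user code
        # runs, so the accessor returns this task's active ctx.
        assert current_ipc_ctx() is ctx
        await ctx.started('yo')


    async def main():
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'ctxvar_demo',  # illustrative actor name
                enable_modules=[__name__],
            )
            async with portal.open_context(child) as (ctx, first):
                # parent side: set inside `Portal.open_context()`.
                assert current_ipc_ctx() is ctx
                assert first == 'yo'

            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)

Outside any IPC context the accessor raises an `InternalError` rather than
returning `None`, per the `_state.py` hunk above, so callers can rely on
always getting back a live `Context`.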