From 7e93b81a83996472657fa2c01f87979d455a774f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jun 2024 19:06:17 -0400 Subject: [PATCH] Don't use pretty struct stuff in `._invoke` It's too fragile to put in side core RPC machinery since `msgspec.Struct` defs can fail if a field type can't be looked up at creation time (like can easily happen if you conditionally import using `if TYPE_CHECKING:`) Also, - rename `cs` to `rpc_ctx_cs: CancelScope` since it's literally the wrapping RPC `Context._scope`. - report self cancellation via `explain: str` and add tail case for "unknown cause". - put a ?TODO? around what to do about KBIs if a context is opened from an `infected_aio`-actor task. - similar to our nursery and portal add TODO list for moving all `_invoke_non_context()` content out the RPC core and instead implement them as `.hilevel` endpoint helpers (maybe as decorators?)which under neath define `@context`-funcs. --- tractor/_rpc.py | 70 +++++++++++++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 166ee96..fc8687c 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -68,7 +68,7 @@ from .msg import ( MsgCodec, PayloadT, NamespacePath, - pretty_struct, + # pretty_struct, _ops as msgops, ) from tractor.msg.types import ( @@ -89,6 +89,16 @@ if TYPE_CHECKING: log = get_logger('tractor') +# ?TODO? move to a `tractor.lowlevel._rpc` with the below +# func-type-cases implemented "on top of" `@context` defs: +# -[ ] std async func helper decorated with `@rpc_func`? +# -[ ] `Portal.open_stream_from()` with async-gens? +# |_ possibly a duplex form of this with a +# `sent_from_peer = yield send_to_peer` form, which would require +# syncing the send/recv side with possibly `.receive_nowait()` +# on each `yield`? +# -[ ] some kinda `@rpc_acm` maybe that does a fixture style with +# user only defining a single-`yield` generator-func? async def _invoke_non_context( actor: Actor, cancel_scope: CancelScope, @@ -108,8 +118,9 @@ async def _invoke_non_context( ] = trio.TASK_STATUS_IGNORED, ): __tracebackhide__: bool = True + cs: CancelScope|None = None # ref when activated - # TODO: can we unify this with the `context=True` impl below? + # ?TODO? can we unify this with the `context=True` impl below? if inspect.isasyncgen(coro): await chan.send( StartAck( @@ -160,10 +171,6 @@ async def _invoke_non_context( functype='asyncgen', ) ) - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above with cancel_scope as cs: ctx._scope = cs task_status.started(ctx) @@ -175,15 +182,13 @@ async def _invoke_non_context( await chan.send( Stop(cid=cid) ) + + # simplest function/method request-response pattern + # XXX: in the most minimally used case, just a scheduled internal runtime + # call to `Actor._cancel_task()` from the ctx-peer task since we + # don't (yet) have a dedicated IPC msg. + # ------ - ------ else: - # regular async function/method - # XXX: possibly just a scheduled `Actor._cancel_task()` - # from a remote request to cancel some `Context`. - # ------ - ------ - # TODO: ideally we unify this with the above `context=True` - # block such that for any remote invocation ftype, we - # always invoke the far end RPC task scheduling the same - # way: using the linked IPC context machinery. failed_resp: bool = False try: ack = StartAck( @@ -354,8 +359,15 @@ async def _errors_relayed_via_ipc( # channel. task_status.started(err) - # always reraise KBIs so they propagate at the sys-process level. - if isinstance(err, KeyboardInterrupt): + # always propagate KBIs at the sys-process level. + if ( + isinstance(err, KeyboardInterrupt) + + # ?TODO? except when running in asyncio mode? + # |_ wut if you want to open a `@context` FROM an + # infected_aio task? + # and not actor.is_infected_aio() + ): raise # RPC task bookeeping. @@ -458,7 +470,6 @@ async def _invoke( # tb: TracebackType = None cancel_scope = CancelScope() - cs: CancelScope|None = None # ref when activated ctx = actor.get_context( chan=chan, cid=cid, @@ -607,6 +618,8 @@ async def _invoke( # `@context` marked RPC function. # - `._portal` is never set. try: + tn: trio.Nursery + rpc_ctx_cs: CancelScope async with ( trio.open_nursery() as tn, msgops.maybe_limit_plds( @@ -616,7 +629,7 @@ async def _invoke( ), ): ctx._scope_nursery = tn - ctx._scope = tn.cancel_scope + rpc_ctx_cs = ctx._scope = tn.cancel_scope task_status.started(ctx) # TODO: better `trionics` tooling: @@ -642,7 +655,7 @@ async def _invoke( # itself calls `ctx._maybe_cancel_and_set_remote_error()` # which cancels the scope presuming the input error # is not a `.cancel_acked` pleaser. - if ctx._scope.cancelled_caught: + if rpc_ctx_cs.cancelled_caught: our_uid: tuple = actor.uid # first check for and raise any remote error @@ -652,9 +665,7 @@ async def _invoke( if re := ctx._remote_error: ctx._maybe_raise_remote_err(re) - cs: CancelScope = ctx._scope - - if cs.cancel_called: + if rpc_ctx_cs.cancel_called: canceller: tuple = ctx.canceller explain: str = f'{ctx.side!r}-side task was cancelled by ' @@ -680,9 +691,15 @@ async def _invoke( elif canceller == ctx.chan.uid: explain += f'its {ctx.peer_side!r}-side peer' - else: + elif canceller == our_uid: + explain += 'itself' + + elif canceller: explain += 'a remote peer' + else: + explain += 'an unknown cause?' + explain += ( add_div(message=explain) + @@ -911,7 +928,10 @@ async def process_messages( f'IPC msg from peer\n' f'<= {chan.uid}\n\n' - # TODO: avoid fmting depending on loglevel for perf? + # TODO: use of the pprinting of structs is + # FRAGILE and should prolly not be + # + # avoid fmting depending on loglevel for perf? # -[ ] specifically `pretty_struct.pformat()` sub-call..? # - how to only log-level-aware actually call this? # -[ ] use `.msg.pretty_struct` here now instead! @@ -1238,7 +1258,7 @@ async def process_messages( 'Exiting IPC msg loop with final msg\n\n' f'<= peer: {chan.uid}\n' f' |_{chan}\n\n' - f'{pretty_struct.pformat(msg)}' + # f'{pretty_struct.pformat(msg)}' ) log.runtime(message)