From 9a18b57d384c3dc16125c8b1154c0f8db4323924 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sat, 2 Mar 2024 19:26:40 -0500
Subject: [PATCH] Mega-refactor on `._invoke()` targeting `@context`s

Since eventually we want to implement all other RPC "func types" as
contexts underneath, this starts the rework of moving all the other
cases into a separate func - not only to simplify the main `._invoke()`
body but also as a reminder of the intention to do it XD

Details of the re-factor:
- add a new `._invoke_non_context()` which just moves all the old
  blocks for non-context handling to a single def.
- factor what was basically just the `finally:` block handler (doing
  all the task bookkeeping) into a new `@acm`:
  `_errors_relayed_via_ipc()` with that content packed into the
  post-`yield` section (also with a `hide_tb: bool` flag added of
  course); see the sketch below.
  * include a `debug_kbis: bool` for when needed.
- since the `@context` block is the only type left in the main
  `_invoke()` body, de-dent it so it's more grok-able B)

Obviously this patch also includes a few improvements regarding
context-cancellation-semantics (for the `context` RPC case) on the
callee side in order to match previous changes to the `Context` api:
- always setting any ctxc as the `Context._local_error`.
- using the new convenience `.maybe_raise()` topically (for now).
- avoiding any previous reliance on `Context.cancelled_caught` for
  anything public of meaning.

Further included are more logging content updates:
- being pedantic in `.cancel()` msgs about whether termination was
  caused by error or ctxc.
- optional `._invoke()` traceback hiding via a `hide_tb: bool`.
- simpler log headers throughout, instead leveraging the new
  `.__repr__()` on runtime primitives.
- a buncha `<= ` prefixed "sent from" style msg emissions.
- simplified handshake status reporting.

Other subsys api changes we need to match:
- the `Channel.msgstream` -> `Channel.transport` rename.
- avoiding any `local_nursery: ActorNursery` waiting when
  `._implicit_runtime_started` is set.

And yes, lotsa more comments for #TODOs dawg.. since there's always
somethin!
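
To sketch the `@acm` factoring idea - a hypothetical minimal version
only; the real impl (in the diff) also does the error-packing,
debug-REPL entry and task de-registration:

    from contextlib import asynccontextmanager as acm

    @acm
    async def _errors_relayed_via_ipc(hide_tb: bool = False):
        __tracebackhide__: bool = hide_tb
        try:
            # the RPC invoke body runs here
            yield

        except BaseException:
            # box and ship the error to the requesting parent
            # IPC-channel (elided in this sketch).
            raise

        finally:
            # always-run RPC task bookkeeping; previously this was
            # the trailing `finally:` block inside `._invoke()`.
            ...
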
---
 tractor/_runtime.py | 937 +++++++++++++++++++++++++++-----------------
 1 file changed, 576 insertions(+), 361 deletions(-)

diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index b3b87e2..4c1181d 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -22,6 +22,10 @@ low-level transport msg handling.

 """
 from __future__ import annotations
+from contextlib import (
+    ExitStack,
+    asynccontextmanager as acm,
+)
 from collections import defaultdict
 from functools import partial
 from itertools import chain
@@ -34,12 +38,12 @@ import sys
 from typing import (
     Any,
     Callable,
+    Coroutine,
     TYPE_CHECKING,
 )
 import uuid
 from types import ModuleType
 import os
-from contextlib import ExitStack
 import warnings

 from async_generator import aclosing
@@ -99,13 +103,290 @@ async def maybe_import_gb():

         await greenback.ensure_portal()

     except ModuleNotFoundError:
-        log.warning(
+        log.debug(
             '`greenback` is not installed.\n'
-            'No sync debug support!'
+            'No sync debug support!\n'
         )
         _gb_mod = False


+async def _invoke_non_context(
+    actor: Actor,
+    cancel_scope: CancelScope,
+    ctx: Context,
+    cid: str,
+    chan: Channel,
+    func: Callable,
+    coro: Coroutine,
+    kwargs: dict[str, Any],
+
+    treat_as_gen: bool,
+    is_rpc: bool,
+
+    task_status: TaskStatus[
+        Context | BaseException
+    ] = trio.TASK_STATUS_IGNORED,
+):
+
+    # TODO: can we unify this with the `context=True` impl below?
+    if inspect.isasyncgen(coro):
+        await chan.send({'functype': 'asyncgen', 'cid': cid})
+        # XXX: massive gotcha! If the containing scope
+        # is cancelled and we execute the below line,
+        # any ``ActorNursery.__aexit__()`` WON'T be
+        # triggered in the underlying async gen! So we
+        # have to properly handle the closing (aclosing)
+        # of the async gen in order to be sure the cancel
+        # is propagated!
+        with cancel_scope as cs:
+            ctx._scope = cs
+            task_status.started(ctx)
+            async with aclosing(coro) as agen:
+                async for item in agen:
+                    # TODO: can we send values back in here?
+                    # it's gonna require a `while True:` and
+                    # some non-blocking way to retrieve new `asend()`
+                    # values from the channel:
+                    # to_send = await chan.recv_nowait()
+                    # if to_send is not None:
+                    #     to_yield = await coro.asend(to_send)
+                    await chan.send({'yield': item, 'cid': cid})
+
+        log.runtime(f"Finished iterating {coro}")
+        # TODO: we should really support a proper
+        # `StopAsyncIteration` system here for returning a final
+        # value if desired
+        await chan.send({'stop': True, 'cid': cid})
+
+    # one-way @stream func that gets treated like an async gen
+    # TODO: can we unify this with the `context=True` impl below?
+    elif treat_as_gen:
+        await chan.send({'functype': 'asyncgen', 'cid': cid})
+        # XXX: the async-func may spawn further tasks which push
+        # back values like an async-generator would but must
+        # manually construct the response dict-packet-responses as
+        # above
+        with cancel_scope as cs:
+            ctx._scope = cs
+            task_status.started(ctx)
+            await coro
+
+        if not cs.cancelled_caught:
+            # task was not cancelled so we can instruct the
+            # far end async gen to tear down
+            await chan.send({'stop': True, 'cid': cid})
+    else:
+        # regular async function/method
+        # XXX: possibly just a scheduled `Actor._cancel_task()`
+        # from a remote request to cancel some `Context`.
+        # ------ - ------
+        # TODO: ideally we unify this with the above `context=True`
+        # block such that for any remote invocation ftype, we
+        # always invoke the far end RPC task scheduling the same
+        # way: using the linked IPC context machinery.
+        failed_resp: bool = False
+        try:
+            await chan.send({
+                'functype': 'asyncfunc',
+                'cid': cid
+            })
+        except (
+            trio.ClosedResourceError,
+            trio.BrokenResourceError,
+            BrokenPipeError,
+        ) as ipc_err:
+            failed_resp = True
+            if is_rpc:
+                raise
+            else:
+                # TODO: should this be an `.exception()` call?
+                log.warning(
+                    f'Failed to respond to non-rpc request: {func}\n'
+                    f'{ipc_err}'
+                )
+
+        with cancel_scope as cs:
+            ctx._scope: CancelScope = cs
+            task_status.started(ctx)
+            result = await coro
+            fname: str = func.__name__
+            log.runtime(
+                'RPC complete:\n'
+                f'task: {ctx._task}\n'
+                f'|_cid={ctx.cid}\n'
+                f'|_{fname}() -> {pformat(result)}\n'
+            )
+
+            # NOTE: only send result if we know IPC isn't down
+            if (
+                not failed_resp
+                and chan.connected()
+            ):
+                try:
+                    await chan.send(
+                        {'return': result,
+                         'cid': cid}
+                    )
+                except (
+                    BrokenPipeError,
+                    trio.BrokenResourceError,
+                ):
+                    log.warning(
+                        'Failed to return result:\n'
+                        f'{func}@{actor.uid}\n'
+                        f'remote chan: {chan.uid}'
+                    )
+
+
+@acm
+async def _errors_relayed_via_ipc(
+    actor: Actor,
+    chan: Channel,
+    ctx: Context,
+    is_rpc: bool,
+
+    hide_tb: bool = False,
+    debug_kbis: bool = False,
+    task_status: TaskStatus[
+        Context | BaseException
+    ] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+    __tracebackhide__: bool = hide_tb  # TODO: use hide_tb here?
+    try:
+        yield  # run RPC invoke body
+
+        # box and ship RPC errors for wire-transit via
+        # the task's requesting parent IPC-channel.
+    except (
+        Exception,
+        BaseExceptionGroup,
+        KeyboardInterrupt,
+    ) as err:
+
+        # always hide this frame from debug REPL if the crash
+        # originated from an rpc task and we DID NOT fail due to
+        # an IPC transport error!
+        if (
+            is_rpc
+            and chan.connected()
+        ):
+            __tracebackhide__: bool = hide_tb
+
+        if not is_multi_cancelled(err):
+
+            # TODO: maybe we'll want different "levels" of debugging
+            # eventually such as ('app', 'supervisory', 'runtime') ?
+
+            # if not isinstance(err, trio.ClosedResourceError) and (
+            # if not is_multi_cancelled(err) and (
+
+            entered_debug: bool = False
+            if (
+                (
+                    not isinstance(err, ContextCancelled)
+                    or (
+                        isinstance(err, ContextCancelled)
+                        and ctx._cancel_called
+
+                        # if the root blocks the debugger lock request from a child
+                        # we will get a remote-cancelled condition.
+                        and ctx._enter_debugger_on_cancel
+                    )
+                )
+                and
+                (
+                    not isinstance(err, KeyboardInterrupt)
+                    or (
+                        isinstance(err, KeyboardInterrupt)
+                        and debug_kbis
+                    )
+                )
+            ):
+                # await pause()
+                # XXX QUESTION XXX: is there any case where we'll
+                # want to debug IPC disconnects as a default?
+                # => I can't think of a reason that inspecting this
+                # type of failure will be useful for respawns or
+                # recovery logic - the only case is some kind of
+                # strange bug in our transport layer itself? Going
+                # to keep this open ended for now.
+                entered_debug = await _debug._maybe_enter_pm(err)
+
+                if not entered_debug:
+                    log.exception('Actor crashed:\n')
+
+        # always ship errors back to caller
+        err_msg: dict[str, dict] = pack_error(
+            err,
+            # tb=tb, # TODO: special tb fmting?
+            cid=ctx.cid,
+        )
+
+        if is_rpc:
+            try:
+                await chan.send(err_msg)
+
+            # TODO: tests for this scenario:
+            # - RPC caller closes connection before getting a response
+            #   should **not** crash this actor..
+            except (
+                trio.ClosedResourceError,
+                trio.BrokenResourceError,
+                BrokenPipeError,
+            ) as ipc_err:
+
+                # if we can't propagate the error that's a big boo boo
+                log.exception(
+                    f"Failed to ship error to caller @ {chan.uid} !?\n"
+                    f'{ipc_err}'
+
+                )
+
+        # error is probably from the above coro-running code, *not*
+        # from the target rpc invocation, since a scope was never
+        # allocated around the coroutine await.
+        if ctx._scope is None:
+            # we don't ever raise directly here to allow the
+            # msg-loop-scheduler to continue running for this
+            # channel.
+            task_status.started(err)
+
+        # always reraise KBIs so they propagate at the sys-process
+        # level.
+        if isinstance(err, KeyboardInterrupt):
+            raise
+
+
+    # RPC task bookkeeping
+    finally:
+        try:
+            ctx, func, is_complete = actor._rpc_tasks.pop(
+                (chan, ctx.cid)
+            )
+            is_complete.set()
+
+        except KeyError:
+            if is_rpc:
+                # If we're cancelled before the task returns then the
+                # cancel scope will not have been inserted yet
+                log.warning(
+                    'RPC task likely errored or cancelled before start?\n'
+                    f'|_{ctx._task}\n'
+                    f'  >> {ctx.repr_rpc}\n'
+                )
+            else:
+                log.cancel(
+                    'Failed to de-alloc internal runtime cancel task?\n'
+                    f'|_{ctx._task}\n'
+                    f'  >> {ctx.repr_rpc}\n'
+                )
+
+        finally:
+            if not actor._rpc_tasks:
+                log.runtime("All RPC tasks have completed")
+                actor._ongoing_rpc_tasks.set()
+
+
 async def _invoke(

     actor: Actor,
@@ -115,6 +396,8 @@ async def _invoke(
     kwargs: dict[str, Any],

     is_rpc: bool = True,
+    hide_tb: bool = True,
+
     task_status: TaskStatus[
         Context | BaseException
     ] = trio.TASK_STATUS_IGNORED,
@@ -127,8 +410,8 @@ async def _invoke(
     remotely invoked function, normally in `Actor._service_n: Nursery`.
''' + __tracebackhide__: bool = hide_tb treat_as_gen: bool = False - failed_resp: bool = False if _state.debug_mode(): await maybe_import_gb() @@ -139,7 +422,7 @@ async def _invoke( cancel_scope = CancelScope() # activated cancel scope ref - cs: CancelScope | None = None + cs: CancelScope|None = None ctx = actor.get_context( chan=chan, @@ -160,6 +443,7 @@ async def _invoke( # compat with old api kwargs['ctx'] = ctx + treat_as_gen = True if 'ctx' in params: warnings.warn( @@ -174,7 +458,6 @@ async def _invoke( assert 'stream' in params kwargs['stream'] = ctx - treat_as_gen = True elif getattr(func, '_tractor_context_function', False): # handle decorated ``@tractor.context`` async function @@ -182,65 +465,45 @@ async def _invoke( context = True # errors raised inside this block are propgated back to caller - try: + async with _errors_relayed_via_ipc( + actor, + chan, + ctx, + is_rpc, + hide_tb=hide_tb, + task_status=task_status, + ): if not ( inspect.isasyncgenfunction(func) or inspect.iscoroutinefunction(func) ): raise TypeError(f'{func} must be an async function!') + # init coroutine with `kwargs` to immediately catch any + # type-sig errors. try: coro = func(**kwargs) except TypeError: raise - # TODO: can we unify this with the `context=True` impl below? - if inspect.isasyncgen(coro): - await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: massive gotcha! If the containing scope - # is cancelled and we execute the below line, - # any ``ActorNursery.__aexit__()`` WON'T be - # triggered in the underlying async gen! So we - # have to properly handle the closing (aclosing) - # of the async gen in order to be sure the cancel - # is propagated! - with cancel_scope as cs: - ctx._scope = cs - task_status.started(ctx) - async with aclosing(coro) as agen: - async for item in agen: - # TODO: can we send values back in here? - # it's gonna require a `while True:` and - # some non-blocking way to retrieve new `asend()` - # values from the channel: - # to_send = await chan.recv_nowait() - # if to_send is not None: - # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) - - log.runtime(f"Finished iterating {coro}") - # TODO: we should really support a proper - # `StopAsyncIteration` system here for returning a final - # value if desired - await chan.send({'stop': True, 'cid': cid}) - - # one way @stream func that gets treated like an async gen - # TODO: can we unify this with the `context=True` impl below? - elif treat_as_gen: - await chan.send({'functype': 'asyncgen', 'cid': cid}) - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above - with cancel_scope as cs: - ctx._scope = cs - task_status.started(ctx) - await coro - - if not cs.cancelled_caught: - # task was not cancelled so we can instruct the - # far end async gen to tear down - await chan.send({'stop': True, 'cid': cid}) + # TODO: implement all these cases in terms of the + # `Context` one! + if not context: + await _invoke_non_context( + actor, + cancel_scope, + ctx, + cid, + chan, + func, + coro, + kwargs, + treat_as_gen, + is_rpc, + task_status, + ) + # below is only for `@context` funcs + return # our most general case: a remote SC-transitive, # IPC-linked, cross-actor-task "context" @@ -256,77 +519,53 @@ async def _invoke( # here and awaited directly, possibly just with a small # wrapper that calls `Context.started()` and then does # the `await coro()`? 
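
For reference, a minimal sketch of the callee-side endpoint shape this
de-dented `context` branch services; `@tractor.context` and
`Context.started()` are the public api, the endpoint itself is made up:

    import tractor

    @tractor.context
    async def echo_first(
        ctx: tractor.Context,
        msg: str,
    ) -> str:
        # ship a first value to the caller, completing its
        # `Portal.open_context()` entry phase..
        await ctx.started(msg)

        # ..then the `return` value is delivered as the final
        # result, i.e. the `{'return': ...}` msg sent below.
        return msg
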
-        elif context:
-            # a "context" endpoint type is the most general and
-            # "least sugary" type of RPC ep with support for
-            # bi-dir streaming B)
-            await chan.send({
-                'functype': 'context',
-                'cid': cid
-            })
+        # a "context" endpoint type is the most general and
+        # "least sugary" type of RPC ep with support for
+        # bi-dir streaming B)
+        await chan.send({
+            'functype': 'context',
+            'cid': cid
+        })

-            try:
-                async with trio.open_nursery() as nurse:
-                    ctx._scope_nursery = nurse
-                    ctx._scope = nurse.cancel_scope
-                    task_status.started(ctx)
+        # TODO: should we also use an `.open_context()` equiv
+        # for this callee side by factoring the impl from
+        # `Portal.open_context()` into a common helper?
+        #
+        # NOTE: there are many different ctx state details
+        # in a callee side instance according to current impl:
+        # - `.cancelled_caught` can never be `True`.
+        #  -> the below scope is never exposed to the
+        #     `@context` marked RPC function.
+        # - `._portal` is never set.
+        try:
+            async with trio.open_nursery() as tn:
+                ctx._scope_nursery = tn
+                ctx._scope = tn.cancel_scope
+                task_status.started(ctx)

-                    # TODO: should would be nice to have our
-                    # `TaskMngr` nursery here!
-                    res: Any = await coro
-                    ctx._result = res
+                # TODO: sure would be nice to have our
+                # `TaskMngr` nursery here!
+                res: Any = await coro
+                ctx._result = res

-                    # deliver final result to caller side.
-                    await chan.send({
-                        'return': res,
-                        'cid': cid
-                    })
+                # deliver final result to caller side.
+                await chan.send({
+                    'return': res,
+                    'cid': cid
+                })

-            # XXX: do we ever trigger this block any more?
-            except (
-                BaseExceptionGroup,
-                trio.Cancelled,
-            ) as scope_error:
-
-                # always set this (callee) side's exception as the
-                # local error on the context
-                ctx._local_error: BaseException = scope_error
-
-                # if a remote error was set then likely the
-                # exception group was raised due to that, so
-                # and we instead raise that error immediately!
-                if re := ctx._remote_error:
-                    ctx._maybe_raise_remote_err(re)
-
-                # maybe TODO: pack in
-                # ``trio.Cancelled.__traceback__`` here so they can
-                # be unwrapped and displayed on the caller side?
-                raise
-
-            finally:
-                # XXX: only pop the context tracking if
-                # a ``@tractor.context`` entrypoint was called
-                assert chan.uid
-
-                # don't pop the local context until we know the
-                # associated child isn't in debug any more
-                await maybe_wait_for_debugger()
-                ctx: Context = actor._contexts.pop(
-                    (chan.uid, cid)
-                )
-
-                res_str: str = (
-                    'error: {ctx._local_error}'
-                    if ctx._local_error
-                    else f'result: {ctx._result}'
-                )
-                log.cancel(
-                    f'IPC context terminated with final {res_str}\n\n'
-                    f'|_{pformat(ctx)}\n'
-                )
-
-                if ctx.cancelled_caught:
+            # NOTE: this happens IFF `ctx._scope.cancel()` is
+            # called by any of,
+            # - *this* callee task manually calling `ctx.cancel()`.
+            # - the runtime calling `ctx._deliver_msg()` which
+            #   itself calls `ctx._maybe_cancel_and_set_remote_error()`
+            #   which cancels the scope presuming the input error
+            #   is not a `.cancel_acked` pleaser.
+            # - currently a never-should-happen-fallthrough case
+            #   inside ._context._drain_to_final_msg()`..
+            #   # TODO: remove this ^ right?
+            if ctx._scope.cancelled_caught:

                 # first check for and raise any remote error
                 # before raising any context cancelled case
@@ -335,7 +574,6 @@ async def _invoke(
                 if re := ctx._remote_error:
                     ctx._maybe_raise_remote_err(re)

-                # fname: str = func.__name__
                 cs: CancelScope = ctx._scope
                 if cs.cancel_called:
                     our_uid: tuple = actor.uid
@@ -382,7 +620,16 @@ async def _invoke(
                         div_str +
                         f'<= canceller: {canceller}\n'
                         f'=> uid: {our_uid}\n'
-                        f'  |_{ctx._task}()\n'
+                        f'  |_{ctx._task}()'
+
+                        # TODO: instead just show the
+                        # ctx.__str__() here?
+                        # -[ ] textwrap.indent() it correctly!
+                        # -[ ] BUT we need to wait until
+                        #   the state is filled out before emitting
+                        #   this msg; right now it's kinda empty? bleh..
+                        #
+                        # f'  |_{ctx}'
                     )

                 # TODO: does this ever get set any more or can
@@ -391,7 +638,7 @@ async def _invoke(
                     msg += (
                         # '------ - ------\n'
                         # 'IPC msg:\n'
-                        f'\n{ctx._cancel_msg}'
+                        f'\n\n{ctx._cancel_msg}'
                     )

                 # task-contex was either cancelled by request using
@@ -399,180 +646,68 @@ async def _invoke(
                 # on the far end, or it was cancelled by the local
                 # (callee) task, so relay this cancel signal to the
                 # other side.
-                raise ContextCancelled(
+                ctxc = ContextCancelled(
                     msg,
                     suberror_type=trio.Cancelled,
                     canceller=canceller,
                 )
+                # assign local error so that the `.outcome`
+                # resolves to an error for both reporting and
+                # state checks.
+                ctx._local_error = ctxc
+                raise ctxc

-        # regular async function/method
-        # XXX: possibly just a scheduled `Actor._cancel_task()`
-        # from a remote request to cancel some `Context`.
-        # ------ - ------
-        # TODO: ideally we unify this with the above `context=True`
-        # block such that for any remote invocation ftype, we
-        # always invoke the far end RPC task scheduling the same
-        # way: using the linked IPC context machinery.
-        else:
-            try:
-                await chan.send({
-                    'functype': 'asyncfunc',
-                    'cid': cid
-                })
-            except (
-                trio.ClosedResourceError,
-                trio.BrokenResourceError,
-                BrokenPipeError,
-            ) as ipc_err:
-                failed_resp = True
-                if is_rpc:
-                    raise
-                else:
-                    # TODO: should this be an `.exception()` call?
-                    log.warning(
-                        f'Failed to respond to non-rpc request: {func}\n'
-                        f'{ipc_err}'
-                    )
+        # XXX: do we ever trigger this block any more?
+        except (
+            BaseExceptionGroup,
+            trio.Cancelled,
+            BaseException,

-            with cancel_scope as cs:
-                ctx._scope: CancelScope = cs
-                task_status.started(ctx)
-                result = await coro
-                fname: str = func.__name__
-                log.runtime(
-                    'RPC complete:\n'
-                    f'task: {ctx._task}\n'
-                    f'|_cid={ctx.cid}\n'
-                    f'|_{fname}() -> {pformat(result)}\n'
-                )
+        ) as scope_error:

-            # NOTE: only send result if we know IPC isn't down
-            if (
-                not failed_resp
-                and chan.connected()
-            ):
-                try:
-                    await chan.send(
-                        {'return': result,
-                         'cid': cid}
-                    )
-                except (
-                    BrokenPipeError,
-                    trio.BrokenResourceError,
-                ):
-                    log.warning(
-                        'Failed to return result:\n'
-                        f'{func}@{actor.uid}\n'
-                        f'remote chan: {chan.uid}'
-                    )
+            # always set this (callee) side's exception as the
+            # local error on the context
+            ctx._local_error: BaseException = scope_error

-        except (
-            Exception,
-            BaseExceptionGroup,
-        ) as err:
+            # if a remote error was set then likely the
+            # exception group was raised due to that, so we
+            # instead raise that error immediately!
+            ctx.maybe_raise()

-            # always hide this frame from debug REPL if the crash
-            # originated from an rpc task and we DID NOT fail
-            # due to an IPC transport error!
-            if (
-                is_rpc
-                and chan.connected()
-            ):
-                __tracebackhide__: bool = True
-
-            if not is_multi_cancelled(err):
-
-                # TODO: maybe we'll want different "levels" of debugging
-                # eventualy such as ('app', 'supervisory', 'runtime') ?
-
-                # if not isinstance(err, trio.ClosedResourceError) and (
-                # if not is_multi_cancelled(err) and (
-
-                entered_debug: bool = False
-                if (
-                    not isinstance(err, ContextCancelled)
-                    or (
-                        isinstance(err, ContextCancelled)
-                        and ctx._cancel_called
-
-                        # if the root blocks the debugger lock request from a child
-                        # we will get a remote-cancelled condition.
-                        and ctx._enter_debugger_on_cancel
-                    )
-                ):
-                    # XXX QUESTION XXX: is there any case where we'll
-                    # want to debug IPC disconnects as a default?
-                    # => I can't think of a reason that inspecting this
-                    # type of failure will be useful for respawns or
-                    # recovery logic - the only case is some kind of
-                    # strange bug in our transport layer itself? Going
-                    # to keep this open ended for now.
-                    entered_debug = await _debug._maybe_enter_pm(err)
-
-                    if not entered_debug:
-                        log.exception("Actor crashed:")
-
-            # always ship errors back to caller
-            err_msg: dict[str, dict] = pack_error(
-                err,
-                # tb=tb, # TODO: special tb fmting?
-                cid=cid,
-            )
-
-            if is_rpc:
-                try:
-                    await chan.send(err_msg)
-
-                # TODO: tests for this scenario:
-                # - RPC caller closes connection before getting a response
-                #   should **not** crash this actor..
-                except (
-                    trio.ClosedResourceError,
-                    trio.BrokenResourceError,
-                    BrokenPipeError,
-                ) as ipc_err:
-
-                    # if we can't propagate the error that's a big boo boo
-                    log.exception(
-                        f"Failed to ship error to caller @ {chan.uid} !?\n"
-                        f'{ipc_err}'
-
-                    )
-
-            # error is probably from above coro running code *not from the
-            # underlyingn rpc invocation* since a scope was never allocated
-            # around actual coroutine await.
-            if ctx._scope is None:
-                # we don't ever raise directly here to allow the
-                # msg-loop-scheduler to continue running for this
-                # channel.
-                task_status.started(err)
-
-        finally:
-            # RPC task bookeeping
-            try:
-                ctx, func, is_complete = actor._rpc_tasks.pop(
-                    (chan, cid)
-                )
-                is_complete.set()
-
-            except KeyError:
-                if is_rpc:
-                    # If we're cancelled before the task returns then the
-                    # cancel scope will not have been inserted yet
-                    log.warning(
-                        f"Task {func} likely errored or cancelled before start")
-                else:
-                    log.cancel(
-                        'Failed to de-alloc internal task!?\n'
-                        f'cid: {cid}\n'
-                        f'{func.__name__}({kwargs})'
-                    )
+            # maybe TODO: pack in some kinda
+            # `trio.Cancelled.__traceback__` here so they can be
+            # unwrapped and displayed on the caller side? dunno..
+            raise

+        # `@context` entrypoint task bookkeeping.
+        # i.e. only pop the context tracking if used ;)
         finally:
-            if not actor._rpc_tasks:
-                log.runtime("All RPC tasks have completed")
-                actor._ongoing_rpc_tasks.set()
+            assert chan.uid
+
+            # don't pop the local context until we know the
+            # associated child isn't in debug any more
+            await maybe_wait_for_debugger()
+            ctx: Context = actor._contexts.pop(
+                (chan.uid, cid)
+            )
+
+            merr: Exception|None = ctx.maybe_error
+
+            (
+                res_type_str,
+                res_str,
+            ) = (
+                ('error', f'{type(merr)}',)
+                if merr
+                else (
+                    'result',
+                    f'`{repr(ctx.outcome)}`',
+                )
+            )
+            log.cancel(
+                f'IPC context terminated with a final {res_type_str}\n\n'
+                f'{ctx}\n'
+            )


 def _get_mod_abspath(module: ModuleType) -> str:
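
An aside on the `Context.maybe_error`/`.outcome`/`.maybe_raise()`
convention the new `finally:` block (and the `scope_error` handler
above it) relies on - a hypothetical stand-alone sketch only, since
the real fields live on `Context` over in `._context`:

    class Outcome:
        '''
        Resolve to either the captured error or the successfully
        returned result, whichever was set.

        '''
        def __init__(
            self,
            result: object = None,
            error: BaseException|None = None,
        ):
            self._result = result
            self._error = error

        @property
        def maybe_error(self) -> BaseException|None:
            return self._error

        @property
        def outcome(self) -> object:
            # an error always takes precedence over any result
            return self._error or self._result

        def maybe_raise(self) -> None:
            # raise iff an error was captured
            if self._error is not None:
                raise self._error
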
@@ -878,20 +1013,29 @@ class Actor:
         Entry point for new inbound connections to the channel server.

         '''
-        self._no_more_peers = trio.Event()  # unset
-
+        self._no_more_peers = trio.Event()  # unset by making new
         chan = Channel.from_stream(stream)
-        their_uid: tuple[str, str] | None = chan.uid
+        their_uid: tuple[str, str]|None = chan.uid
+
+        con_msg: str = ''
         if their_uid:
-            log.warning(
-                f'Re-connection from already known {their_uid}'
+            # NOTE: `.uid` is only set after first contact
+            con_msg = (
+                'IPC Re-connection from already known peer? '
             )
         else:
-            log.runtime(f'New connection to us @{chan.raddr}')
+            con_msg = (
+                'New IPC connection to us '
+            )
+
+        con_msg += (
+            f'<= @{chan.raddr}\n'
+            f'|_{chan}\n'
+            # f' |_@{chan.raddr}\n\n'
+        )

         # send/receive initial handshake response
         try:
-            uid = await self._do_handshake(chan)
+            uid: tuple|None = await self._do_handshake(chan)
         except (
             # we need this for ``msgspec`` for some reason?
             # for now, it's been put in the stream backend.
@@ -906,44 +1050,66 @@ class Actor:
             # inside ``open_root_actor()`` where there is a check for
             # a bound listener on the "arbiter" addr.  the reset will be
             # because the handshake was never meant took place.
-            log.warning(f"Channel {chan} failed to handshake")
+            log.warning(
+                con_msg
+                +
+                ' -> But failed to handshake? Ignoring..\n'
+            )
             return

-        # channel tracking
+        con_msg += (
+            f' -> Handshake with actor `{uid[0]}[{uid[1][-6:]}]` complete\n'
+        )
+        # IPC connection tracking for both peers and new children:
+        # - if this is a new channel to a locally spawned
+        #   sub-actor there will be a spawn wait event registered
+        #   by a call to `.wait_for_peer()`.
+        # - if a peer is connecting no such event will exist.
         event: trio.Event|None = self._peer_connected.pop(
             uid,
             None,
         )
         if event:
-            # Instructing connection: this is likely a new channel to
-            # a recently spawned actor which we'd like to control via
-            # async-rpc calls.
-            log.runtime(f"Waking channel waiters {event.statistics()}")
-            # Alert any task waiting on this connection to come up
+            con_msg += (
+                ' -> Waking subactor spawn waiters: '
+                f'{event.statistics().tasks_waiting}\n'
+                f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
+                # f'    {event}\n'
+                # f'    |{event.statistics()}\n'
+            )
+            # wake tasks waiting on this IPC-transport "connect-back"
            event.set()

+        else:
+            con_msg += (
+                f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
+            )  # type: ignore

         chans: list[Channel] = self._peers[uid]
-        if chans:
-            # TODO: re-use channels for new connections instead
-            # of always new ones?
-            # => will require changing all the discovery funcs..
-            log.runtime(
-                f"already have channel(s) for {uid}:{chans}?"
-            )
+        # if chans:
+        #     # TODO: re-use channels for new connections instead
+        #     # of always new ones?
+        #     # => will require changing all the discovery funcs..

         # append new channel
-        log.runtime(f"Registered {chan} for {uid}")  # type: ignore
         # TODO: can we just use list-ref directly?
-        # chans.append(chan)
-        self._peers[uid].append(chan)
+        chans.append(chan)
+
+        log.runtime(con_msg)

         # Begin channel management - respond to remote requests and
         # process received reponses.
         disconnected: bool = False
         try:
-            disconnected: bool = await process_messages(self, chan)
+            disconnected: bool = await process_messages(
+                self,
+                chan,
+            )
         except trio.Cancelled:
-            log.cancel(f'Msg loop was cancelled for {chan}')
+            log.cancel(
+                'IPC transport msg loop was cancelled for\n'
+                f'|_{chan}\n'
+            )
             raise

         finally:
@@ -957,7 +1123,10 @@ class Actor:
             # moving on with closing our own side.
if local_nursery: if chan._cancel_called: - log.cancel(f'Waiting on cancel request to peer {chan.uid}') + log.cancel( + 'Waiting on cancel request to peer\n' + f'`Portal.cancel_actor()` => {chan.uid}\n' + ) # XXX: this is a soft wait on the channel (and its # underlying transport protocol) to close from the @@ -970,10 +1139,13 @@ class Actor: # loop processing. with trio.move_on_after(0.5) as cs: cs.shield = True - # Attempt to wait for the far end to close the channel - # and bail after timeout (2-generals on closure). - assert chan.msgstream - async for msg in chan.msgstream.drain(): + + # attempt to wait for the far end to close the + # channel and bail after timeout (a 2-generals + # problem on closure). + assert chan.transport + async for msg in chan.transport.drain(): + # try to deliver any lingering msgs # before we destroy the channel. # This accomplishes deterministic @@ -985,7 +1157,7 @@ class Actor: 'Draining msg from disconnected peer\n' f'{chan.uid}\n' f'|_{chan}\n' - f' |_{chan.msgstream}\n\n' + f' |_{chan.transport}\n\n' f'{pformat(msg)}\n' ) @@ -998,11 +1170,30 @@ class Actor: msg, ) - log.runtime( - 'Waiting on local actor nursery to exit..\n' + # NOTE: when no call to `open_root_actor()` was + # made, we implicitly make that call inside + # the first `.open_nursery()`, in this case we + # can assume that we are the root actor and do + # not have to wait for the nursery-enterer to + # exit before shutting down the actor runtime. + # + # see matching note inside `._supervise.open_nursery()` + if not local_nursery._implicit_runtime_started: + log.runtime( + 'Waiting on local actor nursery to exit..\n' + f'|_{local_nursery}\n' + ) + await local_nursery.exited.wait() + + if ( + cs.cancelled_caught + and not local_nursery._implicit_runtime_started + ): + log.warning( + 'Failed to exit local actor nursery?\n' f'|_{local_nursery}\n' ) - await local_nursery.exited.wait() + # await _debug.pause() if disconnected: # if the transport died and this actor is still @@ -1022,7 +1213,7 @@ class Actor: log.cancel( f'Peer IPC broke but subproc is alive?\n\n' - f'<=x @{chan.raddr}\n' + f'<=x {chan.uid}@{chan.raddr}\n' f' |_{proc}\n' ) @@ -1033,9 +1224,9 @@ class Actor: f'uid: {chan.uid}\n' f'|_{pformat(chan)}\n' ) - chans = self._peers.get(chan.uid) chans.remove(chan) + # TODO: do we need to be this pedantic? if not chans: log.runtime( f'No more channels with {chan.uid}' @@ -1045,7 +1236,7 @@ class Actor: peers_str: str = '' for uid, chans in self._peers.items(): peers_str += ( - f'- uid: {uid}\n' + f'|_ uid: {uid}\n' ) for i, chan in enumerate(chans): peers_str += ( @@ -1487,22 +1678,27 @@ class Actor: requesting_uid, requester_type, req_chan, + log_meth, ) = ( req_chan.uid, 'peer', req_chan, + log.cancel, ) if req_chan else ( # a self cancel of ALL rpc tasks self.uid, 'self', - self + self, + log.runtime, ) + # TODO: just use the new `Context.repr_rpc: str` (and + # other) repr fields instead of doing this all manual.. msg: str = ( - f'`Actor.cancel()` request from {requester_type}:\n' - f'<= {requesting_uid}\n' + f'Runtime cancel request from {requester_type}:\n\n' + f'<= .cancel(): {requesting_uid}\n' ) # TODO: what happens here when we self-cancel tho? 
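
The time-boxed, cancel-shielded drain pattern used above, as a
self-contained sketch (a plain `trio` memory channel stands in for
the actual `Channel.transport` machinery here):

    import trio

    async def soft_drain(
        receive: trio.MemoryReceiveChannel,
        timeout: float = 0.5,
    ) -> list:
        '''
        Best-effort drain of any lingering msgs, bailing after
        `timeout`; the 2-generals problem on closure means we can
        never know for sure the far end is done sending.

        '''
        leftovers: list = []
        with trio.move_on_after(timeout) as cs:
            # don't let a surrounding cancellation abort the drain
            cs.shield = True
            async for msg in receive:
                leftovers.append(msg)

        return leftovers
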
@@ -1541,7 +1737,7 @@ class Actor:

         if self._service_n:
             self._service_n.cancel_scope.cancel()

-        log.cancel(msg)
+        log_meth(msg)
         self._cancel_complete.set()
         return True

@@ -1604,20 +1800,23 @@ class Actor:
             return True

         log.cancel(
-            'Cancel request for RPC task\n'
-            f'<= canceller: {requesting_uid}\n\n'
+            'Cancel request for RPC task\n\n'
+            f'<= ._cancel_task(): {requesting_uid}\n'
+            f'   |_ @{ctx.dmaddr}\n\n'

             # TODO: better ascii repr for "supervisor" like
             # a nursery or context scope?
-            f'=> ipc-parent: {parent_chan}\n'
+            # f'=> {parent_chan}\n'
+            f'=> {ctx._task}\n'

             # TODO: simplified `Context.__repr__()` fields output
             # shows only application state-related stuff like,
             # - ._stream
             # - .closed
             # - .started_called
             # - .. etc.
-            f'  |_ctx: {cid}\n'
-            f'  >> {ctx._nsf}()\n'
+            f'   >> {ctx.repr_rpc}\n'
+            # f'  |_ctx: {cid}\n'
+            # f'     >> {ctx._nsf}()\n'
         )
         if (
             ctx._canceller is None
@@ -1670,7 +1869,7 @@ class Actor:
         '''
         tasks: dict = self._rpc_tasks
         if not tasks:
-            log.warning(
+            log.runtime(
                 'Actor has no cancellable RPC tasks?\n'
                 f'<= canceller: {req_uid}\n'
             )
@@ -1700,11 +1899,17 @@ class Actor:
                 f'  |>> {ctx._nsf}() -> dict:\n'
             )

+        descr: str = (
+            'all' if not parent_chan
+            else
+            "IPC channel's "
+        )
+
         log.cancel(
-            f'Cancelling all {len(tasks)} rpc tasks:\n\n'
-            f'<= .cancel() from {req_uid}\n'
-            f'{self}\n'
-            f'{tasks_str}'
+            f'Cancelling {descr} {len(tasks)} rpc tasks\n\n'
+            f'<= .cancel_rpc_tasks(): {req_uid}\n'
+            # f'{self}\n'
+            # f'{tasks_str}'
         )
         for (
             (task_caller_chan, cid),
@@ -1733,10 +1938,11 @@ class Actor:
                 requesting_uid=req_uid,
             )

-        log.cancel(
-            'Waiting for remaining rpc tasks to complete\n'
-            f'|_{tasks}'
-        )
+        if tasks:
+            log.cancel(
+                'Waiting for remaining rpc tasks to complete\n'
+                f'|_{tasks}'
+            )
         await self._ongoing_rpc_tasks.wait()

     def cancel_server(self) -> None:
@@ -1793,21 +1999,21 @@ class Actor:

     ) -> tuple[str, str]:
         '''
-        Exchange (name, UUIDs) identifiers as the first communication step.
+        Exchange `(name, UUIDs)` identifiers as the first
+        communication step.

-        These are essentially the "mailbox addresses" found in actor model
-        parlance.
+        These are essentially the "mailbox addresses" found in
+        actor model parlance.

         '''
         await chan.send(self.uid)
-        value = await chan.recv()
+        value: tuple = await chan.recv()
         uid: tuple[str, str] = (str(value[0]), str(value[1]))

         if not isinstance(uid, tuple):
             raise ValueError(f"{uid} is not a valid uid?!")

         chan.uid = str(uid[0]), str(uid[1])
-        log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
         return uid

     def is_infected_aio(self) -> bool:
@@ -1970,7 +2176,10 @@ async def async_main(
                     shield=True,
                 )
             )
-            log.runtime("Waiting on service nursery to complete")
+            log.runtime(
+                'Actor runtime is up!'
+                # 'Blocking on service nursery to exit..\n'
+            )
             log.runtime(
                 "Service nursery complete\n"
                 "Waiting on root nursery to complete"
@@ -2016,11 +2225,13 @@ async def async_main(
         raise

     finally:
-        log.info("Runtime nursery complete")
-
+        log.runtime(
+            'Runtime nursery complete\n'
+            '-> Closing all actor lifetime contexts..\n'
+        )
         # tear down all lifetime contexts if not in guest mode
         # XXX: should this just be in the entrypoint?
-        log.info("Closing all actor lifetime contexts")
+        actor.lifetime_stack.close()

         # TODO: we can't actually do this bc the debugger
         # uses the _service_n to spawn the lock task, BUT,
         # in theory if we change this to the REAL root actor's
         # nursery, it will work?
         # with CancelScope(shield=True):
         #     await _debug.breakpoint()

-        actor.lifetime_stack.close()
-
-        # Unregister actor from the registry
+        # Unregister actor from the registry-sys / registrar.
if ( is_registered and not actor.is_registrar @@ -2241,13 +2450,14 @@ async def process_messages( 'parent_chan': chan, 'requesting_uid': chan.uid, } - log.cancel( - f'Rx task cancel request\n' - f'<= canceller: {chan.uid}\n' - f' |_{chan}\n\n' - f'=> {actor}\n' - f' |_cid: {target_cid}\n' - ) + # TODO: remove? already have emit in meth. + # log.runtime( + # f'Rx RPC task cancel request\n' + # f'<= canceller: {chan.uid}\n' + # f' |_{chan}\n\n' + # f'=> {actor}\n' + # f' |_cid: {target_cid}\n' + # ) try: await _invoke( actor, @@ -2527,6 +2737,11 @@ class Arbiter(Actor): sockaddr: tuple[str, int] for (aname, _), sockaddr in self._registry.items(): + log.info( + f'Actor mailbox info:\n' + f'aname: {aname}\n' + f'sockaddr: {sockaddr}\n' + ) if name == aname: sockaddrs.append(sockaddr)
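
As an end-to-end illustration of the cancellation semantics the callee
side must now match - an editor's sketch against tractor's public
`@context` api, with made-up endpoint/actor names:

    import trio
    import tractor

    @tractor.context
    async def sleeper(
        ctx: tractor.Context,
    ) -> None:
        # signal the caller we're up, completing its
        # `Portal.open_context()` entry phase.
        await ctx.started()
        await trio.sleep_forever()

    async def main():
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'sleeper_boi',
                enable_modules=[__name__],
            )
            async with portal.open_context(sleeper) as (ctx, first):
                # graceful cancel request: the callee task should see
                # a `ContextCancelled` which (per this patch) is also
                # set as its `Context._local_error` and hence becomes
                # its final `.outcome`.
                await ctx.cancel()

            await portal.cancel_actor()

    if __name__ == '__main__':
        trio.run(main)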