From 3cd222959a4da45f19152c1a67a56550b6b8724c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 11 Jun 2025 16:44:47 -0400
Subject: [PATCH] Decouple actor-state from low-level ipc-server

As much as is possible given we currently do some graceful cancellation
join-waiting on any connected sub-actors whenever an active
`local_nursery: ActorNursery` is detected in the post-rpc teardown
sequence of `handle_stream_from_peer()`. In such cases we try to allow
the higher level inter-actor (task) context(s) to fully cancel-ack
before conducting IPC machinery shutdown.

The main immediate motivation for all this is to support unit testing
the `.ipc._server` APIs, but in the future it may be useful for anyone
wanting to use our modular IPC transport layer sans-"actors" (see the
usage sketch appended after this patch).

Impl deats,
- drop passing an `actor: Actor` ref from as many routines in
  `.ipc._server` as possible, instead opting to use
  `._state.current_actor()` where absolutely needed; the fns dropping
  an `actor` input param are thus:
  - `open_ipc_server()`
  - `IPCServer.listen_on()`
  - `._serve_ipc_eps()`
  - `.handle_stream_from_peer()`
- factor the above mentioned graceful remote-cancel-ack waiting into
  a new `maybe_wait_on_canced_subs()` which is called from
  `handle_stream_from_peer()` and delivers a
  maybe-`local_nursery: ActorNursery` for downstream logic; it's this
  new fn which primarily still needs to call `current_actor()`.
- in `handle_stream_from_peer()` also use `current_actor()` to check
  whether a handshake is needed (or if it was called as part of some
  actor-runtime-less operation like our unit test suite!).
- also don't pass an `actor` to `._rpc.process_messages()`, see
  how-n-why below..

Surrounding ipc-server client/caller adjustments,
- `._rpc.process_messages()` no longer takes an `actor` input and
  now calls `current_actor()` instead.
- `._portal.open_portal()` is adjusted to ^.
- `._runtime.async_main()` is adjusted to the `.ipc._server`'s
  removal of `actor` ref passing.

Also,
- demote some server `log.info()`s to `.runtime()`.
---
 tractor/_portal.py     |   3 +-
 tractor/_rpc.py        |   2 +-
 tractor/_runtime.py    |   9 +-
 tractor/ipc/_server.py | 427 +++++++++++++++++++++++------------------
 4 files changed, 243 insertions(+), 198 deletions(-)

diff --git a/tractor/_portal.py b/tractor/_portal.py
index d5dd8369..c741df7d 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -582,8 +582,7 @@ async def open_portal(
             msg_loop_cs = await tn.start(
                 partial(
                     _rpc.process_messages,
-                    actor,
-                    channel,
+                    chan=channel,
                     # if the local task is cancelled we want to keep
                     # the msg loop running until our block ends
                     shield=True,
diff --git a/tractor/_rpc.py b/tractor/_rpc.py
index 3fc11db7..c9f4312e 100644
--- a/tractor/_rpc.py
+++ b/tractor/_rpc.py
@@ -869,7 +869,6 @@ async def try_ship_error_to_remote(


 async def process_messages(
-    actor: Actor,
     chan: Channel,
     shield: bool = False,
     task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -907,6 +906,7 @@ async def process_messages(
     (as utilized inside `Portal.cancel_actor()` ).

     '''
+    actor: Actor = _state.current_actor()
     assert actor._service_n  # runtime state sanity

     # TODO: once `trio` get's an "obvious way" for req/resp we
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index 39ac3309..0097e224 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -1262,6 +1262,10 @@ async def async_main(
     the actor's "runtime" and all thus all ongoing RPC tasks.

     '''
+    # XXX NOTE, `_state._current_actor` **must** be set prior to
+    # calling this core runtime entrypoint!
+    assert actor is _state.current_actor()
+
     actor._task: trio.Task = trio.lowlevel.current_task()

     # attempt to retreive ``trio``'s sigint handler and stash it
@@ -1321,7 +1325,6 @@ async def async_main(
         ) as service_nursery,

         _server.open_ipc_server(
-            actor=actor,
             parent_tn=service_nursery,
             stream_handler_tn=service_nursery,
         ) as ipc_server,
@@ -1375,7 +1378,6 @@ async def async_main(
                 'Booting IPC server'
             )
             eps: list = await ipc_server.listen_on(
-                actor=actor,
                 accept_addrs=accept_addrs,
                 stream_handler_nursery=service_nursery,
             )
@@ -1460,8 +1462,7 @@ async def async_main(
                 await root_nursery.start(
                     partial(
                         _rpc.process_messages,
-                        actor,
-                        actor._parent_chan,
+                        chan=actor._parent_chan,
                         shield=True,
                     )
                 )
diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py
index e9affccd..7df67aac 100644
--- a/tractor/ipc/_server.py
+++ b/tractor/ipc/_server.py
@@ -72,11 +72,223 @@ if TYPE_CHECKING:
 log = log.get_logger(__name__)


+async def maybe_wait_on_canced_subs(
+    uid: tuple[str, str],
+    chan: Channel,
+    disconnected: bool,
+
+    actor: Actor|None = None,
+    chan_drain_timeout: float = 0.5,
+    an_exit_timeout: float = 0.5,
+
+) -> ActorNursery|None:
+    '''
+    When a process-local actor-nursery is found for the given actor
+    `uid` (i.e. that peer is **also** a subactor of this parent), we
+    attempt to (with timeouts) wait on,
+
+    - all IPC msgs to drain on the (common) `Channel` such that all
+      local `Context`-parent-tasks can also gracefully collect
+      `ContextCancelled` msgs from their respective remote children
+      vs. a `chan_drain_timeout`.
+
+    - the actor-nursery to cancel-n-join all its supervised children
+      (processes) *gracefully* vs. an `an_exit_timeout` and thus also
+      detect cases where the IPC transport connection broke but
+      a sub-process is detected as still alive (a case that happens
+      when the subactor is still in an active debugger REPL session).
+
+    If either timeout expires we ofc report with a warning.
+
+    '''
+    actor = actor or _state.current_actor(err_on_no_runtime=False)
+
+    # XXX running outside actor-runtime usage,
+    # - unit testing
+    # - possibly manual usage (eventually)?
+    if not actor:
+        return None
+
+    local_nursery: (
+        ActorNursery|None
+    ) = actor._actoruid2nursery.get(uid)
+
+    # This is set in `Portal.cancel_actor()`. So if
+    # the peer was cancelled we try to wait for them
+    # to tear down their side of the connection before
+    # moving on with closing our own side.
+    if (
+        local_nursery
+        and (
+            actor._cancel_called
+            or
+            chan._cancel_called
+        )
+        #
+        # ^-TODO-^ along with this is there another condition
+        # that we should filter with to avoid entering this
+        # waiting block needlessly?
+        # -[ ] maybe `and local_nursery.cancelled` and/or
+        #     only if the `._children` table is empty or has
+        #     only `Portal`s with .chan._cancel_called ==
+        #     True` as per what we had below; the MAIN DIFF
+        #     BEING that just bc one `Portal.cancel_actor()`
+        #     was called, doesn't mean the whole actor-nurse
+        #     is gonna exit any time soon right!?
+        #
+        # or
+        # all(chan._cancel_called for chan in chans)
+
+    ):
+        log.cancel(
+            'Waiting on cancel request to peer..\n'
+            f'c)=>\n'
+            f'  |_{chan.uid}\n'
+        )
+
+        # XXX: this is a soft wait on the channel (and its
+        # underlying transport protocol) to close from the
+        # remote peer side since we presume that any channel
+        # which is mapped to a sub-actor (i.e. it's managed
+        # by local actor-nursery) has a message that is sent
+        # to the peer likely by this actor (which may be in
+        # a shutdown sequence due to cancellation) when the
+        # local runtime here is now cancelled while
+        # (presumably) in the middle of msg loop processing.
+        chan_info: str = (
+            f'{chan.uid}\n'
+            f'|_{chan}\n'
+            f'  |_{chan.transport}\n\n'
+        )
+        with trio.move_on_after(chan_drain_timeout) as drain_cs:
+            drain_cs.shield = True
+
+            # attempt to wait for the far end to close the
+            # channel and bail after timeout (a 2-generals
+            # problem on closure).
+            assert chan.transport
+            async for msg in chan.transport.drain():
+
+                # try to deliver any lingering msgs
+                # before we destroy the channel.
+                # This accomplishes deterministic
+                # `Portal.cancel_actor()` cancellation by
+                # making sure any RPC response to that call is
+                # delivered to the local calling task.
+                # TODO: factor this into a helper?
+                log.warning(
+                    'Draining msg from disconnected peer\n'
+                    f'{chan_info}'
+                    f'{pformat(msg)}\n'
+                )
+                # cid: str|None = msg.get('cid')
+                cid: str|None = msg.cid
+                if cid:
+                    # deliver response to local caller/waiter
+                    await actor._deliver_ctx_payload(
+                        chan,
+                        cid,
+                        msg,
+                    )
+        if drain_cs.cancelled_caught:
+            log.warning(
+                'Timed out waiting on IPC transport channel to drain?\n'
+                f'{chan_info}'
+            )
+
+        # XXX NOTE XXX when no explicit call to
+        # `open_root_actor()` was made by the application
+        # (normally we implicitly make that call inside
+        # the first `.open_nursery()` in root-actor
+        # user/app code), we can assume that either we
+        # are NOT the root actor, or are root but the
+        # runtime was started manually, and thus DO have
+        # to wait for the nursery-enterer to exit before
+        # shutting down the local runtime to avoid
+        # clobbering any ongoing subactor
+        # teardown/debugging/graceful-cancel.
+        #
+        # see matching note inside `._supervise.open_nursery()`
+        #
+        # TODO: should we have a separate cs + timeout
+        # block here?
+        if (
+            # XXX SO either,
+            # - not root OR,
+            # - is root but `open_root_actor()` was
+            #   entered manually (in which case we do
+            #   the equiv wait there using the
+            #   `devx.debug` sub-sys APIs).
+            not local_nursery._implicit_runtime_started
+        ):
+            log.runtime(
+                'Waiting on local actor nursery to exit..\n'
+                f'|_{local_nursery}\n'
+            )
+            with trio.move_on_after(an_exit_timeout) as an_exit_cs:
+                an_exit_cs.shield = True
+                await local_nursery.exited.wait()

+            # TODO: currently this is always triggering for every
+            # sub-daemon spawned from the `piker.services._mngr`?
+            # -[ ] how do we ensure that the IPC is supposed to
+            #     be long lived and isn't just a register?
+            #    |_ in the register case how can we signal that the
+            #      ephemeral msg loop was intentional?
+            if (
+                # not local_nursery._implicit_runtime_started
+                # and
+                an_exit_cs.cancelled_caught
+            ):
+                report: str = (
+                    'Timed out waiting on local actor-nursery to exit?\n'
+                    f'c)>\n'
+                    f' |_{local_nursery}\n'
+                )
+                if children := local_nursery._children:
+                    # indent from above local-nurse repr
+                    report += (
+                        f'   |_{pformat(children)}\n'
+                    )
+
+                log.warning(report)
+
+        if disconnected:
+            # if the transport died and this actor is still
+            # registered within a local nursery, we report
+            # that the IPC layer may have failed
+            # unexpectedly since it may be the cause of
+            # other downstream errors.
+            entry: tuple|None = local_nursery._children.get(uid)
+            if entry:
+                proc: trio.Process
+                _, proc, _ = entry
+
+                if (
+                    (poll := getattr(proc, 'poll', None))
+                    and
+                    poll() is None  # proc still alive
+                ):
+                    # TODO: change log level based on
+                    # detecting whether chan was created for
+                    # ephemeral `.register_actor()` request!
+                    # -[ ] also, that should be avoidable by
+                    #   re-using any existing chan from the
+                    #   `._discovery.get_registry()` call as
+                    #   well..
+                    log.runtime(
+                        f'Peer IPC broke but subproc is alive?\n\n'
+
+                        f'<=x {chan.uid}@{chan.raddr}\n'
+                        f'   |_{proc}\n'
+                    )
+
+    return local_nursery
+
+
 # TODO multi-tpt support with per-proto peer tracking?
 #
 # -[x] maybe change to mod-func and rename for implied
 #     multi-transport semantics?
-#
 # -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
 #     so that we can query per tpt all peer contact infos?
 #    |_[ ] possibly provide a global viewing via a
@@ -87,7 +299,6 @@ async def handle_stream_from_peer(
     *,

     server: IPCServer,
-    actor: Actor,

 ) -> None:
     '''
@@ -119,9 +330,10 @@ async def handle_stream_from_peer(

     # initial handshake with peer phase
     try:
-        peer_aid: msgtypes.Aid = await chan._do_handshake(
-            aid=actor.aid,
-        )
+        if actor := _state.current_actor(err_on_no_runtime=False):
+            peer_aid: msgtypes.Aid = await chan._do_handshake(
+                aid=actor.aid,
+            )
     except (
         TransportClosed,
         # ^XXX NOTE, the above wraps `trio` exc types raised
@@ -222,8 +434,7 @@ async def handle_stream_from_peer(
                 disconnected,
                 last_msg,
             ) = await _rpc.process_messages(
-                actor,
-                chan,
+                chan=chan,
             )
         except trio.Cancelled:
             log.cancel(
@@ -234,179 +445,16 @@ async def handle_stream_from_peer(
             raise

         finally:
-            local_nursery: (
-                ActorNursery|None
-            ) = actor._actoruid2nursery.get(uid)
-
-            # This is set in ``Portal.cancel_actor()``. So if
-            # the peer was cancelled we try to wait for them
-            # to tear down their side of the connection before
-            # moving on with closing our own side.
-            if (
-                local_nursery
-                and (
-                    actor._cancel_called
-                    or
-                    chan._cancel_called
-                )
-                #
-                # ^-TODO-^ along with this is there another condition
-                # that we should filter with to avoid entering this
-                # waiting block needlessly?
-                # -[ ] maybe `and local_nursery.cancelled` and/or
-                #     only if the `._children` table is empty or has
-                #     only `Portal`s with .chan._cancel_called ==
-                #     True` as per what we had below; the MAIN DIFF
-                #     BEING that just bc one `Portal.cancel_actor()`
-                #     was called, doesn't mean the whole actor-nurse
-                #     is gonna exit any time soon right!?
-                #
-                # or
-                # all(chan._cancel_called for chan in chans)
-
-            ):
-                log.cancel(
-                    'Waiting on cancel request to peer..\n'
-                    f'c)=>\n'
-                    f'  |_{chan.uid}\n'
-                )
-
-                # XXX: this is a soft wait on the channel (and its
-                # underlying transport protocol) to close from the
-                # remote peer side since we presume that any channel
-                # which is mapped to a sub-actor (i.e. it's managed
-                # by local actor-nursery) has a message that is sent
-                # to the peer likely by this actor (which may be in
-                # a shutdown sequence due to cancellation) when the
-                # local runtime here is now cancelled while
-                # (presumably) in the middle of msg loop processing.
-                chan_info: str = (
-                    f'{chan.uid}\n'
-                    f'|_{chan}\n'
-                    f'  |_{chan.transport}\n\n'
-                )
-                with trio.move_on_after(0.5) as drain_cs:
-                    drain_cs.shield = True
-
-                    # attempt to wait for the far end to close the
-                    # channel and bail after timeout (a 2-generals
-                    # problem on closure).
-                    assert chan.transport
-                    async for msg in chan.transport.drain():
-
-                        # try to deliver any lingering msgs
-                        # before we destroy the channel.
-                        # This accomplishes deterministic
-                        # ``Portal.cancel_actor()`` cancellation by
-                        # making sure any RPC response to that call is
-                        # delivered the local calling task.
-                        # TODO: factor this into a helper?
-                        log.warning(
-                            'Draining msg from disconnected peer\n'
-                            f'{chan_info}'
-                            f'{pformat(msg)}\n'
-                        )
-                        # cid: str|None = msg.get('cid')
-                        cid: str|None = msg.cid
-                        if cid:
-                            # deliver response to local caller/waiter
-                            await actor._deliver_ctx_payload(
-                                chan,
-                                cid,
-                                msg,
-                            )
-                if drain_cs.cancelled_caught:
-                    log.warning(
-                        'Timed out waiting on IPC transport channel to drain?\n'
-                        f'{chan_info}'
-                    )
-
-                # XXX NOTE XXX when no explicit call to
-                # `open_root_actor()` was made by the application
-                # (normally we implicitly make that call inside
-                # the first `.open_nursery()` in root-actor
-                # user/app code), we can assume that either we
-                # are NOT the root actor or are root but the
-                # runtime was started manually. and thus DO have
-                # to wait for the nursery-enterer to exit before
-                # shutting down the local runtime to avoid
-                # clobbering any ongoing subactor
-                # teardown/debugging/graceful-cancel.
-                #
-                # see matching note inside `._supervise.open_nursery()`
-                #
-                # TODO: should we have a separate cs + timeout
-                # block here?
-                if (
-                    # XXX SO either,
-                    # - not root OR,
-                    # - is root but `open_root_actor()` was
-                    #   entered manually (in which case we do
-                    #   the equiv wait there using the
-                    #   `devx._debug` sub-sys APIs).
-                    not local_nursery._implicit_runtime_started
-                ):
-                    log.runtime(
-                        'Waiting on local actor nursery to exit..\n'
-                        f'|_{local_nursery}\n'
-                    )
-                    with trio.move_on_after(0.5) as an_exit_cs:
-                        an_exit_cs.shield = True
-                        await local_nursery.exited.wait()
-
-                    # TODO: currently this is always triggering for every
-                    # sub-daemon spawned from the `piker.services._mngr`?
-                    # -[ ] how do we ensure that the IPC is supposed to
-                    #     be long lived and isn't just a register?
-                    #    |_ in the register case how can we signal that the
-                    #      ephemeral msg loop was intentional?
-                    if (
-                        # not local_nursery._implicit_runtime_started
-                        # and
-                        an_exit_cs.cancelled_caught
-                    ):
-                        report: str = (
-                            'Timed out waiting on local actor-nursery to exit?\n'
-                            f'c)>\n'
-                            f' |_{local_nursery}\n'
-                        )
-                        if children := local_nursery._children:
-                            # indent from above local-nurse repr
-                            report += (
-                                f'   |_{pformat(children)}\n'
-                            )
-
-                        log.warning(report)
-
-            if disconnected:
-                # if the transport died and this actor is still
-                # registered within a local nursery, we report
-                # that the IPC layer may have failed
-                # unexpectedly since it may be the cause of
-                # other downstream errors.
-                entry: tuple|None = local_nursery._children.get(uid)
-                if entry:
-                    proc: trio.Process
-                    _, proc, _ = entry
-
-                    if (
-                        (poll := getattr(proc, 'poll', None))
-                        and
-                        poll() is None  # proc still alive
-                    ):
-                        # TODO: change log level based on
-                        # detecting whether chan was created for
-                        # ephemeral `.register_actor()` request!
-                        # -[ ] also, that should be avoidable by
-                        #   re-using any existing chan from the
-                        #   `._discovery.get_registry()` call as
-                        #   well..
-                        log.runtime(
-                            f'Peer IPC broke but subproc is alive?\n\n'
-
-                            f'<=x {chan.uid}@{chan.raddr}\n'
-                            f'   |_{proc}\n'
-                        )
+            # check for subs which we should gracefully join at
+            # both the inter-actor-task and subprocess levels so
+            # we can remote-cancel and then disconnect gracefully
+            # (esp. from subs engaged in active debug-REPL sessions).
+            local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
+                uid=uid,
+                chan=chan,
+                disconnected=disconnected,
+            )

             # ``Channel`` teardown and closure sequence
             # drop ref to channel so it can be gc-ed and disconnected
@@ -467,11 +515,11 @@ async def handle_stream_from_peer(
             # from broken debug TTY locking due to
             # msg-spec races on application using RunVar...
             if (
+                local_nursery
+                and
                 (ctx_in_debug := pdb_lock.ctx_in_debug)
                 and
                 (pdb_user_uid := ctx_in_debug.chan.uid)
-                and
-                local_nursery
             ):
                 entry: tuple|None = local_nursery._children.get(
                     tuple(pdb_user_uid)
@@ -804,7 +852,6 @@ class IPCServer(Struct):
     async def listen_on(
         self,
         *,
-        actor: Actor,
         accept_addrs: list[tuple[str, int|str]]|None = None,
         stream_handler_nursery: Nursery|None = None,
     ) -> list[IPCEndpoint]:
@@ -837,20 +884,19 @@ class IPCServer(Struct):
             f'{self}\n'
         )

-        log.info(
+        log.runtime(
             f'Binding to endpoints for,\n'
             f'{accept_addrs}\n'
         )
         eps: list[IPCEndpoint] = await self._parent_tn.start(
             partial(
                 _serve_ipc_eps,
-                actor=actor,
                 server=self,
                 stream_handler_tn=stream_handler_nursery,
                 listen_addrs=accept_addrs,
             )
         )
-        log.info(
+        log.runtime(
             f'Started IPC endpoints\n'
             f'{eps}\n'
         )
@@ -873,7 +919,6 @@ class IPCServer(Struct):

 async def _serve_ipc_eps(
     *,
-    actor: Actor,
     server: IPCServer,
     stream_handler_tn: Nursery,
     listen_addrs: list[tuple[str, int|str]],
@@ -907,12 +952,13 @@ async def _serve_ipc_eps(
                 stream_handler_tn=stream_handler_tn,
             )
             try:
-                log.info(
+                log.runtime(
                     f'Starting new endpoint listener\n'
                     f'{ep}\n'
                 )
                 listener: trio.abc.Listener = await ep.start_listener()
                 assert listener is ep._listener

+                # actor = _state.current_actor()
                 # if actor.is_registry:
                 #     import pdbp; pdbp.set_trace()
@@ -937,7 +983,6 @@ async def _serve_ipc_eps(
             handler=partial(
                 handle_stream_from_peer,
                 server=server,
-                actor=actor,
             ),
             listeners=listeners,

@@ -948,13 +993,13 @@ async def _serve_ipc_eps(
             )
         )
         # TODO, wow make this message better! XD
-        log.info(
+        log.runtime(
             'Started server(s)\n'
             +
             '\n'.join([f'|_{addr}' for addr in listen_addrs])
         )

-        log.info(
+        log.runtime(
             f'Started IPC endpoints\n'
             f'{eps}\n'
         )
@@ -970,6 +1015,7 @@ async def _serve_ipc_eps(
                 ep.close_listener()
                 server._endpoints.remove(ep)

+    # actor = _state.current_actor()
     # if actor.is_arbiter:
     #     import pdbp; pdbp.set_trace()


 @acm
 async def open_ipc_server(
-    actor: Actor,
     parent_tn: Nursery|None = None,
     stream_handler_tn: Nursery|None = None,
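
--

NB, a minimal sketch (not part of the patch itself) of the sans-"actors"
usage this decoupling is meant to enable, i.e. driving `open_ipc_server()`
from a unit test with no actor-runtime up; with no `current_actor()` to
handshake for, `handle_stream_from_peer()` now just skips that phase. The
loopback tcp-addr format and the nursery wiring below are assumptions read
off the params shown in this diff, not a verbatim test from the suite:

    import trio
    from tractor.ipc import _server

    async def main():
        async with (
            trio.open_nursery() as tn,
            # no `actor=...` param needed any more!
            _server.open_ipc_server(
                parent_tn=tn,
                stream_handler_tn=tn,
            ) as ipc_server,
        ):
            # bind a single loopback listener; the `accept_addrs` entry
            # format here is assumed from the `list[tuple[str, int|str]]`
            # annotation shown above.
            eps = await ipc_server.listen_on(
                accept_addrs=[('127.0.0.1', 0)],
                stream_handler_nursery=tn,
            )
            print(f'serving endpoints {eps}')
            tn.cancel_scope.cancel()  # tear down immediately

    trio.run(main)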