From 112ed27cda09c612f544a35b3ea6ee238e0ba07a Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 11 Apr 2025 16:55:03 -0400
Subject: [PATCH] Move peer-tracking attrs from `Actor` -> `IPCServer`

Namely transferring the `Actor` peer-`Channel` tracking attrs,
- `._peers` which maps the uids to client channels (with duplicates
  apparently..)
- the `._peer_connected: dict[tuple[str, str], trio.Event]` child-peer
  syncing table mostly used by parent actors to wait on sub's to
  connect back during spawn.
- the `._no_more_peers = trio.Event()` level triggered state signal.

Further we move over with some minor reworks,
- `.wait_for_peer()` verbatim (adjusting all dependants).
- factor the no-more-peers shielded wait branch-block out of the end
  of `async_main()` into 2 new server meths,
  * `.has_peers()` with optional chan-connected checking flag.
  * `.wait_for_no_more_peers()` which *just* does the maybe-shielded
    `._no_more_peers.wait()`
---
 tests/test_spawning.py |   6 +-
 tractor/_discovery.py  |   8 +-
 tractor/_runtime.py    |  62 +++++-----------
 tractor/_spawn.py      |   8 +-
 tractor/_supervise.py  |  10 ++-
 tractor/devx/_debug.py |   9 ++-
 tractor/ipc/_server.py | 163 ++++++++++++++++++++++++++++++++---------
 7 files changed, 178 insertions(+), 88 deletions(-)

diff --git a/tests/test_spawning.py b/tests/test_spawning.py
index cf373ada..30e084d5 100644
--- a/tests/test_spawning.py
+++ b/tests/test_spawning.py
@@ -57,7 +57,11 @@ async def spawn(
             )
 
             assert len(an._children) == 1
-            assert portal.channel.uid in tractor.current_actor()._peers
+            assert (
+                portal.channel.uid
+                in
+                tractor.current_actor().ipc_server._peers
+            )
 
             # get result from child subactor
             result = await portal.result()
diff --git a/tractor/_discovery.py b/tractor/_discovery.py
index 157d229c..fd3e4b1c 100644
--- a/tractor/_discovery.py
+++ b/tractor/_discovery.py
@@ -48,6 +48,7 @@ from ._state import (
 
 if TYPE_CHECKING:
     from ._runtime import Actor
+    from .ipc._server import IPCServer
 
 
 log = get_logger(__name__)
@@ -79,7 +80,7 @@ async def get_registry(
         )
     else:
         # TODO: try to look pre-existing connection from
-        # `Actor._peers` and use it instead?
+        # `IPCServer._peers` and use it instead?
         async with (
             _connect_chan(addr) as chan,
             open_portal(chan) as regstr_ptl,
@@ -111,14 +112,15 @@ def get_peer_by_name(
 ) -> list[Channel]|None:  # at least 1
     '''
     Scan for an existing connection (set) to a named actor
-    and return any channels from `Actor._peers`.
+    and return any channels from `IPCServer._peers: dict`.
 
     This is an optimization method over querying the registrar
     for the same info.
 
     '''
     actor: Actor = current_actor()
-    to_scan: dict[tuple, list[Channel]] = actor._peers.copy()
+    server: IPCServer = actor.ipc_server
+    to_scan: dict[tuple, list[Channel]] = server._peers.copy()
     pchan: Channel|None = actor._parent_chan
     if pchan:
         to_scan[pchan.uid].append(pchan)
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index b21ed0ee..39ac3309 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -40,9 +40,7 @@ from __future__ import annotations
 from contextlib import (
     ExitStack,
 )
-from collections import defaultdict
 from functools import partial
-from itertools import chain
 import importlib
 import importlib.util
 import os
@@ -76,6 +74,7 @@ from tractor.msg import (
 )
 from .ipc import (
     Channel,
+    # IPCServer,  # causes cycles atm..
     _server,
 )
 from ._addr import (
@@ -156,7 +155,6 @@ class Actor:
 
     _root_n: Nursery|None = None
     _service_n: Nursery|None = None
-    # XXX moving to IPCServer!
     _ipc_server: _server.IPCServer|None = None
 
     @property
@@ -246,14 +244,6 @@ class Actor:
         # by the user (currently called the "arbiter")
         self._spawn_method: str = spawn_method
 
-        self._peers: defaultdict[
-            str,  # uaid
-            list[Channel],  # IPC conns from peer
-        ] = defaultdict(list)
-        self._peer_connected: dict[tuple[str, str], trio.Event] = {}
-        self._no_more_peers = trio.Event()
-        self._no_more_peers.set()
-
         # RPC state
         self._ongoing_rpc_tasks = trio.Event()
         self._ongoing_rpc_tasks.set()
@@ -338,7 +328,12 @@ class Actor:
         parent_uid: tuple|None = None
         if rent_chan := self._parent_chan:
             parent_uid = rent_chan.uid
-        peers: list[tuple] = list(self._peer_connected)
+
+        peers: list = []
+        server: _server.IPCServer = self.ipc_server
+        if server:
+            peers: list[tuple] = list(server._peer_connected)
+
         fmtstr: str = (
             f' |_id: {self.aid!r}\n'
             # f" aid{ds}{self.aid!r}\n"
@@ -394,25 +389,6 @@ class Actor:
 
         self._reg_addrs = addrs
 
-    async def wait_for_peer(
-        self,
-        uid: tuple[str, str],
-
-    ) -> tuple[trio.Event, Channel]:
-        '''
-        Wait for a connection back from a (spawned sub-)actor with
-        a `uid` using a `trio.Event` for sync.
-
-        '''
-        log.debug(f'Waiting for peer {uid!r} to connect')
-        event = self._peer_connected.setdefault(uid, trio.Event())
-        await event.wait()
-        log.debug(f'{uid!r} successfully connected back to us')
-        return (
-            event,
-            self._peers[uid][-1],
-        )
-
     def load_modules(
         self,
         # debug_mode: bool = False,
@@ -724,7 +700,7 @@ class Actor:
         )
         assert isinstance(chan, Channel)
 
-        # Initial handshake: swap names.
+        # init handshake: swap actor-IDs.
         await chan._do_handshake(aid=self.aid)
 
         accept_addrs: list[UnwrappedAddress]|None = None
@@ -1620,16 +1596,18 @@ async def async_main(
         )
 
     # Ensure all peers (actors connected to us as clients) are finished
-    if not actor._no_more_peers.is_set():
-        if any(
-            chan.connected() for chan in chain(*actor._peers.values())
-        ):
-            teardown_report += (
-                f'-> Waiting for remaining peers {actor._peers} to clear..\n'
-            )
-            log.runtime(teardown_report)
-            with CancelScope(shield=True):
-                await actor._no_more_peers.wait()
+    if (
+        (ipc_server := actor.ipc_server)
+        and
+        ipc_server.has_peers(check_chans=True)
+    ):
+        teardown_report += (
+            f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
+        )
+        log.runtime(teardown_report)
+        await ipc_server.wait_for_no_more_peers(
+            shield=True,
+        )
 
     teardown_report += (
         '-> All peer channels are complete\n'
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 5df8125a..49d14805 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -58,9 +58,11 @@ from tractor.msg.types import (
 
 
 if TYPE_CHECKING:
+    from ipc import IPCServer
    from ._supervise import ActorNursery
 
 ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
+
 log = get_logger('tractor')
 
 # placeholder for an mp start context if so using that backend
@@ -481,6 +483,7 @@ async def trio_proc(
 
     cancelled_during_spawn: bool = False
     proc: trio.Process|None = None
+    ipc_server: IPCServer = actor_nursery._actor.ipc_server
     try:
         try:
             proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
@@ -492,7 +495,7 @@ async def trio_proc(
 
         # wait for actor to spawn and connect back to us
        # channel should have handshake completed by the
        # local actor by the time we get a ref to it
-        event, chan = await actor_nursery._actor.wait_for_peer(
+        event, chan = await ipc_server.wait_for_peer(
            subactor.uid
        )
@@ -724,11 +727,12 @@ async def mp_proc(
 
    log.runtime(f"Started {proc}")
 
+    ipc_server: IPCServer = actor_nursery._actor.ipc_server
    try:
        # wait for actor to spawn and connect back to us
        # channel should have handshake completed by the
        # local actor by the time we get a ref to it
-        event, chan = await actor_nursery._actor.wait_for_peer(
+        event, chan = await ipc_server.wait_for_peer(
            subactor.uid,
        )
diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index 07ae8bb3..cdf15e19 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -53,6 +53,9 @@ from . import _spawn
 
 if TYPE_CHECKING:
     import multiprocessing as mp
+    # from .ipc._server import IPCServer
+    from .ipc import IPCServer
+
 
 
 log = get_logger(__name__)
@@ -315,6 +318,9 @@ class ActorNursery:
         children: dict = self._children
         child_count: int = len(children)
         msg: str = f'Cancelling actor nursery with {child_count} children\n'
+
+        server: IPCServer = self._actor.ipc_server
+
         with trio.move_on_after(3) as cs:
             async with trio.open_nursery(
                 strict_exception_groups=False,
@@ -337,7 +343,7 @@ class ActorNursery:
 
                    else:
                        if portal is None:  # actor hasn't fully spawned yet
-                            event = self._actor._peer_connected[subactor.uid]
+                            event: trio.Event = server._peer_connected[subactor.uid]
                            log.warning(
                                f"{subactor.uid} never 't finished spawning?"
                            )
@@ -353,7 +359,7 @@ class ActorNursery:
                            if portal is None:
                                # cancelled while waiting on the event
                                # to arrive
-                                chan = self._actor._peers[subactor.uid][-1]
+                                chan = server._peers[subactor.uid][-1]
                                if chan:
                                    portal = Portal(chan)
                                else:  # there's no other choice left
diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py
index 0f05534b..67695feb 100644
--- a/tractor/devx/_debug.py
+++ b/tractor/devx/_debug.py
@@ -92,7 +92,11 @@ from tractor._state import (
 if TYPE_CHECKING:
     from trio.lowlevel import Task
     from threading import Thread
-    from tractor.ipc import Channel
+    from tractor.ipc import (
+        Channel,
+        IPCServer,
+        # _server,  # TODO? export at top level?
+    )
     from tractor._runtime import (
         Actor,
     )
@@ -1434,6 +1438,7 @@ def any_connected_locker_child() -> bool:
 
    '''
    actor: Actor = current_actor()
+    server: IPCServer = actor.ipc_server
 
    if not is_root_process():
        raise InternalError('This is a root-actor only API!')
@@ -1443,7 +1448,7 @@ def any_connected_locker_child() -> bool:
            and
            (uid_in_debug := ctx.chan.uid)
        ):
-            chans: list[tractor.Channel] = actor._peers.get(
+            chans: list[tractor.Channel] = server._peers.get(
                tuple(uid_in_debug)
            )
            if chans:
diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py
index 7d2ab217..e9affccd 100644
--- a/tractor/ipc/_server.py
+++ b/tractor/ipc/_server.py
@@ -19,10 +19,12 @@ multi-transport-protcol needs!
 
 '''
 from __future__ import annotations
+from collections import defaultdict
 from contextlib import (
     asynccontextmanager as acm,
 )
 from functools import partial
+from itertools import chain
 import inspect
 from pprint import pformat
 from types import (
@@ -41,7 +43,7 @@ from trio import (
     SocketListener,
 )
 
-from ..devx import _debug
+# from ..devx import _debug
 from .._exceptions import (
     TransportClosed,
 )
@@ -82,6 +84,9 @@ log = log.get_logger(__name__)
 #
 async def handle_stream_from_peer(
     stream: trio.SocketStream,
+
+    *,
+    server: IPCServer,
     actor: Actor,
 
 ) -> None:
@@ -99,7 +104,7 @@ async def handle_stream_from_peer(
     )
 
     '''
-    actor._no_more_peers = trio.Event()  # unset by making new
+    server._no_more_peers = trio.Event()  # unset by making new
 
     # TODO, debug_mode tooling for when hackin this lower layer?
     # with _debug.maybe_open_crash_handler(
@@ -152,7 +157,7 @@ async def handle_stream_from_peer(
    # TODO, can we make this downstream peer tracking use the
    # `peer_aid` instead?
    familiar: str = 'new-peer'
-    if _pre_chan := actor._peers.get(uid):
+    if _pre_chan := server._peers.get(uid):
        familiar: str = 'pre-existing-peer'
    uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
    con_status += (
@@ -175,7 +180,7 @@ async def handle_stream_from_peer(
    #   sub-actor there will be a spawn wait even registered
    #   by a call to `.wait_for_peer()`.
    # - if a peer is connecting no such event will exit.
-    event: trio.Event|None = actor._peer_connected.pop(
+    event: trio.Event|None = server._peer_connected.pop(
        uid,
        None,
    )
@@ -195,7 +200,7 @@ async def handle_stream_from_peer(
        f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
    )  # type: ignore
 
-    chans: list[Channel] = actor._peers[uid]
+    chans: list[Channel] = server._peers[uid]
    # if chans:
    #     # TODO: re-use channels for new connections instead
    #     # of always new ones?
@@ -417,10 +422,10 @@ async def handle_stream_from_peer(
            con_teardown_status += (
                f'-> No more channels with {chan.uid}'
            )
-            actor._peers.pop(uid, None)
+            server._peers.pop(uid, None)
 
        peers_str: str = ''
-        for uid, chans in actor._peers.items():
+        for uid, chans in server._peers.items():
            peers_str += (
                f'uid: {uid}\n'
            )
@@ -430,23 +435,28 @@ async def handle_stream_from_peer(
                f'|_{pformat(chan)}\n'
            )
 
        con_teardown_status += (
-            f'-> Remaining IPC {len(actor._peers)} peers: {peers_str}\n'
+            f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
        )
 
        # No more channels to other actors (at all) registered
        # as connected.
-        if not actor._peers:
+        if not server._peers:
            con_teardown_status += (
                'Signalling no more peer channel connections'
            )
-            actor._no_more_peers.set()
+            server._no_more_peers.set()
 
            # NOTE: block this actor from acquiring the
            # debugger-TTY-lock since we have no way to know if we
            # cancelled it and further there is no way to ensure the
            # lock will be released if acquired due to having no
            # more active IPC channels.
-            if _state.is_root_process():
+            if (
+                _state.is_root_process()
+                and
+                _state.is_debug_mode()
+            ):
+                from ..devx import _debug
                pdb_lock = _debug.Lock
                pdb_lock._blocked.add(uid)
@@ -581,8 +591,23 @@ class IPCEndpoint(Struct):
 
 class IPCServer(Struct):
     _parent_tn: Nursery
     _stream_handler_tn: Nursery
+    # level-triggered sig for whether "no peers are currently
+    # connected"; field is **always** set to an instance but
+    # initialized with `.is_set() == True`.
+    _no_more_peers: trio.Event
+
     _endpoints: list[IPCEndpoint] = []
 
+    # connection tracking & mgmt
+    _peers: defaultdict[
+        str,  # uaid
+        list[Channel],  # IPC conns from peer
+    ] = defaultdict(list)
+    _peer_connected: dict[
+        tuple[str, str],
+        trio.Event,
+    ] = {}
+
     # syncs for setup/teardown sequences
     _shutdown: trio.Event|None = None
@@ -644,6 +669,65 @@ class IPCServer(Struct):
             f'protos: {tpt_protos!r}\n'
         )
 
+    def has_peers(
+        self,
+        check_chans: bool = False,
+    ) -> bool:
+        '''
+        Predicate for "are there any active peer IPC `Channel`s at the moment?"
+
+        '''
+        has_peers: bool = not self._no_more_peers.is_set()
+        if (
+            has_peers
+            and
+            check_chans
+        ):
+            has_peers: bool = (
+                any(chan.connected()
+                    for chan in chain(
+                        *self._peers.values()
+                    )
+                )
+                and
+                has_peers
+            )
+
+        return has_peers
+
+    async def wait_for_no_more_peers(
+        self,
+        shield: bool = False,
+    ) -> None:
+        with trio.CancelScope(shield=shield):
+            await self._no_more_peers.wait()
+
+    async def wait_for_peer(
+        self,
+        uid: tuple[str, str],
+
+    ) -> tuple[trio.Event, Channel]:
+        '''
+        Wait for a connection back from a (spawned sub-)actor with
+        a `uid` using a `trio.Event`.
+
+        Returns a pair of the event and the "last" registered IPC
+        `Channel` for the peer with `uid`.
+
+        '''
+        log.debug(f'Waiting for peer {uid!r} to connect')
+        event: trio.Event = self._peer_connected.setdefault(
+            uid,
+            trio.Event(),
+        )
+        await event.wait()
+        log.debug(f'{uid!r} successfully connected back to us')
+        mru_chan: Channel = self._peers[uid][-1]
+        return (
+            event,
+            mru_chan,
+        )
+
     @property
     def addrs(self) -> list[Address]:
         return [ep.addr for ep in self._endpoints]
@@ -672,17 +756,27 @@ class IPCServer(Struct):
         return ev.is_set()
 
     def pformat(self) -> str:
+        eps: list[IPCEndpoint] = self._endpoints
 
-        fmtstr: str = (
-            f' |_endpoints: {self._endpoints}\n'
+        state_repr: str = (
+            f'{len(eps)!r} IPC-endpoints active'
+        )
+        fmtstr = (
+            f' |_state: {state_repr}\n'
+            f' no_more_peers: {self.has_peers()}\n'
         )
         if self._shutdown is not None:
             shutdown_stats: EventStatistics = self._shutdown.statistics()
             fmtstr += (
-                f'\n'
-                f' |_shutdown: {shutdown_stats}\n'
+                f' task_waiting_on_shutdown: {shutdown_stats}\n'
             )
 
+        fmtstr += (
+            # TODO, use the `ppfmt()` helper from `modden`!
+            f' |_endpoints: {pformat(self._endpoints)}\n'
+            f' |_peers: {len(self._peers)} connected\n'
+        )
+
         return (
             f'
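
For reviewers, the relocated peer-tracking API reads roughly as in the
sketch below. This is an illustrative example only, not part of the
patch: the `wait_then_drain()` helper name and its `sub_uid` parameter
are hypothetical, while the actual calls mirror the real call-sites
changed above (`.wait_for_peer()` in `_spawn.py`, `.has_peers()` and
`.wait_for_no_more_peers()` in `async_main()`).

    import tractor

    async def wait_then_drain(sub_uid: tuple[str, str]) -> None:
        # hypothetical helper; `sub_uid` would be a spawned child's uid.
        actor = tractor.current_actor()
        server = actor.ipc_server  # the per-actor `IPCServer`

        # parent-side sync: block until the child completes its IPC
        # handshake; returns the `trio.Event` and the most-recently
        # registered `Channel` for that peer (see `.wait_for_peer()`).
        event, chan = await server.wait_for_peer(sub_uid)
        assert event.is_set() and chan.connected()

        # teardown-side: only wait if live peer channels remain,
        # shielding the wait from cancellation, just as the reworked
        # `async_main()` now does via the 2 new server methods.
        if server.has_peers(check_chans=True):
            await server.wait_for_no_more_peers(shield=True)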