Move peer-tracking attrs from `Actor` -> `IPCServer`

Namely, transferring the `Actor` peer-`Channel` tracking attrs (see the
condensed field sketch just below),
- `._peers` which maps peer uids to their client channels (with
  duplicates apparently..)
- the `._peer_connected: dict[tuple[str, str], trio.Event]` child-peer
  syncing table, mostly used by parent actors to wait on subs connecting
  back during spawn.
- the `._no_more_peers = trio.Event()` level-triggered state signal.
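
As a condensed sketch (field names and comments taken from the
`IPCServer` hunks further below, trimmed for brevity), the relocated
state now lives on the server type roughly as:

    class IPCServer(Struct):
        # level-triggered sig for "no peers are currently connected";
        # always set to an instance, initialized with `.is_set() == True`.
        _no_more_peers: trio.Event

        # connection tracking & mgmt
        _peers: defaultdict[
            str,            # uaid
            list[Channel],  # IPC conns from peer
        ] = defaultdict(list)
        _peer_connected: dict[
            tuple[str, str],
            trio.Event,
        ] = {}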

Further, we move over (with some minor reworks) the following; see the
call-site sketch below,
- `.wait_for_peer()` verbatim (adjusting all dependents).
- factor the no-more-peers shielded-wait branch-block out of
  the end of `async_main()` into 2 new server meths,
  * `.has_peers()` with an optional chan-connected checking flag.
  * `.wait_for_no_more_peers()` which *just* does the
    maybe-shielded `._no_more_peers.wait()`.
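
A minimal sketch of how call sites migrate with this change (the
`some_uid`/`subactor_uid` names are placeholders; everything else is
per the diffs that follow):

    actor = tractor.current_actor()
    server = actor.ipc_server  # the `IPCServer` now owning peer state

    # peer-`Channel` lookups move from `Actor._peers` to `IPCServer._peers`
    chans: list = server._peers.get(some_uid, [])

    # spawn-time sync on a child connecting back
    event, chan = await server.wait_for_peer(subactor_uid)

    # runtime teardown: only wait when connected peer chans remain
    if server.has_peers(check_chans=True):
        await server.wait_for_no_more_peers(shield=True)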
structural_dynamics_of_flow
Tyler Goodlet 2025-04-11 16:55:03 -04:00
parent 42cf9e11a4
commit 112ed27cda
7 changed files with 178 additions and 88 deletions


@@ -57,7 +57,11 @@ async def spawn(
         )
         assert len(an._children) == 1
-        assert portal.channel.uid in tractor.current_actor()._peers
+        assert (
+            portal.channel.uid
+            in
+            tractor.current_actor().ipc_server._peers
+        )

         # get result from child subactor
         result = await portal.result()


@@ -48,6 +48,7 @@ from ._state import (
 if TYPE_CHECKING:
     from ._runtime import Actor
+    from .ipc._server import IPCServer

 log = get_logger(__name__)
@@ -79,7 +80,7 @@ async def get_registry(
         )
     else:
         # TODO: try to look pre-existing connection from
-        # `Actor._peers` and use it instead?
+        # `IPCServer._peers` and use it instead?
         async with (
             _connect_chan(addr) as chan,
             open_portal(chan) as regstr_ptl,
@@ -111,14 +112,15 @@ def get_peer_by_name(
 ) -> list[Channel]|None:  # at least 1
     '''
     Scan for an existing connection (set) to a named actor
-    and return any channels from `Actor._peers`.
+    and return any channels from `IPCServer._peers: dict`.

     This is an optimization method over querying the registrar for
     the same info.

     '''
     actor: Actor = current_actor()
-    to_scan: dict[tuple, list[Channel]] = actor._peers.copy()
+    server: IPCServer = actor.ipc_server
+    to_scan: dict[tuple, list[Channel]] = server._peers.copy()
     pchan: Channel|None = actor._parent_chan
     if pchan:
         to_scan[pchan.uid].append(pchan)


@@ -40,9 +40,7 @@ from __future__ import annotations
 from contextlib import (
     ExitStack,
 )
-from collections import defaultdict
 from functools import partial
-from itertools import chain
 import importlib
 import importlib.util
 import os
@@ -76,6 +74,7 @@ from tractor.msg import (
 )
 from .ipc import (
     Channel,
+    # IPCServer,  # causes cycles atm..
     _server,
 )
 from ._addr import (
@@ -156,7 +155,6 @@ class Actor:
     _root_n: Nursery|None = None
     _service_n: Nursery|None = None

-    # XXX moving to IPCServer!
     _ipc_server: _server.IPCServer|None = None

     @property
@@ -246,14 +244,6 @@ class Actor:
         # by the user (currently called the "arbiter")
         self._spawn_method: str = spawn_method

-        self._peers: defaultdict[
-            str,  # uaid
-            list[Channel],  # IPC conns from peer
-        ] = defaultdict(list)
-        self._peer_connected: dict[tuple[str, str], trio.Event] = {}
-        self._no_more_peers = trio.Event()
-        self._no_more_peers.set()
-
         # RPC state
         self._ongoing_rpc_tasks = trio.Event()
         self._ongoing_rpc_tasks.set()
@@ -338,7 +328,12 @@ class Actor:
         parent_uid: tuple|None = None
         if rent_chan := self._parent_chan:
             parent_uid = rent_chan.uid
-        peers: list[tuple] = list(self._peer_connected)
+
+        peers: list = []
+        server: _server.IPCServer = self.ipc_server
+        if server:
+            peers: list[tuple] = list(server._peer_connected)
+
         fmtstr: str = (
             f' |_id: {self.aid!r}\n'
             # f"  aid{ds}{self.aid!r}\n"
@@ -394,25 +389,6 @@ class Actor:
         self._reg_addrs = addrs

-    async def wait_for_peer(
-        self,
-        uid: tuple[str, str],
-
-    ) -> tuple[trio.Event, Channel]:
-        '''
-        Wait for a connection back from a (spawned sub-)actor with
-        a `uid` using a `trio.Event` for sync.
-
-        '''
-        log.debug(f'Waiting for peer {uid!r} to connect')
-        event = self._peer_connected.setdefault(uid, trio.Event())
-        await event.wait()
-        log.debug(f'{uid!r} successfully connected back to us')
-        return (
-            event,
-            self._peers[uid][-1],
-        )
-
     def load_modules(
         self,
         # debug_mode: bool = False,
@@ -724,7 +700,7 @@ class Actor:
             )
             assert isinstance(chan, Channel)

-            # Initial handshake: swap names.
+            # init handshake: swap actor-IDs.
             await chan._do_handshake(aid=self.aid)

             accept_addrs: list[UnwrappedAddress]|None = None
@@ -1620,16 +1596,18 @@ async def async_main(
         )

     # Ensure all peers (actors connected to us as clients) are finished
-    if not actor._no_more_peers.is_set():
-        if any(
-            chan.connected() for chan in chain(*actor._peers.values())
-        ):
-            teardown_report += (
-                f'-> Waiting for remaining peers {actor._peers} to clear..\n'
-            )
-            log.runtime(teardown_report)
-            with CancelScope(shield=True):
-                await actor._no_more_peers.wait()
+    if (
+        (ipc_server := actor.ipc_server)
+        and
+        ipc_server.has_peers(check_chans=True)
+    ):
+        teardown_report += (
+            f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
+        )
+        log.runtime(teardown_report)
+        await ipc_server.wait_for_no_more_peers(
+            shield=True,
+        )

     teardown_report += (
         '-> All peer channels are complete\n'


@@ -58,9 +58,11 @@ from tractor.msg.types import (
 if TYPE_CHECKING:
+    from ipc import IPCServer
     from ._supervise import ActorNursery

 ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)

 log = get_logger('tractor')

 # placeholder for an mp start context if so using that backend
@@ -481,6 +483,7 @@ async def trio_proc(
     cancelled_during_spawn: bool = False
     proc: trio.Process|None = None
+    ipc_server: IPCServer = actor_nursery._actor.ipc_server
     try:
         try:
             proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
@@ -492,7 +495,7 @@
             # wait for actor to spawn and connect back to us
             # channel should have handshake completed by the
             # local actor by the time we get a ref to it
-            event, chan = await actor_nursery._actor.wait_for_peer(
+            event, chan = await ipc_server.wait_for_peer(
                 subactor.uid
             )
@@ -724,11 +727,12 @@ async def mp_proc(
     log.runtime(f"Started {proc}")

+    ipc_server: IPCServer = actor_nursery._actor.ipc_server
     try:
         # wait for actor to spawn and connect back to us
         # channel should have handshake completed by the
         # local actor by the time we get a ref to it
-        event, chan = await actor_nursery._actor.wait_for_peer(
+        event, chan = await ipc_server.wait_for_peer(
             subactor.uid,
         )


@@ -53,6 +53,9 @@ from . import _spawn
 if TYPE_CHECKING:
     import multiprocessing as mp
+    # from .ipc._server import IPCServer
+    from .ipc import IPCServer

 log = get_logger(__name__)
@@ -315,6 +318,9 @@ class ActorNursery:
         children: dict = self._children
         child_count: int = len(children)
         msg: str = f'Cancelling actor nursery with {child_count} children\n'
+
+        server: IPCServer = self._actor.ipc_server
+
         with trio.move_on_after(3) as cs:
             async with trio.open_nursery(
                 strict_exception_groups=False,
@@ -337,7 +343,7 @@
                 else:
                     if portal is None:  # actor hasn't fully spawned yet
-                        event = self._actor._peer_connected[subactor.uid]
+                        event: trio.Event = server._peer_connected[subactor.uid]
                         log.warning(
                             f"{subactor.uid} never 't finished spawning?"
                         )
@@ -353,7 +359,7 @@
                     if portal is None:
                         # cancelled while waiting on the event
                         # to arrive
-                        chan = self._actor._peers[subactor.uid][-1]
+                        chan = server._peers[subactor.uid][-1]
                         if chan:
                             portal = Portal(chan)
                         else:  # there's no other choice left


@@ -92,7 +92,11 @@ from tractor._state import (
 if TYPE_CHECKING:
     from trio.lowlevel import Task
     from threading import Thread
-    from tractor.ipc import Channel
+    from tractor.ipc import (
+        Channel,
+        IPCServer,
+        # _server,  # TODO? export at top level?
+    )
     from tractor._runtime import (
         Actor,
     )
@@ -1434,6 +1438,7 @@ def any_connected_locker_child() -> bool:
     '''
     actor: Actor = current_actor()
+    server: IPCServer = actor.ipc_server

     if not is_root_process():
         raise InternalError('This is a root-actor only API!')
@@ -1443,7 +1448,7 @@
         and
         (uid_in_debug := ctx.chan.uid)
     ):
-        chans: list[tractor.Channel] = actor._peers.get(
+        chans: list[tractor.Channel] = server._peers.get(
             tuple(uid_in_debug)
         )
         if chans:


@@ -19,10 +19,12 @@ multi-transport-protcol needs!
 '''
 from __future__ import annotations
+from collections import defaultdict
 from contextlib import (
     asynccontextmanager as acm,
 )
 from functools import partial
+from itertools import chain
 import inspect
 from pprint import pformat
 from types import (
@@ -41,7 +43,7 @@ from trio import (
     SocketListener,
 )

-from ..devx import _debug
+# from ..devx import _debug
 from .._exceptions import (
     TransportClosed,
 )
@@ -82,6 +84,9 @@ log = log.get_logger(__name__)
 #
 async def handle_stream_from_peer(
     stream: trio.SocketStream,
+
+    *,
+    server: IPCServer,
     actor: Actor,

 ) -> None:
@@ -99,7 +104,7 @@ async def handle_stream_from_peer(
     )

     '''
-    actor._no_more_peers = trio.Event()  # unset by making new
+    server._no_more_peers = trio.Event()  # unset by making new

     # TODO, debug_mode tooling for when hackin this lower layer?
     # with _debug.maybe_open_crash_handler(
@@ -152,7 +157,7 @@
     # TODO, can we make this downstream peer tracking use the
     # `peer_aid` instead?
     familiar: str = 'new-peer'
-    if _pre_chan := actor._peers.get(uid):
+    if _pre_chan := server._peers.get(uid):
         familiar: str = 'pre-existing-peer'
     uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
     con_status += (
@@ -175,7 +180,7 @@
     # sub-actor there will be a spawn wait even registered
     # by a call to `.wait_for_peer()`.
     # - if a peer is connecting no such event will exit.
-    event: trio.Event|None = actor._peer_connected.pop(
+    event: trio.Event|None = server._peer_connected.pop(
         uid,
         None,
     )
@@ -195,7 +200,7 @@
         f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
     )  # type: ignore

-    chans: list[Channel] = actor._peers[uid]
+    chans: list[Channel] = server._peers[uid]
     # if chans:
     #     # TODO: re-use channels for new connections instead
     #     # of always new ones?
@@ -417,10 +422,10 @@
         con_teardown_status += (
             f'-> No more channels with {chan.uid}'
         )
-        actor._peers.pop(uid, None)
+        server._peers.pop(uid, None)

     peers_str: str = ''
-    for uid, chans in actor._peers.items():
+    for uid, chans in server._peers.items():
         peers_str += (
             f'uid: {uid}\n'
         )
@@ -430,23 +435,28 @@
         )

     con_teardown_status += (
-        f'-> Remaining IPC {len(actor._peers)} peers: {peers_str}\n'
+        f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
     )

     # No more channels to other actors (at all) registered
     # as connected.
-    if not actor._peers:
+    if not server._peers:
         con_teardown_status += (
             'Signalling no more peer channel connections'
         )
-        actor._no_more_peers.set()
+        server._no_more_peers.set()

         # NOTE: block this actor from acquiring the
         # debugger-TTY-lock since we have no way to know if we
         # cancelled it and further there is no way to ensure the
         # lock will be released if acquired due to having no
         # more active IPC channels.
-        if _state.is_root_process():
+        if (
+            _state.is_root_process()
+            and
+            _state.is_debug_mode()
+        ):
+            from ..devx import _debug
             pdb_lock = _debug.Lock
             pdb_lock._blocked.add(uid)
@@ -581,8 +591,23 @@ class IPCEndpoint(Struct):
 class IPCServer(Struct):
     _parent_tn: Nursery
     _stream_handler_tn: Nursery
+
+    # level-triggered sig for whether "no peers are currently
+    # connected"; field is **always** set to an instance but
+    # initialized with `.is_set() == True`.
+    _no_more_peers: trio.Event
+
     _endpoints: list[IPCEndpoint] = []

+    # connection tracking & mgmt
+    _peers: defaultdict[
+        str,  # uaid
+        list[Channel],  # IPC conns from peer
+    ] = defaultdict(list)
+    _peer_connected: dict[
+        tuple[str, str],
+        trio.Event,
+    ] = {}
+
     # syncs for setup/teardown sequences
     _shutdown: trio.Event|None = None
@@ -644,6 +669,65 @@
             f'protos: {tpt_protos!r}\n'
         )

+    def has_peers(
+        self,
+        check_chans: bool = False,
+    ) -> bool:
+        '''
+        Predicate for "are there any active peer IPC `Channel`s at the moment?"
+
+        '''
+        has_peers: bool = not self._no_more_peers.is_set()
+        if (
+            has_peers
+            and
+            check_chans
+        ):
+            has_peers: bool = (
+                any(chan.connected()
+                    for chan in chain(
+                        *self._peers.values()
+                    )
+                )
+                and
+                has_peers
+            )
+
+        return has_peers
+
+    async def wait_for_no_more_peers(
+        self,
+        shield: bool = False,
+    ) -> None:
+        with trio.CancelScope(shield=shield):
+            await self._no_more_peers.wait()
+
+    async def wait_for_peer(
+        self,
+        uid: tuple[str, str],
+    ) -> tuple[trio.Event, Channel]:
+        '''
+        Wait for a connection back from a (spawned sub-)actor with
+        a `uid` using a `trio.Event`.
+
+        Returns a pair of the event and the "last" registered IPC
+        `Channel` for the peer with `uid`.
+
+        '''
+        log.debug(f'Waiting for peer {uid!r} to connect')
+        event: trio.Event = self._peer_connected.setdefault(
+            uid,
+            trio.Event(),
+        )
+        await event.wait()
+        log.debug(f'{uid!r} successfully connected back to us')
+        mru_chan: Channel = self._peers[uid][-1]
+        return (
+            event,
+            mru_chan,
+        )
+
     @property
     def addrs(self) -> list[Address]:
         return [ep.addr for ep in self._endpoints]
@@ -672,17 +756,27 @@
         return ev.is_set()

     def pformat(self) -> str:
-        fmtstr: str = (
-            f' |_endpoints: {self._endpoints}\n'
+        eps: list[IPCEndpoint] = self._endpoints
+
+        state_repr: str = (
+            f'{len(eps)!r} IPC-endpoints active'
+        )
+        fmtstr = (
+            f' |_state: {state_repr}\n'
+            f'  no_more_peers: {self.has_peers()}\n'
         )
         if self._shutdown is not None:
             shutdown_stats: EventStatistics = self._shutdown.statistics()
             fmtstr += (
-                f'\n'
-                f' |_shutdown: {shutdown_stats}\n'
+                f'  task_waiting_on_shutdown: {shutdown_stats}\n'
             )

+        fmtstr += (
+            # TODO, use the `ppfmt()` helper from `modden`!
+            f' |_endpoints: {pformat(self._endpoints)}\n'
+            f' |_peers: {len(self._peers)} connected\n'
+        )
+
         return (
             f'<IPCServer(\n'
             f'{fmtstr}'
@@ -842,6 +936,7 @@ async def _serve_ipc_eps(
             trio.serve_listeners,
             handler=partial(
                 handle_stream_from_peer,
+                server=server,
                 actor=actor,
             ),
             listeners=listeners,
@@ -894,20 +989,28 @@ async def open_ipc_server(
     async with maybe_open_nursery(
         nursery=parent_tn,
     ) as rent_tn:
+        no_more_peers = trio.Event()
+        no_more_peers.set()
+
         ipc_server = IPCServer(
             _parent_tn=rent_tn,
             _stream_handler_tn=stream_handler_tn or rent_tn,
+            _no_more_peers=no_more_peers,
         )
         try:
             yield ipc_server
+            log.runtime(
+                f'Waiting on server to shutdown or be cancelled..\n'
+                f'{ipc_server}'
+            )
+            # TODO? when if ever would we want/need this?
+            # with trio.CancelScope(shield=True):
+            #     await ipc_server.wait_for_shutdown()

-        # except BaseException as berr:
-        #     log.exception(
-        #         'IPC server crashed on exit ?'
-        #     )
-        #     raise berr
-
-        finally:
+        except BaseException as berr:
+            log.exception(
+                'IPC server caller crashed ??'
+            )
             # ?TODO, maybe we can ensure the endpoints are torndown
             # (and thus their managed listeners) beforehand to ensure
             # super graceful RPC mechanics?
@@ -915,17 +1018,5 @@
             # -[ ] but aren't we doing that already per-`listen_tn`
             #     inside `_serve_ipc_eps()` above?
             #
-            # if not ipc_server.is_shutdown():
-            #     ipc_server.cancel()
-            #     await ipc_server.wait_for_shutdown()
-            # assert ipc_server.is_shutdown()
-            pass
-
-        # !XXX TODO! lol so classic, the below code is rekt!
-        #
-        # XXX here is a perfect example of suppressing errors with
-        # `trio.Cancelled` as per our demonstrating example,
-        # `test_trioisms::test_acm_embedded_nursery_propagates_enter_err
-        #
-        # with trio.CancelScope(shield=True):
-        #     await ipc_server.wait_for_shutdown()
+            # ipc_server.cancel()
+            raise berr