Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 1c73c0c0ee Start a very basic ipc-server unit test suite
For now it just boots a server, parametrized over all tpt-protos, sin
any actor runtime bootup. Obvi the future todo is ensuring it all works
with a client connecting via the equivalent lowlevel
`.ipc._chan._connect_chan()` API(s).
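
As a rough sketch of that todo (every name here, `get_rando_addr()`, `open_ipc_server()`,
`listen_on()` and `_connect_chan()`, is assumed from the new test module and the
`._discovery` usage shown in the diffs below, so treat it as illustrative only) the
client side might look something like:

import trio
from tractor import ipc
from tractor._testing.addr import get_rando_addr

async def _connect_back(tpt_proto: str = 'tcp'):
    # boot a runtime-less server just like the new test does
    addr: tuple = get_rando_addr(tpt_proto=tpt_proto)
    async with ipc._server.open_ipc_server() as server:
        await server.listen_on(
            accept_addrs=[addr],
            stream_handler_nursery=None,
        )
        # then dial it via the low-level chan API (assumed acm form,
        # matching the `._discovery` usage further below)
        async with ipc._chan._connect_chan(addr) as chan:
            assert chan.connected()

        server._parent_tn.cancel_scope.cancel()

if __name__ == '__main__':
    trio.run(_connect_back)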
2025-07-08 12:59:22 -04:00
Tyler Goodlet 101cd94e89 Decouple actor-state from low-level ipc-server
As much as is possible given we currently do some graceful
cancellation join-waiting on any connected sub-actors whenever an active
`local_nursery: ActorNursery` in the post-rpc teardown sequence of
`handle_stream_from_peer()` is detected. In such cases we try to allow
the higher level inter-actor (task) context(s) to fully cancel-ack
before conducting IPC machinery shutdown.

The main immediate motivation for all this is to support unit testing
the `.ipc._server` APIs, but in the future it may be useful for anyone
wanting to use our modular IPC transport layer sin-"actors".

Impl deats,
- drop passing an `actor: Actor` ref from as many routines in
  `.ipc._server` as possible, instead opting to use
  `._state.current_actor()` where absolutely needed; thus the fns dropping an
  `actor` input param are:
  - `open_ipc_server()`
  - `IPCServer.listen_on()`
  - `._serve_ipc_eps()`
  - `.handle_stream_from_peer()`
- factor the above mentioned graceful remote-cancel-ack waiting into
  a new `maybe_wait_on_canced_subs()` which is called from
  `handle_stream_from_peer()` and delivers a
  maybe-`local_nursery: ActorNursery` for downstream logic; it's this
  new fn which primarily still needs to call `current_actor()`.
- in `handle_stream_from_peer()` also use `current_actor()` to check if
  a handshake is needed (or if it was called as part of some
  actor-runtime-less operation like our unit test suite!).
- also don't pass an `actor` to `._rpc.process_messages()` see how-n-why
  below..

Surrounding ipc-server client/caller adjustments,
- `._rpc.process_messages()` no longer takes an `actor` input and
  now calls `current_actor()` instead.
- `._portal.open_portal()` is adjusted to ^.
- `._runtime.async_main()` is adjusted to the `.ipc._server`'s removal
  of `actor` ref passing.

Also,
- demote some server `log.info()`s to `.runtime()`
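
In short the pattern (sketched here with a simplified signature, the real
ones are in the `._rpc` and `.ipc._server` hunks below) is to stop threading
the runtime ref through and instead resolve it lazily:

from tractor import _state

# before: callers had to pass the runtime ref down every layer,
#
#   await process_messages(actor, chan, shield=True)
#
# after: the msg loop resolves the actor itself, so callers like
# `handle_stream_from_peer()` no longer need to hold a runtime ref;
# illustrative signature only.
async def process_messages(
    chan,  # Channel
    shield: bool = False,
):
    actor = _state.current_actor()  # resolved from `._state`, not passed in
    assert actor._service_n  # runtime state sanity
    ...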
2025-07-08 12:59:22 -04:00
Tyler Goodlet 3f33ba1cc0 Log listener bind status for TCP as for UDS 2025-07-08 12:59:22 -04:00
Tyler Goodlet 70f5315506 Move peer-tracking attrs from `Actor` -> `IPCServer`
Namely transferring the `Actor` peer-`Channel` tracking attrs,
- `._peers` which maps the uids to client channels (with duplicates
  apparently..)
- the `._peer_connected: dict[tuple[str, str], trio.Event]` child-peer
  syncing table mostly used by parent actors to wait on sub's to connect
  back during spawn.
- the `._no_more_peers = trio.Event()` level triggered state signal.

Further we move over with some minor reworks,
- `.wait_for_peer()` verbatim (adjusting all dependants).
- factor the no-more-peers shielded wait branch-block out of
  the end of `async_main()` into 2 new server meths,
  * `.has_peers()` with optional chan-connected checking flag.
  * `.wait_for_no_more_peers()` which *just* does the
    maybe-shielded `._no_more_peers.wait()`
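
For reference, the teardown end of `async_main()` (see its hunk further
below) now consumes these roughly as, assuming an already booted
`actor: Actor` and the module-level `log`ger:

    # Ensure all peers (actors connected to us as clients) are finished
    if (
        (ipc_server := actor.ipc_server)
        and
        ipc_server.has_peers(check_chans=True)
    ):
        log.runtime(
            f'-> Waiting for remaining peers {ipc_server._peers} to clear..'
        )
        await ipc_server.wait_for_no_more_peers(
            shield=True,
        )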
2025-07-08 12:59:19 -04:00
Tyler Goodlet 496fac04bb Mv `Actor._stream_handler()` to `.ipc._server` func
Call it `handle_stream_from_peer()` and bind in the `actor: Actor` via
a `handler=partial()` to `trio.serve_listeners()`.

With this (minus the `Actor._peers/._peer_connected/._no_more_peers`
attrs ofc) we get nearly full separation of IPC-connection-processing
(concerns) from `Actor` state. Thus it's a first look at modularizing
the low-level runtime into isolated subsystems which will hopefully
improve the entire code base's grok-ability and ease any new feature
design discussions especially pertaining to introducing and/or
composing-together any new transport protocols.
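
Concretely the wiring becomes (a sketch; this commit curries the `actor` in,
while the decoupling commit listed above later swaps that kwarg for the
`IPCServer` itself, as per the `_serve_ipc_eps()` hunk further below):

from functools import partial
import trio

from tractor.ipc._server import handle_stream_from_peer

async def _serve(listen_tn, listeners, actor):
    # every accepted stream gets handed to the (ex-`Actor._stream_handler()`)
    # module func with the runtime ref pre-bound via `partial()`
    await listen_tn.start(
        partial(
            trio.serve_listeners,
            handler=partial(
                handle_stream_from_peer,
                actor=actor,
            ),
            listeners=listeners,
        )
    )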
2025-07-08 12:57:29 -04:00
Tyler Goodlet 02baeb6a8b Passthrough `_pause()` kwargs from `_maybe_enter_pm()` 2025-07-08 12:57:29 -04:00
Tyler Goodlet d4ab802e14 Fix assert on `.devx.maybe_open_crash_handler()` delivered `bxerr` 2025-07-08 12:57:29 -04:00
Tyler Goodlet fdeaeef9f7 Improve bit of tooling for `test_resource_cache.py`
Namely, while what I was actually trying to solve was why
`TransportClosed` was getting raised from `Portal.cancel_actor()`, this
is still useful edge-case auditing either way. Also opts into the
`debug_mode` fixture with appropriate timeout adjustment B)
2025-07-08 12:57:29 -04:00
Tyler Goodlet 41609d1433 Never hide non-[msgtype/tpt-closed] error tbs in `Channel.send()` 2025-07-08 12:57:29 -04:00
Tyler Goodlet c9068522ed Set `_state._def_tpt_proto` in `tpt_proto` fixture
Such that the global test-session always (and only) runs against the CLI
specified `--tpt-proto=` transport protocol.
2025-07-08 12:57:29 -04:00
16 changed files with 867 additions and 562 deletions

View File

@@ -138,11 +138,19 @@ def tpt_protos(request) -> list[str]:
     yield proto_keys


-@pytest.fixture(scope='session')
+@pytest.fixture(
+    scope='session',
+    autouse=True,
+)
 def tpt_proto(
     tpt_protos: list[str],
 ) -> str:
-    yield tpt_protos[0]
+    proto_key: str = tpt_protos[0]
+
+    from tractor import _state
+    if _state._def_tpt_proto != proto_key:
+        _state._def_tpt_proto = proto_key
+        # breakpoint()
+
+    yield proto_key


 _ci_env: bool = os.environ.get('CI', False)

View File

@ -0,0 +1,4 @@
'''
`tractor.ipc` subsystem(s)/unit testing suites.
'''

View File

@ -0,0 +1,72 @@
'''
High-level `.ipc._server` unit tests.
'''
from __future__ import annotations
import pytest
import trio
from tractor import (
devx,
ipc,
log,
)
from tractor._testing.addr import (
get_rando_addr,
)
# TODO, use/check-roundtripping with some of these wrapper types?
#
# from .._addr import Address
# from ._chan import Channel
# from ._transport import MsgTransport
# from ._uds import UDSAddress
# from ._tcp import TCPAddress
@pytest.mark.parametrize(
'_tpt_proto',
['uds', 'tcp']
)
def test_basic_ipc_server(
_tpt_proto: str,
debug_mode: bool,
loglevel: str,
):
# so we see the socket-listener reporting on console
log.get_console_log("INFO")
rando_addr: tuple = get_rando_addr(
tpt_proto=_tpt_proto,
)
async def main():
async with ipc._server.open_ipc_server() as server:
assert (
server._parent_tn
and
server._parent_tn is server._stream_handler_tn
)
assert server._no_more_peers.is_set()
eps: list[ipc.IPCEndpoint] = await server.listen_on(
accept_addrs=[rando_addr],
stream_handler_nursery=None,
)
assert (
len(eps) == 1
and
(ep := eps[0])._listener
and
not ep.peer_tpts
)
server._parent_tn.cancel_scope.cancel()
# !TODO! actually make a bg-task connection from a client
# using `ipc._chan._connect_chan()`
with devx.maybe_open_crash_handler(
pdb=debug_mode,
):
trio.run(main)

View File

@@ -100,16 +100,29 @@ async def streamer(

 @acm
 async def open_stream() -> Awaitable[tractor.MsgStream]:
-    async with tractor.open_nursery() as tn:
-        portal = await tn.start_actor('streamer', enable_modules=[__name__])
+    try:
+        async with tractor.open_nursery() as an:
+            portal = await an.start_actor(
+                'streamer',
+                enable_modules=[__name__],
+            )
             async with (
                 portal.open_context(streamer) as (ctx, first),
                 ctx.open_stream() as stream,
             ):
                 yield stream

+            print('Cancelling streamer')
             await portal.cancel_actor()
-    print('CANCELLED STREAMER')
+            print('Cancelled streamer')
+
+    except Exception as err:
+        print(
+            f'`open_stream()` errored?\n'
+            f'{err!r}\n'
+        )
+        await tractor.pause(shield=True)
+        raise err


 @acm

@@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str):
            yield stream


-def test_open_local_sub_to_stream():
+def test_open_local_sub_to_stream(
+    debug_mode: bool,
+):
     '''
     Verify a single inter-actor stream can can be fanned-out shared to
-    N local tasks using ``trionics.maybe_open_context():``.
+    N local tasks using `trionics.maybe_open_context()`.

     '''
-    timeout: float = 3.6 if platform.system() != "Windows" else 10
+    timeout: float = 3.6
+    if platform.system() == "Windows":
+        timeout: float = 10
+
+    if debug_mode:
+        timeout = 999

     async def main():
         full = list(range(1000))

         async def get_sub_and_pull(taskname: str):
+
+            stream: tractor.MsgStream
             async with (
                 maybe_open_stream(taskname) as stream,
             ):

@@ -165,17 +187,27 @@ def test_open_local_sub_to_stream():
                 assert set(seq).issubset(set(full))
             print(f'{taskname} finished')

-        with trio.fail_after(timeout):
+        with trio.fail_after(timeout) as cs:
             # TODO: turns out this isn't multi-task entrant XD
             # We probably need an indepotent entry semantic?
-            async with tractor.open_root_actor():
+            async with tractor.open_root_actor(
+                debug_mode=debug_mode,
+            ):
                 async with (
-                    trio.open_nursery() as nurse,
+                    trio.open_nursery() as tn,
                 ):
                     for i in range(10):
-                        nurse.start_soon(get_sub_and_pull, f'task_{i}')
+                        tn.start_soon(
+                            get_sub_and_pull,
+                            f'task_{i}',
+                        )
                         await trio.sleep(0.001)

                 print('all consumer tasks finished')

+        if cs.cancelled_caught:
+            pytest.fail(
+                'Should NOT time out in `open_root_actor()` ?'
+            )
+
     trio.run(main)

View File

@@ -57,7 +57,11 @@ async def spawn(
     )

     assert len(an._children) == 1
-    assert portal.channel.uid in tractor.current_actor()._peers
+    assert (
+        portal.channel.uid
+        in
+        tractor.current_actor().ipc_server._peers
+    )

     # get result from child subactor
     result = await portal.result()

View File

@@ -180,6 +180,7 @@ def test_acm_embedded_nursery_propagates_enter_err(
         with tractor.devx.maybe_open_crash_handler(
             pdb=debug_mode,
         ) as bxerr:
+            if bxerr:
                 assert not bxerr.value

             async with (

View File

@@ -48,6 +48,7 @@ from ._state import (

 if TYPE_CHECKING:
     from ._runtime import Actor
+    from .ipc._server import IPCServer


 log = get_logger(__name__)

@@ -79,7 +80,7 @@ async def get_registry(
         )
     else:
         # TODO: try to look pre-existing connection from
-        # `Actor._peers` and use it instead?
+        # `IPCServer._peers` and use it instead?
         async with (
             _connect_chan(addr) as chan,
             open_portal(chan) as regstr_ptl,

@@ -111,7 +112,7 @@ def get_peer_by_name(
 ) -> list[Channel]|None:  # at least 1
     '''
     Scan for an existing connection (set) to a named actor
-    and return any channels from `Actor._peers`.
+    and return any channels from `IPCServer._peers: dict`.

     This is an optimization method over querying the registrar for
     the same info.

View File

@@ -578,12 +578,11 @@ async def open_portal(

         msg_loop_cs: trio.CancelScope|None = None
         if start_msg_loop:
-            from ._runtime import process_messages
+            from . import _rpc
             msg_loop_cs = await tn.start(
                 partial(
-                    process_messages,
-                    actor,
-                    channel,
+                    _rpc.process_messages,
+                    chan=channel,
                     # if the local task is cancelled we want to keep
                     # the msg loop running until our block ends
                     shield=True,

View File

@@ -869,7 +869,6 @@ async def try_ship_error_to_remote(


 async def process_messages(
-    actor: Actor,
     chan: Channel,
     shield: bool = False,
     task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,

@@ -907,6 +906,7 @@ async def process_messages(
     (as utilized inside `Portal.cancel_actor()` ).

     '''
+    actor: Actor = _state.current_actor()
     assert actor._service_n  # runtime state sanity

     # TODO: once `trio` get's an "obvious way" for req/resp we

View File

@ -40,9 +40,7 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
ExitStack, ExitStack,
) )
from collections import defaultdict
from functools import partial from functools import partial
from itertools import chain
import importlib import importlib
import importlib.util import importlib.util
import os import os
@ -76,6 +74,7 @@ from tractor.msg import (
) )
from .ipc import ( from .ipc import (
Channel, Channel,
# IPCServer, # causes cycles atm..
_server, _server,
) )
from ._addr import ( from ._addr import (
@ -96,18 +95,13 @@ from ._exceptions import (
ModuleNotExposed, ModuleNotExposed,
MsgTypeError, MsgTypeError,
unpack_error, unpack_error,
TransportClosed,
) )
from .devx import _debug from .devx import _debug
from ._discovery import get_registry from ._discovery import get_registry
from ._portal import Portal from ._portal import Portal
from . import _state from . import _state
from . import _mp_fixup_main from . import _mp_fixup_main
from ._rpc import ( from . import _rpc
process_messages,
try_ship_error_to_remote,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ._supervise import ActorNursery from ._supervise import ActorNursery
@ -161,7 +155,6 @@ class Actor:
_root_n: Nursery|None = None _root_n: Nursery|None = None
_service_n: Nursery|None = None _service_n: Nursery|None = None
# XXX moving to IPCServer!
_ipc_server: _server.IPCServer|None = None _ipc_server: _server.IPCServer|None = None
@property @property
@ -251,14 +244,6 @@ class Actor:
# by the user (currently called the "arbiter") # by the user (currently called the "arbiter")
self._spawn_method: str = spawn_method self._spawn_method: str = spawn_method
self._peers: defaultdict[
str, # uaid
list[Channel], # IPC conns from peer
] = defaultdict(list)
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
self._no_more_peers = trio.Event()
self._no_more_peers.set()
# RPC state # RPC state
self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks = trio.Event()
self._ongoing_rpc_tasks.set() self._ongoing_rpc_tasks.set()
@ -343,7 +328,12 @@ class Actor:
parent_uid: tuple|None = None parent_uid: tuple|None = None
if rent_chan := self._parent_chan: if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid parent_uid = rent_chan.uid
peers: list[tuple] = list(self._peer_connected)
peers: list = []
server: _server.IPCServer = self.ipc_server
if server:
peers: list[tuple] = list(server._peer_connected)
fmtstr: str = ( fmtstr: str = (
f' |_id: {self.aid!r}\n' f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n" # f" aid{ds}{self.aid!r}\n"
@ -399,25 +389,6 @@ class Actor:
self._reg_addrs = addrs self._reg_addrs = addrs
async def wait_for_peer(
self,
uid: tuple[str, str],
) -> tuple[trio.Event, Channel]:
'''
Wait for a connection back from a (spawned sub-)actor with
a `uid` using a `trio.Event` for sync.
'''
log.debug(f'Waiting for peer {uid!r} to connect')
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.debug(f'{uid!r} successfully connected back to us')
return (
event,
self._peers[uid][-1],
)
def load_modules( def load_modules(
self, self,
# debug_mode: bool = False, # debug_mode: bool = False,
@ -493,434 +464,6 @@ class Actor:
raise mne raise mne
# TODO: maybe change to mod-func and rename for implied
# multi-transport semantics?
async def _stream_handler(
self,
stream: trio.SocketStream,
) -> None:
'''
Entry point for new inbound IPC connections on a specific
transport server.
'''
self._no_more_peers = trio.Event() # unset by making new
# with _debug.maybe_open_crash_handler(
# pdb=True,
# ) as boxerr:
chan = Channel.from_stream(stream)
con_status: str = (
'New inbound IPC connection <=\n'
f'|_{chan}\n'
)
# send/receive initial handshake response
try:
peer_aid: msgtypes.Aid = await chan._do_handshake(
aid=self.aid,
)
except (
TransportClosed,
# ^XXX NOTE, the above wraps `trio` exc types raised
# during various `SocketStream.send/receive_xx()` calls
# under different fault conditions such as,
#
# trio.BrokenResourceError,
# trio.ClosedResourceError,
#
# Inside our `.ipc._transport` layer we absorb and
# re-raise our own `TransportClosed` exc such that this
# higher level runtime code can only worry one
# "kinda-error" that we expect to tolerate during
# discovery-sys related pings, queires, DoS etc.
):
# XXX: This may propagate up from `Channel._aiter_recv()`
# and `MsgpackStream._inter_packets()` on a read from the
# stream particularly when the runtime is first starting up
# inside `open_root_actor()` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.runtime(
con_status
+
' -> But failed to handshake? Ignoring..\n'
)
return
uid: tuple[str, str] = (
peer_aid.name,
peer_aid.uuid,
)
# TODO, can we make this downstream peer tracking use the
# `peer_aid` instead?
familiar: str = 'new-peer'
if _pre_chan := self._peers.get(uid):
familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += (
f' -> Handshake with {familiar} `{uid_short}` complete\n'
)
if _pre_chan:
# con_status += (
# ^TODO^ swap once we minimize conn duplication
# -[ ] last thing might be reg/unreg runtime reqs?
# log.warning(
log.debug(
f'?Wait?\n'
f'We already have IPC with peer {uid_short!r}\n'
f'|_{_pre_chan}\n'
)
# IPC connection tracking for both peers and new children:
# - if this is a new channel to a locally spawned
# sub-actor there will be a spawn wait even registered
# by a call to `.wait_for_peer()`.
# - if a peer is connecting no such event will exit.
event: trio.Event|None = self._peer_connected.pop(
uid,
None,
)
if event:
con_status += (
' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
# f' {event}\n'
# f' |{event.statistics()}\n'
)
# wake tasks waiting on this IPC-transport "connect-back"
event.set()
else:
con_status += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore
chans: list[Channel] = self._peers[uid]
# if chans:
# # TODO: re-use channels for new connections instead
# # of always new ones?
# # => will require changing all the discovery funcs..
# append new channel
# TODO: can we just use list-ref directly?
chans.append(chan)
con_status += ' -> Entering RPC msg loop..\n'
log.runtime(con_status)
# Begin channel management - respond to remote requests and
# process received reponses.
disconnected: bool = False
last_msg: MsgType
try:
(
disconnected,
last_msg,
) = await process_messages(
self,
chan,
)
except trio.Cancelled:
log.cancel(
'IPC transport msg loop was cancelled\n'
f'c)>\n'
f' |_{chan}\n'
)
raise
finally:
local_nursery: (
ActorNursery|None
) = self._actoruid2nursery.get(uid)
# This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
if (
local_nursery
and (
self._cancel_called
or
chan._cancel_called
)
#
# ^-TODO-^ along with this is there another condition
# that we should filter with to avoid entering this
# waiting block needlessly?
# -[ ] maybe `and local_nursery.cancelled` and/or
# only if the `._children` table is empty or has
# only `Portal`s with .chan._cancel_called ==
# True` as per what we had below; the MAIN DIFF
# BEING that just bc one `Portal.cancel_actor()`
# was called, doesn't mean the whole actor-nurse
# is gonna exit any time soon right!?
#
# or
# all(chan._cancel_called for chan in chans)
):
log.cancel(
'Waiting on cancel request to peer..\n'
f'c)=>\n'
f' |_{chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
# which is mapped to a sub-actor (i.e. it's managed
# by local actor-nursery) has a message that is sent
# to the peer likely by this actor (which may be in
# a shutdown sequence due to cancellation) when the
# local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing.
chan_info: str = (
f'{chan.uid}\n'
f'|_{chan}\n'
f' |_{chan.transport}\n\n'
)
with trio.move_on_after(0.5) as drain_cs:
drain_cs.shield = True
# attempt to wait for the far end to close the
# channel and bail after timeout (a 2-generals
# problem on closure).
assert chan.transport
async for msg in chan.transport.drain():
# try to deliver any lingering msgs
# before we destroy the channel.
# This accomplishes deterministic
# ``Portal.cancel_actor()`` cancellation by
# making sure any RPC response to that call is
# delivered the local calling task.
# TODO: factor this into a helper?
log.warning(
'Draining msg from disconnected peer\n'
f'{chan_info}'
f'{pformat(msg)}\n'
)
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
if cid:
# deliver response to local caller/waiter
await self._deliver_ctx_payload(
chan,
cid,
msg,
)
if drain_cs.cancelled_caught:
log.warning(
'Timed out waiting on IPC transport channel to drain?\n'
f'{chan_info}'
)
# XXX NOTE XXX when no explicit call to
# `open_root_actor()` was made by the application
# (normally we implicitly make that call inside
# the first `.open_nursery()` in root-actor
# user/app code), we can assume that either we
# are NOT the root actor or are root but the
# runtime was started manually. and thus DO have
# to wait for the nursery-enterer to exit before
# shutting down the local runtime to avoid
# clobbering any ongoing subactor
# teardown/debugging/graceful-cancel.
#
# see matching note inside `._supervise.open_nursery()`
#
# TODO: should we have a separate cs + timeout
# block here?
if (
# XXX SO either,
# - not root OR,
# - is root but `open_root_actor()` was
# entered manually (in which case we do
# the equiv wait there using the
# `devx._debug` sub-sys APIs).
not local_nursery._implicit_runtime_started
):
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
with trio.move_on_after(0.5) as an_exit_cs:
an_exit_cs.shield = True
await local_nursery.exited.wait()
# TODO: currently this is always triggering for every
# sub-daemon spawned from the `piker.services._mngr`?
# -[ ] how do we ensure that the IPC is supposed to
# be long lived and isn't just a register?
# |_ in the register case how can we signal that the
# ephemeral msg loop was intentional?
if (
# not local_nursery._implicit_runtime_started
# and
an_exit_cs.cancelled_caught
):
report: str = (
'Timed out waiting on local actor-nursery to exit?\n'
f'c)>\n'
f' |_{local_nursery}\n'
)
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
)
log.warning(report)
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry: tuple|None = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and
poll() is None # proc still alive
):
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
f' |_{proc}\n'
)
# ``Channel`` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = (
f'IPC channel disconnected:\n'
f'<=x uid: {chan.uid}\n'
f' |_{pformat(chan)}\n\n'
)
chans.remove(chan)
# TODO: do we need to be this pedantic?
if not chans:
con_teardown_status += (
f'-> No more channels with {chan.uid}'
)
self._peers.pop(uid, None)
peers_str: str = ''
for uid, chans in self._peers.items():
peers_str += (
f'uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
f' |_[{i}] {pformat(chan)}\n'
)
con_teardown_status += (
f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
)
# No more channels to other actors (at all) registered
# as connected.
if not self._peers:
con_teardown_status += (
'Signalling no more peer channel connections'
)
self._no_more_peers.set()
# NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the
# lock will be released if acquired due to having no
# more active IPC channels.
if _state.is_root_process():
pdb_lock = _debug.Lock
pdb_lock._blocked.add(uid)
# TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD
#
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
if (
(ctx_in_debug := pdb_lock.ctx_in_debug)
and
(pdb_user_uid := ctx_in_debug.chan.uid)
and
local_nursery
):
entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
):
log.cancel(
'Root actor reports no-more-peers, BUT\n'
'a DISCONNECTED child still has the debug '
'lock!\n\n'
# f'root uid: {self.uid}\n'
f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n'
)
await _debug.maybe_wait_for_debugger(
child_in_debug=True
)
# TODO: just bc a child's transport dropped
# doesn't mean it's not still using the pdb
# REPL! so,
# -[ ] ideally we can check out child proc
# tree to ensure that its alive (and
# actually using the REPL) before we cancel
# it's lock acquire by doing the below!
# -[ ] create a way to read the tree of each actor's
# grandchildren such that when an
# intermediary parent is cancelled but their
# child has locked the tty, the grandparent
# will not allow the parent to cancel or
# zombie reap the child! see open issue:
# - https://github.com/goodboy/tractor/issues/320
# ------ - ------
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
if (
(db_cs := pdb_lock.get_locking_task_cs())
and not db_cs.cancel_called
and uid == pdb_user_uid
):
log.critical(
f'STALE DEBUG LOCK DETECTED FOR {uid}'
)
# TODO: figure out why this breaks tests..
db_cs.cancel()
log.runtime(con_teardown_status)
# finally block closure
# TODO: rename to `._deliver_payload()` since this handles # TODO: rename to `._deliver_payload()` since this handles
# more then just `result` msgs now obvi XD # more then just `result` msgs now obvi XD
async def _deliver_ctx_payload( async def _deliver_ctx_payload(
@ -1157,7 +700,7 @@ class Actor:
) )
assert isinstance(chan, Channel) assert isinstance(chan, Channel)
# Initial handshake: swap names. # init handshake: swap actor-IDs.
await chan._do_handshake(aid=self.aid) await chan._do_handshake(aid=self.aid)
accept_addrs: list[UnwrappedAddress]|None = None accept_addrs: list[UnwrappedAddress]|None = None
@ -1719,6 +1262,10 @@ async def async_main(
the actor's "runtime" and all thus all ongoing RPC tasks. the actor's "runtime" and all thus all ongoing RPC tasks.
''' '''
# XXX NOTE, `_state._current_actor` **must** be set prior to
# calling this core runtime entrypoint!
assert actor is _state.current_actor()
actor._task: trio.Task = trio.lowlevel.current_task() actor._task: trio.Task = trio.lowlevel.current_task()
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
@ -1778,7 +1325,6 @@ async def async_main(
) as service_nursery, ) as service_nursery,
_server.open_ipc_server( _server.open_ipc_server(
actor=actor,
parent_tn=service_nursery, parent_tn=service_nursery,
stream_handler_tn=service_nursery, stream_handler_tn=service_nursery,
) as ipc_server, ) as ipc_server,
@ -1832,7 +1378,6 @@ async def async_main(
'Booting IPC server' 'Booting IPC server'
) )
eps: list = await ipc_server.listen_on( eps: list = await ipc_server.listen_on(
actor=actor,
accept_addrs=accept_addrs, accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery, stream_handler_nursery=service_nursery,
) )
@ -1916,9 +1461,8 @@ async def async_main(
if actor._parent_chan: if actor._parent_chan:
await root_nursery.start( await root_nursery.start(
partial( partial(
process_messages, _rpc.process_messages,
actor, chan=actor._parent_chan,
actor._parent_chan,
shield=True, shield=True,
) )
) )
@ -1959,7 +1503,7 @@ async def async_main(
log.exception(err_report) log.exception(err_report)
if actor._parent_chan: if actor._parent_chan:
await try_ship_error_to_remote( await _rpc.try_ship_error_to_remote(
actor._parent_chan, actor._parent_chan,
internal_err, internal_err,
) )
@ -2053,16 +1597,18 @@ async def async_main(
) )
# Ensure all peers (actors connected to us as clients) are finished # Ensure all peers (actors connected to us as clients) are finished
if not actor._no_more_peers.is_set(): if (
if any( (ipc_server := actor.ipc_server)
chan.connected() for chan in chain(*actor._peers.values()) and
ipc_server.has_peers(check_chans=True)
): ):
teardown_report += ( teardown_report += (
f'-> Waiting for remaining peers {actor._peers} to clear..\n' f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
) )
log.runtime(teardown_report) log.runtime(teardown_report)
with CancelScope(shield=True): await ipc_server.wait_for_no_more_peers(
await actor._no_more_peers.wait() shield=True,
)
teardown_report += ( teardown_report += (
'-> All peer channels are complete\n' '-> All peer channels are complete\n'

View File

@@ -58,9 +58,11 @@ from tractor.msg.types import (

 if TYPE_CHECKING:
+    from ipc import IPCServer
     from ._supervise import ActorNursery

 ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)

 log = get_logger('tractor')

 # placeholder for an mp start context if so using that backend

@@ -481,6 +483,7 @@ async def trio_proc(
     cancelled_during_spawn: bool = False
     proc: trio.Process|None = None
+    ipc_server: IPCServer = actor_nursery._actor.ipc_server
     try:
         try:
             proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)

@@ -492,7 +495,7 @@
             # wait for actor to spawn and connect back to us
             # channel should have handshake completed by the
             # local actor by the time we get a ref to it
-            event, chan = await actor_nursery._actor.wait_for_peer(
+            event, chan = await ipc_server.wait_for_peer(
                 subactor.uid
             )

@@ -724,11 +727,12 @@ async def mp_proc(

     log.runtime(f"Started {proc}")

+    ipc_server: IPCServer = actor_nursery._actor.ipc_server
     try:
         # wait for actor to spawn and connect back to us
         # channel should have handshake completed by the
         # local actor by the time we get a ref to it
-        event, chan = await actor_nursery._actor.wait_for_peer(
+        event, chan = await ipc_server.wait_for_peer(
             subactor.uid,
         )

View File

@@ -53,6 +53,9 @@ from . import _spawn

 if TYPE_CHECKING:
     import multiprocessing as mp
+    # from .ipc._server import IPCServer
+    from .ipc import IPCServer


 log = get_logger(__name__)

@@ -315,6 +318,9 @@
         children: dict = self._children
         child_count: int = len(children)
         msg: str = f'Cancelling actor nursery with {child_count} children\n'
+
+        server: IPCServer = self._actor.ipc_server
+
         with trio.move_on_after(3) as cs:
             async with trio.open_nursery(
                 strict_exception_groups=False,

@@ -337,7 +343,7 @@
                 else:
                     if portal is None:  # actor hasn't fully spawned yet
-                        event = self._actor._peer_connected[subactor.uid]
+                        event: trio.Event = server._peer_connected[subactor.uid]
                         log.warning(
                             f"{subactor.uid} never 't finished spawning?"
                         )

@@ -353,7 +359,7 @@
                     if portal is None:
                         # cancelled while waiting on the event
                         # to arrive
-                        chan = self._actor._peers[subactor.uid][-1]
+                        chan = server._peers[subactor.uid][-1]
                         if chan:
                             portal = Portal(chan)
                         else:  # there's no other choice left

View File

@@ -92,7 +92,11 @@ from tractor._state import (
 if TYPE_CHECKING:
     from trio.lowlevel import Task
     from threading import Thread
-    from tractor.ipc import Channel
+    from tractor.ipc import (
+        Channel,
+        IPCServer,
+        # _server,  # TODO? export at top level?
+    )
     from tractor._runtime import (
         Actor,
     )

@@ -1434,6 +1438,7 @@ def any_connected_locker_child() -> bool:

     '''
     actor: Actor = current_actor()
+    server: IPCServer = actor.ipc_server

     if not is_root_process():
         raise InternalError('This is a root-actor only API!')

@@ -1443,7 +1448,7 @@
             and
             (uid_in_debug := ctx.chan.uid)
         ):
-            chans: list[tractor.Channel] = actor._peers.get(
+            chans: list[tractor.Channel] = server._peers.get(
                 tuple(uid_in_debug)
             )
             if chans:

@@ -3003,6 +3008,7 @@ async def _maybe_enter_pm(
         [BaseException|BaseExceptionGroup],
         bool,
     ] = lambda err: not is_multi_cancelled(err),
+    **_pause_kws,

 ):
     if (

@@ -3029,6 +3035,7 @@
         await post_mortem(
             api_frame=api_frame,
             tb=tb,
+            **_pause_kws,
         )
         return True

View File

@@ -49,6 +49,7 @@ from tractor.log import get_logger
 from tractor._exceptions import (
     MsgTypeError,
     pack_from_raise,
+    TransportClosed,
 )
 from tractor.msg import (
     Aid,

@@ -256,7 +257,7 @@ class Channel:
         self,
         payload: Any,

-        hide_tb: bool = False,
+        hide_tb: bool = True,

     ) -> None:
         '''

@@ -274,18 +275,27 @@
                 payload,
                 hide_tb=hide_tb,
             )
-        except BaseException as _err:
+        except (
+            BaseException,
+            MsgTypeError,
+            TransportClosed,
+        )as _err:
             err = _err  # bind for introspection
-            if not isinstance(_err, MsgTypeError):
-                # assert err
-                __tracebackhide__: bool = False
-            else:
+            match err:
+                case MsgTypeError():
                     try:
                         assert err.cid
                     except KeyError:
                         raise err
+                case TransportClosed():
+                    log.transport(
+                        f'Transport stream closed due to\n'
+                        f'{err.repr_src_exc()}\n'
+                    )
+                case _:
+                    # never suppress non-tpt sources
+                    __tracebackhide__: bool = False
             raise

     async def recv(self) -> Any:
async def recv(self) -> Any: async def recv(self) -> Any:

View File

@ -19,11 +19,14 @@ multi-transport-protcol needs!
''' '''
from __future__ import annotations from __future__ import annotations
from collections import defaultdict
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from functools import partial from functools import partial
from itertools import chain
import inspect import inspect
from pprint import pformat
from types import ( from types import (
ModuleType, ModuleType,
) )
@ -40,24 +43,540 @@ from trio import (
SocketListener, SocketListener,
) )
from ..msg import Struct # from ..devx import _debug
from .._exceptions import (
TransportClosed,
)
from .. import _rpc
from ..msg import (
MsgType,
Struct,
types as msgtypes,
)
from ..trionics import maybe_open_nursery from ..trionics import maybe_open_nursery
from .. import ( from .. import (
_state, _state,
log, log,
) )
from .._addr import Address from .._addr import Address
from ._chan import Channel
from ._transport import MsgTransport from ._transport import MsgTransport
from ._uds import UDSAddress from ._uds import UDSAddress
from ._tcp import TCPAddress from ._tcp import TCPAddress
if TYPE_CHECKING: if TYPE_CHECKING:
from .._runtime import Actor from .._runtime import Actor
from .._supervise import ActorNursery
log = log.get_logger(__name__) log = log.get_logger(__name__)
async def maybe_wait_on_canced_subs(
uid: tuple[str, str],
chan: Channel,
disconnected: bool,
actor: Actor|None = None,
chan_drain_timeout: float = 0.5,
an_exit_timeout: float = 0.5,
) -> ActorNursery|None:
'''
When a process-local actor-nursery is found for the given actor
`uid` (i.e. that peer is **also** a subactor of this parent), we
attempt to (with timeouts) wait on,
- all IPC msgs to drain on the (common) `Channel` such that all
local `Context`-parent-tasks can also gracefully collect
`ContextCancelled` msgs from their respective remote children
vs. a `chan_drain_timeout`.
- the actor-nursery to cancel-n-join all its supervised children
(processes) *gracefully* vs. a `an_exit_timeout` and thus also
detect cases where the IPC transport connection broke but
a sub-process is detected as still alive (a case that happens
when the subactor is still in an active debugger REPL session).
If the timeout expires in either case we ofc report with warning.
'''
actor = actor or _state.current_actor()
# XXX running outside actor-runtime usage,
# - unit testing
# - possibly manual usage (eventually) ?
if not actor:
return None
local_nursery: (
ActorNursery|None
) = actor._actoruid2nursery.get(uid)
# This is set in `Portal.cancel_actor()`. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
if (
local_nursery
and (
actor._cancel_called
or
chan._cancel_called
)
#
# ^-TODO-^ along with this is there another condition
# that we should filter with to avoid entering this
# waiting block needlessly?
# -[ ] maybe `and local_nursery.cancelled` and/or
# only if the `._children` table is empty or has
# only `Portal`s with .chan._cancel_called ==
# True` as per what we had below; the MAIN DIFF
# BEING that just bc one `Portal.cancel_actor()`
# was called, doesn't mean the whole actor-nurse
# is gonna exit any time soon right!?
#
# or
# all(chan._cancel_called for chan in chans)
):
log.cancel(
'Waiting on cancel request to peer..\n'
f'c)=>\n'
f' |_{chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
# which is mapped to a sub-actor (i.e. it's managed
# by local actor-nursery) has a message that is sent
# to the peer likely by this actor (which may be in
# a shutdown sequence due to cancellation) when the
# local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing.
chan_info: str = (
f'{chan.uid}\n'
f'|_{chan}\n'
f' |_{chan.transport}\n\n'
)
with trio.move_on_after(chan_drain_timeout) as drain_cs:
drain_cs.shield = True
# attempt to wait for the far end to close the
# channel and bail after timeout (a 2-generals
# problem on closure).
assert chan.transport
async for msg in chan.transport.drain():
# try to deliver any lingering msgs
# before we destroy the channel.
# This accomplishes deterministic
# ``Portal.cancel_actor()`` cancellation by
# making sure any RPC response to that call is
# delivered the local calling task.
# TODO: factor this into a helper?
log.warning(
'Draining msg from disconnected peer\n'
f'{chan_info}'
f'{pformat(msg)}\n'
)
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
if cid:
# deliver response to local caller/waiter
await actor._deliver_ctx_payload(
chan,
cid,
msg,
)
if drain_cs.cancelled_caught:
log.warning(
'Timed out waiting on IPC transport channel to drain?\n'
f'{chan_info}'
)
# XXX NOTE XXX when no explicit call to
# `open_root_actor()` was made by the application
# (normally we implicitly make that call inside
# the first `.open_nursery()` in root-actor
# user/app code), we can assume that either we
# are NOT the root actor or are root but the
# runtime was started manually. and thus DO have
# to wait for the nursery-enterer to exit before
# shutting down the local runtime to avoid
# clobbering any ongoing subactor
# teardown/debugging/graceful-cancel.
#
# see matching note inside `._supervise.open_nursery()`
#
# TODO: should we have a separate cs + timeout
# block here?
if (
# XXX SO either,
# - not root OR,
# - is root but `open_root_actor()` was
# entered manually (in which case we do
# the equiv wait there using the
# `devx.debug` sub-sys APIs).
not local_nursery._implicit_runtime_started
):
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
with trio.move_on_after(an_exit_timeout) as an_exit_cs:
an_exit_cs.shield = True
await local_nursery.exited.wait()
# TODO: currently this is always triggering for every
# sub-daemon spawned from the `piker.services._mngr`?
# -[ ] how do we ensure that the IPC is supposed to
# be long lived and isn't just a register?
# |_ in the register case how can we signal that the
# ephemeral msg loop was intentional?
if (
# not local_nursery._implicit_runtime_started
# and
an_exit_cs.cancelled_caught
):
report: str = (
'Timed out waiting on local actor-nursery to exit?\n'
f'c)>\n'
f' |_{local_nursery}\n'
)
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
)
log.warning(report)
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry: tuple|None = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and
poll() is None # proc still alive
):
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
f' |_{proc}\n'
)
return local_nursery
# TODO multi-tpt support with per-proto peer tracking?
#
# -[x] maybe change to mod-func and rename for implied
# multi-transport semantics?
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
# so that we can query per tpt all peer contact infos?
# |_[ ] possibly provide a global viewing via a
# `collections.ChainMap`?
#
async def handle_stream_from_peer(
stream: trio.SocketStream,
*,
server: IPCServer,
) -> None:
'''
Top-level `trio.abc.Stream` (i.e. normally `trio.SocketStream`)
handler-callback as spawn-invoked by `trio.serve_listeners()`.
Note that each call to this handler is as a spawned task inside
any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
such that it is invoked as,
IPCEndpoint.stream_handler_tn.start_soon(
handle_stream,
stream,
)
'''
server._no_more_peers = trio.Event() # unset by making new
# TODO, debug_mode tooling for when hackin this lower layer?
# with _debug.maybe_open_crash_handler(
# pdb=True,
# ) as boxerr:
chan = Channel.from_stream(stream)
con_status: str = (
'New inbound IPC connection <=\n'
f'|_{chan}\n'
)
# initial handshake with peer phase
try:
if actor := _state.current_actor():
peer_aid: msgtypes.Aid = await chan._do_handshake(
aid=actor.aid,
)
except (
TransportClosed,
# ^XXX NOTE, the above wraps `trio` exc types raised
# during various `SocketStream.send/receive_xx()` calls
# under different fault conditions such as,
#
# trio.BrokenResourceError,
# trio.ClosedResourceError,
#
# Inside our `.ipc._transport` layer we absorb and
# re-raise our own `TransportClosed` exc such that this
# higher level runtime code can only worry one
# "kinda-error" that we expect to tolerate during
# discovery-sys related pings, queires, DoS etc.
):
# XXX: This may propagate up from `Channel._aiter_recv()`
# and `MsgpackStream._inter_packets()` on a read from the
# stream particularly when the runtime is first starting up
# inside `open_root_actor()` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# because the handshake was never meant took place.
log.runtime(
con_status
+
' -> But failed to handshake? Ignoring..\n'
)
return
uid: tuple[str, str] = (
peer_aid.name,
peer_aid.uuid,
)
# TODO, can we make this downstream peer tracking use the
# `peer_aid` instead?
familiar: str = 'new-peer'
if _pre_chan := server._peers.get(uid):
familiar: str = 'pre-existing-peer'
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
con_status += (
f' -> Handshake with {familiar} `{uid_short}` complete\n'
)
if _pre_chan:
# con_status += (
# ^TODO^ swap once we minimize conn duplication
# -[ ] last thing might be reg/unreg runtime reqs?
# log.warning(
log.debug(
f'?Wait?\n'
f'We already have IPC with peer {uid_short!r}\n'
f'|_{_pre_chan}\n'
)
# IPC connection tracking for both peers and new children:
# - if this is a new channel to a locally spawned
# sub-actor there will be a spawn wait even registered
# by a call to `.wait_for_peer()`.
# - if a peer is connecting no such event will exit.
event: trio.Event|None = server._peer_connected.pop(
uid,
None,
)
if event:
con_status += (
' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n'
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
# f' {event}\n'
# f' |{event.statistics()}\n'
)
# wake tasks waiting on this IPC-transport "connect-back"
event.set()
else:
con_status += (
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
) # type: ignore
chans: list[Channel] = server._peers[uid]
# if chans:
# # TODO: re-use channels for new connections instead
# # of always new ones?
# # => will require changing all the discovery funcs..
# append new channel
# TODO: can we just use list-ref directly?
chans.append(chan)
con_status += ' -> Entering RPC msg loop..\n'
log.runtime(con_status)
# Begin channel management - respond to remote requests and
# process received reponses.
disconnected: bool = False
last_msg: MsgType
try:
(
disconnected,
last_msg,
) = await _rpc.process_messages(
chan=chan,
)
except trio.Cancelled:
log.cancel(
'IPC transport msg loop was cancelled\n'
f'c)>\n'
f' |_{chan}\n'
)
raise
finally:
# check if there are subs which we should gracefully join at
# both the inter-actor-task and subprocess levels to
# gracefully remote cancel and later disconnect (particularly
# for permitting subs engaged in active debug-REPL sessions).
local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
uid=uid,
chan=chan,
disconnected=disconnected,
)
# ``Channel`` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = (
f'IPC channel disconnected:\n'
f'<=x uid: {chan.uid}\n'
f' |_{pformat(chan)}\n\n'
)
chans.remove(chan)
# TODO: do we need to be this pedantic?
if not chans:
con_teardown_status += (
f'-> No more channels with {chan.uid}'
)
server._peers.pop(uid, None)
peers_str: str = ''
for uid, chans in server._peers.items():
peers_str += (
f'uid: {uid}\n'
)
for i, chan in enumerate(chans):
peers_str += (
f' |_[{i}] {pformat(chan)}\n'
)
con_teardown_status += (
f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
)
# No more channels to other actors (at all) registered
# as connected.
if not server._peers:
con_teardown_status += (
'Signalling no more peer channel connections'
)
server._no_more_peers.set()
# NOTE: block this actor from acquiring the
# debugger-TTY-lock since we have no way to know if we
# cancelled it and further there is no way to ensure the
# lock will be released if acquired due to having no
# more active IPC channels.
if (
_state.is_root_process()
and
_state.is_debug_mode()
):
from ..devx import _debug
pdb_lock = _debug.Lock
pdb_lock._blocked.add(uid)
# TODO: NEEEDS TO BE TESTED!
# actually, no idea if this ever even enters.. XD
#
# XXX => YES IT DOES, when i was testing ctl-c
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
if (
local_nursery
and
(ctx_in_debug := pdb_lock.ctx_in_debug)
and
(pdb_user_uid := ctx_in_debug.chan.uid)
):
entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and poll() is None
):
log.cancel(
'Root actor reports no-more-peers, BUT\n'
'a DISCONNECTED child still has the debug '
'lock!\n\n'
# f'root uid: {actor.uid}\n'
f'last disconnected child uid: {uid}\n'
f'locking child uid: {pdb_user_uid}\n'
)
await _debug.maybe_wait_for_debugger(
child_in_debug=True
)
# TODO: just bc a child's transport dropped
# doesn't mean it's not still using the pdb
# REPL! so,
# -[ ] ideally we can check out child proc
# tree to ensure that its alive (and
# actually using the REPL) before we cancel
# it's lock acquire by doing the below!
# -[ ] create a way to read the tree of each actor's
# grandchildren such that when an
# intermediary parent is cancelled but their
# child has locked the tty, the grandparent
# will not allow the parent to cancel or
# zombie reap the child! see open issue:
# - https://github.com/goodboy/tractor/issues/320
# ------ - ------
# if a now stale local task has the TTY lock still
# we cancel it to allow servicing other requests for
# the lock.
if (
(db_cs := pdb_lock.get_locking_task_cs())
and not db_cs.cancel_called
and uid == pdb_user_uid
):
log.critical(
f'STALE DEBUG LOCK DETECTED FOR {uid}'
)
# TODO: figure out why this breaks tests..
db_cs.cancel()
log.runtime(con_teardown_status)
# finally block closure
class IPCEndpoint(Struct): class IPCEndpoint(Struct):
''' '''
An instance of an IPC "bound" address where the lifetime of the An instance of an IPC "bound" address where the lifetime of the
@ -120,8 +639,23 @@ class IPCEndpoint(Struct):
class IPCServer(Struct): class IPCServer(Struct):
_parent_tn: Nursery _parent_tn: Nursery
_stream_handler_tn: Nursery _stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently
# connected"; field is **always** set to an instance but
# initialized with `.is_set() == True`.
_no_more_peers: trio.Event
_endpoints: list[IPCEndpoint] = [] _endpoints: list[IPCEndpoint] = []
# connection tracking & mgmt
_peers: defaultdict[
str, # uaid
list[Channel], # IPC conns from peer
] = defaultdict(list)
_peer_connected: dict[
tuple[str, str],
trio.Event,
] = {}
# syncs for setup/teardown sequences # syncs for setup/teardown sequences
_shutdown: trio.Event|None = None _shutdown: trio.Event|None = None
@ -183,6 +717,65 @@ class IPCServer(Struct):
f'protos: {tpt_protos!r}\n' f'protos: {tpt_protos!r}\n'
) )
def has_peers(
self,
check_chans: bool = False,
) -> bool:
'''
Predicate for "are there any active peer IPC `Channel`s at the moment?"
'''
has_peers: bool = not self._no_more_peers.is_set()
if (
has_peers
and
check_chans
):
has_peers: bool = (
any(chan.connected()
for chan in chain(
*self._peers.values()
)
)
and
has_peers
)
return has_peers
async def wait_for_no_more_peers(
self,
shield: bool = False,
) -> None:
with trio.CancelScope(shield=shield):
await self._no_more_peers.wait()
async def wait_for_peer(
self,
uid: tuple[str, str],
) -> tuple[trio.Event, Channel]:
'''
Wait for a connection back from a (spawned sub-)actor with
a `uid` using a `trio.Event`.
Returns a pair of the event and the "last" registered IPC
`Channel` for the peer with `uid`.
'''
log.debug(f'Waiting for peer {uid!r} to connect')
event: trio.Event = self._peer_connected.setdefault(
uid,
trio.Event(),
)
await event.wait()
log.debug(f'{uid!r} successfully connected back to us')
mru_chan: Channel = self._peers[uid][-1]
return (
event,
mru_chan,
)
@property @property
def addrs(self) -> list[Address]: def addrs(self) -> list[Address]:
return [ep.addr for ep in self._endpoints] return [ep.addr for ep in self._endpoints]
@ -211,15 +804,25 @@ class IPCServer(Struct):
return ev.is_set() return ev.is_set()
def pformat(self) -> str: def pformat(self) -> str:
eps: list[IPCEndpoint] = self._endpoints
fmtstr: str = ( state_repr: str = (
f' |_endpoints: {self._endpoints}\n' f'{len(eps)!r} IPC-endpoints active'
)
fmtstr = (
f' |_state: {state_repr}\n'
f' no_more_peers: {self.has_peers()}\n'
) )
if self._shutdown is not None: if self._shutdown is not None:
shutdown_stats: EventStatistics = self._shutdown.statistics() shutdown_stats: EventStatistics = self._shutdown.statistics()
fmtstr += ( fmtstr += (
f'\n' f' task_waiting_on_shutdown: {shutdown_stats}\n'
f' |_shutdown: {shutdown_stats}\n' )
fmtstr += (
# TODO, use the `ppfmt()` helper from `modden`!
f' |_endpoints: {pformat(self._endpoints)}\n'
f' |_peers: {len(self._peers)} connected\n'
) )
return ( return (
@ -249,7 +852,6 @@ class IPCServer(Struct):
async def listen_on( async def listen_on(
self, self,
*, *,
actor: Actor,
accept_addrs: list[tuple[str, int|str]]|None = None, accept_addrs: list[tuple[str, int|str]]|None = None,
stream_handler_nursery: Nursery|None = None, stream_handler_nursery: Nursery|None = None,
) -> list[IPCEndpoint]: ) -> list[IPCEndpoint]:
@ -282,20 +884,19 @@ class IPCServer(Struct):
f'{self}\n' f'{self}\n'
) )
log.info( log.runtime(
f'Binding to endpoints for,\n' f'Binding to endpoints for,\n'
f'{accept_addrs}\n' f'{accept_addrs}\n'
) )
eps: list[IPCEndpoint] = await self._parent_tn.start( eps: list[IPCEndpoint] = await self._parent_tn.start(
partial( partial(
_serve_ipc_eps, _serve_ipc_eps,
actor=actor,
server=self, server=self,
stream_handler_tn=stream_handler_nursery, stream_handler_tn=stream_handler_nursery,
listen_addrs=accept_addrs, listen_addrs=accept_addrs,
) )
) )
log.info( log.runtime(
f'Started IPC endpoints\n' f'Started IPC endpoints\n'
f'{eps}\n' f'{eps}\n'
) )
@ -318,7 +919,6 @@ class IPCServer(Struct):
async def _serve_ipc_eps( async def _serve_ipc_eps(
*, *,
actor: Actor,
server: IPCServer, server: IPCServer,
stream_handler_tn: Nursery, stream_handler_tn: Nursery,
listen_addrs: list[tuple[str, int|str]], listen_addrs: list[tuple[str, int|str]],
@ -352,12 +952,13 @@ async def _serve_ipc_eps(
stream_handler_tn=stream_handler_tn, stream_handler_tn=stream_handler_tn,
) )
try: try:
log.info( log.runtime(
f'Starting new endpoint listener\n' f'Starting new endpoint listener\n'
f'{ep}\n' f'{ep}\n'
) )
listener: trio.abc.Listener = await ep.start_listener() listener: trio.abc.Listener = await ep.start_listener()
assert listener is ep._listener assert listener is ep._listener
# actor = _state.current_actor()
# if actor.is_registry: # if actor.is_registry:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
@ -379,7 +980,10 @@ async def _serve_ipc_eps(
_listeners: list[SocketListener] = await listen_tn.start( _listeners: list[SocketListener] = await listen_tn.start(
partial( partial(
trio.serve_listeners, trio.serve_listeners,
handler=actor._stream_handler, handler=partial(
handle_stream_from_peer,
server=server,
),
listeners=listeners, listeners=listeners,
# NOTE: configured such that new # NOTE: configured such that new
@ -389,13 +993,13 @@ async def _serve_ipc_eps(
) )
) )
# TODO, wow make this message better! XD # TODO, wow make this message better! XD
log.info( log.runtime(
'Started server(s)\n' 'Started server(s)\n'
+ +
'\n'.join([f'|_{addr}' for addr in listen_addrs]) '\n'.join([f'|_{addr}' for addr in listen_addrs])
) )
log.info( log.runtime(
f'Started IPC endpoints\n' f'Started IPC endpoints\n'
f'{eps}\n' f'{eps}\n'
) )
@ -411,6 +1015,7 @@ async def _serve_ipc_eps(
ep.close_listener() ep.close_listener()
server._endpoints.remove(ep) server._endpoints.remove(ep)
# actor = _state.current_actor()
# if actor.is_arbiter: # if actor.is_arbiter:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
@ -421,7 +1026,6 @@ async def _serve_ipc_eps(
@acm @acm
async def open_ipc_server( async def open_ipc_server(
actor: Actor,
parent_tn: Nursery|None = None, parent_tn: Nursery|None = None,
stream_handler_tn: Nursery|None = None, stream_handler_tn: Nursery|None = None,
@ -430,20 +1034,28 @@ async def open_ipc_server(
async with maybe_open_nursery( async with maybe_open_nursery(
nursery=parent_tn, nursery=parent_tn,
) as rent_tn: ) as rent_tn:
no_more_peers = trio.Event()
no_more_peers.set()
ipc_server = IPCServer( ipc_server = IPCServer(
_parent_tn=rent_tn, _parent_tn=rent_tn,
_stream_handler_tn=stream_handler_tn or rent_tn, _stream_handler_tn=stream_handler_tn or rent_tn,
_no_more_peers=no_more_peers,
) )
try: try:
yield ipc_server yield ipc_server
log.runtime(
f'Waiting on server to shutdown or be cancelled..\n'
f'{ipc_server}'
)
# TODO? when if ever would we want/need this?
# with trio.CancelScope(shield=True):
# await ipc_server.wait_for_shutdown()
# except BaseException as berr: except BaseException as berr:
# log.exception( log.exception(
# 'IPC server crashed on exit ?' 'IPC server caller crashed ??'
# ) )
# raise berr
finally:
# ?TODO, maybe we can ensure the endpoints are torndown # ?TODO, maybe we can ensure the endpoints are torndown
# (and thus their managed listeners) beforehand to ensure # (and thus their managed listeners) beforehand to ensure
# super graceful RPC mechanics? # super graceful RPC mechanics?
@ -451,17 +1063,5 @@ async def open_ipc_server(
# -[ ] but aren't we doing that already per-`listen_tn` # -[ ] but aren't we doing that already per-`listen_tn`
# inside `_serve_ipc_eps()` above? # inside `_serve_ipc_eps()` above?
# #
# if not ipc_server.is_shutdown():
# ipc_server.cancel() # ipc_server.cancel()
# await ipc_server.wait_for_shutdown() raise berr
# assert ipc_server.is_shutdown()
pass
# !XXX TODO! lol so classic, the below code is rekt!
#
# XXX here is a perfect example of suppressing errors with
# `trio.Cancelled` as per our demonstrating example,
# `test_trioisms::test_acm_embedded_nursery_propagates_enter_err
#
# with trio.CancelScope(shield=True):
# await ipc_server.wait_for_shutdown()

View File

@@ -127,6 +127,11 @@ async def start_listener(
     Start a TCP socket listener on the given `TCPAddress`.

     '''
+    log.info(
+        f'Attempting to bind TCP socket\n'
+        f'>[\n'
+        f'|_{addr}\n'
+    )
     # ?TODO, maybe we should just change the lower-level call this is
     # using internall per-listener?
     listeners: list[SocketListener] = await open_tcp_listeners(

@@ -140,6 +145,12 @@
     assert len(listeners) == 1
     listener = listeners[0]
     host, port = listener.socket.getsockname()[:2]
+
+    log.info(
+        f'Listening on TCP socket\n'
+        f'[>\n'
+        f' |_{addr}\n'
+    )
     return listener