Compare commits

10 Commits: f3285ea870 ... 1c73c0c0ee

| Author | SHA1 | Date |
|---|---|---|
|  | 1c73c0c0ee |  |
|  | 101cd94e89 |  |
|  | 3f33ba1cc0 |  |
|  | 70f5315506 |  |
|  | 496fac04bb |  |
|  | 02baeb6a8b |  |
|  | d4ab802e14 |  |
|  | fdeaeef9f7 |  |
|  | 41609d1433 |  |
|  | c9068522ed |  |
@@ -138,11 +138,19 @@ def tpt_protos(request) -> list[str]:
     yield proto_keys


-@pytest.fixture(scope='session')
+@pytest.fixture(
+    scope='session',
+    autouse=True,
+)
 def tpt_proto(
     tpt_protos: list[str],
 ) -> str:
-    yield tpt_protos[0]
+    proto_key: str = tpt_protos[0]
+
+    from tractor import _state
+    if _state._def_tpt_proto != proto_key:
+        _state._def_tpt_proto = proto_key
+        # breakpoint()
+    yield proto_key


 _ci_env: bool = os.environ.get('CI', False)
@@ -0,0 +1,4 @@
+'''
+`tractor.ipc` subsystem(s)/unit testing suites.
+
+'''
@@ -0,0 +1,72 @@
+'''
+High-level `.ipc._server` unit tests.
+
+'''
+from __future__ import annotations
+
+import pytest
+import trio
+from tractor import (
+    devx,
+    ipc,
+    log,
+)
+from tractor._testing.addr import (
+    get_rando_addr,
+)
+# TODO, use/check-roundtripping with some of these wrapper types?
+#
+# from .._addr import Address
+# from ._chan import Channel
+# from ._transport import MsgTransport
+# from ._uds import UDSAddress
+# from ._tcp import TCPAddress
+
+
+@pytest.mark.parametrize(
+    '_tpt_proto',
+    ['uds', 'tcp']
+)
+def test_basic_ipc_server(
+    _tpt_proto: str,
+    debug_mode: bool,
+    loglevel: str,
+):
+
+    # so we see the socket-listener reporting on console
+    log.get_console_log("INFO")
+
+    rando_addr: tuple = get_rando_addr(
+        tpt_proto=_tpt_proto,
+    )
+    async def main():
+        async with ipc._server.open_ipc_server() as server:
+
+            assert (
+                server._parent_tn
+                and
+                server._parent_tn is server._stream_handler_tn
+            )
+            assert server._no_more_peers.is_set()
+
+            eps: list[ipc.IPCEndpoint] = await server.listen_on(
+                accept_addrs=[rando_addr],
+                stream_handler_nursery=None,
+            )
+            assert (
+                len(eps) == 1
+                and
+                (ep := eps[0])._listener
+                and
+                not ep.peer_tpts
+            )
+
+            server._parent_tn.cancel_scope.cancel()
+
+            # !TODO! actually make a bg-task connection from a client
+            # using `ipc._chan._connect_chan()`
+
+    with devx.maybe_open_crash_handler(
+        pdb=debug_mode,
+    ):
+        trio.run(main)
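For the in-test `!TODO!` above, a hedged sketch of what the client-side connection task might look like. The acm-style `_connect_chan(addr)` usage is borrowed from how `tractor._discovery` calls it elsewhere in this same diff; treat the exact import path and signature as assumptions, not confirmed API:

```python
# Possible follow-up for the `!TODO!`: dial the listening endpoint
# from a background task so the server registers a real peer conn.
async def _client_connects(rando_addr: tuple):
    # import path assumed from the `ipc._chan._connect_chan()` hint
    from tractor.ipc._chan import _connect_chan

    async with _connect_chan(rando_addr) as chan:
        # `Channel.connected()` is used as a liveness predicate
        # elsewhere in this diff
        assert chan.connected()
```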
@@ -100,16 +100,29 @@ async def streamer(
 @acm
 async def open_stream() -> Awaitable[tractor.MsgStream]:

-    async with tractor.open_nursery() as tn:
-        portal = await tn.start_actor('streamer', enable_modules=[__name__])
-        async with (
-            portal.open_context(streamer) as (ctx, first),
-            ctx.open_stream() as stream,
-        ):
-            yield stream
+    try:
+        async with tractor.open_nursery() as an:
+            portal = await an.start_actor(
+                'streamer',
+                enable_modules=[__name__],
+            )
+            async with (
+                portal.open_context(streamer) as (ctx, first),
+                ctx.open_stream() as stream,
+            ):
+                yield stream

-        await portal.cancel_actor()
-        print('CANCELLED STREAMER')
+            print('Cancelling streamer')
+            await portal.cancel_actor()
+            print('Cancelled streamer')
+
+    except Exception as err:
+        print(
+            f'`open_stream()` errored?\n'
+            f'{err!r}\n'
+        )
+        await tractor.pause(shield=True)
+        raise err


 @acm
@@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str):
             yield stream


-def test_open_local_sub_to_stream():
+def test_open_local_sub_to_stream(
+    debug_mode: bool,
+):
     '''
     Verify a single inter-actor stream can can be fanned-out shared to
-    N local tasks using ``trionics.maybe_open_context():``.
+    N local tasks using `trionics.maybe_open_context()`.

     '''
-    timeout: float = 3.6 if platform.system() != "Windows" else 10
+    timeout: float = 3.6
+    if platform.system() == "Windows":
+        timeout: float = 10
+
+    if debug_mode:
+        timeout = 999

     async def main():

         full = list(range(1000))

         async def get_sub_and_pull(taskname: str):
+
+            stream: tractor.MsgStream
             async with (
                 maybe_open_stream(taskname) as stream,
             ):
@@ -165,17 +187,27 @@ def test_open_local_sub_to_stream():
             assert set(seq).issubset(set(full))
             print(f'{taskname} finished')

-        with trio.fail_after(timeout):
+        with trio.fail_after(timeout) as cs:
             # TODO: turns out this isn't multi-task entrant XD
             # We probably need an indepotent entry semantic?
-            async with tractor.open_root_actor():
+            async with tractor.open_root_actor(
+                debug_mode=debug_mode,
+            ):
                 async with (
-                    trio.open_nursery() as nurse,
+                    trio.open_nursery() as tn,
                 ):
                     for i in range(10):
-                        nurse.start_soon(get_sub_and_pull, f'task_{i}')
+                        tn.start_soon(
+                            get_sub_and_pull,
+                            f'task_{i}',
+                        )
                         await trio.sleep(0.001)

                 print('all consumer tasks finished')

+        if cs.cancelled_caught:
+            pytest.fail(
+                'Should NOT time out in `open_root_actor()` ?'
+            )
+
     trio.run(main)
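The new `if cs.cancelled_caught:` assertion above relies on `trio`'s cancel-scope bookkeeping: a timeout scope records whether it absorbed a cancellation. A standalone sketch of the same mechanism using `move_on_after()` (illustration only, not from the diff):

```python
# `cancelled_caught` is True iff the scope's deadline fired and the
# resulting Cancelled was absorbed at the scope boundary.
import trio


async def main():
    with trio.move_on_after(0.1) as cs:
        await trio.sleep(1)  # exceeds the 0.1s deadline

    assert cs.cancelled_caught  # the scope timed out and ate the cancel


trio.run(main)
```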
@@ -57,7 +57,11 @@ async def spawn(
     )

     assert len(an._children) == 1
-    assert portal.channel.uid in tractor.current_actor()._peers
+    assert (
+        portal.channel.uid
+        in
+        tractor.current_actor().ipc_server._peers
+    )

     # get result from child subactor
     result = await portal.result()
@@ -180,7 +180,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
     with tractor.devx.maybe_open_crash_handler(
         pdb=debug_mode,
     ) as bxerr:
-        assert not bxerr.value
+        if bxerr:
+            assert not bxerr.value

         async with (
             wraps_tn_that_always_cancels() as tn,
@@ -48,6 +48,7 @@ from ._state import (

 if TYPE_CHECKING:
     from ._runtime import Actor
+    from .ipc._server import IPCServer


 log = get_logger(__name__)
@@ -79,7 +80,7 @@ async def get_registry(
         )
     else:
         # TODO: try to look pre-existing connection from
-        # `Actor._peers` and use it instead?
+        # `IPCServer._peers` and use it instead?
         async with (
             _connect_chan(addr) as chan,
             open_portal(chan) as regstr_ptl,
@@ -111,7 +112,7 @@ def get_peer_by_name(
 ) -> list[Channel]|None:  # at least 1
     '''
     Scan for an existing connection (set) to a named actor
-    and return any channels from `Actor._peers`.
+    and return any channels from `IPCServer._peers: dict`.

     This is an optimization method over querying the registrar for
     the same info.
@@ -578,12 +578,11 @@ async def open_portal(

         msg_loop_cs: trio.CancelScope|None = None
         if start_msg_loop:
-            from ._runtime import process_messages
+            from . import _rpc
             msg_loop_cs = await tn.start(
                 partial(
-                    process_messages,
-                    actor,
-                    channel,
+                    _rpc.process_messages,
+                    chan=channel,
                     # if the local task is cancelled we want to keep
                     # the msg loop running until our block ends
                     shield=True,
@@ -869,7 +869,6 @@ async def try_ship_error_to_remote(


 async def process_messages(
-    actor: Actor,
     chan: Channel,
     shield: bool = False,
     task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -907,6 +906,7 @@ async def process_messages(
     (as utilized inside `Portal.cancel_actor()` ).

     '''
+    actor: Actor = _state.current_actor()
     assert actor._service_n  # runtime state sanity

     # TODO: once `trio` get's an "obvious way" for req/resp we
@@ -40,9 +40,7 @@ from __future__ import annotations
 from contextlib import (
     ExitStack,
 )
-from collections import defaultdict
 from functools import partial
-from itertools import chain
 import importlib
 import importlib.util
 import os
@@ -76,6 +74,7 @@ from tractor.msg import (
 )
 from .ipc import (
     Channel,
+    # IPCServer,  # causes cycles atm..
     _server,
 )
 from ._addr import (
@@ -96,18 +95,13 @@ from ._exceptions import (
     ModuleNotExposed,
     MsgTypeError,
     unpack_error,
-    TransportClosed,
 )
-from .devx import _debug
 from ._discovery import get_registry
 from ._portal import Portal
 from . import _state
 from . import _mp_fixup_main
-from ._rpc import (
-    process_messages,
-    try_ship_error_to_remote,
-)
+from . import _rpc


 if TYPE_CHECKING:
     from ._supervise import ActorNursery
@@ -161,7 +155,6 @@ class Actor:
     _root_n: Nursery|None = None
     _service_n: Nursery|None = None

-    # XXX moving to IPCServer!
     _ipc_server: _server.IPCServer|None = None

     @property
@@ -251,14 +244,6 @@ class Actor:
         # by the user (currently called the "arbiter")
         self._spawn_method: str = spawn_method

-        self._peers: defaultdict[
-            str,  # uaid
-            list[Channel],  # IPC conns from peer
-        ] = defaultdict(list)
-        self._peer_connected: dict[tuple[str, str], trio.Event] = {}
-        self._no_more_peers = trio.Event()
-        self._no_more_peers.set()
-
         # RPC state
         self._ongoing_rpc_tasks = trio.Event()
         self._ongoing_rpc_tasks.set()
@@ -343,7 +328,12 @@ class Actor:
         parent_uid: tuple|None = None
         if rent_chan := self._parent_chan:
             parent_uid = rent_chan.uid
-        peers: list[tuple] = list(self._peer_connected)
+
+        peers: list = []
+        server: _server.IPCServer = self.ipc_server
+        if server:
+            peers: list[tuple] = list(server._peer_connected)
+
         fmtstr: str = (
             f' |_id: {self.aid!r}\n'
             # f"   aid{ds}{self.aid!r}\n"
@@ -399,25 +389,6 @@ class Actor:

         self._reg_addrs = addrs

-    async def wait_for_peer(
-        self,
-        uid: tuple[str, str],
-
-    ) -> tuple[trio.Event, Channel]:
-        '''
-        Wait for a connection back from a (spawned sub-)actor with
-        a `uid` using a `trio.Event` for sync.
-
-        '''
-        log.debug(f'Waiting for peer {uid!r} to connect')
-        event = self._peer_connected.setdefault(uid, trio.Event())
-        await event.wait()
-        log.debug(f'{uid!r} successfully connected back to us')
-        return (
-            event,
-            self._peers[uid][-1],
-        )
-
     def load_modules(
         self,
         # debug_mode: bool = False,
@@ -493,434 +464,6 @@ class Actor:

         raise mne

-    # TODO: maybe change to mod-func and rename for implied
-    # multi-transport semantics?
-    async def _stream_handler(
-        self,
-        stream: trio.SocketStream,
-
-    ) -> None:
-        '''
-        Entry point for new inbound IPC connections on a specific
-        transport server.
-
-        '''
-        self._no_more_peers = trio.Event()  # unset by making new
-        # with _debug.maybe_open_crash_handler(
-        #     pdb=True,
-        # ) as boxerr:
-        chan = Channel.from_stream(stream)
-        con_status: str = (
-            'New inbound IPC connection <=\n'
-            f'|_{chan}\n'
-        )
-
-        # send/receive initial handshake response
-        try:
-            peer_aid: msgtypes.Aid = await chan._do_handshake(
-                aid=self.aid,
-            )
-        except (
-            TransportClosed,
-            # ^XXX NOTE, the above wraps `trio` exc types raised
-            # during various `SocketStream.send/receive_xx()` calls
-            # under different fault conditions such as,
-            #
-            # trio.BrokenResourceError,
-            # trio.ClosedResourceError,
-            #
-            # Inside our `.ipc._transport` layer we absorb and
-            # re-raise our own `TransportClosed` exc such that this
-            # higher level runtime code can only worry one
-            # "kinda-error" that we expect to tolerate during
-            # discovery-sys related pings, queires, DoS etc.
-        ):
-            # XXX: This may propagate up from `Channel._aiter_recv()`
-            # and `MsgpackStream._inter_packets()` on a read from the
-            # stream particularly when the runtime is first starting up
-            # inside `open_root_actor()` where there is a check for
-            # a bound listener on the "arbiter" addr. the reset will be
-            # because the handshake was never meant took place.
-            log.runtime(
-                con_status
-                +
-                ' -> But failed to handshake? Ignoring..\n'
-            )
-            return
-
-        uid: tuple[str, str] = (
-            peer_aid.name,
-            peer_aid.uuid,
-        )
-        # TODO, can we make this downstream peer tracking use the
-        # `peer_aid` instead?
-        familiar: str = 'new-peer'
-        if _pre_chan := self._peers.get(uid):
-            familiar: str = 'pre-existing-peer'
-        uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
-        con_status += (
-            f' -> Handshake with {familiar} `{uid_short}` complete\n'
-        )
-
-        if _pre_chan:
-            # con_status += (
-            # ^TODO^ swap once we minimize conn duplication
-            # -[ ] last thing might be reg/unreg runtime reqs?
-            # log.warning(
-            log.debug(
-                f'?Wait?\n'
-                f'We already have IPC with peer {uid_short!r}\n'
-                f'|_{_pre_chan}\n'
-            )
-
-        # IPC connection tracking for both peers and new children:
-        # - if this is a new channel to a locally spawned
-        #   sub-actor there will be a spawn wait even registered
-        #   by a call to `.wait_for_peer()`.
-        # - if a peer is connecting no such event will exit.
-        event: trio.Event|None = self._peer_connected.pop(
-            uid,
-            None,
-        )
-        if event:
-            con_status += (
-                ' -> Waking subactor spawn waiters: '
-                f'{event.statistics().tasks_waiting}\n'
-                f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
-                # f'     {event}\n'
-                # f'     |{event.statistics()}\n'
-            )
-            # wake tasks waiting on this IPC-transport "connect-back"
-            event.set()
-
-        else:
-            con_status += (
-                f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
-            )  # type: ignore
-
-        chans: list[Channel] = self._peers[uid]
-        # if chans:
-        #     # TODO: re-use channels for new connections instead
-        #     # of always new ones?
-        #     # => will require changing all the discovery funcs..
-
-        # append new channel
-        # TODO: can we just use list-ref directly?
-        chans.append(chan)
-
-        con_status += ' -> Entering RPC msg loop..\n'
-        log.runtime(con_status)
-
-        # Begin channel management - respond to remote requests and
-        # process received reponses.
-        disconnected: bool = False
-        last_msg: MsgType
-        try:
-            (
-                disconnected,
-                last_msg,
-            ) = await process_messages(
-                self,
-                chan,
-            )
-        except trio.Cancelled:
-            log.cancel(
-                'IPC transport msg loop was cancelled\n'
-                f'c)>\n'
-                f' |_{chan}\n'
-            )
-            raise
-
-        finally:
-            local_nursery: (
-                ActorNursery|None
-            ) = self._actoruid2nursery.get(uid)
-
-            # This is set in ``Portal.cancel_actor()``. So if
-            # the peer was cancelled we try to wait for them
-            # to tear down their side of the connection before
-            # moving on with closing our own side.
-            if (
-                local_nursery
-                and (
-                    self._cancel_called
-                    or
-                    chan._cancel_called
-                )
-                #
-                # ^-TODO-^ along with this is there another condition
-                #   that we should filter with to avoid entering this
-                #   waiting block needlessly?
-                # -[ ] maybe `and local_nursery.cancelled` and/or
-                #     only if the `._children` table is empty or has
-                #     only `Portal`s with .chan._cancel_called ==
-                #     True` as per what we had below; the MAIN DIFF
-                #     BEING that just bc one `Portal.cancel_actor()`
-                #     was called, doesn't mean the whole actor-nurse
-                #     is gonna exit any time soon right!?
-                #
-                # or
-                # all(chan._cancel_called for chan in chans)
-
-            ):
-                log.cancel(
-                    'Waiting on cancel request to peer..\n'
-                    f'c)=>\n'
-                    f'  |_{chan.uid}\n'
-                )
-
-                # XXX: this is a soft wait on the channel (and its
-                # underlying transport protocol) to close from the
-                # remote peer side since we presume that any channel
-                # which is mapped to a sub-actor (i.e. it's managed
-                # by local actor-nursery) has a message that is sent
-                # to the peer likely by this actor (which may be in
-                # a shutdown sequence due to cancellation) when the
-                # local runtime here is now cancelled while
-                # (presumably) in the middle of msg loop processing.
-                chan_info: str = (
-                    f'{chan.uid}\n'
-                    f'|_{chan}\n'
-                    f'  |_{chan.transport}\n\n'
-                )
-                with trio.move_on_after(0.5) as drain_cs:
-                    drain_cs.shield = True
-
-                    # attempt to wait for the far end to close the
-                    # channel and bail after timeout (a 2-generals
-                    # problem on closure).
-                    assert chan.transport
-                    async for msg in chan.transport.drain():
-
-                        # try to deliver any lingering msgs
-                        # before we destroy the channel.
-                        # This accomplishes deterministic
-                        # ``Portal.cancel_actor()`` cancellation by
-                        # making sure any RPC response to that call is
-                        # delivered the local calling task.
-                        # TODO: factor this into a helper?
-                        log.warning(
-                            'Draining msg from disconnected peer\n'
-                            f'{chan_info}'
-                            f'{pformat(msg)}\n'
-                        )
-                        # cid: str|None = msg.get('cid')
-                        cid: str|None = msg.cid
-                        if cid:
-                            # deliver response to local caller/waiter
-                            await self._deliver_ctx_payload(
-                                chan,
-                                cid,
-                                msg,
-                            )
-                if drain_cs.cancelled_caught:
-                    log.warning(
-                        'Timed out waiting on IPC transport channel to drain?\n'
-                        f'{chan_info}'
-                    )
-
-                # XXX NOTE XXX when no explicit call to
-                # `open_root_actor()` was made by the application
-                # (normally we implicitly make that call inside
-                # the first `.open_nursery()` in root-actor
-                # user/app code), we can assume that either we
-                # are NOT the root actor or are root but the
-                # runtime was started manually. and thus DO have
-                # to wait for the nursery-enterer to exit before
-                # shutting down the local runtime to avoid
-                # clobbering any ongoing subactor
-                # teardown/debugging/graceful-cancel.
-                #
-                # see matching note inside `._supervise.open_nursery()`
-                #
-                # TODO: should we have a separate cs + timeout
-                # block here?
-                if (
-                    # XXX SO either,
-                    # - not root OR,
-                    # - is root but `open_root_actor()` was
-                    #   entered manually (in which case we do
-                    #   the equiv wait there using the
-                    #   `devx._debug` sub-sys APIs).
-                    not local_nursery._implicit_runtime_started
-                ):
-                    log.runtime(
-                        'Waiting on local actor nursery to exit..\n'
-                        f'|_{local_nursery}\n'
-                    )
-                    with trio.move_on_after(0.5) as an_exit_cs:
-                        an_exit_cs.shield = True
-                        await local_nursery.exited.wait()
-
-                    # TODO: currently this is always triggering for every
-                    # sub-daemon spawned from the `piker.services._mngr`?
-                    # -[ ] how do we ensure that the IPC is supposed to
-                    #      be long lived and isn't just a register?
-                    # |_ in the register case how can we signal that the
-                    #    ephemeral msg loop was intentional?
-                    if (
-                        # not local_nursery._implicit_runtime_started
-                        # and
-                        an_exit_cs.cancelled_caught
-                    ):
-                        report: str = (
-                            'Timed out waiting on local actor-nursery to exit?\n'
-                            f'c)>\n'
-                            f' |_{local_nursery}\n'
-                        )
-                        if children := local_nursery._children:
-                            # indent from above local-nurse repr
-                            report += (
-                                f'   |_{pformat(children)}\n'
-                            )
-
-                        log.warning(report)
-
-                if disconnected:
-                    # if the transport died and this actor is still
-                    # registered within a local nursery, we report
-                    # that the IPC layer may have failed
-                    # unexpectedly since it may be the cause of
-                    # other downstream errors.
-                    entry: tuple|None = local_nursery._children.get(uid)
-                    if entry:
-                        proc: trio.Process
-                        _, proc, _ = entry
-
-                        if (
-                            (poll := getattr(proc, 'poll', None))
-                            and
-                            poll() is None  # proc still alive
-                        ):
-                            # TODO: change log level based on
-                            # detecting whether chan was created for
-                            # ephemeral `.register_actor()` request!
-                            # -[ ] also, that should be avoidable by
-                            #   re-using any existing chan from the
-                            #   `._discovery.get_registry()` call as
-                            #   well..
-                            log.runtime(
-                                f'Peer IPC broke but subproc is alive?\n\n'
-
-                                f'<=x {chan.uid}@{chan.raddr}\n'
-                                f'   |_{proc}\n'
-                            )
-
-            # ``Channel`` teardown and closure sequence
-            # drop ref to channel so it can be gc-ed and disconnected
-            con_teardown_status: str = (
-                f'IPC channel disconnected:\n'
-                f'<=x uid: {chan.uid}\n'
-                f'   |_{pformat(chan)}\n\n'
-            )
-            chans.remove(chan)
-
-            # TODO: do we need to be this pedantic?
-            if not chans:
-                con_teardown_status += (
-                    f'-> No more channels with {chan.uid}'
-                )
-                self._peers.pop(uid, None)
-
-            peers_str: str = ''
-            for uid, chans in self._peers.items():
-                peers_str += (
-                    f'uid: {uid}\n'
-                )
-                for i, chan in enumerate(chans):
-                    peers_str += (
-                        f' |_[{i}] {pformat(chan)}\n'
-                    )
-
-            con_teardown_status += (
-                f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
-            )
-
-            # No more channels to other actors (at all) registered
-            # as connected.
-            if not self._peers:
-                con_teardown_status += (
-                    'Signalling no more peer channel connections'
-                )
-                self._no_more_peers.set()
-
-                # NOTE: block this actor from acquiring the
-                # debugger-TTY-lock since we have no way to know if we
-                # cancelled it and further there is no way to ensure the
-                # lock will be released if acquired due to having no
-                # more active IPC channels.
-                if _state.is_root_process():
-                    pdb_lock = _debug.Lock
-                    pdb_lock._blocked.add(uid)
-
-                    # TODO: NEEEDS TO BE TESTED!
-                    # actually, no idea if this ever even enters.. XD
-                    #
-                    # XXX => YES IT DOES, when i was testing ctl-c
-                    # from broken debug TTY locking due to
-                    # msg-spec races on application using RunVar...
-                    if (
-                        (ctx_in_debug := pdb_lock.ctx_in_debug)
-                        and
-                        (pdb_user_uid := ctx_in_debug.chan.uid)
-                        and
-                        local_nursery
-                    ):
-                        entry: tuple|None = local_nursery._children.get(
-                            tuple(pdb_user_uid)
-                        )
-                        if entry:
-                            proc: trio.Process
-                            _, proc, _ = entry
-
-                            if (
-                                (poll := getattr(proc, 'poll', None))
-                                and poll() is None
-                            ):
-                                log.cancel(
-                                    'Root actor reports no-more-peers, BUT\n'
-                                    'a DISCONNECTED child still has the debug '
-                                    'lock!\n\n'
-                                    # f'root uid: {self.uid}\n'
-                                    f'last disconnected child uid: {uid}\n'
-                                    f'locking child uid: {pdb_user_uid}\n'
-                                )
-                                await _debug.maybe_wait_for_debugger(
-                                    child_in_debug=True
-                                )
-
-                # TODO: just bc a child's transport dropped
-                # doesn't mean it's not still using the pdb
-                # REPL! so,
-                # -[ ] ideally we can check out child proc
-                #  tree to ensure that its alive (and
-                #  actually using the REPL) before we cancel
-                #  it's lock acquire by doing the below!
-                # -[ ] create a way to read the tree of each actor's
-                #  grandchildren such that when an
-                #  intermediary parent is cancelled but their
-                #  child has locked the tty, the grandparent
-                #  will not allow the parent to cancel or
-                #  zombie reap the child! see open issue:
-                #  - https://github.com/goodboy/tractor/issues/320
-                # ------ - ------
-                # if a now stale local task has the TTY lock still
-                # we cancel it to allow servicing other requests for
-                # the lock.
-                if (
-                    (db_cs := pdb_lock.get_locking_task_cs())
-                    and not db_cs.cancel_called
-                    and uid == pdb_user_uid
-                ):
-                    log.critical(
-                        f'STALE DEBUG LOCK DETECTED FOR {uid}'
-                    )
-                    # TODO: figure out why this breaks tests..
-                    db_cs.cancel()
-
-            log.runtime(con_teardown_status)
-        # finally block closure
-
     # TODO: rename to `._deliver_payload()` since this handles
     # more then just `result` msgs now obvi XD
     async def _deliver_ctx_payload(
@@ -1157,7 +700,7 @@ class Actor:
         )
         assert isinstance(chan, Channel)

-        # Initial handshake: swap names.
+        # init handshake: swap actor-IDs.
         await chan._do_handshake(aid=self.aid)

         accept_addrs: list[UnwrappedAddress]|None = None
@@ -1719,6 +1262,10 @@ async def async_main(
    the actor's "runtime" and all thus all ongoing RPC tasks.

    '''
+    # XXX NOTE, `_state._current_actor` **must** be set prior to
+    # calling this core runtime entrypoint!
+    assert actor is _state.current_actor()
+
    actor._task: trio.Task = trio.lowlevel.current_task()

    # attempt to retreive ``trio``'s sigint handler and stash it
@@ -1778,7 +1325,6 @@ async def async_main(
             ) as service_nursery,

             _server.open_ipc_server(
-                actor=actor,
                 parent_tn=service_nursery,
                 stream_handler_tn=service_nursery,
             ) as ipc_server,
@@ -1832,7 +1378,6 @@ async def async_main(
                 'Booting IPC server'
             )
             eps: list = await ipc_server.listen_on(
-                actor=actor,
                 accept_addrs=accept_addrs,
                 stream_handler_nursery=service_nursery,
             )
@@ -1916,9 +1461,8 @@ async def async_main(
             if actor._parent_chan:
                 await root_nursery.start(
                     partial(
-                        process_messages,
-                        actor,
-                        actor._parent_chan,
+                        _rpc.process_messages,
+                        chan=actor._parent_chan,
                         shield=True,
                     )
                 )
@@ -1959,7 +1503,7 @@ async def async_main(
         log.exception(err_report)

         if actor._parent_chan:
-            await try_ship_error_to_remote(
+            await _rpc.try_ship_error_to_remote(
                 actor._parent_chan,
                 internal_err,
             )
@@ -2053,16 +1597,18 @@ async def async_main(
         )

     # Ensure all peers (actors connected to us as clients) are finished
-    if not actor._no_more_peers.is_set():
-        if any(
-            chan.connected() for chan in chain(*actor._peers.values())
-        ):
-            teardown_report += (
-                f'-> Waiting for remaining peers {actor._peers} to clear..\n'
-            )
-            log.runtime(teardown_report)
-            with CancelScope(shield=True):
-                await actor._no_more_peers.wait()
+    if (
+        (ipc_server := actor.ipc_server)
+        and
+        ipc_server.has_peers(check_chans=True)
+    ):
+        teardown_report += (
+            f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
+        )
+        log.runtime(teardown_report)
+        await ipc_server.wait_for_no_more_peers(
+            shield=True,
+        )

     teardown_report += (
         '-> All peer channels are complete\n'
@@ -58,9 +58,11 @@ from tractor.msg.types import (


 if TYPE_CHECKING:
+    from ipc import IPCServer
     from ._supervise import ActorNursery
     ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
+

 log = get_logger('tractor')

 # placeholder for an mp start context if so using that backend
@@ -481,6 +483,7 @@ async def trio_proc(

     cancelled_during_spawn: bool = False
     proc: trio.Process|None = None
+    ipc_server: IPCServer = actor_nursery._actor.ipc_server
     try:
         try:
             proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
@@ -492,7 +495,7 @@ async def trio_proc(
             # wait for actor to spawn and connect back to us
             # channel should have handshake completed by the
             # local actor by the time we get a ref to it
-            event, chan = await actor_nursery._actor.wait_for_peer(
+            event, chan = await ipc_server.wait_for_peer(
                 subactor.uid
             )
@@ -724,11 +727,12 @@ async def mp_proc(

     log.runtime(f"Started {proc}")

+    ipc_server: IPCServer = actor_nursery._actor.ipc_server
     try:
         # wait for actor to spawn and connect back to us
         # channel should have handshake completed by the
         # local actor by the time we get a ref to it
-        event, chan = await actor_nursery._actor.wait_for_peer(
+        event, chan = await ipc_server.wait_for_peer(
             subactor.uid,
         )
@@ -53,6 +53,9 @@ from . import _spawn

 if TYPE_CHECKING:
     import multiprocessing as mp
+    # from .ipc._server import IPCServer
+    from .ipc import IPCServer
+

 log = get_logger(__name__)
@@ -315,6 +318,9 @@ class ActorNursery:
         children: dict = self._children
         child_count: int = len(children)
         msg: str = f'Cancelling actor nursery with {child_count} children\n'
+
+        server: IPCServer = self._actor.ipc_server
+
         with trio.move_on_after(3) as cs:
             async with trio.open_nursery(
                 strict_exception_groups=False,
@@ -337,7 +343,7 @@ class ActorNursery:

                 else:
                     if portal is None:  # actor hasn't fully spawned yet
-                        event = self._actor._peer_connected[subactor.uid]
+                        event: trio.Event = server._peer_connected[subactor.uid]
                         log.warning(
                             f"{subactor.uid} never 't finished spawning?"
                         )
@@ -353,7 +359,7 @@ class ActorNursery:
                     if portal is None:
                         # cancelled while waiting on the event
                         # to arrive
-                        chan = self._actor._peers[subactor.uid][-1]
+                        chan = server._peers[subactor.uid][-1]
                         if chan:
                             portal = Portal(chan)
                         else:  # there's no other choice left
@@ -92,7 +92,11 @@ from tractor._state import (
 if TYPE_CHECKING:
     from trio.lowlevel import Task
     from threading import Thread
-    from tractor.ipc import Channel
+    from tractor.ipc import (
+        Channel,
+        IPCServer,
+        # _server,  # TODO? export at top level?
+    )
     from tractor._runtime import (
         Actor,
     )
@@ -1434,6 +1438,7 @@ def any_connected_locker_child() -> bool:

     '''
     actor: Actor = current_actor()
+    server: IPCServer = actor.ipc_server

     if not is_root_process():
         raise InternalError('This is a root-actor only API!')
@@ -1443,7 +1448,7 @@ def any_connected_locker_child() -> bool:
         and
         (uid_in_debug := ctx.chan.uid)
     ):
-        chans: list[tractor.Channel] = actor._peers.get(
+        chans: list[tractor.Channel] = server._peers.get(
             tuple(uid_in_debug)
         )
         if chans:
@@ -3003,6 +3008,7 @@ async def _maybe_enter_pm(
         [BaseException|BaseExceptionGroup],
         bool,
     ] = lambda err: not is_multi_cancelled(err),
+    **_pause_kws,

 ):
     if (
@@ -3029,6 +3035,7 @@ async def _maybe_enter_pm(
         await post_mortem(
             api_frame=api_frame,
             tb=tb,
+            **_pause_kws,
         )
         return True
@@ -49,6 +49,7 @@ from tractor.log import get_logger
 from tractor._exceptions import (
     MsgTypeError,
     pack_from_raise,
+    TransportClosed,
 )
 from tractor.msg import (
     Aid,
@@ -256,7 +257,7 @@ class Channel:
         self,
         payload: Any,

-        hide_tb: bool = False,
+        hide_tb: bool = True,

     ) -> None:
         '''
@@ -274,18 +275,27 @@ class Channel:
                 payload,
                 hide_tb=hide_tb,
             )
-        except BaseException as _err:
+        except (
+            BaseException,
+            MsgTypeError,
+            TransportClosed,
+        )as _err:
             err = _err  # bind for introspection
-            if not isinstance(_err, MsgTypeError):
-                # assert err
-                __tracebackhide__: bool = False
-            else:
-                try:
-                    assert err.cid
-
-                except KeyError:
-                    raise err
+            match err:
+                case MsgTypeError():
+                    try:
+                        assert err.cid
+                    except KeyError:
+                        raise err
+                case TransportClosed():
+                    log.transport(
+                        f'Transport stream closed due to\n'
+                        f'{err.repr_src_exc()}\n'
+                    )
+
+                case _:
+                    # never suppress non-tpt sources
+                    __tracebackhide__: bool = False
             raise

     async def recv(self) -> Any:
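The rewrite above swaps an `isinstance()` chain for structural pattern matching on the bound exception; class patterns like `case MsgTypeError():` match any instance of that type. A distilled, standalone sketch of the dispatch shape (the exception classes here are local stand-ins, not imports from `tractor`):

```python
# Minimal model of the `match err:` dispatch in `Channel.send()`.
class MsgTypeError(Exception): ...
class TransportClosed(Exception): ...


def classify(err: BaseException) -> str:
    match err:
        case MsgTypeError():
            return 'bad-msg'       # expected msg-spec failure
        case TransportClosed():
            return 'tpt-closed'    # absorbed transport-layer fault
        case _:
            return 'unexpected'    # never suppress other sources


assert classify(TransportClosed()) == 'tpt-closed'
assert classify(ValueError()) == 'unexpected'
```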
@@ -19,11 +19,14 @@ multi-transport-protcol needs!

 '''
 from __future__ import annotations
+from collections import defaultdict
 from contextlib import (
     asynccontextmanager as acm,
 )
 from functools import partial
+from itertools import chain
 import inspect
+from pprint import pformat
 from types import (
     ModuleType,
 )
@@ -40,24 +43,540 @@ from trio import (
     SocketListener,
 )

-from ..msg import Struct
+# from ..devx import _debug
+from .._exceptions import (
+    TransportClosed,
+)
+from .. import _rpc
+from ..msg import (
+    MsgType,
+    Struct,
+    types as msgtypes,
+)
 from ..trionics import maybe_open_nursery
 from .. import (
     _state,
     log,
 )
 from .._addr import Address
 from ._chan import Channel
 from ._transport import MsgTransport
 from ._uds import UDSAddress
 from ._tcp import TCPAddress

 if TYPE_CHECKING:
     from .._runtime import Actor
+    from .._supervise import ActorNursery


 log = log.get_logger(__name__)


+async def maybe_wait_on_canced_subs(
+    uid: tuple[str, str],
+    chan: Channel,
+    disconnected: bool,
+
+    actor: Actor|None = None,
+    chan_drain_timeout: float = 0.5,
+    an_exit_timeout: float = 0.5,
+
+) -> ActorNursery|None:
+    '''
+    When a process-local actor-nursery is found for the given actor
+    `uid` (i.e. that peer is **also** a subactor of this parent), we
+    attempt to (with timeouts) wait on,
+
+    - all IPC msgs to drain on the (common) `Channel` such that all
+      local `Context`-parent-tasks can also gracefully collect
+      `ContextCancelled` msgs from their respective remote children
+      vs. a `chan_drain_timeout`.
+
+    - the actor-nursery to cancel-n-join all its supervised children
+      (processes) *gracefully* vs. a `an_exit_timeout` and thus also
+      detect cases where the IPC transport connection broke but
+      a sub-process is detected as still alive (a case that happens
+      when the subactor is still in an active debugger REPL session).
+
+    If the timeout expires in either case we ofc report with warning.
+
+    '''
+    actor = actor or _state.current_actor()
+
+    # XXX running outside actor-runtime usage,
+    # - unit testing
+    # - possibly manual usage (eventually) ?
+    if not actor:
+        return None
+
+    local_nursery: (
+        ActorNursery|None
+    ) = actor._actoruid2nursery.get(uid)
+
+    # This is set in `Portal.cancel_actor()`. So if
+    # the peer was cancelled we try to wait for them
+    # to tear down their side of the connection before
+    # moving on with closing our own side.
+    if (
+        local_nursery
+        and (
+            actor._cancel_called
+            or
+            chan._cancel_called
+        )
+        #
+        # ^-TODO-^ along with this is there another condition
+        #   that we should filter with to avoid entering this
+        #   waiting block needlessly?
+        # -[ ] maybe `and local_nursery.cancelled` and/or
+        #     only if the `._children` table is empty or has
+        #     only `Portal`s with .chan._cancel_called ==
+        #     True` as per what we had below; the MAIN DIFF
+        #     BEING that just bc one `Portal.cancel_actor()`
+        #     was called, doesn't mean the whole actor-nurse
+        #     is gonna exit any time soon right!?
+        #
+        # or
+        # all(chan._cancel_called for chan in chans)
+
+    ):
+        log.cancel(
+            'Waiting on cancel request to peer..\n'
+            f'c)=>\n'
+            f'  |_{chan.uid}\n'
+        )
+
+        # XXX: this is a soft wait on the channel (and its
+        # underlying transport protocol) to close from the
+        # remote peer side since we presume that any channel
+        # which is mapped to a sub-actor (i.e. it's managed
+        # by local actor-nursery) has a message that is sent
+        # to the peer likely by this actor (which may be in
+        # a shutdown sequence due to cancellation) when the
+        # local runtime here is now cancelled while
+        # (presumably) in the middle of msg loop processing.
+        chan_info: str = (
+            f'{chan.uid}\n'
+            f'|_{chan}\n'
+            f'  |_{chan.transport}\n\n'
+        )
+        with trio.move_on_after(chan_drain_timeout) as drain_cs:
+            drain_cs.shield = True
+
+            # attempt to wait for the far end to close the
+            # channel and bail after timeout (a 2-generals
+            # problem on closure).
+            assert chan.transport
+            async for msg in chan.transport.drain():
+
+                # try to deliver any lingering msgs
+                # before we destroy the channel.
+                # This accomplishes deterministic
+                # ``Portal.cancel_actor()`` cancellation by
+                # making sure any RPC response to that call is
+                # delivered the local calling task.
+                # TODO: factor this into a helper?
+                log.warning(
+                    'Draining msg from disconnected peer\n'
+                    f'{chan_info}'
+                    f'{pformat(msg)}\n'
+                )
+                # cid: str|None = msg.get('cid')
+                cid: str|None = msg.cid
+                if cid:
+                    # deliver response to local caller/waiter
+                    await actor._deliver_ctx_payload(
+                        chan,
+                        cid,
+                        msg,
+                    )
+        if drain_cs.cancelled_caught:
+            log.warning(
+                'Timed out waiting on IPC transport channel to drain?\n'
+                f'{chan_info}'
+            )
+
+        # XXX NOTE XXX when no explicit call to
+        # `open_root_actor()` was made by the application
+        # (normally we implicitly make that call inside
+        # the first `.open_nursery()` in root-actor
+        # user/app code), we can assume that either we
+        # are NOT the root actor or are root but the
+        # runtime was started manually. and thus DO have
+        # to wait for the nursery-enterer to exit before
+        # shutting down the local runtime to avoid
+        # clobbering any ongoing subactor
+        # teardown/debugging/graceful-cancel.
+        #
+        # see matching note inside `._supervise.open_nursery()`
+        #
+        # TODO: should we have a separate cs + timeout
+        # block here?
+        if (
+            # XXX SO either,
+            # - not root OR,
+            # - is root but `open_root_actor()` was
+            #   entered manually (in which case we do
+            #   the equiv wait there using the
+            #   `devx.debug` sub-sys APIs).
+            not local_nursery._implicit_runtime_started
+        ):
+            log.runtime(
+                'Waiting on local actor nursery to exit..\n'
+                f'|_{local_nursery}\n'
+            )
+            with trio.move_on_after(an_exit_timeout) as an_exit_cs:
+                an_exit_cs.shield = True
+                await local_nursery.exited.wait()
+
+            # TODO: currently this is always triggering for every
+            # sub-daemon spawned from the `piker.services._mngr`?
+            # -[ ] how do we ensure that the IPC is supposed to
+            #      be long lived and isn't just a register?
+            # |_ in the register case how can we signal that the
+            #    ephemeral msg loop was intentional?
+            if (
+                # not local_nursery._implicit_runtime_started
+                # and
+                an_exit_cs.cancelled_caught
+            ):
+                report: str = (
+                    'Timed out waiting on local actor-nursery to exit?\n'
+                    f'c)>\n'
+                    f' |_{local_nursery}\n'
+                )
+                if children := local_nursery._children:
+                    # indent from above local-nurse repr
+                    report += (
+                        f'   |_{pformat(children)}\n'
+                    )
+
+                log.warning(report)
+
+    if disconnected:
+        # if the transport died and this actor is still
+        # registered within a local nursery, we report
+        # that the IPC layer may have failed
+        # unexpectedly since it may be the cause of
+        # other downstream errors.
+        entry: tuple|None = local_nursery._children.get(uid)
+        if entry:
+            proc: trio.Process
+            _, proc, _ = entry
+
+            if (
+                (poll := getattr(proc, 'poll', None))
+                and
+                poll() is None  # proc still alive
+            ):
+                # TODO: change log level based on
+                # detecting whether chan was created for
+                # ephemeral `.register_actor()` request!
+                # -[ ] also, that should be avoidable by
+                #   re-using any existing chan from the
+                #   `._discovery.get_registry()` call as
+                #   well..
+                log.runtime(
+                    f'Peer IPC broke but subproc is alive?\n\n'
+
+                    f'<=x {chan.uid}@{chan.raddr}\n'
+                    f'   |_{proc}\n'
+                )
+
+    return local_nursery
+
+
+# TODO multi-tpt support with per-proto peer tracking?
+#
+# -[x] maybe change to mod-func and rename for implied
+#     multi-transport semantics?
+# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
+#     so that we can query per tpt all peer contact infos?
+#  |_[ ] possibly provide a global viewing via a
+#       `collections.ChainMap`?
+#
+async def handle_stream_from_peer(
+    stream: trio.SocketStream,
+
+    *,
+    server: IPCServer,
+
+) -> None:
+    '''
+    Top-level `trio.abc.Stream` (i.e. normally `trio.SocketStream`)
+    handler-callback as spawn-invoked by `trio.serve_listeners()`.
+
+    Note that each call to this handler is as a spawned task inside
+    any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
+    such that it is invoked as,
+
+      IPCEndpoint.stream_handler_tn.start_soon(
+          handle_stream,
+          stream,
+      )
+
+    '''
+    server._no_more_peers = trio.Event()  # unset by making new
+
+    # TODO, debug_mode tooling for when hackin this lower layer?
+    # with _debug.maybe_open_crash_handler(
+    #     pdb=True,
+    # ) as boxerr:
+
+    chan = Channel.from_stream(stream)
+    con_status: str = (
+        'New inbound IPC connection <=\n'
+        f'|_{chan}\n'
+    )
+
+    # initial handshake with peer phase
+    try:
+        if actor := _state.current_actor():
+            peer_aid: msgtypes.Aid = await chan._do_handshake(
+                aid=actor.aid,
+            )
+    except (
+        TransportClosed,
+        # ^XXX NOTE, the above wraps `trio` exc types raised
+        # during various `SocketStream.send/receive_xx()` calls
+        # under different fault conditions such as,
+        #
+        # trio.BrokenResourceError,
+        # trio.ClosedResourceError,
+        #
+        # Inside our `.ipc._transport` layer we absorb and
+        # re-raise our own `TransportClosed` exc such that this
+        # higher level runtime code can only worry one
+        # "kinda-error" that we expect to tolerate during
+        # discovery-sys related pings, queires, DoS etc.
+    ):
+        # XXX: This may propagate up from `Channel._aiter_recv()`
+        # and `MsgpackStream._inter_packets()` on a read from the
+        # stream particularly when the runtime is first starting up
+        # inside `open_root_actor()` where there is a check for
+        # a bound listener on the "arbiter" addr. the reset will be
+        # because the handshake was never meant took place.
+        log.runtime(
+            con_status
+            +
+            ' -> But failed to handshake? Ignoring..\n'
+        )
+        return
+
+    uid: tuple[str, str] = (
+        peer_aid.name,
+        peer_aid.uuid,
+    )
+    # TODO, can we make this downstream peer tracking use the
+    # `peer_aid` instead?
+    familiar: str = 'new-peer'
+    if _pre_chan := server._peers.get(uid):
+        familiar: str = 'pre-existing-peer'
+    uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
+    con_status += (
+        f' -> Handshake with {familiar} `{uid_short}` complete\n'
+    )
+
+    if _pre_chan:
+        # con_status += (
+        # ^TODO^ swap once we minimize conn duplication
+        # -[ ] last thing might be reg/unreg runtime reqs?
+        # log.warning(
+        log.debug(
+            f'?Wait?\n'
+            f'We already have IPC with peer {uid_short!r}\n'
+            f'|_{_pre_chan}\n'
+        )
+
+    # IPC connection tracking for both peers and new children:
+    # - if this is a new channel to a locally spawned
+    #   sub-actor there will be a spawn wait even registered
+    #   by a call to `.wait_for_peer()`.
+    # - if a peer is connecting no such event will exit.
+    event: trio.Event|None = server._peer_connected.pop(
+        uid,
+        None,
+    )
+    if event:
+        con_status += (
+            ' -> Waking subactor spawn waiters: '
+            f'{event.statistics().tasks_waiting}\n'
+            f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
+            # f'     {event}\n'
+            # f'     |{event.statistics()}\n'
+        )
+        # wake tasks waiting on this IPC-transport "connect-back"
+        event.set()
+
+    else:
+        con_status += (
+            f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
+        )  # type: ignore
+
+    chans: list[Channel] = server._peers[uid]
+    # if chans:
+    #     # TODO: re-use channels for new connections instead
+    #     # of always new ones?
+    #     # => will require changing all the discovery funcs..
+
+    # append new channel
+    # TODO: can we just use list-ref directly?
+    chans.append(chan)
+
+    con_status += ' -> Entering RPC msg loop..\n'
+    log.runtime(con_status)
+
+    # Begin channel management - respond to remote requests and
+    # process received reponses.
+    disconnected: bool = False
+    last_msg: MsgType
+    try:
+        (
+            disconnected,
+            last_msg,
+        ) = await _rpc.process_messages(
+            chan=chan,
+        )
+    except trio.Cancelled:
+        log.cancel(
+            'IPC transport msg loop was cancelled\n'
+            f'c)>\n'
+            f' |_{chan}\n'
+        )
+        raise
+
+    finally:
+
+        # check if there are subs which we should gracefully join at
+        # both the inter-actor-task and subprocess levels to
+        # gracefully remote cancel and later disconnect (particularly
+        # for permitting subs engaged in active debug-REPL sessions).
+        local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
+            uid=uid,
+            chan=chan,
+            disconnected=disconnected,
+        )
+
+        # ``Channel`` teardown and closure sequence
+        # drop ref to channel so it can be gc-ed and disconnected
+        con_teardown_status: str = (
+            f'IPC channel disconnected:\n'
+            f'<=x uid: {chan.uid}\n'
+            f'   |_{pformat(chan)}\n\n'
+        )
+        chans.remove(chan)
+
+        # TODO: do we need to be this pedantic?
+        if not chans:
+            con_teardown_status += (
+                f'-> No more channels with {chan.uid}'
+            )
+            server._peers.pop(uid, None)
+
+        peers_str: str = ''
+        for uid, chans in server._peers.items():
+            peers_str += (
+                f'uid: {uid}\n'
+            )
+            for i, chan in enumerate(chans):
+                peers_str += (
+                    f' |_[{i}] {pformat(chan)}\n'
+                )
+
+        con_teardown_status += (
+            f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
+        )
+
+        # No more channels to other actors (at all) registered
+        # as connected.
+        if not server._peers:
+            con_teardown_status += (
+                'Signalling no more peer channel connections'
+            )
+            server._no_more_peers.set()
+
+            # NOTE: block this actor from acquiring the
+            # debugger-TTY-lock since we have no way to know if we
+            # cancelled it and further there is no way to ensure the
+            # lock will be released if acquired due to having no
+            # more active IPC channels.
+            if (
+                _state.is_root_process()
+                and
+                _state.is_debug_mode()
+            ):
+                from ..devx import _debug
+                pdb_lock = _debug.Lock
+                pdb_lock._blocked.add(uid)
+
+                # TODO: NEEEDS TO BE TESTED!
+                # actually, no idea if this ever even enters.. XD
+                #
+                # XXX => YES IT DOES, when i was testing ctl-c
+                # from broken debug TTY locking due to
+                # msg-spec races on application using RunVar...
+                if (
+                    local_nursery
+                    and
+                    (ctx_in_debug := pdb_lock.ctx_in_debug)
+                    and
+                    (pdb_user_uid := ctx_in_debug.chan.uid)
+                ):
+                    entry: tuple|None = local_nursery._children.get(
+                        tuple(pdb_user_uid)
+                    )
+                    if entry:
+                        proc: trio.Process
+                        _, proc, _ = entry
+
+                        if (
+                            (poll := getattr(proc, 'poll', None))
+                            and poll() is None
+                        ):
+                            log.cancel(
+                                'Root actor reports no-more-peers, BUT\n'
+                                'a DISCONNECTED child still has the debug '
+                                'lock!\n\n'
+                                # f'root uid: {actor.uid}\n'
+                                f'last disconnected child uid: {uid}\n'
+                                f'locking child uid: {pdb_user_uid}\n'
+                            )
+                            await _debug.maybe_wait_for_debugger(
+                                child_in_debug=True
+                            )
+
+                # TODO: just bc a child's transport dropped
+                # doesn't mean it's not still using the pdb
+                # REPL! so,
+                # -[ ] ideally we can check out child proc
+                #  tree to ensure that its alive (and
+                #  actually using the REPL) before we cancel
+                #  it's lock acquire by doing the below!
+                # -[ ] create a way to read the tree of each actor's
+                #  grandchildren such that when an
+                #  intermediary parent is cancelled but their
+                #  child has locked the tty, the grandparent
+                #  will not allow the parent to cancel or
+                #  zombie reap the child! see open issue:
+                #  - https://github.com/goodboy/tractor/issues/320
+                # ------ - ------
+                # if a now stale local task has the TTY lock still
+                # we cancel it to allow servicing other requests for
+                # the lock.
+                if (
+                    (db_cs := pdb_lock.get_locking_task_cs())
+                    and not db_cs.cancel_called
+                    and uid == pdb_user_uid
+                ):
+                    log.critical(
+                        f'STALE DEBUG LOCK DETECTED FOR {uid}'
+                    )
+                    # TODO: figure out why this breaks tests..
+                    db_cs.cancel()
+
+        log.runtime(con_teardown_status)
+    # finally block closure
+
+
 class IPCEndpoint(Struct):
     '''
     An instance of an IPC "bound" address where the lifetime of the
@@ -120,8 +639,23 @@ class IPCEndpoint(Struct):
 class IPCServer(Struct):
     _parent_tn: Nursery
     _stream_handler_tn: Nursery
+    # level-triggered sig for whether "no peers are currently
+    # connected"; field is **always** set to an instance but
+    # initialized with `.is_set() == True`.
+    _no_more_peers: trio.Event
+
     _endpoints: list[IPCEndpoint] = []

+    # connection tracking & mgmt
+    _peers: defaultdict[
+        str,  # uaid
+        list[Channel],  # IPC conns from peer
+    ] = defaultdict(list)
+    _peer_connected: dict[
+        tuple[str, str],
+        trio.Event,
+    ] = {}
+
     # syncs for setup/teardown sequences
     _shutdown: trio.Event|None = None
@@ -183,6 +717,65 @@ class IPCServer(Struct):
             f'protos: {tpt_protos!r}\n'
         )

+    def has_peers(
+        self,
+        check_chans: bool = False,
+    ) -> bool:
+        '''
+        Predicate for "are there any active peer IPC `Channel`s at the moment?"
+
+        '''
+        has_peers: bool = not self._no_more_peers.is_set()
+        if (
+            has_peers
+            and
+            check_chans
+        ):
+            has_peers: bool = (
+                any(chan.connected()
+                    for chan in chain(
+                        *self._peers.values()
+                    )
+                )
+                and
+                has_peers
+            )
+
+        return has_peers
+
+    async def wait_for_no_more_peers(
+        self,
+        shield: bool = False,
+    ) -> None:
+        with trio.CancelScope(shield=shield):
+            await self._no_more_peers.wait()
+
+    async def wait_for_peer(
+        self,
+        uid: tuple[str, str],
+
+    ) -> tuple[trio.Event, Channel]:
+        '''
+        Wait for a connection back from a (spawned sub-)actor with
+        a `uid` using a `trio.Event`.
+
+        Returns a pair of the event and the "last" registered IPC
+        `Channel` for the peer with `uid`.
+
+        '''
+        log.debug(f'Waiting for peer {uid!r} to connect')
+        event: trio.Event = self._peer_connected.setdefault(
+            uid,
+            trio.Event(),
+        )
+        await event.wait()
+        log.debug(f'{uid!r} successfully connected back to us')
+        mru_chan: Channel = self._peers[uid][-1]
+        return (
+            event,
+            mru_chan,
+        )
+
     @property
     def addrs(self) -> list[Address]:
         return [ep.addr for ep in self._endpoints]
@@ -211,17 +804,27 @@ class IPCServer(Struct):
         return ev.is_set()

     def pformat(self) -> str:
-        fmtstr: str = (
-            f' |_endpoints: {self._endpoints}\n'
+        eps: list[IPCEndpoint] = self._endpoints
+
+        state_repr: str = (
+            f'{len(eps)!r} IPC-endpoints active'
+        )
+        fmtstr = (
+            f' |_state: {state_repr}\n'
+            f'   no_more_peers: {self.has_peers()}\n'
         )
         if self._shutdown is not None:
             shutdown_stats: EventStatistics = self._shutdown.statistics()
             fmtstr += (
                 f'\n'
-                f' |_shutdown: {shutdown_stats}\n'
+                f' task_waiting_on_shutdown: {shutdown_stats}\n'
             )

+        fmtstr += (
+            # TODO, use the `ppfmt()` helper from `modden`!
+            f' |_endpoints: {pformat(self._endpoints)}\n'
+            f' |_peers: {len(self._peers)} connected\n'
+        )
+
         return (
             f'<IPCServer(\n'
             f'{fmtstr}'
@@ -249,7 +852,6 @@ class IPCServer(Struct):
     async def listen_on(
         self,
         *,
-        actor: Actor,
         accept_addrs: list[tuple[str, int|str]]|None = None,
         stream_handler_nursery: Nursery|None = None,
     ) -> list[IPCEndpoint]:
@@ -282,20 +884,19 @@ class IPCServer(Struct):
             f'{self}\n'
         )

-        log.info(
+        log.runtime(
             f'Binding to endpoints for,\n'
             f'{accept_addrs}\n'
         )
         eps: list[IPCEndpoint] = await self._parent_tn.start(
             partial(
                 _serve_ipc_eps,
-                actor=actor,
                 server=self,
                 stream_handler_tn=stream_handler_nursery,
                 listen_addrs=accept_addrs,
             )
         )
-        log.info(
+        log.runtime(
             f'Started IPC endpoints\n'
             f'{eps}\n'
         )
@@ -318,7 +919,6 @@ class IPCServer(Struct):

 async def _serve_ipc_eps(
     *,
-    actor: Actor,
     server: IPCServer,
     stream_handler_tn: Nursery,
     listen_addrs: list[tuple[str, int|str]],
@@ -352,12 +952,13 @@ async def _serve_ipc_eps(
                 stream_handler_tn=stream_handler_tn,
             )
             try:
-                log.info(
+                log.runtime(
                     f'Starting new endpoint listener\n'
                     f'{ep}\n'
                 )
                 listener: trio.abc.Listener = await ep.start_listener()
                 assert listener is ep._listener
+                # actor = _state.current_actor()
+                # if actor.is_registry:
+                #     import pdbp; pdbp.set_trace()
@@ -379,7 +980,10 @@ async def _serve_ipc_eps(
                 _listeners: list[SocketListener] = await listen_tn.start(
                     partial(
                         trio.serve_listeners,
-                        handler=actor._stream_handler,
+                        handler=partial(
+                            handle_stream_from_peer,
+                            server=server,
+                        ),
                         listeners=listeners,

                         # NOTE: configured such that new
@@ -389,13 +993,13 @@ async def _serve_ipc_eps(
                     )
                 )
                 # TODO, wow make this message better! XD
-                log.info(
+                log.runtime(
                     'Started server(s)\n'
                     +
                     '\n'.join([f'|_{addr}' for addr in listen_addrs])
                 )

-                log.info(
+                log.runtime(
                     f'Started IPC endpoints\n'
                     f'{eps}\n'
                 )
@@ -411,6 +1015,7 @@ async def _serve_ipc_eps(
                 ep.close_listener()
                 server._endpoints.remove(ep)

+        # actor = _state.current_actor()
         # if actor.is_arbiter:
         #     import pdbp; pdbp.set_trace()
@@ -421,7 +1026,6 @@ async def _serve_ipc_eps(

 @acm
 async def open_ipc_server(
-    actor: Actor,
     parent_tn: Nursery|None = None,
     stream_handler_tn: Nursery|None = None,

@@ -430,20 +1034,28 @@ async def open_ipc_server(
     async with maybe_open_nursery(
         nursery=parent_tn,
     ) as rent_tn:
+        no_more_peers = trio.Event()
+        no_more_peers.set()
+
         ipc_server = IPCServer(
             _parent_tn=rent_tn,
             _stream_handler_tn=stream_handler_tn or rent_tn,
+            _no_more_peers=no_more_peers,
         )
         try:
             yield ipc_server
+            log.runtime(
+                f'Waiting on server to shutdown or be cancelled..\n'
+                f'{ipc_server}'
+            )
+            # TODO? when if ever would we want/need this?
+            # with trio.CancelScope(shield=True):
+            #     await ipc_server.wait_for_shutdown()

-            # except BaseException as berr:
-            #     log.exception(
-            #         'IPC server crashed on exit ?'
-            #     )
-            #     raise berr
-
-        finally:
+        except BaseException as berr:
+            log.exception(
+                'IPC server caller crashed ??'
+            )
             # ?TODO, maybe we can ensure the endpoints are torndown
             # (and thus their managed listeners) beforehand to ensure
             # super graceful RPC mechanics?
@@ -451,17 +1063,5 @@ async def open_ipc_server(
             # -[ ] but aren't we doing that already per-`listen_tn`
             #     inside `_serve_ipc_eps()` above?
             #
-            # if not ipc_server.is_shutdown():
-            #     ipc_server.cancel()
-            #     await ipc_server.wait_for_shutdown()
-            #     assert ipc_server.is_shutdown()
-            pass
-
-            # !XXX TODO! lol so classic, the below code is rekt!
-            #
-            # XXX here is a perfect example of suppressing errors with
-            # `trio.Cancelled` as per our demonstrating example,
-            # `test_trioisms::test_acm_embedded_nursery_propagates_enter_err
-            #
-            # with trio.CancelScope(shield=True):
-            #     await ipc_server.wait_for_shutdown()
-            # ipc_server.cancel()
+            raise berr
@@ -127,6 +127,11 @@ async def start_listener(
     Start a TCP socket listener on the given `TCPAddress`.

     '''
+    log.info(
+        f'Attempting to bind TCP socket\n'
+        f'>[\n'
+        f'|_{addr}\n'
+    )
     # ?TODO, maybe we should just change the lower-level call this is
     # using internall per-listener?
     listeners: list[SocketListener] = await open_tcp_listeners(
@@ -140,6 +145,12 @@ async def start_listener(
     assert len(listeners) == 1
     listener = listeners[0]
     host, port = listener.socket.getsockname()[:2]
+
+    log.info(
+        f'Listening on TCP socket\n'
+        f'[>\n'
+        f' |_{addr}\n'
+    )
     return listener