Compare commits
10 Commits
f3285ea870
...
1c73c0c0ee
Author | SHA1 | Date |
---|---|---|
|
1c73c0c0ee | |
|
101cd94e89 | |
|
3f33ba1cc0 | |
|
70f5315506 | |
|
496fac04bb | |
|
02baeb6a8b | |
|
d4ab802e14 | |
|
fdeaeef9f7 | |
|
41609d1433 | |
|
c9068522ed |
|
@ -138,11 +138,19 @@ def tpt_protos(request) -> list[str]:
|
||||||
yield proto_keys
|
yield proto_keys
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session')
|
@pytest.fixture(
|
||||||
|
scope='session',
|
||||||
|
autouse=True,
|
||||||
|
)
|
||||||
def tpt_proto(
|
def tpt_proto(
|
||||||
tpt_protos: list[str],
|
tpt_protos: list[str],
|
||||||
) -> str:
|
) -> str:
|
||||||
yield tpt_protos[0]
|
proto_key: str = tpt_protos[0]
|
||||||
|
from tractor import _state
|
||||||
|
if _state._def_tpt_proto != proto_key:
|
||||||
|
_state._def_tpt_proto = proto_key
|
||||||
|
# breakpoint()
|
||||||
|
yield proto_key
|
||||||
|
|
||||||
|
|
||||||
_ci_env: bool = os.environ.get('CI', False)
|
_ci_env: bool = os.environ.get('CI', False)
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
'''
|
||||||
|
`tractor.ipc` subsystem(s)/unit testing suites.
|
||||||
|
|
||||||
|
'''
|
|
@ -0,0 +1,72 @@
|
||||||
|
'''
|
||||||
|
High-level `.ipc._server` unit tests.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import trio
|
||||||
|
from tractor import (
|
||||||
|
devx,
|
||||||
|
ipc,
|
||||||
|
log,
|
||||||
|
)
|
||||||
|
from tractor._testing.addr import (
|
||||||
|
get_rando_addr,
|
||||||
|
)
|
||||||
|
# TODO, use/check-roundtripping with some of these wrapper types?
|
||||||
|
#
|
||||||
|
# from .._addr import Address
|
||||||
|
# from ._chan import Channel
|
||||||
|
# from ._transport import MsgTransport
|
||||||
|
# from ._uds import UDSAddress
|
||||||
|
# from ._tcp import TCPAddress
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'_tpt_proto',
|
||||||
|
['uds', 'tcp']
|
||||||
|
)
|
||||||
|
def test_basic_ipc_server(
|
||||||
|
_tpt_proto: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
loglevel: str,
|
||||||
|
):
|
||||||
|
|
||||||
|
# so we see the socket-listener reporting on console
|
||||||
|
log.get_console_log("INFO")
|
||||||
|
|
||||||
|
rando_addr: tuple = get_rando_addr(
|
||||||
|
tpt_proto=_tpt_proto,
|
||||||
|
)
|
||||||
|
async def main():
|
||||||
|
async with ipc._server.open_ipc_server() as server:
|
||||||
|
|
||||||
|
assert (
|
||||||
|
server._parent_tn
|
||||||
|
and
|
||||||
|
server._parent_tn is server._stream_handler_tn
|
||||||
|
)
|
||||||
|
assert server._no_more_peers.is_set()
|
||||||
|
|
||||||
|
eps: list[ipc.IPCEndpoint] = await server.listen_on(
|
||||||
|
accept_addrs=[rando_addr],
|
||||||
|
stream_handler_nursery=None,
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
len(eps) == 1
|
||||||
|
and
|
||||||
|
(ep := eps[0])._listener
|
||||||
|
and
|
||||||
|
not ep.peer_tpts
|
||||||
|
)
|
||||||
|
|
||||||
|
server._parent_tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
# !TODO! actually make a bg-task connection from a client
|
||||||
|
# using `ipc._chan._connect_chan()`
|
||||||
|
|
||||||
|
with devx.maybe_open_crash_handler(
|
||||||
|
pdb=debug_mode,
|
||||||
|
):
|
||||||
|
trio.run(main)
|
|
@ -100,16 +100,29 @@ async def streamer(
|
||||||
@acm
|
@acm
|
||||||
async def open_stream() -> Awaitable[tractor.MsgStream]:
|
async def open_stream() -> Awaitable[tractor.MsgStream]:
|
||||||
|
|
||||||
async with tractor.open_nursery() as tn:
|
try:
|
||||||
portal = await tn.start_actor('streamer', enable_modules=[__name__])
|
async with tractor.open_nursery() as an:
|
||||||
async with (
|
portal = await an.start_actor(
|
||||||
portal.open_context(streamer) as (ctx, first),
|
'streamer',
|
||||||
ctx.open_stream() as stream,
|
enable_modules=[__name__],
|
||||||
):
|
)
|
||||||
yield stream
|
async with (
|
||||||
|
portal.open_context(streamer) as (ctx, first),
|
||||||
|
ctx.open_stream() as stream,
|
||||||
|
):
|
||||||
|
yield stream
|
||||||
|
|
||||||
await portal.cancel_actor()
|
print('Cancelling streamer')
|
||||||
print('CANCELLED STREAMER')
|
await portal.cancel_actor()
|
||||||
|
print('Cancelled streamer')
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
print(
|
||||||
|
f'`open_stream()` errored?\n'
|
||||||
|
f'{err!r}\n'
|
||||||
|
)
|
||||||
|
await tractor.pause(shield=True)
|
||||||
|
raise err
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str):
|
||||||
yield stream
|
yield stream
|
||||||
|
|
||||||
|
|
||||||
def test_open_local_sub_to_stream():
|
def test_open_local_sub_to_stream(
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
'''
|
'''
|
||||||
Verify a single inter-actor stream can can be fanned-out shared to
|
Verify a single inter-actor stream can can be fanned-out shared to
|
||||||
N local tasks using ``trionics.maybe_open_context():``.
|
N local tasks using `trionics.maybe_open_context()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
timeout: float = 3.6 if platform.system() != "Windows" else 10
|
timeout: float = 3.6
|
||||||
|
if platform.system() == "Windows":
|
||||||
|
timeout: float = 10
|
||||||
|
|
||||||
|
if debug_mode:
|
||||||
|
timeout = 999
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
full = list(range(1000))
|
full = list(range(1000))
|
||||||
|
|
||||||
async def get_sub_and_pull(taskname: str):
|
async def get_sub_and_pull(taskname: str):
|
||||||
|
|
||||||
|
stream: tractor.MsgStream
|
||||||
async with (
|
async with (
|
||||||
maybe_open_stream(taskname) as stream,
|
maybe_open_stream(taskname) as stream,
|
||||||
):
|
):
|
||||||
|
@ -165,17 +187,27 @@ def test_open_local_sub_to_stream():
|
||||||
assert set(seq).issubset(set(full))
|
assert set(seq).issubset(set(full))
|
||||||
print(f'{taskname} finished')
|
print(f'{taskname} finished')
|
||||||
|
|
||||||
with trio.fail_after(timeout):
|
with trio.fail_after(timeout) as cs:
|
||||||
# TODO: turns out this isn't multi-task entrant XD
|
# TODO: turns out this isn't multi-task entrant XD
|
||||||
# We probably need an indepotent entry semantic?
|
# We probably need an indepotent entry semantic?
|
||||||
async with tractor.open_root_actor():
|
async with tractor.open_root_actor(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
):
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as nurse,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
for i in range(10):
|
for i in range(10):
|
||||||
nurse.start_soon(get_sub_and_pull, f'task_{i}')
|
tn.start_soon(
|
||||||
|
get_sub_and_pull,
|
||||||
|
f'task_{i}',
|
||||||
|
)
|
||||||
await trio.sleep(0.001)
|
await trio.sleep(0.001)
|
||||||
|
|
||||||
print('all consumer tasks finished')
|
print('all consumer tasks finished')
|
||||||
|
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
pytest.fail(
|
||||||
|
'Should NOT time out in `open_root_actor()` ?'
|
||||||
|
)
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
|
@ -57,7 +57,11 @@ async def spawn(
|
||||||
)
|
)
|
||||||
|
|
||||||
assert len(an._children) == 1
|
assert len(an._children) == 1
|
||||||
assert portal.channel.uid in tractor.current_actor()._peers
|
assert (
|
||||||
|
portal.channel.uid
|
||||||
|
in
|
||||||
|
tractor.current_actor().ipc_server._peers
|
||||||
|
)
|
||||||
|
|
||||||
# get result from child subactor
|
# get result from child subactor
|
||||||
result = await portal.result()
|
result = await portal.result()
|
||||||
|
|
|
@ -180,7 +180,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
with tractor.devx.maybe_open_crash_handler(
|
with tractor.devx.maybe_open_crash_handler(
|
||||||
pdb=debug_mode,
|
pdb=debug_mode,
|
||||||
) as bxerr:
|
) as bxerr:
|
||||||
assert not bxerr.value
|
if bxerr:
|
||||||
|
assert not bxerr.value
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
wraps_tn_that_always_cancels() as tn,
|
wraps_tn_that_always_cancels() as tn,
|
||||||
|
|
|
@ -48,6 +48,7 @@ from ._state import (
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
|
from .ipc._server import IPCServer
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -79,7 +80,7 @@ async def get_registry(
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# TODO: try to look pre-existing connection from
|
# TODO: try to look pre-existing connection from
|
||||||
# `Actor._peers` and use it instead?
|
# `IPCServer._peers` and use it instead?
|
||||||
async with (
|
async with (
|
||||||
_connect_chan(addr) as chan,
|
_connect_chan(addr) as chan,
|
||||||
open_portal(chan) as regstr_ptl,
|
open_portal(chan) as regstr_ptl,
|
||||||
|
@ -111,7 +112,7 @@ def get_peer_by_name(
|
||||||
) -> list[Channel]|None: # at least 1
|
) -> list[Channel]|None: # at least 1
|
||||||
'''
|
'''
|
||||||
Scan for an existing connection (set) to a named actor
|
Scan for an existing connection (set) to a named actor
|
||||||
and return any channels from `Actor._peers`.
|
and return any channels from `IPCServer._peers: dict`.
|
||||||
|
|
||||||
This is an optimization method over querying the registrar for
|
This is an optimization method over querying the registrar for
|
||||||
the same info.
|
the same info.
|
||||||
|
|
|
@ -578,12 +578,11 @@ async def open_portal(
|
||||||
|
|
||||||
msg_loop_cs: trio.CancelScope|None = None
|
msg_loop_cs: trio.CancelScope|None = None
|
||||||
if start_msg_loop:
|
if start_msg_loop:
|
||||||
from ._runtime import process_messages
|
from . import _rpc
|
||||||
msg_loop_cs = await tn.start(
|
msg_loop_cs = await tn.start(
|
||||||
partial(
|
partial(
|
||||||
process_messages,
|
_rpc.process_messages,
|
||||||
actor,
|
chan=channel,
|
||||||
channel,
|
|
||||||
# if the local task is cancelled we want to keep
|
# if the local task is cancelled we want to keep
|
||||||
# the msg loop running until our block ends
|
# the msg loop running until our block ends
|
||||||
shield=True,
|
shield=True,
|
||||||
|
|
|
@ -869,7 +869,6 @@ async def try_ship_error_to_remote(
|
||||||
|
|
||||||
|
|
||||||
async def process_messages(
|
async def process_messages(
|
||||||
actor: Actor,
|
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
shield: bool = False,
|
shield: bool = False,
|
||||||
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
@ -907,6 +906,7 @@ async def process_messages(
|
||||||
(as utilized inside `Portal.cancel_actor()` ).
|
(as utilized inside `Portal.cancel_actor()` ).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
actor: Actor = _state.current_actor()
|
||||||
assert actor._service_n # runtime state sanity
|
assert actor._service_n # runtime state sanity
|
||||||
|
|
||||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||||
|
|
|
@ -40,9 +40,7 @@ from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
ExitStack,
|
ExitStack,
|
||||||
)
|
)
|
||||||
from collections import defaultdict
|
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from itertools import chain
|
|
||||||
import importlib
|
import importlib
|
||||||
import importlib.util
|
import importlib.util
|
||||||
import os
|
import os
|
||||||
|
@ -76,6 +74,7 @@ from tractor.msg import (
|
||||||
)
|
)
|
||||||
from .ipc import (
|
from .ipc import (
|
||||||
Channel,
|
Channel,
|
||||||
|
# IPCServer, # causes cycles atm..
|
||||||
_server,
|
_server,
|
||||||
)
|
)
|
||||||
from ._addr import (
|
from ._addr import (
|
||||||
|
@ -96,18 +95,13 @@ from ._exceptions import (
|
||||||
ModuleNotExposed,
|
ModuleNotExposed,
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
unpack_error,
|
unpack_error,
|
||||||
TransportClosed,
|
|
||||||
)
|
)
|
||||||
from .devx import _debug
|
from .devx import _debug
|
||||||
from ._discovery import get_registry
|
from ._discovery import get_registry
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from . import _state
|
from . import _state
|
||||||
from . import _mp_fixup_main
|
from . import _mp_fixup_main
|
||||||
from ._rpc import (
|
from . import _rpc
|
||||||
process_messages,
|
|
||||||
try_ship_error_to_remote,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._supervise import ActorNursery
|
from ._supervise import ActorNursery
|
||||||
|
@ -161,7 +155,6 @@ class Actor:
|
||||||
_root_n: Nursery|None = None
|
_root_n: Nursery|None = None
|
||||||
_service_n: Nursery|None = None
|
_service_n: Nursery|None = None
|
||||||
|
|
||||||
# XXX moving to IPCServer!
|
|
||||||
_ipc_server: _server.IPCServer|None = None
|
_ipc_server: _server.IPCServer|None = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -251,14 +244,6 @@ class Actor:
|
||||||
# by the user (currently called the "arbiter")
|
# by the user (currently called the "arbiter")
|
||||||
self._spawn_method: str = spawn_method
|
self._spawn_method: str = spawn_method
|
||||||
|
|
||||||
self._peers: defaultdict[
|
|
||||||
str, # uaid
|
|
||||||
list[Channel], # IPC conns from peer
|
|
||||||
] = defaultdict(list)
|
|
||||||
self._peer_connected: dict[tuple[str, str], trio.Event] = {}
|
|
||||||
self._no_more_peers = trio.Event()
|
|
||||||
self._no_more_peers.set()
|
|
||||||
|
|
||||||
# RPC state
|
# RPC state
|
||||||
self._ongoing_rpc_tasks = trio.Event()
|
self._ongoing_rpc_tasks = trio.Event()
|
||||||
self._ongoing_rpc_tasks.set()
|
self._ongoing_rpc_tasks.set()
|
||||||
|
@ -343,7 +328,12 @@ class Actor:
|
||||||
parent_uid: tuple|None = None
|
parent_uid: tuple|None = None
|
||||||
if rent_chan := self._parent_chan:
|
if rent_chan := self._parent_chan:
|
||||||
parent_uid = rent_chan.uid
|
parent_uid = rent_chan.uid
|
||||||
peers: list[tuple] = list(self._peer_connected)
|
|
||||||
|
peers: list = []
|
||||||
|
server: _server.IPCServer = self.ipc_server
|
||||||
|
if server:
|
||||||
|
peers: list[tuple] = list(server._peer_connected)
|
||||||
|
|
||||||
fmtstr: str = (
|
fmtstr: str = (
|
||||||
f' |_id: {self.aid!r}\n'
|
f' |_id: {self.aid!r}\n'
|
||||||
# f" aid{ds}{self.aid!r}\n"
|
# f" aid{ds}{self.aid!r}\n"
|
||||||
|
@ -399,25 +389,6 @@ class Actor:
|
||||||
|
|
||||||
self._reg_addrs = addrs
|
self._reg_addrs = addrs
|
||||||
|
|
||||||
async def wait_for_peer(
|
|
||||||
self,
|
|
||||||
uid: tuple[str, str],
|
|
||||||
|
|
||||||
) -> tuple[trio.Event, Channel]:
|
|
||||||
'''
|
|
||||||
Wait for a connection back from a (spawned sub-)actor with
|
|
||||||
a `uid` using a `trio.Event` for sync.
|
|
||||||
|
|
||||||
'''
|
|
||||||
log.debug(f'Waiting for peer {uid!r} to connect')
|
|
||||||
event = self._peer_connected.setdefault(uid, trio.Event())
|
|
||||||
await event.wait()
|
|
||||||
log.debug(f'{uid!r} successfully connected back to us')
|
|
||||||
return (
|
|
||||||
event,
|
|
||||||
self._peers[uid][-1],
|
|
||||||
)
|
|
||||||
|
|
||||||
def load_modules(
|
def load_modules(
|
||||||
self,
|
self,
|
||||||
# debug_mode: bool = False,
|
# debug_mode: bool = False,
|
||||||
|
@ -493,434 +464,6 @@ class Actor:
|
||||||
|
|
||||||
raise mne
|
raise mne
|
||||||
|
|
||||||
# TODO: maybe change to mod-func and rename for implied
|
|
||||||
# multi-transport semantics?
|
|
||||||
async def _stream_handler(
|
|
||||||
self,
|
|
||||||
stream: trio.SocketStream,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
'''
|
|
||||||
Entry point for new inbound IPC connections on a specific
|
|
||||||
transport server.
|
|
||||||
|
|
||||||
'''
|
|
||||||
self._no_more_peers = trio.Event() # unset by making new
|
|
||||||
# with _debug.maybe_open_crash_handler(
|
|
||||||
# pdb=True,
|
|
||||||
# ) as boxerr:
|
|
||||||
chan = Channel.from_stream(stream)
|
|
||||||
con_status: str = (
|
|
||||||
'New inbound IPC connection <=\n'
|
|
||||||
f'|_{chan}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# send/receive initial handshake response
|
|
||||||
try:
|
|
||||||
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
|
||||||
aid=self.aid,
|
|
||||||
)
|
|
||||||
except (
|
|
||||||
TransportClosed,
|
|
||||||
# ^XXX NOTE, the above wraps `trio` exc types raised
|
|
||||||
# during various `SocketStream.send/receive_xx()` calls
|
|
||||||
# under different fault conditions such as,
|
|
||||||
#
|
|
||||||
# trio.BrokenResourceError,
|
|
||||||
# trio.ClosedResourceError,
|
|
||||||
#
|
|
||||||
# Inside our `.ipc._transport` layer we absorb and
|
|
||||||
# re-raise our own `TransportClosed` exc such that this
|
|
||||||
# higher level runtime code can only worry one
|
|
||||||
# "kinda-error" that we expect to tolerate during
|
|
||||||
# discovery-sys related pings, queires, DoS etc.
|
|
||||||
):
|
|
||||||
# XXX: This may propagate up from `Channel._aiter_recv()`
|
|
||||||
# and `MsgpackStream._inter_packets()` on a read from the
|
|
||||||
# stream particularly when the runtime is first starting up
|
|
||||||
# inside `open_root_actor()` where there is a check for
|
|
||||||
# a bound listener on the "arbiter" addr. the reset will be
|
|
||||||
# because the handshake was never meant took place.
|
|
||||||
log.runtime(
|
|
||||||
con_status
|
|
||||||
+
|
|
||||||
' -> But failed to handshake? Ignoring..\n'
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
uid: tuple[str, str] = (
|
|
||||||
peer_aid.name,
|
|
||||||
peer_aid.uuid,
|
|
||||||
)
|
|
||||||
# TODO, can we make this downstream peer tracking use the
|
|
||||||
# `peer_aid` instead?
|
|
||||||
familiar: str = 'new-peer'
|
|
||||||
if _pre_chan := self._peers.get(uid):
|
|
||||||
familiar: str = 'pre-existing-peer'
|
|
||||||
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
|
|
||||||
con_status += (
|
|
||||||
f' -> Handshake with {familiar} `{uid_short}` complete\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
if _pre_chan:
|
|
||||||
# con_status += (
|
|
||||||
# ^TODO^ swap once we minimize conn duplication
|
|
||||||
# -[ ] last thing might be reg/unreg runtime reqs?
|
|
||||||
# log.warning(
|
|
||||||
log.debug(
|
|
||||||
f'?Wait?\n'
|
|
||||||
f'We already have IPC with peer {uid_short!r}\n'
|
|
||||||
f'|_{_pre_chan}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# IPC connection tracking for both peers and new children:
|
|
||||||
# - if this is a new channel to a locally spawned
|
|
||||||
# sub-actor there will be a spawn wait even registered
|
|
||||||
# by a call to `.wait_for_peer()`.
|
|
||||||
# - if a peer is connecting no such event will exit.
|
|
||||||
event: trio.Event|None = self._peer_connected.pop(
|
|
||||||
uid,
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
if event:
|
|
||||||
con_status += (
|
|
||||||
' -> Waking subactor spawn waiters: '
|
|
||||||
f'{event.statistics().tasks_waiting}\n'
|
|
||||||
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
|
|
||||||
# f' {event}\n'
|
|
||||||
# f' |{event.statistics()}\n'
|
|
||||||
)
|
|
||||||
# wake tasks waiting on this IPC-transport "connect-back"
|
|
||||||
event.set()
|
|
||||||
|
|
||||||
else:
|
|
||||||
con_status += (
|
|
||||||
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
|
|
||||||
) # type: ignore
|
|
||||||
|
|
||||||
chans: list[Channel] = self._peers[uid]
|
|
||||||
# if chans:
|
|
||||||
# # TODO: re-use channels for new connections instead
|
|
||||||
# # of always new ones?
|
|
||||||
# # => will require changing all the discovery funcs..
|
|
||||||
|
|
||||||
# append new channel
|
|
||||||
# TODO: can we just use list-ref directly?
|
|
||||||
chans.append(chan)
|
|
||||||
|
|
||||||
con_status += ' -> Entering RPC msg loop..\n'
|
|
||||||
log.runtime(con_status)
|
|
||||||
|
|
||||||
# Begin channel management - respond to remote requests and
|
|
||||||
# process received reponses.
|
|
||||||
disconnected: bool = False
|
|
||||||
last_msg: MsgType
|
|
||||||
try:
|
|
||||||
(
|
|
||||||
disconnected,
|
|
||||||
last_msg,
|
|
||||||
) = await process_messages(
|
|
||||||
self,
|
|
||||||
chan,
|
|
||||||
)
|
|
||||||
except trio.Cancelled:
|
|
||||||
log.cancel(
|
|
||||||
'IPC transport msg loop was cancelled\n'
|
|
||||||
f'c)>\n'
|
|
||||||
f' |_{chan}\n'
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
|
|
||||||
finally:
|
|
||||||
local_nursery: (
|
|
||||||
ActorNursery|None
|
|
||||||
) = self._actoruid2nursery.get(uid)
|
|
||||||
|
|
||||||
# This is set in ``Portal.cancel_actor()``. So if
|
|
||||||
# the peer was cancelled we try to wait for them
|
|
||||||
# to tear down their side of the connection before
|
|
||||||
# moving on with closing our own side.
|
|
||||||
if (
|
|
||||||
local_nursery
|
|
||||||
and (
|
|
||||||
self._cancel_called
|
|
||||||
or
|
|
||||||
chan._cancel_called
|
|
||||||
)
|
|
||||||
#
|
|
||||||
# ^-TODO-^ along with this is there another condition
|
|
||||||
# that we should filter with to avoid entering this
|
|
||||||
# waiting block needlessly?
|
|
||||||
# -[ ] maybe `and local_nursery.cancelled` and/or
|
|
||||||
# only if the `._children` table is empty or has
|
|
||||||
# only `Portal`s with .chan._cancel_called ==
|
|
||||||
# True` as per what we had below; the MAIN DIFF
|
|
||||||
# BEING that just bc one `Portal.cancel_actor()`
|
|
||||||
# was called, doesn't mean the whole actor-nurse
|
|
||||||
# is gonna exit any time soon right!?
|
|
||||||
#
|
|
||||||
# or
|
|
||||||
# all(chan._cancel_called for chan in chans)
|
|
||||||
|
|
||||||
):
|
|
||||||
log.cancel(
|
|
||||||
'Waiting on cancel request to peer..\n'
|
|
||||||
f'c)=>\n'
|
|
||||||
f' |_{chan.uid}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX: this is a soft wait on the channel (and its
|
|
||||||
# underlying transport protocol) to close from the
|
|
||||||
# remote peer side since we presume that any channel
|
|
||||||
# which is mapped to a sub-actor (i.e. it's managed
|
|
||||||
# by local actor-nursery) has a message that is sent
|
|
||||||
# to the peer likely by this actor (which may be in
|
|
||||||
# a shutdown sequence due to cancellation) when the
|
|
||||||
# local runtime here is now cancelled while
|
|
||||||
# (presumably) in the middle of msg loop processing.
|
|
||||||
chan_info: str = (
|
|
||||||
f'{chan.uid}\n'
|
|
||||||
f'|_{chan}\n'
|
|
||||||
f' |_{chan.transport}\n\n'
|
|
||||||
)
|
|
||||||
with trio.move_on_after(0.5) as drain_cs:
|
|
||||||
drain_cs.shield = True
|
|
||||||
|
|
||||||
# attempt to wait for the far end to close the
|
|
||||||
# channel and bail after timeout (a 2-generals
|
|
||||||
# problem on closure).
|
|
||||||
assert chan.transport
|
|
||||||
async for msg in chan.transport.drain():
|
|
||||||
|
|
||||||
# try to deliver any lingering msgs
|
|
||||||
# before we destroy the channel.
|
|
||||||
# This accomplishes deterministic
|
|
||||||
# ``Portal.cancel_actor()`` cancellation by
|
|
||||||
# making sure any RPC response to that call is
|
|
||||||
# delivered the local calling task.
|
|
||||||
# TODO: factor this into a helper?
|
|
||||||
log.warning(
|
|
||||||
'Draining msg from disconnected peer\n'
|
|
||||||
f'{chan_info}'
|
|
||||||
f'{pformat(msg)}\n'
|
|
||||||
)
|
|
||||||
# cid: str|None = msg.get('cid')
|
|
||||||
cid: str|None = msg.cid
|
|
||||||
if cid:
|
|
||||||
# deliver response to local caller/waiter
|
|
||||||
await self._deliver_ctx_payload(
|
|
||||||
chan,
|
|
||||||
cid,
|
|
||||||
msg,
|
|
||||||
)
|
|
||||||
if drain_cs.cancelled_caught:
|
|
||||||
log.warning(
|
|
||||||
'Timed out waiting on IPC transport channel to drain?\n'
|
|
||||||
f'{chan_info}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX NOTE XXX when no explicit call to
|
|
||||||
# `open_root_actor()` was made by the application
|
|
||||||
# (normally we implicitly make that call inside
|
|
||||||
# the first `.open_nursery()` in root-actor
|
|
||||||
# user/app code), we can assume that either we
|
|
||||||
# are NOT the root actor or are root but the
|
|
||||||
# runtime was started manually. and thus DO have
|
|
||||||
# to wait for the nursery-enterer to exit before
|
|
||||||
# shutting down the local runtime to avoid
|
|
||||||
# clobbering any ongoing subactor
|
|
||||||
# teardown/debugging/graceful-cancel.
|
|
||||||
#
|
|
||||||
# see matching note inside `._supervise.open_nursery()`
|
|
||||||
#
|
|
||||||
# TODO: should we have a separate cs + timeout
|
|
||||||
# block here?
|
|
||||||
if (
|
|
||||||
# XXX SO either,
|
|
||||||
# - not root OR,
|
|
||||||
# - is root but `open_root_actor()` was
|
|
||||||
# entered manually (in which case we do
|
|
||||||
# the equiv wait there using the
|
|
||||||
# `devx._debug` sub-sys APIs).
|
|
||||||
not local_nursery._implicit_runtime_started
|
|
||||||
):
|
|
||||||
log.runtime(
|
|
||||||
'Waiting on local actor nursery to exit..\n'
|
|
||||||
f'|_{local_nursery}\n'
|
|
||||||
)
|
|
||||||
with trio.move_on_after(0.5) as an_exit_cs:
|
|
||||||
an_exit_cs.shield = True
|
|
||||||
await local_nursery.exited.wait()
|
|
||||||
|
|
||||||
# TODO: currently this is always triggering for every
|
|
||||||
# sub-daemon spawned from the `piker.services._mngr`?
|
|
||||||
# -[ ] how do we ensure that the IPC is supposed to
|
|
||||||
# be long lived and isn't just a register?
|
|
||||||
# |_ in the register case how can we signal that the
|
|
||||||
# ephemeral msg loop was intentional?
|
|
||||||
if (
|
|
||||||
# not local_nursery._implicit_runtime_started
|
|
||||||
# and
|
|
||||||
an_exit_cs.cancelled_caught
|
|
||||||
):
|
|
||||||
report: str = (
|
|
||||||
'Timed out waiting on local actor-nursery to exit?\n'
|
|
||||||
f'c)>\n'
|
|
||||||
f' |_{local_nursery}\n'
|
|
||||||
)
|
|
||||||
if children := local_nursery._children:
|
|
||||||
# indent from above local-nurse repr
|
|
||||||
report += (
|
|
||||||
f' |_{pformat(children)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
log.warning(report)
|
|
||||||
|
|
||||||
if disconnected:
|
|
||||||
# if the transport died and this actor is still
|
|
||||||
# registered within a local nursery, we report
|
|
||||||
# that the IPC layer may have failed
|
|
||||||
# unexpectedly since it may be the cause of
|
|
||||||
# other downstream errors.
|
|
||||||
entry: tuple|None = local_nursery._children.get(uid)
|
|
||||||
if entry:
|
|
||||||
proc: trio.Process
|
|
||||||
_, proc, _ = entry
|
|
||||||
|
|
||||||
if (
|
|
||||||
(poll := getattr(proc, 'poll', None))
|
|
||||||
and
|
|
||||||
poll() is None # proc still alive
|
|
||||||
):
|
|
||||||
# TODO: change log level based on
|
|
||||||
# detecting whether chan was created for
|
|
||||||
# ephemeral `.register_actor()` request!
|
|
||||||
# -[ ] also, that should be avoidable by
|
|
||||||
# re-using any existing chan from the
|
|
||||||
# `._discovery.get_registry()` call as
|
|
||||||
# well..
|
|
||||||
log.runtime(
|
|
||||||
f'Peer IPC broke but subproc is alive?\n\n'
|
|
||||||
|
|
||||||
f'<=x {chan.uid}@{chan.raddr}\n'
|
|
||||||
f' |_{proc}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# ``Channel`` teardown and closure sequence
|
|
||||||
# drop ref to channel so it can be gc-ed and disconnected
|
|
||||||
con_teardown_status: str = (
|
|
||||||
f'IPC channel disconnected:\n'
|
|
||||||
f'<=x uid: {chan.uid}\n'
|
|
||||||
f' |_{pformat(chan)}\n\n'
|
|
||||||
)
|
|
||||||
chans.remove(chan)
|
|
||||||
|
|
||||||
# TODO: do we need to be this pedantic?
|
|
||||||
if not chans:
|
|
||||||
con_teardown_status += (
|
|
||||||
f'-> No more channels with {chan.uid}'
|
|
||||||
)
|
|
||||||
self._peers.pop(uid, None)
|
|
||||||
|
|
||||||
peers_str: str = ''
|
|
||||||
for uid, chans in self._peers.items():
|
|
||||||
peers_str += (
|
|
||||||
f'uid: {uid}\n'
|
|
||||||
)
|
|
||||||
for i, chan in enumerate(chans):
|
|
||||||
peers_str += (
|
|
||||||
f' |_[{i}] {pformat(chan)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
con_teardown_status += (
|
|
||||||
f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# No more channels to other actors (at all) registered
|
|
||||||
# as connected.
|
|
||||||
if not self._peers:
|
|
||||||
con_teardown_status += (
|
|
||||||
'Signalling no more peer channel connections'
|
|
||||||
)
|
|
||||||
self._no_more_peers.set()
|
|
||||||
|
|
||||||
# NOTE: block this actor from acquiring the
|
|
||||||
# debugger-TTY-lock since we have no way to know if we
|
|
||||||
# cancelled it and further there is no way to ensure the
|
|
||||||
# lock will be released if acquired due to having no
|
|
||||||
# more active IPC channels.
|
|
||||||
if _state.is_root_process():
|
|
||||||
pdb_lock = _debug.Lock
|
|
||||||
pdb_lock._blocked.add(uid)
|
|
||||||
|
|
||||||
# TODO: NEEEDS TO BE TESTED!
|
|
||||||
# actually, no idea if this ever even enters.. XD
|
|
||||||
#
|
|
||||||
# XXX => YES IT DOES, when i was testing ctl-c
|
|
||||||
# from broken debug TTY locking due to
|
|
||||||
# msg-spec races on application using RunVar...
|
|
||||||
if (
|
|
||||||
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
|
||||||
and
|
|
||||||
(pdb_user_uid := ctx_in_debug.chan.uid)
|
|
||||||
and
|
|
||||||
local_nursery
|
|
||||||
):
|
|
||||||
entry: tuple|None = local_nursery._children.get(
|
|
||||||
tuple(pdb_user_uid)
|
|
||||||
)
|
|
||||||
if entry:
|
|
||||||
proc: trio.Process
|
|
||||||
_, proc, _ = entry
|
|
||||||
|
|
||||||
if (
|
|
||||||
(poll := getattr(proc, 'poll', None))
|
|
||||||
and poll() is None
|
|
||||||
):
|
|
||||||
log.cancel(
|
|
||||||
'Root actor reports no-more-peers, BUT\n'
|
|
||||||
'a DISCONNECTED child still has the debug '
|
|
||||||
'lock!\n\n'
|
|
||||||
# f'root uid: {self.uid}\n'
|
|
||||||
f'last disconnected child uid: {uid}\n'
|
|
||||||
f'locking child uid: {pdb_user_uid}\n'
|
|
||||||
)
|
|
||||||
await _debug.maybe_wait_for_debugger(
|
|
||||||
child_in_debug=True
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: just bc a child's transport dropped
|
|
||||||
# doesn't mean it's not still using the pdb
|
|
||||||
# REPL! so,
|
|
||||||
# -[ ] ideally we can check out child proc
|
|
||||||
# tree to ensure that its alive (and
|
|
||||||
# actually using the REPL) before we cancel
|
|
||||||
# it's lock acquire by doing the below!
|
|
||||||
# -[ ] create a way to read the tree of each actor's
|
|
||||||
# grandchildren such that when an
|
|
||||||
# intermediary parent is cancelled but their
|
|
||||||
# child has locked the tty, the grandparent
|
|
||||||
# will not allow the parent to cancel or
|
|
||||||
# zombie reap the child! see open issue:
|
|
||||||
# - https://github.com/goodboy/tractor/issues/320
|
|
||||||
# ------ - ------
|
|
||||||
# if a now stale local task has the TTY lock still
|
|
||||||
# we cancel it to allow servicing other requests for
|
|
||||||
# the lock.
|
|
||||||
if (
|
|
||||||
(db_cs := pdb_lock.get_locking_task_cs())
|
|
||||||
and not db_cs.cancel_called
|
|
||||||
and uid == pdb_user_uid
|
|
||||||
):
|
|
||||||
log.critical(
|
|
||||||
f'STALE DEBUG LOCK DETECTED FOR {uid}'
|
|
||||||
)
|
|
||||||
# TODO: figure out why this breaks tests..
|
|
||||||
db_cs.cancel()
|
|
||||||
|
|
||||||
log.runtime(con_teardown_status)
|
|
||||||
# finally block closure
|
|
||||||
|
|
||||||
# TODO: rename to `._deliver_payload()` since this handles
|
# TODO: rename to `._deliver_payload()` since this handles
|
||||||
# more then just `result` msgs now obvi XD
|
# more then just `result` msgs now obvi XD
|
||||||
async def _deliver_ctx_payload(
|
async def _deliver_ctx_payload(
|
||||||
|
@ -1157,7 +700,7 @@ class Actor:
|
||||||
)
|
)
|
||||||
assert isinstance(chan, Channel)
|
assert isinstance(chan, Channel)
|
||||||
|
|
||||||
# Initial handshake: swap names.
|
# init handshake: swap actor-IDs.
|
||||||
await chan._do_handshake(aid=self.aid)
|
await chan._do_handshake(aid=self.aid)
|
||||||
|
|
||||||
accept_addrs: list[UnwrappedAddress]|None = None
|
accept_addrs: list[UnwrappedAddress]|None = None
|
||||||
|
@ -1719,6 +1262,10 @@ async def async_main(
|
||||||
the actor's "runtime" and all thus all ongoing RPC tasks.
|
the actor's "runtime" and all thus all ongoing RPC tasks.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# XXX NOTE, `_state._current_actor` **must** be set prior to
|
||||||
|
# calling this core runtime entrypoint!
|
||||||
|
assert actor is _state.current_actor()
|
||||||
|
|
||||||
actor._task: trio.Task = trio.lowlevel.current_task()
|
actor._task: trio.Task = trio.lowlevel.current_task()
|
||||||
|
|
||||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||||
|
@ -1778,7 +1325,6 @@ async def async_main(
|
||||||
) as service_nursery,
|
) as service_nursery,
|
||||||
|
|
||||||
_server.open_ipc_server(
|
_server.open_ipc_server(
|
||||||
actor=actor,
|
|
||||||
parent_tn=service_nursery,
|
parent_tn=service_nursery,
|
||||||
stream_handler_tn=service_nursery,
|
stream_handler_tn=service_nursery,
|
||||||
) as ipc_server,
|
) as ipc_server,
|
||||||
|
@ -1832,7 +1378,6 @@ async def async_main(
|
||||||
'Booting IPC server'
|
'Booting IPC server'
|
||||||
)
|
)
|
||||||
eps: list = await ipc_server.listen_on(
|
eps: list = await ipc_server.listen_on(
|
||||||
actor=actor,
|
|
||||||
accept_addrs=accept_addrs,
|
accept_addrs=accept_addrs,
|
||||||
stream_handler_nursery=service_nursery,
|
stream_handler_nursery=service_nursery,
|
||||||
)
|
)
|
||||||
|
@ -1916,9 +1461,8 @@ async def async_main(
|
||||||
if actor._parent_chan:
|
if actor._parent_chan:
|
||||||
await root_nursery.start(
|
await root_nursery.start(
|
||||||
partial(
|
partial(
|
||||||
process_messages,
|
_rpc.process_messages,
|
||||||
actor,
|
chan=actor._parent_chan,
|
||||||
actor._parent_chan,
|
|
||||||
shield=True,
|
shield=True,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -1959,7 +1503,7 @@ async def async_main(
|
||||||
log.exception(err_report)
|
log.exception(err_report)
|
||||||
|
|
||||||
if actor._parent_chan:
|
if actor._parent_chan:
|
||||||
await try_ship_error_to_remote(
|
await _rpc.try_ship_error_to_remote(
|
||||||
actor._parent_chan,
|
actor._parent_chan,
|
||||||
internal_err,
|
internal_err,
|
||||||
)
|
)
|
||||||
|
@ -2053,16 +1597,18 @@ async def async_main(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Ensure all peers (actors connected to us as clients) are finished
|
# Ensure all peers (actors connected to us as clients) are finished
|
||||||
if not actor._no_more_peers.is_set():
|
if (
|
||||||
if any(
|
(ipc_server := actor.ipc_server)
|
||||||
chan.connected() for chan in chain(*actor._peers.values())
|
and
|
||||||
):
|
ipc_server.has_peers(check_chans=True)
|
||||||
teardown_report += (
|
):
|
||||||
f'-> Waiting for remaining peers {actor._peers} to clear..\n'
|
teardown_report += (
|
||||||
)
|
f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
|
||||||
log.runtime(teardown_report)
|
)
|
||||||
with CancelScope(shield=True):
|
log.runtime(teardown_report)
|
||||||
await actor._no_more_peers.wait()
|
await ipc_server.wait_for_no_more_peers(
|
||||||
|
shield=True,
|
||||||
|
)
|
||||||
|
|
||||||
teardown_report += (
|
teardown_report += (
|
||||||
'-> All peer channels are complete\n'
|
'-> All peer channels are complete\n'
|
||||||
|
|
|
@ -58,9 +58,11 @@ from tractor.msg.types import (
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
from ipc import IPCServer
|
||||||
from ._supervise import ActorNursery
|
from ._supervise import ActorNursery
|
||||||
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
|
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
|
||||||
|
|
||||||
|
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
|
||||||
# placeholder for an mp start context if so using that backend
|
# placeholder for an mp start context if so using that backend
|
||||||
|
@ -481,6 +483,7 @@ async def trio_proc(
|
||||||
|
|
||||||
cancelled_during_spawn: bool = False
|
cancelled_during_spawn: bool = False
|
||||||
proc: trio.Process|None = None
|
proc: trio.Process|None = None
|
||||||
|
ipc_server: IPCServer = actor_nursery._actor.ipc_server
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
|
proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
|
||||||
|
@ -492,7 +495,7 @@ async def trio_proc(
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
# channel should have handshake completed by the
|
# channel should have handshake completed by the
|
||||||
# local actor by the time we get a ref to it
|
# local actor by the time we get a ref to it
|
||||||
event, chan = await actor_nursery._actor.wait_for_peer(
|
event, chan = await ipc_server.wait_for_peer(
|
||||||
subactor.uid
|
subactor.uid
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -724,11 +727,12 @@ async def mp_proc(
|
||||||
|
|
||||||
log.runtime(f"Started {proc}")
|
log.runtime(f"Started {proc}")
|
||||||
|
|
||||||
|
ipc_server: IPCServer = actor_nursery._actor.ipc_server
|
||||||
try:
|
try:
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
# channel should have handshake completed by the
|
# channel should have handshake completed by the
|
||||||
# local actor by the time we get a ref to it
|
# local actor by the time we get a ref to it
|
||||||
event, chan = await actor_nursery._actor.wait_for_peer(
|
event, chan = await ipc_server.wait_for_peer(
|
||||||
subactor.uid,
|
subactor.uid,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,9 @@ from . import _spawn
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
|
# from .ipc._server import IPCServer
|
||||||
|
from .ipc import IPCServer
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -315,6 +318,9 @@ class ActorNursery:
|
||||||
children: dict = self._children
|
children: dict = self._children
|
||||||
child_count: int = len(children)
|
child_count: int = len(children)
|
||||||
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
||||||
|
|
||||||
|
server: IPCServer = self._actor.ipc_server
|
||||||
|
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery(
|
async with trio.open_nursery(
|
||||||
strict_exception_groups=False,
|
strict_exception_groups=False,
|
||||||
|
@ -337,7 +343,7 @@ class ActorNursery:
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if portal is None: # actor hasn't fully spawned yet
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
event = self._actor._peer_connected[subactor.uid]
|
event: trio.Event = server._peer_connected[subactor.uid]
|
||||||
log.warning(
|
log.warning(
|
||||||
f"{subactor.uid} never 't finished spawning?"
|
f"{subactor.uid} never 't finished spawning?"
|
||||||
)
|
)
|
||||||
|
@ -353,7 +359,7 @@ class ActorNursery:
|
||||||
if portal is None:
|
if portal is None:
|
||||||
# cancelled while waiting on the event
|
# cancelled while waiting on the event
|
||||||
# to arrive
|
# to arrive
|
||||||
chan = self._actor._peers[subactor.uid][-1]
|
chan = server._peers[subactor.uid][-1]
|
||||||
if chan:
|
if chan:
|
||||||
portal = Portal(chan)
|
portal = Portal(chan)
|
||||||
else: # there's no other choice left
|
else: # there's no other choice left
|
||||||
|
|
|
@ -92,7 +92,11 @@ from tractor._state import (
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from trio.lowlevel import Task
|
from trio.lowlevel import Task
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
from tractor.ipc import Channel
|
from tractor.ipc import (
|
||||||
|
Channel,
|
||||||
|
IPCServer,
|
||||||
|
# _server, # TODO? export at top level?
|
||||||
|
)
|
||||||
from tractor._runtime import (
|
from tractor._runtime import (
|
||||||
Actor,
|
Actor,
|
||||||
)
|
)
|
||||||
|
@ -1434,6 +1438,7 @@ def any_connected_locker_child() -> bool:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
|
server: IPCServer = actor.ipc_server
|
||||||
|
|
||||||
if not is_root_process():
|
if not is_root_process():
|
||||||
raise InternalError('This is a root-actor only API!')
|
raise InternalError('This is a root-actor only API!')
|
||||||
|
@ -1443,7 +1448,7 @@ def any_connected_locker_child() -> bool:
|
||||||
and
|
and
|
||||||
(uid_in_debug := ctx.chan.uid)
|
(uid_in_debug := ctx.chan.uid)
|
||||||
):
|
):
|
||||||
chans: list[tractor.Channel] = actor._peers.get(
|
chans: list[tractor.Channel] = server._peers.get(
|
||||||
tuple(uid_in_debug)
|
tuple(uid_in_debug)
|
||||||
)
|
)
|
||||||
if chans:
|
if chans:
|
||||||
|
@ -3003,6 +3008,7 @@ async def _maybe_enter_pm(
|
||||||
[BaseException|BaseExceptionGroup],
|
[BaseException|BaseExceptionGroup],
|
||||||
bool,
|
bool,
|
||||||
] = lambda err: not is_multi_cancelled(err),
|
] = lambda err: not is_multi_cancelled(err),
|
||||||
|
**_pause_kws,
|
||||||
|
|
||||||
):
|
):
|
||||||
if (
|
if (
|
||||||
|
@ -3029,6 +3035,7 @@ async def _maybe_enter_pm(
|
||||||
await post_mortem(
|
await post_mortem(
|
||||||
api_frame=api_frame,
|
api_frame=api_frame,
|
||||||
tb=tb,
|
tb=tb,
|
||||||
|
**_pause_kws,
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,7 @@ from tractor.log import get_logger
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
|
TransportClosed,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
Aid,
|
Aid,
|
||||||
|
@ -256,7 +257,7 @@ class Channel:
|
||||||
self,
|
self,
|
||||||
payload: Any,
|
payload: Any,
|
||||||
|
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -274,18 +275,27 @@ class Channel:
|
||||||
payload,
|
payload,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
except BaseException as _err:
|
except (
|
||||||
|
BaseException,
|
||||||
|
MsgTypeError,
|
||||||
|
TransportClosed,
|
||||||
|
)as _err:
|
||||||
err = _err # bind for introspection
|
err = _err # bind for introspection
|
||||||
if not isinstance(_err, MsgTypeError):
|
match err:
|
||||||
# assert err
|
case MsgTypeError():
|
||||||
__tracebackhide__: bool = False
|
try:
|
||||||
else:
|
assert err.cid
|
||||||
try:
|
except KeyError:
|
||||||
assert err.cid
|
raise err
|
||||||
|
case TransportClosed():
|
||||||
except KeyError:
|
log.transport(
|
||||||
raise err
|
f'Transport stream closed due to\n'
|
||||||
|
f'{err.repr_src_exc()}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
case _:
|
||||||
|
# never suppress non-tpt sources
|
||||||
|
__tracebackhide__: bool = False
|
||||||
raise
|
raise
|
||||||
|
|
||||||
async def recv(self) -> Any:
|
async def recv(self) -> Any:
|
||||||
|
|
|
@ -19,11 +19,14 @@ multi-transport-protcol needs!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
from collections import defaultdict
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
from itertools import chain
|
||||||
import inspect
|
import inspect
|
||||||
|
from pprint import pformat
|
||||||
from types import (
|
from types import (
|
||||||
ModuleType,
|
ModuleType,
|
||||||
)
|
)
|
||||||
|
@ -40,24 +43,540 @@ from trio import (
|
||||||
SocketListener,
|
SocketListener,
|
||||||
)
|
)
|
||||||
|
|
||||||
from ..msg import Struct
|
# from ..devx import _debug
|
||||||
|
from .._exceptions import (
|
||||||
|
TransportClosed,
|
||||||
|
)
|
||||||
|
from .. import _rpc
|
||||||
|
from ..msg import (
|
||||||
|
MsgType,
|
||||||
|
Struct,
|
||||||
|
types as msgtypes,
|
||||||
|
)
|
||||||
from ..trionics import maybe_open_nursery
|
from ..trionics import maybe_open_nursery
|
||||||
from .. import (
|
from .. import (
|
||||||
_state,
|
_state,
|
||||||
log,
|
log,
|
||||||
)
|
)
|
||||||
from .._addr import Address
|
from .._addr import Address
|
||||||
|
from ._chan import Channel
|
||||||
from ._transport import MsgTransport
|
from ._transport import MsgTransport
|
||||||
from ._uds import UDSAddress
|
from ._uds import UDSAddress
|
||||||
from ._tcp import TCPAddress
|
from ._tcp import TCPAddress
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .._runtime import Actor
|
from .._runtime import Actor
|
||||||
|
from .._supervise import ActorNursery
|
||||||
|
|
||||||
|
|
||||||
log = log.get_logger(__name__)
|
log = log.get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def maybe_wait_on_canced_subs(
|
||||||
|
uid: tuple[str, str],
|
||||||
|
chan: Channel,
|
||||||
|
disconnected: bool,
|
||||||
|
|
||||||
|
actor: Actor|None = None,
|
||||||
|
chan_drain_timeout: float = 0.5,
|
||||||
|
an_exit_timeout: float = 0.5,
|
||||||
|
|
||||||
|
) -> ActorNursery|None:
|
||||||
|
'''
|
||||||
|
When a process-local actor-nursery is found for the given actor
|
||||||
|
`uid` (i.e. that peer is **also** a subactor of this parent), we
|
||||||
|
attempt to (with timeouts) wait on,
|
||||||
|
|
||||||
|
- all IPC msgs to drain on the (common) `Channel` such that all
|
||||||
|
local `Context`-parent-tasks can also gracefully collect
|
||||||
|
`ContextCancelled` msgs from their respective remote children
|
||||||
|
vs. a `chan_drain_timeout`.
|
||||||
|
|
||||||
|
- the actor-nursery to cancel-n-join all its supervised children
|
||||||
|
(processes) *gracefully* vs. a `an_exit_timeout` and thus also
|
||||||
|
detect cases where the IPC transport connection broke but
|
||||||
|
a sub-process is detected as still alive (a case that happens
|
||||||
|
when the subactor is still in an active debugger REPL session).
|
||||||
|
|
||||||
|
If the timeout expires in either case we ofc report with warning.
|
||||||
|
|
||||||
|
'''
|
||||||
|
actor = actor or _state.current_actor()
|
||||||
|
|
||||||
|
# XXX running outside actor-runtime usage,
|
||||||
|
# - unit testing
|
||||||
|
# - possibly manual usage (eventually) ?
|
||||||
|
if not actor:
|
||||||
|
return None
|
||||||
|
|
||||||
|
local_nursery: (
|
||||||
|
ActorNursery|None
|
||||||
|
) = actor._actoruid2nursery.get(uid)
|
||||||
|
|
||||||
|
# This is set in `Portal.cancel_actor()`. So if
|
||||||
|
# the peer was cancelled we try to wait for them
|
||||||
|
# to tear down their side of the connection before
|
||||||
|
# moving on with closing our own side.
|
||||||
|
if (
|
||||||
|
local_nursery
|
||||||
|
and (
|
||||||
|
actor._cancel_called
|
||||||
|
or
|
||||||
|
chan._cancel_called
|
||||||
|
)
|
||||||
|
#
|
||||||
|
# ^-TODO-^ along with this is there another condition
|
||||||
|
# that we should filter with to avoid entering this
|
||||||
|
# waiting block needlessly?
|
||||||
|
# -[ ] maybe `and local_nursery.cancelled` and/or
|
||||||
|
# only if the `._children` table is empty or has
|
||||||
|
# only `Portal`s with .chan._cancel_called ==
|
||||||
|
# True` as per what we had below; the MAIN DIFF
|
||||||
|
# BEING that just bc one `Portal.cancel_actor()`
|
||||||
|
# was called, doesn't mean the whole actor-nurse
|
||||||
|
# is gonna exit any time soon right!?
|
||||||
|
#
|
||||||
|
# or
|
||||||
|
# all(chan._cancel_called for chan in chans)
|
||||||
|
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
'Waiting on cancel request to peer..\n'
|
||||||
|
f'c)=>\n'
|
||||||
|
f' |_{chan.uid}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX: this is a soft wait on the channel (and its
|
||||||
|
# underlying transport protocol) to close from the
|
||||||
|
# remote peer side since we presume that any channel
|
||||||
|
# which is mapped to a sub-actor (i.e. it's managed
|
||||||
|
# by local actor-nursery) has a message that is sent
|
||||||
|
# to the peer likely by this actor (which may be in
|
||||||
|
# a shutdown sequence due to cancellation) when the
|
||||||
|
# local runtime here is now cancelled while
|
||||||
|
# (presumably) in the middle of msg loop processing.
|
||||||
|
chan_info: str = (
|
||||||
|
f'{chan.uid}\n'
|
||||||
|
f'|_{chan}\n'
|
||||||
|
f' |_{chan.transport}\n\n'
|
||||||
|
)
|
||||||
|
with trio.move_on_after(chan_drain_timeout) as drain_cs:
|
||||||
|
drain_cs.shield = True
|
||||||
|
|
||||||
|
# attempt to wait for the far end to close the
|
||||||
|
# channel and bail after timeout (a 2-generals
|
||||||
|
# problem on closure).
|
||||||
|
assert chan.transport
|
||||||
|
async for msg in chan.transport.drain():
|
||||||
|
|
||||||
|
# try to deliver any lingering msgs
|
||||||
|
# before we destroy the channel.
|
||||||
|
# This accomplishes deterministic
|
||||||
|
# ``Portal.cancel_actor()`` cancellation by
|
||||||
|
# making sure any RPC response to that call is
|
||||||
|
# delivered the local calling task.
|
||||||
|
# TODO: factor this into a helper?
|
||||||
|
log.warning(
|
||||||
|
'Draining msg from disconnected peer\n'
|
||||||
|
f'{chan_info}'
|
||||||
|
f'{pformat(msg)}\n'
|
||||||
|
)
|
||||||
|
# cid: str|None = msg.get('cid')
|
||||||
|
cid: str|None = msg.cid
|
||||||
|
if cid:
|
||||||
|
# deliver response to local caller/waiter
|
||||||
|
await actor._deliver_ctx_payload(
|
||||||
|
chan,
|
||||||
|
cid,
|
||||||
|
msg,
|
||||||
|
)
|
||||||
|
if drain_cs.cancelled_caught:
|
||||||
|
log.warning(
|
||||||
|
'Timed out waiting on IPC transport channel to drain?\n'
|
||||||
|
f'{chan_info}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX NOTE XXX when no explicit call to
|
||||||
|
# `open_root_actor()` was made by the application
|
||||||
|
# (normally we implicitly make that call inside
|
||||||
|
# the first `.open_nursery()` in root-actor
|
||||||
|
# user/app code), we can assume that either we
|
||||||
|
# are NOT the root actor or are root but the
|
||||||
|
# runtime was started manually. and thus DO have
|
||||||
|
# to wait for the nursery-enterer to exit before
|
||||||
|
# shutting down the local runtime to avoid
|
||||||
|
# clobbering any ongoing subactor
|
||||||
|
# teardown/debugging/graceful-cancel.
|
||||||
|
#
|
||||||
|
# see matching note inside `._supervise.open_nursery()`
|
||||||
|
#
|
||||||
|
# TODO: should we have a separate cs + timeout
|
||||||
|
# block here?
|
||||||
|
if (
|
||||||
|
# XXX SO either,
|
||||||
|
# - not root OR,
|
||||||
|
# - is root but `open_root_actor()` was
|
||||||
|
# entered manually (in which case we do
|
||||||
|
# the equiv wait there using the
|
||||||
|
# `devx.debug` sub-sys APIs).
|
||||||
|
not local_nursery._implicit_runtime_started
|
||||||
|
):
|
||||||
|
log.runtime(
|
||||||
|
'Waiting on local actor nursery to exit..\n'
|
||||||
|
f'|_{local_nursery}\n'
|
||||||
|
)
|
||||||
|
with trio.move_on_after(an_exit_timeout) as an_exit_cs:
|
||||||
|
an_exit_cs.shield = True
|
||||||
|
await local_nursery.exited.wait()
|
||||||
|
|
||||||
|
# TODO: currently this is always triggering for every
|
||||||
|
# sub-daemon spawned from the `piker.services._mngr`?
|
||||||
|
# -[ ] how do we ensure that the IPC is supposed to
|
||||||
|
# be long lived and isn't just a register?
|
||||||
|
# |_ in the register case how can we signal that the
|
||||||
|
# ephemeral msg loop was intentional?
|
||||||
|
if (
|
||||||
|
# not local_nursery._implicit_runtime_started
|
||||||
|
# and
|
||||||
|
an_exit_cs.cancelled_caught
|
||||||
|
):
|
||||||
|
report: str = (
|
||||||
|
'Timed out waiting on local actor-nursery to exit?\n'
|
||||||
|
f'c)>\n'
|
||||||
|
f' |_{local_nursery}\n'
|
||||||
|
)
|
||||||
|
if children := local_nursery._children:
|
||||||
|
# indent from above local-nurse repr
|
||||||
|
report += (
|
||||||
|
f' |_{pformat(children)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
log.warning(report)
|
||||||
|
|
||||||
|
if disconnected:
|
||||||
|
# if the transport died and this actor is still
|
||||||
|
# registered within a local nursery, we report
|
||||||
|
# that the IPC layer may have failed
|
||||||
|
# unexpectedly since it may be the cause of
|
||||||
|
# other downstream errors.
|
||||||
|
entry: tuple|None = local_nursery._children.get(uid)
|
||||||
|
if entry:
|
||||||
|
proc: trio.Process
|
||||||
|
_, proc, _ = entry
|
||||||
|
|
||||||
|
if (
|
||||||
|
(poll := getattr(proc, 'poll', None))
|
||||||
|
and
|
||||||
|
poll() is None # proc still alive
|
||||||
|
):
|
||||||
|
# TODO: change log level based on
|
||||||
|
# detecting whether chan was created for
|
||||||
|
# ephemeral `.register_actor()` request!
|
||||||
|
# -[ ] also, that should be avoidable by
|
||||||
|
# re-using any existing chan from the
|
||||||
|
# `._discovery.get_registry()` call as
|
||||||
|
# well..
|
||||||
|
log.runtime(
|
||||||
|
f'Peer IPC broke but subproc is alive?\n\n'
|
||||||
|
|
||||||
|
f'<=x {chan.uid}@{chan.raddr}\n'
|
||||||
|
f' |_{proc}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
return local_nursery
|
||||||
|
|
||||||
|
# TODO multi-tpt support with per-proto peer tracking?
|
||||||
|
#
|
||||||
|
# -[x] maybe change to mod-func and rename for implied
|
||||||
|
# multi-transport semantics?
|
||||||
|
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
|
||||||
|
# so that we can query per tpt all peer contact infos?
|
||||||
|
# |_[ ] possibly provide a global viewing via a
|
||||||
|
# `collections.ChainMap`?
|
||||||
|
#
|
||||||
|
async def handle_stream_from_peer(
|
||||||
|
stream: trio.SocketStream,
|
||||||
|
|
||||||
|
*,
|
||||||
|
server: IPCServer,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Top-level `trio.abc.Stream` (i.e. normally `trio.SocketStream`)
|
||||||
|
handler-callback as spawn-invoked by `trio.serve_listeners()`.
|
||||||
|
|
||||||
|
Note that each call to this handler is as a spawned task inside
|
||||||
|
any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
|
||||||
|
such that it is invoked as,
|
||||||
|
|
||||||
|
IPCEndpoint.stream_handler_tn.start_soon(
|
||||||
|
handle_stream,
|
||||||
|
stream,
|
||||||
|
)
|
||||||
|
|
||||||
|
'''
|
||||||
|
server._no_more_peers = trio.Event() # unset by making new
|
||||||
|
|
||||||
|
# TODO, debug_mode tooling for when hackin this lower layer?
|
||||||
|
# with _debug.maybe_open_crash_handler(
|
||||||
|
# pdb=True,
|
||||||
|
# ) as boxerr:
|
||||||
|
|
||||||
|
chan = Channel.from_stream(stream)
|
||||||
|
con_status: str = (
|
||||||
|
'New inbound IPC connection <=\n'
|
||||||
|
f'|_{chan}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# initial handshake with peer phase
|
||||||
|
try:
|
||||||
|
if actor := _state.current_actor():
|
||||||
|
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
||||||
|
aid=actor.aid,
|
||||||
|
)
|
||||||
|
except (
|
||||||
|
TransportClosed,
|
||||||
|
# ^XXX NOTE, the above wraps `trio` exc types raised
|
||||||
|
# during various `SocketStream.send/receive_xx()` calls
|
||||||
|
# under different fault conditions such as,
|
||||||
|
#
|
||||||
|
# trio.BrokenResourceError,
|
||||||
|
# trio.ClosedResourceError,
|
||||||
|
#
|
||||||
|
# Inside our `.ipc._transport` layer we absorb and
|
||||||
|
# re-raise our own `TransportClosed` exc such that this
|
||||||
|
# higher level runtime code can only worry one
|
||||||
|
# "kinda-error" that we expect to tolerate during
|
||||||
|
# discovery-sys related pings, queires, DoS etc.
|
||||||
|
):
|
||||||
|
# XXX: This may propagate up from `Channel._aiter_recv()`
|
||||||
|
# and `MsgpackStream._inter_packets()` on a read from the
|
||||||
|
# stream particularly when the runtime is first starting up
|
||||||
|
# inside `open_root_actor()` where there is a check for
|
||||||
|
# a bound listener on the "arbiter" addr. the reset will be
|
||||||
|
# because the handshake was never meant took place.
|
||||||
|
log.runtime(
|
||||||
|
con_status
|
||||||
|
+
|
||||||
|
' -> But failed to handshake? Ignoring..\n'
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
uid: tuple[str, str] = (
|
||||||
|
peer_aid.name,
|
||||||
|
peer_aid.uuid,
|
||||||
|
)
|
||||||
|
# TODO, can we make this downstream peer tracking use the
|
||||||
|
# `peer_aid` instead?
|
||||||
|
familiar: str = 'new-peer'
|
||||||
|
if _pre_chan := server._peers.get(uid):
|
||||||
|
familiar: str = 'pre-existing-peer'
|
||||||
|
uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
|
||||||
|
con_status += (
|
||||||
|
f' -> Handshake with {familiar} `{uid_short}` complete\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
if _pre_chan:
|
||||||
|
# con_status += (
|
||||||
|
# ^TODO^ swap once we minimize conn duplication
|
||||||
|
# -[ ] last thing might be reg/unreg runtime reqs?
|
||||||
|
# log.warning(
|
||||||
|
log.debug(
|
||||||
|
f'?Wait?\n'
|
||||||
|
f'We already have IPC with peer {uid_short!r}\n'
|
||||||
|
f'|_{_pre_chan}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# IPC connection tracking for both peers and new children:
|
||||||
|
# - if this is a new channel to a locally spawned
|
||||||
|
# sub-actor there will be a spawn wait even registered
|
||||||
|
# by a call to `.wait_for_peer()`.
|
||||||
|
# - if a peer is connecting no such event will exit.
|
||||||
|
event: trio.Event|None = server._peer_connected.pop(
|
||||||
|
uid,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
if event:
|
||||||
|
con_status += (
|
||||||
|
' -> Waking subactor spawn waiters: '
|
||||||
|
f'{event.statistics().tasks_waiting}\n'
|
||||||
|
f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
|
||||||
|
# f' {event}\n'
|
||||||
|
# f' |{event.statistics()}\n'
|
||||||
|
)
|
||||||
|
# wake tasks waiting on this IPC-transport "connect-back"
|
||||||
|
event.set()
|
||||||
|
|
||||||
|
else:
|
||||||
|
con_status += (
|
||||||
|
f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
|
||||||
|
) # type: ignore
|
||||||
|
|
||||||
|
chans: list[Channel] = server._peers[uid]
|
||||||
|
# if chans:
|
||||||
|
# # TODO: re-use channels for new connections instead
|
||||||
|
# # of always new ones?
|
||||||
|
# # => will require changing all the discovery funcs..
|
||||||
|
|
||||||
|
# append new channel
|
||||||
|
# TODO: can we just use list-ref directly?
|
||||||
|
chans.append(chan)
|
||||||
|
|
||||||
|
con_status += ' -> Entering RPC msg loop..\n'
|
||||||
|
log.runtime(con_status)
|
||||||
|
|
||||||
|
# Begin channel management - respond to remote requests and
|
||||||
|
# process received reponses.
|
||||||
|
disconnected: bool = False
|
||||||
|
last_msg: MsgType
|
||||||
|
try:
|
||||||
|
(
|
||||||
|
disconnected,
|
||||||
|
last_msg,
|
||||||
|
) = await _rpc.process_messages(
|
||||||
|
chan=chan,
|
||||||
|
)
|
||||||
|
except trio.Cancelled:
|
||||||
|
log.cancel(
|
||||||
|
'IPC transport msg loop was cancelled\n'
|
||||||
|
f'c)>\n'
|
||||||
|
f' |_{chan}\n'
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
|
finally:
|
||||||
|
|
||||||
|
# check if there are subs which we should gracefully join at
|
||||||
|
# both the inter-actor-task and subprocess levels to
|
||||||
|
# gracefully remote cancel and later disconnect (particularly
|
||||||
|
# for permitting subs engaged in active debug-REPL sessions).
|
||||||
|
local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
|
||||||
|
uid=uid,
|
||||||
|
chan=chan,
|
||||||
|
disconnected=disconnected,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ``Channel`` teardown and closure sequence
|
||||||
|
# drop ref to channel so it can be gc-ed and disconnected
|
||||||
|
con_teardown_status: str = (
|
||||||
|
f'IPC channel disconnected:\n'
|
||||||
|
f'<=x uid: {chan.uid}\n'
|
||||||
|
f' |_{pformat(chan)}\n\n'
|
||||||
|
)
|
||||||
|
chans.remove(chan)
|
||||||
|
|
||||||
|
# TODO: do we need to be this pedantic?
|
||||||
|
if not chans:
|
||||||
|
con_teardown_status += (
|
||||||
|
f'-> No more channels with {chan.uid}'
|
||||||
|
)
|
||||||
|
server._peers.pop(uid, None)
|
||||||
|
|
||||||
|
peers_str: str = ''
|
||||||
|
for uid, chans in server._peers.items():
|
||||||
|
peers_str += (
|
||||||
|
f'uid: {uid}\n'
|
||||||
|
)
|
||||||
|
for i, chan in enumerate(chans):
|
||||||
|
peers_str += (
|
||||||
|
f' |_[{i}] {pformat(chan)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
con_teardown_status += (
|
||||||
|
f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# No more channels to other actors (at all) registered
|
||||||
|
# as connected.
|
||||||
|
if not server._peers:
|
||||||
|
con_teardown_status += (
|
||||||
|
'Signalling no more peer channel connections'
|
||||||
|
)
|
||||||
|
server._no_more_peers.set()
|
||||||
|
|
||||||
|
# NOTE: block this actor from acquiring the
|
||||||
|
# debugger-TTY-lock since we have no way to know if we
|
||||||
|
# cancelled it and further there is no way to ensure the
|
||||||
|
# lock will be released if acquired due to having no
|
||||||
|
# more active IPC channels.
|
||||||
|
if (
|
||||||
|
_state.is_root_process()
|
||||||
|
and
|
||||||
|
_state.is_debug_mode()
|
||||||
|
):
|
||||||
|
from ..devx import _debug
|
||||||
|
pdb_lock = _debug.Lock
|
||||||
|
pdb_lock._blocked.add(uid)
|
||||||
|
|
||||||
|
# TODO: NEEEDS TO BE TESTED!
|
||||||
|
# actually, no idea if this ever even enters.. XD
|
||||||
|
#
|
||||||
|
# XXX => YES IT DOES, when i was testing ctl-c
|
||||||
|
# from broken debug TTY locking due to
|
||||||
|
# msg-spec races on application using RunVar...
|
||||||
|
if (
|
||||||
|
local_nursery
|
||||||
|
and
|
||||||
|
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
||||||
|
and
|
||||||
|
(pdb_user_uid := ctx_in_debug.chan.uid)
|
||||||
|
):
|
||||||
|
entry: tuple|None = local_nursery._children.get(
|
||||||
|
tuple(pdb_user_uid)
|
||||||
|
)
|
||||||
|
if entry:
|
||||||
|
proc: trio.Process
|
||||||
|
_, proc, _ = entry
|
||||||
|
|
||||||
|
if (
|
||||||
|
(poll := getattr(proc, 'poll', None))
|
||||||
|
and poll() is None
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
'Root actor reports no-more-peers, BUT\n'
|
||||||
|
'a DISCONNECTED child still has the debug '
|
||||||
|
'lock!\n\n'
|
||||||
|
# f'root uid: {actor.uid}\n'
|
||||||
|
f'last disconnected child uid: {uid}\n'
|
||||||
|
f'locking child uid: {pdb_user_uid}\n'
|
||||||
|
)
|
||||||
|
await _debug.maybe_wait_for_debugger(
|
||||||
|
child_in_debug=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: just bc a child's transport dropped
|
||||||
|
# doesn't mean it's not still using the pdb
|
||||||
|
# REPL! so,
|
||||||
|
# -[ ] ideally we can check out child proc
|
||||||
|
# tree to ensure that its alive (and
|
||||||
|
# actually using the REPL) before we cancel
|
||||||
|
# it's lock acquire by doing the below!
|
||||||
|
# -[ ] create a way to read the tree of each actor's
|
||||||
|
# grandchildren such that when an
|
||||||
|
# intermediary parent is cancelled but their
|
||||||
|
# child has locked the tty, the grandparent
|
||||||
|
# will not allow the parent to cancel or
|
||||||
|
# zombie reap the child! see open issue:
|
||||||
|
# - https://github.com/goodboy/tractor/issues/320
|
||||||
|
# ------ - ------
|
||||||
|
# if a now stale local task has the TTY lock still
|
||||||
|
# we cancel it to allow servicing other requests for
|
||||||
|
# the lock.
|
||||||
|
if (
|
||||||
|
(db_cs := pdb_lock.get_locking_task_cs())
|
||||||
|
and not db_cs.cancel_called
|
||||||
|
and uid == pdb_user_uid
|
||||||
|
):
|
||||||
|
log.critical(
|
||||||
|
f'STALE DEBUG LOCK DETECTED FOR {uid}'
|
||||||
|
)
|
||||||
|
# TODO: figure out why this breaks tests..
|
||||||
|
db_cs.cancel()
|
||||||
|
|
||||||
|
log.runtime(con_teardown_status)
|
||||||
|
# finally block closure
|
||||||
|
|
||||||
|
|
||||||
class IPCEndpoint(Struct):
|
class IPCEndpoint(Struct):
|
||||||
'''
|
'''
|
||||||
An instance of an IPC "bound" address where the lifetime of the
|
An instance of an IPC "bound" address where the lifetime of the
|
||||||
|
@ -120,8 +639,23 @@ class IPCEndpoint(Struct):
|
||||||
class IPCServer(Struct):
|
class IPCServer(Struct):
|
||||||
_parent_tn: Nursery
|
_parent_tn: Nursery
|
||||||
_stream_handler_tn: Nursery
|
_stream_handler_tn: Nursery
|
||||||
|
# level-triggered sig for whether "no peers are currently
|
||||||
|
# connected"; field is **always** set to an instance but
|
||||||
|
# initialized with `.is_set() == True`.
|
||||||
|
_no_more_peers: trio.Event
|
||||||
|
|
||||||
_endpoints: list[IPCEndpoint] = []
|
_endpoints: list[IPCEndpoint] = []
|
||||||
|
|
||||||
|
# connection tracking & mgmt
|
||||||
|
_peers: defaultdict[
|
||||||
|
str, # uaid
|
||||||
|
list[Channel], # IPC conns from peer
|
||||||
|
] = defaultdict(list)
|
||||||
|
_peer_connected: dict[
|
||||||
|
tuple[str, str],
|
||||||
|
trio.Event,
|
||||||
|
] = {}
|
||||||
|
|
||||||
# syncs for setup/teardown sequences
|
# syncs for setup/teardown sequences
|
||||||
_shutdown: trio.Event|None = None
|
_shutdown: trio.Event|None = None
|
||||||
|
|
||||||
|
@ -183,6 +717,65 @@ class IPCServer(Struct):
|
||||||
f'protos: {tpt_protos!r}\n'
|
f'protos: {tpt_protos!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def has_peers(
|
||||||
|
self,
|
||||||
|
check_chans: bool = False,
|
||||||
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Predicate for "are there any active peer IPC `Channel`s at the moment?"
|
||||||
|
|
||||||
|
'''
|
||||||
|
has_peers: bool = not self._no_more_peers.is_set()
|
||||||
|
if (
|
||||||
|
has_peers
|
||||||
|
and
|
||||||
|
check_chans
|
||||||
|
):
|
||||||
|
has_peers: bool = (
|
||||||
|
any(chan.connected()
|
||||||
|
for chan in chain(
|
||||||
|
*self._peers.values()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
and
|
||||||
|
has_peers
|
||||||
|
)
|
||||||
|
|
||||||
|
return has_peers
|
||||||
|
|
||||||
|
async def wait_for_no_more_peers(
|
||||||
|
self,
|
||||||
|
shield: bool = False,
|
||||||
|
) -> None:
|
||||||
|
with trio.CancelScope(shield=shield):
|
||||||
|
await self._no_more_peers.wait()
|
||||||
|
|
||||||
|
async def wait_for_peer(
|
||||||
|
self,
|
||||||
|
uid: tuple[str, str],
|
||||||
|
|
||||||
|
) -> tuple[trio.Event, Channel]:
|
||||||
|
'''
|
||||||
|
Wait for a connection back from a (spawned sub-)actor with
|
||||||
|
a `uid` using a `trio.Event`.
|
||||||
|
|
||||||
|
Returns a pair of the event and the "last" registered IPC
|
||||||
|
`Channel` for the peer with `uid`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
log.debug(f'Waiting for peer {uid!r} to connect')
|
||||||
|
event: trio.Event = self._peer_connected.setdefault(
|
||||||
|
uid,
|
||||||
|
trio.Event(),
|
||||||
|
)
|
||||||
|
await event.wait()
|
||||||
|
log.debug(f'{uid!r} successfully connected back to us')
|
||||||
|
mru_chan: Channel = self._peers[uid][-1]
|
||||||
|
return (
|
||||||
|
event,
|
||||||
|
mru_chan,
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def addrs(self) -> list[Address]:
|
def addrs(self) -> list[Address]:
|
||||||
return [ep.addr for ep in self._endpoints]
|
return [ep.addr for ep in self._endpoints]
|
||||||
|
@ -211,17 +804,27 @@ class IPCServer(Struct):
|
||||||
return ev.is_set()
|
return ev.is_set()
|
||||||
|
|
||||||
def pformat(self) -> str:
|
def pformat(self) -> str:
|
||||||
|
eps: list[IPCEndpoint] = self._endpoints
|
||||||
|
|
||||||
fmtstr: str = (
|
state_repr: str = (
|
||||||
f' |_endpoints: {self._endpoints}\n'
|
f'{len(eps)!r} IPC-endpoints active'
|
||||||
|
)
|
||||||
|
fmtstr = (
|
||||||
|
f' |_state: {state_repr}\n'
|
||||||
|
f' no_more_peers: {self.has_peers()}\n'
|
||||||
)
|
)
|
||||||
if self._shutdown is not None:
|
if self._shutdown is not None:
|
||||||
shutdown_stats: EventStatistics = self._shutdown.statistics()
|
shutdown_stats: EventStatistics = self._shutdown.statistics()
|
||||||
fmtstr += (
|
fmtstr += (
|
||||||
f'\n'
|
f' task_waiting_on_shutdown: {shutdown_stats}\n'
|
||||||
f' |_shutdown: {shutdown_stats}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
fmtstr += (
|
||||||
|
# TODO, use the `ppfmt()` helper from `modden`!
|
||||||
|
f' |_endpoints: {pformat(self._endpoints)}\n'
|
||||||
|
f' |_peers: {len(self._peers)} connected\n'
|
||||||
|
)
|
||||||
|
|
||||||
return (
|
return (
|
||||||
f'<IPCServer(\n'
|
f'<IPCServer(\n'
|
||||||
f'{fmtstr}'
|
f'{fmtstr}'
|
||||||
|
@ -249,7 +852,6 @@ class IPCServer(Struct):
|
||||||
async def listen_on(
|
async def listen_on(
|
||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
actor: Actor,
|
|
||||||
accept_addrs: list[tuple[str, int|str]]|None = None,
|
accept_addrs: list[tuple[str, int|str]]|None = None,
|
||||||
stream_handler_nursery: Nursery|None = None,
|
stream_handler_nursery: Nursery|None = None,
|
||||||
) -> list[IPCEndpoint]:
|
) -> list[IPCEndpoint]:
|
||||||
|
@ -282,20 +884,19 @@ class IPCServer(Struct):
|
||||||
f'{self}\n'
|
f'{self}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info(
|
log.runtime(
|
||||||
f'Binding to endpoints for,\n'
|
f'Binding to endpoints for,\n'
|
||||||
f'{accept_addrs}\n'
|
f'{accept_addrs}\n'
|
||||||
)
|
)
|
||||||
eps: list[IPCEndpoint] = await self._parent_tn.start(
|
eps: list[IPCEndpoint] = await self._parent_tn.start(
|
||||||
partial(
|
partial(
|
||||||
_serve_ipc_eps,
|
_serve_ipc_eps,
|
||||||
actor=actor,
|
|
||||||
server=self,
|
server=self,
|
||||||
stream_handler_tn=stream_handler_nursery,
|
stream_handler_tn=stream_handler_nursery,
|
||||||
listen_addrs=accept_addrs,
|
listen_addrs=accept_addrs,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log.info(
|
log.runtime(
|
||||||
f'Started IPC endpoints\n'
|
f'Started IPC endpoints\n'
|
||||||
f'{eps}\n'
|
f'{eps}\n'
|
||||||
)
|
)
|
||||||
|
@ -318,7 +919,6 @@ class IPCServer(Struct):
|
||||||
|
|
||||||
async def _serve_ipc_eps(
|
async def _serve_ipc_eps(
|
||||||
*,
|
*,
|
||||||
actor: Actor,
|
|
||||||
server: IPCServer,
|
server: IPCServer,
|
||||||
stream_handler_tn: Nursery,
|
stream_handler_tn: Nursery,
|
||||||
listen_addrs: list[tuple[str, int|str]],
|
listen_addrs: list[tuple[str, int|str]],
|
||||||
|
@ -352,12 +952,13 @@ async def _serve_ipc_eps(
|
||||||
stream_handler_tn=stream_handler_tn,
|
stream_handler_tn=stream_handler_tn,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
log.info(
|
log.runtime(
|
||||||
f'Starting new endpoint listener\n'
|
f'Starting new endpoint listener\n'
|
||||||
f'{ep}\n'
|
f'{ep}\n'
|
||||||
)
|
)
|
||||||
listener: trio.abc.Listener = await ep.start_listener()
|
listener: trio.abc.Listener = await ep.start_listener()
|
||||||
assert listener is ep._listener
|
assert listener is ep._listener
|
||||||
|
# actor = _state.current_actor()
|
||||||
# if actor.is_registry:
|
# if actor.is_registry:
|
||||||
# import pdbp; pdbp.set_trace()
|
# import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
|
@ -379,7 +980,10 @@ async def _serve_ipc_eps(
|
||||||
_listeners: list[SocketListener] = await listen_tn.start(
|
_listeners: list[SocketListener] = await listen_tn.start(
|
||||||
partial(
|
partial(
|
||||||
trio.serve_listeners,
|
trio.serve_listeners,
|
||||||
handler=actor._stream_handler,
|
handler=partial(
|
||||||
|
handle_stream_from_peer,
|
||||||
|
server=server,
|
||||||
|
),
|
||||||
listeners=listeners,
|
listeners=listeners,
|
||||||
|
|
||||||
# NOTE: configured such that new
|
# NOTE: configured such that new
|
||||||
|
@ -389,13 +993,13 @@ async def _serve_ipc_eps(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# TODO, wow make this message better! XD
|
# TODO, wow make this message better! XD
|
||||||
log.info(
|
log.runtime(
|
||||||
'Started server(s)\n'
|
'Started server(s)\n'
|
||||||
+
|
+
|
||||||
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info(
|
log.runtime(
|
||||||
f'Started IPC endpoints\n'
|
f'Started IPC endpoints\n'
|
||||||
f'{eps}\n'
|
f'{eps}\n'
|
||||||
)
|
)
|
||||||
|
@ -411,6 +1015,7 @@ async def _serve_ipc_eps(
|
||||||
ep.close_listener()
|
ep.close_listener()
|
||||||
server._endpoints.remove(ep)
|
server._endpoints.remove(ep)
|
||||||
|
|
||||||
|
# actor = _state.current_actor()
|
||||||
# if actor.is_arbiter:
|
# if actor.is_arbiter:
|
||||||
# import pdbp; pdbp.set_trace()
|
# import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
|
@ -421,7 +1026,6 @@ async def _serve_ipc_eps(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_ipc_server(
|
async def open_ipc_server(
|
||||||
actor: Actor,
|
|
||||||
parent_tn: Nursery|None = None,
|
parent_tn: Nursery|None = None,
|
||||||
stream_handler_tn: Nursery|None = None,
|
stream_handler_tn: Nursery|None = None,
|
||||||
|
|
||||||
|
@ -430,20 +1034,28 @@ async def open_ipc_server(
|
||||||
async with maybe_open_nursery(
|
async with maybe_open_nursery(
|
||||||
nursery=parent_tn,
|
nursery=parent_tn,
|
||||||
) as rent_tn:
|
) as rent_tn:
|
||||||
|
no_more_peers = trio.Event()
|
||||||
|
no_more_peers.set()
|
||||||
|
|
||||||
ipc_server = IPCServer(
|
ipc_server = IPCServer(
|
||||||
_parent_tn=rent_tn,
|
_parent_tn=rent_tn,
|
||||||
_stream_handler_tn=stream_handler_tn or rent_tn,
|
_stream_handler_tn=stream_handler_tn or rent_tn,
|
||||||
|
_no_more_peers=no_more_peers,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
yield ipc_server
|
yield ipc_server
|
||||||
|
log.runtime(
|
||||||
|
f'Waiting on server to shutdown or be cancelled..\n'
|
||||||
|
f'{ipc_server}'
|
||||||
|
)
|
||||||
|
# TODO? when if ever would we want/need this?
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# await ipc_server.wait_for_shutdown()
|
||||||
|
|
||||||
# except BaseException as berr:
|
except BaseException as berr:
|
||||||
# log.exception(
|
log.exception(
|
||||||
# 'IPC server crashed on exit ?'
|
'IPC server caller crashed ??'
|
||||||
# )
|
)
|
||||||
# raise berr
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# ?TODO, maybe we can ensure the endpoints are torndown
|
# ?TODO, maybe we can ensure the endpoints are torndown
|
||||||
# (and thus their managed listeners) beforehand to ensure
|
# (and thus their managed listeners) beforehand to ensure
|
||||||
# super graceful RPC mechanics?
|
# super graceful RPC mechanics?
|
||||||
|
@ -451,17 +1063,5 @@ async def open_ipc_server(
|
||||||
# -[ ] but aren't we doing that already per-`listen_tn`
|
# -[ ] but aren't we doing that already per-`listen_tn`
|
||||||
# inside `_serve_ipc_eps()` above?
|
# inside `_serve_ipc_eps()` above?
|
||||||
#
|
#
|
||||||
# if not ipc_server.is_shutdown():
|
# ipc_server.cancel()
|
||||||
# ipc_server.cancel()
|
raise berr
|
||||||
# await ipc_server.wait_for_shutdown()
|
|
||||||
# assert ipc_server.is_shutdown()
|
|
||||||
pass
|
|
||||||
|
|
||||||
# !XXX TODO! lol so classic, the below code is rekt!
|
|
||||||
#
|
|
||||||
# XXX here is a perfect example of suppressing errors with
|
|
||||||
# `trio.Cancelled` as per our demonstrating example,
|
|
||||||
# `test_trioisms::test_acm_embedded_nursery_propagates_enter_err
|
|
||||||
#
|
|
||||||
# with trio.CancelScope(shield=True):
|
|
||||||
# await ipc_server.wait_for_shutdown()
|
|
||||||
|
|
|
@ -127,6 +127,11 @@ async def start_listener(
|
||||||
Start a TCP socket listener on the given `TCPAddress`.
|
Start a TCP socket listener on the given `TCPAddress`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
log.info(
|
||||||
|
f'Attempting to bind TCP socket\n'
|
||||||
|
f'>[\n'
|
||||||
|
f'|_{addr}\n'
|
||||||
|
)
|
||||||
# ?TODO, maybe we should just change the lower-level call this is
|
# ?TODO, maybe we should just change the lower-level call this is
|
||||||
# using internall per-listener?
|
# using internall per-listener?
|
||||||
listeners: list[SocketListener] = await open_tcp_listeners(
|
listeners: list[SocketListener] = await open_tcp_listeners(
|
||||||
|
@ -140,6 +145,12 @@ async def start_listener(
|
||||||
assert len(listeners) == 1
|
assert len(listeners) == 1
|
||||||
listener = listeners[0]
|
listener = listeners[0]
|
||||||
host, port = listener.socket.getsockname()[:2]
|
host, port = listener.socket.getsockname()[:2]
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
f'Listening on TCP socket\n'
|
||||||
|
f'[>\n'
|
||||||
|
f' |_{addr}\n'
|
||||||
|
)
|
||||||
return listener
|
return listener
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue