Mv `Actor._stream_handler()` to `.ipc._server` func
Call it `handle_stream_from_peer()` and bind in the `actor: Actor` via a `handler=partial()` passed to `trio.serve_listeners()`. With this (minus the `Actor._peers`/`._peer_connected`/`._no_more_peers` attrs, of course) we get nearly full separation of IPC-connection-processing concerns from `Actor` state. It's a first step toward modularizing the low-level runtime into isolated subsystems, which should improve the code base's grok-ability and ease design discussions for new features, particularly around introducing and/or composing together new transport protocols.
parent 1ccb14455d
commit 42cf9e11a4
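The gist of the new wiring, as a minimal standalone sketch (not the real tractor runtime; `FakeActor` here is just a hypothetical stand-in for the actual `Actor` state container): a module-level stream handler receives its actor state pre-bound via `functools.partial()` and is then handed to `trio.serve_listeners()`, which only ever calls the handler with the accepted stream.

# Minimal sketch of the `handler=partial()` binding described above.
# NOT the real tractor runtime: `FakeActor` is a stand-in for the
# actual `Actor` state container.
from functools import partial

import trio


class FakeActor:
    def __init__(self, name: str):
        self.name = name


async def handle_stream_from_peer(
    stream: trio.SocketStream,
    actor: FakeActor,
) -> None:
    # `trio.serve_listeners()` only passes the accepted stream;
    # `actor` arrives pre-bound by the `partial()` below.
    req: bytes = await stream.receive_some(1024)
    print(f'{actor.name!r} got {req!r} from a peer')
    await stream.send_all(b'ack')


async def main() -> None:
    actor = FakeActor('root')
    listeners = await trio.open_tcp_listeners(0, host='127.0.0.1')
    async with trio.open_nursery() as tn:
        tn.start_soon(
            trio.serve_listeners,
            # the same binding style now used when serving IPC endpoints:
            # handler=partial(handle_stream_from_peer, actor=actor)
            partial(handle_stream_from_peer, actor=actor),
            listeners,
        )
        # connect back to ourselves once as a fake "peer"
        port: int = listeners[0].socket.getsockname()[1]
        peer = await trio.open_tcp_stream('127.0.0.1', port)
        await peer.send_all(b'hello')
        assert await peer.receive_some(1024) == b'ack'
        await peer.aclose()
        tn.cancel_scope.cancel()


if __name__ == '__main__':
    trio.run(main)

Binding the state in this way keeps the connection-processing logic importable and testable on its own instead of living as a bound method on the runtime object.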
@@ -578,10 +578,10 @@ async def open_portal(
 
         msg_loop_cs: trio.CancelScope|None = None
         if start_msg_loop:
-            from ._runtime import process_messages
+            from . import _rpc
            msg_loop_cs = await tn.start(
                partial(
-                    process_messages,
+                    _rpc.process_messages,
                    actor,
                    channel,
                    # if the local task is cancelled we want to keep
@@ -96,18 +96,13 @@ from ._exceptions import (
     ModuleNotExposed,
     MsgTypeError,
     unpack_error,
-    TransportClosed,
 )
 from .devx import _debug
 from ._discovery import get_registry
 from ._portal import Portal
 from . import _state
 from . import _mp_fixup_main
-from ._rpc import (
-    process_messages,
-    try_ship_error_to_remote,
-)
-
+from . import _rpc
 
 if TYPE_CHECKING:
     from ._supervise import ActorNursery
@@ -493,434 +488,6 @@ class Actor:
             raise mne
 
-    # TODO: maybe change to mod-func and rename for implied
-    # multi-transport semantics?
-    async def _stream_handler(
-        self,
-        stream: trio.SocketStream,
-
-    ) -> None:
-        '''
-        Entry point for new inbound IPC connections on a specific
-        transport server.
-
-        '''
-        self._no_more_peers = trio.Event() # unset by making new
-        # with _debug.maybe_open_crash_handler(
-        #     pdb=True,
-        # ) as boxerr:
-        chan = Channel.from_stream(stream)
-        con_status: str = (
-            'New inbound IPC connection <=\n'
-            f'|_{chan}\n'
-        )
-
-        # send/receive initial handshake response
-        try:
-            peer_aid: msgtypes.Aid = await chan._do_handshake(
-                aid=self.aid,
-            )
-        except (
-            TransportClosed,
-            # ^XXX NOTE, the above wraps `trio` exc types raised
-            # during various `SocketStream.send/receive_xx()` calls
-            # under different fault conditions such as,
-            #
-            # trio.BrokenResourceError,
-            # trio.ClosedResourceError,
-            #
-            # Inside our `.ipc._transport` layer we absorb and
-            # re-raise our own `TransportClosed` exc such that this
-            # higher level runtime code can only worry one
-            # "kinda-error" that we expect to tolerate during
-            # discovery-sys related pings, queires, DoS etc.
-        ):
-            # XXX: This may propagate up from `Channel._aiter_recv()`
-            # and `MsgpackStream._inter_packets()` on a read from the
-            # stream particularly when the runtime is first starting up
-            # inside `open_root_actor()` where there is a check for
-            # a bound listener on the "arbiter" addr. the reset will be
-            # because the handshake was never meant took place.
-            log.runtime(
-                con_status
-                +
-                ' -> But failed to handshake? Ignoring..\n'
-            )
-            return
-
-        uid: tuple[str, str] = (
-            peer_aid.name,
-            peer_aid.uuid,
-        )
-        # TODO, can we make this downstream peer tracking use the
-        # `peer_aid` instead?
-        familiar: str = 'new-peer'
-        if _pre_chan := self._peers.get(uid):
-            familiar: str = 'pre-existing-peer'
-        uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
-        con_status += (
-            f' -> Handshake with {familiar} `{uid_short}` complete\n'
-        )
-
-        if _pre_chan:
-            # con_status += (
-            # ^TODO^ swap once we minimize conn duplication
-            # -[ ] last thing might be reg/unreg runtime reqs?
-            # log.warning(
-            log.debug(
-                f'?Wait?\n'
-                f'We already have IPC with peer {uid_short!r}\n'
-                f'|_{_pre_chan}\n'
-            )
-
-        # IPC connection tracking for both peers and new children:
-        # - if this is a new channel to a locally spawned
-        #   sub-actor there will be a spawn wait even registered
-        #   by a call to `.wait_for_peer()`.
-        # - if a peer is connecting no such event will exit.
-        event: trio.Event|None = self._peer_connected.pop(
-            uid,
-            None,
-        )
-        if event:
-            con_status += (
-                ' -> Waking subactor spawn waiters: '
-                f'{event.statistics().tasks_waiting}\n'
-                f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
-                # f' {event}\n'
-                # f' |{event.statistics()}\n'
-            )
-            # wake tasks waiting on this IPC-transport "connect-back"
-            event.set()
-
-        else:
-            con_status += (
-                f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
-            ) # type: ignore
-
-        chans: list[Channel] = self._peers[uid]
-        # if chans:
-        #     # TODO: re-use channels for new connections instead
-        #     # of always new ones?
-        #     # => will require changing all the discovery funcs..
-
-        # append new channel
-        # TODO: can we just use list-ref directly?
-        chans.append(chan)
-
-        con_status += ' -> Entering RPC msg loop..\n'
-        log.runtime(con_status)
-
-        # Begin channel management - respond to remote requests and
-        # process received reponses.
-        disconnected: bool = False
-        last_msg: MsgType
-        try:
-            (
-                disconnected,
-                last_msg,
-            ) = await process_messages(
-                self,
-                chan,
-            )
-        except trio.Cancelled:
-            log.cancel(
-                'IPC transport msg loop was cancelled\n'
-                f'c)>\n'
-                f' |_{chan}\n'
-            )
-            raise
-
-        finally:
-            local_nursery: (
-                ActorNursery|None
-            ) = self._actoruid2nursery.get(uid)
-
-            # This is set in ``Portal.cancel_actor()``. So if
-            # the peer was cancelled we try to wait for them
-            # to tear down their side of the connection before
-            # moving on with closing our own side.
-            if (
-                local_nursery
-                and (
-                    self._cancel_called
-                    or
-                    chan._cancel_called
-                )
-                #
-                # ^-TODO-^ along with this is there another condition
-                # that we should filter with to avoid entering this
-                # waiting block needlessly?
-                # -[ ] maybe `and local_nursery.cancelled` and/or
-                #     only if the `._children` table is empty or has
-                #     only `Portal`s with .chan._cancel_called ==
-                #     True` as per what we had below; the MAIN DIFF
-                #     BEING that just bc one `Portal.cancel_actor()`
-                #     was called, doesn't mean the whole actor-nurse
-                #     is gonna exit any time soon right!?
-                #
-                # or
-                # all(chan._cancel_called for chan in chans)
-
-            ):
-                log.cancel(
-                    'Waiting on cancel request to peer..\n'
-                    f'c)=>\n'
-                    f' |_{chan.uid}\n'
-                )
-
-                # XXX: this is a soft wait on the channel (and its
-                # underlying transport protocol) to close from the
-                # remote peer side since we presume that any channel
-                # which is mapped to a sub-actor (i.e. it's managed
-                # by local actor-nursery) has a message that is sent
-                # to the peer likely by this actor (which may be in
-                # a shutdown sequence due to cancellation) when the
-                # local runtime here is now cancelled while
-                # (presumably) in the middle of msg loop processing.
-                chan_info: str = (
-                    f'{chan.uid}\n'
-                    f'|_{chan}\n'
-                    f' |_{chan.transport}\n\n'
-                )
-                with trio.move_on_after(0.5) as drain_cs:
-                    drain_cs.shield = True
-
-                    # attempt to wait for the far end to close the
-                    # channel and bail after timeout (a 2-generals
-                    # problem on closure).
-                    assert chan.transport
-                    async for msg in chan.transport.drain():
-
-                        # try to deliver any lingering msgs
-                        # before we destroy the channel.
-                        # This accomplishes deterministic
-                        # ``Portal.cancel_actor()`` cancellation by
-                        # making sure any RPC response to that call is
-                        # delivered the local calling task.
-                        # TODO: factor this into a helper?
-                        log.warning(
-                            'Draining msg from disconnected peer\n'
-                            f'{chan_info}'
-                            f'{pformat(msg)}\n'
-                        )
-                        # cid: str|None = msg.get('cid')
-                        cid: str|None = msg.cid
-                        if cid:
-                            # deliver response to local caller/waiter
-                            await self._deliver_ctx_payload(
-                                chan,
-                                cid,
-                                msg,
-                            )
-                if drain_cs.cancelled_caught:
-                    log.warning(
-                        'Timed out waiting on IPC transport channel to drain?\n'
-                        f'{chan_info}'
-                    )
-
-                # XXX NOTE XXX when no explicit call to
-                # `open_root_actor()` was made by the application
-                # (normally we implicitly make that call inside
-                # the first `.open_nursery()` in root-actor
-                # user/app code), we can assume that either we
-                # are NOT the root actor or are root but the
-                # runtime was started manually. and thus DO have
-                # to wait for the nursery-enterer to exit before
-                # shutting down the local runtime to avoid
-                # clobbering any ongoing subactor
-                # teardown/debugging/graceful-cancel.
-                #
-                # see matching note inside `._supervise.open_nursery()`
-                #
-                # TODO: should we have a separate cs + timeout
-                # block here?
-                if (
-                    # XXX SO either,
-                    # - not root OR,
-                    # - is root but `open_root_actor()` was
-                    #   entered manually (in which case we do
-                    #   the equiv wait there using the
-                    #   `devx._debug` sub-sys APIs).
-                    not local_nursery._implicit_runtime_started
-                ):
-                    log.runtime(
-                        'Waiting on local actor nursery to exit..\n'
-                        f'|_{local_nursery}\n'
-                    )
-                    with trio.move_on_after(0.5) as an_exit_cs:
-                        an_exit_cs.shield = True
-                        await local_nursery.exited.wait()
-
-                    # TODO: currently this is always triggering for every
-                    # sub-daemon spawned from the `piker.services._mngr`?
-                    # -[ ] how do we ensure that the IPC is supposed to
-                    #      be long lived and isn't just a register?
-                    # |_ in the register case how can we signal that the
-                    #    ephemeral msg loop was intentional?
-                    if (
-                        # not local_nursery._implicit_runtime_started
-                        # and
-                        an_exit_cs.cancelled_caught
-                    ):
-                        report: str = (
-                            'Timed out waiting on local actor-nursery to exit?\n'
-                            f'c)>\n'
-                            f' |_{local_nursery}\n'
-                        )
-                        if children := local_nursery._children:
-                            # indent from above local-nurse repr
-                            report += (
-                                f' |_{pformat(children)}\n'
-                            )
-
-                        log.warning(report)
-
-            if disconnected:
-                # if the transport died and this actor is still
-                # registered within a local nursery, we report
-                # that the IPC layer may have failed
-                # unexpectedly since it may be the cause of
-                # other downstream errors.
-                entry: tuple|None = local_nursery._children.get(uid)
-                if entry:
-                    proc: trio.Process
-                    _, proc, _ = entry
-
-                    if (
-                        (poll := getattr(proc, 'poll', None))
-                        and
-                        poll() is None # proc still alive
-                    ):
-                        # TODO: change log level based on
-                        # detecting whether chan was created for
-                        # ephemeral `.register_actor()` request!
-                        # -[ ] also, that should be avoidable by
-                        #   re-using any existing chan from the
-                        #   `._discovery.get_registry()` call as
-                        #   well..
-                        log.runtime(
-                            f'Peer IPC broke but subproc is alive?\n\n'
-
-                            f'<=x {chan.uid}@{chan.raddr}\n'
-                            f' |_{proc}\n'
-                        )
-
-            # ``Channel`` teardown and closure sequence
-            # drop ref to channel so it can be gc-ed and disconnected
-            con_teardown_status: str = (
-                f'IPC channel disconnected:\n'
-                f'<=x uid: {chan.uid}\n'
-                f' |_{pformat(chan)}\n\n'
-            )
-            chans.remove(chan)
-
-            # TODO: do we need to be this pedantic?
-            if not chans:
-                con_teardown_status += (
-                    f'-> No more channels with {chan.uid}'
-                )
-                self._peers.pop(uid, None)
-
-            peers_str: str = ''
-            for uid, chans in self._peers.items():
-                peers_str += (
-                    f'uid: {uid}\n'
-                )
-                for i, chan in enumerate(chans):
-                    peers_str += (
-                        f' |_[{i}] {pformat(chan)}\n'
-                    )
-
-            con_teardown_status += (
-                f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
-            )
-
-            # No more channels to other actors (at all) registered
-            # as connected.
-            if not self._peers:
-                con_teardown_status += (
-                    'Signalling no more peer channel connections'
-                )
-                self._no_more_peers.set()
-
-                # NOTE: block this actor from acquiring the
-                # debugger-TTY-lock since we have no way to know if we
-                # cancelled it and further there is no way to ensure the
-                # lock will be released if acquired due to having no
-                # more active IPC channels.
-                if _state.is_root_process():
-                    pdb_lock = _debug.Lock
-                    pdb_lock._blocked.add(uid)
-
-                    # TODO: NEEEDS TO BE TESTED!
-                    # actually, no idea if this ever even enters.. XD
-                    #
-                    # XXX => YES IT DOES, when i was testing ctl-c
-                    # from broken debug TTY locking due to
-                    # msg-spec races on application using RunVar...
-                    if (
-                        (ctx_in_debug := pdb_lock.ctx_in_debug)
-                        and
-                        (pdb_user_uid := ctx_in_debug.chan.uid)
-                        and
-                        local_nursery
-                    ):
-                        entry: tuple|None = local_nursery._children.get(
-                            tuple(pdb_user_uid)
-                        )
-                        if entry:
-                            proc: trio.Process
-                            _, proc, _ = entry
-
-                            if (
-                                (poll := getattr(proc, 'poll', None))
-                                and poll() is None
-                            ):
-                                log.cancel(
-                                    'Root actor reports no-more-peers, BUT\n'
-                                    'a DISCONNECTED child still has the debug '
-                                    'lock!\n\n'
-                                    # f'root uid: {self.uid}\n'
-                                    f'last disconnected child uid: {uid}\n'
-                                    f'locking child uid: {pdb_user_uid}\n'
-                                )
-                                await _debug.maybe_wait_for_debugger(
-                                    child_in_debug=True
-                                )
-
-                    # TODO: just bc a child's transport dropped
-                    # doesn't mean it's not still using the pdb
-                    # REPL! so,
-                    # -[ ] ideally we can check out child proc
-                    #  tree to ensure that its alive (and
-                    #  actually using the REPL) before we cancel
-                    #  it's lock acquire by doing the below!
-                    # -[ ] create a way to read the tree of each actor's
-                    #  grandchildren such that when an
-                    #  intermediary parent is cancelled but their
-                    #  child has locked the tty, the grandparent
-                    #  will not allow the parent to cancel or
-                    #  zombie reap the child! see open issue:
-                    #  - https://github.com/goodboy/tractor/issues/320
-                    # ------ - ------
-                    # if a now stale local task has the TTY lock still
-                    # we cancel it to allow servicing other requests for
-                    # the lock.
-                    if (
-                        (db_cs := pdb_lock.get_locking_task_cs())
-                        and not db_cs.cancel_called
-                        and uid == pdb_user_uid
-                    ):
-                        log.critical(
-                            f'STALE DEBUG LOCK DETECTED FOR {uid}'
-                        )
-                        # TODO: figure out why this breaks tests..
-                        db_cs.cancel()
-
-            log.runtime(con_teardown_status)
-        # finally block closure
-
     # TODO: rename to `._deliver_payload()` since this handles
     # more then just `result` msgs now obvi XD
     async def _deliver_ctx_payload(
@@ -1916,7 +1483,7 @@ async def async_main(
            if actor._parent_chan:
                await root_nursery.start(
                    partial(
-                        process_messages,
+                        _rpc.process_messages,
                        actor,
                        actor._parent_chan,
                        shield=True,
@@ -1959,7 +1526,7 @@ async def async_main(
        log.exception(err_report)
 
        if actor._parent_chan:
-            await try_ship_error_to_remote(
+            await _rpc.try_ship_error_to_remote(
                actor._parent_chan,
                internal_err,
            )
@@ -24,6 +24,7 @@ from contextlib import (
 )
 from functools import partial
 import inspect
+from pprint import pformat
 from types import (
     ModuleType,
 )
@@ -40,24 +41,484 @@ from trio import (
     SocketListener,
 )
 
-from ..msg import Struct
+from ..devx import _debug
+from .._exceptions import (
+    TransportClosed,
+)
+from .. import _rpc
+from ..msg import (
+    MsgType,
+    Struct,
+    types as msgtypes,
+)
 from ..trionics import maybe_open_nursery
 from .. import (
     _state,
     log,
 )
 from .._addr import Address
+from ._chan import Channel
 from ._transport import MsgTransport
 from ._uds import UDSAddress
 from ._tcp import TCPAddress
 
 if TYPE_CHECKING:
     from .._runtime import Actor
+    from .._supervise import ActorNursery
 
 
 log = log.get_logger(__name__)
 
 
+# TODO multi-tpt support with per-proto peer tracking?
+#
+# -[x] maybe change to mod-func and rename for implied
+#     multi-transport semantics?
+#
+# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
+#     so that we can query per tpt all peer contact infos?
+#  |_[ ] possibly provide a global viewing via a
+#       `collections.ChainMap`?
+#
+async def handle_stream_from_peer(
+    stream: trio.SocketStream,
+    actor: Actor,
+
+) -> None:
+    '''
+    Top-level `trio.abc.Stream` (i.e. normally `trio.SocketStream`)
+    handler-callback as spawn-invoked by `trio.serve_listeners()`.
+
+    Note that each call to this handler is as a spawned task inside
+    any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
+    such that it is invoked as,
+
+      IPCEndpoint.stream_handler_tn.start_soon(
+          handle_stream,
+          stream,
+      )
+
+    '''
+    actor._no_more_peers = trio.Event() # unset by making new
+
+    # TODO, debug_mode tooling for when hackin this lower layer?
+    # with _debug.maybe_open_crash_handler(
+    #     pdb=True,
+    # ) as boxerr:
+
+    chan = Channel.from_stream(stream)
+    con_status: str = (
+        'New inbound IPC connection <=\n'
+        f'|_{chan}\n'
+    )
+
+    # initial handshake with peer phase
+    try:
+        peer_aid: msgtypes.Aid = await chan._do_handshake(
+            aid=actor.aid,
+        )
+    except (
+        TransportClosed,
+        # ^XXX NOTE, the above wraps `trio` exc types raised
+        # during various `SocketStream.send/receive_xx()` calls
+        # under different fault conditions such as,
+        #
+        # trio.BrokenResourceError,
+        # trio.ClosedResourceError,
+        #
+        # Inside our `.ipc._transport` layer we absorb and
+        # re-raise our own `TransportClosed` exc such that this
+        # higher level runtime code can only worry one
+        # "kinda-error" that we expect to tolerate during
+        # discovery-sys related pings, queires, DoS etc.
+    ):
+        # XXX: This may propagate up from `Channel._aiter_recv()`
+        # and `MsgpackStream._inter_packets()` on a read from the
+        # stream particularly when the runtime is first starting up
+        # inside `open_root_actor()` where there is a check for
+        # a bound listener on the "arbiter" addr. the reset will be
+        # because the handshake was never meant took place.
+        log.runtime(
+            con_status
+            +
+            ' -> But failed to handshake? Ignoring..\n'
+        )
+        return
+
+    uid: tuple[str, str] = (
+        peer_aid.name,
+        peer_aid.uuid,
+    )
+    # TODO, can we make this downstream peer tracking use the
+    # `peer_aid` instead?
+    familiar: str = 'new-peer'
+    if _pre_chan := actor._peers.get(uid):
+        familiar: str = 'pre-existing-peer'
+    uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
+    con_status += (
+        f' -> Handshake with {familiar} `{uid_short}` complete\n'
+    )
+
+    if _pre_chan:
+        # con_status += (
+        # ^TODO^ swap once we minimize conn duplication
+        # -[ ] last thing might be reg/unreg runtime reqs?
+        # log.warning(
+        log.debug(
+            f'?Wait?\n'
+            f'We already have IPC with peer {uid_short!r}\n'
+            f'|_{_pre_chan}\n'
+        )
+
+    # IPC connection tracking for both peers and new children:
+    # - if this is a new channel to a locally spawned
+    #   sub-actor there will be a spawn wait even registered
+    #   by a call to `.wait_for_peer()`.
+    # - if a peer is connecting no such event will exit.
+    event: trio.Event|None = actor._peer_connected.pop(
+        uid,
+        None,
+    )
+    if event:
+        con_status += (
+            ' -> Waking subactor spawn waiters: '
+            f'{event.statistics().tasks_waiting}\n'
+            f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
+            # f' {event}\n'
+            # f' |{event.statistics()}\n'
+        )
+        # wake tasks waiting on this IPC-transport "connect-back"
+        event.set()
+
+    else:
+        con_status += (
+            f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
+        ) # type: ignore
+
+    chans: list[Channel] = actor._peers[uid]
+    # if chans:
+    #     # TODO: re-use channels for new connections instead
+    #     # of always new ones?
+    #     # => will require changing all the discovery funcs..
+
+    # append new channel
+    # TODO: can we just use list-ref directly?
+    chans.append(chan)
+
+    con_status += ' -> Entering RPC msg loop..\n'
+    log.runtime(con_status)
+
+    # Begin channel management - respond to remote requests and
+    # process received reponses.
+    disconnected: bool = False
+    last_msg: MsgType
+    try:
+        (
+            disconnected,
+            last_msg,
+        ) = await _rpc.process_messages(
+            actor,
+            chan,
+        )
+    except trio.Cancelled:
+        log.cancel(
+            'IPC transport msg loop was cancelled\n'
+            f'c)>\n'
+            f' |_{chan}\n'
+        )
+        raise
+
+    finally:
+        local_nursery: (
+            ActorNursery|None
+        ) = actor._actoruid2nursery.get(uid)
+
+        # This is set in ``Portal.cancel_actor()``. So if
+        # the peer was cancelled we try to wait for them
+        # to tear down their side of the connection before
+        # moving on with closing our own side.
+        if (
+            local_nursery
+            and (
+                actor._cancel_called
+                or
+                chan._cancel_called
+            )
+            #
+            # ^-TODO-^ along with this is there another condition
+            # that we should filter with to avoid entering this
+            # waiting block needlessly?
+            # -[ ] maybe `and local_nursery.cancelled` and/or
+            #     only if the `._children` table is empty or has
+            #     only `Portal`s with .chan._cancel_called ==
+            #     True` as per what we had below; the MAIN DIFF
+            #     BEING that just bc one `Portal.cancel_actor()`
+            #     was called, doesn't mean the whole actor-nurse
+            #     is gonna exit any time soon right!?
+            #
+            # or
+            # all(chan._cancel_called for chan in chans)
+
+        ):
+            log.cancel(
+                'Waiting on cancel request to peer..\n'
+                f'c)=>\n'
+                f' |_{chan.uid}\n'
+            )
+
+            # XXX: this is a soft wait on the channel (and its
+            # underlying transport protocol) to close from the
+            # remote peer side since we presume that any channel
+            # which is mapped to a sub-actor (i.e. it's managed
+            # by local actor-nursery) has a message that is sent
+            # to the peer likely by this actor (which may be in
+            # a shutdown sequence due to cancellation) when the
+            # local runtime here is now cancelled while
+            # (presumably) in the middle of msg loop processing.
+            chan_info: str = (
+                f'{chan.uid}\n'
+                f'|_{chan}\n'
+                f' |_{chan.transport}\n\n'
+            )
+            with trio.move_on_after(0.5) as drain_cs:
+                drain_cs.shield = True
+
+                # attempt to wait for the far end to close the
+                # channel and bail after timeout (a 2-generals
+                # problem on closure).
+                assert chan.transport
+                async for msg in chan.transport.drain():
+
+                    # try to deliver any lingering msgs
+                    # before we destroy the channel.
+                    # This accomplishes deterministic
+                    # ``Portal.cancel_actor()`` cancellation by
+                    # making sure any RPC response to that call is
+                    # delivered the local calling task.
+                    # TODO: factor this into a helper?
+                    log.warning(
+                        'Draining msg from disconnected peer\n'
+                        f'{chan_info}'
+                        f'{pformat(msg)}\n'
+                    )
+                    # cid: str|None = msg.get('cid')
+                    cid: str|None = msg.cid
+                    if cid:
+                        # deliver response to local caller/waiter
+                        await actor._deliver_ctx_payload(
+                            chan,
+                            cid,
+                            msg,
+                        )
+            if drain_cs.cancelled_caught:
+                log.warning(
+                    'Timed out waiting on IPC transport channel to drain?\n'
+                    f'{chan_info}'
+                )
+
+            # XXX NOTE XXX when no explicit call to
+            # `open_root_actor()` was made by the application
+            # (normally we implicitly make that call inside
+            # the first `.open_nursery()` in root-actor
+            # user/app code), we can assume that either we
+            # are NOT the root actor or are root but the
+            # runtime was started manually. and thus DO have
+            # to wait for the nursery-enterer to exit before
+            # shutting down the local runtime to avoid
+            # clobbering any ongoing subactor
+            # teardown/debugging/graceful-cancel.
+            #
+            # see matching note inside `._supervise.open_nursery()`
+            #
+            # TODO: should we have a separate cs + timeout
+            # block here?
+            if (
+                # XXX SO either,
+                # - not root OR,
+                # - is root but `open_root_actor()` was
+                #   entered manually (in which case we do
+                #   the equiv wait there using the
+                #   `devx._debug` sub-sys APIs).
+                not local_nursery._implicit_runtime_started
+            ):
+                log.runtime(
+                    'Waiting on local actor nursery to exit..\n'
+                    f'|_{local_nursery}\n'
+                )
+                with trio.move_on_after(0.5) as an_exit_cs:
+                    an_exit_cs.shield = True
+                    await local_nursery.exited.wait()
+
+                # TODO: currently this is always triggering for every
+                # sub-daemon spawned from the `piker.services._mngr`?
+                # -[ ] how do we ensure that the IPC is supposed to
+                #      be long lived and isn't just a register?
+                # |_ in the register case how can we signal that the
+                #    ephemeral msg loop was intentional?
+                if (
+                    # not local_nursery._implicit_runtime_started
+                    # and
+                    an_exit_cs.cancelled_caught
+                ):
+                    report: str = (
+                        'Timed out waiting on local actor-nursery to exit?\n'
+                        f'c)>\n'
+                        f' |_{local_nursery}\n'
+                    )
+                    if children := local_nursery._children:
+                        # indent from above local-nurse repr
+                        report += (
+                            f' |_{pformat(children)}\n'
+                        )
+
+                    log.warning(report)
+
+        if disconnected:
+            # if the transport died and this actor is still
+            # registered within a local nursery, we report
+            # that the IPC layer may have failed
+            # unexpectedly since it may be the cause of
+            # other downstream errors.
+            entry: tuple|None = local_nursery._children.get(uid)
+            if entry:
+                proc: trio.Process
+                _, proc, _ = entry
+
+                if (
+                    (poll := getattr(proc, 'poll', None))
+                    and
+                    poll() is None # proc still alive
+                ):
+                    # TODO: change log level based on
+                    # detecting whether chan was created for
+                    # ephemeral `.register_actor()` request!
+                    # -[ ] also, that should be avoidable by
+                    #   re-using any existing chan from the
+                    #   `._discovery.get_registry()` call as
+                    #   well..
+                    log.runtime(
+                        f'Peer IPC broke but subproc is alive?\n\n'
+
+                        f'<=x {chan.uid}@{chan.raddr}\n'
+                        f' |_{proc}\n'
+                    )
+
+        # ``Channel`` teardown and closure sequence
+        # drop ref to channel so it can be gc-ed and disconnected
+        con_teardown_status: str = (
+            f'IPC channel disconnected:\n'
+            f'<=x uid: {chan.uid}\n'
+            f' |_{pformat(chan)}\n\n'
+        )
+        chans.remove(chan)
+
+        # TODO: do we need to be this pedantic?
+        if not chans:
+            con_teardown_status += (
+                f'-> No more channels with {chan.uid}'
+            )
+            actor._peers.pop(uid, None)
+
+        peers_str: str = ''
+        for uid, chans in actor._peers.items():
+            peers_str += (
+                f'uid: {uid}\n'
+            )
+            for i, chan in enumerate(chans):
+                peers_str += (
+                    f' |_[{i}] {pformat(chan)}\n'
+                )
+
+        con_teardown_status += (
+            f'-> Remaining IPC {len(actor._peers)} peers: {peers_str}\n'
+        )
+
+        # No more channels to other actors (at all) registered
+        # as connected.
+        if not actor._peers:
+            con_teardown_status += (
+                'Signalling no more peer channel connections'
+            )
+            actor._no_more_peers.set()
+
+            # NOTE: block this actor from acquiring the
+            # debugger-TTY-lock since we have no way to know if we
+            # cancelled it and further there is no way to ensure the
+            # lock will be released if acquired due to having no
+            # more active IPC channels.
+            if _state.is_root_process():
+                pdb_lock = _debug.Lock
+                pdb_lock._blocked.add(uid)
+
+                # TODO: NEEEDS TO BE TESTED!
+                # actually, no idea if this ever even enters.. XD
+                #
+                # XXX => YES IT DOES, when i was testing ctl-c
+                # from broken debug TTY locking due to
+                # msg-spec races on application using RunVar...
+                if (
+                    (ctx_in_debug := pdb_lock.ctx_in_debug)
+                    and
+                    (pdb_user_uid := ctx_in_debug.chan.uid)
+                    and
+                    local_nursery
+                ):
+                    entry: tuple|None = local_nursery._children.get(
+                        tuple(pdb_user_uid)
+                    )
+                    if entry:
+                        proc: trio.Process
+                        _, proc, _ = entry
+
+                        if (
+                            (poll := getattr(proc, 'poll', None))
+                            and poll() is None
+                        ):
+                            log.cancel(
+                                'Root actor reports no-more-peers, BUT\n'
+                                'a DISCONNECTED child still has the debug '
+                                'lock!\n\n'
+                                # f'root uid: {actor.uid}\n'
+                                f'last disconnected child uid: {uid}\n'
+                                f'locking child uid: {pdb_user_uid}\n'
+                            )
+                            await _debug.maybe_wait_for_debugger(
+                                child_in_debug=True
+                            )
+
+                # TODO: just bc a child's transport dropped
+                # doesn't mean it's not still using the pdb
+                # REPL! so,
+                # -[ ] ideally we can check out child proc
+                #  tree to ensure that its alive (and
+                #  actually using the REPL) before we cancel
+                #  it's lock acquire by doing the below!
+                # -[ ] create a way to read the tree of each actor's
+                #  grandchildren such that when an
+                #  intermediary parent is cancelled but their
+                #  child has locked the tty, the grandparent
+                #  will not allow the parent to cancel or
+                #  zombie reap the child! see open issue:
+                #  - https://github.com/goodboy/tractor/issues/320
+                # ------ - ------
+                # if a now stale local task has the TTY lock still
+                # we cancel it to allow servicing other requests for
+                # the lock.
+                if (
+                    (db_cs := pdb_lock.get_locking_task_cs())
+                    and not db_cs.cancel_called
+                    and uid == pdb_user_uid
+                ):
+                    log.critical(
+                        f'STALE DEBUG LOCK DETECTED FOR {uid}'
+                    )
+                    # TODO: figure out why this breaks tests..
+                    db_cs.cancel()
+
+        log.runtime(con_teardown_status)
+    # finally block closure
+
+
 class IPCEndpoint(Struct):
     '''
     An instance of an IPC "bound" address where the lifetime of the
@@ -379,7 +840,10 @@ async def _serve_ipc_eps(
            _listeners: list[SocketListener] = await listen_tn.start(
                partial(
                    trio.serve_listeners,
-                    handler=actor._stream_handler,
+                    handler=partial(
+                        handle_stream_from_peer,
+                        actor=actor,
+                    ),
                    listeners=listeners,
 
                    # NOTE: configured such that new