Compare commits
3 Commits
6445f1cde4
...
fbc9325184
Author | SHA1 | Date |
---|---|---|
|
fbc9325184 | |
|
3cd222959a | |
|
2ea703cc75 |
|
@ -0,0 +1,4 @@
|
|||
'''
|
||||
`tractor.ipc` subsystem(s)/unit testing suites.
|
||||
|
||||
'''
|
|
@ -0,0 +1,72 @@
|
|||
'''
|
||||
High-level `.ipc._server` unit tests.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
from tractor import (
|
||||
devx,
|
||||
ipc,
|
||||
log,
|
||||
)
|
||||
from tractor._testing.addr import (
|
||||
get_rando_addr,
|
||||
)
|
||||
# TODO, use/check-roundtripping with some of these wrapper types?
|
||||
#
|
||||
# from .._addr import Address
|
||||
# from ._chan import Channel
|
||||
# from ._transport import MsgTransport
|
||||
# from ._uds import UDSAddress
|
||||
# from ._tcp import TCPAddress
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'_tpt_proto',
|
||||
['uds', 'tcp']
|
||||
)
|
||||
def test_basic_ipc_server(
|
||||
_tpt_proto: str,
|
||||
debug_mode: bool,
|
||||
loglevel: str,
|
||||
):
|
||||
|
||||
# so we see the socket-listener reporting on console
|
||||
log.get_console_log("INFO")
|
||||
|
||||
rando_addr: tuple = get_rando_addr(
|
||||
tpt_proto=_tpt_proto,
|
||||
)
|
||||
async def main():
|
||||
async with ipc._server.open_ipc_server() as server:
|
||||
|
||||
assert (
|
||||
server._parent_tn
|
||||
and
|
||||
server._parent_tn is server._stream_handler_tn
|
||||
)
|
||||
assert server._no_more_peers.is_set()
|
||||
|
||||
eps: list[ipc.IPCEndpoint] = await server.listen_on(
|
||||
accept_addrs=[rando_addr],
|
||||
stream_handler_nursery=None,
|
||||
)
|
||||
assert (
|
||||
len(eps) == 1
|
||||
and
|
||||
(ep := eps[0])._listener
|
||||
and
|
||||
not ep.peer_tpts
|
||||
)
|
||||
|
||||
server._parent_tn.cancel_scope.cancel()
|
||||
|
||||
# !TODO! actually make a bg-task connection from a client
|
||||
# using `ipc._chan._connect_chan()`
|
||||
|
||||
with devx.maybe_open_crash_handler(
|
||||
pdb=debug_mode,
|
||||
):
|
||||
trio.run(main)
|
|
@ -22,6 +22,7 @@ from __future__ import annotations
|
|||
from functools import partial
|
||||
import multiprocessing as mp
|
||||
import os
|
||||
import textwrap
|
||||
from typing import (
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
|
@ -34,10 +35,7 @@ from .log import (
|
|||
get_logger,
|
||||
)
|
||||
from . import _state
|
||||
from .devx import (
|
||||
_debug,
|
||||
pformat,
|
||||
)
|
||||
from .devx import _debug
|
||||
from .to_asyncio import run_as_asyncio_guest
|
||||
from ._addr import UnwrappedAddress
|
||||
from ._runtime import (
|
||||
|
@ -105,6 +103,107 @@ def _mp_main(
|
|||
)
|
||||
|
||||
|
||||
# TODO: move this func to some kinda `.devx._conc_lang.py` eventually
|
||||
# as we work out our multi-domain state-flow-syntax!
|
||||
def nest_from_op(
|
||||
input_op: str,
|
||||
#
|
||||
# ?TODO? an idea for a syntax to the state of concurrent systems
|
||||
# as a "3-domain" (execution, scope, storage) model and using
|
||||
# a minimal ascii/utf-8 operator-set.
|
||||
#
|
||||
# try not to take any of this seriously yet XD
|
||||
#
|
||||
# > is a "play operator" indicating (CPU bound)
|
||||
# exec/work/ops required at the "lowest level computing"
|
||||
#
|
||||
# execution primititves (tasks, threads, actors..) denote their
|
||||
# lifetime with '(' and ')' since parentheses normally are used
|
||||
# in many langs to denote function calls.
|
||||
#
|
||||
# starting = (
|
||||
# >( opening/starting; beginning of the thread-of-exec (toe?)
|
||||
# (> opened/started, (finished spawning toe)
|
||||
# |_<Task: blah blah..> repr of toe, in py these look like <objs>
|
||||
#
|
||||
# >) closing/exiting/stopping,
|
||||
# )> closed/exited/stopped,
|
||||
# |_<Task: blah blah..>
|
||||
# [OR <), )< ?? ]
|
||||
#
|
||||
# ending = )
|
||||
# >c) cancelling to close/exit
|
||||
# c)> cancelled (caused close), OR?
|
||||
# |_<Actor: ..>
|
||||
# OR maybe "<c)" which better indicates the cancel being
|
||||
# "delivered/returned" / returned" to LHS?
|
||||
#
|
||||
# >x) erroring to eventuall exit
|
||||
# x)> errored and terminated
|
||||
# |_<Actor: ...>
|
||||
#
|
||||
# scopes: supers/nurseries, IPC-ctxs, sessions, perms, etc.
|
||||
# >{ opening
|
||||
# {> opened
|
||||
# }> closed
|
||||
# >} closing
|
||||
#
|
||||
# storage: like queues, shm-buffers, files, etc..
|
||||
# >[ opening
|
||||
# [> opened
|
||||
# |_<FileObj: ..>
|
||||
#
|
||||
# >] closing
|
||||
# ]> closed
|
||||
|
||||
# IPC ops: channels, transports, msging
|
||||
# => req msg
|
||||
# <= resp msg
|
||||
# <=> 2-way streaming (of msgs)
|
||||
# <- recv 1 msg
|
||||
# -> send 1 msg
|
||||
#
|
||||
# TODO: still not sure on R/L-HS approach..?
|
||||
# =>( send-req to exec start (task, actor, thread..)
|
||||
# (<= recv-req to ^
|
||||
#
|
||||
# (<= recv-req ^
|
||||
# <=( recv-resp opened remote exec primitive
|
||||
# <=) recv-resp closed
|
||||
#
|
||||
# )<=c req to stop due to cancel
|
||||
# c=>) req to stop due to cancel
|
||||
#
|
||||
# =>{ recv-req to open
|
||||
# <={ send-status that it closed
|
||||
|
||||
tree_str: str,
|
||||
|
||||
# NOTE: so move back-from-the-left of the `input_op` by
|
||||
# this amount.
|
||||
back_from_op: int = 0,
|
||||
) -> str:
|
||||
'''
|
||||
Depth-increment the input (presumably hierarchy/supervision)
|
||||
input "tree string" below the provided `input_op` execution
|
||||
operator, so injecting a `"\n|_{input_op}\n"`and indenting the
|
||||
`tree_str` to nest content aligned with the ops last char.
|
||||
|
||||
'''
|
||||
return (
|
||||
f'{input_op}\n'
|
||||
+
|
||||
textwrap.indent(
|
||||
tree_str,
|
||||
prefix=(
|
||||
len(input_op)
|
||||
-
|
||||
(back_from_op + 1)
|
||||
) * ' ',
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def _trio_main(
|
||||
actor: Actor,
|
||||
*,
|
||||
|
@ -137,7 +236,7 @@ def _trio_main(
|
|||
log.info(
|
||||
'Starting new `trio` subactor:\n'
|
||||
+
|
||||
pformat.nest_from_op(
|
||||
nest_from_op(
|
||||
input_op='>(', # see syntax ideas above
|
||||
tree_str=actor_info,
|
||||
back_from_op=2, # since "complete"
|
||||
|
@ -147,7 +246,7 @@ def _trio_main(
|
|||
exit_status: str = (
|
||||
'Subactor exited\n'
|
||||
+
|
||||
pformat.nest_from_op(
|
||||
nest_from_op(
|
||||
input_op=')>', # like a "closed-to-play"-icon from super perspective
|
||||
tree_str=actor_info,
|
||||
back_from_op=1,
|
||||
|
@ -165,7 +264,7 @@ def _trio_main(
|
|||
exit_status: str = (
|
||||
'Actor received KBI (aka an OS-cancel)\n'
|
||||
+
|
||||
pformat.nest_from_op(
|
||||
nest_from_op(
|
||||
input_op='c)>', # closed due to cancel (see above)
|
||||
tree_str=actor_info,
|
||||
)
|
||||
|
@ -175,7 +274,7 @@ def _trio_main(
|
|||
exit_status: str = (
|
||||
'Main actor task exited due to crash?\n'
|
||||
+
|
||||
pformat.nest_from_op(
|
||||
nest_from_op(
|
||||
input_op='x)>', # closed by error
|
||||
tree_str=actor_info,
|
||||
)
|
||||
|
|
|
@ -582,8 +582,7 @@ async def open_portal(
|
|||
msg_loop_cs = await tn.start(
|
||||
partial(
|
||||
_rpc.process_messages,
|
||||
actor,
|
||||
channel,
|
||||
chan=channel,
|
||||
# if the local task is cancelled we want to keep
|
||||
# the msg loop running until our block ends
|
||||
shield=True,
|
||||
|
|
|
@ -869,7 +869,6 @@ async def try_ship_error_to_remote(
|
|||
|
||||
|
||||
async def process_messages(
|
||||
actor: Actor,
|
||||
chan: Channel,
|
||||
shield: bool = False,
|
||||
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
|
@ -907,6 +906,7 @@ async def process_messages(
|
|||
(as utilized inside `Portal.cancel_actor()` ).
|
||||
|
||||
'''
|
||||
actor: Actor = _state.current_actor()
|
||||
assert actor._service_n # runtime state sanity
|
||||
|
||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||
|
|
|
@ -1262,6 +1262,10 @@ async def async_main(
|
|||
the actor's "runtime" and all thus all ongoing RPC tasks.
|
||||
|
||||
'''
|
||||
# XXX NOTE, `_state._current_actor` **must** be set prior to
|
||||
# calling this core runtime entrypoint!
|
||||
assert actor is _state.current_actor()
|
||||
|
||||
actor._task: trio.Task = trio.lowlevel.current_task()
|
||||
|
||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||
|
@ -1321,7 +1325,6 @@ async def async_main(
|
|||
) as service_nursery,
|
||||
|
||||
_server.open_ipc_server(
|
||||
actor=actor,
|
||||
parent_tn=service_nursery,
|
||||
stream_handler_tn=service_nursery,
|
||||
) as ipc_server,
|
||||
|
@ -1375,7 +1378,6 @@ async def async_main(
|
|||
'Booting IPC server'
|
||||
)
|
||||
eps: list = await ipc_server.listen_on(
|
||||
actor=actor,
|
||||
accept_addrs=accept_addrs,
|
||||
stream_handler_nursery=service_nursery,
|
||||
)
|
||||
|
@ -1460,8 +1462,7 @@ async def async_main(
|
|||
await root_nursery.start(
|
||||
partial(
|
||||
_rpc.process_messages,
|
||||
actor,
|
||||
actor._parent_chan,
|
||||
chan=actor._parent_chan,
|
||||
shield=True,
|
||||
)
|
||||
)
|
||||
|
|
|
@ -247,104 +247,3 @@ def pformat_cs(
|
|||
+
|
||||
fields
|
||||
)
|
||||
|
||||
|
||||
# TODO: move this func to some kinda `.devx.pformat.py` eventually
|
||||
# as we work out our multi-domain state-flow-syntax!
|
||||
def nest_from_op(
|
||||
input_op: str,
|
||||
#
|
||||
# ?TODO? an idea for a syntax to the state of concurrent systems
|
||||
# as a "3-domain" (execution, scope, storage) model and using
|
||||
# a minimal ascii/utf-8 operator-set.
|
||||
#
|
||||
# try not to take any of this seriously yet XD
|
||||
#
|
||||
# > is a "play operator" indicating (CPU bound)
|
||||
# exec/work/ops required at the "lowest level computing"
|
||||
#
|
||||
# execution primititves (tasks, threads, actors..) denote their
|
||||
# lifetime with '(' and ')' since parentheses normally are used
|
||||
# in many langs to denote function calls.
|
||||
#
|
||||
# starting = (
|
||||
# >( opening/starting; beginning of the thread-of-exec (toe?)
|
||||
# (> opened/started, (finished spawning toe)
|
||||
# |_<Task: blah blah..> repr of toe, in py these look like <objs>
|
||||
#
|
||||
# >) closing/exiting/stopping,
|
||||
# )> closed/exited/stopped,
|
||||
# |_<Task: blah blah..>
|
||||
# [OR <), )< ?? ]
|
||||
#
|
||||
# ending = )
|
||||
# >c) cancelling to close/exit
|
||||
# c)> cancelled (caused close), OR?
|
||||
# |_<Actor: ..>
|
||||
# OR maybe "<c)" which better indicates the cancel being
|
||||
# "delivered/returned" / returned" to LHS?
|
||||
#
|
||||
# >x) erroring to eventuall exit
|
||||
# x)> errored and terminated
|
||||
# |_<Actor: ...>
|
||||
#
|
||||
# scopes: supers/nurseries, IPC-ctxs, sessions, perms, etc.
|
||||
# >{ opening
|
||||
# {> opened
|
||||
# }> closed
|
||||
# >} closing
|
||||
#
|
||||
# storage: like queues, shm-buffers, files, etc..
|
||||
# >[ opening
|
||||
# [> opened
|
||||
# |_<FileObj: ..>
|
||||
#
|
||||
# >] closing
|
||||
# ]> closed
|
||||
|
||||
# IPC ops: channels, transports, msging
|
||||
# => req msg
|
||||
# <= resp msg
|
||||
# <=> 2-way streaming (of msgs)
|
||||
# <- recv 1 msg
|
||||
# -> send 1 msg
|
||||
#
|
||||
# TODO: still not sure on R/L-HS approach..?
|
||||
# =>( send-req to exec start (task, actor, thread..)
|
||||
# (<= recv-req to ^
|
||||
#
|
||||
# (<= recv-req ^
|
||||
# <=( recv-resp opened remote exec primitive
|
||||
# <=) recv-resp closed
|
||||
#
|
||||
# )<=c req to stop due to cancel
|
||||
# c=>) req to stop due to cancel
|
||||
#
|
||||
# =>{ recv-req to open
|
||||
# <={ send-status that it closed
|
||||
|
||||
tree_str: str,
|
||||
|
||||
# NOTE: so move back-from-the-left of the `input_op` by
|
||||
# this amount.
|
||||
back_from_op: int = 0,
|
||||
) -> str:
|
||||
'''
|
||||
Depth-increment the input (presumably hierarchy/supervision)
|
||||
input "tree string" below the provided `input_op` execution
|
||||
operator, so injecting a `"\n|_{input_op}\n"`and indenting the
|
||||
`tree_str` to nest content aligned with the ops last char.
|
||||
|
||||
'''
|
||||
return (
|
||||
f'{input_op}\n'
|
||||
+
|
||||
textwrap.indent(
|
||||
tree_str,
|
||||
prefix=(
|
||||
len(input_op)
|
||||
-
|
||||
(back_from_op + 1)
|
||||
) * ' ',
|
||||
)
|
||||
)
|
||||
|
|
|
@ -72,11 +72,223 @@ if TYPE_CHECKING:
|
|||
log = log.get_logger(__name__)
|
||||
|
||||
|
||||
async def maybe_wait_on_canced_subs(
|
||||
uid: tuple[str, str],
|
||||
chan: Channel,
|
||||
disconnected: bool,
|
||||
|
||||
actor: Actor|None = None,
|
||||
chan_drain_timeout: float = 0.5,
|
||||
an_exit_timeout: float = 0.5,
|
||||
|
||||
) -> ActorNursery|None:
|
||||
'''
|
||||
When a process-local actor-nursery is found for the given actor
|
||||
`uid` (i.e. that peer is **also** a subactor of this parent), we
|
||||
attempt to (with timeouts) wait on,
|
||||
|
||||
- all IPC msgs to drain on the (common) `Channel` such that all
|
||||
local `Context`-parent-tasks can also gracefully collect
|
||||
`ContextCancelled` msgs from their respective remote children
|
||||
vs. a `chan_drain_timeout`.
|
||||
|
||||
- the actor-nursery to cancel-n-join all its supervised children
|
||||
(processes) *gracefully* vs. a `an_exit_timeout` and thus also
|
||||
detect cases where the IPC transport connection broke but
|
||||
a sub-process is detected as still alive (a case that happens
|
||||
when the subactor is still in an active debugger REPL session).
|
||||
|
||||
If the timeout expires in either case we ofc report with warning.
|
||||
|
||||
'''
|
||||
actor = actor or _state.current_actor()
|
||||
|
||||
# XXX running outside actor-runtime usage,
|
||||
# - unit testing
|
||||
# - possibly manual usage (eventually) ?
|
||||
if not actor:
|
||||
return None
|
||||
|
||||
local_nursery: (
|
||||
ActorNursery|None
|
||||
) = actor._actoruid2nursery.get(uid)
|
||||
|
||||
# This is set in `Portal.cancel_actor()`. So if
|
||||
# the peer was cancelled we try to wait for them
|
||||
# to tear down their side of the connection before
|
||||
# moving on with closing our own side.
|
||||
if (
|
||||
local_nursery
|
||||
and (
|
||||
actor._cancel_called
|
||||
or
|
||||
chan._cancel_called
|
||||
)
|
||||
#
|
||||
# ^-TODO-^ along with this is there another condition
|
||||
# that we should filter with to avoid entering this
|
||||
# waiting block needlessly?
|
||||
# -[ ] maybe `and local_nursery.cancelled` and/or
|
||||
# only if the `._children` table is empty or has
|
||||
# only `Portal`s with .chan._cancel_called ==
|
||||
# True` as per what we had below; the MAIN DIFF
|
||||
# BEING that just bc one `Portal.cancel_actor()`
|
||||
# was called, doesn't mean the whole actor-nurse
|
||||
# is gonna exit any time soon right!?
|
||||
#
|
||||
# or
|
||||
# all(chan._cancel_called for chan in chans)
|
||||
|
||||
):
|
||||
log.cancel(
|
||||
'Waiting on cancel request to peer..\n'
|
||||
f'c)=>\n'
|
||||
f' |_{chan.uid}\n'
|
||||
)
|
||||
|
||||
# XXX: this is a soft wait on the channel (and its
|
||||
# underlying transport protocol) to close from the
|
||||
# remote peer side since we presume that any channel
|
||||
# which is mapped to a sub-actor (i.e. it's managed
|
||||
# by local actor-nursery) has a message that is sent
|
||||
# to the peer likely by this actor (which may be in
|
||||
# a shutdown sequence due to cancellation) when the
|
||||
# local runtime here is now cancelled while
|
||||
# (presumably) in the middle of msg loop processing.
|
||||
chan_info: str = (
|
||||
f'{chan.uid}\n'
|
||||
f'|_{chan}\n'
|
||||
f' |_{chan.transport}\n\n'
|
||||
)
|
||||
with trio.move_on_after(chan_drain_timeout) as drain_cs:
|
||||
drain_cs.shield = True
|
||||
|
||||
# attempt to wait for the far end to close the
|
||||
# channel and bail after timeout (a 2-generals
|
||||
# problem on closure).
|
||||
assert chan.transport
|
||||
async for msg in chan.transport.drain():
|
||||
|
||||
# try to deliver any lingering msgs
|
||||
# before we destroy the channel.
|
||||
# This accomplishes deterministic
|
||||
# ``Portal.cancel_actor()`` cancellation by
|
||||
# making sure any RPC response to that call is
|
||||
# delivered the local calling task.
|
||||
# TODO: factor this into a helper?
|
||||
log.warning(
|
||||
'Draining msg from disconnected peer\n'
|
||||
f'{chan_info}'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
# cid: str|None = msg.get('cid')
|
||||
cid: str|None = msg.cid
|
||||
if cid:
|
||||
# deliver response to local caller/waiter
|
||||
await actor._deliver_ctx_payload(
|
||||
chan,
|
||||
cid,
|
||||
msg,
|
||||
)
|
||||
if drain_cs.cancelled_caught:
|
||||
log.warning(
|
||||
'Timed out waiting on IPC transport channel to drain?\n'
|
||||
f'{chan_info}'
|
||||
)
|
||||
|
||||
# XXX NOTE XXX when no explicit call to
|
||||
# `open_root_actor()` was made by the application
|
||||
# (normally we implicitly make that call inside
|
||||
# the first `.open_nursery()` in root-actor
|
||||
# user/app code), we can assume that either we
|
||||
# are NOT the root actor or are root but the
|
||||
# runtime was started manually. and thus DO have
|
||||
# to wait for the nursery-enterer to exit before
|
||||
# shutting down the local runtime to avoid
|
||||
# clobbering any ongoing subactor
|
||||
# teardown/debugging/graceful-cancel.
|
||||
#
|
||||
# see matching note inside `._supervise.open_nursery()`
|
||||
#
|
||||
# TODO: should we have a separate cs + timeout
|
||||
# block here?
|
||||
if (
|
||||
# XXX SO either,
|
||||
# - not root OR,
|
||||
# - is root but `open_root_actor()` was
|
||||
# entered manually (in which case we do
|
||||
# the equiv wait there using the
|
||||
# `devx.debug` sub-sys APIs).
|
||||
not local_nursery._implicit_runtime_started
|
||||
):
|
||||
log.runtime(
|
||||
'Waiting on local actor nursery to exit..\n'
|
||||
f'|_{local_nursery}\n'
|
||||
)
|
||||
with trio.move_on_after(an_exit_timeout) as an_exit_cs:
|
||||
an_exit_cs.shield = True
|
||||
await local_nursery.exited.wait()
|
||||
|
||||
# TODO: currently this is always triggering for every
|
||||
# sub-daemon spawned from the `piker.services._mngr`?
|
||||
# -[ ] how do we ensure that the IPC is supposed to
|
||||
# be long lived and isn't just a register?
|
||||
# |_ in the register case how can we signal that the
|
||||
# ephemeral msg loop was intentional?
|
||||
if (
|
||||
# not local_nursery._implicit_runtime_started
|
||||
# and
|
||||
an_exit_cs.cancelled_caught
|
||||
):
|
||||
report: str = (
|
||||
'Timed out waiting on local actor-nursery to exit?\n'
|
||||
f'c)>\n'
|
||||
f' |_{local_nursery}\n'
|
||||
)
|
||||
if children := local_nursery._children:
|
||||
# indent from above local-nurse repr
|
||||
report += (
|
||||
f' |_{pformat(children)}\n'
|
||||
)
|
||||
|
||||
log.warning(report)
|
||||
|
||||
if disconnected:
|
||||
# if the transport died and this actor is still
|
||||
# registered within a local nursery, we report
|
||||
# that the IPC layer may have failed
|
||||
# unexpectedly since it may be the cause of
|
||||
# other downstream errors.
|
||||
entry: tuple|None = local_nursery._children.get(uid)
|
||||
if entry:
|
||||
proc: trio.Process
|
||||
_, proc, _ = entry
|
||||
|
||||
if (
|
||||
(poll := getattr(proc, 'poll', None))
|
||||
and
|
||||
poll() is None # proc still alive
|
||||
):
|
||||
# TODO: change log level based on
|
||||
# detecting whether chan was created for
|
||||
# ephemeral `.register_actor()` request!
|
||||
# -[ ] also, that should be avoidable by
|
||||
# re-using any existing chan from the
|
||||
# `._discovery.get_registry()` call as
|
||||
# well..
|
||||
log.runtime(
|
||||
f'Peer IPC broke but subproc is alive?\n\n'
|
||||
|
||||
f'<=x {chan.uid}@{chan.raddr}\n'
|
||||
f' |_{proc}\n'
|
||||
)
|
||||
|
||||
return local_nursery
|
||||
|
||||
# TODO multi-tpt support with per-proto peer tracking?
|
||||
#
|
||||
# -[x] maybe change to mod-func and rename for implied
|
||||
# multi-transport semantics?
|
||||
#
|
||||
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
|
||||
# so that we can query per tpt all peer contact infos?
|
||||
# |_[ ] possibly provide a global viewing via a
|
||||
|
@ -87,7 +299,6 @@ async def handle_stream_from_peer(
|
|||
|
||||
*,
|
||||
server: IPCServer,
|
||||
actor: Actor,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -119,9 +330,10 @@ async def handle_stream_from_peer(
|
|||
|
||||
# initial handshake with peer phase
|
||||
try:
|
||||
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
||||
aid=actor.aid,
|
||||
)
|
||||
if actor := _state.current_actor():
|
||||
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
||||
aid=actor.aid,
|
||||
)
|
||||
except (
|
||||
TransportClosed,
|
||||
# ^XXX NOTE, the above wraps `trio` exc types raised
|
||||
|
@ -222,8 +434,7 @@ async def handle_stream_from_peer(
|
|||
disconnected,
|
||||
last_msg,
|
||||
) = await _rpc.process_messages(
|
||||
actor,
|
||||
chan,
|
||||
chan=chan,
|
||||
)
|
||||
except trio.Cancelled:
|
||||
log.cancel(
|
||||
|
@ -234,179 +445,16 @@ async def handle_stream_from_peer(
|
|||
raise
|
||||
|
||||
finally:
|
||||
local_nursery: (
|
||||
ActorNursery|None
|
||||
) = actor._actoruid2nursery.get(uid)
|
||||
|
||||
# This is set in ``Portal.cancel_actor()``. So if
|
||||
# the peer was cancelled we try to wait for them
|
||||
# to tear down their side of the connection before
|
||||
# moving on with closing our own side.
|
||||
if (
|
||||
local_nursery
|
||||
and (
|
||||
actor._cancel_called
|
||||
or
|
||||
chan._cancel_called
|
||||
)
|
||||
#
|
||||
# ^-TODO-^ along with this is there another condition
|
||||
# that we should filter with to avoid entering this
|
||||
# waiting block needlessly?
|
||||
# -[ ] maybe `and local_nursery.cancelled` and/or
|
||||
# only if the `._children` table is empty or has
|
||||
# only `Portal`s with .chan._cancel_called ==
|
||||
# True` as per what we had below; the MAIN DIFF
|
||||
# BEING that just bc one `Portal.cancel_actor()`
|
||||
# was called, doesn't mean the whole actor-nurse
|
||||
# is gonna exit any time soon right!?
|
||||
#
|
||||
# or
|
||||
# all(chan._cancel_called for chan in chans)
|
||||
|
||||
):
|
||||
log.cancel(
|
||||
'Waiting on cancel request to peer..\n'
|
||||
f'c)=>\n'
|
||||
f' |_{chan.uid}\n'
|
||||
)
|
||||
|
||||
# XXX: this is a soft wait on the channel (and its
|
||||
# underlying transport protocol) to close from the
|
||||
# remote peer side since we presume that any channel
|
||||
# which is mapped to a sub-actor (i.e. it's managed
|
||||
# by local actor-nursery) has a message that is sent
|
||||
# to the peer likely by this actor (which may be in
|
||||
# a shutdown sequence due to cancellation) when the
|
||||
# local runtime here is now cancelled while
|
||||
# (presumably) in the middle of msg loop processing.
|
||||
chan_info: str = (
|
||||
f'{chan.uid}\n'
|
||||
f'|_{chan}\n'
|
||||
f' |_{chan.transport}\n\n'
|
||||
)
|
||||
with trio.move_on_after(0.5) as drain_cs:
|
||||
drain_cs.shield = True
|
||||
|
||||
# attempt to wait for the far end to close the
|
||||
# channel and bail after timeout (a 2-generals
|
||||
# problem on closure).
|
||||
assert chan.transport
|
||||
async for msg in chan.transport.drain():
|
||||
|
||||
# try to deliver any lingering msgs
|
||||
# before we destroy the channel.
|
||||
# This accomplishes deterministic
|
||||
# ``Portal.cancel_actor()`` cancellation by
|
||||
# making sure any RPC response to that call is
|
||||
# delivered the local calling task.
|
||||
# TODO: factor this into a helper?
|
||||
log.warning(
|
||||
'Draining msg from disconnected peer\n'
|
||||
f'{chan_info}'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
# cid: str|None = msg.get('cid')
|
||||
cid: str|None = msg.cid
|
||||
if cid:
|
||||
# deliver response to local caller/waiter
|
||||
await actor._deliver_ctx_payload(
|
||||
chan,
|
||||
cid,
|
||||
msg,
|
||||
)
|
||||
if drain_cs.cancelled_caught:
|
||||
log.warning(
|
||||
'Timed out waiting on IPC transport channel to drain?\n'
|
||||
f'{chan_info}'
|
||||
)
|
||||
|
||||
# XXX NOTE XXX when no explicit call to
|
||||
# `open_root_actor()` was made by the application
|
||||
# (normally we implicitly make that call inside
|
||||
# the first `.open_nursery()` in root-actor
|
||||
# user/app code), we can assume that either we
|
||||
# are NOT the root actor or are root but the
|
||||
# runtime was started manually. and thus DO have
|
||||
# to wait for the nursery-enterer to exit before
|
||||
# shutting down the local runtime to avoid
|
||||
# clobbering any ongoing subactor
|
||||
# teardown/debugging/graceful-cancel.
|
||||
#
|
||||
# see matching note inside `._supervise.open_nursery()`
|
||||
#
|
||||
# TODO: should we have a separate cs + timeout
|
||||
# block here?
|
||||
if (
|
||||
# XXX SO either,
|
||||
# - not root OR,
|
||||
# - is root but `open_root_actor()` was
|
||||
# entered manually (in which case we do
|
||||
# the equiv wait there using the
|
||||
# `devx._debug` sub-sys APIs).
|
||||
not local_nursery._implicit_runtime_started
|
||||
):
|
||||
log.runtime(
|
||||
'Waiting on local actor nursery to exit..\n'
|
||||
f'|_{local_nursery}\n'
|
||||
)
|
||||
with trio.move_on_after(0.5) as an_exit_cs:
|
||||
an_exit_cs.shield = True
|
||||
await local_nursery.exited.wait()
|
||||
|
||||
# TODO: currently this is always triggering for every
|
||||
# sub-daemon spawned from the `piker.services._mngr`?
|
||||
# -[ ] how do we ensure that the IPC is supposed to
|
||||
# be long lived and isn't just a register?
|
||||
# |_ in the register case how can we signal that the
|
||||
# ephemeral msg loop was intentional?
|
||||
if (
|
||||
# not local_nursery._implicit_runtime_started
|
||||
# and
|
||||
an_exit_cs.cancelled_caught
|
||||
):
|
||||
report: str = (
|
||||
'Timed out waiting on local actor-nursery to exit?\n'
|
||||
f'c)>\n'
|
||||
f' |_{local_nursery}\n'
|
||||
)
|
||||
if children := local_nursery._children:
|
||||
# indent from above local-nurse repr
|
||||
report += (
|
||||
f' |_{pformat(children)}\n'
|
||||
)
|
||||
|
||||
log.warning(report)
|
||||
|
||||
if disconnected:
|
||||
# if the transport died and this actor is still
|
||||
# registered within a local nursery, we report
|
||||
# that the IPC layer may have failed
|
||||
# unexpectedly since it may be the cause of
|
||||
# other downstream errors.
|
||||
entry: tuple|None = local_nursery._children.get(uid)
|
||||
if entry:
|
||||
proc: trio.Process
|
||||
_, proc, _ = entry
|
||||
|
||||
if (
|
||||
(poll := getattr(proc, 'poll', None))
|
||||
and
|
||||
poll() is None # proc still alive
|
||||
):
|
||||
# TODO: change log level based on
|
||||
# detecting whether chan was created for
|
||||
# ephemeral `.register_actor()` request!
|
||||
# -[ ] also, that should be avoidable by
|
||||
# re-using any existing chan from the
|
||||
# `._discovery.get_registry()` call as
|
||||
# well..
|
||||
log.runtime(
|
||||
f'Peer IPC broke but subproc is alive?\n\n'
|
||||
|
||||
f'<=x {chan.uid}@{chan.raddr}\n'
|
||||
f' |_{proc}\n'
|
||||
)
|
||||
# check if there are subs which we should gracefully join at
|
||||
# both the inter-actor-task and subprocess levels to
|
||||
# gracefully remote cancel and later disconnect (particularly
|
||||
# for permitting subs engaged in active debug-REPL sessions).
|
||||
local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
|
||||
uid=uid,
|
||||
chan=chan,
|
||||
disconnected=disconnected,
|
||||
)
|
||||
|
||||
# ``Channel`` teardown and closure sequence
|
||||
# drop ref to channel so it can be gc-ed and disconnected
|
||||
|
@ -467,11 +515,11 @@ async def handle_stream_from_peer(
|
|||
# from broken debug TTY locking due to
|
||||
# msg-spec races on application using RunVar...
|
||||
if (
|
||||
local_nursery
|
||||
and
|
||||
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
||||
and
|
||||
(pdb_user_uid := ctx_in_debug.chan.uid)
|
||||
and
|
||||
local_nursery
|
||||
):
|
||||
entry: tuple|None = local_nursery._children.get(
|
||||
tuple(pdb_user_uid)
|
||||
|
@ -804,7 +852,6 @@ class IPCServer(Struct):
|
|||
async def listen_on(
|
||||
self,
|
||||
*,
|
||||
actor: Actor,
|
||||
accept_addrs: list[tuple[str, int|str]]|None = None,
|
||||
stream_handler_nursery: Nursery|None = None,
|
||||
) -> list[IPCEndpoint]:
|
||||
|
@ -837,20 +884,19 @@ class IPCServer(Struct):
|
|||
f'{self}\n'
|
||||
)
|
||||
|
||||
log.info(
|
||||
log.runtime(
|
||||
f'Binding to endpoints for,\n'
|
||||
f'{accept_addrs}\n'
|
||||
)
|
||||
eps: list[IPCEndpoint] = await self._parent_tn.start(
|
||||
partial(
|
||||
_serve_ipc_eps,
|
||||
actor=actor,
|
||||
server=self,
|
||||
stream_handler_tn=stream_handler_nursery,
|
||||
listen_addrs=accept_addrs,
|
||||
)
|
||||
)
|
||||
log.info(
|
||||
log.runtime(
|
||||
f'Started IPC endpoints\n'
|
||||
f'{eps}\n'
|
||||
)
|
||||
|
@ -873,7 +919,6 @@ class IPCServer(Struct):
|
|||
|
||||
async def _serve_ipc_eps(
|
||||
*,
|
||||
actor: Actor,
|
||||
server: IPCServer,
|
||||
stream_handler_tn: Nursery,
|
||||
listen_addrs: list[tuple[str, int|str]],
|
||||
|
@ -907,12 +952,13 @@ async def _serve_ipc_eps(
|
|||
stream_handler_tn=stream_handler_tn,
|
||||
)
|
||||
try:
|
||||
log.info(
|
||||
log.runtime(
|
||||
f'Starting new endpoint listener\n'
|
||||
f'{ep}\n'
|
||||
)
|
||||
listener: trio.abc.Listener = await ep.start_listener()
|
||||
assert listener is ep._listener
|
||||
# actor = _state.current_actor()
|
||||
# if actor.is_registry:
|
||||
# import pdbp; pdbp.set_trace()
|
||||
|
||||
|
@ -937,7 +983,6 @@ async def _serve_ipc_eps(
|
|||
handler=partial(
|
||||
handle_stream_from_peer,
|
||||
server=server,
|
||||
actor=actor,
|
||||
),
|
||||
listeners=listeners,
|
||||
|
||||
|
@ -948,13 +993,13 @@ async def _serve_ipc_eps(
|
|||
)
|
||||
)
|
||||
# TODO, wow make this message better! XD
|
||||
log.info(
|
||||
log.runtime(
|
||||
'Started server(s)\n'
|
||||
+
|
||||
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
||||
)
|
||||
|
||||
log.info(
|
||||
log.runtime(
|
||||
f'Started IPC endpoints\n'
|
||||
f'{eps}\n'
|
||||
)
|
||||
|
@ -970,6 +1015,7 @@ async def _serve_ipc_eps(
|
|||
ep.close_listener()
|
||||
server._endpoints.remove(ep)
|
||||
|
||||
# actor = _state.current_actor()
|
||||
# if actor.is_arbiter:
|
||||
# import pdbp; pdbp.set_trace()
|
||||
|
||||
|
@ -980,7 +1026,6 @@ async def _serve_ipc_eps(
|
|||
|
||||
@acm
|
||||
async def open_ipc_server(
|
||||
actor: Actor,
|
||||
parent_tn: Nursery|None = None,
|
||||
stream_handler_tn: Nursery|None = None,
|
||||
|
||||
|
|
|
@ -127,6 +127,11 @@ async def start_listener(
|
|||
Start a TCP socket listener on the given `TCPAddress`.
|
||||
|
||||
'''
|
||||
log.info(
|
||||
f'Attempting to bind TCP socket\n'
|
||||
f'>[\n'
|
||||
f'|_{addr}\n'
|
||||
)
|
||||
# ?TODO, maybe we should just change the lower-level call this is
|
||||
# using internall per-listener?
|
||||
listeners: list[SocketListener] = await open_tcp_listeners(
|
||||
|
@ -140,6 +145,12 @@ async def start_listener(
|
|||
assert len(listeners) == 1
|
||||
listener = listeners[0]
|
||||
host, port = listener.socket.getsockname()[:2]
|
||||
|
||||
log.info(
|
||||
f'Listening on TCP socket\n'
|
||||
f'[>\n'
|
||||
f' |_{addr}\n'
|
||||
)
|
||||
return listener
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue