Compare commits
9 Commits
2a37f6abdd
...
99c5d05aa6
Author | SHA1 | Date |
---|---|---|
|
99c5d05aa6 | |
|
b3635aaf6d | |
|
304117b4e1 | |
|
37377e8220 | |
|
6fd08c6e32 | |
|
87049b991f | |
|
3d6a194669 | |
|
082d373947 | |
|
a7bbcdbeb8 |
|
@ -19,8 +19,11 @@ async def main() -> None:
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
loglevel='devx',
|
loglevel='devx',
|
||||||
) as an:
|
maybe_enable_greenback=True,
|
||||||
assert an
|
# ^XXX REQUIRED to enable `breakpoint()` support (from sync
|
||||||
|
# fns) and thus required here to avoid an assertion err
|
||||||
|
# on the next line
|
||||||
|
):
|
||||||
assert (
|
assert (
|
||||||
(pybp_var := os.environ['PYTHONBREAKPOINT'])
|
(pybp_var := os.environ['PYTHONBREAKPOINT'])
|
||||||
==
|
==
|
||||||
|
|
|
@ -103,7 +103,7 @@ def sig_prog(
|
||||||
def daemon(
|
def daemon(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
testdir,
|
testdir: pytest.Pytester,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
tpt_proto: str,
|
tpt_proto: str,
|
||||||
|
|
||||||
|
|
|
@ -2,9 +2,11 @@
|
||||||
`tractor.devx.*` tooling sub-pkg test space.
|
`tractor.devx.*` tooling sub-pkg test space.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Callable,
|
Callable,
|
||||||
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -26,14 +28,22 @@ from ..conftest import (
|
||||||
_ci_env,
|
_ci_env,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from pexpect import pty_spawn
|
||||||
|
|
||||||
|
|
||||||
|
# a fn that sub-instantiates a `pexpect.spawn()`
|
||||||
|
# and returns it.
|
||||||
|
type PexpectSpawner = Callable[[str], pty_spawn.spawn]
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def spawn(
|
def spawn(
|
||||||
start_method,
|
start_method: str,
|
||||||
testdir: pytest.Pytester,
|
testdir: pytest.Pytester,
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
|
|
||||||
) -> Callable[[str], None]:
|
) -> PexpectSpawner:
|
||||||
'''
|
'''
|
||||||
Use the `pexpect` module shipped via `testdir.spawn()` to
|
Use the `pexpect` module shipped via `testdir.spawn()` to
|
||||||
run an `./examples/..` script by name.
|
run an `./examples/..` script by name.
|
||||||
|
@ -59,7 +69,7 @@ def spawn(
|
||||||
def _spawn(
|
def _spawn(
|
||||||
cmd: str,
|
cmd: str,
|
||||||
**mkcmd_kwargs,
|
**mkcmd_kwargs,
|
||||||
):
|
) -> pty_spawn.spawn:
|
||||||
unset_colors()
|
unset_colors()
|
||||||
return testdir.spawn(
|
return testdir.spawn(
|
||||||
cmd=mk_cmd(
|
cmd=mk_cmd(
|
||||||
|
@ -73,7 +83,7 @@ def spawn(
|
||||||
)
|
)
|
||||||
|
|
||||||
# such that test-dep can pass input script name.
|
# such that test-dep can pass input script name.
|
||||||
return _spawn
|
return _spawn # the `PexpectSpawner`, type alias.
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(
|
@pytest.fixture(
|
||||||
|
|
|
@ -13,9 +13,13 @@ TODO:
|
||||||
when debugging a problem inside the stack vs. in their app.
|
when debugging a problem inside the stack vs. in their app.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
from typing import (
|
||||||
|
TYPE_CHECKING,
|
||||||
|
)
|
||||||
|
|
||||||
from .conftest import (
|
from .conftest import (
|
||||||
expect,
|
expect,
|
||||||
|
@ -29,9 +33,12 @@ from pexpect.exceptions import (
|
||||||
EOF,
|
EOF,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..conftest import PexpectSpawner
|
||||||
|
|
||||||
|
|
||||||
def test_shield_pause(
|
def test_shield_pause(
|
||||||
spawn,
|
spawn: PexpectSpawner,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
Verify the `tractor.pause()/.post_mortem()` API works inside an
|
||||||
|
@ -126,7 +133,7 @@ def test_shield_pause(
|
||||||
|
|
||||||
|
|
||||||
def test_breakpoint_hook_restored(
|
def test_breakpoint_hook_restored(
|
||||||
spawn,
|
spawn: PexpectSpawner,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Ensures our actor runtime sets a custom `breakpoint()` hook
|
Ensures our actor runtime sets a custom `breakpoint()` hook
|
||||||
|
@ -140,16 +147,22 @@ def test_breakpoint_hook_restored(
|
||||||
child = spawn('restore_builtin_breakpoint')
|
child = spawn('restore_builtin_breakpoint')
|
||||||
|
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
assert_before(
|
try:
|
||||||
child,
|
assert_before(
|
||||||
[
|
child,
|
||||||
_pause_msg,
|
[
|
||||||
"<Task '__main__.main'",
|
_pause_msg,
|
||||||
"('root'",
|
"<Task '__main__.main'",
|
||||||
"first bp, tractor hook set",
|
"('root'",
|
||||||
]
|
"first bp, tractor hook set",
|
||||||
)
|
]
|
||||||
child.sendline('c')
|
)
|
||||||
|
# XXX if the above raises `AssertionError`, without sending
|
||||||
|
# the final 'continue' cmd to the REPL-active sub-process,
|
||||||
|
# we'll hang waiting for that pexpect instance to terminate..
|
||||||
|
finally:
|
||||||
|
child.sendline('c')
|
||||||
|
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
assert_before(
|
assert_before(
|
||||||
child,
|
child,
|
||||||
|
|
|
@ -0,0 +1,4 @@
|
||||||
|
'''
|
||||||
|
`tractor.ipc` subsystem(s)/unit testing suites.
|
||||||
|
|
||||||
|
'''
|
|
@ -0,0 +1,72 @@
|
||||||
|
'''
|
||||||
|
High-level `.ipc._server` unit tests.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import trio
|
||||||
|
from tractor import (
|
||||||
|
devx,
|
||||||
|
ipc,
|
||||||
|
log,
|
||||||
|
)
|
||||||
|
from tractor._testing.addr import (
|
||||||
|
get_rando_addr,
|
||||||
|
)
|
||||||
|
# TODO, use/check-roundtripping with some of these wrapper types?
|
||||||
|
#
|
||||||
|
# from .._addr import Address
|
||||||
|
# from ._chan import Channel
|
||||||
|
# from ._transport import MsgTransport
|
||||||
|
# from ._uds import UDSAddress
|
||||||
|
# from ._tcp import TCPAddress
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'_tpt_proto',
|
||||||
|
['uds', 'tcp']
|
||||||
|
)
|
||||||
|
def test_basic_ipc_server(
|
||||||
|
_tpt_proto: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
loglevel: str,
|
||||||
|
):
|
||||||
|
|
||||||
|
# so we see the socket-listener reporting on console
|
||||||
|
log.get_console_log("INFO")
|
||||||
|
|
||||||
|
rando_addr: tuple = get_rando_addr(
|
||||||
|
tpt_proto=_tpt_proto,
|
||||||
|
)
|
||||||
|
async def main():
|
||||||
|
async with ipc._server.open_ipc_server() as server:
|
||||||
|
|
||||||
|
assert (
|
||||||
|
server._parent_tn
|
||||||
|
and
|
||||||
|
server._parent_tn is server._stream_handler_tn
|
||||||
|
)
|
||||||
|
assert server._no_more_peers.is_set()
|
||||||
|
|
||||||
|
eps: list[ipc.IPCEndpoint] = await server.listen_on(
|
||||||
|
accept_addrs=[rando_addr],
|
||||||
|
stream_handler_nursery=None,
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
len(eps) == 1
|
||||||
|
and
|
||||||
|
(ep := eps[0])._listener
|
||||||
|
and
|
||||||
|
not ep.peer_tpts
|
||||||
|
)
|
||||||
|
|
||||||
|
server._parent_tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
# !TODO! actually make a bg-task connection from a client
|
||||||
|
# using `ipc._chan._connect_chan()`
|
||||||
|
|
||||||
|
with devx.maybe_open_crash_handler(
|
||||||
|
pdb=debug_mode,
|
||||||
|
):
|
||||||
|
trio.run(main)
|
|
@ -582,8 +582,7 @@ async def open_portal(
|
||||||
msg_loop_cs = await tn.start(
|
msg_loop_cs = await tn.start(
|
||||||
partial(
|
partial(
|
||||||
_rpc.process_messages,
|
_rpc.process_messages,
|
||||||
actor,
|
chan=channel,
|
||||||
channel,
|
|
||||||
# if the local task is cancelled we want to keep
|
# if the local task is cancelled we want to keep
|
||||||
# the msg loop running until our block ends
|
# the msg loop running until our block ends
|
||||||
shield=True,
|
shield=True,
|
||||||
|
|
|
@ -166,7 +166,9 @@ async def open_root_actor(
|
||||||
|
|
||||||
# enables the multi-process debugger support
|
# enables the multi-process debugger support
|
||||||
debug_mode: bool = False,
|
debug_mode: bool = False,
|
||||||
maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
|
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
|
||||||
|
# ^XXX NOTE^ the perf implications of use,
|
||||||
|
# https://greenback.readthedocs.io/en/latest/principle.html#performance
|
||||||
enable_stack_on_sig: bool = False,
|
enable_stack_on_sig: bool = False,
|
||||||
|
|
||||||
# internal logging
|
# internal logging
|
||||||
|
|
|
@ -869,7 +869,6 @@ async def try_ship_error_to_remote(
|
||||||
|
|
||||||
|
|
||||||
async def process_messages(
|
async def process_messages(
|
||||||
actor: Actor,
|
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
shield: bool = False,
|
shield: bool = False,
|
||||||
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
@ -907,6 +906,7 @@ async def process_messages(
|
||||||
(as utilized inside `Portal.cancel_actor()` ).
|
(as utilized inside `Portal.cancel_actor()` ).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
actor: Actor = _state.current_actor()
|
||||||
assert actor._service_n # runtime state sanity
|
assert actor._service_n # runtime state sanity
|
||||||
|
|
||||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||||
|
|
|
@ -1302,6 +1302,10 @@ async def async_main(
|
||||||
the actor's "runtime" and all thus all ongoing RPC tasks.
|
the actor's "runtime" and all thus all ongoing RPC tasks.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# XXX NOTE, `_state._current_actor` **must** be set prior to
|
||||||
|
# calling this core runtime entrypoint!
|
||||||
|
assert actor is _state.current_actor()
|
||||||
|
|
||||||
actor._task: trio.Task = trio.lowlevel.current_task()
|
actor._task: trio.Task = trio.lowlevel.current_task()
|
||||||
|
|
||||||
# attempt to retreive ``trio``'s sigint handler and stash it
|
# attempt to retreive ``trio``'s sigint handler and stash it
|
||||||
|
@ -1361,7 +1365,6 @@ async def async_main(
|
||||||
) as service_nursery,
|
) as service_nursery,
|
||||||
|
|
||||||
_server.open_ipc_server(
|
_server.open_ipc_server(
|
||||||
actor=actor,
|
|
||||||
parent_tn=service_nursery,
|
parent_tn=service_nursery,
|
||||||
stream_handler_tn=service_nursery,
|
stream_handler_tn=service_nursery,
|
||||||
) as ipc_server,
|
) as ipc_server,
|
||||||
|
@ -1415,7 +1418,6 @@ async def async_main(
|
||||||
'Booting IPC server'
|
'Booting IPC server'
|
||||||
)
|
)
|
||||||
eps: list = await ipc_server.listen_on(
|
eps: list = await ipc_server.listen_on(
|
||||||
actor=actor,
|
|
||||||
accept_addrs=accept_addrs,
|
accept_addrs=accept_addrs,
|
||||||
stream_handler_nursery=service_nursery,
|
stream_handler_nursery=service_nursery,
|
||||||
)
|
)
|
||||||
|
@ -1500,8 +1502,7 @@ async def async_main(
|
||||||
await root_nursery.start(
|
await root_nursery.start(
|
||||||
partial(
|
partial(
|
||||||
_rpc.process_messages,
|
_rpc.process_messages,
|
||||||
actor,
|
chan=actor._parent_chan,
|
||||||
actor._parent_chan,
|
|
||||||
shield=True,
|
shield=True,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
|
@ -105,7 +105,7 @@ class BoxedMaybeException(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if not self.value:
|
if not self.value:
|
||||||
return f'<{type(self).__name__}( .value=None )>\n'
|
return f'<{type(self).__name__}( .value=None )>'
|
||||||
|
|
||||||
return (
|
return (
|
||||||
f'<{type(self.value).__name__}(\n'
|
f'<{type(self.value).__name__}(\n'
|
||||||
|
@ -256,7 +256,6 @@ async def _maybe_enter_pm(
|
||||||
bool,
|
bool,
|
||||||
] = lambda err: not is_multi_cancelled(err),
|
] = lambda err: not is_multi_cancelled(err),
|
||||||
**_pause_kws,
|
**_pause_kws,
|
||||||
|
|
||||||
):
|
):
|
||||||
if (
|
if (
|
||||||
debug_mode()
|
debug_mode()
|
||||||
|
|
|
@ -72,11 +72,223 @@ if TYPE_CHECKING:
|
||||||
log = log.get_logger(__name__)
|
log = log.get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
async def maybe_wait_on_canced_subs(
|
||||||
|
uid: tuple[str, str],
|
||||||
|
chan: Channel,
|
||||||
|
disconnected: bool,
|
||||||
|
|
||||||
|
actor: Actor|None = None,
|
||||||
|
chan_drain_timeout: float = 0.5,
|
||||||
|
an_exit_timeout: float = 0.5,
|
||||||
|
|
||||||
|
) -> ActorNursery|None:
|
||||||
|
'''
|
||||||
|
When a process-local actor-nursery is found for the given actor
|
||||||
|
`uid` (i.e. that peer is **also** a subactor of this parent), we
|
||||||
|
attempt to (with timeouts) wait on,
|
||||||
|
|
||||||
|
- all IPC msgs to drain on the (common) `Channel` such that all
|
||||||
|
local `Context`-parent-tasks can also gracefully collect
|
||||||
|
`ContextCancelled` msgs from their respective remote children
|
||||||
|
vs. a `chan_drain_timeout`.
|
||||||
|
|
||||||
|
- the actor-nursery to cancel-n-join all its supervised children
|
||||||
|
(processes) *gracefully* vs. a `an_exit_timeout` and thus also
|
||||||
|
detect cases where the IPC transport connection broke but
|
||||||
|
a sub-process is detected as still alive (a case that happens
|
||||||
|
when the subactor is still in an active debugger REPL session).
|
||||||
|
|
||||||
|
If the timeout expires in either case we ofc report with warning.
|
||||||
|
|
||||||
|
'''
|
||||||
|
actor = actor or _state.current_actor()
|
||||||
|
|
||||||
|
# XXX running outside actor-runtime usage,
|
||||||
|
# - unit testing
|
||||||
|
# - possibly manual usage (eventually) ?
|
||||||
|
if not actor:
|
||||||
|
return None
|
||||||
|
|
||||||
|
local_nursery: (
|
||||||
|
ActorNursery|None
|
||||||
|
) = actor._actoruid2nursery.get(uid)
|
||||||
|
|
||||||
|
# This is set in `Portal.cancel_actor()`. So if
|
||||||
|
# the peer was cancelled we try to wait for them
|
||||||
|
# to tear down their side of the connection before
|
||||||
|
# moving on with closing our own side.
|
||||||
|
if (
|
||||||
|
local_nursery
|
||||||
|
and (
|
||||||
|
actor._cancel_called
|
||||||
|
or
|
||||||
|
chan._cancel_called
|
||||||
|
)
|
||||||
|
#
|
||||||
|
# ^-TODO-^ along with this is there another condition
|
||||||
|
# that we should filter with to avoid entering this
|
||||||
|
# waiting block needlessly?
|
||||||
|
# -[ ] maybe `and local_nursery.cancelled` and/or
|
||||||
|
# only if the `._children` table is empty or has
|
||||||
|
# only `Portal`s with .chan._cancel_called ==
|
||||||
|
# True` as per what we had below; the MAIN DIFF
|
||||||
|
# BEING that just bc one `Portal.cancel_actor()`
|
||||||
|
# was called, doesn't mean the whole actor-nurse
|
||||||
|
# is gonna exit any time soon right!?
|
||||||
|
#
|
||||||
|
# or
|
||||||
|
# all(chan._cancel_called for chan in chans)
|
||||||
|
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
'Waiting on cancel request to peer..\n'
|
||||||
|
f'c)=>\n'
|
||||||
|
f' |_{chan.uid}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX: this is a soft wait on the channel (and its
|
||||||
|
# underlying transport protocol) to close from the
|
||||||
|
# remote peer side since we presume that any channel
|
||||||
|
# which is mapped to a sub-actor (i.e. it's managed
|
||||||
|
# by local actor-nursery) has a message that is sent
|
||||||
|
# to the peer likely by this actor (which may be in
|
||||||
|
# a shutdown sequence due to cancellation) when the
|
||||||
|
# local runtime here is now cancelled while
|
||||||
|
# (presumably) in the middle of msg loop processing.
|
||||||
|
chan_info: str = (
|
||||||
|
f'{chan.uid}\n'
|
||||||
|
f'|_{chan}\n'
|
||||||
|
f' |_{chan.transport}\n\n'
|
||||||
|
)
|
||||||
|
with trio.move_on_after(chan_drain_timeout) as drain_cs:
|
||||||
|
drain_cs.shield = True
|
||||||
|
|
||||||
|
# attempt to wait for the far end to close the
|
||||||
|
# channel and bail after timeout (a 2-generals
|
||||||
|
# problem on closure).
|
||||||
|
assert chan.transport
|
||||||
|
async for msg in chan.transport.drain():
|
||||||
|
|
||||||
|
# try to deliver any lingering msgs
|
||||||
|
# before we destroy the channel.
|
||||||
|
# This accomplishes deterministic
|
||||||
|
# ``Portal.cancel_actor()`` cancellation by
|
||||||
|
# making sure any RPC response to that call is
|
||||||
|
# delivered the local calling task.
|
||||||
|
# TODO: factor this into a helper?
|
||||||
|
log.warning(
|
||||||
|
'Draining msg from disconnected peer\n'
|
||||||
|
f'{chan_info}'
|
||||||
|
f'{pformat(msg)}\n'
|
||||||
|
)
|
||||||
|
# cid: str|None = msg.get('cid')
|
||||||
|
cid: str|None = msg.cid
|
||||||
|
if cid:
|
||||||
|
# deliver response to local caller/waiter
|
||||||
|
await actor._deliver_ctx_payload(
|
||||||
|
chan,
|
||||||
|
cid,
|
||||||
|
msg,
|
||||||
|
)
|
||||||
|
if drain_cs.cancelled_caught:
|
||||||
|
log.warning(
|
||||||
|
'Timed out waiting on IPC transport channel to drain?\n'
|
||||||
|
f'{chan_info}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX NOTE XXX when no explicit call to
|
||||||
|
# `open_root_actor()` was made by the application
|
||||||
|
# (normally we implicitly make that call inside
|
||||||
|
# the first `.open_nursery()` in root-actor
|
||||||
|
# user/app code), we can assume that either we
|
||||||
|
# are NOT the root actor or are root but the
|
||||||
|
# runtime was started manually. and thus DO have
|
||||||
|
# to wait for the nursery-enterer to exit before
|
||||||
|
# shutting down the local runtime to avoid
|
||||||
|
# clobbering any ongoing subactor
|
||||||
|
# teardown/debugging/graceful-cancel.
|
||||||
|
#
|
||||||
|
# see matching note inside `._supervise.open_nursery()`
|
||||||
|
#
|
||||||
|
# TODO: should we have a separate cs + timeout
|
||||||
|
# block here?
|
||||||
|
if (
|
||||||
|
# XXX SO either,
|
||||||
|
# - not root OR,
|
||||||
|
# - is root but `open_root_actor()` was
|
||||||
|
# entered manually (in which case we do
|
||||||
|
# the equiv wait there using the
|
||||||
|
# `devx.debug` sub-sys APIs).
|
||||||
|
not local_nursery._implicit_runtime_started
|
||||||
|
):
|
||||||
|
log.runtime(
|
||||||
|
'Waiting on local actor nursery to exit..\n'
|
||||||
|
f'|_{local_nursery}\n'
|
||||||
|
)
|
||||||
|
with trio.move_on_after(an_exit_timeout) as an_exit_cs:
|
||||||
|
an_exit_cs.shield = True
|
||||||
|
await local_nursery.exited.wait()
|
||||||
|
|
||||||
|
# TODO: currently this is always triggering for every
|
||||||
|
# sub-daemon spawned from the `piker.services._mngr`?
|
||||||
|
# -[ ] how do we ensure that the IPC is supposed to
|
||||||
|
# be long lived and isn't just a register?
|
||||||
|
# |_ in the register case how can we signal that the
|
||||||
|
# ephemeral msg loop was intentional?
|
||||||
|
if (
|
||||||
|
# not local_nursery._implicit_runtime_started
|
||||||
|
# and
|
||||||
|
an_exit_cs.cancelled_caught
|
||||||
|
):
|
||||||
|
report: str = (
|
||||||
|
'Timed out waiting on local actor-nursery to exit?\n'
|
||||||
|
f'c)>\n'
|
||||||
|
f' |_{local_nursery}\n'
|
||||||
|
)
|
||||||
|
if children := local_nursery._children:
|
||||||
|
# indent from above local-nurse repr
|
||||||
|
report += (
|
||||||
|
f' |_{pformat(children)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
log.warning(report)
|
||||||
|
|
||||||
|
if disconnected:
|
||||||
|
# if the transport died and this actor is still
|
||||||
|
# registered within a local nursery, we report
|
||||||
|
# that the IPC layer may have failed
|
||||||
|
# unexpectedly since it may be the cause of
|
||||||
|
# other downstream errors.
|
||||||
|
entry: tuple|None = local_nursery._children.get(uid)
|
||||||
|
if entry:
|
||||||
|
proc: trio.Process
|
||||||
|
_, proc, _ = entry
|
||||||
|
|
||||||
|
if (
|
||||||
|
(poll := getattr(proc, 'poll', None))
|
||||||
|
and
|
||||||
|
poll() is None # proc still alive
|
||||||
|
):
|
||||||
|
# TODO: change log level based on
|
||||||
|
# detecting whether chan was created for
|
||||||
|
# ephemeral `.register_actor()` request!
|
||||||
|
# -[ ] also, that should be avoidable by
|
||||||
|
# re-using any existing chan from the
|
||||||
|
# `._discovery.get_registry()` call as
|
||||||
|
# well..
|
||||||
|
log.runtime(
|
||||||
|
f'Peer IPC broke but subproc is alive?\n\n'
|
||||||
|
|
||||||
|
f'<=x {chan.uid}@{chan.raddr}\n'
|
||||||
|
f' |_{proc}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
return local_nursery
|
||||||
|
|
||||||
# TODO multi-tpt support with per-proto peer tracking?
|
# TODO multi-tpt support with per-proto peer tracking?
|
||||||
#
|
#
|
||||||
# -[x] maybe change to mod-func and rename for implied
|
# -[x] maybe change to mod-func and rename for implied
|
||||||
# multi-transport semantics?
|
# multi-transport semantics?
|
||||||
#
|
|
||||||
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
|
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
|
||||||
# so that we can query per tpt all peer contact infos?
|
# so that we can query per tpt all peer contact infos?
|
||||||
# |_[ ] possibly provide a global viewing via a
|
# |_[ ] possibly provide a global viewing via a
|
||||||
|
@ -87,7 +299,6 @@ async def handle_stream_from_peer(
|
||||||
|
|
||||||
*,
|
*,
|
||||||
server: IPCServer,
|
server: IPCServer,
|
||||||
actor: Actor,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -119,9 +330,10 @@ async def handle_stream_from_peer(
|
||||||
|
|
||||||
# initial handshake with peer phase
|
# initial handshake with peer phase
|
||||||
try:
|
try:
|
||||||
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
if actor := _state.current_actor():
|
||||||
aid=actor.aid,
|
peer_aid: msgtypes.Aid = await chan._do_handshake(
|
||||||
)
|
aid=actor.aid,
|
||||||
|
)
|
||||||
except (
|
except (
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
# ^XXX NOTE, the above wraps `trio` exc types raised
|
# ^XXX NOTE, the above wraps `trio` exc types raised
|
||||||
|
@ -222,8 +434,7 @@ async def handle_stream_from_peer(
|
||||||
disconnected,
|
disconnected,
|
||||||
last_msg,
|
last_msg,
|
||||||
) = await _rpc.process_messages(
|
) = await _rpc.process_messages(
|
||||||
actor,
|
chan=chan,
|
||||||
chan,
|
|
||||||
)
|
)
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
|
@ -234,179 +445,16 @@ async def handle_stream_from_peer(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
local_nursery: (
|
|
||||||
ActorNursery|None
|
|
||||||
) = actor._actoruid2nursery.get(uid)
|
|
||||||
|
|
||||||
# This is set in ``Portal.cancel_actor()``. So if
|
# check if there are subs which we should gracefully join at
|
||||||
# the peer was cancelled we try to wait for them
|
# both the inter-actor-task and subprocess levels to
|
||||||
# to tear down their side of the connection before
|
# gracefully remote cancel and later disconnect (particularly
|
||||||
# moving on with closing our own side.
|
# for permitting subs engaged in active debug-REPL sessions).
|
||||||
if (
|
local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
|
||||||
local_nursery
|
uid=uid,
|
||||||
and (
|
chan=chan,
|
||||||
actor._cancel_called
|
disconnected=disconnected,
|
||||||
or
|
)
|
||||||
chan._cancel_called
|
|
||||||
)
|
|
||||||
#
|
|
||||||
# ^-TODO-^ along with this is there another condition
|
|
||||||
# that we should filter with to avoid entering this
|
|
||||||
# waiting block needlessly?
|
|
||||||
# -[ ] maybe `and local_nursery.cancelled` and/or
|
|
||||||
# only if the `._children` table is empty or has
|
|
||||||
# only `Portal`s with .chan._cancel_called ==
|
|
||||||
# True` as per what we had below; the MAIN DIFF
|
|
||||||
# BEING that just bc one `Portal.cancel_actor()`
|
|
||||||
# was called, doesn't mean the whole actor-nurse
|
|
||||||
# is gonna exit any time soon right!?
|
|
||||||
#
|
|
||||||
# or
|
|
||||||
# all(chan._cancel_called for chan in chans)
|
|
||||||
|
|
||||||
):
|
|
||||||
log.cancel(
|
|
||||||
'Waiting on cancel request to peer..\n'
|
|
||||||
f'c)=>\n'
|
|
||||||
f' |_{chan.uid}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX: this is a soft wait on the channel (and its
|
|
||||||
# underlying transport protocol) to close from the
|
|
||||||
# remote peer side since we presume that any channel
|
|
||||||
# which is mapped to a sub-actor (i.e. it's managed
|
|
||||||
# by local actor-nursery) has a message that is sent
|
|
||||||
# to the peer likely by this actor (which may be in
|
|
||||||
# a shutdown sequence due to cancellation) when the
|
|
||||||
# local runtime here is now cancelled while
|
|
||||||
# (presumably) in the middle of msg loop processing.
|
|
||||||
chan_info: str = (
|
|
||||||
f'{chan.uid}\n'
|
|
||||||
f'|_{chan}\n'
|
|
||||||
f' |_{chan.transport}\n\n'
|
|
||||||
)
|
|
||||||
with trio.move_on_after(0.5) as drain_cs:
|
|
||||||
drain_cs.shield = True
|
|
||||||
|
|
||||||
# attempt to wait for the far end to close the
|
|
||||||
# channel and bail after timeout (a 2-generals
|
|
||||||
# problem on closure).
|
|
||||||
assert chan.transport
|
|
||||||
async for msg in chan.transport.drain():
|
|
||||||
|
|
||||||
# try to deliver any lingering msgs
|
|
||||||
# before we destroy the channel.
|
|
||||||
# This accomplishes deterministic
|
|
||||||
# ``Portal.cancel_actor()`` cancellation by
|
|
||||||
# making sure any RPC response to that call is
|
|
||||||
# delivered the local calling task.
|
|
||||||
# TODO: factor this into a helper?
|
|
||||||
log.warning(
|
|
||||||
'Draining msg from disconnected peer\n'
|
|
||||||
f'{chan_info}'
|
|
||||||
f'{pformat(msg)}\n'
|
|
||||||
)
|
|
||||||
# cid: str|None = msg.get('cid')
|
|
||||||
cid: str|None = msg.cid
|
|
||||||
if cid:
|
|
||||||
# deliver response to local caller/waiter
|
|
||||||
await actor._deliver_ctx_payload(
|
|
||||||
chan,
|
|
||||||
cid,
|
|
||||||
msg,
|
|
||||||
)
|
|
||||||
if drain_cs.cancelled_caught:
|
|
||||||
log.warning(
|
|
||||||
'Timed out waiting on IPC transport channel to drain?\n'
|
|
||||||
f'{chan_info}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX NOTE XXX when no explicit call to
|
|
||||||
# `open_root_actor()` was made by the application
|
|
||||||
# (normally we implicitly make that call inside
|
|
||||||
# the first `.open_nursery()` in root-actor
|
|
||||||
# user/app code), we can assume that either we
|
|
||||||
# are NOT the root actor or are root but the
|
|
||||||
# runtime was started manually. and thus DO have
|
|
||||||
# to wait for the nursery-enterer to exit before
|
|
||||||
# shutting down the local runtime to avoid
|
|
||||||
# clobbering any ongoing subactor
|
|
||||||
# teardown/debugging/graceful-cancel.
|
|
||||||
#
|
|
||||||
# see matching note inside `._supervise.open_nursery()`
|
|
||||||
#
|
|
||||||
# TODO: should we have a separate cs + timeout
|
|
||||||
# block here?
|
|
||||||
if (
|
|
||||||
# XXX SO either,
|
|
||||||
# - not root OR,
|
|
||||||
# - is root but `open_root_actor()` was
|
|
||||||
# entered manually (in which case we do
|
|
||||||
# the equiv wait there using the
|
|
||||||
# `devx.debug` sub-sys APIs).
|
|
||||||
not local_nursery._implicit_runtime_started
|
|
||||||
):
|
|
||||||
log.runtime(
|
|
||||||
'Waiting on local actor nursery to exit..\n'
|
|
||||||
f'|_{local_nursery}\n'
|
|
||||||
)
|
|
||||||
with trio.move_on_after(0.5) as an_exit_cs:
|
|
||||||
an_exit_cs.shield = True
|
|
||||||
await local_nursery.exited.wait()
|
|
||||||
|
|
||||||
# TODO: currently this is always triggering for every
|
|
||||||
# sub-daemon spawned from the `piker.services._mngr`?
|
|
||||||
# -[ ] how do we ensure that the IPC is supposed to
|
|
||||||
# be long lived and isn't just a register?
|
|
||||||
# |_ in the register case how can we signal that the
|
|
||||||
# ephemeral msg loop was intentional?
|
|
||||||
if (
|
|
||||||
# not local_nursery._implicit_runtime_started
|
|
||||||
# and
|
|
||||||
an_exit_cs.cancelled_caught
|
|
||||||
):
|
|
||||||
report: str = (
|
|
||||||
'Timed out waiting on local actor-nursery to exit?\n'
|
|
||||||
f'c)>\n'
|
|
||||||
f' |_{local_nursery}\n'
|
|
||||||
)
|
|
||||||
if children := local_nursery._children:
|
|
||||||
# indent from above local-nurse repr
|
|
||||||
report += (
|
|
||||||
f' |_{pformat(children)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
log.warning(report)
|
|
||||||
|
|
||||||
if disconnected:
|
|
||||||
# if the transport died and this actor is still
|
|
||||||
# registered within a local nursery, we report
|
|
||||||
# that the IPC layer may have failed
|
|
||||||
# unexpectedly since it may be the cause of
|
|
||||||
# other downstream errors.
|
|
||||||
entry: tuple|None = local_nursery._children.get(uid)
|
|
||||||
if entry:
|
|
||||||
proc: trio.Process
|
|
||||||
_, proc, _ = entry
|
|
||||||
|
|
||||||
if (
|
|
||||||
(poll := getattr(proc, 'poll', None))
|
|
||||||
and
|
|
||||||
poll() is None # proc still alive
|
|
||||||
):
|
|
||||||
# TODO: change log level based on
|
|
||||||
# detecting whether chan was created for
|
|
||||||
# ephemeral `.register_actor()` request!
|
|
||||||
# -[ ] also, that should be avoidable by
|
|
||||||
# re-using any existing chan from the
|
|
||||||
# `._discovery.get_registry()` call as
|
|
||||||
# well..
|
|
||||||
log.runtime(
|
|
||||||
f'Peer IPC broke but subproc is alive?\n\n'
|
|
||||||
|
|
||||||
f'<=x {chan.uid}@{chan.raddr}\n'
|
|
||||||
f' |_{proc}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# ``Channel`` teardown and closure sequence
|
# ``Channel`` teardown and closure sequence
|
||||||
# drop ref to channel so it can be gc-ed and disconnected
|
# drop ref to channel so it can be gc-ed and disconnected
|
||||||
|
@ -467,11 +515,11 @@ async def handle_stream_from_peer(
|
||||||
# from broken debug TTY locking due to
|
# from broken debug TTY locking due to
|
||||||
# msg-spec races on application using RunVar...
|
# msg-spec races on application using RunVar...
|
||||||
if (
|
if (
|
||||||
|
local_nursery
|
||||||
|
and
|
||||||
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
||||||
and
|
and
|
||||||
(pdb_user_uid := ctx_in_debug.chan.uid)
|
(pdb_user_uid := ctx_in_debug.chan.uid)
|
||||||
and
|
|
||||||
local_nursery
|
|
||||||
):
|
):
|
||||||
entry: tuple|None = local_nursery._children.get(
|
entry: tuple|None = local_nursery._children.get(
|
||||||
tuple(pdb_user_uid)
|
tuple(pdb_user_uid)
|
||||||
|
@ -804,7 +852,6 @@ class IPCServer(Struct):
|
||||||
async def listen_on(
|
async def listen_on(
|
||||||
self,
|
self,
|
||||||
*,
|
*,
|
||||||
actor: Actor,
|
|
||||||
accept_addrs: list[tuple[str, int|str]]|None = None,
|
accept_addrs: list[tuple[str, int|str]]|None = None,
|
||||||
stream_handler_nursery: Nursery|None = None,
|
stream_handler_nursery: Nursery|None = None,
|
||||||
) -> list[IPCEndpoint]:
|
) -> list[IPCEndpoint]:
|
||||||
|
@ -837,20 +884,19 @@ class IPCServer(Struct):
|
||||||
f'{self}\n'
|
f'{self}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info(
|
log.runtime(
|
||||||
f'Binding to endpoints for,\n'
|
f'Binding to endpoints for,\n'
|
||||||
f'{accept_addrs}\n'
|
f'{accept_addrs}\n'
|
||||||
)
|
)
|
||||||
eps: list[IPCEndpoint] = await self._parent_tn.start(
|
eps: list[IPCEndpoint] = await self._parent_tn.start(
|
||||||
partial(
|
partial(
|
||||||
_serve_ipc_eps,
|
_serve_ipc_eps,
|
||||||
actor=actor,
|
|
||||||
server=self,
|
server=self,
|
||||||
stream_handler_tn=stream_handler_nursery,
|
stream_handler_tn=stream_handler_nursery,
|
||||||
listen_addrs=accept_addrs,
|
listen_addrs=accept_addrs,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
log.info(
|
log.runtime(
|
||||||
f'Started IPC endpoints\n'
|
f'Started IPC endpoints\n'
|
||||||
f'{eps}\n'
|
f'{eps}\n'
|
||||||
)
|
)
|
||||||
|
@ -873,7 +919,6 @@ class IPCServer(Struct):
|
||||||
|
|
||||||
async def _serve_ipc_eps(
|
async def _serve_ipc_eps(
|
||||||
*,
|
*,
|
||||||
actor: Actor,
|
|
||||||
server: IPCServer,
|
server: IPCServer,
|
||||||
stream_handler_tn: Nursery,
|
stream_handler_tn: Nursery,
|
||||||
listen_addrs: list[tuple[str, int|str]],
|
listen_addrs: list[tuple[str, int|str]],
|
||||||
|
@ -907,12 +952,13 @@ async def _serve_ipc_eps(
|
||||||
stream_handler_tn=stream_handler_tn,
|
stream_handler_tn=stream_handler_tn,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
log.info(
|
log.runtime(
|
||||||
f'Starting new endpoint listener\n'
|
f'Starting new endpoint listener\n'
|
||||||
f'{ep}\n'
|
f'{ep}\n'
|
||||||
)
|
)
|
||||||
listener: trio.abc.Listener = await ep.start_listener()
|
listener: trio.abc.Listener = await ep.start_listener()
|
||||||
assert listener is ep._listener
|
assert listener is ep._listener
|
||||||
|
# actor = _state.current_actor()
|
||||||
# if actor.is_registry:
|
# if actor.is_registry:
|
||||||
# import pdbp; pdbp.set_trace()
|
# import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
|
@ -937,7 +983,6 @@ async def _serve_ipc_eps(
|
||||||
handler=partial(
|
handler=partial(
|
||||||
handle_stream_from_peer,
|
handle_stream_from_peer,
|
||||||
server=server,
|
server=server,
|
||||||
actor=actor,
|
|
||||||
),
|
),
|
||||||
listeners=listeners,
|
listeners=listeners,
|
||||||
|
|
||||||
|
@ -948,13 +993,13 @@ async def _serve_ipc_eps(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# TODO, wow make this message better! XD
|
# TODO, wow make this message better! XD
|
||||||
log.info(
|
log.runtime(
|
||||||
'Started server(s)\n'
|
'Started server(s)\n'
|
||||||
+
|
+
|
||||||
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
'\n'.join([f'|_{addr}' for addr in listen_addrs])
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info(
|
log.runtime(
|
||||||
f'Started IPC endpoints\n'
|
f'Started IPC endpoints\n'
|
||||||
f'{eps}\n'
|
f'{eps}\n'
|
||||||
)
|
)
|
||||||
|
@ -970,6 +1015,7 @@ async def _serve_ipc_eps(
|
||||||
ep.close_listener()
|
ep.close_listener()
|
||||||
server._endpoints.remove(ep)
|
server._endpoints.remove(ep)
|
||||||
|
|
||||||
|
# actor = _state.current_actor()
|
||||||
# if actor.is_arbiter:
|
# if actor.is_arbiter:
|
||||||
# import pdbp; pdbp.set_trace()
|
# import pdbp; pdbp.set_trace()
|
||||||
|
|
||||||
|
@ -980,7 +1026,6 @@ async def _serve_ipc_eps(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_ipc_server(
|
async def open_ipc_server(
|
||||||
actor: Actor,
|
|
||||||
parent_tn: Nursery|None = None,
|
parent_tn: Nursery|None = None,
|
||||||
stream_handler_tn: Nursery|None = None,
|
stream_handler_tn: Nursery|None = None,
|
||||||
|
|
||||||
|
|
|
@ -127,6 +127,11 @@ async def start_listener(
|
||||||
Start a TCP socket listener on the given `TCPAddress`.
|
Start a TCP socket listener on the given `TCPAddress`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
log.info(
|
||||||
|
f'Attempting to bind TCP socket\n'
|
||||||
|
f'>[\n'
|
||||||
|
f'|_{addr}\n'
|
||||||
|
)
|
||||||
# ?TODO, maybe we should just change the lower-level call this is
|
# ?TODO, maybe we should just change the lower-level call this is
|
||||||
# using internall per-listener?
|
# using internall per-listener?
|
||||||
listeners: list[SocketListener] = await open_tcp_listeners(
|
listeners: list[SocketListener] = await open_tcp_listeners(
|
||||||
|
@ -140,6 +145,12 @@ async def start_listener(
|
||||||
assert len(listeners) == 1
|
assert len(listeners) == 1
|
||||||
listener = listeners[0]
|
listener = listeners[0]
|
||||||
host, port = listener.socket.getsockname()[:2]
|
host, port = listener.socket.getsockname()[:2]
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
f'Listening on TCP socket\n'
|
||||||
|
f'[>\n'
|
||||||
|
f' |_{addr}\n'
|
||||||
|
)
|
||||||
return listener
|
return listener
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -484,7 +484,7 @@ def _run_asyncio_task(
|
||||||
raise_not_found=False,
|
raise_not_found=False,
|
||||||
))
|
))
|
||||||
):
|
):
|
||||||
log.info(
|
log.devx(
|
||||||
f'Bestowing `greenback` portal for `asyncio`-task\n'
|
f'Bestowing `greenback` portal for `asyncio`-task\n'
|
||||||
f'{task}\n'
|
f'{task}\n'
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue