Compare commits

...

9 Commits

Author SHA1 Message Date
Tyler Goodlet 99c5d05aa6 Drop "
" from tail of `BoxedMaybeException.pformat()`
2025-06-11 20:00:30 -04:00
Tyler Goodlet b3635aaf6d Drop `.to_asyncio`s usage-of-`greenback`-reporting to `log.devx()` 2025-06-11 19:57:19 -04:00
Tyler Goodlet 304117b4e1 Disable `greenback` sync fn breakpointing by def
Opting for performance over broad multi-actor "debug-ability" from
sync-function-contexts when `debug_mode=True` is set;
IOW prefer no behind-the-scenes `greenlet` perf impact over being
able to use an actor-safe `breakpoint()` wherever as per,
https://greenback.readthedocs.io/en/latest/principle.html#performance

Adjust the breakpoint restore ex script to match.
2025-06-11 19:56:12 -04:00
Tyler Goodlet 37377e8220 Prevent `test_breakpoint_hook_restored` subproc hangs
If the underlying example script fails (say due to a console output
pattern-mismatch, `AssertionError`) the `pexpect` managed subproc with
a `debug_mode=True` crash-handling-REPL engaged will ofc *not terminate*
due to any SIGINT sent by the test harnesss (since we shield from it as
part of normal sub-actor debugger operation). So instead always send
a 'continue' cmd to the active `PdbREPL`'s stdin so it deactivates and
allows the py-script-process to raise and terminate, unblocking the
`pexpect.spawn`'s internal subproc joiner (which would otherwise hang
without manual intervention, blocking downstream tests..).

Also, use the new `PexpectSpawner` type alias after actually importing
future annots.. XD
2025-06-11 19:47:55 -04:00
Tyler Goodlet 6fd08c6e32 Type alias our `pexpect.spawn()` closure fixture
Such that we can more easily annotate any consumer test's of our
`.tests.devx.conftest.spawn()` fixture which delivers a closure which, when
called in a test fn body, transitively sub-invokes:
`pytest.Pytester.spawn()` -> `pexpect.spawn()`

IMO Expecting `Callable[[str], pexpect.pty_spawn.spawn]]` to be used all
over is a bit too.. verbose?
2025-06-11 19:19:56 -04:00
Tyler Goodlet 87049b991f Type annot the `testdir` fixture 2025-06-11 18:29:24 -04:00
Tyler Goodlet 3d6a194669 Start a very basic ipc-server unit test suite
For now it just boots a server, parametrized over all tpt-protos, sin
any actor runtime bootup. Obvi the future todo is ensuring it all works
with a client connecting via the equivalent lowlevel
`.ipc._chan._connect_chan()` API(s).
2025-06-11 18:26:21 -04:00
Tyler Goodlet 082d373947 Decouple actor-state from low-level ipc-server
As much as is possible given we currently do some graceful
cancellation join-waiting on any connected sub-actors whenever an active
`local_nursery: AcrtorNursery` in the post-rpc teardown sequence of
`handle_stream_from_peer()` is detected. In such cases we try to allow
the higher level inter-actor (task) context(s) to fully cancelled-ack
before conducting IPC machinery shutdown.

The main immediate motivation for all this is to support unit testing
the `.ipc._server` APIs but in the future may be useful for anyone
wanting to use our modular IPC transport layer sin-"actors".

Impl deats,
- drop passing an `actor: Actor` ref from as many routines in
  `.ipc._server` as possible instead opting to use
  `._state.current_actor()` where abs needed; thus the fns dropping an
  `actor` input param are:
  - `open_ipc_server()`
  - `IPCServer.listen_on()`
  - `._serve_ipc_eps()`
  - `.handle_stream_from_peer()`
- factor the above mentioned graceful remote-cancel-ack waiting into
  a new `maybe_wait_on_canced_subs()` which is called from
  `handle_stream_from_peer()` and delivers a
  maybe-`local_nursery: ActorNursery` for downstream logic; it's this
  new fn which primarily still needs to call `current_actor()`.
- in `handle_stream_from_peer()` also use `current_actor()` to check if
  a handshake is needed (or if it was called as part of some
  actor-runtime-less operation like our unit test suite!).
- also don't pass an `actor` to `._rpc.process_messages()` see how-n-why
  below..

Surrounding ipc-server client/caller adjustments,
- `._rpc.process_messages()` no longer takes an `actor` input and
  now calls `current_actor()` instead.
- `._portal.open_portal()` is adjusted to ^.
- `._runtime.async_main()` is adjusted to the `.ipc._server`'s removal
  of `actor` ref passing.

Also,
- drop some server `log.info()`s to `.runtime()`
2025-06-11 18:25:08 -04:00
Tyler Goodlet a7bbcdbeb8 Log listener bind status for TCP as for UDS 2025-06-11 11:29:23 -04:00
14 changed files with 380 additions and 221 deletions

View File

@ -19,8 +19,11 @@ async def main() -> None:
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='devx', loglevel='devx',
) as an: maybe_enable_greenback=True,
assert an # ^XXX REQUIRED to enable `breakpoint()` support (from sync
# fns) and thus required here to avoid an assertion err
# on the next line
):
assert ( assert (
(pybp_var := os.environ['PYTHONBREAKPOINT']) (pybp_var := os.environ['PYTHONBREAKPOINT'])
== ==

View File

@ -103,7 +103,7 @@ def sig_prog(
def daemon( def daemon(
debug_mode: bool, debug_mode: bool,
loglevel: str, loglevel: str,
testdir, testdir: pytest.Pytester,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
tpt_proto: str, tpt_proto: str,

View File

@ -2,9 +2,11 @@
`tractor.devx.*` tooling sub-pkg test space. `tractor.devx.*` tooling sub-pkg test space.
''' '''
from __future__ import annotations
import time import time
from typing import ( from typing import (
Callable, Callable,
TYPE_CHECKING,
) )
import pytest import pytest
@ -26,14 +28,22 @@ from ..conftest import (
_ci_env, _ci_env,
) )
if TYPE_CHECKING:
from pexpect import pty_spawn
# a fn that sub-instantiates a `pexpect.spawn()`
# and returns it.
type PexpectSpawner = Callable[[str], pty_spawn.spawn]
@pytest.fixture @pytest.fixture
def spawn( def spawn(
start_method, start_method: str,
testdir: pytest.Pytester, testdir: pytest.Pytester,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
) -> Callable[[str], None]: ) -> PexpectSpawner:
''' '''
Use the `pexpect` module shipped via `testdir.spawn()` to Use the `pexpect` module shipped via `testdir.spawn()` to
run an `./examples/..` script by name. run an `./examples/..` script by name.
@ -59,7 +69,7 @@ def spawn(
def _spawn( def _spawn(
cmd: str, cmd: str,
**mkcmd_kwargs, **mkcmd_kwargs,
): ) -> pty_spawn.spawn:
unset_colors() unset_colors()
return testdir.spawn( return testdir.spawn(
cmd=mk_cmd( cmd=mk_cmd(
@ -73,7 +83,7 @@ def spawn(
) )
# such that test-dep can pass input script name. # such that test-dep can pass input script name.
return _spawn return _spawn # the `PexpectSpawner`, type alias.
@pytest.fixture( @pytest.fixture(

View File

@ -13,9 +13,13 @@ TODO:
when debugging a problem inside the stack vs. in their app. when debugging a problem inside the stack vs. in their app.
''' '''
from __future__ import annotations
import os import os
import signal import signal
import time import time
from typing import (
TYPE_CHECKING,
)
from .conftest import ( from .conftest import (
expect, expect,
@ -29,9 +33,12 @@ from pexpect.exceptions import (
EOF, EOF,
) )
if TYPE_CHECKING:
from ..conftest import PexpectSpawner
def test_shield_pause( def test_shield_pause(
spawn, spawn: PexpectSpawner,
): ):
''' '''
Verify the `tractor.pause()/.post_mortem()` API works inside an Verify the `tractor.pause()/.post_mortem()` API works inside an
@ -126,7 +133,7 @@ def test_shield_pause(
def test_breakpoint_hook_restored( def test_breakpoint_hook_restored(
spawn, spawn: PexpectSpawner,
): ):
''' '''
Ensures our actor runtime sets a custom `breakpoint()` hook Ensures our actor runtime sets a custom `breakpoint()` hook
@ -140,16 +147,22 @@ def test_breakpoint_hook_restored(
child = spawn('restore_builtin_breakpoint') child = spawn('restore_builtin_breakpoint')
child.expect(PROMPT) child.expect(PROMPT)
assert_before( try:
child, assert_before(
[ child,
_pause_msg, [
"<Task '__main__.main'", _pause_msg,
"('root'", "<Task '__main__.main'",
"first bp, tractor hook set", "('root'",
] "first bp, tractor hook set",
) ]
child.sendline('c') )
# XXX if the above raises `AssertionError`, without sending
# the final 'continue' cmd to the REPL-active sub-process,
# we'll hang waiting for that pexpect instance to terminate..
finally:
child.sendline('c')
child.expect(PROMPT) child.expect(PROMPT)
assert_before( assert_before(
child, child,

View File

@ -0,0 +1,4 @@
'''
`tractor.ipc` subsystem(s)/unit testing suites.
'''

View File

@ -0,0 +1,72 @@
'''
High-level `.ipc._server` unit tests.
'''
from __future__ import annotations
import pytest
import trio
from tractor import (
devx,
ipc,
log,
)
from tractor._testing.addr import (
get_rando_addr,
)
# TODO, use/check-roundtripping with some of these wrapper types?
#
# from .._addr import Address
# from ._chan import Channel
# from ._transport import MsgTransport
# from ._uds import UDSAddress
# from ._tcp import TCPAddress
@pytest.mark.parametrize(
'_tpt_proto',
['uds', 'tcp']
)
def test_basic_ipc_server(
_tpt_proto: str,
debug_mode: bool,
loglevel: str,
):
# so we see the socket-listener reporting on console
log.get_console_log("INFO")
rando_addr: tuple = get_rando_addr(
tpt_proto=_tpt_proto,
)
async def main():
async with ipc._server.open_ipc_server() as server:
assert (
server._parent_tn
and
server._parent_tn is server._stream_handler_tn
)
assert server._no_more_peers.is_set()
eps: list[ipc.IPCEndpoint] = await server.listen_on(
accept_addrs=[rando_addr],
stream_handler_nursery=None,
)
assert (
len(eps) == 1
and
(ep := eps[0])._listener
and
not ep.peer_tpts
)
server._parent_tn.cancel_scope.cancel()
# !TODO! actually make a bg-task connection from a client
# using `ipc._chan._connect_chan()`
with devx.maybe_open_crash_handler(
pdb=debug_mode,
):
trio.run(main)

View File

@ -582,8 +582,7 @@ async def open_portal(
msg_loop_cs = await tn.start( msg_loop_cs = await tn.start(
partial( partial(
_rpc.process_messages, _rpc.process_messages,
actor, chan=channel,
channel,
# if the local task is cancelled we want to keep # if the local task is cancelled we want to keep
# the msg loop running until our block ends # the msg loop running until our block ends
shield=True, shield=True,

View File

@ -166,7 +166,9 @@ async def open_root_actor(
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
# ^XXX NOTE^ the perf implications of use,
# https://greenback.readthedocs.io/en/latest/principle.html#performance
enable_stack_on_sig: bool = False, enable_stack_on_sig: bool = False,
# internal logging # internal logging

View File

@ -869,7 +869,6 @@ async def try_ship_error_to_remote(
async def process_messages( async def process_messages(
actor: Actor,
chan: Channel, chan: Channel,
shield: bool = False, shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
@ -907,6 +906,7 @@ async def process_messages(
(as utilized inside `Portal.cancel_actor()` ). (as utilized inside `Portal.cancel_actor()` ).
''' '''
actor: Actor = _state.current_actor()
assert actor._service_n # runtime state sanity assert actor._service_n # runtime state sanity
# TODO: once `trio` get's an "obvious way" for req/resp we # TODO: once `trio` get's an "obvious way" for req/resp we

View File

@ -1302,6 +1302,10 @@ async def async_main(
the actor's "runtime" and all thus all ongoing RPC tasks. the actor's "runtime" and all thus all ongoing RPC tasks.
''' '''
# XXX NOTE, `_state._current_actor` **must** be set prior to
# calling this core runtime entrypoint!
assert actor is _state.current_actor()
actor._task: trio.Task = trio.lowlevel.current_task() actor._task: trio.Task = trio.lowlevel.current_task()
# attempt to retreive ``trio``'s sigint handler and stash it # attempt to retreive ``trio``'s sigint handler and stash it
@ -1361,7 +1365,6 @@ async def async_main(
) as service_nursery, ) as service_nursery,
_server.open_ipc_server( _server.open_ipc_server(
actor=actor,
parent_tn=service_nursery, parent_tn=service_nursery,
stream_handler_tn=service_nursery, stream_handler_tn=service_nursery,
) as ipc_server, ) as ipc_server,
@ -1415,7 +1418,6 @@ async def async_main(
'Booting IPC server' 'Booting IPC server'
) )
eps: list = await ipc_server.listen_on( eps: list = await ipc_server.listen_on(
actor=actor,
accept_addrs=accept_addrs, accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery, stream_handler_nursery=service_nursery,
) )
@ -1500,8 +1502,7 @@ async def async_main(
await root_nursery.start( await root_nursery.start(
partial( partial(
_rpc.process_messages, _rpc.process_messages,
actor, chan=actor._parent_chan,
actor._parent_chan,
shield=True, shield=True,
) )
) )

View File

@ -105,7 +105,7 @@ class BoxedMaybeException(Struct):
''' '''
if not self.value: if not self.value:
return f'<{type(self).__name__}( .value=None )>\n' return f'<{type(self).__name__}( .value=None )>'
return ( return (
f'<{type(self.value).__name__}(\n' f'<{type(self.value).__name__}(\n'
@ -256,7 +256,6 @@ async def _maybe_enter_pm(
bool, bool,
] = lambda err: not is_multi_cancelled(err), ] = lambda err: not is_multi_cancelled(err),
**_pause_kws, **_pause_kws,
): ):
if ( if (
debug_mode() debug_mode()

View File

@ -72,11 +72,223 @@ if TYPE_CHECKING:
log = log.get_logger(__name__) log = log.get_logger(__name__)
async def maybe_wait_on_canced_subs(
uid: tuple[str, str],
chan: Channel,
disconnected: bool,
actor: Actor|None = None,
chan_drain_timeout: float = 0.5,
an_exit_timeout: float = 0.5,
) -> ActorNursery|None:
'''
When a process-local actor-nursery is found for the given actor
`uid` (i.e. that peer is **also** a subactor of this parent), we
attempt to (with timeouts) wait on,
- all IPC msgs to drain on the (common) `Channel` such that all
local `Context`-parent-tasks can also gracefully collect
`ContextCancelled` msgs from their respective remote children
vs. a `chan_drain_timeout`.
- the actor-nursery to cancel-n-join all its supervised children
(processes) *gracefully* vs. a `an_exit_timeout` and thus also
detect cases where the IPC transport connection broke but
a sub-process is detected as still alive (a case that happens
when the subactor is still in an active debugger REPL session).
If the timeout expires in either case we ofc report with warning.
'''
actor = actor or _state.current_actor()
# XXX running outside actor-runtime usage,
# - unit testing
# - possibly manual usage (eventually) ?
if not actor:
return None
local_nursery: (
ActorNursery|None
) = actor._actoruid2nursery.get(uid)
# This is set in `Portal.cancel_actor()`. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
if (
local_nursery
and (
actor._cancel_called
or
chan._cancel_called
)
#
# ^-TODO-^ along with this is there another condition
# that we should filter with to avoid entering this
# waiting block needlessly?
# -[ ] maybe `and local_nursery.cancelled` and/or
# only if the `._children` table is empty or has
# only `Portal`s with .chan._cancel_called ==
# True` as per what we had below; the MAIN DIFF
# BEING that just bc one `Portal.cancel_actor()`
# was called, doesn't mean the whole actor-nurse
# is gonna exit any time soon right!?
#
# or
# all(chan._cancel_called for chan in chans)
):
log.cancel(
'Waiting on cancel request to peer..\n'
f'c)=>\n'
f' |_{chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
# which is mapped to a sub-actor (i.e. it's managed
# by local actor-nursery) has a message that is sent
# to the peer likely by this actor (which may be in
# a shutdown sequence due to cancellation) when the
# local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing.
chan_info: str = (
f'{chan.uid}\n'
f'|_{chan}\n'
f' |_{chan.transport}\n\n'
)
with trio.move_on_after(chan_drain_timeout) as drain_cs:
drain_cs.shield = True
# attempt to wait for the far end to close the
# channel and bail after timeout (a 2-generals
# problem on closure).
assert chan.transport
async for msg in chan.transport.drain():
# try to deliver any lingering msgs
# before we destroy the channel.
# This accomplishes deterministic
# ``Portal.cancel_actor()`` cancellation by
# making sure any RPC response to that call is
# delivered the local calling task.
# TODO: factor this into a helper?
log.warning(
'Draining msg from disconnected peer\n'
f'{chan_info}'
f'{pformat(msg)}\n'
)
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
if cid:
# deliver response to local caller/waiter
await actor._deliver_ctx_payload(
chan,
cid,
msg,
)
if drain_cs.cancelled_caught:
log.warning(
'Timed out waiting on IPC transport channel to drain?\n'
f'{chan_info}'
)
# XXX NOTE XXX when no explicit call to
# `open_root_actor()` was made by the application
# (normally we implicitly make that call inside
# the first `.open_nursery()` in root-actor
# user/app code), we can assume that either we
# are NOT the root actor or are root but the
# runtime was started manually. and thus DO have
# to wait for the nursery-enterer to exit before
# shutting down the local runtime to avoid
# clobbering any ongoing subactor
# teardown/debugging/graceful-cancel.
#
# see matching note inside `._supervise.open_nursery()`
#
# TODO: should we have a separate cs + timeout
# block here?
if (
# XXX SO either,
# - not root OR,
# - is root but `open_root_actor()` was
# entered manually (in which case we do
# the equiv wait there using the
# `devx.debug` sub-sys APIs).
not local_nursery._implicit_runtime_started
):
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
with trio.move_on_after(an_exit_timeout) as an_exit_cs:
an_exit_cs.shield = True
await local_nursery.exited.wait()
# TODO: currently this is always triggering for every
# sub-daemon spawned from the `piker.services._mngr`?
# -[ ] how do we ensure that the IPC is supposed to
# be long lived and isn't just a register?
# |_ in the register case how can we signal that the
# ephemeral msg loop was intentional?
if (
# not local_nursery._implicit_runtime_started
# and
an_exit_cs.cancelled_caught
):
report: str = (
'Timed out waiting on local actor-nursery to exit?\n'
f'c)>\n'
f' |_{local_nursery}\n'
)
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
)
log.warning(report)
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry: tuple|None = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and
poll() is None # proc still alive
):
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
f' |_{proc}\n'
)
return local_nursery
# TODO multi-tpt support with per-proto peer tracking? # TODO multi-tpt support with per-proto peer tracking?
# #
# -[x] maybe change to mod-func and rename for implied # -[x] maybe change to mod-func and rename for implied
# multi-transport semantics? # multi-transport semantics?
#
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint` # -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
# so that we can query per tpt all peer contact infos? # so that we can query per tpt all peer contact infos?
# |_[ ] possibly provide a global viewing via a # |_[ ] possibly provide a global viewing via a
@ -87,7 +299,6 @@ async def handle_stream_from_peer(
*, *,
server: IPCServer, server: IPCServer,
actor: Actor,
) -> None: ) -> None:
''' '''
@ -119,9 +330,10 @@ async def handle_stream_from_peer(
# initial handshake with peer phase # initial handshake with peer phase
try: try:
peer_aid: msgtypes.Aid = await chan._do_handshake( if actor := _state.current_actor():
aid=actor.aid, peer_aid: msgtypes.Aid = await chan._do_handshake(
) aid=actor.aid,
)
except ( except (
TransportClosed, TransportClosed,
# ^XXX NOTE, the above wraps `trio` exc types raised # ^XXX NOTE, the above wraps `trio` exc types raised
@ -222,8 +434,7 @@ async def handle_stream_from_peer(
disconnected, disconnected,
last_msg, last_msg,
) = await _rpc.process_messages( ) = await _rpc.process_messages(
actor, chan=chan,
chan,
) )
except trio.Cancelled: except trio.Cancelled:
log.cancel( log.cancel(
@ -234,179 +445,16 @@ async def handle_stream_from_peer(
raise raise
finally: finally:
local_nursery: (
ActorNursery|None
) = actor._actoruid2nursery.get(uid)
# This is set in ``Portal.cancel_actor()``. So if # check if there are subs which we should gracefully join at
# the peer was cancelled we try to wait for them # both the inter-actor-task and subprocess levels to
# to tear down their side of the connection before # gracefully remote cancel and later disconnect (particularly
# moving on with closing our own side. # for permitting subs engaged in active debug-REPL sessions).
if ( local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
local_nursery uid=uid,
and ( chan=chan,
actor._cancel_called disconnected=disconnected,
or )
chan._cancel_called
)
#
# ^-TODO-^ along with this is there another condition
# that we should filter with to avoid entering this
# waiting block needlessly?
# -[ ] maybe `and local_nursery.cancelled` and/or
# only if the `._children` table is empty or has
# only `Portal`s with .chan._cancel_called ==
# True` as per what we had below; the MAIN DIFF
# BEING that just bc one `Portal.cancel_actor()`
# was called, doesn't mean the whole actor-nurse
# is gonna exit any time soon right!?
#
# or
# all(chan._cancel_called for chan in chans)
):
log.cancel(
'Waiting on cancel request to peer..\n'
f'c)=>\n'
f' |_{chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
# which is mapped to a sub-actor (i.e. it's managed
# by local actor-nursery) has a message that is sent
# to the peer likely by this actor (which may be in
# a shutdown sequence due to cancellation) when the
# local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing.
chan_info: str = (
f'{chan.uid}\n'
f'|_{chan}\n'
f' |_{chan.transport}\n\n'
)
with trio.move_on_after(0.5) as drain_cs:
drain_cs.shield = True
# attempt to wait for the far end to close the
# channel and bail after timeout (a 2-generals
# problem on closure).
assert chan.transport
async for msg in chan.transport.drain():
# try to deliver any lingering msgs
# before we destroy the channel.
# This accomplishes deterministic
# ``Portal.cancel_actor()`` cancellation by
# making sure any RPC response to that call is
# delivered the local calling task.
# TODO: factor this into a helper?
log.warning(
'Draining msg from disconnected peer\n'
f'{chan_info}'
f'{pformat(msg)}\n'
)
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
if cid:
# deliver response to local caller/waiter
await actor._deliver_ctx_payload(
chan,
cid,
msg,
)
if drain_cs.cancelled_caught:
log.warning(
'Timed out waiting on IPC transport channel to drain?\n'
f'{chan_info}'
)
# XXX NOTE XXX when no explicit call to
# `open_root_actor()` was made by the application
# (normally we implicitly make that call inside
# the first `.open_nursery()` in root-actor
# user/app code), we can assume that either we
# are NOT the root actor or are root but the
# runtime was started manually. and thus DO have
# to wait for the nursery-enterer to exit before
# shutting down the local runtime to avoid
# clobbering any ongoing subactor
# teardown/debugging/graceful-cancel.
#
# see matching note inside `._supervise.open_nursery()`
#
# TODO: should we have a separate cs + timeout
# block here?
if (
# XXX SO either,
# - not root OR,
# - is root but `open_root_actor()` was
# entered manually (in which case we do
# the equiv wait there using the
# `devx.debug` sub-sys APIs).
not local_nursery._implicit_runtime_started
):
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
with trio.move_on_after(0.5) as an_exit_cs:
an_exit_cs.shield = True
await local_nursery.exited.wait()
# TODO: currently this is always triggering for every
# sub-daemon spawned from the `piker.services._mngr`?
# -[ ] how do we ensure that the IPC is supposed to
# be long lived and isn't just a register?
# |_ in the register case how can we signal that the
# ephemeral msg loop was intentional?
if (
# not local_nursery._implicit_runtime_started
# and
an_exit_cs.cancelled_caught
):
report: str = (
'Timed out waiting on local actor-nursery to exit?\n'
f'c)>\n'
f' |_{local_nursery}\n'
)
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
)
log.warning(report)
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry: tuple|None = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and
poll() is None # proc still alive
):
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
f' |_{proc}\n'
)
# ``Channel`` teardown and closure sequence # ``Channel`` teardown and closure sequence
# drop ref to channel so it can be gc-ed and disconnected # drop ref to channel so it can be gc-ed and disconnected
@ -467,11 +515,11 @@ async def handle_stream_from_peer(
# from broken debug TTY locking due to # from broken debug TTY locking due to
# msg-spec races on application using RunVar... # msg-spec races on application using RunVar...
if ( if (
local_nursery
and
(ctx_in_debug := pdb_lock.ctx_in_debug) (ctx_in_debug := pdb_lock.ctx_in_debug)
and and
(pdb_user_uid := ctx_in_debug.chan.uid) (pdb_user_uid := ctx_in_debug.chan.uid)
and
local_nursery
): ):
entry: tuple|None = local_nursery._children.get( entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid) tuple(pdb_user_uid)
@ -804,7 +852,6 @@ class IPCServer(Struct):
async def listen_on( async def listen_on(
self, self,
*, *,
actor: Actor,
accept_addrs: list[tuple[str, int|str]]|None = None, accept_addrs: list[tuple[str, int|str]]|None = None,
stream_handler_nursery: Nursery|None = None, stream_handler_nursery: Nursery|None = None,
) -> list[IPCEndpoint]: ) -> list[IPCEndpoint]:
@ -837,20 +884,19 @@ class IPCServer(Struct):
f'{self}\n' f'{self}\n'
) )
log.info( log.runtime(
f'Binding to endpoints for,\n' f'Binding to endpoints for,\n'
f'{accept_addrs}\n' f'{accept_addrs}\n'
) )
eps: list[IPCEndpoint] = await self._parent_tn.start( eps: list[IPCEndpoint] = await self._parent_tn.start(
partial( partial(
_serve_ipc_eps, _serve_ipc_eps,
actor=actor,
server=self, server=self,
stream_handler_tn=stream_handler_nursery, stream_handler_tn=stream_handler_nursery,
listen_addrs=accept_addrs, listen_addrs=accept_addrs,
) )
) )
log.info( log.runtime(
f'Started IPC endpoints\n' f'Started IPC endpoints\n'
f'{eps}\n' f'{eps}\n'
) )
@ -873,7 +919,6 @@ class IPCServer(Struct):
async def _serve_ipc_eps( async def _serve_ipc_eps(
*, *,
actor: Actor,
server: IPCServer, server: IPCServer,
stream_handler_tn: Nursery, stream_handler_tn: Nursery,
listen_addrs: list[tuple[str, int|str]], listen_addrs: list[tuple[str, int|str]],
@ -907,12 +952,13 @@ async def _serve_ipc_eps(
stream_handler_tn=stream_handler_tn, stream_handler_tn=stream_handler_tn,
) )
try: try:
log.info( log.runtime(
f'Starting new endpoint listener\n' f'Starting new endpoint listener\n'
f'{ep}\n' f'{ep}\n'
) )
listener: trio.abc.Listener = await ep.start_listener() listener: trio.abc.Listener = await ep.start_listener()
assert listener is ep._listener assert listener is ep._listener
# actor = _state.current_actor()
# if actor.is_registry: # if actor.is_registry:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
@ -937,7 +983,6 @@ async def _serve_ipc_eps(
handler=partial( handler=partial(
handle_stream_from_peer, handle_stream_from_peer,
server=server, server=server,
actor=actor,
), ),
listeners=listeners, listeners=listeners,
@ -948,13 +993,13 @@ async def _serve_ipc_eps(
) )
) )
# TODO, wow make this message better! XD # TODO, wow make this message better! XD
log.info( log.runtime(
'Started server(s)\n' 'Started server(s)\n'
+ +
'\n'.join([f'|_{addr}' for addr in listen_addrs]) '\n'.join([f'|_{addr}' for addr in listen_addrs])
) )
log.info( log.runtime(
f'Started IPC endpoints\n' f'Started IPC endpoints\n'
f'{eps}\n' f'{eps}\n'
) )
@ -970,6 +1015,7 @@ async def _serve_ipc_eps(
ep.close_listener() ep.close_listener()
server._endpoints.remove(ep) server._endpoints.remove(ep)
# actor = _state.current_actor()
# if actor.is_arbiter: # if actor.is_arbiter:
# import pdbp; pdbp.set_trace() # import pdbp; pdbp.set_trace()
@ -980,7 +1026,6 @@ async def _serve_ipc_eps(
@acm @acm
async def open_ipc_server( async def open_ipc_server(
actor: Actor,
parent_tn: Nursery|None = None, parent_tn: Nursery|None = None,
stream_handler_tn: Nursery|None = None, stream_handler_tn: Nursery|None = None,

View File

@ -127,6 +127,11 @@ async def start_listener(
Start a TCP socket listener on the given `TCPAddress`. Start a TCP socket listener on the given `TCPAddress`.
''' '''
log.info(
f'Attempting to bind TCP socket\n'
f'>[\n'
f'|_{addr}\n'
)
# ?TODO, maybe we should just change the lower-level call this is # ?TODO, maybe we should just change the lower-level call this is
# using internall per-listener? # using internall per-listener?
listeners: list[SocketListener] = await open_tcp_listeners( listeners: list[SocketListener] = await open_tcp_listeners(
@ -140,6 +145,12 @@ async def start_listener(
assert len(listeners) == 1 assert len(listeners) == 1
listener = listeners[0] listener = listeners[0]
host, port = listener.socket.getsockname()[:2] host, port = listener.socket.getsockname()[:2]
log.info(
f'Listening on TCP socket\n'
f'[>\n'
f' |_{addr}\n'
)
return listener return listener

View File

@ -484,7 +484,7 @@ def _run_asyncio_task(
raise_not_found=False, raise_not_found=False,
)) ))
): ):
log.info( log.devx(
f'Bestowing `greenback` portal for `asyncio`-task\n' f'Bestowing `greenback` portal for `asyncio`-task\n'
f'{task}\n' f'{task}\n'
) )