Compare commits


No commits in common. "99c5d05aa60a1723a9d3aa159c83cd9c4fadd2d0" and "2a37f6abddb4ebd7a49505c92b4fca0a3cb0d61e" have entirely different histories.

14 changed files with 221 additions and 380 deletions

View File

@@ -19,11 +19,8 @@ async def main() -> None:
async with tractor.open_nursery(
debug_mode=True,
loglevel='devx',
maybe_enable_greenback=True,
# ^XXX REQUIRED to enable `breakpoint()` support (from sync
# fns) and thus required here to avoid an assertion err
# on the next line
):
) as an:
assert an
assert (
(pybp_var := os.environ['PYTHONBREAKPOINT'])
==
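
For orientation (not part of the diff), a minimal sketch of what the example script touched by this hunk looks like on the side that keeps `maybe_enable_greenback=True`; the exact hook path asserted against `PYTHONBREAKPOINT` is elided above, so the sketch only checks that the var was overridden:

import os

import tractor
import trio


async def main() -> None:
    # `maybe_enable_greenback=True` is what enables `breakpoint()`
    # calls from sync fns inside actors; per the comment in the hunk
    # above, without it the env-var assertion below would error.
    async with tractor.open_nursery(
        debug_mode=True,
        loglevel='devx',
        maybe_enable_greenback=True,
    ) as an:
        assert an
        # the runtime installs its own hook while debug-mode is active
        pybp_var: str = os.environ['PYTHONBREAKPOINT']
        assert pybp_var  # exact expected value is elided in the hunk


if __name__ == '__main__':
    trio.run(main)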

View File

@@ -103,7 +103,7 @@ def sig_prog(
def daemon(
debug_mode: bool,
loglevel: str,
testdir: pytest.Pytester,
testdir,
reg_addr: tuple[str, int],
tpt_proto: str,

View File

@@ -2,11 +2,9 @@
`tractor.devx.*` tooling sub-pkg test space.
'''
from __future__ import annotations
import time
from typing import (
Callable,
TYPE_CHECKING,
)
import pytest
@@ -28,22 +26,14 @@ from ..conftest import (
_ci_env,
)
if TYPE_CHECKING:
from pexpect import pty_spawn
# a fn that sub-instantiates a `pexpect.spawn()`
# and returns it.
type PexpectSpawner = Callable[[str], pty_spawn.spawn]
@pytest.fixture
def spawn(
start_method: str,
start_method,
testdir: pytest.Pytester,
reg_addr: tuple[str, int],
) -> PexpectSpawner:
) -> Callable[[str], None]:
'''
Use the `pexpect` module shipped via `testdir.spawn()` to
run an `./examples/..` script by name.
@@ -69,7 +59,7 @@ def spawn(
def _spawn(
cmd: str,
**mkcmd_kwargs,
) -> pty_spawn.spawn:
):
unset_colors()
return testdir.spawn(
cmd=mk_cmd(
@@ -83,7 +73,7 @@ def spawn(
)
# such that test-dep can pass input script name.
return _spawn # the `PexpectSpawner`, type alias.
return _spawn
@pytest.fixture(
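
A sketch (not part of the diff) of a test consuming the `spawn` fixture above; the script name and expected-output pattern are hypothetical, and `PROMPT`/`assert_before` are assumed to be the shared helpers used by the debug-mode tests further down:

from pexpect.exceptions import EOF

from .conftest import (  # assumed import location
    PROMPT,
    assert_before,
)


def test_example_script(
    spawn,  # the fixture defined above
):
    # `spawn()` builds the full cmd via `mk_cmd()` and returns a
    # `pexpect`-flavored child proc from `testdir.spawn()`.
    child = spawn('some_example')  # hypothetical script name
    child.expect(PROMPT)
    try:
        assert_before(
            child,
            ['some expected output'],  # placeholder pattern
        )
    finally:
        # always resume the REPL-active subproc, otherwise the
        # pexpect instance may hang waiting for it to terminate
        # (see the note in the debug-hook test below).
        child.sendline('c')
        child.expect(EOF)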

View File

@@ -13,13 +13,9 @@ TODO:
when debugging a problem inside the stack vs. in their app.
'''
from __future__ import annotations
import os
import signal
import time
from typing import (
TYPE_CHECKING,
)
from .conftest import (
expect,
@@ -33,12 +29,9 @@ from pexpect.exceptions import (
EOF,
)
if TYPE_CHECKING:
from ..conftest import PexpectSpawner
def test_shield_pause(
spawn: PexpectSpawner,
spawn,
):
'''
Verify the `tractor.pause()/.post_mortem()` API works inside an
@@ -133,7 +126,7 @@ def test_shield_pause(
def test_breakpoint_hook_restored(
spawn: PexpectSpawner,
spawn,
):
'''
Ensures our actor runtime sets a custom `breakpoint()` hook
@@ -147,7 +140,6 @@ def test_breakpoint_hook_restored(
child = spawn('restore_builtin_breakpoint')
child.expect(PROMPT)
try:
assert_before(
child,
[
@@ -157,12 +149,7 @@ def test_breakpoint_hook_restored(
"first bp, tractor hook set",
]
)
# XXX if the above raises `AssertionError`, without sending
# the final 'continue' cmd to the REPL-active sub-process,
# we'll hang waiting for that pexpect instance to terminate..
finally:
child.sendline('c')
child.expect(PROMPT)
assert_before(
child,

View File

@@ -1,4 +0,0 @@
'''
`tractor.ipc` subsystem(s)/unit testing suites.
'''

View File

@@ -1,72 +0,0 @@
'''
High-level `.ipc._server` unit tests.
'''
from __future__ import annotations
import pytest
import trio
from tractor import (
devx,
ipc,
log,
)
from tractor._testing.addr import (
get_rando_addr,
)
# TODO, use/check-roundtripping with some of these wrapper types?
#
# from .._addr import Address
# from ._chan import Channel
# from ._transport import MsgTransport
# from ._uds import UDSAddress
# from ._tcp import TCPAddress
@pytest.mark.parametrize(
'_tpt_proto',
['uds', 'tcp']
)
def test_basic_ipc_server(
_tpt_proto: str,
debug_mode: bool,
loglevel: str,
):
# so we see the socket-listener reporting on console
log.get_console_log("INFO")
rando_addr: tuple = get_rando_addr(
tpt_proto=_tpt_proto,
)
async def main():
async with ipc._server.open_ipc_server() as server:
assert (
server._parent_tn
and
server._parent_tn is server._stream_handler_tn
)
assert server._no_more_peers.is_set()
eps: list[ipc.IPCEndpoint] = await server.listen_on(
accept_addrs=[rando_addr],
stream_handler_nursery=None,
)
assert (
len(eps) == 1
and
(ep := eps[0])._listener
and
not ep.peer_tpts
)
server._parent_tn.cancel_scope.cancel()
# !TODO! actually make a bg-task connection from a client
# using `ipc._chan._connect_chan()`
with devx.maybe_open_crash_handler(
pdb=debug_mode,
):
trio.run(main)

View File

@@ -582,7 +582,8 @@ async def open_portal(
msg_loop_cs = await tn.start(
partial(
_rpc.process_messages,
chan=channel,
actor,
channel,
# if the local task is cancelled we want to keep
# the msg loop running until our block ends
shield=True,

View File

@@ -166,9 +166,7 @@ async def open_root_actor(
# enables the multi-process debugger support
debug_mode: bool = False,
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
# ^XXX NOTE^ the perf implications of use,
# https://greenback.readthedocs.io/en/latest/principle.html#performance
maybe_enable_greenback: bool = True, # `.pause_from_sync()/breakpoint()` support
enable_stack_on_sig: bool = False,
# internal logging

View File

@@ -869,6 +869,7 @@ async def try_ship_error_to_remote(
async def process_messages(
actor: Actor,
chan: Channel,
shield: bool = False,
task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -906,7 +907,6 @@ async def process_messages(
(as utilized inside `Portal.cancel_actor()` ).
'''
actor: Actor = _state.current_actor()
assert actor._service_n # runtime state sanity
# TODO: once `trio` gets an "obvious way" for req/resp we
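
To make the signature change concrete, a sketch (not part of the diff) of the two ways the msg loop ends up being started from the portal/runtime code in the other hunks; `tn`, `actor` and `channel` stand in for the objects available at those call sites:

from functools import partial

import trio
from tractor import _rpc


async def start_msg_loop(
    tn: trio.Nursery,
    actor,    # `Actor` instance (explicitly threaded on one side)
    channel,  # the IPC `Channel` for the peer
) -> None:
    # one side of this compare passes the actor positionally,
    # matching the `process_messages(actor, chan, ...)` signature
    # shown above ...
    await tn.start(
        partial(
            _rpc.process_messages,
            actor,
            channel,
            shield=True,
        )
    )

    # ... while the other side resolves the actor internally via
    # `_state.current_actor()` and only takes the channel by keyword:
    #
    #     await tn.start(
    #         partial(
    #             _rpc.process_messages,
    #             chan=channel,
    #             shield=True,
    #         )
    #     )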

View File

@@ -1302,10 +1302,6 @@ async def async_main(
the actor's "runtime" and thus all ongoing RPC tasks.
'''
# XXX NOTE, `_state._current_actor` **must** be set prior to
# calling this core runtime entrypoint!
assert actor is _state.current_actor()
actor._task: trio.Task = trio.lowlevel.current_task()
# attempt to retrieve ``trio``'s sigint handler and stash it
@@ -1365,6 +1361,7 @@ async def async_main(
) as service_nursery,
_server.open_ipc_server(
actor=actor,
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
) as ipc_server,
@@ -1418,6 +1415,7 @@ async def async_main(
'Booting IPC server'
)
eps: list = await ipc_server.listen_on(
actor=actor,
accept_addrs=accept_addrs,
stream_handler_nursery=service_nursery,
)
@@ -1502,7 +1500,8 @@ async def async_main(
await root_nursery.start(
partial(
_rpc.process_messages,
chan=actor._parent_chan,
actor,
actor._parent_chan,
shield=True,
)
)

View File

@@ -105,7 +105,7 @@ class BoxedMaybeException(Struct):
'''
if not self.value:
return f'<{type(self).__name__}( .value=None )>'
return f'<{type(self).__name__}( .value=None )>\n'
return (
f'<{type(self.value).__name__}(\n'
@@ -256,6 +256,7 @@ async def _maybe_enter_pm(
bool,
] = lambda err: not is_multi_cancelled(err),
**_pause_kws,
):
if (
debug_mode()

View File

@@ -72,223 +72,11 @@ if TYPE_CHECKING:
log = log.get_logger(__name__)
async def maybe_wait_on_canced_subs(
uid: tuple[str, str],
chan: Channel,
disconnected: bool,
actor: Actor|None = None,
chan_drain_timeout: float = 0.5,
an_exit_timeout: float = 0.5,
) -> ActorNursery|None:
'''
When a process-local actor-nursery is found for the given actor
`uid` (i.e. that peer is **also** a subactor of this parent), we
attempt to (with timeouts) wait on,
- all IPC msgs to drain on the (common) `Channel` such that all
local `Context`-parent-tasks can also gracefully collect
`ContextCancelled` msgs from their respective remote children
vs. a `chan_drain_timeout`.
- the actor-nursery to cancel-n-join all its supervised children
(processes) *gracefully* vs. a `an_exit_timeout` and thus also
detect cases where the IPC transport connection broke but
a sub-process is detected as still alive (a case that happens
when the subactor is still in an active debugger REPL session).
If the timeout expires in either case we ofc report with warning.
'''
actor = actor or _state.current_actor()
# XXX running outside actor-runtime usage,
# - unit testing
# - possibly manual usage (eventually) ?
if not actor:
return None
local_nursery: (
ActorNursery|None
) = actor._actoruid2nursery.get(uid)
# This is set in `Portal.cancel_actor()`. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
if (
local_nursery
and (
actor._cancel_called
or
chan._cancel_called
)
#
# ^-TODO-^ along with this is there another condition
# that we should filter with to avoid entering this
# waiting block needlessly?
# -[ ] maybe `and local_nursery.cancelled` and/or
# only if the `._children` table is empty or has
# only `Portal`s with .chan._cancel_called ==
# True` as per what we had below; the MAIN DIFF
# BEING that just bc one `Portal.cancel_actor()`
# was called, doesn't mean the whole actor-nurse
# is gonna exit any time soon right!?
#
# or
# all(chan._cancel_called for chan in chans)
):
log.cancel(
'Waiting on cancel request to peer..\n'
f'c)=>\n'
f' |_{chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
# which is mapped to a sub-actor (i.e. it's managed
# by local actor-nursery) has a message that is sent
# to the peer likely by this actor (which may be in
# a shutdown sequence due to cancellation) when the
# local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing.
chan_info: str = (
f'{chan.uid}\n'
f'|_{chan}\n'
f' |_{chan.transport}\n\n'
)
with trio.move_on_after(chan_drain_timeout) as drain_cs:
drain_cs.shield = True
# attempt to wait for the far end to close the
# channel and bail after timeout (a 2-generals
# problem on closure).
assert chan.transport
async for msg in chan.transport.drain():
# try to deliver any lingering msgs
# before we destroy the channel.
# This accomplishes deterministic
# ``Portal.cancel_actor()`` cancellation by
# making sure any RPC response to that call is
# delivered to the local calling task.
# TODO: factor this into a helper?
log.warning(
'Draining msg from disconnected peer\n'
f'{chan_info}'
f'{pformat(msg)}\n'
)
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
if cid:
# deliver response to local caller/waiter
await actor._deliver_ctx_payload(
chan,
cid,
msg,
)
if drain_cs.cancelled_caught:
log.warning(
'Timed out waiting on IPC transport channel to drain?\n'
f'{chan_info}'
)
# XXX NOTE XXX when no explicit call to
# `open_root_actor()` was made by the application
# (normally we implicitly make that call inside
# the first `.open_nursery()` in root-actor
# user/app code), we can assume that either we
# are NOT the root actor or are root but the
# runtime was started manually, and thus DO have
# to wait for the nursery-enterer to exit before
# shutting down the local runtime to avoid
# clobbering any ongoing subactor
# teardown/debugging/graceful-cancel.
#
# see matching note inside `._supervise.open_nursery()`
#
# TODO: should we have a separate cs + timeout
# block here?
if (
# XXX SO either,
# - not root OR,
# - is root but `open_root_actor()` was
# entered manually (in which case we do
# the equiv wait there using the
# `devx.debug` sub-sys APIs).
not local_nursery._implicit_runtime_started
):
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
with trio.move_on_after(an_exit_timeout) as an_exit_cs:
an_exit_cs.shield = True
await local_nursery.exited.wait()
# TODO: currently this is always triggering for every
# sub-daemon spawned from the `piker.services._mngr`?
# -[ ] how do we ensure that the IPC is supposed to
# be long lived and isn't just a register?
# |_ in the register case how can we signal that the
# ephemeral msg loop was intentional?
if (
# not local_nursery._implicit_runtime_started
# and
an_exit_cs.cancelled_caught
):
report: str = (
'Timed out waiting on local actor-nursery to exit?\n'
f'c)>\n'
f' |_{local_nursery}\n'
)
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
)
log.warning(report)
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry: tuple|None = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and
poll() is None # proc still alive
):
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
f' |_{proc}\n'
)
return local_nursery
# TODO multi-tpt support with per-proto peer tracking?
#
# -[x] maybe change to mod-func and rename for implied
# multi-transport semantics?
#
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
# so that we can query per tpt all peer contact infos?
# |_[ ] possibly provide a global viewing via a
@@ -299,6 +87,7 @@ async def handle_stream_from_peer(
*,
server: IPCServer,
actor: Actor,
) -> None:
'''
@@ -330,7 +119,6 @@ async def handle_stream_from_peer(
# initial handshake with peer phase
try:
if actor := _state.current_actor():
peer_aid: msgtypes.Aid = await chan._do_handshake(
aid=actor.aid,
)
@@ -434,7 +222,8 @@ async def handle_stream_from_peer(
disconnected,
last_msg,
) = await _rpc.process_messages(
chan=chan,
actor,
chan,
)
except trio.Cancelled:
log.cancel(
@@ -445,15 +234,178 @@ async def handle_stream_from_peer(
raise
finally:
local_nursery: (
ActorNursery|None
) = actor._actoruid2nursery.get(uid)
# check if there are subs which we should gracefully join at
# both the inter-actor-task and subprocess levels to
# gracefully remote cancel and later disconnect (particularly
# for permitting subs engaged in active debug-REPL sessions).
local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
uid=uid,
chan=chan,
disconnected=disconnected,
# This is set in ``Portal.cancel_actor()``. So if
# the peer was cancelled we try to wait for them
# to tear down their side of the connection before
# moving on with closing our own side.
if (
local_nursery
and (
actor._cancel_called
or
chan._cancel_called
)
#
# ^-TODO-^ along with this is there another condition
# that we should filter with to avoid entering this
# waiting block needlessly?
# -[ ] maybe `and local_nursery.cancelled` and/or
# only if the `._children` table is empty or has
# only `Portal`s with .chan._cancel_called ==
# True` as per what we had below; the MAIN DIFF
# BEING that just bc one `Portal.cancel_actor()`
# was called, doesn't mean the whole actor-nurse
# is gonna exit any time soon right!?
#
# or
# all(chan._cancel_called for chan in chans)
):
log.cancel(
'Waiting on cancel request to peer..\n'
f'c)=>\n'
f' |_{chan.uid}\n'
)
# XXX: this is a soft wait on the channel (and its
# underlying transport protocol) to close from the
# remote peer side since we presume that any channel
# which is mapped to a sub-actor (i.e. it's managed
# by local actor-nursery) has a message that is sent
# to the peer likely by this actor (which may be in
# a shutdown sequence due to cancellation) when the
# local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing.
chan_info: str = (
f'{chan.uid}\n'
f'|_{chan}\n'
f' |_{chan.transport}\n\n'
)
with trio.move_on_after(0.5) as drain_cs:
drain_cs.shield = True
# attempt to wait for the far end to close the
# channel and bail after timeout (a 2-generals
# problem on closure).
assert chan.transport
async for msg in chan.transport.drain():
# try to deliver any lingering msgs
# before we destroy the channel.
# This accomplishes deterministic
# ``Portal.cancel_actor()`` cancellation by
# making sure any RPC response to that call is
# delivered to the local calling task.
# TODO: factor this into a helper?
log.warning(
'Draining msg from disconnected peer\n'
f'{chan_info}'
f'{pformat(msg)}\n'
)
# cid: str|None = msg.get('cid')
cid: str|None = msg.cid
if cid:
# deliver response to local caller/waiter
await actor._deliver_ctx_payload(
chan,
cid,
msg,
)
if drain_cs.cancelled_caught:
log.warning(
'Timed out waiting on IPC transport channel to drain?\n'
f'{chan_info}'
)
# XXX NOTE XXX when no explicit call to
# `open_root_actor()` was made by the application
# (normally we implicitly make that call inside
# the first `.open_nursery()` in root-actor
# user/app code), we can assume that either we
# are NOT the root actor or are root but the
# runtime was started manually, and thus DO have
# to wait for the nursery-enterer to exit before
# shutting down the local runtime to avoid
# clobbering any ongoing subactor
# teardown/debugging/graceful-cancel.
#
# see matching note inside `._supervise.open_nursery()`
#
# TODO: should we have a separate cs + timeout
# block here?
if (
# XXX SO either,
# - not root OR,
# - is root but `open_root_actor()` was
# entered manually (in which case we do
# the equiv wait there using the
# `devx.debug` sub-sys APIs).
not local_nursery._implicit_runtime_started
):
log.runtime(
'Waiting on local actor nursery to exit..\n'
f'|_{local_nursery}\n'
)
with trio.move_on_after(0.5) as an_exit_cs:
an_exit_cs.shield = True
await local_nursery.exited.wait()
# TODO: currently this is always triggering for every
# sub-daemon spawned from the `piker.services._mngr`?
# -[ ] how do we ensure that the IPC is supposed to
# be long lived and isn't just a register?
# |_ in the register case how can we signal that the
# ephemeral msg loop was intentional?
if (
# not local_nursery._implicit_runtime_started
# and
an_exit_cs.cancelled_caught
):
report: str = (
'Timed out waiting on local actor-nursery to exit?\n'
f'c)>\n'
f' |_{local_nursery}\n'
)
if children := local_nursery._children:
# indent from above local-nurse repr
report += (
f' |_{pformat(children)}\n'
)
log.warning(report)
if disconnected:
# if the transport died and this actor is still
# registered within a local nursery, we report
# that the IPC layer may have failed
# unexpectedly since it may be the cause of
# other downstream errors.
entry: tuple|None = local_nursery._children.get(uid)
if entry:
proc: trio.Process
_, proc, _ = entry
if (
(poll := getattr(proc, 'poll', None))
and
poll() is None # proc still alive
):
# TODO: change log level based on
# detecting whether chan was created for
# ephemeral `.register_actor()` request!
# -[ ] also, that should be avoidable by
# re-using any existing chan from the
# `._discovery.get_registry()` call as
# well..
log.runtime(
f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n'
f' |_{proc}\n'
)
# ``Channel`` teardown and closure sequence
@@ -515,11 +467,11 @@ async def handle_stream_from_peer(
# from broken debug TTY locking due to
# msg-spec races on application using RunVar...
if (
local_nursery
and
(ctx_in_debug := pdb_lock.ctx_in_debug)
and
(pdb_user_uid := ctx_in_debug.chan.uid)
and
local_nursery
):
entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid)
@@ -852,6 +804,7 @@ class IPCServer(Struct):
async def listen_on(
self,
*,
actor: Actor,
accept_addrs: list[tuple[str, int|str]]|None = None,
stream_handler_nursery: Nursery|None = None,
) -> list[IPCEndpoint]:
@@ -884,19 +837,20 @@ async def listen_on(
f'{self}\n'
)
log.runtime(
log.info(
f'Binding to endpoints for,\n'
f'{accept_addrs}\n'
)
eps: list[IPCEndpoint] = await self._parent_tn.start(
partial(
_serve_ipc_eps,
actor=actor,
server=self,
stream_handler_tn=stream_handler_nursery,
listen_addrs=accept_addrs,
)
)
log.runtime(
log.info(
f'Started IPC endpoints\n'
f'{eps}\n'
)
@@ -919,6 +873,7 @@ async def _serve_ipc_eps(
async def _serve_ipc_eps(
*,
actor: Actor,
server: IPCServer,
stream_handler_tn: Nursery,
listen_addrs: list[tuple[str, int|str]],
@@ -952,13 +907,12 @@ async def _serve_ipc_eps(
stream_handler_tn=stream_handler_tn,
)
try:
log.runtime(
log.info(
f'Starting new endpoint listener\n'
f'{ep}\n'
)
listener: trio.abc.Listener = await ep.start_listener()
assert listener is ep._listener
# actor = _state.current_actor()
# if actor.is_registry:
# import pdbp; pdbp.set_trace()
@@ -983,6 +937,7 @@ async def _serve_ipc_eps(
handler=partial(
handle_stream_from_peer,
server=server,
actor=actor,
),
listeners=listeners,
@@ -993,13 +948,13 @@ async def _serve_ipc_eps(
)
)
# TODO, wow make this message better! XD
log.runtime(
log.info(
'Started server(s)\n'
+
'\n'.join([f'|_{addr}' for addr in listen_addrs])
)
log.runtime(
log.info(
f'Started IPC endpoints\n'
f'{eps}\n'
)
@@ -1015,7 +970,6 @@ async def _serve_ipc_eps(
ep.close_listener()
server._endpoints.remove(ep)
# actor = _state.current_actor()
# if actor.is_arbiter:
# import pdbp; pdbp.set_trace()
@@ -1026,6 +980,7 @@ async def _serve_ipc_eps(
@acm
async def open_ipc_server(
actor: Actor,
parent_tn: Nursery|None = None,
stream_handler_tn: Nursery|None = None,
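
For contrast with the unit test removed above (which calls `open_ipc_server()`/`listen_on()` without an `actor`), a sketch (not part of the diff) of server bring-up on the side that threads the `actor` reference explicitly; kwarg names are taken from the hunks above and the wrapper function itself is hypothetical:

from tractor import ipc


async def bring_up_ipc_server(
    actor,               # the owning `Actor` instance
    accept_addrs: list,  # transport addrs to bind on
    service_nursery,     # a `trio.Nursery` shared for parent/handler tasks
) -> None:
    async with ipc._server.open_ipc_server(
        actor=actor,
        parent_tn=service_nursery,
        stream_handler_tn=service_nursery,
    ) as ipc_server:
        eps: list = await ipc_server.listen_on(
            actor=actor,
            accept_addrs=accept_addrs,
            stream_handler_nursery=service_nursery,
        )
        assert len(eps) == len(accept_addrs)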

View File

@@ -127,11 +127,6 @@ async def start_listener(
Start a TCP socket listener on the given `TCPAddress`.
'''
log.info(
f'Attempting to bind TCP socket\n'
f'>[\n'
f'|_{addr}\n'
)
# ?TODO, maybe we should just change the lower-level call this is
# using internally per-listener?
listeners: list[SocketListener] = await open_tcp_listeners(
@@ -145,12 +140,6 @@ async def start_listener(
assert len(listeners) == 1
listener = listeners[0]
host, port = listener.socket.getsockname()[:2]
log.info(
f'Listening on TCP socket\n'
f'[>\n'
f' |_{addr}\n'
)
return listener
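
For reference, a standalone sketch (not part of the diff) of the underlying `trio` call this wrapper relies on, binding an ephemeral TCP port and reading back the bound address the same way `start_listener()` does above:

import trio


async def bind_ephemeral_tcp() -> tuple[str, int]:
    listeners: list[trio.SocketListener] = await trio.open_tcp_listeners(
        host='127.0.0.1',
        port=0,  # let the OS pick a free port
    )
    assert len(listeners) == 1
    listener = listeners[0]
    # same introspection as in the hunk above
    host, port = listener.socket.getsockname()[:2]
    await listener.aclose()
    return host, port


if __name__ == '__main__':
    print(trio.run(bind_ephemeral_tcp))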

View File

@@ -484,7 +484,7 @@ def _run_asyncio_task(
raise_not_found=False,
))
):
log.devx(
log.info(
f'Bestowing `greenback` portal for `asyncio`-task\n'
f'{task}\n'
)