diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py
index c88e0dfe..4242a543 100644
--- a/examples/advanced_faults/ipc_failure_during_stream.py
+++ b/examples/advanced_faults/ipc_failure_during_stream.py
@@ -17,6 +17,7 @@ from tractor import (
     MsgStream,
     _testing,
     trionics,
+    TransportClosed,
 )
 import trio
 import pytest
@@ -208,12 +209,16 @@ async def main(
                     # TODO: is this needed or no?
                     raise

-            except trio.ClosedResourceError:
+            except (
+                trio.ClosedResourceError,
+                TransportClosed,
+            ) as _tpt_err:
                 # NOTE: don't send if we already broke the
                 # connection to avoid raising a closed-error
                 # such that we drop through to the ctl-c
                 # mashing by user.
-                await trio.sleep(0.01)
+                with trio.CancelScope(shield=True):
+                    await trio.sleep(0.01)

                 # timeout: int = 1
                 # with trio.move_on_after(timeout) as cs:
@@ -247,6 +252,7 @@ async def main(
                     await stream.send(i)
                     pytest.fail('stream not closed?')
                 except (
+                    TransportClosed,
                     trio.ClosedResourceError,
                     trio.EndOfChannel,
                 ) as send_err:
diff --git a/tests/devx/conftest.py b/tests/devx/conftest.py
index 11ec9ed2..efc74d44 100644
--- a/tests/devx/conftest.py
+++ b/tests/devx/conftest.py
@@ -4,6 +4,7 @@
 '''
 from __future__ import annotations
 import time
+import signal
 from typing import (
     Callable,
     TYPE_CHECKING,
@@ -69,12 +70,15 @@ def spawn(
         import os
         os.environ['PYTHON_COLORS'] = '0'

+    spawned: PexpectSpawner|None = None
+
     def _spawn(
         cmd: str,
         **mkcmd_kwargs,
     ) -> pty_spawn.spawn:
+        nonlocal spawned
         unset_colors()
-        return testdir.spawn(
+        spawned = testdir.spawn(
             cmd=mk_cmd(
                 cmd,
                 **mkcmd_kwargs,
@@ -84,9 +88,35 @@ def spawn(
             # ^TODO? get `pytest` core to expose underlying
             # `pexpect.spawn()` stuff?
         )
+        return spawned

     # such that test-dep can pass input script name.
-    return _spawn  # the `PexpectSpawner`, type alias.
+    yield _spawn  # the `PexpectSpawner`, type alias.
+
+    if (
+        spawned
+        and
+        (ptyproc := spawned.ptyproc)
+    ):
+        start: float = time.time()
+        timeout: float = 5
+        while (
+            ptyproc.isalive()
+            and
+            (
+                (_time_took := (time.time() - start))
+                <
+                timeout
+            )
+        ):
+            ptyproc.kill(signal.SIGINT)
+            time.sleep(0.01)
+
+        if ptyproc.isalive():
+            ptyproc.kill(signal.SIGKILL)
+
+        # TODO? ensure we've cleaned up any UDS-paths?
+        # breakpoint()


 @pytest.fixture(
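
# ---- [editor's note, not part of the patch] -----------------------
# The fixture teardown above uses an "escalating kill" pattern: poll
# child liveness while re-sending SIGINT up to a timeout, then fall
# back to an uncatchable SIGKILL. A minimal standalone sketch of the
# same idea against a plain `subprocess.Popen` (the helper name and
# `proc` arg are hypothetical, for illustration only):

import signal
import subprocess
import time

def kill_gracefully(
    proc: subprocess.Popen,
    timeout: float = 5,
) -> None:
    # nudge the proc with SIGINT until it exits or we
    # run out of patience..
    start: float = time.time()
    while (
        proc.poll() is None
        and
        (time.time() - start) < timeout
    ):
        proc.send_signal(signal.SIGINT)
        time.sleep(0.01)

    # last resort, the non-catchable hard kill.
    if proc.poll() is None:
        proc.kill()  # SIGKILL on POSIX

# --------------------------------------------------------------------
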
diff --git a/tests/devx/test_debugger.py b/tests/devx/test_debugger.py
index cacab803..d3f9fa5d 100644
--- a/tests/devx/test_debugger.py
+++ b/tests/devx/test_debugger.py
@@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
     ['peer IPC channel closed abruptly?',
      'another task closed this fd',
      'Debug lock request was CANCELLED?',
-     "TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
+     "'MsgpackUDSStream' was already closed locally?",
+     "TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
+     # ?TODO^? match depending on `tpt_proto(s)`?
+    ]

     # XXX races on whether these show/hit?
     # 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',
diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py
index 061ae5aa..2103f627 100644
--- a/tests/test_advanced_faults.py
+++ b/tests/test_advanced_faults.py
@@ -98,7 +98,8 @@ def test_ipc_channel_break_during_stream(
         expect_final_exc = TransportClosed

     mod: ModuleType = import_path(
-        examples_dir() / 'advanced_faults'
+        examples_dir()
+        / 'advanced_faults'
         / 'ipc_failure_during_stream.py',
         root=examples_dir(),
         consider_namespace_packages=False,
@@ -113,8 +114,9 @@ def test_ipc_channel_break_during_stream(
     if (
         # only expect EoC if trans is broken on the child side,
         ipc_break['break_child_ipc_after'] is not False
+        and
         # AND we tell the child to call `MsgStream.aclose()`.
-        and pre_aclose_msgstream
+        pre_aclose_msgstream
     ):
         # expect_final_exc = trio.EndOfChannel
         # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
@@ -160,7 +162,8 @@ def test_ipc_channel_break_during_stream(
         ipc_break['break_child_ipc_after'] is not False
         and (
             ipc_break['break_parent_ipc_after']
-            > ipc_break['break_child_ipc_after']
+            >
+            ipc_break['break_child_ipc_after']
         )
     ):
         if pre_aclose_msgstream:
@@ -248,8 +251,15 @@ def test_ipc_channel_break_during_stream(
         # get raw instance from pytest wrapper
         value = excinfo.value
         if isinstance(value, ExceptionGroup):
-            excs = value.exceptions
-            assert len(excs) == 1
+            excs: tuple[Exception, ...] = value.exceptions
+            assert (
+                len(excs) <= 2
+                and
+                all(
+                    isinstance(exc, TransportClosed)
+                    for exc in excs
+                )
+            )
             final_exc = excs[0]
             assert isinstance(final_exc, expect_final_exc)
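
# ---- [editor's note, not part of the patch] -----------------------
# The loosened assertion above allows an `ExceptionGroup` wrapping up
# to 2 `TransportClosed` subexceptions (one per broken transport end)
# instead of exactly 1. A sketch of the same check as a reusable
# helper (hypothetical name; stdlib `ExceptionGroup`, py3.11+):

def first_homogeneous_exc(
    eg: ExceptionGroup,
    exc_type: type[Exception],
    max_excs: int = 2,
) -> Exception:
    # expect a small, homogeneous set of errors and
    # deliver the first for further type assertions.
    excs: tuple[Exception, ...] = eg.exceptions
    assert (
        len(excs) <= max_excs
        and
        all(isinstance(exc, exc_type) for exc in excs)
    )
    return excs[0]

# --------------------------------------------------------------------
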
diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py
index 79c454f6..4df705b1 100644
--- a/tests/test_inter_peer_cancellation.py
+++ b/tests/test_inter_peer_cancellation.py
@@ -11,12 +11,13 @@ import trio
 import tractor
 from tractor import (  # typing
     Actor,
-    current_actor,
-    open_nursery,
-    Portal,
     Context,
     ContextCancelled,
+    MsgStream,
+    Portal,
     RemoteActorError,
+    current_actor,
+    open_nursery,
 )
 from tractor._testing import (
     # tractor_test,
@@ -796,8 +797,8 @@ async def basic_echo_server(

 ) -> None:
     '''
-    Just the simplest `MsgStream` echo server which resays what
-    you told it but with its uid in front ;)
+    Just the simplest `MsgStream` echo server which resays what you
+    told it but with its uid in front ;)

     '''
     actor: Actor = tractor.current_actor()
@@ -966,9 +967,14 @@ async def tell_little_bro(
     caller: str = '',
     err_after: float|None = None,
-    rng_seed: int = 50,
+    rng_seed: int = 100,
+    # NOTE, ensure ^ is large enough (on fast hw anyway)
+    # so that the peer cancel req arrives before the
+    # echoing dialog finishes on its own Bp
 ):
     # contact target actor, do a stream dialog.
+    lb: Portal
+    echo_ipc: MsgStream
     async with (
         tractor.wait_for_actor(
             name=actor_name
@@ -983,7 +989,6 @@ async def tell_little_bro(
                 else None
             ),
         ) as (sub_ctx, first),
-        sub_ctx.open_stream() as echo_ipc,
     ):
         actor: Actor = current_actor()
@@ -994,6 +999,7 @@ async def tell_little_bro(
                 i,
             )
             await echo_ipc.send(msg)
+            await trio.sleep(0.001)
             resp = await echo_ipc.receive()
             print(
                 f'{caller} => {actor_name}: {msg}\n'
@@ -1006,6 +1012,9 @@ async def tell_little_bro(
             assert sub_uid != uid
             assert _i == i

+    # XXX, usually should never get here!
+    # await tractor.pause()
+

 @pytest.mark.parametrize(
     'raise_client_error',
@@ -1020,6 +1029,9 @@ def test_peer_spawns_and_cancels_service_subactor(
     raise_client_error: str,
     reg_addr: tuple[str, int],
     raise_sub_spawn_error_after: float|None,
+    loglevel: str,
+    # ^XXX, set to 'warning' to see masked-exc warnings
+    # that may transpire during actor-nursery teardown.
 ):
     # NOTE: this tests for the modden `mod wks open piker` bug
     # discovered as part of implementing workspace ctx
@@ -1049,6 +1061,7 @@ def test_peer_spawns_and_cancels_service_subactor(
             # NOTE: to halt the peer tasks on ctxc, uncomment this.
             debug_mode=debug_mode,
             registry_addrs=[reg_addr],
+            loglevel=loglevel,
         ) as an:
             server: Portal = await an.start_actor(
                 (server_name := 'spawn_server'),
diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py
index eff9a731..17003e1c 100644
--- a/tests/test_multi_program.py
+++ b/tests/test_multi_program.py
@@ -163,7 +163,7 @@ def test_non_registrar_spawns_child(

             async with sub_ptl.open_context(
                 get_root_portal,
-            ) as (ctx, first):
+            ) as (ctx, _):
                 print('Waiting for `sub` to connect back to us..')

                 await an.cancel()
diff --git a/tractor/_context.py b/tractor/_context.py
index 8ace1b4d..5a69077e 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -70,6 +70,7 @@ from ._exceptions import (
     MsgTypeError,
     RemoteActorError,
     StreamOverrun,
+    TransportClosed,
     pack_from_raise,
     unpack_error,
 )
@@ -2428,10 +2429,7 @@ async def open_context_from_portal(
             try:
                 # await pause(shield=True)
                 await ctx.cancel()
-            except (
-                trio.BrokenResourceError,
-                trio.ClosedResourceError,
-            ):
+            except TransportClosed:
                 log.warning(
                     'IPC connection for context is broken?\n'
                     f'task: {ctx.cid}\n'
diff --git a/tractor/_discovery.py b/tractor/_discovery.py
index edc43d86..bf4d066a 100644
--- a/tractor/_discovery.py
+++ b/tractor/_discovery.py
@@ -91,10 +91,13 @@ async def get_registry(


 @acm
-async def get_root(
-    **kwargs,
-) -> AsyncGenerator[Portal, None]:
+async def get_root(**kwargs) -> AsyncGenerator[Portal, None]:
+    '''
+    Deliver a `Portal` to the current tree's "root process" actor
+    (in both actor- and proc-tree terms) using the spawn-time
+    provided contact address.
+
+    '''
     # TODO: rename mailbox to `_root_maddr` when we finally
     # add and impl libp2p multi-addrs?
     addr = _runtime_vars['_root_mailbox']
@@ -193,6 +196,11 @@ async def maybe_open_portal(
     addr: UnwrappedAddress,
     name: str,
 ):
+    '''
+    Open a `Portal` to the actor serving @ `addr`, or yield `None`
+    if no peer can be contacted or found.
+
+    '''
     async with query_actor(
         name=name,
         regaddr=addr,
diff --git a/tractor/_portal.py b/tractor/_portal.py
index a914bb80..2fc9dbb7 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -329,18 +329,7 @@ class Portal:
             # if we get here some weird cancellation case happened
             return False

-        except (
-            # XXX, should never really get raised unless we aren't
-            # wrapping them in the below type by mistake?
-            #
-            # Leaving the catch here for now until we're very sure
-            # all the cases (for various tpt protos) have indeed been
-            # re-wrapped ;p
-            trio.ClosedResourceError,
-            trio.BrokenResourceError,
-
-            TransportClosed,
-        ) as tpt_err:
+        except TransportClosed as tpt_err:
             ipc_borked_report: str = (
                 f'IPC for actor already closed/broken?\n\n'
                 f'\n'
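
# ---- [editor's note, not part of the patch] -----------------------
# The `_context.py` and `_portal.py` hunks above both narrow their
# except-clauses from raw `trio` resource errors down to the single
# `TransportClosed` type; this only works when the lowest IPC-send
# layer reliably wraps those `trio` excs. A self-contained sketch of
# that wrap-and-chain layering (all names here are illustrative, not
# this repo's actual API):

import trio

class TransportClosed(Exception):
    '''The IPC transport is already closed or broken.'''

async def tpt_send(
    stream: trio.abc.SendStream,
    data: bytes,
) -> None:
    try:
        await stream.send_all(data)
    except (
        trio.BrokenResourceError,
        trio.ClosedResourceError,
    ) as tpt_err:
        # wrap-and-chain so upper layers only ever need one
        # except-clause for "the IPC link died" cases.
        raise TransportClosed(
            'Underlying IPC transport already closed?'
        ) from tpt_err

# --------------------------------------------------------------------
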
diff --git a/tractor/_root.py b/tractor/_root.py
index f480a619..6589dacb 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -524,7 +524,7 @@ async def open_root_actor(
         # ?TODO, per-OS non-network-proto alt options?
         # -[ ] on linux we should be able to always use UDS?
         #
-        raddrs: list[Address] = _state._runtime_vars['_root_addrs']
+        raddrs: list[UnwrappedAddress] = _state._runtime_vars['_root_addrs']
         raddrs.extend(
             accept_addrs,
         )
diff --git a/tractor/_rpc.py b/tractor/_rpc.py
index 68ce56ea..ac658cb2 100644
--- a/tractor/_rpc.py
+++ b/tractor/_rpc.py
@@ -284,6 +284,15 @@ async def _errors_relayed_via_ipc(
     try:
         yield  # run RPC invoke body

+    # NOTE, never REPL any pseudo-expected tpt-disconnect.
+    except TransportClosed as err:
+        rpc_err = err
+        log.warning(
+            f'Tpt disconnect during remote-exc relay due to,\n'
+            f'{err!r}\n'
+        )
+        raise err
+
     # box and ship RPC errors for wire-transit via
     # the task's requesting parent IPC-channel.
     except (
@@ -327,10 +336,15 @@ async def _errors_relayed_via_ipc(
             # recovery logic - the only case is some kind of
             # strange bug in our transport layer itself? Going
             # to keep this open ended for now.
-            log.debug(
-                'RPC task crashed, attempting to enter debugger\n'
-                f'|_{ctx}'
-            )
+
+            if _state.debug_mode():
+                log.exception(
+                    f'RPC task crashed!\n'
+                    f'Attempting to enter debugger\n'
+                    f'\n'
+                    f'{ctx}'
+                )
+
             entered_debug = await debug._maybe_enter_pm(
                 err,
                 api_frame=inspect.currentframe(),
@@ -419,7 +433,7 @@ async def _errors_relayed_via_ipc(
             # cancel scope will not have been inserted yet
             if is_rpc:
                 log.warning(
-                    'RPC task likely errored or cancelled before start?\n'
+                    'RPC task likely crashed or cancelled before start?\n'
                     f'|_{ctx._task}\n'
                     f'  >> {ctx.repr_rpc}\n'
                 )
@@ -862,9 +876,9 @@ async def _invoke(
         )

         logmeth(
-            f'{message}\n'
+            f'{message}'
             f'\n'
-            f'{descr_str}\n'
+            f'{descr_str}'
         )


@@ -900,6 +914,11 @@ async def try_ship_error_to_remote(

     # XXX NOTE XXX in SC terms this is one of the worst things
     # that can happen and provides for a 2-general's dilemma..
+    #
+    # FURTHER, we should never really have to handle these
+    # lowlevel excs from `trio` since the `Channel.send()` layers
+    # downward should mostly be wrapping such cases in a
+    # tpt-closed; hence the `.critical()` usage is warranted.
     except (
         trio.ClosedResourceError,
         trio.BrokenResourceError,
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index c6ff9e4c..f77c69c1 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -183,6 +183,14 @@ class Actor:
     def is_registrar(self) -> bool:
         return self.is_arbiter

+    @property
+    def is_root(self) -> bool:
+        '''
+        Is this actor the parent-most (root) in the process tree?
+
+        '''
+        return _state.is_root_process()
+
     msg_buffer_size: int = 2**6

     # nursery placeholders filled in by `async_main()`,
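
# ---- [editor's note, not part of the patch] -----------------------
# The `_rpc.py` hunk above gates the verbose crash report behind the
# runtime's debug flag. A generic sketch of that gating (all names
# are illustrative); `exc_info=err` attaches the traceback explicitly
# since the report may be emitted outside the original except block:

import logging

log = logging.getLogger('demo')

def maybe_report_crash(
    err: BaseException,
    debug_mode: bool,
) -> None:
    # only spam a full traceback when the user opted in to
    # debug-mode; otherwise let the error-relay machinery
    # ship the exc upward silently.
    if debug_mode:
        log.error(
            'RPC task crashed, attempting debugger entry..',
            exc_info=err,
        )

# --------------------------------------------------------------------
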
diff --git a/tractor/_streaming.py b/tractor/_streaming.py
index fb870f1c..56e0607a 100644
--- a/tractor/_streaming.py
+++ b/tractor/_streaming.py
@@ -38,6 +38,7 @@ import trio
 from ._exceptions import (
     ContextCancelled,
     RemoteActorError,
+    TransportClosed,
 )
 from .log import get_logger
 from .trionics import (
@@ -409,10 +410,8 @@ class MsgStream(trio.abc.Channel):
             # it).
             with trio.CancelScope(shield=True):
                 await self._ctx.send_stop()
-
         except (
-            trio.BrokenResourceError,
-            trio.ClosedResourceError
+            TransportClosed,
         ) as re:
             # the underlying channel may already have been pulled
             # in which case our stop message is meaningless since
@@ -593,9 +592,8 @@ class MsgStream(trio.abc.Channel):
                 ),
             )
         except (
-            trio.ClosedResourceError,
-            trio.BrokenResourceError,
             BrokenPipeError,
+            TransportClosed,
         ) as _trans_err:
             trans_err = _trans_err
             if (
diff --git a/tractor/devx/debug/_trace.py b/tractor/devx/debug/_trace.py
index c1219c30..84608671 100644
--- a/tractor/devx/debug/_trace.py
+++ b/tractor/devx/debug/_trace.py
@@ -1257,3 +1257,26 @@ async def breakpoint(
         api_frame=inspect.currentframe(),
         **kwargs,
     )
+
+
+async def maybe_pause_bp():
+    '''
+    Internal (ONLY for now) `breakpoint()`-er fn which only tries to
+    use the multi-actor `.pause()` API when the current actor is the
+    root.
+
+    ?! BUT WHY !?
+    -------------
+
+    This is useful when debugging cases where the tpt layer breaks
+    (or is intentionally broken, say during resiliency testing) and
+    thus a child can no longer contact the root process to acquire
+    the process-tree-singleton TTY lock.
+
+    '''
+    import tractor
+    actor = tractor.current_actor()
+    if actor.aid.name == 'root':
+        await tractor.pause(shield=True)
+    else:
+        tractor.devx.mk_pdb().set_trace()
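
# ---- [editor's note, not part of the patch] -----------------------
# `maybe_pause_bp()` above falls back to a local-only debugger in
# children since a broken tpt means the root's process-tree TTY lock
# may be unreachable over IPC. A stdlib-only sketch of that
# root-gated fallback (`is_root` is a stand-in for the runtime's own
# check):

import pdb

def maybe_break(is_root: bool) -> None:
    if is_root:
        # the root owns the terminal, so a direct break is safe.
        pdb.set_trace()
    else:
        # local-only fallback which never touches IPC;
        # `nosigint=True` skips installing a SIGINT handler so
        # a ctl-c still kills the child outright.
        pdb.Pdb(nosigint=True).set_trace()

# --------------------------------------------------------------------
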
diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py
index 64973fd4..9d109f3f 100644
--- a/tractor/ipc/_chan.py
+++ b/tractor/ipc/_chan.py
@@ -307,7 +307,12 @@ class Channel:

     ) -> None:
         '''
-        Send a coded msg-blob over the transport.
+        Send a coded msg-blob over the underlying IPC transport.
+
+        This fn raises `TransportClosed` on comms failures, which
+        is normally handled by higher level runtime machinery for
+        the expected-graceful cases, mostly ephemeral
+        (re/dis)connects.

         '''
         __tracebackhide__: bool = hide_tb
@@ -334,9 +339,10 @@ class Channel:
                     except KeyError:
                         raise err
                 case TransportClosed():
+                    src_exc_str: str = err.repr_src_exc()
                     log.transport(
-                        f'Transport stream closed due to\n'
-                        f'{err.repr_src_exc()}\n'
+                        f'Transport stream closed due to,\n'
+                        f'{src_exc_str}'
                     )

                 case _:
@@ -345,6 +351,11 @@ class Channel:
             raise

     async def recv(self) -> Any:
+        '''
+        Receive the latest (queued) msg-blob from the underlying IPC
+        transport.
+
+        '''
         assert self._transport
         return await self._transport.recv()
@@ -418,16 +429,18 @@ class Channel:
         self
     ) -> AsyncGenerator[Any, None]:
         '''
-        Yield `MsgType` IPC msgs decoded and deliverd from
-        an underlying `MsgTransport` protocol.
+        Yield `MsgType` IPC msgs decoded and delivered from an
+        underlying `MsgTransport` protocol.

-        This is a streaming routine alo implemented as an async-gen
-        func (same a `MsgTransport._iter_pkts()`) gets allocated by
-        a `.__call__()` inside `.__init__()` where it is assigned to
-        the `._aiter_msgs` attr.
+        This is a streaming routine, also implemented as an
+        async-generator func (same as `MsgTransport._iter_pkts()`),
+        which gets allocated by a `.__call__()` inside `.__init__()`
+        and assigned to the `._aiter_msgs` attr.

         '''
-        assert self._transport
+        if not self._transport:
+            raise RuntimeError('No IPC transport initialized!?')
+
         while True:
             try:
                 async for msg in self._transport:
@@ -462,7 +475,15 @@ class Channel:
                 # continue

     def connected(self) -> bool:
-        return self._transport.connected() if self._transport else False
+        '''
+        Predicate for whether the underlying IPC tpt is connected.
+
+        '''
+        return (
+            self._transport.connected()
+            if self._transport
+            else False
+        )

     async def _do_handshake(
         self,
@@ -493,8 +514,11 @@ async def _connect_chan(
     addr: UnwrappedAddress
 ) -> typing.AsyncGenerator[Channel, None]:
     '''
-    Create and connect a channel with disconnect on context manager
-    teardown.
+    Create and connect a `Channel` to the provided `addr`, then
+    disconnect it on cm exit.
+
+    NOTE, this is a lowlevel, normally internal-only iface. You
+    should likely use `.open_portal()` instead.

     '''
     chan = await Channel.from_addr(addr)
diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py
index 704c13a6..5078ae7d 100644
--- a/tractor/ipc/_transport.py
+++ b/tractor/ipc/_transport.py
@@ -154,7 +154,6 @@ class MsgTransport(Protocol):
     #     ...


-
 class MsgpackTransport(MsgTransport):

     # TODO: better naming for this?
@@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport):
             except trio.ClosedResourceError as cre:
                 closure_err = cre
+                # await tractor.devx._trace.maybe_pause_bp()
+
                 raise TransportClosed(
                     message=(
-                        f'{tpt_name} was already closed locally ?\n'
+                        f'{tpt_name} was already closed locally?'
                     ),
                     src_exc=closure_err,
                     loglevel='error',
                     raise_on_report=(
-                        'another task closed this fd' in closure_err.args
+                        'another task closed this fd'
+                        in
+                        closure_err.args
                     ),
                 ) from closure_err
@@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport):
                 trans_err = _re

             tpt_name: str = f'{type(self).__name__!r}'
+            trans_err_msg: str = trans_err.args[0]
+            by_whom: str|None = {
+                'another task closed this fd': 'locally',
+                'this socket was already closed': 'by peer',
+            }.get(trans_err_msg)
             match trans_err:

                 # XXX, specifc to UDS transport and its,
@@ -446,38 +454,42 @@ class MsgpackTransport(MsgTransport):
                 case trio.BrokenResourceError() if (
                     '[Errno 32] Broken pipe'
                     in
-                    trans_err.args[0]
+                    trans_err_msg
                 ):
                     tpt_closed = TransportClosed.from_src_exc(
                         message=(
                             f'{tpt_name} already closed by peer\n'
                         ),
-                        body=f'{self}\n',
+                        body=f'{self}',
                         src_exc=trans_err,
                         raise_on_report=True,
                         loglevel='transport',
                     )
                     raise tpt_closed from trans_err

-                # case trio.ClosedResourceError() if (
-                #     'this socket was already closed'
-                #     in
-                #     trans_err.args[0]
-                # ):
-                #     tpt_closed = TransportClosed.from_src_exc(
-                #         message=(
-                #             f'{tpt_name} already closed by peer\n'
-                #         ),
-                #         body=f'{self}\n',
-                #         src_exc=trans_err,
-                #         raise_on_report=True,
-                #         loglevel='transport',
-                #     )
-                #     raise tpt_closed from trans_err
+                # ??TODO??, what case in piker does this and HOW
+                # CAN WE RE-PRODUCE IT?!?!?
+                case trio.ClosedResourceError() if (
+                    by_whom
+                ):
+                    tpt_closed = TransportClosed.from_src_exc(
+                        message=(
+                            f'{tpt_name} was already closed {by_whom!r}?\n'
+                        ),
+                        body=f'{self}',
+                        src_exc=trans_err,
+                        raise_on_report=True,
+                        loglevel='transport',
+                    )

-                # unless the disconnect condition falls under "a
-                # normal operation breakage" we usualy console warn
-                # about it.
+                    # await tractor.devx._trace.maybe_pause_bp()
+                    raise tpt_closed from trans_err
+
+                # XXX, unless the disconnect condition falls
+                # under "a normal/expected operating breakage"
+                # (per the `trans_err_msg` guards in the cases
+                # above) we usually console-error about it and
+                # raise-thru.
                 case _:
                     log.exception(
                         f'{tpt_name} layer failed pre-send ??\n'
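
# ---- [editor's note, not part of the patch] -----------------------
# The new `by_whom` table above classifies a raw
# `trio.ClosedResourceError` by its message string: a known msg maps
# to who closed the tpt ('locally' vs. 'by peer') while an unknown
# msg yields `None` and falls through to the error-logging `case _:`.
# A standalone sketch of that classification:

import trio

# known `trio` closure msgs mapped to the closing side; any
# unrecognized msg => `None` => unexpected breakage.
_closure_origin: dict[str, str] = {
    'another task closed this fd': 'locally',
    'this socket was already closed': 'by peer',
}

def closure_origin(
    err: trio.ClosedResourceError,
) -> str|None:
    msg: str = err.args[0] if err.args else ''
    return _closure_origin.get(msg)

# --------------------------------------------------------------------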