diff --git a/tractor/_context.py b/tractor/_context.py index a31c3b1..7a56215 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -1198,8 +1198,12 @@ class Context: # TODO: replace all the instances of this!! XD def maybe_raise( self, + + hide_tb: bool = True, **kwargs, + ) -> Exception|None: + __tracebackhide__: bool = hide_tb if re := self._remote_error: return self._maybe_raise_remote_err( re, @@ -1209,8 +1213,10 @@ class Context: def _maybe_raise_remote_err( self, remote_error: Exception, + raise_ctxc_from_self_call: bool = False, raise_overrun_from_self: bool = True, + hide_tb: bool = True, ) -> ( ContextCancelled # `.cancel()` request to far side @@ -1222,6 +1228,7 @@ class Context: a cancellation (if any). ''' + __tracebackhide__: bool = hide_tb our_uid: tuple = self.chan.uid # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption @@ -1305,7 +1312,7 @@ class Context: # TODO: change to `.wait_for_result()`? async def result( self, - hide_tb: bool = False, + hide_tb: bool = True, ) -> Any|Exception: ''' diff --git a/tractor/_ipc.py b/tractor/_ipc.py index b108c90..f57d3bd 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -19,13 +19,14 @@ Inter-process comms abstractions """ from __future__ import annotations -import struct -import platform -from pprint import pformat from collections.abc import ( AsyncGenerator, AsyncIterator, ) +from contextlib import asynccontextmanager as acm +import platform +from pprint import pformat +import struct import typing from typing import ( Any, @@ -35,18 +36,16 @@ from typing import ( TypeVar, ) -from tricycle import BufferedReceiveStream import msgspec +from tricycle import BufferedReceiveStream import trio -from async_generator import asynccontextmanager -from .log import get_logger -from ._exceptions import TransportClosed +from tractor.log import get_logger +from tractor._exceptions import TransportClosed + log = get_logger(__name__) - _is_windows = platform.system() == 'Windows' -log = get_logger(__name__) def get_stream_addrs(stream: trio.SocketStream) -> tuple: @@ -206,7 +205,17 @@ class MsgpackTCPStream(MsgTransport): else: raise - async def send(self, msg: Any) -> None: + async def send( + self, + msg: Any, + + # hide_tb: bool = False, + ) -> None: + ''' + Send a msgpack coded blob-as-msg over TCP. + + ''' + # __tracebackhide__: bool = hide_tb async with self._send_lock: bytes_data: bytes = self.encode(msg) @@ -388,15 +397,28 @@ class Channel: ) return transport - async def send(self, item: Any) -> None: + async def send( + self, + payload: Any, + # hide_tb: bool = False, + + ) -> None: + ''' + Send a coded msg-blob over the transport. + + ''' + # __tracebackhide__: bool = hide_tb log.transport( '=> send IPC msg:\n\n' - f'{pformat(item)}\n' + f'{pformat(payload)}\n' ) # type: ignore assert self._transport - await self._transport.send(item) + await self._transport.send( + payload, + # hide_tb=hide_tb, + ) async def recv(self) -> Any: assert self._transport @@ -493,7 +515,7 @@ class Channel: return self._transport.connected() if self._transport else False -@asynccontextmanager +@acm async def _connect_chan( host: str, port: int diff --git a/tractor/_portal.py b/tractor/_portal.py index 7ac5711..5e64943 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -465,7 +465,7 @@ class Portal: # TODO: if we set this the wrapping `@acm` body will # still be shown (awkwardly) on pdb REPL entry. Ideally # we can similarly annotate that frame to NOT show? - hide_tb: bool = False, + hide_tb: bool = True, # proxied to RPC **kwargs, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 307dacd..587d636 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -315,38 +315,19 @@ async def _errors_relayed_via_ipc( if not entered_debug: log.exception('Actor crashed:\n') - # always ship errors back to caller - err_msg: dict[str, dict] = pack_error( - err, - # tb=tb, # TODO: special tb fmting? - cid=ctx.cid, - ) - - # NOTE: the src actor should always be packed into the - # error.. but how should we verify this? - # assert err_msg['src_actor_uid'] - # if not err_msg['error'].get('src_actor_uid'): - # import pdbp; pdbp.set_trace() - + # always (try to) ship RPC errors back to caller if is_rpc: - try: - await chan.send(err_msg) - + # # TODO: tests for this scenario: # - RPC caller closes connection before getting a response # should **not** crash this actor.. - except ( - trio.ClosedResourceError, - trio.BrokenResourceError, - BrokenPipeError, - ) as ipc_err: - - # if we can't propagate the error that's a big boo boo - log.exception( - f"Failed to ship error to caller @ {chan.uid} !?\n" - f'{ipc_err}' - - ) + await try_ship_error_to_remote( + chan, + err, + cid=ctx.cid, + remote_descr='caller', + hide_tb=hide_tb, + ) # error is probably from above coro running code *not from # the target rpc invocation since a scope was never @@ -719,9 +700,13 @@ def _get_mod_abspath(module: ModuleType) -> str: return os.path.abspath(module.__file__) -async def try_ship_error_to_parent( +async def try_ship_error_to_remote( channel: Channel, - err: Exception | BaseExceptionGroup, + err: Exception|BaseExceptionGroup, + + cid: str|None = None, + remote_descr: str = 'parent', + hide_tb: bool = True, ) -> None: ''' @@ -730,22 +715,39 @@ async def try_ship_error_to_parent( local cancellation ignored but logged as critical(ly bad). ''' + __tracebackhide__: bool = hide_tb with CancelScope(shield=True): try: - await channel.send( - # NOTE: normally only used for internal runtime errors - # so ship to peer actor without a cid. - pack_error(err) + # NOTE: normally only used for internal runtime errors + # so ship to peer actor without a cid. + msg: dict = pack_error( + err, + cid=cid, + + # TODO: special tb fmting for ctxc cases? + # tb=tb, ) + # NOTE: the src actor should always be packed into the + # error.. but how should we verify this? + # actor: Actor = _state.current_actor() + # assert err_msg['src_actor_uid'] + # if not err_msg['error'].get('src_actor_uid'): + # import pdbp; pdbp.set_trace() + await channel.send(msg) + + # XXX NOTE XXX in SC terms this is one of the worst things + # that can happen and provides for a 2-general's dilemma.. except ( trio.ClosedResourceError, trio.BrokenResourceError, + BrokenPipeError, ): - # in SC terms this is one of the worst things that can - # happen and provides for a 2-general's dilemma.. + err_msg: dict = msg['error']['tb_str'] log.critical( - f'Failed to ship error to parent ' - f'{channel.uid}, IPC transport failure!' + 'IPC transport failure -> ' + f'failed to ship error to {remote_descr}!\n\n' + f'X=> {channel.uid}\n\n' + f'{err_msg}\n' ) @@ -954,7 +956,10 @@ class Actor: log.runtime(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] - def load_modules(self) -> None: + def load_modules( + self, + debug_mode: bool = False, + ) -> None: ''' Load allowed RPC modules locally (after fork). @@ -986,7 +991,9 @@ class Actor: except ModuleNotFoundError: # it is expected the corresponding `ModuleNotExposed` error # will be raised later - log.error(f"Failed to import {modpath} in {self.name}") + log.error( + f"Failed to import {modpath} in {self.name}" + ) raise def _get_rpc_func(self, ns, funcname): @@ -1836,7 +1843,7 @@ class Actor: log.cancel( 'Cancel request for RPC task\n\n' - f'<= Actor.cancel_task(): {requesting_uid}\n\n' + f'<= Actor._cancel_task(): {requesting_uid}\n\n' f'=> {ctx._task}\n' f' |_ >> {ctx.repr_rpc}\n' # f' >> Actor._cancel_task() => {ctx._task}\n' @@ -2117,11 +2124,6 @@ async def async_main( ): accept_addrs = set_accept_addr_says_rent - # load exposed/allowed RPC modules - # XXX: do this **after** establishing a channel to the parent - # but **before** starting the message loop for that channel - # such that import errors are properly propagated upwards - actor.load_modules() # The "root" nursery ensures the channel with the immediate # parent is kept alive as a resilient service until @@ -2139,6 +2141,24 @@ async def async_main( actor._service_n = service_nursery assert actor._service_n + # load exposed/allowed RPC modules + # XXX: do this **after** establishing a channel to the parent + # but **before** starting the message loop for that channel + # such that import errors are properly propagated upwards + actor.load_modules() + + # XXX TODO XXX: figuring out debugging of this + # would somemwhat guarantee "self-hosted" runtime + # debugging (since it hits all the ede cases?) + # + # `tractor.pause()` right? + # try: + # actor.load_modules() + # except ModuleNotFoundError as err: + # _debug.pause_from_sync() + # import pdbp; pdbp.set_trace() + # raise + # Startup up the transport(-channel) server with, # - subactor: the bind address is sent by our parent # over our established channel @@ -2258,7 +2278,7 @@ async def async_main( ) if actor._parent_chan: - await try_ship_error_to_parent( + await try_ship_error_to_remote( actor._parent_chan, err, ) @@ -2674,7 +2694,7 @@ async def process_messages( log.exception("Actor errored:") if actor._parent_chan: - await try_ship_error_to_parent( + await try_ship_error_to_remote( actor._parent_chan, err, ) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 5268b25..e23d70f 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -215,7 +215,7 @@ async def cancel_on_completion( async def hard_kill( proc: trio.Process, - terminate_after: int = 3, + terminate_after: int = 1.6, # NOTE: for mucking with `.pause()`-ing inside the runtime # whilst also hacking on it XD @@ -281,8 +281,11 @@ async def hard_kill( # zombies (as a feature) we ask the OS to do send in the # removal swad as the last resort. if cs.cancelled_caught: + # TODO: toss in the skynet-logo face as ascii art? log.critical( - 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n' + # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n' + '#T-800 deployed to collect zombie B0\n' + f'|\n' f'|_{proc}\n' ) proc.kill() diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 50a32ae..149bb35 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -114,13 +114,19 @@ class MsgStream(trio.abc.Channel): stream=self, ) - async def receive(self): + async def receive( + self, + + hide_tb: bool = True, + ): ''' Receive a single msg from the IPC transport, the next in sequence sent by the far end task (possibly in order as determined by the underlying protocol). ''' + __tracebackhide__: bool = hide_tb + # NOTE: `trio.ReceiveChannel` implements # EOC handling as follows (aka uses it # to gracefully exit async for loops): @@ -139,7 +145,7 @@ class MsgStream(trio.abc.Channel): if self._closed: raise self._closed - src_err: Exception|None = None + src_err: Exception|None = None # orig tb try: try: msg = await self._rx_chan.receive() @@ -186,7 +192,7 @@ class MsgStream(trio.abc.Channel): # TODO: Locally, we want to close this stream gracefully, by # terminating any local consumers tasks deterministically. - # One we have broadcast support, we **don't** want to be + # Once we have broadcast support, we **don't** want to be # closing this stream and not flushing a final value to # remaining (clone) consumers who may not have been # scheduled to receive it yet. @@ -237,7 +243,12 @@ class MsgStream(trio.abc.Channel): raise_ctxc_from_self_call=True, ) - raise src_err # propagate + # propagate any error but hide low-level frames from + # caller by default. + if hide_tb: + raise type(src_err)(*src_err.args) from src_err + else: + raise src_err async def aclose(self) -> list[Exception|dict]: ''' @@ -475,23 +486,39 @@ class MsgStream(trio.abc.Channel): async def send( self, - data: Any + data: Any, + + hide_tb: bool = True, ) -> None: ''' Send a message over this stream to the far end. ''' - if self._ctx._remote_error: - raise self._ctx._remote_error # from None + __tracebackhide__: bool = hide_tb + self._ctx.maybe_raise() if self._closed: raise self._closed - # raise trio.ClosedResourceError('This stream was already closed') - await self._ctx.chan.send({ - 'yield': data, - 'cid': self._ctx.cid, - }) + try: + await self._ctx.chan.send( + payload={ + 'yield': data, + 'cid': self._ctx.cid, + }, + # hide_tb=hide_tb, + ) + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + BrokenPipeError, + ) as trans_err: + if hide_tb: + raise type(trans_err)( + *trans_err.args + ) from trans_err + else: + raise def stream(func: Callable) -> Callable: