diff --git a/tractor/_context.py b/tractor/_context.py
index a31c3b1b..7a562155 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -1198,8 +1198,12 @@ class Context:
     # TODO: replace all the instances of this!! XD
     def maybe_raise(
         self,
+
+        hide_tb: bool = True,
         **kwargs,
+
     ) -> Exception|None:
+        __tracebackhide__: bool = hide_tb
         if re := self._remote_error:
             return self._maybe_raise_remote_err(
                 re,
@@ -1209,8 +1213,10 @@ class Context:
     def _maybe_raise_remote_err(
         self,
         remote_error: Exception,
+
         raise_ctxc_from_self_call: bool = False,
         raise_overrun_from_self: bool = True,
+        hide_tb: bool = True,

     ) -> (
         ContextCancelled  # `.cancel()` request to far side
@@ -1222,6 +1228,7 @@ class Context:
         a cancellation (if any).

         '''
+        __tracebackhide__: bool = hide_tb
         our_uid: tuple = self.chan.uid

         # XXX NOTE XXX: `ContextCancelled`/`StreamOverrun` absorption
@@ -1305,7 +1312,7 @@ class Context:
     # TODO: change to `.wait_for_result()`?
     async def result(
         self,
-        hide_tb: bool = False,
+        hide_tb: bool = True,

     ) -> Any|Exception:
         '''
diff --git a/tractor/_ipc.py b/tractor/_ipc.py
index 33110c04..e80a1c35 100644
--- a/tractor/_ipc.py
+++ b/tractor/_ipc.py
@@ -19,13 +19,14 @@ Inter-process comms abstractions

"""
 from __future__ import annotations
-import struct
-import platform
-from pprint import pformat
 from collections.abc import (
     AsyncGenerator,
     AsyncIterator,
 )
+from contextlib import asynccontextmanager as acm
+import platform
+from pprint import pformat
+import struct
 import typing
 from typing import (
     Any,
@@ -35,18 +36,16 @@ from typing import (
     TypeVar,
 )

-from tricycle import BufferedReceiveStream
 import msgspec
+from tricycle import BufferedReceiveStream
 import trio
-from async_generator import asynccontextmanager

-from .log import get_logger
-from ._exceptions import TransportClosed
+from tractor.log import get_logger
+from tractor._exceptions import TransportClosed
+
 log = get_logger(__name__)
-
 _is_windows = platform.system() == 'Windows'
-log = get_logger(__name__)


 def get_stream_addrs(stream: trio.SocketStream) -> tuple:
@@ -206,7 +205,17 @@ class MsgpackTCPStream(MsgTransport):
             else:
                 raise

-    async def send(self, msg: Any) -> None:
+    async def send(
+        self,
+        msg: Any,
+
+        # hide_tb: bool = False,
+    ) -> None:
+        '''
+        Send a msgpack coded blob-as-msg over TCP.
+
+        '''
+        # __tracebackhide__: bool = hide_tb
         async with self._send_lock:
             bytes_data: bytes = self.encode(msg)
@@ -388,15 +397,28 @@ class Channel:
         )
         return transport

-    async def send(self, item: Any) -> None:
+    async def send(
+        self,
+        payload: Any,
+
+        # hide_tb: bool = False,
+
+    ) -> None:
+        '''
+        Send a coded msg-blob over the transport.
+
+        '''
+        # __tracebackhide__: bool = hide_tb
         log.transport(
             '=> send IPC msg:\n\n'
-            f'{pformat(item)}\n'
+            f'{pformat(payload)}\n'
         )  # type: ignore
         assert self._transport

-        await self._transport.send(item)
+        await self._transport.send(
+            payload,
+            # hide_tb=hide_tb,
+        )

     async def recv(self) -> Any:
         assert self._transport
@@ -493,7 +515,7 @@ class Channel:
         return self._transport.connected() if self._transport else False


-@asynccontextmanager
+@acm
 async def _connect_chan(
     host: str, port: int
 ) -> typing.AsyncGenerator[Channel, None]:
diff --git a/tractor/_portal.py b/tractor/_portal.py
index c9f314f3..bd6cf860 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -465,7 +465,7 @@ class Portal:
         # TODO: if we set this the wrapping `@acm` body will
         # still be shown (awkwardly) on pdb REPL entry. Ideally
         # we can similarly annotate that frame to NOT show?
-        hide_tb: bool = False,
+        hide_tb: bool = True,

         # proxied to RPC
         **kwargs,
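Aside for reviewers: the `hide_tb` flags threaded through the hunks above all funnel into one mechanism, a truthy `__tracebackhide__` local marks its frame to be skipped by pytest (and `pdbp`-style debuggers) when rendering a failure. A minimal standalone sketch of the convention, not part of this patch and with a hypothetical `_transport_send()` name:

def _transport_send(msg: dict, hide_tb: bool = True) -> None:
    # pytest (and pdbp) omit any frame whose locals contain a
    # truthy `__tracebackhide__` from rendered tracebacks.
    __tracebackhide__: bool = hide_tb
    raise RuntimeError('simulated transport failure')


def test_user_code() -> None:
    # on failure the reported traceback shows this frame but
    # elides `_transport_send()`'s internals.
    _transport_send({'cmd': 'ping'})
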
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index a06b5948..c034bd86 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -1,423 +1,4 @@
-# tractor: structured concurrent "actors".
-# Copyright 2018-eternity Tyler Goodlet.
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-
-"""
-The fundamental core machinery implementing every "actor" including
-the process-local (python-interpreter global) `Actor` state-type
-primitive(s), RPC-in-task scheduling, and IPC connectivity and
-low-level transport msg handling.
-
-"""
-from __future__ import annotations
-from contextlib import (
-    ExitStack,
-    asynccontextmanager as acm,
-)
-from collections import defaultdict
-from functools import partial
-from itertools import chain
-import importlib
-import importlib.util
-import inspect
-from pprint import pformat
-import signal
-import sys
-from typing import (
-    Any,
-    Callable,
-    Union,
-    Coroutine,
-    TYPE_CHECKING,
-)
-import uuid
-from types import ModuleType
-import os
-import warnings
-
-from async_generator import aclosing
-from exceptiongroup import BaseExceptionGroup
-import trio
-from trio import (
-    CancelScope,
-)
-from trio_typing import (
-    Nursery,
-    TaskStatus,
-)
-
-from .msg import NamespacePath
-from ._ipc import Channel
-from ._context import (
-    mk_context,
-    Context,
-)
-from .log import get_logger
-from ._exceptions import (
-    pack_error,
-    unpack_error,
-    ModuleNotExposed,
-    is_multi_cancelled,
-    ContextCancelled,
-    TransportClosed,
-)
-from . import _debug
-from ._discovery import get_arbiter
-from ._portal import Portal
-from . import _state
-from . import _mp_fixup_main
-
-
-if TYPE_CHECKING:
-    from ._supervise import ActorNursery
-
-
-log = get_logger('tractor')
-
-_gb_mod: ModuleType|None|False = None
-
-
-async def maybe_import_gb():
-    global _gb_mod
-    if _gb_mod is False:
-        return
-
-    try:
-        import greenback
-        _gb_mod = greenback
-        await greenback.ensure_portal()
-
-    except ModuleNotFoundError:
-        log.debug(
-            '`greenback` is not installed.\n'
-            'No sync debug support!\n'
-        )
-        _gb_mod = False
-
-
-async def _invoke_non_context(
-    actor: Actor,
-    cancel_scope: CancelScope,
-    ctx: Context,
-    cid: str,
-    chan: Channel,
-    func: Callable,
-    coro: Coroutine,
-    kwargs: dict[str, Any],
-
-    treat_as_gen: bool,
-    is_rpc: bool,
-
-    task_status: TaskStatus[
-        Context | BaseException
-    ] = trio.TASK_STATUS_IGNORED,
-):
-
-    # TODO: can we unify this with the `context=True` impl below?
-    if inspect.isasyncgen(coro):
-        await chan.send({'functype': 'asyncgen', 'cid': cid})
-        # XXX: massive gotcha! If the containing scope
-        # is cancelled and we execute the below line,
-        # any ``ActorNursery.__aexit__()`` WON'T be
-        # triggered in the underlying async gen! So we
-        # have to properly handle the closing (aclosing)
-        # of the async gen in order to be sure the cancel
-        # is propagated!
-        with cancel_scope as cs:
-            ctx._scope = cs
-            task_status.started(ctx)
-            async with aclosing(coro) as agen:
-                async for item in agen:
-                    # TODO: can we send values back in here?
-                    # it's gonna require a `while True:` and
-                    # some non-blocking way to retrieve new `asend()`
-                    # values from the channel:
-                    # to_send = await chan.recv_nowait()
-                    # if to_send is not None:
-                    #     to_yield = await coro.asend(to_send)
-                    await chan.send({'yield': item, 'cid': cid})
-
-        log.runtime(f"Finished iterating {coro}")
-        # TODO: we should really support a proper
-        # `StopAsyncIteration` system here for returning a final
-        # value if desired
-        await chan.send({'stop': True, 'cid': cid})
-
-    # one way @stream func that gets treated like an async gen
-    # TODO: can we unify this with the `context=True` impl below?
-    elif treat_as_gen:
-        await chan.send({'functype': 'asyncgen', 'cid': cid})
-        # XXX: the async-func may spawn further tasks which push
-        # back values like an async-generator would but must
-        # manualy construct the response dict-packet-responses as
-        # above
-        with cancel_scope as cs:
-            ctx._scope = cs
-            task_status.started(ctx)
-            await coro
-
-        if not cs.cancelled_caught:
-            # task was not cancelled so we can instruct the
-            # far end async gen to tear down
-            await chan.send({'stop': True, 'cid': cid})
-    else:
-        # regular async function/method
-        # XXX: possibly just a scheduled `Actor._cancel_task()`
-        # from a remote request to cancel some `Context`.
-        # ------ - ------
-        # TODO: ideally we unify this with the above `context=True`
-        # block such that for any remote invocation ftype, we
-        # always invoke the far end RPC task scheduling the same
-        # way: using the linked IPC context machinery.
-        failed_resp: bool = False
-        try:
-            await chan.send({
-                'functype': 'asyncfunc',
-                'cid': cid
-            })
-        except (
-            trio.ClosedResourceError,
-            trio.BrokenResourceError,
-            BrokenPipeError,
-        ) as ipc_err:
-            failed_resp = True
-            if is_rpc:
-                raise
-            else:
-                # TODO: should this be an `.exception()` call?
-                log.warning(
-                    f'Failed to respond to non-rpc request: {func}\n'
-                    f'{ipc_err}'
-                )
-
-        with cancel_scope as cs:
-            ctx._scope: CancelScope = cs
-            task_status.started(ctx)
-            result = await coro
-            fname: str = func.__name__
-            log.runtime(
-                'RPC complete:\n'
-                f'task: {ctx._task}\n'
-                f'|_cid={ctx.cid}\n'
-                f'|_{fname}() -> {pformat(result)}\n'
-            )
-
-            # NOTE: only send result if we know IPC isn't down
-            if (
-                not failed_resp
-                and chan.connected()
-            ):
-                try:
-                    await chan.send(
-                        {'return': result,
-                         'cid': cid}
-                    )
-                except (
-                    BrokenPipeError,
-                    trio.BrokenResourceError,
-                ):
-                    log.warning(
-                        'Failed to return result:\n'
-                        f'{func}@{actor.uid}\n'
-                        f'remote chan: {chan.uid}'
-                    )
-
-@acm
-async def _errors_relayed_via_ipc(
-    actor: Actor,
-    chan: Channel,
-    ctx: Context,
-    is_rpc: bool,
-
-    hide_tb: bool = False,
-    debug_kbis: bool = False,
-    task_status: TaskStatus[
-        Context | BaseException
-    ] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
-    __tracebackhide__: bool = hide_tb  # TODO: use hide_tb here?
-    try:
-        yield  # run RPC invoke body
-
-    # box and ship RPC errors for wire-transit via
-    # the task's requesting parent IPC-channel.
-    except (
-        Exception,
-        BaseExceptionGroup,
-        KeyboardInterrupt,
-    ) as err:
-
-        # always hide this frame from debug REPL if the crash
-        # originated from an rpc task and we DID NOT fail due to
-        # an IPC transport error!
-        if (
-            is_rpc
-            and chan.connected()
-        ):
-            __tracebackhide__: bool = hide_tb
-
-        if not is_multi_cancelled(err):
-
-            # TODO: maybe we'll want different "levels" of debugging
-            # eventualy such as ('app', 'supervisory', 'runtime') ?
-
-            # if not isinstance(err, trio.ClosedResourceError) and (
-            # if not is_multi_cancelled(err) and (
-
-            entered_debug: bool = False
-            if (
-                (
-                    not isinstance(err, ContextCancelled)
-                    or (
-                        isinstance(err, ContextCancelled)
-                        and ctx._cancel_called
-
-                        # if the root blocks the debugger lock request from a child
-                        # we will get a remote-cancelled condition.
-                        and ctx._enter_debugger_on_cancel
-                    )
-                )
-                and
-                (
-                    not isinstance(err, KeyboardInterrupt)
-                    or (
-                        isinstance(err, KeyboardInterrupt)
-                        and debug_kbis
-                    )
-                )
-            ):
-                # await _debug.pause()
-                # XXX QUESTION XXX: is there any case where we'll
-                # want to debug IPC disconnects as a default?
-                # => I can't think of a reason that inspecting this
-                # type of failure will be useful for respawns or
-                # recovery logic - the only case is some kind of
-                # strange bug in our transport layer itself? Going
-                # to keep this open ended for now.
-                entered_debug = await _debug._maybe_enter_pm(err)
-
-                if not entered_debug:
-                    log.exception('Actor crashed:\n')
-
-        # always ship errors back to caller
-        err_msg: dict[str, dict] = pack_error(
-            err,
-            # tb=tb, # TODO: special tb fmting?
-            cid=ctx.cid,
-        )
-
-        # NOTE: the src actor should always be packed into the
-        # error.. but how should we verify this?
-        # assert err_msg['src_actor_uid']
-        # if not err_msg['error'].get('src_actor_uid'):
-        #     import pdbp; pdbp.set_trace()
-
-        if is_rpc:
-            try:
-                await chan.send(err_msg)
-
-            # TODO: tests for this scenario:
-            # - RPC caller closes connection before getting a response
-            #   should **not** crash this actor..
-            except (
-                trio.ClosedResourceError,
-                trio.BrokenResourceError,
-                BrokenPipeError,
-            ) as ipc_err:
-
-                # if we can't propagate the error that's a big boo boo
-                log.exception(
-                    f"Failed to ship error to caller @ {chan.uid} !?\n"
-                    f'{ipc_err}'
-
-                )
-
-        # error is probably from above coro running code *not from
-        # the target rpc invocation since a scope was never
-        # allocated around the coroutine await.
-        if ctx._scope is None:
-            # we don't ever raise directly here to allow the
-            # msg-loop-scheduler to continue running for this
-            # channel.
-            task_status.started(err)
-
-        # always reraise KBIs so they propagate at the sys-process
-        # level.
-        if isinstance(err, KeyboardInterrupt):
-            raise
-
-
-    # RPC task bookeeping
-    finally:
-        try:
-            ctx, func, is_complete = actor._rpc_tasks.pop(
-                (chan, ctx.cid)
-            )
-            is_complete.set()
-
-        except KeyError:
-            if is_rpc:
-                # If we're cancelled before the task returns then the
-                # cancel scope will not have been inserted yet
-                log.warning(
-                    'RPC task likely errored or cancelled before start?'
-                    f'|_{ctx._task}\n'
-                    f'  >> {ctx.repr_rpc}\n'
-                )
-            else:
-                log.cancel(
-                    'Failed to de-alloc internal runtime cancel task?\n'
-                    f'|_{ctx._task}\n'
-                    f'  >> {ctx.repr_rpc}\n'
-                )
-
-        finally:
-            if not actor._rpc_tasks:
-                log.runtime("All RPC tasks have completed")
-                actor._ongoing_rpc_tasks.set()
-
-
-async def _invoke(
-
-    actor: 'Actor',
-    cid: str,
-    chan: Channel,
-    func: Callable,
-    kwargs: dict[str, Any],
-
-    is_rpc: bool = True,
-    hide_tb: bool = True,
-
-    task_status: TaskStatus[
-        Union[Context, BaseException]
-    ] = trio.TASK_STATUS_IGNORED,
-):
-    '''
-    Schedule a `trio` task-as-func and deliver result(s) over
-    connected IPC channel.
-
-    This is the core "RPC" `trio.Task` scheduling machinery used to start every
-    remotely invoked function, normally in `Actor._service_n: Nursery`.
-
-    '''
-    __tracebackhide__: bool = hide_tb
-    treat_as_gen: bool = False
-
-    # possibly a traceback (not sure what typing is for this..)
-    tb = None
+tb = None

 cancel_scope = CancelScope()  # activated cancel scope ref
@@ -712,9 +293,13 @@ def _get_mod_abspath(module):
     return os.path.abspath(module.__file__)


-async def try_ship_error_to_parent(
+async def try_ship_error_to_remote(
     channel: Channel,
-    err: Union[Exception, BaseExceptionGroup],
+    err: Exception|BaseExceptionGroup,
+
+    cid: str|None = None,
+    remote_descr: str = 'parent',
+    hide_tb: bool = True,

 ) -> None:
     '''
@@ -723,22 +308,39 @@ async def try_ship_error_to_remote(
     local cancellation ignored but logged as critical(ly bad).

     '''
+    __tracebackhide__: bool = hide_tb
     with CancelScope(shield=True):
         try:
-            await channel.send(
-                # NOTE: normally only used for internal runtime errors
-                # so ship to peer actor without a cid.
-                pack_error(err)
+            # NOTE: normally only used for internal runtime errors
+            # so ship to peer actor without a cid.
+            msg: dict = pack_error(
+                err,
+                cid=cid,
+
+                # TODO: special tb fmting for ctxc cases?
+                # tb=tb,
             )
+            # NOTE: the src actor should always be packed into the
+            # error.. but how should we verify this?
+            # actor: Actor = _state.current_actor()
+            # assert err_msg['src_actor_uid']
+            # if not err_msg['error'].get('src_actor_uid'):
+            #     import pdbp; pdbp.set_trace()
+            await channel.send(msg)
+
+        # XXX NOTE XXX in SC terms this is one of the worst things
+        # that can happen and provides for a 2-general's dilemma..
         except (
             trio.ClosedResourceError,
             trio.BrokenResourceError,
+            BrokenPipeError,
         ):
-            # in SC terms this is one of the worst things that can
-            # happen and provides for a 2-general's dilemma..
+            err_msg: dict = msg['error']['tb_str']
             log.critical(
-                f'Failed to ship error to parent '
-                f'{channel.uid}, IPC transport failure!'
+                'IPC transport failure -> '
+                f'failed to ship error to {remote_descr}!\n\n'
+                f'X=> {channel.uid}\n\n'
+                f'{err_msg}\n'
             )
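Why the shielded scope above matters: error shipping typically happens while the actor is itself being torn down, and without `shield=True` the `await channel.send()` would be aborted by the surrounding cancellation before the boxed error ever hits the wire. A stripped-down sketch of the pattern (hypothetical helper; assumes `channel` exposes an async `.send()`):

import trio

async def ship_error_shielded(channel, msg: dict) -> None:
    # sketch: a shielded scope lets this send run to completion
    # even when the enclosing task has already been cancelled.
    with trio.CancelScope(shield=True):
        await channel.send(msg)
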
@@ -896,7 +498,10 @@ class Actor:
         log.runtime(f"{uid} successfully connected back to us")
         return event, self._peers[uid][-1]

-    def load_modules(self) -> None:
+    def load_modules(
+        self,
+        debug_mode: bool = False,
+    ) -> None:
         '''
         Load allowed RPC modules locally (after fork).

@@ -928,7 +533,9 @@ class Actor:
             except ModuleNotFoundError:
                 # it is expected the corresponding `ModuleNotExposed` error
                 # will be raised later
-                log.error(f"Failed to import {modpath} in {self.name}")
+                log.error(
+                    f"Failed to import {modpath} in {self.name}"
+                )
                 raise

     def _get_rpc_func(self, ns, funcname):
@@ -1759,7 +1366,7 @@ class Actor:

         log.cancel(
             'Cancel request for RPC task\n\n'
-            f'<= Actor.cancel_task(): {requesting_uid}\n\n'
+            f'<= Actor._cancel_task(): {requesting_uid}\n\n'
             f'=> {ctx._task}\n'
             f'  |_ >> {ctx.repr_rpc}\n'
             # f'  >> Actor._cancel_task() => {ctx._task}\n'
@@ -2021,11 +1628,6 @@ async def async_main(
         if accept_addr_rent is not None:
             accept_addr = accept_addr_rent

-        # load exposed/allowed RPC modules
-        # XXX: do this **after** establishing a channel to the parent
-        # but **before** starting the message loop for that channel
-        # such that import errors are properly propagated upwards
-        actor.load_modules()

         # The "root" nursery ensures the channel with the immediate
         # parent is kept alive as a resilient service until
@@ -2043,7 +1645,25 @@ async def async_main(
                 actor._service_n = service_nursery
                 assert actor._service_n

-                # Startup up the channel server with,
+                # load exposed/allowed RPC modules
+                # XXX: do this **after** establishing a channel to the parent
+                # but **before** starting the message loop for that channel
+                # such that import errors are properly propagated upwards
+                actor.load_modules()
+
+                # XXX TODO XXX: figuring out debugging of this
+                # would somewhat guarantee "self-hosted" runtime
+                # debugging (since it hits all the edge cases?)
+                #
+                # `tractor.pause()` right?
+                # try:
+                #     actor.load_modules()
+                # except ModuleNotFoundError as err:
+                #     _debug.pause_from_sync()
+                #     import pdbp; pdbp.set_trace()
+                #     raise
+
+                # Start up the transport(-channel) server with,
                 # - subactor: the bind address is sent by our parent
                 #   over our established channel
                 # - root actor: the ``accept_addr`` passed to this method
@@ -2122,7 +1742,7 @@ async def async_main(
             )

             if actor._parent_chan:
-                await try_ship_error_to_parent(
+                await try_ship_error_to_remote(
                     actor._parent_chan,
                     err,
                 )
@@ -2532,7 +2152,7 @@ async def process_messages(
                 log.exception("Actor errored:")

             if actor._parent_chan:
-                await try_ship_error_to_parent(
+                await try_ship_error_to_remote(
                     actor._parent_chan,
                     err,
                 )
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 58c187f0..a2643876 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -215,7 +215,7 @@ async def cancel_on_completion(

 async def hard_kill(
     proc: trio.Process,
-    terminate_after: int = 3,
+    terminate_after: float = 1.6,

     # NOTE: for mucking with `.pause()`-ing inside the runtime
     # whilst also hacking on it XD
@@ -281,8 +281,11 @@ async def hard_kill(
     # zombies (as a feature) we ask the OS to do send in the
     # removal swad as the last resort.
     if cs.cancelled_caught:
+        # TODO: toss in the skynet-logo face as ascii art?
         log.critical(
-            'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
+            # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
+            '#T-800 deployed to collect zombie B0\n'
+            f'|\n'
             f'|_{proc}\n'
         )
         proc.kill()
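For context on the `terminate_after` tuning above: `hard_kill()` implements a standard two-stage escalation. A standalone sketch of the same pattern (simplified, assumed behavior; not the real implementation):

import trio

async def kill_with_grace(
    proc: trio.Process,
    terminate_after: float = 1.6,
) -> None:
    # sketch of the escalation `hard_kill()` performs: SIGTERM
    # first, then SIGKILL if the child ignores the grace period.
    proc.terminate()
    with trio.move_on_after(terminate_after) as cs:
        await proc.wait()
    if cs.cancelled_caught:
        proc.kill()
        await proc.wait()
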
diff --git a/tractor/_streaming.py b/tractor/_streaming.py
index 50a32ae9..149bb350 100644
--- a/tractor/_streaming.py
+++ b/tractor/_streaming.py
@@ -114,13 +114,19 @@ class MsgStream(trio.abc.Channel):
             stream=self,
         )

-    async def receive(self):
+    async def receive(
+        self,
+
+        hide_tb: bool = True,
+    ):
         '''
         Receive a single msg from the IPC transport, the next in
         sequence sent by the far end task (possibly in order as
         determined by the underlying protocol).

         '''
+        __tracebackhide__: bool = hide_tb
+
         # NOTE: `trio.ReceiveChannel` implements
         # EOC handling as follows (aka uses it
         # to gracefully exit async for loops):
@@ -139,7 +145,7 @@ class MsgStream(trio.abc.Channel):
         if self._closed:
             raise self._closed

-        src_err: Exception|None = None
+        src_err: Exception|None = None  # orig tb
         try:
             try:
                 msg = await self._rx_chan.receive()
@@ -186,7 +192,7 @@ class MsgStream(trio.abc.Channel):

         # TODO: Locally, we want to close this stream gracefully, by
         # terminating any local consumers tasks deterministically.
-        # One we have broadcast support, we **don't** want to be
+        # Once we have broadcast support, we **don't** want to be
         # closing this stream and not flushing a final value to
         # remaining (clone) consumers who may not have been
         # scheduled to receive it yet.
@@ -237,7 +243,12 @@ class MsgStream(trio.abc.Channel):
                 raise_ctxc_from_self_call=True,
             )

-            raise src_err  # propagate
+            # propagate any error but hide low-level frames from
+            # caller by default.
+            if hide_tb:
+                raise type(src_err)(*src_err.args) from src_err
+            else:
+                raise src_err

     async def aclose(self) -> list[Exception|dict]:
         '''
@@ -475,23 +486,39 @@ class MsgStream(trio.abc.Channel):

     async def send(
         self,
-        data: Any
+        data: Any,
+
+        hide_tb: bool = True,
     ) -> None:
         '''
         Send a message over this stream to the far end.

         '''
-        if self._ctx._remote_error:
-            raise self._ctx._remote_error  # from None
+        __tracebackhide__: bool = hide_tb
+        self._ctx.maybe_raise()
+
         if self._closed:
             raise self._closed
-            # raise trio.ClosedResourceError('This stream was already closed')

-        await self._ctx.chan.send({
-            'yield': data,
-            'cid': self._ctx.cid,
-        })
+        try:
+            await self._ctx.chan.send(
+                payload={
+                    'yield': data,
+                    'cid': self._ctx.cid,
+                },
+                # hide_tb=hide_tb,
+            )
+        except (
+            trio.ClosedResourceError,
+            trio.BrokenResourceError,
+            BrokenPipeError,
+        ) as trans_err:
+            if hide_tb:
+                raise type(trans_err)(
+                    *trans_err.args
+                ) from trans_err
+            else:
+                raise


 def stream(func: Callable) -> Callable:
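Lastly, a note on the re-raise idiom adopted in `MsgStream.send()`/`.receive()` above: raising a fresh instance of the same exception type makes the user-visible traceback start at the raising frame, while `from` chains the original (with its full low-level trace) as `__cause__` for anyone who drops into a debugger. In isolation (a sketch; assumes the error type is reconstructible from `.args`, which holds for the `trio` transport errors caught above):

from typing import NoReturn

import trio

def reraise_shallow(err: BaseException) -> NoReturn:
    # new instance -> traceback starts here; `from err` keeps the
    # original exception (and its deep frames) as `__cause__`.
    raise type(err)(*err.args) from err

try:
    raise trio.BrokenResourceError('peer hung up')
except trio.BrokenResourceError as trans_err:
    reraise_shallow(trans_err)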