diff --git a/tractor/_context.py b/tractor/_context.py index 3ed54d7..f333c9e 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -61,7 +61,6 @@ from ._exceptions import ( ) from .log import get_logger from .msg import ( - _codec, Error, MsgType, MsgCodec, @@ -103,7 +102,6 @@ class Unresolved: a final return value or raised error is resolved. ''' - ... # TODO: make this a .msg.types.Struct! @@ -116,19 +114,19 @@ class Context: NB: This class should **never be instatiated directly**, it is allocated by the runtime in 2 ways: - - by entering ``Portal.open_context()`` which is the primary - public API for any "caller" task or, + - by entering `Portal.open_context()` which is the primary + public API for any "parent" task or, - by the RPC machinery's `._rpc._invoke()` as a `ctx` arg - to a remotely scheduled "callee" function. + to a remotely scheduled "child" function. - AND is always constructed using the below ``mk_context()``. + AND is always constructed using the below `mk_context()`. Allows maintaining task or protocol specific state between 2 cancel-scope-linked, communicating and parallel executing `trio.Task`s. Contexts are allocated on each side of any task RPC-linked msg dialog, i.e. for every request to a remote actor from a `Portal`. On the "callee" side a context is - always allocated inside ``._rpc._invoke()``. + always allocated inside `._rpc._invoke()`. TODO: more detailed writeup on cancellation, error and streaming semantics.. @@ -262,7 +260,13 @@ class Context: _strict_started: bool = False _cancel_on_msgerr: bool = True - def __str__(self) -> str: + def pformat( + self, + extra_fields: dict[str, Any]|None = None, + # ^-TODO-^ some built-in extra state fields + # we'll want in some devx specific cases? + + ) -> str: ds: str = '=' # ds: str = ': ' @@ -279,11 +283,7 @@ class Context: outcome_str: str = self.repr_outcome( show_error_fields=True ) - outcome_typ_str: str = self.repr_outcome( - type_only=True - ) - - return ( + fmtstr: str = ( f'\n' ) # NOTE: making this return a value that can be passed to @@ -335,7 +345,8 @@ class Context: # logging perspective over `eval()`-ability since we do NOT # target serializing non-struct instances! # def __repr__(self) -> str: - __repr__ = __str__ + __str__ = pformat + __repr__ = pformat @property def cancel_called(self) -> bool: @@ -615,10 +626,10 @@ class Context: whom: str = ( 'us' if error.canceller == self._actor.uid - else 'peer' + else 'a remote peer (not us)' ) log.cancel( - f'IPC context cancelled by {whom}!\n\n' + f'IPC context was cancelled by {whom}!\n\n' f'{error}' ) @@ -626,7 +637,6 @@ class Context: msgerr = True log.error( f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n' - f'{error}\n' f'{pformat(self)}\n' ) @@ -696,24 +706,23 @@ class Context: else: message: str = 'NOT cancelling `Context._scope` !\n\n' - scope_info: str = 'No `self._scope: CancelScope` was set/used ?' + fmt_str: str = 'No `self._scope: CancelScope` was set/used ?' if cs: - scope_info: str = ( - f'self._scope: {cs}\n' - f'|_ .cancel_called: {cs.cancel_called}\n' - f'|_ .cancelled_caught: {cs.cancelled_caught}\n' - f'|_ ._cancel_status: {cs._cancel_status}\n\n' + fmt_str: str = self.pformat( + extra_fields={ + '._is_self_cancelled()': self._is_self_cancelled(), + '._cancel_on_msgerr': self._cancel_on_msgerr, - f'{self}\n' - f'|_ ._is_self_cancelled(): {self._is_self_cancelled()}\n' - f'|_ ._cancel_on_msgerr: {self._cancel_on_msgerr}\n\n' - - f'msgerr: {msgerr}\n' + '._scope': cs, + '._scope.cancel_called': cs.cancel_called, + '._scope.cancelled_caught': cs.cancelled_caught, + '._scope._cancel_status': cs._cancel_status, + } ) log.cancel( message + - f'{scope_info}' + fmt_str ) # TODO: maybe we should also call `._res_scope.cancel()` if it # exists to support cancelling any drain loop hangs? @@ -748,7 +757,7 @@ class Context: ) return ( # f'{self._nsf}() -{{{codec}}}-> {repr(self.outcome)}:' - f'{self._nsf}() -> {outcome_str}:' + f'{self._nsf}() -> {outcome_str}' ) @property @@ -836,7 +845,7 @@ class Context: if not self._portal: raise InternalError( 'No portal found!?\n' - 'Why is this supposed caller context missing it?' + 'Why is this supposed {self.side!r}-side ctx task missing it?!?' ) cid: str = self.cid @@ -1274,11 +1283,11 @@ class Context: ) log.cancel( - 'Ctx drained pre-result msgs:\n' - f'{pformat(drained_msgs)}\n\n' + 'Ctx drained to final result msgs\n' + f'{return_msg}\n\n' - f'Final return msg:\n' - f'{return_msg}\n' + f'pre-result drained msgs:\n' + f'{pformat(drained_msgs)}\n' ) self.maybe_raise( @@ -1443,6 +1452,65 @@ class Context: repr(self._result) ) + @property + def repr_state(self) -> str: + ''' + A `str`-status describing the current state of this + inter-actor IPC context in terms of the current "phase" state + of the SC shuttling dialog protocol. + + ''' + merr: Exception|None = self.maybe_error + outcome: Unresolved|Exception|Any = self.outcome + + match ( + outcome, + merr, + ): + case ( + Unresolved, + ContextCancelled(), + ) if self.cancel_acked: + status = 'self-cancelled' + + case ( + Unresolved, + ContextCancelled(), + ) if ( + self.canceller + and not self._cancel_called + ): + status = 'peer-cancelled' + + case ( + Unresolved, + BaseException(), + ) if self.canceller: + status = 'errored' + + case ( + _, # any non-unresolved value + None, + ) if self._final_result_is_set(): + status = 'returned' + + case ( + Unresolved, # noqa (weird.. ruff) + None, + ): + if stream := self._stream: + if stream.closed: + status = 'streaming-finished' + else: + status = 'streaming' + elif self._started_called: + status = 'started' + + case _: + status = 'unknown!?' + + return status + async def started( self, @@ -1451,7 +1519,11 @@ class Context: value: PayloadT|None = None, strict_parity: bool = False, - complain_no_parity: bool = True, + + # TODO: this will always emit now that we do `.pld: Raw` + # passthrough.. so maybe just only complain when above strict + # flag is set? + complain_no_parity: bool = False, ) -> None: ''' @@ -1511,18 +1583,19 @@ class Context: ) raise RuntimeError( 'Failed to roundtrip `Started` msg?\n' - f'{pformat(rt_started)}\n' + f'{pretty_struct.pformat(rt_started)}\n' ) if rt_started != started_msg: # TODO: break these methods out from the struct subtype? + # TODO: make that one a mod func too.. diff = pretty_struct.Struct.__sub__( rt_started, started_msg, ) complaint: str = ( - 'Started value does not match after codec rountrip?\n\n' + 'Started value does not match after roundtrip?\n\n' f'{diff}' ) @@ -1538,8 +1611,6 @@ class Context: else: log.warning(complaint) - # started_msg = rt_started - await self.chan.send(started_msg) # raise any msg type error NO MATTER WHAT! @@ -2354,7 +2425,7 @@ async def open_context_from_portal( # FINALLY, remove the context from runtime tracking and # exit! log.runtime( - 'De-allocating IPC ctx opened with {ctx.side!r} peer \n' + f'De-allocating IPC ctx opened with {ctx.side!r} peer \n' f'uid: {uid}\n' f'cid: {ctx.cid}\n' ) @@ -2390,10 +2461,8 @@ def mk_context( from .devx._code import find_caller_info caller_info: CallerInfo|None = find_caller_info() - pld_rx = msgops.PldRx( - # _rx_mc=recv_chan, - _msgdec=_codec.mk_dec(spec=pld_spec) - ) + # TODO: when/how do we apply `.limit_plds()` from here? + pld_rx: msgops.PldRx = msgops.current_pldrx() ctx = Context( chan=chan, diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index f2ff8c2..af653f9 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -46,7 +46,7 @@ from tractor.msg import ( Error, MsgType, Stop, - Yield, + # Yield, types as msgtypes, MsgCodec, MsgDec, @@ -140,71 +140,6 @@ def get_err_type(type_name: str) -> BaseException|None: return type_ref -def pformat_boxed_tb( - tb_str: str, - fields_str: str|None = None, - field_prefix: str = ' |_', - - tb_box_indent: int|None = None, - tb_body_indent: int = 1, - -) -> str: - if ( - fields_str - and - field_prefix - ): - fields: str = textwrap.indent( - fields_str, - prefix=field_prefix, - ) - else: - fields = fields_str or '' - - tb_body = tb_str - if tb_body_indent: - tb_body: str = textwrap.indent( - tb_str, - prefix=tb_body_indent * ' ', - ) - - tb_box: str = ( - - # orig - # f' |\n' - # f' ------ - ------\n\n' - # f'{tb_str}\n' - # f' ------ - ------\n' - # f' _|\n' - - f'|\n' - f' ------ - ------\n\n' - # f'{tb_str}\n' - f'{tb_body}' - f' ------ - ------\n' - f'_|\n' - ) - tb_box_indent: str = ( - tb_box_indent - or - 1 - - # (len(field_prefix)) - # ? ^-TODO-^ ? if you wanted another indent level - ) - if tb_box_indent > 0: - tb_box: str = textwrap.indent( - tb_box, - prefix=tb_box_indent * ' ', - ) - - return ( - fields - + - tb_box - ) - - def pack_from_raise( local_err: ( ContextCancelled @@ -504,12 +439,15 @@ class RemoteActorError(Exception): reprol_str: str = ( f'{type(self).__name__}' # type name f'[{self.boxed_type_str}]' # parameterized by boxed type - '(' # init-style look ) + _repr: str = self._mk_fields_str( self.reprol_fields, end_char=' ', ) + if _repr: + reprol_str += '(' # init-style call + return ( reprol_str + @@ -521,6 +459,7 @@ class RemoteActorError(Exception): Nicely formatted boxed error meta data + traceback. ''' + from tractor.devx._code import pformat_boxed_tb fields: str = self._mk_fields_str( _body_fields + @@ -1092,14 +1031,10 @@ def _mk_msg_type_err( # no src error from `msgspec.msgpack.Decoder.decode()` so # prolly a manual type-check on our part. if message is None: - fmt_stack: str = ( - '\n'.join(traceback.format_stack(limit=3)) - ) - tb_fmt: str = pformat_boxed_tb( - tb_str=fmt_stack, - field_prefix=' ', - indent='', + from tractor.devx._code import ( + pformat_caller_frame, ) + tb_fmt: str = pformat_caller_frame(stack_limit=3) message: str = ( f'invalid msg -> {msg}: {type(msg)}\n\n' f'{tb_fmt}\n' diff --git a/tractor/devx/_code.py b/tractor/devx/_code.py index 01d64cd..8d55212 100644 --- a/tractor/devx/_code.py +++ b/tractor/devx/_code.py @@ -23,6 +23,8 @@ from __future__ import annotations import inspect # import msgspec # from pprint import pformat +import textwrap +import traceback from types import ( FrameType, FunctionType, @@ -175,3 +177,103 @@ def find_caller_info( ) return None + + +def pformat_boxed_tb( + tb_str: str, + fields_str: str|None = None, + field_prefix: str = ' |_', + + tb_box_indent: int|None = None, + tb_body_indent: int = 1, + +) -> str: + ''' + Create a "boxed" looking traceback string. + + Useful for emphasizing traceback text content as being an + embedded attribute of some other object (like + a `RemoteActorError` or other boxing remote error shuttle + container). + + Any other parent/container "fields" can be passed in the + `fields_str` input along with other prefix/indent settings. + + ''' + if ( + fields_str + and + field_prefix + ): + fields: str = textwrap.indent( + fields_str, + prefix=field_prefix, + ) + else: + fields = fields_str or '' + + tb_body = tb_str + if tb_body_indent: + tb_body: str = textwrap.indent( + tb_str, + prefix=tb_body_indent * ' ', + ) + + tb_box: str = ( + + # orig + # f' |\n' + # f' ------ - ------\n\n' + # f'{tb_str}\n' + # f' ------ - ------\n' + # f' _|\n' + + f'|\n' + f' ------ - ------\n\n' + # f'{tb_str}\n' + f'{tb_body}' + f' ------ - ------\n' + f'_|\n' + ) + tb_box_indent: str = ( + tb_box_indent + or + 1 + + # (len(field_prefix)) + # ? ^-TODO-^ ? if you wanted another indent level + ) + if tb_box_indent > 0: + tb_box: str = textwrap.indent( + tb_box, + prefix=tb_box_indent * ' ', + ) + + return ( + fields + + + tb_box + ) + + +def pformat_caller_frame( + stack_limit: int = 1, + box_tb: bool = True, +) -> str: + ''' + Capture and return the traceback text content from + `stack_limit` call frames up. + + ''' + tb_str: str = ( + '\n'.join( + traceback.format_stack(limit=stack_limit) + ) + ) + if box_tb: + tb_str: str = pformat_boxed_tb( + tb_str=tb_str, + field_prefix=' ', + indent='', + ) + return tb_str