diff --git a/tractor/_context.py b/tractor/_context.py index 027f15f..2230598 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -26,6 +26,7 @@ disjoint, parallel executing tasks in separate actors. from __future__ import annotations from collections import deque from contextlib import asynccontextmanager as acm +from contextvars import ContextVar from dataclasses import ( dataclass, field, @@ -56,6 +57,7 @@ from ._exceptions import ( ) from .log import get_logger from .msg import ( + _codec, Error, MsgType, MsgCodec, @@ -80,6 +82,9 @@ if TYPE_CHECKING: from ._portal import Portal from ._runtime import Actor from ._ipc import MsgTransport + from .devx._code import ( + CallerInfo, + ) log = get_logger(__name__) @@ -499,6 +504,18 @@ class Context: _started_called: bool = False _stream_opened: bool = False _stream: MsgStream|None = None + _pld_codec_var: ContextVar[MsgCodec] = ContextVar( + 'pld_codec', + default=_codec._def_msgspec_codec, # i.e. `Any`-payloads + ) + + @property + def pld_codec(self) -> MsgCodec|None: + return self._pld_codec_var.get() + + # caller of `Portal.open_context()` for + # logging purposes mostly + _caller_info: CallerInfo|None = None # overrun handling machinery # NOTE: none of this provides "backpressure" to the remote @@ -525,6 +542,7 @@ class Context: # TODO: figure out how we can enforce this without losing our minds.. _strict_started: bool = False + _cancel_on_msgerr: bool = True def __str__(self) -> str: ds: str = '=' @@ -857,6 +875,7 @@ class Context: # TODO: never do this right? # if self._remote_error: # return + peer_side: str = self.peer_side(self.side) # XXX: denote and set the remote side's error so that # after we cancel whatever task is the opener of this @@ -864,14 +883,15 @@ class Context: # appropriately. log.runtime( 'Setting remote error for ctx\n\n' - f'<= remote ctx uid: {self.chan.uid}\n' - f'=>{error}' + f'<= {peer_side!r}: {self.chan.uid}\n' + f'=> {self.side!r}\n\n' + f'{error}' ) self._remote_error: BaseException = error # self-cancel (ack) or, # peer propagated remote cancellation. - msgtyperr: bool = False + msgerr: bool = False if isinstance(error, ContextCancelled): whom: str = ( @@ -884,7 +904,7 @@ class Context: ) elif isinstance(error, MsgTypeError): - msgtyperr = True + msgerr = True peer_side: str = self.peer_side(self.side) log.error( f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n' @@ -935,13 +955,24 @@ class Context: and not self._is_self_cancelled() and not cs.cancel_called and not cs.cancelled_caught - and not msgtyperr + and ( + msgerr + and + # NOTE: allow user to config not cancelling the + # local scope on `MsgTypeError`s + self._cancel_on_msgerr + ) ): # TODO: it'd sure be handy to inject our own # `trio.Cancelled` subtype here ;) # https://github.com/goodboy/tractor/issues/368 + log.cancel('Cancelling local `.open_context()` scope!') self._scope.cancel() + else: + log.cancel('NOT cancelling local `.open_context()` scope!') + + # TODO: maybe we should also call `._res_scope.cancel()` if it # exists to support cancelling any drain loop hangs? @@ -966,9 +997,7 @@ class Context: dmaddr = dst_maddr @property - def repr_rpc( - self, - ) -> str: + def repr_rpc(self) -> str: # TODO: how to show the transport interchange fmt? # codec: str = self.chan.transport.codec_key outcome_str: str = self.repr_outcome( @@ -980,6 +1009,27 @@ class Context: f'{self._nsf}() -> {outcome_str}:' ) + @property + def repr_caller(self) -> str: + ci: CallerInfo|None = self._caller_info + if ci: + return ( + f'{ci.caller_nsp}()' + # f'|_api: {ci.api_nsp}' + ) + + return '' + + @property + def repr_api(self) -> str: + # ci: CallerInfo|None = self._caller_info + # if ci: + # return ( + # f'{ci.api_nsp}()\n' + # ) + + return 'Portal.open_context()' + async def cancel( self, timeout: float = 0.616, @@ -1184,8 +1234,9 @@ class Context: ) # NOTE: in one way streaming this only happens on the - # caller side inside `Actor.start_remote_task()` so if you try - # to send a stop from the caller to the callee in the + # parent-ctx-task side (on the side that calls + # `Actor.start_remote_task()`) so if you try to send + # a stop from the caller to the callee in the # single-direction-stream case you'll get a lookup error # currently. ctx: Context = actor.get_context( @@ -1850,6 +1901,19 @@ class Context: send_chan: trio.MemorySendChannel = self._send_chan nsf: NamespacePath = self._nsf + side: str = self.side + if side == 'child': + assert not self._portal + peer_side: str = self.peer_side(side) + + flow_body: str = ( + f'<= peer {peer_side!r}: {from_uid}\n' + f' |_<{nsf}()>\n\n' + + f'=> {side!r}: {self._task}\n' + f' |_<{self.repr_api} @ {self.repr_caller}>\n\n' + ) + re: Exception|None if re := unpack_error( msg, @@ -1860,18 +1924,10 @@ class Context: else: log_meth = log.runtime - side: str = self.side - - peer_side: str = self.peer_side(side) - log_meth( f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n' - f'<= peer {peer_side!r}: {from_uid}\n' - f' |_ {nsf}()\n\n' - - f'=> {side!r} cid: {cid}\n' - f' |_{self._task}\n\n' + f'{flow_body}' f'{pformat(re)}\n' ) @@ -1884,30 +1940,27 @@ class Context: # or `RemoteActorError`). self._maybe_cancel_and_set_remote_error(re) - # XXX only case where returning early is fine! + # TODO: expose as mod func instead! structfmt = pretty_struct.Struct.pformat if self._in_overrun: log.warning( - f'Queueing OVERRUN msg on caller task:\n' - f'<= peer: {from_uid}\n' - f' |_ {nsf}()\n\n' + f'Queueing OVERRUN msg on caller task:\n\n' - f'=> cid: {cid}\n' - f' |_{self._task}\n\n' + f'{flow_body}' f'{structfmt(msg)}\n' ) self._overflow_q.append(msg) + + # XXX NOTE XXX + # overrun is the ONLY case where returning early is fine! return False try: log.runtime( f'Delivering msg from IPC ctx:\n\n' - f'<= {from_uid}\n' - f' |_ {nsf}()\n\n' - f'=> {self._task}\n' - f' |_cid={self.cid}\n\n' + f'{flow_body}' f'{structfmt(msg)}\n' ) @@ -1939,6 +1992,7 @@ class Context: f'cid: {self.cid}\n' 'Failed to deliver msg:\n' f'send_chan: {send_chan}\n\n' + f'{pformat(msg)}\n' ) return False @@ -2092,6 +2146,12 @@ async def open_context_from_portal( ''' __tracebackhide__: bool = hide_tb + # denote this frame as a "runtime frame" for stack + # introspection where we report the caller code in logging + # and error message content. + # NOTE: 2 bc of the wrapping `@acm` + __runtimeframe__: int = 2 # noqa + # conduct target func method structural checks if not inspect.iscoroutinefunction(func) and ( getattr(func, '_tractor_contex_function', False) @@ -2119,6 +2179,8 @@ async def open_context_from_portal( nsf=nsf, kwargs=kwargs, + portal=portal, + # NOTE: it's imporant to expose this since you might # get the case where the parent who opened the context does # not open a stream until after some slow startup/init @@ -2129,13 +2191,17 @@ async def open_context_from_portal( # place.. allow_overruns=allow_overruns, ) - # ASAP, so that `Context.side: str` can be determined for - # logging / tracing / debug! - ctx._portal: Portal = portal - assert ctx._remote_func_type == 'context' - msg: Started = await ctx._recv_chan.receive() + assert ctx._caller_info + # XXX NOTE since `._scope` is NOT set BEFORE we retreive the + # `Started`-msg any cancellation triggered + # in `._maybe_cancel_and_set_remote_error()` will + # NOT actually cancel the below line! + # -> it's expected that if there is an error in this phase of + # the dialog, the `Error` msg should be raised from the `msg` + # handling block below. + msg: Started = await ctx._recv_chan.receive() try: # the "first" value here is delivered by the callee's # ``Context.started()`` call. @@ -2145,6 +2211,7 @@ async def open_context_from_portal( # except KeyError as src_error: except AttributeError as src_error: + log.exception('Raising from unexpected msg!\n') _raise_from_no_key_in_msg( ctx=ctx, msg=msg, @@ -2570,7 +2637,6 @@ async def open_context_from_portal( None, ) - def mk_context( chan: Channel, cid: str, @@ -2592,6 +2658,10 @@ def mk_context( recv_chan: trio.MemoryReceiveChannel send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) + # TODO: only scan caller-info if log level so high! + from .devx._code import find_caller_info + caller_info: CallerInfo|None = find_caller_info() + ctx = Context( chan=chan, cid=cid, @@ -2600,6 +2670,7 @@ def mk_context( _recv_chan=recv_chan, _nsf=nsf, _task=trio.lowlevel.current_task(), + _caller_info=caller_info, **kwargs, ) # TODO: we can drop the old placeholder yah? @@ -2610,7 +2681,11 @@ def mk_context( def context(func: Callable) -> Callable: ''' - Mark an async function as a streaming routine with ``@context``. + Mark an (async) function as an SC-supervised, inter-`Actor`, + child-`trio.Task`, IPC endpoint otherwise known more + colloquially as a (RPC) "context". + + Functions annotated the fundamental IPC endpoint type offered by `tractor`. ''' # TODO: apply whatever solution ``mypy`` ends up picking for this: