Proto in new `Context` refinements

As per some newly added features and APIs:

- pass `portal: Portal` to `Actor.start_remote_task()` from
  `open_context_from_portal()` marking `Portal.open_context()` as
  always being the "parent" task side.

- add caller tracing via `.devx._code.CallerInfo/.find_caller_info()`
  called in `mk_context()` and (for now) a `__runtimeframe__: int = 2`
  inside `open_context_from_portal()` such that any enter-er of
  `Portal.open_context()` will be reported.

- pass in a new `._caller_info` attr which is used in 2 new meths:
  - `.repr_caller: str` for showing the name of the app-code-func.
  - `.repr_api: str` for showing the API ep, which for now we just
    hardcode to `Portal.open_context()` since ow its gonna show the mod
    func name `open_context_from_portal()`.
  - use those new props ^ in the `._deliver_msg()` flow body log msg
    content for much clearer msg-flow tracing Bo

- add `Context._cancel_on_msgerr: bool` to toggle whether
  a delivered `MsgTypeError` should trigger a `._scope.cancel()` call.
  - also (temporarily) add separate `.cancel()` emissions for both cases
    as i work through hacking out the maybe `MsgType.pld: Raw` support.
Tyler Goodlet 2024-04-18 15:53:34 -04:00
parent 3c498c2eac
commit cc69d86baf
1 changed files with 111 additions and 36 deletions

View File

@ -26,6 +26,7 @@ disjoint, parallel executing tasks in separate actors.
from __future__ import annotations
from collections import deque
from contextlib import asynccontextmanager as acm
from contextvars import ContextVar
from dataclasses import (
@ -56,6 +57,7 @@ from ._exceptions import (
from .log import get_logger
from .msg import (
@ -80,6 +82,9 @@ if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
from ._ipc import MsgTransport
from .devx._code import (
log = get_logger(__name__)
@ -499,6 +504,18 @@ class Context:
_started_called: bool = False
_stream_opened: bool = False
_stream: MsgStream|None = None
_pld_codec_var: ContextVar[MsgCodec] = ContextVar(
default=_codec._def_msgspec_codec, # i.e. `Any`-payloads
def pld_codec(self) -> MsgCodec|None:
return self._pld_codec_var.get()
# caller of `Portal.open_context()` for
# logging purposes mostly
_caller_info: CallerInfo|None = None
# overrun handling machinery
# NOTE: none of this provides "backpressure" to the remote
@ -525,6 +542,7 @@ class Context:
# TODO: figure out how we can enforce this without losing our minds..
_strict_started: bool = False
_cancel_on_msgerr: bool = True
def __str__(self) -> str:
ds: str = '='
@ -857,6 +875,7 @@ class Context:
# TODO: never do this right?
# if self._remote_error:
# return
peer_side: str = self.peer_side(self.side)
# XXX: denote and set the remote side's error so that
# after we cancel whatever task is the opener of this
@ -864,14 +883,15 @@ class Context:
# appropriately.
'Setting remote error for ctx\n\n'
f'<= remote ctx uid: {self.chan.uid}\n'
f'<= {peer_side!r}: {self.chan.uid}\n'
f'=> {self.side!r}\n\n'
self._remote_error: BaseException = error
# self-cancel (ack) or,
# peer propagated remote cancellation.
msgtyperr: bool = False
msgerr: bool = False
if isinstance(error, ContextCancelled):
whom: str = (
@ -884,7 +904,7 @@ class Context:
elif isinstance(error, MsgTypeError):
msgtyperr = True
msgerr = True
peer_side: str = self.peer_side(self.side)
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
@ -935,13 +955,24 @@ class Context:
and not self._is_self_cancelled()
and not cs.cancel_called
and not cs.cancelled_caught
and not msgtyperr
and (
# NOTE: allow user to config not cancelling the
# local scope on `MsgTypeError`s
# TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;)
log.cancel('Cancelling local `.open_context()` scope!')
log.cancel('NOT cancelling local `.open_context()` scope!')
# TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs?
# NOTE: this usage actually works here B)
@ -969,9 +1000,7 @@ class Context:
dmaddr = dst_maddr
def repr_rpc(
) -> str:
def repr_rpc(self) -> str:
# TODO: how to show the transport interchange fmt?
# codec: str = self.chan.transport.codec_key
outcome_str: str = self.repr_outcome(
@ -983,6 +1012,27 @@ class Context:
f'{self._nsf}() -> {outcome_str}:'
def repr_caller(self) -> str:
ci: CallerInfo|None = self._caller_info
if ci:
return (
# f'|_api: {ci.api_nsp}'
return '<UNKNOWN caller-frame>'
def repr_api(self) -> str:
# ci: CallerInfo|None = self._caller_info
# if ci:
# return (
# f'{ci.api_nsp}()\n'
# )
return 'Portal.open_context()'
async def cancel(
timeout: float = 0.616,
@ -1187,8 +1237,9 @@ class Context:
# NOTE: in one way streaming this only happens on the
# caller side inside `Actor.start_remote_task()` so if you try
# to send a stop from the caller to the callee in the
# parent-ctx-task side (on the side that calls
# `Actor.start_remote_task()`) so if you try to send
# a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx: Context = actor.get_context(
@ -1853,6 +1904,19 @@ class Context:
send_chan: trio.MemorySendChannel = self._send_chan
nsf: NamespacePath = self._nsf
side: str = self.side
if side == 'child':
assert not self._portal
peer_side: str = self.peer_side(side)
flow_body: str = (
f'<= peer {peer_side!r}: {from_uid}\n'
f' |_<{nsf}()>\n\n'
f'=> {side!r}: {self._task}\n'
f' |_<{self.repr_api} @ {self.repr_caller}>\n\n'
re: Exception|None
if re := unpack_error(
@ -1863,18 +1927,10 @@ class Context:
log_meth = log.runtime
side: str = self.side
peer_side: str = self.peer_side(side)
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
f'<= peer {peer_side!r}: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'=> {side!r} cid: {cid}\n'
f' |_{self._task}\n\n'
@ -1887,30 +1943,27 @@ class Context:
# or `RemoteActorError`).
# XXX only case where returning early is fine!
# TODO: expose as mod func instead!
structfmt = pretty_struct.Struct.pformat
if self._in_overrun:
f'Queueing OVERRUN msg on caller task:\n'
f'<= peer: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'Queueing OVERRUN msg on caller task:\n\n'
f'=> cid: {cid}\n'
f' |_{self._task}\n\n'
# overrun is the ONLY case where returning early is fine!
return False
f'Delivering msg from IPC ctx:\n\n'
f'<= {from_uid}\n'
f' |_ {nsf}()\n\n'
f'=> {self._task}\n'
f' |_cid={self.cid}\n\n'
@ -1942,6 +1995,7 @@ class Context:
f'cid: {self.cid}\n'
'Failed to deliver msg:\n'
f'send_chan: {send_chan}\n\n'
return False
@ -2095,6 +2149,12 @@ async def open_context_from_portal(
__tracebackhide__: bool = hide_tb
# denote this frame as a "runtime frame" for stack
# introspection where we report the caller code in logging
# and error message content.
# NOTE: 2 bc of the wrapping `@acm`
__runtimeframe__: int = 2 # noqa
# conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False)
@ -2122,6 +2182,8 @@ async def open_context_from_portal(
# NOTE: it's imporant to expose this since you might
# get the case where the parent who opened the context does
# not open a stream until after some slow startup/init
@ -2132,13 +2194,17 @@ async def open_context_from_portal(
# place..
# ASAP, so that `Context.side: str` can be determined for
# logging / tracing / debug!
ctx._portal: Portal = portal
assert ctx._remote_func_type == 'context'
msg: Started = await ctx._recv_chan.receive()
assert ctx._caller_info
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
# NOT actually cancel the below line!
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
msg: Started = await ctx._recv_chan.receive()
# the "first" value here is delivered by the callee's
# ``Context.started()`` call.
@ -2148,6 +2214,7 @@ async def open_context_from_portal(
# except KeyError as src_error:
except AttributeError as src_error:
log.exception('Raising from unexpected msg!\n')
@ -2573,7 +2640,6 @@ async def open_context_from_portal(
def mk_context(
chan: Channel,
cid: str,
@ -2595,6 +2661,10 @@ def mk_context(
recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan caller-info if log level so high!
from .devx._code import find_caller_info
caller_info: CallerInfo|None = find_caller_info()
ctx = Context(
@ -2603,6 +2673,7 @@ def mk_context(
# TODO: we can drop the old placeholder yah?
@ -2613,7 +2684,11 @@ def mk_context(
def context(func: Callable) -> Callable:
Mark an async function as a streaming routine with ``@context``.
Mark an (async) function as an SC-supervised, inter-`Actor`,
child-`trio.Task`, IPC endpoint otherwise known more
colloquially as a (RPC) "context".
Functions annotated the fundamental IPC endpoint type offered by `tractor`.
# TODO: apply whatever solution ``mypy`` ends up picking for this: