Proto in new `Context` refinements

As per some newly added features and APIs:

- pass `portal: Portal` to `Actor.start_remote_task()` from
  `open_context_from_portal()` marking `Portal.open_context()` as
  always being the "parent" task side.

- add caller tracing via `.devx._code.CallerInfo/.find_caller_info()`
  called in `mk_context()` and (for now) a `__runtimeframe__: int = 2`
  inside `open_context_from_portal()` such that any enter-er of
  `Portal.open_context()` will be reported.

- pass in a new `._caller_info` attr which is used in 2 new meths:
  - `.repr_caller: str` for showing the name of the app-code-func.
  - `.repr_api: str` for showing the API ep, which for now we just
    hardcode to `Portal.open_context()` since ow its gonna show the mod
    func name `open_context_from_portal()`.
  - use those new props ^ in the `._deliver_msg()` flow body log msg
    content for much clearer msg-flow tracing Bo

- add `Context._cancel_on_msgerr: bool` to toggle whether
  a delivered `MsgTypeError` should trigger a `._scope.cancel()` call.
  - also (temporarily) add separate `.cancel()` emissions for both cases
    as i work through hacking out the maybe `MsgType.pld: Raw` support.
runtime_to_msgspec
Tyler Goodlet 2024-04-18 15:53:34 -04:00
parent 3018187228
commit d51be2a36a
1 changed files with 111 additions and 36 deletions

View File

@ -26,6 +26,7 @@ disjoint, parallel executing tasks in separate actors.
from __future__ import annotations from __future__ import annotations
from collections import deque from collections import deque
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from contextvars import ContextVar
from dataclasses import ( from dataclasses import (
dataclass, dataclass,
field, field,
@ -56,6 +57,7 @@ from ._exceptions import (
) )
from .log import get_logger from .log import get_logger
from .msg import ( from .msg import (
_codec,
Error, Error,
MsgType, MsgType,
MsgCodec, MsgCodec,
@ -80,6 +82,9 @@ if TYPE_CHECKING:
from ._portal import Portal from ._portal import Portal
from ._runtime import Actor from ._runtime import Actor
from ._ipc import MsgTransport from ._ipc import MsgTransport
from .devx._code import (
CallerInfo,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -499,6 +504,18 @@ class Context:
_started_called: bool = False _started_called: bool = False
_stream_opened: bool = False _stream_opened: bool = False
_stream: MsgStream|None = None _stream: MsgStream|None = None
_pld_codec_var: ContextVar[MsgCodec] = ContextVar(
'pld_codec',
default=_codec._def_msgspec_codec, # i.e. `Any`-payloads
)
@property
def pld_codec(self) -> MsgCodec|None:
return self._pld_codec_var.get()
# caller of `Portal.open_context()` for
# logging purposes mostly
_caller_info: CallerInfo|None = None
# overrun handling machinery # overrun handling machinery
# NOTE: none of this provides "backpressure" to the remote # NOTE: none of this provides "backpressure" to the remote
@ -525,6 +542,7 @@ class Context:
# TODO: figure out how we can enforce this without losing our minds.. # TODO: figure out how we can enforce this without losing our minds..
_strict_started: bool = False _strict_started: bool = False
_cancel_on_msgerr: bool = True
def __str__(self) -> str: def __str__(self) -> str:
ds: str = '=' ds: str = '='
@ -857,6 +875,7 @@ class Context:
# TODO: never do this right? # TODO: never do this right?
# if self._remote_error: # if self._remote_error:
# return # return
peer_side: str = self.peer_side(self.side)
# XXX: denote and set the remote side's error so that # XXX: denote and set the remote side's error so that
# after we cancel whatever task is the opener of this # after we cancel whatever task is the opener of this
@ -864,14 +883,15 @@ class Context:
# appropriately. # appropriately.
log.runtime( log.runtime(
'Setting remote error for ctx\n\n' 'Setting remote error for ctx\n\n'
f'<= remote ctx uid: {self.chan.uid}\n' f'<= {peer_side!r}: {self.chan.uid}\n'
f'=>{error}' f'=> {self.side!r}\n\n'
f'{error}'
) )
self._remote_error: BaseException = error self._remote_error: BaseException = error
# self-cancel (ack) or, # self-cancel (ack) or,
# peer propagated remote cancellation. # peer propagated remote cancellation.
msgtyperr: bool = False msgerr: bool = False
if isinstance(error, ContextCancelled): if isinstance(error, ContextCancelled):
whom: str = ( whom: str = (
@ -884,7 +904,7 @@ class Context:
) )
elif isinstance(error, MsgTypeError): elif isinstance(error, MsgTypeError):
msgtyperr = True msgerr = True
peer_side: str = self.peer_side(self.side) peer_side: str = self.peer_side(self.side)
log.error( log.error(
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n' f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
@ -935,13 +955,24 @@ class Context:
and not self._is_self_cancelled() and not self._is_self_cancelled()
and not cs.cancel_called and not cs.cancel_called
and not cs.cancelled_caught and not cs.cancelled_caught
and not msgtyperr and (
msgerr
and
# NOTE: allow user to config not cancelling the
# local scope on `MsgTypeError`s
self._cancel_on_msgerr
)
): ):
# TODO: it'd sure be handy to inject our own # TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;) # `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
log.cancel('Cancelling local `.open_context()` scope!')
self._scope.cancel() self._scope.cancel()
else:
log.cancel('NOT cancelling local `.open_context()` scope!')
# TODO: maybe we should also call `._res_scope.cancel()` if it # TODO: maybe we should also call `._res_scope.cancel()` if it
# exists to support cancelling any drain loop hangs? # exists to support cancelling any drain loop hangs?
@ -966,9 +997,7 @@ class Context:
dmaddr = dst_maddr dmaddr = dst_maddr
@property @property
def repr_rpc( def repr_rpc(self) -> str:
self,
) -> str:
# TODO: how to show the transport interchange fmt? # TODO: how to show the transport interchange fmt?
# codec: str = self.chan.transport.codec_key # codec: str = self.chan.transport.codec_key
outcome_str: str = self.repr_outcome( outcome_str: str = self.repr_outcome(
@ -980,6 +1009,27 @@ class Context:
f'{self._nsf}() -> {outcome_str}:' f'{self._nsf}() -> {outcome_str}:'
) )
@property
def repr_caller(self) -> str:
ci: CallerInfo|None = self._caller_info
if ci:
return (
f'{ci.caller_nsp}()'
# f'|_api: {ci.api_nsp}'
)
return '<UNKNOWN caller-frame>'
@property
def repr_api(self) -> str:
# ci: CallerInfo|None = self._caller_info
# if ci:
# return (
# f'{ci.api_nsp}()\n'
# )
return 'Portal.open_context()'
async def cancel( async def cancel(
self, self,
timeout: float = 0.616, timeout: float = 0.616,
@ -1184,8 +1234,9 @@ class Context:
) )
# NOTE: in one way streaming this only happens on the # NOTE: in one way streaming this only happens on the
# caller side inside `Actor.start_remote_task()` so if you try # parent-ctx-task side (on the side that calls
# to send a stop from the caller to the callee in the # `Actor.start_remote_task()`) so if you try to send
# a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error # single-direction-stream case you'll get a lookup error
# currently. # currently.
ctx: Context = actor.get_context( ctx: Context = actor.get_context(
@ -1850,6 +1901,19 @@ class Context:
send_chan: trio.MemorySendChannel = self._send_chan send_chan: trio.MemorySendChannel = self._send_chan
nsf: NamespacePath = self._nsf nsf: NamespacePath = self._nsf
side: str = self.side
if side == 'child':
assert not self._portal
peer_side: str = self.peer_side(side)
flow_body: str = (
f'<= peer {peer_side!r}: {from_uid}\n'
f' |_<{nsf}()>\n\n'
f'=> {side!r}: {self._task}\n'
f' |_<{self.repr_api} @ {self.repr_caller}>\n\n'
)
re: Exception|None re: Exception|None
if re := unpack_error( if re := unpack_error(
msg, msg,
@ -1860,18 +1924,10 @@ class Context:
else: else:
log_meth = log.runtime log_meth = log.runtime
side: str = self.side
peer_side: str = self.peer_side(side)
log_meth( log_meth(
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n' f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
f'<= peer {peer_side!r}: {from_uid}\n' f'{flow_body}'
f' |_ {nsf}()\n\n'
f'=> {side!r} cid: {cid}\n'
f' |_{self._task}\n\n'
f'{pformat(re)}\n' f'{pformat(re)}\n'
) )
@ -1884,30 +1940,27 @@ class Context:
# or `RemoteActorError`). # or `RemoteActorError`).
self._maybe_cancel_and_set_remote_error(re) self._maybe_cancel_and_set_remote_error(re)
# XXX only case where returning early is fine! # TODO: expose as mod func instead!
structfmt = pretty_struct.Struct.pformat structfmt = pretty_struct.Struct.pformat
if self._in_overrun: if self._in_overrun:
log.warning( log.warning(
f'Queueing OVERRUN msg on caller task:\n' f'Queueing OVERRUN msg on caller task:\n\n'
f'<= peer: {from_uid}\n'
f' |_ {nsf}()\n\n'
f'=> cid: {cid}\n' f'{flow_body}'
f' |_{self._task}\n\n'
f'{structfmt(msg)}\n' f'{structfmt(msg)}\n'
) )
self._overflow_q.append(msg) self._overflow_q.append(msg)
# XXX NOTE XXX
# overrun is the ONLY case where returning early is fine!
return False return False
try: try:
log.runtime( log.runtime(
f'Delivering msg from IPC ctx:\n\n' f'Delivering msg from IPC ctx:\n\n'
f'<= {from_uid}\n'
f' |_ {nsf}()\n\n'
f'=> {self._task}\n' f'{flow_body}'
f' |_cid={self.cid}\n\n'
f'{structfmt(msg)}\n' f'{structfmt(msg)}\n'
) )
@ -1939,6 +1992,7 @@ class Context:
f'cid: {self.cid}\n' f'cid: {self.cid}\n'
'Failed to deliver msg:\n' 'Failed to deliver msg:\n'
f'send_chan: {send_chan}\n\n' f'send_chan: {send_chan}\n\n'
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) )
return False return False
@ -2092,6 +2146,12 @@ async def open_context_from_portal(
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
# denote this frame as a "runtime frame" for stack
# introspection where we report the caller code in logging
# and error message content.
# NOTE: 2 bc of the wrapping `@acm`
__runtimeframe__: int = 2 # noqa
# conduct target func method structural checks # conduct target func method structural checks
if not inspect.iscoroutinefunction(func) and ( if not inspect.iscoroutinefunction(func) and (
getattr(func, '_tractor_contex_function', False) getattr(func, '_tractor_contex_function', False)
@ -2119,6 +2179,8 @@ async def open_context_from_portal(
nsf=nsf, nsf=nsf,
kwargs=kwargs, kwargs=kwargs,
portal=portal,
# NOTE: it's imporant to expose this since you might # NOTE: it's imporant to expose this since you might
# get the case where the parent who opened the context does # get the case where the parent who opened the context does
# not open a stream until after some slow startup/init # not open a stream until after some slow startup/init
@ -2129,13 +2191,17 @@ async def open_context_from_portal(
# place.. # place..
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
# ASAP, so that `Context.side: str` can be determined for
# logging / tracing / debug!
ctx._portal: Portal = portal
assert ctx._remote_func_type == 'context' assert ctx._remote_func_type == 'context'
msg: Started = await ctx._recv_chan.receive() assert ctx._caller_info
# XXX NOTE since `._scope` is NOT set BEFORE we retreive the
# `Started`-msg any cancellation triggered
# in `._maybe_cancel_and_set_remote_error()` will
# NOT actually cancel the below line!
# -> it's expected that if there is an error in this phase of
# the dialog, the `Error` msg should be raised from the `msg`
# handling block below.
msg: Started = await ctx._recv_chan.receive()
try: try:
# the "first" value here is delivered by the callee's # the "first" value here is delivered by the callee's
# ``Context.started()`` call. # ``Context.started()`` call.
@ -2145,6 +2211,7 @@ async def open_context_from_portal(
# except KeyError as src_error: # except KeyError as src_error:
except AttributeError as src_error: except AttributeError as src_error:
log.exception('Raising from unexpected msg!\n')
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=ctx, ctx=ctx,
msg=msg, msg=msg,
@ -2570,7 +2637,6 @@ async def open_context_from_portal(
None, None,
) )
def mk_context( def mk_context(
chan: Channel, chan: Channel,
cid: str, cid: str,
@ -2592,6 +2658,10 @@ def mk_context(
recv_chan: trio.MemoryReceiveChannel recv_chan: trio.MemoryReceiveChannel
send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size) send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
# TODO: only scan caller-info if log level so high!
from .devx._code import find_caller_info
caller_info: CallerInfo|None = find_caller_info()
ctx = Context( ctx = Context(
chan=chan, chan=chan,
cid=cid, cid=cid,
@ -2600,6 +2670,7 @@ def mk_context(
_recv_chan=recv_chan, _recv_chan=recv_chan,
_nsf=nsf, _nsf=nsf,
_task=trio.lowlevel.current_task(), _task=trio.lowlevel.current_task(),
_caller_info=caller_info,
**kwargs, **kwargs,
) )
# TODO: we can drop the old placeholder yah? # TODO: we can drop the old placeholder yah?
@ -2610,7 +2681,11 @@ def mk_context(
def context(func: Callable) -> Callable: def context(func: Callable) -> Callable:
''' '''
Mark an async function as a streaming routine with ``@context``. Mark an (async) function as an SC-supervised, inter-`Actor`,
child-`trio.Task`, IPC endpoint otherwise known more
colloquially as a (RPC) "context".
Functions annotated the fundamental IPC endpoint type offered by `tractor`.
''' '''
# TODO: apply whatever solution ``mypy`` ends up picking for this: # TODO: apply whatever solution ``mypy`` ends up picking for this: