First draft "payload receiver in a new `.msg._ops`
As per much tinkering, re-designs and preceding rubber-ducking via many "commit msg novelas", **finally** this adds the (hopefully) final missing layer for typed msg safety: `tractor.msg._ops.PldRx` (or `PayloadReceiver`? haven't decided how verbose to go..) Design justification summary: ------ - ------ - need a way to be as-close-as-possible to the `tractor`-application such that when `MsgType.pld: PayloadT` validation takes place, it is straightforward and obvious how user code can decide to handle any resulting `MsgTypeError`. - there should be a common and optional-yet-modular way to modify **how** data delivered via IPC (possibly embedded as user defined, type-constrained `.pld: msgspec.Struct`s) can be handled and processed during fault conditions and/or IPC "msg attacks". - support for nested type constraints within a `MsgType.pld` field should be simple to define, implement and understand at runtime. - a layer between the app-level IPC primitive APIs (`Context`/`MsgStream`) and application-task code (consumer code of those APIs) should be easily customized and prove-to-be-as-such through demonstrably rigorous internal (sub-sys) use! -> eg. via seemless runtime RPC eps support like `Actor.cancel()` -> by correctly implementing our `.devx._debug.Lock` REPL TTY mgmt dialog prot, via a dead simple payload-as-ctl-msg-spec. There are some fairly detailed doc strings included so I won't duplicate that content, the majority of the work here is actually somewhat of a factoring of many similar blocks that are doing more or less the same `msg = await Context._rx_chan.receive()` with boilerplate for `Error`/`Stop` handling via `_raise_from_no_key_in_msg()`. The new `PldRx` basically provides a shim layer for this common "receive msg, decode its payload, yield it up to the consuming app task" by pairing the RPC feeder mem-chan with a msg-payload decoder and expecting IPC API internals to use **one** API instead of re-implementing the same pattern all over the place XD `PldRx` breakdown ------ - ------ - for now only expects a `._msgdec: MsgDec` which allows for override-able `MsgType.pld` validation and most obviously used in the impl of `.dec_msg()`, the decode message method. - provides multiple mem-chan receive options including: |_ `.recv_pld()` which does the e2e operation of receiving a payload item. |_ a sync `.recv_pld_nowait()` version. |_ a `.recv_msg_w_pld()` which optionally allows retreiving both the shuttling `MsgType` as well as it's `.pld` body for use cases where info on both is important (eg. draining a `MsgStream`). Dirty internal changeover/implementation deatz: ------ - ------ - obvi move over all the IPC "primitives" that previously had the duplicate recv-n-yield logic: - `MsgStream.receive[_nowait]()` delegating instead to the equivalent `PldRx.recv_pld[_nowait]()`. - add `Context._pld_rx: PldRx`, created and passed in by `mk_context()`; use it for the `.started()` -> `first: Started` retrieval inside `open_context_from_portal()`. - all the relevant `Portal` invocation methods: `.result()`, `.run_from_ns()`, `.run()`; also allows for dropping `_unwrap_msg()` and `.Portal_return_once()` outright Bo - rename `Context.ctx._recv_chan` -> `._rx_chan`. - add detailed `Context._scope` info for logging whether or not it's cancelled inside `_maybe_cancel_and_set_remote_error()`. - move `._context._drain_to_final_msg()` -> `._ops.drain_to_final_msg()` since it's really not necessarily ctx specific per say, and it does kinda fit with "msg operations" more abstractly ;)
parent
a51632ffa6
commit
6e0ef76128
|
@ -25,26 +25,31 @@ disjoint, parallel executing tasks in separate actors.
|
|||
'''
|
||||
from __future__ import annotations
|
||||
from collections import deque
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from contextvars import ContextVar
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from dataclasses import (
|
||||
dataclass,
|
||||
field,
|
||||
)
|
||||
from functools import partial
|
||||
import inspect
|
||||
import msgspec
|
||||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
Callable,
|
||||
AsyncGenerator,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
Union,
|
||||
)
|
||||
import warnings
|
||||
|
||||
# ------ - ------
|
||||
import trio
|
||||
|
||||
from msgspec import (
|
||||
ValidationError,
|
||||
)
|
||||
# ------ - ------
|
||||
from ._exceptions import (
|
||||
ContextCancelled,
|
||||
InternalError,
|
||||
|
@ -53,7 +58,6 @@ from ._exceptions import (
|
|||
StreamOverrun,
|
||||
pack_from_raise,
|
||||
unpack_error,
|
||||
_raise_from_no_key_in_msg,
|
||||
)
|
||||
from .log import get_logger
|
||||
from .msg import (
|
||||
|
@ -70,8 +74,12 @@ from .msg import (
|
|||
current_codec,
|
||||
pretty_struct,
|
||||
types as msgtypes,
|
||||
_ops as msgops,
|
||||
)
|
||||
from ._ipc import (
|
||||
Channel,
|
||||
_mk_msg_type_err,
|
||||
)
|
||||
from ._ipc import Channel
|
||||
from ._streaming import MsgStream
|
||||
from ._state import (
|
||||
current_actor,
|
||||
|
@ -86,294 +94,9 @@ if TYPE_CHECKING:
|
|||
CallerInfo,
|
||||
)
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
async def _drain_to_final_msg(
|
||||
ctx: Context,
|
||||
|
||||
hide_tb: bool = True,
|
||||
msg_limit: int = 6,
|
||||
|
||||
) -> tuple[
|
||||
Return|None,
|
||||
list[MsgType]
|
||||
]:
|
||||
'''
|
||||
Drain IPC msgs delivered to the underlying rx-mem-chan
|
||||
`Context._recv_chan` from the runtime in search for a final
|
||||
result or error msg.
|
||||
|
||||
The motivation here is to ideally capture errors during ctxc
|
||||
conditions where a canc-request/or local error is sent but the
|
||||
local task also excepts and enters the
|
||||
`Portal.open_context().__aexit__()` block wherein we prefer to
|
||||
capture and raise any remote error or ctxc-ack as part of the
|
||||
`ctx.result()` cleanup and teardown sequence.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
raise_overrun: bool = not ctx._allow_overruns
|
||||
|
||||
# wait for a final context result by collecting (but
|
||||
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||
# from the far end.
|
||||
pre_result_drained: list[MsgType] = []
|
||||
return_msg: Return|None = None
|
||||
while not (
|
||||
ctx.maybe_error
|
||||
and not ctx._final_result_is_set()
|
||||
):
|
||||
try:
|
||||
# TODO: can remove?
|
||||
# await trio.lowlevel.checkpoint()
|
||||
|
||||
# NOTE: this REPL usage actually works here dawg! Bo
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# TODO: bad idea?
|
||||
# -[ ] wrap final outcome channel wait in a scope so
|
||||
# it can be cancelled out of band if needed?
|
||||
#
|
||||
# with trio.CancelScope() as res_cs:
|
||||
# ctx._res_scope = res_cs
|
||||
# msg: dict = await ctx._recv_chan.receive()
|
||||
# if res_cs.cancelled_caught:
|
||||
|
||||
# TODO: ensure there's no more hangs, debugging the
|
||||
# runtime pretty preaase!
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# TODO: can remove this finally?
|
||||
# we have no more need for the sync draining right
|
||||
# since we're can kinda guarantee the async
|
||||
# `.receive()` below will never block yah?
|
||||
#
|
||||
# if (
|
||||
# ctx._cancel_called and (
|
||||
# ctx.cancel_acked
|
||||
# # or ctx.chan._cancel_called
|
||||
# )
|
||||
# # or not ctx._final_result_is_set()
|
||||
# # ctx.outcome is not
|
||||
# # or ctx.chan._closed
|
||||
# ):
|
||||
# try:
|
||||
# msg: dict = await ctx._recv_chan.receive_nowait()()
|
||||
# except trio.WouldBlock:
|
||||
# log.warning(
|
||||
# 'When draining already `.cancel_called` ctx!\n'
|
||||
# 'No final msg arrived..\n'
|
||||
# )
|
||||
# break
|
||||
# else:
|
||||
# msg: dict = await ctx._recv_chan.receive()
|
||||
|
||||
# TODO: don't need it right jefe?
|
||||
# with trio.move_on_after(1) as cs:
|
||||
# if cs.cancelled_caught:
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# pray to the `trio` gawds that we're corrent with this
|
||||
# msg: dict = await ctx._recv_chan.receive()
|
||||
msg: MsgType = await ctx._recv_chan.receive()
|
||||
|
||||
# NOTE: we get here if the far end was
|
||||
# `ContextCancelled` in 2 cases:
|
||||
# 1. we requested the cancellation and thus
|
||||
# SHOULD NOT raise that far end error,
|
||||
# 2. WE DID NOT REQUEST that cancel and thus
|
||||
# SHOULD RAISE HERE!
|
||||
except trio.Cancelled:
|
||||
|
||||
# CASE 2: mask the local cancelled-error(s)
|
||||
# only when we are sure the remote error is
|
||||
# the source cause of this local task's
|
||||
# cancellation.
|
||||
ctx.maybe_raise()
|
||||
|
||||
# CASE 1: we DID request the cancel we simply
|
||||
# continue to bubble up as normal.
|
||||
raise
|
||||
|
||||
match msg:
|
||||
|
||||
# final result arrived!
|
||||
case Return(
|
||||
# cid=cid,
|
||||
pld=res,
|
||||
):
|
||||
ctx._result: Any = res
|
||||
log.runtime(
|
||||
'Context delivered final draining msg:\n'
|
||||
f'{pformat(msg)}'
|
||||
)
|
||||
# XXX: only close the rx mem chan AFTER
|
||||
# a final result is retreived.
|
||||
# if ctx._recv_chan:
|
||||
# await ctx._recv_chan.aclose()
|
||||
# TODO: ^ we don't need it right?
|
||||
return_msg = msg
|
||||
break
|
||||
|
||||
# far end task is still streaming to us so discard
|
||||
# and report depending on local ctx state.
|
||||
case Yield():
|
||||
pre_result_drained.append(msg)
|
||||
if (
|
||||
(ctx._stream.closed
|
||||
and (reason := 'stream was already closed')
|
||||
)
|
||||
or (ctx.cancel_acked
|
||||
and (reason := 'ctx cancelled other side')
|
||||
)
|
||||
or (ctx._cancel_called
|
||||
and (reason := 'ctx called `.cancel()`')
|
||||
)
|
||||
or (len(pre_result_drained) > msg_limit
|
||||
and (reason := f'"yield" limit={msg_limit}')
|
||||
)
|
||||
):
|
||||
log.cancel(
|
||||
'Cancelling `MsgStream` drain since '
|
||||
f'{reason}\n\n'
|
||||
f'<= {ctx.chan.uid}\n'
|
||||
f' |_{ctx._nsf}()\n\n'
|
||||
f'=> {ctx._task}\n'
|
||||
f' |_{ctx._stream}\n\n'
|
||||
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
return (
|
||||
return_msg,
|
||||
pre_result_drained,
|
||||
)
|
||||
|
||||
# drain up to the `msg_limit` hoping to get
|
||||
# a final result or error/ctxc.
|
||||
else:
|
||||
log.warning(
|
||||
'Ignoring "yield" msg during `ctx.result()` drain..\n'
|
||||
f'<= {ctx.chan.uid}\n'
|
||||
f' |_{ctx._nsf}()\n\n'
|
||||
f'=> {ctx._task}\n'
|
||||
f' |_{ctx._stream}\n\n'
|
||||
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
# stream terminated, but no result yet..
|
||||
#
|
||||
# TODO: work out edge cases here where
|
||||
# a stream is open but the task also calls
|
||||
# this?
|
||||
# -[ ] should be a runtime error if a stream is open right?
|
||||
# Stop()
|
||||
case Stop():
|
||||
pre_result_drained.append(msg)
|
||||
log.cancel(
|
||||
'Remote stream terminated due to "stop" msg:\n\n'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
# remote error msg, likely already handled inside
|
||||
# `Context._deliver_msg()`
|
||||
case Error():
|
||||
# TODO: can we replace this with `ctx.maybe_raise()`?
|
||||
# -[ ] would this be handier for this case maybe?
|
||||
# async with maybe_raise_on_exit() as raises:
|
||||
# if raises:
|
||||
# log.error('some msg about raising..')
|
||||
#
|
||||
re: Exception|None = ctx._remote_error
|
||||
if re:
|
||||
assert msg is ctx._cancel_msg
|
||||
# NOTE: this solved a super duper edge case XD
|
||||
# this was THE super duper edge case of:
|
||||
# - local task opens a remote task,
|
||||
# - requests remote cancellation of far end
|
||||
# ctx/tasks,
|
||||
# - needs to wait for the cancel ack msg
|
||||
# (ctxc) or some result in the race case
|
||||
# where the other side's task returns
|
||||
# before the cancel request msg is ever
|
||||
# rxed and processed,
|
||||
# - here this surrounding drain loop (which
|
||||
# iterates all ipc msgs until the ack or
|
||||
# an early result arrives) was NOT exiting
|
||||
# since we are the edge case: local task
|
||||
# does not re-raise any ctxc it receives
|
||||
# IFF **it** was the cancellation
|
||||
# requester..
|
||||
#
|
||||
# XXX will raise if necessary but ow break
|
||||
# from loop presuming any supressed error
|
||||
# (ctxc) should terminate the context!
|
||||
ctx._maybe_raise_remote_err(
|
||||
re,
|
||||
# NOTE: obvi we don't care if we
|
||||
# overran the far end if we're already
|
||||
# waiting on a final result (msg).
|
||||
# raise_overrun_from_self=False,
|
||||
raise_overrun_from_self=raise_overrun,
|
||||
)
|
||||
|
||||
break # OOOOOF, yeah obvi we need this..
|
||||
|
||||
# XXX we should never really get here
|
||||
# right! since `._deliver_msg()` should
|
||||
# always have detected an {'error': ..}
|
||||
# msg and already called this right!?!
|
||||
elif error := unpack_error(
|
||||
msg=msg,
|
||||
chan=ctx._portal.channel,
|
||||
hide_tb=False,
|
||||
):
|
||||
log.critical('SHOULD NEVER GET HERE!?')
|
||||
assert msg is ctx._cancel_msg
|
||||
assert error.msgdata == ctx._remote_error.msgdata
|
||||
assert error.ipc_msg == ctx._remote_error.ipc_msg
|
||||
from .devx._debug import pause
|
||||
await pause()
|
||||
ctx._maybe_cancel_and_set_remote_error(error)
|
||||
ctx._maybe_raise_remote_err(error)
|
||||
|
||||
else:
|
||||
# bubble the original src key error
|
||||
raise
|
||||
|
||||
# XXX should pretty much never get here unless someone
|
||||
# overrides the default `MsgType` spec.
|
||||
case _:
|
||||
pre_result_drained.append(msg)
|
||||
# It's definitely an internal error if any other
|
||||
# msg type without a`'cid'` field arrives here!
|
||||
if not msg.cid:
|
||||
raise InternalError(
|
||||
'Unexpected cid-missing msg?\n\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
raise RuntimeError('Unknown msg type: {msg}')
|
||||
|
||||
else:
|
||||
log.cancel(
|
||||
'Skipping `MsgStream` drain since final outcome is set\n\n'
|
||||
f'{ctx.outcome}\n'
|
||||
)
|
||||
|
||||
return (
|
||||
return_msg,
|
||||
pre_result_drained,
|
||||
)
|
||||
|
||||
|
||||
class Unresolved:
|
||||
'''
|
||||
Placeholder value for `Context._result` until
|
||||
|
@ -423,9 +146,12 @@ class Context:
|
|||
|
||||
# the "feeder" channels for delivering message values to the
|
||||
# local task from the runtime's msg processing loop.
|
||||
_recv_chan: trio.MemoryReceiveChannel
|
||||
_rx_chan: trio.MemoryReceiveChannel
|
||||
_send_chan: trio.MemorySendChannel
|
||||
|
||||
# payload receiver
|
||||
_pld_rx: msgops.PldRx
|
||||
|
||||
# full "namespace-path" to target RPC function
|
||||
_nsf: NamespacePath
|
||||
|
||||
|
@ -447,7 +173,7 @@ class Context:
|
|||
_task: trio.lowlevel.Task|None = None
|
||||
|
||||
# TODO: cs around result waiting so we can cancel any
|
||||
# permanently blocking `._recv_chan.receive()` call in
|
||||
# permanently blocking `._rx_chan.receive()` call in
|
||||
# a drain loop?
|
||||
# _res_scope: trio.CancelScope|None = None
|
||||
|
||||
|
@ -504,14 +230,6 @@ class Context:
|
|||
_started_called: bool = False
|
||||
_stream_opened: bool = False
|
||||
_stream: MsgStream|None = None
|
||||
_pld_codec_var: ContextVar[MsgCodec] = ContextVar(
|
||||
'pld_codec',
|
||||
default=_codec._def_msgspec_codec, # i.e. `Any`-payloads
|
||||
)
|
||||
|
||||
@property
|
||||
def pld_codec(self) -> MsgCodec|None:
|
||||
return self._pld_codec_var.get()
|
||||
|
||||
# caller of `Portal.open_context()` for
|
||||
# logging purposes mostly
|
||||
|
@ -916,9 +634,8 @@ class Context:
|
|||
else:
|
||||
log.error(
|
||||
f'Remote context error:\n\n'
|
||||
|
||||
# f'{pformat(self)}\n'
|
||||
f'{error}\n'
|
||||
f'{pformat(self)}\n'
|
||||
)
|
||||
|
||||
# always record the cancelling actor's uid since its
|
||||
|
@ -955,24 +672,49 @@ class Context:
|
|||
and not self._is_self_cancelled()
|
||||
and not cs.cancel_called
|
||||
and not cs.cancelled_caught
|
||||
and (
|
||||
):
|
||||
if not (
|
||||
msgerr
|
||||
and
|
||||
# NOTE: allow user to config not cancelling the
|
||||
|
||||
# NOTE: we allow user to config not cancelling the
|
||||
# local scope on `MsgTypeError`s
|
||||
self._cancel_on_msgerr
|
||||
)
|
||||
and not self._cancel_on_msgerr
|
||||
):
|
||||
# TODO: it'd sure be handy to inject our own
|
||||
# `trio.Cancelled` subtype here ;)
|
||||
# https://github.com/goodboy/tractor/issues/368
|
||||
log.cancel('Cancelling local `.open_context()` scope!')
|
||||
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||
self._scope.cancel()
|
||||
|
||||
else:
|
||||
log.cancel('NOT cancelling local `.open_context()` scope!')
|
||||
message: str = (
|
||||
'NOT Cancelling `Context._scope` since,\n'
|
||||
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
||||
f'AND we got a msg-type-error!\n'
|
||||
f'{error}\n'
|
||||
)
|
||||
else:
|
||||
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||
|
||||
scope_info: str = 'No `self._scope: CancelScope` was set/used ?'
|
||||
if cs:
|
||||
scope_info: str = (
|
||||
f'self._scope: {cs}\n'
|
||||
f'|_ .cancel_called: {cs.cancel_called}\n'
|
||||
f'|_ .cancelled_caught: {cs.cancelled_caught}\n'
|
||||
f'|_ ._cancel_status: {cs._cancel_status}\n\n'
|
||||
|
||||
f'{self}\n'
|
||||
f'|_ ._is_self_cancelled(): {self._is_self_cancelled()}\n'
|
||||
f'|_ ._cancel_on_msgerr: {self._cancel_on_msgerr}\n\n'
|
||||
|
||||
f'msgerr: {msgerr}\n'
|
||||
)
|
||||
log.cancel(
|
||||
message
|
||||
+
|
||||
f'{scope_info}'
|
||||
)
|
||||
# TODO: maybe we should also call `._res_scope.cancel()` if it
|
||||
# exists to support cancelling any drain loop hangs?
|
||||
|
||||
|
@ -1256,7 +998,7 @@ class Context:
|
|||
# a ``.open_stream()`` block prior or there was some other
|
||||
# unanticipated error or cancellation from ``trio``.
|
||||
|
||||
if ctx._recv_chan._closed:
|
||||
if ctx._rx_chan._closed:
|
||||
raise trio.ClosedResourceError(
|
||||
'The underlying channel for this stream was already closed!\n'
|
||||
)
|
||||
|
@ -1276,7 +1018,7 @@ class Context:
|
|||
# stream WAS NOT just closed normally/gracefully.
|
||||
async with MsgStream(
|
||||
ctx=self,
|
||||
rx_chan=ctx._recv_chan,
|
||||
rx_chan=ctx._rx_chan,
|
||||
) as stream:
|
||||
|
||||
# NOTE: we track all existing streams per portal for
|
||||
|
@ -1427,13 +1169,12 @@ class Context:
|
|||
# boxed `StreamOverrun`. This is mostly useful for
|
||||
# supressing such faults during
|
||||
# cancellation/error/final-result handling inside
|
||||
# `_drain_to_final_msg()` such that we do not
|
||||
# `msg._ops.drain_to_final_msg()` such that we do not
|
||||
# raise such errors particularly in the case where
|
||||
# `._cancel_called == True`.
|
||||
not raise_overrun_from_self
|
||||
and isinstance(remote_error, RemoteActorError)
|
||||
|
||||
and remote_error.boxed_type_str == 'StreamOverrun'
|
||||
and remote_error.boxed_type is StreamOverrun
|
||||
|
||||
# and tuple(remote_error.msgdata['sender']) == our_uid
|
||||
and tuple(remote_error.sender) == our_uid
|
||||
|
@ -1503,12 +1244,12 @@ class Context:
|
|||
if self._final_result_is_set():
|
||||
return self._result
|
||||
|
||||
assert self._recv_chan
|
||||
assert self._rx_chan
|
||||
raise_overrun: bool = not self._allow_overruns
|
||||
if (
|
||||
self.maybe_error is None
|
||||
and
|
||||
not self._recv_chan._closed # type: ignore
|
||||
not self._rx_chan._closed # type: ignore
|
||||
):
|
||||
# wait for a final context result/error by "draining"
|
||||
# (by more or less ignoring) any bi-dir-stream "yield"
|
||||
|
@ -1516,7 +1257,7 @@ class Context:
|
|||
(
|
||||
return_msg,
|
||||
drained_msgs,
|
||||
) = await _drain_to_final_msg(
|
||||
) = await msgops.drain_to_final_msg(
|
||||
ctx=self,
|
||||
hide_tb=hide_tb,
|
||||
)
|
||||
|
@ -1802,8 +1543,7 @@ class Context:
|
|||
await self.chan.send(started_msg)
|
||||
|
||||
# raise any msg type error NO MATTER WHAT!
|
||||
except msgspec.ValidationError as verr:
|
||||
from tractor._ipc import _mk_msg_type_err
|
||||
except ValidationError as verr:
|
||||
raise _mk_msg_type_err(
|
||||
msg=msg_bytes,
|
||||
codec=codec,
|
||||
|
@ -1890,7 +1630,7 @@ class Context:
|
|||
- NEVER `return` early before delivering the msg!
|
||||
bc if the error is a ctxc and there is a task waiting on
|
||||
`.result()` we need the msg to be
|
||||
`send_chan.send_nowait()`-ed over the `._recv_chan` so
|
||||
`send_chan.send_nowait()`-ed over the `._rx_chan` so
|
||||
that the error is relayed to that waiter task and thus
|
||||
raised in user code!
|
||||
|
||||
|
@ -2201,24 +1941,11 @@ async def open_context_from_portal(
|
|||
# -> it's expected that if there is an error in this phase of
|
||||
# the dialog, the `Error` msg should be raised from the `msg`
|
||||
# handling block below.
|
||||
msg: Started = await ctx._recv_chan.receive()
|
||||
try:
|
||||
# the "first" value here is delivered by the callee's
|
||||
# ``Context.started()`` call.
|
||||
# first: Any = msg['started']
|
||||
first: Any = msg.pld
|
||||
ctx._started_called: bool = True
|
||||
|
||||
# except KeyError as src_error:
|
||||
except AttributeError as src_error:
|
||||
log.exception('Raising from unexpected msg!\n')
|
||||
_raise_from_no_key_in_msg(
|
||||
first: Any = await ctx._pld_rx.recv_pld(
|
||||
ctx=ctx,
|
||||
msg=msg,
|
||||
src_err=src_error,
|
||||
log=log,
|
||||
expect_msg=Started,
|
||||
)
|
||||
ctx._started_called: bool = True
|
||||
|
||||
uid: tuple = portal.channel.uid
|
||||
cid: str = ctx.cid
|
||||
|
@ -2540,7 +2267,7 @@ async def open_context_from_portal(
|
|||
# we tear down the runtime feeder chan last
|
||||
# to avoid premature stream clobbers.
|
||||
if (
|
||||
(rxchan := ctx._recv_chan)
|
||||
(rxchan := ctx._rx_chan)
|
||||
|
||||
# maybe TODO: yes i know the below check is
|
||||
# touching `trio` memchan internals..BUT, there are
|
||||
|
@ -2583,7 +2310,7 @@ async def open_context_from_portal(
|
|||
# underlying feeder channel is
|
||||
# once-and-only-CLOSED!
|
||||
with trio.CancelScope(shield=True):
|
||||
await ctx._recv_chan.aclose()
|
||||
await ctx._rx_chan.aclose()
|
||||
|
||||
# XXX: we always raise remote errors locally and
|
||||
# generally speaking mask runtime-machinery related
|
||||
|
@ -2628,9 +2355,9 @@ async def open_context_from_portal(
|
|||
# FINALLY, remove the context from runtime tracking and
|
||||
# exit!
|
||||
log.runtime(
|
||||
'Removing IPC ctx opened with peer\n'
|
||||
f'{uid}\n'
|
||||
f'|_{ctx}\n'
|
||||
'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
|
||||
f'uid: {uid}\n'
|
||||
f'cid: {ctx.cid}\n'
|
||||
)
|
||||
portal.actor._contexts.pop(
|
||||
(uid, cid),
|
||||
|
@ -2643,6 +2370,7 @@ def mk_context(
|
|||
nsf: NamespacePath,
|
||||
|
||||
msg_buffer_size: int = 2**6,
|
||||
pld_spec: Union[Type] = Any,
|
||||
|
||||
**kwargs,
|
||||
|
||||
|
@ -2662,12 +2390,18 @@ def mk_context(
|
|||
from .devx._code import find_caller_info
|
||||
caller_info: CallerInfo|None = find_caller_info()
|
||||
|
||||
pld_rx = msgops.PldRx(
|
||||
# _rx_mc=recv_chan,
|
||||
_msgdec=_codec.mk_dec(spec=pld_spec)
|
||||
)
|
||||
|
||||
ctx = Context(
|
||||
chan=chan,
|
||||
cid=cid,
|
||||
_actor=current_actor(),
|
||||
_send_chan=send_chan,
|
||||
_recv_chan=recv_chan,
|
||||
_rx_chan=recv_chan,
|
||||
_pld_rx=pld_rx,
|
||||
_nsf=nsf,
|
||||
_task=trio.lowlevel.current_task(),
|
||||
_caller_info=caller_info,
|
||||
|
|
|
@ -31,7 +31,7 @@ from typing import (
|
|||
Any,
|
||||
Callable,
|
||||
AsyncGenerator,
|
||||
# Type,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from functools import partial
|
||||
from dataclasses import dataclass
|
||||
|
@ -46,12 +46,12 @@ from ._state import (
|
|||
from ._ipc import Channel
|
||||
from .log import get_logger
|
||||
from .msg import (
|
||||
Error,
|
||||
# Error,
|
||||
NamespacePath,
|
||||
Return,
|
||||
)
|
||||
from ._exceptions import (
|
||||
unpack_error,
|
||||
# unpack_error,
|
||||
NoResult,
|
||||
)
|
||||
from ._context import (
|
||||
|
@ -62,42 +62,44 @@ from ._streaming import (
|
|||
MsgStream,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
# TODO: rename to `unwrap_result()` and use
|
||||
# `._raise_from_no_key_in_msg()` (after tweak to
|
||||
# accept a `chan: Channel` arg) in key block!
|
||||
def _unwrap_msg(
|
||||
msg: Return|Error,
|
||||
channel: Channel,
|
||||
# TODO: remove and/or rework?
|
||||
# -[ ] rename to `unwrap_result()` and use
|
||||
# `._raise_from_unexpected_msg()` (after tweak to accept a `chan:
|
||||
# Channel` arg) in key block??
|
||||
# -[ ] pretty sure this is entirely covered by
|
||||
# `_exceptions._raise_from_unexpected_msg()` so REMOVE!
|
||||
# def _unwrap_msg(
|
||||
# msg: Return|Error,
|
||||
# ctx: Context,
|
||||
|
||||
hide_tb: bool = True,
|
||||
# hide_tb: bool = True,
|
||||
|
||||
) -> Any:
|
||||
'''
|
||||
Unwrap a final result from a `{return: <Any>}` IPC msg.
|
||||
# ) -> Any:
|
||||
# '''
|
||||
# Unwrap a final result from a `{return: <Any>}` IPC msg.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
# '''
|
||||
# __tracebackhide__: bool = hide_tb
|
||||
# try:
|
||||
# return msg.pld
|
||||
# except AttributeError as err:
|
||||
|
||||
try:
|
||||
return msg.pld
|
||||
# return msg['return']
|
||||
# except KeyError as ke:
|
||||
except AttributeError as err:
|
||||
# # internal error should never get here
|
||||
# # assert msg.get('cid'), (
|
||||
# assert msg.cid, (
|
||||
# "Received internal error at portal?"
|
||||
# )
|
||||
|
||||
# internal error should never get here
|
||||
# assert msg.get('cid'), (
|
||||
assert msg.cid, (
|
||||
"Received internal error at portal?"
|
||||
)
|
||||
|
||||
raise unpack_error(
|
||||
msg,
|
||||
channel
|
||||
) from err
|
||||
# raise unpack_error(
|
||||
# msg,
|
||||
# ctx.chan,
|
||||
# ) from err
|
||||
|
||||
|
||||
class Portal:
|
||||
|
@ -123,17 +125,21 @@ class Portal:
|
|||
# connected (peer) actors.
|
||||
cancel_timeout: float = 0.5
|
||||
|
||||
def __init__(self, channel: Channel) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
channel: Channel,
|
||||
) -> None:
|
||||
|
||||
self.chan = channel
|
||||
# during the portal's lifetime
|
||||
self._result_msg: dict|None = None
|
||||
self._final_result: Any|None = None
|
||||
|
||||
# When set to a ``Context`` (when _submit_for_result is called)
|
||||
# it is expected that ``result()`` will be awaited at some
|
||||
# point.
|
||||
self._expect_result: Context | None = None
|
||||
self._expect_result_ctx: Context|None = None
|
||||
self._streams: set[MsgStream] = set()
|
||||
self.actor = current_actor()
|
||||
self.actor: Actor = current_actor()
|
||||
|
||||
@property
|
||||
def channel(self) -> Channel:
|
||||
|
@ -147,6 +153,7 @@ class Portal:
|
|||
)
|
||||
return self.chan
|
||||
|
||||
# TODO: factor this out into an `ActorNursery` wrapper
|
||||
async def _submit_for_result(
|
||||
self,
|
||||
ns: str,
|
||||
|
@ -154,27 +161,18 @@ class Portal:
|
|||
**kwargs
|
||||
) -> None:
|
||||
|
||||
assert self._expect_result is None, (
|
||||
"A pending main result has already been submitted"
|
||||
if self._expect_result_ctx is not None:
|
||||
raise RuntimeError(
|
||||
'A pending main result has already been submitted'
|
||||
)
|
||||
|
||||
self._expect_result = await self.actor.start_remote_task(
|
||||
self._expect_result_ctx = await self.actor.start_remote_task(
|
||||
self.channel,
|
||||
nsf=NamespacePath(f'{ns}:{func}'),
|
||||
kwargs=kwargs,
|
||||
portal=self,
|
||||
)
|
||||
|
||||
async def _return_once(
|
||||
self,
|
||||
ctx: Context,
|
||||
|
||||
) -> Return:
|
||||
|
||||
assert ctx._remote_func_type == 'asyncfunc' # single response
|
||||
msg: Return = await ctx._recv_chan.receive()
|
||||
return msg
|
||||
|
||||
async def result(self) -> Any:
|
||||
'''
|
||||
Return the result(s) from the remote actor's "main" task.
|
||||
|
@ -188,7 +186,7 @@ class Portal:
|
|||
raise exc
|
||||
|
||||
# not expecting a "main" result
|
||||
if self._expect_result is None:
|
||||
if self._expect_result_ctx is None:
|
||||
log.warning(
|
||||
f"Portal for {self.channel.uid} not expecting a final"
|
||||
" result?\nresult() should only be called if subactor"
|
||||
|
@ -196,17 +194,15 @@ class Portal:
|
|||
return NoResult
|
||||
|
||||
# expecting a "main" result
|
||||
assert self._expect_result
|
||||
assert self._expect_result_ctx
|
||||
|
||||
if self._result_msg is None:
|
||||
self._result_msg = await self._return_once(
|
||||
self._expect_result
|
||||
if self._final_result is None:
|
||||
self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
|
||||
ctx=self._expect_result_ctx,
|
||||
expect_msg=Return,
|
||||
)
|
||||
|
||||
return _unwrap_msg(
|
||||
self._result_msg,
|
||||
self.channel,
|
||||
)
|
||||
return self._final_result
|
||||
|
||||
async def _cancel_streams(self):
|
||||
# terminate all locally running async generator
|
||||
|
@ -337,11 +333,9 @@ class Portal:
|
|||
kwargs=kwargs,
|
||||
portal=self,
|
||||
)
|
||||
ctx._portal: Portal = self
|
||||
msg: Return = await self._return_once(ctx)
|
||||
return _unwrap_msg(
|
||||
msg,
|
||||
self.channel,
|
||||
return await ctx._pld_rx.recv_pld(
|
||||
ctx=ctx,
|
||||
expect_msg=Return,
|
||||
)
|
||||
|
||||
async def run(
|
||||
|
@ -391,10 +385,9 @@ class Portal:
|
|||
kwargs=kwargs,
|
||||
portal=self,
|
||||
)
|
||||
ctx._portal = self
|
||||
return _unwrap_msg(
|
||||
await self._return_once(ctx),
|
||||
self.channel,
|
||||
return await ctx._pld_rx.recv_pld(
|
||||
ctx=ctx,
|
||||
expect_msg=Return,
|
||||
)
|
||||
|
||||
@acm
|
||||
|
@ -436,7 +429,7 @@ class Portal:
|
|||
# deliver receive only stream
|
||||
async with MsgStream(
|
||||
ctx=ctx,
|
||||
rx_chan=ctx._recv_chan,
|
||||
rx_chan=ctx._rx_chan,
|
||||
) as rchan:
|
||||
self._streams.add(rchan)
|
||||
yield rchan
|
||||
|
|
|
@ -817,8 +817,8 @@ class Actor:
|
|||
state.max_buffer_size = msg_buffer_size
|
||||
|
||||
except KeyError:
|
||||
log.runtime(
|
||||
f'Creating NEW IPC ctx for\n'
|
||||
log.debug(
|
||||
f'Allocate new IPC ctx for\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
f'cid: {cid}\n'
|
||||
)
|
||||
|
@ -906,7 +906,7 @@ class Actor:
|
|||
# this should be immediate and does not (yet) wait for the
|
||||
# remote child task to sync via `Context.started()`.
|
||||
with trio.fail_after(ack_timeout):
|
||||
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
|
||||
first_msg: msgtypes.StartAck = await ctx._rx_chan.receive()
|
||||
try:
|
||||
functype: str = first_msg.functype
|
||||
except AttributeError:
|
||||
|
|
|
@ -35,7 +35,7 @@ import warnings
|
|||
import trio
|
||||
|
||||
from ._exceptions import (
|
||||
_raise_from_no_key_in_msg,
|
||||
# _raise_from_no_key_in_msg,
|
||||
ContextCancelled,
|
||||
)
|
||||
from .log import get_logger
|
||||
|
@ -44,8 +44,9 @@ from .trionics import (
|
|||
BroadcastReceiver,
|
||||
)
|
||||
from tractor.msg import (
|
||||
Return,
|
||||
Stop,
|
||||
# Return,
|
||||
# Stop,
|
||||
MsgType,
|
||||
Yield,
|
||||
)
|
||||
|
||||
|
@ -94,23 +95,22 @@ class MsgStream(trio.abc.Channel):
|
|||
self._eoc: bool|trio.EndOfChannel = False
|
||||
self._closed: bool|trio.ClosedResourceError = False
|
||||
|
||||
# TODO: could we make this a direct method bind to `PldRx`?
|
||||
# -> receive_nowait = PldRx.recv_pld
|
||||
# |_ means latter would have to accept `MsgStream`-as-`self`?
|
||||
# => should be fine as long as,
|
||||
# -[ ] both define `._rx_chan`
|
||||
# -[ ] .ctx is bound into `PldRx` using a `@cm`?
|
||||
#
|
||||
# delegate directly to underlying mem channel
|
||||
def receive_nowait(
|
||||
self,
|
||||
allow_msgs: list[str] = Yield,
|
||||
expect_msg: MsgType = Yield,
|
||||
):
|
||||
msg: Yield|Stop = self._rx_chan.receive_nowait()
|
||||
# TODO: replace msg equiv of this or does the `.pld`
|
||||
# interface read already satisfy it? I think so, yes?
|
||||
try:
|
||||
return msg.pld
|
||||
except AttributeError as attrerr:
|
||||
_raise_from_no_key_in_msg(
|
||||
ctx=self._ctx,
|
||||
msg=msg,
|
||||
src_err=attrerr,
|
||||
log=log,
|
||||
stream=self,
|
||||
ctx: Context = self._ctx
|
||||
return ctx._pld_rx.recv_pld_nowait(
|
||||
ctx=ctx,
|
||||
expect_msg=expect_msg,
|
||||
)
|
||||
|
||||
async def receive(
|
||||
|
@ -146,24 +146,9 @@ class MsgStream(trio.abc.Channel):
|
|||
|
||||
src_err: Exception|None = None # orig tb
|
||||
try:
|
||||
try:
|
||||
msg: Yield = await self._rx_chan.receive()
|
||||
return msg.pld
|
||||
|
||||
# TODO: implement with match: instead?
|
||||
except AttributeError as attrerr:
|
||||
# src_err = kerr
|
||||
src_err = attrerr
|
||||
|
||||
# NOTE: may raise any of the below error types
|
||||
# includg EoC when a 'stop' msg is found.
|
||||
_raise_from_no_key_in_msg(
|
||||
ctx=self._ctx,
|
||||
msg=msg,
|
||||
src_err=attrerr,
|
||||
log=log,
|
||||
stream=self,
|
||||
)
|
||||
ctx: Context = self._ctx
|
||||
return await ctx._pld_rx.recv_pld(ctx=ctx)
|
||||
|
||||
# XXX: the stream terminates on either of:
|
||||
# - via `self._rx_chan.receive()` raising after manual closure
|
||||
|
@ -228,7 +213,7 @@ class MsgStream(trio.abc.Channel):
|
|||
# probably want to instead raise the remote error
|
||||
# over the end-of-stream connection error since likely
|
||||
# the remote error was the source cause?
|
||||
ctx: Context = self._ctx
|
||||
# ctx: Context = self._ctx
|
||||
ctx.maybe_raise(
|
||||
raise_ctxc_from_self_call=True,
|
||||
)
|
||||
|
@ -292,7 +277,8 @@ class MsgStream(trio.abc.Channel):
|
|||
while not drained:
|
||||
try:
|
||||
maybe_final_msg = self.receive_nowait(
|
||||
allow_msgs=[Yield, Return],
|
||||
# allow_msgs=[Yield, Return],
|
||||
expect_msg=Yield,
|
||||
)
|
||||
if maybe_final_msg:
|
||||
log.debug(
|
||||
|
@ -472,6 +458,9 @@ class MsgStream(trio.abc.Channel):
|
|||
self,
|
||||
# use memory channel size by default
|
||||
self._rx_chan._state.max_buffer_size, # type: ignore
|
||||
|
||||
# TODO: can remove this kwarg right since
|
||||
# by default behaviour is to do this anyway?
|
||||
receive_afunc=self.receive,
|
||||
)
|
||||
|
||||
|
@ -517,19 +506,11 @@ class MsgStream(trio.abc.Channel):
|
|||
raise self._closed
|
||||
|
||||
try:
|
||||
# await self._ctx.chan.send(
|
||||
# payload={
|
||||
# 'yield': data,
|
||||
# 'cid': self._ctx.cid,
|
||||
# },
|
||||
# # hide_tb=hide_tb,
|
||||
# )
|
||||
await self._ctx.chan.send(
|
||||
payload=Yield(
|
||||
cid=self._ctx.cid,
|
||||
pld=data,
|
||||
),
|
||||
# hide_tb=hide_tb,
|
||||
)
|
||||
except (
|
||||
trio.ClosedResourceError,
|
||||
|
@ -562,7 +543,7 @@ def stream(func: Callable) -> Callable:
|
|||
'''
|
||||
# TODO: apply whatever solution ``mypy`` ends up picking for this:
|
||||
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
|
||||
func._tractor_stream_function = True # type: ignore
|
||||
func._tractor_stream_function: bool = True # type: ignore
|
||||
|
||||
sig = inspect.signature(func)
|
||||
params = sig.parameters
|
||||
|
|
|
@ -0,0 +1,559 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Near-application abstractions for `MsgType.pld: PayloadT|Raw`
|
||||
delivery, filtering and type checking as well as generic
|
||||
operational helpers for processing transaction flows.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import (
|
||||
# asynccontextmanager as acm,
|
||||
contextmanager as cm,
|
||||
)
|
||||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
Type,
|
||||
TYPE_CHECKING,
|
||||
# Union,
|
||||
)
|
||||
# ------ - ------
|
||||
from msgspec import (
|
||||
msgpack,
|
||||
Raw,
|
||||
Struct,
|
||||
ValidationError,
|
||||
)
|
||||
import trio
|
||||
# ------ - ------
|
||||
from tractor.log import get_logger
|
||||
from tractor._exceptions import (
|
||||
MessagingError,
|
||||
InternalError,
|
||||
_raise_from_unexpected_msg,
|
||||
MsgTypeError,
|
||||
_mk_msg_type_err,
|
||||
pack_from_raise,
|
||||
)
|
||||
from ._codec import (
|
||||
mk_dec,
|
||||
MsgDec,
|
||||
)
|
||||
from .types import (
|
||||
CancelAck,
|
||||
Error,
|
||||
MsgType,
|
||||
PayloadT,
|
||||
Return,
|
||||
Started,
|
||||
Stop,
|
||||
Yield,
|
||||
# pretty_struct,
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor._context import Context
|
||||
from tractor._streaming import MsgStream
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
class PldRx(Struct):
|
||||
'''
|
||||
A "msg payload receiver".
|
||||
|
||||
The pairing of a "feeder" `trio.abc.ReceiveChannel` and an
|
||||
interchange-specific (eg. msgpack) payload field decoder. The
|
||||
validation/type-filtering rules are runtime mutable and allow
|
||||
type constraining the set of `MsgType.pld: Raw|PayloadT`
|
||||
values at runtime, per IPC task-context.
|
||||
|
||||
This abstraction, being just below "user application code",
|
||||
allows for the equivalent of our `MsgCodec` (used for
|
||||
typer-filtering IPC dialog protocol msgs against a msg-spec)
|
||||
but with granular control around payload delivery (i.e. the
|
||||
data-values user code actually sees and uses (the blobs that
|
||||
are "shuttled" by the wrapping dialog prot) such that invalid
|
||||
`.pld: Raw` can be decoded and handled by IPC-primitive user
|
||||
code (i.e. that operates on `Context` and `Msgstream` APIs)
|
||||
without knowledge of the lower level `Channel`/`MsgTransport`
|
||||
primitives nor the `MsgCodec` in use. Further, lazily decoding
|
||||
payload blobs allows for topical (and maybe intentionally
|
||||
"partial") encryption of msg field subsets.
|
||||
|
||||
'''
|
||||
# TODO: better to bind it here?
|
||||
# _rx_mc: trio.MemoryReceiveChannel
|
||||
_msgdec: MsgDec = mk_dec(spec=Any)
|
||||
|
||||
_ipc: Context|MsgStream|None = None
|
||||
|
||||
@cm
|
||||
def apply_to_ipc(
|
||||
self,
|
||||
ipc_prim: Context|MsgStream,
|
||||
|
||||
) -> PldRx:
|
||||
'''
|
||||
Apply this payload receiver to an IPC primitive type, one
|
||||
of `Context` or `MsgStream`.
|
||||
|
||||
'''
|
||||
self._ipc = ipc_prim
|
||||
try:
|
||||
yield self
|
||||
finally:
|
||||
self._ipc = None
|
||||
|
||||
@property
|
||||
def dec(self) -> msgpack.Decoder:
|
||||
return self._msgdec.dec
|
||||
|
||||
def recv_pld_nowait(
|
||||
self,
|
||||
# TODO: make this `MsgStream` compat as well, see above^
|
||||
# ipc_prim: Context|MsgStream,
|
||||
ctx: Context,
|
||||
|
||||
ipc_msg: MsgType|None = None,
|
||||
expect_msg: Type[MsgType]|None = None,
|
||||
|
||||
**kwargs,
|
||||
|
||||
) -> Any|Raw:
|
||||
|
||||
msg: MsgType = (
|
||||
ipc_msg
|
||||
or
|
||||
|
||||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||
ctx._rx_chan.receive_nowait()
|
||||
)
|
||||
return self.dec_msg(msg)
|
||||
|
||||
async def recv_pld(
|
||||
self,
|
||||
ctx: Context,
|
||||
ipc_msg: MsgType|None = None,
|
||||
expect_msg: Type[MsgType]|None = None,
|
||||
|
||||
**kwargs
|
||||
|
||||
) -> Any|Raw:
|
||||
'''
|
||||
Receive a `MsgType`, then decode and return its `.pld` field.
|
||||
|
||||
'''
|
||||
msg: MsgType = (
|
||||
ipc_msg
|
||||
or
|
||||
|
||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||
await ctx._rx_chan.receive()
|
||||
)
|
||||
return self.dec_msg(
|
||||
msg,
|
||||
ctx=ctx,
|
||||
expect_msg=expect_msg,
|
||||
)
|
||||
|
||||
def dec_msg(
|
||||
self,
|
||||
msg: MsgType,
|
||||
ctx: Context,
|
||||
expect_msg: Type[MsgType]|None = None,
|
||||
|
||||
) -> PayloadT|Raw:
|
||||
'''
|
||||
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
|
||||
return the value or raise an appropriate error.
|
||||
|
||||
'''
|
||||
match msg:
|
||||
# payload-data shuttle msg; deliver the `.pld` value
|
||||
# directly to IPC (primitive) client-consumer code.
|
||||
case (
|
||||
Started(pld=pld) # sync phase
|
||||
|Yield(pld=pld) # streaming phase
|
||||
|Return(pld=pld) # termination phase
|
||||
):
|
||||
try:
|
||||
pld: PayloadT = self._msgdec.decode(pld)
|
||||
log.runtime(
|
||||
'Decode msg payload\n\n'
|
||||
f'{msg}\n\n'
|
||||
f'{pld}\n'
|
||||
)
|
||||
return pld
|
||||
|
||||
# XXX pld-type failure
|
||||
except ValidationError as src_err:
|
||||
msgterr: MsgTypeError = _mk_msg_type_err(
|
||||
msg=msg,
|
||||
codec=self._dec,
|
||||
src_validation_error=src_err,
|
||||
)
|
||||
msg: Error = pack_from_raise(
|
||||
local_err=msgterr,
|
||||
cid=msg.cid,
|
||||
src_uid=ctx.chan.uid,
|
||||
)
|
||||
|
||||
# XXX some other decoder specific failure?
|
||||
# except TypeError as src_error:
|
||||
# from .devx import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
# raise src_error
|
||||
|
||||
# a runtime-internal RPC endpoint response.
|
||||
# always passthrough since (internal) runtime
|
||||
# responses are generally never exposed to consumer
|
||||
# code.
|
||||
case CancelAck(
|
||||
pld=bool(cancelled)
|
||||
):
|
||||
return cancelled
|
||||
|
||||
case Error():
|
||||
src_err = MessagingError(
|
||||
'IPC dialog termination by msg'
|
||||
)
|
||||
|
||||
case _:
|
||||
src_err = InternalError(
|
||||
'Unknown IPC msg ??\n\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
# fallthrough and raise from `src_err`
|
||||
_raise_from_unexpected_msg(
|
||||
ctx=ctx,
|
||||
msg=msg,
|
||||
src_err=src_err,
|
||||
log=log,
|
||||
expect_msg=expect_msg,
|
||||
hide_tb=False,
|
||||
)
|
||||
|
||||
async def recv_msg_w_pld(
|
||||
self,
|
||||
ipc: Context|MsgStream,
|
||||
|
||||
) -> tuple[MsgType, PayloadT]:
|
||||
'''
|
||||
Retrieve the next avail IPC msg, decode it's payload, and return
|
||||
the pair of refs.
|
||||
|
||||
'''
|
||||
msg: MsgType = await ipc._rx_chan.receive()
|
||||
|
||||
# TODO: is there some way we can inject the decoded
|
||||
# payload into an existing output buffer for the original
|
||||
# msg instance?
|
||||
pld: PayloadT = self.dec_msg(
|
||||
msg,
|
||||
ctx=ipc,
|
||||
)
|
||||
return msg, pld
|
||||
|
||||
|
||||
async def drain_to_final_msg(
|
||||
ctx: Context,
|
||||
|
||||
hide_tb: bool = True,
|
||||
msg_limit: int = 6,
|
||||
|
||||
) -> tuple[
|
||||
Return|None,
|
||||
list[MsgType]
|
||||
]:
|
||||
'''
|
||||
Drain IPC msgs delivered to the underlying IPC primitive's
|
||||
rx-mem-chan (eg. `Context._rx_chan`) from the runtime in
|
||||
search for a final result or error.
|
||||
|
||||
The motivation here is to ideally capture errors during ctxc
|
||||
conditions where a canc-request/or local error is sent but the
|
||||
local task also excepts and enters the
|
||||
`Portal.open_context().__aexit__()` block wherein we prefer to
|
||||
capture and raise any remote error or ctxc-ack as part of the
|
||||
`ctx.result()` cleanup and teardown sequence.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
raise_overrun: bool = not ctx._allow_overruns
|
||||
|
||||
# wait for a final context result by collecting (but
|
||||
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||
# from the far end.
|
||||
pre_result_drained: list[MsgType] = []
|
||||
return_msg: Return|None = None
|
||||
while not (
|
||||
ctx.maybe_error
|
||||
and not ctx._final_result_is_set()
|
||||
):
|
||||
try:
|
||||
# TODO: can remove?
|
||||
# await trio.lowlevel.checkpoint()
|
||||
|
||||
# NOTE: this REPL usage actually works here dawg! Bo
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# TODO: bad idea?
|
||||
# -[ ] wrap final outcome channel wait in a scope so
|
||||
# it can be cancelled out of band if needed?
|
||||
#
|
||||
# with trio.CancelScope() as res_cs:
|
||||
# ctx._res_scope = res_cs
|
||||
# msg: dict = await ctx._rx_chan.receive()
|
||||
# if res_cs.cancelled_caught:
|
||||
|
||||
# TODO: ensure there's no more hangs, debugging the
|
||||
# runtime pretty preaase!
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# TODO: can remove this finally?
|
||||
# we have no more need for the sync draining right
|
||||
# since we're can kinda guarantee the async
|
||||
# `.receive()` below will never block yah?
|
||||
#
|
||||
# if (
|
||||
# ctx._cancel_called and (
|
||||
# ctx.cancel_acked
|
||||
# # or ctx.chan._cancel_called
|
||||
# )
|
||||
# # or not ctx._final_result_is_set()
|
||||
# # ctx.outcome is not
|
||||
# # or ctx.chan._closed
|
||||
# ):
|
||||
# try:
|
||||
# msg: dict = await ctx._rx_chan.receive_nowait()()
|
||||
# except trio.WouldBlock:
|
||||
# log.warning(
|
||||
# 'When draining already `.cancel_called` ctx!\n'
|
||||
# 'No final msg arrived..\n'
|
||||
# )
|
||||
# break
|
||||
# else:
|
||||
# msg: dict = await ctx._rx_chan.receive()
|
||||
|
||||
# TODO: don't need it right jefe?
|
||||
# with trio.move_on_after(1) as cs:
|
||||
# if cs.cancelled_caught:
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
|
||||
# pray to the `trio` gawds that we're corrent with this
|
||||
# msg: dict = await ctx._rx_chan.receive()
|
||||
msg, pld = await ctx._pld_rx.recv_msg_w_pld(ipc=ctx)
|
||||
|
||||
# NOTE: we get here if the far end was
|
||||
# `ContextCancelled` in 2 cases:
|
||||
# 1. we requested the cancellation and thus
|
||||
# SHOULD NOT raise that far end error,
|
||||
# 2. WE DID NOT REQUEST that cancel and thus
|
||||
# SHOULD RAISE HERE!
|
||||
except trio.Cancelled:
|
||||
|
||||
# CASE 2: mask the local cancelled-error(s)
|
||||
# only when we are sure the remote error is
|
||||
# the source cause of this local task's
|
||||
# cancellation.
|
||||
ctx.maybe_raise()
|
||||
|
||||
# CASE 1: we DID request the cancel we simply
|
||||
# continue to bubble up as normal.
|
||||
raise
|
||||
|
||||
match msg:
|
||||
|
||||
# final result arrived!
|
||||
case Return(
|
||||
# cid=cid,
|
||||
# pld=res,
|
||||
):
|
||||
# ctx._result: Any = res
|
||||
ctx._result: Any = pld
|
||||
log.runtime(
|
||||
'Context delivered final draining msg:\n'
|
||||
f'{pformat(msg)}'
|
||||
)
|
||||
# XXX: only close the rx mem chan AFTER
|
||||
# a final result is retreived.
|
||||
# if ctx._rx_chan:
|
||||
# await ctx._rx_chan.aclose()
|
||||
# TODO: ^ we don't need it right?
|
||||
return_msg = msg
|
||||
break
|
||||
|
||||
# far end task is still streaming to us so discard
|
||||
# and report depending on local ctx state.
|
||||
case Yield():
|
||||
pre_result_drained.append(msg)
|
||||
if (
|
||||
(ctx._stream.closed
|
||||
and (reason := 'stream was already closed')
|
||||
)
|
||||
or (ctx.cancel_acked
|
||||
and (reason := 'ctx cancelled other side')
|
||||
)
|
||||
or (ctx._cancel_called
|
||||
and (reason := 'ctx called `.cancel()`')
|
||||
)
|
||||
or (len(pre_result_drained) > msg_limit
|
||||
and (reason := f'"yield" limit={msg_limit}')
|
||||
)
|
||||
):
|
||||
log.cancel(
|
||||
'Cancelling `MsgStream` drain since '
|
||||
f'{reason}\n\n'
|
||||
f'<= {ctx.chan.uid}\n'
|
||||
f' |_{ctx._nsf}()\n\n'
|
||||
f'=> {ctx._task}\n'
|
||||
f' |_{ctx._stream}\n\n'
|
||||
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
return (
|
||||
return_msg,
|
||||
pre_result_drained,
|
||||
)
|
||||
|
||||
# drain up to the `msg_limit` hoping to get
|
||||
# a final result or error/ctxc.
|
||||
else:
|
||||
log.warning(
|
||||
'Ignoring "yield" msg during `ctx.result()` drain..\n'
|
||||
f'<= {ctx.chan.uid}\n'
|
||||
f' |_{ctx._nsf}()\n\n'
|
||||
f'=> {ctx._task}\n'
|
||||
f' |_{ctx._stream}\n\n'
|
||||
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
# stream terminated, but no result yet..
|
||||
#
|
||||
# TODO: work out edge cases here where
|
||||
# a stream is open but the task also calls
|
||||
# this?
|
||||
# -[ ] should be a runtime error if a stream is open right?
|
||||
# Stop()
|
||||
case Stop():
|
||||
pre_result_drained.append(msg)
|
||||
log.cancel(
|
||||
'Remote stream terminated due to "stop" msg:\n\n'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
# remote error msg, likely already handled inside
|
||||
# `Context._deliver_msg()`
|
||||
case Error():
|
||||
# TODO: can we replace this with `ctx.maybe_raise()`?
|
||||
# -[ ] would this be handier for this case maybe?
|
||||
# async with maybe_raise_on_exit() as raises:
|
||||
# if raises:
|
||||
# log.error('some msg about raising..')
|
||||
#
|
||||
re: Exception|None = ctx._remote_error
|
||||
if re:
|
||||
assert msg is ctx._cancel_msg
|
||||
# NOTE: this solved a super duper edge case XD
|
||||
# this was THE super duper edge case of:
|
||||
# - local task opens a remote task,
|
||||
# - requests remote cancellation of far end
|
||||
# ctx/tasks,
|
||||
# - needs to wait for the cancel ack msg
|
||||
# (ctxc) or some result in the race case
|
||||
# where the other side's task returns
|
||||
# before the cancel request msg is ever
|
||||
# rxed and processed,
|
||||
# - here this surrounding drain loop (which
|
||||
# iterates all ipc msgs until the ack or
|
||||
# an early result arrives) was NOT exiting
|
||||
# since we are the edge case: local task
|
||||
# does not re-raise any ctxc it receives
|
||||
# IFF **it** was the cancellation
|
||||
# requester..
|
||||
#
|
||||
# XXX will raise if necessary but ow break
|
||||
# from loop presuming any supressed error
|
||||
# (ctxc) should terminate the context!
|
||||
ctx._maybe_raise_remote_err(
|
||||
re,
|
||||
# NOTE: obvi we don't care if we
|
||||
# overran the far end if we're already
|
||||
# waiting on a final result (msg).
|
||||
# raise_overrun_from_self=False,
|
||||
raise_overrun_from_self=raise_overrun,
|
||||
)
|
||||
|
||||
break # OOOOOF, yeah obvi we need this..
|
||||
|
||||
# XXX we should never really get here
|
||||
# right! since `._deliver_msg()` should
|
||||
# always have detected an {'error': ..}
|
||||
# msg and already called this right!?!
|
||||
# elif error := unpack_error(
|
||||
# msg=msg,
|
||||
# chan=ctx._portal.channel,
|
||||
# hide_tb=False,
|
||||
# ):
|
||||
# log.critical('SHOULD NEVER GET HERE!?')
|
||||
# assert msg is ctx._cancel_msg
|
||||
# assert error.msgdata == ctx._remote_error.msgdata
|
||||
# assert error.ipc_msg == ctx._remote_error.ipc_msg
|
||||
# from .devx._debug import pause
|
||||
# await pause()
|
||||
# ctx._maybe_cancel_and_set_remote_error(error)
|
||||
# ctx._maybe_raise_remote_err(error)
|
||||
|
||||
else:
|
||||
# bubble the original src key error
|
||||
raise
|
||||
|
||||
# XXX should pretty much never get here unless someone
|
||||
# overrides the default `MsgType` spec.
|
||||
case _:
|
||||
pre_result_drained.append(msg)
|
||||
# It's definitely an internal error if any other
|
||||
# msg type without a`'cid'` field arrives here!
|
||||
if not msg.cid:
|
||||
raise InternalError(
|
||||
'Unexpected cid-missing msg?\n\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
raise RuntimeError('Unknown msg type: {msg}')
|
||||
|
||||
else:
|
||||
log.cancel(
|
||||
'Skipping `MsgStream` drain since final outcome is set\n\n'
|
||||
f'{ctx.outcome}\n'
|
||||
)
|
||||
|
||||
return (
|
||||
return_msg,
|
||||
pre_result_drained,
|
||||
)
|
Loading…
Reference in New Issue