Compare commits
11 Commits
6e0ef76128
...
a5a0e6854b
Author | SHA1 | Date |
---|---|---|
|
a5a0e6854b | |
|
c383978402 | |
|
08fcd3fb03 | |
|
adba454d1d | |
|
4bab998ff9 | |
|
c25c77c573 | |
|
188ff0e0e5 | |
|
6b30c86eca | |
|
6aa52417ef | |
|
18e97a8f9a | |
|
5eb9144921 |
|
@ -25,26 +25,31 @@ disjoint, parallel executing tasks in separate actors.
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from collections import deque
|
from collections import deque
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import (
|
||||||
from contextvars import ContextVar
|
asynccontextmanager as acm,
|
||||||
|
)
|
||||||
from dataclasses import (
|
from dataclasses import (
|
||||||
dataclass,
|
dataclass,
|
||||||
field,
|
field,
|
||||||
)
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import inspect
|
import inspect
|
||||||
import msgspec
|
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
|
Type,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
Union,
|
||||||
)
|
)
|
||||||
import warnings
|
import warnings
|
||||||
|
# ------ - ------
|
||||||
import trio
|
import trio
|
||||||
|
from msgspec import (
|
||||||
|
ValidationError,
|
||||||
|
)
|
||||||
|
# ------ - ------
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
InternalError,
|
InternalError,
|
||||||
|
@ -53,7 +58,6 @@ from ._exceptions import (
|
||||||
StreamOverrun,
|
StreamOverrun,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
unpack_error,
|
unpack_error,
|
||||||
_raise_from_no_key_in_msg,
|
|
||||||
)
|
)
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from .msg import (
|
from .msg import (
|
||||||
|
@ -70,8 +74,12 @@ from .msg import (
|
||||||
current_codec,
|
current_codec,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
types as msgtypes,
|
types as msgtypes,
|
||||||
|
_ops as msgops,
|
||||||
|
)
|
||||||
|
from ._ipc import (
|
||||||
|
Channel,
|
||||||
|
_mk_msg_type_err,
|
||||||
)
|
)
|
||||||
from ._ipc import Channel
|
|
||||||
from ._streaming import MsgStream
|
from ._streaming import MsgStream
|
||||||
from ._state import (
|
from ._state import (
|
||||||
current_actor,
|
current_actor,
|
||||||
|
@ -86,294 +94,9 @@ if TYPE_CHECKING:
|
||||||
CallerInfo,
|
CallerInfo,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def _drain_to_final_msg(
|
|
||||||
ctx: Context,
|
|
||||||
|
|
||||||
hide_tb: bool = True,
|
|
||||||
msg_limit: int = 6,
|
|
||||||
|
|
||||||
) -> tuple[
|
|
||||||
Return|None,
|
|
||||||
list[MsgType]
|
|
||||||
]:
|
|
||||||
'''
|
|
||||||
Drain IPC msgs delivered to the underlying rx-mem-chan
|
|
||||||
`Context._recv_chan` from the runtime in search for a final
|
|
||||||
result or error msg.
|
|
||||||
|
|
||||||
The motivation here is to ideally capture errors during ctxc
|
|
||||||
conditions where a canc-request/or local error is sent but the
|
|
||||||
local task also excepts and enters the
|
|
||||||
`Portal.open_context().__aexit__()` block wherein we prefer to
|
|
||||||
capture and raise any remote error or ctxc-ack as part of the
|
|
||||||
`ctx.result()` cleanup and teardown sequence.
|
|
||||||
|
|
||||||
'''
|
|
||||||
__tracebackhide__: bool = hide_tb
|
|
||||||
raise_overrun: bool = not ctx._allow_overruns
|
|
||||||
|
|
||||||
# wait for a final context result by collecting (but
|
|
||||||
# basically ignoring) any bi-dir-stream msgs still in transit
|
|
||||||
# from the far end.
|
|
||||||
pre_result_drained: list[MsgType] = []
|
|
||||||
return_msg: Return|None = None
|
|
||||||
while not (
|
|
||||||
ctx.maybe_error
|
|
||||||
and not ctx._final_result_is_set()
|
|
||||||
):
|
|
||||||
try:
|
|
||||||
# TODO: can remove?
|
|
||||||
# await trio.lowlevel.checkpoint()
|
|
||||||
|
|
||||||
# NOTE: this REPL usage actually works here dawg! Bo
|
|
||||||
# from .devx._debug import pause
|
|
||||||
# await pause()
|
|
||||||
|
|
||||||
# TODO: bad idea?
|
|
||||||
# -[ ] wrap final outcome channel wait in a scope so
|
|
||||||
# it can be cancelled out of band if needed?
|
|
||||||
#
|
|
||||||
# with trio.CancelScope() as res_cs:
|
|
||||||
# ctx._res_scope = res_cs
|
|
||||||
# msg: dict = await ctx._recv_chan.receive()
|
|
||||||
# if res_cs.cancelled_caught:
|
|
||||||
|
|
||||||
# TODO: ensure there's no more hangs, debugging the
|
|
||||||
# runtime pretty preaase!
|
|
||||||
# from .devx._debug import pause
|
|
||||||
# await pause()
|
|
||||||
|
|
||||||
# TODO: can remove this finally?
|
|
||||||
# we have no more need for the sync draining right
|
|
||||||
# since we're can kinda guarantee the async
|
|
||||||
# `.receive()` below will never block yah?
|
|
||||||
#
|
|
||||||
# if (
|
|
||||||
# ctx._cancel_called and (
|
|
||||||
# ctx.cancel_acked
|
|
||||||
# # or ctx.chan._cancel_called
|
|
||||||
# )
|
|
||||||
# # or not ctx._final_result_is_set()
|
|
||||||
# # ctx.outcome is not
|
|
||||||
# # or ctx.chan._closed
|
|
||||||
# ):
|
|
||||||
# try:
|
|
||||||
# msg: dict = await ctx._recv_chan.receive_nowait()()
|
|
||||||
# except trio.WouldBlock:
|
|
||||||
# log.warning(
|
|
||||||
# 'When draining already `.cancel_called` ctx!\n'
|
|
||||||
# 'No final msg arrived..\n'
|
|
||||||
# )
|
|
||||||
# break
|
|
||||||
# else:
|
|
||||||
# msg: dict = await ctx._recv_chan.receive()
|
|
||||||
|
|
||||||
# TODO: don't need it right jefe?
|
|
||||||
# with trio.move_on_after(1) as cs:
|
|
||||||
# if cs.cancelled_caught:
|
|
||||||
# from .devx._debug import pause
|
|
||||||
# await pause()
|
|
||||||
|
|
||||||
# pray to the `trio` gawds that we're corrent with this
|
|
||||||
# msg: dict = await ctx._recv_chan.receive()
|
|
||||||
msg: MsgType = await ctx._recv_chan.receive()
|
|
||||||
|
|
||||||
# NOTE: we get here if the far end was
|
|
||||||
# `ContextCancelled` in 2 cases:
|
|
||||||
# 1. we requested the cancellation and thus
|
|
||||||
# SHOULD NOT raise that far end error,
|
|
||||||
# 2. WE DID NOT REQUEST that cancel and thus
|
|
||||||
# SHOULD RAISE HERE!
|
|
||||||
except trio.Cancelled:
|
|
||||||
|
|
||||||
# CASE 2: mask the local cancelled-error(s)
|
|
||||||
# only when we are sure the remote error is
|
|
||||||
# the source cause of this local task's
|
|
||||||
# cancellation.
|
|
||||||
ctx.maybe_raise()
|
|
||||||
|
|
||||||
# CASE 1: we DID request the cancel we simply
|
|
||||||
# continue to bubble up as normal.
|
|
||||||
raise
|
|
||||||
|
|
||||||
match msg:
|
|
||||||
|
|
||||||
# final result arrived!
|
|
||||||
case Return(
|
|
||||||
# cid=cid,
|
|
||||||
pld=res,
|
|
||||||
):
|
|
||||||
ctx._result: Any = res
|
|
||||||
log.runtime(
|
|
||||||
'Context delivered final draining msg:\n'
|
|
||||||
f'{pformat(msg)}'
|
|
||||||
)
|
|
||||||
# XXX: only close the rx mem chan AFTER
|
|
||||||
# a final result is retreived.
|
|
||||||
# if ctx._recv_chan:
|
|
||||||
# await ctx._recv_chan.aclose()
|
|
||||||
# TODO: ^ we don't need it right?
|
|
||||||
return_msg = msg
|
|
||||||
break
|
|
||||||
|
|
||||||
# far end task is still streaming to us so discard
|
|
||||||
# and report depending on local ctx state.
|
|
||||||
case Yield():
|
|
||||||
pre_result_drained.append(msg)
|
|
||||||
if (
|
|
||||||
(ctx._stream.closed
|
|
||||||
and (reason := 'stream was already closed')
|
|
||||||
)
|
|
||||||
or (ctx.cancel_acked
|
|
||||||
and (reason := 'ctx cancelled other side')
|
|
||||||
)
|
|
||||||
or (ctx._cancel_called
|
|
||||||
and (reason := 'ctx called `.cancel()`')
|
|
||||||
)
|
|
||||||
or (len(pre_result_drained) > msg_limit
|
|
||||||
and (reason := f'"yield" limit={msg_limit}')
|
|
||||||
)
|
|
||||||
):
|
|
||||||
log.cancel(
|
|
||||||
'Cancelling `MsgStream` drain since '
|
|
||||||
f'{reason}\n\n'
|
|
||||||
f'<= {ctx.chan.uid}\n'
|
|
||||||
f' |_{ctx._nsf}()\n\n'
|
|
||||||
f'=> {ctx._task}\n'
|
|
||||||
f' |_{ctx._stream}\n\n'
|
|
||||||
|
|
||||||
f'{pformat(msg)}\n'
|
|
||||||
)
|
|
||||||
return (
|
|
||||||
return_msg,
|
|
||||||
pre_result_drained,
|
|
||||||
)
|
|
||||||
|
|
||||||
# drain up to the `msg_limit` hoping to get
|
|
||||||
# a final result or error/ctxc.
|
|
||||||
else:
|
|
||||||
log.warning(
|
|
||||||
'Ignoring "yield" msg during `ctx.result()` drain..\n'
|
|
||||||
f'<= {ctx.chan.uid}\n'
|
|
||||||
f' |_{ctx._nsf}()\n\n'
|
|
||||||
f'=> {ctx._task}\n'
|
|
||||||
f' |_{ctx._stream}\n\n'
|
|
||||||
|
|
||||||
f'{pformat(msg)}\n'
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# stream terminated, but no result yet..
|
|
||||||
#
|
|
||||||
# TODO: work out edge cases here where
|
|
||||||
# a stream is open but the task also calls
|
|
||||||
# this?
|
|
||||||
# -[ ] should be a runtime error if a stream is open right?
|
|
||||||
# Stop()
|
|
||||||
case Stop():
|
|
||||||
pre_result_drained.append(msg)
|
|
||||||
log.cancel(
|
|
||||||
'Remote stream terminated due to "stop" msg:\n\n'
|
|
||||||
f'{pformat(msg)}\n'
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# remote error msg, likely already handled inside
|
|
||||||
# `Context._deliver_msg()`
|
|
||||||
case Error():
|
|
||||||
# TODO: can we replace this with `ctx.maybe_raise()`?
|
|
||||||
# -[ ] would this be handier for this case maybe?
|
|
||||||
# async with maybe_raise_on_exit() as raises:
|
|
||||||
# if raises:
|
|
||||||
# log.error('some msg about raising..')
|
|
||||||
#
|
|
||||||
re: Exception|None = ctx._remote_error
|
|
||||||
if re:
|
|
||||||
assert msg is ctx._cancel_msg
|
|
||||||
# NOTE: this solved a super duper edge case XD
|
|
||||||
# this was THE super duper edge case of:
|
|
||||||
# - local task opens a remote task,
|
|
||||||
# - requests remote cancellation of far end
|
|
||||||
# ctx/tasks,
|
|
||||||
# - needs to wait for the cancel ack msg
|
|
||||||
# (ctxc) or some result in the race case
|
|
||||||
# where the other side's task returns
|
|
||||||
# before the cancel request msg is ever
|
|
||||||
# rxed and processed,
|
|
||||||
# - here this surrounding drain loop (which
|
|
||||||
# iterates all ipc msgs until the ack or
|
|
||||||
# an early result arrives) was NOT exiting
|
|
||||||
# since we are the edge case: local task
|
|
||||||
# does not re-raise any ctxc it receives
|
|
||||||
# IFF **it** was the cancellation
|
|
||||||
# requester..
|
|
||||||
#
|
|
||||||
# XXX will raise if necessary but ow break
|
|
||||||
# from loop presuming any supressed error
|
|
||||||
# (ctxc) should terminate the context!
|
|
||||||
ctx._maybe_raise_remote_err(
|
|
||||||
re,
|
|
||||||
# NOTE: obvi we don't care if we
|
|
||||||
# overran the far end if we're already
|
|
||||||
# waiting on a final result (msg).
|
|
||||||
# raise_overrun_from_self=False,
|
|
||||||
raise_overrun_from_self=raise_overrun,
|
|
||||||
)
|
|
||||||
|
|
||||||
break # OOOOOF, yeah obvi we need this..
|
|
||||||
|
|
||||||
# XXX we should never really get here
|
|
||||||
# right! since `._deliver_msg()` should
|
|
||||||
# always have detected an {'error': ..}
|
|
||||||
# msg and already called this right!?!
|
|
||||||
elif error := unpack_error(
|
|
||||||
msg=msg,
|
|
||||||
chan=ctx._portal.channel,
|
|
||||||
hide_tb=False,
|
|
||||||
):
|
|
||||||
log.critical('SHOULD NEVER GET HERE!?')
|
|
||||||
assert msg is ctx._cancel_msg
|
|
||||||
assert error.msgdata == ctx._remote_error.msgdata
|
|
||||||
assert error.ipc_msg == ctx._remote_error.ipc_msg
|
|
||||||
from .devx._debug import pause
|
|
||||||
await pause()
|
|
||||||
ctx._maybe_cancel_and_set_remote_error(error)
|
|
||||||
ctx._maybe_raise_remote_err(error)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# bubble the original src key error
|
|
||||||
raise
|
|
||||||
|
|
||||||
# XXX should pretty much never get here unless someone
|
|
||||||
# overrides the default `MsgType` spec.
|
|
||||||
case _:
|
|
||||||
pre_result_drained.append(msg)
|
|
||||||
# It's definitely an internal error if any other
|
|
||||||
# msg type without a`'cid'` field arrives here!
|
|
||||||
if not msg.cid:
|
|
||||||
raise InternalError(
|
|
||||||
'Unexpected cid-missing msg?\n\n'
|
|
||||||
f'{msg}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
raise RuntimeError('Unknown msg type: {msg}')
|
|
||||||
|
|
||||||
else:
|
|
||||||
log.cancel(
|
|
||||||
'Skipping `MsgStream` drain since final outcome is set\n\n'
|
|
||||||
f'{ctx.outcome}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
return (
|
|
||||||
return_msg,
|
|
||||||
pre_result_drained,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class Unresolved:
|
class Unresolved:
|
||||||
'''
|
'''
|
||||||
Placeholder value for `Context._result` until
|
Placeholder value for `Context._result` until
|
||||||
|
@ -423,9 +146,12 @@ class Context:
|
||||||
|
|
||||||
# the "feeder" channels for delivering message values to the
|
# the "feeder" channels for delivering message values to the
|
||||||
# local task from the runtime's msg processing loop.
|
# local task from the runtime's msg processing loop.
|
||||||
_recv_chan: trio.MemoryReceiveChannel
|
_rx_chan: trio.MemoryReceiveChannel
|
||||||
_send_chan: trio.MemorySendChannel
|
_send_chan: trio.MemorySendChannel
|
||||||
|
|
||||||
|
# payload receiver
|
||||||
|
_pld_rx: msgops.PldRx
|
||||||
|
|
||||||
# full "namespace-path" to target RPC function
|
# full "namespace-path" to target RPC function
|
||||||
_nsf: NamespacePath
|
_nsf: NamespacePath
|
||||||
|
|
||||||
|
@ -447,7 +173,7 @@ class Context:
|
||||||
_task: trio.lowlevel.Task|None = None
|
_task: trio.lowlevel.Task|None = None
|
||||||
|
|
||||||
# TODO: cs around result waiting so we can cancel any
|
# TODO: cs around result waiting so we can cancel any
|
||||||
# permanently blocking `._recv_chan.receive()` call in
|
# permanently blocking `._rx_chan.receive()` call in
|
||||||
# a drain loop?
|
# a drain loop?
|
||||||
# _res_scope: trio.CancelScope|None = None
|
# _res_scope: trio.CancelScope|None = None
|
||||||
|
|
||||||
|
@ -504,14 +230,6 @@ class Context:
|
||||||
_started_called: bool = False
|
_started_called: bool = False
|
||||||
_stream_opened: bool = False
|
_stream_opened: bool = False
|
||||||
_stream: MsgStream|None = None
|
_stream: MsgStream|None = None
|
||||||
_pld_codec_var: ContextVar[MsgCodec] = ContextVar(
|
|
||||||
'pld_codec',
|
|
||||||
default=_codec._def_msgspec_codec, # i.e. `Any`-payloads
|
|
||||||
)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def pld_codec(self) -> MsgCodec|None:
|
|
||||||
return self._pld_codec_var.get()
|
|
||||||
|
|
||||||
# caller of `Portal.open_context()` for
|
# caller of `Portal.open_context()` for
|
||||||
# logging purposes mostly
|
# logging purposes mostly
|
||||||
|
@ -754,13 +472,17 @@ class Context:
|
||||||
return 'parent' if self._portal else 'child'
|
return 'parent' if self._portal else 'child'
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def peer_side(side: str) -> str:
|
def _peer_side(side: str) -> str:
|
||||||
match side:
|
match side:
|
||||||
case 'child':
|
case 'child':
|
||||||
return 'parent'
|
return 'parent'
|
||||||
case 'parent':
|
case 'parent':
|
||||||
return 'child'
|
return 'child'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def peer_side(self) -> str:
|
||||||
|
return self._peer_side(self.side)
|
||||||
|
|
||||||
# TODO: remove stat!
|
# TODO: remove stat!
|
||||||
# -[ ] re-implement the `.experiemental._pubsub` stuff
|
# -[ ] re-implement the `.experiemental._pubsub` stuff
|
||||||
# with `MsgStream` and that should be last usage?
|
# with `MsgStream` and that should be last usage?
|
||||||
|
@ -794,9 +516,7 @@ class Context:
|
||||||
equiv of a `StopIteration`.
|
equiv of a `StopIteration`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
await self.chan.send(
|
await self.chan.send(Stop(cid=self.cid))
|
||||||
Stop(cid=self.cid)
|
|
||||||
)
|
|
||||||
|
|
||||||
def _maybe_cancel_and_set_remote_error(
|
def _maybe_cancel_and_set_remote_error(
|
||||||
self,
|
self,
|
||||||
|
@ -875,7 +595,6 @@ class Context:
|
||||||
# TODO: never do this right?
|
# TODO: never do this right?
|
||||||
# if self._remote_error:
|
# if self._remote_error:
|
||||||
# return
|
# return
|
||||||
peer_side: str = self.peer_side(self.side)
|
|
||||||
|
|
||||||
# XXX: denote and set the remote side's error so that
|
# XXX: denote and set the remote side's error so that
|
||||||
# after we cancel whatever task is the opener of this
|
# after we cancel whatever task is the opener of this
|
||||||
|
@ -883,7 +602,7 @@ class Context:
|
||||||
# appropriately.
|
# appropriately.
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Setting remote error for ctx\n\n'
|
'Setting remote error for ctx\n\n'
|
||||||
f'<= {peer_side!r}: {self.chan.uid}\n'
|
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
||||||
f'=> {self.side!r}\n\n'
|
f'=> {self.side!r}\n\n'
|
||||||
f'{error}'
|
f'{error}'
|
||||||
)
|
)
|
||||||
|
@ -905,9 +624,8 @@ class Context:
|
||||||
|
|
||||||
elif isinstance(error, MsgTypeError):
|
elif isinstance(error, MsgTypeError):
|
||||||
msgerr = True
|
msgerr = True
|
||||||
peer_side: str = self.peer_side(self.side)
|
|
||||||
log.error(
|
log.error(
|
||||||
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
|
f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
|
||||||
|
|
||||||
f'{error}\n'
|
f'{error}\n'
|
||||||
f'{pformat(self)}\n'
|
f'{pformat(self)}\n'
|
||||||
|
@ -916,9 +634,8 @@ class Context:
|
||||||
else:
|
else:
|
||||||
log.error(
|
log.error(
|
||||||
f'Remote context error:\n\n'
|
f'Remote context error:\n\n'
|
||||||
|
# f'{pformat(self)}\n'
|
||||||
f'{error}\n'
|
f'{error}\n'
|
||||||
f'{pformat(self)}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# always record the cancelling actor's uid since its
|
# always record the cancelling actor's uid since its
|
||||||
|
@ -955,24 +672,49 @@ class Context:
|
||||||
and not self._is_self_cancelled()
|
and not self._is_self_cancelled()
|
||||||
and not cs.cancel_called
|
and not cs.cancel_called
|
||||||
and not cs.cancelled_caught
|
and not cs.cancelled_caught
|
||||||
and (
|
|
||||||
msgerr
|
|
||||||
and
|
|
||||||
# NOTE: allow user to config not cancelling the
|
|
||||||
# local scope on `MsgTypeError`s
|
|
||||||
self._cancel_on_msgerr
|
|
||||||
)
|
|
||||||
):
|
):
|
||||||
# TODO: it'd sure be handy to inject our own
|
if not (
|
||||||
# `trio.Cancelled` subtype here ;)
|
msgerr
|
||||||
# https://github.com/goodboy/tractor/issues/368
|
|
||||||
log.cancel('Cancelling local `.open_context()` scope!')
|
|
||||||
self._scope.cancel()
|
|
||||||
|
|
||||||
|
# NOTE: we allow user to config not cancelling the
|
||||||
|
# local scope on `MsgTypeError`s
|
||||||
|
and not self._cancel_on_msgerr
|
||||||
|
):
|
||||||
|
# TODO: it'd sure be handy to inject our own
|
||||||
|
# `trio.Cancelled` subtype here ;)
|
||||||
|
# https://github.com/goodboy/tractor/issues/368
|
||||||
|
message: str = 'Cancelling `Context._scope` !\n\n'
|
||||||
|
self._scope.cancel()
|
||||||
|
|
||||||
|
else:
|
||||||
|
message: str = (
|
||||||
|
'NOT Cancelling `Context._scope` since,\n'
|
||||||
|
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
|
||||||
|
f'AND we got a msg-type-error!\n'
|
||||||
|
f'{error}\n'
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
log.cancel('NOT cancelling local `.open_context()` scope!')
|
message: str = 'NOT cancelling `Context._scope` !\n\n'
|
||||||
|
|
||||||
|
scope_info: str = 'No `self._scope: CancelScope` was set/used ?'
|
||||||
|
if cs:
|
||||||
|
scope_info: str = (
|
||||||
|
f'self._scope: {cs}\n'
|
||||||
|
f'|_ .cancel_called: {cs.cancel_called}\n'
|
||||||
|
f'|_ .cancelled_caught: {cs.cancelled_caught}\n'
|
||||||
|
f'|_ ._cancel_status: {cs._cancel_status}\n\n'
|
||||||
|
|
||||||
|
f'{self}\n'
|
||||||
|
f'|_ ._is_self_cancelled(): {self._is_self_cancelled()}\n'
|
||||||
|
f'|_ ._cancel_on_msgerr: {self._cancel_on_msgerr}\n\n'
|
||||||
|
|
||||||
|
f'msgerr: {msgerr}\n'
|
||||||
|
)
|
||||||
|
log.cancel(
|
||||||
|
message
|
||||||
|
+
|
||||||
|
f'{scope_info}'
|
||||||
|
)
|
||||||
# TODO: maybe we should also call `._res_scope.cancel()` if it
|
# TODO: maybe we should also call `._res_scope.cancel()` if it
|
||||||
# exists to support cancelling any drain loop hangs?
|
# exists to support cancelling any drain loop hangs?
|
||||||
|
|
||||||
|
@ -1256,7 +998,7 @@ class Context:
|
||||||
# a ``.open_stream()`` block prior or there was some other
|
# a ``.open_stream()`` block prior or there was some other
|
||||||
# unanticipated error or cancellation from ``trio``.
|
# unanticipated error or cancellation from ``trio``.
|
||||||
|
|
||||||
if ctx._recv_chan._closed:
|
if ctx._rx_chan._closed:
|
||||||
raise trio.ClosedResourceError(
|
raise trio.ClosedResourceError(
|
||||||
'The underlying channel for this stream was already closed!\n'
|
'The underlying channel for this stream was already closed!\n'
|
||||||
)
|
)
|
||||||
|
@ -1276,7 +1018,7 @@ class Context:
|
||||||
# stream WAS NOT just closed normally/gracefully.
|
# stream WAS NOT just closed normally/gracefully.
|
||||||
async with MsgStream(
|
async with MsgStream(
|
||||||
ctx=self,
|
ctx=self,
|
||||||
rx_chan=ctx._recv_chan,
|
rx_chan=ctx._rx_chan,
|
||||||
) as stream:
|
) as stream:
|
||||||
|
|
||||||
# NOTE: we track all existing streams per portal for
|
# NOTE: we track all existing streams per portal for
|
||||||
|
@ -1325,12 +1067,12 @@ class Context:
|
||||||
except trio.EndOfChannel as eoc:
|
except trio.EndOfChannel as eoc:
|
||||||
if (
|
if (
|
||||||
eoc
|
eoc
|
||||||
and stream.closed
|
and
|
||||||
|
stream.closed
|
||||||
):
|
):
|
||||||
# sanity, can remove?
|
# sanity, can remove?
|
||||||
assert eoc is stream._eoc
|
assert eoc is stream._eoc
|
||||||
# from .devx import pause
|
|
||||||
# await pause()
|
|
||||||
log.warning(
|
log.warning(
|
||||||
'Stream was terminated by EoC\n\n'
|
'Stream was terminated by EoC\n\n'
|
||||||
# NOTE: won't show the error <Type> but
|
# NOTE: won't show the error <Type> but
|
||||||
|
@ -1427,13 +1169,12 @@ class Context:
|
||||||
# boxed `StreamOverrun`. This is mostly useful for
|
# boxed `StreamOverrun`. This is mostly useful for
|
||||||
# supressing such faults during
|
# supressing such faults during
|
||||||
# cancellation/error/final-result handling inside
|
# cancellation/error/final-result handling inside
|
||||||
# `_drain_to_final_msg()` such that we do not
|
# `msg._ops.drain_to_final_msg()` such that we do not
|
||||||
# raise such errors particularly in the case where
|
# raise such errors particularly in the case where
|
||||||
# `._cancel_called == True`.
|
# `._cancel_called == True`.
|
||||||
not raise_overrun_from_self
|
not raise_overrun_from_self
|
||||||
and isinstance(remote_error, RemoteActorError)
|
and isinstance(remote_error, RemoteActorError)
|
||||||
|
and remote_error.boxed_type is StreamOverrun
|
||||||
and remote_error.boxed_type_str == 'StreamOverrun'
|
|
||||||
|
|
||||||
# and tuple(remote_error.msgdata['sender']) == our_uid
|
# and tuple(remote_error.msgdata['sender']) == our_uid
|
||||||
and tuple(remote_error.sender) == our_uid
|
and tuple(remote_error.sender) == our_uid
|
||||||
|
@ -1503,12 +1244,12 @@ class Context:
|
||||||
if self._final_result_is_set():
|
if self._final_result_is_set():
|
||||||
return self._result
|
return self._result
|
||||||
|
|
||||||
assert self._recv_chan
|
assert self._rx_chan
|
||||||
raise_overrun: bool = not self._allow_overruns
|
raise_overrun: bool = not self._allow_overruns
|
||||||
if (
|
if (
|
||||||
self.maybe_error is None
|
self.maybe_error is None
|
||||||
and
|
and
|
||||||
not self._recv_chan._closed # type: ignore
|
not self._rx_chan._closed # type: ignore
|
||||||
):
|
):
|
||||||
# wait for a final context result/error by "draining"
|
# wait for a final context result/error by "draining"
|
||||||
# (by more or less ignoring) any bi-dir-stream "yield"
|
# (by more or less ignoring) any bi-dir-stream "yield"
|
||||||
|
@ -1516,7 +1257,7 @@ class Context:
|
||||||
(
|
(
|
||||||
return_msg,
|
return_msg,
|
||||||
drained_msgs,
|
drained_msgs,
|
||||||
) = await _drain_to_final_msg(
|
) = await msgops.drain_to_final_msg(
|
||||||
ctx=self,
|
ctx=self,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
|
@ -1802,8 +1543,7 @@ class Context:
|
||||||
await self.chan.send(started_msg)
|
await self.chan.send(started_msg)
|
||||||
|
|
||||||
# raise any msg type error NO MATTER WHAT!
|
# raise any msg type error NO MATTER WHAT!
|
||||||
except msgspec.ValidationError as verr:
|
except ValidationError as verr:
|
||||||
from tractor._ipc import _mk_msg_type_err
|
|
||||||
raise _mk_msg_type_err(
|
raise _mk_msg_type_err(
|
||||||
msg=msg_bytes,
|
msg=msg_bytes,
|
||||||
codec=codec,
|
codec=codec,
|
||||||
|
@ -1890,7 +1630,7 @@ class Context:
|
||||||
- NEVER `return` early before delivering the msg!
|
- NEVER `return` early before delivering the msg!
|
||||||
bc if the error is a ctxc and there is a task waiting on
|
bc if the error is a ctxc and there is a task waiting on
|
||||||
`.result()` we need the msg to be
|
`.result()` we need the msg to be
|
||||||
`send_chan.send_nowait()`-ed over the `._recv_chan` so
|
`send_chan.send_nowait()`-ed over the `._rx_chan` so
|
||||||
that the error is relayed to that waiter task and thus
|
that the error is relayed to that waiter task and thus
|
||||||
raised in user code!
|
raised in user code!
|
||||||
|
|
||||||
|
@ -1904,10 +1644,9 @@ class Context:
|
||||||
side: str = self.side
|
side: str = self.side
|
||||||
if side == 'child':
|
if side == 'child':
|
||||||
assert not self._portal
|
assert not self._portal
|
||||||
peer_side: str = self.peer_side(side)
|
|
||||||
|
|
||||||
flow_body: str = (
|
flow_body: str = (
|
||||||
f'<= peer {peer_side!r}: {from_uid}\n'
|
f'<= peer {self.peer_side!r}: {from_uid}\n'
|
||||||
f' |_<{nsf}()>\n\n'
|
f' |_<{nsf}()>\n\n'
|
||||||
|
|
||||||
f'=> {side!r}: {self._task}\n'
|
f'=> {side!r}: {self._task}\n'
|
||||||
|
@ -1925,7 +1664,7 @@ class Context:
|
||||||
log_meth = log.runtime
|
log_meth = log.runtime
|
||||||
|
|
||||||
log_meth(
|
log_meth(
|
||||||
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
|
f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'
|
||||||
|
|
||||||
f'{flow_body}'
|
f'{flow_body}'
|
||||||
|
|
||||||
|
@ -2201,24 +1940,11 @@ async def open_context_from_portal(
|
||||||
# -> it's expected that if there is an error in this phase of
|
# -> it's expected that if there is an error in this phase of
|
||||||
# the dialog, the `Error` msg should be raised from the `msg`
|
# the dialog, the `Error` msg should be raised from the `msg`
|
||||||
# handling block below.
|
# handling block below.
|
||||||
msg: Started = await ctx._recv_chan.receive()
|
first: Any = await ctx._pld_rx.recv_pld(
|
||||||
try:
|
ctx=ctx,
|
||||||
# the "first" value here is delivered by the callee's
|
expect_msg=Started,
|
||||||
# ``Context.started()`` call.
|
)
|
||||||
# first: Any = msg['started']
|
ctx._started_called: bool = True
|
||||||
first: Any = msg.pld
|
|
||||||
ctx._started_called: bool = True
|
|
||||||
|
|
||||||
# except KeyError as src_error:
|
|
||||||
except AttributeError as src_error:
|
|
||||||
log.exception('Raising from unexpected msg!\n')
|
|
||||||
_raise_from_no_key_in_msg(
|
|
||||||
ctx=ctx,
|
|
||||||
msg=msg,
|
|
||||||
src_err=src_error,
|
|
||||||
log=log,
|
|
||||||
expect_msg=Started,
|
|
||||||
)
|
|
||||||
|
|
||||||
uid: tuple = portal.channel.uid
|
uid: tuple = portal.channel.uid
|
||||||
cid: str = ctx.cid
|
cid: str = ctx.cid
|
||||||
|
@ -2540,7 +2266,7 @@ async def open_context_from_portal(
|
||||||
# we tear down the runtime feeder chan last
|
# we tear down the runtime feeder chan last
|
||||||
# to avoid premature stream clobbers.
|
# to avoid premature stream clobbers.
|
||||||
if (
|
if (
|
||||||
(rxchan := ctx._recv_chan)
|
(rxchan := ctx._rx_chan)
|
||||||
|
|
||||||
# maybe TODO: yes i know the below check is
|
# maybe TODO: yes i know the below check is
|
||||||
# touching `trio` memchan internals..BUT, there are
|
# touching `trio` memchan internals..BUT, there are
|
||||||
|
@ -2583,7 +2309,7 @@ async def open_context_from_portal(
|
||||||
# underlying feeder channel is
|
# underlying feeder channel is
|
||||||
# once-and-only-CLOSED!
|
# once-and-only-CLOSED!
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await ctx._recv_chan.aclose()
|
await ctx._rx_chan.aclose()
|
||||||
|
|
||||||
# XXX: we always raise remote errors locally and
|
# XXX: we always raise remote errors locally and
|
||||||
# generally speaking mask runtime-machinery related
|
# generally speaking mask runtime-machinery related
|
||||||
|
@ -2603,7 +2329,7 @@ async def open_context_from_portal(
|
||||||
and ctx.cancel_acked
|
and ctx.cancel_acked
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Context cancelled by caller task\n'
|
'Context cancelled by {ctx.side!r}-side task\n'
|
||||||
f'|_{ctx._task}\n\n'
|
f'|_{ctx._task}\n\n'
|
||||||
|
|
||||||
f'{repr(scope_err)}\n'
|
f'{repr(scope_err)}\n'
|
||||||
|
@ -2628,21 +2354,23 @@ async def open_context_from_portal(
|
||||||
# FINALLY, remove the context from runtime tracking and
|
# FINALLY, remove the context from runtime tracking and
|
||||||
# exit!
|
# exit!
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Removing IPC ctx opened with peer\n'
|
'De-allocating IPC ctx opened with {ctx.side!r} peer \n'
|
||||||
f'{uid}\n'
|
f'uid: {uid}\n'
|
||||||
f'|_{ctx}\n'
|
f'cid: {ctx.cid}\n'
|
||||||
)
|
)
|
||||||
portal.actor._contexts.pop(
|
portal.actor._contexts.pop(
|
||||||
(uid, cid),
|
(uid, cid),
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def mk_context(
|
def mk_context(
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
cid: str,
|
cid: str,
|
||||||
nsf: NamespacePath,
|
nsf: NamespacePath,
|
||||||
|
|
||||||
msg_buffer_size: int = 2**6,
|
msg_buffer_size: int = 2**6,
|
||||||
|
pld_spec: Union[Type] = Any,
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
@ -2662,12 +2390,18 @@ def mk_context(
|
||||||
from .devx._code import find_caller_info
|
from .devx._code import find_caller_info
|
||||||
caller_info: CallerInfo|None = find_caller_info()
|
caller_info: CallerInfo|None = find_caller_info()
|
||||||
|
|
||||||
|
pld_rx = msgops.PldRx(
|
||||||
|
# _rx_mc=recv_chan,
|
||||||
|
_msgdec=_codec.mk_dec(spec=pld_spec)
|
||||||
|
)
|
||||||
|
|
||||||
ctx = Context(
|
ctx = Context(
|
||||||
chan=chan,
|
chan=chan,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
_actor=current_actor(),
|
_actor=current_actor(),
|
||||||
_send_chan=send_chan,
|
_send_chan=send_chan,
|
||||||
_recv_chan=recv_chan,
|
_rx_chan=recv_chan,
|
||||||
|
_pld_rx=pld_rx,
|
||||||
_nsf=nsf,
|
_nsf=nsf,
|
||||||
_task=trio.lowlevel.current_task(),
|
_task=trio.lowlevel.current_task(),
|
||||||
_caller_info=caller_info,
|
_caller_info=caller_info,
|
||||||
|
|
|
@ -54,6 +54,7 @@ from tractor.msg import (
|
||||||
from tractor.msg.pretty_struct import (
|
from tractor.msg.pretty_struct import (
|
||||||
iter_fields,
|
iter_fields,
|
||||||
Struct,
|
Struct,
|
||||||
|
pformat as struct_format,
|
||||||
)
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -108,6 +109,10 @@ _body_fields: list[str] = list(
|
||||||
'relay_path',
|
'relay_path',
|
||||||
'_msg_dict',
|
'_msg_dict',
|
||||||
'cid',
|
'cid',
|
||||||
|
|
||||||
|
# since only ctxc should show it but `Error` does
|
||||||
|
# have it as an optional field.
|
||||||
|
'canceller',
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -382,6 +387,9 @@ class RemoteActorError(Exception):
|
||||||
'''
|
'''
|
||||||
Error type raised by original remote faulting actor.
|
Error type raised by original remote faulting actor.
|
||||||
|
|
||||||
|
When the error has only been relayed a single actor-hop
|
||||||
|
this will be the same as the `.boxed_type`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if self._src_type is None:
|
if self._src_type is None:
|
||||||
self._src_type = get_err_type(
|
self._src_type = get_err_type(
|
||||||
|
@ -396,7 +404,8 @@ class RemoteActorError(Exception):
|
||||||
String-name of the (last hop's) boxed error type.
|
String-name of the (last hop's) boxed error type.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self._ipc_msg.boxed_type_str
|
bt: Type[BaseException] = self.boxed_type
|
||||||
|
return str(bt.__name__)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def boxed_type(self) -> str:
|
def boxed_type(self) -> str:
|
||||||
|
@ -492,7 +501,11 @@ class RemoteActorError(Exception):
|
||||||
'''
|
'''
|
||||||
# TODO: use this matryoshka emjoi XD
|
# TODO: use this matryoshka emjoi XD
|
||||||
# => 🪆
|
# => 🪆
|
||||||
reprol_str: str = f'{type(self).__name__}('
|
reprol_str: str = (
|
||||||
|
f'{type(self).__name__}' # type name
|
||||||
|
f'[{self.boxed_type_str}]' # parameterized by boxed type
|
||||||
|
'(' # init-style look
|
||||||
|
)
|
||||||
_repr: str = self._mk_fields_str(
|
_repr: str = self._mk_fields_str(
|
||||||
self.reprol_fields,
|
self.reprol_fields,
|
||||||
end_char=' ',
|
end_char=' ',
|
||||||
|
@ -532,7 +545,8 @@ class RemoteActorError(Exception):
|
||||||
self,
|
self,
|
||||||
) -> BaseException:
|
) -> BaseException:
|
||||||
'''
|
'''
|
||||||
Unpack the inner-most source error from it's original IPC msg data.
|
Unpack the inner-most source error from it's original IPC
|
||||||
|
msg data.
|
||||||
|
|
||||||
We attempt to reconstruct (as best as we can) the original
|
We attempt to reconstruct (as best as we can) the original
|
||||||
`Exception` from as it would have been raised in the
|
`Exception` from as it would have been raised in the
|
||||||
|
@ -570,6 +584,14 @@ class RemoteActorError(Exception):
|
||||||
# # boxed_type=get_type_ref(..
|
# # boxed_type=get_type_ref(..
|
||||||
# raise NotImplementedError
|
# raise NotImplementedError
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sender(self) -> tuple[str, str]|None:
|
||||||
|
if (
|
||||||
|
(msg := self._ipc_msg)
|
||||||
|
and (value := msg.sender)
|
||||||
|
):
|
||||||
|
return tuple(value)
|
||||||
|
|
||||||
|
|
||||||
class ContextCancelled(RemoteActorError):
|
class ContextCancelled(RemoteActorError):
|
||||||
'''
|
'''
|
||||||
|
@ -644,8 +666,8 @@ class MsgTypeError(
|
||||||
- `Yield`
|
- `Yield`
|
||||||
- TODO: any embedded `.pld` type defined by user code?
|
- TODO: any embedded `.pld` type defined by user code?
|
||||||
|
|
||||||
Normally the source of an error is re-raised from some `.msg._codec`
|
Normally the source of an error is re-raised from some
|
||||||
decode which itself raises in a backend interchange
|
`.msg._codec` decode which itself raises in a backend interchange
|
||||||
lib (eg. a `msgspec.ValidationError`).
|
lib (eg. a `msgspec.ValidationError`).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
@ -734,20 +756,6 @@ class StreamOverrun(
|
||||||
handled by app code using `MsgStream.send()/.receive()`.
|
handled by app code using `MsgStream.send()/.receive()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
@property
|
|
||||||
def sender(self) -> tuple[str, str] | None:
|
|
||||||
value = self._ipc_msg.sender
|
|
||||||
if value:
|
|
||||||
return tuple(value)
|
|
||||||
|
|
||||||
|
|
||||||
# class InternalActorError(RemoteActorError):
|
|
||||||
# '''
|
|
||||||
# Boxed (Remote) internal `tractor` error indicating failure of some
|
|
||||||
# primitive, machinery state or lowlevel task that should never
|
|
||||||
# occur.
|
|
||||||
|
|
||||||
# '''
|
|
||||||
|
|
||||||
|
|
||||||
class TransportClosed(trio.ClosedResourceError):
|
class TransportClosed(trio.ClosedResourceError):
|
||||||
|
@ -944,8 +952,7 @@ def _raise_from_unexpected_msg(
|
||||||
src_err: AttributeError,
|
src_err: AttributeError,
|
||||||
log: StackLevelAdapter, # caller specific `log` obj
|
log: StackLevelAdapter, # caller specific `log` obj
|
||||||
|
|
||||||
expect_msg: str = Yield,
|
expect_msg: Type[MsgType],
|
||||||
stream: MsgStream | None = None,
|
|
||||||
|
|
||||||
# allow "deeper" tbs when debugging B^o
|
# allow "deeper" tbs when debugging B^o
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
@ -987,6 +994,8 @@ def _raise_from_unexpected_msg(
|
||||||
) from src_err
|
) from src_err
|
||||||
|
|
||||||
# TODO: test that shows stream raising an expected error!!!
|
# TODO: test that shows stream raising an expected error!!!
|
||||||
|
stream: MsgStream|None
|
||||||
|
_type: str = 'Context'
|
||||||
|
|
||||||
# raise the error message in a boxed exception type!
|
# raise the error message in a boxed exception type!
|
||||||
if isinstance(msg, Error):
|
if isinstance(msg, Error):
|
||||||
|
@ -1003,59 +1012,54 @@ def _raise_from_unexpected_msg(
|
||||||
# TODO: does it make more sense to pack
|
# TODO: does it make more sense to pack
|
||||||
# the stream._eoc outside this in the calleer always?
|
# the stream._eoc outside this in the calleer always?
|
||||||
# case Stop():
|
# case Stop():
|
||||||
elif (
|
elif stream := ctx._stream:
|
||||||
isinstance(msg, Stop)
|
_type: str = 'MsgStream'
|
||||||
or (
|
|
||||||
stream
|
|
||||||
and stream._eoc
|
|
||||||
)
|
|
||||||
):
|
|
||||||
log.debug(
|
|
||||||
f'Context[{cid}] stream was stopped by remote side\n'
|
|
||||||
f'cid: {cid}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: if the a local task is already blocking on
|
if (
|
||||||
# a `Context.result()` and thus a `.receive()` on the
|
stream._eoc
|
||||||
# rx-chan, we close the chan and set state ensuring that
|
or
|
||||||
# an eoc is raised!
|
isinstance(msg, Stop)
|
||||||
|
):
|
||||||
|
log.debug(
|
||||||
|
f'Context[{cid}] stream was stopped by remote side\n'
|
||||||
|
f'cid: {cid}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
# TODO: if the a local task is already blocking on
|
||||||
# raise a ``StopAsyncIteration`` **and** in our catch
|
# a `Context.result()` and thus a `.receive()` on the
|
||||||
# block below it will trigger ``.aclose()``.
|
# rx-chan, we close the chan and set state ensuring that
|
||||||
eoc = trio.EndOfChannel(
|
# an eoc is raised!
|
||||||
f'Context stream ended due to msg:\n\n'
|
|
||||||
f'{pformat(msg)}\n'
|
|
||||||
)
|
|
||||||
# XXX: important to set so that a new `.receive()`
|
|
||||||
# call (likely by another task using a broadcast receiver)
|
|
||||||
# doesn't accidentally pull the `return` message
|
|
||||||
# value out of the underlying feed mem chan which is
|
|
||||||
# destined for the `Context.result()` call during ctx-exit!
|
|
||||||
stream._eoc: Exception = eoc
|
|
||||||
|
|
||||||
# in case there already is some underlying remote error
|
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
||||||
# that arrived which is probably the source of this stream
|
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||||
# closure
|
# block below it will trigger ``.aclose()``.
|
||||||
ctx.maybe_raise()
|
eoc = trio.EndOfChannel(
|
||||||
raise eoc from src_err
|
f'Context stream ended due to msg:\n\n'
|
||||||
|
f'{pformat(msg)}\n'
|
||||||
|
)
|
||||||
|
# XXX: important to set so that a new `.receive()`
|
||||||
|
# call (likely by another task using a broadcast receiver)
|
||||||
|
# doesn't accidentally pull the `return` message
|
||||||
|
# value out of the underlying feed mem chan which is
|
||||||
|
# destined for the `Context.result()` call during ctx-exit!
|
||||||
|
stream._eoc: Exception = eoc
|
||||||
|
|
||||||
if (
|
# in case there already is some underlying remote error
|
||||||
stream
|
# that arrived which is probably the source of this stream
|
||||||
and stream._closed
|
# closure
|
||||||
):
|
ctx.maybe_raise()
|
||||||
# TODO: our own error subtype?
|
raise eoc from src_err
|
||||||
raise trio.ClosedResourceError(
|
|
||||||
'This stream was closed'
|
# TODO: our own transport/IPC-broke error subtype?
|
||||||
)
|
if stream._closed:
|
||||||
|
raise trio.ClosedResourceError('This stream was closed')
|
||||||
|
|
||||||
# always re-raise the source error if no translation error case
|
# always re-raise the source error if no translation error case
|
||||||
# is activated above.
|
# is activated above.
|
||||||
_type: str = 'Stream' if stream else 'Context'
|
|
||||||
raise MessagingError(
|
raise MessagingError(
|
||||||
f"{_type} was expecting a {expect_msg} message"
|
f'{_type} was expecting a {expect_msg.__name__!r} message'
|
||||||
" BUT received a non-error msg:\n"
|
' BUT received a non-error msg:\n\n'
|
||||||
f'{pformat(msg)}'
|
f'{struct_format(msg)}'
|
||||||
) from src_err
|
) from src_err
|
||||||
|
|
||||||
|
|
||||||
|
@ -1088,13 +1092,11 @@ def _mk_msg_type_err(
|
||||||
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
||||||
# prolly a manual type-check on our part.
|
# prolly a manual type-check on our part.
|
||||||
if message is None:
|
if message is None:
|
||||||
fmt_spec: str = codec.pformat_msg_spec()
|
|
||||||
fmt_stack: str = (
|
fmt_stack: str = (
|
||||||
'\n'.join(traceback.format_stack(limit=3))
|
'\n'.join(traceback.format_stack(limit=3))
|
||||||
)
|
)
|
||||||
tb_fmt: str = pformat_boxed_tb(
|
tb_fmt: str = pformat_boxed_tb(
|
||||||
tb_str=fmt_stack,
|
tb_str=fmt_stack,
|
||||||
# fields_str=header,
|
|
||||||
field_prefix=' ',
|
field_prefix=' ',
|
||||||
indent='',
|
indent='',
|
||||||
)
|
)
|
||||||
|
@ -1102,8 +1104,7 @@ def _mk_msg_type_err(
|
||||||
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
||||||
f'{tb_fmt}\n'
|
f'{tb_fmt}\n'
|
||||||
f'Valid IPC msgs are:\n\n'
|
f'Valid IPC msgs are:\n\n'
|
||||||
# f' ------ - ------\n'
|
f'{codec.msg_spec_str}\n',
|
||||||
f'{fmt_spec}\n',
|
|
||||||
)
|
)
|
||||||
elif src_type_error:
|
elif src_type_error:
|
||||||
src_message: str = str(src_type_error)
|
src_message: str = str(src_type_error)
|
||||||
|
|
|
@ -31,7 +31,7 @@ from typing import (
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
# Type,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
@ -46,12 +46,12 @@ from ._state import (
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from .msg import (
|
from .msg import (
|
||||||
Error,
|
# Error,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
Return,
|
Return,
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
unpack_error,
|
# unpack_error,
|
||||||
NoResult,
|
NoResult,
|
||||||
)
|
)
|
||||||
from ._context import (
|
from ._context import (
|
||||||
|
@ -62,42 +62,44 @@ from ._streaming import (
|
||||||
MsgStream,
|
MsgStream,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ._runtime import Actor
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
# TODO: rename to `unwrap_result()` and use
|
# TODO: remove and/or rework?
|
||||||
# `._raise_from_no_key_in_msg()` (after tweak to
|
# -[ ] rename to `unwrap_result()` and use
|
||||||
# accept a `chan: Channel` arg) in key block!
|
# `._raise_from_unexpected_msg()` (after tweak to accept a `chan:
|
||||||
def _unwrap_msg(
|
# Channel` arg) in key block??
|
||||||
msg: Return|Error,
|
# -[ ] pretty sure this is entirely covered by
|
||||||
channel: Channel,
|
# `_exceptions._raise_from_unexpected_msg()` so REMOVE!
|
||||||
|
# def _unwrap_msg(
|
||||||
|
# msg: Return|Error,
|
||||||
|
# ctx: Context,
|
||||||
|
|
||||||
hide_tb: bool = True,
|
# hide_tb: bool = True,
|
||||||
|
|
||||||
) -> Any:
|
# ) -> Any:
|
||||||
'''
|
# '''
|
||||||
Unwrap a final result from a `{return: <Any>}` IPC msg.
|
# Unwrap a final result from a `{return: <Any>}` IPC msg.
|
||||||
|
|
||||||
'''
|
# '''
|
||||||
__tracebackhide__: bool = hide_tb
|
# __tracebackhide__: bool = hide_tb
|
||||||
|
# try:
|
||||||
|
# return msg.pld
|
||||||
|
# except AttributeError as err:
|
||||||
|
|
||||||
try:
|
# # internal error should never get here
|
||||||
return msg.pld
|
# # assert msg.get('cid'), (
|
||||||
# return msg['return']
|
# assert msg.cid, (
|
||||||
# except KeyError as ke:
|
# "Received internal error at portal?"
|
||||||
except AttributeError as err:
|
# )
|
||||||
|
|
||||||
# internal error should never get here
|
# raise unpack_error(
|
||||||
# assert msg.get('cid'), (
|
# msg,
|
||||||
assert msg.cid, (
|
# ctx.chan,
|
||||||
"Received internal error at portal?"
|
# ) from err
|
||||||
)
|
|
||||||
|
|
||||||
raise unpack_error(
|
|
||||||
msg,
|
|
||||||
channel
|
|
||||||
) from err
|
|
||||||
|
|
||||||
|
|
||||||
class Portal:
|
class Portal:
|
||||||
|
@ -123,17 +125,21 @@ class Portal:
|
||||||
# connected (peer) actors.
|
# connected (peer) actors.
|
||||||
cancel_timeout: float = 0.5
|
cancel_timeout: float = 0.5
|
||||||
|
|
||||||
def __init__(self, channel: Channel) -> None:
|
def __init__(
|
||||||
|
self,
|
||||||
|
channel: Channel,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
self.chan = channel
|
self.chan = channel
|
||||||
# during the portal's lifetime
|
# during the portal's lifetime
|
||||||
self._result_msg: dict|None = None
|
self._final_result: Any|None = None
|
||||||
|
|
||||||
# When set to a ``Context`` (when _submit_for_result is called)
|
# When set to a ``Context`` (when _submit_for_result is called)
|
||||||
# it is expected that ``result()`` will be awaited at some
|
# it is expected that ``result()`` will be awaited at some
|
||||||
# point.
|
# point.
|
||||||
self._expect_result: Context | None = None
|
self._expect_result_ctx: Context|None = None
|
||||||
self._streams: set[MsgStream] = set()
|
self._streams: set[MsgStream] = set()
|
||||||
self.actor = current_actor()
|
self.actor: Actor = current_actor()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def channel(self) -> Channel:
|
def channel(self) -> Channel:
|
||||||
|
@ -147,6 +153,7 @@ class Portal:
|
||||||
)
|
)
|
||||||
return self.chan
|
return self.chan
|
||||||
|
|
||||||
|
# TODO: factor this out into an `ActorNursery` wrapper
|
||||||
async def _submit_for_result(
|
async def _submit_for_result(
|
||||||
self,
|
self,
|
||||||
ns: str,
|
ns: str,
|
||||||
|
@ -154,27 +161,18 @@ class Portal:
|
||||||
**kwargs
|
**kwargs
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
assert self._expect_result is None, (
|
if self._expect_result_ctx is not None:
|
||||||
"A pending main result has already been submitted"
|
raise RuntimeError(
|
||||||
)
|
'A pending main result has already been submitted'
|
||||||
|
)
|
||||||
|
|
||||||
self._expect_result = await self.actor.start_remote_task(
|
self._expect_result_ctx = await self.actor.start_remote_task(
|
||||||
self.channel,
|
self.channel,
|
||||||
nsf=NamespacePath(f'{ns}:{func}'),
|
nsf=NamespacePath(f'{ns}:{func}'),
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
portal=self,
|
portal=self,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _return_once(
|
|
||||||
self,
|
|
||||||
ctx: Context,
|
|
||||||
|
|
||||||
) -> Return:
|
|
||||||
|
|
||||||
assert ctx._remote_func_type == 'asyncfunc' # single response
|
|
||||||
msg: Return = await ctx._recv_chan.receive()
|
|
||||||
return msg
|
|
||||||
|
|
||||||
async def result(self) -> Any:
|
async def result(self) -> Any:
|
||||||
'''
|
'''
|
||||||
Return the result(s) from the remote actor's "main" task.
|
Return the result(s) from the remote actor's "main" task.
|
||||||
|
@ -188,7 +186,7 @@ class Portal:
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
# not expecting a "main" result
|
# not expecting a "main" result
|
||||||
if self._expect_result is None:
|
if self._expect_result_ctx is None:
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Portal for {self.channel.uid} not expecting a final"
|
f"Portal for {self.channel.uid} not expecting a final"
|
||||||
" result?\nresult() should only be called if subactor"
|
" result?\nresult() should only be called if subactor"
|
||||||
|
@ -196,17 +194,15 @@ class Portal:
|
||||||
return NoResult
|
return NoResult
|
||||||
|
|
||||||
# expecting a "main" result
|
# expecting a "main" result
|
||||||
assert self._expect_result
|
assert self._expect_result_ctx
|
||||||
|
|
||||||
if self._result_msg is None:
|
if self._final_result is None:
|
||||||
self._result_msg = await self._return_once(
|
self._final_result: Any = await self._expect_result_ctx._pld_rx.recv_pld(
|
||||||
self._expect_result
|
ctx=self._expect_result_ctx,
|
||||||
|
expect_msg=Return,
|
||||||
)
|
)
|
||||||
|
|
||||||
return _unwrap_msg(
|
return self._final_result
|
||||||
self._result_msg,
|
|
||||||
self.channel,
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _cancel_streams(self):
|
async def _cancel_streams(self):
|
||||||
# terminate all locally running async generator
|
# terminate all locally running async generator
|
||||||
|
@ -337,11 +333,9 @@ class Portal:
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
portal=self,
|
portal=self,
|
||||||
)
|
)
|
||||||
ctx._portal: Portal = self
|
return await ctx._pld_rx.recv_pld(
|
||||||
msg: Return = await self._return_once(ctx)
|
ctx=ctx,
|
||||||
return _unwrap_msg(
|
expect_msg=Return,
|
||||||
msg,
|
|
||||||
self.channel,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
async def run(
|
async def run(
|
||||||
|
@ -391,10 +385,9 @@ class Portal:
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
portal=self,
|
portal=self,
|
||||||
)
|
)
|
||||||
ctx._portal = self
|
return await ctx._pld_rx.recv_pld(
|
||||||
return _unwrap_msg(
|
ctx=ctx,
|
||||||
await self._return_once(ctx),
|
expect_msg=Return,
|
||||||
self.channel,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -427,7 +420,6 @@ class Portal:
|
||||||
kwargs=kwargs,
|
kwargs=kwargs,
|
||||||
portal=self,
|
portal=self,
|
||||||
)
|
)
|
||||||
ctx._portal = self
|
|
||||||
|
|
||||||
# ensure receive-only stream entrypoint
|
# ensure receive-only stream entrypoint
|
||||||
assert ctx._remote_func_type == 'asyncgen'
|
assert ctx._remote_func_type == 'asyncgen'
|
||||||
|
@ -436,10 +428,11 @@ class Portal:
|
||||||
# deliver receive only stream
|
# deliver receive only stream
|
||||||
async with MsgStream(
|
async with MsgStream(
|
||||||
ctx=ctx,
|
ctx=ctx,
|
||||||
rx_chan=ctx._recv_chan,
|
rx_chan=ctx._rx_chan,
|
||||||
) as rchan:
|
) as stream:
|
||||||
self._streams.add(rchan)
|
self._streams.add(stream)
|
||||||
yield rchan
|
ctx._stream = stream
|
||||||
|
yield stream
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
|
@ -461,7 +454,7 @@ class Portal:
|
||||||
|
|
||||||
# XXX: should this always be done?
|
# XXX: should this always be done?
|
||||||
# await recv_chan.aclose()
|
# await recv_chan.aclose()
|
||||||
self._streams.remove(rchan)
|
self._streams.remove(stream)
|
||||||
|
|
||||||
# NOTE: impl is found in `._context`` mod to make
|
# NOTE: impl is found in `._context`` mod to make
|
||||||
# reading/groking the details simpler code-org-wise. This
|
# reading/groking the details simpler code-org-wise. This
|
||||||
|
|
184
tractor/_rpc.py
184
tractor/_rpc.py
|
@ -181,12 +181,11 @@ async def _invoke_non_context(
|
||||||
# way: using the linked IPC context machinery.
|
# way: using the linked IPC context machinery.
|
||||||
failed_resp: bool = False
|
failed_resp: bool = False
|
||||||
try:
|
try:
|
||||||
await chan.send(
|
ack = StartAck(
|
||||||
StartAck(
|
cid=cid,
|
||||||
cid=cid,
|
functype='asyncfunc',
|
||||||
functype='asyncfunc',
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
await chan.send(ack)
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
@ -194,12 +193,11 @@ async def _invoke_non_context(
|
||||||
) as ipc_err:
|
) as ipc_err:
|
||||||
failed_resp = True
|
failed_resp = True
|
||||||
if is_rpc:
|
if is_rpc:
|
||||||
raise
|
raise ipc_err
|
||||||
else:
|
else:
|
||||||
# TODO: should this be an `.exception()` call?
|
log.exception(
|
||||||
log.warning(
|
f'Failed to respond to runtime RPC request for\n\n'
|
||||||
f'Failed to respond to non-rpc request: {func}\n'
|
f'{ack}\n'
|
||||||
f'{ipc_err}'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
with cancel_scope as cs:
|
with cancel_scope as cs:
|
||||||
|
@ -220,20 +218,19 @@ async def _invoke_non_context(
|
||||||
and chan.connected()
|
and chan.connected()
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
await chan.send(
|
ret_msg = return_msg(
|
||||||
return_msg(
|
cid=cid,
|
||||||
cid=cid,
|
pld=result,
|
||||||
pld=result,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
await chan.send(ret_msg)
|
||||||
except (
|
except (
|
||||||
BrokenPipeError,
|
BrokenPipeError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
):
|
):
|
||||||
log.warning(
|
log.warning(
|
||||||
'Failed to return result:\n'
|
'Failed to send RPC result?\n'
|
||||||
f'{func}@{actor.uid}\n'
|
f'|_{func}@{actor.uid}() -> {ret_msg}\n\n'
|
||||||
f'remote chan: {chan.uid}'
|
f'x=> peer: {chan.uid}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -250,7 +247,7 @@ async def _errors_relayed_via_ipc(
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
__tracebackhide__: bool = hide_tb # TODO: use hide_tb here?
|
__tracebackhide__: bool = hide_tb
|
||||||
try:
|
try:
|
||||||
yield # run RPC invoke body
|
yield # run RPC invoke body
|
||||||
|
|
||||||
|
@ -262,23 +259,19 @@ async def _errors_relayed_via_ipc(
|
||||||
KeyboardInterrupt,
|
KeyboardInterrupt,
|
||||||
) as err:
|
) as err:
|
||||||
|
|
||||||
# always hide this frame from debug REPL if the crash
|
# NOTE: always hide this frame from debug REPL call stack
|
||||||
# originated from an rpc task and we DID NOT fail due to
|
# if the crash originated from an RPC task and we DID NOT
|
||||||
# an IPC transport error!
|
# fail due to an IPC transport error!
|
||||||
if (
|
if (
|
||||||
is_rpc
|
is_rpc
|
||||||
and chan.connected()
|
and
|
||||||
|
chan.connected()
|
||||||
):
|
):
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
|
||||||
|
# TODO: maybe we'll want different "levels" of debugging
|
||||||
|
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
||||||
if not is_multi_cancelled(err):
|
if not is_multi_cancelled(err):
|
||||||
|
|
||||||
# TODO: maybe we'll want different "levels" of debugging
|
|
||||||
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
|
||||||
|
|
||||||
# if not isinstance(err, trio.ClosedResourceError) and (
|
|
||||||
# if not is_multi_cancelled(err) and (
|
|
||||||
|
|
||||||
entered_debug: bool = False
|
entered_debug: bool = False
|
||||||
if (
|
if (
|
||||||
(
|
(
|
||||||
|
@ -310,19 +303,18 @@ async def _errors_relayed_via_ipc(
|
||||||
# strange bug in our transport layer itself? Going
|
# strange bug in our transport layer itself? Going
|
||||||
# to keep this open ended for now.
|
# to keep this open ended for now.
|
||||||
entered_debug = await _debug._maybe_enter_pm(err)
|
entered_debug = await _debug._maybe_enter_pm(err)
|
||||||
|
|
||||||
if not entered_debug:
|
if not entered_debug:
|
||||||
log.exception(
|
log.exception(
|
||||||
'RPC task crashed\n'
|
'RPC task crashed\n'
|
||||||
f'|_{ctx}'
|
f'|_{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# always (try to) ship RPC errors back to caller
|
# ALWAYS try to ship RPC errors back to parent/caller task
|
||||||
if is_rpc:
|
if is_rpc:
|
||||||
#
|
|
||||||
# TODO: tests for this scenario:
|
# TODO: tests for this scenario:
|
||||||
# - RPC caller closes connection before getting a response
|
# - RPC caller closes connection before getting a response
|
||||||
# should **not** crash this actor..
|
# should **not** crash this actor..
|
||||||
await try_ship_error_to_remote(
|
await try_ship_error_to_remote(
|
||||||
chan,
|
chan,
|
||||||
err,
|
err,
|
||||||
|
@ -331,33 +323,41 @@ async def _errors_relayed_via_ipc(
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
|
|
||||||
# error is probably from above coro running code *not from
|
# if the ctx cs is NOT allocated, the error is likely from
|
||||||
# the target rpc invocation since a scope was never
|
# above `coro` invocation machinery NOT from inside the
|
||||||
# allocated around the coroutine await.
|
# `coro` itself, i.e. err is NOT a user application error.
|
||||||
if ctx._scope is None:
|
if ctx._scope is None:
|
||||||
# we don't ever raise directly here to allow the
|
# we don't ever raise directly here to allow the
|
||||||
# msg-loop-scheduler to continue running for this
|
# msg-loop-scheduler to continue running for this
|
||||||
# channel.
|
# channel.
|
||||||
task_status.started(err)
|
task_status.started(err)
|
||||||
|
|
||||||
# always reraise KBIs so they propagate at the sys-process
|
# always reraise KBIs so they propagate at the sys-process level.
|
||||||
# level.
|
|
||||||
if isinstance(err, KeyboardInterrupt):
|
if isinstance(err, KeyboardInterrupt):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
# RPC task bookeeping.
|
||||||
# RPC task bookeeping
|
# since RPC tasks are scheduled inside a flat
|
||||||
|
# `Actor._service_n`, we add "handles" to each such that
|
||||||
|
# they can be individually ccancelled.
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
ctx, func, is_complete = actor._rpc_tasks.pop(
|
ctx: Context
|
||||||
|
func: Callable
|
||||||
|
is_complete: trio.Event
|
||||||
|
(
|
||||||
|
ctx,
|
||||||
|
func,
|
||||||
|
is_complete,
|
||||||
|
) = actor._rpc_tasks.pop(
|
||||||
(chan, ctx.cid)
|
(chan, ctx.cid)
|
||||||
)
|
)
|
||||||
is_complete.set()
|
is_complete.set()
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
# If we're cancelled before the task returns then the
|
||||||
|
# cancel scope will not have been inserted yet
|
||||||
if is_rpc:
|
if is_rpc:
|
||||||
# If we're cancelled before the task returns then the
|
|
||||||
# cancel scope will not have been inserted yet
|
|
||||||
log.warning(
|
log.warning(
|
||||||
'RPC task likely errored or cancelled before start?'
|
'RPC task likely errored or cancelled before start?'
|
||||||
f'|_{ctx._task}\n'
|
f'|_{ctx._task}\n'
|
||||||
|
@ -372,7 +372,7 @@ async def _errors_relayed_via_ipc(
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if not actor._rpc_tasks:
|
if not actor._rpc_tasks:
|
||||||
log.runtime("All RPC tasks have completed")
|
log.runtime('All RPC tasks have completed')
|
||||||
actor._ongoing_rpc_tasks.set()
|
actor._ongoing_rpc_tasks.set()
|
||||||
|
|
||||||
|
|
||||||
|
@ -414,19 +414,16 @@ async def _invoke(
|
||||||
|
|
||||||
# TODO: possibly a specially formatted traceback
|
# TODO: possibly a specially formatted traceback
|
||||||
# (not sure what typing is for this..)?
|
# (not sure what typing is for this..)?
|
||||||
# tb = None
|
# tb: TracebackType = None
|
||||||
|
|
||||||
cancel_scope = CancelScope()
|
cancel_scope = CancelScope()
|
||||||
# activated cancel scope ref
|
cs: CancelScope|None = None # ref when activated
|
||||||
cs: CancelScope|None = None
|
|
||||||
|
|
||||||
ctx = actor.get_context(
|
ctx = actor.get_context(
|
||||||
chan=chan,
|
chan=chan,
|
||||||
cid=cid,
|
cid=cid,
|
||||||
nsf=NamespacePath.from_ref(func),
|
nsf=NamespacePath.from_ref(func),
|
||||||
|
|
||||||
# TODO: if we wanted to get cray and support it?
|
# NOTE: no portal passed bc this is the "child"-side
|
||||||
# side='callee',
|
|
||||||
|
|
||||||
# We shouldn't ever need to pass this through right?
|
# We shouldn't ever need to pass this through right?
|
||||||
# it's up to the soon-to-be called rpc task to
|
# it's up to the soon-to-be called rpc task to
|
||||||
|
@ -459,8 +456,8 @@ async def _invoke(
|
||||||
kwargs['stream'] = ctx
|
kwargs['stream'] = ctx
|
||||||
|
|
||||||
|
|
||||||
|
# handle decorated ``@tractor.context`` async function
|
||||||
elif getattr(func, '_tractor_context_function', False):
|
elif getattr(func, '_tractor_context_function', False):
|
||||||
# handle decorated ``@tractor.context`` async function
|
|
||||||
kwargs['ctx'] = ctx
|
kwargs['ctx'] = ctx
|
||||||
context = True
|
context = True
|
||||||
|
|
||||||
|
@ -474,7 +471,8 @@ async def _invoke(
|
||||||
task_status=task_status,
|
task_status=task_status,
|
||||||
):
|
):
|
||||||
if not (
|
if not (
|
||||||
inspect.isasyncgenfunction(func) or
|
inspect.isasyncgenfunction(func)
|
||||||
|
or
|
||||||
inspect.iscoroutinefunction(func)
|
inspect.iscoroutinefunction(func)
|
||||||
):
|
):
|
||||||
raise TypeError(f'{func} must be an async function!')
|
raise TypeError(f'{func} must be an async function!')
|
||||||
|
@ -486,8 +484,7 @@ async def _invoke(
|
||||||
except TypeError:
|
except TypeError:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# TODO: implement all these cases in terms of the
|
# TODO: impl all these cases in terms of the `Context` one!
|
||||||
# `Context` one!
|
|
||||||
if not context:
|
if not context:
|
||||||
await _invoke_non_context(
|
await _invoke_non_context(
|
||||||
actor,
|
actor,
|
||||||
|
@ -503,7 +500,7 @@ async def _invoke(
|
||||||
return_msg,
|
return_msg,
|
||||||
task_status,
|
task_status,
|
||||||
)
|
)
|
||||||
# below is only for `@context` funcs
|
# XXX below fallthrough is ONLY for `@context` eps
|
||||||
return
|
return
|
||||||
|
|
||||||
# our most general case: a remote SC-transitive,
|
# our most general case: a remote SC-transitive,
|
||||||
|
@ -580,9 +577,6 @@ async def _invoke(
|
||||||
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
||||||
# which cancels the scope presuming the input error
|
# which cancels the scope presuming the input error
|
||||||
# is not a `.cancel_acked` pleaser.
|
# is not a `.cancel_acked` pleaser.
|
||||||
# - currently a never-should-happen-fallthrough case
|
|
||||||
# inside ._context._drain_to_final_msg()`..
|
|
||||||
# # TODO: remove this ^ right?
|
|
||||||
if ctx._scope.cancelled_caught:
|
if ctx._scope.cancelled_caught:
|
||||||
our_uid: tuple = actor.uid
|
our_uid: tuple = actor.uid
|
||||||
|
|
||||||
|
@ -598,9 +592,7 @@ async def _invoke(
|
||||||
if cs.cancel_called:
|
if cs.cancel_called:
|
||||||
|
|
||||||
canceller: tuple = ctx.canceller
|
canceller: tuple = ctx.canceller
|
||||||
msg: str = (
|
explain: str = f'{ctx.side!r}-side task was cancelled by '
|
||||||
'actor was cancelled by '
|
|
||||||
)
|
|
||||||
|
|
||||||
# NOTE / TODO: if we end up having
|
# NOTE / TODO: if we end up having
|
||||||
# ``Actor._cancel_task()`` call
|
# ``Actor._cancel_task()`` call
|
||||||
|
@ -610,22 +602,28 @@ async def _invoke(
|
||||||
if ctx._cancel_called:
|
if ctx._cancel_called:
|
||||||
# TODO: test for this!!!!!
|
# TODO: test for this!!!!!
|
||||||
canceller: tuple = our_uid
|
canceller: tuple = our_uid
|
||||||
msg += 'itself '
|
explain += 'itself '
|
||||||
|
|
||||||
# if the channel which spawned the ctx is the
|
# if the channel which spawned the ctx is the
|
||||||
# one that cancelled it then we report that, vs.
|
# one that cancelled it then we report that, vs.
|
||||||
# it being some other random actor that for ex.
|
# it being some other random actor that for ex.
|
||||||
# some actor who calls `Portal.cancel_actor()`
|
# some actor who calls `Portal.cancel_actor()`
|
||||||
# and by side-effect cancels this ctx.
|
# and by side-effect cancels this ctx.
|
||||||
|
#
|
||||||
|
# TODO: determine if the ctx peer task was the
|
||||||
|
# exact task which cancelled, vs. some other
|
||||||
|
# task in the same actor.
|
||||||
elif canceller == ctx.chan.uid:
|
elif canceller == ctx.chan.uid:
|
||||||
msg += 'its caller'
|
explain += f'its {ctx.peer_side!r}-side peer'
|
||||||
|
|
||||||
else:
|
else:
|
||||||
msg += 'a remote peer'
|
explain += 'a remote peer'
|
||||||
|
|
||||||
|
# TODO: move this "div centering" into
|
||||||
|
# a helper for use elsewhere!
|
||||||
div_chars: str = '------ - ------'
|
div_chars: str = '------ - ------'
|
||||||
div_offset: int = (
|
div_offset: int = (
|
||||||
round(len(msg)/2)+1
|
round(len(explain)/2)+1
|
||||||
+
|
+
|
||||||
round(len(div_chars)/2)+1
|
round(len(div_chars)/2)+1
|
||||||
)
|
)
|
||||||
|
@ -636,11 +634,12 @@ async def _invoke(
|
||||||
+
|
+
|
||||||
f'{div_chars}\n'
|
f'{div_chars}\n'
|
||||||
)
|
)
|
||||||
msg += (
|
explain += (
|
||||||
div_str +
|
div_str +
|
||||||
f'<= canceller: {canceller}\n'
|
f'<= canceller: {canceller}\n'
|
||||||
f'=> uid: {our_uid}\n'
|
f'=> cancellee: {our_uid}\n'
|
||||||
f' |_{ctx._task}()'
|
# TODO: better repr for ctx tasks..
|
||||||
|
f' |_{ctx.side!r} {ctx._task}'
|
||||||
|
|
||||||
# TODO: instead just show the
|
# TODO: instead just show the
|
||||||
# ctx.__str__() here?
|
# ctx.__str__() here?
|
||||||
|
@ -659,7 +658,7 @@ async def _invoke(
|
||||||
# task, so relay this cancel signal to the
|
# task, so relay this cancel signal to the
|
||||||
# other side.
|
# other side.
|
||||||
ctxc = ContextCancelled(
|
ctxc = ContextCancelled(
|
||||||
message=msg,
|
message=explain,
|
||||||
boxed_type=trio.Cancelled,
|
boxed_type=trio.Cancelled,
|
||||||
canceller=canceller,
|
canceller=canceller,
|
||||||
)
|
)
|
||||||
|
@ -702,11 +701,9 @@ async def _invoke(
|
||||||
ctx: Context = actor._contexts.pop((
|
ctx: Context = actor._contexts.pop((
|
||||||
chan.uid,
|
chan.uid,
|
||||||
cid,
|
cid,
|
||||||
# ctx.side,
|
|
||||||
))
|
))
|
||||||
|
|
||||||
merr: Exception|None = ctx.maybe_error
|
merr: Exception|None = ctx.maybe_error
|
||||||
|
|
||||||
(
|
(
|
||||||
res_type_str,
|
res_type_str,
|
||||||
res_str,
|
res_str,
|
||||||
|
@ -720,7 +717,7 @@ async def _invoke(
|
||||||
)
|
)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'IPC context terminated with a final {res_type_str}\n\n'
|
f'IPC context terminated with a final {res_type_str}\n\n'
|
||||||
f'{ctx}\n'
|
f'{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -806,13 +803,19 @@ async def process_messages(
|
||||||
and `Actor.cancel()` process-wide-runtime-shutdown requests
|
and `Actor.cancel()` process-wide-runtime-shutdown requests
|
||||||
(as utilized inside `Portal.cancel_actor()` ).
|
(as utilized inside `Portal.cancel_actor()` ).
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
assert actor._service_n # state sanity
|
assert actor._service_n # state sanity
|
||||||
|
|
||||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||||
# should use it?
|
# should use it?
|
||||||
# https://github.com/python-trio/trio/issues/467
|
# -[ ] existing GH https://github.com/python-trio/trio/issues/467
|
||||||
|
# -[ ] for other transports (like QUIC) we can possibly just
|
||||||
|
# entirely avoid the feeder mem-chans since each msg will be
|
||||||
|
# delivered with a ctx-id already?
|
||||||
|
#
|
||||||
|
# |_ for ex, from `aioquic` which exposed "stream ids":
|
||||||
|
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
|
||||||
|
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Entering RPC msg loop:\n'
|
'Entering RPC msg loop:\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
|
@ -850,7 +853,7 @@ async def process_messages(
|
||||||
| Return(cid=cid)
|
| Return(cid=cid)
|
||||||
| CancelAck(cid=cid)
|
| CancelAck(cid=cid)
|
||||||
|
|
||||||
# `.cid` means RPC-ctx-task specific
|
# `.cid` indicates RPC-ctx-task scoped
|
||||||
| Error(cid=cid)
|
| Error(cid=cid)
|
||||||
|
|
||||||
# recv-side `MsgType` decode violation
|
# recv-side `MsgType` decode violation
|
||||||
|
@ -1046,16 +1049,16 @@ async def process_messages(
|
||||||
trio.Event(),
|
trio.Event(),
|
||||||
)
|
)
|
||||||
|
|
||||||
# runtime-scoped remote error (since no `.cid`)
|
# runtime-scoped remote (internal) error
|
||||||
|
# (^- bc no `Error.cid` -^)
|
||||||
|
#
|
||||||
|
# NOTE: this is the non-rpc error case, that
|
||||||
|
# is, an error NOT raised inside a call to
|
||||||
|
# `_invoke()` (i.e. no cid was provided in the
|
||||||
|
# msg - see above). Raise error inline and
|
||||||
|
# mark the channel as "globally errored" for
|
||||||
|
# all downstream consuming primitives.
|
||||||
case Error():
|
case Error():
|
||||||
# NOTE: this is the non-rpc error case,
|
|
||||||
# that is, an error **not** raised inside
|
|
||||||
# a call to ``_invoke()`` (i.e. no cid was
|
|
||||||
# provided in the msg - see above). Push
|
|
||||||
# this error to all local channel
|
|
||||||
# consumers (normally portals) by marking
|
|
||||||
# the channel as errored
|
|
||||||
# assert chan.uid
|
|
||||||
chan._exc: Exception = unpack_error(
|
chan._exc: Exception = unpack_error(
|
||||||
msg,
|
msg,
|
||||||
chan=chan,
|
chan=chan,
|
||||||
|
@ -1111,7 +1114,7 @@ async def process_messages(
|
||||||
f'|_{chan.raddr}\n'
|
f'|_{chan.raddr}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# transport **was** disconnected
|
# transport **WAS** disconnected
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except (
|
except (
|
||||||
|
@ -1150,12 +1153,11 @@ async def process_messages(
|
||||||
finally:
|
finally:
|
||||||
# msg debugging for when he machinery is brokey
|
# msg debugging for when he machinery is brokey
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Exiting IPC msg loop with\n'
|
'Exiting IPC msg loop with final msg\n\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'<= peer: {chan.uid}\n'
|
||||||
f'|_{chan}\n\n'
|
f'|_{chan}\n\n'
|
||||||
'final msg:\n'
|
f'{pformat(msg)}\n\n'
|
||||||
f'{pformat(msg)}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# transport **was not** disconnected
|
# transport **WAS NOT** disconnected
|
||||||
return False
|
return False
|
||||||
|
|
|
@ -817,8 +817,8 @@ class Actor:
|
||||||
state.max_buffer_size = msg_buffer_size
|
state.max_buffer_size = msg_buffer_size
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.runtime(
|
log.debug(
|
||||||
f'Creating NEW IPC ctx for\n'
|
f'Allocate new IPC ctx for\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
f'cid: {cid}\n'
|
f'cid: {cid}\n'
|
||||||
)
|
)
|
||||||
|
@ -850,7 +850,7 @@ class Actor:
|
||||||
msg_buffer_size: int|None = None,
|
msg_buffer_size: int|None = None,
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
load_nsf: bool = False,
|
load_nsf: bool = False,
|
||||||
ack_timeout: float = 3,
|
ack_timeout: float = float('inf'),
|
||||||
|
|
||||||
) -> Context:
|
) -> Context:
|
||||||
'''
|
'''
|
||||||
|
@ -906,7 +906,7 @@ class Actor:
|
||||||
# this should be immediate and does not (yet) wait for the
|
# this should be immediate and does not (yet) wait for the
|
||||||
# remote child task to sync via `Context.started()`.
|
# remote child task to sync via `Context.started()`.
|
||||||
with trio.fail_after(ack_timeout):
|
with trio.fail_after(ack_timeout):
|
||||||
first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
|
first_msg: msgtypes.StartAck = await ctx._rx_chan.receive()
|
||||||
try:
|
try:
|
||||||
functype: str = first_msg.functype
|
functype: str = first_msg.functype
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
|
|
|
@ -35,7 +35,7 @@ import warnings
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
_raise_from_no_key_in_msg,
|
# _raise_from_no_key_in_msg,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
)
|
)
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
|
@ -44,8 +44,9 @@ from .trionics import (
|
||||||
BroadcastReceiver,
|
BroadcastReceiver,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
Return,
|
# Return,
|
||||||
Stop,
|
# Stop,
|
||||||
|
MsgType,
|
||||||
Yield,
|
Yield,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -94,24 +95,23 @@ class MsgStream(trio.abc.Channel):
|
||||||
self._eoc: bool|trio.EndOfChannel = False
|
self._eoc: bool|trio.EndOfChannel = False
|
||||||
self._closed: bool|trio.ClosedResourceError = False
|
self._closed: bool|trio.ClosedResourceError = False
|
||||||
|
|
||||||
|
# TODO: could we make this a direct method bind to `PldRx`?
|
||||||
|
# -> receive_nowait = PldRx.recv_pld
|
||||||
|
# |_ means latter would have to accept `MsgStream`-as-`self`?
|
||||||
|
# => should be fine as long as,
|
||||||
|
# -[ ] both define `._rx_chan`
|
||||||
|
# -[ ] .ctx is bound into `PldRx` using a `@cm`?
|
||||||
|
#
|
||||||
# delegate directly to underlying mem channel
|
# delegate directly to underlying mem channel
|
||||||
def receive_nowait(
|
def receive_nowait(
|
||||||
self,
|
self,
|
||||||
allow_msgs: list[str] = Yield,
|
expect_msg: MsgType = Yield,
|
||||||
):
|
):
|
||||||
msg: Yield|Stop = self._rx_chan.receive_nowait()
|
ctx: Context = self._ctx
|
||||||
# TODO: replace msg equiv of this or does the `.pld`
|
return ctx._pld_rx.recv_pld_nowait(
|
||||||
# interface read already satisfy it? I think so, yes?
|
ctx=ctx,
|
||||||
try:
|
expect_msg=expect_msg,
|
||||||
return msg.pld
|
)
|
||||||
except AttributeError as attrerr:
|
|
||||||
_raise_from_no_key_in_msg(
|
|
||||||
ctx=self._ctx,
|
|
||||||
msg=msg,
|
|
||||||
src_err=attrerr,
|
|
||||||
log=log,
|
|
||||||
stream=self,
|
|
||||||
)
|
|
||||||
|
|
||||||
async def receive(
|
async def receive(
|
||||||
self,
|
self,
|
||||||
|
@ -146,24 +146,9 @@ class MsgStream(trio.abc.Channel):
|
||||||
|
|
||||||
src_err: Exception|None = None # orig tb
|
src_err: Exception|None = None # orig tb
|
||||||
try:
|
try:
|
||||||
try:
|
|
||||||
msg: Yield = await self._rx_chan.receive()
|
|
||||||
return msg.pld
|
|
||||||
|
|
||||||
# TODO: implement with match: instead?
|
ctx: Context = self._ctx
|
||||||
except AttributeError as attrerr:
|
return await ctx._pld_rx.recv_pld(ctx=ctx)
|
||||||
# src_err = kerr
|
|
||||||
src_err = attrerr
|
|
||||||
|
|
||||||
# NOTE: may raise any of the below error types
|
|
||||||
# includg EoC when a 'stop' msg is found.
|
|
||||||
_raise_from_no_key_in_msg(
|
|
||||||
ctx=self._ctx,
|
|
||||||
msg=msg,
|
|
||||||
src_err=attrerr,
|
|
||||||
log=log,
|
|
||||||
stream=self,
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX: the stream terminates on either of:
|
# XXX: the stream terminates on either of:
|
||||||
# - via `self._rx_chan.receive()` raising after manual closure
|
# - via `self._rx_chan.receive()` raising after manual closure
|
||||||
|
@ -228,7 +213,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
# probably want to instead raise the remote error
|
# probably want to instead raise the remote error
|
||||||
# over the end-of-stream connection error since likely
|
# over the end-of-stream connection error since likely
|
||||||
# the remote error was the source cause?
|
# the remote error was the source cause?
|
||||||
ctx: Context = self._ctx
|
# ctx: Context = self._ctx
|
||||||
ctx.maybe_raise(
|
ctx.maybe_raise(
|
||||||
raise_ctxc_from_self_call=True,
|
raise_ctxc_from_self_call=True,
|
||||||
)
|
)
|
||||||
|
@ -292,7 +277,8 @@ class MsgStream(trio.abc.Channel):
|
||||||
while not drained:
|
while not drained:
|
||||||
try:
|
try:
|
||||||
maybe_final_msg = self.receive_nowait(
|
maybe_final_msg = self.receive_nowait(
|
||||||
allow_msgs=[Yield, Return],
|
# allow_msgs=[Yield, Return],
|
||||||
|
expect_msg=Yield,
|
||||||
)
|
)
|
||||||
if maybe_final_msg:
|
if maybe_final_msg:
|
||||||
log.debug(
|
log.debug(
|
||||||
|
@ -472,6 +458,9 @@ class MsgStream(trio.abc.Channel):
|
||||||
self,
|
self,
|
||||||
# use memory channel size by default
|
# use memory channel size by default
|
||||||
self._rx_chan._state.max_buffer_size, # type: ignore
|
self._rx_chan._state.max_buffer_size, # type: ignore
|
||||||
|
|
||||||
|
# TODO: can remove this kwarg right since
|
||||||
|
# by default behaviour is to do this anyway?
|
||||||
receive_afunc=self.receive,
|
receive_afunc=self.receive,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -517,19 +506,11 @@ class MsgStream(trio.abc.Channel):
|
||||||
raise self._closed
|
raise self._closed
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# await self._ctx.chan.send(
|
|
||||||
# payload={
|
|
||||||
# 'yield': data,
|
|
||||||
# 'cid': self._ctx.cid,
|
|
||||||
# },
|
|
||||||
# # hide_tb=hide_tb,
|
|
||||||
# )
|
|
||||||
await self._ctx.chan.send(
|
await self._ctx.chan.send(
|
||||||
payload=Yield(
|
payload=Yield(
|
||||||
cid=self._ctx.cid,
|
cid=self._ctx.cid,
|
||||||
pld=data,
|
pld=data,
|
||||||
),
|
),
|
||||||
# hide_tb=hide_tb,
|
|
||||||
)
|
)
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
|
@ -562,7 +543,7 @@ def stream(func: Callable) -> Callable:
|
||||||
'''
|
'''
|
||||||
# TODO: apply whatever solution ``mypy`` ends up picking for this:
|
# TODO: apply whatever solution ``mypy`` ends up picking for this:
|
||||||
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
|
# https://github.com/python/mypy/issues/2087#issuecomment-769266912
|
||||||
func._tractor_stream_function = True # type: ignore
|
func._tractor_stream_function: bool = True # type: ignore
|
||||||
|
|
||||||
sig = inspect.signature(func)
|
sig = inspect.signature(func)
|
||||||
params = sig.parameters
|
params = sig.parameters
|
||||||
|
|
|
@ -37,6 +37,11 @@ from ._codec import (
|
||||||
MsgDec as MsgDec,
|
MsgDec as MsgDec,
|
||||||
current_codec as current_codec,
|
current_codec as current_codec,
|
||||||
)
|
)
|
||||||
|
# currently can't bc circular with `._context`
|
||||||
|
# from ._ops import (
|
||||||
|
# PldRx as PldRx,
|
||||||
|
# _drain_to_final_msg as _drain_to_final_msg,
|
||||||
|
# )
|
||||||
|
|
||||||
from .types import (
|
from .types import (
|
||||||
Msg as Msg,
|
Msg as Msg,
|
||||||
|
|
|
@ -75,7 +75,7 @@ log = get_logger(__name__)
|
||||||
# TODO: unify with `MsgCodec` by making `._dec` part this?
|
# TODO: unify with `MsgCodec` by making `._dec` part this?
|
||||||
class MsgDec(Struct):
|
class MsgDec(Struct):
|
||||||
'''
|
'''
|
||||||
An IPC msg decoder.
|
An IPC msg (payload) decoder.
|
||||||
|
|
||||||
Normally used to decode only a payload: `MsgType.pld:
|
Normally used to decode only a payload: `MsgType.pld:
|
||||||
PayloadT` field before delivery to IPC consumer code.
|
PayloadT` field before delivery to IPC consumer code.
|
||||||
|
@ -87,6 +87,31 @@ class MsgDec(Struct):
|
||||||
def dec(self) -> msgpack.Decoder:
|
def dec(self) -> msgpack.Decoder:
|
||||||
return self._dec
|
return self._dec
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
|
||||||
|
speclines: str = self.spec_str
|
||||||
|
|
||||||
|
# in multi-typed spec case we stick the list
|
||||||
|
# all on newlines after the |__pld_spec__:,
|
||||||
|
# OW it's prolly single type spec-value
|
||||||
|
# so just leave it on same line.
|
||||||
|
if '\n' in speclines:
|
||||||
|
speclines: str = '\n' + textwrap.indent(
|
||||||
|
speclines,
|
||||||
|
prefix=' '*3,
|
||||||
|
)
|
||||||
|
|
||||||
|
body: str = textwrap.indent(
|
||||||
|
f'|_dec_hook: {self.dec.dec_hook}\n'
|
||||||
|
f'|__pld_spec__: {speclines}\n',
|
||||||
|
prefix=' '*2,
|
||||||
|
)
|
||||||
|
return (
|
||||||
|
f'<{type(self).__name__}(\n'
|
||||||
|
f'{body}'
|
||||||
|
')>'
|
||||||
|
)
|
||||||
|
|
||||||
# struct type unions
|
# struct type unions
|
||||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||||
#
|
#
|
||||||
|
@ -137,17 +162,7 @@ class MsgDec(Struct):
|
||||||
# TODO: would get moved into `FieldSpec.__str__()` right?
|
# TODO: would get moved into `FieldSpec.__str__()` right?
|
||||||
@property
|
@property
|
||||||
def spec_str(self) -> str:
|
def spec_str(self) -> str:
|
||||||
|
return pformat_msgspec(codec=self)
|
||||||
# TODO: could also use match: instead?
|
|
||||||
spec: Union[Type]|Type = self.spec
|
|
||||||
|
|
||||||
# `typing.Union` case
|
|
||||||
if getattr(spec, '__args__', False):
|
|
||||||
return str(spec)
|
|
||||||
|
|
||||||
# just a single type
|
|
||||||
else:
|
|
||||||
return spec.__name__
|
|
||||||
|
|
||||||
pld_spec_str = spec_str
|
pld_spec_str = spec_str
|
||||||
|
|
||||||
|
@ -168,9 +183,57 @@ def mk_dec(
|
||||||
|
|
||||||
) -> MsgDec:
|
) -> MsgDec:
|
||||||
|
|
||||||
return msgpack.Decoder(
|
return MsgDec(
|
||||||
type=spec, # like `Msg[Any]`
|
_dec=msgpack.Decoder(
|
||||||
dec_hook=dec_hook,
|
type=spec, # like `Msg[Any]`
|
||||||
|
dec_hook=dec_hook,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def mk_msgspec_table(
|
||||||
|
dec: msgpack.Decoder,
|
||||||
|
msg: MsgType|None = None,
|
||||||
|
|
||||||
|
) -> dict[str, MsgType]|str:
|
||||||
|
'''
|
||||||
|
Fill out a `dict` of `MsgType`s keyed by name
|
||||||
|
for a given input `msgspec.msgpack.Decoder`
|
||||||
|
as defined by its `.type: Union[Type]` setting.
|
||||||
|
|
||||||
|
If `msg` is provided, only deliver a `dict` with a single
|
||||||
|
entry for that type.
|
||||||
|
|
||||||
|
'''
|
||||||
|
msgspec: Union[Type]|Type = dec.type
|
||||||
|
|
||||||
|
if not (msgtypes := getattr(msgspec, '__args__', False)):
|
||||||
|
msgtypes = [msgspec]
|
||||||
|
|
||||||
|
msgt_table: dict[str, MsgType] = {
|
||||||
|
msgt: str(msgt)
|
||||||
|
for msgt in msgtypes
|
||||||
|
}
|
||||||
|
if msg:
|
||||||
|
msgt: MsgType = type(msg)
|
||||||
|
str_repr: str = msgt_table[msgt]
|
||||||
|
return {msgt: str_repr}
|
||||||
|
|
||||||
|
return msgt_table
|
||||||
|
|
||||||
|
|
||||||
|
def pformat_msgspec(
|
||||||
|
codec: MsgCodec|MsgDec,
|
||||||
|
msg: MsgType|None = None,
|
||||||
|
join_char: str = '\n',
|
||||||
|
|
||||||
|
) -> str:
|
||||||
|
dec: msgpack.Decoder = getattr(codec, 'dec', codec)
|
||||||
|
return join_char.join(
|
||||||
|
mk_msgspec_table(
|
||||||
|
dec=dec,
|
||||||
|
msg=msg,
|
||||||
|
).values()
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
||||||
|
@ -200,7 +263,7 @@ class MsgCodec(Struct):
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
speclines: str = textwrap.indent(
|
speclines: str = textwrap.indent(
|
||||||
self.pformat_msg_spec(),
|
pformat_msgspec(codec=self),
|
||||||
prefix=' '*3,
|
prefix=' '*3,
|
||||||
)
|
)
|
||||||
body: str = textwrap.indent(
|
body: str = textwrap.indent(
|
||||||
|
@ -244,33 +307,11 @@ class MsgCodec(Struct):
|
||||||
# NOTE: defined and applied inside `mk_codec()`
|
# NOTE: defined and applied inside `mk_codec()`
|
||||||
return self._dec.type
|
return self._dec.type
|
||||||
|
|
||||||
def msg_spec_items(
|
|
||||||
self,
|
|
||||||
msg: MsgType|None = None,
|
|
||||||
|
|
||||||
) -> dict[str, MsgType]|str:
|
|
||||||
|
|
||||||
msgt_table: dict[str, MsgType] = {
|
|
||||||
msgt: str(msgt)
|
|
||||||
for msgt in self.msg_spec.__args__
|
|
||||||
}
|
|
||||||
if msg:
|
|
||||||
msgt: MsgType = type(msg)
|
|
||||||
str_repr: str = msgt_table[msgt]
|
|
||||||
return {msgt: str_repr}
|
|
||||||
|
|
||||||
return msgt_table
|
|
||||||
|
|
||||||
# TODO: some way to make `pretty_struct.Struct` use this
|
# TODO: some way to make `pretty_struct.Struct` use this
|
||||||
# wrapped field over the `.msg_spec` one?
|
# wrapped field over the `.msg_spec` one?
|
||||||
def pformat_msg_spec(
|
@property
|
||||||
self,
|
def msg_spec_str(self) -> str:
|
||||||
msg: MsgType|None = None,
|
return pformat_msgspec(self.msg_spec)
|
||||||
join_char: str = '\n',
|
|
||||||
) -> str:
|
|
||||||
return join_char.join(
|
|
||||||
self.msg_spec_items(msg=msg).values()
|
|
||||||
)
|
|
||||||
|
|
||||||
lib: ModuleType = msgspec
|
lib: ModuleType = msgspec
|
||||||
|
|
||||||
|
@ -280,17 +321,32 @@ class MsgCodec(Struct):
|
||||||
def enc(self) -> msgpack.Encoder:
|
def enc(self) -> msgpack.Encoder:
|
||||||
return self._enc
|
return self._enc
|
||||||
|
|
||||||
|
# TODO: reusing encode buffer for perf?
|
||||||
|
# https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
|
||||||
|
_buf: bytearray = bytearray()
|
||||||
|
|
||||||
def encode(
|
def encode(
|
||||||
self,
|
self,
|
||||||
py_obj: Any,
|
py_obj: Any,
|
||||||
|
|
||||||
|
use_buf: bool = False,
|
||||||
|
# ^-XXX-^ uhh why am i getting this?
|
||||||
|
# |_BufferError: Existing exports of data: object cannot be re-sized
|
||||||
|
|
||||||
) -> bytes:
|
) -> bytes:
|
||||||
'''
|
'''
|
||||||
Encode input python objects to `msgpack` bytes for
|
Encode input python objects to `msgpack` bytes for
|
||||||
transfer on a tranport protocol connection.
|
transfer on a tranport protocol connection.
|
||||||
|
|
||||||
|
When `use_buf == True` use the output buffer optimization:
|
||||||
|
https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self._enc.encode(py_obj)
|
if use_buf:
|
||||||
|
self._enc.encode_into(py_obj, self._buf)
|
||||||
|
return self._buf
|
||||||
|
else:
|
||||||
|
return self._enc.encode(py_obj)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def dec(self) -> msgpack.Decoder:
|
def dec(self) -> msgpack.Decoder:
|
||||||
|
|
|
@ -0,0 +1,563 @@
|
||||||
|
# tractor: structured concurrent "actors".
|
||||||
|
# Copyright 2018-eternity Tyler Goodlet.
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
'''
|
||||||
|
Near-application abstractions for `MsgType.pld: PayloadT|Raw`
|
||||||
|
delivery, filtering and type checking as well as generic
|
||||||
|
operational helpers for processing transaction flows.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
|
from contextlib import (
|
||||||
|
# asynccontextmanager as acm,
|
||||||
|
contextmanager as cm,
|
||||||
|
)
|
||||||
|
from pprint import pformat
|
||||||
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Type,
|
||||||
|
TYPE_CHECKING,
|
||||||
|
# Union,
|
||||||
|
)
|
||||||
|
# ------ - ------
|
||||||
|
from msgspec import (
|
||||||
|
msgpack,
|
||||||
|
Raw,
|
||||||
|
Struct,
|
||||||
|
ValidationError,
|
||||||
|
)
|
||||||
|
import trio
|
||||||
|
# ------ - ------
|
||||||
|
from tractor.log import get_logger
|
||||||
|
from tractor._exceptions import (
|
||||||
|
MessagingError,
|
||||||
|
InternalError,
|
||||||
|
_raise_from_unexpected_msg,
|
||||||
|
MsgTypeError,
|
||||||
|
_mk_msg_type_err,
|
||||||
|
pack_from_raise,
|
||||||
|
)
|
||||||
|
from ._codec import (
|
||||||
|
mk_dec,
|
||||||
|
MsgDec,
|
||||||
|
)
|
||||||
|
from .types import (
|
||||||
|
CancelAck,
|
||||||
|
Error,
|
||||||
|
MsgType,
|
||||||
|
PayloadT,
|
||||||
|
Return,
|
||||||
|
Started,
|
||||||
|
Stop,
|
||||||
|
Yield,
|
||||||
|
# pretty_struct,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from tractor._context import Context
|
||||||
|
from tractor._streaming import MsgStream
|
||||||
|
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PldRx(Struct):
|
||||||
|
'''
|
||||||
|
A "msg payload receiver".
|
||||||
|
|
||||||
|
The pairing of a "feeder" `trio.abc.ReceiveChannel` and an
|
||||||
|
interchange-specific (eg. msgpack) payload field decoder. The
|
||||||
|
validation/type-filtering rules are runtime mutable and allow
|
||||||
|
type constraining the set of `MsgType.pld: Raw|PayloadT`
|
||||||
|
values at runtime, per IPC task-context.
|
||||||
|
|
||||||
|
This abstraction, being just below "user application code",
|
||||||
|
allows for the equivalent of our `MsgCodec` (used for
|
||||||
|
typer-filtering IPC dialog protocol msgs against a msg-spec)
|
||||||
|
but with granular control around payload delivery (i.e. the
|
||||||
|
data-values user code actually sees and uses (the blobs that
|
||||||
|
are "shuttled" by the wrapping dialog prot) such that invalid
|
||||||
|
`.pld: Raw` can be decoded and handled by IPC-primitive user
|
||||||
|
code (i.e. that operates on `Context` and `Msgstream` APIs)
|
||||||
|
without knowledge of the lower level `Channel`/`MsgTransport`
|
||||||
|
primitives nor the `MsgCodec` in use. Further, lazily decoding
|
||||||
|
payload blobs allows for topical (and maybe intentionally
|
||||||
|
"partial") encryption of msg field subsets.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# TODO: better to bind it here?
|
||||||
|
# _rx_mc: trio.MemoryReceiveChannel
|
||||||
|
_msgdec: MsgDec = mk_dec(spec=Any)
|
||||||
|
|
||||||
|
_ipc: Context|MsgStream|None = None
|
||||||
|
|
||||||
|
@cm
|
||||||
|
def apply_to_ipc(
|
||||||
|
self,
|
||||||
|
ipc_prim: Context|MsgStream,
|
||||||
|
|
||||||
|
) -> PldRx:
|
||||||
|
'''
|
||||||
|
Apply this payload receiver to an IPC primitive type, one
|
||||||
|
of `Context` or `MsgStream`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
self._ipc = ipc_prim
|
||||||
|
try:
|
||||||
|
yield self
|
||||||
|
finally:
|
||||||
|
self._ipc = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def dec(self) -> msgpack.Decoder:
|
||||||
|
return self._msgdec.dec
|
||||||
|
|
||||||
|
def recv_pld_nowait(
|
||||||
|
self,
|
||||||
|
# TODO: make this `MsgStream` compat as well, see above^
|
||||||
|
# ipc_prim: Context|MsgStream,
|
||||||
|
ctx: Context,
|
||||||
|
|
||||||
|
ipc_msg: MsgType|None = None,
|
||||||
|
expect_msg: Type[MsgType]|None = None,
|
||||||
|
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> Any|Raw:
|
||||||
|
|
||||||
|
msg: MsgType = (
|
||||||
|
ipc_msg
|
||||||
|
or
|
||||||
|
|
||||||
|
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||||
|
ctx._rx_chan.receive_nowait()
|
||||||
|
)
|
||||||
|
return self.dec_msg(
|
||||||
|
msg,
|
||||||
|
ctx=ctx,
|
||||||
|
expect_msg=expect_msg,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def recv_pld(
|
||||||
|
self,
|
||||||
|
ctx: Context,
|
||||||
|
ipc_msg: MsgType|None = None,
|
||||||
|
expect_msg: Type[MsgType]|None = None,
|
||||||
|
|
||||||
|
**kwargs
|
||||||
|
|
||||||
|
) -> Any|Raw:
|
||||||
|
'''
|
||||||
|
Receive a `MsgType`, then decode and return its `.pld` field.
|
||||||
|
|
||||||
|
'''
|
||||||
|
msg: MsgType = (
|
||||||
|
ipc_msg
|
||||||
|
or
|
||||||
|
|
||||||
|
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||||
|
await ctx._rx_chan.receive()
|
||||||
|
)
|
||||||
|
return self.dec_msg(
|
||||||
|
msg,
|
||||||
|
ctx=ctx,
|
||||||
|
expect_msg=expect_msg,
|
||||||
|
)
|
||||||
|
|
||||||
|
def dec_msg(
|
||||||
|
self,
|
||||||
|
msg: MsgType,
|
||||||
|
ctx: Context,
|
||||||
|
expect_msg: Type[MsgType]|None = None,
|
||||||
|
|
||||||
|
) -> PayloadT|Raw:
|
||||||
|
'''
|
||||||
|
Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and
|
||||||
|
return the value or raise an appropriate error.
|
||||||
|
|
||||||
|
'''
|
||||||
|
match msg:
|
||||||
|
# payload-data shuttle msg; deliver the `.pld` value
|
||||||
|
# directly to IPC (primitive) client-consumer code.
|
||||||
|
case (
|
||||||
|
Started(pld=pld) # sync phase
|
||||||
|
|Yield(pld=pld) # streaming phase
|
||||||
|
|Return(pld=pld) # termination phase
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
pld: PayloadT = self._msgdec.decode(pld)
|
||||||
|
log.runtime(
|
||||||
|
'Decode msg payload\n\n'
|
||||||
|
f'{msg}\n\n'
|
||||||
|
f'{pld}\n'
|
||||||
|
)
|
||||||
|
return pld
|
||||||
|
|
||||||
|
# XXX pld-type failure
|
||||||
|
except ValidationError as src_err:
|
||||||
|
msgterr: MsgTypeError = _mk_msg_type_err(
|
||||||
|
msg=msg,
|
||||||
|
codec=self._dec,
|
||||||
|
src_validation_error=src_err,
|
||||||
|
)
|
||||||
|
msg: Error = pack_from_raise(
|
||||||
|
local_err=msgterr,
|
||||||
|
cid=msg.cid,
|
||||||
|
src_uid=ctx.chan.uid,
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX some other decoder specific failure?
|
||||||
|
# except TypeError as src_error:
|
||||||
|
# from .devx import mk_pdb
|
||||||
|
# mk_pdb().set_trace()
|
||||||
|
# raise src_error
|
||||||
|
|
||||||
|
# a runtime-internal RPC endpoint response.
|
||||||
|
# always passthrough since (internal) runtime
|
||||||
|
# responses are generally never exposed to consumer
|
||||||
|
# code.
|
||||||
|
case CancelAck(
|
||||||
|
pld=bool(cancelled)
|
||||||
|
):
|
||||||
|
return cancelled
|
||||||
|
|
||||||
|
case Error():
|
||||||
|
src_err = MessagingError(
|
||||||
|
'IPC dialog termination by msg'
|
||||||
|
)
|
||||||
|
|
||||||
|
case _:
|
||||||
|
src_err = InternalError(
|
||||||
|
'Unknown IPC msg ??\n\n'
|
||||||
|
f'{msg}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# fallthrough and raise from `src_err`
|
||||||
|
_raise_from_unexpected_msg(
|
||||||
|
ctx=ctx,
|
||||||
|
msg=msg,
|
||||||
|
src_err=src_err,
|
||||||
|
log=log,
|
||||||
|
expect_msg=expect_msg,
|
||||||
|
hide_tb=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
async def recv_msg_w_pld(
|
||||||
|
self,
|
||||||
|
ipc: Context|MsgStream,
|
||||||
|
|
||||||
|
) -> tuple[MsgType, PayloadT]:
|
||||||
|
'''
|
||||||
|
Retrieve the next avail IPC msg, decode it's payload, and return
|
||||||
|
the pair of refs.
|
||||||
|
|
||||||
|
'''
|
||||||
|
msg: MsgType = await ipc._rx_chan.receive()
|
||||||
|
|
||||||
|
# TODO: is there some way we can inject the decoded
|
||||||
|
# payload into an existing output buffer for the original
|
||||||
|
# msg instance?
|
||||||
|
pld: PayloadT = self.dec_msg(
|
||||||
|
msg,
|
||||||
|
ctx=ipc,
|
||||||
|
)
|
||||||
|
return msg, pld
|
||||||
|
|
||||||
|
|
||||||
|
async def drain_to_final_msg(
|
||||||
|
ctx: Context,
|
||||||
|
|
||||||
|
hide_tb: bool = True,
|
||||||
|
msg_limit: int = 6,
|
||||||
|
|
||||||
|
) -> tuple[
|
||||||
|
Return|None,
|
||||||
|
list[MsgType]
|
||||||
|
]:
|
||||||
|
'''
|
||||||
|
Drain IPC msgs delivered to the underlying IPC primitive's
|
||||||
|
rx-mem-chan (eg. `Context._rx_chan`) from the runtime in
|
||||||
|
search for a final result or error.
|
||||||
|
|
||||||
|
The motivation here is to ideally capture errors during ctxc
|
||||||
|
conditions where a canc-request/or local error is sent but the
|
||||||
|
local task also excepts and enters the
|
||||||
|
`Portal.open_context().__aexit__()` block wherein we prefer to
|
||||||
|
capture and raise any remote error or ctxc-ack as part of the
|
||||||
|
`ctx.result()` cleanup and teardown sequence.
|
||||||
|
|
||||||
|
'''
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
|
raise_overrun: bool = not ctx._allow_overruns
|
||||||
|
|
||||||
|
# wait for a final context result by collecting (but
|
||||||
|
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||||
|
# from the far end.
|
||||||
|
pre_result_drained: list[MsgType] = []
|
||||||
|
return_msg: Return|None = None
|
||||||
|
while not (
|
||||||
|
ctx.maybe_error
|
||||||
|
and not ctx._final_result_is_set()
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
# TODO: can remove?
|
||||||
|
# await trio.lowlevel.checkpoint()
|
||||||
|
|
||||||
|
# NOTE: this REPL usage actually works here dawg! Bo
|
||||||
|
# from .devx._debug import pause
|
||||||
|
# await pause()
|
||||||
|
|
||||||
|
# TODO: bad idea?
|
||||||
|
# -[ ] wrap final outcome channel wait in a scope so
|
||||||
|
# it can be cancelled out of band if needed?
|
||||||
|
#
|
||||||
|
# with trio.CancelScope() as res_cs:
|
||||||
|
# ctx._res_scope = res_cs
|
||||||
|
# msg: dict = await ctx._rx_chan.receive()
|
||||||
|
# if res_cs.cancelled_caught:
|
||||||
|
|
||||||
|
# TODO: ensure there's no more hangs, debugging the
|
||||||
|
# runtime pretty preaase!
|
||||||
|
# from .devx._debug import pause
|
||||||
|
# await pause()
|
||||||
|
|
||||||
|
# TODO: can remove this finally?
|
||||||
|
# we have no more need for the sync draining right
|
||||||
|
# since we're can kinda guarantee the async
|
||||||
|
# `.receive()` below will never block yah?
|
||||||
|
#
|
||||||
|
# if (
|
||||||
|
# ctx._cancel_called and (
|
||||||
|
# ctx.cancel_acked
|
||||||
|
# # or ctx.chan._cancel_called
|
||||||
|
# )
|
||||||
|
# # or not ctx._final_result_is_set()
|
||||||
|
# # ctx.outcome is not
|
||||||
|
# # or ctx.chan._closed
|
||||||
|
# ):
|
||||||
|
# try:
|
||||||
|
# msg: dict = await ctx._rx_chan.receive_nowait()()
|
||||||
|
# except trio.WouldBlock:
|
||||||
|
# log.warning(
|
||||||
|
# 'When draining already `.cancel_called` ctx!\n'
|
||||||
|
# 'No final msg arrived..\n'
|
||||||
|
# )
|
||||||
|
# break
|
||||||
|
# else:
|
||||||
|
# msg: dict = await ctx._rx_chan.receive()
|
||||||
|
|
||||||
|
# TODO: don't need it right jefe?
|
||||||
|
# with trio.move_on_after(1) as cs:
|
||||||
|
# if cs.cancelled_caught:
|
||||||
|
# from .devx._debug import pause
|
||||||
|
# await pause()
|
||||||
|
|
||||||
|
# pray to the `trio` gawds that we're corrent with this
|
||||||
|
# msg: dict = await ctx._rx_chan.receive()
|
||||||
|
msg, pld = await ctx._pld_rx.recv_msg_w_pld(ipc=ctx)
|
||||||
|
|
||||||
|
# NOTE: we get here if the far end was
|
||||||
|
# `ContextCancelled` in 2 cases:
|
||||||
|
# 1. we requested the cancellation and thus
|
||||||
|
# SHOULD NOT raise that far end error,
|
||||||
|
# 2. WE DID NOT REQUEST that cancel and thus
|
||||||
|
# SHOULD RAISE HERE!
|
||||||
|
except trio.Cancelled:
|
||||||
|
|
||||||
|
# CASE 2: mask the local cancelled-error(s)
|
||||||
|
# only when we are sure the remote error is
|
||||||
|
# the source cause of this local task's
|
||||||
|
# cancellation.
|
||||||
|
ctx.maybe_raise()
|
||||||
|
|
||||||
|
# CASE 1: we DID request the cancel we simply
|
||||||
|
# continue to bubble up as normal.
|
||||||
|
raise
|
||||||
|
|
||||||
|
match msg:
|
||||||
|
|
||||||
|
# final result arrived!
|
||||||
|
case Return(
|
||||||
|
# cid=cid,
|
||||||
|
# pld=res,
|
||||||
|
):
|
||||||
|
# ctx._result: Any = res
|
||||||
|
ctx._result: Any = pld
|
||||||
|
log.runtime(
|
||||||
|
'Context delivered final draining msg:\n'
|
||||||
|
f'{pformat(msg)}'
|
||||||
|
)
|
||||||
|
# XXX: only close the rx mem chan AFTER
|
||||||
|
# a final result is retreived.
|
||||||
|
# if ctx._rx_chan:
|
||||||
|
# await ctx._rx_chan.aclose()
|
||||||
|
# TODO: ^ we don't need it right?
|
||||||
|
return_msg = msg
|
||||||
|
break
|
||||||
|
|
||||||
|
# far end task is still streaming to us so discard
|
||||||
|
# and report depending on local ctx state.
|
||||||
|
case Yield():
|
||||||
|
pre_result_drained.append(msg)
|
||||||
|
if (
|
||||||
|
(ctx._stream.closed
|
||||||
|
and (reason := 'stream was already closed')
|
||||||
|
)
|
||||||
|
or (ctx.cancel_acked
|
||||||
|
and (reason := 'ctx cancelled other side')
|
||||||
|
)
|
||||||
|
or (ctx._cancel_called
|
||||||
|
and (reason := 'ctx called `.cancel()`')
|
||||||
|
)
|
||||||
|
or (len(pre_result_drained) > msg_limit
|
||||||
|
and (reason := f'"yield" limit={msg_limit}')
|
||||||
|
)
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
'Cancelling `MsgStream` drain since '
|
||||||
|
f'{reason}\n\n'
|
||||||
|
f'<= {ctx.chan.uid}\n'
|
||||||
|
f' |_{ctx._nsf}()\n\n'
|
||||||
|
f'=> {ctx._task}\n'
|
||||||
|
f' |_{ctx._stream}\n\n'
|
||||||
|
|
||||||
|
f'{pformat(msg)}\n'
|
||||||
|
)
|
||||||
|
return (
|
||||||
|
return_msg,
|
||||||
|
pre_result_drained,
|
||||||
|
)
|
||||||
|
|
||||||
|
# drain up to the `msg_limit` hoping to get
|
||||||
|
# a final result or error/ctxc.
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
'Ignoring "yield" msg during `ctx.result()` drain..\n'
|
||||||
|
f'<= {ctx.chan.uid}\n'
|
||||||
|
f' |_{ctx._nsf}()\n\n'
|
||||||
|
f'=> {ctx._task}\n'
|
||||||
|
f' |_{ctx._stream}\n\n'
|
||||||
|
|
||||||
|
f'{pformat(msg)}\n'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# stream terminated, but no result yet..
|
||||||
|
#
|
||||||
|
# TODO: work out edge cases here where
|
||||||
|
# a stream is open but the task also calls
|
||||||
|
# this?
|
||||||
|
# -[ ] should be a runtime error if a stream is open right?
|
||||||
|
# Stop()
|
||||||
|
case Stop():
|
||||||
|
pre_result_drained.append(msg)
|
||||||
|
log.cancel(
|
||||||
|
'Remote stream terminated due to "stop" msg:\n\n'
|
||||||
|
f'{pformat(msg)}\n'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# remote error msg, likely already handled inside
|
||||||
|
# `Context._deliver_msg()`
|
||||||
|
case Error():
|
||||||
|
# TODO: can we replace this with `ctx.maybe_raise()`?
|
||||||
|
# -[ ] would this be handier for this case maybe?
|
||||||
|
# async with maybe_raise_on_exit() as raises:
|
||||||
|
# if raises:
|
||||||
|
# log.error('some msg about raising..')
|
||||||
|
#
|
||||||
|
re: Exception|None = ctx._remote_error
|
||||||
|
if re:
|
||||||
|
assert msg is ctx._cancel_msg
|
||||||
|
# NOTE: this solved a super duper edge case XD
|
||||||
|
# this was THE super duper edge case of:
|
||||||
|
# - local task opens a remote task,
|
||||||
|
# - requests remote cancellation of far end
|
||||||
|
# ctx/tasks,
|
||||||
|
# - needs to wait for the cancel ack msg
|
||||||
|
# (ctxc) or some result in the race case
|
||||||
|
# where the other side's task returns
|
||||||
|
# before the cancel request msg is ever
|
||||||
|
# rxed and processed,
|
||||||
|
# - here this surrounding drain loop (which
|
||||||
|
# iterates all ipc msgs until the ack or
|
||||||
|
# an early result arrives) was NOT exiting
|
||||||
|
# since we are the edge case: local task
|
||||||
|
# does not re-raise any ctxc it receives
|
||||||
|
# IFF **it** was the cancellation
|
||||||
|
# requester..
|
||||||
|
#
|
||||||
|
# XXX will raise if necessary but ow break
|
||||||
|
# from loop presuming any supressed error
|
||||||
|
# (ctxc) should terminate the context!
|
||||||
|
ctx._maybe_raise_remote_err(
|
||||||
|
re,
|
||||||
|
# NOTE: obvi we don't care if we
|
||||||
|
# overran the far end if we're already
|
||||||
|
# waiting on a final result (msg).
|
||||||
|
# raise_overrun_from_self=False,
|
||||||
|
raise_overrun_from_self=raise_overrun,
|
||||||
|
)
|
||||||
|
|
||||||
|
break # OOOOOF, yeah obvi we need this..
|
||||||
|
|
||||||
|
# XXX we should never really get here
|
||||||
|
# right! since `._deliver_msg()` should
|
||||||
|
# always have detected an {'error': ..}
|
||||||
|
# msg and already called this right!?!
|
||||||
|
# elif error := unpack_error(
|
||||||
|
# msg=msg,
|
||||||
|
# chan=ctx._portal.channel,
|
||||||
|
# hide_tb=False,
|
||||||
|
# ):
|
||||||
|
# log.critical('SHOULD NEVER GET HERE!?')
|
||||||
|
# assert msg is ctx._cancel_msg
|
||||||
|
# assert error.msgdata == ctx._remote_error.msgdata
|
||||||
|
# assert error.ipc_msg == ctx._remote_error.ipc_msg
|
||||||
|
# from .devx._debug import pause
|
||||||
|
# await pause()
|
||||||
|
# ctx._maybe_cancel_and_set_remote_error(error)
|
||||||
|
# ctx._maybe_raise_remote_err(error)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# bubble the original src key error
|
||||||
|
raise
|
||||||
|
|
||||||
|
# XXX should pretty much never get here unless someone
|
||||||
|
# overrides the default `MsgType` spec.
|
||||||
|
case _:
|
||||||
|
pre_result_drained.append(msg)
|
||||||
|
# It's definitely an internal error if any other
|
||||||
|
# msg type without a`'cid'` field arrives here!
|
||||||
|
if not msg.cid:
|
||||||
|
raise InternalError(
|
||||||
|
'Unexpected cid-missing msg?\n\n'
|
||||||
|
f'{msg}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
raise RuntimeError('Unknown msg type: {msg}')
|
||||||
|
|
||||||
|
else:
|
||||||
|
log.cancel(
|
||||||
|
'Skipping `MsgStream` drain since final outcome is set\n\n'
|
||||||
|
f'{ctx.outcome}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
return (
|
||||||
|
return_msg,
|
||||||
|
pre_result_drained,
|
||||||
|
)
|
|
@ -102,6 +102,59 @@ def iter_fields(struct: Struct) -> Iterator[
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def pformat(
|
||||||
|
struct: Struct,
|
||||||
|
field_indent: int = 2,
|
||||||
|
indent: int = 0,
|
||||||
|
|
||||||
|
) -> str:
|
||||||
|
'''
|
||||||
|
Recursion-safe `pprint.pformat()` style formatting of
|
||||||
|
a `msgspec.Struct` for sane reading by a human using a REPL.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# global whitespace indent
|
||||||
|
ws: str = ' '*indent
|
||||||
|
|
||||||
|
# field whitespace indent
|
||||||
|
field_ws: str = ' '*(field_indent + indent)
|
||||||
|
|
||||||
|
# qtn: str = ws + struct.__class__.__qualname__
|
||||||
|
qtn: str = struct.__class__.__qualname__
|
||||||
|
|
||||||
|
obj_str: str = '' # accumulator
|
||||||
|
fi: structs.FieldInfo
|
||||||
|
k: str
|
||||||
|
v: Any
|
||||||
|
for fi, k, v in iter_fields(struct):
|
||||||
|
|
||||||
|
# TODO: how can we prefer `Literal['option1', 'option2,
|
||||||
|
# ..]` over .__name__ == `Literal` but still get only the
|
||||||
|
# latter for simple types like `str | int | None` etc..?
|
||||||
|
ft: type = fi.type
|
||||||
|
typ_name: str = getattr(ft, '__name__', str(ft))
|
||||||
|
|
||||||
|
# recurse to get sub-struct's `.pformat()` output Bo
|
||||||
|
if isinstance(v, Struct):
|
||||||
|
val_str: str = v.pformat(
|
||||||
|
indent=field_indent + indent,
|
||||||
|
field_indent=indent + field_indent,
|
||||||
|
)
|
||||||
|
|
||||||
|
else: # the `pprint` recursion-safe format:
|
||||||
|
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
||||||
|
val_str: str = saferepr(v)
|
||||||
|
|
||||||
|
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
|
||||||
|
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
||||||
|
|
||||||
|
return (
|
||||||
|
f'{qtn}(\n'
|
||||||
|
f'{obj_str}'
|
||||||
|
f'{ws})'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Struct(
|
class Struct(
|
||||||
_Struct,
|
_Struct,
|
||||||
|
|
||||||
|
@ -140,65 +193,12 @@ class Struct(
|
||||||
|
|
||||||
return sin_props
|
return sin_props
|
||||||
|
|
||||||
# TODO: make thisi a mod-func!
|
pformat = pformat
|
||||||
def pformat(
|
# __str__ = __repr__ = pformat
|
||||||
self,
|
|
||||||
field_indent: int = 2,
|
|
||||||
indent: int = 0,
|
|
||||||
|
|
||||||
) -> str:
|
|
||||||
'''
|
|
||||||
Recursion-safe `pprint.pformat()` style formatting of
|
|
||||||
a `msgspec.Struct` for sane reading by a human using a REPL.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# global whitespace indent
|
|
||||||
ws: str = ' '*indent
|
|
||||||
|
|
||||||
# field whitespace indent
|
|
||||||
field_ws: str = ' '*(field_indent + indent)
|
|
||||||
|
|
||||||
# qtn: str = ws + self.__class__.__qualname__
|
|
||||||
qtn: str = self.__class__.__qualname__
|
|
||||||
|
|
||||||
obj_str: str = '' # accumulator
|
|
||||||
fi: structs.FieldInfo
|
|
||||||
k: str
|
|
||||||
v: Any
|
|
||||||
for fi, k, v in iter_fields(self):
|
|
||||||
|
|
||||||
# TODO: how can we prefer `Literal['option1', 'option2,
|
|
||||||
# ..]` over .__name__ == `Literal` but still get only the
|
|
||||||
# latter for simple types like `str | int | None` etc..?
|
|
||||||
ft: type = fi.type
|
|
||||||
typ_name: str = getattr(ft, '__name__', str(ft))
|
|
||||||
|
|
||||||
# recurse to get sub-struct's `.pformat()` output Bo
|
|
||||||
if isinstance(v, Struct):
|
|
||||||
val_str: str = v.pformat(
|
|
||||||
indent=field_indent + indent,
|
|
||||||
field_indent=indent + field_indent,
|
|
||||||
)
|
|
||||||
|
|
||||||
else: # the `pprint` recursion-safe format:
|
|
||||||
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
|
||||||
val_str: str = saferepr(v)
|
|
||||||
|
|
||||||
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
|
|
||||||
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
|
||||||
|
|
||||||
return (
|
|
||||||
f'{qtn}(\n'
|
|
||||||
f'{obj_str}'
|
|
||||||
f'{ws})'
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
|
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
|
||||||
# inside a known tty?
|
# inside a known tty?
|
||||||
# def __repr__(self) -> str:
|
# def __repr__(self) -> str:
|
||||||
# ...
|
# ...
|
||||||
|
|
||||||
# __str__ = __repr__ = pformat
|
|
||||||
__repr__ = pformat
|
__repr__ = pformat
|
||||||
|
|
||||||
def copy(
|
def copy(
|
||||||
|
|
Loading…
Reference in New Issue