Compare commits
11 Commits
6e0ef76128
...
a5a0e6854b
Author | SHA1 | Date |
---|---|---|
|
a5a0e6854b | |
|
c383978402 | |
|
08fcd3fb03 | |
|
adba454d1d | |
|
4bab998ff9 | |
|
c25c77c573 | |
|
188ff0e0e5 | |
|
6b30c86eca | |
|
6aa52417ef | |
|
18e97a8f9a | |
|
5eb9144921 |
|
@ -472,13 +472,17 @@ class Context:
|
|||
return 'parent' if self._portal else 'child'
|
||||
|
||||
@staticmethod
|
||||
def peer_side(side: str) -> str:
|
||||
def _peer_side(side: str) -> str:
|
||||
match side:
|
||||
case 'child':
|
||||
return 'parent'
|
||||
case 'parent':
|
||||
return 'child'
|
||||
|
||||
@property
|
||||
def peer_side(self) -> str:
|
||||
return self._peer_side(self.side)
|
||||
|
||||
# TODO: remove stat!
|
||||
# -[ ] re-implement the `.experiemental._pubsub` stuff
|
||||
# with `MsgStream` and that should be last usage?
|
||||
|
@ -512,9 +516,7 @@ class Context:
|
|||
equiv of a `StopIteration`.
|
||||
|
||||
'''
|
||||
await self.chan.send(
|
||||
Stop(cid=self.cid)
|
||||
)
|
||||
await self.chan.send(Stop(cid=self.cid))
|
||||
|
||||
def _maybe_cancel_and_set_remote_error(
|
||||
self,
|
||||
|
@ -593,7 +595,6 @@ class Context:
|
|||
# TODO: never do this right?
|
||||
# if self._remote_error:
|
||||
# return
|
||||
peer_side: str = self.peer_side(self.side)
|
||||
|
||||
# XXX: denote and set the remote side's error so that
|
||||
# after we cancel whatever task is the opener of this
|
||||
|
@ -601,7 +602,7 @@ class Context:
|
|||
# appropriately.
|
||||
log.runtime(
|
||||
'Setting remote error for ctx\n\n'
|
||||
f'<= {peer_side!r}: {self.chan.uid}\n'
|
||||
f'<= {self.peer_side!r}: {self.chan.uid}\n'
|
||||
f'=> {self.side!r}\n\n'
|
||||
f'{error}'
|
||||
)
|
||||
|
@ -623,9 +624,8 @@ class Context:
|
|||
|
||||
elif isinstance(error, MsgTypeError):
|
||||
msgerr = True
|
||||
peer_side: str = self.peer_side(self.side)
|
||||
log.error(
|
||||
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
|
||||
f'IPC dialog error due to msg-type caused by {self.peer_side!r} side\n\n'
|
||||
|
||||
f'{error}\n'
|
||||
f'{pformat(self)}\n'
|
||||
|
@ -1067,12 +1067,12 @@ class Context:
|
|||
except trio.EndOfChannel as eoc:
|
||||
if (
|
||||
eoc
|
||||
and stream.closed
|
||||
and
|
||||
stream.closed
|
||||
):
|
||||
# sanity, can remove?
|
||||
assert eoc is stream._eoc
|
||||
# from .devx import pause
|
||||
# await pause()
|
||||
|
||||
log.warning(
|
||||
'Stream was terminated by EoC\n\n'
|
||||
# NOTE: won't show the error <Type> but
|
||||
|
@ -1644,10 +1644,9 @@ class Context:
|
|||
side: str = self.side
|
||||
if side == 'child':
|
||||
assert not self._portal
|
||||
peer_side: str = self.peer_side(side)
|
||||
|
||||
flow_body: str = (
|
||||
f'<= peer {peer_side!r}: {from_uid}\n'
|
||||
f'<= peer {self.peer_side!r}: {from_uid}\n'
|
||||
f' |_<{nsf}()>\n\n'
|
||||
|
||||
f'=> {side!r}: {self._task}\n'
|
||||
|
@ -1665,7 +1664,7 @@ class Context:
|
|||
log_meth = log.runtime
|
||||
|
||||
log_meth(
|
||||
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
|
||||
f'Delivering IPC ctx error from {self.peer_side!r} to {side!r} task\n\n'
|
||||
|
||||
f'{flow_body}'
|
||||
|
||||
|
@ -2330,7 +2329,7 @@ async def open_context_from_portal(
|
|||
and ctx.cancel_acked
|
||||
):
|
||||
log.cancel(
|
||||
'Context cancelled by caller task\n'
|
||||
'Context cancelled by {ctx.side!r}-side task\n'
|
||||
f'|_{ctx._task}\n\n'
|
||||
|
||||
f'{repr(scope_err)}\n'
|
||||
|
@ -2364,6 +2363,7 @@ async def open_context_from_portal(
|
|||
None,
|
||||
)
|
||||
|
||||
|
||||
def mk_context(
|
||||
chan: Channel,
|
||||
cid: str,
|
||||
|
|
|
@ -54,6 +54,7 @@ from tractor.msg import (
|
|||
from tractor.msg.pretty_struct import (
|
||||
iter_fields,
|
||||
Struct,
|
||||
pformat as struct_format,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -108,6 +109,10 @@ _body_fields: list[str] = list(
|
|||
'relay_path',
|
||||
'_msg_dict',
|
||||
'cid',
|
||||
|
||||
# since only ctxc should show it but `Error` does
|
||||
# have it as an optional field.
|
||||
'canceller',
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -382,6 +387,9 @@ class RemoteActorError(Exception):
|
|||
'''
|
||||
Error type raised by original remote faulting actor.
|
||||
|
||||
When the error has only been relayed a single actor-hop
|
||||
this will be the same as the `.boxed_type`.
|
||||
|
||||
'''
|
||||
if self._src_type is None:
|
||||
self._src_type = get_err_type(
|
||||
|
@ -396,7 +404,8 @@ class RemoteActorError(Exception):
|
|||
String-name of the (last hop's) boxed error type.
|
||||
|
||||
'''
|
||||
return self._ipc_msg.boxed_type_str
|
||||
bt: Type[BaseException] = self.boxed_type
|
||||
return str(bt.__name__)
|
||||
|
||||
@property
|
||||
def boxed_type(self) -> str:
|
||||
|
@ -492,7 +501,11 @@ class RemoteActorError(Exception):
|
|||
'''
|
||||
# TODO: use this matryoshka emjoi XD
|
||||
# => 🪆
|
||||
reprol_str: str = f'{type(self).__name__}('
|
||||
reprol_str: str = (
|
||||
f'{type(self).__name__}' # type name
|
||||
f'[{self.boxed_type_str}]' # parameterized by boxed type
|
||||
'(' # init-style look
|
||||
)
|
||||
_repr: str = self._mk_fields_str(
|
||||
self.reprol_fields,
|
||||
end_char=' ',
|
||||
|
@ -532,7 +545,8 @@ class RemoteActorError(Exception):
|
|||
self,
|
||||
) -> BaseException:
|
||||
'''
|
||||
Unpack the inner-most source error from it's original IPC msg data.
|
||||
Unpack the inner-most source error from it's original IPC
|
||||
msg data.
|
||||
|
||||
We attempt to reconstruct (as best as we can) the original
|
||||
`Exception` from as it would have been raised in the
|
||||
|
@ -570,6 +584,14 @@ class RemoteActorError(Exception):
|
|||
# # boxed_type=get_type_ref(..
|
||||
# raise NotImplementedError
|
||||
|
||||
@property
|
||||
def sender(self) -> tuple[str, str]|None:
|
||||
if (
|
||||
(msg := self._ipc_msg)
|
||||
and (value := msg.sender)
|
||||
):
|
||||
return tuple(value)
|
||||
|
||||
|
||||
class ContextCancelled(RemoteActorError):
|
||||
'''
|
||||
|
@ -644,8 +666,8 @@ class MsgTypeError(
|
|||
- `Yield`
|
||||
- TODO: any embedded `.pld` type defined by user code?
|
||||
|
||||
Normally the source of an error is re-raised from some `.msg._codec`
|
||||
decode which itself raises in a backend interchange
|
||||
Normally the source of an error is re-raised from some
|
||||
`.msg._codec` decode which itself raises in a backend interchange
|
||||
lib (eg. a `msgspec.ValidationError`).
|
||||
|
||||
'''
|
||||
|
@ -734,20 +756,6 @@ class StreamOverrun(
|
|||
handled by app code using `MsgStream.send()/.receive()`.
|
||||
|
||||
'''
|
||||
@property
|
||||
def sender(self) -> tuple[str, str] | None:
|
||||
value = self._ipc_msg.sender
|
||||
if value:
|
||||
return tuple(value)
|
||||
|
||||
|
||||
# class InternalActorError(RemoteActorError):
|
||||
# '''
|
||||
# Boxed (Remote) internal `tractor` error indicating failure of some
|
||||
# primitive, machinery state or lowlevel task that should never
|
||||
# occur.
|
||||
|
||||
# '''
|
||||
|
||||
|
||||
class TransportClosed(trio.ClosedResourceError):
|
||||
|
@ -944,8 +952,7 @@ def _raise_from_unexpected_msg(
|
|||
src_err: AttributeError,
|
||||
log: StackLevelAdapter, # caller specific `log` obj
|
||||
|
||||
expect_msg: str = Yield,
|
||||
stream: MsgStream | None = None,
|
||||
expect_msg: Type[MsgType],
|
||||
|
||||
# allow "deeper" tbs when debugging B^o
|
||||
hide_tb: bool = True,
|
||||
|
@ -987,6 +994,8 @@ def _raise_from_unexpected_msg(
|
|||
) from src_err
|
||||
|
||||
# TODO: test that shows stream raising an expected error!!!
|
||||
stream: MsgStream|None
|
||||
_type: str = 'Context'
|
||||
|
||||
# raise the error message in a boxed exception type!
|
||||
if isinstance(msg, Error):
|
||||
|
@ -1003,59 +1012,54 @@ def _raise_from_unexpected_msg(
|
|||
# TODO: does it make more sense to pack
|
||||
# the stream._eoc outside this in the calleer always?
|
||||
# case Stop():
|
||||
elif (
|
||||
isinstance(msg, Stop)
|
||||
or (
|
||||
stream
|
||||
and stream._eoc
|
||||
)
|
||||
):
|
||||
log.debug(
|
||||
f'Context[{cid}] stream was stopped by remote side\n'
|
||||
f'cid: {cid}\n'
|
||||
)
|
||||
elif stream := ctx._stream:
|
||||
_type: str = 'MsgStream'
|
||||
|
||||
# TODO: if the a local task is already blocking on
|
||||
# a `Context.result()` and thus a `.receive()` on the
|
||||
# rx-chan, we close the chan and set state ensuring that
|
||||
# an eoc is raised!
|
||||
if (
|
||||
stream._eoc
|
||||
or
|
||||
isinstance(msg, Stop)
|
||||
):
|
||||
log.debug(
|
||||
f'Context[{cid}] stream was stopped by remote side\n'
|
||||
f'cid: {cid}\n'
|
||||
)
|
||||
|
||||
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
||||
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||
# block below it will trigger ``.aclose()``.
|
||||
eoc = trio.EndOfChannel(
|
||||
f'Context stream ended due to msg:\n\n'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
# XXX: important to set so that a new `.receive()`
|
||||
# call (likely by another task using a broadcast receiver)
|
||||
# doesn't accidentally pull the `return` message
|
||||
# value out of the underlying feed mem chan which is
|
||||
# destined for the `Context.result()` call during ctx-exit!
|
||||
stream._eoc: Exception = eoc
|
||||
# TODO: if the a local task is already blocking on
|
||||
# a `Context.result()` and thus a `.receive()` on the
|
||||
# rx-chan, we close the chan and set state ensuring that
|
||||
# an eoc is raised!
|
||||
|
||||
# in case there already is some underlying remote error
|
||||
# that arrived which is probably the source of this stream
|
||||
# closure
|
||||
ctx.maybe_raise()
|
||||
raise eoc from src_err
|
||||
# XXX: this causes ``ReceiveChannel.__anext__()`` to
|
||||
# raise a ``StopAsyncIteration`` **and** in our catch
|
||||
# block below it will trigger ``.aclose()``.
|
||||
eoc = trio.EndOfChannel(
|
||||
f'Context stream ended due to msg:\n\n'
|
||||
f'{pformat(msg)}\n'
|
||||
)
|
||||
# XXX: important to set so that a new `.receive()`
|
||||
# call (likely by another task using a broadcast receiver)
|
||||
# doesn't accidentally pull the `return` message
|
||||
# value out of the underlying feed mem chan which is
|
||||
# destined for the `Context.result()` call during ctx-exit!
|
||||
stream._eoc: Exception = eoc
|
||||
|
||||
if (
|
||||
stream
|
||||
and stream._closed
|
||||
):
|
||||
# TODO: our own error subtype?
|
||||
raise trio.ClosedResourceError(
|
||||
'This stream was closed'
|
||||
)
|
||||
# in case there already is some underlying remote error
|
||||
# that arrived which is probably the source of this stream
|
||||
# closure
|
||||
ctx.maybe_raise()
|
||||
raise eoc from src_err
|
||||
|
||||
# TODO: our own transport/IPC-broke error subtype?
|
||||
if stream._closed:
|
||||
raise trio.ClosedResourceError('This stream was closed')
|
||||
|
||||
# always re-raise the source error if no translation error case
|
||||
# is activated above.
|
||||
_type: str = 'Stream' if stream else 'Context'
|
||||
raise MessagingError(
|
||||
f"{_type} was expecting a {expect_msg} message"
|
||||
" BUT received a non-error msg:\n"
|
||||
f'{pformat(msg)}'
|
||||
f'{_type} was expecting a {expect_msg.__name__!r} message'
|
||||
' BUT received a non-error msg:\n\n'
|
||||
f'{struct_format(msg)}'
|
||||
) from src_err
|
||||
|
||||
|
||||
|
@ -1088,13 +1092,11 @@ def _mk_msg_type_err(
|
|||
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
||||
# prolly a manual type-check on our part.
|
||||
if message is None:
|
||||
fmt_spec: str = codec.pformat_msg_spec()
|
||||
fmt_stack: str = (
|
||||
'\n'.join(traceback.format_stack(limit=3))
|
||||
)
|
||||
tb_fmt: str = pformat_boxed_tb(
|
||||
tb_str=fmt_stack,
|
||||
# fields_str=header,
|
||||
field_prefix=' ',
|
||||
indent='',
|
||||
)
|
||||
|
@ -1102,8 +1104,7 @@ def _mk_msg_type_err(
|
|||
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
||||
f'{tb_fmt}\n'
|
||||
f'Valid IPC msgs are:\n\n'
|
||||
# f' ------ - ------\n'
|
||||
f'{fmt_spec}\n',
|
||||
f'{codec.msg_spec_str}\n',
|
||||
)
|
||||
elif src_type_error:
|
||||
src_message: str = str(src_type_error)
|
||||
|
|
|
@ -420,7 +420,6 @@ class Portal:
|
|||
kwargs=kwargs,
|
||||
portal=self,
|
||||
)
|
||||
ctx._portal = self
|
||||
|
||||
# ensure receive-only stream entrypoint
|
||||
assert ctx._remote_func_type == 'asyncgen'
|
||||
|
@ -430,9 +429,10 @@ class Portal:
|
|||
async with MsgStream(
|
||||
ctx=ctx,
|
||||
rx_chan=ctx._rx_chan,
|
||||
) as rchan:
|
||||
self._streams.add(rchan)
|
||||
yield rchan
|
||||
) as stream:
|
||||
self._streams.add(stream)
|
||||
ctx._stream = stream
|
||||
yield stream
|
||||
|
||||
finally:
|
||||
|
||||
|
@ -454,7 +454,7 @@ class Portal:
|
|||
|
||||
# XXX: should this always be done?
|
||||
# await recv_chan.aclose()
|
||||
self._streams.remove(rchan)
|
||||
self._streams.remove(stream)
|
||||
|
||||
# NOTE: impl is found in `._context`` mod to make
|
||||
# reading/groking the details simpler code-org-wise. This
|
||||
|
|
184
tractor/_rpc.py
184
tractor/_rpc.py
|
@ -181,12 +181,11 @@ async def _invoke_non_context(
|
|||
# way: using the linked IPC context machinery.
|
||||
failed_resp: bool = False
|
||||
try:
|
||||
await chan.send(
|
||||
StartAck(
|
||||
cid=cid,
|
||||
functype='asyncfunc',
|
||||
)
|
||||
ack = StartAck(
|
||||
cid=cid,
|
||||
functype='asyncfunc',
|
||||
)
|
||||
await chan.send(ack)
|
||||
except (
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
|
@ -194,12 +193,11 @@ async def _invoke_non_context(
|
|||
) as ipc_err:
|
||||
failed_resp = True
|
||||
if is_rpc:
|
||||
raise
|
||||
raise ipc_err
|
||||
else:
|
||||
# TODO: should this be an `.exception()` call?
|
||||
log.warning(
|
||||
f'Failed to respond to non-rpc request: {func}\n'
|
||||
f'{ipc_err}'
|
||||
log.exception(
|
||||
f'Failed to respond to runtime RPC request for\n\n'
|
||||
f'{ack}\n'
|
||||
)
|
||||
|
||||
with cancel_scope as cs:
|
||||
|
@ -220,20 +218,19 @@ async def _invoke_non_context(
|
|||
and chan.connected()
|
||||
):
|
||||
try:
|
||||
await chan.send(
|
||||
return_msg(
|
||||
cid=cid,
|
||||
pld=result,
|
||||
)
|
||||
ret_msg = return_msg(
|
||||
cid=cid,
|
||||
pld=result,
|
||||
)
|
||||
await chan.send(ret_msg)
|
||||
except (
|
||||
BrokenPipeError,
|
||||
trio.BrokenResourceError,
|
||||
):
|
||||
log.warning(
|
||||
'Failed to return result:\n'
|
||||
f'{func}@{actor.uid}\n'
|
||||
f'remote chan: {chan.uid}'
|
||||
'Failed to send RPC result?\n'
|
||||
f'|_{func}@{actor.uid}() -> {ret_msg}\n\n'
|
||||
f'x=> peer: {chan.uid}\n'
|
||||
)
|
||||
|
||||
@acm
|
||||
|
@ -250,7 +247,7 @@ async def _errors_relayed_via_ipc(
|
|||
] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
__tracebackhide__: bool = hide_tb # TODO: use hide_tb here?
|
||||
__tracebackhide__: bool = hide_tb
|
||||
try:
|
||||
yield # run RPC invoke body
|
||||
|
||||
|
@ -262,23 +259,19 @@ async def _errors_relayed_via_ipc(
|
|||
KeyboardInterrupt,
|
||||
) as err:
|
||||
|
||||
# always hide this frame from debug REPL if the crash
|
||||
# originated from an rpc task and we DID NOT fail due to
|
||||
# an IPC transport error!
|
||||
# NOTE: always hide this frame from debug REPL call stack
|
||||
# if the crash originated from an RPC task and we DID NOT
|
||||
# fail due to an IPC transport error!
|
||||
if (
|
||||
is_rpc
|
||||
and chan.connected()
|
||||
and
|
||||
chan.connected()
|
||||
):
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
||||
# TODO: maybe we'll want different "levels" of debugging
|
||||
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
||||
if not is_multi_cancelled(err):
|
||||
|
||||
# TODO: maybe we'll want different "levels" of debugging
|
||||
# eventualy such as ('app', 'supervisory', 'runtime') ?
|
||||
|
||||
# if not isinstance(err, trio.ClosedResourceError) and (
|
||||
# if not is_multi_cancelled(err) and (
|
||||
|
||||
entered_debug: bool = False
|
||||
if (
|
||||
(
|
||||
|
@ -310,19 +303,18 @@ async def _errors_relayed_via_ipc(
|
|||
# strange bug in our transport layer itself? Going
|
||||
# to keep this open ended for now.
|
||||
entered_debug = await _debug._maybe_enter_pm(err)
|
||||
|
||||
if not entered_debug:
|
||||
log.exception(
|
||||
'RPC task crashed\n'
|
||||
f'|_{ctx}'
|
||||
)
|
||||
|
||||
# always (try to) ship RPC errors back to caller
|
||||
# ALWAYS try to ship RPC errors back to parent/caller task
|
||||
if is_rpc:
|
||||
#
|
||||
|
||||
# TODO: tests for this scenario:
|
||||
# - RPC caller closes connection before getting a response
|
||||
# should **not** crash this actor..
|
||||
# should **not** crash this actor..
|
||||
await try_ship_error_to_remote(
|
||||
chan,
|
||||
err,
|
||||
|
@ -331,33 +323,41 @@ async def _errors_relayed_via_ipc(
|
|||
hide_tb=hide_tb,
|
||||
)
|
||||
|
||||
# error is probably from above coro running code *not from
|
||||
# the target rpc invocation since a scope was never
|
||||
# allocated around the coroutine await.
|
||||
# if the ctx cs is NOT allocated, the error is likely from
|
||||
# above `coro` invocation machinery NOT from inside the
|
||||
# `coro` itself, i.e. err is NOT a user application error.
|
||||
if ctx._scope is None:
|
||||
# we don't ever raise directly here to allow the
|
||||
# msg-loop-scheduler to continue running for this
|
||||
# channel.
|
||||
task_status.started(err)
|
||||
|
||||
# always reraise KBIs so they propagate at the sys-process
|
||||
# level.
|
||||
# always reraise KBIs so they propagate at the sys-process level.
|
||||
if isinstance(err, KeyboardInterrupt):
|
||||
raise
|
||||
|
||||
|
||||
# RPC task bookeeping
|
||||
# RPC task bookeeping.
|
||||
# since RPC tasks are scheduled inside a flat
|
||||
# `Actor._service_n`, we add "handles" to each such that
|
||||
# they can be individually ccancelled.
|
||||
finally:
|
||||
try:
|
||||
ctx, func, is_complete = actor._rpc_tasks.pop(
|
||||
ctx: Context
|
||||
func: Callable
|
||||
is_complete: trio.Event
|
||||
(
|
||||
ctx,
|
||||
func,
|
||||
is_complete,
|
||||
) = actor._rpc_tasks.pop(
|
||||
(chan, ctx.cid)
|
||||
)
|
||||
is_complete.set()
|
||||
|
||||
except KeyError:
|
||||
# If we're cancelled before the task returns then the
|
||||
# cancel scope will not have been inserted yet
|
||||
if is_rpc:
|
||||
# If we're cancelled before the task returns then the
|
||||
# cancel scope will not have been inserted yet
|
||||
log.warning(
|
||||
'RPC task likely errored or cancelled before start?'
|
||||
f'|_{ctx._task}\n'
|
||||
|
@ -372,7 +372,7 @@ async def _errors_relayed_via_ipc(
|
|||
|
||||
finally:
|
||||
if not actor._rpc_tasks:
|
||||
log.runtime("All RPC tasks have completed")
|
||||
log.runtime('All RPC tasks have completed')
|
||||
actor._ongoing_rpc_tasks.set()
|
||||
|
||||
|
||||
|
@ -414,19 +414,16 @@ async def _invoke(
|
|||
|
||||
# TODO: possibly a specially formatted traceback
|
||||
# (not sure what typing is for this..)?
|
||||
# tb = None
|
||||
# tb: TracebackType = None
|
||||
|
||||
cancel_scope = CancelScope()
|
||||
# activated cancel scope ref
|
||||
cs: CancelScope|None = None
|
||||
|
||||
cs: CancelScope|None = None # ref when activated
|
||||
ctx = actor.get_context(
|
||||
chan=chan,
|
||||
cid=cid,
|
||||
nsf=NamespacePath.from_ref(func),
|
||||
|
||||
# TODO: if we wanted to get cray and support it?
|
||||
# side='callee',
|
||||
# NOTE: no portal passed bc this is the "child"-side
|
||||
|
||||
# We shouldn't ever need to pass this through right?
|
||||
# it's up to the soon-to-be called rpc task to
|
||||
|
@ -459,8 +456,8 @@ async def _invoke(
|
|||
kwargs['stream'] = ctx
|
||||
|
||||
|
||||
# handle decorated ``@tractor.context`` async function
|
||||
elif getattr(func, '_tractor_context_function', False):
|
||||
# handle decorated ``@tractor.context`` async function
|
||||
kwargs['ctx'] = ctx
|
||||
context = True
|
||||
|
||||
|
@ -474,7 +471,8 @@ async def _invoke(
|
|||
task_status=task_status,
|
||||
):
|
||||
if not (
|
||||
inspect.isasyncgenfunction(func) or
|
||||
inspect.isasyncgenfunction(func)
|
||||
or
|
||||
inspect.iscoroutinefunction(func)
|
||||
):
|
||||
raise TypeError(f'{func} must be an async function!')
|
||||
|
@ -486,8 +484,7 @@ async def _invoke(
|
|||
except TypeError:
|
||||
raise
|
||||
|
||||
# TODO: implement all these cases in terms of the
|
||||
# `Context` one!
|
||||
# TODO: impl all these cases in terms of the `Context` one!
|
||||
if not context:
|
||||
await _invoke_non_context(
|
||||
actor,
|
||||
|
@ -503,7 +500,7 @@ async def _invoke(
|
|||
return_msg,
|
||||
task_status,
|
||||
)
|
||||
# below is only for `@context` funcs
|
||||
# XXX below fallthrough is ONLY for `@context` eps
|
||||
return
|
||||
|
||||
# our most general case: a remote SC-transitive,
|
||||
|
@ -580,9 +577,6 @@ async def _invoke(
|
|||
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
||||
# which cancels the scope presuming the input error
|
||||
# is not a `.cancel_acked` pleaser.
|
||||
# - currently a never-should-happen-fallthrough case
|
||||
# inside ._context._drain_to_final_msg()`..
|
||||
# # TODO: remove this ^ right?
|
||||
if ctx._scope.cancelled_caught:
|
||||
our_uid: tuple = actor.uid
|
||||
|
||||
|
@ -598,9 +592,7 @@ async def _invoke(
|
|||
if cs.cancel_called:
|
||||
|
||||
canceller: tuple = ctx.canceller
|
||||
msg: str = (
|
||||
'actor was cancelled by '
|
||||
)
|
||||
explain: str = f'{ctx.side!r}-side task was cancelled by '
|
||||
|
||||
# NOTE / TODO: if we end up having
|
||||
# ``Actor._cancel_task()`` call
|
||||
|
@ -610,22 +602,28 @@ async def _invoke(
|
|||
if ctx._cancel_called:
|
||||
# TODO: test for this!!!!!
|
||||
canceller: tuple = our_uid
|
||||
msg += 'itself '
|
||||
explain += 'itself '
|
||||
|
||||
# if the channel which spawned the ctx is the
|
||||
# one that cancelled it then we report that, vs.
|
||||
# it being some other random actor that for ex.
|
||||
# some actor who calls `Portal.cancel_actor()`
|
||||
# and by side-effect cancels this ctx.
|
||||
#
|
||||
# TODO: determine if the ctx peer task was the
|
||||
# exact task which cancelled, vs. some other
|
||||
# task in the same actor.
|
||||
elif canceller == ctx.chan.uid:
|
||||
msg += 'its caller'
|
||||
explain += f'its {ctx.peer_side!r}-side peer'
|
||||
|
||||
else:
|
||||
msg += 'a remote peer'
|
||||
explain += 'a remote peer'
|
||||
|
||||
# TODO: move this "div centering" into
|
||||
# a helper for use elsewhere!
|
||||
div_chars: str = '------ - ------'
|
||||
div_offset: int = (
|
||||
round(len(msg)/2)+1
|
||||
round(len(explain)/2)+1
|
||||
+
|
||||
round(len(div_chars)/2)+1
|
||||
)
|
||||
|
@ -636,11 +634,12 @@ async def _invoke(
|
|||
+
|
||||
f'{div_chars}\n'
|
||||
)
|
||||
msg += (
|
||||
explain += (
|
||||
div_str +
|
||||
f'<= canceller: {canceller}\n'
|
||||
f'=> uid: {our_uid}\n'
|
||||
f' |_{ctx._task}()'
|
||||
f'=> cancellee: {our_uid}\n'
|
||||
# TODO: better repr for ctx tasks..
|
||||
f' |_{ctx.side!r} {ctx._task}'
|
||||
|
||||
# TODO: instead just show the
|
||||
# ctx.__str__() here?
|
||||
|
@ -659,7 +658,7 @@ async def _invoke(
|
|||
# task, so relay this cancel signal to the
|
||||
# other side.
|
||||
ctxc = ContextCancelled(
|
||||
message=msg,
|
||||
message=explain,
|
||||
boxed_type=trio.Cancelled,
|
||||
canceller=canceller,
|
||||
)
|
||||
|
@ -702,11 +701,9 @@ async def _invoke(
|
|||
ctx: Context = actor._contexts.pop((
|
||||
chan.uid,
|
||||
cid,
|
||||
# ctx.side,
|
||||
))
|
||||
|
||||
merr: Exception|None = ctx.maybe_error
|
||||
|
||||
(
|
||||
res_type_str,
|
||||
res_str,
|
||||
|
@ -720,7 +717,7 @@ async def _invoke(
|
|||
)
|
||||
log.runtime(
|
||||
f'IPC context terminated with a final {res_type_str}\n\n'
|
||||
f'{ctx}\n'
|
||||
f'{ctx}'
|
||||
)
|
||||
|
||||
|
||||
|
@ -806,13 +803,19 @@ async def process_messages(
|
|||
and `Actor.cancel()` process-wide-runtime-shutdown requests
|
||||
(as utilized inside `Portal.cancel_actor()` ).
|
||||
|
||||
|
||||
'''
|
||||
assert actor._service_n # state sanity
|
||||
|
||||
# TODO: once `trio` get's an "obvious way" for req/resp we
|
||||
# should use it?
|
||||
# https://github.com/python-trio/trio/issues/467
|
||||
# -[ ] existing GH https://github.com/python-trio/trio/issues/467
|
||||
# -[ ] for other transports (like QUIC) we can possibly just
|
||||
# entirely avoid the feeder mem-chans since each msg will be
|
||||
# delivered with a ctx-id already?
|
||||
#
|
||||
# |_ for ex, from `aioquic` which exposed "stream ids":
|
||||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L1175
|
||||
# - https://github.com/aiortc/aioquic/blob/main/src/aioquic/quic/connection.py#L659
|
||||
log.runtime(
|
||||
'Entering RPC msg loop:\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
|
@ -850,7 +853,7 @@ async def process_messages(
|
|||
| Return(cid=cid)
|
||||
| CancelAck(cid=cid)
|
||||
|
||||
# `.cid` means RPC-ctx-task specific
|
||||
# `.cid` indicates RPC-ctx-task scoped
|
||||
| Error(cid=cid)
|
||||
|
||||
# recv-side `MsgType` decode violation
|
||||
|
@ -1046,16 +1049,16 @@ async def process_messages(
|
|||
trio.Event(),
|
||||
)
|
||||
|
||||
# runtime-scoped remote error (since no `.cid`)
|
||||
# runtime-scoped remote (internal) error
|
||||
# (^- bc no `Error.cid` -^)
|
||||
#
|
||||
# NOTE: this is the non-rpc error case, that
|
||||
# is, an error NOT raised inside a call to
|
||||
# `_invoke()` (i.e. no cid was provided in the
|
||||
# msg - see above). Raise error inline and
|
||||
# mark the channel as "globally errored" for
|
||||
# all downstream consuming primitives.
|
||||
case Error():
|
||||
# NOTE: this is the non-rpc error case,
|
||||
# that is, an error **not** raised inside
|
||||
# a call to ``_invoke()`` (i.e. no cid was
|
||||
# provided in the msg - see above). Push
|
||||
# this error to all local channel
|
||||
# consumers (normally portals) by marking
|
||||
# the channel as errored
|
||||
# assert chan.uid
|
||||
chan._exc: Exception = unpack_error(
|
||||
msg,
|
||||
chan=chan,
|
||||
|
@ -1111,7 +1114,7 @@ async def process_messages(
|
|||
f'|_{chan.raddr}\n'
|
||||
)
|
||||
|
||||
# transport **was** disconnected
|
||||
# transport **WAS** disconnected
|
||||
return True
|
||||
|
||||
except (
|
||||
|
@ -1150,12 +1153,11 @@ async def process_messages(
|
|||
finally:
|
||||
# msg debugging for when he machinery is brokey
|
||||
log.runtime(
|
||||
'Exiting IPC msg loop with\n'
|
||||
f'peer: {chan.uid}\n'
|
||||
'Exiting IPC msg loop with final msg\n\n'
|
||||
f'<= peer: {chan.uid}\n'
|
||||
f'|_{chan}\n\n'
|
||||
'final msg:\n'
|
||||
f'{pformat(msg)}\n'
|
||||
f'{pformat(msg)}\n\n'
|
||||
)
|
||||
|
||||
# transport **was not** disconnected
|
||||
# transport **WAS NOT** disconnected
|
||||
return False
|
||||
|
|
|
@ -850,7 +850,7 @@ class Actor:
|
|||
msg_buffer_size: int|None = None,
|
||||
allow_overruns: bool = False,
|
||||
load_nsf: bool = False,
|
||||
ack_timeout: float = 3,
|
||||
ack_timeout: float = float('inf'),
|
||||
|
||||
) -> Context:
|
||||
'''
|
||||
|
|
|
@ -37,6 +37,11 @@ from ._codec import (
|
|||
MsgDec as MsgDec,
|
||||
current_codec as current_codec,
|
||||
)
|
||||
# currently can't bc circular with `._context`
|
||||
# from ._ops import (
|
||||
# PldRx as PldRx,
|
||||
# _drain_to_final_msg as _drain_to_final_msg,
|
||||
# )
|
||||
|
||||
from .types import (
|
||||
Msg as Msg,
|
||||
|
|
|
@ -75,7 +75,7 @@ log = get_logger(__name__)
|
|||
# TODO: unify with `MsgCodec` by making `._dec` part this?
|
||||
class MsgDec(Struct):
|
||||
'''
|
||||
An IPC msg decoder.
|
||||
An IPC msg (payload) decoder.
|
||||
|
||||
Normally used to decode only a payload: `MsgType.pld:
|
||||
PayloadT` field before delivery to IPC consumer code.
|
||||
|
@ -87,6 +87,31 @@ class MsgDec(Struct):
|
|||
def dec(self) -> msgpack.Decoder:
|
||||
return self._dec
|
||||
|
||||
def __repr__(self) -> str:
|
||||
|
||||
speclines: str = self.spec_str
|
||||
|
||||
# in multi-typed spec case we stick the list
|
||||
# all on newlines after the |__pld_spec__:,
|
||||
# OW it's prolly single type spec-value
|
||||
# so just leave it on same line.
|
||||
if '\n' in speclines:
|
||||
speclines: str = '\n' + textwrap.indent(
|
||||
speclines,
|
||||
prefix=' '*3,
|
||||
)
|
||||
|
||||
body: str = textwrap.indent(
|
||||
f'|_dec_hook: {self.dec.dec_hook}\n'
|
||||
f'|__pld_spec__: {speclines}\n',
|
||||
prefix=' '*2,
|
||||
)
|
||||
return (
|
||||
f'<{type(self).__name__}(\n'
|
||||
f'{body}'
|
||||
')>'
|
||||
)
|
||||
|
||||
# struct type unions
|
||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
#
|
||||
|
@ -137,17 +162,7 @@ class MsgDec(Struct):
|
|||
# TODO: would get moved into `FieldSpec.__str__()` right?
|
||||
@property
|
||||
def spec_str(self) -> str:
|
||||
|
||||
# TODO: could also use match: instead?
|
||||
spec: Union[Type]|Type = self.spec
|
||||
|
||||
# `typing.Union` case
|
||||
if getattr(spec, '__args__', False):
|
||||
return str(spec)
|
||||
|
||||
# just a single type
|
||||
else:
|
||||
return spec.__name__
|
||||
return pformat_msgspec(codec=self)
|
||||
|
||||
pld_spec_str = spec_str
|
||||
|
||||
|
@ -168,9 +183,57 @@ def mk_dec(
|
|||
|
||||
) -> MsgDec:
|
||||
|
||||
return msgpack.Decoder(
|
||||
type=spec, # like `Msg[Any]`
|
||||
dec_hook=dec_hook,
|
||||
return MsgDec(
|
||||
_dec=msgpack.Decoder(
|
||||
type=spec, # like `Msg[Any]`
|
||||
dec_hook=dec_hook,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def mk_msgspec_table(
|
||||
dec: msgpack.Decoder,
|
||||
msg: MsgType|None = None,
|
||||
|
||||
) -> dict[str, MsgType]|str:
|
||||
'''
|
||||
Fill out a `dict` of `MsgType`s keyed by name
|
||||
for a given input `msgspec.msgpack.Decoder`
|
||||
as defined by its `.type: Union[Type]` setting.
|
||||
|
||||
If `msg` is provided, only deliver a `dict` with a single
|
||||
entry for that type.
|
||||
|
||||
'''
|
||||
msgspec: Union[Type]|Type = dec.type
|
||||
|
||||
if not (msgtypes := getattr(msgspec, '__args__', False)):
|
||||
msgtypes = [msgspec]
|
||||
|
||||
msgt_table: dict[str, MsgType] = {
|
||||
msgt: str(msgt)
|
||||
for msgt in msgtypes
|
||||
}
|
||||
if msg:
|
||||
msgt: MsgType = type(msg)
|
||||
str_repr: str = msgt_table[msgt]
|
||||
return {msgt: str_repr}
|
||||
|
||||
return msgt_table
|
||||
|
||||
|
||||
def pformat_msgspec(
|
||||
codec: MsgCodec|MsgDec,
|
||||
msg: MsgType|None = None,
|
||||
join_char: str = '\n',
|
||||
|
||||
) -> str:
|
||||
dec: msgpack.Decoder = getattr(codec, 'dec', codec)
|
||||
return join_char.join(
|
||||
mk_msgspec_table(
|
||||
dec=dec,
|
||||
msg=msg,
|
||||
).values()
|
||||
)
|
||||
|
||||
# TODO: overall IPC msg-spec features (i.e. in this mod)!
|
||||
|
@ -200,7 +263,7 @@ class MsgCodec(Struct):
|
|||
|
||||
def __repr__(self) -> str:
|
||||
speclines: str = textwrap.indent(
|
||||
self.pformat_msg_spec(),
|
||||
pformat_msgspec(codec=self),
|
||||
prefix=' '*3,
|
||||
)
|
||||
body: str = textwrap.indent(
|
||||
|
@ -244,33 +307,11 @@ class MsgCodec(Struct):
|
|||
# NOTE: defined and applied inside `mk_codec()`
|
||||
return self._dec.type
|
||||
|
||||
def msg_spec_items(
|
||||
self,
|
||||
msg: MsgType|None = None,
|
||||
|
||||
) -> dict[str, MsgType]|str:
|
||||
|
||||
msgt_table: dict[str, MsgType] = {
|
||||
msgt: str(msgt)
|
||||
for msgt in self.msg_spec.__args__
|
||||
}
|
||||
if msg:
|
||||
msgt: MsgType = type(msg)
|
||||
str_repr: str = msgt_table[msgt]
|
||||
return {msgt: str_repr}
|
||||
|
||||
return msgt_table
|
||||
|
||||
# TODO: some way to make `pretty_struct.Struct` use this
|
||||
# wrapped field over the `.msg_spec` one?
|
||||
def pformat_msg_spec(
|
||||
self,
|
||||
msg: MsgType|None = None,
|
||||
join_char: str = '\n',
|
||||
) -> str:
|
||||
return join_char.join(
|
||||
self.msg_spec_items(msg=msg).values()
|
||||
)
|
||||
@property
|
||||
def msg_spec_str(self) -> str:
|
||||
return pformat_msgspec(self.msg_spec)
|
||||
|
||||
lib: ModuleType = msgspec
|
||||
|
||||
|
@ -280,17 +321,32 @@ class MsgCodec(Struct):
|
|||
def enc(self) -> msgpack.Encoder:
|
||||
return self._enc
|
||||
|
||||
# TODO: reusing encode buffer for perf?
|
||||
# https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
|
||||
_buf: bytearray = bytearray()
|
||||
|
||||
def encode(
|
||||
self,
|
||||
py_obj: Any,
|
||||
|
||||
use_buf: bool = False,
|
||||
# ^-XXX-^ uhh why am i getting this?
|
||||
# |_BufferError: Existing exports of data: object cannot be re-sized
|
||||
|
||||
) -> bytes:
|
||||
'''
|
||||
Encode input python objects to `msgpack` bytes for
|
||||
transfer on a tranport protocol connection.
|
||||
|
||||
When `use_buf == True` use the output buffer optimization:
|
||||
https://jcristharif.com/msgspec/perf-tips.html#reusing-an-output-buffer
|
||||
|
||||
'''
|
||||
return self._enc.encode(py_obj)
|
||||
if use_buf:
|
||||
self._enc.encode_into(py_obj, self._buf)
|
||||
return self._buf
|
||||
else:
|
||||
return self._enc.encode(py_obj)
|
||||
|
||||
@property
|
||||
def dec(self) -> msgpack.Decoder:
|
||||
|
|
|
@ -146,7 +146,11 @@ class PldRx(Struct):
|
|||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||
ctx._rx_chan.receive_nowait()
|
||||
)
|
||||
return self.dec_msg(msg)
|
||||
return self.dec_msg(
|
||||
msg,
|
||||
ctx=ctx,
|
||||
expect_msg=expect_msg,
|
||||
)
|
||||
|
||||
async def recv_pld(
|
||||
self,
|
||||
|
|
|
@ -102,6 +102,59 @@ def iter_fields(struct: Struct) -> Iterator[
|
|||
)
|
||||
|
||||
|
||||
def pformat(
|
||||
struct: Struct,
|
||||
field_indent: int = 2,
|
||||
indent: int = 0,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
Recursion-safe `pprint.pformat()` style formatting of
|
||||
a `msgspec.Struct` for sane reading by a human using a REPL.
|
||||
|
||||
'''
|
||||
# global whitespace indent
|
||||
ws: str = ' '*indent
|
||||
|
||||
# field whitespace indent
|
||||
field_ws: str = ' '*(field_indent + indent)
|
||||
|
||||
# qtn: str = ws + struct.__class__.__qualname__
|
||||
qtn: str = struct.__class__.__qualname__
|
||||
|
||||
obj_str: str = '' # accumulator
|
||||
fi: structs.FieldInfo
|
||||
k: str
|
||||
v: Any
|
||||
for fi, k, v in iter_fields(struct):
|
||||
|
||||
# TODO: how can we prefer `Literal['option1', 'option2,
|
||||
# ..]` over .__name__ == `Literal` but still get only the
|
||||
# latter for simple types like `str | int | None` etc..?
|
||||
ft: type = fi.type
|
||||
typ_name: str = getattr(ft, '__name__', str(ft))
|
||||
|
||||
# recurse to get sub-struct's `.pformat()` output Bo
|
||||
if isinstance(v, Struct):
|
||||
val_str: str = v.pformat(
|
||||
indent=field_indent + indent,
|
||||
field_indent=indent + field_indent,
|
||||
)
|
||||
|
||||
else: # the `pprint` recursion-safe format:
|
||||
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
||||
val_str: str = saferepr(v)
|
||||
|
||||
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
|
||||
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
||||
|
||||
return (
|
||||
f'{qtn}(\n'
|
||||
f'{obj_str}'
|
||||
f'{ws})'
|
||||
)
|
||||
|
||||
|
||||
class Struct(
|
||||
_Struct,
|
||||
|
||||
|
@ -140,65 +193,12 @@ class Struct(
|
|||
|
||||
return sin_props
|
||||
|
||||
# TODO: make thisi a mod-func!
|
||||
def pformat(
|
||||
self,
|
||||
field_indent: int = 2,
|
||||
indent: int = 0,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
Recursion-safe `pprint.pformat()` style formatting of
|
||||
a `msgspec.Struct` for sane reading by a human using a REPL.
|
||||
|
||||
'''
|
||||
# global whitespace indent
|
||||
ws: str = ' '*indent
|
||||
|
||||
# field whitespace indent
|
||||
field_ws: str = ' '*(field_indent + indent)
|
||||
|
||||
# qtn: str = ws + self.__class__.__qualname__
|
||||
qtn: str = self.__class__.__qualname__
|
||||
|
||||
obj_str: str = '' # accumulator
|
||||
fi: structs.FieldInfo
|
||||
k: str
|
||||
v: Any
|
||||
for fi, k, v in iter_fields(self):
|
||||
|
||||
# TODO: how can we prefer `Literal['option1', 'option2,
|
||||
# ..]` over .__name__ == `Literal` but still get only the
|
||||
# latter for simple types like `str | int | None` etc..?
|
||||
ft: type = fi.type
|
||||
typ_name: str = getattr(ft, '__name__', str(ft))
|
||||
|
||||
# recurse to get sub-struct's `.pformat()` output Bo
|
||||
if isinstance(v, Struct):
|
||||
val_str: str = v.pformat(
|
||||
indent=field_indent + indent,
|
||||
field_indent=indent + field_indent,
|
||||
)
|
||||
|
||||
else: # the `pprint` recursion-safe format:
|
||||
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
||||
val_str: str = saferepr(v)
|
||||
|
||||
# TODO: LOLOL use `textwrap.indent()` instead dawwwwwg!
|
||||
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
||||
|
||||
return (
|
||||
f'{qtn}(\n'
|
||||
f'{obj_str}'
|
||||
f'{ws})'
|
||||
)
|
||||
|
||||
pformat = pformat
|
||||
# __str__ = __repr__ = pformat
|
||||
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
|
||||
# inside a known tty?
|
||||
# def __repr__(self) -> str:
|
||||
# ...
|
||||
|
||||
# __str__ = __repr__ = pformat
|
||||
__repr__ = pformat
|
||||
|
||||
def copy(
|
||||
|
|
Loading…
Reference in New Issue