Compare commits
4 Commits
cf48fdecfe
...
af013912ac
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | af013912ac | |
Tyler Goodlet | 8839bb06a3 | |
Tyler Goodlet | a35c1d40ab | |
Tyler Goodlet | 15549f7c26 |
|
@ -49,20 +49,21 @@ from ._exceptions import (
|
||||||
InternalError,
|
InternalError,
|
||||||
RemoteActorError,
|
RemoteActorError,
|
||||||
StreamOverrun,
|
StreamOverrun,
|
||||||
pack_error,
|
pack_from_raise,
|
||||||
unpack_error,
|
unpack_error,
|
||||||
_raise_from_no_key_in_msg,
|
_raise_from_no_key_in_msg,
|
||||||
)
|
)
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from .msg import (
|
from .msg import (
|
||||||
|
Error,
|
||||||
|
MsgType,
|
||||||
|
MsgCodec,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
Msg,
|
|
||||||
Return,
|
Return,
|
||||||
Started,
|
Started,
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
current_codec,
|
current_codec,
|
||||||
MsgCodec,
|
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
)
|
)
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
|
@ -107,8 +108,7 @@ async def _drain_to_final_msg(
|
||||||
# wait for a final context result by collecting (but
|
# wait for a final context result by collecting (but
|
||||||
# basically ignoring) any bi-dir-stream msgs still in transit
|
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||||
# from the far end.
|
# from the far end.
|
||||||
# pre_result_drained: list[dict] = []
|
pre_result_drained: list[MsgType] = []
|
||||||
pre_result_drained: list[Msg] = []
|
|
||||||
while not (
|
while not (
|
||||||
ctx.maybe_error
|
ctx.maybe_error
|
||||||
and not ctx._final_result_is_set()
|
and not ctx._final_result_is_set()
|
||||||
|
@ -168,7 +168,7 @@ async def _drain_to_final_msg(
|
||||||
|
|
||||||
# pray to the `trio` gawds that we're corrent with this
|
# pray to the `trio` gawds that we're corrent with this
|
||||||
# msg: dict = await ctx._recv_chan.receive()
|
# msg: dict = await ctx._recv_chan.receive()
|
||||||
msg: Msg = await ctx._recv_chan.receive()
|
msg: MsgType = await ctx._recv_chan.receive()
|
||||||
# always capture unexpected/non-result msgs
|
# always capture unexpected/non-result msgs
|
||||||
pre_result_drained.append(msg)
|
pre_result_drained.append(msg)
|
||||||
|
|
||||||
|
@ -191,13 +191,12 @@ async def _drain_to_final_msg(
|
||||||
raise
|
raise
|
||||||
|
|
||||||
match msg:
|
match msg:
|
||||||
|
|
||||||
|
# final result arrived!
|
||||||
case Return(
|
case Return(
|
||||||
cid=cid,
|
# cid=cid,
|
||||||
pld=res,
|
pld=res,
|
||||||
):
|
):
|
||||||
# try:
|
|
||||||
# ctx._result: Any = msg['return']
|
|
||||||
# ctx._result: Any = msg.pld
|
|
||||||
ctx._result: Any = res
|
ctx._result: Any = res
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Context delivered final draining msg:\n'
|
'Context delivered final draining msg:\n'
|
||||||
|
@ -210,13 +209,9 @@ async def _drain_to_final_msg(
|
||||||
# TODO: ^ we don't need it right?
|
# TODO: ^ we don't need it right?
|
||||||
break
|
break
|
||||||
|
|
||||||
# except KeyError:
|
|
||||||
# except AttributeError:
|
|
||||||
case Yield():
|
|
||||||
# if 'yield' in msg:
|
|
||||||
|
|
||||||
# far end task is still streaming to us so discard
|
# far end task is still streaming to us so discard
|
||||||
# and report per local context state.
|
# and report depending on local ctx state.
|
||||||
|
case Yield():
|
||||||
if (
|
if (
|
||||||
(ctx._stream.closed
|
(ctx._stream.closed
|
||||||
and (reason := 'stream was already closed')
|
and (reason := 'stream was already closed')
|
||||||
|
@ -257,45 +252,34 @@ async def _drain_to_final_msg(
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
# stream terminated, but no result yet..
|
||||||
|
#
|
||||||
# TODO: work out edge cases here where
|
# TODO: work out edge cases here where
|
||||||
# a stream is open but the task also calls
|
# a stream is open but the task also calls
|
||||||
# this?
|
# this?
|
||||||
# -[ ] should be a runtime error if a stream is open right?
|
# -[ ] should be a runtime error if a stream is open right?
|
||||||
# Stop()
|
# Stop()
|
||||||
case Stop():
|
case Stop():
|
||||||
# elif 'stop' in msg:
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Remote stream terminated due to "stop" msg:\n\n'
|
'Remote stream terminated due to "stop" msg:\n\n'
|
||||||
f'{pformat(msg)}\n'
|
f'{pformat(msg)}\n'
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# It's an internal error if any other msg type without
|
# remote error msg, likely already handled inside
|
||||||
# a`'cid'` field arrives here!
|
# `Context._deliver_msg()`
|
||||||
case _:
|
case Error():
|
||||||
# if not msg.get('cid'):
|
|
||||||
if not msg.cid:
|
|
||||||
raise InternalError(
|
|
||||||
'Unexpected cid-missing msg?\n\n'
|
|
||||||
f'{msg}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX fallthrough to handle expected error XXX
|
# TODO: can we replace this with `ctx.maybe_raise()`?
|
||||||
# TODO: replace this with `ctx.maybe_raise()`
|
# -[ ] would this be handier for this case maybe?
|
||||||
#
|
|
||||||
# TODO: would this be handier for this case maybe?
|
|
||||||
# async with maybe_raise_on_exit() as raises:
|
# async with maybe_raise_on_exit() as raises:
|
||||||
# if raises:
|
# if raises:
|
||||||
# log.error('some msg about raising..')
|
# log.error('some msg about raising..')
|
||||||
|
#
|
||||||
re: Exception|None = ctx._remote_error
|
re: Exception|None = ctx._remote_error
|
||||||
if re:
|
if re:
|
||||||
log.critical(
|
|
||||||
'Remote ctx terminated due to "error" msg:\n'
|
|
||||||
f'{re}'
|
|
||||||
)
|
|
||||||
assert msg is ctx._cancel_msg
|
assert msg is ctx._cancel_msg
|
||||||
# NOTE: this solved a super dupe edge case XD
|
# NOTE: this solved a super duper edge case XD
|
||||||
# this was THE super duper edge case of:
|
# this was THE super duper edge case of:
|
||||||
# - local task opens a remote task,
|
# - local task opens a remote task,
|
||||||
# - requests remote cancellation of far end
|
# - requests remote cancellation of far end
|
||||||
|
@ -312,9 +296,10 @@ async def _drain_to_final_msg(
|
||||||
# does not re-raise any ctxc it receives
|
# does not re-raise any ctxc it receives
|
||||||
# IFF **it** was the cancellation
|
# IFF **it** was the cancellation
|
||||||
# requester..
|
# requester..
|
||||||
# will raise if necessary, ow break from
|
#
|
||||||
# loop presuming any error terminates the
|
# XXX will raise if necessary but ow break
|
||||||
# context!
|
# from loop presuming any supressed error
|
||||||
|
# (ctxc) should terminate the context!
|
||||||
ctx._maybe_raise_remote_err(
|
ctx._maybe_raise_remote_err(
|
||||||
re,
|
re,
|
||||||
# NOTE: obvi we don't care if we
|
# NOTE: obvi we don't care if we
|
||||||
|
@ -338,6 +323,7 @@ async def _drain_to_final_msg(
|
||||||
log.critical('SHOULD NEVER GET HERE!?')
|
log.critical('SHOULD NEVER GET HERE!?')
|
||||||
assert msg is ctx._cancel_msg
|
assert msg is ctx._cancel_msg
|
||||||
assert error.msgdata == ctx._remote_error.msgdata
|
assert error.msgdata == ctx._remote_error.msgdata
|
||||||
|
assert error.ipc_msg == ctx._remote_error.ipc_msg
|
||||||
from .devx._debug import pause
|
from .devx._debug import pause
|
||||||
await pause()
|
await pause()
|
||||||
ctx._maybe_cancel_and_set_remote_error(error)
|
ctx._maybe_cancel_and_set_remote_error(error)
|
||||||
|
@ -346,6 +332,20 @@ async def _drain_to_final_msg(
|
||||||
else:
|
else:
|
||||||
# bubble the original src key error
|
# bubble the original src key error
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
# XXX should pretty much never get here unless someone
|
||||||
|
# overrides the default `MsgType` spec.
|
||||||
|
case _:
|
||||||
|
# It's definitely an internal error if any other
|
||||||
|
# msg type without a`'cid'` field arrives here!
|
||||||
|
if not msg.cid:
|
||||||
|
raise InternalError(
|
||||||
|
'Unexpected cid-missing msg?\n\n'
|
||||||
|
f'{msg}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
raise RuntimeError('Unknown msg type: {msg}')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Skipping `MsgStream` drain since final outcome is set\n\n'
|
'Skipping `MsgStream` drain since final outcome is set\n\n'
|
||||||
|
@ -1342,8 +1342,11 @@ class Context:
|
||||||
# `._cancel_called == True`.
|
# `._cancel_called == True`.
|
||||||
not raise_overrun_from_self
|
not raise_overrun_from_self
|
||||||
and isinstance(remote_error, RemoteActorError)
|
and isinstance(remote_error, RemoteActorError)
|
||||||
and remote_error.msgdata['boxed_type_str'] == 'StreamOverrun'
|
|
||||||
and tuple(remote_error.msgdata['sender']) == our_uid
|
and remote_error.boxed_type_str == 'StreamOverrun'
|
||||||
|
|
||||||
|
# and tuple(remote_error.msgdata['sender']) == our_uid
|
||||||
|
and tuple(remote_error.sender) == our_uid
|
||||||
):
|
):
|
||||||
# NOTE: we set the local scope error to any "self
|
# NOTE: we set the local scope error to any "self
|
||||||
# cancellation" error-response thus "absorbing"
|
# cancellation" error-response thus "absorbing"
|
||||||
|
@ -1412,16 +1415,11 @@ class Context:
|
||||||
|
|
||||||
assert self._recv_chan
|
assert self._recv_chan
|
||||||
raise_overrun: bool = not self._allow_overruns
|
raise_overrun: bool = not self._allow_overruns
|
||||||
# res_placeholder: int = id(self)
|
|
||||||
if (
|
if (
|
||||||
# self._result == res_placeholder
|
|
||||||
# and not self._remote_error
|
|
||||||
self.maybe_error is None
|
self.maybe_error is None
|
||||||
# not self._remote_error
|
and
|
||||||
# and not self._local_error
|
not self._recv_chan._closed # type: ignore
|
||||||
and not self._recv_chan._closed # type: ignore
|
|
||||||
):
|
):
|
||||||
|
|
||||||
# wait for a final context result/error by "draining"
|
# wait for a final context result/error by "draining"
|
||||||
# (by more or less ignoring) any bi-dir-stream "yield"
|
# (by more or less ignoring) any bi-dir-stream "yield"
|
||||||
# msgs still in transit from the far end.
|
# msgs still in transit from the far end.
|
||||||
|
@ -1432,7 +1430,6 @@ class Context:
|
||||||
for msg in drained_msgs:
|
for msg in drained_msgs:
|
||||||
|
|
||||||
# TODO: mask this by default..
|
# TODO: mask this by default..
|
||||||
# if 'return' in msg:
|
|
||||||
if isinstance(msg, Return):
|
if isinstance(msg, Return):
|
||||||
# from .devx import pause
|
# from .devx import pause
|
||||||
# await pause()
|
# await pause()
|
||||||
|
@ -1448,6 +1445,9 @@ class Context:
|
||||||
)
|
)
|
||||||
|
|
||||||
self.maybe_raise(
|
self.maybe_raise(
|
||||||
|
# NOTE: obvi we don't care if we
|
||||||
|
# overran the far end if we're already
|
||||||
|
# waiting on a final result (msg).
|
||||||
raise_overrun_from_self=(
|
raise_overrun_from_self=(
|
||||||
raise_overrun
|
raise_overrun
|
||||||
and
|
and
|
||||||
|
@ -1458,34 +1458,12 @@ class Context:
|
||||||
(not self._cancel_called)
|
(not self._cancel_called)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# if (
|
|
||||||
# (re := self._remote_error)
|
|
||||||
# # and self._result == res_placeholder
|
|
||||||
# ):
|
|
||||||
# self._maybe_raise_remote_err(
|
|
||||||
# re,
|
|
||||||
# # NOTE: obvi we don't care if we
|
|
||||||
# # overran the far end if we're already
|
|
||||||
# # waiting on a final result (msg).
|
|
||||||
# # raise_overrun_from_self=False,
|
|
||||||
# raise_overrun_from_self=(
|
|
||||||
# raise_overrun
|
|
||||||
# and
|
|
||||||
# # only when we ARE NOT the canceller
|
|
||||||
# # should we raise overruns, bc ow we're
|
|
||||||
# # raising something we know might happen
|
|
||||||
# # during cancellation ;)
|
|
||||||
# (not self._cancel_called)
|
|
||||||
# ),
|
|
||||||
# )
|
|
||||||
# if maybe_err:
|
|
||||||
# self._result = maybe_err
|
|
||||||
|
|
||||||
return self.outcome
|
return self.outcome
|
||||||
|
|
||||||
# TODO: switch this with above which should be named
|
# TODO: switch this with above!
|
||||||
# `.wait_for_outcome()` and instead do
|
# -[ ] should be named `.wait_for_outcome()` and instead do
|
||||||
# a `.outcome.Outcome.unwrap()` ?
|
# a `.outcome.Outcome.unwrap()` ?
|
||||||
|
#
|
||||||
# @property
|
# @property
|
||||||
# def result(self) -> Any|None:
|
# def result(self) -> Any|None:
|
||||||
# if self._final_result_is_set():
|
# if self._final_result_is_set():
|
||||||
|
@ -1544,7 +1522,6 @@ class Context:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _final_result_is_set(self) -> bool:
|
def _final_result_is_set(self) -> bool:
|
||||||
# return not (self._result == id(self))
|
|
||||||
return self._result is not Unresolved
|
return self._result is not Unresolved
|
||||||
|
|
||||||
# def get_result_nowait(self) -> Any|None:
|
# def get_result_nowait(self) -> Any|None:
|
||||||
|
@ -1761,8 +1738,7 @@ class Context:
|
||||||
|
|
||||||
async def _deliver_msg(
|
async def _deliver_msg(
|
||||||
self,
|
self,
|
||||||
# msg: dict,
|
msg: MsgType,
|
||||||
msg: Msg,
|
|
||||||
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
'''
|
'''
|
||||||
|
@ -1776,6 +1752,20 @@ class Context:
|
||||||
`._scope_nursery: trio.Nursery`) which ensures that such
|
`._scope_nursery: trio.Nursery`) which ensures that such
|
||||||
messages are queued up and eventually sent if possible.
|
messages are queued up and eventually sent if possible.
|
||||||
|
|
||||||
|
XXX RULES XXX
|
||||||
|
------ - ------
|
||||||
|
- NEVER raise remote errors from this method; a runtime task caller.
|
||||||
|
An error "delivered" to a ctx should always be raised by
|
||||||
|
the corresponding local task operating on the
|
||||||
|
`Portal`/`Context` APIs.
|
||||||
|
|
||||||
|
- NEVER `return` early before delivering the msg!
|
||||||
|
bc if the error is a ctxc and there is a task waiting on
|
||||||
|
`.result()` we need the msg to be
|
||||||
|
`send_chan.send_nowait()`-ed over the `._recv_chan` so
|
||||||
|
that the error is relayed to that waiter task and thus
|
||||||
|
raised in user code!
|
||||||
|
|
||||||
'''
|
'''
|
||||||
cid: str = self.cid
|
cid: str = self.cid
|
||||||
chan: Channel = self.chan
|
chan: Channel = self.chan
|
||||||
|
@ -1806,28 +1796,14 @@ class Context:
|
||||||
)
|
)
|
||||||
self._cancel_msg: dict = msg
|
self._cancel_msg: dict = msg
|
||||||
|
|
||||||
# NOTE: this will not raise an error, merely set
|
# XXX NOTE: this will not raise an error, merely set
|
||||||
# `._remote_error` and maybe cancel any task currently
|
# `._remote_error` and maybe cancel any task currently
|
||||||
# entered in `Portal.open_context()` presuming the
|
# entered in `Portal.open_context()` presuming the
|
||||||
# error is "cancel causing" (i.e. a `ContextCancelled`
|
# error is "cancel causing" (i.e. a `ContextCancelled`
|
||||||
# or `RemoteActorError`).
|
# or `RemoteActorError`).
|
||||||
self._maybe_cancel_and_set_remote_error(re)
|
self._maybe_cancel_and_set_remote_error(re)
|
||||||
|
|
||||||
# XXX NEVER do this XXX..!!
|
# XXX only case where returning early is fine!
|
||||||
# bc if the error is a ctxc and there is a task
|
|
||||||
# waiting on `.result()` we need the msg to be sent
|
|
||||||
# over the `send_chan`/`._recv_chan` so that the error
|
|
||||||
# is relayed to that waiter task..
|
|
||||||
# return True
|
|
||||||
#
|
|
||||||
# XXX ALSO NO!! XXX
|
|
||||||
# => NEVER raise remote errors from the calling
|
|
||||||
# runtime task, they should always be raised by
|
|
||||||
# consumer side tasks operating on the
|
|
||||||
# `Portal`/`Context` APIs.
|
|
||||||
# if self._remote_error:
|
|
||||||
# self._maybe_raise_remote_err(error)
|
|
||||||
|
|
||||||
if self._in_overrun:
|
if self._in_overrun:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Queueing OVERRUN msg on caller task:\n'
|
f'Queueing OVERRUN msg on caller task:\n'
|
||||||
|
@ -1946,17 +1922,13 @@ class Context:
|
||||||
# anything different.
|
# anything different.
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
# txt += f'\n{msg}\n'
|
|
||||||
# raise local overrun and immediately pack as IPC
|
# raise local overrun and immediately pack as IPC
|
||||||
# msg for far end.
|
# msg for far end.
|
||||||
try:
|
err_msg: Error = pack_from_raise(
|
||||||
raise StreamOverrun(
|
local_err=StreamOverrun(
|
||||||
txt,
|
txt,
|
||||||
sender=from_uid,
|
sender=from_uid,
|
||||||
)
|
),
|
||||||
except StreamOverrun as err:
|
|
||||||
err_msg: dict[str, dict] = pack_error(
|
|
||||||
err,
|
|
||||||
cid=cid,
|
cid=cid,
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
|
@ -1964,9 +1936,9 @@ class Context:
|
||||||
await chan.send(err_msg)
|
await chan.send(err_msg)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
# XXX: local consumer has closed their side of
|
||||||
|
# the IPC so cancel the far end streaming task
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
# XXX: local consumer has closed their side
|
|
||||||
# so cancel the far end streaming task
|
|
||||||
log.warning(
|
log.warning(
|
||||||
'Channel for ctx is already closed?\n'
|
'Channel for ctx is already closed?\n'
|
||||||
f'|_{chan}\n'
|
f'|_{chan}\n'
|
||||||
|
@ -2379,28 +2351,17 @@ async def open_context_from_portal(
|
||||||
# an exception type boxed in a `RemoteActorError`
|
# an exception type boxed in a `RemoteActorError`
|
||||||
# is returned (meaning it was obvi not raised)
|
# is returned (meaning it was obvi not raised)
|
||||||
# that we want to log-report on.
|
# that we want to log-report on.
|
||||||
msgdata: str|None = getattr(
|
match result_or_err:
|
||||||
result_or_err,
|
case ContextCancelled() as ctxc:
|
||||||
'msgdata',
|
log.cancel(ctxc.tb_str)
|
||||||
None
|
|
||||||
)
|
|
||||||
match (msgdata, result_or_err):
|
|
||||||
case (
|
|
||||||
{'tb_str': tbstr},
|
|
||||||
ContextCancelled(),
|
|
||||||
):
|
|
||||||
log.cancel(tbstr)
|
|
||||||
|
|
||||||
case (
|
case RemoteActorError() as rae:
|
||||||
{'tb_str': tbstr},
|
|
||||||
RemoteActorError(),
|
|
||||||
):
|
|
||||||
log.exception(
|
log.exception(
|
||||||
'Context remotely errored!\n'
|
'Context remotely errored!\n'
|
||||||
f'<= peer: {uid}\n'
|
f'<= peer: {uid}\n'
|
||||||
f' |_ {nsf}()\n\n'
|
f' |_ {nsf}()\n\n'
|
||||||
|
|
||||||
f'{tbstr}'
|
f'{rae.tb_str}'
|
||||||
)
|
)
|
||||||
case (None, _):
|
case (None, _):
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
@ -2410,7 +2371,6 @@ async def open_context_from_portal(
|
||||||
|
|
||||||
f'`{result_or_err}`\n'
|
f'`{result_or_err}`\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||||
# wait for any immediate child in debug before popping the
|
# wait for any immediate child in debug before popping the
|
||||||
|
|
123
tractor/_ipc.py
123
tractor/_ipc.py
|
@ -38,7 +38,6 @@ from typing import (
|
||||||
Protocol,
|
Protocol,
|
||||||
Type,
|
Type,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
Union,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
import msgspec
|
import msgspec
|
||||||
|
@ -47,8 +46,9 @@ import trio
|
||||||
|
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
TransportClosed,
|
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
|
pack_from_raise,
|
||||||
|
TransportClosed,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
_ctxvar_MsgCodec,
|
_ctxvar_MsgCodec,
|
||||||
|
@ -118,17 +118,24 @@ class MsgTransport(Protocol[MsgType]):
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
def _raise_msg_type_err(
|
def _mk_msg_type_err(
|
||||||
msg: Any|bytes,
|
msg: Any|bytes,
|
||||||
codec: MsgCodec,
|
codec: MsgCodec,
|
||||||
validation_err: msgspec.ValidationError|None = None,
|
|
||||||
|
message: str|None = None,
|
||||||
verb_header: str = '',
|
verb_header: str = '',
|
||||||
|
|
||||||
) -> None:
|
src_validation_error: msgspec.ValidationError|None = None,
|
||||||
|
src_type_error: TypeError|None = None,
|
||||||
|
|
||||||
# if side == 'send':
|
) -> MsgTypeError:
|
||||||
if validation_err is None: # send-side
|
|
||||||
|
|
||||||
|
# `Channel.send()` case
|
||||||
|
if src_validation_error is None: # send-side
|
||||||
|
|
||||||
|
# no src error from `msgspec.msgpack.Decoder.decode()` so
|
||||||
|
# prolly a manual type-check on our part.
|
||||||
|
if message is None:
|
||||||
import traceback
|
import traceback
|
||||||
from tractor._exceptions import pformat_boxed_tb
|
from tractor._exceptions import pformat_boxed_tb
|
||||||
|
|
||||||
|
@ -144,14 +151,42 @@ def _raise_msg_type_err(
|
||||||
field_prefix=' ',
|
field_prefix=' ',
|
||||||
indent='',
|
indent='',
|
||||||
)
|
)
|
||||||
raise MsgTypeError(
|
message: str = (
|
||||||
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
f'invalid msg -> {msg}: {type(msg)}\n\n'
|
||||||
f'{tb_fmt}\n'
|
f'{tb_fmt}\n'
|
||||||
f'Valid IPC msgs are:\n\n'
|
f'Valid IPC msgs are:\n\n'
|
||||||
# f' ------ - ------\n'
|
# f' ------ - ------\n'
|
||||||
f'{fmt_spec}\n'
|
f'{fmt_spec}\n',
|
||||||
|
)
|
||||||
|
elif src_type_error:
|
||||||
|
src_message: str = str(src_type_error)
|
||||||
|
patt: str = 'type '
|
||||||
|
type_idx: int = src_message.find('type ')
|
||||||
|
invalid_type: str = src_message[type_idx + len(patt):].split()[0]
|
||||||
|
|
||||||
|
enc_hook: Callable|None = codec.enc.enc_hook
|
||||||
|
if enc_hook is None:
|
||||||
|
message += (
|
||||||
|
'\n\n'
|
||||||
|
|
||||||
|
f"The current IPC-msg codec can't encode type `{invalid_type}` !\n"
|
||||||
|
f'Maybe a `msgpack.Encoder.enc_hook()` extension is needed?\n\n'
|
||||||
|
|
||||||
|
f'Check the `msgspec` docs for ad-hoc type extending:\n'
|
||||||
|
'|_ https://jcristharif.com/msgspec/extending.html\n'
|
||||||
|
'|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
msgtyperr = MsgTypeError(
|
||||||
|
message=message,
|
||||||
|
ipc_msg=msg,
|
||||||
|
)
|
||||||
|
# ya, might be `None`
|
||||||
|
msgtyperr.__cause__ = src_type_error
|
||||||
|
return msgtyperr
|
||||||
|
|
||||||
|
# `Channel.recv()` case
|
||||||
else:
|
else:
|
||||||
# decode the msg-bytes using the std msgpack
|
# decode the msg-bytes using the std msgpack
|
||||||
# interchange-prot (i.e. without any
|
# interchange-prot (i.e. without any
|
||||||
|
@ -161,29 +196,31 @@ def _raise_msg_type_err(
|
||||||
msg_dict: dict = msgspec.msgpack.decode(msg)
|
msg_dict: dict = msgspec.msgpack.decode(msg)
|
||||||
msg_type_name: str = msg_dict['msg_type']
|
msg_type_name: str = msg_dict['msg_type']
|
||||||
msg_type = getattr(msgtypes, msg_type_name)
|
msg_type = getattr(msgtypes, msg_type_name)
|
||||||
errmsg: str = (
|
message: str = (
|
||||||
f'invalid `{msg_type_name}` IPC msg\n\n'
|
f'invalid `{msg_type_name}` IPC msg\n\n'
|
||||||
)
|
)
|
||||||
if verb_header:
|
if verb_header:
|
||||||
errmsg = f'{verb_header} ' + errmsg
|
message = f'{verb_header} ' + message
|
||||||
|
|
||||||
# XXX see if we can determine the exact invalid field
|
# XXX see if we can determine the exact invalid field
|
||||||
# such that we can comprehensively report the
|
# such that we can comprehensively report the
|
||||||
# specific field's type problem
|
# specific field's type problem
|
||||||
msgspec_msg: str = validation_err.args[0].rstrip('`')
|
msgspec_msg: str = src_validation_error.args[0].rstrip('`')
|
||||||
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
||||||
obj = object()
|
obj = object()
|
||||||
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
||||||
field_type: Union[Type] = msg_type.__signature__.parameters[
|
message += (
|
||||||
maybe_field
|
|
||||||
].annotation
|
|
||||||
errmsg += (
|
|
||||||
f'{msg.rstrip("`")}\n\n'
|
f'{msg.rstrip("`")}\n\n'
|
||||||
f'{msg_type}\n'
|
f'{msg_type}\n'
|
||||||
f' |_.{maybe_field}: {field_type} = {field_val!r}\n'
|
f' |_.{maybe_field}: {codec.pld_spec_str} = {field_val!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
raise MsgTypeError(errmsg) from validation_err
|
msgtyperr = MsgTypeError.from_decode(
|
||||||
|
message=message,
|
||||||
|
msgdict=msg_dict,
|
||||||
|
)
|
||||||
|
msgtyperr.__cause__ = src_validation_error
|
||||||
|
return msgtyperr
|
||||||
|
|
||||||
|
|
||||||
# TODO: not sure why we have to inherit here, but it seems to be an
|
# TODO: not sure why we have to inherit here, but it seems to be an
|
||||||
|
@ -325,12 +362,15 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# and always raise such that spec violations
|
# and always raise such that spec violations
|
||||||
# are never allowed to be caught silently!
|
# are never allowed to be caught silently!
|
||||||
except msgspec.ValidationError as verr:
|
except msgspec.ValidationError as verr:
|
||||||
# re-raise as type error
|
msgtyperr: MsgTypeError = _mk_msg_type_err(
|
||||||
_raise_msg_type_err(
|
|
||||||
msg=msg_bytes,
|
msg=msg_bytes,
|
||||||
codec=codec,
|
codec=codec,
|
||||||
validation_err=verr,
|
src_validation_error=verr,
|
||||||
)
|
)
|
||||||
|
# XXX deliver up to `Channel.recv()` where
|
||||||
|
# a re-raise and `Error`-pack can inject the far
|
||||||
|
# end actor `.uid`.
|
||||||
|
yield msgtyperr
|
||||||
|
|
||||||
except (
|
except (
|
||||||
msgspec.DecodeError,
|
msgspec.DecodeError,
|
||||||
|
@ -387,7 +427,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
|
|
||||||
if type(msg) not in msgtypes.__msg_types__:
|
if type(msg) not in msgtypes.__msg_types__:
|
||||||
if strict_types:
|
if strict_types:
|
||||||
_raise_msg_type_err(
|
raise _mk_msg_type_err(
|
||||||
msg,
|
msg,
|
||||||
codec=codec,
|
codec=codec,
|
||||||
)
|
)
|
||||||
|
@ -400,11 +440,16 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
try:
|
try:
|
||||||
bytes_data: bytes = codec.encode(msg)
|
bytes_data: bytes = codec.encode(msg)
|
||||||
except TypeError as typerr:
|
except TypeError as typerr:
|
||||||
raise MsgTypeError(
|
msgtyperr: MsgTypeError = _mk_msg_type_err(
|
||||||
'A msg field violates the current spec\n'
|
msg,
|
||||||
f'{codec.pld_spec}\n\n'
|
codec=codec,
|
||||||
|
message=(
|
||||||
|
f'IPC-msg-spec violation in\n\n'
|
||||||
f'{pretty_struct.Struct.pformat(msg)}'
|
f'{pretty_struct.Struct.pformat(msg)}'
|
||||||
) from typerr
|
),
|
||||||
|
src_type_error=typerr,
|
||||||
|
)
|
||||||
|
raise msgtyperr from typerr
|
||||||
|
|
||||||
# supposedly the fastest says,
|
# supposedly the fastest says,
|
||||||
# https://stackoverflow.com/a/54027962
|
# https://stackoverflow.com/a/54027962
|
||||||
|
@ -719,13 +764,35 @@ class Channel:
|
||||||
assert self._transport
|
assert self._transport
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
async for item in self._transport:
|
async for msg in self._transport:
|
||||||
yield item
|
match msg:
|
||||||
|
# NOTE: if transport/interchange delivers
|
||||||
|
# a type error, we pack it with the far
|
||||||
|
# end peer `Actor.uid` and relay the
|
||||||
|
# `Error`-msg upward to the `._rpc` stack
|
||||||
|
# for normal RAE handling.
|
||||||
|
case MsgTypeError():
|
||||||
|
yield pack_from_raise(
|
||||||
|
local_err=msg,
|
||||||
|
cid=msg.cid,
|
||||||
|
|
||||||
|
# XXX we pack it here bc lower
|
||||||
|
# layers have no notion of an
|
||||||
|
# actor-id ;)
|
||||||
|
src_uid=self.uid,
|
||||||
|
)
|
||||||
|
case _:
|
||||||
|
yield msg
|
||||||
|
|
||||||
|
# TODO: if we were gonna do this it should be
|
||||||
|
# done up at the `MsgStream` layer!
|
||||||
|
#
|
||||||
# sent = yield item
|
# sent = yield item
|
||||||
# if sent is not None:
|
# if sent is not None:
|
||||||
# # optimization, passing None through all the
|
# # optimization, passing None through all the
|
||||||
# # time is pointless
|
# # time is pointless
|
||||||
# await self._transport.send(sent)
|
# await self._transport.send(sent)
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
|
|
||||||
# if not self._autorecon:
|
# if not self._autorecon:
|
||||||
|
|
|
@ -46,6 +46,7 @@ from ._state import (
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from .msg import (
|
from .msg import (
|
||||||
|
Error,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
Return,
|
Return,
|
||||||
)
|
)
|
||||||
|
@ -69,8 +70,7 @@ log = get_logger(__name__)
|
||||||
# `._raise_from_no_key_in_msg()` (after tweak to
|
# `._raise_from_no_key_in_msg()` (after tweak to
|
||||||
# accept a `chan: Channel` arg) in key block!
|
# accept a `chan: Channel` arg) in key block!
|
||||||
def _unwrap_msg(
|
def _unwrap_msg(
|
||||||
# msg: dict[str, Any],
|
msg: Return|Error,
|
||||||
msg: Return,
|
|
||||||
channel: Channel,
|
channel: Channel,
|
||||||
|
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
|
|
@ -47,12 +47,13 @@ from ._context import (
|
||||||
Context,
|
Context,
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ModuleNotExposed,
|
|
||||||
is_multi_cancelled,
|
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
ModuleNotExposed,
|
||||||
|
MsgTypeError,
|
||||||
|
TransportClosed,
|
||||||
|
is_multi_cancelled,
|
||||||
pack_error,
|
pack_error,
|
||||||
unpack_error,
|
unpack_error,
|
||||||
TransportClosed,
|
|
||||||
)
|
)
|
||||||
from .devx import (
|
from .devx import (
|
||||||
maybe_wait_for_debugger,
|
maybe_wait_for_debugger,
|
||||||
|
@ -636,7 +637,7 @@ async def _invoke(
|
||||||
# (callee) task, so relay this cancel signal to the
|
# (callee) task, so relay this cancel signal to the
|
||||||
# other side.
|
# other side.
|
||||||
ctxc = ContextCancelled(
|
ctxc = ContextCancelled(
|
||||||
msg,
|
message=msg,
|
||||||
boxed_type=trio.Cancelled,
|
boxed_type=trio.Cancelled,
|
||||||
canceller=canceller,
|
canceller=canceller,
|
||||||
)
|
)
|
||||||
|
@ -826,7 +827,12 @@ async def process_messages(
|
||||||
| Stop(cid=cid)
|
| Stop(cid=cid)
|
||||||
| Return(cid=cid)
|
| Return(cid=cid)
|
||||||
| CancelAck(cid=cid)
|
| CancelAck(cid=cid)
|
||||||
| Error(cid=cid) # RPC-task ctx specific
|
|
||||||
|
# `.cid` means RPC-ctx-task specific
|
||||||
|
| Error(cid=cid)
|
||||||
|
|
||||||
|
# recv-side `MsgType` decode violation
|
||||||
|
| MsgTypeError(cid=cid)
|
||||||
):
|
):
|
||||||
# deliver response to local caller/waiter
|
# deliver response to local caller/waiter
|
||||||
# via its per-remote-context memory channel.
|
# via its per-remote-context memory channel.
|
||||||
|
|
|
@ -49,7 +49,6 @@ from pprint import pformat
|
||||||
import signal
|
import signal
|
||||||
import sys
|
import sys
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
|
||||||
Callable,
|
Callable,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
|
@ -19,7 +19,6 @@ Built-in messaging patterns, types, APIs and helpers.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from typing import (
|
from typing import (
|
||||||
Union,
|
|
||||||
TypeAlias,
|
TypeAlias,
|
||||||
)
|
)
|
||||||
from .ptr import (
|
from .ptr import (
|
||||||
|
@ -56,8 +55,9 @@ from .types import (
|
||||||
|
|
||||||
# full msg class set from above as list
|
# full msg class set from above as list
|
||||||
__msg_types__ as __msg_types__,
|
__msg_types__ as __msg_types__,
|
||||||
|
|
||||||
|
# type-alias for union of all msgs
|
||||||
|
MsgType as MsgType,
|
||||||
)
|
)
|
||||||
# TODO: use new type declaration syntax for msg-type-spec
|
|
||||||
# https://docs.python.org/3/library/typing.html#type-aliases
|
__msg_spec__: TypeAlias = MsgType
|
||||||
# https://docs.python.org/3/reference/simple_stmts.html#type
|
|
||||||
__msg_spec__: TypeAlias = Union[*__msg_types__]
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ from trio.lowlevel import (
|
||||||
from tractor.msg.pretty_struct import Struct
|
from tractor.msg.pretty_struct import Struct
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
mk_msg_spec,
|
mk_msg_spec,
|
||||||
Msg,
|
MsgType,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -87,12 +87,50 @@ class MsgCodec(Struct):
|
||||||
|
|
||||||
pld_spec: Union[Type[Struct]]|None
|
pld_spec: Union[Type[Struct]]|None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pld_spec_str(self) -> str:
|
||||||
|
spec: Union[Type]|Type = self.pld_spec
|
||||||
|
|
||||||
|
# TODO: could also use match: instead?
|
||||||
|
if getattr(spec, '__args__', False):
|
||||||
|
# `typing.Union` case
|
||||||
|
return str(spec)
|
||||||
|
else:
|
||||||
|
return spec.__name__
|
||||||
|
|
||||||
# struct type unions
|
# struct type unions
|
||||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||||
@property
|
@property
|
||||||
def msg_spec(self) -> Union[Type[Struct]]:
|
def msg_spec(self) -> Union[Type[Struct]]:
|
||||||
return self._dec.type
|
return self._dec.type
|
||||||
|
|
||||||
|
def msg_spec_items(
|
||||||
|
self,
|
||||||
|
msg: MsgType|None = None,
|
||||||
|
|
||||||
|
) -> dict[str, MsgType]|str:
|
||||||
|
|
||||||
|
msgt_table: dict[str, MsgType] = {
|
||||||
|
msgt: str(msgt)
|
||||||
|
for msgt in self.msg_spec.__args__
|
||||||
|
}
|
||||||
|
if msg:
|
||||||
|
msgt: MsgType = type(msg)
|
||||||
|
str_repr: str = msgt_table[msgt]
|
||||||
|
return {msgt: str_repr}
|
||||||
|
|
||||||
|
return msgt_table
|
||||||
|
|
||||||
|
# TODO: some way to make `pretty_struct.Struct` use this
|
||||||
|
# wrapped field over the `.msg_spec` one?
|
||||||
|
def pformat_msg_spec(
|
||||||
|
self,
|
||||||
|
msg: MsgType|None = None,
|
||||||
|
) -> str:
|
||||||
|
return '\n'.join(
|
||||||
|
self.msg_spec_items(msg=msg).values()
|
||||||
|
)
|
||||||
|
|
||||||
lib: ModuleType = msgspec
|
lib: ModuleType = msgspec
|
||||||
|
|
||||||
# TODO: a sub-decoder system as well?
|
# TODO: a sub-decoder system as well?
|
||||||
|
@ -108,7 +146,7 @@ class MsgCodec(Struct):
|
||||||
# OR
|
# OR
|
||||||
# ) = {
|
# ) = {
|
||||||
# # pre-seed decoders for std-py-type-set for use when
|
# # pre-seed decoders for std-py-type-set for use when
|
||||||
# # `Msg.pld == None|Any`.
|
# # `MsgType.pld == None|Any`.
|
||||||
# None: msgpack.Decoder(Any),
|
# None: msgpack.Decoder(Any),
|
||||||
# Any: msgpack.Decoder(Any),
|
# Any: msgpack.Decoder(Any),
|
||||||
# }
|
# }
|
||||||
|
@ -303,7 +341,7 @@ def mk_codec(
|
||||||
# by `tag_field: str` value key?
|
# by `tag_field: str` value key?
|
||||||
# payload_msg_specs: dict[
|
# payload_msg_specs: dict[
|
||||||
# str, # tag_field value as sub-decoder key
|
# str, # tag_field value as sub-decoder key
|
||||||
# Union[Type[Struct]] # `Msg.pld` type spec
|
# Union[Type[Struct]] # `MsgType.pld` type spec
|
||||||
# ]|None = None,
|
# ]|None = None,
|
||||||
|
|
||||||
libname: str = 'msgspec',
|
libname: str = 'msgspec',
|
||||||
|
@ -336,7 +374,7 @@ def mk_codec(
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f'If a payload spec is provided,\n'
|
f'If a payload spec is provided,\n'
|
||||||
"the builtin SC-shuttle-protocol's msg set\n"
|
"the builtin SC-shuttle-protocol's msg set\n"
|
||||||
f'(i.e. `{Msg}`) MUST be used!\n\n'
|
f'(i.e. a `{MsgType}`) MUST be used!\n\n'
|
||||||
f'However both values were passed as => mk_codec(\n'
|
f'However both values were passed as => mk_codec(\n'
|
||||||
f' ipc_msg_spec={ipc_msg_spec}`\n'
|
f' ipc_msg_spec={ipc_msg_spec}`\n'
|
||||||
f' ipc_pld_spec={ipc_pld_spec}`\n)\n'
|
f' ipc_pld_spec={ipc_pld_spec}`\n)\n'
|
||||||
|
|
|
@ -31,6 +31,7 @@ from typing import (
|
||||||
Literal,
|
Literal,
|
||||||
Type,
|
Type,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
|
TypeAlias,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -400,16 +401,29 @@ class CancelAck(
|
||||||
pld: bool
|
pld: bool
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: unify this with `._exceptions.RemoteActorError`
|
||||||
|
# such that we can have a msg which is both raisable and
|
||||||
|
# IPC-wire ready?
|
||||||
|
# B~o
|
||||||
class Error(
|
class Error(
|
||||||
Struct,
|
Struct,
|
||||||
tag=True,
|
tag=True,
|
||||||
tag_field='msg_type',
|
tag_field='msg_type',
|
||||||
|
|
||||||
|
# TODO may omit defaults?
|
||||||
|
# https://jcristharif.com/msgspec/structs.html#omitting-default-values
|
||||||
|
# omit_defaults=True,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
A pkt that wraps `RemoteActorError`s for relay and raising.
|
A pkt that wraps `RemoteActorError`s for relay and raising.
|
||||||
|
|
||||||
Fields are 1-to-1 meta-data as needed originally by
|
Fields are 1-to-1 meta-data as needed originally by
|
||||||
`RemoteActorError.msgdata: dict`.
|
`RemoteActorError.msgdata: dict` but now are defined here.
|
||||||
|
|
||||||
|
Note: this msg shuttles `ContextCancelled` and `StreamOverrun`
|
||||||
|
as well is used to rewrap any `MsgTypeError` for relay-reponse
|
||||||
|
to bad `Yield.pld` senders during an IPC ctx's streaming dialog
|
||||||
|
phase.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
src_uid: tuple[str, str]
|
src_uid: tuple[str, str]
|
||||||
|
@ -428,6 +442,10 @@ class Error(
|
||||||
# `StreamOverrun`
|
# `StreamOverrun`
|
||||||
sender: tuple[str, str]|None = None
|
sender: tuple[str, str]|None = None
|
||||||
|
|
||||||
|
# for the `MsgTypeError` case where the receiver side
|
||||||
|
# decodes the underlying original `Msg`-subtype
|
||||||
|
_msg_dict: dict|None = None
|
||||||
|
|
||||||
|
|
||||||
# TODO: should be make a msg version of `ContextCancelled?`
|
# TODO: should be make a msg version of `ContextCancelled?`
|
||||||
# and/or with a scope field or a full `ActorCancelled`?
|
# and/or with a scope field or a full `ActorCancelled`?
|
||||||
|
@ -486,6 +504,11 @@ __msg_types__: list[Msg] = (
|
||||||
_payload_msgs
|
_payload_msgs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: use new type declaration syntax for msg-type-spec
|
||||||
|
# https://docs.python.org/3/library/typing.html#type-aliases
|
||||||
|
# https://docs.python.org/3/reference/simple_stmts.html#type
|
||||||
|
MsgType: TypeAlias = Union[*__msg_types__]
|
||||||
|
|
||||||
|
|
||||||
def mk_msg_spec(
|
def mk_msg_spec(
|
||||||
payload_type_union: Union[Type] = Any,
|
payload_type_union: Union[Type] = Any,
|
||||||
|
|
Loading…
Reference in New Issue