Compare commits
11 Commits
eec240a70a
...
b209990d04
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | b209990d04 | |
Tyler Goodlet | 60aa16adf6 | |
Tyler Goodlet | eca2c02f8b | |
Tyler Goodlet | 921f72f7fe | |
Tyler Goodlet | 38a6483859 | |
Tyler Goodlet | f72b972348 | |
Tyler Goodlet | 2edfed75eb | |
Tyler Goodlet | 2d22713806 | |
Tyler Goodlet | df548257ad | |
Tyler Goodlet | 3fb3608879 | |
Tyler Goodlet | faa7194daf |
|
@ -38,6 +38,7 @@ async def main():
|
||||||
"""
|
"""
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
|
# loglevel='runtime',
|
||||||
) as n:
|
) as n:
|
||||||
|
|
||||||
# Spawn both actors, don't bother with collecting results
|
# Spawn both actors, don't bother with collecting results
|
||||||
|
|
|
@ -3,17 +3,20 @@ import tractor
|
||||||
|
|
||||||
|
|
||||||
async def breakpoint_forever():
|
async def breakpoint_forever():
|
||||||
"""Indefinitely re-enter debugger in child actor.
|
'''
|
||||||
"""
|
Indefinitely re-enter debugger in child actor.
|
||||||
|
|
||||||
|
'''
|
||||||
while True:
|
while True:
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
await tractor.breakpoint()
|
await tractor.pause()
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
|
loglevel='cancel',
|
||||||
) as n:
|
) as n:
|
||||||
|
|
||||||
portal = await n.run_in_actor(
|
portal = await n.run_in_actor(
|
||||||
|
|
|
@ -246,10 +246,10 @@ def test_simple_context(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
except error_parent:
|
except error_parent:
|
||||||
pass
|
pass
|
||||||
except trio.MultiError as me:
|
except BaseExceptionGroup as beg:
|
||||||
# XXX: on windows it seems we may have to expect the group error
|
# XXX: on windows it seems we may have to expect the group error
|
||||||
from tractor._exceptions import is_multi_cancelled
|
from tractor._exceptions import is_multi_cancelled
|
||||||
assert is_multi_cancelled(me)
|
assert is_multi_cancelled(beg)
|
||||||
else:
|
else:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
|
@ -38,10 +38,13 @@ async def async_gen_stream(sequence):
|
||||||
assert cs.cancelled_caught
|
assert cs.cancelled_caught
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: deprecated either remove entirely
|
||||||
|
# or re-impl in terms of `MsgStream` one-sides
|
||||||
|
# wrapper, but at least remove `Portal.open_stream_from()`
|
||||||
@tractor.stream
|
@tractor.stream
|
||||||
async def context_stream(
|
async def context_stream(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
sequence
|
sequence: list[int],
|
||||||
):
|
):
|
||||||
for i in sequence:
|
for i in sequence:
|
||||||
await ctx.send_yield(i)
|
await ctx.send_yield(i)
|
||||||
|
|
|
@ -36,6 +36,7 @@ def parse_ipaddr(arg):
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
__tracebackhide__: bool = True
|
||||||
|
|
||||||
parser = argparse.ArgumentParser()
|
parser = argparse.ArgumentParser()
|
||||||
parser.add_argument("--uid", type=parse_uid)
|
parser.add_argument("--uid", type=parse_uid)
|
||||||
|
|
|
@ -47,6 +47,7 @@ import trio
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
InternalError,
|
InternalError,
|
||||||
|
MsgTypeError,
|
||||||
RemoteActorError,
|
RemoteActorError,
|
||||||
StreamOverrun,
|
StreamOverrun,
|
||||||
pack_from_raise,
|
pack_from_raise,
|
||||||
|
@ -59,12 +60,14 @@ from .msg import (
|
||||||
MsgType,
|
MsgType,
|
||||||
MsgCodec,
|
MsgCodec,
|
||||||
NamespacePath,
|
NamespacePath,
|
||||||
|
PayloadT,
|
||||||
Return,
|
Return,
|
||||||
Started,
|
Started,
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
current_codec,
|
current_codec,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
|
types as msgtypes,
|
||||||
)
|
)
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from ._streaming import MsgStream
|
from ._streaming import MsgStream
|
||||||
|
@ -88,7 +91,10 @@ async def _drain_to_final_msg(
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
msg_limit: int = 6,
|
msg_limit: int = 6,
|
||||||
|
|
||||||
) -> list[dict]:
|
) -> tuple[
|
||||||
|
Return|None,
|
||||||
|
list[MsgType]
|
||||||
|
]:
|
||||||
'''
|
'''
|
||||||
Drain IPC msgs delivered to the underlying rx-mem-chan
|
Drain IPC msgs delivered to the underlying rx-mem-chan
|
||||||
`Context._recv_chan` from the runtime in search for a final
|
`Context._recv_chan` from the runtime in search for a final
|
||||||
|
@ -109,6 +115,7 @@ async def _drain_to_final_msg(
|
||||||
# basically ignoring) any bi-dir-stream msgs still in transit
|
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||||
# from the far end.
|
# from the far end.
|
||||||
pre_result_drained: list[MsgType] = []
|
pre_result_drained: list[MsgType] = []
|
||||||
|
return_msg: Return|None = None
|
||||||
while not (
|
while not (
|
||||||
ctx.maybe_error
|
ctx.maybe_error
|
||||||
and not ctx._final_result_is_set()
|
and not ctx._final_result_is_set()
|
||||||
|
@ -169,8 +176,6 @@ async def _drain_to_final_msg(
|
||||||
# pray to the `trio` gawds that we're corrent with this
|
# pray to the `trio` gawds that we're corrent with this
|
||||||
# msg: dict = await ctx._recv_chan.receive()
|
# msg: dict = await ctx._recv_chan.receive()
|
||||||
msg: MsgType = await ctx._recv_chan.receive()
|
msg: MsgType = await ctx._recv_chan.receive()
|
||||||
# always capture unexpected/non-result msgs
|
|
||||||
pre_result_drained.append(msg)
|
|
||||||
|
|
||||||
# NOTE: we get here if the far end was
|
# NOTE: we get here if the far end was
|
||||||
# `ContextCancelled` in 2 cases:
|
# `ContextCancelled` in 2 cases:
|
||||||
|
@ -207,11 +212,13 @@ async def _drain_to_final_msg(
|
||||||
# if ctx._recv_chan:
|
# if ctx._recv_chan:
|
||||||
# await ctx._recv_chan.aclose()
|
# await ctx._recv_chan.aclose()
|
||||||
# TODO: ^ we don't need it right?
|
# TODO: ^ we don't need it right?
|
||||||
|
return_msg = msg
|
||||||
break
|
break
|
||||||
|
|
||||||
# far end task is still streaming to us so discard
|
# far end task is still streaming to us so discard
|
||||||
# and report depending on local ctx state.
|
# and report depending on local ctx state.
|
||||||
case Yield():
|
case Yield():
|
||||||
|
pre_result_drained.append(msg)
|
||||||
if (
|
if (
|
||||||
(ctx._stream.closed
|
(ctx._stream.closed
|
||||||
and (reason := 'stream was already closed')
|
and (reason := 'stream was already closed')
|
||||||
|
@ -236,7 +243,10 @@ async def _drain_to_final_msg(
|
||||||
|
|
||||||
f'{pformat(msg)}\n'
|
f'{pformat(msg)}\n'
|
||||||
)
|
)
|
||||||
return pre_result_drained
|
return (
|
||||||
|
return_msg,
|
||||||
|
pre_result_drained,
|
||||||
|
)
|
||||||
|
|
||||||
# drain up to the `msg_limit` hoping to get
|
# drain up to the `msg_limit` hoping to get
|
||||||
# a final result or error/ctxc.
|
# a final result or error/ctxc.
|
||||||
|
@ -260,6 +270,7 @@ async def _drain_to_final_msg(
|
||||||
# -[ ] should be a runtime error if a stream is open right?
|
# -[ ] should be a runtime error if a stream is open right?
|
||||||
# Stop()
|
# Stop()
|
||||||
case Stop():
|
case Stop():
|
||||||
|
pre_result_drained.append(msg)
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Remote stream terminated due to "stop" msg:\n\n'
|
'Remote stream terminated due to "stop" msg:\n\n'
|
||||||
f'{pformat(msg)}\n'
|
f'{pformat(msg)}\n'
|
||||||
|
@ -269,7 +280,6 @@ async def _drain_to_final_msg(
|
||||||
# remote error msg, likely already handled inside
|
# remote error msg, likely already handled inside
|
||||||
# `Context._deliver_msg()`
|
# `Context._deliver_msg()`
|
||||||
case Error():
|
case Error():
|
||||||
|
|
||||||
# TODO: can we replace this with `ctx.maybe_raise()`?
|
# TODO: can we replace this with `ctx.maybe_raise()`?
|
||||||
# -[ ] would this be handier for this case maybe?
|
# -[ ] would this be handier for this case maybe?
|
||||||
# async with maybe_raise_on_exit() as raises:
|
# async with maybe_raise_on_exit() as raises:
|
||||||
|
@ -336,6 +346,7 @@ async def _drain_to_final_msg(
|
||||||
# XXX should pretty much never get here unless someone
|
# XXX should pretty much never get here unless someone
|
||||||
# overrides the default `MsgType` spec.
|
# overrides the default `MsgType` spec.
|
||||||
case _:
|
case _:
|
||||||
|
pre_result_drained.append(msg)
|
||||||
# It's definitely an internal error if any other
|
# It's definitely an internal error if any other
|
||||||
# msg type without a`'cid'` field arrives here!
|
# msg type without a`'cid'` field arrives here!
|
||||||
if not msg.cid:
|
if not msg.cid:
|
||||||
|
@ -352,7 +363,10 @@ async def _drain_to_final_msg(
|
||||||
f'{ctx.outcome}\n'
|
f'{ctx.outcome}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
return pre_result_drained
|
return (
|
||||||
|
return_msg,
|
||||||
|
pre_result_drained,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class Unresolved:
|
class Unresolved:
|
||||||
|
@ -719,21 +733,36 @@ class Context:
|
||||||
Return string indicating which task this instance is wrapping.
|
Return string indicating which task this instance is wrapping.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return 'caller' if self._portal else 'callee'
|
return 'parent' if self._portal else 'child'
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def peer_side(side: str) -> str:
|
||||||
|
match side:
|
||||||
|
case 'child':
|
||||||
|
return 'parent'
|
||||||
|
case 'parent':
|
||||||
|
return 'child'
|
||||||
|
|
||||||
|
# TODO: remove stat!
|
||||||
|
# -[ ] re-implement the `.experiemental._pubsub` stuff
|
||||||
|
# with `MsgStream` and that should be last usage?
|
||||||
|
# -[ ] remove from `tests/legacy_one_way_streaming.py`!
|
||||||
async def send_yield(
|
async def send_yield(
|
||||||
self,
|
self,
|
||||||
data: Any,
|
data: Any,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''
|
||||||
|
Deprecated method for what now is implemented in `MsgStream`.
|
||||||
|
|
||||||
|
We need to rework / remove some stuff tho, see above.
|
||||||
|
|
||||||
|
'''
|
||||||
warnings.warn(
|
warnings.warn(
|
||||||
"`Context.send_yield()` is now deprecated. "
|
"`Context.send_yield()` is now deprecated. "
|
||||||
"Use ``MessageStream.send()``. ",
|
"Use ``MessageStream.send()``. ",
|
||||||
DeprecationWarning,
|
DeprecationWarning,
|
||||||
stacklevel=2,
|
stacklevel=2,
|
||||||
)
|
)
|
||||||
# await self.chan.send({'yield': data, 'cid': self.cid})
|
|
||||||
await self.chan.send(
|
await self.chan.send(
|
||||||
Yield(
|
Yield(
|
||||||
cid=self.cid,
|
cid=self.cid,
|
||||||
|
@ -742,12 +771,11 @@ class Context:
|
||||||
)
|
)
|
||||||
|
|
||||||
async def send_stop(self) -> None:
|
async def send_stop(self) -> None:
|
||||||
# await pause()
|
'''
|
||||||
# await self.chan.send({
|
Terminate a `MsgStream` dialog-phase by sending the IPC
|
||||||
# # Stop(
|
equiv of a `StopIteration`.
|
||||||
# 'stop': True,
|
|
||||||
# 'cid': self.cid
|
'''
|
||||||
# })
|
|
||||||
await self.chan.send(
|
await self.chan.send(
|
||||||
Stop(cid=self.cid)
|
Stop(cid=self.cid)
|
||||||
)
|
)
|
||||||
|
@ -843,6 +871,7 @@ class Context:
|
||||||
|
|
||||||
# self-cancel (ack) or,
|
# self-cancel (ack) or,
|
||||||
# peer propagated remote cancellation.
|
# peer propagated remote cancellation.
|
||||||
|
msgtyperr: bool = False
|
||||||
if isinstance(error, ContextCancelled):
|
if isinstance(error, ContextCancelled):
|
||||||
|
|
||||||
whom: str = (
|
whom: str = (
|
||||||
|
@ -854,6 +883,16 @@ class Context:
|
||||||
f'{error}'
|
f'{error}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
elif isinstance(error, MsgTypeError):
|
||||||
|
msgtyperr = True
|
||||||
|
peer_side: str = self.peer_side(self.side)
|
||||||
|
log.error(
|
||||||
|
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
|
||||||
|
|
||||||
|
f'{error}\n'
|
||||||
|
f'{pformat(self)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.error(
|
log.error(
|
||||||
f'Remote context error:\n\n'
|
f'Remote context error:\n\n'
|
||||||
|
@ -894,9 +933,9 @@ class Context:
|
||||||
# if `._cancel_called` then `.cancel_acked and .cancel_called`
|
# if `._cancel_called` then `.cancel_acked and .cancel_called`
|
||||||
# always should be set.
|
# always should be set.
|
||||||
and not self._is_self_cancelled()
|
and not self._is_self_cancelled()
|
||||||
|
|
||||||
and not cs.cancel_called
|
and not cs.cancel_called
|
||||||
and not cs.cancelled_caught
|
and not cs.cancelled_caught
|
||||||
|
and not msgtyperr
|
||||||
):
|
):
|
||||||
# TODO: it'd sure be handy to inject our own
|
# TODO: it'd sure be handy to inject our own
|
||||||
# `trio.Cancelled` subtype here ;)
|
# `trio.Cancelled` subtype here ;)
|
||||||
|
@ -1001,7 +1040,7 @@ class Context:
|
||||||
# when the runtime finally receives it during teardown
|
# when the runtime finally receives it during teardown
|
||||||
# (normally in `.result()` called from
|
# (normally in `.result()` called from
|
||||||
# `Portal.open_context().__aexit__()`)
|
# `Portal.open_context().__aexit__()`)
|
||||||
if side == 'caller':
|
if side == 'parent':
|
||||||
if not self._portal:
|
if not self._portal:
|
||||||
raise InternalError(
|
raise InternalError(
|
||||||
'No portal found!?\n'
|
'No portal found!?\n'
|
||||||
|
@ -1423,7 +1462,10 @@ class Context:
|
||||||
# wait for a final context result/error by "draining"
|
# wait for a final context result/error by "draining"
|
||||||
# (by more or less ignoring) any bi-dir-stream "yield"
|
# (by more or less ignoring) any bi-dir-stream "yield"
|
||||||
# msgs still in transit from the far end.
|
# msgs still in transit from the far end.
|
||||||
drained_msgs: list[dict] = await _drain_to_final_msg(
|
(
|
||||||
|
return_msg,
|
||||||
|
drained_msgs,
|
||||||
|
) = await _drain_to_final_msg(
|
||||||
ctx=self,
|
ctx=self,
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
)
|
)
|
||||||
|
@ -1441,7 +1483,10 @@ class Context:
|
||||||
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Ctx drained pre-result msgs:\n'
|
'Ctx drained pre-result msgs:\n'
|
||||||
f'{pformat(drained_msgs)}'
|
f'{pformat(drained_msgs)}\n\n'
|
||||||
|
|
||||||
|
f'Final return msg:\n'
|
||||||
|
f'{return_msg}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
self.maybe_raise(
|
self.maybe_raise(
|
||||||
|
@ -1608,7 +1653,13 @@ class Context:
|
||||||
|
|
||||||
async def started(
|
async def started(
|
||||||
self,
|
self,
|
||||||
value: Any | None = None
|
|
||||||
|
# TODO: how to type this so that it's the
|
||||||
|
# same as the payload type? Is this enough?
|
||||||
|
value: PayloadT|None = None,
|
||||||
|
|
||||||
|
strict_parity: bool = False,
|
||||||
|
complain_no_parity: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -1629,7 +1680,7 @@ class Context:
|
||||||
f'called `.started()` twice on context with {self.chan.uid}'
|
f'called `.started()` twice on context with {self.chan.uid}'
|
||||||
)
|
)
|
||||||
|
|
||||||
started = Started(
|
started_msg = Started(
|
||||||
cid=self.cid,
|
cid=self.cid,
|
||||||
pld=value,
|
pld=value,
|
||||||
)
|
)
|
||||||
|
@ -1650,28 +1701,54 @@ class Context:
|
||||||
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
|
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
|
||||||
#
|
#
|
||||||
codec: MsgCodec = current_codec()
|
codec: MsgCodec = current_codec()
|
||||||
msg_bytes: bytes = codec.encode(started)
|
msg_bytes: bytes = codec.encode(started_msg)
|
||||||
try:
|
try:
|
||||||
# be a "cheap" dialog (see above!)
|
# be a "cheap" dialog (see above!)
|
||||||
rt_started = codec.decode(msg_bytes)
|
if (
|
||||||
if rt_started != started:
|
strict_parity
|
||||||
|
or
|
||||||
|
complain_no_parity
|
||||||
|
):
|
||||||
|
rt_started: Started = codec.decode(msg_bytes)
|
||||||
|
|
||||||
|
# XXX something is prolly totes cucked with the
|
||||||
|
# codec state!
|
||||||
|
if isinstance(rt_started, dict):
|
||||||
|
rt_started = msgtypes.from_dict_msg(
|
||||||
|
dict_msg=rt_started,
|
||||||
|
)
|
||||||
|
raise RuntimeError(
|
||||||
|
'Failed to roundtrip `Started` msg?\n'
|
||||||
|
f'{pformat(rt_started)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
if rt_started != started_msg:
|
||||||
# TODO: break these methods out from the struct subtype?
|
# TODO: break these methods out from the struct subtype?
|
||||||
diff = pretty_struct.Struct.__sub__(rt_started, started)
|
|
||||||
|
|
||||||
|
diff = pretty_struct.Struct.__sub__(
|
||||||
|
rt_started,
|
||||||
|
started_msg,
|
||||||
|
)
|
||||||
complaint: str = (
|
complaint: str = (
|
||||||
'Started value does not match after codec rountrip?\n\n'
|
'Started value does not match after codec rountrip?\n\n'
|
||||||
f'{diff}'
|
f'{diff}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: rn this will pretty much always fail with
|
# TODO: rn this will pretty much always fail with
|
||||||
# any other sequence type embeded in the
|
# any other sequence type embeded in the
|
||||||
# payload...
|
# payload...
|
||||||
if self._strict_started:
|
if (
|
||||||
|
self._strict_started
|
||||||
|
or
|
||||||
|
strict_parity
|
||||||
|
):
|
||||||
raise ValueError(complaint)
|
raise ValueError(complaint)
|
||||||
else:
|
else:
|
||||||
log.warning(complaint)
|
log.warning(complaint)
|
||||||
|
|
||||||
await self.chan.send(rt_started)
|
# started_msg = rt_started
|
||||||
|
|
||||||
|
await self.chan.send(started_msg)
|
||||||
|
|
||||||
# raise any msg type error NO MATTER WHAT!
|
# raise any msg type error NO MATTER WHAT!
|
||||||
except msgspec.ValidationError as verr:
|
except msgspec.ValidationError as verr:
|
||||||
|
@ -1682,7 +1759,7 @@ class Context:
|
||||||
src_validation_error=verr,
|
src_validation_error=verr,
|
||||||
verb_header='Trying to send payload'
|
verb_header='Trying to send payload'
|
||||||
# > 'invalid `Started IPC msgs\n'
|
# > 'invalid `Started IPC msgs\n'
|
||||||
)
|
) from verr
|
||||||
|
|
||||||
self._started_called = True
|
self._started_called = True
|
||||||
|
|
||||||
|
@ -1783,13 +1860,17 @@ class Context:
|
||||||
else:
|
else:
|
||||||
log_meth = log.runtime
|
log_meth = log.runtime
|
||||||
|
|
||||||
log_meth(
|
side: str = self.side
|
||||||
f'Delivering error-msg to caller\n\n'
|
|
||||||
|
|
||||||
f'<= peer: {from_uid}\n'
|
peer_side: str = self.peer_side(side)
|
||||||
|
|
||||||
|
log_meth(
|
||||||
|
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
|
||||||
|
|
||||||
|
f'<= peer {peer_side!r}: {from_uid}\n'
|
||||||
f' |_ {nsf}()\n\n'
|
f' |_ {nsf}()\n\n'
|
||||||
|
|
||||||
f'=> cid: {cid}\n'
|
f'=> {side!r} cid: {cid}\n'
|
||||||
f' |_{self._task}\n\n'
|
f' |_{self._task}\n\n'
|
||||||
|
|
||||||
f'{pformat(re)}\n'
|
f'{pformat(re)}\n'
|
||||||
|
@ -1804,6 +1885,7 @@ class Context:
|
||||||
self._maybe_cancel_and_set_remote_error(re)
|
self._maybe_cancel_and_set_remote_error(re)
|
||||||
|
|
||||||
# XXX only case where returning early is fine!
|
# XXX only case where returning early is fine!
|
||||||
|
structfmt = pretty_struct.Struct.pformat
|
||||||
if self._in_overrun:
|
if self._in_overrun:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Queueing OVERRUN msg on caller task:\n'
|
f'Queueing OVERRUN msg on caller task:\n'
|
||||||
|
@ -1813,7 +1895,7 @@ class Context:
|
||||||
f'=> cid: {cid}\n'
|
f'=> cid: {cid}\n'
|
||||||
f' |_{self._task}\n\n'
|
f' |_{self._task}\n\n'
|
||||||
|
|
||||||
f'{pformat(msg)}\n'
|
f'{structfmt(msg)}\n'
|
||||||
)
|
)
|
||||||
self._overflow_q.append(msg)
|
self._overflow_q.append(msg)
|
||||||
return False
|
return False
|
||||||
|
@ -1827,7 +1909,7 @@ class Context:
|
||||||
f'=> {self._task}\n'
|
f'=> {self._task}\n'
|
||||||
f' |_cid={self.cid}\n\n'
|
f' |_cid={self.cid}\n\n'
|
||||||
|
|
||||||
f'{pformat(msg)}\n'
|
f'{structfmt(msg)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# NOTE: if an error is deteced we should always still
|
# NOTE: if an error is deteced we should always still
|
||||||
|
@ -2047,6 +2129,9 @@ async def open_context_from_portal(
|
||||||
# place..
|
# place..
|
||||||
allow_overruns=allow_overruns,
|
allow_overruns=allow_overruns,
|
||||||
)
|
)
|
||||||
|
# ASAP, so that `Context.side: str` can be determined for
|
||||||
|
# logging / tracing / debug!
|
||||||
|
ctx._portal: Portal = portal
|
||||||
|
|
||||||
assert ctx._remote_func_type == 'context'
|
assert ctx._remote_func_type == 'context'
|
||||||
msg: Started = await ctx._recv_chan.receive()
|
msg: Started = await ctx._recv_chan.receive()
|
||||||
|
@ -2065,10 +2150,9 @@ async def open_context_from_portal(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
src_err=src_error,
|
src_err=src_error,
|
||||||
log=log,
|
log=log,
|
||||||
expect_key='started',
|
expect_msg=Started,
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx._portal: Portal = portal
|
|
||||||
uid: tuple = portal.channel.uid
|
uid: tuple = portal.channel.uid
|
||||||
cid: str = ctx.cid
|
cid: str = ctx.cid
|
||||||
|
|
||||||
|
|
|
@ -106,6 +106,7 @@ def _trio_main(
|
||||||
Entry point for a `trio_run_in_process` subactor.
|
Entry point for a `trio_run_in_process` subactor.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__tracebackhide__: bool = True
|
||||||
_state._current_actor = actor
|
_state._current_actor = actor
|
||||||
trio_main = partial(
|
trio_main = partial(
|
||||||
async_main,
|
async_main,
|
||||||
|
|
|
@ -43,9 +43,12 @@ from tractor.msg import (
|
||||||
MsgType,
|
MsgType,
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
pretty_struct,
|
|
||||||
types as msgtypes,
|
types as msgtypes,
|
||||||
)
|
)
|
||||||
|
from tractor.msg.pretty_struct import (
|
||||||
|
iter_fields,
|
||||||
|
Struct,
|
||||||
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._context import Context
|
from ._context import Context
|
||||||
|
@ -82,7 +85,7 @@ class InternalError(RuntimeError):
|
||||||
_ipcmsg_keys: list[str] = [
|
_ipcmsg_keys: list[str] = [
|
||||||
fi.name
|
fi.name
|
||||||
for fi, k, v
|
for fi, k, v
|
||||||
in pretty_struct.iter_fields(Error)
|
in iter_fields(Error)
|
||||||
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -321,7 +324,7 @@ class RemoteActorError(Exception):
|
||||||
assert self.boxed_type is boxed_type
|
assert self.boxed_type is boxed_type
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def ipc_msg(self) -> pretty_struct.Struct:
|
def ipc_msg(self) -> Struct:
|
||||||
'''
|
'''
|
||||||
Re-render the underlying `._ipc_msg: Msg` as
|
Re-render the underlying `._ipc_msg: Msg` as
|
||||||
a `pretty_struct.Struct` for introspection such that the
|
a `pretty_struct.Struct` for introspection such that the
|
||||||
|
@ -334,12 +337,12 @@ class RemoteActorError(Exception):
|
||||||
msg_type: MsgType = type(self._ipc_msg)
|
msg_type: MsgType = type(self._ipc_msg)
|
||||||
fields: dict[str, Any] = {
|
fields: dict[str, Any] = {
|
||||||
k: v for _, k, v in
|
k: v for _, k, v in
|
||||||
pretty_struct.iter_fields(self._ipc_msg)
|
iter_fields(self._ipc_msg)
|
||||||
}
|
}
|
||||||
return defstruct(
|
return defstruct(
|
||||||
msg_type.__name__,
|
msg_type.__name__,
|
||||||
fields=fields.keys(),
|
fields=fields.keys(),
|
||||||
bases=(msg_type, pretty_struct.Struct),
|
bases=(msg_type, Struct),
|
||||||
)(**fields)
|
)(**fields)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -641,11 +644,11 @@ class MsgTypeError(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
reprol_fields: list[str] = [
|
reprol_fields: list[str] = [
|
||||||
'payload_msg',
|
'expected_msg_type',
|
||||||
]
|
]
|
||||||
extra_body_fields: list[str] = [
|
extra_body_fields: list[str] = [
|
||||||
'cid',
|
'cid',
|
||||||
'payload_msg',
|
'expected_msg',
|
||||||
]
|
]
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -661,9 +664,7 @@ class MsgTypeError(
|
||||||
return self.msgdata.get('_msg_dict')
|
return self.msgdata.get('_msg_dict')
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def payload_msg(
|
def expected_msg(self) -> MsgType|None:
|
||||||
self,
|
|
||||||
) -> MsgType|None:
|
|
||||||
'''
|
'''
|
||||||
Attempt to construct what would have been the original
|
Attempt to construct what would have been the original
|
||||||
`MsgType`-with-payload subtype (i.e. an instance from the set
|
`MsgType`-with-payload subtype (i.e. an instance from the set
|
||||||
|
@ -674,9 +675,17 @@ class MsgTypeError(
|
||||||
if msg_dict := self.msg_dict.copy():
|
if msg_dict := self.msg_dict.copy():
|
||||||
return msgtypes.from_dict_msg(
|
return msgtypes.from_dict_msg(
|
||||||
dict_msg=msg_dict,
|
dict_msg=msg_dict,
|
||||||
|
# use_pretty=True,
|
||||||
|
# ^-TODO-^ would luv to use this BUT then the
|
||||||
|
# `field_prefix` in `pformat_boxed_tb()` cucks it
|
||||||
|
# all up.. XD
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def expected_msg_type(self) -> Type[MsgType]|None:
|
||||||
|
return type(self.expected_msg)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def cid(self) -> str:
|
def cid(self) -> str:
|
||||||
# pre-packed using `.from_decode()` constructor
|
# pre-packed using `.from_decode()` constructor
|
||||||
|
@ -929,7 +938,6 @@ def _raise_from_no_key_in_msg(
|
||||||
src_err: KeyError,
|
src_err: KeyError,
|
||||||
log: StackLevelAdapter, # caller specific `log` obj
|
log: StackLevelAdapter, # caller specific `log` obj
|
||||||
|
|
||||||
expect_key: str = 'yield',
|
|
||||||
expect_msg: str = Yield,
|
expect_msg: str = Yield,
|
||||||
stream: MsgStream | None = None,
|
stream: MsgStream | None = None,
|
||||||
|
|
||||||
|
@ -1044,7 +1052,7 @@ def _raise_from_no_key_in_msg(
|
||||||
# is activated above.
|
# is activated above.
|
||||||
_type: str = 'Stream' if stream else 'Context'
|
_type: str = 'Stream' if stream else 'Context'
|
||||||
raise MessagingError(
|
raise MessagingError(
|
||||||
f"{_type} was expecting a '{expect_key.upper()}' message"
|
f"{_type} was expecting a {expect_msg} message"
|
||||||
" BUT received a non-error msg:\n"
|
" BUT received a non-error msg:\n"
|
||||||
f'{pformat(msg)}'
|
f'{pformat(msg)}'
|
||||||
) from src_err
|
) from src_err
|
||||||
|
|
|
@ -130,6 +130,8 @@ def _mk_msg_type_err(
|
||||||
|
|
||||||
) -> MsgTypeError:
|
) -> MsgTypeError:
|
||||||
|
|
||||||
|
import textwrap
|
||||||
|
|
||||||
# `Channel.send()` case
|
# `Channel.send()` case
|
||||||
if src_validation_error is None: # send-side
|
if src_validation_error is None: # send-side
|
||||||
|
|
||||||
|
@ -209,10 +211,24 @@ def _mk_msg_type_err(
|
||||||
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
msg, _, maybe_field = msgspec_msg.rpartition('$.')
|
||||||
obj = object()
|
obj = object()
|
||||||
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
|
||||||
|
field_name_expr: str = (
|
||||||
|
f' |_{maybe_field}: {codec.pld_spec_str} = '
|
||||||
|
)
|
||||||
|
fmt_val_lines: list[str] = pformat(field_val).splitlines()
|
||||||
|
fmt_val: str = (
|
||||||
|
f'{fmt_val_lines[0]}\n'
|
||||||
|
+
|
||||||
|
textwrap.indent(
|
||||||
|
'\n'.join(fmt_val_lines[1:]),
|
||||||
|
prefix=' '*len(field_name_expr),
|
||||||
|
)
|
||||||
|
)
|
||||||
message += (
|
message += (
|
||||||
f'{msg.rstrip("`")}\n\n'
|
f'{msg.rstrip("`")}\n\n'
|
||||||
f'{msg_type}\n'
|
f'<{msg_type.__qualname__}(\n'
|
||||||
f' |_.{maybe_field}: {codec.pld_spec_str} = {field_val!r}\n'
|
# f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
|
||||||
|
f'{field_name_expr}{fmt_val}\n'
|
||||||
|
f')>'
|
||||||
)
|
)
|
||||||
|
|
||||||
msgtyperr = MsgTypeError.from_decode(
|
msgtyperr = MsgTypeError.from_decode(
|
||||||
|
@ -338,7 +354,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
# self._task = task
|
# self._task = task
|
||||||
self._codec = codec
|
self._codec = codec
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Using new codec in {self}.recv()\n'
|
f'Using new codec in {self}.recv()\n'
|
||||||
f'codec: {self._codec}\n\n'
|
f'codec: {self._codec}\n\n'
|
||||||
f'msg_bytes: {msg_bytes}\n'
|
f'msg_bytes: {msg_bytes}\n'
|
||||||
)
|
)
|
||||||
|
@ -420,7 +436,7 @@ class MsgpackTCPStream(MsgTransport):
|
||||||
if self._codec.pld_spec != codec.pld_spec:
|
if self._codec.pld_spec != codec.pld_spec:
|
||||||
self._codec = codec
|
self._codec = codec
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Using new codec in {self}.send()\n'
|
f'Using new codec in {self}.send()\n'
|
||||||
f'codec: {self._codec}\n\n'
|
f'codec: {self._codec}\n\n'
|
||||||
f'msg: {msg}\n'
|
f'msg: {msg}\n'
|
||||||
)
|
)
|
||||||
|
|
|
@ -79,6 +79,7 @@ async def open_root_actor(
|
||||||
|
|
||||||
# enables the multi-process debugger support
|
# enables the multi-process debugger support
|
||||||
debug_mode: bool = False,
|
debug_mode: bool = False,
|
||||||
|
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
|
||||||
|
|
||||||
# internal logging
|
# internal logging
|
||||||
loglevel: str|None = None,
|
loglevel: str|None = None,
|
||||||
|
@ -107,14 +108,16 @@ async def open_root_actor(
|
||||||
)
|
)
|
||||||
if (
|
if (
|
||||||
debug_mode
|
debug_mode
|
||||||
and
|
and maybe_enable_greenback
|
||||||
await _debug.maybe_init_greenback(
|
and await _debug.maybe_init_greenback(
|
||||||
raise_not_found=False,
|
raise_not_found=False,
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
os.environ['PYTHONBREAKPOINT'] = (
|
os.environ['PYTHONBREAKPOINT'] = (
|
||||||
'tractor.devx._debug.pause_from_sync'
|
'tractor.devx._debug.pause_from_sync'
|
||||||
)
|
)
|
||||||
|
_state._runtime_vars['use_greenback'] = True
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# TODO: disable `breakpoint()` by default (without
|
# TODO: disable `breakpoint()` by default (without
|
||||||
# `greenback`) since it will break any multi-actor
|
# `greenback`) since it will break any multi-actor
|
||||||
|
@ -385,14 +388,20 @@ async def open_root_actor(
|
||||||
_state._last_actor_terminated = actor
|
_state._last_actor_terminated = actor
|
||||||
|
|
||||||
# restore built-in `breakpoint()` hook state
|
# restore built-in `breakpoint()` hook state
|
||||||
if debug_mode:
|
if (
|
||||||
|
debug_mode
|
||||||
|
and
|
||||||
|
maybe_enable_greenback
|
||||||
|
):
|
||||||
if builtin_bp_handler is not None:
|
if builtin_bp_handler is not None:
|
||||||
sys.breakpointhook = builtin_bp_handler
|
sys.breakpointhook = builtin_bp_handler
|
||||||
|
|
||||||
if orig_bp_path is not None:
|
if orig_bp_path is not None:
|
||||||
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
|
os.environ['PYTHONBREAKPOINT'] = orig_bp_path
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# clear env back to having no entry
|
# clear env back to having no entry
|
||||||
os.environ.pop('PYTHONBREAKPOINT')
|
os.environ.pop('PYTHONBREAKPOINT', None)
|
||||||
|
|
||||||
logger.runtime("Root actor terminated")
|
logger.runtime("Root actor terminated")
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,6 @@ from trio import (
|
||||||
TaskStatus,
|
TaskStatus,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .msg import NamespacePath
|
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from ._context import (
|
from ._context import (
|
||||||
Context,
|
Context,
|
||||||
|
@ -61,6 +60,11 @@ from .devx import (
|
||||||
)
|
)
|
||||||
from . import _state
|
from . import _state
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
|
from .msg import (
|
||||||
|
current_codec,
|
||||||
|
MsgCodec,
|
||||||
|
NamespacePath,
|
||||||
|
)
|
||||||
from tractor.msg.types import (
|
from tractor.msg.types import (
|
||||||
CancelAck,
|
CancelAck,
|
||||||
Error,
|
Error,
|
||||||
|
@ -98,6 +102,7 @@ async def _invoke_non_context(
|
||||||
Context | BaseException
|
Context | BaseException
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
):
|
):
|
||||||
|
__tracebackhide__: bool = True
|
||||||
|
|
||||||
# TODO: can we unify this with the `context=True` impl below?
|
# TODO: can we unify this with the `context=True` impl below?
|
||||||
if inspect.isasyncgen(coro):
|
if inspect.isasyncgen(coro):
|
||||||
|
@ -398,7 +403,11 @@ async def _invoke(
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
treat_as_gen: bool = False
|
treat_as_gen: bool = False
|
||||||
|
|
||||||
if _state.debug_mode():
|
if (
|
||||||
|
_state.debug_mode()
|
||||||
|
and
|
||||||
|
_state._runtime_vars['use_greenback']
|
||||||
|
):
|
||||||
# XXX for .pause_from_sync()` usage we need to make sure
|
# XXX for .pause_from_sync()` usage we need to make sure
|
||||||
# `greenback` is boostrapped in the subactor!
|
# `greenback` is boostrapped in the subactor!
|
||||||
await _debug.maybe_init_greenback()
|
await _debug.maybe_init_greenback()
|
||||||
|
@ -512,10 +521,22 @@ async def _invoke(
|
||||||
# wrapper that calls `Context.started()` and then does
|
# wrapper that calls `Context.started()` and then does
|
||||||
# the `await coro()`?
|
# the `await coro()`?
|
||||||
|
|
||||||
# a "context" endpoint type is the most general and
|
# ------ - ------
|
||||||
# "least sugary" type of RPC ep with support for
|
# a "context" endpoint is the most general and
|
||||||
|
# "least sugary" type of RPC with support for
|
||||||
# bi-dir streaming B)
|
# bi-dir streaming B)
|
||||||
# StartAck
|
#
|
||||||
|
# the concurrency relation is simlar to a task nursery
|
||||||
|
# wherein a "parent" task (the one that enters
|
||||||
|
# `trio.open_nursery()` in some actor "opens" (via
|
||||||
|
# `Portal.open_context()`) an IPC ctx to another peer
|
||||||
|
# (which is maybe a sub-) actor who then schedules (aka
|
||||||
|
# `trio.Nursery.start()`s) a new "child" task to execute
|
||||||
|
# the `@context` annotated func; that is this func we're
|
||||||
|
# running directly below!
|
||||||
|
# ------ - ------
|
||||||
|
#
|
||||||
|
# StartAck: respond immediately with endpoint info
|
||||||
await chan.send(
|
await chan.send(
|
||||||
StartAck(
|
StartAck(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
|
@ -524,11 +545,11 @@ async def _invoke(
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: should we also use an `.open_context()` equiv
|
# TODO: should we also use an `.open_context()` equiv
|
||||||
# for this callee side by factoring the impl from
|
# for this child side by factoring the impl from
|
||||||
# `Portal.open_context()` into a common helper?
|
# `Portal.open_context()` into a common helper?
|
||||||
#
|
#
|
||||||
# NOTE: there are many different ctx state details
|
# NOTE: there are many different ctx state details
|
||||||
# in a callee side instance according to current impl:
|
# in a child side instance according to current impl:
|
||||||
# - `.cancelled_caught` can never be `True`.
|
# - `.cancelled_caught` can never be `True`.
|
||||||
# -> the below scope is never exposed to the
|
# -> the below scope is never exposed to the
|
||||||
# `@context` marked RPC function.
|
# `@context` marked RPC function.
|
||||||
|
@ -554,7 +575,7 @@ async def _invoke(
|
||||||
|
|
||||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||||
# called by any of,
|
# called by any of,
|
||||||
# - *this* callee task manually calling `ctx.cancel()`.
|
# - *this* child task manually calling `ctx.cancel()`.
|
||||||
# - the runtime calling `ctx._deliver_msg()` which
|
# - the runtime calling `ctx._deliver_msg()` which
|
||||||
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
# itself calls `ctx._maybe_cancel_and_set_remote_error()`
|
||||||
# which cancels the scope presuming the input error
|
# which cancels the scope presuming the input error
|
||||||
|
@ -631,10 +652,11 @@ async def _invoke(
|
||||||
# f' |_{ctx}'
|
# f' |_{ctx}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# task-contex was either cancelled by request using
|
# task-contex was either cancelled by request
|
||||||
# ``Portal.cancel_actor()`` or ``Context.cancel()``
|
# using ``Portal.cancel_actor()`` or
|
||||||
# on the far end, or it was cancelled by the local
|
# ``Context.cancel()`` on the far end, or it
|
||||||
# (callee) task, so relay this cancel signal to the
|
# was cancelled by the local child (or callee)
|
||||||
|
# task, so relay this cancel signal to the
|
||||||
# other side.
|
# other side.
|
||||||
ctxc = ContextCancelled(
|
ctxc = ContextCancelled(
|
||||||
message=msg,
|
message=msg,
|
||||||
|
@ -655,7 +677,7 @@ async def _invoke(
|
||||||
|
|
||||||
) as scope_error:
|
) as scope_error:
|
||||||
|
|
||||||
# always set this (callee) side's exception as the
|
# always set this (child) side's exception as the
|
||||||
# local error on the context
|
# local error on the context
|
||||||
ctx._local_error: BaseException = scope_error
|
ctx._local_error: BaseException = scope_error
|
||||||
|
|
||||||
|
@ -1024,9 +1046,8 @@ async def process_messages(
|
||||||
trio.Event(),
|
trio.Event(),
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX remote (runtime scoped) error or uknown
|
# runtime-scoped remote error (since no `.cid`)
|
||||||
# msg (type).
|
case Error():
|
||||||
case Error() | _:
|
|
||||||
# NOTE: this is the non-rpc error case,
|
# NOTE: this is the non-rpc error case,
|
||||||
# that is, an error **not** raised inside
|
# that is, an error **not** raised inside
|
||||||
# a call to ``_invoke()`` (i.e. no cid was
|
# a call to ``_invoke()`` (i.e. no cid was
|
||||||
|
@ -1034,10 +1055,6 @@ async def process_messages(
|
||||||
# this error to all local channel
|
# this error to all local channel
|
||||||
# consumers (normally portals) by marking
|
# consumers (normally portals) by marking
|
||||||
# the channel as errored
|
# the channel as errored
|
||||||
log.exception(
|
|
||||||
f'Unhandled IPC msg:\n\n'
|
|
||||||
f'{msg}\n'
|
|
||||||
)
|
|
||||||
# assert chan.uid
|
# assert chan.uid
|
||||||
chan._exc: Exception = unpack_error(
|
chan._exc: Exception = unpack_error(
|
||||||
msg,
|
msg,
|
||||||
|
@ -1045,6 +1062,17 @@ async def process_messages(
|
||||||
)
|
)
|
||||||
raise chan._exc
|
raise chan._exc
|
||||||
|
|
||||||
|
# unknown/invalid msg type?
|
||||||
|
case _:
|
||||||
|
codec: MsgCodec = current_codec()
|
||||||
|
message: str = (
|
||||||
|
f'Unhandled IPC msg for codec?\n\n'
|
||||||
|
f'|_{codec}\n\n'
|
||||||
|
f'{msg}\n'
|
||||||
|
)
|
||||||
|
log.exception(message)
|
||||||
|
raise RuntimeError(message)
|
||||||
|
|
||||||
log.runtime(
|
log.runtime(
|
||||||
'Waiting on next IPC msg from\n'
|
'Waiting on next IPC msg from\n'
|
||||||
f'peer: {chan.uid}\n'
|
f'peer: {chan.uid}\n'
|
||||||
|
|
|
@ -513,7 +513,7 @@ async def trio_proc(
|
||||||
# })
|
# })
|
||||||
|
|
||||||
# track subactor in current nursery
|
# track subactor in current nursery
|
||||||
curr_actor = current_actor()
|
curr_actor: Actor = current_actor()
|
||||||
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
|
||||||
|
|
||||||
# resume caller at next checkpoint now that child is up
|
# resume caller at next checkpoint now that child is up
|
||||||
|
|
|
@ -44,6 +44,7 @@ from .trionics import (
|
||||||
BroadcastReceiver,
|
BroadcastReceiver,
|
||||||
)
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
|
Return,
|
||||||
Stop,
|
Stop,
|
||||||
Yield,
|
Yield,
|
||||||
)
|
)
|
||||||
|
@ -96,36 +97,26 @@ class MsgStream(trio.abc.Channel):
|
||||||
# delegate directly to underlying mem channel
|
# delegate directly to underlying mem channel
|
||||||
def receive_nowait(
|
def receive_nowait(
|
||||||
self,
|
self,
|
||||||
allow_msg_keys: list[str] = ['yield'],
|
allow_msgs: list[str] = Yield,
|
||||||
):
|
):
|
||||||
# msg: dict = self._rx_chan.receive_nowait()
|
|
||||||
msg: Yield|Stop = self._rx_chan.receive_nowait()
|
msg: Yield|Stop = self._rx_chan.receive_nowait()
|
||||||
for (
|
# TODO: replace msg equiv of this or does the `.pld`
|
||||||
i,
|
# interface read already satisfy it? I think so, yes?
|
||||||
key,
|
|
||||||
) in enumerate(allow_msg_keys):
|
|
||||||
try:
|
try:
|
||||||
# return msg[key]
|
|
||||||
return msg.pld
|
return msg.pld
|
||||||
# except KeyError as kerr:
|
|
||||||
except AttributeError as attrerr:
|
except AttributeError as attrerr:
|
||||||
if i < (len(allow_msg_keys) - 1):
|
|
||||||
continue
|
|
||||||
|
|
||||||
_raise_from_no_key_in_msg(
|
_raise_from_no_key_in_msg(
|
||||||
ctx=self._ctx,
|
ctx=self._ctx,
|
||||||
msg=msg,
|
msg=msg,
|
||||||
# src_err=kerr,
|
|
||||||
src_err=attrerr,
|
src_err=attrerr,
|
||||||
log=log,
|
log=log,
|
||||||
expect_key=key,
|
|
||||||
stream=self,
|
stream=self,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def receive(
|
async def receive(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = False,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Receive a single msg from the IPC transport, the next in
|
Receive a single msg from the IPC transport, the next in
|
||||||
|
@ -157,10 +148,9 @@ class MsgStream(trio.abc.Channel):
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
msg: Yield = await self._rx_chan.receive()
|
msg: Yield = await self._rx_chan.receive()
|
||||||
# return msg['yield']
|
|
||||||
return msg.pld
|
return msg.pld
|
||||||
|
|
||||||
# except KeyError as kerr:
|
# TODO: implement with match: instead?
|
||||||
except AttributeError as attrerr:
|
except AttributeError as attrerr:
|
||||||
# src_err = kerr
|
# src_err = kerr
|
||||||
src_err = attrerr
|
src_err = attrerr
|
||||||
|
@ -170,10 +160,8 @@ class MsgStream(trio.abc.Channel):
|
||||||
_raise_from_no_key_in_msg(
|
_raise_from_no_key_in_msg(
|
||||||
ctx=self._ctx,
|
ctx=self._ctx,
|
||||||
msg=msg,
|
msg=msg,
|
||||||
# src_err=kerr,
|
|
||||||
src_err=attrerr,
|
src_err=attrerr,
|
||||||
log=log,
|
log=log,
|
||||||
expect_key='yield',
|
|
||||||
stream=self,
|
stream=self,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -304,7 +292,7 @@ class MsgStream(trio.abc.Channel):
|
||||||
while not drained:
|
while not drained:
|
||||||
try:
|
try:
|
||||||
maybe_final_msg = self.receive_nowait(
|
maybe_final_msg = self.receive_nowait(
|
||||||
allow_msg_keys=['yield', 'return'],
|
allow_msgs=[Yield, Return],
|
||||||
)
|
)
|
||||||
if maybe_final_msg:
|
if maybe_final_msg:
|
||||||
log.debug(
|
log.debug(
|
||||||
|
|
|
@ -420,7 +420,7 @@ def mk_codec(
|
||||||
|
|
||||||
# instance of the default `msgspec.msgpack` codec settings, i.e.
|
# instance of the default `msgspec.msgpack` codec settings, i.e.
|
||||||
# no custom structs, hooks or other special types.
|
# no custom structs, hooks or other special types.
|
||||||
_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
|
_def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
|
||||||
|
|
||||||
# The built-in IPC `Msg` spec.
|
# The built-in IPC `Msg` spec.
|
||||||
# Our composing "shuttle" protocol which allows `tractor`-app code
|
# Our composing "shuttle" protocol which allows `tractor`-app code
|
||||||
|
|
|
@ -451,7 +451,8 @@ def from_dict_msg(
|
||||||
dict_msg: dict,
|
dict_msg: dict,
|
||||||
|
|
||||||
msgT: MsgType|None = None,
|
msgT: MsgType|None = None,
|
||||||
tag_field: str = 'msg_type'
|
tag_field: str = 'msg_type',
|
||||||
|
use_pretty: bool = False,
|
||||||
|
|
||||||
) -> MsgType:
|
) -> MsgType:
|
||||||
'''
|
'''
|
||||||
|
@ -468,6 +469,19 @@ def from_dict_msg(
|
||||||
# XXX ensure tag field is removed
|
# XXX ensure tag field is removed
|
||||||
msgT_name: str = dict_msg.pop(msg_type_tag_field)
|
msgT_name: str = dict_msg.pop(msg_type_tag_field)
|
||||||
msgT: MsgType = _msg_table[msgT_name]
|
msgT: MsgType = _msg_table[msgT_name]
|
||||||
|
if use_pretty:
|
||||||
|
msgT = defstruct(
|
||||||
|
name=msgT_name,
|
||||||
|
fields=[
|
||||||
|
(key, fi.type)
|
||||||
|
for fi, key, _
|
||||||
|
in pretty_struct.iter_fields(msgT)
|
||||||
|
],
|
||||||
|
bases=(
|
||||||
|
pretty_struct.Struct,
|
||||||
|
msgT,
|
||||||
|
),
|
||||||
|
)
|
||||||
return msgT(**dict_msg)
|
return msgT(**dict_msg)
|
||||||
|
|
||||||
# TODO: should be make a msg version of `ContextCancelled?`
|
# TODO: should be make a msg version of `ContextCancelled?`
|
||||||
|
|
Loading…
Reference in New Issue