Compare commits

..

11 Commits

Author SHA1 Message Date
Tyler Goodlet b209990d04 Flip a last `MultiError` to a beg, add todo on `@stream` func 2024-04-14 19:39:57 -04:00
Tyler Goodlet 60aa16adf6 Pass a `use_greenback: bool` runtime var to subs
Such that the top level `maybe_enable_greenback` from
`open_root_actor()` can toggle the entire actor tree's usage.
Read the rtv in `._rpc` tasks and only enable if set.

Also, rigor up the `._rpc.process_messages()` loop to handle `Error()`
and `case _:` separately such that we now raise an explicit rte for
unknown / invalid msgs. Use "parent" / "child" for side descriptions in
loop comments and put a fat comment before the `StartAck` in `_invoke()`.
2024-04-14 19:31:50 -04:00
Tyler Goodlet eca2c02f8b Flip to `.pause()` in subactor bp example 2024-04-14 18:53:42 -04:00
Tyler Goodlet 921f72f7fe Add `maybe_enable_greenback: bool` flag to `open_root_actor()` 2024-04-14 18:36:22 -04:00
Tyler Goodlet 38a6483859 Use `_raise_from_no_key_in_msg(allow_msgs)`
Instead of `allow_msg_keys` since we've fully flipped over to
struct-types for msgs in the runtime.

- drop the loop from `MsgStream.receive_nowait()` since
  `Yield/Return.pld` getting will handle both (instead of a loop of
  `dict`-key reads).
2024-04-14 18:31:41 -04:00
Tyler Goodlet f72b972348 Hide `._entry`/`._child` frames, tweak some more type annots 2024-04-14 17:49:18 -04:00
Tyler Goodlet 2edfed75eb Add `MsgTypeError.expected_msg_type`
Which matches with renaming `.payload_msg` -> `.expected_msg` which is
the value we attempt to construct from a vanilla-msgppack
decode-to-`dict` and then construct manually into a `MsgType` using
`.msg.types.from_dict_msg()`. Add a todo to use new `use_pretty` flag
which currently conflicts with `._exceptions.pformat_boxed_type()`
prefix formatting..
2024-04-14 16:32:18 -04:00
Tyler Goodlet 2d22713806 Add `from_dict_msg(user_pretty: bool)` flag
Allows for optionally (and dynamically) constructing the "expected"
`MsgType` from a `dict` into a `pretty_struct.Struct`, mostly for
logging usage.
2024-04-14 16:29:21 -04:00
Tyler Goodlet df548257ad IPC ctx refinements around `MsgTypeError` awareness
Add a bit of special handling for msg-type-errors with a dedicated
log-msg detailing which `.side: str` is the sender/causer and avoiding
a `._scope.cancel()` call in such cases since the local task might be
written to handle and tolerate the badly (typed) IPC msg.

As part of ^, change the ctx task-pair "side" semantics from "caller" ->
"callee" to be "parent" -> "child" which better matches the
cross-process SC-linked-task supervision hierarchy, and
`trio.Nursery.parent_task`; in `trio` the task that opens a nursery is
also named the "parent".

Impl deats / fixes around the `.side` semantics:
- ensure that `._portal: Portal` is set ASAP after
  `Actor.start_remote_task()` such that if the `Started` transaction
  fails, the parent-vs.-child sides are still denoted correctly (since
  `._portal` being set is the predicate for that).
- add a helper func `Context.peer_side(side: str) -> str:` which inverts
  from "child" to "parent" and vice versa, useful for logging info.

Other tweaks:
- make `_drain_to_final_msg()` return a tuple of a maybe-`Return` and
  the list of other `pre_result_drained: list[MsgType]` such that we
  don't ever have to warn about the return msg getting captured as
  a pre-"result" msg.
- Add some strictness flags to `.started()` which allow for toggling
  whether to error or warn log about mismatching roundtripped `Started`
  msgs prior to IPC transit.
2024-04-13 15:19:08 -04:00
Tyler Goodlet 3fb3608879 Extend recv-side `MsgTypeError` default message
Display the new `MsgCodec.pld_spec_str` and format the incorrect field
value to be placed entirely (txt block wise) right of the "type annot"
part of the line:

Iow if you had a bad `dict` value where something else should be it'd
look something like this:

<Started(
 |_pld: NamespacePath = {'cid': '3e0ca00c-7d32-4d2a-a0c2-ac2e12453871',
                         'locked': True,
                         'msg_type': 'LockStatus',
                         'subactor_uid': ['sub', 'af7ccb69-1dab-491f-84f7-2ec42c32d137']}
2024-04-12 11:49:50 -04:00
Tyler Goodlet faa7194daf TOSQUASH 322e015d Fix `mk_codec()` input arg 2024-04-12 11:47:10 -04:00
16 changed files with 287 additions and 131 deletions

View File

@ -38,6 +38,7 @@ async def main():
""" """
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
# loglevel='runtime',
) as n: ) as n:
# Spawn both actors, don't bother with collecting results # Spawn both actors, don't bother with collecting results

View File

@ -3,17 +3,20 @@ import tractor
async def breakpoint_forever(): async def breakpoint_forever():
"""Indefinitely re-enter debugger in child actor. '''
""" Indefinitely re-enter debugger in child actor.
'''
while True: while True:
await trio.sleep(0.1) await trio.sleep(0.1)
await tractor.breakpoint() await tractor.pause()
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
loglevel='cancel',
) as n: ) as n:
portal = await n.run_in_actor( portal = await n.run_in_actor(

View File

@ -246,10 +246,10 @@ def test_simple_context(
trio.run(main) trio.run(main)
except error_parent: except error_parent:
pass pass
except trio.MultiError as me: except BaseExceptionGroup as beg:
# XXX: on windows it seems we may have to expect the group error # XXX: on windows it seems we may have to expect the group error
from tractor._exceptions import is_multi_cancelled from tractor._exceptions import is_multi_cancelled
assert is_multi_cancelled(me) assert is_multi_cancelled(beg)
else: else:
trio.run(main) trio.run(main)

View File

@ -38,10 +38,13 @@ async def async_gen_stream(sequence):
assert cs.cancelled_caught assert cs.cancelled_caught
# TODO: deprecated either remove entirely
# or re-impl in terms of `MsgStream` one-sides
# wrapper, but at least remove `Portal.open_stream_from()`
@tractor.stream @tractor.stream
async def context_stream( async def context_stream(
ctx: tractor.Context, ctx: tractor.Context,
sequence sequence: list[int],
): ):
for i in sequence: for i in sequence:
await ctx.send_yield(i) await ctx.send_yield(i)

View File

@ -36,6 +36,7 @@ def parse_ipaddr(arg):
if __name__ == "__main__": if __name__ == "__main__":
__tracebackhide__: bool = True
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--uid", type=parse_uid) parser.add_argument("--uid", type=parse_uid)

View File

@ -47,6 +47,7 @@ import trio
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
InternalError, InternalError,
MsgTypeError,
RemoteActorError, RemoteActorError,
StreamOverrun, StreamOverrun,
pack_from_raise, pack_from_raise,
@ -59,12 +60,14 @@ from .msg import (
MsgType, MsgType,
MsgCodec, MsgCodec,
NamespacePath, NamespacePath,
PayloadT,
Return, Return,
Started, Started,
Stop, Stop,
Yield, Yield,
current_codec, current_codec,
pretty_struct, pretty_struct,
types as msgtypes,
) )
from ._ipc import Channel from ._ipc import Channel
from ._streaming import MsgStream from ._streaming import MsgStream
@ -88,7 +91,10 @@ async def _drain_to_final_msg(
hide_tb: bool = True, hide_tb: bool = True,
msg_limit: int = 6, msg_limit: int = 6,
) -> list[dict]: ) -> tuple[
Return|None,
list[MsgType]
]:
''' '''
Drain IPC msgs delivered to the underlying rx-mem-chan Drain IPC msgs delivered to the underlying rx-mem-chan
`Context._recv_chan` from the runtime in search for a final `Context._recv_chan` from the runtime in search for a final
@ -109,6 +115,7 @@ async def _drain_to_final_msg(
# basically ignoring) any bi-dir-stream msgs still in transit # basically ignoring) any bi-dir-stream msgs still in transit
# from the far end. # from the far end.
pre_result_drained: list[MsgType] = [] pre_result_drained: list[MsgType] = []
return_msg: Return|None = None
while not ( while not (
ctx.maybe_error ctx.maybe_error
and not ctx._final_result_is_set() and not ctx._final_result_is_set()
@ -169,8 +176,6 @@ async def _drain_to_final_msg(
# pray to the `trio` gawds that we're corrent with this # pray to the `trio` gawds that we're corrent with this
# msg: dict = await ctx._recv_chan.receive() # msg: dict = await ctx._recv_chan.receive()
msg: MsgType = await ctx._recv_chan.receive() msg: MsgType = await ctx._recv_chan.receive()
# always capture unexpected/non-result msgs
pre_result_drained.append(msg)
# NOTE: we get here if the far end was # NOTE: we get here if the far end was
# `ContextCancelled` in 2 cases: # `ContextCancelled` in 2 cases:
@ -207,11 +212,13 @@ async def _drain_to_final_msg(
# if ctx._recv_chan: # if ctx._recv_chan:
# await ctx._recv_chan.aclose() # await ctx._recv_chan.aclose()
# TODO: ^ we don't need it right? # TODO: ^ we don't need it right?
return_msg = msg
break break
# far end task is still streaming to us so discard # far end task is still streaming to us so discard
# and report depending on local ctx state. # and report depending on local ctx state.
case Yield(): case Yield():
pre_result_drained.append(msg)
if ( if (
(ctx._stream.closed (ctx._stream.closed
and (reason := 'stream was already closed') and (reason := 'stream was already closed')
@ -236,7 +243,10 @@ async def _drain_to_final_msg(
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
) )
return pre_result_drained return (
return_msg,
pre_result_drained,
)
# drain up to the `msg_limit` hoping to get # drain up to the `msg_limit` hoping to get
# a final result or error/ctxc. # a final result or error/ctxc.
@ -260,6 +270,7 @@ async def _drain_to_final_msg(
# -[ ] should be a runtime error if a stream is open right? # -[ ] should be a runtime error if a stream is open right?
# Stop() # Stop()
case Stop(): case Stop():
pre_result_drained.append(msg)
log.cancel( log.cancel(
'Remote stream terminated due to "stop" msg:\n\n' 'Remote stream terminated due to "stop" msg:\n\n'
f'{pformat(msg)}\n' f'{pformat(msg)}\n'
@ -269,7 +280,6 @@ async def _drain_to_final_msg(
# remote error msg, likely already handled inside # remote error msg, likely already handled inside
# `Context._deliver_msg()` # `Context._deliver_msg()`
case Error(): case Error():
# TODO: can we replace this with `ctx.maybe_raise()`? # TODO: can we replace this with `ctx.maybe_raise()`?
# -[ ] would this be handier for this case maybe? # -[ ] would this be handier for this case maybe?
# async with maybe_raise_on_exit() as raises: # async with maybe_raise_on_exit() as raises:
@ -336,6 +346,7 @@ async def _drain_to_final_msg(
# XXX should pretty much never get here unless someone # XXX should pretty much never get here unless someone
# overrides the default `MsgType` spec. # overrides the default `MsgType` spec.
case _: case _:
pre_result_drained.append(msg)
# It's definitely an internal error if any other # It's definitely an internal error if any other
# msg type without a`'cid'` field arrives here! # msg type without a`'cid'` field arrives here!
if not msg.cid: if not msg.cid:
@ -352,7 +363,10 @@ async def _drain_to_final_msg(
f'{ctx.outcome}\n' f'{ctx.outcome}\n'
) )
return pre_result_drained return (
return_msg,
pre_result_drained,
)
class Unresolved: class Unresolved:
@ -719,21 +733,36 @@ class Context:
Return string indicating which task this instance is wrapping. Return string indicating which task this instance is wrapping.
''' '''
return 'caller' if self._portal else 'callee' return 'parent' if self._portal else 'child'
@staticmethod
def peer_side(side: str) -> str:
match side:
case 'child':
return 'parent'
case 'parent':
return 'child'
# TODO: remove stat!
# -[ ] re-implement the `.experiemental._pubsub` stuff
# with `MsgStream` and that should be last usage?
# -[ ] remove from `tests/legacy_one_way_streaming.py`!
async def send_yield( async def send_yield(
self, self,
data: Any, data: Any,
) -> None: ) -> None:
'''
Deprecated method for what now is implemented in `MsgStream`.
We need to rework / remove some stuff tho, see above.
'''
warnings.warn( warnings.warn(
"`Context.send_yield()` is now deprecated. " "`Context.send_yield()` is now deprecated. "
"Use ``MessageStream.send()``. ", "Use ``MessageStream.send()``. ",
DeprecationWarning, DeprecationWarning,
stacklevel=2, stacklevel=2,
) )
# await self.chan.send({'yield': data, 'cid': self.cid})
await self.chan.send( await self.chan.send(
Yield( Yield(
cid=self.cid, cid=self.cid,
@ -742,12 +771,11 @@ class Context:
) )
async def send_stop(self) -> None: async def send_stop(self) -> None:
# await pause() '''
# await self.chan.send({ Terminate a `MsgStream` dialog-phase by sending the IPC
# # Stop( equiv of a `StopIteration`.
# 'stop': True,
# 'cid': self.cid '''
# })
await self.chan.send( await self.chan.send(
Stop(cid=self.cid) Stop(cid=self.cid)
) )
@ -843,6 +871,7 @@ class Context:
# self-cancel (ack) or, # self-cancel (ack) or,
# peer propagated remote cancellation. # peer propagated remote cancellation.
msgtyperr: bool = False
if isinstance(error, ContextCancelled): if isinstance(error, ContextCancelled):
whom: str = ( whom: str = (
@ -854,6 +883,16 @@ class Context:
f'{error}' f'{error}'
) )
elif isinstance(error, MsgTypeError):
msgtyperr = True
peer_side: str = self.peer_side(self.side)
log.error(
f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
f'{error}\n'
f'{pformat(self)}\n'
)
else: else:
log.error( log.error(
f'Remote context error:\n\n' f'Remote context error:\n\n'
@ -894,9 +933,9 @@ class Context:
# if `._cancel_called` then `.cancel_acked and .cancel_called` # if `._cancel_called` then `.cancel_acked and .cancel_called`
# always should be set. # always should be set.
and not self._is_self_cancelled() and not self._is_self_cancelled()
and not cs.cancel_called and not cs.cancel_called
and not cs.cancelled_caught and not cs.cancelled_caught
and not msgtyperr
): ):
# TODO: it'd sure be handy to inject our own # TODO: it'd sure be handy to inject our own
# `trio.Cancelled` subtype here ;) # `trio.Cancelled` subtype here ;)
@ -1001,7 +1040,7 @@ class Context:
# when the runtime finally receives it during teardown # when the runtime finally receives it during teardown
# (normally in `.result()` called from # (normally in `.result()` called from
# `Portal.open_context().__aexit__()`) # `Portal.open_context().__aexit__()`)
if side == 'caller': if side == 'parent':
if not self._portal: if not self._portal:
raise InternalError( raise InternalError(
'No portal found!?\n' 'No portal found!?\n'
@ -1423,7 +1462,10 @@ class Context:
# wait for a final context result/error by "draining" # wait for a final context result/error by "draining"
# (by more or less ignoring) any bi-dir-stream "yield" # (by more or less ignoring) any bi-dir-stream "yield"
# msgs still in transit from the far end. # msgs still in transit from the far end.
drained_msgs: list[dict] = await _drain_to_final_msg( (
return_msg,
drained_msgs,
) = await _drain_to_final_msg(
ctx=self, ctx=self,
hide_tb=hide_tb, hide_tb=hide_tb,
) )
@ -1441,7 +1483,10 @@ class Context:
log.cancel( log.cancel(
'Ctx drained pre-result msgs:\n' 'Ctx drained pre-result msgs:\n'
f'{pformat(drained_msgs)}' f'{pformat(drained_msgs)}\n\n'
f'Final return msg:\n'
f'{return_msg}\n'
) )
self.maybe_raise( self.maybe_raise(
@ -1608,7 +1653,13 @@ class Context:
async def started( async def started(
self, self,
value: Any | None = None
# TODO: how to type this so that it's the
# same as the payload type? Is this enough?
value: PayloadT|None = None,
strict_parity: bool = False,
complain_no_parity: bool = True,
) -> None: ) -> None:
''' '''
@ -1629,7 +1680,7 @@ class Context:
f'called `.started()` twice on context with {self.chan.uid}' f'called `.started()` twice on context with {self.chan.uid}'
) )
started = Started( started_msg = Started(
cid=self.cid, cid=self.cid,
pld=value, pld=value,
) )
@ -1650,28 +1701,54 @@ class Context:
# https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern # https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
# #
codec: MsgCodec = current_codec() codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(started) msg_bytes: bytes = codec.encode(started_msg)
try: try:
# be a "cheap" dialog (see above!) # be a "cheap" dialog (see above!)
rt_started = codec.decode(msg_bytes) if (
if rt_started != started: strict_parity
or
complain_no_parity
):
rt_started: Started = codec.decode(msg_bytes)
# XXX something is prolly totes cucked with the
# codec state!
if isinstance(rt_started, dict):
rt_started = msgtypes.from_dict_msg(
dict_msg=rt_started,
)
raise RuntimeError(
'Failed to roundtrip `Started` msg?\n'
f'{pformat(rt_started)}\n'
)
if rt_started != started_msg:
# TODO: break these methods out from the struct subtype? # TODO: break these methods out from the struct subtype?
diff = pretty_struct.Struct.__sub__(rt_started, started)
diff = pretty_struct.Struct.__sub__(
rt_started,
started_msg,
)
complaint: str = ( complaint: str = (
'Started value does not match after codec rountrip?\n\n' 'Started value does not match after codec rountrip?\n\n'
f'{diff}' f'{diff}'
) )
# TODO: rn this will pretty much always fail with # TODO: rn this will pretty much always fail with
# any other sequence type embeded in the # any other sequence type embeded in the
# payload... # payload...
if self._strict_started: if (
self._strict_started
or
strict_parity
):
raise ValueError(complaint) raise ValueError(complaint)
else: else:
log.warning(complaint) log.warning(complaint)
await self.chan.send(rt_started) # started_msg = rt_started
await self.chan.send(started_msg)
# raise any msg type error NO MATTER WHAT! # raise any msg type error NO MATTER WHAT!
except msgspec.ValidationError as verr: except msgspec.ValidationError as verr:
@ -1682,7 +1759,7 @@ class Context:
src_validation_error=verr, src_validation_error=verr,
verb_header='Trying to send payload' verb_header='Trying to send payload'
# > 'invalid `Started IPC msgs\n' # > 'invalid `Started IPC msgs\n'
) ) from verr
self._started_called = True self._started_called = True
@ -1783,13 +1860,17 @@ class Context:
else: else:
log_meth = log.runtime log_meth = log.runtime
log_meth( side: str = self.side
f'Delivering error-msg to caller\n\n'
f'<= peer: {from_uid}\n' peer_side: str = self.peer_side(side)
log_meth(
f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
f'<= peer {peer_side!r}: {from_uid}\n'
f' |_ {nsf}()\n\n' f' |_ {nsf}()\n\n'
f'=> cid: {cid}\n' f'=> {side!r} cid: {cid}\n'
f' |_{self._task}\n\n' f' |_{self._task}\n\n'
f'{pformat(re)}\n' f'{pformat(re)}\n'
@ -1804,6 +1885,7 @@ class Context:
self._maybe_cancel_and_set_remote_error(re) self._maybe_cancel_and_set_remote_error(re)
# XXX only case where returning early is fine! # XXX only case where returning early is fine!
structfmt = pretty_struct.Struct.pformat
if self._in_overrun: if self._in_overrun:
log.warning( log.warning(
f'Queueing OVERRUN msg on caller task:\n' f'Queueing OVERRUN msg on caller task:\n'
@ -1813,7 +1895,7 @@ class Context:
f'=> cid: {cid}\n' f'=> cid: {cid}\n'
f' |_{self._task}\n\n' f' |_{self._task}\n\n'
f'{pformat(msg)}\n' f'{structfmt(msg)}\n'
) )
self._overflow_q.append(msg) self._overflow_q.append(msg)
return False return False
@ -1827,7 +1909,7 @@ class Context:
f'=> {self._task}\n' f'=> {self._task}\n'
f' |_cid={self.cid}\n\n' f' |_cid={self.cid}\n\n'
f'{pformat(msg)}\n' f'{structfmt(msg)}\n'
) )
# NOTE: if an error is deteced we should always still # NOTE: if an error is deteced we should always still
@ -2047,6 +2129,9 @@ async def open_context_from_portal(
# place.. # place..
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
) )
# ASAP, so that `Context.side: str` can be determined for
# logging / tracing / debug!
ctx._portal: Portal = portal
assert ctx._remote_func_type == 'context' assert ctx._remote_func_type == 'context'
msg: Started = await ctx._recv_chan.receive() msg: Started = await ctx._recv_chan.receive()
@ -2065,10 +2150,9 @@ async def open_context_from_portal(
msg=msg, msg=msg,
src_err=src_error, src_err=src_error,
log=log, log=log,
expect_key='started', expect_msg=Started,
) )
ctx._portal: Portal = portal
uid: tuple = portal.channel.uid uid: tuple = portal.channel.uid
cid: str = ctx.cid cid: str = ctx.cid

View File

@ -106,6 +106,7 @@ def _trio_main(
Entry point for a `trio_run_in_process` subactor. Entry point for a `trio_run_in_process` subactor.
''' '''
__tracebackhide__: bool = True
_state._current_actor = actor _state._current_actor = actor
trio_main = partial( trio_main = partial(
async_main, async_main,

View File

@ -43,9 +43,12 @@ from tractor.msg import (
MsgType, MsgType,
Stop, Stop,
Yield, Yield,
pretty_struct,
types as msgtypes, types as msgtypes,
) )
from tractor.msg.pretty_struct import (
iter_fields,
Struct,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ._context import Context from ._context import Context
@ -82,7 +85,7 @@ class InternalError(RuntimeError):
_ipcmsg_keys: list[str] = [ _ipcmsg_keys: list[str] = [
fi.name fi.name
for fi, k, v for fi, k, v
in pretty_struct.iter_fields(Error) in iter_fields(Error)
] ]
@ -321,7 +324,7 @@ class RemoteActorError(Exception):
assert self.boxed_type is boxed_type assert self.boxed_type is boxed_type
@property @property
def ipc_msg(self) -> pretty_struct.Struct: def ipc_msg(self) -> Struct:
''' '''
Re-render the underlying `._ipc_msg: Msg` as Re-render the underlying `._ipc_msg: Msg` as
a `pretty_struct.Struct` for introspection such that the a `pretty_struct.Struct` for introspection such that the
@ -334,12 +337,12 @@ class RemoteActorError(Exception):
msg_type: MsgType = type(self._ipc_msg) msg_type: MsgType = type(self._ipc_msg)
fields: dict[str, Any] = { fields: dict[str, Any] = {
k: v for _, k, v in k: v for _, k, v in
pretty_struct.iter_fields(self._ipc_msg) iter_fields(self._ipc_msg)
} }
return defstruct( return defstruct(
msg_type.__name__, msg_type.__name__,
fields=fields.keys(), fields=fields.keys(),
bases=(msg_type, pretty_struct.Struct), bases=(msg_type, Struct),
)(**fields) )(**fields)
@property @property
@ -641,11 +644,11 @@ class MsgTypeError(
''' '''
reprol_fields: list[str] = [ reprol_fields: list[str] = [
'payload_msg', 'expected_msg_type',
] ]
extra_body_fields: list[str] = [ extra_body_fields: list[str] = [
'cid', 'cid',
'payload_msg', 'expected_msg',
] ]
@property @property
@ -661,9 +664,7 @@ class MsgTypeError(
return self.msgdata.get('_msg_dict') return self.msgdata.get('_msg_dict')
@property @property
def payload_msg( def expected_msg(self) -> MsgType|None:
self,
) -> MsgType|None:
''' '''
Attempt to construct what would have been the original Attempt to construct what would have been the original
`MsgType`-with-payload subtype (i.e. an instance from the set `MsgType`-with-payload subtype (i.e. an instance from the set
@ -674,9 +675,17 @@ class MsgTypeError(
if msg_dict := self.msg_dict.copy(): if msg_dict := self.msg_dict.copy():
return msgtypes.from_dict_msg( return msgtypes.from_dict_msg(
dict_msg=msg_dict, dict_msg=msg_dict,
# use_pretty=True,
# ^-TODO-^ would luv to use this BUT then the
# `field_prefix` in `pformat_boxed_tb()` cucks it
# all up.. XD
) )
return None return None
@property
def expected_msg_type(self) -> Type[MsgType]|None:
return type(self.expected_msg)
@property @property
def cid(self) -> str: def cid(self) -> str:
# pre-packed using `.from_decode()` constructor # pre-packed using `.from_decode()` constructor
@ -929,7 +938,6 @@ def _raise_from_no_key_in_msg(
src_err: KeyError, src_err: KeyError,
log: StackLevelAdapter, # caller specific `log` obj log: StackLevelAdapter, # caller specific `log` obj
expect_key: str = 'yield',
expect_msg: str = Yield, expect_msg: str = Yield,
stream: MsgStream | None = None, stream: MsgStream | None = None,
@ -1044,7 +1052,7 @@ def _raise_from_no_key_in_msg(
# is activated above. # is activated above.
_type: str = 'Stream' if stream else 'Context' _type: str = 'Stream' if stream else 'Context'
raise MessagingError( raise MessagingError(
f"{_type} was expecting a '{expect_key.upper()}' message" f"{_type} was expecting a {expect_msg} message"
" BUT received a non-error msg:\n" " BUT received a non-error msg:\n"
f'{pformat(msg)}' f'{pformat(msg)}'
) from src_err ) from src_err

View File

@ -130,6 +130,8 @@ def _mk_msg_type_err(
) -> MsgTypeError: ) -> MsgTypeError:
import textwrap
# `Channel.send()` case # `Channel.send()` case
if src_validation_error is None: # send-side if src_validation_error is None: # send-side
@ -209,10 +211,24 @@ def _mk_msg_type_err(
msg, _, maybe_field = msgspec_msg.rpartition('$.') msg, _, maybe_field = msgspec_msg.rpartition('$.')
obj = object() obj = object()
if (field_val := msg_dict.get(maybe_field, obj)) is not obj: if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
field_name_expr: str = (
f' |_{maybe_field}: {codec.pld_spec_str} = '
)
fmt_val_lines: list[str] = pformat(field_val).splitlines()
fmt_val: str = (
f'{fmt_val_lines[0]}\n'
+
textwrap.indent(
'\n'.join(fmt_val_lines[1:]),
prefix=' '*len(field_name_expr),
)
)
message += ( message += (
f'{msg.rstrip("`")}\n\n' f'{msg.rstrip("`")}\n\n'
f'{msg_type}\n' f'<{msg_type.__qualname__}(\n'
f' |_.{maybe_field}: {codec.pld_spec_str} = {field_val!r}\n' # f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
f'{field_name_expr}{fmt_val}\n'
f')>'
) )
msgtyperr = MsgTypeError.from_decode( msgtyperr = MsgTypeError.from_decode(
@ -338,7 +354,7 @@ class MsgpackTCPStream(MsgTransport):
# self._task = task # self._task = task
self._codec = codec self._codec = codec
log.runtime( log.runtime(
'Using new codec in {self}.recv()\n' f'Using new codec in {self}.recv()\n'
f'codec: {self._codec}\n\n' f'codec: {self._codec}\n\n'
f'msg_bytes: {msg_bytes}\n' f'msg_bytes: {msg_bytes}\n'
) )
@ -420,7 +436,7 @@ class MsgpackTCPStream(MsgTransport):
if self._codec.pld_spec != codec.pld_spec: if self._codec.pld_spec != codec.pld_spec:
self._codec = codec self._codec = codec
log.runtime( log.runtime(
'Using new codec in {self}.send()\n' f'Using new codec in {self}.send()\n'
f'codec: {self._codec}\n\n' f'codec: {self._codec}\n\n'
f'msg: {msg}\n' f'msg: {msg}\n'
) )

View File

@ -79,6 +79,7 @@ async def open_root_actor(
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
# internal logging # internal logging
loglevel: str|None = None, loglevel: str|None = None,
@ -107,14 +108,16 @@ async def open_root_actor(
) )
if ( if (
debug_mode debug_mode
and and maybe_enable_greenback
await _debug.maybe_init_greenback( and await _debug.maybe_init_greenback(
raise_not_found=False, raise_not_found=False,
) )
): ):
os.environ['PYTHONBREAKPOINT'] = ( os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx._debug.pause_from_sync' 'tractor.devx._debug.pause_from_sync'
) )
_state._runtime_vars['use_greenback'] = True
else: else:
# TODO: disable `breakpoint()` by default (without # TODO: disable `breakpoint()` by default (without
# `greenback`) since it will break any multi-actor # `greenback`) since it will break any multi-actor
@ -385,14 +388,20 @@ async def open_root_actor(
_state._last_actor_terminated = actor _state._last_actor_terminated = actor
# restore built-in `breakpoint()` hook state # restore built-in `breakpoint()` hook state
if debug_mode: if (
debug_mode
and
maybe_enable_greenback
):
if builtin_bp_handler is not None: if builtin_bp_handler is not None:
sys.breakpointhook = builtin_bp_handler sys.breakpointhook = builtin_bp_handler
if orig_bp_path is not None: if orig_bp_path is not None:
os.environ['PYTHONBREAKPOINT'] = orig_bp_path os.environ['PYTHONBREAKPOINT'] = orig_bp_path
else: else:
# clear env back to having no entry # clear env back to having no entry
os.environ.pop('PYTHONBREAKPOINT') os.environ.pop('PYTHONBREAKPOINT', None)
logger.runtime("Root actor terminated") logger.runtime("Root actor terminated")

View File

@ -41,7 +41,6 @@ from trio import (
TaskStatus, TaskStatus,
) )
from .msg import NamespacePath
from ._ipc import Channel from ._ipc import Channel
from ._context import ( from ._context import (
Context, Context,
@ -61,6 +60,11 @@ from .devx import (
) )
from . import _state from . import _state
from .log import get_logger from .log import get_logger
from .msg import (
current_codec,
MsgCodec,
NamespacePath,
)
from tractor.msg.types import ( from tractor.msg.types import (
CancelAck, CancelAck,
Error, Error,
@ -98,6 +102,7 @@ async def _invoke_non_context(
Context | BaseException Context | BaseException
] = trio.TASK_STATUS_IGNORED, ] = trio.TASK_STATUS_IGNORED,
): ):
__tracebackhide__: bool = True
# TODO: can we unify this with the `context=True` impl below? # TODO: can we unify this with the `context=True` impl below?
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
@ -398,7 +403,11 @@ async def _invoke(
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
treat_as_gen: bool = False treat_as_gen: bool = False
if _state.debug_mode(): if (
_state.debug_mode()
and
_state._runtime_vars['use_greenback']
):
# XXX for .pause_from_sync()` usage we need to make sure # XXX for .pause_from_sync()` usage we need to make sure
# `greenback` is boostrapped in the subactor! # `greenback` is boostrapped in the subactor!
await _debug.maybe_init_greenback() await _debug.maybe_init_greenback()
@ -512,10 +521,22 @@ async def _invoke(
# wrapper that calls `Context.started()` and then does # wrapper that calls `Context.started()` and then does
# the `await coro()`? # the `await coro()`?
# a "context" endpoint type is the most general and # ------ - ------
# "least sugary" type of RPC ep with support for # a "context" endpoint is the most general and
# "least sugary" type of RPC with support for
# bi-dir streaming B) # bi-dir streaming B)
# StartAck #
# the concurrency relation is simlar to a task nursery
# wherein a "parent" task (the one that enters
# `trio.open_nursery()` in some actor "opens" (via
# `Portal.open_context()`) an IPC ctx to another peer
# (which is maybe a sub-) actor who then schedules (aka
# `trio.Nursery.start()`s) a new "child" task to execute
# the `@context` annotated func; that is this func we're
# running directly below!
# ------ - ------
#
# StartAck: respond immediately with endpoint info
await chan.send( await chan.send(
StartAck( StartAck(
cid=cid, cid=cid,
@ -524,11 +545,11 @@ async def _invoke(
) )
# TODO: should we also use an `.open_context()` equiv # TODO: should we also use an `.open_context()` equiv
# for this callee side by factoring the impl from # for this child side by factoring the impl from
# `Portal.open_context()` into a common helper? # `Portal.open_context()` into a common helper?
# #
# NOTE: there are many different ctx state details # NOTE: there are many different ctx state details
# in a callee side instance according to current impl: # in a child side instance according to current impl:
# - `.cancelled_caught` can never be `True`. # - `.cancelled_caught` can never be `True`.
# -> the below scope is never exposed to the # -> the below scope is never exposed to the
# `@context` marked RPC function. # `@context` marked RPC function.
@ -554,7 +575,7 @@ async def _invoke(
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,
# - *this* callee task manually calling `ctx.cancel()`. # - *this* child task manually calling `ctx.cancel()`.
# - the runtime calling `ctx._deliver_msg()` which # - the runtime calling `ctx._deliver_msg()` which
# itself calls `ctx._maybe_cancel_and_set_remote_error()` # itself calls `ctx._maybe_cancel_and_set_remote_error()`
# which cancels the scope presuming the input error # which cancels the scope presuming the input error
@ -631,10 +652,11 @@ async def _invoke(
# f' |_{ctx}' # f' |_{ctx}'
) )
# task-contex was either cancelled by request using # task-contex was either cancelled by request
# ``Portal.cancel_actor()`` or ``Context.cancel()`` # using ``Portal.cancel_actor()`` or
# on the far end, or it was cancelled by the local # ``Context.cancel()`` on the far end, or it
# (callee) task, so relay this cancel signal to the # was cancelled by the local child (or callee)
# task, so relay this cancel signal to the
# other side. # other side.
ctxc = ContextCancelled( ctxc = ContextCancelled(
message=msg, message=msg,
@ -655,7 +677,7 @@ async def _invoke(
) as scope_error: ) as scope_error:
# always set this (callee) side's exception as the # always set this (child) side's exception as the
# local error on the context # local error on the context
ctx._local_error: BaseException = scope_error ctx._local_error: BaseException = scope_error
@ -1024,9 +1046,8 @@ async def process_messages(
trio.Event(), trio.Event(),
) )
# XXX remote (runtime scoped) error or uknown # runtime-scoped remote error (since no `.cid`)
# msg (type). case Error():
case Error() | _:
# NOTE: this is the non-rpc error case, # NOTE: this is the non-rpc error case,
# that is, an error **not** raised inside # that is, an error **not** raised inside
# a call to ``_invoke()`` (i.e. no cid was # a call to ``_invoke()`` (i.e. no cid was
@ -1034,10 +1055,6 @@ async def process_messages(
# this error to all local channel # this error to all local channel
# consumers (normally portals) by marking # consumers (normally portals) by marking
# the channel as errored # the channel as errored
log.exception(
f'Unhandled IPC msg:\n\n'
f'{msg}\n'
)
# assert chan.uid # assert chan.uid
chan._exc: Exception = unpack_error( chan._exc: Exception = unpack_error(
msg, msg,
@ -1045,6 +1062,17 @@ async def process_messages(
) )
raise chan._exc raise chan._exc
# unknown/invalid msg type?
case _:
codec: MsgCodec = current_codec()
message: str = (
f'Unhandled IPC msg for codec?\n\n'
f'|_{codec}\n\n'
f'{msg}\n'
)
log.exception(message)
raise RuntimeError(message)
log.runtime( log.runtime(
'Waiting on next IPC msg from\n' 'Waiting on next IPC msg from\n'
f'peer: {chan.uid}\n' f'peer: {chan.uid}\n'

View File

@ -513,7 +513,7 @@ async def trio_proc(
# }) # })
# track subactor in current nursery # track subactor in current nursery
curr_actor = current_actor() curr_actor: Actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up # resume caller at next checkpoint now that child is up

View File

@ -44,6 +44,7 @@ from .trionics import (
BroadcastReceiver, BroadcastReceiver,
) )
from tractor.msg import ( from tractor.msg import (
Return,
Stop, Stop,
Yield, Yield,
) )
@ -96,36 +97,26 @@ class MsgStream(trio.abc.Channel):
# delegate directly to underlying mem channel # delegate directly to underlying mem channel
def receive_nowait( def receive_nowait(
self, self,
allow_msg_keys: list[str] = ['yield'], allow_msgs: list[str] = Yield,
): ):
# msg: dict = self._rx_chan.receive_nowait()
msg: Yield|Stop = self._rx_chan.receive_nowait() msg: Yield|Stop = self._rx_chan.receive_nowait()
for ( # TODO: replace msg equiv of this or does the `.pld`
i, # interface read already satisfy it? I think so, yes?
key,
) in enumerate(allow_msg_keys):
try: try:
# return msg[key]
return msg.pld return msg.pld
# except KeyError as kerr:
except AttributeError as attrerr: except AttributeError as attrerr:
if i < (len(allow_msg_keys) - 1):
continue
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=self._ctx, ctx=self._ctx,
msg=msg, msg=msg,
# src_err=kerr,
src_err=attrerr, src_err=attrerr,
log=log, log=log,
expect_key=key,
stream=self, stream=self,
) )
async def receive( async def receive(
self, self,
hide_tb: bool = True, hide_tb: bool = False,
): ):
''' '''
Receive a single msg from the IPC transport, the next in Receive a single msg from the IPC transport, the next in
@ -157,10 +148,9 @@ class MsgStream(trio.abc.Channel):
try: try:
try: try:
msg: Yield = await self._rx_chan.receive() msg: Yield = await self._rx_chan.receive()
# return msg['yield']
return msg.pld return msg.pld
# except KeyError as kerr: # TODO: implement with match: instead?
except AttributeError as attrerr: except AttributeError as attrerr:
# src_err = kerr # src_err = kerr
src_err = attrerr src_err = attrerr
@ -170,10 +160,8 @@ class MsgStream(trio.abc.Channel):
_raise_from_no_key_in_msg( _raise_from_no_key_in_msg(
ctx=self._ctx, ctx=self._ctx,
msg=msg, msg=msg,
# src_err=kerr,
src_err=attrerr, src_err=attrerr,
log=log, log=log,
expect_key='yield',
stream=self, stream=self,
) )
@ -304,7 +292,7 @@ class MsgStream(trio.abc.Channel):
while not drained: while not drained:
try: try:
maybe_final_msg = self.receive_nowait( maybe_final_msg = self.receive_nowait(
allow_msg_keys=['yield', 'return'], allow_msgs=[Yield, Return],
) )
if maybe_final_msg: if maybe_final_msg:
log.debug( log.debug(

View File

@ -420,7 +420,7 @@ def mk_codec(
# instance of the default `msgspec.msgpack` codec settings, i.e. # instance of the default `msgspec.msgpack` codec settings, i.e.
# no custom structs, hooks or other special types. # no custom structs, hooks or other special types.
_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any) _def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
# The built-in IPC `Msg` spec. # The built-in IPC `Msg` spec.
# Our composing "shuttle" protocol which allows `tractor`-app code # Our composing "shuttle" protocol which allows `tractor`-app code

View File

@ -451,7 +451,8 @@ def from_dict_msg(
dict_msg: dict, dict_msg: dict,
msgT: MsgType|None = None, msgT: MsgType|None = None,
tag_field: str = 'msg_type' tag_field: str = 'msg_type',
use_pretty: bool = False,
) -> MsgType: ) -> MsgType:
''' '''
@ -468,6 +469,19 @@ def from_dict_msg(
# XXX ensure tag field is removed # XXX ensure tag field is removed
msgT_name: str = dict_msg.pop(msg_type_tag_field) msgT_name: str = dict_msg.pop(msg_type_tag_field)
msgT: MsgType = _msg_table[msgT_name] msgT: MsgType = _msg_table[msgT_name]
if use_pretty:
msgT = defstruct(
name=msgT_name,
fields=[
(key, fi.type)
for fi, key, _
in pretty_struct.iter_fields(msgT)
],
bases=(
pretty_struct.Struct,
msgT,
),
)
return msgT(**dict_msg) return msgT(**dict_msg)
# TODO: should be make a msg version of `ContextCancelled?` # TODO: should be make a msg version of `ContextCancelled?`