From e153cc0187de7bb37a76809b16d917cf124468b3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 2 Apr 2024 13:41:52 -0400 Subject: [PATCH] WIP porting runtime to use `Msg`-spec --- tractor/_context.py | 245 +++++++++------- tractor/_entry.py | 1 + tractor/_exceptions.py | 94 ++++-- tractor/_ipc.py | 99 ++++++- tractor/_portal.py | 19 +- tractor/_rpc.py | 646 ++++++++++++++++++++++++----------------- tractor/_runtime.py | 174 +++++++---- tractor/_spawn.py | 30 +- tractor/_streaming.py | 46 ++- tractor/devx/_debug.py | 3 + 10 files changed, 879 insertions(+), 478 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index 3c2490a..38b4431 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -53,7 +53,14 @@ from ._exceptions import ( _raise_from_no_key_in_msg, ) from .log import get_logger -from .msg import NamespacePath +from .msg import ( + NamespacePath, + Msg, + Return, + Started, + Stop, + Yield, +) from ._ipc import Channel from ._streaming import MsgStream from ._state import ( @@ -96,7 +103,8 @@ async def _drain_to_final_msg( # wait for a final context result by collecting (but # basically ignoring) any bi-dir-stream msgs still in transit # from the far end. - pre_result_drained: list[dict] = [] + # pre_result_drained: list[dict] = [] + pre_result_drained: list[Msg] = [] while not ( ctx.maybe_error and not ctx._final_result_is_set() @@ -155,7 +163,10 @@ async def _drain_to_final_msg( # await pause() # pray to the `trio` gawds that we're corrent with this - msg: dict = await ctx._recv_chan.receive() + # msg: dict = await ctx._recv_chan.receive() + msg: Msg = await ctx._recv_chan.receive() + # always capture unexpected/non-result msgs + pre_result_drained.append(msg) # NOTE: we get here if the far end was # `ContextCancelled` in 2 cases: @@ -175,24 +186,31 @@ async def _drain_to_final_msg( # continue to bubble up as normal. raise - try: - ctx._result: Any = msg['return'] - log.runtime( - 'Context delivered final draining msg:\n' - f'{pformat(msg)}' - ) - # XXX: only close the rx mem chan AFTER - # a final result is retreived. - # if ctx._recv_chan: - # await ctx._recv_chan.aclose() - # TODO: ^ we don't need it right? - break + match msg: + case Return( + cid=cid, + pld=res, + ): + # try: + # ctx._result: Any = msg['return'] + # ctx._result: Any = msg.pld + ctx._result: Any = res + log.runtime( + 'Context delivered final draining msg:\n' + f'{pformat(msg)}' + ) + # XXX: only close the rx mem chan AFTER + # a final result is retreived. + # if ctx._recv_chan: + # await ctx._recv_chan.aclose() + # TODO: ^ we don't need it right? + break - except KeyError: - # always capture unexpected/non-result msgs - pre_result_drained.append(msg) + # except KeyError: + # except AttributeError: + case Yield(): + # if 'yield' in msg: - if 'yield' in msg: # far end task is still streaming to us so discard # and report per local context state. if ( @@ -238,9 +256,10 @@ async def _drain_to_final_msg( # TODO: work out edge cases here where # a stream is open but the task also calls # this? - # -[ ] should be a runtime error if a stream is open - # right? - elif 'stop' in msg: + # -[ ] should be a runtime error if a stream is open right? + # Stop() + case Stop(): + # elif 'stop' in msg: log.cancel( 'Remote stream terminated due to "stop" msg:\n\n' f'{pformat(msg)}\n' @@ -249,78 +268,80 @@ async def _drain_to_final_msg( # It's an internal error if any other msg type without # a`'cid'` field arrives here! - if not msg.get('cid'): - raise InternalError( - 'Unexpected cid-missing msg?\n\n' - f'{msg}\n' - ) + case _: + # if not msg.get('cid'): + if not msg.cid: + raise InternalError( + 'Unexpected cid-missing msg?\n\n' + f'{msg}\n' + ) - # XXX fallthrough to handle expected error XXX - # TODO: replace this with `ctx.maybe_raise()` - # - # TODO: would this be handier for this case maybe? - # async with maybe_raise_on_exit() as raises: - # if raises: - # log.error('some msg about raising..') + # XXX fallthrough to handle expected error XXX + # TODO: replace this with `ctx.maybe_raise()` + # + # TODO: would this be handier for this case maybe? + # async with maybe_raise_on_exit() as raises: + # if raises: + # log.error('some msg about raising..') - re: Exception|None = ctx._remote_error - if re: - log.critical( - 'Remote ctx terminated due to "error" msg:\n' - f'{re}' - ) - assert msg is ctx._cancel_msg - # NOTE: this solved a super dupe edge case XD - # this was THE super duper edge case of: - # - local task opens a remote task, - # - requests remote cancellation of far end - # ctx/tasks, - # - needs to wait for the cancel ack msg - # (ctxc) or some result in the race case - # where the other side's task returns - # before the cancel request msg is ever - # rxed and processed, - # - here this surrounding drain loop (which - # iterates all ipc msgs until the ack or - # an early result arrives) was NOT exiting - # since we are the edge case: local task - # does not re-raise any ctxc it receives - # IFF **it** was the cancellation - # requester.. - # will raise if necessary, ow break from - # loop presuming any error terminates the - # context! - ctx._maybe_raise_remote_err( - re, - # NOTE: obvi we don't care if we - # overran the far end if we're already - # waiting on a final result (msg). - # raise_overrun_from_self=False, - raise_overrun_from_self=raise_overrun, - ) + re: Exception|None = ctx._remote_error + if re: + log.critical( + 'Remote ctx terminated due to "error" msg:\n' + f'{re}' + ) + assert msg is ctx._cancel_msg + # NOTE: this solved a super dupe edge case XD + # this was THE super duper edge case of: + # - local task opens a remote task, + # - requests remote cancellation of far end + # ctx/tasks, + # - needs to wait for the cancel ack msg + # (ctxc) or some result in the race case + # where the other side's task returns + # before the cancel request msg is ever + # rxed and processed, + # - here this surrounding drain loop (which + # iterates all ipc msgs until the ack or + # an early result arrives) was NOT exiting + # since we are the edge case: local task + # does not re-raise any ctxc it receives + # IFF **it** was the cancellation + # requester.. + # will raise if necessary, ow break from + # loop presuming any error terminates the + # context! + ctx._maybe_raise_remote_err( + re, + # NOTE: obvi we don't care if we + # overran the far end if we're already + # waiting on a final result (msg). + # raise_overrun_from_self=False, + raise_overrun_from_self=raise_overrun, + ) - break # OOOOOF, yeah obvi we need this.. + break # OOOOOF, yeah obvi we need this.. - # XXX we should never really get here - # right! since `._deliver_msg()` should - # always have detected an {'error': ..} - # msg and already called this right!?! - elif error := unpack_error( - msg=msg, - chan=ctx._portal.channel, - hide_tb=False, - ): - log.critical('SHOULD NEVER GET HERE!?') - assert msg is ctx._cancel_msg - assert error.msgdata == ctx._remote_error.msgdata - from .devx._debug import pause - await pause() - ctx._maybe_cancel_and_set_remote_error(error) - ctx._maybe_raise_remote_err(error) + # XXX we should never really get here + # right! since `._deliver_msg()` should + # always have detected an {'error': ..} + # msg and already called this right!?! + elif error := unpack_error( + msg=msg, + chan=ctx._portal.channel, + hide_tb=False, + ): + log.critical('SHOULD NEVER GET HERE!?') + assert msg is ctx._cancel_msg + assert error.msgdata == ctx._remote_error.msgdata + from .devx._debug import pause + await pause() + ctx._maybe_cancel_and_set_remote_error(error) + ctx._maybe_raise_remote_err(error) - else: - # bubble the original src key error - raise + else: + # bubble the original src key error + raise else: log.cancel( 'Skipping `MsgStream` drain since final outcome is set\n\n' @@ -710,10 +731,14 @@ class Context: async def send_stop(self) -> None: # await pause() - await self.chan.send({ - 'stop': True, - 'cid': self.cid - }) + # await self.chan.send({ + # # Stop( + # 'stop': True, + # 'cid': self.cid + # }) + await self.chan.send( + Stop(cid=self.cid) + ) def _maybe_cancel_and_set_remote_error( self, @@ -1395,17 +1420,19 @@ class Context: for msg in drained_msgs: # TODO: mask this by default.. - if 'return' in msg: + # if 'return' in msg: + if isinstance(msg, Return): # from .devx import pause # await pause() - raise InternalError( + # raise InternalError( + log.warning( 'Final `return` msg should never be drained !?!?\n\n' f'{msg}\n' ) log.cancel( 'Ctx drained pre-result msgs:\n' - f'{drained_msgs}' + f'{pformat(drained_msgs)}' ) self.maybe_raise( @@ -1613,7 +1640,18 @@ class Context: f'called `.started()` twice on context with {self.chan.uid}' ) - await self.chan.send({'started': value, 'cid': self.cid}) + # await self.chan.send( + # { + # 'started': value, + # 'cid': self.cid, + # } + # ) + await self.chan.send( + Started( + cid=self.cid, + pld=value, + ) + ) self._started_called = True async def _drain_overflows( @@ -1668,7 +1706,8 @@ class Context: async def _deliver_msg( self, - msg: dict, + # msg: dict, + msg: Msg, ) -> bool: ''' @@ -1852,7 +1891,7 @@ class Context: # anything different. return False else: - txt += f'\n{msg}\n' + # txt += f'\n{msg}\n' # raise local overrun and immediately pack as IPC # msg for far end. try: @@ -1983,15 +2022,17 @@ async def open_context_from_portal( ) assert ctx._remote_func_type == 'context' - msg: dict = await ctx._recv_chan.receive() + msg: Started = await ctx._recv_chan.receive() try: # the "first" value here is delivered by the callee's # ``Context.started()`` call. - first: Any = msg['started'] + # first: Any = msg['started'] + first: Any = msg.pld ctx._started_called: bool = True - except KeyError as src_error: + # except KeyError as src_error: + except AttributeError as src_error: _raise_from_no_key_in_msg( ctx=ctx, msg=msg, diff --git a/tractor/_entry.py b/tractor/_entry.py index 0ac0dc4..b2aae2e 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -135,6 +135,7 @@ def _trio_main( run_as_asyncio_guest(trio_main) else: trio.run(trio_main) + except KeyboardInterrupt: log.cancel( 'Actor received KBI\n' diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index b1a8ee6..7deda9d 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -31,9 +31,16 @@ import textwrap import traceback import trio +from msgspec import structs from tractor._state import current_actor from tractor.log import get_logger +from tractor.msg import ( + Error, + Msg, + Stop, + Yield, +) if TYPE_CHECKING: from ._context import Context @@ -135,6 +142,8 @@ class RemoteActorError(Exception): # and instead render if from `.boxed_type_str`? self._boxed_type: BaseException = boxed_type self._src_type: BaseException|None = None + + # TODO: make this a `.errmsg: Error` throughout? self.msgdata: dict[str, Any] = msgdata # TODO: mask out eventually or place in `pack_error()` @@ -464,7 +473,23 @@ class AsyncioCancelled(Exception): ''' class MessagingError(Exception): - 'Some kind of unexpected SC messaging dialog issue' + ''' + IPC related msg (typing), transaction (ordering) or dialog + handling error. + + ''' + + +class MsgTypeError(MessagingError): + ''' + Equivalent of a `TypeError` for an IPC wire-message + due to an invalid field value (type). + + Normally this is re-raised from some `.msg._codec` + decode error raised by a backend interchange lib + like `msgspec` or `pycapnproto`. + + ''' def pack_error( @@ -473,7 +498,7 @@ def pack_error( tb: str|None = None, cid: str|None = None, -) -> dict[str, dict]: +) -> Error|dict[str, dict]: ''' Create an "error message" which boxes a locally caught exception's meta-data and encodes it for wire transport via an @@ -536,17 +561,23 @@ def pack_error( # content's `.msgdata`). error_msg['tb_str'] = tb_str - pkt: dict = { - 'error': error_msg, - } - if cid: - pkt['cid'] = cid + # Error() + # pkt: dict = { + # 'error': error_msg, + # } + pkt: Error = Error( + cid=cid, + **error_msg, + # TODO: just get rid of `.pld` on this msg? + ) + # if cid: + # pkt['cid'] = cid return pkt def unpack_error( - msg: dict[str, Any], + msg: dict[str, Any]|Error, chan: Channel|None = None, box_type: RemoteActorError = RemoteActorError, @@ -564,15 +595,17 @@ def unpack_error( ''' __tracebackhide__: bool = hide_tb - error_dict: dict[str, dict] | None - if ( - error_dict := msg.get('error') - ) is None: + error_dict: dict[str, dict]|None + if not isinstance(msg, Error): + # if ( + # error_dict := msg.get('error') + # ) is None: # no error field, nothing to unpack. return None # retrieve the remote error's msg encoded details - tb_str: str = error_dict.get('tb_str', '') + # tb_str: str = error_dict.get('tb_str', '') + tb_str: str = msg.tb_str message: str = ( f'{chan.uid}\n' + @@ -581,7 +614,8 @@ def unpack_error( # try to lookup a suitable error type from the local runtime # env then use it to construct a local instance. - boxed_type_str: str = error_dict['boxed_type_str'] + # boxed_type_str: str = error_dict['boxed_type_str'] + boxed_type_str: str = msg.boxed_type_str boxed_type: Type[BaseException] = get_err_type(boxed_type_str) if boxed_type_str == 'ContextCancelled': @@ -595,7 +629,11 @@ def unpack_error( # original source error. elif boxed_type_str == 'RemoteActorError': assert boxed_type is RemoteActorError - assert len(error_dict['relay_path']) >= 1 + # assert len(error_dict['relay_path']) >= 1 + assert len(msg.relay_path) >= 1 + + # TODO: mk RAE just take the `Error` instance directly? + error_dict: dict = structs.asdict(msg) exc = box_type( message, @@ -623,11 +661,12 @@ def is_multi_cancelled(exc: BaseException) -> bool: def _raise_from_no_key_in_msg( ctx: Context, - msg: dict, + msg: Msg, src_err: KeyError, log: StackLevelAdapter, # caller specific `log` obj expect_key: str = 'yield', + expect_msg: str = Yield, stream: MsgStream | None = None, # allow "deeper" tbs when debugging B^o @@ -660,8 +699,10 @@ def _raise_from_no_key_in_msg( # an internal error should never get here try: - cid: str = msg['cid'] - except KeyError as src_err: + cid: str = msg.cid + # cid: str = msg['cid'] + # except KeyError as src_err: + except AttributeError as src_err: raise MessagingError( f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n' f'cid: {cid}\n\n' @@ -672,7 +713,10 @@ def _raise_from_no_key_in_msg( # TODO: test that shows stream raising an expected error!!! # raise the error message in a boxed exception type! - if msg.get('error'): + # if msg.get('error'): + if isinstance(msg, Error): + # match msg: + # case Error(): raise unpack_error( msg, ctx.chan, @@ -683,8 +727,10 @@ def _raise_from_no_key_in_msg( # `MsgStream` termination msg. # TODO: does it make more sense to pack # the stream._eoc outside this in the calleer always? + # case Stop(): elif ( - msg.get('stop') + # msg.get('stop') + isinstance(msg, Stop) or ( stream and stream._eoc @@ -725,14 +771,16 @@ def _raise_from_no_key_in_msg( stream and stream._closed ): - raise trio.ClosedResourceError('This stream was closed') - + # TODO: our own error subtype? + raise trio.ClosedResourceError( + 'This stream was closed' + ) # always re-raise the source error if no translation error case # is activated above. _type: str = 'Stream' if stream else 'Context' raise MessagingError( - f"{_type} was expecting a '{expect_key}' message" + f"{_type} was expecting a '{expect_key.upper()}' message" " BUT received a non-error msg:\n" f'{pformat(msg)}' ) from src_err diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5f71c38..6168c77 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -38,17 +38,23 @@ from typing import ( Protocol, Type, TypeVar, + Union, ) +import msgspec from tricycle import BufferedReceiveStream import trio from tractor.log import get_logger -from tractor._exceptions import TransportClosed +from tractor._exceptions import ( + TransportClosed, + MsgTypeError, +) from tractor.msg import ( _ctxvar_MsgCodec, + _codec, MsgCodec, - mk_codec, + types, ) log = get_logger(__name__) @@ -163,7 +169,16 @@ class MsgpackTCPStream(MsgTransport): # allow for custom IPC msg interchange format # dynamic override Bo - self.codec: MsgCodec = codec or mk_codec() + self._task = trio.lowlevel.current_task() + self._codec: MsgCodec = ( + codec + or + _codec._ctxvar_MsgCodec.get() + ) + log.critical( + '!?!: USING STD `tractor` CODEC !?!?\n' + f'{self._codec}\n' + ) async def _iter_packets(self) -> AsyncGenerator[dict, None]: ''' @@ -171,7 +186,6 @@ class MsgpackTCPStream(MsgTransport): stream using the current task's `MsgCodec`. ''' - import msgspec # noqa decodes_failed: int = 0 while True: @@ -206,7 +220,19 @@ class MsgpackTCPStream(MsgTransport): try: # NOTE: lookup the `trio.Task.context`'s var for # the current `MsgCodec`. - yield _ctxvar_MsgCodec.get().decode(msg_bytes) + codec: MsgCodec = _ctxvar_MsgCodec.get() + if self._codec.pld_spec != codec.pld_spec: + # assert ( + # task := trio.lowlevel.current_task() + # ) is not self._task + # self._task = task + self._codec = codec + log.critical( + '.recv() USING NEW CODEC !?!?\n' + f'{self._codec}\n\n' + f'msg_bytes -> {msg_bytes}\n' + ) + yield codec.decode(msg_bytes) # TODO: remove, was only for orig draft impl # testing. @@ -221,6 +247,41 @@ class MsgpackTCPStream(MsgTransport): # # yield obj + # XXX NOTE: since the below error derives from + # `DecodeError` we need to catch is specially + # and always raise such that spec violations + # are never allowed to be caught silently! + except msgspec.ValidationError as verr: + + # decode the msg-bytes using the std msgpack + # interchange-prot (i.e. without any + # `msgspec.Struct` handling) so that we can + # determine what `.msg.types.Msg` is the culprit + # by reporting the received value. + msg_dict: dict = msgspec.msgpack.decode(msg_bytes) + msg_type_name: str = msg_dict['msg_type'] + msg_type = getattr(types, msg_type_name) + errmsg: str = ( + f'Received invalid IPC `{msg_type_name}` msg\n\n' + ) + + # XXX see if we can determine the exact invalid field + # such that we can comprehensively report the + # specific field's type problem + msgspec_msg: str = verr.args[0].rstrip('`') + msg, _, maybe_field = msgspec_msg.rpartition('$.') + if field_val := msg_dict.get(maybe_field): + field_type: Union[Type] = msg_type.__signature__.parameters[ + maybe_field + ].annotation + errmsg += ( + f'{msg.rstrip("`")}\n\n' + f'{msg_type}\n' + f' |_.{maybe_field}: {field_type} = {field_val}\n' + ) + + raise MsgTypeError(errmsg) from verr + except ( msgspec.DecodeError, UnicodeDecodeError, @@ -230,14 +291,15 @@ class MsgpackTCPStream(MsgTransport): # do with a channel drop - hope that receiving from the # channel will raise an expected error and bubble up. try: - msg_str: str | bytes = msg_bytes.decode() + msg_str: str|bytes = msg_bytes.decode() except UnicodeDecodeError: msg_str = msg_bytes - log.error( - '`msgspec` failed to decode!?\n' - 'dumping bytes:\n' - f'{msg_str!r}' + log.exception( + 'Failed to decode msg?\n' + f'{codec}\n\n' + 'Rxed bytes from wire:\n\n' + f'{msg_str!r}\n' ) decodes_failed += 1 else: @@ -258,8 +320,21 @@ class MsgpackTCPStream(MsgTransport): # NOTE: lookup the `trio.Task.context`'s var for # the current `MsgCodec`. - bytes_data: bytes = _ctxvar_MsgCodec.get().encode(msg) - # bytes_data: bytes = self.codec.encode(msg) + codec: MsgCodec = _ctxvar_MsgCodec.get() + # if self._codec != codec: + if self._codec.pld_spec != codec.pld_spec: + self._codec = codec + log.critical( + '.send() using NEW CODEC !?!?\n' + f'{self._codec}\n\n' + f'OBJ -> {msg}\n' + ) + if type(msg) not in types.__spec__: + log.warning( + 'Sending non-`Msg`-spec msg?\n\n' + f'{msg}\n' + ) + bytes_data: bytes = codec.encode(msg) # supposedly the fastest says, # https://stackoverflow.com/a/54027962 diff --git a/tractor/_portal.py b/tractor/_portal.py index ac602dd..cc9052b 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -45,7 +45,10 @@ from ._state import ( ) from ._ipc import Channel from .log import get_logger -from .msg import NamespacePath +from .msg import ( + NamespacePath, + Return, +) from ._exceptions import ( unpack_error, NoResult, @@ -66,7 +69,8 @@ log = get_logger(__name__) # `._raise_from_no_key_in_msg()` (after tweak to # accept a `chan: Channel` arg) in key block! def _unwrap_msg( - msg: dict[str, Any], + # msg: dict[str, Any], + msg: Return, channel: Channel, hide_tb: bool = True, @@ -79,18 +83,21 @@ def _unwrap_msg( __tracebackhide__: bool = hide_tb try: - return msg['return'] - except KeyError as ke: + return msg.pld + # return msg['return'] + # except KeyError as ke: + except AttributeError as err: # internal error should never get here - assert msg.get('cid'), ( + # assert msg.get('cid'), ( + assert msg.cid, ( "Received internal error at portal?" ) raise unpack_error( msg, channel - ) from ke + ) from err class Portal: diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 310b80a..0549b0c 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -55,12 +55,21 @@ from ._exceptions import ( TransportClosed, ) from .devx import ( - # pause, + pause, maybe_wait_for_debugger, _debug, ) from . import _state from .log import get_logger +from tractor.msg.types import ( + Start, + StartAck, + Started, + Stop, + Yield, + Return, + Error, +) if TYPE_CHECKING: @@ -89,10 +98,13 @@ async def _invoke_non_context( # TODO: can we unify this with the `context=True` impl below? if inspect.isasyncgen(coro): - await chan.send({ - 'cid': cid, - 'functype': 'asyncgen', - }) + # await chan.send({ + await chan.send( + StartAck( + cid=cid, + functype='asyncgen', + ) + ) # XXX: massive gotcha! If the containing scope # is cancelled and we execute the below line, # any ``ActorNursery.__aexit__()`` WON'T be @@ -112,27 +124,45 @@ async def _invoke_non_context( # to_send = await chan.recv_nowait() # if to_send is not None: # to_yield = await coro.asend(to_send) - await chan.send({ - 'yield': item, - 'cid': cid, - }) + # await chan.send({ + # # Yield() + # 'cid': cid, + # 'yield': item, + # }) + await chan.send( + Yield( + cid=cid, + pld=item, + ) + ) log.runtime(f"Finished iterating {coro}") # TODO: we should really support a proper # `StopAsyncIteration` system here for returning a final # value if desired - await chan.send({ - 'stop': True, - 'cid': cid, - }) + await chan.send( + Stop(cid=cid) + ) + # await chan.send({ + # # Stop( + # 'cid': cid, + # 'stop': True, + # }) # one way @stream func that gets treated like an async gen # TODO: can we unify this with the `context=True` impl below? elif treat_as_gen: - await chan.send({ - 'cid': cid, - 'functype': 'asyncgen', - }) + await chan.send( + StartAck( + cid=cid, + functype='asyncgen', + ) + ) + # await chan.send({ + # # StartAck() + # 'cid': cid, + # 'functype': 'asyncgen', + # }) # XXX: the async-func may spawn further tasks which push # back values like an async-generator would but must # manualy construct the response dict-packet-responses as @@ -145,10 +175,14 @@ async def _invoke_non_context( if not cs.cancelled_caught: # task was not cancelled so we can instruct the # far end async gen to tear down - await chan.send({ - 'stop': True, - 'cid': cid - }) + await chan.send( + Stop(cid=cid) + ) + # await chan.send({ + # # Stop( + # 'cid': cid, + # 'stop': True, + # }) else: # regular async function/method # XXX: possibly just a scheduled `Actor._cancel_task()` @@ -160,10 +194,17 @@ async def _invoke_non_context( # way: using the linked IPC context machinery. failed_resp: bool = False try: - await chan.send({ - 'functype': 'asyncfunc', - 'cid': cid - }) + await chan.send( + StartAck( + cid=cid, + functype='asyncfunc', + ) + ) + # await chan.send({ + # # StartAck() + # 'cid': cid, + # 'functype': 'asyncfunc', + # }) except ( trio.ClosedResourceError, trio.BrokenResourceError, @@ -197,10 +238,17 @@ async def _invoke_non_context( and chan.connected() ): try: - await chan.send({ - 'return': result, - 'cid': cid, - }) + # await chan.send({ + # # Return() + # 'cid': cid, + # 'return': result, + # }) + await chan.send( + Return( + cid=cid, + pld=result, + ) + ) except ( BrokenPipeError, trio.BrokenResourceError, @@ -381,6 +429,8 @@ async def _invoke( # XXX for .pause_from_sync()` usage we need to make sure # `greenback` is boostrapped in the subactor! await _debug.maybe_init_greenback() + # else: + # await pause() # TODO: possibly a specially formatted traceback # (not sure what typing is for this..)? @@ -493,10 +543,18 @@ async def _invoke( # a "context" endpoint type is the most general and # "least sugary" type of RPC ep with support for # bi-dir streaming B) - await chan.send({ - 'cid': cid, - 'functype': 'context', - }) + # StartAck + await chan.send( + StartAck( + cid=cid, + functype='context', + ) + ) + # await chan.send({ + # # StartAck() + # 'cid': cid, + # 'functype': 'context', + # }) # TODO: should we also use an `.open_context()` equiv # for this callee side by factoring the impl from @@ -520,10 +578,17 @@ async def _invoke( ctx._result = res # deliver final result to caller side. - await chan.send({ - 'return': res, - 'cid': cid - }) + await chan.send( + Return( + cid=cid, + pld=res, + ) + ) + # await chan.send({ + # # Return() + # 'cid': cid, + # 'return': res, + # }) # NOTE: this happens IFF `ctx._scope.cancel()` is # called by any of, @@ -696,7 +761,8 @@ async def try_ship_error_to_remote( try: # NOTE: normally only used for internal runtime errors # so ship to peer actor without a cid. - msg: dict = pack_error( + # msg: dict = pack_error( + msg: Error = pack_error( err, cid=cid, @@ -712,12 +778,13 @@ async def try_ship_error_to_remote( trio.BrokenResourceError, BrokenPipeError, ): - err_msg: dict = msg['error']['tb_str'] + # err_msg: dict = msg['error']['tb_str'] log.critical( 'IPC transport failure -> ' f'failed to ship error to {remote_descr}!\n\n' f'X=> {channel.uid}\n\n' - f'{err_msg}\n' + # f'{err_msg}\n' + f'{msg}\n' ) @@ -777,31 +844,6 @@ async def process_messages( with CancelScope(shield=shield) as loop_cs: task_status.started(loop_cs) async for msg in chan: - - # dedicated loop terminate sentinel - if msg is None: - - tasks: dict[ - tuple[Channel, str], - tuple[Context, Callable, trio.Event] - ] = actor._rpc_tasks.copy() - log.cancel( - f'Peer IPC channel terminated via `None` setinel msg?\n' - f'=> Cancelling all {len(tasks)} local RPC tasks..\n' - f'peer: {chan.uid}\n' - f'|_{chan}\n' - ) - for (channel, cid) in tasks: - if channel is chan: - await actor._cancel_task( - cid, - channel, - requesting_uid=channel.uid, - - ipc_msg=msg, - ) - break - log.transport( # type: ignore f'<= IPC msg from peer: {chan.uid}\n\n' @@ -811,216 +853,294 @@ async def process_messages( f'{pformat(msg)}\n' ) - cid = msg.get('cid') - if cid: - # deliver response to local caller/waiter - # via its per-remote-context memory channel. - await actor._push_result( - chan, - cid, - msg, - ) + match msg: - log.runtime( - 'Waiting on next IPC msg from\n' - f'peer: {chan.uid}:\n' - f'|_{chan}\n' + # if msg is None: + # dedicated loop terminate sentinel + case None: - # f'last msg: {msg}\n' - ) - continue - - # process a 'cmd' request-msg upack - # TODO: impl with native `msgspec.Struct` support !! - # -[ ] implement with ``match:`` syntax? - # -[ ] discard un-authed msgs as per, - # - try: - ( - ns, - funcname, - kwargs, - actorid, - cid, - ) = msg['cmd'] - - except KeyError: - # This is the non-rpc error case, that is, an - # error **not** raised inside a call to ``_invoke()`` - # (i.e. no cid was provided in the msg - see above). - # Push this error to all local channel consumers - # (normally portals) by marking the channel as errored - assert chan.uid - exc = unpack_error(msg, chan=chan) - chan._exc = exc - raise exc - - log.runtime( - 'Handling RPC cmd from\n' - f'peer: {actorid}\n' - '\n' - f'=> {ns}.{funcname}({kwargs})\n' - ) - if ns == 'self': - if funcname == 'cancel': - func: Callable = actor.cancel - kwargs |= { - 'req_chan': chan, - } - - # don't start entire actor runtime cancellation - # if this actor is currently in debug mode! - pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete - if pdb_complete: - await pdb_complete.wait() - - # Either of `Actor.cancel()`/`.cancel_soon()` - # was called, so terminate this IPC msg - # loop, exit back out into `async_main()`, - # and immediately start the core runtime - # machinery shutdown! - with CancelScope(shield=True): - await _invoke( - actor, - cid, - chan, - func, - kwargs, - is_rpc=False, - ) - - log.runtime( - 'Cancelling IPC transport msg-loop with peer:\n' + tasks: dict[ + tuple[Channel, str], + tuple[Context, Callable, trio.Event] + ] = actor._rpc_tasks.copy() + log.cancel( + f'Peer IPC channel terminated via `None` setinel msg?\n' + f'=> Cancelling all {len(tasks)} local RPC tasks..\n' + f'peer: {chan.uid}\n' f'|_{chan}\n' ) - loop_cs.cancel() + for (channel, cid) in tasks: + if channel is chan: + await actor._cancel_task( + cid, + channel, + requesting_uid=channel.uid, + + ipc_msg=msg, + ) break - if funcname == '_cancel_task': - func: Callable = actor._cancel_task - - # we immediately start the runtime machinery - # shutdown - # with CancelScope(shield=True): - target_cid: str = kwargs['cid'] - kwargs |= { - # NOTE: ONLY the rpc-task-owning - # parent IPC channel should be able to - # cancel it! - 'parent_chan': chan, - 'requesting_uid': chan.uid, - 'ipc_msg': msg, - } - # TODO: remove? already have emit in meth. - # log.runtime( - # f'Rx RPC task cancel request\n' - # f'<= canceller: {chan.uid}\n' - # f' |_{chan}\n\n' - # f'=> {actor}\n' - # f' |_cid: {target_cid}\n' - # ) - try: - await _invoke( - actor, - cid, - chan, - func, - kwargs, - is_rpc=False, - ) - except BaseException: - log.exception( - 'Failed to cancel task?\n' - f'<= canceller: {chan.uid}\n' - f' |_{chan}\n\n' - f'=> {actor}\n' - f' |_cid: {target_cid}\n' - ) - continue - else: - # normally registry methods, eg. - # ``.register_actor()`` etc. - func: Callable = getattr(actor, funcname) - - else: - # complain to client about restricted modules - try: - func = actor._get_rpc_func(ns, funcname) - except ( - ModuleNotExposed, - AttributeError, - ) as err: - err_msg: dict[str, dict] = pack_error( - err, - cid=cid, - ) - await chan.send(err_msg) - continue - - # schedule a task for the requested RPC function - # in the actor's main "service nursery". - # TODO: possibly a service-tn per IPC channel for - # supervision isolation? would avoid having to - # manage RPC tasks individually in `._rpc_tasks` - # table? - log.runtime( - f'Spawning task for RPC request\n' - f'<= caller: {chan.uid}\n' - f' |_{chan}\n\n' - # TODO: maddr style repr? - # f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' - # f'cid="{cid[-16:]} .."\n\n' - - f'=> {actor}\n' - f' |_cid: {cid}\n' - f' |>> {func}()\n' - ) - assert actor._service_n # wait why? do it at top? - try: - ctx: Context = await actor._service_n.start( - partial( - _invoke, - actor, - cid, + # cid = msg.get('cid') + # if cid: + case ( + StartAck(cid=cid) + | Started(cid=cid) + | Yield(cid=cid) + | Stop(cid=cid) + | Return(cid=cid) + | Error(cid=cid) + ): + # deliver response to local caller/waiter + # via its per-remote-context memory channel. + await actor._push_result( chan, - func, - kwargs, - ), - name=funcname, - ) + cid, + msg, + ) - except ( - RuntimeError, - BaseExceptionGroup, - ): - # avoid reporting a benign race condition - # during actor runtime teardown. - nursery_cancelled_before_task: bool = True - break + log.runtime( + 'Waiting on next IPC msg from\n' + f'peer: {chan.uid}:\n' + f'|_{chan}\n' - # in the lone case where a ``Context`` is not - # delivered, it's likely going to be a locally - # scoped exception from ``_invoke()`` itself. - if isinstance(err := ctx, Exception): - log.warning( - 'Task for RPC failed?' - f'|_ {func}()\n\n' + # f'last msg: {msg}\n' + ) + continue - f'{err}' - ) - continue + # process a 'cmd' request-msg upack + # TODO: impl with native `msgspec.Struct` support !! + # -[ ] implement with ``match:`` syntax? + # -[ ] discard un-authed msgs as per, + # + case Start( + cid=cid, + ns=ns, + func=funcname, + kwargs=kwargs, + uid=actorid, + ): + # try: + # ( + # ns, + # funcname, + # kwargs, + # actorid, + # cid, + # ) = msg['cmd'] - else: - # mark that we have ongoing rpc tasks - actor._ongoing_rpc_tasks = trio.Event() + # # TODO: put in `case Error():` right? + # except KeyError: + # # This is the non-rpc error case, that is, an + # # error **not** raised inside a call to ``_invoke()`` + # # (i.e. no cid was provided in the msg - see above). + # # Push this error to all local channel consumers + # # (normally portals) by marking the channel as errored + # assert chan.uid + # exc = unpack_error(msg, chan=chan) + # chan._exc = exc + # raise exc - # store cancel scope such that the rpc task can be - # cancelled gracefully if requested - actor._rpc_tasks[(chan, cid)] = ( - ctx, - func, - trio.Event(), - ) + log.runtime( + 'Handling RPC `Start` request from\n' + f'peer: {actorid}\n' + '\n' + f'=> {ns}.{funcname}({kwargs})\n' + ) + # case Start( + # ns='self', + # funcname='cancel', + # ): + if ns == 'self': + if funcname == 'cancel': + func: Callable = actor.cancel + kwargs |= { + 'req_chan': chan, + } + + # don't start entire actor runtime cancellation + # if this actor is currently in debug mode! + pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete + if pdb_complete: + await pdb_complete.wait() + + # Either of `Actor.cancel()`/`.cancel_soon()` + # was called, so terminate this IPC msg + # loop, exit back out into `async_main()`, + # and immediately start the core runtime + # machinery shutdown! + with CancelScope(shield=True): + await _invoke( + actor, + cid, + chan, + func, + kwargs, + is_rpc=False, + ) + + log.runtime( + 'Cancelling IPC transport msg-loop with peer:\n' + f'|_{chan}\n' + ) + loop_cs.cancel() + break + + # case Start( + # ns='self', + # funcname='_cancel_task', + # ): + if funcname == '_cancel_task': + func: Callable = actor._cancel_task + + # we immediately start the runtime machinery + # shutdown + # with CancelScope(shield=True): + target_cid: str = kwargs['cid'] + kwargs |= { + # NOTE: ONLY the rpc-task-owning + # parent IPC channel should be able to + # cancel it! + 'parent_chan': chan, + 'requesting_uid': chan.uid, + 'ipc_msg': msg, + } + # TODO: remove? already have emit in meth. + # log.runtime( + # f'Rx RPC task cancel request\n' + # f'<= canceller: {chan.uid}\n' + # f' |_{chan}\n\n' + # f'=> {actor}\n' + # f' |_cid: {target_cid}\n' + # ) + try: + await _invoke( + actor, + cid, + chan, + func, + kwargs, + is_rpc=False, + ) + except BaseException: + log.exception( + 'Failed to cancel task?\n' + f'<= canceller: {chan.uid}\n' + f' |_{chan}\n\n' + f'=> {actor}\n' + f' |_cid: {target_cid}\n' + ) + continue + + # case Start( + # ns='self', + # funcname='register_actor', + # ): + else: + # normally registry methods, eg. + # ``.register_actor()`` etc. + func: Callable = getattr(actor, funcname) + + # case Start( + # ns=str(), + # funcname=funcname, + # ): + else: + # complain to client about restricted modules + try: + func = actor._get_rpc_func(ns, funcname) + except ( + ModuleNotExposed, + AttributeError, + ) as err: + err_msg: dict[str, dict] = pack_error( + err, + cid=cid, + ) + await chan.send(err_msg) + continue + + # schedule a task for the requested RPC function + # in the actor's main "service nursery". + # TODO: possibly a service-tn per IPC channel for + # supervision isolation? would avoid having to + # manage RPC tasks individually in `._rpc_tasks` + # table? + log.runtime( + f'Spawning task for RPC request\n' + f'<= caller: {chan.uid}\n' + f' |_{chan}\n\n' + # TODO: maddr style repr? + # f' |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' + # f'cid="{cid[-16:]} .."\n\n' + + f'=> {actor}\n' + f' |_cid: {cid}\n' + f' |>> {func}()\n' + ) + assert actor._service_n # wait why? do it at top? + try: + ctx: Context = await actor._service_n.start( + partial( + _invoke, + actor, + cid, + chan, + func, + kwargs, + ), + name=funcname, + ) + + except ( + RuntimeError, + BaseExceptionGroup, + ): + # avoid reporting a benign race condition + # during actor runtime teardown. + nursery_cancelled_before_task: bool = True + break + + # in the lone case where a ``Context`` is not + # delivered, it's likely going to be a locally + # scoped exception from ``_invoke()`` itself. + if isinstance(err := ctx, Exception): + log.warning( + 'Task for RPC failed?' + f'|_ {func}()\n\n' + + f'{err}' + ) + continue + + else: + # mark that we have ongoing rpc tasks + actor._ongoing_rpc_tasks = trio.Event() + + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + actor._rpc_tasks[(chan, cid)] = ( + ctx, + func, + trio.Event(), + ) + + case Error()|_: + # This is the non-rpc error case, that is, an + # error **not** raised inside a call to ``_invoke()`` + # (i.e. no cid was provided in the msg - see above). + # Push this error to all local channel consumers + # (normally portals) by marking the channel as errored + log.exception( + f'Unhandled IPC msg:\n\n' + f'{msg}\n' + ) + assert chan.uid + exc = unpack_error( + msg, + chan=chan, + ) + chan._exc = exc + raise exc log.runtime( 'Waiting on next IPC msg from\n' diff --git a/tractor/_runtime.py b/tractor/_runtime.py index e2d78d5..3bafada 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -91,6 +91,23 @@ from ._rpc import ( process_messages, try_ship_error_to_remote, ) +from tractor.msg import ( + types as msgtypes, + pretty_struct, +) +# from tractor.msg.types import ( +# Aid, +# SpawnSpec, +# Start, +# StartAck, +# Started, +# Yield, +# Stop, +# Return, +# Error, +# ) + + if TYPE_CHECKING: @@ -147,6 +164,7 @@ class Actor: # Information about `__main__` from parent _parent_main_data: dict[str, str] _parent_chan_cs: CancelScope|None = None + _spawn_spec: SpawnSpec|None = None # syncs for setup/teardown sequences _server_down: trio.Event|None = None @@ -537,7 +555,8 @@ class Actor: f'{pformat(msg)}\n' ) - cid = msg.get('cid') + # cid: str|None = msg.get('cid') + cid: str|None = msg.cid if cid: # deliver response to local caller/waiter await self._push_result( @@ -889,29 +908,44 @@ class Actor: f'=> {ns}.{func}({kwargs})\n' ) await chan.send( - {'cmd': ( - ns, - func, - kwargs, - self.uid, - cid, - )} + msgtypes.Start( + ns=ns, + func=func, + kwargs=kwargs, + uid=self.uid, + cid=cid, + ) ) + # {'cmd': ( + # ns, + # func, + # kwargs, + # self.uid, + # cid, + # )} + # ) # Wait on first response msg and validate; this should be # immediate. - first_msg: dict = await ctx._recv_chan.receive() - functype: str = first_msg.get('functype') + # first_msg: dict = await ctx._recv_chan.receive() + # functype: str = first_msg.get('functype') - if 'error' in first_msg: + first_msg: msgtypes.StartAck = await ctx._recv_chan.receive() + try: + functype: str = first_msg.functype + except AttributeError: raise unpack_error(first_msg, chan) + # if 'error' in first_msg: + # raise unpack_error(first_msg, chan) - elif functype not in ( + if functype not in ( 'asyncfunc', 'asyncgen', 'context', ): - raise ValueError(f"{first_msg} is an invalid response packet?") + raise ValueError( + f'{first_msg} is an invalid response packet?' + ) ctx._remote_func_type = functype return ctx @@ -944,24 +978,36 @@ class Actor: await self._do_handshake(chan) accept_addrs: list[tuple[str, int]]|None = None - if self._spawn_method == "trio": - # Receive runtime state from our parent - parent_data: dict[str, Any] - parent_data = await chan.recv() - log.runtime( - 'Received state from parent:\n\n' - # TODO: eventually all these msgs as - # `msgspec.Struct` with a special mode that - # pformats them in multi-line mode, BUT only - # if "trace"/"util" mode is enabled? - f'{pformat(parent_data)}\n' - ) - accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') - rvs = parent_data.pop('_runtime_vars') + if self._spawn_method == "trio": + + # Receive runtime state from our parent + # parent_data: dict[str, Any] + # parent_data = await chan.recv() + + # TODO: maybe we should just wrap this directly + # in a `Actor.spawn_info: SpawnInfo` struct? + spawnspec: msgtypes.SpawnSpec = await chan.recv() + self._spawn_spec = spawnspec + + # TODO: eventually all these msgs as + # `msgspec.Struct` with a special mode that + # pformats them in multi-line mode, BUT only + # if "trace"/"util" mode is enabled? + log.runtime( + 'Received runtime spec from parent:\n\n' + f'{pformat(spawnspec)}\n' + ) + # accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') + accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs + + # rvs = parent_data.pop('_runtime_vars') + rvs = spawnspec._runtime_vars if rvs['_debug_mode']: try: - log.info('Enabling `stackscope` traces on SIGUSR1') + log.info( + 'Enabling `stackscope` traces on SIGUSR1' + ) from .devx import enable_stack_on_sig enable_stack_on_sig() except ImportError: @@ -969,28 +1015,40 @@ class Actor: '`stackscope` not installed for use in debug mode!' ) - log.runtime(f"Runtime vars are: {rvs}") + log.runtime(f'Runtime vars are: {rvs}') rvs['_is_root'] = False _state._runtime_vars.update(rvs) - for attr, value in parent_data.items(): - if ( - attr == 'reg_addrs' - and value - ): - # XXX: ``msgspec`` doesn't support serializing tuples - # so just cash manually here since it's what our - # internals expect. - # TODO: we don't really NEED these as - # tuples so we can probably drop this - # casting since apparently in python lists - # are "more efficient"? - self.reg_addrs = [tuple(val) for val in value] + # XXX: ``msgspec`` doesn't support serializing tuples + # so just cash manually here since it's what our + # internals expect. + # + self.reg_addrs = [ + # TODO: we don't really NEED these as tuples? + # so we can probably drop this casting since + # apparently in python lists are "more + # efficient"? + tuple(val) + for val in spawnspec.reg_addrs + ] - else: - setattr(self, attr, value) + # for attr, value in parent_data.items(): + for _, attr, value in pretty_struct.iter_fields( + spawnspec, + ): + setattr(self, attr, value) + # if ( + # attr == 'reg_addrs' + # and value + # ): + # self.reg_addrs = [tuple(val) for val in value] + # else: + # setattr(self, attr, value) - return chan, accept_addrs + return ( + chan, + accept_addrs, + ) except OSError: # failed to connect log.warning( @@ -1432,7 +1490,7 @@ class Actor: self, chan: Channel - ) -> tuple[str, str]: + ) -> msgtypes.Aid: ''' Exchange `(name, UUIDs)` identifiers as the first communication step with any (peer) remote `Actor`. @@ -1441,14 +1499,27 @@ class Actor: "actor model" parlance. ''' - await chan.send(self.uid) - value: tuple = await chan.recv() - uid: tuple[str, str] = (str(value[0]), str(value[1])) + name, uuid = self.uid + await chan.send( + msgtypes.Aid( + name=name, + uuid=uuid, + ) + ) + aid: msgtypes.Aid = await chan.recv() + chan.aid = aid + + uid: tuple[str, str] = ( + # str(value[0]), + # str(value[1]) + aid.name, + aid.uuid, + ) if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") - chan.uid = str(uid[0]), str(uid[1]) + chan.uid = uid return uid def is_infected_aio(self) -> bool: @@ -1508,7 +1579,8 @@ async def async_main( # because we're running in mp mode if ( set_accept_addr_says_rent - and set_accept_addr_says_rent is not None + and + set_accept_addr_says_rent is not None ): accept_addrs = set_accept_addr_says_rent diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 741a2f8..4715bd1 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -49,6 +49,9 @@ from tractor._portal import Portal from tractor._runtime import Actor from tractor._entry import _mp_main from tractor._exceptions import ActorFailure +from tractor.msg.types import ( + SpawnSpec, +) if TYPE_CHECKING: @@ -489,14 +492,25 @@ async def trio_proc( portal, ) - # send additional init params - await chan.send({ - '_parent_main_data': subactor._parent_main_data, - 'enable_modules': subactor.enable_modules, - 'reg_addrs': subactor.reg_addrs, - 'bind_addrs': bind_addrs, - '_runtime_vars': _runtime_vars, - }) + # send a "spawning specification" which configures the + # initial runtime state of the child. + await chan.send( + SpawnSpec( + _parent_main_data=subactor._parent_main_data, + enable_modules=subactor.enable_modules, + reg_addrs=subactor.reg_addrs, + bind_addrs=bind_addrs, + _runtime_vars=_runtime_vars, + ) + ) + + # await chan.send({ + # '_parent_main_data': subactor._parent_main_data, + # 'enable_modules': subactor.enable_modules, + # 'reg_addrs': subactor.reg_addrs, + # 'bind_addrs': bind_addrs, + # '_runtime_vars': _runtime_vars, + # }) # track subactor in current nursery curr_actor = current_actor() diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 90c33d3..941cfe8 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -43,6 +43,11 @@ from .trionics import ( broadcast_receiver, BroadcastReceiver, ) +from tractor.msg import ( + Stop, + Yield, + Error, +) if TYPE_CHECKING: from ._context import Context @@ -94,21 +99,25 @@ class MsgStream(trio.abc.Channel): self, allow_msg_keys: list[str] = ['yield'], ): - msg: dict = self._rx_chan.receive_nowait() + # msg: dict = self._rx_chan.receive_nowait() + msg: Yield|Stop = self._rx_chan.receive_nowait() for ( i, key, ) in enumerate(allow_msg_keys): try: - return msg[key] - except KeyError as kerr: + # return msg[key] + return msg.pld + # except KeyError as kerr: + except AttributeError as attrerr: if i < (len(allow_msg_keys) - 1): continue _raise_from_no_key_in_msg( ctx=self._ctx, msg=msg, - src_err=kerr, + # src_err=kerr, + src_err=attrerr, log=log, expect_key=key, stream=self, @@ -148,18 +157,22 @@ class MsgStream(trio.abc.Channel): src_err: Exception|None = None # orig tb try: try: - msg = await self._rx_chan.receive() - return msg['yield'] + msg: Yield = await self._rx_chan.receive() + # return msg['yield'] + return msg.pld - except KeyError as kerr: - src_err = kerr + # except KeyError as kerr: + except AttributeError as attrerr: + # src_err = kerr + src_err = attrerr # NOTE: may raise any of the below error types # includg EoC when a 'stop' msg is found. _raise_from_no_key_in_msg( ctx=self._ctx, msg=msg, - src_err=kerr, + # src_err=kerr, + src_err=attrerr, log=log, expect_key='yield', stream=self, @@ -514,11 +527,18 @@ class MsgStream(trio.abc.Channel): raise self._closed try: + # await self._ctx.chan.send( + # payload={ + # 'yield': data, + # 'cid': self._ctx.cid, + # }, + # # hide_tb=hide_tb, + # ) await self._ctx.chan.send( - payload={ - 'yield': data, - 'cid': self._ctx.cid, - }, + payload=Yield( + cid=self._ctx.cid, + pld=data, + ), # hide_tb=hide_tb, ) except ( diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py index 255b1db..26155b2 100644 --- a/tractor/devx/_debug.py +++ b/tractor/devx/_debug.py @@ -935,6 +935,9 @@ async def _pause( # ``breakpoint()`` was awaited and begin handling stdio. log.debug('Entering sync world of the `pdb` REPL..') try: + # log.critical( + # f'stack len: {len(pdb.stack)}\n' + # ) debug_func( actor, pdb,