diff --git a/tractor/_context.py b/tractor/_context.py index ed720a2..42271b0 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -15,12 +15,22 @@ # along with this program. If not, see . ''' -The fundamental cross process SC abstraction: an inter-actor, -cancel-scope linked task "context". +The fundamental cross-process SC abstraction: an inter-actor, +transitively cancel-scope linked, (dual) task IPC coupled "context". -A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built -into each ``trio.Nursery`` except it links the lifetimes of memory space -disjoint, parallel executing tasks in separate actors. +A `Context` is very similar to the look and feel of the +`.cancel_scope: trio.CancelScope` built into each `trio.Nursery` +except that it links the lifetimes of 2 memory space disjoint, +parallel executing, tasks scheduled in separate "actors". + +So while a `trio.Nursery` has a `.parent_task` which exists both +before (open) and then inside the body of the `async with` of the +nursery's scope (/block), a `Context` contains 2 tasks, a "parent" +and a "child" side, where both execute independently in separate +memory domains of different (host's) processes linked through +a SC-transitive IPC "shuttle dialog protocol". The underlying IPC +dialog-(un)protocol allows for the maintainance of SC properties +end-2-end between the tasks. ''' from __future__ import annotations @@ -71,13 +81,11 @@ from .msg import ( MsgCodec, NamespacePath, PayloadT, - Return, Started, Stop, Yield, current_codec, pretty_struct, - types as msgtypes, _ops as msgops, ) from ._ipc import ( @@ -90,7 +98,7 @@ from ._state import ( debug_mode, _ctxvar_Context, ) - +# ------ - ------ if TYPE_CHECKING: from ._portal import Portal from ._runtime import Actor @@ -1598,16 +1606,15 @@ class Context: async def started( self, - # TODO: how to type this so that it's the - # same as the payload type? Is this enough? value: PayloadT|None = None, + validate_pld_spec: bool = True, + strict_pld_parity: bool = False, - strict_parity: bool = False, + # TODO: this will always emit for msgpack for any () vs. [] + # inside the value.. do we want to offer warnings on that? + # complain_no_parity: bool = False, - # TODO: this will always emit now that we do `.pld: Raw` - # passthrough.. so maybe just only complain when above strict - # flag is set? - complain_no_parity: bool = False, + hide_tb: bool = True, ) -> None: ''' @@ -1648,63 +1655,54 @@ class Context: # # https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern # - codec: MsgCodec = current_codec() - msg_bytes: bytes = codec.encode(started_msg) - try: - # be a "cheap" dialog (see above!) - if ( - strict_parity - or - complain_no_parity - ): - rt_started: Started = codec.decode(msg_bytes) - - # XXX something is prolly totes cucked with the - # codec state! - if isinstance(rt_started, dict): - rt_started = msgtypes.from_dict_msg( - dict_msg=rt_started, - ) - raise RuntimeError( - 'Failed to roundtrip `Started` msg?\n' - f'{pretty_struct.pformat(rt_started)}\n' - ) - - if rt_started != started_msg: + __tracebackhide__: bool = hide_tb + if validate_pld_spec: + # __tracebackhide__: bool = False + codec: MsgCodec = current_codec() + msg_bytes: bytes = codec.encode(started_msg) + try: + roundtripped: Started = codec.decode(msg_bytes) + # pld: PayloadT = await self.pld_rx.recv_pld( + pld: PayloadT = self.pld_rx.dec_msg( + msg=roundtripped, + ipc=self, + expect_msg=Started, + hide_tb=hide_tb, + is_started_send_side=True, + ) + if ( + strict_pld_parity + and + pld != value + ): # TODO: make that one a mod func too.. diff = pretty_struct.Struct.__sub__( - rt_started, + roundtripped, started_msg, ) complaint: str = ( 'Started value does not match after roundtrip?\n\n' f'{diff}' ) + raise ValidationError(complaint) - # TODO: rn this will pretty much always fail with - # any other sequence type embeded in the - # payload... - if ( - self._strict_started - or - strict_parity - ): - raise ValueError(complaint) - else: - log.warning(complaint) + # raise any msg type error NO MATTER WHAT! + except ValidationError as verr: + # always show this src frame in the tb + # __tracebackhide__: bool = False + raise _mk_msg_type_err( + msg=roundtripped, + codec=codec, + src_validation_error=verr, + verb_header='Trying to send ', + is_invalid_payload=True, + ) from verr - await self.chan.send(started_msg) - - # raise any msg type error NO MATTER WHAT! - except ValidationError as verr: - raise _mk_msg_type_err( - msg=msg_bytes, - codec=codec, - src_validation_error=verr, - verb_header='Trying to send payload' - # > 'invalid `Started IPC msgs\n' - ) from verr + # TODO: maybe a flag to by-pass encode op if already done + # here in caller? + await self.chan.send(started_msg) + # set msg-related internal runtime-state self._started_called = True self._started_msg = started_msg self._started_pld = value @@ -1997,12 +1995,7 @@ async def open_context_from_portal( pld_spec: TypeAlias|None = None, allow_overruns: bool = False, - - # TODO: if we set this the wrapping `@acm` body will - # still be shown (awkwardly) on pdb REPL entry. Ideally - # we can similarly annotate that frame to NOT show? for now - # we DO SHOW this frame since it's awkward ow.. - hide_tb: bool = False, + hide_tb: bool = True, # proxied to RPC **kwargs, @@ -2115,6 +2108,7 @@ async def open_context_from_portal( ipc=ctx, expect_msg=Started, passthrough_non_pld_msgs=False, + hide_tb=hide_tb, ) # from .devx import pause diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 3014c15..6faf78e 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -47,7 +47,7 @@ from tractor._exceptions import ( _raise_from_unexpected_msg, MsgTypeError, _mk_msg_type_err, - pack_from_raise, + pack_error, ) from tractor._state import current_ipc_ctx from ._codec import ( @@ -203,7 +203,6 @@ class PldRx(Struct): msg: MsgType = ( ipc_msg or - # async-rx msg from underlying IPC feeder (mem-)chan await ipc._rx_chan.receive() ) @@ -223,6 +222,10 @@ class PldRx(Struct): raise_error: bool = True, hide_tb: bool = True, + # XXX for special (default?) case of send side call with + # `Context.started(validate_pld_spec=True)` + is_started_send_side: bool = False, + ) -> PayloadT|Raw: ''' Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and @@ -230,8 +233,6 @@ class PldRx(Struct): ''' __tracebackhide__: bool = hide_tb - - _src_err = None src_err: BaseException|None = None match msg: # payload-data shuttle msg; deliver the `.pld` value @@ -256,18 +257,58 @@ class PldRx(Struct): # pack mgterr into error-msg for # reraise below; ensure remote-actor-err # info is displayed nicely? - msgterr: MsgTypeError = _mk_msg_type_err( + mte: MsgTypeError = _mk_msg_type_err( msg=msg, codec=self.pld_dec, src_validation_error=valerr, is_invalid_payload=True, + expected_msg=expect_msg, + # ipc_msg=msg, ) - msg: Error = pack_from_raise( - local_err=msgterr, + # NOTE: override the `msg` passed to + # `_raise_from_unexpected_msg()` (below) so so that + # we're effectively able to use that same func to + # unpack and raise an "emulated remote `Error`" of + # this local MTE. + err_msg: Error = pack_error( + exc=mte, cid=msg.cid, - src_uid=ipc.chan.uid, + src_uid=( + ipc.chan.uid + if not is_started_send_side + else ipc._actor.uid + ), + # tb=valerr.__traceback__, + tb_str=mte._message, ) + # ^-TODO-^ just raise this inline instead of all the + # pack-unpack-repack non-sense! + + mte._ipc_msg = err_msg + msg = err_msg + + # set emulated remote error more-or-less as the + # runtime would + ctx: Context = getattr(ipc, 'ctx', ipc) + + # TODO: should we instead make this explicit and + # use the above masked `is_started_send_decode`, + # expecting the `Context.started()` caller to set + # it? Rn this is kinda, howyousayyy, implicitly + # edge-case-y.. + if ( + expect_msg is not Started + and not is_started_send_side + ): + ctx._maybe_cancel_and_set_remote_error(mte) + + # XXX NOTE: so when the `_raise_from_unexpected_msg()` + # raises the boxed `err_msg` from above it raises + # it from `None`. src_err = valerr + # if is_started_send_side: + # src_err = None + # XXX some other decoder specific failure? # except TypeError as src_error: @@ -379,6 +420,7 @@ class PldRx(Struct): # NOTE: generally speaking only for handling `Stop`-msgs that # arrive during a call to `drain_to_final_msg()` above! passthrough_non_pld_msgs: bool = True, + hide_tb: bool = True, **kwargs, ) -> tuple[MsgType, PayloadT]: @@ -387,6 +429,7 @@ class PldRx(Struct): the pair of refs. ''' + __tracebackhide__: bool = hide_tb msg: MsgType = await ipc._rx_chan.receive() if passthrough_non_pld_msgs: @@ -401,6 +444,7 @@ class PldRx(Struct): msg, ipc=ipc, expect_msg=expect_msg, + hide_tb=hide_tb, **kwargs, ) return msg, pld @@ -414,7 +458,7 @@ def limit_plds( ) -> MsgDec: ''' Apply a `MsgCodec` that will natively decode the SC-msg set's - `Msg.pld: Union[Type[Struct]]` payload fields using + `PayloadMsg.pld: Union[Type[Struct]]` payload fields using tagged-unions of `msgspec.Struct`s from the `payload_types` for all IPC contexts in use by the current `trio.Task`. @@ -691,3 +735,11 @@ async def drain_to_final_msg( result_msg, pre_result_drained, ) + + +# TODO: factor logic from `.Context.started()` for send-side +# validate raising! +def validate_payload_msg( + msg: Started|Yield|Return, +) -> MsgTypeError|None: + ...