Compare commits
	
		
			No commits in common. "eee4c61b51e6d3053549e67650be04fcd03ab2d5" and "e4ec6b7b0c3401b336e03a4ea7b42015480b7677" have entirely different histories. 
		
	
	
		
			eee4c61b51
			...
			e4ec6b7b0c
		
	
		|  | @ -1,316 +0,0 @@ | ||||||
| ''' |  | ||||||
| Audit sub-sys APIs from `.msg._ops` |  | ||||||
| mostly for ensuring correct `contextvars` |  | ||||||
| related settings around IPC contexts. |  | ||||||
| 
 |  | ||||||
| ''' |  | ||||||
| from contextlib import ( |  | ||||||
|     asynccontextmanager as acm, |  | ||||||
|     contextmanager as cm, |  | ||||||
| ) |  | ||||||
| # import typing |  | ||||||
| from typing import ( |  | ||||||
|     # Any, |  | ||||||
|     TypeAlias, |  | ||||||
|     # Union, |  | ||||||
| ) |  | ||||||
| from contextvars import ( |  | ||||||
|     Context, |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| from msgspec import ( |  | ||||||
|     # structs, |  | ||||||
|     # msgpack, |  | ||||||
|     Struct, |  | ||||||
|     # ValidationError, |  | ||||||
| ) |  | ||||||
| import pytest |  | ||||||
| import trio |  | ||||||
| 
 |  | ||||||
| import tractor |  | ||||||
| from tractor import ( |  | ||||||
|     # _state, |  | ||||||
|     MsgTypeError, |  | ||||||
|     current_ipc_ctx, |  | ||||||
|     Portal, |  | ||||||
| ) |  | ||||||
| from tractor.msg import ( |  | ||||||
|     _ops as msgops, |  | ||||||
|     Return, |  | ||||||
| ) |  | ||||||
| from tractor.msg import ( |  | ||||||
|     _codec, |  | ||||||
|     # _ctxvar_MsgCodec, |  | ||||||
| 
 |  | ||||||
|     # NamespacePath, |  | ||||||
|     # MsgCodec, |  | ||||||
|     # mk_codec, |  | ||||||
|     # apply_codec, |  | ||||||
|     # current_codec, |  | ||||||
| ) |  | ||||||
| from tractor.msg.types import ( |  | ||||||
|     log, |  | ||||||
|     # _payload_msgs, |  | ||||||
|     # PayloadMsg, |  | ||||||
|     # Started, |  | ||||||
|     # mk_msg_spec, |  | ||||||
| ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| class PldMsg(Struct): |  | ||||||
|     field: str |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| maybe_msg_spec = PldMsg|None |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @cm |  | ||||||
| def custom_spec( |  | ||||||
|     ctx: Context, |  | ||||||
|     spec: TypeAlias, |  | ||||||
| ) -> _codec.MsgCodec: |  | ||||||
|     ''' |  | ||||||
|     Apply a custom payload spec, remove on exit. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     rx: msgops.PldRx = ctx._pld_rx |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @acm |  | ||||||
| async def maybe_expect_raises( |  | ||||||
|     raises: BaseException|None = None, |  | ||||||
|     ensure_in_message: list[str]|None = None, |  | ||||||
| 
 |  | ||||||
|     reraise: bool = False, |  | ||||||
|     timeout: int = 3, |  | ||||||
| ) -> None: |  | ||||||
|     ''' |  | ||||||
|     Async wrapper for ensuring errors propagate from the inner scope. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     with trio.fail_after(timeout): |  | ||||||
|         try: |  | ||||||
|             yield |  | ||||||
|         except BaseException as _inner_err: |  | ||||||
|             inner_err = _inner_err |  | ||||||
|             # wasn't-expected to error.. |  | ||||||
|             if raises is None: |  | ||||||
|                 raise |  | ||||||
| 
 |  | ||||||
|             else: |  | ||||||
|                 assert type(inner_err) is raises |  | ||||||
| 
 |  | ||||||
|                 # maybe check for error txt content |  | ||||||
|                 if ensure_in_message: |  | ||||||
|                     part: str |  | ||||||
|                     for part in ensure_in_message: |  | ||||||
|                         for i, arg in enumerate(inner_err.args): |  | ||||||
|                             if part in arg: |  | ||||||
|                                 break |  | ||||||
|                         # if part never matches an arg, then we're |  | ||||||
|                         # missing a match. |  | ||||||
|                         else: |  | ||||||
|                             raise ValueError( |  | ||||||
|                                 'Failed to find error message content?\n\n' |  | ||||||
|                                 f'expected: {ensure_in_message!r}\n' |  | ||||||
|                                 f'part: {part!r}\n\n' |  | ||||||
|                                 f'{inner_err.args}' |  | ||||||
|                         ) |  | ||||||
| 
 |  | ||||||
|                 if reraise: |  | ||||||
|                     raise inner_err |  | ||||||
| 
 |  | ||||||
|         else: |  | ||||||
|             if raises: |  | ||||||
|                 raise RuntimeError( |  | ||||||
|                     f'Expected a {raises.__name__!r} to be raised?' |  | ||||||
|                 ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @tractor.context |  | ||||||
| async def child( |  | ||||||
|     ctx: Context, |  | ||||||
|     started_value: int|PldMsg|None, |  | ||||||
|     return_value: str|None, |  | ||||||
|     validate_pld_spec: bool, |  | ||||||
|     raise_on_started_mte: bool = True, |  | ||||||
| 
 |  | ||||||
| ) -> None: |  | ||||||
|     ''' |  | ||||||
|     Call ``Context.started()`` more then once (an error). |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     expect_started_mte: bool = started_value == 10 |  | ||||||
| 
 |  | ||||||
|     # sanaity check that child RPC context is the current one |  | ||||||
|     curr_ctx: Context = current_ipc_ctx() |  | ||||||
|     assert ctx is curr_ctx |  | ||||||
| 
 |  | ||||||
|     rx: msgops.PldRx = ctx._pld_rx |  | ||||||
|     orig_pldec: _codec.MsgDec = rx.pld_dec |  | ||||||
|     # senity that default pld-spec should be set |  | ||||||
|     assert ( |  | ||||||
|         rx.pld_dec |  | ||||||
|         is |  | ||||||
|         msgops._def_any_pldec |  | ||||||
|     ) |  | ||||||
| 
 |  | ||||||
|     try: |  | ||||||
|         with msgops.limit_plds( |  | ||||||
|             spec=maybe_msg_spec, |  | ||||||
|         ) as pldec: |  | ||||||
|             # sanity on `MsgDec` state |  | ||||||
|             assert rx.pld_dec is pldec |  | ||||||
|             assert pldec.spec is maybe_msg_spec |  | ||||||
| 
 |  | ||||||
|             # 2 cases: hdndle send-side and recv-only validation |  | ||||||
|             # - when `raise_on_started_mte == True`, send validate |  | ||||||
|             # - else, parent-recv-side only validation |  | ||||||
|             try: |  | ||||||
|                 await ctx.started( |  | ||||||
|                     value=started_value, |  | ||||||
|                     validate_pld_spec=validate_pld_spec, |  | ||||||
|                 ) |  | ||||||
| 
 |  | ||||||
|             except MsgTypeError: |  | ||||||
|                 log.exception('started()` raised an MTE!\n') |  | ||||||
|                 if not expect_started_mte: |  | ||||||
|                     raise RuntimeError( |  | ||||||
|                         'Child-ctx-task SHOULD NOT HAVE raised an MTE for\n\n' |  | ||||||
|                         f'{started_value!r}\n' |  | ||||||
|                     ) |  | ||||||
| 
 |  | ||||||
|                 # propagate to parent? |  | ||||||
|                 if raise_on_started_mte: |  | ||||||
|                     raise |  | ||||||
|             else: |  | ||||||
|                 if expect_started_mte: |  | ||||||
|                     raise RuntimeError( |  | ||||||
|                         'Child-ctx-task SHOULD HAVE raised an MTE for\n\n' |  | ||||||
|                         f'{started_value!r}\n' |  | ||||||
|                     ) |  | ||||||
| 
 |  | ||||||
|             # XXX should always fail on recv side since we can't |  | ||||||
|             # really do much else beside terminate and relay the |  | ||||||
|             # msg-type-error from this RPC task ;) |  | ||||||
|             return return_value |  | ||||||
| 
 |  | ||||||
|     finally: |  | ||||||
|         # sanity on `limit_plds()` reversion |  | ||||||
|         assert ( |  | ||||||
|             rx.pld_dec |  | ||||||
|             is |  | ||||||
|             msgops._def_any_pldec |  | ||||||
|         ) |  | ||||||
|         log.runtime( |  | ||||||
|             'Reverted to previous pld-spec\n\n' |  | ||||||
|             f'{orig_pldec}\n' |  | ||||||
|         ) |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| @pytest.mark.parametrize( |  | ||||||
|     'return_value', |  | ||||||
|     [ |  | ||||||
|         None, |  | ||||||
|         'yo', |  | ||||||
|     ], |  | ||||||
|     ids=[ |  | ||||||
|         'return[invalid-"yo"]', |  | ||||||
|         'return[valid-None]', |  | ||||||
|     ], |  | ||||||
| ) |  | ||||||
| @pytest.mark.parametrize( |  | ||||||
|     'started_value', |  | ||||||
|     [ |  | ||||||
|         10, |  | ||||||
|         PldMsg(field='yo'), |  | ||||||
|     ], |  | ||||||
|     ids=[ |  | ||||||
|         'Started[invalid-10]', |  | ||||||
|         'Started[valid-PldMsg]', |  | ||||||
|     ], |  | ||||||
| ) |  | ||||||
| @pytest.mark.parametrize( |  | ||||||
|     'pld_check_started_value', |  | ||||||
|     [ |  | ||||||
|         True, |  | ||||||
|         False, |  | ||||||
|     ], |  | ||||||
|     ids=[ |  | ||||||
|         'check-started-pld', |  | ||||||
|         'no-started-pld-validate', |  | ||||||
|     ], |  | ||||||
| ) |  | ||||||
| def test_basic_payload_spec( |  | ||||||
|     debug_mode: bool, |  | ||||||
|     loglevel: str, |  | ||||||
|     return_value: str|None, |  | ||||||
|     started_value: int|PldMsg, |  | ||||||
|     pld_check_started_value: bool, |  | ||||||
| ): |  | ||||||
|     ''' |  | ||||||
|     Validate the most basic `PldRx` msg-type-spec semantics around |  | ||||||
|     a IPC `Context` endpoint start, started-sync, and final return |  | ||||||
|     value depending on set payload types and the currently applied |  | ||||||
|     pld-spec. |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     invalid_return: bool = return_value == 'yo' |  | ||||||
|     invalid_started: bool = started_value == 10 |  | ||||||
| 
 |  | ||||||
|     async def main(): |  | ||||||
|         async with tractor.open_nursery( |  | ||||||
|             debug_mode=debug_mode, |  | ||||||
|             loglevel=loglevel, |  | ||||||
|         ) as an: |  | ||||||
|             p: Portal = await an.start_actor( |  | ||||||
|                 'child', |  | ||||||
|                 enable_modules=[__name__], |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|             # since not opened yet. |  | ||||||
|             assert current_ipc_ctx() is None |  | ||||||
| 
 |  | ||||||
|             async with ( |  | ||||||
|                 maybe_expect_raises( |  | ||||||
|                     raises=MsgTypeError if ( |  | ||||||
|                         invalid_return |  | ||||||
|                         or |  | ||||||
|                         invalid_started |  | ||||||
|                     ) else None, |  | ||||||
|                     ensure_in_message=[ |  | ||||||
|                         "invalid `Return` payload", |  | ||||||
|                         "value: `'yo'` does not match type-spec: `Return.pld: PldMsg|NoneType`", |  | ||||||
|                     ], |  | ||||||
|                 ), |  | ||||||
|                 p.open_context( |  | ||||||
|                     child, |  | ||||||
|                     return_value=return_value, |  | ||||||
|                     started_value=started_value, |  | ||||||
|                     pld_spec=maybe_msg_spec, |  | ||||||
|                     validate_pld_spec=pld_check_started_value, |  | ||||||
|                 ) as (ctx, first), |  | ||||||
|             ): |  | ||||||
|                 # now opened with 'child' sub |  | ||||||
|                 assert current_ipc_ctx() is ctx |  | ||||||
| 
 |  | ||||||
|                 assert type(first) is PldMsg |  | ||||||
|                 assert first.field == 'yo' |  | ||||||
| 
 |  | ||||||
|                 try: |  | ||||||
|                     assert (await ctx.result()) is None |  | ||||||
|                 except MsgTypeError as mte: |  | ||||||
|                     if not invalid_return: |  | ||||||
|                         raise |  | ||||||
| 
 |  | ||||||
|                     else:  # expected this invalid `Return.pld` |  | ||||||
|                         assert mte.cid == ctx.cid |  | ||||||
| 
 |  | ||||||
|                         # verify expected remote mte deats |  | ||||||
|                         await tractor.pause() |  | ||||||
|                         assert ctx._remote_error is mte |  | ||||||
|                         assert mte.expected_msg_type is Return |  | ||||||
| 
 |  | ||||||
|             await p.cancel_actor() |  | ||||||
| 
 |  | ||||||
|     trio.run(main) |  | ||||||
|  | @ -15,22 +15,12 @@ | ||||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| The fundamental cross-process SC abstraction: an inter-actor, | The fundamental cross process SC abstraction: an inter-actor, | ||||||
| transitively cancel-scope linked, (dual) task IPC coupled "context". | cancel-scope linked task "context". | ||||||
| 
 | 
 | ||||||
| A `Context` is very similar to the look and feel of the | A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built | ||||||
| `.cancel_scope: trio.CancelScope` built into each `trio.Nursery` | into each ``trio.Nursery`` except it links the lifetimes of memory space | ||||||
| except that it links the lifetimes of 2 memory space disjoint, | disjoint, parallel executing tasks in separate actors. | ||||||
| parallel executing, tasks scheduled in separate "actors". |  | ||||||
| 
 |  | ||||||
| So while a `trio.Nursery` has a `.parent_task` which exists both |  | ||||||
| before (open) and then inside the body of the `async with` of the |  | ||||||
| nursery's scope (/block), a `Context` contains 2 tasks, a "parent" |  | ||||||
| and a "child" side, where both execute independently in separate |  | ||||||
| memory domains of different (host's) processes linked through |  | ||||||
| a SC-transitive IPC "shuttle dialog protocol". The underlying IPC |  | ||||||
| dialog-(un)protocol allows for the maintainance of SC properties |  | ||||||
| end-2-end between the tasks. |  | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
|  | @ -81,11 +71,13 @@ from .msg import ( | ||||||
|     MsgCodec, |     MsgCodec, | ||||||
|     NamespacePath, |     NamespacePath, | ||||||
|     PayloadT, |     PayloadT, | ||||||
|  |     Return, | ||||||
|     Started, |     Started, | ||||||
|     Stop, |     Stop, | ||||||
|     Yield, |     Yield, | ||||||
|     current_codec, |     current_codec, | ||||||
|     pretty_struct, |     pretty_struct, | ||||||
|  |     types as msgtypes, | ||||||
|     _ops as msgops, |     _ops as msgops, | ||||||
| ) | ) | ||||||
| from ._ipc import ( | from ._ipc import ( | ||||||
|  | @ -98,7 +90,7 @@ from ._state import ( | ||||||
|     debug_mode, |     debug_mode, | ||||||
|     _ctxvar_Context, |     _ctxvar_Context, | ||||||
| ) | ) | ||||||
| # ------ - ------ | 
 | ||||||
| if TYPE_CHECKING: | if TYPE_CHECKING: | ||||||
|     from ._portal import Portal |     from ._portal import Portal | ||||||
|     from ._runtime import Actor |     from ._runtime import Actor | ||||||
|  | @ -1606,15 +1598,16 @@ class Context: | ||||||
|     async def started( |     async def started( | ||||||
|         self, |         self, | ||||||
| 
 | 
 | ||||||
|  |         # TODO: how to type this so that it's the | ||||||
|  |         # same as the payload type? Is this enough? | ||||||
|         value: PayloadT|None = None, |         value: PayloadT|None = None, | ||||||
|         validate_pld_spec: bool = True, |  | ||||||
|         strict_pld_parity: bool = False, |  | ||||||
| 
 | 
 | ||||||
|         # TODO: this will always emit for msgpack for any () vs. [] |         strict_parity: bool = False, | ||||||
|         # inside the value.. do we want to offer warnings on that? |  | ||||||
|         # complain_no_parity: bool = False, |  | ||||||
| 
 | 
 | ||||||
|         hide_tb: bool = True, |         # TODO: this will always emit now that we do `.pld: Raw` | ||||||
|  |         # passthrough.. so maybe just only complain when above strict | ||||||
|  |         # flag is set? | ||||||
|  |         complain_no_parity: bool = False, | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
|         ''' |         ''' | ||||||
|  | @ -1655,54 +1648,63 @@ class Context: | ||||||
|         # |         # | ||||||
|         # https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern |         # https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern | ||||||
|         # |         # | ||||||
|         __tracebackhide__: bool = hide_tb |  | ||||||
|         if validate_pld_spec: |  | ||||||
|             # __tracebackhide__: bool = False |  | ||||||
|         codec: MsgCodec = current_codec() |         codec: MsgCodec = current_codec() | ||||||
|         msg_bytes: bytes = codec.encode(started_msg) |         msg_bytes: bytes = codec.encode(started_msg) | ||||||
|         try: |         try: | ||||||
|                 roundtripped: Started = codec.decode(msg_bytes) |             # be a "cheap" dialog (see above!) | ||||||
|                 # pld: PayloadT = await self.pld_rx.recv_pld( |  | ||||||
|                 pld: PayloadT = self.pld_rx.dec_msg( |  | ||||||
|                     msg=roundtripped, |  | ||||||
|                     ipc=self, |  | ||||||
|                     expect_msg=Started, |  | ||||||
|                     hide_tb=hide_tb, |  | ||||||
|                     is_started_send_side=True, |  | ||||||
|                 ) |  | ||||||
|             if ( |             if ( | ||||||
|                     strict_pld_parity |                 strict_parity | ||||||
|                     and |                 or | ||||||
|                     pld != value |                 complain_no_parity | ||||||
|             ): |             ): | ||||||
|  |                 rt_started: Started = codec.decode(msg_bytes) | ||||||
|  | 
 | ||||||
|  |                 # XXX something is prolly totes cucked with the | ||||||
|  |                 # codec state! | ||||||
|  |                 if isinstance(rt_started, dict): | ||||||
|  |                     rt_started = msgtypes.from_dict_msg( | ||||||
|  |                         dict_msg=rt_started, | ||||||
|  |                     ) | ||||||
|  |                     raise RuntimeError( | ||||||
|  |                         'Failed to roundtrip `Started` msg?\n' | ||||||
|  |                         f'{pretty_struct.pformat(rt_started)}\n' | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |                 if rt_started != started_msg: | ||||||
|                     # TODO: make that one a mod func too.. |                     # TODO: make that one a mod func too.. | ||||||
|                     diff = pretty_struct.Struct.__sub__( |                     diff = pretty_struct.Struct.__sub__( | ||||||
|                         roundtripped, |                         rt_started, | ||||||
|                         started_msg, |                         started_msg, | ||||||
|                     ) |                     ) | ||||||
|                     complaint: str = ( |                     complaint: str = ( | ||||||
|                         'Started value does not match after roundtrip?\n\n' |                         'Started value does not match after roundtrip?\n\n' | ||||||
|                         f'{diff}' |                         f'{diff}' | ||||||
|                     ) |                     ) | ||||||
|                     raise ValidationError(complaint) | 
 | ||||||
|  |                     # TODO: rn this will pretty much always fail with | ||||||
|  |                     # any other sequence type embeded in the | ||||||
|  |                     # payload... | ||||||
|  |                     if ( | ||||||
|  |                         self._strict_started | ||||||
|  |                         or | ||||||
|  |                         strict_parity | ||||||
|  |                     ): | ||||||
|  |                         raise ValueError(complaint) | ||||||
|  |                     else: | ||||||
|  |                         log.warning(complaint) | ||||||
|  | 
 | ||||||
|  |             await self.chan.send(started_msg) | ||||||
| 
 | 
 | ||||||
|         # raise any msg type error NO MATTER WHAT! |         # raise any msg type error NO MATTER WHAT! | ||||||
|         except ValidationError as verr: |         except ValidationError as verr: | ||||||
|                 # always show this src frame in the tb |  | ||||||
|                 # __tracebackhide__: bool = False |  | ||||||
|             raise _mk_msg_type_err( |             raise _mk_msg_type_err( | ||||||
|                     msg=roundtripped, |                 msg=msg_bytes, | ||||||
|                 codec=codec, |                 codec=codec, | ||||||
|                 src_validation_error=verr, |                 src_validation_error=verr, | ||||||
|                     verb_header='Trying to send ', |                 verb_header='Trying to send payload' | ||||||
|                     is_invalid_payload=True, |                 # > 'invalid `Started IPC msgs\n' | ||||||
|             ) from verr |             ) from verr | ||||||
| 
 | 
 | ||||||
|         # TODO: maybe a flag to by-pass encode op if already done |  | ||||||
|         # here in caller? |  | ||||||
|         await self.chan.send(started_msg) |  | ||||||
| 
 |  | ||||||
|         # set msg-related internal runtime-state |  | ||||||
|         self._started_called = True |         self._started_called = True | ||||||
|         self._started_msg = started_msg |         self._started_msg = started_msg | ||||||
|         self._started_pld = value |         self._started_pld = value | ||||||
|  | @ -1995,7 +1997,12 @@ async def open_context_from_portal( | ||||||
| 
 | 
 | ||||||
|     pld_spec: TypeAlias|None = None, |     pld_spec: TypeAlias|None = None, | ||||||
|     allow_overruns: bool = False, |     allow_overruns: bool = False, | ||||||
|     hide_tb: bool = True, | 
 | ||||||
|  |     # TODO: if we set this the wrapping `@acm` body will | ||||||
|  |     # still be shown (awkwardly) on pdb REPL entry. Ideally | ||||||
|  |     # we can similarly annotate that frame to NOT show? for now | ||||||
|  |     # we DO SHOW this frame since it's awkward ow.. | ||||||
|  |     hide_tb: bool = False, | ||||||
| 
 | 
 | ||||||
|     # proxied to RPC |     # proxied to RPC | ||||||
|     **kwargs, |     **kwargs, | ||||||
|  | @ -2108,7 +2115,6 @@ async def open_context_from_portal( | ||||||
|                 ipc=ctx, |                 ipc=ctx, | ||||||
|                 expect_msg=Started, |                 expect_msg=Started, | ||||||
|                 passthrough_non_pld_msgs=False, |                 passthrough_non_pld_msgs=False, | ||||||
|                 hide_tb=hide_tb, |  | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             # from .devx import pause |             # from .devx import pause | ||||||
|  |  | ||||||
|  | @ -22,9 +22,6 @@ from __future__ import annotations | ||||||
| import builtins | import builtins | ||||||
| import importlib | import importlib | ||||||
| from pprint import pformat | from pprint import pformat | ||||||
| from types import ( |  | ||||||
|     TracebackType, |  | ||||||
| ) |  | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     Callable, |     Callable, | ||||||
|  | @ -95,30 +92,26 @@ _ipcmsg_keys: list[str] = [ | ||||||
|     fi.name |     fi.name | ||||||
|     for fi, k, v |     for fi, k, v | ||||||
|     in iter_fields(Error) |     in iter_fields(Error) | ||||||
|  | 
 | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
| _body_fields: list[str] = list( | _body_fields: list[str] = list( | ||||||
|     set(_ipcmsg_keys) |     set(_ipcmsg_keys) | ||||||
| 
 | 
 | ||||||
|     # XXX NOTE: DON'T-SHOW-FIELDS |     # NOTE: don't show fields that either don't provide | ||||||
|     # - don't provide any extra useful info or, |     # any extra useful info or that are already shown | ||||||
|     # - are already shown as part of `.__repr__()` or, |     # as part of `.__repr__()` output. | ||||||
|     # - are sub-type specific. |  | ||||||
|     - { |     - { | ||||||
|         'src_type_str', |         'src_type_str', | ||||||
|         'boxed_type_str', |         'boxed_type_str', | ||||||
|         'tb_str', |         'tb_str', | ||||||
|         'relay_path', |         'relay_path', | ||||||
|  |         '_msg_dict', | ||||||
|         'cid', |         'cid', | ||||||
| 
 | 
 | ||||||
|         # only ctxc should show it but `Error` does |         # since only ctxc should show it but `Error` does | ||||||
|         # have it as an optional field. |         # have it as an optional field. | ||||||
|         'canceller', |         'canceller', | ||||||
| 
 |  | ||||||
|         # only for MTEs and generally only used |  | ||||||
|         # when devving/testing/debugging. |  | ||||||
|         '_msg_dict', |  | ||||||
|         '_bad_msg', |  | ||||||
|     } |     } | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
|  | @ -153,7 +146,6 @@ def pack_from_raise( | ||||||
|         |MsgTypeError |         |MsgTypeError | ||||||
|     ), |     ), | ||||||
|     cid: str, |     cid: str, | ||||||
|     hide_tb: bool = True, |  | ||||||
| 
 | 
 | ||||||
|     **rae_fields, |     **rae_fields, | ||||||
| 
 | 
 | ||||||
|  | @ -164,7 +156,7 @@ def pack_from_raise( | ||||||
|     `Error`-msg using `pack_error()` to extract the tb info. |     `Error`-msg using `pack_error()` to extract the tb info. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     __tracebackhide__: bool = hide_tb |     __tracebackhide__: bool = True | ||||||
|     try: |     try: | ||||||
|         raise local_err |         raise local_err | ||||||
|     except type(local_err) as local_err: |     except type(local_err) as local_err: | ||||||
|  | @ -239,8 +231,7 @@ class RemoteActorError(Exception): | ||||||
| 
 | 
 | ||||||
|         if ( |         if ( | ||||||
|             extra_msgdata |             extra_msgdata | ||||||
|             and |             and ipc_msg | ||||||
|             ipc_msg |  | ||||||
|         ): |         ): | ||||||
|             # XXX mutate the orig msg directly from |             # XXX mutate the orig msg directly from | ||||||
|             # manually provided input params. |             # manually provided input params. | ||||||
|  | @ -270,16 +261,17 @@ class RemoteActorError(Exception): | ||||||
|         # either by customizing `ContextCancelled.__init__()` or |         # either by customizing `ContextCancelled.__init__()` or | ||||||
|         # through a special factor func? |         # through a special factor func? | ||||||
|         elif boxed_type: |         elif boxed_type: | ||||||
|             boxed_type_str: str = boxed_type.__name__ |             boxed_type_str: str = type(boxed_type).__name__ | ||||||
|             if ( |             if ( | ||||||
|                 ipc_msg |                 ipc_msg | ||||||
|                 and |                 and not self._ipc_msg.boxed_type_str | ||||||
|                 self._ipc_msg.boxed_type_str != boxed_type_str |  | ||||||
|             ): |             ): | ||||||
|                 self._ipc_msg.boxed_type_str = boxed_type_str |                 self._ipc_msg.boxed_type_str = boxed_type_str | ||||||
|                 assert self.boxed_type_str == self._ipc_msg.boxed_type_str |                 assert self.boxed_type_str == self._ipc_msg.boxed_type_str | ||||||
| 
 | 
 | ||||||
|             # ensure any roundtripping evals to the input value |             else: | ||||||
|  |                 self._extra_msgdata['boxed_type_str'] = boxed_type_str | ||||||
|  | 
 | ||||||
|             assert self.boxed_type is boxed_type |             assert self.boxed_type is boxed_type | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|  | @ -317,9 +309,7 @@ class RemoteActorError(Exception): | ||||||
|             if self._ipc_msg |             if self._ipc_msg | ||||||
|             else {} |             else {} | ||||||
|         ) |         ) | ||||||
|         return { |         return self._extra_msgdata | msgdata | ||||||
|             k: v for k, v in self._extra_msgdata.items() |  | ||||||
|         } | msgdata |  | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def src_type_str(self) -> str: |     def src_type_str(self) -> str: | ||||||
|  | @ -512,8 +502,6 @@ class RemoteActorError(Exception): | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         header: str = '' |         header: str = '' | ||||||
|         body: str = '' |  | ||||||
| 
 |  | ||||||
|         if with_type_header: |         if with_type_header: | ||||||
|             header: str = f'<{type(self).__name__}(\n' |             header: str = f'<{type(self).__name__}(\n' | ||||||
| 
 | 
 | ||||||
|  | @ -537,22 +525,24 @@ class RemoteActorError(Exception): | ||||||
|             ) |             ) | ||||||
|             if not with_type_header: |             if not with_type_header: | ||||||
|                 body = '\n' + body |                 body = '\n' + body | ||||||
|  |         else: | ||||||
|  |             first: str = '' | ||||||
|  |             message: str = self._message | ||||||
| 
 | 
 | ||||||
|         elif message := self._message: |  | ||||||
|             # split off the first line so it isn't indented |             # split off the first line so it isn't indented | ||||||
|             # the same like the "boxed content". |             # the same like the "boxed content". | ||||||
|             if not with_type_header: |             if not with_type_header: | ||||||
|                 lines: list[str] = message.splitlines() |                 lines: list[str] = message.splitlines() | ||||||
|                 first: str = lines[0] |                 first = lines[0] | ||||||
|                 message: str = message.removeprefix(first) |                 message = ''.join(lines[1:]) | ||||||
| 
 |  | ||||||
|             else: |  | ||||||
|                 first: str = '' |  | ||||||
| 
 | 
 | ||||||
|             body: str = ( |             body: str = ( | ||||||
|                 first |                 first | ||||||
|                 + |                 + | ||||||
|                 message |                 textwrap.indent( | ||||||
|  |                     message, | ||||||
|  |                     prefix='  ', | ||||||
|  |                 ) | ||||||
|                 + |                 + | ||||||
|                 '\n' |                 '\n' | ||||||
|             ) |             ) | ||||||
|  | @ -718,72 +708,52 @@ class MsgTypeError( | ||||||
|     ] |     ] | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def bad_msg(self) -> PayloadMsg|None: |     def msg_dict(self) -> dict[str, Any]: | ||||||
|         ''' |         ''' | ||||||
|         Ref to the the original invalid IPC shuttle msg which failed |         If the underlying IPC `MsgType` was received from a remote | ||||||
|         to decode thus providing for the reason for this error. |         actor but was unable to be decoded to a native | ||||||
|  |         `Yield`|`Started`|`Return` struct, the interchange backend | ||||||
|  |         native format decoder can be used to stash a `dict` | ||||||
|  |         version for introspection by the invalidating RPC task. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         if ( |         return self.msgdata.get('_msg_dict') | ||||||
|             (_bad_msg := self.msgdata.get('_bad_msg')) |  | ||||||
|             and |  | ||||||
|             isinstance(_bad_msg, PayloadMsg) |  | ||||||
|         ): |  | ||||||
|             return _bad_msg |  | ||||||
| 
 | 
 | ||||||
|         elif bad_msg_dict := self.bad_msg_as_dict: |     @property | ||||||
|  |     def expected_msg(self) -> MsgType|None: | ||||||
|  |         ''' | ||||||
|  |         Attempt to construct what would have been the original | ||||||
|  |         `MsgType`-with-payload subtype (i.e. an instance from the set | ||||||
|  |         of msgs in `.msg.types._payload_msgs`) which failed | ||||||
|  |         validation. | ||||||
|  | 
 | ||||||
|  |         ''' | ||||||
|  |         if msg_dict := self.msg_dict.copy(): | ||||||
|             return msgtypes.from_dict_msg( |             return msgtypes.from_dict_msg( | ||||||
|                 dict_msg=bad_msg_dict.copy(), |                 dict_msg=msg_dict, | ||||||
|                 # use_pretty=True, |                 # use_pretty=True, | ||||||
|                 # ^-TODO-^ would luv to use this BUT then the |                 # ^-TODO-^ would luv to use this BUT then the | ||||||
|                 # `field_prefix` in `pformat_boxed_tb()` cucks it |                 # `field_prefix` in `pformat_boxed_tb()` cucks it | ||||||
|                 # all up.. XD |                 # all up.. XD | ||||||
|             ) |             ) | ||||||
| 
 |  | ||||||
|         return None |         return None | ||||||
| 
 | 
 | ||||||
|     @property |  | ||||||
|     def bad_msg_as_dict(self) -> dict[str, Any]: |  | ||||||
|         ''' |  | ||||||
|         If the underlying IPC `MsgType` was received from a remote |  | ||||||
|         actor but was unable to be decoded to a native `PayloadMsg` |  | ||||||
|         (`Yield`|`Started`|`Return`) struct, the interchange backend |  | ||||||
|         native format decoder can be used to stash a `dict` version |  | ||||||
|         for introspection by the invalidating RPC task. |  | ||||||
| 
 |  | ||||||
|         Optionally when this error is constructed from |  | ||||||
|         `.from_decode()` the caller can attempt to construct what |  | ||||||
|         would have been the original `MsgType`-with-payload subtype |  | ||||||
|         (i.e. an instance from the set of msgs in |  | ||||||
|         `.msg.types._payload_msgs`) which failed validation. |  | ||||||
| 
 |  | ||||||
|         ''' |  | ||||||
|         return self.msgdata.get('_bad_msg_as_dict') |  | ||||||
| 
 |  | ||||||
|     @property |     @property | ||||||
|     def expected_msg_type(self) -> Type[MsgType]|None: |     def expected_msg_type(self) -> Type[MsgType]|None: | ||||||
|         return type(self.bad_msg) |         return type(self.expected_msg) | ||||||
| 
 | 
 | ||||||
|     @property |     @property | ||||||
|     def cid(self) -> str: |     def cid(self) -> str: | ||||||
|         # pull from required `.bad_msg` ref (or src dict) |         # pre-packed using `.from_decode()` constructor | ||||||
|         if bad_msg := self.bad_msg: |         return self.msgdata.get('cid') | ||||||
|             return bad_msg.cid |  | ||||||
| 
 |  | ||||||
|         return self.msgdata['cid'] |  | ||||||
| 
 | 
 | ||||||
|     @classmethod |     @classmethod | ||||||
|     def from_decode( |     def from_decode( | ||||||
|         cls, |         cls, | ||||||
|         message: str, |         message: str, | ||||||
| 
 | 
 | ||||||
|         bad_msg: PayloadMsg|None = None, |         ipc_msg: PayloadMsg|None = None, | ||||||
|         bad_msg_as_dict: dict|None = None, |         msgdict: dict|None = None, | ||||||
| 
 |  | ||||||
|         # if provided, expand and pack all RAE compat fields into the |  | ||||||
|         # `._extra_msgdata` auxillary data `dict` internal to |  | ||||||
|         # `RemoteActorError`. |  | ||||||
|         **extra_msgdata, |  | ||||||
| 
 | 
 | ||||||
|     ) -> MsgTypeError: |     ) -> MsgTypeError: | ||||||
|         ''' |         ''' | ||||||
|  | @ -793,44 +763,25 @@ class MsgTypeError( | ||||||
|         (which is normally the caller of this). |         (which is normally the caller of this). | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         if bad_msg_as_dict: |         # if provided, expand and pack all RAE compat fields into the | ||||||
|  |         # `._extra_msgdata` auxillary data `dict` internal to | ||||||
|  |         # `RemoteActorError`. | ||||||
|  |         extra_msgdata: dict = {} | ||||||
|  |         if msgdict: | ||||||
|  |             extra_msgdata: dict = { | ||||||
|  |                 k: v | ||||||
|  |                 for k, v in msgdict.items() | ||||||
|  |                 if k in _ipcmsg_keys | ||||||
|  |             } | ||||||
|             # NOTE: original "vanilla decode" of the msg-bytes |             # NOTE: original "vanilla decode" of the msg-bytes | ||||||
|             # is placed inside a value readable from |             # is placed inside a value readable from | ||||||
|             # `.msgdata['_msg_dict']` |             # `.msgdata['_msg_dict']` | ||||||
|             extra_msgdata['_bad_msg_as_dict'] = bad_msg_as_dict |             extra_msgdata['_msg_dict'] = msgdict | ||||||
| 
 |  | ||||||
|             # scrape out any underlying fields from the |  | ||||||
|             # msg that failed validation. |  | ||||||
|             for k, v in bad_msg_as_dict.items(): |  | ||||||
|                 if ( |  | ||||||
|                     # always skip a duplicate entry |  | ||||||
|                     # if already provided as an arg |  | ||||||
|                     k == '_bad_msg' and bad_msg |  | ||||||
|                     or |  | ||||||
|                     # skip anything not in the default msg-field set. |  | ||||||
|                     k not in _ipcmsg_keys |  | ||||||
|                     # k not in _body_fields |  | ||||||
|                 ): |  | ||||||
|                     continue |  | ||||||
| 
 |  | ||||||
|                 extra_msgdata[k] = v |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
|         elif bad_msg: |  | ||||||
|             if not isinstance(bad_msg, PayloadMsg): |  | ||||||
|                 raise TypeError( |  | ||||||
|                     'The provided `bad_msg` is not a `PayloadMsg` type?\n\n' |  | ||||||
|                     f'{bad_msg}' |  | ||||||
|                 ) |  | ||||||
|             extra_msgdata['_bad_msg'] = bad_msg |  | ||||||
|             extra_msgdata['cid'] = bad_msg.cid |  | ||||||
| 
 |  | ||||||
|         if 'cid' not in extra_msgdata: |  | ||||||
|             import pdbp; pdbp.set_trace() |  | ||||||
| 
 | 
 | ||||||
|         return cls( |         return cls( | ||||||
|             message=message, |             message=message, | ||||||
|             boxed_type=cls, |             boxed_type=cls, | ||||||
|  |             ipc_msg=ipc_msg, | ||||||
|             **extra_msgdata, |             **extra_msgdata, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  | @ -885,10 +836,9 @@ class MessagingError(Exception): | ||||||
| def pack_error( | def pack_error( | ||||||
|     exc: BaseException|RemoteActorError, |     exc: BaseException|RemoteActorError, | ||||||
| 
 | 
 | ||||||
|  |     tb: str|None = None, | ||||||
|     cid: str|None = None, |     cid: str|None = None, | ||||||
|     src_uid: tuple[str, str]|None = None, |     src_uid: tuple[str, str]|None = None, | ||||||
|     tb: TracebackType|None = None, |  | ||||||
|     tb_str: str = '', |  | ||||||
| 
 | 
 | ||||||
| ) -> Error: | ) -> Error: | ||||||
|     ''' |     ''' | ||||||
|  | @ -898,28 +848,10 @@ def pack_error( | ||||||
|     the receiver side using `unpack_error()` below. |     the receiver side using `unpack_error()` below. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     if not tb_str: |  | ||||||
|         tb_str: str = ( |  | ||||||
|             ''.join(traceback.format_exception(exc)) |  | ||||||
| 
 |  | ||||||
|             # TODO: can we remove this is `exc` is required? |  | ||||||
|             or |  | ||||||
|             # NOTE: this is just a shorthand for the "last error" as |  | ||||||
|             # provided by `sys.exeception()`, see: |  | ||||||
|             # - https://docs.python.org/3/library/traceback.html#traceback.print_exc |  | ||||||
|             # - https://docs.python.org/3/library/traceback.html#traceback.format_exc |  | ||||||
|             traceback.format_exc() |  | ||||||
|         ) |  | ||||||
|     else: |  | ||||||
|         if tb_str[-2:] != '\n': |  | ||||||
|             tb_str += '\n' |  | ||||||
| 
 |  | ||||||
|     # when caller provides a tb instance (say pulled from some other |  | ||||||
|     # src error's `.__traceback__`) we use that as the "boxed" |  | ||||||
|     # tb-string instead. |  | ||||||
|     if tb: |     if tb: | ||||||
|         # https://docs.python.org/3/library/traceback.html#traceback.format_list |         tb_str = ''.join(traceback.format_tb(tb)) | ||||||
|         tb_str: str = ''.join(traceback.format_tb(tb)) + tb_str |     else: | ||||||
|  |         tb_str = traceback.format_exc() | ||||||
| 
 | 
 | ||||||
|     error_msg: dict[  # for IPC |     error_msg: dict[  # for IPC | ||||||
|         str, |         str, | ||||||
|  | @ -1183,7 +1115,7 @@ def _mk_msg_type_err( | ||||||
|     src_validation_error: ValidationError|None = None, |     src_validation_error: ValidationError|None = None, | ||||||
|     src_type_error: TypeError|None = None, |     src_type_error: TypeError|None = None, | ||||||
|     is_invalid_payload: bool = False, |     is_invalid_payload: bool = False, | ||||||
|     # src_err_msg: Error|None = None, |     src_err_msg: Error|None = None, | ||||||
| 
 | 
 | ||||||
|     **mte_kwargs, |     **mte_kwargs, | ||||||
| 
 | 
 | ||||||
|  | @ -1232,10 +1164,10 @@ def _mk_msg_type_err( | ||||||
|                     '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n' |                     '|_ https://jcristharif.com/msgspec/extending.html#defining-a-custom-extension-messagepack-only\n' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|         msgtyperr = MsgTypeError( |         msgtyperr = MsgTypeError( | ||||||
|             message=message, |             message=message, | ||||||
|             ipc_msg=msg, |             ipc_msg=msg, | ||||||
|             bad_msg=msg, |  | ||||||
|         ) |         ) | ||||||
|         # ya, might be `None` |         # ya, might be `None` | ||||||
|         msgtyperr.__cause__ = src_type_error |         msgtyperr.__cause__ = src_type_error | ||||||
|  | @ -1243,9 +1175,6 @@ def _mk_msg_type_err( | ||||||
| 
 | 
 | ||||||
|     # `Channel.recv()` case |     # `Channel.recv()` case | ||||||
|     else: |     else: | ||||||
|         msg_dict: dict|None = None |  | ||||||
|         bad_msg: PayloadMsg|None = None |  | ||||||
| 
 |  | ||||||
|         if is_invalid_payload: |         if is_invalid_payload: | ||||||
|             msg_type: str = type(msg) |             msg_type: str = type(msg) | ||||||
|             any_pld: Any = msgpack.decode(msg.pld) |             any_pld: Any = msgpack.decode(msg.pld) | ||||||
|  | @ -1257,20 +1186,19 @@ def _mk_msg_type_err( | ||||||
|                 # f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n' |                 # f' |_pld: {codec.pld_spec_str}\n'# != {any_pld!r}\n' | ||||||
|                 # f')>\n\n' |                 # f')>\n\n' | ||||||
|             ) |             ) | ||||||
|             # src_err_msg = msg |  | ||||||
|             bad_msg = msg |  | ||||||
|             # TODO: should we just decode the msg to a dict despite |             # TODO: should we just decode the msg to a dict despite | ||||||
|             # only the payload being wrong? |             # only the payload being wrong? | ||||||
|             # -[ ] maybe the better design is to break this construct |             # -[ ] maybe the better design is to break this construct | ||||||
|             #   logic into a separate explicit helper raiser-func? |             #   logic into a separate explicit helper raiser-func? | ||||||
|  |             msg_dict = None | ||||||
| 
 | 
 | ||||||
|         else: |         else: | ||||||
|             # decode the msg-bytes using the std msgpack |  | ||||||
|             # interchange-prot (i.e. without any `msgspec.Struct` |  | ||||||
|             # handling) so that we can determine what |  | ||||||
|             # `.msg.types.PayloadMsg` is the culprit by reporting the |  | ||||||
|             # received value. |  | ||||||
|             msg: bytes |             msg: bytes | ||||||
|  |             # decode the msg-bytes using the std msgpack | ||||||
|  |             # interchange-prot (i.e. without any | ||||||
|  |             # `msgspec.Struct` handling) so that we can | ||||||
|  |             # determine what `.msg.types.Msg` is the culprit | ||||||
|  |             # by reporting the received value. | ||||||
|             msg_dict: dict = msgpack.decode(msg) |             msg_dict: dict = msgpack.decode(msg) | ||||||
|             msg_type_name: str = msg_dict['msg_type'] |             msg_type_name: str = msg_dict['msg_type'] | ||||||
|             msg_type = getattr(msgtypes, msg_type_name) |             msg_type = getattr(msgtypes, msg_type_name) | ||||||
|  | @ -1307,13 +1235,9 @@ def _mk_msg_type_err( | ||||||
|         if verb_header: |         if verb_header: | ||||||
|             message = f'{verb_header} ' + message |             message = f'{verb_header} ' + message | ||||||
| 
 | 
 | ||||||
|         # if not isinstance(bad_msg, PayloadMsg): |  | ||||||
|         #     import pdbp; pdbp.set_trace() |  | ||||||
| 
 |  | ||||||
|         msgtyperr = MsgTypeError.from_decode( |         msgtyperr = MsgTypeError.from_decode( | ||||||
|             message=message, |             message=message, | ||||||
|             bad_msg=bad_msg, |             msgdict=msg_dict, | ||||||
|             bad_msg_as_dict=msg_dict, |  | ||||||
| 
 | 
 | ||||||
|             # NOTE: for the send-side `.started()` pld-validate |             # NOTE: for the send-side `.started()` pld-validate | ||||||
|             # case we actually set the `._ipc_msg` AFTER we return |             # case we actually set the `._ipc_msg` AFTER we return | ||||||
|  | @ -1321,7 +1245,7 @@ def _mk_msg_type_err( | ||||||
|             # want to emulate the `Error` from the mte we build here |             # want to emulate the `Error` from the mte we build here | ||||||
|             # Bo |             # Bo | ||||||
|             # so by default in that case this is set to `None` |             # so by default in that case this is set to `None` | ||||||
|             # ipc_msg=src_err_msg, |             ipc_msg=src_err_msg, | ||||||
|         ) |         ) | ||||||
|         msgtyperr.__cause__ = src_validation_error |         msgtyperr.__cause__ = src_validation_error | ||||||
|         return msgtyperr |         return msgtyperr | ||||||
|  |  | ||||||
|  | @ -47,7 +47,7 @@ from tractor._exceptions import ( | ||||||
|     _raise_from_unexpected_msg, |     _raise_from_unexpected_msg, | ||||||
|     MsgTypeError, |     MsgTypeError, | ||||||
|     _mk_msg_type_err, |     _mk_msg_type_err, | ||||||
|     pack_error, |     pack_from_raise, | ||||||
| ) | ) | ||||||
| from tractor._state import current_ipc_ctx | from tractor._state import current_ipc_ctx | ||||||
| from ._codec import ( | from ._codec import ( | ||||||
|  | @ -203,6 +203,7 @@ class PldRx(Struct): | ||||||
|         msg: MsgType = ( |         msg: MsgType = ( | ||||||
|             ipc_msg |             ipc_msg | ||||||
|             or |             or | ||||||
|  | 
 | ||||||
|             # async-rx msg from underlying IPC feeder (mem-)chan |             # async-rx msg from underlying IPC feeder (mem-)chan | ||||||
|             await ipc._rx_chan.receive() |             await ipc._rx_chan.receive() | ||||||
|         ) |         ) | ||||||
|  | @ -222,10 +223,6 @@ class PldRx(Struct): | ||||||
|         raise_error: bool = True, |         raise_error: bool = True, | ||||||
|         hide_tb: bool = True, |         hide_tb: bool = True, | ||||||
| 
 | 
 | ||||||
|         # XXX for special (default?) case of send side call with |  | ||||||
|         # `Context.started(validate_pld_spec=True)` |  | ||||||
|         is_started_send_side: bool = False, |  | ||||||
| 
 |  | ||||||
|     ) -> PayloadT|Raw: |     ) -> PayloadT|Raw: | ||||||
|         ''' |         ''' | ||||||
|         Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and |         Decode a msg's payload field: `MsgType.pld: PayloadT|Raw` and | ||||||
|  | @ -233,6 +230,8 @@ class PldRx(Struct): | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__: bool = hide_tb |         __tracebackhide__: bool = hide_tb | ||||||
|  | 
 | ||||||
|  |         _src_err = None | ||||||
|         src_err: BaseException|None = None |         src_err: BaseException|None = None | ||||||
|         match msg: |         match msg: | ||||||
|             # payload-data shuttle msg; deliver the `.pld` value |             # payload-data shuttle msg; deliver the `.pld` value | ||||||
|  | @ -257,58 +256,18 @@ class PldRx(Struct): | ||||||
|                     # pack mgterr into error-msg for |                     # pack mgterr into error-msg for | ||||||
|                     # reraise below; ensure remote-actor-err |                     # reraise below; ensure remote-actor-err | ||||||
|                     # info is displayed nicely? |                     # info is displayed nicely? | ||||||
|                     mte: MsgTypeError = _mk_msg_type_err( |                     msgterr: MsgTypeError = _mk_msg_type_err( | ||||||
|                         msg=msg, |                         msg=msg, | ||||||
|                         codec=self.pld_dec, |                         codec=self.pld_dec, | ||||||
|                         src_validation_error=valerr, |                         src_validation_error=valerr, | ||||||
|                         is_invalid_payload=True, |                         is_invalid_payload=True, | ||||||
|                         expected_msg=expect_msg, |  | ||||||
|                         # ipc_msg=msg, |  | ||||||
|                     ) |                     ) | ||||||
|                     # NOTE: override the `msg` passed to |                     msg: Error = pack_from_raise( | ||||||
|                     # `_raise_from_unexpected_msg()` (below) so so that |                         local_err=msgterr, | ||||||
|                     # we're effectively able to use that same func to |  | ||||||
|                     # unpack and raise an "emulated remote `Error`" of |  | ||||||
|                     # this local MTE. |  | ||||||
|                     err_msg: Error = pack_error( |  | ||||||
|                         exc=mte, |  | ||||||
|                         cid=msg.cid, |                         cid=msg.cid, | ||||||
|                         src_uid=( |                         src_uid=ipc.chan.uid, | ||||||
|                             ipc.chan.uid |  | ||||||
|                             if not is_started_send_side |  | ||||||
|                             else ipc._actor.uid |  | ||||||
|                         ), |  | ||||||
|                         # tb=valerr.__traceback__, |  | ||||||
|                         tb_str=mte._message, |  | ||||||
|                     ) |                     ) | ||||||
|                     # ^-TODO-^ just raise this inline instead of all the |  | ||||||
|                     # pack-unpack-repack non-sense! |  | ||||||
| 
 |  | ||||||
|                     mte._ipc_msg = err_msg |  | ||||||
|                     msg = err_msg |  | ||||||
| 
 |  | ||||||
|                     # set emulated remote error more-or-less as the |  | ||||||
|                     # runtime would |  | ||||||
|                     ctx: Context = getattr(ipc, 'ctx', ipc) |  | ||||||
| 
 |  | ||||||
|                     # TODO: should we instead make this explicit and |  | ||||||
|                     # use the above masked `is_started_send_decode`, |  | ||||||
|                     # expecting the `Context.started()` caller to set |  | ||||||
|                     # it? Rn this is kinda, howyousayyy, implicitly |  | ||||||
|                     # edge-case-y.. |  | ||||||
|                     if ( |  | ||||||
|                         expect_msg is not Started |  | ||||||
|                         and not is_started_send_side |  | ||||||
|                     ): |  | ||||||
|                         ctx._maybe_cancel_and_set_remote_error(mte) |  | ||||||
| 
 |  | ||||||
|                     # XXX NOTE: so when the `_raise_from_unexpected_msg()` |  | ||||||
|                     # raises the boxed `err_msg` from above it raises |  | ||||||
|                     # it from `None`. |  | ||||||
|                     src_err = valerr |                     src_err = valerr | ||||||
|                     # if is_started_send_side: |  | ||||||
|                     #     src_err = None |  | ||||||
| 
 |  | ||||||
| 
 | 
 | ||||||
|                 # XXX some other decoder specific failure? |                 # XXX some other decoder specific failure? | ||||||
|                 # except TypeError as src_error: |                 # except TypeError as src_error: | ||||||
|  | @ -420,7 +379,6 @@ class PldRx(Struct): | ||||||
|         # NOTE: generally speaking only for handling `Stop`-msgs that |         # NOTE: generally speaking only for handling `Stop`-msgs that | ||||||
|         # arrive during a call to `drain_to_final_msg()` above! |         # arrive during a call to `drain_to_final_msg()` above! | ||||||
|         passthrough_non_pld_msgs: bool = True, |         passthrough_non_pld_msgs: bool = True, | ||||||
|         hide_tb: bool = True, |  | ||||||
|         **kwargs, |         **kwargs, | ||||||
| 
 | 
 | ||||||
|     ) -> tuple[MsgType, PayloadT]: |     ) -> tuple[MsgType, PayloadT]: | ||||||
|  | @ -429,7 +387,6 @@ class PldRx(Struct): | ||||||
|         the pair of refs. |         the pair of refs. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         __tracebackhide__: bool = hide_tb |  | ||||||
|         msg: MsgType = await ipc._rx_chan.receive() |         msg: MsgType = await ipc._rx_chan.receive() | ||||||
| 
 | 
 | ||||||
|         if passthrough_non_pld_msgs: |         if passthrough_non_pld_msgs: | ||||||
|  | @ -444,7 +401,6 @@ class PldRx(Struct): | ||||||
|             msg, |             msg, | ||||||
|             ipc=ipc, |             ipc=ipc, | ||||||
|             expect_msg=expect_msg, |             expect_msg=expect_msg, | ||||||
|             hide_tb=hide_tb, |  | ||||||
|             **kwargs, |             **kwargs, | ||||||
|         ) |         ) | ||||||
|         return msg, pld |         return msg, pld | ||||||
|  | @ -458,7 +414,7 @@ def limit_plds( | ||||||
| ) -> MsgDec: | ) -> MsgDec: | ||||||
|     ''' |     ''' | ||||||
|     Apply a `MsgCodec` that will natively decode the SC-msg set's |     Apply a `MsgCodec` that will natively decode the SC-msg set's | ||||||
|     `PayloadMsg.pld: Union[Type[Struct]]` payload fields using |     `Msg.pld: Union[Type[Struct]]` payload fields using | ||||||
|     tagged-unions of `msgspec.Struct`s from the `payload_types` |     tagged-unions of `msgspec.Struct`s from the `payload_types` | ||||||
|     for all IPC contexts in use by the current `trio.Task`. |     for all IPC contexts in use by the current `trio.Task`. | ||||||
| 
 | 
 | ||||||
|  | @ -735,11 +691,3 @@ async def drain_to_final_msg( | ||||||
|         result_msg, |         result_msg, | ||||||
|         pre_result_drained, |         pre_result_drained, | ||||||
|     ) |     ) | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # TODO: factor logic from `.Context.started()` for send-side |  | ||||||
| # validate raising! |  | ||||||
| def validate_payload_msg( |  | ||||||
|     msg: Started|Yield|Return, |  | ||||||
| ) -> MsgTypeError|None: |  | ||||||
|     ... |  | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue