forked from goodboy/tractor
				
			WIP porting runtime to use `Msg`-spec
							parent
							
								
									b56b3aa890
								
							
						
					
					
						commit
						09eed9d7e1
					
				|  | @ -53,7 +53,14 @@ from ._exceptions import ( | |||
|     _raise_from_no_key_in_msg, | ||||
| ) | ||||
| from .log import get_logger | ||||
| from .msg import NamespacePath | ||||
| from .msg import ( | ||||
|     NamespacePath, | ||||
|     Msg, | ||||
|     Return, | ||||
|     Started, | ||||
|     Stop, | ||||
|     Yield, | ||||
| ) | ||||
| from ._ipc import Channel | ||||
| from ._streaming import MsgStream | ||||
| from ._state import ( | ||||
|  | @ -96,7 +103,8 @@ async def _drain_to_final_msg( | |||
|     # wait for a final context result by collecting (but | ||||
|     # basically ignoring) any bi-dir-stream msgs still in transit | ||||
|     # from the far end. | ||||
|     pre_result_drained: list[dict] = [] | ||||
|     # pre_result_drained: list[dict] = [] | ||||
|     pre_result_drained: list[Msg] = [] | ||||
|     while not ( | ||||
|         ctx.maybe_error | ||||
|         and not ctx._final_result_is_set() | ||||
|  | @ -155,7 +163,10 @@ async def _drain_to_final_msg( | |||
|             #     await pause() | ||||
| 
 | ||||
|             # pray to the `trio` gawds that we're corrent with this | ||||
|             msg: dict = await ctx._recv_chan.receive() | ||||
|             # msg: dict = await ctx._recv_chan.receive() | ||||
|             msg: Msg = await ctx._recv_chan.receive() | ||||
|             # always capture unexpected/non-result msgs | ||||
|             pre_result_drained.append(msg) | ||||
| 
 | ||||
|         # NOTE: we get here if the far end was | ||||
|         # `ContextCancelled` in 2 cases: | ||||
|  | @ -175,24 +186,31 @@ async def _drain_to_final_msg( | |||
|             # continue to bubble up as normal. | ||||
|             raise | ||||
| 
 | ||||
|         try: | ||||
|             ctx._result: Any = msg['return'] | ||||
|             log.runtime( | ||||
|                 'Context delivered final draining msg:\n' | ||||
|                 f'{pformat(msg)}' | ||||
|             ) | ||||
|             # XXX: only close the rx mem chan AFTER | ||||
|             # a final result is retreived. | ||||
|             # if ctx._recv_chan: | ||||
|             #     await ctx._recv_chan.aclose() | ||||
|             # TODO: ^ we don't need it right? | ||||
|             break | ||||
|         match msg: | ||||
|             case Return( | ||||
|                 cid=cid, | ||||
|                 pld=res, | ||||
|             ): | ||||
|         # try: | ||||
|             # ctx._result: Any = msg['return'] | ||||
|             # ctx._result: Any = msg.pld | ||||
|                 ctx._result: Any = res | ||||
|                 log.runtime( | ||||
|                     'Context delivered final draining msg:\n' | ||||
|                     f'{pformat(msg)}' | ||||
|                 ) | ||||
|                 # XXX: only close the rx mem chan AFTER | ||||
|                 # a final result is retreived. | ||||
|                 # if ctx._recv_chan: | ||||
|                 #     await ctx._recv_chan.aclose() | ||||
|                 # TODO: ^ we don't need it right? | ||||
|                 break | ||||
| 
 | ||||
|         except KeyError: | ||||
|             # always capture unexpected/non-result msgs | ||||
|             pre_result_drained.append(msg) | ||||
|         # except KeyError: | ||||
|         # except AttributeError: | ||||
|             case Yield(): | ||||
|             # if 'yield' in msg: | ||||
| 
 | ||||
|             if 'yield' in msg: | ||||
|                 # far end task is still streaming to us so discard | ||||
|                 # and report per local context state. | ||||
|                 if ( | ||||
|  | @ -238,9 +256,10 @@ async def _drain_to_final_msg( | |||
|             # TODO: work out edge cases here where | ||||
|             # a stream is open but the task also calls | ||||
|             # this? | ||||
|             # -[ ] should be a runtime error if a stream is open | ||||
|             #   right? | ||||
|             elif 'stop' in msg: | ||||
|             # -[ ] should be a runtime error if a stream is open right? | ||||
|             # Stop() | ||||
|             case Stop(): | ||||
|             # elif 'stop' in msg: | ||||
|                 log.cancel( | ||||
|                     'Remote stream terminated due to "stop" msg:\n\n' | ||||
|                     f'{pformat(msg)}\n' | ||||
|  | @ -249,78 +268,80 @@ async def _drain_to_final_msg( | |||
| 
 | ||||
|             # It's an internal error if any other msg type without | ||||
|             # a`'cid'` field arrives here! | ||||
|             if not msg.get('cid'): | ||||
|                 raise InternalError( | ||||
|                     'Unexpected cid-missing msg?\n\n' | ||||
|                     f'{msg}\n' | ||||
|                 ) | ||||
|             case _: | ||||
|             # if not msg.get('cid'): | ||||
|                 if not msg.cid: | ||||
|                     raise InternalError( | ||||
|                         'Unexpected cid-missing msg?\n\n' | ||||
|                         f'{msg}\n' | ||||
|                     ) | ||||
| 
 | ||||
|             # XXX fallthrough to handle expected error XXX | ||||
|             # TODO: replace this with `ctx.maybe_raise()` | ||||
|             # | ||||
|             # TODO: would this be handier for this case maybe? | ||||
|             # async with maybe_raise_on_exit() as raises: | ||||
|             #     if raises: | ||||
|             #         log.error('some msg about raising..') | ||||
|                 # XXX fallthrough to handle expected error XXX | ||||
|                 # TODO: replace this with `ctx.maybe_raise()` | ||||
|                 # | ||||
|                 # TODO: would this be handier for this case maybe? | ||||
|                 # async with maybe_raise_on_exit() as raises: | ||||
|                 #     if raises: | ||||
|                 #         log.error('some msg about raising..') | ||||
| 
 | ||||
|             re: Exception|None = ctx._remote_error | ||||
|             if re: | ||||
|                 log.critical( | ||||
|                     'Remote ctx terminated due to "error" msg:\n' | ||||
|                     f'{re}' | ||||
|                 ) | ||||
|                 assert msg is ctx._cancel_msg | ||||
|                 # NOTE: this solved a super dupe edge case XD | ||||
|                 # this was THE super duper edge case of: | ||||
|                 # - local task opens a remote task, | ||||
|                 # - requests remote cancellation of far end | ||||
|                 #   ctx/tasks, | ||||
|                 # - needs to wait for the cancel ack msg | ||||
|                 #   (ctxc) or some result in the race case | ||||
|                 #   where the other side's task returns | ||||
|                 #   before the cancel request msg is ever | ||||
|                 #   rxed and processed, | ||||
|                 # - here this surrounding drain loop (which | ||||
|                 #   iterates all ipc msgs until the ack or | ||||
|                 #   an early result arrives) was NOT exiting | ||||
|                 #   since we are the edge case: local task | ||||
|                 #   does not re-raise any ctxc it receives | ||||
|                 #   IFF **it** was the cancellation | ||||
|                 #   requester.. | ||||
|                 # will raise if necessary, ow break from | ||||
|                 # loop presuming any error terminates the | ||||
|                 # context! | ||||
|                 ctx._maybe_raise_remote_err( | ||||
|                     re, | ||||
|                     # NOTE: obvi we don't care if we | ||||
|                     # overran the far end if we're already | ||||
|                     # waiting on a final result (msg). | ||||
|                     # raise_overrun_from_self=False, | ||||
|                     raise_overrun_from_self=raise_overrun, | ||||
|                 ) | ||||
|                 re: Exception|None = ctx._remote_error | ||||
|                 if re: | ||||
|                     log.critical( | ||||
|                         'Remote ctx terminated due to "error" msg:\n' | ||||
|                         f'{re}' | ||||
|                     ) | ||||
|                     assert msg is ctx._cancel_msg | ||||
|                     # NOTE: this solved a super dupe edge case XD | ||||
|                     # this was THE super duper edge case of: | ||||
|                     # - local task opens a remote task, | ||||
|                     # - requests remote cancellation of far end | ||||
|                     #   ctx/tasks, | ||||
|                     # - needs to wait for the cancel ack msg | ||||
|                     #   (ctxc) or some result in the race case | ||||
|                     #   where the other side's task returns | ||||
|                     #   before the cancel request msg is ever | ||||
|                     #   rxed and processed, | ||||
|                     # - here this surrounding drain loop (which | ||||
|                     #   iterates all ipc msgs until the ack or | ||||
|                     #   an early result arrives) was NOT exiting | ||||
|                     #   since we are the edge case: local task | ||||
|                     #   does not re-raise any ctxc it receives | ||||
|                     #   IFF **it** was the cancellation | ||||
|                     #   requester.. | ||||
|                     # will raise if necessary, ow break from | ||||
|                     # loop presuming any error terminates the | ||||
|                     # context! | ||||
|                     ctx._maybe_raise_remote_err( | ||||
|                         re, | ||||
|                         # NOTE: obvi we don't care if we | ||||
|                         # overran the far end if we're already | ||||
|                         # waiting on a final result (msg). | ||||
|                         # raise_overrun_from_self=False, | ||||
|                         raise_overrun_from_self=raise_overrun, | ||||
|                     ) | ||||
| 
 | ||||
|                 break  # OOOOOF, yeah obvi we need this.. | ||||
|                     break  # OOOOOF, yeah obvi we need this.. | ||||
| 
 | ||||
|             # XXX we should never really get here | ||||
|             # right! since `._deliver_msg()` should | ||||
|             # always have detected an {'error': ..} | ||||
|             # msg and already called this right!?! | ||||
|             elif error := unpack_error( | ||||
|                 msg=msg, | ||||
|                 chan=ctx._portal.channel, | ||||
|                 hide_tb=False, | ||||
|             ): | ||||
|                 log.critical('SHOULD NEVER GET HERE!?') | ||||
|                 assert msg is ctx._cancel_msg | ||||
|                 assert error.msgdata == ctx._remote_error.msgdata | ||||
|                 from .devx._debug import pause | ||||
|                 await pause() | ||||
|                 ctx._maybe_cancel_and_set_remote_error(error) | ||||
|                 ctx._maybe_raise_remote_err(error) | ||||
|                 # XXX we should never really get here | ||||
|                 # right! since `._deliver_msg()` should | ||||
|                 # always have detected an {'error': ..} | ||||
|                 # msg and already called this right!?! | ||||
|                 elif error := unpack_error( | ||||
|                     msg=msg, | ||||
|                     chan=ctx._portal.channel, | ||||
|                     hide_tb=False, | ||||
|                 ): | ||||
|                     log.critical('SHOULD NEVER GET HERE!?') | ||||
|                     assert msg is ctx._cancel_msg | ||||
|                     assert error.msgdata == ctx._remote_error.msgdata | ||||
|                     from .devx._debug import pause | ||||
|                     await pause() | ||||
|                     ctx._maybe_cancel_and_set_remote_error(error) | ||||
|                     ctx._maybe_raise_remote_err(error) | ||||
| 
 | ||||
|             else: | ||||
|                 # bubble the original src key error | ||||
|                 raise | ||||
|                 else: | ||||
|                     # bubble the original src key error | ||||
|                     raise | ||||
|     else: | ||||
|         log.cancel( | ||||
|             'Skipping `MsgStream` drain since final outcome is set\n\n' | ||||
|  | @ -710,10 +731,14 @@ class Context: | |||
| 
 | ||||
|     async def send_stop(self) -> None: | ||||
|         # await pause() | ||||
|         await self.chan.send({ | ||||
|             'stop': True, | ||||
|             'cid': self.cid | ||||
|         }) | ||||
|         # await self.chan.send({ | ||||
|         #     # Stop( | ||||
|         #     'stop': True, | ||||
|         #     'cid': self.cid | ||||
|         # }) | ||||
|         await self.chan.send( | ||||
|             Stop(cid=self.cid) | ||||
|         ) | ||||
| 
 | ||||
|     def _maybe_cancel_and_set_remote_error( | ||||
|         self, | ||||
|  | @ -1398,17 +1423,19 @@ class Context: | |||
|             for msg in drained_msgs: | ||||
| 
 | ||||
|                 # TODO: mask this by default.. | ||||
|                 if 'return' in msg: | ||||
|                 # if 'return' in msg: | ||||
|                 if isinstance(msg, Return): | ||||
|                     # from .devx import pause | ||||
|                     # await pause() | ||||
|                     raise InternalError( | ||||
|                     # raise InternalError( | ||||
|                     log.warning( | ||||
|                         'Final `return` msg should never be drained !?!?\n\n' | ||||
|                         f'{msg}\n' | ||||
|                     ) | ||||
| 
 | ||||
|             log.cancel( | ||||
|                 'Ctx drained pre-result msgs:\n' | ||||
|                 f'{drained_msgs}' | ||||
|                 f'{pformat(drained_msgs)}' | ||||
|             ) | ||||
| 
 | ||||
|         self.maybe_raise( | ||||
|  | @ -1616,7 +1643,18 @@ class Context: | |||
|                 f'called `.started()` twice on context with {self.chan.uid}' | ||||
|             ) | ||||
| 
 | ||||
|         await self.chan.send({'started': value, 'cid': self.cid}) | ||||
|         # await self.chan.send( | ||||
|         #     { | ||||
|         #         'started': value, | ||||
|         #          'cid': self.cid, | ||||
|         #     } | ||||
|         # ) | ||||
|         await self.chan.send( | ||||
|             Started( | ||||
|                 cid=self.cid, | ||||
|                 pld=value, | ||||
|             ) | ||||
|         ) | ||||
|         self._started_called = True | ||||
| 
 | ||||
|     async def _drain_overflows( | ||||
|  | @ -1671,7 +1709,8 @@ class Context: | |||
| 
 | ||||
|     async def _deliver_msg( | ||||
|         self, | ||||
|         msg: dict, | ||||
|         # msg: dict, | ||||
|         msg: Msg, | ||||
| 
 | ||||
|     ) -> bool: | ||||
|         ''' | ||||
|  | @ -1855,7 +1894,7 @@ class Context: | |||
|                         # anything different. | ||||
|                         return False | ||||
|             else: | ||||
|                 txt += f'\n{msg}\n' | ||||
|                 # txt += f'\n{msg}\n' | ||||
|                 # raise local overrun and immediately pack as IPC | ||||
|                 # msg for far end. | ||||
|                 try: | ||||
|  | @ -1986,15 +2025,17 @@ async def open_context_from_portal( | |||
|     ) | ||||
| 
 | ||||
|     assert ctx._remote_func_type == 'context' | ||||
|     msg: dict = await ctx._recv_chan.receive() | ||||
|     msg: Started = await ctx._recv_chan.receive() | ||||
| 
 | ||||
|     try: | ||||
|         # the "first" value here is delivered by the callee's | ||||
|         # ``Context.started()`` call. | ||||
|         first: Any = msg['started'] | ||||
|         # first: Any = msg['started'] | ||||
|         first: Any = msg.pld | ||||
|         ctx._started_called: bool = True | ||||
| 
 | ||||
|     except KeyError as src_error: | ||||
|     # except KeyError as src_error: | ||||
|     except AttributeError as src_error: | ||||
|         _raise_from_no_key_in_msg( | ||||
|             ctx=ctx, | ||||
|             msg=msg, | ||||
|  |  | |||
|  | @ -136,6 +136,7 @@ def _trio_main( | |||
|             run_as_asyncio_guest(trio_main) | ||||
|         else: | ||||
|             trio.run(trio_main) | ||||
| 
 | ||||
|     except KeyboardInterrupt: | ||||
|         log.cancel( | ||||
|             'Actor received KBI\n' | ||||
|  |  | |||
|  | @ -31,9 +31,16 @@ import textwrap | |||
| import traceback | ||||
| 
 | ||||
| import trio | ||||
| from msgspec import structs | ||||
| 
 | ||||
| from tractor._state import current_actor | ||||
| from tractor.log import get_logger | ||||
| from tractor.msg import ( | ||||
|     Error, | ||||
|     Msg, | ||||
|     Stop, | ||||
|     Yield, | ||||
| ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._context import Context | ||||
|  | @ -135,6 +142,8 @@ class RemoteActorError(Exception): | |||
|         # and instead render if from `.boxed_type_str`? | ||||
|         self._boxed_type: BaseException = boxed_type | ||||
|         self._src_type: BaseException|None = None | ||||
| 
 | ||||
|         # TODO: make this a `.errmsg: Error` throughout? | ||||
|         self.msgdata: dict[str, Any] = msgdata | ||||
| 
 | ||||
|         # TODO: mask out eventually or place in `pack_error()` | ||||
|  | @ -464,7 +473,23 @@ class AsyncioCancelled(Exception): | |||
|     ''' | ||||
| 
 | ||||
| class MessagingError(Exception): | ||||
|     'Some kind of unexpected SC messaging dialog issue' | ||||
|     ''' | ||||
|     IPC related msg (typing), transaction (ordering) or dialog | ||||
|     handling error. | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
| 
 | ||||
| class MsgTypeError(MessagingError): | ||||
|     ''' | ||||
|     Equivalent of a `TypeError` for an IPC wire-message | ||||
|     due to an invalid field value (type). | ||||
| 
 | ||||
|     Normally this is re-raised from some `.msg._codec` | ||||
|     decode error raised by a backend interchange lib | ||||
|     like `msgspec` or `pycapnproto`. | ||||
| 
 | ||||
|     ''' | ||||
| 
 | ||||
| 
 | ||||
| def pack_error( | ||||
|  | @ -473,7 +498,7 @@ def pack_error( | |||
|     tb: str|None = None, | ||||
|     cid: str|None = None, | ||||
| 
 | ||||
| ) -> dict[str, dict]: | ||||
| ) -> Error|dict[str, dict]: | ||||
|     ''' | ||||
|     Create an "error message" which boxes a locally caught | ||||
|     exception's meta-data and encodes it for wire transport via an | ||||
|  | @ -536,17 +561,23 @@ def pack_error( | |||
|     # content's `.msgdata`). | ||||
|     error_msg['tb_str'] = tb_str | ||||
| 
 | ||||
|     pkt: dict = { | ||||
|         'error': error_msg, | ||||
|     } | ||||
|     if cid: | ||||
|         pkt['cid'] = cid | ||||
|     # Error() | ||||
|     # pkt: dict = { | ||||
|     #     'error': error_msg, | ||||
|     # } | ||||
|     pkt: Error = Error( | ||||
|         cid=cid, | ||||
|         **error_msg, | ||||
|         # TODO: just get rid of `.pld` on this msg? | ||||
|     ) | ||||
|     # if cid: | ||||
|     #     pkt['cid'] = cid | ||||
| 
 | ||||
|     return pkt | ||||
| 
 | ||||
| 
 | ||||
| def unpack_error( | ||||
|     msg: dict[str, Any], | ||||
|     msg: dict[str, Any]|Error, | ||||
| 
 | ||||
|     chan: Channel|None = None, | ||||
|     box_type: RemoteActorError = RemoteActorError, | ||||
|  | @ -564,15 +595,17 @@ def unpack_error( | |||
|     ''' | ||||
|     __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|     error_dict: dict[str, dict] | None | ||||
|     if ( | ||||
|         error_dict := msg.get('error') | ||||
|     ) is None: | ||||
|     error_dict: dict[str, dict]|None | ||||
|     if not isinstance(msg, Error): | ||||
|     # if ( | ||||
|     #     error_dict := msg.get('error') | ||||
|     # ) is None: | ||||
|         # no error field, nothing to unpack. | ||||
|         return None | ||||
| 
 | ||||
|     # retrieve the remote error's msg encoded details | ||||
|     tb_str: str = error_dict.get('tb_str', '') | ||||
|     # tb_str: str = error_dict.get('tb_str', '') | ||||
|     tb_str: str = msg.tb_str | ||||
|     message: str = ( | ||||
|         f'{chan.uid}\n' | ||||
|         + | ||||
|  | @ -581,7 +614,8 @@ def unpack_error( | |||
| 
 | ||||
|     # try to lookup a suitable error type from the local runtime | ||||
|     # env then use it to construct a local instance. | ||||
|     boxed_type_str: str = error_dict['boxed_type_str'] | ||||
|     # boxed_type_str: str = error_dict['boxed_type_str'] | ||||
|     boxed_type_str: str = msg.boxed_type_str | ||||
|     boxed_type: Type[BaseException] = get_err_type(boxed_type_str) | ||||
| 
 | ||||
|     if boxed_type_str == 'ContextCancelled': | ||||
|  | @ -595,7 +629,11 @@ def unpack_error( | |||
|     # original source error. | ||||
|     elif boxed_type_str == 'RemoteActorError': | ||||
|         assert boxed_type is RemoteActorError | ||||
|         assert len(error_dict['relay_path']) >= 1 | ||||
|         # assert len(error_dict['relay_path']) >= 1 | ||||
|         assert len(msg.relay_path) >= 1 | ||||
| 
 | ||||
|     # TODO: mk RAE just take the `Error` instance directly? | ||||
|     error_dict: dict = structs.asdict(msg) | ||||
| 
 | ||||
|     exc = box_type( | ||||
|         message, | ||||
|  | @ -623,11 +661,12 @@ def is_multi_cancelled(exc: BaseException) -> bool: | |||
| 
 | ||||
| def _raise_from_no_key_in_msg( | ||||
|     ctx: Context, | ||||
|     msg: dict, | ||||
|     msg: Msg, | ||||
|     src_err: KeyError, | ||||
|     log: StackLevelAdapter,  # caller specific `log` obj | ||||
| 
 | ||||
|     expect_key: str = 'yield', | ||||
|     expect_msg: str = Yield, | ||||
|     stream: MsgStream | None = None, | ||||
| 
 | ||||
|     # allow "deeper" tbs when debugging B^o | ||||
|  | @ -660,8 +699,10 @@ def _raise_from_no_key_in_msg( | |||
| 
 | ||||
|     # an internal error should never get here | ||||
|     try: | ||||
|         cid: str = msg['cid'] | ||||
|     except KeyError as src_err: | ||||
|         cid: str = msg.cid | ||||
|         # cid: str = msg['cid'] | ||||
|     # except KeyError as src_err: | ||||
|     except AttributeError as src_err: | ||||
|         raise MessagingError( | ||||
|             f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n' | ||||
|             f'cid: {cid}\n\n' | ||||
|  | @ -672,7 +713,10 @@ def _raise_from_no_key_in_msg( | |||
|     # TODO: test that shows stream raising an expected error!!! | ||||
| 
 | ||||
|     # raise the error message in a boxed exception type! | ||||
|     if msg.get('error'): | ||||
|     # if msg.get('error'): | ||||
|     if isinstance(msg, Error): | ||||
|     # match msg: | ||||
|     #     case Error(): | ||||
|         raise unpack_error( | ||||
|             msg, | ||||
|             ctx.chan, | ||||
|  | @ -683,8 +727,10 @@ def _raise_from_no_key_in_msg( | |||
|     # `MsgStream` termination msg. | ||||
|     # TODO: does it make more sense to pack  | ||||
|     # the stream._eoc outside this in the calleer always? | ||||
|         # case Stop(): | ||||
|     elif ( | ||||
|         msg.get('stop') | ||||
|         # msg.get('stop') | ||||
|         isinstance(msg, Stop) | ||||
|         or ( | ||||
|             stream | ||||
|             and stream._eoc | ||||
|  | @ -725,14 +771,16 @@ def _raise_from_no_key_in_msg( | |||
|         stream | ||||
|         and stream._closed | ||||
|     ): | ||||
|         raise trio.ClosedResourceError('This stream was closed') | ||||
| 
 | ||||
|         # TODO: our own error subtype? | ||||
|         raise trio.ClosedResourceError( | ||||
|             'This stream was closed' | ||||
|         ) | ||||
| 
 | ||||
|     # always re-raise the source error if no translation error case | ||||
|     # is activated above. | ||||
|     _type: str = 'Stream' if stream else 'Context' | ||||
|     raise MessagingError( | ||||
|         f"{_type} was expecting a '{expect_key}' message" | ||||
|         f"{_type} was expecting a '{expect_key.upper()}' message" | ||||
|         " BUT received a non-error msg:\n" | ||||
|         f'{pformat(msg)}' | ||||
|     ) from src_err | ||||
|  |  | |||
|  | @ -38,17 +38,23 @@ from typing import ( | |||
|     Protocol, | ||||
|     Type, | ||||
|     TypeVar, | ||||
|     Union, | ||||
| ) | ||||
| 
 | ||||
| import msgspec | ||||
| from tricycle import BufferedReceiveStream | ||||
| import trio | ||||
| 
 | ||||
| from tractor.log import get_logger | ||||
| from tractor._exceptions import TransportClosed | ||||
| from tractor._exceptions import ( | ||||
|     TransportClosed, | ||||
|     MsgTypeError, | ||||
| ) | ||||
| from tractor.msg import ( | ||||
|     _ctxvar_MsgCodec, | ||||
|     _codec, | ||||
|     MsgCodec, | ||||
|     mk_codec, | ||||
|     types, | ||||
| ) | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
|  | @ -163,7 +169,16 @@ class MsgpackTCPStream(MsgTransport): | |||
| 
 | ||||
|         # allow for custom IPC msg interchange format | ||||
|         # dynamic override Bo | ||||
|         self.codec: MsgCodec = codec or mk_codec() | ||||
|         self._task = trio.lowlevel.current_task() | ||||
|         self._codec: MsgCodec = ( | ||||
|             codec | ||||
|             or | ||||
|             _codec._ctxvar_MsgCodec.get() | ||||
|         ) | ||||
|         log.critical( | ||||
|             '!?!: USING STD `tractor` CODEC !?!?\n' | ||||
|             f'{self._codec}\n' | ||||
|         ) | ||||
| 
 | ||||
|     async def _iter_packets(self) -> AsyncGenerator[dict, None]: | ||||
|         ''' | ||||
|  | @ -171,7 +186,6 @@ class MsgpackTCPStream(MsgTransport): | |||
|         stream using the current task's `MsgCodec`. | ||||
| 
 | ||||
|         ''' | ||||
|         import msgspec  # noqa | ||||
|         decodes_failed: int = 0 | ||||
| 
 | ||||
|         while True: | ||||
|  | @ -206,7 +220,19 @@ class MsgpackTCPStream(MsgTransport): | |||
|             try: | ||||
|                 # NOTE: lookup the `trio.Task.context`'s var for | ||||
|                 # the current `MsgCodec`. | ||||
|                 yield  _ctxvar_MsgCodec.get().decode(msg_bytes) | ||||
|                 codec: MsgCodec = _ctxvar_MsgCodec.get() | ||||
|                 if self._codec.pld_spec != codec.pld_spec: | ||||
|                     # assert ( | ||||
|                     #     task := trio.lowlevel.current_task() | ||||
|                     # ) is not self._task | ||||
|                     # self._task = task | ||||
|                     self._codec = codec | ||||
|                     log.critical( | ||||
|                         '.recv() USING NEW CODEC !?!?\n' | ||||
|                         f'{self._codec}\n\n' | ||||
|                         f'msg_bytes -> {msg_bytes}\n' | ||||
|                     ) | ||||
|                 yield codec.decode(msg_bytes) | ||||
| 
 | ||||
|                 # TODO: remove, was only for orig draft impl | ||||
|                 # testing. | ||||
|  | @ -221,6 +247,41 @@ class MsgpackTCPStream(MsgTransport): | |||
|                 # | ||||
|                 # yield obj | ||||
| 
 | ||||
|             # XXX NOTE: since the below error derives from | ||||
|             # `DecodeError` we need to catch is specially | ||||
|             # and always raise such that spec violations | ||||
|             # are never allowed to be caught silently! | ||||
|             except msgspec.ValidationError as verr: | ||||
| 
 | ||||
|                 # decode the msg-bytes using the std msgpack | ||||
|                 # interchange-prot (i.e. without any | ||||
|                 # `msgspec.Struct` handling) so that we can | ||||
|                 # determine what `.msg.types.Msg` is the culprit | ||||
|                 # by reporting the received value. | ||||
|                 msg_dict: dict = msgspec.msgpack.decode(msg_bytes) | ||||
|                 msg_type_name: str = msg_dict['msg_type'] | ||||
|                 msg_type = getattr(types, msg_type_name) | ||||
|                 errmsg: str = ( | ||||
|                     f'Received invalid IPC `{msg_type_name}` msg\n\n' | ||||
|                 ) | ||||
| 
 | ||||
|                 # XXX see if we can determine the exact invalid field | ||||
|                 # such that we can comprehensively report the | ||||
|                 # specific field's type problem | ||||
|                 msgspec_msg: str = verr.args[0].rstrip('`') | ||||
|                 msg, _, maybe_field = msgspec_msg.rpartition('$.') | ||||
|                 if field_val := msg_dict.get(maybe_field): | ||||
|                     field_type: Union[Type] = msg_type.__signature__.parameters[ | ||||
|                         maybe_field | ||||
|                     ].annotation | ||||
|                     errmsg += ( | ||||
|                         f'{msg.rstrip("`")}\n\n' | ||||
|                         f'{msg_type}\n' | ||||
|                         f' |_.{maybe_field}: {field_type} = {field_val}\n' | ||||
|                     ) | ||||
| 
 | ||||
|                 raise MsgTypeError(errmsg) from verr | ||||
| 
 | ||||
|             except ( | ||||
|                 msgspec.DecodeError, | ||||
|                 UnicodeDecodeError, | ||||
|  | @ -230,14 +291,15 @@ class MsgpackTCPStream(MsgTransport): | |||
|                     # do with a channel drop - hope that receiving from the | ||||
|                     # channel will raise an expected error and bubble up. | ||||
|                     try: | ||||
|                         msg_str: str | bytes = msg_bytes.decode() | ||||
|                         msg_str: str|bytes = msg_bytes.decode() | ||||
|                     except UnicodeDecodeError: | ||||
|                         msg_str = msg_bytes | ||||
| 
 | ||||
|                     log.error( | ||||
|                         '`msgspec` failed to decode!?\n' | ||||
|                         'dumping bytes:\n' | ||||
|                         f'{msg_str!r}' | ||||
|                     log.exception( | ||||
|                         'Failed to decode msg?\n' | ||||
|                         f'{codec}\n\n' | ||||
|                         'Rxed bytes from wire:\n\n' | ||||
|                         f'{msg_str!r}\n' | ||||
|                     ) | ||||
|                     decodes_failed += 1 | ||||
|                 else: | ||||
|  | @ -258,8 +320,21 @@ class MsgpackTCPStream(MsgTransport): | |||
| 
 | ||||
|             # NOTE: lookup the `trio.Task.context`'s var for | ||||
|             # the current `MsgCodec`. | ||||
|             bytes_data: bytes = _ctxvar_MsgCodec.get().encode(msg) | ||||
|             # bytes_data: bytes = self.codec.encode(msg) | ||||
|             codec: MsgCodec = _ctxvar_MsgCodec.get() | ||||
|             # if self._codec != codec: | ||||
|             if self._codec.pld_spec != codec.pld_spec: | ||||
|                 self._codec = codec | ||||
|                 log.critical( | ||||
|                     '.send() using NEW CODEC !?!?\n' | ||||
|                     f'{self._codec}\n\n' | ||||
|                     f'OBJ -> {msg}\n' | ||||
|                 ) | ||||
|             if type(msg) not in types.__spec__: | ||||
|                 log.warning( | ||||
|                     'Sending non-`Msg`-spec msg?\n\n' | ||||
|                     f'{msg}\n' | ||||
|                 ) | ||||
|             bytes_data: bytes = codec.encode(msg) | ||||
| 
 | ||||
|             # supposedly the fastest says, | ||||
|             # https://stackoverflow.com/a/54027962 | ||||
|  |  | |||
|  | @ -45,7 +45,10 @@ from ._state import ( | |||
| ) | ||||
| from ._ipc import Channel | ||||
| from .log import get_logger | ||||
| from .msg import NamespacePath | ||||
| from .msg import ( | ||||
|     NamespacePath, | ||||
|     Return, | ||||
| ) | ||||
| from ._exceptions import ( | ||||
|     unpack_error, | ||||
|     NoResult, | ||||
|  | @ -66,7 +69,8 @@ log = get_logger(__name__) | |||
| # `._raise_from_no_key_in_msg()` (after tweak to | ||||
| # accept a `chan: Channel` arg) in key block! | ||||
| def _unwrap_msg( | ||||
|     msg: dict[str, Any], | ||||
|     # msg: dict[str, Any], | ||||
|     msg: Return, | ||||
|     channel: Channel, | ||||
| 
 | ||||
|     hide_tb: bool = True, | ||||
|  | @ -79,18 +83,21 @@ def _unwrap_msg( | |||
|     __tracebackhide__: bool = hide_tb | ||||
| 
 | ||||
|     try: | ||||
|         return msg['return'] | ||||
|     except KeyError as ke: | ||||
|         return msg.pld | ||||
|         # return msg['return'] | ||||
|     # except KeyError as ke: | ||||
|     except AttributeError as err: | ||||
| 
 | ||||
|         # internal error should never get here | ||||
|         assert msg.get('cid'), ( | ||||
|         # assert msg.get('cid'), ( | ||||
|         assert msg.cid, ( | ||||
|             "Received internal error at portal?" | ||||
|         ) | ||||
| 
 | ||||
|         raise unpack_error( | ||||
|             msg, | ||||
|             channel | ||||
|         ) from ke | ||||
|         ) from err | ||||
| 
 | ||||
| 
 | ||||
| class Portal: | ||||
|  |  | |||
							
								
								
									
										644
									
								
								tractor/_rpc.py
								
								
								
								
							
							
						
						
									
										644
									
								
								tractor/_rpc.py
								
								
								
								
							|  | @ -57,6 +57,15 @@ from ._exceptions import ( | |||
| from .devx import _debug | ||||
| from . import _state | ||||
| from .log import get_logger | ||||
| from tractor.msg.types import ( | ||||
|     Start, | ||||
|     StartAck, | ||||
|     Started, | ||||
|     Stop, | ||||
|     Yield, | ||||
|     Return, | ||||
|     Error, | ||||
| ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._runtime import Actor | ||||
|  | @ -84,10 +93,13 @@ async def _invoke_non_context( | |||
| 
 | ||||
|     # TODO: can we unify this with the `context=True` impl below? | ||||
|     if inspect.isasyncgen(coro): | ||||
|         await chan.send({ | ||||
|             'cid': cid, | ||||
|             'functype': 'asyncgen', | ||||
|         }) | ||||
|         # await chan.send({ | ||||
|         await chan.send( | ||||
|             StartAck( | ||||
|                 cid=cid, | ||||
|                 functype='asyncgen', | ||||
|             ) | ||||
|         ) | ||||
|         # XXX: massive gotcha! If the containing scope | ||||
|         # is cancelled and we execute the below line, | ||||
|         # any ``ActorNursery.__aexit__()`` WON'T be | ||||
|  | @ -107,27 +119,45 @@ async def _invoke_non_context( | |||
|                     # to_send = await chan.recv_nowait() | ||||
|                     # if to_send is not None: | ||||
|                     #     to_yield = await coro.asend(to_send) | ||||
|                     await chan.send({ | ||||
|                         'yield': item, | ||||
|                         'cid': cid, | ||||
|                     }) | ||||
|                     # await chan.send({ | ||||
|                     #     # Yield() | ||||
|                     #     'cid': cid, | ||||
|                     #     'yield': item, | ||||
|                     # }) | ||||
|                     await chan.send( | ||||
|                         Yield( | ||||
|                             cid=cid, | ||||
|                             pld=item, | ||||
|                         ) | ||||
|                     ) | ||||
| 
 | ||||
|         log.runtime(f"Finished iterating {coro}") | ||||
|         # TODO: we should really support a proper | ||||
|         # `StopAsyncIteration` system here for returning a final | ||||
|         # value if desired | ||||
|         await chan.send({ | ||||
|             'stop': True, | ||||
|             'cid': cid, | ||||
|         }) | ||||
|         await chan.send( | ||||
|             Stop(cid=cid) | ||||
|         ) | ||||
|         # await chan.send({ | ||||
|         #     # Stop( | ||||
|         #     'cid': cid, | ||||
|         #     'stop': True, | ||||
|         # }) | ||||
| 
 | ||||
|     # one way @stream func that gets treated like an async gen | ||||
|     # TODO: can we unify this with the `context=True` impl below? | ||||
|     elif treat_as_gen: | ||||
|         await chan.send({ | ||||
|             'cid': cid, | ||||
|             'functype': 'asyncgen', | ||||
|         }) | ||||
|         await chan.send( | ||||
|             StartAck( | ||||
|                 cid=cid, | ||||
|                 functype='asyncgen', | ||||
|             ) | ||||
|         ) | ||||
|         # await chan.send({ | ||||
|         #     # StartAck() | ||||
|         #     'cid': cid, | ||||
|         #     'functype': 'asyncgen', | ||||
|         # }) | ||||
|         # XXX: the async-func may spawn further tasks which push | ||||
|         # back values like an async-generator would but must | ||||
|         # manualy construct the response dict-packet-responses as | ||||
|  | @ -140,10 +170,14 @@ async def _invoke_non_context( | |||
|         if not cs.cancelled_caught: | ||||
|             # task was not cancelled so we can instruct the | ||||
|             # far end async gen to tear down | ||||
|             await chan.send({ | ||||
|                 'stop': True, | ||||
|                 'cid': cid | ||||
|             }) | ||||
|             await chan.send( | ||||
|                 Stop(cid=cid) | ||||
|             ) | ||||
|             # await chan.send({ | ||||
|             #     # Stop( | ||||
|             #     'cid': cid, | ||||
|             #     'stop': True, | ||||
|             # }) | ||||
|     else: | ||||
|         # regular async function/method | ||||
|         # XXX: possibly just a scheduled `Actor._cancel_task()` | ||||
|  | @ -155,10 +189,17 @@ async def _invoke_non_context( | |||
|         # way: using the linked IPC context machinery. | ||||
|         failed_resp: bool = False | ||||
|         try: | ||||
|             await chan.send({ | ||||
|                 'functype': 'asyncfunc', | ||||
|                 'cid': cid | ||||
|             }) | ||||
|             await chan.send( | ||||
|                 StartAck( | ||||
|                     cid=cid, | ||||
|                     functype='asyncfunc', | ||||
|                 ) | ||||
|             ) | ||||
|             # await chan.send({ | ||||
|             #     # StartAck() | ||||
|             #     'cid': cid, | ||||
|             #     'functype': 'asyncfunc', | ||||
|             # }) | ||||
|         except ( | ||||
|             trio.ClosedResourceError, | ||||
|             trio.BrokenResourceError, | ||||
|  | @ -192,10 +233,17 @@ async def _invoke_non_context( | |||
|                 and chan.connected() | ||||
|             ): | ||||
|                 try: | ||||
|                     await chan.send({ | ||||
|                         'return': result, | ||||
|                         'cid': cid, | ||||
|                     }) | ||||
|                     # await chan.send({ | ||||
|                     #     # Return() | ||||
|                     #     'cid': cid, | ||||
|                     #     'return': result, | ||||
|                     # }) | ||||
|                     await chan.send( | ||||
|                         Return( | ||||
|                             cid=cid, | ||||
|                             pld=result, | ||||
|                         ) | ||||
|                     ) | ||||
|                 except ( | ||||
|                     BrokenPipeError, | ||||
|                     trio.BrokenResourceError, | ||||
|  | @ -376,6 +424,8 @@ async def _invoke( | |||
|         # XXX for .pause_from_sync()` usage we need to make sure | ||||
|         # `greenback` is boostrapped in the subactor! | ||||
|         await _debug.maybe_init_greenback() | ||||
|     # else: | ||||
|     #     await pause() | ||||
| 
 | ||||
|     # TODO: possibly a specially formatted traceback | ||||
|     # (not sure what typing is for this..)? | ||||
|  | @ -488,10 +538,18 @@ async def _invoke( | |||
|         # a "context" endpoint type is the most general and | ||||
|         # "least sugary" type of RPC ep with support for | ||||
|         # bi-dir streaming B) | ||||
|         await chan.send({ | ||||
|             'cid': cid, | ||||
|             'functype': 'context', | ||||
|         }) | ||||
|         # StartAck | ||||
|         await chan.send( | ||||
|             StartAck( | ||||
|                 cid=cid, | ||||
|                 functype='context', | ||||
|             ) | ||||
|         ) | ||||
|         # await chan.send({ | ||||
|         #     # StartAck() | ||||
|         #     'cid': cid, | ||||
|         #     'functype': 'context', | ||||
|         # }) | ||||
| 
 | ||||
|         # TODO: should we also use an `.open_context()` equiv | ||||
|         # for this callee side by factoring the impl from | ||||
|  | @ -515,10 +573,17 @@ async def _invoke( | |||
|                 ctx._result = res | ||||
| 
 | ||||
|                 # deliver final result to caller side. | ||||
|                 await chan.send({ | ||||
|                     'return': res, | ||||
|                     'cid': cid | ||||
|                 }) | ||||
|                 await chan.send( | ||||
|                     Return( | ||||
|                         cid=cid, | ||||
|                         pld=res, | ||||
|                     ) | ||||
|                 ) | ||||
|                 # await chan.send({ | ||||
|                 #     # Return() | ||||
|                 #     'cid': cid, | ||||
|                 #     'return': res, | ||||
|                 # }) | ||||
| 
 | ||||
|             # NOTE: this happens IFF `ctx._scope.cancel()` is | ||||
|             # called by any of, | ||||
|  | @ -691,7 +756,8 @@ async def try_ship_error_to_remote( | |||
|         try: | ||||
|             # NOTE: normally only used for internal runtime errors | ||||
|             # so ship to peer actor without a cid. | ||||
|             msg: dict = pack_error( | ||||
|             # msg: dict = pack_error( | ||||
|             msg: Error = pack_error( | ||||
|                 err, | ||||
|                 cid=cid, | ||||
| 
 | ||||
|  | @ -707,12 +773,13 @@ async def try_ship_error_to_remote( | |||
|             trio.BrokenResourceError, | ||||
|             BrokenPipeError, | ||||
|         ): | ||||
|             err_msg: dict = msg['error']['tb_str'] | ||||
|             # err_msg: dict = msg['error']['tb_str'] | ||||
|             log.critical( | ||||
|                 'IPC transport failure -> ' | ||||
|                 f'failed to ship error to {remote_descr}!\n\n' | ||||
|                 f'X=> {channel.uid}\n\n' | ||||
|                 f'{err_msg}\n' | ||||
|                 # f'{err_msg}\n' | ||||
|                 f'{msg}\n' | ||||
|             ) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -772,31 +839,6 @@ async def process_messages( | |||
|         with CancelScope(shield=shield) as loop_cs: | ||||
|             task_status.started(loop_cs) | ||||
|             async for msg in chan: | ||||
| 
 | ||||
|                 # dedicated loop terminate sentinel | ||||
|                 if msg is None: | ||||
| 
 | ||||
|                     tasks: dict[ | ||||
|                         tuple[Channel, str], | ||||
|                         tuple[Context, Callable, trio.Event] | ||||
|                     ] = actor._rpc_tasks.copy() | ||||
|                     log.cancel( | ||||
|                         f'Peer IPC channel terminated via `None` setinel msg?\n' | ||||
|                         f'=> Cancelling all {len(tasks)} local RPC tasks..\n' | ||||
|                         f'peer: {chan.uid}\n' | ||||
|                         f'|_{chan}\n' | ||||
|                     ) | ||||
|                     for (channel, cid) in tasks: | ||||
|                         if channel is chan: | ||||
|                             await actor._cancel_task( | ||||
|                                 cid, | ||||
|                                 channel, | ||||
|                                 requesting_uid=channel.uid, | ||||
| 
 | ||||
|                                 ipc_msg=msg, | ||||
|                             ) | ||||
|                     break | ||||
| 
 | ||||
|                 log.transport(   # type: ignore | ||||
|                     f'<= IPC msg from peer: {chan.uid}\n\n' | ||||
| 
 | ||||
|  | @ -806,216 +848,294 @@ async def process_messages( | |||
|                     f'{pformat(msg)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|                 cid = msg.get('cid') | ||||
|                 if cid: | ||||
|                     # deliver response to local caller/waiter | ||||
|                     # via its per-remote-context memory channel. | ||||
|                     await actor._push_result( | ||||
|                         chan, | ||||
|                         cid, | ||||
|                         msg, | ||||
|                     ) | ||||
|                 match msg: | ||||
| 
 | ||||
|                     log.runtime( | ||||
|                         'Waiting on next IPC msg from\n' | ||||
|                         f'peer: {chan.uid}:\n' | ||||
|                         f'|_{chan}\n' | ||||
|                 # if msg is None: | ||||
|                 # dedicated loop terminate sentinel | ||||
|                     case None: | ||||
| 
 | ||||
|                         # f'last msg: {msg}\n' | ||||
|                     ) | ||||
|                     continue | ||||
| 
 | ||||
|                 # process a 'cmd' request-msg upack | ||||
|                 # TODO: impl with native `msgspec.Struct` support !! | ||||
|                 # -[ ] implement with ``match:`` syntax? | ||||
|                 # -[ ] discard un-authed msgs as per, | ||||
|                 # <TODO put issue for typed msging structs> | ||||
|                 try: | ||||
|                     ( | ||||
|                         ns, | ||||
|                         funcname, | ||||
|                         kwargs, | ||||
|                         actorid, | ||||
|                         cid, | ||||
|                     ) = msg['cmd'] | ||||
| 
 | ||||
|                 except KeyError: | ||||
|                     # This is the non-rpc error case, that is, an | ||||
|                     # error **not** raised inside a call to ``_invoke()`` | ||||
|                     # (i.e. no cid was provided in the msg - see above). | ||||
|                     # Push this error to all local channel consumers | ||||
|                     # (normally portals) by marking the channel as errored | ||||
|                     assert chan.uid | ||||
|                     exc = unpack_error(msg, chan=chan) | ||||
|                     chan._exc = exc | ||||
|                     raise exc | ||||
| 
 | ||||
|                 log.runtime( | ||||
|                     'Handling RPC cmd from\n' | ||||
|                     f'peer: {actorid}\n' | ||||
|                     '\n' | ||||
|                     f'=> {ns}.{funcname}({kwargs})\n' | ||||
|                 ) | ||||
|                 if ns == 'self': | ||||
|                     if funcname == 'cancel': | ||||
|                         func: Callable = actor.cancel | ||||
|                         kwargs |= { | ||||
|                             'req_chan': chan, | ||||
|                         } | ||||
| 
 | ||||
|                         # don't start entire actor runtime cancellation | ||||
|                         # if this actor is currently in debug mode! | ||||
|                         pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete | ||||
|                         if pdb_complete: | ||||
|                             await pdb_complete.wait() | ||||
| 
 | ||||
|                         # Either of  `Actor.cancel()`/`.cancel_soon()` | ||||
|                         # was called, so terminate this IPC msg | ||||
|                         # loop, exit back out into `async_main()`, | ||||
|                         # and immediately start the core runtime | ||||
|                         # machinery shutdown! | ||||
|                         with CancelScope(shield=True): | ||||
|                             await _invoke( | ||||
|                                 actor, | ||||
|                                 cid, | ||||
|                                 chan, | ||||
|                                 func, | ||||
|                                 kwargs, | ||||
|                                 is_rpc=False, | ||||
|                             ) | ||||
| 
 | ||||
|                         log.runtime( | ||||
|                             'Cancelling IPC transport msg-loop with peer:\n' | ||||
|                         tasks: dict[ | ||||
|                             tuple[Channel, str], | ||||
|                             tuple[Context, Callable, trio.Event] | ||||
|                         ] = actor._rpc_tasks.copy() | ||||
|                         log.cancel( | ||||
|                             f'Peer IPC channel terminated via `None` setinel msg?\n' | ||||
|                             f'=> Cancelling all {len(tasks)} local RPC tasks..\n' | ||||
|                             f'peer: {chan.uid}\n' | ||||
|                             f'|_{chan}\n' | ||||
|                         ) | ||||
|                         loop_cs.cancel() | ||||
|                         for (channel, cid) in tasks: | ||||
|                             if channel is chan: | ||||
|                                 await actor._cancel_task( | ||||
|                                     cid, | ||||
|                                     channel, | ||||
|                                     requesting_uid=channel.uid, | ||||
| 
 | ||||
|                                     ipc_msg=msg, | ||||
|                                 ) | ||||
|                         break | ||||
| 
 | ||||
|                     if funcname == '_cancel_task': | ||||
|                         func: Callable = actor._cancel_task | ||||
| 
 | ||||
|                         # we immediately start the runtime machinery | ||||
|                         # shutdown | ||||
|                         # with CancelScope(shield=True): | ||||
|                         target_cid: str = kwargs['cid'] | ||||
|                         kwargs |= { | ||||
|                             # NOTE: ONLY the rpc-task-owning | ||||
|                             # parent IPC channel should be able to | ||||
|                             # cancel it! | ||||
|                             'parent_chan': chan, | ||||
|                             'requesting_uid': chan.uid, | ||||
|                             'ipc_msg': msg, | ||||
|                         } | ||||
|                         # TODO: remove? already have emit in meth. | ||||
|                         # log.runtime( | ||||
|                         #     f'Rx RPC task cancel request\n' | ||||
|                         #     f'<= canceller: {chan.uid}\n' | ||||
|                         #     f'  |_{chan}\n\n' | ||||
|                         #     f'=> {actor}\n' | ||||
|                         #     f'  |_cid: {target_cid}\n' | ||||
|                         # ) | ||||
|                         try: | ||||
|                             await _invoke( | ||||
|                                 actor, | ||||
|                                 cid, | ||||
|                                 chan, | ||||
|                                 func, | ||||
|                                 kwargs, | ||||
|                                 is_rpc=False, | ||||
|                             ) | ||||
|                         except BaseException: | ||||
|                             log.exception( | ||||
|                                 'Failed to cancel task?\n' | ||||
|                                 f'<= canceller: {chan.uid}\n' | ||||
|                                 f'  |_{chan}\n\n' | ||||
|                                 f'=> {actor}\n' | ||||
|                                 f'  |_cid: {target_cid}\n' | ||||
|                             ) | ||||
|                         continue | ||||
|                     else: | ||||
|                         # normally registry methods, eg. | ||||
|                         # ``.register_actor()`` etc. | ||||
|                         func: Callable = getattr(actor, funcname) | ||||
| 
 | ||||
|                 else: | ||||
|                     # complain to client about restricted modules | ||||
|                     try: | ||||
|                         func = actor._get_rpc_func(ns, funcname) | ||||
|                     except ( | ||||
|                         ModuleNotExposed, | ||||
|                         AttributeError, | ||||
|                     ) as err: | ||||
|                         err_msg: dict[str, dict] = pack_error( | ||||
|                             err, | ||||
|                             cid=cid, | ||||
|                         ) | ||||
|                         await chan.send(err_msg) | ||||
|                         continue | ||||
| 
 | ||||
|                 # schedule a task for the requested RPC function | ||||
|                 # in the actor's main "service nursery". | ||||
|                 # TODO: possibly a service-tn per IPC channel for | ||||
|                 # supervision isolation? would avoid having to | ||||
|                 # manage RPC tasks individually in `._rpc_tasks` | ||||
|                 # table? | ||||
|                 log.runtime( | ||||
|                     f'Spawning task for RPC request\n' | ||||
|                     f'<= caller: {chan.uid}\n' | ||||
|                     f'  |_{chan}\n\n' | ||||
|                     # TODO: maddr style repr? | ||||
|                     # f'  |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' | ||||
|                     # f'cid="{cid[-16:]} .."\n\n' | ||||
| 
 | ||||
|                     f'=> {actor}\n' | ||||
|                     f'  |_cid: {cid}\n' | ||||
|                     f'   |>> {func}()\n' | ||||
|                 ) | ||||
|                 assert actor._service_n  # wait why? do it at top? | ||||
|                 try: | ||||
|                     ctx: Context = await actor._service_n.start( | ||||
|                         partial( | ||||
|                             _invoke, | ||||
|                             actor, | ||||
|                             cid, | ||||
|                 # cid = msg.get('cid') | ||||
|                 # if cid: | ||||
|                     case ( | ||||
|                         StartAck(cid=cid) | ||||
|                         | Started(cid=cid) | ||||
|                         | Yield(cid=cid) | ||||
|                         | Stop(cid=cid) | ||||
|                         | Return(cid=cid) | ||||
|                         | Error(cid=cid) | ||||
|                     ): | ||||
|                         # deliver response to local caller/waiter | ||||
|                         # via its per-remote-context memory channel. | ||||
|                         await actor._push_result( | ||||
|                             chan, | ||||
|                             func, | ||||
|                             kwargs, | ||||
|                         ), | ||||
|                         name=funcname, | ||||
|                     ) | ||||
|                             cid, | ||||
|                             msg, | ||||
|                         ) | ||||
| 
 | ||||
|                 except ( | ||||
|                     RuntimeError, | ||||
|                     BaseExceptionGroup, | ||||
|                 ): | ||||
|                     # avoid reporting a benign race condition | ||||
|                     # during actor runtime teardown. | ||||
|                     nursery_cancelled_before_task: bool = True | ||||
|                     break | ||||
|                         log.runtime( | ||||
|                             'Waiting on next IPC msg from\n' | ||||
|                             f'peer: {chan.uid}:\n' | ||||
|                             f'|_{chan}\n' | ||||
| 
 | ||||
|                 # in the lone case where a ``Context`` is not | ||||
|                 # delivered, it's likely going to be a locally | ||||
|                 # scoped exception from ``_invoke()`` itself. | ||||
|                 if isinstance(err := ctx, Exception): | ||||
|                     log.warning( | ||||
|                         'Task for RPC failed?' | ||||
|                         f'|_ {func}()\n\n' | ||||
|                             # f'last msg: {msg}\n' | ||||
|                         ) | ||||
|                         continue | ||||
| 
 | ||||
|                         f'{err}' | ||||
|                     ) | ||||
|                     continue | ||||
|                     # process a 'cmd' request-msg upack | ||||
|                     # TODO: impl with native `msgspec.Struct` support !! | ||||
|                     # -[ ] implement with ``match:`` syntax? | ||||
|                     # -[ ] discard un-authed msgs as per, | ||||
|                     # <TODO put issue for typed msging structs> | ||||
|                     case Start( | ||||
|                         cid=cid, | ||||
|                         ns=ns, | ||||
|                         func=funcname, | ||||
|                         kwargs=kwargs, | ||||
|                         uid=actorid, | ||||
|                     ): | ||||
|                         # try: | ||||
|                         #     ( | ||||
|                         #         ns, | ||||
|                         #         funcname, | ||||
|                         #         kwargs, | ||||
|                         #         actorid, | ||||
|                         #         cid, | ||||
|                         #     ) = msg['cmd'] | ||||
| 
 | ||||
|                 else: | ||||
|                     # mark that we have ongoing rpc tasks | ||||
|                     actor._ongoing_rpc_tasks = trio.Event() | ||||
|                         # # TODO: put in `case Error():` right? | ||||
|                         # except KeyError: | ||||
|                         #     # This is the non-rpc error case, that is, an | ||||
|                         #     # error **not** raised inside a call to ``_invoke()`` | ||||
|                         #     # (i.e. no cid was provided in the msg - see above). | ||||
|                         #     # Push this error to all local channel consumers | ||||
|                         #     # (normally portals) by marking the channel as errored | ||||
|                         #     assert chan.uid | ||||
|                         #     exc = unpack_error(msg, chan=chan) | ||||
|                         #     chan._exc = exc | ||||
|                         #     raise exc | ||||
| 
 | ||||
|                     # store cancel scope such that the rpc task can be | ||||
|                     # cancelled gracefully if requested | ||||
|                     actor._rpc_tasks[(chan, cid)] = ( | ||||
|                         ctx, | ||||
|                         func, | ||||
|                         trio.Event(), | ||||
|                     ) | ||||
|                         log.runtime( | ||||
|                             'Handling RPC `Start` request from\n' | ||||
|                             f'peer: {actorid}\n' | ||||
|                             '\n' | ||||
|                             f'=> {ns}.{funcname}({kwargs})\n' | ||||
|                         ) | ||||
|                         # case Start( | ||||
|                         #     ns='self', | ||||
|                         #     funcname='cancel', | ||||
|                         # ): | ||||
|                         if ns == 'self': | ||||
|                             if funcname == 'cancel': | ||||
|                                 func: Callable = actor.cancel | ||||
|                                 kwargs |= { | ||||
|                                     'req_chan': chan, | ||||
|                                 } | ||||
| 
 | ||||
|                                 # don't start entire actor runtime cancellation | ||||
|                                 # if this actor is currently in debug mode! | ||||
|                                 pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete | ||||
|                                 if pdb_complete: | ||||
|                                     await pdb_complete.wait() | ||||
| 
 | ||||
|                                 # Either of  `Actor.cancel()`/`.cancel_soon()` | ||||
|                                 # was called, so terminate this IPC msg | ||||
|                                 # loop, exit back out into `async_main()`, | ||||
|                                 # and immediately start the core runtime | ||||
|                                 # machinery shutdown! | ||||
|                                 with CancelScope(shield=True): | ||||
|                                     await _invoke( | ||||
|                                         actor, | ||||
|                                         cid, | ||||
|                                         chan, | ||||
|                                         func, | ||||
|                                         kwargs, | ||||
|                                         is_rpc=False, | ||||
|                                     ) | ||||
| 
 | ||||
|                                 log.runtime( | ||||
|                                     'Cancelling IPC transport msg-loop with peer:\n' | ||||
|                                     f'|_{chan}\n' | ||||
|                                 ) | ||||
|                                 loop_cs.cancel() | ||||
|                                 break | ||||
| 
 | ||||
|                         # case Start( | ||||
|                         #     ns='self', | ||||
|                         #     funcname='_cancel_task', | ||||
|                         # ): | ||||
|                             if funcname == '_cancel_task': | ||||
|                                 func: Callable = actor._cancel_task | ||||
| 
 | ||||
|                                 # we immediately start the runtime machinery | ||||
|                                 # shutdown | ||||
|                                 # with CancelScope(shield=True): | ||||
|                                 target_cid: str = kwargs['cid'] | ||||
|                                 kwargs |= { | ||||
|                                     # NOTE: ONLY the rpc-task-owning | ||||
|                                     # parent IPC channel should be able to | ||||
|                                     # cancel it! | ||||
|                                     'parent_chan': chan, | ||||
|                                     'requesting_uid': chan.uid, | ||||
|                                     'ipc_msg': msg, | ||||
|                                 } | ||||
|                                 # TODO: remove? already have emit in meth. | ||||
|                                 # log.runtime( | ||||
|                                 #     f'Rx RPC task cancel request\n' | ||||
|                                 #     f'<= canceller: {chan.uid}\n' | ||||
|                                 #     f'  |_{chan}\n\n' | ||||
|                                 #     f'=> {actor}\n' | ||||
|                                 #     f'  |_cid: {target_cid}\n' | ||||
|                                 # ) | ||||
|                                 try: | ||||
|                                     await _invoke( | ||||
|                                         actor, | ||||
|                                         cid, | ||||
|                                         chan, | ||||
|                                         func, | ||||
|                                         kwargs, | ||||
|                                         is_rpc=False, | ||||
|                                     ) | ||||
|                                 except BaseException: | ||||
|                                     log.exception( | ||||
|                                         'Failed to cancel task?\n' | ||||
|                                         f'<= canceller: {chan.uid}\n' | ||||
|                                         f'  |_{chan}\n\n' | ||||
|                                         f'=> {actor}\n' | ||||
|                                         f'  |_cid: {target_cid}\n' | ||||
|                                     ) | ||||
|                                 continue | ||||
| 
 | ||||
|                             # case Start( | ||||
|                             #     ns='self', | ||||
|                             #     funcname='register_actor', | ||||
|                             # ): | ||||
|                             else: | ||||
|                                 # normally registry methods, eg. | ||||
|                                 # ``.register_actor()`` etc. | ||||
|                                 func: Callable = getattr(actor, funcname) | ||||
| 
 | ||||
|                         # case Start( | ||||
|                         #     ns=str(), | ||||
|                         #     funcname=funcname, | ||||
|                         # ): | ||||
|                         else: | ||||
|                             # complain to client about restricted modules | ||||
|                             try: | ||||
|                                 func = actor._get_rpc_func(ns, funcname) | ||||
|                             except ( | ||||
|                                 ModuleNotExposed, | ||||
|                                 AttributeError, | ||||
|                             ) as err: | ||||
|                                 err_msg: dict[str, dict] = pack_error( | ||||
|                                     err, | ||||
|                                     cid=cid, | ||||
|                                 ) | ||||
|                                 await chan.send(err_msg) | ||||
|                                 continue | ||||
| 
 | ||||
|                         # schedule a task for the requested RPC function | ||||
|                         # in the actor's main "service nursery". | ||||
|                         # TODO: possibly a service-tn per IPC channel for | ||||
|                         # supervision isolation? would avoid having to | ||||
|                         # manage RPC tasks individually in `._rpc_tasks` | ||||
|                         # table? | ||||
|                         log.runtime( | ||||
|                             f'Spawning task for RPC request\n' | ||||
|                             f'<= caller: {chan.uid}\n' | ||||
|                             f'  |_{chan}\n\n' | ||||
|                             # TODO: maddr style repr? | ||||
|                             # f'  |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/' | ||||
|                             # f'cid="{cid[-16:]} .."\n\n' | ||||
| 
 | ||||
|                             f'=> {actor}\n' | ||||
|                             f'  |_cid: {cid}\n' | ||||
|                             f'   |>> {func}()\n' | ||||
|                         ) | ||||
|                         assert actor._service_n  # wait why? do it at top? | ||||
|                         try: | ||||
|                             ctx: Context = await actor._service_n.start( | ||||
|                                 partial( | ||||
|                                     _invoke, | ||||
|                                     actor, | ||||
|                                     cid, | ||||
|                                     chan, | ||||
|                                     func, | ||||
|                                     kwargs, | ||||
|                                 ), | ||||
|                                 name=funcname, | ||||
|                             ) | ||||
| 
 | ||||
|                         except ( | ||||
|                             RuntimeError, | ||||
|                             BaseExceptionGroup, | ||||
|                         ): | ||||
|                             # avoid reporting a benign race condition | ||||
|                             # during actor runtime teardown. | ||||
|                             nursery_cancelled_before_task: bool = True | ||||
|                             break | ||||
| 
 | ||||
|                         # in the lone case where a ``Context`` is not | ||||
|                         # delivered, it's likely going to be a locally | ||||
|                         # scoped exception from ``_invoke()`` itself. | ||||
|                         if isinstance(err := ctx, Exception): | ||||
|                             log.warning( | ||||
|                                 'Task for RPC failed?' | ||||
|                                 f'|_ {func}()\n\n' | ||||
| 
 | ||||
|                                 f'{err}' | ||||
|                             ) | ||||
|                             continue | ||||
| 
 | ||||
|                         else: | ||||
|                             # mark that we have ongoing rpc tasks | ||||
|                             actor._ongoing_rpc_tasks = trio.Event() | ||||
| 
 | ||||
|                             # store cancel scope such that the rpc task can be | ||||
|                             # cancelled gracefully if requested | ||||
|                             actor._rpc_tasks[(chan, cid)] = ( | ||||
|                                 ctx, | ||||
|                                 func, | ||||
|                                 trio.Event(), | ||||
|                             ) | ||||
| 
 | ||||
|                     case Error()|_: | ||||
|                         # This is the non-rpc error case, that is, an | ||||
|                         # error **not** raised inside a call to ``_invoke()`` | ||||
|                         # (i.e. no cid was provided in the msg - see above). | ||||
|                         # Push this error to all local channel consumers | ||||
|                         # (normally portals) by marking the channel as errored | ||||
|                         log.exception( | ||||
|                             f'Unhandled IPC msg:\n\n' | ||||
|                             f'{msg}\n' | ||||
|                         ) | ||||
|                         assert chan.uid | ||||
|                         exc = unpack_error( | ||||
|                             msg, | ||||
|                             chan=chan, | ||||
|                         ) | ||||
|                         chan._exc = exc | ||||
|                         raise exc | ||||
| 
 | ||||
|                 log.runtime( | ||||
|                     'Waiting on next IPC msg from\n' | ||||
|  |  | |||
|  | @ -87,6 +87,23 @@ from ._rpc import ( | |||
|     process_messages, | ||||
|     try_ship_error_to_remote, | ||||
| ) | ||||
| from tractor.msg import ( | ||||
|     types as msgtypes, | ||||
|     pretty_struct, | ||||
| ) | ||||
| # from tractor.msg.types import ( | ||||
| #     Aid, | ||||
| #     SpawnSpec, | ||||
| #     Start, | ||||
| #     StartAck, | ||||
| #     Started, | ||||
| #     Yield, | ||||
| #     Stop, | ||||
| #     Return, | ||||
| #     Error, | ||||
| # ) | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|  | @ -143,6 +160,7 @@ class Actor: | |||
|     # Information about `__main__` from parent | ||||
|     _parent_main_data: dict[str, str] | ||||
|     _parent_chan_cs: CancelScope|None = None | ||||
|     _spawn_spec: SpawnSpec|None = None | ||||
| 
 | ||||
|     # syncs for setup/teardown sequences | ||||
|     _server_down: trio.Event|None = None | ||||
|  | @ -539,7 +557,8 @@ class Actor: | |||
| 
 | ||||
|                             f'{pformat(msg)}\n' | ||||
|                         ) | ||||
|                         cid = msg.get('cid') | ||||
|                         # cid: str|None = msg.get('cid') | ||||
|                         cid: str|None = msg.cid | ||||
|                         if cid: | ||||
|                             # deliver response to local caller/waiter | ||||
|                             await self._push_result( | ||||
|  | @ -891,29 +910,44 @@ class Actor: | |||
|             f'=> {ns}.{func}({kwargs})\n' | ||||
|         ) | ||||
|         await chan.send( | ||||
|             {'cmd': ( | ||||
|                 ns, | ||||
|                 func, | ||||
|                 kwargs, | ||||
|                 self.uid, | ||||
|                 cid, | ||||
|             )} | ||||
|             msgtypes.Start( | ||||
|                 ns=ns, | ||||
|                 func=func, | ||||
|                 kwargs=kwargs, | ||||
|                 uid=self.uid, | ||||
|                 cid=cid, | ||||
|             ) | ||||
|         ) | ||||
|             # {'cmd': ( | ||||
|             #     ns, | ||||
|             #     func, | ||||
|             #     kwargs, | ||||
|             #     self.uid, | ||||
|             #     cid, | ||||
|             # )} | ||||
|         # ) | ||||
| 
 | ||||
|         # Wait on first response msg and validate; this should be | ||||
|         # immediate. | ||||
|         first_msg: dict = await ctx._recv_chan.receive() | ||||
|         functype: str = first_msg.get('functype') | ||||
|         # first_msg: dict = await ctx._recv_chan.receive() | ||||
|         # functype: str = first_msg.get('functype') | ||||
| 
 | ||||
|         if 'error' in first_msg: | ||||
|         first_msg: msgtypes.StartAck = await ctx._recv_chan.receive() | ||||
|         try: | ||||
|             functype: str = first_msg.functype | ||||
|         except AttributeError: | ||||
|             raise unpack_error(first_msg, chan) | ||||
|             # if 'error' in first_msg: | ||||
|             #     raise unpack_error(first_msg, chan) | ||||
| 
 | ||||
|         elif functype not in ( | ||||
|         if functype not in ( | ||||
|             'asyncfunc', | ||||
|             'asyncgen', | ||||
|             'context', | ||||
|         ): | ||||
|             raise ValueError(f"{first_msg} is an invalid response packet?") | ||||
|             raise ValueError( | ||||
|                 f'{first_msg} is an invalid response packet?' | ||||
|             ) | ||||
| 
 | ||||
|         ctx._remote_func_type = functype | ||||
|         return ctx | ||||
|  | @ -946,24 +980,36 @@ class Actor: | |||
|             await self._do_handshake(chan) | ||||
| 
 | ||||
|             accept_addrs: list[tuple[str, int]]|None = None | ||||
|             if self._spawn_method == "trio": | ||||
|                 # Receive runtime state from our parent | ||||
|                 parent_data: dict[str, Any] | ||||
|                 parent_data = await chan.recv() | ||||
|                 log.runtime( | ||||
|                     'Received state from parent:\n\n' | ||||
|                     # TODO: eventually all these msgs as | ||||
|                     # `msgspec.Struct` with a special mode that | ||||
|                     # pformats them in multi-line mode, BUT only | ||||
|                     # if "trace"/"util" mode is enabled? | ||||
|                     f'{pformat(parent_data)}\n' | ||||
|                 ) | ||||
|                 accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') | ||||
|                 rvs = parent_data.pop('_runtime_vars') | ||||
| 
 | ||||
|             if self._spawn_method == "trio": | ||||
| 
 | ||||
|                 # Receive runtime state from our parent | ||||
|                 # parent_data: dict[str, Any] | ||||
|                 # parent_data = await chan.recv() | ||||
| 
 | ||||
|                 # TODO: maybe we should just wrap this directly | ||||
|                 # in a `Actor.spawn_info: SpawnInfo` struct? | ||||
|                 spawnspec: msgtypes.SpawnSpec = await chan.recv() | ||||
|                 self._spawn_spec = spawnspec | ||||
| 
 | ||||
|                 # TODO: eventually all these msgs as | ||||
|                 # `msgspec.Struct` with a special mode that | ||||
|                 # pformats them in multi-line mode, BUT only | ||||
|                 # if "trace"/"util" mode is enabled? | ||||
|                 log.runtime( | ||||
|                     'Received runtime spec from parent:\n\n' | ||||
|                     f'{pformat(spawnspec)}\n' | ||||
|                 ) | ||||
|                 # accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs') | ||||
|                 accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs | ||||
| 
 | ||||
|                 # rvs = parent_data.pop('_runtime_vars') | ||||
|                 rvs = spawnspec._runtime_vars | ||||
|                 if rvs['_debug_mode']: | ||||
|                     try: | ||||
|                         log.info('Enabling `stackscope` traces on SIGUSR1') | ||||
|                         log.info( | ||||
|                             'Enabling `stackscope` traces on SIGUSR1' | ||||
|                         ) | ||||
|                         from .devx import enable_stack_on_sig | ||||
|                         enable_stack_on_sig() | ||||
|                     except ImportError: | ||||
|  | @ -971,28 +1017,40 @@ class Actor: | |||
|                             '`stackscope` not installed for use in debug mode!' | ||||
|                         ) | ||||
| 
 | ||||
|                 log.runtime(f"Runtime vars are: {rvs}") | ||||
|                 log.runtime(f'Runtime vars are: {rvs}') | ||||
|                 rvs['_is_root'] = False | ||||
|                 _state._runtime_vars.update(rvs) | ||||
| 
 | ||||
|                 for attr, value in parent_data.items(): | ||||
|                     if ( | ||||
|                         attr == 'reg_addrs' | ||||
|                         and value | ||||
|                     ): | ||||
|                         # XXX: ``msgspec`` doesn't support serializing tuples | ||||
|                         # so just cash manually here since it's what our | ||||
|                         # internals expect. | ||||
|                         # TODO: we don't really NEED these as | ||||
|                         # tuples so we can probably drop this | ||||
|                         # casting since apparently in python lists | ||||
|                         # are "more efficient"? | ||||
|                         self.reg_addrs = [tuple(val) for val in value] | ||||
|                 # XXX: ``msgspec`` doesn't support serializing tuples | ||||
|                 # so just cash manually here since it's what our | ||||
|                 # internals expect. | ||||
|                 # | ||||
|                 self.reg_addrs = [ | ||||
|                     # TODO: we don't really NEED these as tuples? | ||||
|                     # so we can probably drop this casting since | ||||
|                     # apparently in python lists are "more | ||||
|                     # efficient"? | ||||
|                     tuple(val) | ||||
|                     for val in spawnspec.reg_addrs | ||||
|                 ] | ||||
| 
 | ||||
|                     else: | ||||
|                         setattr(self, attr, value) | ||||
|                 # for attr, value in parent_data.items(): | ||||
|                 for _, attr, value in pretty_struct.iter_fields( | ||||
|                     spawnspec, | ||||
|                 ): | ||||
|                     setattr(self, attr, value) | ||||
|                     # if ( | ||||
|                     #     attr == 'reg_addrs' | ||||
|                     #     and value | ||||
|                     # ): | ||||
|                     #     self.reg_addrs = [tuple(val) for val in value] | ||||
|                     # else: | ||||
|                     #     setattr(self, attr, value) | ||||
| 
 | ||||
|             return chan, accept_addrs | ||||
|             return ( | ||||
|                 chan, | ||||
|                 accept_addrs, | ||||
|             ) | ||||
| 
 | ||||
|         except OSError:  # failed to connect | ||||
|             log.warning( | ||||
|  | @ -1434,7 +1492,7 @@ class Actor: | |||
|         self, | ||||
|         chan: Channel | ||||
| 
 | ||||
|     ) -> tuple[str, str]: | ||||
|     ) -> msgtypes.Aid: | ||||
|         ''' | ||||
|         Exchange `(name, UUIDs)` identifiers as the first | ||||
|         communication step with any (peer) remote `Actor`. | ||||
|  | @ -1443,14 +1501,27 @@ class Actor: | |||
|         "actor model" parlance. | ||||
| 
 | ||||
|         ''' | ||||
|         await chan.send(self.uid) | ||||
|         value: tuple = await chan.recv() | ||||
|         uid: tuple[str, str] = (str(value[0]), str(value[1])) | ||||
|         name, uuid = self.uid | ||||
|         await chan.send( | ||||
|             msgtypes.Aid( | ||||
|                 name=name, | ||||
|                 uuid=uuid, | ||||
|             ) | ||||
|         ) | ||||
|         aid: msgtypes.Aid = await chan.recv() | ||||
|         chan.aid = aid | ||||
| 
 | ||||
|         uid: tuple[str, str] = ( | ||||
|             # str(value[0]), | ||||
|             # str(value[1]) | ||||
|             aid.name, | ||||
|             aid.uuid, | ||||
|         ) | ||||
| 
 | ||||
|         if not isinstance(uid, tuple): | ||||
|             raise ValueError(f"{uid} is not a valid uid?!") | ||||
| 
 | ||||
|         chan.uid = str(uid[0]), str(uid[1]) | ||||
|         chan.uid = uid | ||||
|         return uid | ||||
| 
 | ||||
|     def is_infected_aio(self) -> bool: | ||||
|  | @ -1510,7 +1581,8 @@ async def async_main( | |||
|             # because we're running in mp mode | ||||
|             if ( | ||||
|                 set_accept_addr_says_rent | ||||
|                 and set_accept_addr_says_rent is not None | ||||
|                 and | ||||
|                 set_accept_addr_says_rent is not None | ||||
|             ): | ||||
|                 accept_addrs = set_accept_addr_says_rent | ||||
| 
 | ||||
|  |  | |||
|  | @ -49,6 +49,9 @@ from tractor._portal import Portal | |||
| from tractor._runtime import Actor | ||||
| from tractor._entry import _mp_main | ||||
| from tractor._exceptions import ActorFailure | ||||
| from tractor.msg.types import ( | ||||
|     SpawnSpec, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|  | @ -493,14 +496,25 @@ async def trio_proc( | |||
|             portal, | ||||
|         ) | ||||
| 
 | ||||
|         # send additional init params | ||||
|         await chan.send({ | ||||
|             '_parent_main_data': subactor._parent_main_data, | ||||
|             'enable_modules': subactor.enable_modules, | ||||
|             'reg_addrs': subactor.reg_addrs, | ||||
|             'bind_addrs': bind_addrs, | ||||
|             '_runtime_vars': _runtime_vars, | ||||
|         }) | ||||
|         # send a "spawning specification" which configures the | ||||
|         # initial runtime state of the child. | ||||
|         await chan.send( | ||||
|             SpawnSpec( | ||||
|                 _parent_main_data=subactor._parent_main_data, | ||||
|                 enable_modules=subactor.enable_modules, | ||||
|                 reg_addrs=subactor.reg_addrs, | ||||
|                 bind_addrs=bind_addrs, | ||||
|                 _runtime_vars=_runtime_vars, | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|         # await chan.send({ | ||||
|         #     '_parent_main_data': subactor._parent_main_data, | ||||
|         #     'enable_modules': subactor.enable_modules, | ||||
|         #     'reg_addrs': subactor.reg_addrs, | ||||
|         #     'bind_addrs': bind_addrs, | ||||
|         #     '_runtime_vars': _runtime_vars, | ||||
|         # }) | ||||
| 
 | ||||
|         # track subactor in current nursery | ||||
|         curr_actor: Actor = current_actor() | ||||
|  |  | |||
|  | @ -43,6 +43,11 @@ from .trionics import ( | |||
|     broadcast_receiver, | ||||
|     BroadcastReceiver, | ||||
| ) | ||||
| from tractor.msg import ( | ||||
|     Stop, | ||||
|     Yield, | ||||
|     Error, | ||||
| ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._context import Context | ||||
|  | @ -94,21 +99,25 @@ class MsgStream(trio.abc.Channel): | |||
|         self, | ||||
|         allow_msg_keys: list[str] = ['yield'], | ||||
|     ): | ||||
|         msg: dict = self._rx_chan.receive_nowait() | ||||
|         # msg: dict = self._rx_chan.receive_nowait() | ||||
|         msg: Yield|Stop = self._rx_chan.receive_nowait() | ||||
|         for ( | ||||
|             i, | ||||
|             key, | ||||
|         ) in enumerate(allow_msg_keys): | ||||
|             try: | ||||
|                 return msg[key] | ||||
|             except KeyError as kerr: | ||||
|                 # return msg[key] | ||||
|                 return msg.pld | ||||
|             # except KeyError as kerr: | ||||
|             except AttributeError as attrerr: | ||||
|                 if i < (len(allow_msg_keys) - 1): | ||||
|                     continue | ||||
| 
 | ||||
|                 _raise_from_no_key_in_msg( | ||||
|                     ctx=self._ctx, | ||||
|                     msg=msg, | ||||
|                     src_err=kerr, | ||||
|                     # src_err=kerr, | ||||
|                     src_err=attrerr, | ||||
|                     log=log, | ||||
|                     expect_key=key, | ||||
|                     stream=self, | ||||
|  | @ -148,18 +157,22 @@ class MsgStream(trio.abc.Channel): | |||
|         src_err: Exception|None = None  # orig tb | ||||
|         try: | ||||
|             try: | ||||
|                 msg = await self._rx_chan.receive() | ||||
|                 return msg['yield'] | ||||
|                 msg: Yield = await self._rx_chan.receive() | ||||
|                 # return msg['yield'] | ||||
|                 return msg.pld | ||||
| 
 | ||||
|             except KeyError as kerr: | ||||
|                 src_err = kerr | ||||
|             # except KeyError as kerr: | ||||
|             except AttributeError as attrerr: | ||||
|                 # src_err = kerr | ||||
|                 src_err = attrerr | ||||
| 
 | ||||
|                 # NOTE: may raise any of the below error types | ||||
|                 # includg EoC when a 'stop' msg is found. | ||||
|                 _raise_from_no_key_in_msg( | ||||
|                     ctx=self._ctx, | ||||
|                     msg=msg, | ||||
|                     src_err=kerr, | ||||
|                     # src_err=kerr, | ||||
|                     src_err=attrerr, | ||||
|                     log=log, | ||||
|                     expect_key='yield', | ||||
|                     stream=self, | ||||
|  | @ -514,11 +527,18 @@ class MsgStream(trio.abc.Channel): | |||
|             raise self._closed | ||||
| 
 | ||||
|         try: | ||||
|             # await self._ctx.chan.send( | ||||
|             #     payload={ | ||||
|             #         'yield': data, | ||||
|             #         'cid': self._ctx.cid, | ||||
|             #     }, | ||||
|             #     # hide_tb=hide_tb, | ||||
|             # ) | ||||
|             await self._ctx.chan.send( | ||||
|                 payload={ | ||||
|                     'yield': data, | ||||
|                     'cid': self._ctx.cid, | ||||
|                 }, | ||||
|                 payload=Yield( | ||||
|                     cid=self._ctx.cid, | ||||
|                     pld=data, | ||||
|                 ), | ||||
|                 # hide_tb=hide_tb, | ||||
|             ) | ||||
|         except ( | ||||
|  |  | |||
|  | @ -935,6 +935,9 @@ async def _pause( | |||
|             # ``breakpoint()`` was awaited and begin handling stdio. | ||||
|             log.debug('Entering sync world of the `pdb` REPL..') | ||||
|             try: | ||||
|                 # log.critical( | ||||
|                 #     f'stack len: {len(pdb.stack)}\n' | ||||
|                 # ) | ||||
|                 debug_func( | ||||
|                     actor, | ||||
|                     pdb, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue