From b341146bd1bd3df6c9fad2b97c11760c32083db1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 8 Apr 2024 10:25:57 -0400 Subject: [PATCH] Rename `Actor._push_result()` -> `._deliver_ctx_payload()` Better describes the internal RPC impl/latest-architecture with the msgs delivered being those which either define a `.pld: PayloadT` that gets passed up to user code, or the error-msg subset that similarly is raised in a ctx-linked task. --- tractor/_context.py | 10 +++++----- tractor/_rpc.py | 2 +- tractor/_runtime.py | 15 ++++++++++----- tractor/_streaming.py | 2 +- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index 6e55c3c..6a63416 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -1207,7 +1207,7 @@ class Context: # XXX: (MEGA IMPORTANT) if this is a root opened process we # wait for any immediate child in debug before popping the # context from the runtime msg loop otherwise inside - # ``Actor._push_result()`` the msg will be discarded and in + # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in # the case where that msg is global debugger unlock (via # a "stop" msg for a stream), this can result in a deadlock # where the root is waiting on the lock to clear but the @@ -1698,11 +1698,11 @@ class Context: # raise any msg type error NO MATTER WHAT! except msgspec.ValidationError as verr: - from tractor._ipc import _raise_msg_type_err - _raise_msg_type_err( + from tractor._ipc import _mk_msg_type_err + raise _mk_msg_type_err( msg=msg_bytes, codec=codec, - validation_err=verr, + src_validation_error=verr, verb_header='Trying to send payload' # > 'invalid `Started IPC msgs\n' ) @@ -2415,7 +2415,7 @@ async def open_context_from_portal( # XXX: (MEGA IMPORTANT) if this is a root opened process we # wait for any immediate child in debug before popping the # context from the runtime msg loop otherwise inside - # ``Actor._push_result()`` the msg will be discarded and in + # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in # the case where that msg is global debugger unlock (via # a "stop" msg for a stream), this can result in a deadlock # where the root is waiting on the lock to clear but the diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 75e5951..d935909 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -830,7 +830,7 @@ async def process_messages( ): # deliver response to local caller/waiter # via its per-remote-context memory channel. - await actor._push_result( + await actor._deliver_ctx_payload( chan, cid, msg, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 854db3a..4be5ea1 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -69,6 +69,7 @@ from tractor.msg import ( pretty_struct, NamespacePath, types as msgtypes, + Msg, ) from ._ipc import Channel from ._context import ( @@ -77,9 +78,10 @@ from ._context import ( ) from .log import get_logger from ._exceptions import ( - unpack_error, - ModuleNotExposed, ContextCancelled, + ModuleNotExposed, + MsgTypeError, + unpack_error, TransportClosed, ) from .devx import ( @@ -557,7 +559,7 @@ class Actor: cid: str|None = msg.cid if cid: # deliver response to local caller/waiter - await self._push_result( + await self._deliver_ctx_payload( chan, cid, msg, @@ -716,11 +718,11 @@ class Actor: # TODO: rename to `._deliver_payload()` since this handles # more then just `result` msgs now obvi XD - async def _push_result( + async def _deliver_ctx_payload( self, chan: Channel, cid: str, - msg: dict[str, Any], + msg: Msg|MsgTypeError, ) -> None|bool: ''' @@ -749,6 +751,9 @@ class Actor: ) return + # if isinstance(msg, MsgTypeError): + # return await ctx._deliver_bad_msg() + return await ctx._deliver_msg(msg) def get_context( diff --git a/tractor/_streaming.py b/tractor/_streaming.py index dc30ac6..fcf8daf 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -183,7 +183,7 @@ class MsgStream(trio.abc.Channel): # - via a received `{'stop': ...}` msg from remote side. # |_ NOTE: previously this was triggered by calling # ``._rx_chan.aclose()`` on the send side of the channel inside - # `Actor._push_result()`, but now the 'stop' message handling + # `Actor._deliver_ctx_payload()`, but now the 'stop' message handling # has been put just above inside `_raise_from_no_key_in_msg()`. except ( trio.EndOfChannel,