Rename `Actor._push_result()` -> `._deliver_ctx_payload()`

Better describes the internal RPC impl/latest-architecture with the msgs
delivered being those which either define a `.pld: PayloadT` that gets
passed up to user code, or the error-msg subset that similarly is raised
in a ctx-linked task.
runtime_to_msgspec
Tyler Goodlet 2024-04-08 10:25:57 -04:00
parent 2f451ab9a3
commit b341146bd1
4 changed files with 17 additions and 12 deletions

View File

@ -1207,7 +1207,7 @@ class Context:
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside # context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via # the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock # a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the # where the root is waiting on the lock to clear but the
@ -1698,11 +1698,11 @@ class Context:
# raise any msg type error NO MATTER WHAT! # raise any msg type error NO MATTER WHAT!
except msgspec.ValidationError as verr: except msgspec.ValidationError as verr:
from tractor._ipc import _raise_msg_type_err from tractor._ipc import _mk_msg_type_err
_raise_msg_type_err( raise _mk_msg_type_err(
msg=msg_bytes, msg=msg_bytes,
codec=codec, codec=codec,
validation_err=verr, src_validation_error=verr,
verb_header='Trying to send payload' verb_header='Trying to send payload'
# > 'invalid `Started IPC msgs\n' # > 'invalid `Started IPC msgs\n'
) )
@ -2415,7 +2415,7 @@ async def open_context_from_portal(
# XXX: (MEGA IMPORTANT) if this is a root opened process we # XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the # wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside # context from the runtime msg loop otherwise inside
# ``Actor._push_result()`` the msg will be discarded and in # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via # the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock # a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the # where the root is waiting on the lock to clear but the

View File

@ -830,7 +830,7 @@ async def process_messages(
): ):
# deliver response to local caller/waiter # deliver response to local caller/waiter
# via its per-remote-context memory channel. # via its per-remote-context memory channel.
await actor._push_result( await actor._deliver_ctx_payload(
chan, chan,
cid, cid,
msg, msg,

View File

@ -69,6 +69,7 @@ from tractor.msg import (
pretty_struct, pretty_struct,
NamespacePath, NamespacePath,
types as msgtypes, types as msgtypes,
Msg,
) )
from ._ipc import Channel from ._ipc import Channel
from ._context import ( from ._context import (
@ -77,9 +78,10 @@ from ._context import (
) )
from .log import get_logger from .log import get_logger
from ._exceptions import ( from ._exceptions import (
unpack_error,
ModuleNotExposed,
ContextCancelled, ContextCancelled,
ModuleNotExposed,
MsgTypeError,
unpack_error,
TransportClosed, TransportClosed,
) )
from .devx import ( from .devx import (
@ -557,7 +559,7 @@ class Actor:
cid: str|None = msg.cid cid: str|None = msg.cid
if cid: if cid:
# deliver response to local caller/waiter # deliver response to local caller/waiter
await self._push_result( await self._deliver_ctx_payload(
chan, chan,
cid, cid,
msg, msg,
@ -716,11 +718,11 @@ class Actor:
# TODO: rename to `._deliver_payload()` since this handles # TODO: rename to `._deliver_payload()` since this handles
# more then just `result` msgs now obvi XD # more then just `result` msgs now obvi XD
async def _push_result( async def _deliver_ctx_payload(
self, self,
chan: Channel, chan: Channel,
cid: str, cid: str,
msg: dict[str, Any], msg: Msg|MsgTypeError,
) -> None|bool: ) -> None|bool:
''' '''
@ -749,6 +751,9 @@ class Actor:
) )
return return
# if isinstance(msg, MsgTypeError):
# return await ctx._deliver_bad_msg()
return await ctx._deliver_msg(msg) return await ctx._deliver_msg(msg)
def get_context( def get_context(

View File

@ -183,7 +183,7 @@ class MsgStream(trio.abc.Channel):
# - via a received `{'stop': ...}` msg from remote side. # - via a received `{'stop': ...}` msg from remote side.
# |_ NOTE: previously this was triggered by calling # |_ NOTE: previously this was triggered by calling
# ``._rx_chan.aclose()`` on the send side of the channel inside # ``._rx_chan.aclose()`` on the send side of the channel inside
# `Actor._push_result()`, but now the 'stop' message handling # `Actor._deliver_ctx_payload()`, but now the 'stop' message handling
# has been put just above inside `_raise_from_no_key_in_msg()`. # has been put just above inside `_raise_from_no_key_in_msg()`.
except ( except (
trio.EndOfChannel, trio.EndOfChannel,