Compare commits

..

No commits in common. "f0342d6ae31416da506ef4cbcdbe5baf528f9104" and "8ea0f08386ec62721581a792156f62d124b0d2aa" have entirely different histories.

6 changed files with 435 additions and 320 deletions

View File

@ -315,7 +315,7 @@ def test_basic_payload_spec(
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
],
# only for debug
# post_mortem=True,
post_mortem=True,
),
p.open_context(
child,

View File

@ -86,10 +86,7 @@ from .msg import (
from ._ipc import (
Channel,
)
from ._streaming import (
MsgStream,
open_stream_from_ctx,
)
from ._streaming import MsgStream
from ._state import (
current_actor,
debug_mode,
@ -981,6 +978,198 @@ class Context:
assert self._scope
self._scope.cancel()
# TODO? should we move this to `._streaming` much like we
# moved `Portal.open_context()`'s def to this mod?
@acm
async def open_stream(
self,
allow_overruns: bool|None = False,
msg_buffer_size: int|None = None,
) -> AsyncGenerator[MsgStream, None]:
'''
Open a ``MsgStream``, a bi-directional stream connected to the
cross-actor (far end) task for this ``Context``.
This context manager must be entered on both the caller and
callee for the stream to logically be considered "connected".
A ``MsgStream`` is currently "one-shot" use, meaning if you
close it you can not "re-open" it for streaming and instead you
must re-establish a new surrounding ``Context`` using
``Portal.open_context()``. In the future this may change but
currently there seems to be no obvious reason to support
"re-opening":
- pausing a stream can be done with a message.
- task errors will normally require a restart of the entire
scope of the inter-actor task context due to the nature of
``trio``'s cancellation system.
'''
actor: Actor = self._actor
# If the surrounding context has been cancelled by some
# task with a handle to THIS, we error here immediately
# since it likely means the surrounding lexical-scope has
# errored, been `trio.Cancelled` or at the least
# `Context.cancel()` was called by some task.
if self._cancel_called:
# XXX NOTE: ALWAYS RAISE any remote error here even if
# it's an expected `ContextCancelled` due to a local
# task having called `.cancel()`!
#
# WHY: we expect the error to always bubble up to the
# surrounding `Portal.open_context()` call and be
# absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already
# sent to the other side!
self.maybe_raise(
raise_ctxc_from_self_call=True,
)
# NOTE: this is diff then calling
# `._maybe_raise_remote_err()` specifically
# because we want to raise a ctxc on any task entering this `.open_stream()`
# AFTER cancellation was already been requested,
# we DO NOT want to absorb any ctxc ACK silently!
# if self._remote_error:
# raise self._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded
# back from the other side (yet), we raise a different
# runtime error indicating that this task's usage of
# `Context.cancel()` and then `.open_stream()` is WRONG!
task: str = trio.lowlevel.current_task().name
raise RuntimeError(
'Stream opened after `Context.cancel()` called..?\n'
f'task: {actor.uid[0]}:{task}\n'
f'{self}'
)
if (
not self._portal
and not self._started_called
):
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
# NOTE: in one way streaming this only happens on the
# parent-ctx-task side (on the side that calls
# `Actor.start_remote_task()`) so if you try to send
# a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx: Context = actor.get_context(
chan=self.chan,
cid=self.cid,
nsf=self._nsf,
# side=self.side,
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
ctx._allow_overruns: bool = allow_overruns
assert ctx is self
# XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``.
if ctx._rx_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!\n'
)
# NOTE: implicitly this will call `MsgStream.aclose()` on
# `.__aexit__()` due to stream's parent `Channel` type!
#
# XXX NOTE XXX: ensures the stream is "one-shot use",
# which specifically means that on exit,
# - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
# the far end indicating that the caller exited
# the streaming context purposefully by letting
# the exit block exec.
# - this is diff from the cancel/error case where
# a cancel request from this side or an error
# should be sent to the far end indicating the
# stream WAS NOT just closed normally/gracefully.
async with MsgStream(
ctx=self,
rx_chan=ctx._rx_chan,
) as stream:
# NOTE: we track all existing streams per portal for
# the purposes of attempting graceful closes on runtime
# cancel requests.
if self._portal:
self._portal._streams.add(stream)
try:
self._stream_opened: bool = True
self._stream = stream
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# await trio.lowlevel.checkpoint()
yield stream
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
#
# await maybe_wait_for_debugger()
# XXX TODO: pretty sure this isn't needed (see
# note above this block) AND will result in
# a double `.send_stop()` call. The only reason to
# put it here would be to due with "order" in
# terms of raising any remote error (as per
# directly below) or bc the stream's
# `.__aexit__()` block might not get run
# (doubtful)? Either way if we did put this back
# in we also need a state var to avoid the double
# stop-msg send..
#
# await stream.aclose()
# NOTE: absorb and do not raise any
# EoC received from the other side such that
# it is not raised inside the surrounding
# context block's scope!
except trio.EndOfChannel as eoc:
if (
eoc
and
stream.closed
):
# sanity, can remove?
assert eoc is stream._eoc
log.warning(
'Stream was terminated by EoC\n\n'
# NOTE: won't show the error <Type> but
# does show txt followed by IPC msg.
f'{str(eoc)}\n'
)
finally:
if self._portal:
try:
self._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {self.chan.uid}\n'
f'ctx id: {self.cid}'
)
# TODO: replace all the `._maybe_raise_remote_err()` usage
# with instances of this!!
def maybe_raise(
@ -989,14 +1178,6 @@ class Context:
**kwargs,
) -> Exception|None:
'''
Check for for a remote error delivered by the runtime from
our peer (task); if set immediately raise.
This is a convenience wrapper for
`._maybe_raise_remote_err(self._remote_error)`.
'''
__tracebackhide__: bool = hide_tb
if re := self._remote_error:
return self._maybe_raise_remote_err(
@ -1109,7 +1290,8 @@ class Context:
raise remote_error
async def wait_for_result(
# TODO: change to `.wait_for_result()`?
async def result(
self,
hide_tb: bool = True,
@ -1198,27 +1380,18 @@ class Context:
(not self._cancel_called)
)
)
# TODO: eventually make `.outcome: Outcome` and thus return
# `self.outcome.unwrap()` here!
return self.outcome
# TODO: switch this with above!
# -[ ] should be named `.wait_for_outcome()` and instead do
# a `.outcome.Outcome.unwrap()` ?
#
async def result(
self,
*args,
**kwargs,
) -> Any|Exception:
log.warning(
'`Context.result()` is DEPRECATED!\n'
'Use `Context.[no]wait_for_result()` instead!\n'
)
return await self.wait_for_result(
*args,
**kwargs,
)
# @property
# def result(self) -> Any|None:
# if self._final_result_is_set():
# return self._result
# raise RuntimeError('No result is available!')
@property
def maybe_error(self) -> BaseException|None:
@ -1274,9 +1447,6 @@ class Context:
return self._result is not Unresolved
# def get_result_nowait(self) -> Any|None:
# def get_outcome_nowait(self) -> Any|None:
# def recv_result_nowait(self) -> Any|None:
# def receive_outcome_nowait(self) -> Any|None:
# TODO: use `outcome.Outcome` here instead?
@property
def outcome(self) -> (
@ -1306,6 +1476,7 @@ class Context:
def has_outcome(self) -> bool:
return bool(self.maybe_error) or self._final_result_is_set()
# @property
def repr_outcome(
self,
show_error_fields: bool = False,
@ -1327,8 +1498,7 @@ class Context:
# just deliver the type name.
if (
(reprol := getattr(merr, 'reprol', False))
and
show_error_fields
and show_error_fields
):
return reprol()
@ -1345,6 +1515,10 @@ class Context:
repr(merr)
)
# just the type name
# else: # but wen?
# return type(merr).__name__
# for all other errors show their regular output
return (
str(merr)
@ -1398,7 +1572,7 @@ class Context:
_, # any non-unresolved value
None,
) if self._final_result_is_set():
status = 'result-returned'
status = 'returned'
# normal operation but still in a pre-`Return`-result
# dialog phase
@ -1766,11 +1940,6 @@ class Context:
# ow, indicate unable to deliver by default
return False
# NOTE: similar to `Portal.open_context()`, this impl is found in
# the `._streaming`` mod to make reading/groking the details
# simpler code-org-wise.
open_stream = open_stream_from_ctx
# TODO: exception tb masking by using a manual
# `.__aexit__()`/.__aenter__()` pair on a type?

View File

@ -739,24 +739,37 @@ async def _invoke(
cid,
))
logmeth: Callable = log.runtime
merr: Exception|None = ctx.maybe_error
descr_str: str = 'with final result `{repr(ctx.outcome)}`'
(
res_type_str,
res_str,
) = (
('error', f'{type(merr)}',) if merr
else (
'result',
f'`{repr(ctx.outcome)}`',
)
)
message: str = (
f'IPC context terminated {descr_str}\n\n'
f'IPC context terminated with a final {res_type_str}\n\n'
f'{ctx}'
)
if merr:
descr_str: str = (
f'with ctx having {ctx.repr_state!r}\n'
f'{ctx.repr_outcome()}\n'
from tractor import RemoteActorError
if not isinstance(merr, RemoteActorError):
fmt_merr: str = (
f'\n{merr!r}\n'
# f'{merr.args[0]!r}\n'
)
if isinstance(merr, ContextCancelled):
logmeth: Callable = log.runtime
else:
logmeth: Callable = log.error
message += f'\n{merr!r}\n'
logmeth(message)
fmt_merr = f'\n{merr!r}'
log.error(
message
+
fmt_merr
)
else:
log.runtime(message)
async def try_ship_error_to_remote(

View File

@ -26,7 +26,6 @@ import inspect
from pprint import pformat
from typing import (
Any,
AsyncGenerator,
Callable,
AsyncIterator,
TYPE_CHECKING,
@ -52,7 +51,6 @@ from tractor.msg import (
)
if TYPE_CHECKING:
from ._runtime import Actor
from ._context import Context
from ._ipc import Channel
@ -552,213 +550,6 @@ class MsgStream(trio.abc.Channel):
# ...
@acm
async def open_stream_from_ctx(
ctx: Context,
allow_overruns: bool|None = False,
msg_buffer_size: int|None = None,
) -> AsyncGenerator[MsgStream, None]:
'''
Open a `MsgStream`, a bi-directional msg transport dialog
connected to the cross-actor peer task for an IPC `Context`.
This context manager must be entered in both the "parent" (task
which entered `Portal.open_context()`) and "child" (RPC task
which is decorated by `@context`) tasks for the stream to
logically be considered "open"; if one side begins sending to an
un-opened peer, depending on policy config, msgs will either be
queued until the other side opens and/or a `StreamOverrun` will
(eventually) be raised.
------ - ------
Runtime semantics design:
A `MsgStream` session adheres to "one-shot use" semantics,
meaning if you close the scope it **can not** be "re-opened".
Instead you must re-establish a new surrounding RPC `Context`
(RTC: remote task context?) using `Portal.open_context()`.
In the future this *design choice* may need to be changed but
currently there seems to be no obvious reason to support such
semantics..
- "pausing a stream" can be supported with a message implemented
by the `tractor` application dev.
- any remote error will normally require a restart of the entire
`trio.Task`'s scope due to the nature of `trio`'s cancellation
(`CancelScope`) system and semantics (level triggered).
'''
actor: Actor = ctx._actor
# If the surrounding context has been cancelled by some
# task with a handle to THIS, we error here immediately
# since it likely means the surrounding lexical-scope has
# errored, been `trio.Cancelled` or at the least
# `Context.cancel()` was called by some task.
if ctx._cancel_called:
# XXX NOTE: ALWAYS RAISE any remote error here even if
# it's an expected `ContextCancelled` due to a local
# task having called `.cancel()`!
#
# WHY: we expect the error to always bubble up to the
# surrounding `Portal.open_context()` call and be
# absorbed there (silently) and we DO NOT want to
# actually try to stream - a cancel msg was already
# sent to the other side!
ctx.maybe_raise(
raise_ctxc_from_self_call=True,
)
# NOTE: this is diff then calling
# `._maybe_raise_remote_err()` specifically
# because we want to raise a ctxc on any task entering this `.open_stream()`
# AFTER cancellation was already been requested,
# we DO NOT want to absorb any ctxc ACK silently!
# if ctx._remote_error:
# raise ctx._remote_error
# XXX NOTE: if no `ContextCancelled` has been responded
# back from the other side (yet), we raise a different
# runtime error indicating that this task's usage of
# `Context.cancel()` and then `.open_stream()` is WRONG!
task: str = trio.lowlevel.current_task().name
raise RuntimeError(
'Stream opened after `Context.cancel()` called..?\n'
f'task: {actor.uid[0]}:{task}\n'
f'{ctx}'
)
if (
not ctx._portal
and not ctx._started_called
):
raise RuntimeError(
'Context.started()` must be called before opening a stream'
)
# NOTE: in one way streaming this only happens on the
# parent-ctx-task side (on the side that calls
# `Actor.start_remote_task()`) so if you try to send
# a stop from the caller to the callee in the
# single-direction-stream case you'll get a lookup error
# currently.
ctx: Context = actor.get_context(
chan=ctx.chan,
cid=ctx.cid,
nsf=ctx._nsf,
# side=ctx.side,
msg_buffer_size=msg_buffer_size,
allow_overruns=allow_overruns,
)
ctx._allow_overruns: bool = allow_overruns
assert ctx is ctx
# XXX: If the underlying channel feeder receive mem chan has
# been closed then likely client code has already exited
# a ``.open_stream()`` block prior or there was some other
# unanticipated error or cancellation from ``trio``.
if ctx._rx_chan._closed:
raise trio.ClosedResourceError(
'The underlying channel for this stream was already closed!\n'
)
# NOTE: implicitly this will call `MsgStream.aclose()` on
# `.__aexit__()` due to stream's parent `Channel` type!
#
# XXX NOTE XXX: ensures the stream is "one-shot use",
# which specifically means that on exit,
# - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
# the far end indicating that the caller exited
# the streaming context purposefully by letting
# the exit block exec.
# - this is diff from the cancel/error case where
# a cancel request from this side or an error
# should be sent to the far end indicating the
# stream WAS NOT just closed normally/gracefully.
async with MsgStream(
ctx=ctx,
rx_chan=ctx._rx_chan,
) as stream:
# NOTE: we track all existing streams per portal for
# the purposes of attempting graceful closes on runtime
# cancel requests.
if ctx._portal:
ctx._portal._streams.add(stream)
try:
ctx._stream_opened: bool = True
ctx._stream = stream
# XXX: do we need this?
# ensure we aren't cancelled before yielding the stream
# await trio.lowlevel.checkpoint()
yield stream
# XXX: (MEGA IMPORTANT) if this is a root opened process we
# wait for any immediate child in debug before popping the
# context from the runtime msg loop otherwise inside
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
# the case where that msg is global debugger unlock (via
# a "stop" msg for a stream), this can result in a deadlock
# where the root is waiting on the lock to clear but the
# child has already cleared it and clobbered IPC.
#
# await maybe_wait_for_debugger()
# XXX TODO: pretty sure this isn't needed (see
# note above this block) AND will result in
# a double `.send_stop()` call. The only reason to
# put it here would be to due with "order" in
# terms of raising any remote error (as per
# directly below) or bc the stream's
# `.__aexit__()` block might not get run
# (doubtful)? Either way if we did put this back
# in we also need a state var to avoid the double
# stop-msg send..
#
# await stream.aclose()
# NOTE: absorb and do not raise any
# EoC received from the other side such that
# it is not raised inside the surrounding
# context block's scope!
except trio.EndOfChannel as eoc:
if (
eoc
and
stream.closed
):
# sanity, can remove?
assert eoc is stream._eoc
log.warning(
'Stream was terminated by EoC\n\n'
# NOTE: won't show the error <Type> but
# does show txt followed by IPC msg.
f'{str(eoc)}\n'
)
finally:
if ctx._portal:
try:
ctx._portal._streams.remove(stream)
except KeyError:
log.warning(
f'Stream was already destroyed?\n'
f'actor: {ctx.chan.uid}\n'
f'ctx id: {ctx.cid}'
)
def stream(func: Callable) -> Callable:
'''
Mark an async function as a streaming routine with ``@stream``.

View File

@ -52,6 +52,10 @@ from msgspec import (
msgpack,
Raw,
)
# from trio.lowlevel import (
# RunVar,
# RunVarToken,
# )
# TODO: see notes below from @mikenerone..
# from tricycle import TreeVar
@ -364,16 +368,160 @@ class MsgCodec(Struct):
# https://jcristharif.com/msgspec/usage.html#typed-decoding
return self._dec.decode(msg)
# TODO: a sub-decoder system as well?
# payload_msg_specs: Union[Type[Struct]] = Any
# see related comments in `.msg.types`
# _payload_decs: (
# dict[
# str,
# msgpack.Decoder,
# ]
# |None
# ) = None
# OR
# ) = {
# # pre-seed decoders for std-py-type-set for use when
# # `MsgType.pld == None|Any`.
# None: msgpack.Decoder(Any),
# Any: msgpack.Decoder(Any),
# }
#
# -[ ] do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives
# future grief?
#
# -[ ] <NEW-ISSUE-FOR-ThIS-HERE>
# -> https://jcristharif.com/msgspec/api.html#raw
#
#def mk_pld_subdec(
# self,
# payload_types: Union[Type[Struct]],
# [x] TODO: a sub-decoder system as well? => No!
#) -> msgpack.Decoder:
# # TODO: sub-decoder suppor for `.pld: Raw`?
# # => see similar notes inside `.msg.types`..
# #
# # not sure we'll end up needing this though it might have
# # unforeseen advantages in terms of enabling encrypted
# # appliciation layer (only) payloads?
# #
# # register sub-payload decoders to load `.pld: Raw`
# # decoded `Msg`-packets using a dynamic lookup (table)
# # instead of a pre-defined msg-spec via `Generic`
# # parameterization.
# #
# (
# tags,
# payload_dec,
# ) = mk_tagged_union_dec(
# tagged_structs=list(payload_types.__args__),
# )
# # register sub-decoders by tag
# subdecs: dict[str, msgpack.Decoder]|None = self._payload_decs
# for name in tags:
# subdecs.setdefault(
# name,
# payload_dec,
# )
# return payload_dec
# sub-decoders for retreiving embedded
# payload data and decoding to a sender
# side defined (struct) type.
# def dec_payload(
# codec: MsgCodec,
# msg: Msg,
# ) -> Any|Struct:
# msg: PayloadMsg = codec.dec.decode(msg)
# payload_tag: str = msg.header.payload_tag
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
# return payload_dec.decode(msg.pld)
# def enc_payload(
# codec: MsgCodec,
# payload: Any,
# cid: str,
# ) -> bytes:
# # tag_field: str|None = None
# plbytes = codec.enc.encode(payload)
# if b'msg_type' in plbytes:
# assert isinstance(payload, Struct)
# # tag_field: str = type(payload).__name__
# payload = msgspec.Raw(plbytes)
# msg = Msg(
# cid=cid,
# pld=payload,
# # Header(
# # payload_tag=tag_field,
# # # dialog_id,
# # ),
# )
# return codec.enc.encode(msg)
# TODO: sub-decoded `Raw` fields?
# -[ ] see `MsgCodec._payload_decs` notes
#
# -[x] do we still want to try and support the sub-decoder with
# `.Raw` technique in the case that the `Generic` approach gives
# future grief?
# => NO, since we went with the `PldRx` approach instead B)
#
# IF however you want to see the code that was staged for this
# from wayyy back, see the pure removal commit.
# XXX if we wanted something more complex then field name str-keys
# we might need a header field type to describe the lookup sys?
# class Header(Struct, tag=True):
# '''
# A msg header which defines payload properties
# '''
# payload_tag: str|None = None
#def mk_tagged_union_dec(
# tagged_structs: list[Struct],
#) -> tuple[
# list[str],
# msgpack.Decoder,
#]:
# '''
# Create a `msgpack.Decoder` for an input `list[msgspec.Struct]`
# and return a `list[str]` of each struct's `tag_field: str` value
# which can be used to "map to" the initialized dec.
# '''
# # See "tagged unions" docs:
# # https://jcristharif.com/msgspec/structs.html#tagged-unions
# # "The quickest way to enable tagged unions is to set tag=True when
# # defining every struct type in the union. In this case tag_field
# # defaults to "type", and tag defaults to the struct class name
# # (e.g. "Get")."
# first: Struct = tagged_structs[0]
# types_union: Union[Type[Struct]] = Union[
# first
# ]|Any
# tags: list[str] = [first.__name__]
# for struct in tagged_structs[1:]:
# types_union |= struct
# tags.append(
# getattr(
# struct,
# struct.__struct_config__.tag_field,
# struct.__name__,
# )
# )
# dec = msgpack.Decoder(types_union)
# return (
# tags,
# dec,
# )
def mk_codec(
@ -496,6 +644,10 @@ _def_tractor_codec: MsgCodec = mk_codec(
# 3. We similarly set the pending values for the child nurseries
# of the *current* task.
#
# TODO: STOP USING THIS, since it's basically a global and won't
# allow sub-IPC-ctxs to limit the msg-spec however desired..
# _ctxvar_MsgCodec: MsgCodec = RunVar(
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
'msgspec_codec',
default=_def_tractor_codec,
@ -630,31 +782,3 @@ def limit_msg_spec(
# # import pdbp; pdbp.set_trace()
# assert ext_codec.pld_spec == extended_spec
# yield ext_codec
# TODO: make something similar to this inside `._codec` such that
# user can just pass a type table of some sort?
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
# and then call `.to_dict()` on them?
# -[x] we're going to need to re-impl all the stuff changed in the
# runtime port such that it can handle dicts or `Msg`s?
#
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
# '''
# Deliver a `enc_hook()`/`dec_hook()` pair which does
# manual convertion from our above native `Msg` set
# to `dict` equivalent (wire msgs) in order to keep legacy compat
# with the original runtime implementation.
#
# Note: this is is/was primarly used while moving the core
# runtime over to using native `Msg`-struct types wherein we
# start with the send side emitting without loading
# a typed-decoder and then later flipping the switch over to
# load to the native struct types once all runtime usage has
# been adjusted appropriately.
#
# '''
# return (
# # enc_to_dict,
# dec_from_dict,
# )

View File

@ -26,6 +26,7 @@ from __future__ import annotations
import types
from typing import (
Any,
# Callable,
Generic,
Literal,
Type,
@ -160,6 +161,7 @@ class SpawnSpec(
bind_addrs: list[tuple[str, int]]
# TODO: caps based RPC support in the payload?
#
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
@ -312,9 +314,8 @@ class Started(
pld: PayloadT|Raw
# TODO: cancel request dedicated msg?
# -[ ] instead of using our existing `Start`?
#
# TODO: instead of using our existing `Start`
# for this (as we did with the original `{'cmd': ..}` style)
# class Cancel:
# cid: str
@ -476,16 +477,12 @@ def from_dict_msg(
)
return msgT(**dict_msg)
# TODO: should be make a set of cancel msgs?
# -[ ] a version of `ContextCancelled`?
# |_ and/or with a scope field?
# -[ ] or, a full `ActorCancelled`?
#
# TODO: should be make a msg version of `ContextCancelled?`
# and/or with a scope field or a full `ActorCancelled`?
# class Cancelled(MsgType):
# cid: str
#
# -[ ] what about overruns?
#
# TODO what about overruns?
# class Overrun(MsgType):
# cid: str
@ -567,17 +564,10 @@ def mk_msg_spec(
Create a payload-(data-)type-parameterized IPC message specification.
Allows generating IPC msg types from the above builtin set
with a payload (field) restricted data-type, the `Msg.pld: PayloadT`.
This allows runtime-task contexts to use the python type system
to limit/filter payload values as determined by the input
`payload_type_union: Union[Type]`.
Notes: originally multiple approaches for constructing the
type-union passed to `msgspec` were attempted as selected via the
`spec_build_method`, but it turns out only the defaul method
'indexed_generics' seems to work reliably in all use cases. As
such, the others will likely be removed in the near future.
with a payload (field) restricted data-type via the `Msg.pld:
PayloadT` type var. This allows runtime-task contexts to use
the python type system to limit/filter payload values as
determined by the input `payload_type_union: Union[Type]`.
'''
submsg_types: list[MsgType] = Msg.__subclasses__()
@ -717,3 +707,31 @@ def mk_msg_spec(
+
ipc_msg_types,
)
# TODO: make something similar to this inside `._codec` such that
# user can just pass a type table of some sort?
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
# and then call `.to_dict()` on them?
# -[ ] we're going to need to re-impl all the stuff changed in the
# runtime port such that it can handle dicts or `Msg`s?
#
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
# '''
# Deliver a `enc_hook()`/`dec_hook()` pair which does
# manual convertion from our above native `Msg` set
# to `dict` equivalent (wire msgs) in order to keep legacy compat
# with the original runtime implementation.
#
# Note: this is is/was primarly used while moving the core
# runtime over to using native `Msg`-struct types wherein we
# start with the send side emitting without loading
# a typed-decoder and then later flipping the switch over to
# load to the native struct types once all runtime usage has
# been adjusted appropriately.
#
# '''
# return (
# # enc_to_dict,
# dec_from_dict,
# )