Compare commits
No commits in common. "f0342d6ae31416da506ef4cbcdbe5baf528f9104" and "8ea0f08386ec62721581a792156f62d124b0d2aa" have entirely different histories.
f0342d6ae3
...
8ea0f08386
|
@ -315,7 +315,7 @@ def test_basic_payload_spec(
|
|||
f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
|
||||
],
|
||||
# only for debug
|
||||
# post_mortem=True,
|
||||
post_mortem=True,
|
||||
),
|
||||
p.open_context(
|
||||
child,
|
||||
|
|
|
@ -86,10 +86,7 @@ from .msg import (
|
|||
from ._ipc import (
|
||||
Channel,
|
||||
)
|
||||
from ._streaming import (
|
||||
MsgStream,
|
||||
open_stream_from_ctx,
|
||||
)
|
||||
from ._streaming import MsgStream
|
||||
from ._state import (
|
||||
current_actor,
|
||||
debug_mode,
|
||||
|
@ -981,6 +978,198 @@ class Context:
|
|||
assert self._scope
|
||||
self._scope.cancel()
|
||||
|
||||
# TODO? should we move this to `._streaming` much like we
|
||||
# moved `Portal.open_context()`'s def to this mod?
|
||||
@acm
|
||||
async def open_stream(
|
||||
self,
|
||||
allow_overruns: bool|None = False,
|
||||
msg_buffer_size: int|None = None,
|
||||
|
||||
) -> AsyncGenerator[MsgStream, None]:
|
||||
'''
|
||||
Open a ``MsgStream``, a bi-directional stream connected to the
|
||||
cross-actor (far end) task for this ``Context``.
|
||||
|
||||
This context manager must be entered on both the caller and
|
||||
callee for the stream to logically be considered "connected".
|
||||
|
||||
A ``MsgStream`` is currently "one-shot" use, meaning if you
|
||||
close it you can not "re-open" it for streaming and instead you
|
||||
must re-establish a new surrounding ``Context`` using
|
||||
``Portal.open_context()``. In the future this may change but
|
||||
currently there seems to be no obvious reason to support
|
||||
"re-opening":
|
||||
- pausing a stream can be done with a message.
|
||||
- task errors will normally require a restart of the entire
|
||||
scope of the inter-actor task context due to the nature of
|
||||
``trio``'s cancellation system.
|
||||
|
||||
'''
|
||||
actor: Actor = self._actor
|
||||
|
||||
# If the surrounding context has been cancelled by some
|
||||
# task with a handle to THIS, we error here immediately
|
||||
# since it likely means the surrounding lexical-scope has
|
||||
# errored, been `trio.Cancelled` or at the least
|
||||
# `Context.cancel()` was called by some task.
|
||||
if self._cancel_called:
|
||||
|
||||
# XXX NOTE: ALWAYS RAISE any remote error here even if
|
||||
# it's an expected `ContextCancelled` due to a local
|
||||
# task having called `.cancel()`!
|
||||
#
|
||||
# WHY: we expect the error to always bubble up to the
|
||||
# surrounding `Portal.open_context()` call and be
|
||||
# absorbed there (silently) and we DO NOT want to
|
||||
# actually try to stream - a cancel msg was already
|
||||
# sent to the other side!
|
||||
self.maybe_raise(
|
||||
raise_ctxc_from_self_call=True,
|
||||
)
|
||||
# NOTE: this is diff then calling
|
||||
# `._maybe_raise_remote_err()` specifically
|
||||
# because we want to raise a ctxc on any task entering this `.open_stream()`
|
||||
# AFTER cancellation was already been requested,
|
||||
# we DO NOT want to absorb any ctxc ACK silently!
|
||||
# if self._remote_error:
|
||||
# raise self._remote_error
|
||||
|
||||
# XXX NOTE: if no `ContextCancelled` has been responded
|
||||
# back from the other side (yet), we raise a different
|
||||
# runtime error indicating that this task's usage of
|
||||
# `Context.cancel()` and then `.open_stream()` is WRONG!
|
||||
task: str = trio.lowlevel.current_task().name
|
||||
raise RuntimeError(
|
||||
'Stream opened after `Context.cancel()` called..?\n'
|
||||
f'task: {actor.uid[0]}:{task}\n'
|
||||
f'{self}'
|
||||
)
|
||||
|
||||
if (
|
||||
not self._portal
|
||||
and not self._started_called
|
||||
):
|
||||
raise RuntimeError(
|
||||
'Context.started()` must be called before opening a stream'
|
||||
)
|
||||
|
||||
# NOTE: in one way streaming this only happens on the
|
||||
# parent-ctx-task side (on the side that calls
|
||||
# `Actor.start_remote_task()`) so if you try to send
|
||||
# a stop from the caller to the callee in the
|
||||
# single-direction-stream case you'll get a lookup error
|
||||
# currently.
|
||||
ctx: Context = actor.get_context(
|
||||
chan=self.chan,
|
||||
cid=self.cid,
|
||||
nsf=self._nsf,
|
||||
# side=self.side,
|
||||
|
||||
msg_buffer_size=msg_buffer_size,
|
||||
allow_overruns=allow_overruns,
|
||||
)
|
||||
ctx._allow_overruns: bool = allow_overruns
|
||||
assert ctx is self
|
||||
|
||||
# XXX: If the underlying channel feeder receive mem chan has
|
||||
# been closed then likely client code has already exited
|
||||
# a ``.open_stream()`` block prior or there was some other
|
||||
# unanticipated error or cancellation from ``trio``.
|
||||
|
||||
if ctx._rx_chan._closed:
|
||||
raise trio.ClosedResourceError(
|
||||
'The underlying channel for this stream was already closed!\n'
|
||||
)
|
||||
|
||||
# NOTE: implicitly this will call `MsgStream.aclose()` on
|
||||
# `.__aexit__()` due to stream's parent `Channel` type!
|
||||
#
|
||||
# XXX NOTE XXX: ensures the stream is "one-shot use",
|
||||
# which specifically means that on exit,
|
||||
# - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
|
||||
# the far end indicating that the caller exited
|
||||
# the streaming context purposefully by letting
|
||||
# the exit block exec.
|
||||
# - this is diff from the cancel/error case where
|
||||
# a cancel request from this side or an error
|
||||
# should be sent to the far end indicating the
|
||||
# stream WAS NOT just closed normally/gracefully.
|
||||
async with MsgStream(
|
||||
ctx=self,
|
||||
rx_chan=ctx._rx_chan,
|
||||
) as stream:
|
||||
|
||||
# NOTE: we track all existing streams per portal for
|
||||
# the purposes of attempting graceful closes on runtime
|
||||
# cancel requests.
|
||||
if self._portal:
|
||||
self._portal._streams.add(stream)
|
||||
|
||||
try:
|
||||
self._stream_opened: bool = True
|
||||
self._stream = stream
|
||||
|
||||
# XXX: do we need this?
|
||||
# ensure we aren't cancelled before yielding the stream
|
||||
# await trio.lowlevel.checkpoint()
|
||||
yield stream
|
||||
|
||||
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||
# wait for any immediate child in debug before popping the
|
||||
# context from the runtime msg loop otherwise inside
|
||||
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
|
||||
# the case where that msg is global debugger unlock (via
|
||||
# a "stop" msg for a stream), this can result in a deadlock
|
||||
# where the root is waiting on the lock to clear but the
|
||||
# child has already cleared it and clobbered IPC.
|
||||
#
|
||||
# await maybe_wait_for_debugger()
|
||||
|
||||
# XXX TODO: pretty sure this isn't needed (see
|
||||
# note above this block) AND will result in
|
||||
# a double `.send_stop()` call. The only reason to
|
||||
# put it here would be to due with "order" in
|
||||
# terms of raising any remote error (as per
|
||||
# directly below) or bc the stream's
|
||||
# `.__aexit__()` block might not get run
|
||||
# (doubtful)? Either way if we did put this back
|
||||
# in we also need a state var to avoid the double
|
||||
# stop-msg send..
|
||||
#
|
||||
# await stream.aclose()
|
||||
|
||||
# NOTE: absorb and do not raise any
|
||||
# EoC received from the other side such that
|
||||
# it is not raised inside the surrounding
|
||||
# context block's scope!
|
||||
except trio.EndOfChannel as eoc:
|
||||
if (
|
||||
eoc
|
||||
and
|
||||
stream.closed
|
||||
):
|
||||
# sanity, can remove?
|
||||
assert eoc is stream._eoc
|
||||
|
||||
log.warning(
|
||||
'Stream was terminated by EoC\n\n'
|
||||
# NOTE: won't show the error <Type> but
|
||||
# does show txt followed by IPC msg.
|
||||
f'{str(eoc)}\n'
|
||||
)
|
||||
|
||||
finally:
|
||||
if self._portal:
|
||||
try:
|
||||
self._portal._streams.remove(stream)
|
||||
except KeyError:
|
||||
log.warning(
|
||||
f'Stream was already destroyed?\n'
|
||||
f'actor: {self.chan.uid}\n'
|
||||
f'ctx id: {self.cid}'
|
||||
)
|
||||
|
||||
# TODO: replace all the `._maybe_raise_remote_err()` usage
|
||||
# with instances of this!!
|
||||
def maybe_raise(
|
||||
|
@ -989,14 +1178,6 @@ class Context:
|
|||
**kwargs,
|
||||
|
||||
) -> Exception|None:
|
||||
'''
|
||||
Check for for a remote error delivered by the runtime from
|
||||
our peer (task); if set immediately raise.
|
||||
|
||||
This is a convenience wrapper for
|
||||
`._maybe_raise_remote_err(self._remote_error)`.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
if re := self._remote_error:
|
||||
return self._maybe_raise_remote_err(
|
||||
|
@ -1109,7 +1290,8 @@ class Context:
|
|||
|
||||
raise remote_error
|
||||
|
||||
async def wait_for_result(
|
||||
# TODO: change to `.wait_for_result()`?
|
||||
async def result(
|
||||
self,
|
||||
hide_tb: bool = True,
|
||||
|
||||
|
@ -1198,27 +1380,18 @@ class Context:
|
|||
(not self._cancel_called)
|
||||
)
|
||||
)
|
||||
# TODO: eventually make `.outcome: Outcome` and thus return
|
||||
# `self.outcome.unwrap()` here!
|
||||
return self.outcome
|
||||
|
||||
# TODO: switch this with above!
|
||||
# -[ ] should be named `.wait_for_outcome()` and instead do
|
||||
# a `.outcome.Outcome.unwrap()` ?
|
||||
#
|
||||
async def result(
|
||||
self,
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> Any|Exception:
|
||||
log.warning(
|
||||
'`Context.result()` is DEPRECATED!\n'
|
||||
'Use `Context.[no]wait_for_result()` instead!\n'
|
||||
)
|
||||
return await self.wait_for_result(
|
||||
*args,
|
||||
**kwargs,
|
||||
)
|
||||
# @property
|
||||
# def result(self) -> Any|None:
|
||||
# if self._final_result_is_set():
|
||||
# return self._result
|
||||
|
||||
# raise RuntimeError('No result is available!')
|
||||
|
||||
@property
|
||||
def maybe_error(self) -> BaseException|None:
|
||||
|
@ -1274,9 +1447,6 @@ class Context:
|
|||
return self._result is not Unresolved
|
||||
|
||||
# def get_result_nowait(self) -> Any|None:
|
||||
# def get_outcome_nowait(self) -> Any|None:
|
||||
# def recv_result_nowait(self) -> Any|None:
|
||||
# def receive_outcome_nowait(self) -> Any|None:
|
||||
# TODO: use `outcome.Outcome` here instead?
|
||||
@property
|
||||
def outcome(self) -> (
|
||||
|
@ -1306,6 +1476,7 @@ class Context:
|
|||
def has_outcome(self) -> bool:
|
||||
return bool(self.maybe_error) or self._final_result_is_set()
|
||||
|
||||
# @property
|
||||
def repr_outcome(
|
||||
self,
|
||||
show_error_fields: bool = False,
|
||||
|
@ -1327,8 +1498,7 @@ class Context:
|
|||
# just deliver the type name.
|
||||
if (
|
||||
(reprol := getattr(merr, 'reprol', False))
|
||||
and
|
||||
show_error_fields
|
||||
and show_error_fields
|
||||
):
|
||||
return reprol()
|
||||
|
||||
|
@ -1345,6 +1515,10 @@ class Context:
|
|||
repr(merr)
|
||||
)
|
||||
|
||||
# just the type name
|
||||
# else: # but wen?
|
||||
# return type(merr).__name__
|
||||
|
||||
# for all other errors show their regular output
|
||||
return (
|
||||
str(merr)
|
||||
|
@ -1398,7 +1572,7 @@ class Context:
|
|||
_, # any non-unresolved value
|
||||
None,
|
||||
) if self._final_result_is_set():
|
||||
status = 'result-returned'
|
||||
status = 'returned'
|
||||
|
||||
# normal operation but still in a pre-`Return`-result
|
||||
# dialog phase
|
||||
|
@ -1766,11 +1940,6 @@ class Context:
|
|||
# ow, indicate unable to deliver by default
|
||||
return False
|
||||
|
||||
# NOTE: similar to `Portal.open_context()`, this impl is found in
|
||||
# the `._streaming`` mod to make reading/groking the details
|
||||
# simpler code-org-wise.
|
||||
open_stream = open_stream_from_ctx
|
||||
|
||||
|
||||
# TODO: exception tb masking by using a manual
|
||||
# `.__aexit__()`/.__aenter__()` pair on a type?
|
||||
|
|
|
@ -739,24 +739,37 @@ async def _invoke(
|
|||
cid,
|
||||
))
|
||||
|
||||
logmeth: Callable = log.runtime
|
||||
merr: Exception|None = ctx.maybe_error
|
||||
descr_str: str = 'with final result `{repr(ctx.outcome)}`'
|
||||
(
|
||||
res_type_str,
|
||||
res_str,
|
||||
) = (
|
||||
('error', f'{type(merr)}',) if merr
|
||||
else (
|
||||
'result',
|
||||
f'`{repr(ctx.outcome)}`',
|
||||
)
|
||||
)
|
||||
message: str = (
|
||||
f'IPC context terminated {descr_str}\n\n'
|
||||
f'IPC context terminated with a final {res_type_str}\n\n'
|
||||
f'{ctx}'
|
||||
)
|
||||
if merr:
|
||||
descr_str: str = (
|
||||
f'with ctx having {ctx.repr_state!r}\n'
|
||||
f'{ctx.repr_outcome()}\n'
|
||||
from tractor import RemoteActorError
|
||||
if not isinstance(merr, RemoteActorError):
|
||||
fmt_merr: str = (
|
||||
f'\n{merr!r}\n'
|
||||
# f'{merr.args[0]!r}\n'
|
||||
)
|
||||
if isinstance(merr, ContextCancelled):
|
||||
logmeth: Callable = log.runtime
|
||||
else:
|
||||
logmeth: Callable = log.error
|
||||
message += f'\n{merr!r}\n'
|
||||
|
||||
logmeth(message)
|
||||
fmt_merr = f'\n{merr!r}'
|
||||
log.error(
|
||||
message
|
||||
+
|
||||
fmt_merr
|
||||
)
|
||||
else:
|
||||
log.runtime(message)
|
||||
|
||||
|
||||
async def try_ship_error_to_remote(
|
||||
|
|
|
@ -26,7 +26,6 @@ import inspect
|
|||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncGenerator,
|
||||
Callable,
|
||||
AsyncIterator,
|
||||
TYPE_CHECKING,
|
||||
|
@ -52,7 +51,6 @@ from tractor.msg import (
|
|||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._runtime import Actor
|
||||
from ._context import Context
|
||||
from ._ipc import Channel
|
||||
|
||||
|
@ -552,213 +550,6 @@ class MsgStream(trio.abc.Channel):
|
|||
# ...
|
||||
|
||||
|
||||
@acm
|
||||
async def open_stream_from_ctx(
|
||||
ctx: Context,
|
||||
allow_overruns: bool|None = False,
|
||||
msg_buffer_size: int|None = None,
|
||||
|
||||
) -> AsyncGenerator[MsgStream, None]:
|
||||
'''
|
||||
Open a `MsgStream`, a bi-directional msg transport dialog
|
||||
connected to the cross-actor peer task for an IPC `Context`.
|
||||
|
||||
This context manager must be entered in both the "parent" (task
|
||||
which entered `Portal.open_context()`) and "child" (RPC task
|
||||
which is decorated by `@context`) tasks for the stream to
|
||||
logically be considered "open"; if one side begins sending to an
|
||||
un-opened peer, depending on policy config, msgs will either be
|
||||
queued until the other side opens and/or a `StreamOverrun` will
|
||||
(eventually) be raised.
|
||||
|
||||
------ - ------
|
||||
|
||||
Runtime semantics design:
|
||||
|
||||
A `MsgStream` session adheres to "one-shot use" semantics,
|
||||
meaning if you close the scope it **can not** be "re-opened".
|
||||
|
||||
Instead you must re-establish a new surrounding RPC `Context`
|
||||
(RTC: remote task context?) using `Portal.open_context()`.
|
||||
|
||||
In the future this *design choice* may need to be changed but
|
||||
currently there seems to be no obvious reason to support such
|
||||
semantics..
|
||||
|
||||
- "pausing a stream" can be supported with a message implemented
|
||||
by the `tractor` application dev.
|
||||
|
||||
- any remote error will normally require a restart of the entire
|
||||
`trio.Task`'s scope due to the nature of `trio`'s cancellation
|
||||
(`CancelScope`) system and semantics (level triggered).
|
||||
|
||||
'''
|
||||
actor: Actor = ctx._actor
|
||||
|
||||
# If the surrounding context has been cancelled by some
|
||||
# task with a handle to THIS, we error here immediately
|
||||
# since it likely means the surrounding lexical-scope has
|
||||
# errored, been `trio.Cancelled` or at the least
|
||||
# `Context.cancel()` was called by some task.
|
||||
if ctx._cancel_called:
|
||||
|
||||
# XXX NOTE: ALWAYS RAISE any remote error here even if
|
||||
# it's an expected `ContextCancelled` due to a local
|
||||
# task having called `.cancel()`!
|
||||
#
|
||||
# WHY: we expect the error to always bubble up to the
|
||||
# surrounding `Portal.open_context()` call and be
|
||||
# absorbed there (silently) and we DO NOT want to
|
||||
# actually try to stream - a cancel msg was already
|
||||
# sent to the other side!
|
||||
ctx.maybe_raise(
|
||||
raise_ctxc_from_self_call=True,
|
||||
)
|
||||
# NOTE: this is diff then calling
|
||||
# `._maybe_raise_remote_err()` specifically
|
||||
# because we want to raise a ctxc on any task entering this `.open_stream()`
|
||||
# AFTER cancellation was already been requested,
|
||||
# we DO NOT want to absorb any ctxc ACK silently!
|
||||
# if ctx._remote_error:
|
||||
# raise ctx._remote_error
|
||||
|
||||
# XXX NOTE: if no `ContextCancelled` has been responded
|
||||
# back from the other side (yet), we raise a different
|
||||
# runtime error indicating that this task's usage of
|
||||
# `Context.cancel()` and then `.open_stream()` is WRONG!
|
||||
task: str = trio.lowlevel.current_task().name
|
||||
raise RuntimeError(
|
||||
'Stream opened after `Context.cancel()` called..?\n'
|
||||
f'task: {actor.uid[0]}:{task}\n'
|
||||
f'{ctx}'
|
||||
)
|
||||
|
||||
if (
|
||||
not ctx._portal
|
||||
and not ctx._started_called
|
||||
):
|
||||
raise RuntimeError(
|
||||
'Context.started()` must be called before opening a stream'
|
||||
)
|
||||
|
||||
# NOTE: in one way streaming this only happens on the
|
||||
# parent-ctx-task side (on the side that calls
|
||||
# `Actor.start_remote_task()`) so if you try to send
|
||||
# a stop from the caller to the callee in the
|
||||
# single-direction-stream case you'll get a lookup error
|
||||
# currently.
|
||||
ctx: Context = actor.get_context(
|
||||
chan=ctx.chan,
|
||||
cid=ctx.cid,
|
||||
nsf=ctx._nsf,
|
||||
# side=ctx.side,
|
||||
|
||||
msg_buffer_size=msg_buffer_size,
|
||||
allow_overruns=allow_overruns,
|
||||
)
|
||||
ctx._allow_overruns: bool = allow_overruns
|
||||
assert ctx is ctx
|
||||
|
||||
# XXX: If the underlying channel feeder receive mem chan has
|
||||
# been closed then likely client code has already exited
|
||||
# a ``.open_stream()`` block prior or there was some other
|
||||
# unanticipated error or cancellation from ``trio``.
|
||||
|
||||
if ctx._rx_chan._closed:
|
||||
raise trio.ClosedResourceError(
|
||||
'The underlying channel for this stream was already closed!\n'
|
||||
)
|
||||
|
||||
# NOTE: implicitly this will call `MsgStream.aclose()` on
|
||||
# `.__aexit__()` due to stream's parent `Channel` type!
|
||||
#
|
||||
# XXX NOTE XXX: ensures the stream is "one-shot use",
|
||||
# which specifically means that on exit,
|
||||
# - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
|
||||
# the far end indicating that the caller exited
|
||||
# the streaming context purposefully by letting
|
||||
# the exit block exec.
|
||||
# - this is diff from the cancel/error case where
|
||||
# a cancel request from this side or an error
|
||||
# should be sent to the far end indicating the
|
||||
# stream WAS NOT just closed normally/gracefully.
|
||||
async with MsgStream(
|
||||
ctx=ctx,
|
||||
rx_chan=ctx._rx_chan,
|
||||
) as stream:
|
||||
|
||||
# NOTE: we track all existing streams per portal for
|
||||
# the purposes of attempting graceful closes on runtime
|
||||
# cancel requests.
|
||||
if ctx._portal:
|
||||
ctx._portal._streams.add(stream)
|
||||
|
||||
try:
|
||||
ctx._stream_opened: bool = True
|
||||
ctx._stream = stream
|
||||
|
||||
# XXX: do we need this?
|
||||
# ensure we aren't cancelled before yielding the stream
|
||||
# await trio.lowlevel.checkpoint()
|
||||
yield stream
|
||||
|
||||
# XXX: (MEGA IMPORTANT) if this is a root opened process we
|
||||
# wait for any immediate child in debug before popping the
|
||||
# context from the runtime msg loop otherwise inside
|
||||
# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
|
||||
# the case where that msg is global debugger unlock (via
|
||||
# a "stop" msg for a stream), this can result in a deadlock
|
||||
# where the root is waiting on the lock to clear but the
|
||||
# child has already cleared it and clobbered IPC.
|
||||
#
|
||||
# await maybe_wait_for_debugger()
|
||||
|
||||
# XXX TODO: pretty sure this isn't needed (see
|
||||
# note above this block) AND will result in
|
||||
# a double `.send_stop()` call. The only reason to
|
||||
# put it here would be to due with "order" in
|
||||
# terms of raising any remote error (as per
|
||||
# directly below) or bc the stream's
|
||||
# `.__aexit__()` block might not get run
|
||||
# (doubtful)? Either way if we did put this back
|
||||
# in we also need a state var to avoid the double
|
||||
# stop-msg send..
|
||||
#
|
||||
# await stream.aclose()
|
||||
|
||||
# NOTE: absorb and do not raise any
|
||||
# EoC received from the other side such that
|
||||
# it is not raised inside the surrounding
|
||||
# context block's scope!
|
||||
except trio.EndOfChannel as eoc:
|
||||
if (
|
||||
eoc
|
||||
and
|
||||
stream.closed
|
||||
):
|
||||
# sanity, can remove?
|
||||
assert eoc is stream._eoc
|
||||
|
||||
log.warning(
|
||||
'Stream was terminated by EoC\n\n'
|
||||
# NOTE: won't show the error <Type> but
|
||||
# does show txt followed by IPC msg.
|
||||
f'{str(eoc)}\n'
|
||||
)
|
||||
|
||||
finally:
|
||||
if ctx._portal:
|
||||
try:
|
||||
ctx._portal._streams.remove(stream)
|
||||
except KeyError:
|
||||
log.warning(
|
||||
f'Stream was already destroyed?\n'
|
||||
f'actor: {ctx.chan.uid}\n'
|
||||
f'ctx id: {ctx.cid}'
|
||||
)
|
||||
|
||||
|
||||
|
||||
def stream(func: Callable) -> Callable:
|
||||
'''
|
||||
Mark an async function as a streaming routine with ``@stream``.
|
||||
|
|
|
@ -52,6 +52,10 @@ from msgspec import (
|
|||
msgpack,
|
||||
Raw,
|
||||
)
|
||||
# from trio.lowlevel import (
|
||||
# RunVar,
|
||||
# RunVarToken,
|
||||
# )
|
||||
# TODO: see notes below from @mikenerone..
|
||||
# from tricycle import TreeVar
|
||||
|
||||
|
@ -364,16 +368,160 @@ class MsgCodec(Struct):
|
|||
# https://jcristharif.com/msgspec/usage.html#typed-decoding
|
||||
return self._dec.decode(msg)
|
||||
|
||||
|
||||
# [x] TODO: a sub-decoder system as well? => No!
|
||||
# TODO: a sub-decoder system as well?
|
||||
# payload_msg_specs: Union[Type[Struct]] = Any
|
||||
# see related comments in `.msg.types`
|
||||
# _payload_decs: (
|
||||
# dict[
|
||||
# str,
|
||||
# msgpack.Decoder,
|
||||
# ]
|
||||
# |None
|
||||
# ) = None
|
||||
# OR
|
||||
# ) = {
|
||||
# # pre-seed decoders for std-py-type-set for use when
|
||||
# # `MsgType.pld == None|Any`.
|
||||
# None: msgpack.Decoder(Any),
|
||||
# Any: msgpack.Decoder(Any),
|
||||
# }
|
||||
#
|
||||
# -[x] do we still want to try and support the sub-decoder with
|
||||
# -[ ] do we still want to try and support the sub-decoder with
|
||||
# `.Raw` technique in the case that the `Generic` approach gives
|
||||
# future grief?
|
||||
# => NO, since we went with the `PldRx` approach instead B)
|
||||
#
|
||||
# IF however you want to see the code that was staged for this
|
||||
# from wayyy back, see the pure removal commit.
|
||||
# -[ ] <NEW-ISSUE-FOR-ThIS-HERE>
|
||||
# -> https://jcristharif.com/msgspec/api.html#raw
|
||||
#
|
||||
#def mk_pld_subdec(
|
||||
# self,
|
||||
# payload_types: Union[Type[Struct]],
|
||||
|
||||
#) -> msgpack.Decoder:
|
||||
# # TODO: sub-decoder suppor for `.pld: Raw`?
|
||||
# # => see similar notes inside `.msg.types`..
|
||||
# #
|
||||
# # not sure we'll end up needing this though it might have
|
||||
# # unforeseen advantages in terms of enabling encrypted
|
||||
# # appliciation layer (only) payloads?
|
||||
# #
|
||||
# # register sub-payload decoders to load `.pld: Raw`
|
||||
# # decoded `Msg`-packets using a dynamic lookup (table)
|
||||
# # instead of a pre-defined msg-spec via `Generic`
|
||||
# # parameterization.
|
||||
# #
|
||||
# (
|
||||
# tags,
|
||||
# payload_dec,
|
||||
# ) = mk_tagged_union_dec(
|
||||
# tagged_structs=list(payload_types.__args__),
|
||||
# )
|
||||
# # register sub-decoders by tag
|
||||
# subdecs: dict[str, msgpack.Decoder]|None = self._payload_decs
|
||||
# for name in tags:
|
||||
# subdecs.setdefault(
|
||||
# name,
|
||||
# payload_dec,
|
||||
# )
|
||||
|
||||
# return payload_dec
|
||||
|
||||
# sub-decoders for retreiving embedded
|
||||
# payload data and decoding to a sender
|
||||
# side defined (struct) type.
|
||||
# def dec_payload(
|
||||
# codec: MsgCodec,
|
||||
# msg: Msg,
|
||||
|
||||
# ) -> Any|Struct:
|
||||
|
||||
# msg: PayloadMsg = codec.dec.decode(msg)
|
||||
# payload_tag: str = msg.header.payload_tag
|
||||
# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
|
||||
# return payload_dec.decode(msg.pld)
|
||||
|
||||
# def enc_payload(
|
||||
# codec: MsgCodec,
|
||||
# payload: Any,
|
||||
# cid: str,
|
||||
|
||||
# ) -> bytes:
|
||||
|
||||
# # tag_field: str|None = None
|
||||
|
||||
# plbytes = codec.enc.encode(payload)
|
||||
# if b'msg_type' in plbytes:
|
||||
# assert isinstance(payload, Struct)
|
||||
|
||||
# # tag_field: str = type(payload).__name__
|
||||
# payload = msgspec.Raw(plbytes)
|
||||
|
||||
# msg = Msg(
|
||||
# cid=cid,
|
||||
# pld=payload,
|
||||
# # Header(
|
||||
# # payload_tag=tag_field,
|
||||
# # # dialog_id,
|
||||
# # ),
|
||||
# )
|
||||
# return codec.enc.encode(msg)
|
||||
|
||||
|
||||
|
||||
# TODO: sub-decoded `Raw` fields?
|
||||
# -[ ] see `MsgCodec._payload_decs` notes
|
||||
#
|
||||
# XXX if we wanted something more complex then field name str-keys
|
||||
# we might need a header field type to describe the lookup sys?
|
||||
# class Header(Struct, tag=True):
|
||||
# '''
|
||||
# A msg header which defines payload properties
|
||||
|
||||
# '''
|
||||
# payload_tag: str|None = None
|
||||
|
||||
|
||||
#def mk_tagged_union_dec(
|
||||
# tagged_structs: list[Struct],
|
||||
|
||||
#) -> tuple[
|
||||
# list[str],
|
||||
# msgpack.Decoder,
|
||||
#]:
|
||||
# '''
|
||||
# Create a `msgpack.Decoder` for an input `list[msgspec.Struct]`
|
||||
# and return a `list[str]` of each struct's `tag_field: str` value
|
||||
# which can be used to "map to" the initialized dec.
|
||||
|
||||
# '''
|
||||
# # See "tagged unions" docs:
|
||||
# # https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
|
||||
# # "The quickest way to enable tagged unions is to set tag=True when
|
||||
# # defining every struct type in the union. In this case tag_field
|
||||
# # defaults to "type", and tag defaults to the struct class name
|
||||
# # (e.g. "Get")."
|
||||
# first: Struct = tagged_structs[0]
|
||||
# types_union: Union[Type[Struct]] = Union[
|
||||
# first
|
||||
# ]|Any
|
||||
# tags: list[str] = [first.__name__]
|
||||
|
||||
# for struct in tagged_structs[1:]:
|
||||
# types_union |= struct
|
||||
# tags.append(
|
||||
# getattr(
|
||||
# struct,
|
||||
# struct.__struct_config__.tag_field,
|
||||
# struct.__name__,
|
||||
# )
|
||||
# )
|
||||
|
||||
# dec = msgpack.Decoder(types_union)
|
||||
# return (
|
||||
# tags,
|
||||
# dec,
|
||||
# )
|
||||
|
||||
|
||||
def mk_codec(
|
||||
|
@ -496,6 +644,10 @@ _def_tractor_codec: MsgCodec = mk_codec(
|
|||
# 3. We similarly set the pending values for the child nurseries
|
||||
# of the *current* task.
|
||||
#
|
||||
|
||||
# TODO: STOP USING THIS, since it's basically a global and won't
|
||||
# allow sub-IPC-ctxs to limit the msg-spec however desired..
|
||||
# _ctxvar_MsgCodec: MsgCodec = RunVar(
|
||||
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
|
||||
'msgspec_codec',
|
||||
default=_def_tractor_codec,
|
||||
|
@ -630,31 +782,3 @@ def limit_msg_spec(
|
|||
# # import pdbp; pdbp.set_trace()
|
||||
# assert ext_codec.pld_spec == extended_spec
|
||||
# yield ext_codec
|
||||
|
||||
|
||||
# TODO: make something similar to this inside `._codec` such that
|
||||
# user can just pass a type table of some sort?
|
||||
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
|
||||
# and then call `.to_dict()` on them?
|
||||
# -[x] we're going to need to re-impl all the stuff changed in the
|
||||
# runtime port such that it can handle dicts or `Msg`s?
|
||||
#
|
||||
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
|
||||
# '''
|
||||
# Deliver a `enc_hook()`/`dec_hook()` pair which does
|
||||
# manual convertion from our above native `Msg` set
|
||||
# to `dict` equivalent (wire msgs) in order to keep legacy compat
|
||||
# with the original runtime implementation.
|
||||
#
|
||||
# Note: this is is/was primarly used while moving the core
|
||||
# runtime over to using native `Msg`-struct types wherein we
|
||||
# start with the send side emitting without loading
|
||||
# a typed-decoder and then later flipping the switch over to
|
||||
# load to the native struct types once all runtime usage has
|
||||
# been adjusted appropriately.
|
||||
#
|
||||
# '''
|
||||
# return (
|
||||
# # enc_to_dict,
|
||||
# dec_from_dict,
|
||||
# )
|
||||
|
|
|
@ -26,6 +26,7 @@ from __future__ import annotations
|
|||
import types
|
||||
from typing import (
|
||||
Any,
|
||||
# Callable,
|
||||
Generic,
|
||||
Literal,
|
||||
Type,
|
||||
|
@ -160,6 +161,7 @@ class SpawnSpec(
|
|||
bind_addrs: list[tuple[str, int]]
|
||||
|
||||
|
||||
|
||||
# TODO: caps based RPC support in the payload?
|
||||
#
|
||||
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
|
||||
|
@ -312,9 +314,8 @@ class Started(
|
|||
pld: PayloadT|Raw
|
||||
|
||||
|
||||
# TODO: cancel request dedicated msg?
|
||||
# -[ ] instead of using our existing `Start`?
|
||||
#
|
||||
# TODO: instead of using our existing `Start`
|
||||
# for this (as we did with the original `{'cmd': ..}` style)
|
||||
# class Cancel:
|
||||
# cid: str
|
||||
|
||||
|
@ -476,16 +477,12 @@ def from_dict_msg(
|
|||
)
|
||||
return msgT(**dict_msg)
|
||||
|
||||
# TODO: should be make a set of cancel msgs?
|
||||
# -[ ] a version of `ContextCancelled`?
|
||||
# |_ and/or with a scope field?
|
||||
# -[ ] or, a full `ActorCancelled`?
|
||||
#
|
||||
# TODO: should be make a msg version of `ContextCancelled?`
|
||||
# and/or with a scope field or a full `ActorCancelled`?
|
||||
# class Cancelled(MsgType):
|
||||
# cid: str
|
||||
#
|
||||
# -[ ] what about overruns?
|
||||
#
|
||||
|
||||
# TODO what about overruns?
|
||||
# class Overrun(MsgType):
|
||||
# cid: str
|
||||
|
||||
|
@ -567,17 +564,10 @@ def mk_msg_spec(
|
|||
Create a payload-(data-)type-parameterized IPC message specification.
|
||||
|
||||
Allows generating IPC msg types from the above builtin set
|
||||
with a payload (field) restricted data-type, the `Msg.pld: PayloadT`.
|
||||
|
||||
This allows runtime-task contexts to use the python type system
|
||||
to limit/filter payload values as determined by the input
|
||||
`payload_type_union: Union[Type]`.
|
||||
|
||||
Notes: originally multiple approaches for constructing the
|
||||
type-union passed to `msgspec` were attempted as selected via the
|
||||
`spec_build_method`, but it turns out only the defaul method
|
||||
'indexed_generics' seems to work reliably in all use cases. As
|
||||
such, the others will likely be removed in the near future.
|
||||
with a payload (field) restricted data-type via the `Msg.pld:
|
||||
PayloadT` type var. This allows runtime-task contexts to use
|
||||
the python type system to limit/filter payload values as
|
||||
determined by the input `payload_type_union: Union[Type]`.
|
||||
|
||||
'''
|
||||
submsg_types: list[MsgType] = Msg.__subclasses__()
|
||||
|
@ -717,3 +707,31 @@ def mk_msg_spec(
|
|||
+
|
||||
ipc_msg_types,
|
||||
)
|
||||
|
||||
|
||||
# TODO: make something similar to this inside `._codec` such that
|
||||
# user can just pass a type table of some sort?
|
||||
# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
|
||||
# and then call `.to_dict()` on them?
|
||||
# -[ ] we're going to need to re-impl all the stuff changed in the
|
||||
# runtime port such that it can handle dicts or `Msg`s?
|
||||
#
|
||||
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
|
||||
# '''
|
||||
# Deliver a `enc_hook()`/`dec_hook()` pair which does
|
||||
# manual convertion from our above native `Msg` set
|
||||
# to `dict` equivalent (wire msgs) in order to keep legacy compat
|
||||
# with the original runtime implementation.
|
||||
#
|
||||
# Note: this is is/was primarly used while moving the core
|
||||
# runtime over to using native `Msg`-struct types wherein we
|
||||
# start with the send side emitting without loading
|
||||
# a typed-decoder and then later flipping the switch over to
|
||||
# load to the native struct types once all runtime usage has
|
||||
# been adjusted appropriately.
|
||||
#
|
||||
# '''
|
||||
# return (
|
||||
# # enc_to_dict,
|
||||
# dec_from_dict,
|
||||
# )
|
||||
|
|
Loading…
Reference in New Issue