Compare commits

No commits in common. "f0342d6ae31416da506ef4cbcdbe5baf528f9104" and "8ea0f08386ec62721581a792156f62d124b0d2aa" have entirely different histories.

f0342d6ae3...8ea0f08386
@@ -315,7 +315,7 @@ def test_basic_payload_spec(
 f"match type-spec: `{msg_type_str}.pld: PldMsg|NoneType`",
 ],
 # only for debug
-# post_mortem=True,
+post_mortem=True,
 ),
 p.open_context(
 child,
@@ -86,10 +86,7 @@ from .msg import (
 from ._ipc import (
 Channel,
 )
-from ._streaming import (
-MsgStream,
-open_stream_from_ctx,
-)
+from ._streaming import MsgStream
 from ._state import (
 current_actor,
 debug_mode,
@@ -981,6 +978,198 @@ class Context:
 assert self._scope
 self._scope.cancel()
 
+# TODO? should we move this to `._streaming` much like we
+# moved `Portal.open_context()`'s def to this mod?
+@acm
+async def open_stream(
+self,
+allow_overruns: bool|None = False,
+msg_buffer_size: int|None = None,
+
+) -> AsyncGenerator[MsgStream, None]:
+'''
+Open a ``MsgStream``, a bi-directional stream connected to the
+cross-actor (far end) task for this ``Context``.
+
+This context manager must be entered on both the caller and
+callee for the stream to logically be considered "connected".
+
+A ``MsgStream`` is currently "one-shot" use, meaning if you
+close it you can not "re-open" it for streaming and instead you
+must re-establish a new surrounding ``Context`` using
+``Portal.open_context()``. In the future this may change but
+currently there seems to be no obvious reason to support
+"re-opening":
+- pausing a stream can be done with a message.
+- task errors will normally require a restart of the entire
+scope of the inter-actor task context due to the nature of
+``trio``'s cancellation system.
+
+'''
+actor: Actor = self._actor
+
+# If the surrounding context has been cancelled by some
+# task with a handle to THIS, we error here immediately
+# since it likely means the surrounding lexical-scope has
+# errored, been `trio.Cancelled` or at the least
+# `Context.cancel()` was called by some task.
+if self._cancel_called:
+
+# XXX NOTE: ALWAYS RAISE any remote error here even if
+# it's an expected `ContextCancelled` due to a local
+# task having called `.cancel()`!
+#
+# WHY: we expect the error to always bubble up to the
+# surrounding `Portal.open_context()` call and be
+# absorbed there (silently) and we DO NOT want to
+# actually try to stream - a cancel msg was already
+# sent to the other side!
+self.maybe_raise(
+raise_ctxc_from_self_call=True,
+)
+# NOTE: this is diff then calling
+# `._maybe_raise_remote_err()` specifically
+# because we want to raise a ctxc on any task entering this `.open_stream()`
+# AFTER cancellation was already been requested,
+# we DO NOT want to absorb any ctxc ACK silently!
+# if self._remote_error:
+# raise self._remote_error
+
+# XXX NOTE: if no `ContextCancelled` has been responded
+# back from the other side (yet), we raise a different
+# runtime error indicating that this task's usage of
+# `Context.cancel()` and then `.open_stream()` is WRONG!
+task: str = trio.lowlevel.current_task().name
+raise RuntimeError(
+'Stream opened after `Context.cancel()` called..?\n'
+f'task: {actor.uid[0]}:{task}\n'
+f'{self}'
+)
+
+if (
+not self._portal
+and not self._started_called
+):
+raise RuntimeError(
+'Context.started()` must be called before opening a stream'
+)
+
+# NOTE: in one way streaming this only happens on the
+# parent-ctx-task side (on the side that calls
+# `Actor.start_remote_task()`) so if you try to send
+# a stop from the caller to the callee in the
+# single-direction-stream case you'll get a lookup error
+# currently.
+ctx: Context = actor.get_context(
+chan=self.chan,
+cid=self.cid,
+nsf=self._nsf,
+# side=self.side,
+
+msg_buffer_size=msg_buffer_size,
+allow_overruns=allow_overruns,
+)
+ctx._allow_overruns: bool = allow_overruns
+assert ctx is self
+
+# XXX: If the underlying channel feeder receive mem chan has
+# been closed then likely client code has already exited
+# a ``.open_stream()`` block prior or there was some other
+# unanticipated error or cancellation from ``trio``.
+
+if ctx._rx_chan._closed:
+raise trio.ClosedResourceError(
+'The underlying channel for this stream was already closed!\n'
+)
+
+# NOTE: implicitly this will call `MsgStream.aclose()` on
+# `.__aexit__()` due to stream's parent `Channel` type!
+#
+# XXX NOTE XXX: ensures the stream is "one-shot use",
+# which specifically means that on exit,
+# - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
+# the far end indicating that the caller exited
+# the streaming context purposefully by letting
+# the exit block exec.
+# - this is diff from the cancel/error case where
+# a cancel request from this side or an error
+# should be sent to the far end indicating the
+# stream WAS NOT just closed normally/gracefully.
+async with MsgStream(
+ctx=self,
+rx_chan=ctx._rx_chan,
+) as stream:
+
+# NOTE: we track all existing streams per portal for
+# the purposes of attempting graceful closes on runtime
+# cancel requests.
+if self._portal:
+self._portal._streams.add(stream)
+
+try:
+self._stream_opened: bool = True
+self._stream = stream
+
+# XXX: do we need this?
+# ensure we aren't cancelled before yielding the stream
+# await trio.lowlevel.checkpoint()
+yield stream
+
+# XXX: (MEGA IMPORTANT) if this is a root opened process we
+# wait for any immediate child in debug before popping the
+# context from the runtime msg loop otherwise inside
+# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
+# the case where that msg is global debugger unlock (via
+# a "stop" msg for a stream), this can result in a deadlock
+# where the root is waiting on the lock to clear but the
+# child has already cleared it and clobbered IPC.
+#
+# await maybe_wait_for_debugger()
+
+# XXX TODO: pretty sure this isn't needed (see
+# note above this block) AND will result in
+# a double `.send_stop()` call. The only reason to
+# put it here would be to due with "order" in
+# terms of raising any remote error (as per
+# directly below) or bc the stream's
+# `.__aexit__()` block might not get run
+# (doubtful)? Either way if we did put this back
+# in we also need a state var to avoid the double
+# stop-msg send..
+#
+# await stream.aclose()
+
+# NOTE: absorb and do not raise any
+# EoC received from the other side such that
+# it is not raised inside the surrounding
+# context block's scope!
+except trio.EndOfChannel as eoc:
+if (
+eoc
+and
+stream.closed
+):
+# sanity, can remove?
+assert eoc is stream._eoc
+
+log.warning(
+'Stream was terminated by EoC\n\n'
+# NOTE: won't show the error <Type> but
+# does show txt followed by IPC msg.
+f'{str(eoc)}\n'
+)
+
+finally:
+if self._portal:
+try:
+self._portal._streams.remove(stream)
+except KeyError:
+log.warning(
+f'Stream was already destroyed?\n'
+f'actor: {self.chan.uid}\n'
+f'ctx id: {self.cid}'
+)
+
 # TODO: replace all the `._maybe_raise_remote_err()` usage
 # with instances of this!!
 def maybe_raise(
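Note: the hunk above loses indentation in this rendering, so here is a minimal usage sketch of the `Context.open_stream()` API it (re)adds, following the docstring quoted in the diff: the stream must be opened on both the parent and child sides of a `Portal.open_context()` dialog and is one-shot per context. The actor name and echo behaviour below are illustrative assumptions, not code from either commit.

import trio
import tractor


@tractor.context
async def echo_child(ctx: tractor.Context) -> None:
    # child acks the ctx first, then opens its side of the stream
    await ctx.started()
    async with ctx.open_stream() as stream:
        async for msg in stream:
            await stream.send(msg)


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'echoer',  # hypothetical actor name
            enable_modules=[__name__],
        )
        async with portal.open_context(echo_child) as (ctx, _first):
            # parent side: one stream per surrounding context
            async with ctx.open_stream() as stream:
                await stream.send('ping')
                assert await stream.receive() == 'ping'
        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)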
@@ -989,14 +1178,6 @@ class Context:
 **kwargs,
 
 ) -> Exception|None:
-'''
-Check for for a remote error delivered by the runtime from
-our peer (task); if set immediately raise.
-
-This is a convenience wrapper for
-`._maybe_raise_remote_err(self._remote_error)`.
-
-'''
 __tracebackhide__: bool = hide_tb
 if re := self._remote_error:
 return self._maybe_raise_remote_err(
@@ -1109,7 +1290,8 @@ class Context:
 
 raise remote_error
 
-async def wait_for_result(
+# TODO: change to `.wait_for_result()`?
+async def result(
 self,
 hide_tb: bool = True,
 
@@ -1198,27 +1380,18 @@ class Context:
 (not self._cancel_called)
 )
 )
-# TODO: eventually make `.outcome: Outcome` and thus return
-# `self.outcome.unwrap()` here!
 return self.outcome
 
 # TODO: switch this with above!
 # -[ ] should be named `.wait_for_outcome()` and instead do
 # a `.outcome.Outcome.unwrap()` ?
 #
-async def result(
-self,
-*args,
-**kwargs,
-) -> Any|Exception:
-log.warning(
-'`Context.result()` is DEPRECATED!\n'
-'Use `Context.[no]wait_for_result()` instead!\n'
-)
-return await self.wait_for_result(
-*args,
-**kwargs,
-)
+# @property
+# def result(self) -> Any|None:
+# if self._final_result_is_set():
+# return self._result
+# raise RuntimeError('No result is available!')
 
 @property
 def maybe_error(self) -> BaseException|None:
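For readability only: the deprecation shim removed on the old side of this hunk reassembles to roughly the fragment below. It is a method body excerpt from `Context`, not standalone code, and the indentation is inferred rather than taken from the commit.

    # old-side shim: `.result()` kept as a deprecated alias that
    # forwards to the renamed `.wait_for_result()`
    async def result(
        self,
        *args,
        **kwargs,
    ) -> Any|Exception:
        log.warning(
            '`Context.result()` is DEPRECATED!\n'
            'Use `Context.[no]wait_for_result()` instead!\n'
        )
        return await self.wait_for_result(
            *args,
            **kwargs,
        )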
@@ -1274,9 +1447,6 @@ class Context:
 return self._result is not Unresolved
 
 # def get_result_nowait(self) -> Any|None:
-# def get_outcome_nowait(self) -> Any|None:
-# def recv_result_nowait(self) -> Any|None:
-# def receive_outcome_nowait(self) -> Any|None:
 # TODO: use `outcome.Outcome` here instead?
 @property
 def outcome(self) -> (
@@ -1306,6 +1476,7 @@ class Context:
 def has_outcome(self) -> bool:
 return bool(self.maybe_error) or self._final_result_is_set()
 
+# @property
 def repr_outcome(
 self,
 show_error_fields: bool = False,
@@ -1327,8 +1498,7 @@ class Context:
 # just deliver the type name.
 if (
 (reprol := getattr(merr, 'reprol', False))
-and
-show_error_fields
+and show_error_fields
 ):
 return reprol()
 
@@ -1345,6 +1515,10 @@ class Context:
 repr(merr)
 )
 
+# just the type name
+# else: # but wen?
+# return type(merr).__name__
+
 # for all other errors show their regular output
 return (
 str(merr)
@@ -1398,7 +1572,7 @@ class Context:
 _, # any non-unresolved value
 None,
 ) if self._final_result_is_set():
-status = 'result-returned'
+status = 'returned'
 
 # normal operation but still in a pre-`Return`-result
 # dialog phase
@@ -1766,11 +1940,6 @@ class Context:
 # ow, indicate unable to deliver by default
 return False
 
-# NOTE: similar to `Portal.open_context()`, this impl is found in
-# the `._streaming`` mod to make reading/groking the details
-# simpler code-org-wise.
-open_stream = open_stream_from_ctx
-
 
 # TODO: exception tb masking by using a manual
 # `.__aexit__()`/.__aenter__()` pair on a type?
@@ -739,24 +739,37 @@ async def _invoke(
 cid,
 ))
 
-logmeth: Callable = log.runtime
 merr: Exception|None = ctx.maybe_error
-descr_str: str = 'with final result `{repr(ctx.outcome)}`'
+(
+res_type_str,
+res_str,
+) = (
+('error', f'{type(merr)}',) if merr
+else (
+'result',
+f'`{repr(ctx.outcome)}`',
+)
+)
 message: str = (
-f'IPC context terminated {descr_str}\n\n'
+f'IPC context terminated with a final {res_type_str}\n\n'
+f'{ctx}'
 )
 if merr:
-descr_str: str = (
-f'with ctx having {ctx.repr_state!r}\n'
-f'{ctx.repr_outcome()}\n'
-)
-if isinstance(merr, ContextCancelled):
-logmeth: Callable = log.runtime
+from tractor import RemoteActorError
+if not isinstance(merr, RemoteActorError):
+fmt_merr: str = (
+f'\n{merr!r}\n'
+# f'{merr.args[0]!r}\n'
+)
 else:
-logmeth: Callable = log.error
-message += f'\n{merr!r}\n'
-
-logmeth(message)
+fmt_merr = f'\n{merr!r}'
+log.error(
+message
++
+fmt_merr
+)
+else:
+log.runtime(message)
 
 
 async def try_ship_error_to_remote(
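Since the flattened rendering above hides nesting, here is the new-side teardown-logging block re-derived with inferred indentation and wrapped in a standalone helper purely for readability; the wrapper name and signature are NOT part of the commit, `ctx` stands in for a tractor `Context`, and `log` for tractor's runtime logger (which provides a custom `.runtime()` level). `res_str` is bound but unused, exactly as in the diff.

from typing import Any


def log_ctx_termination(
    ctx: Any,  # a tractor `Context`-like object
    log: Any,  # tractor's logger (has `.error()` and `.runtime()`)
) -> None:
    merr: Exception|None = ctx.maybe_error
    (
        res_type_str,
        res_str,
    ) = (
        ('error', f'{type(merr)}',) if merr
        else (
            'result',
            f'`{repr(ctx.outcome)}`',
        )
    )
    message: str = (
        f'IPC context terminated with a final {res_type_str}\n\n'
        f'{ctx}'
    )
    if merr:
        from tractor import RemoteActorError
        if not isinstance(merr, RemoteActorError):
            fmt_merr: str = (
                f'\n{merr!r}\n'
                # f'{merr.args[0]!r}\n'
            )
        else:
            fmt_merr = f'\n{merr!r}'
        log.error(
            message
            +
            fmt_merr
        )
    else:
        log.runtime(message)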
@@ -26,7 +26,6 @@ import inspect
 from pprint import pformat
 from typing import (
 Any,
-AsyncGenerator,
 Callable,
 AsyncIterator,
 TYPE_CHECKING,
@@ -52,7 +51,6 @@ from tractor.msg import (
 )
 
 if TYPE_CHECKING:
-from ._runtime import Actor
 from ._context import Context
 from ._ipc import Channel
 
@@ -552,213 +550,6 @@ class MsgStream(trio.abc.Channel):
 # ...
 
 
-@acm
-async def open_stream_from_ctx(
-ctx: Context,
-allow_overruns: bool|None = False,
-msg_buffer_size: int|None = None,
-
-) -> AsyncGenerator[MsgStream, None]:
-'''
-Open a `MsgStream`, a bi-directional msg transport dialog
-connected to the cross-actor peer task for an IPC `Context`.
-
-This context manager must be entered in both the "parent" (task
-which entered `Portal.open_context()`) and "child" (RPC task
-which is decorated by `@context`) tasks for the stream to
-logically be considered "open"; if one side begins sending to an
-un-opened peer, depending on policy config, msgs will either be
-queued until the other side opens and/or a `StreamOverrun` will
-(eventually) be raised.
-
------- - ------
-
-Runtime semantics design:
-
-A `MsgStream` session adheres to "one-shot use" semantics,
-meaning if you close the scope it **can not** be "re-opened".
-
-Instead you must re-establish a new surrounding RPC `Context`
-(RTC: remote task context?) using `Portal.open_context()`.
-
-In the future this *design choice* may need to be changed but
-currently there seems to be no obvious reason to support such
-semantics..
-
-- "pausing a stream" can be supported with a message implemented
-by the `tractor` application dev.
-
-- any remote error will normally require a restart of the entire
-`trio.Task`'s scope due to the nature of `trio`'s cancellation
-(`CancelScope`) system and semantics (level triggered).
-
-'''
-actor: Actor = ctx._actor
-
-# If the surrounding context has been cancelled by some
-# task with a handle to THIS, we error here immediately
-# since it likely means the surrounding lexical-scope has
-# errored, been `trio.Cancelled` or at the least
-# `Context.cancel()` was called by some task.
-if ctx._cancel_called:
-
-# XXX NOTE: ALWAYS RAISE any remote error here even if
-# it's an expected `ContextCancelled` due to a local
-# task having called `.cancel()`!
-#
-# WHY: we expect the error to always bubble up to the
-# surrounding `Portal.open_context()` call and be
-# absorbed there (silently) and we DO NOT want to
-# actually try to stream - a cancel msg was already
-# sent to the other side!
-ctx.maybe_raise(
-raise_ctxc_from_self_call=True,
-)
-# NOTE: this is diff then calling
-# `._maybe_raise_remote_err()` specifically
-# because we want to raise a ctxc on any task entering this `.open_stream()`
-# AFTER cancellation was already been requested,
-# we DO NOT want to absorb any ctxc ACK silently!
-# if ctx._remote_error:
-# raise ctx._remote_error
-
-# XXX NOTE: if no `ContextCancelled` has been responded
-# back from the other side (yet), we raise a different
-# runtime error indicating that this task's usage of
-# `Context.cancel()` and then `.open_stream()` is WRONG!
-task: str = trio.lowlevel.current_task().name
-raise RuntimeError(
-'Stream opened after `Context.cancel()` called..?\n'
-f'task: {actor.uid[0]}:{task}\n'
-f'{ctx}'
-)
-
-if (
-not ctx._portal
-and not ctx._started_called
-):
-raise RuntimeError(
-'Context.started()` must be called before opening a stream'
-)
-
-# NOTE: in one way streaming this only happens on the
-# parent-ctx-task side (on the side that calls
-# `Actor.start_remote_task()`) so if you try to send
-# a stop from the caller to the callee in the
-# single-direction-stream case you'll get a lookup error
-# currently.
-ctx: Context = actor.get_context(
-chan=ctx.chan,
-cid=ctx.cid,
-nsf=ctx._nsf,
-# side=ctx.side,
-
-msg_buffer_size=msg_buffer_size,
-allow_overruns=allow_overruns,
-)
-ctx._allow_overruns: bool = allow_overruns
-assert ctx is ctx
-
-# XXX: If the underlying channel feeder receive mem chan has
-# been closed then likely client code has already exited
-# a ``.open_stream()`` block prior or there was some other
-# unanticipated error or cancellation from ``trio``.
-
-if ctx._rx_chan._closed:
-raise trio.ClosedResourceError(
-'The underlying channel for this stream was already closed!\n'
-)
-
-# NOTE: implicitly this will call `MsgStream.aclose()` on
-# `.__aexit__()` due to stream's parent `Channel` type!
-#
-# XXX NOTE XXX: ensures the stream is "one-shot use",
-# which specifically means that on exit,
-# - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
-# the far end indicating that the caller exited
-# the streaming context purposefully by letting
-# the exit block exec.
-# - this is diff from the cancel/error case where
-# a cancel request from this side or an error
-# should be sent to the far end indicating the
-# stream WAS NOT just closed normally/gracefully.
-async with MsgStream(
-ctx=ctx,
-rx_chan=ctx._rx_chan,
-) as stream:
-
-# NOTE: we track all existing streams per portal for
-# the purposes of attempting graceful closes on runtime
-# cancel requests.
-if ctx._portal:
-ctx._portal._streams.add(stream)
-
-try:
-ctx._stream_opened: bool = True
-ctx._stream = stream
-
-# XXX: do we need this?
-# ensure we aren't cancelled before yielding the stream
-# await trio.lowlevel.checkpoint()
-yield stream
-
-# XXX: (MEGA IMPORTANT) if this is a root opened process we
-# wait for any immediate child in debug before popping the
-# context from the runtime msg loop otherwise inside
-# ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
-# the case where that msg is global debugger unlock (via
-# a "stop" msg for a stream), this can result in a deadlock
-# where the root is waiting on the lock to clear but the
-# child has already cleared it and clobbered IPC.
-#
-# await maybe_wait_for_debugger()
-
-# XXX TODO: pretty sure this isn't needed (see
-# note above this block) AND will result in
-# a double `.send_stop()` call. The only reason to
-# put it here would be to due with "order" in
-# terms of raising any remote error (as per
-# directly below) or bc the stream's
-# `.__aexit__()` block might not get run
-# (doubtful)? Either way if we did put this back
-# in we also need a state var to avoid the double
-# stop-msg send..
-#
-# await stream.aclose()
-
-# NOTE: absorb and do not raise any
-# EoC received from the other side such that
-# it is not raised inside the surrounding
-# context block's scope!
-except trio.EndOfChannel as eoc:
-if (
-eoc
-and
-stream.closed
-):
-# sanity, can remove?
-assert eoc is stream._eoc
-
-log.warning(
-'Stream was terminated by EoC\n\n'
-# NOTE: won't show the error <Type> but
-# does show txt followed by IPC msg.
-f'{str(eoc)}\n'
-)
-
-finally:
-if ctx._portal:
-try:
-ctx._portal._streams.remove(stream)
-except KeyError:
-log.warning(
-f'Stream was already destroyed?\n'
-f'actor: {ctx.chan.uid}\n'
-f'ctx id: {ctx.cid}'
-)
-
-
-
 def stream(func: Callable) -> Callable:
 '''
 Mark an async function as a streaming routine with ``@stream``.
@@ -52,6 +52,10 @@ from msgspec import (
 msgpack,
 Raw,
 )
+# from trio.lowlevel import (
+# RunVar,
+# RunVarToken,
+# )
 # TODO: see notes below from @mikenerone..
 # from tricycle import TreeVar
 
@@ -364,16 +368,160 @@ class MsgCodec(Struct):
 # https://jcristharif.com/msgspec/usage.html#typed-decoding
 return self._dec.decode(msg)
 
+# TODO: a sub-decoder system as well?
+# payload_msg_specs: Union[Type[Struct]] = Any
+# see related comments in `.msg.types`
+# _payload_decs: (
+# dict[
+# str,
+# msgpack.Decoder,
+# ]
+# |None
+# ) = None
+# OR
+# ) = {
+# # pre-seed decoders for std-py-type-set for use when
+# # `MsgType.pld == None|Any`.
+# None: msgpack.Decoder(Any),
+# Any: msgpack.Decoder(Any),
+# }
+#
+# -[ ] do we still want to try and support the sub-decoder with
+# `.Raw` technique in the case that the `Generic` approach gives
+# future grief?
+#
+# -[ ] <NEW-ISSUE-FOR-ThIS-HERE>
+# -> https://jcristharif.com/msgspec/api.html#raw
+#
+#def mk_pld_subdec(
+# self,
+# payload_types: Union[Type[Struct]],
+
-# [x] TODO: a sub-decoder system as well? => No!
+#) -> msgpack.Decoder:
+# # TODO: sub-decoder suppor for `.pld: Raw`?
+# # => see similar notes inside `.msg.types`..
+# #
+# # not sure we'll end up needing this though it might have
+# # unforeseen advantages in terms of enabling encrypted
+# # appliciation layer (only) payloads?
+# #
+# # register sub-payload decoders to load `.pld: Raw`
+# # decoded `Msg`-packets using a dynamic lookup (table)
+# # instead of a pre-defined msg-spec via `Generic`
+# # parameterization.
+# #
+# (
+# tags,
+# payload_dec,
+# ) = mk_tagged_union_dec(
+# tagged_structs=list(payload_types.__args__),
+# )
+# # register sub-decoders by tag
+# subdecs: dict[str, msgpack.Decoder]|None = self._payload_decs
+# for name in tags:
+# subdecs.setdefault(
+# name,
+# payload_dec,
+# )
+
+# return payload_dec
+
+# sub-decoders for retreiving embedded
+# payload data and decoding to a sender
+# side defined (struct) type.
+# def dec_payload(
+# codec: MsgCodec,
+# msg: Msg,
+
+# ) -> Any|Struct:
+
+# msg: PayloadMsg = codec.dec.decode(msg)
+# payload_tag: str = msg.header.payload_tag
+# payload_dec: msgpack.Decoder = codec._payload_decs[payload_tag]
+# return payload_dec.decode(msg.pld)
+
+# def enc_payload(
+# codec: MsgCodec,
+# payload: Any,
+# cid: str,
+
+# ) -> bytes:
+
+# # tag_field: str|None = None
+
+# plbytes = codec.enc.encode(payload)
+# if b'msg_type' in plbytes:
+# assert isinstance(payload, Struct)
+
+# # tag_field: str = type(payload).__name__
+# payload = msgspec.Raw(plbytes)
+
+# msg = Msg(
+# cid=cid,
+# pld=payload,
+# # Header(
+# # payload_tag=tag_field,
+# # # dialog_id,
+# # ),
+# )
+# return codec.enc.encode(msg)
+
+
+
+# TODO: sub-decoded `Raw` fields?
+# -[ ] see `MsgCodec._payload_decs` notes
 #
-# -[x] do we still want to try and support the sub-decoder with
-# `.Raw` technique in the case that the `Generic` approach gives
-# future grief?
-# => NO, since we went with the `PldRx` approach instead B)
-#
-# IF however you want to see the code that was staged for this
-# from wayyy back, see the pure removal commit.
+# XXX if we wanted something more complex then field name str-keys
+# we might need a header field type to describe the lookup sys?
+# class Header(Struct, tag=True):
+# '''
+# A msg header which defines payload properties
+# '''
+# payload_tag: str|None = None
+
+
+#def mk_tagged_union_dec(
+# tagged_structs: list[Struct],
+
+#) -> tuple[
+# list[str],
+# msgpack.Decoder,
+#]:
+# '''
+# Create a `msgpack.Decoder` for an input `list[msgspec.Struct]`
+# and return a `list[str]` of each struct's `tag_field: str` value
+# which can be used to "map to" the initialized dec.
+
+# '''
+# # See "tagged unions" docs:
+# # https://jcristharif.com/msgspec/structs.html#tagged-unions
+
+# # "The quickest way to enable tagged unions is to set tag=True when
+# # defining every struct type in the union. In this case tag_field
+# # defaults to "type", and tag defaults to the struct class name
+# # (e.g. "Get")."
+# first: Struct = tagged_structs[0]
+# types_union: Union[Type[Struct]] = Union[
+# first
+# ]|Any
+# tags: list[str] = [first.__name__]
+
+# for struct in tagged_structs[1:]:
+# types_union |= struct
+# tags.append(
+# getattr(
+# struct,
+# struct.__struct_config__.tag_field,
+# struct.__name__,
+# )
+# )
+
+# dec = msgpack.Decoder(types_union)
+# return (
+# tags,
+# dec,
+# )
+
+
 def mk_codec(
@@ -496,6 +644,10 @@ _def_tractor_codec: MsgCodec = mk_codec(
 # 3. We similarly set the pending values for the child nurseries
 # of the *current* task.
 #
 
+# TODO: STOP USING THIS, since it's basically a global and won't
+# allow sub-IPC-ctxs to limit the msg-spec however desired..
+# _ctxvar_MsgCodec: MsgCodec = RunVar(
 _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
 'msgspec_codec',
 default=_def_tractor_codec,
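As context for the hunk above, which keeps the codec in a `contextvars.ContextVar` rather than the commented-out trio `RunVar`, here is a small, self-contained sketch of the set/reset pattern such a task-scoped variable enables. The string values stand in for `MsgCodec` instances; nothing below is code from either commit.

from contextlib import contextmanager
from contextvars import ContextVar

_codec: ContextVar[str] = ContextVar('codec', default='plain-msgpack')


@contextmanager
def apply_codec(new_codec: str):
    # only the current task/context sees the override
    token = _codec.set(new_codec)
    try:
        yield _codec.get()
    finally:
        # restore whatever codec was active before entry
        _codec.reset(token)


with apply_codec('extended-spec') as active:
    assert _codec.get() == active == 'extended-spec'
assert _codec.get() == 'plain-msgpack'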
@@ -630,31 +782,3 @@ def limit_msg_spec(
 # # import pdbp; pdbp.set_trace()
 # assert ext_codec.pld_spec == extended_spec
 # yield ext_codec
-
-
-# TODO: make something similar to this inside `._codec` such that
-# user can just pass a type table of some sort?
-# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
-# and then call `.to_dict()` on them?
-# -[x] we're going to need to re-impl all the stuff changed in the
-# runtime port such that it can handle dicts or `Msg`s?
-#
-# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
-# '''
-# Deliver a `enc_hook()`/`dec_hook()` pair which does
-# manual convertion from our above native `Msg` set
-# to `dict` equivalent (wire msgs) in order to keep legacy compat
-# with the original runtime implementation.
-#
-# Note: this is is/was primarly used while moving the core
-# runtime over to using native `Msg`-struct types wherein we
-# start with the send side emitting without loading
-# a typed-decoder and then later flipping the switch over to
-# load to the native struct types once all runtime usage has
-# been adjusted appropriately.
-#
-# '''
-# return (
-# # enc_to_dict,
-# dec_from_dict,
-# )
-
@@ -26,6 +26,7 @@ from __future__ import annotations
 import types
 from typing import (
 Any,
+# Callable,
 Generic,
 Literal,
 Type,
@@ -160,6 +161,7 @@ class SpawnSpec(
 bind_addrs: list[tuple[str, int]]
 
 
+
 # TODO: caps based RPC support in the payload?
 #
 # -[ ] integration with our ``enable_modules: list[str]`` caps sys.
@@ -312,9 +314,8 @@ class Started(
 pld: PayloadT|Raw
 
 
-# TODO: cancel request dedicated msg?
-# -[ ] instead of using our existing `Start`?
-#
+# TODO: instead of using our existing `Start`
+# for this (as we did with the original `{'cmd': ..}` style)
 # class Cancel:
 # cid: str
 
@@ -476,16 +477,12 @@ def from_dict_msg(
 )
 return msgT(**dict_msg)
 
-# TODO: should be make a set of cancel msgs?
-# -[ ] a version of `ContextCancelled`?
-# |_ and/or with a scope field?
-# -[ ] or, a full `ActorCancelled`?
-#
+# TODO: should be make a msg version of `ContextCancelled?`
+# and/or with a scope field or a full `ActorCancelled`?
 # class Cancelled(MsgType):
 # cid: str
-#
-# -[ ] what about overruns?
-#
+# TODO what about overruns?
 # class Overrun(MsgType):
 # cid: str
 
@@ -567,17 +564,10 @@ def mk_msg_spec(
 Create a payload-(data-)type-parameterized IPC message specification.
 
 Allows generating IPC msg types from the above builtin set
-with a payload (field) restricted data-type, the `Msg.pld: PayloadT`.
-
-This allows runtime-task contexts to use the python type system
-to limit/filter payload values as determined by the input
-`payload_type_union: Union[Type]`.
-
-Notes: originally multiple approaches for constructing the
-type-union passed to `msgspec` were attempted as selected via the
-`spec_build_method`, but it turns out only the defaul method
-'indexed_generics' seems to work reliably in all use cases. As
-such, the others will likely be removed in the near future.
+with a payload (field) restricted data-type via the `Msg.pld:
+PayloadT` type var. This allows runtime-task contexts to use
+the python type system to limit/filter payload values as
+determined by the input `payload_type_union: Union[Type]`.
 
 '''
 submsg_types: list[MsgType] = Msg.__subclasses__()
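To make the "payload-(data-)type-parameterized" idea in the docstring above concrete, here is a hedged, self-contained illustration using msgspec's generic-struct support directly (assuming msgspec >= 0.13). `PayloadT` and the `PldMsg`/`Point` names are illustrative stand-ins, not the msg types defined in this module.

from typing import Generic, TypeVar
import msgspec

PayloadT = TypeVar('PayloadT')


class PldMsg(msgspec.Struct, Generic[PayloadT]):
    # the `.pld` field is restricted by the type parameter at decode time
    cid: str
    pld: PayloadT


class Point(msgspec.Struct):
    x: int
    y: int


wire: bytes = msgspec.msgpack.encode(PldMsg(cid='1', pld=Point(x=1, y=2)))
# decoding against the parameterized spec validates the payload's type
msg = msgspec.msgpack.Decoder(PldMsg[Point]).decode(wire)
assert isinstance(msg.pld, Point)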
@@ -717,3 +707,31 @@ def mk_msg_spec(
 +
 ipc_msg_types,
 )
+
+
+# TODO: make something similar to this inside `._codec` such that
+# user can just pass a type table of some sort?
+# -[ ] we would need to decode all msgs to `pretty_struct.Struct`
+# and then call `.to_dict()` on them?
+# -[ ] we're going to need to re-impl all the stuff changed in the
+# runtime port such that it can handle dicts or `Msg`s?
+#
+# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
+# '''
+# Deliver a `enc_hook()`/`dec_hook()` pair which does
+# manual convertion from our above native `Msg` set
+# to `dict` equivalent (wire msgs) in order to keep legacy compat
+# with the original runtime implementation.
+#
+# Note: this is is/was primarly used while moving the core
+# runtime over to using native `Msg`-struct types wherein we
+# start with the send side emitting without loading
+# a typed-decoder and then later flipping the switch over to
+# load to the native struct types once all runtime usage has
+# been adjusted appropriately.
+#
+# '''
+# return (
+# # enc_to_dict,
+# dec_from_dict,
+# )