Move `Context.open_stream()` impl to `._streaming`

Mirroring how `Portal.open_context()` is organized, move the main
streaming-API `@acm` to live alongside the `MsgStream` code and rebind
the `Context` method to the new module-level func.

Other changes,
- rename `Context.result()` -> `.wait_for_result()` to better match its
  blocking semantics and rebind `.result()` as a deprecated alias
  (see the usage sketch below).
- add a doc-string to `Context.maybe_raise()`.
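
For reference, a minimal usage sketch (editor's illustration, not part of
this commit; it assumes the usual `tractor.open_nursery()` /
`@tractor.context` APIs) showing that call-sites are unchanged by the
code move and that the old `.result()` name now simply forwards to
`.wait_for_result()`:

    import trio
    import tractor

    @tractor.context
    async def echo_child(ctx: tractor.Context):
        await ctx.started()
        # `.open_stream()` is now just the bound `open_stream_from_ctx()`
        async with ctx.open_stream() as stream:
            async for msg in stream:
                await stream.send(msg)
        return 'done'

    async def main():
        async with tractor.open_nursery() as an:
            portal = await an.start_actor('echoer', enable_modules=[__name__])
            async with portal.open_context(echo_child) as (ctx, _first):
                async with ctx.open_stream() as stream:
                    await stream.send('ping')
                    assert await stream.receive() == 'ping'
                # renamed blocking-result API; `ctx.result()` still works
                # but now emits a deprecation warning.
                assert await ctx.wait_for_result() == 'done'
            await portal.cancel_actor()

    trio.run(main)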
runtime_to_msgspec
Tyler Goodlet 2024-05-31 17:32:11 -04:00
parent 21f633a900
commit f0342d6ae3
2 changed files with 248 additions and 208 deletions

tractor/_context.py

@@ -86,7 +86,10 @@ from .msg import (
 from ._ipc import (
     Channel,
 )
-from ._streaming import MsgStream
+from ._streaming import (
+    MsgStream,
+    open_stream_from_ctx,
+)
 from ._state import (
     current_actor,
     debug_mode,
@@ -978,198 +981,6 @@ class Context:
             assert self._scope
             self._scope.cancel()

-    # TODO? should we move this to `._streaming` much like we
-    # moved `Portal.open_context()`'s def to this mod?
-    @acm
-    async def open_stream(
-        self,
-        allow_overruns: bool|None = False,
-        msg_buffer_size: int|None = None,
-
-    ) -> AsyncGenerator[MsgStream, None]:
-        '''
-        Open a ``MsgStream``, a bi-directional stream connected to the
-        cross-actor (far end) task for this ``Context``.
-
-        This context manager must be entered on both the caller and
-        callee for the stream to logically be considered "connected".
-
-        A ``MsgStream`` is currently "one-shot" use, meaning if you
-        close it you can not "re-open" it for streaming and instead you
-        must re-establish a new surrounding ``Context`` using
-        ``Portal.open_context()``. In the future this may change but
-        currently there seems to be no obvious reason to support
-        "re-opening":
-
-        - pausing a stream can be done with a message.
-        - task errors will normally require a restart of the entire
-          scope of the inter-actor task context due to the nature of
-          ``trio``'s cancellation system.
-
-        '''
-        actor: Actor = self._actor
-
-        # If the surrounding context has been cancelled by some
-        # task with a handle to THIS, we error here immediately
-        # since it likely means the surrounding lexical-scope has
-        # errored, been `trio.Cancelled` or at the least
-        # `Context.cancel()` was called by some task.
-        if self._cancel_called:
-
-            # XXX NOTE: ALWAYS RAISE any remote error here even if
-            # it's an expected `ContextCancelled` due to a local
-            # task having called `.cancel()`!
-            #
-            # WHY: we expect the error to always bubble up to the
-            # surrounding `Portal.open_context()` call and be
-            # absorbed there (silently) and we DO NOT want to
-            # actually try to stream - a cancel msg was already
-            # sent to the other side!
-            self.maybe_raise(
-                raise_ctxc_from_self_call=True,
-            )
-
-            # NOTE: this is diff then calling
-            # `._maybe_raise_remote_err()` specifically
-            # because we want to raise a ctxc on any task entering this `.open_stream()`
-            # AFTER cancellation was already been requested,
-            # we DO NOT want to absorb any ctxc ACK silently!
-            # if self._remote_error:
-            #     raise self._remote_error
-
-            # XXX NOTE: if no `ContextCancelled` has been responded
-            # back from the other side (yet), we raise a different
-            # runtime error indicating that this task's usage of
-            # `Context.cancel()` and then `.open_stream()` is WRONG!
-            task: str = trio.lowlevel.current_task().name
-            raise RuntimeError(
-                'Stream opened after `Context.cancel()` called..?\n'
-                f'task: {actor.uid[0]}:{task}\n'
-                f'{self}'
-            )
-
-        if (
-            not self._portal
-            and not self._started_called
-        ):
-            raise RuntimeError(
-                'Context.started()` must be called before opening a stream'
-            )
-
-        # NOTE: in one way streaming this only happens on the
-        # parent-ctx-task side (on the side that calls
-        # `Actor.start_remote_task()`) so if you try to send
-        # a stop from the caller to the callee in the
-        # single-direction-stream case you'll get a lookup error
-        # currently.
-        ctx: Context = actor.get_context(
-            chan=self.chan,
-            cid=self.cid,
-            nsf=self._nsf,
-            # side=self.side,
-            msg_buffer_size=msg_buffer_size,
-            allow_overruns=allow_overruns,
-        )
-        ctx._allow_overruns: bool = allow_overruns
-        assert ctx is self
-
-        # XXX: If the underlying channel feeder receive mem chan has
-        # been closed then likely client code has already exited
-        # a ``.open_stream()`` block prior or there was some other
-        # unanticipated error or cancellation from ``trio``.
-        if ctx._rx_chan._closed:
-            raise trio.ClosedResourceError(
-                'The underlying channel for this stream was already closed!\n'
-            )
-
-        # NOTE: implicitly this will call `MsgStream.aclose()` on
-        # `.__aexit__()` due to stream's parent `Channel` type!
-        #
-        # XXX NOTE XXX: ensures the stream is "one-shot use",
-        # which specifically means that on exit,
-        # - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
-        #   the far end indicating that the caller exited
-        #   the streaming context purposefully by letting
-        #   the exit block exec.
-        # - this is diff from the cancel/error case where
-        #   a cancel request from this side or an error
-        #   should be sent to the far end indicating the
-        #   stream WAS NOT just closed normally/gracefully.
-        async with MsgStream(
-            ctx=self,
-            rx_chan=ctx._rx_chan,
-        ) as stream:
-
-            # NOTE: we track all existing streams per portal for
-            # the purposes of attempting graceful closes on runtime
-            # cancel requests.
-            if self._portal:
-                self._portal._streams.add(stream)
-
-            try:
-                self._stream_opened: bool = True
-                self._stream = stream
-
-                # XXX: do we need this?
-                # ensure we aren't cancelled before yielding the stream
-                # await trio.lowlevel.checkpoint()
-                yield stream
-
-                # XXX: (MEGA IMPORTANT) if this is a root opened process we
-                # wait for any immediate child in debug before popping the
-                # context from the runtime msg loop otherwise inside
-                # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
-                # the case where that msg is global debugger unlock (via
-                # a "stop" msg for a stream), this can result in a deadlock
-                # where the root is waiting on the lock to clear but the
-                # child has already cleared it and clobbered IPC.
-                #
-                # await maybe_wait_for_debugger()
-
-                # XXX TODO: pretty sure this isn't needed (see
-                # note above this block) AND will result in
-                # a double `.send_stop()` call. The only reason to
-                # put it here would be to due with "order" in
-                # terms of raising any remote error (as per
-                # directly below) or bc the stream's
-                # `.__aexit__()` block might not get run
-                # (doubtful)? Either way if we did put this back
-                # in we also need a state var to avoid the double
-                # stop-msg send..
-                #
-                # await stream.aclose()
-
-            # NOTE: absorb and do not raise any
-            # EoC received from the other side such that
-            # it is not raised inside the surrounding
-            # context block's scope!
-            except trio.EndOfChannel as eoc:
-                if (
-                    eoc
-                    and
-                    stream.closed
-                ):
-                    # sanity, can remove?
-                    assert eoc is stream._eoc
-
-                    log.warning(
-                        'Stream was terminated by EoC\n\n'
-                        # NOTE: won't show the error <Type> but
-                        # does show txt followed by IPC msg.
-                        f'{str(eoc)}\n'
-                    )
-
-            finally:
-                if self._portal:
-                    try:
-                        self._portal._streams.remove(stream)
-                    except KeyError:
-                        log.warning(
-                            f'Stream was already destroyed?\n'
-                            f'actor: {self.chan.uid}\n'
-                            f'ctx id: {self.cid}'
-                        )
-
     # TODO: replace all the `._maybe_raise_remote_err()` usage
     # with instances of this!!
     def maybe_raise(
@@ -1178,6 +989,14 @@ class Context:
         **kwargs,

     ) -> Exception|None:
+        '''
+        Check for for a remote error delivered by the runtime from
+        our peer (task); if set immediately raise.
+
+        This is a convenience wrapper for
+        `._maybe_raise_remote_err(self._remote_error)`.
+
+        '''
         __tracebackhide__: bool = hide_tb
         if re := self._remote_error:
             return self._maybe_raise_remote_err(
@@ -1290,8 +1109,7 @@
         raise remote_error

-    # TODO: change to `.wait_for_result()`?
-    async def result(
+    async def wait_for_result(
         self,
         hide_tb: bool = True,
@@ -1380,18 +1198,27 @@
                 (not self._cancel_called)
             )
         )
+        # TODO: eventually make `.outcome: Outcome` and thus return
+        # `self.outcome.unwrap()` here!
         return self.outcome

     # TODO: switch this with above!
     # -[ ] should be named `.wait_for_outcome()` and instead do
     # a `.outcome.Outcome.unwrap()` ?
     #
-    # @property
-    # def result(self) -> Any|None:
-    #     if self._final_result_is_set():
-    #         return self._result
-
-    #     raise RuntimeError('No result is available!')
+    async def result(
+        self,
+        *args,
+        **kwargs,
+    ) -> Any|Exception:
+        log.warning(
+            '`Context.result()` is DEPRECATED!\n'
+            'Use `Context.[no]wait_for_result()` instead!\n'
+        )
+        return await self.wait_for_result(
+            *args,
+            **kwargs,
+        )

     @property
     def maybe_error(self) -> BaseException|None:
@@ -1447,6 +1274,9 @@
         return self._result is not Unresolved

     # def get_result_nowait(self) -> Any|None:
+    # def get_outcome_nowait(self) -> Any|None:
+    # def recv_result_nowait(self) -> Any|None:
+    # def receive_outcome_nowait(self) -> Any|None:
     # TODO: use `outcome.Outcome` here instead?
     @property
     def outcome(self) -> (
@@ -1476,7 +1306,6 @@
     def has_outcome(self) -> bool:
         return bool(self.maybe_error) or self._final_result_is_set()

-    # @property
     def repr_outcome(
         self,
         show_error_fields: bool = False,
@@ -1498,7 +1327,8 @@
             # just deliver the type name.
             if (
                 (reprol := getattr(merr, 'reprol', False))
-                and show_error_fields
+                and
+                show_error_fields
             ):
                 return reprol()
@@ -1515,10 +1345,6 @@
                     repr(merr)
                 )

-            # just the type name
-            # else: # but wen?
-            #     return type(merr).__name__
-
             # for all other errors show their regular output
             return (
                 str(merr)
@@ -1572,7 +1398,7 @@
                 _,  # any non-unresolved value
                 None,
             ) if self._final_result_is_set():
-                status = 'returned'
+                status = 'result-returned'

         # normal operation but still in a pre-`Return`-result
         # dialog phase
@@ -1940,6 +1766,11 @@
         # ow, indicate unable to deliver by default
         return False

+    # NOTE: similar to `Portal.open_context()`, this impl is found in
+    # the `._streaming`` mod to make reading/groking the details
+    # simpler code-org-wise.
+    open_stream = open_stream_from_ctx
+
     # TODO: exception tb masking by using a manual
     # `.__aexit__()`/.__aenter__()` pair on a type?
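
(Editor's note on the `open_stream = open_stream_from_ctx` rebind just
above.) The class-attribute assignment works because a plain function,
including one wrapped by `contextlib.asynccontextmanager`, is a
descriptor: instance attribute access binds its first parameter (here
`ctx`) exactly as `self` would be bound. A tiny standalone sketch of the
same pattern, with hypothetical names and `trio` used only to drive it:

    from contextlib import asynccontextmanager as acm
    import trio

    @acm
    async def open_thing_from_obj(obj, size: int = 1):
        # stand-in for `open_stream_from_ctx()`: yield some "resource"
        yield (obj, size)

    class Thing:
        # same rebind trick as `Context.open_stream = open_stream_from_ctx`
        open_thing = open_thing_from_obj

    async def main():
        t = Thing()
        # instance access binds `obj=t`, mirroring `ctx.open_stream(...)`
        async with t.open_thing(size=3) as (who, size):
            assert who is t and size == 3

    trio.run(main)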

tractor/_streaming.py

@@ -26,6 +26,7 @@ import inspect
 from pprint import pformat
 from typing import (
     Any,
+    AsyncGenerator,
     Callable,
     AsyncIterator,
     TYPE_CHECKING,
@@ -51,6 +52,7 @@ from tractor.msg import (
 )

 if TYPE_CHECKING:
+    from ._runtime import Actor
     from ._context import Context
     from ._ipc import Channel
@@ -550,6 +552,213 @@ class MsgStream(trio.abc.Channel):
     # ...


+@acm
+async def open_stream_from_ctx(
+    ctx: Context,
+    allow_overruns: bool|None = False,
+    msg_buffer_size: int|None = None,
+
+) -> AsyncGenerator[MsgStream, None]:
+    '''
+    Open a `MsgStream`, a bi-directional msg transport dialog
+    connected to the cross-actor peer task for an IPC `Context`.
+
+    This context manager must be entered in both the "parent" (task
+    which entered `Portal.open_context()`) and "child" (RPC task
+    which is decorated by `@context`) tasks for the stream to
+    logically be considered "open"; if one side begins sending to an
+    un-opened peer, depending on policy config, msgs will either be
+    queued until the other side opens and/or a `StreamOverrun` will
+    (eventually) be raised.
+
+    ------ - ------
+
+    Runtime semantics design:
+
+    A `MsgStream` session adheres to "one-shot use" semantics,
+    meaning if you close the scope it **can not** be "re-opened".
+
+    Instead you must re-establish a new surrounding RPC `Context`
+    (RTC: remote task context?) using `Portal.open_context()`.
+
+    In the future this *design choice* may need to be changed but
+    currently there seems to be no obvious reason to support such
+    semantics..
+
+    - "pausing a stream" can be supported with a message implemented
+      by the `tractor` application dev.
+
+    - any remote error will normally require a restart of the entire
+      `trio.Task`'s scope due to the nature of `trio`'s cancellation
+      (`CancelScope`) system and semantics (level triggered).
+
+    '''
+    actor: Actor = ctx._actor
+
+    # If the surrounding context has been cancelled by some
+    # task with a handle to THIS, we error here immediately
+    # since it likely means the surrounding lexical-scope has
+    # errored, been `trio.Cancelled` or at the least
+    # `Context.cancel()` was called by some task.
+    if ctx._cancel_called:
+
+        # XXX NOTE: ALWAYS RAISE any remote error here even if
+        # it's an expected `ContextCancelled` due to a local
+        # task having called `.cancel()`!
+        #
+        # WHY: we expect the error to always bubble up to the
+        # surrounding `Portal.open_context()` call and be
+        # absorbed there (silently) and we DO NOT want to
+        # actually try to stream - a cancel msg was already
+        # sent to the other side!
+        ctx.maybe_raise(
+            raise_ctxc_from_self_call=True,
+        )
+
+        # NOTE: this is diff then calling
+        # `._maybe_raise_remote_err()` specifically
+        # because we want to raise a ctxc on any task entering this `.open_stream()`
+        # AFTER cancellation was already been requested,
+        # we DO NOT want to absorb any ctxc ACK silently!
+        # if ctx._remote_error:
+        #     raise ctx._remote_error
+
+        # XXX NOTE: if no `ContextCancelled` has been responded
+        # back from the other side (yet), we raise a different
+        # runtime error indicating that this task's usage of
+        # `Context.cancel()` and then `.open_stream()` is WRONG!
+        task: str = trio.lowlevel.current_task().name
+        raise RuntimeError(
+            'Stream opened after `Context.cancel()` called..?\n'
+            f'task: {actor.uid[0]}:{task}\n'
+            f'{ctx}'
+        )
+
+    if (
+        not ctx._portal
+        and not ctx._started_called
+    ):
+        raise RuntimeError(
+            'Context.started()` must be called before opening a stream'
+        )
+
+    # NOTE: in one way streaming this only happens on the
+    # parent-ctx-task side (on the side that calls
+    # `Actor.start_remote_task()`) so if you try to send
+    # a stop from the caller to the callee in the
+    # single-direction-stream case you'll get a lookup error
+    # currently.
+    ctx: Context = actor.get_context(
+        chan=ctx.chan,
+        cid=ctx.cid,
+        nsf=ctx._nsf,
+        # side=ctx.side,
+        msg_buffer_size=msg_buffer_size,
+        allow_overruns=allow_overruns,
+    )
+    ctx._allow_overruns: bool = allow_overruns
+    assert ctx is ctx
+
+    # XXX: If the underlying channel feeder receive mem chan has
+    # been closed then likely client code has already exited
+    # a ``.open_stream()`` block prior or there was some other
+    # unanticipated error or cancellation from ``trio``.
+    if ctx._rx_chan._closed:
+        raise trio.ClosedResourceError(
+            'The underlying channel for this stream was already closed!\n'
+        )
+
+    # NOTE: implicitly this will call `MsgStream.aclose()` on
+    # `.__aexit__()` due to stream's parent `Channel` type!
+    #
+    # XXX NOTE XXX: ensures the stream is "one-shot use",
+    # which specifically means that on exit,
+    # - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
+    #   the far end indicating that the caller exited
+    #   the streaming context purposefully by letting
+    #   the exit block exec.
+    # - this is diff from the cancel/error case where
+    #   a cancel request from this side or an error
+    #   should be sent to the far end indicating the
+    #   stream WAS NOT just closed normally/gracefully.
+    async with MsgStream(
+        ctx=ctx,
+        rx_chan=ctx._rx_chan,
+    ) as stream:
+
+        # NOTE: we track all existing streams per portal for
+        # the purposes of attempting graceful closes on runtime
+        # cancel requests.
+        if ctx._portal:
+            ctx._portal._streams.add(stream)
+
+        try:
+            ctx._stream_opened: bool = True
+            ctx._stream = stream
+
+            # XXX: do we need this?
+            # ensure we aren't cancelled before yielding the stream
+            # await trio.lowlevel.checkpoint()
+            yield stream
+
+            # XXX: (MEGA IMPORTANT) if this is a root opened process we
+            # wait for any immediate child in debug before popping the
+            # context from the runtime msg loop otherwise inside
+            # ``Actor._deliver_ctx_payload()`` the msg will be discarded and in
+            # the case where that msg is global debugger unlock (via
+            # a "stop" msg for a stream), this can result in a deadlock
+            # where the root is waiting on the lock to clear but the
+            # child has already cleared it and clobbered IPC.
+            #
+            # await maybe_wait_for_debugger()
+
+            # XXX TODO: pretty sure this isn't needed (see
+            # note above this block) AND will result in
+            # a double `.send_stop()` call. The only reason to
+            # put it here would be to due with "order" in
+            # terms of raising any remote error (as per
+            # directly below) or bc the stream's
+            # `.__aexit__()` block might not get run
+            # (doubtful)? Either way if we did put this back
+            # in we also need a state var to avoid the double
+            # stop-msg send..
+            #
+            # await stream.aclose()
+
+        # NOTE: absorb and do not raise any
+        # EoC received from the other side such that
+        # it is not raised inside the surrounding
+        # context block's scope!
+        except trio.EndOfChannel as eoc:
+            if (
+                eoc
+                and
+                stream.closed
+            ):
+                # sanity, can remove?
+                assert eoc is stream._eoc
+
+                log.warning(
+                    'Stream was terminated by EoC\n\n'
+                    # NOTE: won't show the error <Type> but
+                    # does show txt followed by IPC msg.
+                    f'{str(eoc)}\n'
+                )
+
+        finally:
+            if ctx._portal:
+                try:
+                    ctx._portal._streams.remove(stream)
+                except KeyError:
+                    log.warning(
+                        f'Stream was already destroyed?\n'
+                        f'actor: {ctx.chan.uid}\n'
+                        f'ctx id: {ctx.cid}'
+                    )
+
+
 def stream(func: Callable) -> Callable:
     '''
     Mark an async function as a streaming routine with ``@stream``.