tractor/tractor/_context.py

1375 lines
52 KiB
Python

# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
The fundamental cross process SC abstraction: an inter-actor,
cancel-scope linked task "context".
A ``Context`` is very similar to the ``trio.Nursery.cancel_scope`` built
into each ``trio.Nursery`` except it links the lifetimes of memory space
disjoint, parallel executing tasks in separate actors.
'''
from __future__ import annotations
from collections import deque
from contextlib import asynccontextmanager as acm
from dataclasses import (
dataclass,
field,
)
from functools import partial
import inspect
from pprint import pformat
from typing import (
Any,
Callable,
AsyncGenerator,
TYPE_CHECKING,
)
import warnings
import trio
# from .devx import (
# maybe_wait_for_debugger,
# pause,
# )
from ._exceptions import (
# _raise_from_no_key_in_msg,
unpack_error,
pack_error,
ContextCancelled,
# MessagingError,
RemoteActorError,
StreamOverrun,
)
from .log import get_logger
from ._ipc import Channel
from ._streaming import MsgStream
from ._state import current_actor
if TYPE_CHECKING:
from ._portal import Portal
from ._runtime import Actor
log = get_logger(__name__)
async def _drain_to_final_msg(
    ctx: Context,
) -> list[dict]:
    '''
    Consume msgs from `ctx._recv_chan` until the context is
    terminated by either a final result or a remote error.

    The loop runs for as long as `ctx._remote_error` is unset and
    handles each received msg as follows,

    - a msg with a `'return'` key is the final result: its value is
      stored on `ctx._result` and the loop ends,
    - `'yield'` and `'stop'` stream msgs are logged, collected and
      otherwise ignored,
    - any other msg is treated as an error msg: if a remote error
      is already set on `ctx` it is handed to
      `ctx._maybe_raise_remote_err()` and the loop ends when that
      call does not raise.

    Returns the list of every msg collected by the loop, including
    the final `'return'` msg when one arrived.

    If the local task is cancelled (`trio.Cancelled`) while waiting
    and a remote error is set, that error is handed to
    `ctx._maybe_raise_remote_err()` first; otherwise the
    cancellation is re-raised as is.

    '''
    # when overruns are NOT allowed on this ctx an overrun error
    # sent by this side may still be raised below.
    raise_overrun: bool = not ctx._allow_overruns

    # wait for a final context result by collecting (but
    # basically ignoring) any bi-dir-stream msgs still in transit
    # from the far end.
    pre_result_drained: list[dict] = []
    while not ctx._remote_error:
        try:
            msg: dict = await ctx._recv_chan.receive()

            # raises `KeyError` for every msg kind other than the
            # final result; see the handler below.
            ctx._result: Any = msg['return']
            log.runtime(
                'Context delivered final result msg:\n'
                f'{pformat(msg)}'
            )
            pre_result_drained.append(msg)
            break

        # NOTE: we get here if the far end was
        # `ContextCancelled` in 2 cases:
        # 1. we requested the cancellation and thus
        #    SHOULD NOT raise that far end error,
        # 2. WE DID NOT REQUEST that cancel and thus
        #    SHOULD RAISE HERE!
        except trio.Cancelled:

            # CASE 2: mask the local cancelled-error(s)
            # only when we are sure the remote error is
            # the source cause of this local task's
            # cancellation.
            if re := ctx._remote_error:
                ctx._maybe_raise_remote_err(re)

            # CASE 1: we DID request the cancel so we simply
            # continue to bubble up as normal.
            raise

        except KeyError:

            if 'yield' in msg:
                # far end task is still streaming to us so discard
                log.warning(f'Discarding std "yield"\n{msg}')
                pre_result_drained.append(msg)
                continue

            # TODO: work out edge cases here where
            # a stream is open but the task also calls
            # this?
            # -[ ] should be a runtime error if a stream is open
            #   right?
            elif 'stop' in msg:
                log.cancel(
                    'Remote stream terminated due to "stop" msg:\n'
                    f'{msg}'
                )
                pre_result_drained.append(msg)
                continue

            # internal error should never get here
            assert msg.get('cid'), (
                "Received internal error at portal?"
            )

            # XXX fallthrough to handle expected error XXX
            re: Exception|None = ctx._remote_error
            if re:
                log.critical(
                    'Remote ctx terminated due to "error" msg:\n'
                    f'{re}'
                )
                assert msg is ctx._cancel_msg
                # NOTE: this covers the edge case where,
                # - the local task opens a remote task,
                # - requests remote cancellation of the far end
                #   ctx/tasks,
                # - needs to wait for the cancel ack msg (ctxc) or
                #   some result in the race case where the other
                #   side's task returns before the cancel request
                #   msg is ever rxed and processed.
                # Since the cancellation requester does not
                # re-raise the ctxc it receives, the call below may
                # return normally and so the explicit `break` is
                # required to exit the drain loop; any error is
                # presumed to terminate the context.
                ctx._maybe_raise_remote_err(
                    re,
                    # NOTE: whether an overrun caused by this side
                    # is raised follows the ctx's overrun setting.
                    raise_overrun_from_self=raise_overrun,
                )
                break

            # XXX we should never really get here
            # right! since `._deliver_msg()` should
            # always have detected an {'error': ..}
            # msg and already called this right!?!
            elif error := unpack_error(
                msg=msg,
                chan=ctx._portal.channel,
                hide_tb=False,
            ):
                log.critical('SHOULD NEVER GET HERE!?')
                assert msg is ctx._cancel_msg
                # NOTE(review): this branch is only entered when
                # `ctx._remote_error` is falsy (see the `if re:`
                # above), so the attribute access on it below looks
                # like it can only fail - confirm the intent.
                assert error.msgdata == ctx._remote_error.msgdata
                # NOTE(review): debugger hook left in the runtime
                # path; presumably a development aid - confirm.
                from .devx._debug import pause
                await pause()
                ctx._maybe_cancel_and_set_remote_error(error)
                ctx._maybe_raise_remote_err(error)

            else:
                # bubble the original src key error
                raise

    return pre_result_drained
# TODO: make this a msgspec.Struct!
@dataclass
class Context:
'''
An inter-actor, SC transitive, `trio.Task` communication context.
NB: This class should **never be instantiated directly**, it is allocated
by the runtime in 2 ways:
- by entering ``Portal.open_context()`` which is the primary
public API for any "caller" task or,
- by the RPC machinery's `._runtime._invoke()` as a `ctx` arg
to a remotely scheduled "callee" function.
AND is always constructed using the below ``mk_context()``.
Allows maintaining task or protocol specific state between
2 cancel-scope-linked, communicating and parallel executing
`trio.Task`s. Contexts are allocated on each side of any task
RPC-linked msg dialog, i.e. for every request to a remote
actor from a `Portal`. On the "callee" side a context is
always allocated inside ``._runtime._invoke()``.
# TODO: more detailed writeup on cancellation, error and
# streaming semantics..
A context can be cancelled and (possibly eventually restarted) from
either side of the underlying IPC channel, it can also open task
oriented message streams, and acts more or less as an IPC aware
inter-actor-task ``trio.CancelScope``.
'''
chan: Channel
cid: str # "context id", more or less a unique linked-task-pair id
# the "feeder" channels for delivering message values to the
# local task from the runtime's msg processing loop.
_recv_chan: trio.MemoryReceiveChannel
_send_chan: trio.MemorySendChannel
# the "invocation type" of the far end task-entry-point
# function, normally matching a logic block inside
# `._runtime.invoke()`.
_remote_func_type: str | None = None
# NOTE: (for now) only set (a portal) on the caller side since
# the callee doesn't generally need a ref to one and should
# normally need to explicitly ask for a handle to its peer if
# more than the `Context` is needed?
_portal: Portal | None = None
# NOTE: each side of the context has its own cancel scope
# which is exactly the primitive that allows for
# cross-actor-task-supervision and thus SC.
_scope: trio.CancelScope | None = None
# _res_scope: trio.CancelScope|None = None
# on a clean exit there should be a final value
# delivered from the far end "callee" task, so
# this value is only set on one side.
_result: Any | int = None
# if the local "caller" task errors this
# value is always set to the error that was
# captured in the `Portal.open_context().__aexit__()`
# teardown.
_local_error: BaseException | None = None
# if either side gets an error from the other
# this value is set to that error unpacked from an
# IPC msg.
_remote_error: BaseException | None = None
# only set if the local task called `.cancel()`
_cancel_called: bool = False # did WE cancel the far end?
# TODO: do we even need this? we can assume that if we're
# cancelled that the other side is as well, so maybe we should
# instead just have a `.canceller` pulled from the
# `ContextCancelled`?
_canceller: tuple[str, str] | None = None
# NOTE: we try to ensure assignment of a "cancel msg" since
# there's always going to be an "underlying reason" that any
# context was closed due to either a remote side error or
# a call to `.cancel()` which triggers `ContextCancelled`.
_cancel_msg: str | dict | None = None
# NOTE: this state var used by the runtime to determine if the
# `pdbp` REPL is allowed to engage on contexts terminated via
# a `ContextCancelled` due to a call to `.cancel()` triggering
# "graceful closure" on either side:
# - `._runtime._invoke()` will check this flag before engaging
# the crash handler REPL in such cases where the "callee"
# raises the cancellation,
# - `.devx._debug.lock_tty_for_child()` will set it to `False` if
# the global tty-lock has been configured to filter out some
# actors from being able to acquire the debugger lock.
_enter_debugger_on_cancel: bool = True
@property
def cancel_called(self) -> bool:
    '''
    Whether cancellation of this context has been requested from
    this side.

    Read-only view of the private `._cancel_called` flag, which
    `.cancel()` flips to `True` as soon as it is invoked.

    '''
    requested: bool = self._cancel_called
    return requested
@property
def canceller(self) -> tuple[str, str] | None:
    '''
    The actor uid recorded as the source of this context's
    cancellation, or `None` when nothing has been recorded.

    Read-only view of the private `._canceller` value which is
    assigned when a remote error is set on this context.

    '''
    uid: tuple[str, str] | None = self._canceller
    return uid
@property
def cancelled_caught(self) -> bool:
    '''
    Whether this side of the context terminated due to
    a cancellation.

    Evaluates to `True` when either,

    - the local `._scope` caught a cancellation (triggered by
      a remote error or by a self-request) or,
    - the local scope was never cancelled but the local error is
      a `ContextCancelled` which either names the same canceller
      as this context or is the very error stored as the remote
      error.

    Always returns a real `bool`, and a context whose `._scope` has
    not been assigned yet is treated as "scope not cancelled"
    instead of raising `AttributeError`.

    '''
    # the scope attribute defaults to `None`, so guard the access.
    scope: trio.CancelScope | None = self._scope
    if scope is not None and scope.cancelled_caught:
        return True

    # the local scope was never cancelled and instead likely we
    # received a remote side cancellation that was raised inside
    # `.result()`.
    local_err: BaseException | None = self._local_error
    return bool(
        local_err
        and isinstance(local_err, ContextCancelled)
        and (
            local_err.canceller == self.canceller
            or local_err is self._remote_error
        )
    )
# @property
# def is_waiting_result(self) -> bool:
# return bool(self._res_scope)
@property
def side(self) -> str:
    '''
    Name the end of the linked task pair this instance wraps.

    Evaluates to `'caller'` when a portal is set on this context
    and to `'callee'` otherwise.

    '''
    if self._portal:
        return 'caller'

    return 'callee'
# init and streaming state
_started_called: bool = False
_stream_opened: bool = False
# overrun handling machinery
# NOTE: none of this provides "backpressure" to the remote
# task, only an ability to not lose messages when the local
# task is configured to NOT transmit ``StreamOverrun``s back
# to the other side.
_overflow_q: deque[dict] = field(
default_factory=partial(
deque,
maxlen=616,
)
)
_scope_nursery: trio.Nursery | None = None
_in_overrun: bool = False
_allow_overruns: bool = False
async def send_yield(
    self,
    data: Any,
) -> None:
    '''
    Deprecated: send `data` to the far end as a "yield" msg.

    Emits a `DeprecationWarning` (attributed to the calling frame
    via `stacklevel=2`) which points at the replacement API,
    `MsgStream.send()`, and then ships
    `{'yield': data, 'cid': self.cid}` over the underlying IPC
    channel.

    '''
    warnings.warn(
        "`Context.send_yield()` is now deprecated. "
        # NOTE: the stream type used throughout this module is
        # `MsgStream`, so name that one in the hint.
        "Use ``MsgStream.send()``. ",
        DeprecationWarning,
        stacklevel=2,
    )
    await self.chan.send({'yield': data, 'cid': self.cid})
async def send_stop(self) -> None:
    '''
    Send a "stop" msg, tagged with this context's id, to the far
    end over the underlying IPC channel.

    '''
    stop_msg: dict = {'stop': True, 'cid': self.cid}
    await self.chan.send(stop_msg)
def _maybe_cancel_and_set_remote_error(
    self,
    error: BaseException,
) -> None:
    '''
    Store a remote `error` on this context and (maybe) cancel the
    local scope so that the local task gets interrupted by it.

    Acts as a "relay" for an error raised by the far end task: the
    error is only recorded here, it is never raised by this method.
    Cancelling the local scope means the local task is interrupted
    at its next checkpoint, after which the recorded error can be
    raised as the reason by the context's other APIs.

    The steps are,

    - always assign `error` to `._remote_error`,
    - for a `ContextCancelled`: record its `.canceller` as this
      context's canceller and, when this side already called
      `.cancel()`, return right away since the msg is the expected
      ack which must not override a real error,
    - for any other error: log it and record the peer's
      `chan.uid` as the canceller,
    - finally cancel `._scope` unless there is none or it was
      already cancelled.

    NOTE: per the original author's notes the `error` is expected
    to have been unwrapped from an IPC msg by `unpack_error()`
    before being passed in here, and this relay is only meant for
    contexts opened via `Portal.open_context()`.

    '''
    log.cancel(
        'Setting remote error for ctx \n'
        f'<= remote ctx uid: {self.chan.uid}\n'
        f'=>\n{error}'
    )
    self._remote_error = error

    if not isinstance(error, ContextCancelled):
        log.error(
            f'Remote context error:\n'
            f'{error}\n'
            f'{pformat(self)}\n'
        )
        self._canceller = self.chan.uid

    else:
        log.cancel(
            'Remote task-context was cancelled for '
            f'actor: {self.chan.uid}\n'
            f'task: {self.cid}\n'
            f'canceller: {error.canceller}\n'
        )
        # always record the cancelling actor's uid since its
        # cancellation state is linked and we want to know which
        # process was the cause / requester of the cancellation.
        self._canceller = error.canceller

        if self._cancel_called:
            # this is the expected response to our own cancel
            # request: no need to cancel the local scope for it.
            return

    # interrupt the local task(s) unless the scope is missing or
    # a cancellation is already underway / done.
    # TODO: it'd sure be handy to inject our own
    # `trio.Cancelled` subtype here ;)
    # https://github.com/goodboy/tractor/issues/368
    scope = self._scope
    if (
        not scope
        or scope.cancel_called
        or scope.cancelled_caught
    ):
        return

    scope.cancel()
async def cancel(
    self,
    timeout: float = 0.616,
) -> None:
    '''
    Cancel this inter-actor-task context.

    Marks the context as "cancel called" and then, depending on
    which side this instance wraps,

    - callee: cancels the local scope,
    - caller: asks the far end actor to cancel the linked task by
      running its `_cancel_task` with this context's id through
      the portal; the request is shielded from outer cancellation
      and given at most `timeout` seconds so as to time out
      quickly in an attempt to sidestep 2-generals.

    Raises `RuntimeError` on the caller path if no portal is set.

    '''
    side: str = self.side
    log.cancel(
        f'Cancelling {side} side of context to {self.chan.uid}'
    )
    self._cancel_called = True

    # callee side remote task
    # NOTE: on this side we ALWAYS cancel the local scope since
    # the caller expects a `ContextCancelled` to be sent from
    # `._runtime._invoke()` back to the other side.
    # TODO: should we have an explicit cancel message
    # or is relaying the local `trio.Cancelled` as an
    # {'error': trio.Cancelled, cid: "blah"} enough?
    # This probably gets into the discussion in
    # https://github.com/goodboy/tractor/issues/36
    if side != 'caller':
        assert self._scope
        self._scope.cancel()
        return

    # caller side who entered `Portal.open_context()`
    # NOTE: on the call side we never manually call
    # `._scope.cancel()` since we expect the eventual
    # `ContextCancelled` from the other side to trigger this
    # when the runtime finally receives it during teardown
    # (normally in `.result()` called from
    # `Portal.open_context().__aexit__()`)
    if not self._portal:
        raise RuntimeError(
            "No portal found, this is likely a callee side context"
        )

    cid: str = self.cid
    with trio.move_on_after(timeout) as timeout_scope:
        timeout_scope.shield = True
        log.cancel(
            f'Cancelling stream {cid} to '
            f'{self._portal.channel.uid}'
        )

        # NOTE: we're telling the far end actor to cancel a task
        # corresponding to *this actor*. The far end local channel
        # instance is passed to `Actor._cancel_task()` implicitly.
        await self._portal.run_from_ns(
            'self',
            '_cancel_task',
            cid=cid,
        )

    if not timeout_scope.cancelled_caught:
        return

    # XXX: there's no way to know if the remote task was indeed
    # cancelled in the case where the connection is broken or
    # some other network error occurred.
    if not self.chan.connected():
        log.cancel(
            'May have failed to cancel remote task '
            f'{cid} for {self._portal.channel.uid}'
        )
    else:
        log.cancel(
            'Timed out on cancel request of remote task '
            f'{cid} for {self._portal.channel.uid}'
        )
@acm
async def open_stream(
    self,
    allow_overruns: bool | None = False,
    msg_buffer_size: int | None = None,
) -> AsyncGenerator[MsgStream, None]:
    '''
    Open a ``MsgStream``, a bi-directional stream connected to the
    cross-actor (far end) task for this ``Context``.

    This context manager must be entered on both the caller and
    callee for the stream to logically be considered "connected".

    A ``MsgStream`` is currently "one-shot" use, meaning if you
    close it you can not "re-open" it for streaming and instead you
    must re-establish a new surrounding ``Context`` using
    ``Portal.open_context()``. In the future this may change but
    currently there seems to be no obvious reason to support
    "re-opening":

    - pausing a stream can be done with a message.
    - task errors will normally require a restart of the entire
      scope of the inter-actor task context due to the nature of
      ``trio``'s cancellation system.

    Parameters:
    - `allow_overruns`: forwarded to the actor's `.get_context()`
      lookup and stored on the context as `._allow_overruns`.
    - `msg_buffer_size`: forwarded to the actor's `.get_context()`
      lookup.

    Raises:
    - the stored remote error, if `.cancel()` was already called on
      this side and a remote error has been set,
    - `RuntimeError` if `.cancel()` was already called but no
      remote error has arrived (yet), or if this is the callee
      side and `.started()` has not been called,
    - `trio.ClosedResourceError` if the underlying receive channel
      is already closed.

    '''
    actor: Actor = current_actor()

    # If the surrounding context has been cancelled by some
    # task with a handle to THIS, we error here immediately
    # since it likely means the surrounding lexical-scope has
    # errored, been `trio.Cancelled` or at the least
    # `Context.cancel()` was called by some task.
    if self._cancel_called:

        # XXX NOTE: ALWAYS RAISE any remote error here even if
        # it's an expected `ContextCancelled` due to a local
        # task having called `.cancel()`!
        #
        # WHY: we expect the error to always bubble up to the
        # surrounding `Portal.open_context()` call and be
        # absorbed there (silently) and we DO NOT want to
        # actually try to stream - a cancel msg was already
        # sent to the other side!
        if self._remote_error:
            # NOTE: this is deliberately different from going
            # through `._maybe_raise_remote_err()`: any task
            # entering this `.open_stream()` AFTER cancellation
            # has already been requested must NOT have the ctxc
            # ACK absorbed silently!
            raise self._remote_error

        # XXX NOTE: if no `ContextCancelled` has been responded
        # back from the other side (yet), we raise a different
        # runtime error indicating that this task's usage of
        # `Context.cancel()` and then `.open_stream()` is WRONG!
        task: str = trio.lowlevel.current_task().name
        raise RuntimeError(
            'Stream opened after `Context.cancel()` called..?\n'
            f'task: {actor.uid[0]}:{task}\n'
            f'{self}'
        )

    # a context without a portal is the callee side, which must
    # have signalled `.started()` before it may stream.
    if (
        not self._portal
        and not self._started_called
    ):
        raise RuntimeError(
            'Context.started()` must be called before opening a stream'
        )

    # NOTE: in one way streaming this only happens on the
    # caller side inside `Actor.start_remote_task()` so if you try
    # to send a stop from the caller to the callee in the
    # single-direction-stream case you'll get a lookup error
    # currently.
    ctx: Context = actor.get_context(
        self.chan,
        self.cid,
        msg_buffer_size=msg_buffer_size,
        allow_overruns=allow_overruns,
    )
    ctx._allow_overruns: bool = allow_overruns

    # the lookup is expected to hand back this very instance.
    assert ctx is self

    # XXX: If the underlying channel feeder receive mem chan has
    # been closed then likely client code has already exited
    # a ``.open_stream()`` block prior or there was some other
    # unanticipated error or cancellation from ``trio``.
    if ctx._recv_chan._closed:
        raise trio.ClosedResourceError(
            'The underlying channel for this stream was already closed!?'
        )

    # NOTE: per the original author's notes, exiting this block
    # implicitly calls `MsgStream.aclose()` which ensures the
    # stream is "one-shot use", meaning that on exit,
    # - ``trio.EndOfChannel``/``StopAsyncIteration`` is signalled
    #   to the far end indicating that the caller exited the
    #   streaming context purposefully by letting the exit block
    #   exec.
    # - this is diff from the cancel/error case where a cancel
    #   request from this side or an error should be sent to the
    #   far end indicating the stream WAS NOT just closed
    #   normally/gracefully.
    async with MsgStream(
        ctx=self,
        rx_chan=ctx._recv_chan,
    ) as stream:

        # NOTE: we track all existing streams per portal for
        # the purposes of attempting graceful closes on runtime
        # cancel requests.
        if self._portal:
            self._portal._streams.add(stream)

        try:
            self._stream_opened: bool = True

            # XXX: do we need to ensure we aren't cancelled
            # before yielding the stream (with a checkpoint)?
            yield stream

            # XXX TODO: an explicit `stream.aclose()` here is
            # presumed unnecessary (the `async with` exit above
            # already covers it) AND would result in a double
            # `.send_stop()` call unless guarded by a state var.
            # The only reasons to add it back would be ordering
            # relative to raising any remote error, or the
            # stream's `.__aexit__()` block not getting run
            # (doubtful).

        finally:
            # always stop tracking the stream on the portal, no
            # matter how the user's block was exited.
            if self._portal:
                try:
                    self._portal._streams.remove(stream)
                except KeyError:
                    log.warning(
                        f'Stream was already destroyed?\n'
                        f'actor: {self.chan.uid}\n'
                        f'ctx id: {self.cid}'
                    )
def _maybe_raise_remote_err(
    self,
    err: Exception,
    raise_ctxc_from_self_call: bool = False,
    raise_overrun_from_self: bool = True,
) -> ContextCancelled|None:
    '''
    Maybe raise a remote error depending on who (which task from
    which actor) requested a cancellation (if any).

    The error is "absorbed" (returned instead of raised) when,

    - it is a `ContextCancelled` which this side asked for: the
      context or its channel is marked "cancel called", or the
      recorded canceller / the error's own canceller is the
      current actor; pass `raise_ctxc_from_self_call=True` to
      disable this or,
    - `raise_overrun_from_self` is `False` and it is
      a `RemoteActorError` describing a `StreamOverrun` whose
      sender is the current actor.

    An absorbed error is also stored as `._local_error` unless one
    is set already. A `ContextCancelled` which carries no
    canceller is never matched against the current actor.

    Returns the absorbed `err`; raises `err` (with any exception
    context suppressed) in every other case.

    '''
    # NOTE: whenever the context's "opener" side (task) **is**
    # the side which requested the cancellation (likely via
    # ``Context.cancel()``), we don't want to re-raise that
    # cancellation signal locally (would be akin to
    # a ``trio.Nursery`` nursery raising ``trio.Cancelled``
    # whenever ``CancelScope.cancel()`` was called) and
    # instead silently reap the expected cancellation
    # "error"-msg.
    our_uid: tuple[str, str] = current_actor().uid
    if (
        (
            not raise_ctxc_from_self_call
            and isinstance(err, ContextCancelled)
            and (
                self._cancel_called
                or self.chan._cancel_called
                or self.canceller == our_uid
                or (
                    # a ctxc may carry no canceller at all, in
                    # which case `tuple(None)` would raise
                    # a `TypeError` that masks the real error.
                    (err_canceller := err.canceller) is not None
                    and tuple(err_canceller) == our_uid
                )
            )
        )
        or (
            not raise_overrun_from_self
            and isinstance(err, RemoteActorError)
            and err.msgdata['type_str'] == 'StreamOverrun'
            and tuple(err.msgdata['sender']) == our_uid
        )
    ):
        # NOTE: we set the local scope error to any "self
        # cancellation" error-response thus "absorbing"
        # the error silently B)
        if self._local_error is None:
            self._local_error = err

        return err

    # NOTE: currently we are masking underlying runtime errors
    # which are often superfluous to user handler code. not
    # sure if this is still needed / desired for all operation?
    # TODO: maybe we can only NOT mask if:
    # - [ ] debug mode is enabled or,
    # - [ ] a certain log level is set?
    # - [ ] consider using `.with_traceback()` to filter out
    #       runtime frames from the tb explicitly?
    # https://docs.python.org/3/reference/simple_stmts.html#the-raise-statement
    # https://stackoverflow.com/a/24752607
    raise err from None
async def result(self) -> Any | Exception:
    '''
    From some (caller) side task, wait for and return the final
    result from the remote (callee) side's task.

    This provides a mechanism for one task running in some actor
    to wait on another task at the other side, in some other
    actor, to terminate.

    While no result has been stored yet (the stored value still
    compares equal to the `id(self)` placeholder), no remote error
    is set and the receive channel is still open, all msgs still
    in transit are drained via `_drain_to_final_msg()`; stream
    msgs are discarded since it is presumed this side has already
    finished with its own streaming logic.

    If a remote error is set and there still is no result, the
    error is passed to `._maybe_raise_remote_err()` which either
    raises it or, for an expected (self-requested) cancellation
    or overrun, hands it back so that it becomes *the result*,
    since client code may be interested in the details of the
    remote cancellation.

    Must only be called on the caller side (asserted).

    '''
    assert self._portal, "Context.result() can not be called from callee!"
    assert self._recv_chan

    raise_overrun: bool = not self._allow_overruns
    placeholder: int = id(self)

    still_waiting: bool = (
        self._result == placeholder
        and not self._remote_error
        and not self._recv_chan._closed  # type: ignore
    )
    if still_waiting:
        # wait for a final context result by collecting (but
        # basically ignoring) any bi-dir-stream msgs still in
        # transit from the far end.
        drained_msgs: list[dict] = await _drain_to_final_msg(ctx=self)
        log.runtime(
            'Ctx drained pre-result msgs:\n'
            f'{drained_msgs}'
        )

    # re-check after the (possible) drain since both the result
    # and the remote error may have been set in the meantime.
    remote_err = self._remote_error
    if remote_err and self._result == placeholder:
        maybe_err: Exception|None = self._maybe_raise_remote_err(
            remote_err,
            # only when we ARE NOT the canceller should we raise
            # overruns, bc ow we're raising something we know
            # might happen during cancellation ;)
            raise_overrun_from_self=(
                raise_overrun
                and not self._cancel_called
            ),
        )
        if maybe_err:
            self._result = maybe_err

    return self._result
async def started(
self,
value: Any | None = None
) -> None:
'''
Indicate to calling actor's task that this linked context
has started and send ``value`` to the other side via IPC.
On the calling side ``value`` is the second item delivered
in the tuple returned by ``Portal.open_context()``.
'''
if self._portal:
raise RuntimeError(
f'Caller side context {self} can not call started!'
)
elif self._started_called:
raise RuntimeError(
f'called `.started()` twice on context with {self.chan.uid}'
)
await self.chan.send({'started': value, 'cid': self.cid})
self._started_called = True
async def _drain_overflows(
self,
) -> None:
'''
Private task spawned to push newly received msgs to the local
task which getting overrun by the remote side.
In order to not block the rpc msg loop, but also not discard
msgs received in this context, we need to async push msgs in
a new task which only runs for as long as the local task is in
an overrun state.
'''
self._in_overrun = True
try:
while self._overflow_q:
# NOTE: these msgs should never be errors since we always do
# the check prior to checking if we're in an overrun state
# inside ``._deliver_msg()``.
msg = self._overflow_q.popleft()
try:
await self._send_chan.send(msg)
except trio.BrokenResourceError:
log.warning(
f"{self._send_chan} consumer is already closed"
)
return
except trio.Cancelled:
# we are obviously still in overrun
# but the context is being closed anyway
# so we just warn that there are un received
# msgs still..
self._overflow_q.appendleft(msg)
fmt_msgs = ''
for msg in self._overflow_q:
fmt_msgs += f'{pformat(msg)}\n'
log.warning(
f'Context for {self.cid} is being closed while '
'in an overrun state!\n'
'Discarding the following msgs:\n'
f'{fmt_msgs}\n'
)
raise
finally:
# task is now finished with the backlog so mark us as
# no longer in backlog.
self._in_overrun = False
async def _deliver_msg(
    self,
    msg: dict,
    # draining: bool = False,
) -> bool:
    '''
    Deliver an IPC msg received from a transport-channel to
    this context's underlying mem chan for handling by
    user operating tasks; deliver a bool indicating whether the
    msg was immediately sent.

    If `._allow_overruns == True` (maybe) append the msg to an
    "overflow queue" and start a "drainer task" (inside the
    `._scope_nursery: trio.Nursery`) which ensures that such
    messages are eventually sent if possible.

    Step by step:

    - when `unpack_error()` finds an error in `msg`, the msg is
      stored as `._cancel_msg` and the error is handed to
      `._maybe_cancel_and_set_remote_error()`; the msg is then
      STILL pushed through the normal delivery path below.
    - when `._in_overrun` is set the msg is appended to
      `._overflow_q` and `False` is returned.
    - otherwise the msg is pushed with `send_nowait()`; `True` is
      returned only when that push succeeds.
    - on `trio.BrokenResourceError` a warning is logged and
      `False` is returned.
    - on `trio.WouldBlock` either the msg is queued and
      `._drain_overflows()` is started (overruns allowed) or
      a `StreamOverrun` is packed with `pack_error()` and sent
      back over `chan`; `False` is returned in both cases.

    '''
    cid: str = self.cid
    chan: Channel = self.chan
    from_uid: tuple[str, str] = chan.uid
    send_chan: trio.MemorySendChannel = self._send_chan

    # a truthy result means the msg carried an error.
    if re := unpack_error(
        msg,
        self.chan,
    ):
        log.error(
            f'Delivering error-msg from {from_uid} to caller {cid}'
            f'{re}'
        )
        self._cancel_msg = msg
        self._maybe_cancel_and_set_remote_error(re)

        # XXX NEVER do this XXX..!!
        # bc if the error is a ctxc and there is a task
        # waiting on `.result()` we need the msg to be sent
        # over the `send_chan`/`._recv_chan` so that the error
        # is relayed to that waiter task..
        # return True
        #
        # XXX ALSO NO!! XXX
        # if self._remote_error:
        #     self._maybe_raise_remote_err(error)

    # a drainer task is already working through a backlog: append
    # behind it instead of sending directly.
    if self._in_overrun:
        log.warning(
            f'Capturing overrun-msg from {from_uid} to caller {cid}'
            f'{msg}'
        )
        self._overflow_q.append(msg)
        return False

    try:
        log.runtime(
            f'Delivering IPC `Context` msg:\n'
            f'<= {from_uid}\n'
            f'=> caller: {cid}\n'
            f'{msg}'
        )
        # from .devx._debug import pause
        # await pause()
        send_chan.send_nowait(msg)
        return True
        # if an error is detected we should always
        # expect it to be raised by any context (stream)
        # consumer task

    except trio.BrokenResourceError:
        # TODO: what is the right way to handle the case where the
        # local task has already sent a 'stop' / StopAsyncIteration
        # to the other side but and possibly has closed the local
        # feeder mem chan? Do we wait for some kind of ack or just
        # let this fail silently and bubble up (currently)?

        # XXX: local consumer has closed their side
        # so cancel the far end streaming task
        log.warning(f"{send_chan} consumer is already closed")
        return False

    # NOTE XXX: by default we do **not** maintain context-stream
    # backpressure and instead opt to relay stream overrun errors to
    # the sender; the main motivation is that using bp can block the
    # msg handling loop which calls into this method!
    except trio.WouldBlock:
        # XXX: always push an error even if the local
        # receiver is in overrun state.
        # self._maybe_cancel_and_set_remote_error(msg)

        local_uid = current_actor().uid
        # NOTE: the first two f-strings below have no comma between
        # them, so they are joined into a single list entry.
        lines = [
            f'OVERRUN on actor-task context {cid}@{local_uid}!\n'
            # TODO: put remote task name here if possible?
            f'sender: {from_uid}',
            f'msg: {msg}',
            # TODO: put task func name here and maybe an arrow
            # from sender to overrunner?
            # f'local task {self.func_name}'
        ]
        if not self._stream_opened:
            lines.insert(
                1,
                f'\n*** No stream open on `{local_uid[0]}` side! ***\n'
            )

        text = '\n'.join(lines)

        # XXX: lul, this really can't be backpressure since any
        # blocking here will block the entire msg loop rpc sched for
        # a whole channel.. maybe we should rename it?
        if self._allow_overruns:
            text += f'\nStarting overflow queuing task on msg: {msg}'
            log.warning(text)
            # NOTE(review): `._in_overrun` was already tested above
            # with no await in between, so this guard looks like it
            # is always true at this point - confirm.
            if (
                not self._in_overrun
            ):
                self._overflow_q.append(msg)
                n = self._scope_nursery
                # no other child task is expected in the nursery
                # when the drainer gets started.
                assert not n.child_tasks
                try:
                    n.start_soon(
                        self._drain_overflows,
                    )
                except RuntimeError:
                    # if the nursery is already cancelled due to
                    # this context exiting or in error, we ignore
                    # the nursery error since we never expected
                    # anything different.
                    return False
        else:
            # raise local overrun and immediately pack as IPC
            # msg for far end.
            try:
                raise StreamOverrun(
                    text,
                    sender=from_uid,
                )
            except StreamOverrun as err:
                err_msg: dict[str, dict] = pack_error(
                    err,
                    cid=cid,
                )
                # err_msg['cid']: str = cid
                try:
                    await chan.send(err_msg)
                except trio.BrokenResourceError:
                    # XXX: the IPC channel itself is already closed
                    # so the overrun error can't be relayed; just
                    # warn about it.
                    log.warning(f"{chan} is already closed")

        # every overrun path reports "not immediately delivered".
        return False
def mk_context(
    chan: Channel,
    cid: str,
    msg_buffer_size: int = 2**6,
    **kwargs,
) -> Context:
    '''
    Internal factory which builds an inter-actor task ``Context``
    wired up to a fresh in-memory channel pair.

    Only runtime internals are expected to call this; user code
    should generally never need to.

    The memory channel is created with a buffer of
    ``msg_buffer_size`` msgs and any extra ``kwargs`` are passed
    straight through to the ``Context`` constructor.

    '''
    tx, rx = trio.open_memory_channel(msg_buffer_size)
    new_ctx = Context(
        chan,
        cid,
        _send_chan=tx,
        _recv_chan=rx,
        **kwargs,
    )
    # seed the result slot with the instance's own `id()`.
    new_ctx._result = id(new_ctx)
    return new_ctx
def context(func: Callable) -> Callable:
    '''
    Mark an async function as a streaming routine with ``@context``.

    The function must declare a parameter named ``ctx``; when it
    does it is tagged with ``_tractor_context_function = True`` and
    returned unchanged otherwise.

    Raises ``TypeError`` when no ``ctx`` parameter is declared, in
    which case ``func`` is left untagged.

    '''
    # validate BEFORE tagging so that a rejected function is never
    # left behind marked as a context function.
    if 'ctx' not in inspect.signature(func).parameters:
        raise TypeError(
            "The first argument to the context function "
            f"{func.__name__} must be `ctx: tractor.Context`"
        )

    # TODO: apply whatever solution ``mypy`` ends up picking for this:
    # https://github.com/python/mypy/issues/2087#issuecomment-769266912
    func._tractor_context_function = True  # type: ignore
    return func