Remote `Context` cancellation semantics rework B)
This adds remote cancellation semantics to our `tractor.Context` machinery to more
closely match that of `trio.CancelScope` but with operational differences to
handle the nature of parallel tasks interoperating across multiple memory
boundaries:

- if an actor task cancels some context it has opened via `Context.cancel()`,
  the remote (scope linked) task will be cancelled using the normal
  `CancelScope` semantics of `trio` meaning the remote cancel scope surrounding
  the far side task is cancelled and `trio.Cancelled`s are expected to be raised
  in that scope as per normal `trio` operation, and in the case where no error
  is raised in that remote scope, a `ContextCancelled` error is raised inside
  the runtime machinery and relayed back to the opener/caller side of the
  context.

- if any actor task cancels a full remote actor runtime using
  `Portal.cancel_actor()` the same semantics as above apply except every other
  remote actor task which also has an open context with the actor which was
  cancelled will also be sent a `ContextCancelled` **but** with the
  `.canceller` field set to the uid of the original cancel requesting actor.

This changeset also includes a more "proper" solution to the issue of
"allowing overruns" during streaming without attempting to implement any form
of IPC streaming backpressure. Implementing task-granularity backpressure
cross-process turns out to be more or less impossible without augmenting our
streaming protocol (likely at the cost of performance). Further, allowing
overruns requires special care since any blocking of the runtime's RPC msg loop
task can effectively block control msgs such as cancels and stream
terminations.

The implementation details per abstraction layer are as follows.

._streaming.Context:
- add a new constructor factory func `mk_context()` which provides a strictly
  private initializer whilst allowing us to not have to define an
  `.__init__()` on the type def.
- add public `.cancel_called` and `.cancel_called_remote` properties.
- general rename of what was the internal `._backpressure` var to
  `._allow_overruns: bool`.
- move the old contents of `Actor._push_result()` into a new `._deliver_msg()`
  allowing for better encapsulation of per-ctx msg handling.
- always check for received 'error' msgs and process them with the new
  `_maybe_cancel_and_set_remote_error()` **before** any msg delivery to the
  local task, thus guaranteeing error and cancellation handling despite any
  overflow handling.
- add a new `._drain_overflows()` task-method for use with the new
  `._allow_overruns: bool = True` mode.
- add back a `._scope_nursery: trio.Nursery` (allocated in
  `Portal.open_context()`) whose sole purpose is to spawn a single task which
  runs the above method; anything else is an error.
- augment `._deliver_msg()` to start a task and run the above method when
  operating in `._allow_overruns == True` mode; the task queues overflow msgs
  and attempts to send them to the underlying mem chan using a blocking
  `.send()` call.
- on context exit, any existing "drainer task" will be cancelled and remaining
  overflow queued msgs are discarded with a warning.
- rename `._error` -> `._remote_error` and set it in a new method
  `_maybe_cancel_and_set_remote_error()` which is called during msg processing,
  before delivery to the local task.
- adjust `.result()` to always call `._maybe_raise_remote_err()` at its start
  such that whenever a `ContextCancelled` arrives we do logic for whether or
  not to immediately raise that error or ignore it due to the current actor
  being the one who requested the cancel, by checking the error's `.canceller`
  field.
- set the default value of `._result` to be `id(Context())` thus avoiding
  conflict with any `.result()` actually being `False`.

._runtime.Actor:
- augment `.cancel()` and `._cancel_task()` and `.cancel_rpc_tasks()` to take
  a `requesting_uid: tuple` indicating the source actor of every cancellation
  request.
- pass the new `Context._allow_overruns` through `.get_context()`.
- call the new `Context._deliver_msg()` from `._push_result()` (since that
  method's contents were factored out).

._runtime._invoke:
- have `TaskStatus.started()` hand back a `Context` (unless an error is
  raised) instead of the cancel scope, to make it easy to set/get state on
  that context for the purposes of cancellation and remote error relay.
- always raise any remote error via `Context._maybe_raise_remote_err()` before
  doing any `ContextCancelled` logic.
- assign any `Context._cancel_called_remote` set by the `requesting_uid`
  cancel methods (mentioned above) to the `ContextCancelled.canceller`.

._runtime.process_messages:
- always pass a `requesting_uid: tuple` to `Actor.cancel()` and
  `._cancel_task()` so that any corresponding `ContextCancelled.canceller` can
  be set inside `._invoke()`.
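For a concrete feel of the caller-side semantics above, here's a minimal usage
sketch; the actor and function names are hypothetical and it assumes the
public `tractor` API (`open_nursery()`, `Portal.open_context()`,
`Portal.cancel_actor()` and the `@tractor.context` decorator):

    import trio
    import tractor


    @tractor.context
    async def sleep_forever(ctx: tractor.Context) -> None:
        # callee side: ack the context open then idle until cancelled.
        await ctx.started()
        await trio.sleep_forever()


    async def main():
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'sleeper',
                enable_modules=[__name__],
            )
            async with portal.open_context(sleep_forever) as (ctx, first):
                # cancel requested from *this* side: the relayed
                # `ContextCancelled` is reaped silently on block exit
                # since `Context.cancel_called` is set locally.
                await ctx.cancel()

            # tear down the whole remote runtime; any *other* actor
            # holding an open context with 'sleeper' would instead
            # receive a `ContextCancelled` with `.canceller` set to
            # *our* uid.
            await portal.cancel_actor()


    if __name__ == '__main__':
        trio.run(main)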
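The `.canceller` filtering which `.result()` now performs (via
`._maybe_raise_remote_err()`) amounts to roughly the following hand-rolled
check; a sketch only, assuming `ContextCancelled` and `current_actor()` remain
importable from the package top-level:

    try:
        res = await ctx.result()
    except tractor.ContextCancelled as err:
        if err.canceller == tractor.current_actor().uid:
            # this actor requested the cancel so it's an
            # expected outcome, not a crash.
            res = None
        else:
            # some *other* actor (eg. one calling
            # `Portal.cancel_actor()`) cancelled the remote task
            # out from under us: surface it.
            raise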
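Likewise the new overrun mode in a nutshell: opening a stream with
`allow_overruns=True` trades `StreamOverrun` relays for a runtime-managed
"drainer" task on the receiving side. Another sketch, with hypothetical
`fast_producer`/`consume` names and reusing the imports above:

    @tractor.context
    async def fast_producer(ctx: tractor.Context) -> None:
        await ctx.started()
        async with ctx.open_stream() as stream:
            for i in range(2 ** 12):
                await stream.send(i)


    async def consume(portal: tractor.Portal) -> None:
        async with portal.open_context(fast_producer) as (ctx, first):
            # with `allow_overruns=True` a slow local consumer no
            # longer causes `StreamOverrun` errors to be relayed back
            # to the sender; instead `._deliver_msg()` starts a drainer
            # task which queues the overflowed msgs and (blocking)
            # `.send()`s them into the feeder mem chan as the consumer
            # catches up.
            async with ctx.open_stream(allow_overruns=True) as stream:
                async for msg in stream:
                    await trio.sleep(0.01)  # deliberately slow consumer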
branch: proper_breakpoint_hooking
parent 1ec30577de
commit e97ed377b0
tractor/_portal.py
@@ -103,7 +103,7 @@ class Portal:
         # When set to a ``Context`` (when _submit_for_result is called)
         # it is expected that ``result()`` will be awaited at some
         # point.
-        self._expect_result: Optional[Context] = None
+        self._expect_result: Context | None = None
         self._streams: set[MsgStream] = set()
         self.actor = current_actor()
 
@@ -209,7 +209,10 @@ class Portal:
         try:
             # send cancel cmd - might not get response
             # XXX: sure would be nice to make this work with a proper shield
-            with trio.move_on_after(timeout or self.cancel_timeout) as cs:
+            with trio.move_on_after(
+                timeout
+                or self.cancel_timeout
+            ) as cs:
                 cs.shield = True
 
                 await self.run_from_ns('self', 'cancel')
@@ -330,7 +333,9 @@ class Portal:
                 f'{async_gen_func} must be an async generator function!')
 
         fn_mod_path, fn_name = NamespacePath.from_ref(
-            async_gen_func).to_tuple()
+            async_gen_func
+        ).to_tuple()
 
         ctx = await self.actor.start_remote_task(
             self.channel,
             fn_mod_path,
@@ -396,13 +401,16 @@ class Portal:
             raise TypeError(
                 f'{func} must be an async generator function!')
 
+        # TODO: i think from here onward should probably
+        # just be factored into an `@acm` inside a new
+        # a new `_context.py` mod.
         fn_mod_path, fn_name = NamespacePath.from_ref(func).to_tuple()
 
         ctx = await self.actor.start_remote_task(
             self.channel,
             fn_mod_path,
             fn_name,
-            kwargs
+            kwargs,
         )
 
         assert ctx._remote_func_type == 'context'
@@ -426,29 +434,47 @@ class Portal:
                 f' but received a non-error msg:\n{pformat(msg)}'
             )
 
-        _err: Optional[BaseException] = None
-        ctx._portal = self
+        _err: BaseException | None = None
+        ctx._portal: Portal = self
 
-        uid = self.channel.uid
-        cid = ctx.cid
-        etype: Optional[Type[BaseException]] = None
+        uid: tuple = self.channel.uid
+        cid: str = ctx.cid
+        etype: Type[BaseException] | None = None
 
-        # deliver context instance and .started() msg value in open tuple.
+        # deliver context instance and .started() msg value in enter
+        # tuple.
         try:
-            async with trio.open_nursery() as scope_nursery:
-                ctx._scope_nursery = scope_nursery
-                # do we need this?
-                # await trio.lowlevel.checkpoint()
+            async with trio.open_nursery() as nurse:
+                ctx._scope_nursery = nurse
+                ctx._scope = nurse.cancel_scope
 
                 yield ctx, first
 
+                # when in allow_ovveruns mode there may be lingering
+                # overflow sender tasks remaining?
+                if nurse.child_tasks:
+                    # ensure we are in overrun state with
+                    # ``._allow_overruns=True`` bc otherwise
+                    # there should be no tasks in this nursery!
+                    if (
+                        not ctx._allow_overruns
+                        or len(nurse.child_tasks) > 1
+                    ):
+                        raise RuntimeError(
+                            'Context has sub-tasks but is '
+                            'not in `allow_overruns=True` Mode!?'
+                        )
+                    ctx._scope.cancel()
 
         except ContextCancelled as err:
             _err = err
 
+            # swallow and mask cross-actor task context cancels that
+            # were initiated by *this* side's task.
             if not ctx._cancel_called:
-                # context was cancelled at the far end but was
-                # not part of this end requesting that cancel
-                # so raise for the local task to respond and handle.
+                # XXX: this should NEVER happen!
+                # from ._debug import breakpoint
+                # await breakpoint()
                 raise
 
             # if the context was cancelled by client code
@@ -468,17 +494,17 @@ class Portal:
 
         ) as err:
             etype = type(err)
-            # the context cancels itself on any cancel
-            # causing error.
 
-            if ctx.chan.connected():
-                log.cancel(
-                    'Context cancelled for task, sending cancel request..\n'
-                    f'task:{cid}\n'
-                    f'actor:{uid}'
-                )
+            # cancel ourselves on any error.
+            log.cancel(
+                'Context cancelled for task, sending cancel request..\n'
+                f'task:{cid}\n'
+                f'actor:{uid}'
+            )
+            try:
+
                 await ctx.cancel()
-            else:
+            except trio.BrokenResourceError:
                 log.warning(
                     'IPC connection for context is broken?\n'
                     f'task:{cid}\n'
@@ -487,12 +513,7 @@ class Portal:
 
             raise
 
-        finally:
-            # in the case where a runtime nursery (due to internal bug)
-            # or a remote actor transmits an error we want to be
-            # sure we get the error the underlying feeder mem chan.
-            # if it's not raised here it *should* be raised from the
-            # msg loop nursery right?
+        else:
             if ctx.chan.connected():
                 log.info(
                     'Waiting on final context-task result for\n'
@@ -505,6 +526,7 @@ class Portal:
                     f'value from callee `{result}`'
                 )
 
+        finally:
             # though it should be impossible for any tasks
             # operating *in* this scope to have survived
             # we tear down the runtime feeder chan last
tractor/_root.py
@@ -251,7 +251,9 @@ async def open_root_actor(
             # tempn.start_soon(an.exited.wait)
 
             logger.cancel("Shutting down root actor")
-            await actor.cancel()
+            await actor.cancel(
+                requesting_uid=actor.uid,
+            )
     finally:
         _state._current_actor = None
         logger.runtime("Root actor terminated")
tractor/_runtime.py
@@ -28,9 +28,11 @@ import inspect
 import signal
 import sys
 from typing import (
-    Any, Optional,
-    Union, TYPE_CHECKING,
+    Any,
     Callable,
+    Optional,
+    Union,
+    TYPE_CHECKING,
 )
 import uuid
 from types import ModuleType
@@ -44,7 +46,10 @@ import trio  # type: ignore
 from trio_typing import TaskStatus
 
 from ._ipc import Channel
-from ._streaming import Context
+from ._streaming import (
+    mk_context,
+    Context,
+)
 from .log import get_logger
 from ._exceptions import (
     pack_error,
@@ -53,7 +58,6 @@ from ._exceptions import (
     is_multi_cancelled,
     ContextCancelled,
     TransportClosed,
-    StreamOverrun,
 )
 from . import _debug
 from ._discovery import get_arbiter
@@ -79,7 +83,7 @@ async def _invoke(
 
     is_rpc: bool = True,
     task_status: TaskStatus[
-        Union[trio.CancelScope, BaseException]
+        Union[Context, BaseException]
     ] = trio.TASK_STATUS_IGNORED,
 ):
     '''
@@ -99,7 +103,14 @@ async def _invoke(
     # activated cancel scope ref
     cs: Optional[trio.CancelScope] = None
 
-    ctx = actor.get_context(chan, cid)
+    ctx = actor.get_context(
+        chan,
+        cid,
+        # We shouldn't ever need to pass this through right?
+        # it's up to the soon-to-be called rpc task to
+        # open the stream with this option.
+        # allow_overruns=True,
+    )
     context: bool = False
 
     if getattr(func, '_tractor_stream_function', False):
@@ -138,7 +149,10 @@ async def _invoke(
     ):
         raise TypeError(f'{func} must be an async function!')
 
-    coro = func(**kwargs)
+    try:
+        coro = func(**kwargs)
+    except TypeError:
+        raise
 
     if inspect.isasyncgen(coro):
         await chan.send({'functype': 'asyncgen', 'cid': cid})
@@ -150,7 +164,8 @@ async def _invoke(
             # of the async gen in order to be sure the cancel
             # is propagated!
             with cancel_scope as cs:
-                task_status.started(cs)
+                ctx._scope = cs
+                task_status.started(ctx)
                 async with aclosing(coro) as agen:
                     async for item in agen:
                         # TODO: can we send values back in here?
@@ -176,7 +191,8 @@ async def _invoke(
             # manualy construct the response dict-packet-responses as
             # above
             with cancel_scope as cs:
-                task_status.started(cs)
+                ctx._scope = cs
+                task_status.started(ctx)
                 await coro
 
             if not cs.cancelled_caught:
@@ -189,19 +205,25 @@ async def _invoke(
         await chan.send({'functype': 'context', 'cid': cid})
 
         try:
-            async with trio.open_nursery() as scope_nursery:
-                ctx._scope_nursery = scope_nursery
-                cs = scope_nursery.cancel_scope
-                task_status.started(cs)
+            with cancel_scope as cs:
+                ctx._scope = cs
+                task_status.started(ctx)
                 res = await coro
                 await chan.send({'return': res, 'cid': cid})
 
-        except BaseExceptionGroup:
+        # XXX: do we ever trigger this block any more?
+        except (
+            BaseExceptionGroup,
+            trio.Cancelled,
+        ):
             # if a context error was set then likely
             # thei multierror was raised due to that
-            if ctx._error is not None:
-                raise ctx._error from None
+            if ctx._remote_error is not None:
+                raise ctx._remote_error
 
+            # maybe TODO: pack in ``trio.Cancelled.__traceback__`` here
+            # so they can be unwrapped and displayed on the caller
+            # side?
             raise
 
         finally:
@@ -213,7 +235,6 @@ async def _invoke(
             # associated child isn't in debug any more
             await _debug.maybe_wait_for_debugger()
             ctx = actor._contexts.pop((chan.uid, cid))
 
             if ctx:
                 log.runtime(
                     f'Context entrypoint {func} was terminated:\n{ctx}'
@@ -221,32 +242,60 @@ async def _invoke(
 
         assert cs
         if cs.cancelled_caught:
-            # if 'brokerd.kraken' in actor.uid:
-            #     await _debug.breakpoint()
 
-            # TODO: pack in ``trio.Cancelled.__traceback__`` here
-            # so they can be unwrapped and displayed on the caller
-            # side!
+            # first check for and raise any remote error
+            # before raising any context cancelled case
+            # so that real remote errors don't get masked as
+            # ``ContextCancelled``s.
+            re = ctx._remote_error
+            if re:
+                ctx._maybe_raise_remote_err(re)
+
             fname = func.__name__
-            if ctx._cancel_called:
-                msg = f'`{fname}()`@{actor.uid} cancelled itself'
-
-            elif cs.cancel_called:
-                msg = (
-                    f'`{fname}()`@{actor.uid} was remotely cancelled by its caller '
-                    f'{ctx.chan.uid}'
-                )
-
-            if ctx._cancel_msg:
-                msg += f' with msg:\n{ctx._cancel_msg}'
-
-            # task-contex was cancelled so relay to the cancel to caller
-            raise ContextCancelled(
-                msg,
-                suberror_type=trio.Cancelled,
-            )
+            if cs.cancel_called:
+                canceller = ctx._cancel_called_remote
+                # await _debug.breakpoint()
+
+                # NOTE / TODO: if we end up having
+                # ``Actor._cancel_task()`` call
+                # ``Context.cancel()`` directly, we're going to
+                # need to change this logic branch since it will
+                # always enter..
+                if ctx._cancel_called:
+                    msg = f'`{fname}()`@{actor.uid} cancelled itself'
+
+                else:
+                    msg = (
+                        f'`{fname}()`@{actor.uid} '
+                        'was remotely cancelled by '
+                    )
+
+                    # if the channel which spawned the ctx is the
+                    # one that cancelled it then we report that, vs.
+                    # it being some other random actor that for ex.
+                    # some actor who calls `Portal.cancel_actor()`
+                    # and by side-effect cancels this ctx.
+                    if canceller == ctx.chan.uid:
+                        msg += f'its caller {canceller}'
+                    else:
+                        msg += f'remote actor {canceller}'
+
+                # TODO: does this ever get set any more or can
+                # we remove it?
+                if ctx._cancel_msg:
+                    msg += f' with msg:\n{ctx._cancel_msg}'
+
+                # task-contex was either cancelled by request using
+                # ``Portal.cancel_actor()`` or ``Context.cancel()``
+                # on the far end, or it was cancelled by the local
+                # (callee) task, so relay this cancel signal to the
+                # other side.
+                raise ContextCancelled(
+                    msg,
+                    suberror_type=trio.Cancelled,
+                    canceller=canceller,
+                )
 
     else:
         # regular async function
         try:
@@ -261,12 +310,17 @@ async def _invoke(
                 )
 
         with cancel_scope as cs:
-            task_status.started(cs)
+            ctx._scope = cs
+            task_status.started(ctx)
             result = await coro
-            log.cancel(f'result: {result}')
+            fname = func.__name__
+            log.runtime(f'{fname}() result: {result}')
             if not failed_resp:
                 # only send result if we know IPC isn't down
-                await chan.send({'return': result, 'cid': cid})
+                await chan.send(
+                    {'return': result,
+                     'cid': cid}
+                )
 
     except (
         Exception,
@@ -309,6 +363,7 @@ async def _invoke(
         # always ship errors back to caller
         err_msg = pack_error(err, tb=tb)
         err_msg['cid'] = cid
 
         try:
             await chan.send(err_msg)
 
@@ -325,14 +380,21 @@ async def _invoke(
                 f"Failed to ship error to caller @ {chan.uid} !?"
             )
 
+        # error is probably from above coro running code *not from the
+        # underlyingn rpc invocation* since a scope was never allocated
+        # around actual coroutine await.
         if cs is None:
-            # error is from above code not from rpc invocation
+            # we don't ever raise directly here to allow the
+            # msg-loop-scheduler to continue running for this
+            # channel.
             task_status.started(err)
 
     finally:
         # RPC task bookeeping
         try:
-            scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
+            ctx, func, is_complete = actor._rpc_tasks.pop(
+                (chan, cid)
+            )
             is_complete.set()
 
         except KeyError:
@@ -341,6 +403,9 @@ async def _invoke(
             # cancel scope will not have been inserted yet
             log.warning(
                 f"Task {func} likely errored or cancelled before start")
+        else:
+            log.cancel(f'{func.__name__}({kwargs}) failed?')
+
         finally:
             if not actor._rpc_tasks:
                 log.runtime("All RPC tasks have completed")
@@ -439,6 +504,7 @@ class Actor:
         self.uid = (name, uid or str(uuid.uuid4()))
 
         self._cancel_complete = trio.Event()
+        self._cancel_called_remote: tuple[str, tuple] | None = None
         self._cancel_called: bool = False
 
         # retreive and store parent `__main__` data which
@@ -477,7 +543,7 @@ class Actor:
        # (chan, cid) -> (cancel_scope, func)
         self._rpc_tasks: dict[
             tuple[Channel, str],
-            tuple[trio.CancelScope, Callable, trio.Event]
+            tuple[Context, Callable, trio.Event]
         ] = {}
 
         # map {actor uids -> Context}
@@ -652,8 +718,8 @@ class Actor:
         if (
             local_nursery
         ):
+            if chan._cancel_called:
                 log.cancel(f"Waiting on cancel request to peer {chan.uid}")
                 # XXX: this is a soft wait on the channel (and its
                 # underlying transport protocol) to close from the
                 # remote peer side since we presume that any channel
@@ -786,76 +852,15 @@ class Actor:
                 f'\n{msg}')
             return
 
-        send_chan = ctx._send_chan
-
-        log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
-
-        # XXX: we do **not** maintain backpressure and instead
-        # opt to relay stream overrun errors to the sender.
-        try:
-            send_chan.send_nowait(msg)
-            # if an error is deteced we should always
-            # expect it to be raised by any context (stream)
-            # consumer task
-            await ctx._maybe_raise_from_remote_msg(msg)
-
-        except trio.BrokenResourceError:
-            # TODO: what is the right way to handle the case where the
-            # local task has already sent a 'stop' / StopAsyncInteration
-            # to the other side but and possibly has closed the local
-            # feeder mem chan? Do we wait for some kind of ack or just
-            # let this fail silently and bubble up (currently)?
-
-            # XXX: local consumer has closed their side
-            # so cancel the far end streaming task
-            log.warning(f"{send_chan} consumer is already closed")
-            return
-
-        except trio.WouldBlock:
-            # XXX: always push an error even if the local
-            # receiver is in overrun state.
-            await ctx._maybe_raise_from_remote_msg(msg)
-
-            uid = chan.uid
-            lines = [
-                'Task context stream was overrun',
-                f'local task: {cid} @ {self.uid}',
-                f'remote sender: {uid}',
-            ]
-            if not ctx._stream_opened:
-                lines.insert(
-                    1,
-                    f'\n*** No stream open on `{self.uid[0]}` side! ***\n'
-                )
-
-            text = '\n'.join(lines)
-
-            if ctx._backpressure:
-                log.warning(text)
-                try:
-                    await send_chan.send(msg)
-                except trio.BrokenResourceError:
-                    # XXX: local consumer has closed their side
-                    # so cancel the far end streaming task
-                    log.warning(f"{chan} is already closed")
-            else:
-                try:
-                    raise StreamOverrun(text) from None
-                except StreamOverrun as err:
-                    err_msg = pack_error(err)
-                    err_msg['cid'] = cid
-                    try:
-                        await chan.send(err_msg)
-                    except trio.BrokenResourceError:
-                        # XXX: local consumer has closed their side
-                        # so cancel the far end streaming task
-                        log.warning(f"{chan} is already closed")
+        return await ctx._deliver_msg(msg)
 
     def get_context(
         self,
         chan: Channel,
         cid: str,
-        msg_buffer_size: Optional[int] = None,
+
+        msg_buffer_size: int | None = None,
+        allow_overruns: bool = False,
 
     ) -> Context:
         '''
@@ -871,6 +876,7 @@ class Actor:
         assert actor_uid
         try:
             ctx = self._contexts[(actor_uid, cid)]
+            ctx._allow_overruns = allow_overruns
 
             # adjust buffer size if specified
             state = ctx._send_chan._state  # type: ignore
@@ -878,15 +884,11 @@ class Actor:
                 state.max_buffer_size = msg_buffer_size
 
         except KeyError:
-            send_chan: trio.MemorySendChannel
-            recv_chan: trio.MemoryReceiveChannel
-            send_chan, recv_chan = trio.open_memory_channel(
-                msg_buffer_size or self.msg_buffer_size)
-            ctx = Context(
+            ctx = mk_context(
                 chan,
                 cid,
-                _send_chan=send_chan,
-                _recv_chan=recv_chan,
+                msg_buffer_size=msg_buffer_size or self.msg_buffer_size,
+                _allow_overruns=allow_overruns,
             )
             self._contexts[(actor_uid, cid)] = ctx
 
@@ -898,7 +900,8 @@ class Actor:
         ns: str,
         func: str,
         kwargs: dict,
-        msg_buffer_size: Optional[int] = None,
+        msg_buffer_size: int | None = None,
+        allow_overruns: bool = False,
 
     ) -> Context:
         '''
@@ -916,6 +919,7 @@ class Actor:
             chan,
             cid,
             msg_buffer_size=msg_buffer_size,
+            allow_overruns=allow_overruns,
         )
         log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
         await chan.send(
@@ -1046,7 +1050,11 @@ class Actor:
         assert self._service_n
         self._service_n.start_soon(self.cancel)
 
-    async def cancel(self) -> bool:
+    async def cancel(
+        self,
+        requesting_uid: tuple[str, str],
+
+    ) -> bool:
         '''
         Cancel this actor's runtime.
 
@@ -1060,6 +1068,7 @@ class Actor:
 
         '''
         log.cancel(f"{self.uid} is trying to cancel")
+        self._cancel_called_remote: tuple = requesting_uid
         self._cancel_called = True
 
         # cancel all ongoing rpc tasks
@@ -1073,7 +1082,7 @@ class Actor:
                 dbcs.cancel()
 
         # kill all ongoing tasks
-        await self.cancel_rpc_tasks()
+        await self.cancel_rpc_tasks(requesting_uid=requesting_uid)
 
         # stop channel server
         self.cancel_server()
@@ -1099,7 +1108,13 @@ class Actor:
        #     for n in root.child_nurseries:
        #         n.cancel_scope.cancel()
 
-    async def _cancel_task(self, cid, chan):
+    async def _cancel_task(
+        self,
+        cid: str,
+        chan: Channel,
+
+        requesting_uid: tuple[str, str] | None = None,
+    ) -> bool:
         '''
         Cancel a local task by call-id / channel.
 
@@ -1114,35 +1129,51 @@ class Actor:
         try:
             # this ctx based lookup ensures the requested task to
             # be cancelled was indeed spawned by a request from this channel
-            scope, func, is_complete = self._rpc_tasks[(chan, cid)]
+            ctx, func, is_complete = self._rpc_tasks[(chan, cid)]
+            scope = ctx._scope
         except KeyError:
             log.cancel(f"{cid} has already completed/terminated?")
-            return
+            return True
 
         log.cancel(
             f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
             f"peer: {chan.uid}\n")
 
+        if (
+            ctx._cancel_called_remote is None
+            and requesting_uid
+        ):
+            ctx._cancel_called_remote: tuple = requesting_uid
+
         # don't allow cancelling this function mid-execution
         # (is this necessary?)
         if func is self._cancel_task:
-            return
+            return True
 
+        # TODO: shouldn't we eventually be calling ``Context.cancel()``
+        # directly here instead (since that method can handle both
+        # side's calls into it?
         scope.cancel()
 
         # wait for _invoke to mark the task complete
         log.runtime(
-            f"Waiting on task to cancel:\ncid: {cid}\nfunc: {func}\n"
-            f"peer: {chan.uid}\n")
+            'Waiting on task to cancel:\n'
+            f'cid: {cid}\nfunc: {func}\n'
+            f'peer: {chan.uid}\n'
+        )
         await is_complete.wait()
 
         log.runtime(
             f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
            f"peer: {chan.uid}\n")
 
+        return True
+
     async def cancel_rpc_tasks(
         self,
         only_chan: Channel | None = None,
+        requesting_uid: tuple[str, str] | None = None,
+
     ) -> None:
         '''
         Cancel all existing RPC responder tasks using the cancel scope
@@ -1154,7 +1185,7 @@ class Actor:
         log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
         for (
             (chan, cid),
-            (scope, func, is_complete),
+            (ctx, func, is_complete),
         ) in tasks.copy().items():
             if only_chan is not None:
                 if only_chan != chan:
@@ -1162,7 +1193,11 @@ class Actor:
 
             # TODO: this should really done in a nursery batch
             if func != self._cancel_task:
-                await self._cancel_task(cid, chan)
+                await self._cancel_task(
+                    cid,
+                    chan,
+                    requesting_uid=requesting_uid,
+                )
 
         log.cancel(
             f"Waiting for remaining rpc tasks to complete {tasks}")
@@ -1248,8 +1283,8 @@ async def async_main(
     Actor runtime entrypoint; start the IPC channel server, maybe connect
     back to the parent, and startup all core machinery tasks.
 
-    A "root-most" (or "top-level") nursery for this actor is opened here
-    and when cancelled effectively cancels the actor.
+    A "root" (or "top-level") nursery for this actor is opened here and
+    when cancelled/terminated effectively closes the actor's "runtime".
 
     '''
     # attempt to retreive ``trio``'s sigint handler and stash it
@@ -1446,15 +1481,16 @@ async def process_messages(
 
 ) -> bool:
     '''
-    Process messages for the IPC transport channel async-RPC style.
+    This is the per-channel, low level RPC task scheduler loop.
 
-    Receive multiplexed RPC requests, spawn handler tasks and deliver
-    responses over or boxed errors back to the "caller" task.
+    Receive multiplexed RPC request messages from some remote process,
+    spawn handler tasks depending on request type and deliver responses
+    or boxed errors back to the remote caller (task).
 
     '''
     # TODO: once https://github.com/python-trio/trio/issues/467 gets
     # worked out we'll likely want to use that!
-    msg = None
+    msg: dict | None = None
    nursery_cancelled_before_task: bool = False
 
    log.runtime(f"Entering msg loop for {chan} from {chan.uid}")
@@ -1476,7 +1512,10 @@ async def process_messages(
 
                     for (channel, cid) in actor._rpc_tasks.copy():
                         if channel is chan:
-                            await actor._cancel_task(cid, channel)
+                            await actor._cancel_task(
+                                cid,
+                                channel,
+                            )
 
                     log.runtime(
                         f"Msg loop signalled to terminate for"
@@ -1490,12 +1529,14 @@ async def process_messages(
                 cid = msg.get('cid')
                 if cid:
                     # deliver response to local caller/waiter
+                    # via its per-remote-context memory channel.
                     await actor._push_result(chan, cid, msg)
 
                     log.runtime(
                         f"Waiting on next msg for {chan} from {chan.uid}")
                     continue
 
                # TODO: implement with ``match:`` syntax?
                 # process command request
                 try:
                     ns, funcname, kwargs, actorid, cid = msg['cmd']
@@ -1515,13 +1556,12 @@ async def process_messages(
                         f"{ns}.{funcname}({kwargs})")
 
                 if ns == 'self':
-                    func = getattr(actor, funcname)
-
                     if funcname == 'cancel':
+                        func = actor.cancel
+                        kwargs['requesting_uid'] = chan.uid
 
-                        # don't start entire actor runtime
-                        # cancellation if this actor is in debug
-                        # mode
+                        # don't start entire actor runtime cancellation
+                        # if this actor is currently in debug mode!
                         pdb_complete = _debug.Lock.local_pdb_complete
                         if pdb_complete:
                             await pdb_complete.wait()
@@ -1533,43 +1573,56 @@ async def process_messages(
                             # msg loop and break out into
                             # ``async_main()``
                             log.cancel(
-                                f"Actor {actor.uid} was remotely cancelled "
+                                "Actor runtime for was remotely cancelled "
                                 f"by {chan.uid}"
                             )
                             await _invoke(
-                                actor, cid, chan, func, kwargs, is_rpc=False
+                                actor,
+                                cid,
+                                chan,
+                                func,
+                                kwargs,
+                                is_rpc=False,
                             )
 
+                            log.cancel(
+                                f'Cancelling msg loop for {chan.uid}'
+                            )
                             loop_cs.cancel()
                             break
 
                     if funcname == '_cancel_task':
+                        func = actor._cancel_task
+
                         # we immediately start the runtime machinery
                         # shutdown
-                        with trio.CancelScope(shield=True):
-                            # actor.cancel() was called so kill this
-                            # msg loop and break out into
-                            # ``async_main()``
-                            kwargs['chan'] = chan
-                            log.cancel(
-                                f'Remote request to cancel task\n'
-                                f'remote actor: {chan.uid}\n'
-                                f'task: {cid}'
-                            )
-                            try:
-                                await _invoke(
-                                    actor,
-                                    cid,
-                                    chan,
-                                    func,
-                                    kwargs,
-                                    is_rpc=False,
-                                )
-                            except BaseException:
-                                log.exception("failed to cancel task?")
-
-                            continue
+                        # with trio.CancelScope(shield=True):
+                        kwargs['chan'] = chan
+                        target_cid = kwargs['cid']
+                        kwargs['requesting_uid'] = chan.uid
+                        log.cancel(
+                            f'Remote request to cancel task\n'
+                            f'remote actor: {chan.uid}\n'
+                            f'task: {target_cid}'
+                        )
+                        try:
+                            await _invoke(
+                                actor,
+                                cid,
+                                chan,
+                                func,
+                                kwargs,
+                                is_rpc=False,
+                            )
+                        except BaseException:
+                            log.exception("failed to cancel task?")
+
+                        continue
+                    else:
+                        # normally registry methods, eg.
+                        # ``.register_actor()`` etc.
+                        func = getattr(actor, funcname)
 
                 else:
                     # complain to client about restricted modules
                     try:
@@ -1584,34 +1637,49 @@ async def process_messages(
                 log.runtime(f"Spawning task for {func}")
                 assert actor._service_n
                 try:
-                    cs = await actor._service_n.start(
-                        partial(_invoke, actor, cid, chan, func, kwargs),
+                    ctx: Context = await actor._service_n.start(
+                        partial(
+                            _invoke,
+                            actor,
+                            cid,
+                            chan,
+                            func,
+                            kwargs,
+                        ),
                         name=funcname,
                     )
 
                 except (
                     RuntimeError,
                     BaseExceptionGroup,
                 ):
                     # avoid reporting a benign race condition
                     # during actor runtime teardown.
-                    nursery_cancelled_before_task = True
+                    nursery_cancelled_before_task: bool = True
                     break
 
-                # never allow cancelling cancel requests (results in
-                # deadlock and other weird behaviour)
-                # if func != actor.cancel:
-                if isinstance(cs, Exception):
+                # in the lone case where a ``Context`` is not
+                # delivered, it's likely going to be a locally
+                # scoped exception from ``_invoke()`` itself.
+                if isinstance(ctx, Exception):
                     log.warning(
                         f"Task for RPC func {func} failed with"
-                        f"{cs}")
+                        f"{ctx}"
+                    )
+                    continue
 
                 else:
                     # mark that we have ongoing rpc tasks
                     actor._ongoing_rpc_tasks = trio.Event()
                     log.runtime(f"RPC func is {func}")
 
                     # store cancel scope such that the rpc task can be
                     # cancelled gracefully if requested
                     actor._rpc_tasks[(chan, cid)] = (
-                        cs, func, trio.Event())
+                        ctx,
+                        func,
+                        trio.Event(),
+                    )
 
                 log.runtime(
                     f"Waiting on next msg for {chan} from {chan.uid}")
@@ -1655,7 +1723,7 @@ async def process_messages(
         match err:
             case ContextCancelled():
                 log.cancel(
-                    f'Actor: {actor.uid} was task-context-cancelled with,\n'
+                    f'Actor: {actor.uid} was context-cancelled with,\n'
                     f'str(err)'
                 )
             case _:
@@ -1672,7 +1740,8 @@ async def process_messages(
     # msg debugging for when he machinery is brokey
     log.runtime(
         f"Exiting msg loop for {chan} from {chan.uid} "
-        f"with last msg:\n{msg}")
+        f"with last msg:\n{msg}"
+    )
 
     # transport **was not** disconnected
     return False
@ -21,24 +21,41 @@ Message stream types and APIs.
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import inspect
|
import inspect
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from dataclasses import dataclass
|
from collections import deque
|
||||||
|
from dataclasses import (
|
||||||
|
dataclass,
|
||||||
|
field,
|
||||||
|
)
|
||||||
|
from functools import partial
|
||||||
|
from pprint import pformat
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Optional,
|
Optional,
|
||||||
Callable,
|
Callable,
|
||||||
AsyncGenerator,
|
AsyncGenerator,
|
||||||
AsyncIterator
|
AsyncIterator,
|
||||||
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from ._ipc import Channel
|
from ._ipc import Channel
|
||||||
from ._exceptions import unpack_error, ContextCancelled
|
from ._exceptions import (
|
||||||
from ._state import current_actor
|
unpack_error,
|
||||||
|
pack_error,
|
||||||
|
ContextCancelled,
|
||||||
|
StreamOverrun,
|
||||||
|
)
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from .trionics import broadcast_receiver, BroadcastReceiver
|
from ._state import current_actor
|
||||||
|
from .trionics import (
|
||||||
|
broadcast_receiver,
|
||||||
|
BroadcastReceiver,
|
||||||
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ._portal import Portal
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
@ -335,8 +352,8 @@ class MsgStream(trio.abc.Channel):
|
||||||
Send a message over this stream to the far end.
|
Send a message over this stream to the far end.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if self._ctx._error:
|
if self._ctx._remote_error:
|
||||||
raise self._ctx._error # from None
|
raise self._ctx._remote_error # from None
|
||||||
|
|
||||||
if self._closed:
|
if self._closed:
|
||||||
raise trio.ClosedResourceError('This stream was already closed')
|
raise trio.ClosedResourceError('This stream was already closed')
|
||||||
|
@ -373,25 +390,61 @@ class Context:
|
||||||
_recv_chan: trio.MemoryReceiveChannel
|
_recv_chan: trio.MemoryReceiveChannel
|
||||||
_send_chan: trio.MemorySendChannel
|
_send_chan: trio.MemorySendChannel
|
||||||
|
|
||||||
_remote_func_type: Optional[str] = None
|
_remote_func_type: str | None = None
|
||||||
|
|
||||||
# only set on the caller side
|
# only set on the caller side
|
||||||
_portal: Optional['Portal'] = None # type: ignore # noqa
|
_portal: Portal | None = None # type: ignore # noqa
|
||||||
_result: Optional[Any] = False
|
_result: Any | int = None
|
||||||
_error: Optional[BaseException] = None
|
_remote_error: BaseException | None = None
|
||||||
|
|
||||||
# status flags
|
# cancellation state
|
||||||
_cancel_called: bool = False
|
_cancel_called: bool = False
|
||||||
_cancel_msg: Optional[str] = None
|
_cancel_called_remote: tuple | None = None
|
||||||
|
_cancel_msg: str | None = None
|
||||||
|
_scope: trio.CancelScope | None = None
|
||||||
_enter_debugger_on_cancel: bool = True
|
_enter_debugger_on_cancel: bool = True
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cancel_called(self) -> bool:
|
||||||
|
'''
|
||||||
|
Records whether cancellation has been requested for this context
|
||||||
|
by either an explicit call to ``.cancel()`` or an implicit call
|
||||||
|
due to an error caught inside the ``Portal.open_context()``
|
||||||
|
block.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._cancel_called
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cancel_called_remote(self) -> tuple[str, str] | None:
|
||||||
|
'''
|
||||||
|
``Actor.uid`` of the remote actor who's task was cancelled
|
||||||
|
causing this side of the context to also be cancelled.
|
||||||
|
|
||||||
|
'''
|
||||||
|
remote_uid = self._cancel_called_remote
|
||||||
|
if remote_uid:
|
||||||
|
return tuple(remote_uid)
|
||||||
|
|
||||||
|
# init and streaming state
|
||||||
_started_called: bool = False
|
_started_called: bool = False
|
||||||
_started_received: bool = False
|
_started_received: bool = False
|
||||||
_stream_opened: bool = False
|
_stream_opened: bool = False
|
||||||
|
|
||||||
# only set on the callee side
|
# overrun handling machinery
|
||||||
_scope_nursery: Optional[trio.Nursery] = None
|
# NOTE: none of this provides "backpressure" to the remote
|
||||||
|
# task, only an ability to not lose messages when the local
|
||||||
_backpressure: bool = True
|
# task is configured to NOT transmit ``StreamOverrun``s back
|
||||||
|
# to the other side.
|
||||||
|
_overflow_q: deque[dict] = field(
|
||||||
|
default_factory=partial(
|
||||||
|
deque,
|
||||||
|
maxlen=616,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
_scope_nursery: trio.Nursery | None = None
|
||||||
|
_in_overrun: bool = False
|
||||||
|
_allow_overruns: bool = False
|
||||||
|
|
||||||
async def send_yield(
|
async def send_yield(
|
||||||
self,
|
self,
|
||||||
|
@ -410,9 +463,9 @@ class Context:
|
||||||
async def send_stop(self) -> None:
|
async def send_stop(self) -> None:
|
||||||
await self.chan.send({'stop': True, 'cid': self.cid})
|
await self.chan.send({'stop': True, 'cid': self.cid})
|
||||||
|
|
||||||
async def _maybe_raise_from_remote_msg(
|
async def _maybe_cancel_and_set_remote_error(
|
||||||
self,
|
self,
|
||||||
msg: dict[str, Any],
|
error_msg: dict[str, Any],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -423,55 +476,77 @@ class Context:
|
||||||
in the corresponding remote callee task.
|
in the corresponding remote callee task.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
error = msg.get('error')
|
# If this is an error message from a context opened by
|
||||||
if error:
|
# ``Portal.open_context()`` we want to interrupt any ongoing
|
||||||
# If this is an error message from a context opened by
|
# (child) tasks within that context to be notified of the remote
|
||||||
# ``Portal.open_context()`` we want to interrupt any ongoing
|
# error relayed here.
|
||||||
# (child) tasks within that context to be notified of the remote
|
#
|
||||||
# error relayed here.
|
# The reason we may want to raise the remote error immediately
|
||||||
#
|
# is that there is no guarantee the associated local task(s)
|
||||||
# The reason we may want to raise the remote error immediately
|
# will attempt to read from any locally opened stream any time
|
||||||
# is that there is no guarantee the associated local task(s)
|
# soon.
|
||||||
# will attempt to read from any locally opened stream any time
|
#
|
||||||
# soon.
|
# NOTE: this only applies when
|
||||||
#
|
# ``Portal.open_context()`` has been called since it is assumed
|
||||||
# NOTE: this only applies when
|
# (currently) that other portal APIs (``Portal.run()``,
|
||||||
# ``Portal.open_context()`` has been called since it is assumed
|
# ``.run_in_actor()``) do their own error checking at the point
|
||||||
# (currently) that other portal APIs (``Portal.run()``,
|
# of the call and result processing.
|
||||||
# ``.run_in_actor()``) do their own error checking at the point
|
error = unpack_error(
|
||||||
# of the call and result processing.
|
error_msg,
|
||||||
log.error(
|
self.chan,
|
||||||
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
)
|
||||||
f'{msg["error"]["tb_str"]}'
|
|
||||||
|
# XXX: set the remote side's error so that after we cancel
|
||||||
|
# whatever task is the opener of this context it can raise
|
||||||
|
# that error as the reason.
|
||||||
|
self._remote_error = error
|
||||||
|
|
||||||
|
if (
|
||||||
|
isinstance(error, ContextCancelled)
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
'Remote task-context sucessfully cancelled for '
|
||||||
|
f'{self.chan.uid}:{self.cid}'
|
||||||
)
|
)
|
||||||
error = unpack_error(msg, self.chan)
|
|
||||||
if (
|
if self._cancel_called:
|
||||||
isinstance(error, ContextCancelled) and
|
|
||||||
self._cancel_called
|
|
||||||
):
|
|
||||||
# this is an expected cancel request response message
|
# this is an expected cancel request response message
|
||||||
# and we don't need to raise it in scope since it will
|
# and we don't need to raise it in scope since it will
|
||||||
# potentially override a real error
|
# potentially override a real error
|
||||||
return
|
return
|
||||||
|
else:
|
||||||
|
log.error(
|
||||||
|
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
||||||
|
f'{error_msg["error"]["tb_str"]}'
|
||||||
|
)
|
||||||
|
# TODO: tempted to **not** do this by-reraising in a
|
||||||
|
# nursery and instead cancel a surrounding scope, detect
|
||||||
|
# the cancellation, then lookup the error that was set?
|
||||||
|
# YES! this is way better and simpler!
|
||||||
|
if (
|
||||||
|
self._scope
|
||||||
|
):
|
||||||
|
# from trio.testing import wait_all_tasks_blocked
|
||||||
|
# await wait_all_tasks_blocked()
|
||||||
|
self._cancel_called_remote = self.chan.uid
|
||||||
|
self._scope.cancel()
|
||||||
|
|
||||||
self._error = error
|
# NOTE: this usage actually works here B)
|
||||||
|
# from ._debug import breakpoint
|
||||||
|
# await breakpoint()
|
||||||
|
|
||||||
# TODO: tempted to **not** do this by-reraising in a
|
|
||||||
# nursery and instead cancel a surrounding scope, detect
|
|
||||||
# the cancellation, then lookup the error that was set?
|
|
||||||
if self._scope_nursery:
|
|
||||||
|
|
||||||
async def raiser():
|
# XXX: this will break early callee results sending
|
||||||
raise self._error from None
|
# since when `.result()` is finally called, this
|
||||||
|
# chan will be closed..
|
||||||
# from trio.testing import wait_all_tasks_blocked
|
# if self._recv_chan:
|
||||||
# await wait_all_tasks_blocked()
|
# await self._recv_chan.aclose()
|
||||||
if not self._scope_nursery._closed: # type: ignore
|
|
||||||
self._scope_nursery.start_soon(raiser)
|
|
||||||
|
|
     async def cancel(
         self,
         msg: str | None = None,
+        timeout: float = 0.5,
+        # timeout: float = 1000,

     ) -> None:
         '''
@@ -488,6 +563,8 @@ class Context:
         log.cancel(f'Cancelling {side} side of context to {self.chan.uid}')

         self._cancel_called = True
+        # await _debug.breakpoint()
+        # breakpoint()

         if side == 'caller':
             if not self._portal:
@@ -496,8 +573,8 @@ class Context:
             )

             cid = self.cid
-            with trio.move_on_after(0.5) as cs:
-                cs.shield = True
+            with trio.move_on_after(timeout) as cs:
+                # cs.shield = True
                 log.cancel(
                     f"Cancelling stream {cid} to "
                     f"{self._portal.channel.uid}")
@@ -505,7 +582,12 @@ class Context:
                 # NOTE: we're telling the far end actor to cancel a task
                 # corresponding to *this actor*. The far end local channel
                 # instance is passed to `Actor._cancel_task()` implicitly.
-                await self._portal.run_from_ns('self', '_cancel_task', cid=cid)
+                await self._portal.run_from_ns(
+                    'self',
+                    '_cancel_task',
+                    cid=cid,
+                )
+                # print("EXITING CANCEL CALL")

             if cs.cancelled_caught:
                 # XXX: there's no way to know if the remote task was indeed
@@ -530,17 +612,14 @@ class Context:
                 # {'error': trio.Cancelled, cid: "blah"} enough?
                 # This probably gets into the discussion in
                 # https://github.com/goodboy/tractor/issues/36
-                assert self._scope_nursery
-                self._scope_nursery.cancel_scope.cancel()
-
-            if self._recv_chan:
-                await self._recv_chan.aclose()
+                assert self._scope
+                self._scope.cancel()
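For orientation, this is roughly how the caller-side path above gets driven from user code; a sketch using the public API described in this changeset, where the `sleeper` endpoint name is invented for illustration:

```python
import trio
import tractor


@tractor.context
async def sleeper(ctx: tractor.Context) -> None:
    # callee side: signal readiness then block until cancelled.
    await ctx.started()
    await trio.sleep_forever()


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'sleeper_actor',
            enable_modules=[__name__],
        )
        async with portal.open_context(sleeper) as (ctx, _first):
            # fires the remote `Actor._cancel_task()` request shown
            # above, bounded by the new `timeout` parameter.
            await ctx.cancel()

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```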
     @asynccontextmanager
     async def open_stream(

         self,
-        backpressure: bool | None = True,
+        allow_overruns: bool | None = False,
         msg_buffer_size: int | None = None,

     ) -> AsyncGenerator[MsgStream, None]:
@@ -592,8 +671,9 @@ class Context:
             self.chan,
             self.cid,
             msg_buffer_size=msg_buffer_size,
+            allow_overruns=allow_overruns,
         )
-        ctx._backpressure = backpressure
+        ctx._allow_overruns = allow_overruns
         assert ctx is self

         # XXX: If the underlying channel feeder receive mem chan has
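With the rename, opting into overrun handling becomes a per-stream choice at open time. A sketch of the shape of that usage (the `counter` endpoint is invented; the feeder buffer defaults to `2**6` msgs per `mk_context()` below, so a slow consumer will overrun here):

```python
import trio
import tractor


@tractor.context
async def counter(ctx: tractor.Context) -> None:
    # callee side: burst msgs possibly faster than the caller consumes.
    await ctx.started()
    async with ctx.open_stream() as stream:
        for i in range(1024):
            await stream.send(i)


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'counter_actor',
            enable_modules=[__name__],
        )
        async with portal.open_context(counter) as (ctx, _first):
            # instead of a `StreamOverrun` being relayed back to the
            # sender, overflowed msgs are queued and drained by a
            # background task (see `._drain_overflows()` below).
            async with ctx.open_stream(allow_overruns=True) as stream:
                async for msg in stream:
                    await trio.sleep(0.001)  # deliberately slow consumer

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```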
@@ -637,48 +717,115 @@ class Context:
             f'ctx id: {self.cid}'
         )

-    async def result(self) -> Any:
+    def _maybe_raise_remote_err(
+        self,
+        err: Exception,
+    ) -> None:
+        # NOTE: whenever the context's "opener" side (task) **is**
+        # the side which requested the cancellation (likely via
+        # ``Context.cancel()``), we don't want to re-raise that
+        # cancellation signal locally (would be akin to
+        # a ``trio.Nursery`` raising ``trio.Cancelled``
+        # whenever ``CancelScope.cancel()`` was called) and instead
+        # silently reap the expected cancellation "error"-msg.
+        # if 'pikerd' in err.msgdata['tb_str']:
+        #     # from . import _debug
+        #     # await _debug.breakpoint()
+        #     breakpoint()
+
+        if (
+            isinstance(err, ContextCancelled)
+            and (
+                self._cancel_called
+                or self.chan._cancel_called
+                or tuple(err.canceller) == current_actor().uid
+            )
+        ):
+            return err
+
+        raise err from None
+
+    async def result(self) -> Any | Exception:
         '''
-        From a caller side, wait for and return the final result from
-        the callee side task.
+        From some (caller) side task, wait for and return the final
+        result from the remote (callee) side's task.
+
+        This provides a mechanism for one task running in some actor to wait
+        on another task at the other side, in some other actor, to terminate.
+
+        If the remote task is still in a streaming state (it is delivering
+        values from inside a ``Context.open_stream():`` block), then those
+        msgs are drained but discarded since it is presumed this side of
+        the context has already finished with its own streaming logic.
+
+        If the remote context (or its containing actor runtime) was
+        canceled, either by a local task calling one of
+        ``Context.cancel()`` or ``Portal.cancel_actor()``, we ignore the
+        received ``ContextCancelled`` exception if the context or
+        underlying IPC channel is marked as having been "cancel called".
+        This is similar behavior to using ``trio.Nursery.cancel_scope.cancel()``
+        wherein tasks which raise ``trio.Cancelled`` are silently reaped;
+        the main difference in this API is that in the "cancel called" case,
+        instead of just not raising, we also return the exception *as
+        the result* since client code may be interested in the details
+        of the remote cancellation.

         '''
         assert self._portal, "Context.result() can not be called from callee!"
         assert self._recv_chan

-        if self._result is False:
-
-            if not self._recv_chan._closed:  # type: ignore
-
-                # wait for a final context result consuming
-                # and discarding any bi dir stream msgs still
-                # in transit from the far end.
-                while True:
-
-                    msg = await self._recv_chan.receive()
-                    try:
-                        self._result = msg['return']
-                        break
-                    except KeyError as msgerr:
-
-                        if 'yield' in msg:
-                            # far end task is still streaming to us so discard
-                            log.warning(f'Discarding stream delivered {msg}')
-                            continue
-
-                        elif 'stop' in msg:
-                            log.debug('Remote stream terminated')
-                            continue
-
-                        # internal error should never get here
-                        assert msg.get('cid'), (
-                            "Received internal error at portal?")
-
-                        raise unpack_error(
-                            msg, self._portal.channel
-                        ) from msgerr
-
-        return self._result
+        # from . import _debug
+        # await _debug.breakpoint()
+
+        re = self._remote_error
+        if re:
+            self._maybe_raise_remote_err(re)
+            return re
+
+        if (
+            self._result == id(self)
+            and not self._remote_error
+            and not self._recv_chan._closed  # type: ignore
+        ):
+            # wait for a final context result consuming
+            # and discarding any bi dir stream msgs still
+            # in transit from the far end.
+            while True:
+                msg = await self._recv_chan.receive()
+                try:
+                    self._result = msg['return']
+
+                    # NOTE: we don't need to do this right?
+                    # XXX: only close the rx mem chan AFTER
+                    # a final result is retrieved.
+                    # if self._recv_chan:
+                    #     await self._recv_chan.aclose()
+
+                    break
+                except KeyError:  # as msgerr:
+
+                    if 'yield' in msg:
+                        # far end task is still streaming to us so discard
+                        log.warning(f'Discarding stream delivered {msg}')
+                        continue
+
+                    elif 'stop' in msg:
+                        log.debug('Remote stream terminated')
+                        continue
+
+                    # internal error should never get here
+                    assert msg.get('cid'), (
+                        "Received internal error at portal?")
+
+                    err = unpack_error(
+                        msg,
+                        self._portal.channel
+                    )  # from msgerr
+
+                    err = self._maybe_raise_remote_err(err)
+                    self._remote_err = err
+
+        return self._remote_error or self._result
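The "return the error *as* the result" behavior documented above means a self-requested cancel can be inspected rather than caught. A sketch (the `worker` endpoint is invented, and the `.canceller` comparison mirrors the check inside `_maybe_raise_remote_err()`):

```python
import trio
import tractor


@tractor.context
async def worker(ctx: tractor.Context) -> None:
    await ctx.started()
    await trio.sleep_forever()


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'worker_actor',
            enable_modules=[__name__],
        )
        async with portal.open_context(worker) as (ctx, _first):
            await ctx.cancel()
            res = await ctx.result()

            # since *this* actor requested the cancel, the
            # `ContextCancelled` is returned (not raised) and its
            # `.canceller` field should name us as the requester.
            if isinstance(res, tractor.ContextCancelled):
                assert tuple(res.canceller) == tractor.current_actor().uid

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```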
     async def started(
         self,
@@ -708,6 +855,187 @@ class Context:
     # async def restart(self) -> None:
     #     pass

+    async def _drain_overflows(
+        self,
+    ) -> None:
+        '''
+        Private task spawned to push newly received msgs to the local
+        task which is getting overrun by the remote side.
+
+        In order to not block the rpc msg loop, but also not discard
+        msgs received in this context, we need to async push msgs in
+        a new task which only runs for as long as the local task is in
+        an overrun state.
+
+        '''
+        self._in_overrun = True
+        try:
+            while self._overflow_q:
+                # NOTE: these msgs should never be errors since we always do
+                # the check prior to checking if we're in an overrun state
+                # inside ``._deliver_msg()``.
+                msg = self._overflow_q.popleft()
+                try:
+                    await self._send_chan.send(msg)
+                except trio.BrokenResourceError:
+                    log.warning(
+                        f"{self._send_chan} consumer is already closed"
+                    )
+                    return
+
+        except trio.Cancelled:
+            # we are obviously still in overrun
+            # but the context is being closed anyway
+            # so we just warn that there are unreceived
+            # msgs still..
+            self._overflow_q.appendleft(msg)
+            fmt_msgs = ''
+            for msg in self._overflow_q:
+                fmt_msgs += f'{pformat(msg)}\n'
+
+            log.warning(
+                f'Context for {self.cid} is being closed while '
+                'in an overrun state!\n'
+                'Discarding the following msgs:\n'
+                f'{fmt_msgs}\n'
+            )
+            raise
+
+        finally:
+            # task is now finished with the backlog so mark us as
+            # no longer in backlog.
+            self._in_overrun = False
+    async def _deliver_msg(
+        self,
+        msg: dict,
+
+        draining: bool = False,
+
+    ) -> bool:
+
+        cid = self.cid
+        chan = self.chan
+        uid = chan.uid
+        send_chan: trio.MemorySendChannel = self._send_chan
+
+        log.runtime(
+            f"Delivering {msg} from {uid} to caller {cid}"
+        )
+
+        error = msg.get('error')
+        if error:
+            await self._maybe_cancel_and_set_remote_error(msg)
+
+        if (
+            self._in_overrun
+        ):
+            self._overflow_q.append(msg)
+            return False
+
+        try:
+            send_chan.send_nowait(msg)
+            return True
+            # if an error is detected we should always
+            # expect it to be raised by any context (stream)
+            # consumer task
+
+        except trio.BrokenResourceError:
+            # TODO: what is the right way to handle the case where the
+            # local task has already sent a 'stop' / StopAsyncIteration
+            # to the other side and possibly has closed the local
+            # feeder mem chan? Do we wait for some kind of ack or just
+            # let this fail silently and bubble up (currently)?
+
+            # XXX: local consumer has closed their side
+            # so cancel the far end streaming task
+            log.warning(f"{send_chan} consumer is already closed")
+            return False
+
+        # NOTE XXX: by default we do **not** maintain context-stream
+        # backpressure and instead opt to relay stream overrun errors to
+        # the sender; the main motivation is that using bp can block the
+        # msg handling loop which calls into this method!
+        except trio.WouldBlock:
+            # XXX: always push an error even if the local
+            # receiver is in overrun state.
+            # await self._maybe_cancel_and_set_remote_error(msg)

+            local_uid = current_actor().uid
+            lines = [
+                f'Actor-task context {cid}@{local_uid} was overrun by remote!',
+                f'sender actor: {uid}',
+            ]
+            if not self._stream_opened:
+                lines.insert(
+                    1,
+                    f'\n*** No stream open on `{local_uid[0]}` side! ***\n'
+                )
+
+            text = '\n'.join(lines)
+
+            # XXX: lul, this really can't be backpressure since any
+            # blocking here will block the entire msg loop rpc sched for
+            # a whole channel.. maybe we should rename it?
+            if self._allow_overruns:
+                text += f'\nStarting overflow queuing task on msg: {msg}'
+                log.warning(text)
+                if (
+                    not self._in_overrun
+                ):
+                    self._overflow_q.append(msg)
+                    n = self._scope_nursery
+                    if n.child_tasks:
+                        from . import _debug
+                        await _debug.breakpoint()
+                    assert not n.child_tasks
+                    n.start_soon(
+                        self._drain_overflows,
+                    )
+            else:
+                try:
+                    raise StreamOverrun(text)
+                except StreamOverrun as err:
+                    err_msg = pack_error(err)
+                    err_msg['cid'] = cid
+                    try:
+                        await chan.send(err_msg)
+                    except trio.BrokenResourceError:
+                        # XXX: local consumer has closed their side
+                        # so cancel the far end streaming task
+                        log.warning(f"{chan} is already closed")

+            return False
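Pulling the mechanism out of the tractor specifics: the msg loop only ever calls the *synchronous* `send_nowait()` and, on `trio.WouldBlock`, queues the msg and hands it to a background drainer, so delivery can never block RPC/control msg processing. A stripped-down, tractor-free sketch of that shape (all names here are hypothetical):

```python
from collections import deque

import trio


class Delivery:
    '''
    Non-blocking per-consumer delivery with an overflow drainer task.
    '''
    def __init__(self, nursery: trio.Nursery, buffer_size: int = 4) -> None:
        self._n = nursery
        self._send, self.recv = trio.open_memory_channel(buffer_size)
        self._overflow: deque = deque()
        self._draining = False

    def deliver(self, msg: object) -> None:
        # called from the (never-to-be-blocked) msg loop
        if self._draining:
            self._overflow.append(msg)
            return
        try:
            self._send.send_nowait(msg)
        except trio.WouldBlock:
            # consumer is behind: queue the msg and hand it off,
            # setting the flag *synchronously* so exactly one
            # drainer task is ever spawned.
            self._overflow.append(msg)
            self._draining = True
            self._n.start_soon(self._drain)

    async def _drain(self) -> None:
        try:
            while self._overflow:
                # a *blocking* send is fine here since only this
                # background task ever waits on buffer space.
                await self._send.send(self._overflow.popleft())
        finally:
            self._draining = False


async def main() -> None:
    async with trio.open_nursery() as n:
        d = Delivery(n)
        for i in range(16):
            d.deliver(i)  # never blocks, even though the buffer is 4

        async with d.recv:
            async for msg in d.recv:
                print(msg)
                if msg == 15:
                    break


trio.run(main)
```

The same reasoning explains the `StreamOverrun` branch: when overruns are *not* allowed, the error is packed and relayed to the sender instead of ever blocking the local loop.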
+def mk_context(
+    chan: Channel,
+    cid: str,
+    msg_buffer_size: int = 2**6,
+
+    **kwargs,
+
+) -> Context:
+    '''
+    Internal factory to create an inter-actor task ``Context``.
+
+    This is called by internals and should generally never be called
+    by user code.
+
+    '''
+    send_chan: trio.MemorySendChannel
+    recv_chan: trio.MemoryReceiveChannel
+    send_chan, recv_chan = trio.open_memory_channel(msg_buffer_size)
+
+    ctx = Context(
+        chan,
+        cid,
+        _send_chan=send_chan,
+        _recv_chan=recv_chan,
+        **kwargs,
+    )
+    ctx._result = id(ctx)
+    return ctx
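One detail worth flagging: `ctx._result = id(ctx)` is the "no result yet" sentinel which `.result()` checks via `self._result == id(self)`. Unlike `None` or `False` (both legitimate remote return values, and `False` was the old sentinel), an object's own `id()` is effectively impossible for a remote task to return by accident. A toy illustration of the idiom:

```python
class Box:
    def __init__(self) -> None:
        # `id(self)` is effectively unforgeable as a sentinel,
        # unlike `None`/`False` which are valid results.
        self._result = id(self)

    @property
    def has_result(self) -> bool:
        return self._result != id(self)


b = Box()
assert not b.has_result
b._result = None  # even a `None` result is now distinguishable
assert b.has_result
```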
 def stream(func: Callable) -> Callable:
     '''