Disable msg stream backpressure by default
Half of portal API usage requires a 1 message response (`.run()`, `.run_in_actor()`) and the streaming APIs should probably be explicitly enabled for backpressure if desired by the user. This makes more sense in (psuedo) realtime systems where it's better to notify on a block then freeze without notice. Make this default behaviour with a new error to be raised: `tractor._exceptions.StreamOverrun` when a sender overruns a stream by the default size (2**6 for now). The old behavior can be enabled with `Context.open_stream(backpressure=True)` but now with warning log messages when there are overruns. Add task-linked-context error propagation using a "nursery raising" technique such that if either end of context linked pair of tasks errors, that error can be relayed to other side and raised as a form of interrupt at the receiving task's next `trio` checkpoint. This enables reliable error relay without expecting the (error) receiving task to call an API which would raise the remote exception (which it might never currently if using `tractor.MsgStream` APIs). Further internal implementation details: - define the default msg buffer size as `Actor.msg_buffer_size` - expose a `msg_buffer_size: int` kwarg from `Actor.get_context()` - maybe raise aforementioned context errors using `Context._maybe_error_from_remote_msg()` inside `Actor._push_result()` - support optional backpressure on a stream when pushing messages in `Actor._push_result()` - in `_invote()` handle multierrors raised from a `@tractor.context` entrypoint as being potentially caused by a relayed error from the remote caller task, if `Context._error` has been set then raise that error inside the `RemoteActorError` that will be relayed back to that caller more or less proxying through the source side error back to its origin.stricter_context_starting
parent
2680a9473d
commit
185dbc7e3f
|
@ -32,6 +32,7 @@ from ._exceptions import (
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
|
StreamOverrun,
|
||||||
)
|
)
|
||||||
from . import _debug
|
from . import _debug
|
||||||
from ._discovery import get_arbiter
|
from ._discovery import get_arbiter
|
||||||
|
@ -161,16 +162,27 @@ async def _invoke(
|
||||||
# context func with support for bi-dir streaming
|
# context func with support for bi-dir streaming
|
||||||
await chan.send({'functype': 'context', 'cid': cid})
|
await chan.send({'functype': 'context', 'cid': cid})
|
||||||
|
|
||||||
|
try:
|
||||||
async with trio.open_nursery() as scope_nursery:
|
async with trio.open_nursery() as scope_nursery:
|
||||||
ctx._scope_nursery = scope_nursery
|
ctx._scope_nursery = scope_nursery
|
||||||
cs = scope_nursery.cancel_scope
|
cs = scope_nursery.cancel_scope
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
try:
|
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
await chan.send({'return': await coro, 'cid': cid})
|
||||||
|
|
||||||
except trio.Cancelled as err:
|
except trio.Cancelled as err:
|
||||||
tb = err.__traceback__
|
tb = err.__traceback__
|
||||||
|
if ctx._error is not None:
|
||||||
|
tb = ctx._error.__traceback__
|
||||||
|
raise ctx._error
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
except trio.MultiError as err:
|
||||||
|
if ctx._error is not None:
|
||||||
|
tb = ctx._error.__traceback__
|
||||||
|
raise ctx._error from err
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
if cs.cancelled_caught or ctx._error:
|
||||||
|
|
||||||
# TODO: pack in ``trio.Cancelled.__traceback__`` here
|
# TODO: pack in ``trio.Cancelled.__traceback__`` here
|
||||||
# so they can be unwrapped and displayed on the caller
|
# so they can be unwrapped and displayed on the caller
|
||||||
|
@ -314,6 +326,7 @@ class Actor:
|
||||||
# ugh, we need to get rid of this and replace with a "registry" sys
|
# ugh, we need to get rid of this and replace with a "registry" sys
|
||||||
# https://github.com/goodboy/tractor/issues/216
|
# https://github.com/goodboy/tractor/issues/216
|
||||||
is_arbiter: bool = False
|
is_arbiter: bool = False
|
||||||
|
msg_buffer_size: int = 2**6
|
||||||
|
|
||||||
# nursery placeholders filled in by `_async_main()` after fork
|
# nursery placeholders filled in by `_async_main()` after fork
|
||||||
_root_n: Optional[trio.Nursery] = None
|
_root_n: Optional[trio.Nursery] = None
|
||||||
|
@ -548,7 +561,7 @@ class Actor:
|
||||||
# now in a cancelled condition) when the local runtime here
|
# now in a cancelled condition) when the local runtime here
|
||||||
# is now cancelled while (presumably) in the middle of msg
|
# is now cancelled while (presumably) in the middle of msg
|
||||||
# loop processing.
|
# loop processing.
|
||||||
with trio.move_on_after(0.1) as cs:
|
with trio.move_on_after(0.5) as cs:
|
||||||
cs.shield = True
|
cs.shield = True
|
||||||
# Attempt to wait for the far end to close the channel
|
# Attempt to wait for the far end to close the channel
|
||||||
# and bail after timeout (2-generals on closure).
|
# and bail after timeout (2-generals on closure).
|
||||||
|
@ -611,23 +624,54 @@ class Actor:
|
||||||
cid: str,
|
cid: str,
|
||||||
msg: dict[str, Any],
|
msg: dict[str, Any],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Push an RPC result to the local consumer's queue.
|
'''
|
||||||
"""
|
Push an RPC result to the local consumer's queue.
|
||||||
|
|
||||||
|
'''
|
||||||
assert chan.uid, f"`chan.uid` can't be {chan.uid}"
|
assert chan.uid, f"`chan.uid` can't be {chan.uid}"
|
||||||
ctx = self._contexts[(chan.uid, cid)]
|
ctx = self._contexts[(chan.uid, cid)]
|
||||||
send_chan = ctx._send_chan
|
send_chan = ctx._send_chan
|
||||||
assert send_chan
|
assert send_chan
|
||||||
|
|
||||||
# TODO: relaying far end context errors to the local
|
if msg.get('error'):
|
||||||
# context through nursery raising?
|
# If this is an error message from a context opened by
|
||||||
# if 'error' in msg:
|
# ``Portal.open_context()`` we want to interrupt any ongoing
|
||||||
# ctx._error_from_remote_msg(msg)
|
# (child) tasks within that context to be notified of the remote
|
||||||
# log.runtime(f"{send_chan} was terminated at remote end")
|
# error relayed here.
|
||||||
|
#
|
||||||
|
# The reason we may want to raise the remote error immediately
|
||||||
|
# is that there is no guarantee the associated local task(s)
|
||||||
|
# will attempt to read from any locally opened stream any time
|
||||||
|
# soon.
|
||||||
|
#
|
||||||
|
# NOTE: this only applies when
|
||||||
|
# ``Portal.open_context()`` has been called since it is assumed
|
||||||
|
# (currently) that other portal APIs (``Portal.run()``,
|
||||||
|
# ``.run_in_actor()``) do their own error checking at the point
|
||||||
|
# of the call and result processing.
|
||||||
|
log.warning(f'Remote context for {chan.uid}:{cid} errored')
|
||||||
|
ctx._maybe_error_from_remote_msg(msg)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
|
log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
|
||||||
# maintain backpressure
|
|
||||||
|
# XXX: we do **not** maintain backpressure and instead
|
||||||
|
# opt to relay stream overrun errors to the sender.
|
||||||
|
try:
|
||||||
|
send_chan.send_nowait(msg)
|
||||||
|
except trio.WouldBlock:
|
||||||
|
log.warning(f'Caller task {cid} was overrun!?')
|
||||||
|
if ctx._backpressure:
|
||||||
await send_chan.send(msg)
|
await send_chan.send(msg)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
raise StreamOverrun(
|
||||||
|
f'Context stream {cid} for {chan.uid} was overrun!'
|
||||||
|
)
|
||||||
|
except StreamOverrun as err:
|
||||||
|
err_msg = pack_error(err)
|
||||||
|
err_msg['cid'] = cid
|
||||||
|
await chan.send(err_msg)
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
# TODO: what is the right way to handle the case where the
|
# TODO: what is the right way to handle the case where the
|
||||||
|
@ -644,7 +688,7 @@ class Actor:
|
||||||
self,
|
self,
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
cid: str,
|
cid: str,
|
||||||
max_buffer_size: int = 2**6,
|
msg_buffer_size: Optional[int] = None,
|
||||||
|
|
||||||
) -> Context:
|
) -> Context:
|
||||||
'''
|
'''
|
||||||
|
@ -660,10 +704,17 @@ class Actor:
|
||||||
assert actor_uid
|
assert actor_uid
|
||||||
try:
|
try:
|
||||||
ctx = self._contexts[(actor_uid, cid)]
|
ctx = self._contexts[(actor_uid, cid)]
|
||||||
|
|
||||||
|
# adjust buffer size if specified
|
||||||
|
state = ctx._send_chan._state
|
||||||
|
if msg_buffer_size and state.max_buffer_size != msg_buffer_size:
|
||||||
|
state.max_buffer_size = msg_buffer_size
|
||||||
|
|
||||||
except KeyError:
|
except KeyError:
|
||||||
send_chan: trio.MemorySendChannel
|
send_chan: trio.MemorySendChannel
|
||||||
recv_chan: trio.MemoryReceiveChannel
|
recv_chan: trio.MemoryReceiveChannel
|
||||||
send_chan, recv_chan = trio.open_memory_channel(max_buffer_size)
|
send_chan, recv_chan = trio.open_memory_channel(
|
||||||
|
msg_buffer_size or self.msg_buffer_size)
|
||||||
ctx = Context(
|
ctx = Context(
|
||||||
chan,
|
chan,
|
||||||
cid,
|
cid,
|
||||||
|
@ -679,7 +730,8 @@ class Actor:
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
ns: str,
|
ns: str,
|
||||||
func: str,
|
func: str,
|
||||||
kwargs: dict
|
kwargs: dict,
|
||||||
|
msg_buffer_size: Optional[int] = None,
|
||||||
|
|
||||||
) -> Context:
|
) -> Context:
|
||||||
'''
|
'''
|
||||||
|
@ -693,7 +745,7 @@ class Actor:
|
||||||
'''
|
'''
|
||||||
cid = str(uuid.uuid4())
|
cid = str(uuid.uuid4())
|
||||||
assert chan.uid
|
assert chan.uid
|
||||||
ctx = self.get_context(chan, cid)
|
ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size)
|
||||||
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
|
||||||
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
|
||||||
|
|
||||||
|
@ -743,7 +795,8 @@ class Actor:
|
||||||
if msg is None: # loop terminate sentinel
|
if msg is None: # loop terminate sentinel
|
||||||
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Cancelling all tasks for {chan} from {chan.uid}")
|
f"Channerl to {chan.uid} terminated?\n"
|
||||||
|
"Cancelling all associated tasks..")
|
||||||
|
|
||||||
for (channel, cid) in self._rpc_tasks.copy():
|
for (channel, cid) in self._rpc_tasks.copy():
|
||||||
if channel is chan:
|
if channel is chan:
|
||||||
|
|
Loading…
Reference in New Issue