forked from goodboy/tractor
Raise stream overruns on one side never opened
A context stream overrun should normally never take place since if a stream is opened (via ``Context.open_stream()``) backpressure is applied on the message buffer (unless explicitly disabled by the ``backpressure=False`` flag) such that an overrun on the receiving task should result in blocking the (remote) sender task (eventually depending on the underlying ``MsgStream`` transport). Here we add a special error message that reports if one side never opened a stream and let's the user know in the overrun error message that they may be trying to push messages to a task that isn't ready to receive them. Further fixes / details: - pop any `Context` at the end of any `_invoke()` task that creates one and registers with the runtime. - ignore but warn about messages received for a context that either no longer exists or is unknown (guarding against crashes by malicious packets in the latter case)stricter_context_starting
parent
b826ec8103
commit
318027ebd1
|
@ -171,19 +171,17 @@ async def _invoke(
|
||||||
|
|
||||||
except trio.Cancelled as err:
|
except trio.Cancelled as err:
|
||||||
tb = err.__traceback__
|
tb = err.__traceback__
|
||||||
if ctx._error is not None:
|
|
||||||
tb = ctx._error.__traceback__
|
|
||||||
raise ctx._error
|
|
||||||
|
|
||||||
except trio.MultiError as err:
|
except trio.MultiError:
|
||||||
|
# if a context error was set then likely
|
||||||
|
# thei multierror was raised due to that
|
||||||
if ctx._error is not None:
|
if ctx._error is not None:
|
||||||
tb = ctx._error.__traceback__
|
raise ctx._error from None
|
||||||
raise ctx._error from err
|
|
||||||
else:
|
raise
|
||||||
raise
|
|
||||||
|
|
||||||
assert cs
|
assert cs
|
||||||
if cs.cancelled_caught or ctx._error:
|
if cs.cancelled_caught:
|
||||||
|
|
||||||
# TODO: pack in ``trio.Cancelled.__traceback__`` here
|
# TODO: pack in ``trio.Cancelled.__traceback__`` here
|
||||||
# so they can be unwrapped and displayed on the caller
|
# so they can be unwrapped and displayed on the caller
|
||||||
|
@ -257,6 +255,11 @@ async def _invoke(
|
||||||
task_status.started(err)
|
task_status.started(err)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
assert chan.uid
|
||||||
|
ctx = actor._contexts.pop((chan.uid, cid))
|
||||||
|
if ctx:
|
||||||
|
log.cancel(f'{ctx} was terminated')
|
||||||
|
|
||||||
# RPC task bookeeping
|
# RPC task bookeeping
|
||||||
try:
|
try:
|
||||||
scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
|
scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
|
||||||
|
@ -594,6 +597,10 @@ class Actor:
|
||||||
log.runtime(f"No more channels for {chan.uid}")
|
log.runtime(f"No more channels for {chan.uid}")
|
||||||
self._peers.pop(chan.uid, None)
|
self._peers.pop(chan.uid, None)
|
||||||
|
|
||||||
|
# for (uid, cid) in self._contexts.copy():
|
||||||
|
# if chan.uid == uid:
|
||||||
|
# self._contexts.pop((uid, cid))
|
||||||
|
|
||||||
log.runtime(f"Peers is {self._peers}")
|
log.runtime(f"Peers is {self._peers}")
|
||||||
|
|
||||||
if not self._peers: # no more channels connected
|
if not self._peers: # no more channels connected
|
||||||
|
@ -629,12 +636,20 @@ class Actor:
|
||||||
Push an RPC result to the local consumer's queue.
|
Push an RPC result to the local consumer's queue.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
assert chan.uid, f"`chan.uid` can't be {chan.uid}"
|
uid = chan.uid
|
||||||
ctx = self._contexts[(chan.uid, cid)]
|
assert uid, f"`chan.uid` can't be {uid}"
|
||||||
|
try:
|
||||||
|
ctx = self._contexts[(uid, cid)]
|
||||||
|
except KeyError:
|
||||||
|
log.warning(
|
||||||
|
f'Ignoring {msg} for unknwon context with {uid}')
|
||||||
|
return
|
||||||
|
|
||||||
send_chan = ctx._send_chan
|
send_chan = ctx._send_chan
|
||||||
assert send_chan
|
assert send_chan
|
||||||
|
|
||||||
if msg.get('error'):
|
error = msg.get('error')
|
||||||
|
if error:
|
||||||
# If this is an error message from a context opened by
|
# If this is an error message from a context opened by
|
||||||
# ``Portal.open_context()`` we want to interrupt any ongoing
|
# ``Portal.open_context()`` we want to interrupt any ongoing
|
||||||
# (child) tasks within that context to be notified of the remote
|
# (child) tasks within that context to be notified of the remote
|
||||||
|
@ -650,7 +665,7 @@ class Actor:
|
||||||
# (currently) that other portal APIs (``Portal.run()``,
|
# (currently) that other portal APIs (``Portal.run()``,
|
||||||
# ``.run_in_actor()``) do their own error checking at the point
|
# ``.run_in_actor()``) do their own error checking at the point
|
||||||
# of the call and result processing.
|
# of the call and result processing.
|
||||||
log.warning(f'Remote context for {chan.uid}:{cid} errored')
|
log.warning(f'Remote context for {chan.uid}:{cid} errored {msg}')
|
||||||
ctx._maybe_error_from_remote_msg(msg)
|
ctx._maybe_error_from_remote_msg(msg)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -661,14 +676,36 @@ class Actor:
|
||||||
try:
|
try:
|
||||||
send_chan.send_nowait(msg)
|
send_chan.send_nowait(msg)
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
log.warning(f'Caller task {cid} was overrun!?')
|
|
||||||
|
# XXX: do we need this?
|
||||||
|
# if we're trying to push an error but we're in
|
||||||
|
# an overrun state we'll just get stuck sending
|
||||||
|
# the error that was sent to us back to it's sender
|
||||||
|
# instead of it actually being raises in the target
|
||||||
|
# task..
|
||||||
|
# if error:
|
||||||
|
# raise unpack_error(msg, chan) from None
|
||||||
|
|
||||||
|
uid = chan.uid
|
||||||
|
|
||||||
|
lines = [
|
||||||
|
'Task context stream was overrun',
|
||||||
|
f'local task: {cid} @ {self.uid}',
|
||||||
|
f'remote sender: {chan.uid}',
|
||||||
|
]
|
||||||
|
if not ctx._stream_opened:
|
||||||
|
lines.insert(
|
||||||
|
1,
|
||||||
|
f'\n*** No stream open on {self.uid[0]} side! ***\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
text = '\n'.join(lines)
|
||||||
|
log.warning(text)
|
||||||
if ctx._backpressure:
|
if ctx._backpressure:
|
||||||
await send_chan.send(msg)
|
await send_chan.send(msg)
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
raise StreamOverrun(
|
raise StreamOverrun(text) from None
|
||||||
f'Context stream {cid} for {chan.uid} was overrun!'
|
|
||||||
)
|
|
||||||
except StreamOverrun as err:
|
except StreamOverrun as err:
|
||||||
err_msg = pack_error(err)
|
err_msg = pack_error(err)
|
||||||
err_msg['cid'] = cid
|
err_msg['cid'] = cid
|
||||||
|
|
Loading…
Reference in New Issue