forked from goodboy/tractor
Merge pull request #276 from goodboy/expected_ctx_cancelled
Expected ctx cancelled should not override a source errorwin_ci_timeout
commit
cdf1f8c2f7
|
@ -2,6 +2,6 @@ pytest
|
||||||
pytest-trio
|
pytest-trio
|
||||||
pdbpp
|
pdbpp
|
||||||
mypy<0.920
|
mypy<0.920
|
||||||
trio_typing
|
trio_typing<0.7.0
|
||||||
pexpect
|
pexpect
|
||||||
towncrier
|
towncrier
|
||||||
|
|
|
@ -5,6 +5,7 @@ Verify the we raise errors when streams are opened prior to sync-opening
|
||||||
a ``tractor.Context`` beforehand.
|
a ``tractor.Context`` beforehand.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
from contextlib import asynccontextmanager as acm
|
||||||
from itertools import count
|
from itertools import count
|
||||||
import platform
|
import platform
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
@ -466,8 +467,11 @@ async def cancel_self(
|
||||||
try:
|
try:
|
||||||
async with ctx.open_stream():
|
async with ctx.open_stream():
|
||||||
pass
|
pass
|
||||||
except ContextCancelled:
|
except tractor.ContextCancelled:
|
||||||
|
# suppress for now so we can do checkpoint tests below
|
||||||
pass
|
pass
|
||||||
|
else:
|
||||||
|
raise RuntimeError('Context didnt cancel itself?!')
|
||||||
|
|
||||||
# check a real ``trio.Cancelled`` is raised on a checkpoint
|
# check a real ``trio.Cancelled`` is raised on a checkpoint
|
||||||
try:
|
try:
|
||||||
|
@ -507,6 +511,9 @@ async def test_callee_cancels_before_started():
|
||||||
except tractor.ContextCancelled as ce:
|
except tractor.ContextCancelled as ce:
|
||||||
ce.type == trio.Cancelled
|
ce.type == trio.Cancelled
|
||||||
|
|
||||||
|
# the traceback should be informative
|
||||||
|
assert 'cancelled itself' in ce.msgdata['tb_str']
|
||||||
|
|
||||||
# teardown the actor
|
# teardown the actor
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
@ -601,33 +608,19 @@ def test_one_end_stream_not_opened(overrun_by):
|
||||||
|
|
||||||
# 2 overrun cases and the no overrun case (which pushes right up to
|
# 2 overrun cases and the no overrun case (which pushes right up to
|
||||||
# the msg limit)
|
# the msg limit)
|
||||||
if overrunner == 'caller':
|
if overrunner == 'caller' or 'cance' in overrunner:
|
||||||
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
assert excinfo.value.type == StreamOverrun
|
assert excinfo.value.type == StreamOverrun
|
||||||
|
|
||||||
elif 'cancel' in overrunner:
|
|
||||||
with pytest.raises(trio.MultiError) as excinfo:
|
|
||||||
trio.run(main)
|
|
||||||
|
|
||||||
multierr = excinfo.value
|
|
||||||
|
|
||||||
for exc in multierr.exceptions:
|
|
||||||
etype = type(exc)
|
|
||||||
if etype == tractor.RemoteActorError:
|
|
||||||
assert exc.type == StreamOverrun
|
|
||||||
else:
|
|
||||||
assert etype == tractor.ContextCancelled
|
|
||||||
|
|
||||||
elif overrunner == 'callee':
|
elif overrunner == 'callee':
|
||||||
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# TODO: embedded remote errors so that we can verify the source
|
# TODO: embedded remote errors so that we can verify the source
|
||||||
# error?
|
# error? the callee delivers an error which is an overrun
|
||||||
# the callee delivers an error which is an overrun wrapped
|
# wrapped in a remote actor error.
|
||||||
# in a remote actor error.
|
|
||||||
assert excinfo.value.type == tractor.RemoteActorError
|
assert excinfo.value.type == tractor.RemoteActorError
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
@ -712,3 +705,92 @@ def test_stream_backpressure():
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def sleep_forever(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
) -> None:
|
||||||
|
await ctx.started()
|
||||||
|
async with ctx.open_stream():
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def attach_to_sleep_forever():
|
||||||
|
'''
|
||||||
|
Cancel a context **before** any underlying error is raised in order
|
||||||
|
to trigger a local reception of a ``ContextCancelled`` which **should not**
|
||||||
|
be re-raised in the local surrounding ``Context`` *iff* the cancel was
|
||||||
|
requested by **this** side of the context.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async with tractor.wait_for_actor('sleeper') as p2:
|
||||||
|
async with (
|
||||||
|
p2.open_context(sleep_forever) as (peer_ctx, first),
|
||||||
|
peer_ctx.open_stream(),
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
# XXX: previously this would trigger local
|
||||||
|
# ``ContextCancelled`` to be received and raised in the
|
||||||
|
# local context overriding any local error due to
|
||||||
|
# logic inside ``_invoke()`` which checked for
|
||||||
|
# an error set on ``Context._error`` and raised it in
|
||||||
|
# under a cancellation scenario.
|
||||||
|
|
||||||
|
# The problem is you can have a remote cancellation
|
||||||
|
# that is part of a local error and we shouldn't raise
|
||||||
|
# ``ContextCancelled`` **iff** we weren't the side of
|
||||||
|
# the context to initiate it, i.e.
|
||||||
|
# ``Context._cancel_called`` should **NOT** have been
|
||||||
|
# set. The special logic to handle this case is now
|
||||||
|
# inside ``Context._may_raise_from_remote_msg()`` XD
|
||||||
|
await peer_ctx.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def error_before_started(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
This simulates exactly an original bug discovered in:
|
||||||
|
https://github.com/pikers/piker/issues/244
|
||||||
|
|
||||||
|
'''
|
||||||
|
async with attach_to_sleep_forever():
|
||||||
|
# send an unserializable type which should raise a type error
|
||||||
|
# here and **NOT BE SWALLOWED** by the surrounding acm!!?!
|
||||||
|
await ctx.started(object())
|
||||||
|
|
||||||
|
|
||||||
|
def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
|
||||||
|
'''
|
||||||
|
Verify that an error raised in a remote context which itself opens another
|
||||||
|
remote context, which it cancels, does not ovverride the original error that
|
||||||
|
caused the cancellation of the secondardy context.
|
||||||
|
|
||||||
|
'''
|
||||||
|
async def main():
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
portal = await n.start_actor(
|
||||||
|
'errorer',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
await n.start_actor(
|
||||||
|
'sleeper',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
async with (
|
||||||
|
portal.open_context(
|
||||||
|
error_before_started
|
||||||
|
) as (ctx, sent),
|
||||||
|
):
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
assert excinfo.value.type == TypeError
|
||||||
|
|
|
@ -185,9 +185,6 @@ async def _invoke(
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
await chan.send({'return': await coro, 'cid': cid})
|
await chan.send({'return': await coro, 'cid': cid})
|
||||||
|
|
||||||
except trio.Cancelled as err:
|
|
||||||
tb = err.__traceback__
|
|
||||||
|
|
||||||
except trio.MultiError:
|
except trio.MultiError:
|
||||||
# if a context error was set then likely
|
# if a context error was set then likely
|
||||||
# thei multierror was raised due to that
|
# thei multierror was raised due to that
|
||||||
|
@ -916,8 +913,9 @@ class Actor:
|
||||||
# ``_async_main()``
|
# ``_async_main()``
|
||||||
kwargs['chan'] = chan
|
kwargs['chan'] = chan
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Actor {self.uid} was remotely cancelled;"
|
f'{self.uid} was remotely cancelled by\n'
|
||||||
" waiting on cancellation completion..")
|
f'{chan.uid}!'
|
||||||
|
)
|
||||||
await _invoke(
|
await _invoke(
|
||||||
self, cid, chan, func, kwargs, is_rpc=False
|
self, cid, chan, func, kwargs, is_rpc=False
|
||||||
)
|
)
|
||||||
|
|
|
@ -27,6 +27,7 @@ from typing import (
|
||||||
)
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from pprint import pformat
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
@ -85,6 +86,9 @@ def _unwrap_msg(
|
||||||
assert msg.get('cid'), "Received internal error at portal?"
|
assert msg.get('cid'), "Received internal error at portal?"
|
||||||
raise unpack_error(msg, channel)
|
raise unpack_error(msg, channel)
|
||||||
|
|
||||||
|
class MessagingError(Exception):
|
||||||
|
'Some kind of unexpected SC messaging dialog issue'
|
||||||
|
|
||||||
|
|
||||||
class Portal:
|
class Portal:
|
||||||
'''
|
'''
|
||||||
|
@ -408,8 +412,6 @@ class Portal:
|
||||||
raise TypeError(
|
raise TypeError(
|
||||||
f'{func} must be an async generator function!')
|
f'{func} must be an async generator function!')
|
||||||
|
|
||||||
__tracebackhide__ = True
|
|
||||||
|
|
||||||
fn_mod_path, fn_name = func_deats(func)
|
fn_mod_path, fn_name = func_deats(func)
|
||||||
|
|
||||||
ctx = await self.actor.start_remote_task(
|
ctx = await self.actor.start_remote_task(
|
||||||
|
@ -428,14 +430,17 @@ class Portal:
|
||||||
first = msg['started']
|
first = msg['started']
|
||||||
ctx._started_called = True
|
ctx._started_called = True
|
||||||
|
|
||||||
except KeyError:
|
except KeyError as kerr:
|
||||||
assert msg.get('cid'), ("Received internal error at context?")
|
assert msg.get('cid'), ("Received internal error at context?")
|
||||||
|
|
||||||
if msg.get('error'):
|
if msg.get('error'):
|
||||||
# raise the error message
|
# raise kerr from unpack_error(msg, self.channel)
|
||||||
raise unpack_error(msg, self.channel)
|
raise unpack_error(msg, self.channel) from None
|
||||||
else:
|
else:
|
||||||
raise
|
raise MessagingError(
|
||||||
|
f'Context for {ctx.cid} was expecting a `started` message'
|
||||||
|
f' but received a non-error msg:\n{pformat(msg)}'
|
||||||
|
)
|
||||||
|
|
||||||
_err: Optional[BaseException] = None
|
_err: Optional[BaseException] = None
|
||||||
ctx._portal = self
|
ctx._portal = self
|
||||||
|
|
|
@ -425,8 +425,17 @@ class Context:
|
||||||
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
f'Remote context error for {self.chan.uid}:{self.cid}:\n'
|
||||||
f'{msg["error"]["tb_str"]}'
|
f'{msg["error"]["tb_str"]}'
|
||||||
)
|
)
|
||||||
# await ctx._maybe_error_from_remote_msg(msg)
|
error = unpack_error(msg, self.chan)
|
||||||
self._error = unpack_error(msg, self.chan)
|
if (
|
||||||
|
isinstance(error, ContextCancelled) and
|
||||||
|
self._cancel_called
|
||||||
|
):
|
||||||
|
# this is an expected cancel request response message
|
||||||
|
# and we don't need to raise it in scope since it will
|
||||||
|
# potentially override a real error
|
||||||
|
return
|
||||||
|
|
||||||
|
self._error = error
|
||||||
|
|
||||||
# TODO: tempted to **not** do this by-reraising in a
|
# TODO: tempted to **not** do this by-reraising in a
|
||||||
# nursery and instead cancel a surrounding scope, detect
|
# nursery and instead cancel a surrounding scope, detect
|
||||||
|
|
Loading…
Reference in New Issue