diff --git a/requirements-test.txt b/requirements-test.txt index 5ad6c45..1d3cfad 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -2,6 +2,6 @@ pytest pytest-trio pdbpp mypy<0.920 -trio_typing +trio_typing<0.7.0 pexpect towncrier diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 4c74a9a..ebf830e 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -5,6 +5,7 @@ Verify the we raise errors when streams are opened prior to sync-opening a ``tractor.Context`` beforehand. ''' +from contextlib import asynccontextmanager as acm from itertools import count import platform from typing import Optional @@ -466,8 +467,11 @@ async def cancel_self( try: async with ctx.open_stream(): pass - except ContextCancelled: + except tractor.ContextCancelled: + # suppress for now so we can do checkpoint tests below pass + else: + raise RuntimeError('Context didnt cancel itself?!') # check a real ``trio.Cancelled`` is raised on a checkpoint try: @@ -507,6 +511,9 @@ async def test_callee_cancels_before_started(): except tractor.ContextCancelled as ce: ce.type == trio.Cancelled + # the traceback should be informative + assert 'cancelled itself' in ce.msgdata['tb_str'] + # teardown the actor await portal.cancel_actor() @@ -601,33 +608,19 @@ def test_one_end_stream_not_opened(overrun_by): # 2 overrun cases and the no overrun case (which pushes right up to # the msg limit) - if overrunner == 'caller': + if overrunner == 'caller' or 'cance' in overrunner: with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) assert excinfo.value.type == StreamOverrun - elif 'cancel' in overrunner: - with pytest.raises(trio.MultiError) as excinfo: - trio.run(main) - - multierr = excinfo.value - - for exc in multierr.exceptions: - etype = type(exc) - if etype == tractor.RemoteActorError: - assert exc.type == StreamOverrun - else: - assert etype == tractor.ContextCancelled - elif overrunner == 'callee': with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) # TODO: embedded remote errors so that we can verify the source - # error? - # the callee delivers an error which is an overrun wrapped - # in a remote actor error. + # error? the callee delivers an error which is an overrun + # wrapped in a remote actor error. assert excinfo.value.type == tractor.RemoteActorError else: @@ -712,3 +705,92 @@ def test_stream_backpressure(): await portal.cancel_actor() trio.run(main) + + +@tractor.context +async def sleep_forever( + ctx: tractor.Context, +) -> None: + await ctx.started() + async with ctx.open_stream(): + await trio.sleep_forever() + + +@acm +async def attach_to_sleep_forever(): + ''' + Cancel a context **before** any underlying error is raised in order + to trigger a local reception of a ``ContextCancelled`` which **should not** + be re-raised in the local surrounding ``Context`` *iff* the cancel was + requested by **this** side of the context. + + ''' + async with tractor.wait_for_actor('sleeper') as p2: + async with ( + p2.open_context(sleep_forever) as (peer_ctx, first), + peer_ctx.open_stream(), + ): + try: + yield + finally: + # XXX: previously this would trigger local + # ``ContextCancelled`` to be received and raised in the + # local context overriding any local error due to + # logic inside ``_invoke()`` which checked for + # an error set on ``Context._error`` and raised it in + # under a cancellation scenario. + + # The problem is you can have a remote cancellation + # that is part of a local error and we shouldn't raise + # ``ContextCancelled`` **iff** we weren't the side of + # the context to initiate it, i.e. + # ``Context._cancel_called`` should **NOT** have been + # set. The special logic to handle this case is now + # inside ``Context._may_raise_from_remote_msg()`` XD + await peer_ctx.cancel() + + +@tractor.context +async def error_before_started( + ctx: tractor.Context, +) -> None: + ''' + This simulates exactly an original bug discovered in: + https://github.com/pikers/piker/issues/244 + + ''' + async with attach_to_sleep_forever(): + # send an unserializable type which should raise a type error + # here and **NOT BE SWALLOWED** by the surrounding acm!!?! + await ctx.started(object()) + + +def test_do_not_swallow_error_before_started_by_remote_contextcancelled(): + ''' + Verify that an error raised in a remote context which itself opens another + remote context, which it cancels, does not ovverride the original error that + caused the cancellation of the secondardy context. + + ''' + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'errorer', + enable_modules=[__name__], + ) + await n.start_actor( + 'sleeper', + enable_modules=[__name__], + ) + + async with ( + portal.open_context( + error_before_started + ) as (ctx, sent), + ): + await trio.sleep_forever() + + with pytest.raises(tractor.RemoteActorError) as excinfo: + trio.run(main) + + assert excinfo.value.type == TypeError diff --git a/tractor/_actor.py b/tractor/_actor.py index 82e7ae9..e737004 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -185,9 +185,6 @@ async def _invoke( task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) - except trio.Cancelled as err: - tb = err.__traceback__ - except trio.MultiError: # if a context error was set then likely # thei multierror was raised due to that @@ -916,8 +913,9 @@ class Actor: # ``_async_main()`` kwargs['chan'] = chan log.cancel( - f"Actor {self.uid} was remotely cancelled;" - " waiting on cancellation completion..") + f'{self.uid} was remotely cancelled by\n' + f'{chan.uid}!' + ) await _invoke( self, cid, chan, func, kwargs, is_rpc=False ) diff --git a/tractor/_portal.py b/tractor/_portal.py index 564d329..1dc6ee4 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -27,6 +27,7 @@ from typing import ( ) from functools import partial from dataclasses import dataclass +from pprint import pformat import warnings import trio @@ -85,6 +86,9 @@ def _unwrap_msg( assert msg.get('cid'), "Received internal error at portal?" raise unpack_error(msg, channel) +class MessagingError(Exception): + 'Some kind of unexpected SC messaging dialog issue' + class Portal: ''' @@ -408,8 +412,6 @@ class Portal: raise TypeError( f'{func} must be an async generator function!') - __tracebackhide__ = True - fn_mod_path, fn_name = func_deats(func) ctx = await self.actor.start_remote_task( @@ -428,14 +430,17 @@ class Portal: first = msg['started'] ctx._started_called = True - except KeyError: + except KeyError as kerr: assert msg.get('cid'), ("Received internal error at context?") if msg.get('error'): - # raise the error message - raise unpack_error(msg, self.channel) + # raise kerr from unpack_error(msg, self.channel) + raise unpack_error(msg, self.channel) from None else: - raise + raise MessagingError( + f'Context for {ctx.cid} was expecting a `started` message' + f' but received a non-error msg:\n{pformat(msg)}' + ) _err: Optional[BaseException] = None ctx._portal = self diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 05932b8..47fd08a 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -425,8 +425,17 @@ class Context: f'Remote context error for {self.chan.uid}:{self.cid}:\n' f'{msg["error"]["tb_str"]}' ) - # await ctx._maybe_error_from_remote_msg(msg) - self._error = unpack_error(msg, self.chan) + error = unpack_error(msg, self.chan) + if ( + isinstance(error, ContextCancelled) and + self._cancel_called + ): + # this is an expected cancel request response message + # and we don't need to raise it in scope since it will + # potentially override a real error + return + + self._error = error # TODO: tempted to **not** do this by-reraising in a # nursery and instead cancel a surrounding scope, detect