diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 46ca575..09f11b8 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -15,6 +15,26 @@ from tractor import ( # typing ContextCancelled, ) +# XXX TODO cases: +# - [ ] peer cancelled itself - so other peers should +# get errors reflecting that the peer was itself the .canceller? + +# - [x] WE cancelled the peer and thus should not see any raised +# `ContextCancelled` as it should be reaped silently? +# => pretty sure `test_context_stream_semantics::test_caller_cancels()` +# already covers this case? + +# - [x] INTER-PEER: some arbitrary remote peer cancels via +# Portal.cancel_actor(). +# => all other connected peers should get that cancel requesting peer's +# uid in the ctx-cancelled error msg raised in all open ctxs +# with that peer. + +# - [ ] PEER-FAILS-BY-CHILD-ERROR: peer spawned a sub-actor which +# (also) spawned a failing task which was unhandled and +# propagated up to the immediate parent - the peer to the actor +# that also spawned a remote task task in that same peer-parent. + # def test_self_cancel(): # ''' @@ -29,14 +49,30 @@ from tractor import ( # typing @tractor.context async def sleep_forever( ctx: Context, + expect_ctxc: bool = False, ) -> None: ''' Sync the context, open a stream then just sleep. + Allow checking for (context) cancellation locally. + ''' - await ctx.started() - async with ctx.open_stream(): - await trio.sleep_forever() + try: + await ctx.started() + async with ctx.open_stream(): + await trio.sleep_forever() + + except BaseException as berr: + + # TODO: it'd sure be nice to be able to inject our own + # `ContextCancelled` here instead of of `trio.Cancelled` + # so that our runtime can expect it and this "user code" + # would be able to tell the diff between a generic trio + # cancel and a tractor runtime-IPC cancel. + if expect_ctxc: + assert isinstance(berr, trio.Cancelled) + + raise @tractor.context @@ -145,6 +181,7 @@ async def stream_ints( async with ctx.open_stream() as stream: for i in itertools.count(): await stream.send(i) + await trio.sleep(0.01) @tractor.context @@ -157,77 +194,111 @@ async def stream_from_peer( try: async with ( tractor.wait_for_actor(peer_name) as peer, - peer.open_context(stream_ints) as (peer_ctx, first), - peer_ctx.open_stream() as stream, + # peer.open_context(stream_ints) as (peer_ctx, first), + # peer_ctx.open_stream() as stream, ): - await ctx.started() - # XXX TODO: big set of questions for this - # - should we raise `ContextCancelled` or `Cancelled` (rn - # it does that) here?! - # - test the `ContextCancelled` OUTSIDE the - # `.open_context()` call? - try: - async for msg in stream: - print(msg) + async with ( + peer.open_context(stream_ints) as (peer_ctx, first), + # peer_ctx.open_stream() as stream, + ): + # # try: + async with ( + peer_ctx.open_stream() as stream, + ): - except trio.Cancelled: - assert not ctx.cancel_called - assert not ctx.cancelled_caught + await ctx.started() + # XXX QUESTIONS & TODO: for further details around this + # in the longer run.. + # https://github.com/goodboy/tractor/issues/368 + # - should we raise `ContextCancelled` or `Cancelled` (rn + # it does latter) and should/could it be implemented + # as a general injection override for `trio` such + # that ANY next checkpoint would raise the "cancel + # error type" of choice? + # - should the `ContextCancelled` bubble from + # all `Context` and `MsgStream` apis wherein it + # prolly makes the most sense to make it + # a `trio.Cancelled` subtype? + # - what about IPC-transport specific errors, should + # they bubble from the async for and trigger + # other special cases? + # try: + # NOTE: current ctl flow: + # - stream raises `trio.EndOfChannel` and + # exits the loop + # - `.open_context()` will raise the ctxcanc + # received from the sleeper. + async for msg in stream: + assert msg is not None + print(msg) + # finally: + # await trio.sleep(0.1) + # from tractor import pause + # await pause() - assert not peer_ctx.cancel_called - assert not peer_ctx.cancelled_caught + # except BaseException as berr: + # with trio.CancelScope(shield=True): + # await tractor.pause() + # raise - assert 'root' in ctx.cancel_called_remote - - raise # XXX MUST NEVER MASK IT!! - - with trio.CancelScope(shield=True): - await tractor.pause() - # pass - # pytest.fail( - raise RuntimeError( - 'peer never triggered local `[Context]Cancelled`?!?' - ) + # except trio.Cancelled: + # with trio.CancelScope(shield=True): + # await tractor.pause() + # raise # XXX NEVER MASK IT + # from tractor import pause + # await pause() # NOTE: cancellation of the (sleeper) peer should always # cause a `ContextCancelled` raise in this streaming # actor. except ContextCancelled as ctxerr: - assert ctxerr.canceller == 'canceller' - assert ctxerr._remote_error is ctxerr + err = ctxerr + assert peer_ctx._remote_error is ctxerr + assert peer_ctx.canceller == ctxerr.canceller - # CASE 1: we were cancelled by our parent, the root actor. - # TODO: there are other cases depending on how the root - # actor and it's caller side task are written: - # - if the root does not req us to cancel then an - # IPC-transport related error should bubble from the async - # for loop and thus cause local cancellation both here - # and in the root (since in that case this task cancels the - # context with the root, not the other way around) - assert ctx.cancel_called_remote[0] == 'root' + # caller peer should not be the cancel requester + assert not ctx.cancel_called + # XXX can never be true since `._invoke` only + # sets this AFTER the nursery block this task + # was started in, exits. + assert not ctx.cancelled_caught + + # we never requested cancellation + assert not peer_ctx.cancel_called + # the `.open_context()` exit definitely + # caught a cancellation in the internal `Context._scope` + # since likely the runtime called `_deliver_msg()` + # after receiving the remote error from the streaming + # task. + assert peer_ctx.cancelled_caught + + # TODO / NOTE `.canceller` won't have been set yet + # here because that machinery is inside + # `.open_context().__aexit__()` BUT, if we had + # a way to know immediately (from the last + # checkpoint) that cancellation was due to + # a remote, we COULD assert this here..see, + # https://github.com/goodboy/tractor/issues/368 + + # root/parent actor task should NEVER HAVE cancelled us! + assert not ctx.canceller + assert 'canceller' in peer_ctx.canceller + + # TODO: IN THEORY we could have other cases depending on + # who cancels first, the root actor or the canceller peer. + # + # 1- when the peer request is first then the `.canceller` + # field should obvi be set to the 'canceller' uid, + # + # 2-if the root DOES req cancel then we should see the same + # `trio.Cancelled` implicitly raised + # assert ctx.canceller[0] == 'root' + # assert peer_ctx.canceller[0] == 'sleeper' raise - # except BaseException as err: - - # raise - -# cases: -# - some arbitrary remote peer cancels via Portal.cancel_actor(). -# => all other connected peers should get that cancel requesting peer's -# uid in the ctx-cancelled error msg. - -# - peer spawned a sub-actor which (also) spawned a failing task -# which was unhandled and propagated up to the immediate -# parent, the peer to the actor that also spawned a remote task -# task in that same peer-parent. - -# - peer cancelled itself - so other peers should -# get errors reflecting that the peer was itself the .canceller? - -# - WE cancelled the peer and thus should not see any raised -# `ContextCancelled` as it should be reaped silently? -# => pretty sure `test_context_stream_semantics::test_caller_cancels()` -# already covers this case? + raise RuntimeError( + 'peer never triggered local `ContextCancelled`?' + ) @pytest.mark.parametrize( 'error_during_ctxerr_handling', @@ -251,8 +322,8 @@ def test_peer_canceller( line and be less indented. .actor0> ()-> .actor1> - a inter-actor task context opened (by `async with `Portal.open_context()`) - from actor0 *into* actor1. + a inter-actor task context opened (by `async with + `Portal.open_context()`) from actor0 *into* actor1. .actor0> ()<=> .actor1> a inter-actor task context opened (as above) @@ -287,11 +358,11 @@ def test_peer_canceller( 5. .canceller> ()-> .sleeper> - calls `Portal.cancel_actor()` - ''' - async def main(): - async with tractor.open_nursery() as an: + async with tractor.open_nursery( + # debug_mode=True + ) as an: canceller: Portal = await an.start_actor( 'canceller', enable_modules=[__name__], @@ -305,10 +376,13 @@ def test_peer_canceller( enable_modules=[__name__], ) + root = tractor.current_actor() + try: async with ( sleeper.open_context( sleep_forever, + expect_ctxc=True, ) as (sleeper_ctx, sent), just_caller.open_context( @@ -328,6 +402,7 @@ def test_peer_canceller( try: print('PRE CONTEXT RESULT') + # await tractor.pause() await sleeper_ctx.result() # should never get here @@ -343,8 +418,8 @@ def test_peer_canceller( # canceller and caller peers should not # have been remotely cancelled. - assert canceller_ctx.cancel_called_remote is None - assert caller_ctx.cancel_called_remote is None + assert canceller_ctx.canceller is None + assert caller_ctx.canceller is None assert ctxerr.canceller[0] == 'canceller' @@ -363,8 +438,9 @@ def test_peer_canceller( raise - # SHOULD NEVER GET HERE! - except BaseException: + # XXX SHOULD NEVER EVER GET HERE XXX + except BaseException as berr: + err = berr pytest.fail('did not rx ctx-cancelled error?') else: pytest.fail('did not rx ctx-cancelled error?') @@ -375,6 +451,19 @@ def test_peer_canceller( )as ctxerr: _err = ctxerr + # NOTE: the main state to check on `Context` is: + # - `.cancelled_caught` (maps to nursery cs) + # - `.cancel_called` (bool of whether this side + # requested) + # - `.canceller` (uid of cancel-causing actor-task) + # - `._remote_error` (any `RemoteActorError` + # instance from other side of context) + # - `._cancel_msg` (any msg that caused the + # cancel) + + # CASE: error raised during handling of + # `ContextCancelled` inside `.open_context()` + # block if error_during_ctxerr_handling: assert isinstance(ctxerr, RuntimeError) @@ -384,20 +473,30 @@ def test_peer_canceller( for ctx in ctxs: assert ctx.cancel_called + # this root actor task should have + # cancelled all opened contexts except the + # sleeper which is obvi by the "canceller" + # peer. + re = ctx._remote_error + if ( + ctx is sleeper_ctx + or ctx is caller_ctx + ): + assert re.canceller == canceller.channel.uid + + else: + assert re.canceller == root.uid + # each context should have received # a silently absorbed context cancellation # from its peer actor's task. - assert ctx.chan.uid == ctx.cancel_called_remote - - # this root actor task should have - # cancelled all opened contexts except - # the sleeper which is cancelled by its - # peer "canceller" - if ctx is not sleeper_ctx: - assert ctx._remote_error.canceller[0] == 'root' + # assert ctx.chan.uid == ctx.canceller + # CASE: standard teardown inside in `.open_context()` block else: - assert ctxerr.canceller[0] == 'canceller' + assert ctxerr.canceller == sleeper_ctx.canceller + # assert ctxerr.canceller[0] == 'canceller' + # assert sleeper_ctx.canceller[0] == 'canceller' # the sleeper's remote error is the error bubbled # out of the context-stack above! @@ -405,18 +504,35 @@ def test_peer_canceller( assert re is ctxerr for ctx in ctxs: + re: BaseException | None = ctx._remote_error + assert re + # root doesn't cancel sleeper since it's + # cancelled by its peer. + # match ctx: + # case sleeper_ctx: if ctx is sleeper_ctx: assert not ctx.cancel_called + # wait WHY? assert ctx.cancelled_caught + + elif ctx is caller_ctx: + # since its context was remotely + # cancelled, we never needed to + # call `Context.cancel()` bc our + # context was already remotely + # cancelled by the time we'd do it. + assert ctx.cancel_called + else: assert ctx.cancel_called assert not ctx.cancelled_caught - # each context should have received + # TODO: do we even need this flag? + # -> each context should have received # a silently absorbed context cancellation - # from its peer actor's task. - assert ctx.chan.uid == ctx.cancel_called_remote + # in its remote nursery scope. + # assert ctx.chan.uid == ctx.canceller # NOTE: when an inter-peer cancellation # occurred, we DO NOT expect this @@ -434,7 +550,6 @@ def test_peer_canceller( # including the case where ctx-cancel handling # itself errors. assert sleeper_ctx.cancelled_caught - assert sleeper_ctx.cancel_called_remote[0] == 'sleeper' # await tractor.pause() raise # always to ensure teardown