diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py
index 09f11b8..5e1a4ca 100644
--- a/tests/test_inter_peer_cancellation.py
+++ b/tests/test_inter_peer_cancellation.py
@@ -194,59 +194,33 @@ async def stream_from_peer(
     try:
         async with (
             tractor.wait_for_actor(peer_name) as peer,
-            # peer.open_context(stream_ints) as (peer_ctx, first),
-            # peer_ctx.open_stream() as stream,
+            peer.open_context(stream_ints) as (peer_ctx, first),
+            peer_ctx.open_stream() as stream,
         ):
-            async with (
-                peer.open_context(stream_ints) as (peer_ctx, first),
-                # peer_ctx.open_stream() as stream,
-            ):
-                # # try:
-                async with (
-                    peer_ctx.open_stream() as stream,
-                ):
-
-                    await ctx.started()
-                    # XXX QUESTIONS & TODO: for further details around this
-                    # in the longer run..
-                    # https://github.com/goodboy/tractor/issues/368
-                    # - should we raise `ContextCancelled` or `Cancelled` (rn
-                    #   it does latter) and should/could it be implemented
-                    #   as a general injection override for `trio` such
-                    #   that ANY next checkpoint would raise the "cancel
-                    #   error type" of choice?
-                    # - should the `ContextCancelled` bubble from
-                    #   all `Context` and `MsgStream` apis wherein it
-                    #   prolly makes the most sense to make it
-                    #   a `trio.Cancelled` subtype?
-                    # - what about IPC-transport specific errors, should
-                    #   they bubble from the async for and trigger
-                    #   other special cases?
-                    # try:
-                    # NOTE: current ctl flow:
-                    # - stream raises `trio.EndOfChannel` and
-                    #   exits the loop
-                    # - `.open_context()` will raise the ctxcanc
-                    #   received from the sleeper.
-                    async for msg in stream:
-                        assert msg is not None
-                        print(msg)
-                    # finally:
-                    #     await trio.sleep(0.1)
-                    #     from tractor import pause
-                    #     await pause()
-
-            # except BaseException as berr:
-            #     with trio.CancelScope(shield=True):
-            #         await tractor.pause()
-            #     raise
-
-        # except trio.Cancelled:
-        #     with trio.CancelScope(shield=True):
-        #         await tractor.pause()
-        #     raise  # XXX NEVER MASK IT
-        # from tractor import pause
-        # await pause()
+            await ctx.started()
+            # XXX QUESTIONS & TODO: for further details around this
+            # in the longer run..
+            # https://github.com/goodboy/tractor/issues/368
+            # - should we raise `ContextCancelled` or `Cancelled` (rn
+            #   it does the latter) and should/could it be implemented
+            #   as a general injection override for `trio` such
+            #   that ANY next checkpoint would raise the "cancel
+            #   error type" of choice?
+            # - should the `ContextCancelled` bubble from
+            #   all `Context` and `MsgStream` apis wherein it
+            #   prolly makes the most sense to make it
+            #   a `trio.Cancelled` subtype?
+            # - what about IPC-transport specific errors, should
+            #   they bubble from the async for and trigger
+            #   other special cases?
+            # NOTE: current ctl flow:
+            # - stream raises `trio.EndOfChannel` and
+            #   exits the loop
+            # - `.open_context()` will raise the ctxcanc
+            #   received from the sleeper.
+            async for msg in stream:
+                assert msg is not None
+                print(msg)
 
     # NOTE: cancellation of the (sleeper) peer should always
     # cause a `ContextCancelled` raise in this streaming
@@ -265,11 +239,10 @@ async def stream_from_peer(
         # we never requested cancellation
         assert not peer_ctx.cancel_called
 
-        # the `.open_context()` exit definitely
-        # caught a cancellation in the internal `Context._scope`
-        # since likely the runtime called `_deliver_msg()`
-        # after receiving the remote error from the streaming
-        # task.
+        # the `.open_context()` exit definitely caught
+        # a cancellation in the internal `Context._scope` since
+        # likely the runtime called `_deliver_msg()` after
+        # receiving the remote error from the streaming task.
         assert peer_ctx.cancelled_caught
 
         # TODO / NOTE `.canceller` won't have been set yet
@@ -284,8 +257,9 @@ async def stream_from_peer(
         assert not ctx.canceller
         assert 'canceller' in peer_ctx.canceller
 
+        raise
         # TODO: IN THEORY we could have other cases depending on
-        # who cancels first, the root actor or the canceller peer.
+        # who cancels first, the root actor or the canceller peer?
         #
         # 1- when the peer request is first then the `.canceller`
         #    field should obvi be set to the 'canceller' uid,
@@ -294,12 +268,12 @@ async def stream_from_peer(
         #    `trio.Cancelled` implicitly raised
         # assert ctx.canceller[0] == 'root'
         # assert peer_ctx.canceller[0] == 'sleeper'
-        raise
 
    raise RuntimeError(
        'peer never triggered local `ContextCancelled`?'
    )
 
+
 @pytest.mark.parametrize(
     'error_during_ctxerr_handling',
     [False, True],
@@ -361,6 +335,7 @@ def test_peer_canceller(
     '''
     async def main():
         async with tractor.open_nursery(
+            # NOTE: to halt the peer tasks on ctxc, uncomment this.
             # debug_mode=True
         ) as an:
             canceller: Portal = await an.start_actor(
@@ -402,7 +377,6 @@ def test_peer_canceller(
 
                 try:
                     print('PRE CONTEXT RESULT')
-                    # await tractor.pause()
                     await sleeper_ctx.result()
 
                     # should never get here
@@ -410,9 +384,8 @@ def test_peer_canceller(
                         'Context.result() did not raise ctx-cancelled?'
                     )
 
-                # TODO: not sure why this isn't catching
-                # but maybe we need an `ExceptionGroup` and
-                # the whole except *errs: thinger in 3.11?
+                # should always raise since this root task does
+                # not request the sleeper cancellation ;)
                 except ContextCancelled as ctxerr:
                     print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}')
 
@@ -430,9 +403,6 @@ def test_peer_canceller(
                     # block it should be.
                     assert not sleeper_ctx.cancelled_caught
 
-                    # TODO: a test which ensures this error is
-                    # bubbled and caught (NOT MASKED) by the
-                    # runtime!!!
                     if error_during_ctxerr_handling:
                         raise RuntimeError('Simulated error during teardown')
 
@@ -458,6 +428,7 @@ def test_peer_canceller(
                     # - `.canceller` (uid of cancel-causing actor-task)
                     # - `._remote_error` (any `RemoteActorError`
                     #   instance from other side of context)
+                    # TODO: are we really planning to use this tho?
                     # - `._cancel_msg` (any msg that caused the
                     #   cancel)
 
@@ -482,21 +453,33 @@ def test_peer_canceller(
                                 ctx is sleeper_ctx
                                 or ctx is caller_ctx
                             ):
-                                assert re.canceller == canceller.channel.uid
+                                assert (
+                                    re.canceller
+                                    ==
+                                    ctx.canceller
+                                    ==
+                                    canceller.channel.uid
+                                )
 
                             else:
-                                assert re.canceller == root.uid
-
-                                # each context should have received
-                                # a silently absorbed context cancellation
-                                # from its peer actor's task.
-                                # assert ctx.chan.uid == ctx.canceller
+                                assert (
+                                    re.canceller
+                                    ==
+                                    ctx.canceller
+                                    ==
+                                    root.uid
+                                )
 
                         # CASE: standard teardown inside in `.open_context()` block
                         else:
                             assert ctxerr.canceller == sleeper_ctx.canceller
-                            # assert ctxerr.canceller[0] == 'canceller'
-                            # assert sleeper_ctx.canceller[0] == 'canceller'
+                            assert (
+                                ctxerr.canceller[0]
+                                ==
+                                sleeper_ctx.canceller[0]
+                                ==
+                                'canceller'
+                            )
 
                         # the sleeper's remote error is the error bubbled
                         # out of the context-stack above!
@@ -509,21 +492,29 @@ def test_peer_canceller(
                         # root doesn't cancel sleeper since it's
                         # cancelled by its peer.
-                        # match ctx:
-                        # case sleeper_ctx:
                         if ctx is sleeper_ctx:
                             assert not ctx.cancel_called
-                            # wait WHY?
+                            # since sleeper_ctx.result() IS called
+                            # above we should have (silently)
+                            # absorbed the corresponding
+                            # `ContextCancelled` for it and thus
+                            # the logic inside `.cancelled_caught`
+                            # should trigger!
                             assert ctx.cancelled_caught
 
                         elif ctx is caller_ctx:
                             # since its context was remotely
                             # cancelled, we never needed to
-                            # call `Context.cancel()` bc our
-                            # context was already remotely
-                            # cancelled by the time we'd do it.
+                            # call `Context.cancel()` bc it was
+                            # done by the peer before we could.
                             assert ctx.cancel_called
 
+                            # TODO: figure out the details of
+                            # this..
+                            # if you look the `._local_error` here
+                            # is a multi of ctxc + 2 Cancelleds?
+                            # assert not ctx.cancelled_caught
+
                         else:
                             assert ctx.cancel_called
                             assert not ctx.cancelled_caught
@@ -551,7 +542,6 @@ def test_peer_canceller(
                 # itself errors.
                 assert sleeper_ctx.cancelled_caught
 
-                # await tractor.pause()
                 raise  # always to ensure teardown
 
     if error_during_ctxerr_handling:
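For orientation beyond the patch hunks: the topology under test is a root actor holding contexts to both a 'sleeper' and a 'canceller' peer, where only the canceller requests cancellation (via `Portal.cancel_actor()` on the sleeper), so the root task, which never calls `.cancel()` itself, should see a `ContextCancelled` bubble out of its `.open_context()` exit with `.canceller` set to the canceller's uid. Below is a minimal self-contained sketch of that pattern; the helper names (`sleep_forever_in_ctx()`, `cancel_peer()`) are illustrative stand-ins rather than this module's actual helpers, and exact `.canceller` semantics may vary across `tractor` versions:

```python
import trio
import tractor
from tractor import ContextCancelled


@tractor.context
async def sleep_forever_in_ctx(ctx: tractor.Context) -> None:
    # "sleeper" side: ack the context then block until cancelled.
    await ctx.started()
    await trio.sleep_forever()


@tractor.context
async def cancel_peer(
    ctx: tractor.Context,
    peer_name: str = 'sleeper',
) -> None:
    # "canceller" side: look up the sleeper by name and cancel the
    # whole peer actor, which should deliver a `ContextCancelled`
    # to any task holding an open context with it.
    await ctx.started()
    async with tractor.wait_for_actor(peer_name) as peer:
        await peer.cancel_actor()


async def main() -> None:
    async with tractor.open_nursery() as an:
        sleeper = await an.start_actor('sleeper', enable_modules=[__name__])
        canceller = await an.start_actor('canceller', enable_modules=[__name__])
        try:
            async with (
                sleeper.open_context(sleep_forever_in_ctx) as (sleeper_ctx, _),
                canceller.open_context(cancel_peer) as (canc_ctx, _),
            ):
                # should raise `ContextCancelled` once the canceller
                # peer has torn down the sleeper out from under us.
                await sleeper_ctx.result()

        except ContextCancelled as ctxc:
            # this (root) task never requested the cancel..
            assert not sleeper_ctx.cancel_called
            # ..so `.canceller` should be the canceller peer's uid.
            assert ctxc.canceller == canceller.channel.uid

        finally:
            # tear down both peers (the sleeper may already be gone).
            await an.cancel()


if __name__ == '__main__':
    trio.run(main)
```

Note the split this diff's assertions rely on: `Context.cancel_called` records only a locally requested cancel, while `.cancelled_caught` and `.canceller` reflect a cancellation actually observed, possibly remotely triggered, at `.open_context()` exit.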