diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 1ead617..082c5e6 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -10,6 +10,9 @@ import pytest import trio import tractor from tractor import ( # typing + Actor, + current_actor, + open_nursery, Portal, Context, ContextCancelled, @@ -238,19 +241,23 @@ async def stream_from_peer( # caller peer should not be the cancel requester assert not ctx.cancel_called + assert not ctx.cancel_acked - # XXX can never be true since `._invoke` only + # XXX can NEVER BE TRUE since `._invoke` only # sets this AFTER the nursery block this task # was started in, exits. - assert not ctx.cancelled_caught + assert not ctx._scope.cancelled_caught - # we never requested cancellation + # we never requested cancellation, it was the 'canceller' + # peer. assert not peer_ctx.cancel_called + assert not peer_ctx.cancel_acked + # the `.open_context()` exit definitely caught # a cancellation in the internal `Context._scope` since # likely the runtime called `_deliver_msg()` after # receiving the remote error from the streaming task. - assert peer_ctx.cancelled_caught + assert not peer_ctx._scope.cancelled_caught # TODO / NOTE `.canceller` won't have been set yet # here because that machinery is inside @@ -259,6 +266,8 @@ async def stream_from_peer( # checkpoint) that cancellation was due to # a remote, we COULD assert this here..see, # https://github.com/goodboy/tractor/issues/368 + # + # assert 'canceller' in ctx.canceller # root/parent actor task should NEVER HAVE cancelled us! assert not ctx.canceller @@ -356,8 +365,7 @@ def test_peer_canceller( 'just_caller', # but i just met her? enable_modules=[__name__], ) - - root = tractor.current_actor() + root: Actor = current_actor() try: async with ( @@ -395,8 +403,8 @@ def test_peer_canceller( # not request the sleeper cancellation ;) except ContextCancelled as ctxerr: print( - 'CAUGHT REMOTE CONTEXT CANCEL FOM\n' - f'{ctxerr}' + 'CAUGHT REMOTE CONTEXT CANCEL\n\n' + f'{ctxerr}\n' ) # canceller and caller peers should not @@ -404,6 +412,9 @@ def test_peer_canceller( assert canceller_ctx.canceller is None assert caller_ctx.canceller is None + # we were not the actor, our peer was + assert not sleeper_ctx.cancel_acked + assert ctxerr.canceller[0] == 'canceller' # XXX NOTE XXX: since THIS `ContextCancelled` @@ -411,11 +422,13 @@ def test_peer_canceller( # `sleeper.open_context().__aexit__()` this # value is not yet set, however outside this # block it should be. - assert not sleeper_ctx.cancelled_caught + assert not sleeper_ctx._scope.cancelled_caught + # CASE_1: error-during-ctxc-handling, if error_during_ctxerr_handling: raise RuntimeError('Simulated error during teardown') + # CASE_2: standard teardown inside in `.open_context()` block raise # XXX SHOULD NEVER EVER GET HERE XXX @@ -436,7 +449,6 @@ def test_peer_canceller( else: pytest.fail( 'did not rx ctxc ?!?\n\n' - f'{ctxs}\n' ) @@ -447,21 +459,48 @@ def test_peer_canceller( _loc_err = loc_err # NOTE: the main state to check on `Context` is: - # - `.cancelled_caught` (maps to nursery cs) # - `.cancel_called` (bool of whether this side # requested) + # - `.cancel_acked` (bool of whether a ctxc + # response was received due to cancel req). + # - `.maybe_error` (highest prio error to raise + # locally) + # - `.outcome` (final error or result value) # - `.canceller` (uid of cancel-causing actor-task) # - `._remote_error` (any `RemoteActorError` # instance from other side of context) + # - `._local_error` (any error caught inside the + # `.open_context()` block). + # + # XXX: Deprecated and internal only + # - `.cancelled_caught` (maps to nursery cs) + # - now just use `._scope.cancelled_caught` + # since it maps to the internal (maps to nursery cs) + # # TODO: are we really planning to use this tho? # - `._cancel_msg` (any msg that caused the # cancel) - # CASE: error raised during handling of - # `ContextCancelled` inside `.open_context()` - # block + # CASE_1: error-during-ctxc-handling, + # - far end cancels due to peer 'canceller', + # - `ContextCancelled` relayed to this scope, + # - inside `.open_context()` ctxc is caught and + # a rte raised instead + # + # => block should raise the rte but all peers + # should be cancelled by US. + # if error_during_ctxerr_handling: assert isinstance(loc_err, RuntimeError) + print(f'_loc_err: {_loc_err}\n') + # assert sleeper_ctx._local_error is _loc_err + # assert sleeper_ctx._local_error is _loc_err + assert not ( + loc_err + is sleeper_ctx.maybe_error + is sleeper_ctx.outcome + is sleeper_ctx._remote_error + ) # NOTE: this root actor task should have # called `Context.cancel()` on the @@ -495,7 +534,25 @@ def test_peer_canceller( root.uid ) - # CASE: standard teardown inside in `.open_context()` block + # since the sleeper errors while handling a + # peer-cancelled (by ctxc) scenario, we expect + # that the `.open_context()` block DOES call + # `.cancel() (despite in this test case it + # being unecessary). + assert ( + sleeper_ctx.cancel_called + and + not sleeper_ctx.cancel_acked + ) + + # CASE_2: standard teardown inside in `.open_context()` block + # - far end cancels due to peer 'canceller', + # - `ContextCancelled` relayed to this scope and + # raised locally without any raise-during-handle, + # + # => inside `.open_context()` ctxc is raised and + # propagated + # else: assert isinstance(loc_err, ContextCancelled) assert loc_err.canceller == sleeper_ctx.canceller @@ -509,24 +566,42 @@ def test_peer_canceller( # the sleeper's remote error is the error bubbled # out of the context-stack above! - re = sleeper_ctx._remote_error - assert re is loc_err + re = sleeper_ctx.outcome + assert ( + re is loc_err + is sleeper_ctx.maybe_error + is sleeper_ctx._remote_error + ) for ctx in ctxs: - re: BaseException | None = ctx._remote_error - assert re + re: BaseException|None = ctx._remote_error + re: BaseException|None = ctx.outcome + assert ( + re and + ( + re is ctx.maybe_error + is ctx._remote_error + ) + ) + le: trio.MultiError = ctx._local_error + assert ( + le + and ctx._local_error + ) # root doesn't cancel sleeper since it's # cancelled by its peer. if ctx is sleeper_ctx: assert not ctx.cancel_called + assert not ctx.cancel_acked + # since sleeper_ctx.result() IS called # above we should have (silently) # absorbed the corresponding # `ContextCancelled` for it and thus # the logic inside `.cancelled_caught` # should trigger! - assert ctx.cancelled_caught + assert ctx._scope.cancelled_caught elif ctx is caller_ctx: # since its context was remotely @@ -535,15 +610,33 @@ def test_peer_canceller( # done by the peer and also we never assert ctx.cancel_called - # TODO: figure out the details of - # this.. + # TODO: figure out the details of this..? # if you look the `._local_error` here # is a multi of ctxc + 2 Cancelleds? # assert not ctx.cancelled_caught + elif ctx is canceller_ctx: + + # XXX NOTE XXX: ONLY the canceller + # will get a self-cancelled outcome + # whilst everyone else gets + # a peer-caused cancellation! + # + # TODO: really we should avoid calling + # .cancel() whenever an interpeer + # cancel takes place since each + # reception of a ctxc + assert ( + ctx.cancel_called + and ctx.cancel_acked + ) + assert not ctx._scope.cancelled_caught + else: - assert ctx.cancel_called - assert not ctx.cancelled_caught + pytest.fail( + 'Uhh wut ctx is this?\n' + f'{ctx}\n' + ) # TODO: do we even need this flag? # -> each context should have received @@ -559,14 +652,24 @@ def test_peer_canceller( # `Context.cancel()` SHOULD NOT have been # called inside # `Portal.open_context().__aexit__()`. - assert not sleeper_ctx.cancel_called + assert not ( + sleeper_ctx.cancel_called + or + sleeper_ctx.cancel_acked + ) # XXX NOTE XXX: and see matching comment above but, - # this flag is set only AFTER the `.open_context()` - # has exited and should be set in both outcomes - # including the case where ctx-cancel handling - # itself errors. - assert sleeper_ctx.cancelled_caught + # the `._scope` is only set by `trio` AFTER the + # `.open_context()` block has exited and should be + # set in both outcomes including the case where + # ctx-cancel handling itself errors. + assert sleeper_ctx._scope.cancelled_caught + assert _loc_err is sleeper_ctx._local_error + assert ( + sleeper_ctx.outcome + is sleeper_ctx.maybe_error + is sleeper_ctx._remote_error + ) raise # always to ensure teardown @@ -582,12 +685,315 @@ def test_peer_canceller( assert excinfo.value.canceller[0] == 'canceller' -def test_client_tree_spawns_and_cancels_service_subactor(): - ... -# TODO: test for the modden `mod wks open piker` bug! -# -> start actor-tree (server) that offers sub-actor spawns via -# context API -# -> start another full actor-tree (client) which requests to the first to -# spawn over its `@context` ep / api. -# -> client actor cancels the context and should exit gracefully -# and the server's spawned child should cancel and terminate! +@tractor.context +async def basic_echo_server( + ctx: Context, + peer_name: str = 'stepbro', + +) -> None: + ''' + Just the simplest `MsgStream` echo server which resays what + you told it but with its uid in front ;) + + ''' + actor: Actor = tractor.current_actor() + uid: tuple = actor.uid + await ctx.started(uid) + async with ctx.open_stream() as ipc: + async for msg in ipc: + + # repack msg pair with our uid + # as first element. + ( + client_uid, + i, + ) = msg + resp: tuple = ( + uid, + i, + ) + # OOF! looks like my runtime-error is causing a lockup + # assert 0 + await ipc.send(resp) + + +@tractor.context +async def serve_subactors( + ctx: Context, + peer_name: str, + +) -> None: + async with open_nursery() as an: + await ctx.started(peer_name) + async with ctx.open_stream() as reqs: + async for msg in reqs: + peer_name: str = msg + peer: Portal = await an.start_actor( + name=peer_name, + enable_modules=[__name__], + ) + print( + 'Spawning new subactor\n' + f'{peer_name}\n' + f'|_{peer}\n' + ) + await reqs.send(( + peer.chan.uid, + peer.chan.raddr, + )) + + print('Spawner exiting spawn serve loop!') + + +@tractor.context +async def client_req_subactor( + ctx: Context, + peer_name: str, + + # used to simulate a user causing an error to be raised + # directly in thread (like a KBI) to better replicate the + # case where a `modden` CLI client would hang afer requesting + # a `Context.cancel()` to `bigd`'s wks spawner. + reraise_on_cancel: str|None = None, + +) -> None: + # TODO: other cases to do with sub lifetimes: + # -[ ] test that we can have the server spawn a sub + # that lives longer then ctx with this client. + # -[ ] test that + + # open ctx with peer spawn server and ask it to spawn a little + # bro which we'll then connect and stream with. + async with ( + tractor.find_actor( + name='spawn_server', + raise_on_none=True, + + # TODO: we should be isolating this from other runs! + # => ideally so we can eventually use something like + # `pytest-xdist` Bo + # registry_addrs=bigd._reg_addrs, + ) as spawner, + + spawner.open_context( + serve_subactors, + peer_name=peer_name, + ) as (spawner_ctx, first), + ): + assert first == peer_name + await ctx.started( + 'yup i had brudder', + ) + + async with spawner_ctx.open_stream() as reqs: + + # send single spawn request to the server + await reqs.send(peer_name) + with trio.fail_after(3): + ( + sub_uid, + sub_raddr, + ) = await reqs.receive() + + + await tell_little_bro( + actor_name=sub_uid[0], + caller='client', + ) + + # TODO: test different scope-layers of + # cancellation? + # with trio.CancelScope() as cs: + try: + await trio.sleep_forever() + + # TODO: would be super nice to have a special injected + # cancel type here (maybe just our ctxc) but using + # some native mechanism in `trio` :p + except ( + trio.Cancelled + ) as err: + _err = err + if reraise_on_cancel: + errtype = globals()['__builtins__'][reraise_on_cancel] + assert errtype + to_reraise: BaseException = errtype() + print(f'client re-raising on cancel: {repr(to_reraise)}') + raise err + + raise + + # if cs.cancelled_caught: + # print('client handling expected KBI!') + # await ctx. + # await trio.sleep( + # await tractor.pause() + # await spawner_ctx.cancel() + + # cancel spawned sub-actor directly? + # await sub_ctx.cancel() + + # maybe cancel runtime? + # await sub.cancel_actor() + + +async def tell_little_bro( + actor_name: str, + caller: str = '' +): + # contact target actor, do a stream dialog. + async with ( + tractor.wait_for_actor( + name=actor_name + ) as lb, + lb.open_context( + basic_echo_server, + ) as (sub_ctx, first), + sub_ctx.open_stream( + basic_echo_server, + ) as echo_ipc, + ): + actor: Actor = current_actor() + uid: tuple = actor.uid + for i in range(100): + msg: tuple = ( + uid, + i, + ) + await echo_ipc.send(msg) + resp = await echo_ipc.receive() + print( + f'{caller} => {actor_name}: {msg}\n' + f'{caller} <= {actor_name}: {resp}\n' + ) + ( + sub_uid, + _i, + ) = resp + assert sub_uid != uid + assert _i == i + + +@pytest.mark.parametrize( + 'raise_client_error', + [None, 'KeyboardInterrupt'], +) +def test_peer_spawns_and_cancels_service_subactor( + debug_mode: bool, + raise_client_error: str, +): + # NOTE: this tests for the modden `mod wks open piker` bug + # discovered as part of implementing workspace ctx + # open-.pause()-ctx.cancel() as part of the CLI.. + + # -> start actor-tree (server) that offers sub-actor spawns via + # context API + # -> start another full actor-tree (client) which requests to the first to + # spawn over its `@context` ep / api. + # -> client actor cancels the context and should exit gracefully + # and the server's spawned child should cancel and terminate! + peer_name: str = 'little_bro' + + async def main(): + async with tractor.open_nursery( + # NOTE: to halt the peer tasks on ctxc, uncomment this. + debug_mode=debug_mode, + ) as an: + server: Portal = await an.start_actor( + (server_name := 'spawn_server'), + enable_modules=[__name__], + ) + print(f'Spawned `{server_name}`') + + client: Portal = await an.start_actor( + client_name := 'client', + enable_modules=[__name__], + ) + print(f'Spawned `{client_name}`') + + try: + async with ( + server.open_context( + serve_subactors, + peer_name=peer_name, + ) as (spawn_ctx, first), + + client.open_context( + client_req_subactor, + peer_name=peer_name, + reraise_on_cancel=raise_client_error, + ) as (client_ctx, client_says), + ): + print( + f'Server says: {first}\n' + f'Client says: {client_says}\n' + ) + + # attach to client-requested-to-spawn + # (grandchild of this root actor) "little_bro" + # and ensure we can also use it as an echo + # server. + async with tractor.wait_for_actor( + name=peer_name, + ) as sub: + assert sub + + print( + 'Sub-spawn came online\n' + f'portal: {sub}\n' + f'.uid: {sub.actor.uid}\n' + f'chan.raddr: {sub.chan.raddr}\n' + ) + await tell_little_bro( + actor_name=peer_name, + caller='root', + ) + + # signal client to raise a KBI + await client_ctx.cancel() + print('root cancelled client, checking that sub-spawn is down') + + async with tractor.find_actor( + name=peer_name, + ) as sub: + assert not sub + + print('root cancelling server/client sub-actors') + + # await tractor.pause() + res = await client_ctx.result(hide_tb=False) + assert isinstance(res, ContextCancelled) + assert client_ctx.cancel_acked + assert res.canceller == current_actor().uid + + await spawn_ctx.cancel() + # await server.cancel_actor() + + # since we called `.cancel_actor()`, `.cancel_ack` + # will not be set on the ctx bc `ctx.cancel()` was not + # called directly fot this confext. + except ContextCancelled as ctxc: + print('caught ctxc from contexts!') + assert ctxc.canceller == current_actor().uid + assert ctxc is spawn_ctx.outcome + assert ctxc is spawn_ctx.maybe_error + raise + + # assert spawn_ctx.cancel_acked + assert spawn_ctx.cancel_acked + assert client_ctx.cancel_acked + + await client.cancel_actor() + await server.cancel_actor() + + # WOA WOA WOA! we need this to close..!!!?? + # that's super bad XD + + # TODO: why isn't this working!?!? + # we're now outside the `.open_context()` block so + # the internal `Context._scope: CancelScope` should be + # gracefully "closed" ;) + + # assert spawn_ctx.cancelled_caught + + trio.run(main)