From 3ed309f019536104e351111a4576c73709b669b5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 29 Feb 2024 14:21:45 -0500 Subject: [PATCH] Add test for `modden` sub-spawner-server hangs on cancel As per a lot of the recent refinements to `Context` cancellation, add a new test case to replicate the original hang-on-cancel found with `modden` when using a client actor to spawn a subactor in some other tree where despite `Context.cancel()` being called the requesting client would hang on the opened context with the server. The specific scenario added here is to have, - root actor spawns 2 children: a client and a spawn server. - the spawn server opens with a spawn-request serve loop and begins to wait for the client. - client spawns and connects to the sibling spawn server, requests to spawn a sub-actor, the "little bro", connects to it then does some echo streaming, cancels the request with it's sibling (the spawn server) which should in turn cancel the root's-grandchild and result in a cancel-ack back to the client's `.open_context()`. - root ensures that it can also connect to the grandchild (little bro), do the same echo streaming, then ensure everything tears down correctly after cancelling all the children. More refinements to come here obvi in the specific cancellation semantics and possibly causes. Also tweaks the other tests in suite to use the new `Context` properties recently introduced and similarly updated in the previous patch to the ctx-semantics suite. --- tests/test_inter_peer_cancellation.py | 484 +++++++++++++++++++++++--- 1 file changed, 445 insertions(+), 39 deletions(-) diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 1ead617..082c5e6 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -10,6 +10,9 @@ import pytest import trio import tractor from tractor import ( # typing + Actor, + current_actor, + open_nursery, Portal, Context, ContextCancelled, @@ -238,19 +241,23 @@ async def stream_from_peer( # caller peer should not be the cancel requester assert not ctx.cancel_called + assert not ctx.cancel_acked - # XXX can never be true since `._invoke` only + # XXX can NEVER BE TRUE since `._invoke` only # sets this AFTER the nursery block this task # was started in, exits. - assert not ctx.cancelled_caught + assert not ctx._scope.cancelled_caught - # we never requested cancellation + # we never requested cancellation, it was the 'canceller' + # peer. assert not peer_ctx.cancel_called + assert not peer_ctx.cancel_acked + # the `.open_context()` exit definitely caught # a cancellation in the internal `Context._scope` since # likely the runtime called `_deliver_msg()` after # receiving the remote error from the streaming task. - assert peer_ctx.cancelled_caught + assert not peer_ctx._scope.cancelled_caught # TODO / NOTE `.canceller` won't have been set yet # here because that machinery is inside @@ -259,6 +266,8 @@ async def stream_from_peer( # checkpoint) that cancellation was due to # a remote, we COULD assert this here..see, # https://github.com/goodboy/tractor/issues/368 + # + # assert 'canceller' in ctx.canceller # root/parent actor task should NEVER HAVE cancelled us! assert not ctx.canceller @@ -356,8 +365,7 @@ def test_peer_canceller( 'just_caller', # but i just met her? enable_modules=[__name__], ) - - root = tractor.current_actor() + root: Actor = current_actor() try: async with ( @@ -395,8 +403,8 @@ def test_peer_canceller( # not request the sleeper cancellation ;) except ContextCancelled as ctxerr: print( - 'CAUGHT REMOTE CONTEXT CANCEL FOM\n' - f'{ctxerr}' + 'CAUGHT REMOTE CONTEXT CANCEL\n\n' + f'{ctxerr}\n' ) # canceller and caller peers should not @@ -404,6 +412,9 @@ def test_peer_canceller( assert canceller_ctx.canceller is None assert caller_ctx.canceller is None + # we were not the actor, our peer was + assert not sleeper_ctx.cancel_acked + assert ctxerr.canceller[0] == 'canceller' # XXX NOTE XXX: since THIS `ContextCancelled` @@ -411,11 +422,13 @@ def test_peer_canceller( # `sleeper.open_context().__aexit__()` this # value is not yet set, however outside this # block it should be. - assert not sleeper_ctx.cancelled_caught + assert not sleeper_ctx._scope.cancelled_caught + # CASE_1: error-during-ctxc-handling, if error_during_ctxerr_handling: raise RuntimeError('Simulated error during teardown') + # CASE_2: standard teardown inside in `.open_context()` block raise # XXX SHOULD NEVER EVER GET HERE XXX @@ -436,7 +449,6 @@ def test_peer_canceller( else: pytest.fail( 'did not rx ctxc ?!?\n\n' - f'{ctxs}\n' ) @@ -447,21 +459,48 @@ def test_peer_canceller( _loc_err = loc_err # NOTE: the main state to check on `Context` is: - # - `.cancelled_caught` (maps to nursery cs) # - `.cancel_called` (bool of whether this side # requested) + # - `.cancel_acked` (bool of whether a ctxc + # response was received due to cancel req). + # - `.maybe_error` (highest prio error to raise + # locally) + # - `.outcome` (final error or result value) # - `.canceller` (uid of cancel-causing actor-task) # - `._remote_error` (any `RemoteActorError` # instance from other side of context) + # - `._local_error` (any error caught inside the + # `.open_context()` block). + # + # XXX: Deprecated and internal only + # - `.cancelled_caught` (maps to nursery cs) + # - now just use `._scope.cancelled_caught` + # since it maps to the internal (maps to nursery cs) + # # TODO: are we really planning to use this tho? # - `._cancel_msg` (any msg that caused the # cancel) - # CASE: error raised during handling of - # `ContextCancelled` inside `.open_context()` - # block + # CASE_1: error-during-ctxc-handling, + # - far end cancels due to peer 'canceller', + # - `ContextCancelled` relayed to this scope, + # - inside `.open_context()` ctxc is caught and + # a rte raised instead + # + # => block should raise the rte but all peers + # should be cancelled by US. + # if error_during_ctxerr_handling: assert isinstance(loc_err, RuntimeError) + print(f'_loc_err: {_loc_err}\n') + # assert sleeper_ctx._local_error is _loc_err + # assert sleeper_ctx._local_error is _loc_err + assert not ( + loc_err + is sleeper_ctx.maybe_error + is sleeper_ctx.outcome + is sleeper_ctx._remote_error + ) # NOTE: this root actor task should have # called `Context.cancel()` on the @@ -495,7 +534,25 @@ def test_peer_canceller( root.uid ) - # CASE: standard teardown inside in `.open_context()` block + # since the sleeper errors while handling a + # peer-cancelled (by ctxc) scenario, we expect + # that the `.open_context()` block DOES call + # `.cancel() (despite in this test case it + # being unecessary). + assert ( + sleeper_ctx.cancel_called + and + not sleeper_ctx.cancel_acked + ) + + # CASE_2: standard teardown inside in `.open_context()` block + # - far end cancels due to peer 'canceller', + # - `ContextCancelled` relayed to this scope and + # raised locally without any raise-during-handle, + # + # => inside `.open_context()` ctxc is raised and + # propagated + # else: assert isinstance(loc_err, ContextCancelled) assert loc_err.canceller == sleeper_ctx.canceller @@ -509,24 +566,42 @@ def test_peer_canceller( # the sleeper's remote error is the error bubbled # out of the context-stack above! - re = sleeper_ctx._remote_error - assert re is loc_err + re = sleeper_ctx.outcome + assert ( + re is loc_err + is sleeper_ctx.maybe_error + is sleeper_ctx._remote_error + ) for ctx in ctxs: - re: BaseException | None = ctx._remote_error - assert re + re: BaseException|None = ctx._remote_error + re: BaseException|None = ctx.outcome + assert ( + re and + ( + re is ctx.maybe_error + is ctx._remote_error + ) + ) + le: trio.MultiError = ctx._local_error + assert ( + le + and ctx._local_error + ) # root doesn't cancel sleeper since it's # cancelled by its peer. if ctx is sleeper_ctx: assert not ctx.cancel_called + assert not ctx.cancel_acked + # since sleeper_ctx.result() IS called # above we should have (silently) # absorbed the corresponding # `ContextCancelled` for it and thus # the logic inside `.cancelled_caught` # should trigger! - assert ctx.cancelled_caught + assert ctx._scope.cancelled_caught elif ctx is caller_ctx: # since its context was remotely @@ -535,15 +610,33 @@ def test_peer_canceller( # done by the peer and also we never assert ctx.cancel_called - # TODO: figure out the details of - # this.. + # TODO: figure out the details of this..? # if you look the `._local_error` here # is a multi of ctxc + 2 Cancelleds? # assert not ctx.cancelled_caught + elif ctx is canceller_ctx: + + # XXX NOTE XXX: ONLY the canceller + # will get a self-cancelled outcome + # whilst everyone else gets + # a peer-caused cancellation! + # + # TODO: really we should avoid calling + # .cancel() whenever an interpeer + # cancel takes place since each + # reception of a ctxc + assert ( + ctx.cancel_called + and ctx.cancel_acked + ) + assert not ctx._scope.cancelled_caught + else: - assert ctx.cancel_called - assert not ctx.cancelled_caught + pytest.fail( + 'Uhh wut ctx is this?\n' + f'{ctx}\n' + ) # TODO: do we even need this flag? # -> each context should have received @@ -559,14 +652,24 @@ def test_peer_canceller( # `Context.cancel()` SHOULD NOT have been # called inside # `Portal.open_context().__aexit__()`. - assert not sleeper_ctx.cancel_called + assert not ( + sleeper_ctx.cancel_called + or + sleeper_ctx.cancel_acked + ) # XXX NOTE XXX: and see matching comment above but, - # this flag is set only AFTER the `.open_context()` - # has exited and should be set in both outcomes - # including the case where ctx-cancel handling - # itself errors. - assert sleeper_ctx.cancelled_caught + # the `._scope` is only set by `trio` AFTER the + # `.open_context()` block has exited and should be + # set in both outcomes including the case where + # ctx-cancel handling itself errors. + assert sleeper_ctx._scope.cancelled_caught + assert _loc_err is sleeper_ctx._local_error + assert ( + sleeper_ctx.outcome + is sleeper_ctx.maybe_error + is sleeper_ctx._remote_error + ) raise # always to ensure teardown @@ -582,12 +685,315 @@ def test_peer_canceller( assert excinfo.value.canceller[0] == 'canceller' -def test_client_tree_spawns_and_cancels_service_subactor(): - ... -# TODO: test for the modden `mod wks open piker` bug! -# -> start actor-tree (server) that offers sub-actor spawns via -# context API -# -> start another full actor-tree (client) which requests to the first to -# spawn over its `@context` ep / api. -# -> client actor cancels the context and should exit gracefully -# and the server's spawned child should cancel and terminate! +@tractor.context +async def basic_echo_server( + ctx: Context, + peer_name: str = 'stepbro', + +) -> None: + ''' + Just the simplest `MsgStream` echo server which resays what + you told it but with its uid in front ;) + + ''' + actor: Actor = tractor.current_actor() + uid: tuple = actor.uid + await ctx.started(uid) + async with ctx.open_stream() as ipc: + async for msg in ipc: + + # repack msg pair with our uid + # as first element. + ( + client_uid, + i, + ) = msg + resp: tuple = ( + uid, + i, + ) + # OOF! looks like my runtime-error is causing a lockup + # assert 0 + await ipc.send(resp) + + +@tractor.context +async def serve_subactors( + ctx: Context, + peer_name: str, + +) -> None: + async with open_nursery() as an: + await ctx.started(peer_name) + async with ctx.open_stream() as reqs: + async for msg in reqs: + peer_name: str = msg + peer: Portal = await an.start_actor( + name=peer_name, + enable_modules=[__name__], + ) + print( + 'Spawning new subactor\n' + f'{peer_name}\n' + f'|_{peer}\n' + ) + await reqs.send(( + peer.chan.uid, + peer.chan.raddr, + )) + + print('Spawner exiting spawn serve loop!') + + +@tractor.context +async def client_req_subactor( + ctx: Context, + peer_name: str, + + # used to simulate a user causing an error to be raised + # directly in thread (like a KBI) to better replicate the + # case where a `modden` CLI client would hang afer requesting + # a `Context.cancel()` to `bigd`'s wks spawner. + reraise_on_cancel: str|None = None, + +) -> None: + # TODO: other cases to do with sub lifetimes: + # -[ ] test that we can have the server spawn a sub + # that lives longer then ctx with this client. + # -[ ] test that + + # open ctx with peer spawn server and ask it to spawn a little + # bro which we'll then connect and stream with. + async with ( + tractor.find_actor( + name='spawn_server', + raise_on_none=True, + + # TODO: we should be isolating this from other runs! + # => ideally so we can eventually use something like + # `pytest-xdist` Bo + # registry_addrs=bigd._reg_addrs, + ) as spawner, + + spawner.open_context( + serve_subactors, + peer_name=peer_name, + ) as (spawner_ctx, first), + ): + assert first == peer_name + await ctx.started( + 'yup i had brudder', + ) + + async with spawner_ctx.open_stream() as reqs: + + # send single spawn request to the server + await reqs.send(peer_name) + with trio.fail_after(3): + ( + sub_uid, + sub_raddr, + ) = await reqs.receive() + + + await tell_little_bro( + actor_name=sub_uid[0], + caller='client', + ) + + # TODO: test different scope-layers of + # cancellation? + # with trio.CancelScope() as cs: + try: + await trio.sleep_forever() + + # TODO: would be super nice to have a special injected + # cancel type here (maybe just our ctxc) but using + # some native mechanism in `trio` :p + except ( + trio.Cancelled + ) as err: + _err = err + if reraise_on_cancel: + errtype = globals()['__builtins__'][reraise_on_cancel] + assert errtype + to_reraise: BaseException = errtype() + print(f'client re-raising on cancel: {repr(to_reraise)}') + raise err + + raise + + # if cs.cancelled_caught: + # print('client handling expected KBI!') + # await ctx. + # await trio.sleep( + # await tractor.pause() + # await spawner_ctx.cancel() + + # cancel spawned sub-actor directly? + # await sub_ctx.cancel() + + # maybe cancel runtime? + # await sub.cancel_actor() + + +async def tell_little_bro( + actor_name: str, + caller: str = '' +): + # contact target actor, do a stream dialog. + async with ( + tractor.wait_for_actor( + name=actor_name + ) as lb, + lb.open_context( + basic_echo_server, + ) as (sub_ctx, first), + sub_ctx.open_stream( + basic_echo_server, + ) as echo_ipc, + ): + actor: Actor = current_actor() + uid: tuple = actor.uid + for i in range(100): + msg: tuple = ( + uid, + i, + ) + await echo_ipc.send(msg) + resp = await echo_ipc.receive() + print( + f'{caller} => {actor_name}: {msg}\n' + f'{caller} <= {actor_name}: {resp}\n' + ) + ( + sub_uid, + _i, + ) = resp + assert sub_uid != uid + assert _i == i + + +@pytest.mark.parametrize( + 'raise_client_error', + [None, 'KeyboardInterrupt'], +) +def test_peer_spawns_and_cancels_service_subactor( + debug_mode: bool, + raise_client_error: str, +): + # NOTE: this tests for the modden `mod wks open piker` bug + # discovered as part of implementing workspace ctx + # open-.pause()-ctx.cancel() as part of the CLI.. + + # -> start actor-tree (server) that offers sub-actor spawns via + # context API + # -> start another full actor-tree (client) which requests to the first to + # spawn over its `@context` ep / api. + # -> client actor cancels the context and should exit gracefully + # and the server's spawned child should cancel and terminate! + peer_name: str = 'little_bro' + + async def main(): + async with tractor.open_nursery( + # NOTE: to halt the peer tasks on ctxc, uncomment this. + debug_mode=debug_mode, + ) as an: + server: Portal = await an.start_actor( + (server_name := 'spawn_server'), + enable_modules=[__name__], + ) + print(f'Spawned `{server_name}`') + + client: Portal = await an.start_actor( + client_name := 'client', + enable_modules=[__name__], + ) + print(f'Spawned `{client_name}`') + + try: + async with ( + server.open_context( + serve_subactors, + peer_name=peer_name, + ) as (spawn_ctx, first), + + client.open_context( + client_req_subactor, + peer_name=peer_name, + reraise_on_cancel=raise_client_error, + ) as (client_ctx, client_says), + ): + print( + f'Server says: {first}\n' + f'Client says: {client_says}\n' + ) + + # attach to client-requested-to-spawn + # (grandchild of this root actor) "little_bro" + # and ensure we can also use it as an echo + # server. + async with tractor.wait_for_actor( + name=peer_name, + ) as sub: + assert sub + + print( + 'Sub-spawn came online\n' + f'portal: {sub}\n' + f'.uid: {sub.actor.uid}\n' + f'chan.raddr: {sub.chan.raddr}\n' + ) + await tell_little_bro( + actor_name=peer_name, + caller='root', + ) + + # signal client to raise a KBI + await client_ctx.cancel() + print('root cancelled client, checking that sub-spawn is down') + + async with tractor.find_actor( + name=peer_name, + ) as sub: + assert not sub + + print('root cancelling server/client sub-actors') + + # await tractor.pause() + res = await client_ctx.result(hide_tb=False) + assert isinstance(res, ContextCancelled) + assert client_ctx.cancel_acked + assert res.canceller == current_actor().uid + + await spawn_ctx.cancel() + # await server.cancel_actor() + + # since we called `.cancel_actor()`, `.cancel_ack` + # will not be set on the ctx bc `ctx.cancel()` was not + # called directly fot this confext. + except ContextCancelled as ctxc: + print('caught ctxc from contexts!') + assert ctxc.canceller == current_actor().uid + assert ctxc is spawn_ctx.outcome + assert ctxc is spawn_ctx.maybe_error + raise + + # assert spawn_ctx.cancel_acked + assert spawn_ctx.cancel_acked + assert client_ctx.cancel_acked + + await client.cancel_actor() + await server.cancel_actor() + + # WOA WOA WOA! we need this to close..!!!?? + # that's super bad XD + + # TODO: why isn't this working!?!? + # we're now outside the `.open_context()` block so + # the internal `Context._scope: CancelScope` should be + # gracefully "closed" ;) + + # assert spawn_ctx.cancelled_caught + + trio.run(main)