diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index dda096c..4eb06e8 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -8,7 +8,9 @@ sync-opening a ``tractor.Context`` beforehand. # from contextlib import asynccontextmanager as acm from itertools import count import platform -from typing import Optional +from typing import ( + Callable, +) import pytest import trio @@ -69,7 +71,7 @@ _state: bool = False @tractor.context async def too_many_starteds( - ctx: tractor.Context, + ctx: Context, ) -> None: ''' Call ``Context.started()`` more then once (an error). @@ -84,7 +86,7 @@ async def too_many_starteds( @tractor.context async def not_started_but_stream_opened( - ctx: tractor.Context, + ctx: Context, ) -> None: ''' Enter ``Context.open_stream()`` without calling ``.started()``. @@ -105,11 +107,15 @@ async def not_started_but_stream_opened( ], ids='misuse_type={}'.format, ) -def test_started_misuse(target): - +def test_started_misuse( + target: Callable, + debug_mode: bool, +): async def main(): - async with tractor.open_nursery() as n: - portal = await n.start_actor( + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + portal = await an.start_actor( target.__name__, enable_modules=[__name__], ) @@ -124,7 +130,7 @@ def test_started_misuse(target): @tractor.context async def simple_setup_teardown( - ctx: tractor.Context, + ctx: Context, data: int, block_forever: bool = False, @@ -170,6 +176,7 @@ def test_simple_context( error_parent, callee_blocks_forever, pointlessly_open_stream, + debug_mode: bool, ): timeout = 1.5 if not platform.system() == 'Windows' else 4 @@ -177,9 +184,10 @@ def test_simple_context( async def main(): with trio.fail_after(timeout): - async with tractor.open_nursery() as nursery: - - portal = await nursery.start_actor( + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + portal = await an.start_actor( 'simple_context', enable_modules=[__name__], ) @@ -260,6 +268,7 @@ def test_caller_cancels( cancel_method: str, chk_ctx_result_before_exit: bool, callee_returns_early: bool, + debug_mode: bool, ): ''' Verify that when the opening side of a context (aka the caller) @@ -268,7 +277,7 @@ def test_caller_cancels( ''' async def check_canceller( - ctx: tractor.Context, + ctx: Context, ) -> None: # should not raise yet return the remote # context cancelled error. @@ -287,8 +296,10 @@ def test_caller_cancels( ) async def main(): - async with tractor.open_nursery() as nursery: - portal = await nursery.start_actor( + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + portal = await an.start_actor( 'simple_context', enable_modules=[__name__], ) @@ -338,7 +349,7 @@ def test_caller_cancels( @tractor.context async def close_ctx_immediately( - ctx: tractor.Context, + ctx: Context, ) -> None: @@ -350,17 +361,33 @@ async def close_ctx_immediately( @tractor_test -async def test_callee_closes_ctx_after_stream_open(): - 'callee context closes without using stream' +async def test_callee_closes_ctx_after_stream_open( + debug_mode: bool, +): + ''' + callee context closes without using stream. - async with tractor.open_nursery() as n: + This should result in a msg sequence + |__ + |_ - portal = await n.start_actor( + <= {'started': , 'cid': } + <= {'stop': True, 'cid': } + <= {'result': Any, ..} + + (ignored by child) + => {'stop': True, 'cid': } + + ''' + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + portal = await an.start_actor( 'fast_stream_closer', enable_modules=[__name__], ) - with trio.fail_after(2): + with trio.fail_after(0.5): async with portal.open_context( close_ctx_immediately, @@ -368,10 +395,9 @@ async def test_callee_closes_ctx_after_stream_open(): # cancel_on_exit=True, ) as (ctx, sent): - assert sent is None - with trio.fail_after(0.5): + with trio.fail_after(0.4): async with ctx.open_stream() as stream: # should fall through since ``StopAsyncIteration`` @@ -379,12 +405,15 @@ async def test_callee_closes_ctx_after_stream_open(): # a ``trio.EndOfChannel`` by # ``trio.abc.ReceiveChannel.__anext__()`` async for _ in stream: + # trigger failure if we DO NOT + # get an EOC! assert 0 else: # verify stream is now closed try: - await stream.receive() + with trio.fail_after(0.3): + await stream.receive() except trio.EndOfChannel: pass @@ -405,7 +434,7 @@ async def test_callee_closes_ctx_after_stream_open(): @tractor.context async def expect_cancelled( - ctx: tractor.Context, + ctx: Context, ) -> None: global _state @@ -434,11 +463,15 @@ async def expect_cancelled( @tractor_test async def test_caller_closes_ctx_after_callee_opens_stream( use_ctx_cancel_method: bool, + debug_mode: bool, ): - 'caller context closes without using stream' - - async with tractor.open_nursery() as an: + ''' + caller context closes without using/opening stream + ''' + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: root: Actor = current_actor() portal = await an.start_actor( @@ -522,11 +555,13 @@ async def test_caller_closes_ctx_after_callee_opens_stream( @tractor_test -async def test_multitask_caller_cancels_from_nonroot_task(): - - async with tractor.open_nursery() as n: - - portal = await n.start_actor( +async def test_multitask_caller_cancels_from_nonroot_task( + debug_mode: bool, +): + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + portal = await an.start_actor( 'ctx_cancelled', enable_modules=[__name__], ) @@ -573,7 +608,7 @@ async def test_multitask_caller_cancels_from_nonroot_task(): @tractor.context async def cancel_self( - ctx: tractor.Context, + ctx: Context, ) -> None: global _state @@ -610,16 +645,20 @@ async def cancel_self( raise RuntimeError('Context didnt cancel itself?!') + @tractor_test -async def test_callee_cancels_before_started(): +async def test_callee_cancels_before_started( + debug_mode: bool, +): ''' Callee calls `Context.cancel()` while streaming and caller sees stream terminated in `ContextCancelled`. ''' - async with tractor.open_nursery() as n: - - portal = await n.start_actor( + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + portal = await an.start_actor( 'cancels_self', enable_modules=[__name__], ) @@ -645,7 +684,7 @@ async def test_callee_cancels_before_started(): @tractor.context async def never_open_stream( - ctx: tractor.Context, + ctx: Context, ) -> None: ''' @@ -659,8 +698,8 @@ async def never_open_stream( @tractor.context async def keep_sending_from_callee( - ctx: tractor.Context, - msg_buffer_size: Optional[int] = None, + ctx: Context, + msg_buffer_size: int|None = None, ) -> None: ''' @@ -685,7 +724,10 @@ async def keep_sending_from_callee( ], ids='overrun_condition={}'.format, ) -def test_one_end_stream_not_opened(overrun_by): +def test_one_end_stream_not_opened( + overrun_by: tuple[str, int, Callable], + debug_mode: bool, +): ''' This should exemplify the bug from: https://github.com/goodboy/tractor/issues/265 @@ -696,8 +738,10 @@ def test_one_end_stream_not_opened(overrun_by): buf_size = buf_size_increase + Actor.msg_buffer_size async def main(): - async with tractor.open_nursery() as n: - portal = await n.start_actor( + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + portal = await an.start_actor( entrypoint.__name__, enable_modules=[__name__], ) @@ -754,7 +798,7 @@ def test_one_end_stream_not_opened(overrun_by): @tractor.context async def echo_back_sequence( - ctx: tractor.Context, + ctx: Context, seq: list[int], wait_for_cancel: bool, allow_overruns_side: str, @@ -837,6 +881,7 @@ def test_maybe_allow_overruns_stream( slow_side: str, allow_overruns_side: str, loglevel: str, + debug_mode: bool, ): ''' Demonstrate small overruns of each task back and forth @@ -855,13 +900,14 @@ def test_maybe_allow_overruns_stream( ''' async def main(): - async with tractor.open_nursery() as n: - portal = await n.start_actor( + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as an: + portal = await an.start_actor( 'callee_sends_forever', enable_modules=[__name__], loglevel=loglevel, - - # debug_mode=True, + debug_mode=debug_mode, ) seq = list(range(10)) async with portal.open_context( diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 5e1a4ca..1ead617 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -123,7 +123,9 @@ async def error_before_started( await peer_ctx.cancel() -def test_do_not_swallow_error_before_started_by_remote_contextcancelled(): +def test_do_not_swallow_error_before_started_by_remote_contextcancelled( + debug_mode: bool, +): ''' Verify that an error raised in a remote context which itself opens YET ANOTHER remote context, which it then cancels, does not @@ -132,7 +134,9 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled(): ''' async def main(): - async with tractor.open_nursery() as n: + async with tractor.open_nursery( + debug_mode=debug_mode, + ) as n: portal = await n.start_actor( 'errorer', enable_modules=[__name__], @@ -225,13 +229,16 @@ async def stream_from_peer( # NOTE: cancellation of the (sleeper) peer should always # cause a `ContextCancelled` raise in this streaming # actor. - except ContextCancelled as ctxerr: - err = ctxerr + except ContextCancelled as ctxc: + ctxerr = ctxc + assert peer_ctx._remote_error is ctxerr + assert peer_ctx._remote_error.msgdata == ctxerr.msgdata assert peer_ctx.canceller == ctxerr.canceller # caller peer should not be the cancel requester assert not ctx.cancel_called + # XXX can never be true since `._invoke` only # sets this AFTER the nursery block this task # was started in, exits. @@ -269,9 +276,7 @@ async def stream_from_peer( # assert ctx.canceller[0] == 'root' # assert peer_ctx.canceller[0] == 'sleeper' - raise RuntimeError( - 'peer never triggered local `ContextCancelled`?' - ) + raise RuntimeError('Never triggered local `ContextCancelled` ?!?') @pytest.mark.parametrize( @@ -280,6 +285,7 @@ async def stream_from_peer( ) def test_peer_canceller( error_during_ctxerr_handling: bool, + debug_mode: bool, ): ''' Verify that a cancellation triggered by an in-actor-tree peer @@ -336,7 +342,7 @@ def test_peer_canceller( async def main(): async with tractor.open_nursery( # NOTE: to halt the peer tasks on ctxc, uncomment this. - # debug_mode=True + debug_mode=debug_mode, ) as an: canceller: Portal = await an.start_actor( 'canceller', @@ -377,7 +383,8 @@ def test_peer_canceller( try: print('PRE CONTEXT RESULT') - await sleeper_ctx.result() + res = await sleeper_ctx.result() + assert res # should never get here pytest.fail( @@ -387,7 +394,10 @@ def test_peer_canceller( # should always raise since this root task does # not request the sleeper cancellation ;) except ContextCancelled as ctxerr: - print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}') + print( + 'CAUGHT REMOTE CONTEXT CANCEL FOM\n' + f'{ctxerr}' + ) # canceller and caller peers should not # have been remotely cancelled. @@ -410,16 +420,31 @@ def test_peer_canceller( # XXX SHOULD NEVER EVER GET HERE XXX except BaseException as berr: - err = berr - pytest.fail('did not rx ctx-cancelled error?') + raise + + # XXX if needed to debug failure + # _err = berr + # await tractor.pause() + # await trio.sleep_forever() + + pytest.fail( + 'did not rx ctxc ?!?\n\n' + + f'{berr}\n' + ) + else: - pytest.fail('did not rx ctx-cancelled error?') + pytest.fail( + 'did not rx ctxc ?!?\n\n' + + f'{ctxs}\n' + ) except ( ContextCancelled, RuntimeError, - )as ctxerr: - _err = ctxerr + )as loc_err: + _loc_err = loc_err # NOTE: the main state to check on `Context` is: # - `.cancelled_caught` (maps to nursery cs) @@ -436,7 +461,7 @@ def test_peer_canceller( # `ContextCancelled` inside `.open_context()` # block if error_during_ctxerr_handling: - assert isinstance(ctxerr, RuntimeError) + assert isinstance(loc_err, RuntimeError) # NOTE: this root actor task should have # called `Context.cancel()` on the @@ -472,9 +497,10 @@ def test_peer_canceller( # CASE: standard teardown inside in `.open_context()` block else: - assert ctxerr.canceller == sleeper_ctx.canceller + assert isinstance(loc_err, ContextCancelled) + assert loc_err.canceller == sleeper_ctx.canceller assert ( - ctxerr.canceller[0] + loc_err.canceller[0] == sleeper_ctx.canceller[0] == @@ -484,7 +510,7 @@ def test_peer_canceller( # the sleeper's remote error is the error bubbled # out of the context-stack above! re = sleeper_ctx._remote_error - assert re is ctxerr + assert re is loc_err for ctx in ctxs: re: BaseException | None = ctx._remote_error @@ -554,3 +580,14 @@ def test_peer_canceller( assert excinfo.value.type == ContextCancelled assert excinfo.value.canceller[0] == 'canceller' + + +def test_client_tree_spawns_and_cancels_service_subactor(): + ... +# TODO: test for the modden `mod wks open piker` bug! +# -> start actor-tree (server) that offers sub-actor spawns via +# context API +# -> start another full actor-tree (client) which requests to the first to +# spawn over its `@context` ep / api. +# -> client actor cancels the context and should exit gracefully +# and the server's spawned child should cancel and terminate!