forked from goodboy/tractor
1
0
Fork 0

Make ctx tests support `debug_mode: bool` fixture

Such that with `--tpdb` passed (sub)actors will engage the `pdbp` REPL
automatically and so that we can use the new `stackscope` support when
complex cases hang Bo

Also,
- simplified some type-annots (ns paths),
- doc-ed an inter-peer test func with some ascii msg flows,
- added a bottom #TODO for replicating the scenario i hit in `modden`
  where a separate client actor-tree was hanging on cancelling a `bigd`
  sub-workspace..
modden_spawn_from_client_req
Tyler Goodlet 2024-02-20 15:14:58 -05:00
parent 1d7cf7d1dd
commit 6c9bc627d8
2 changed files with 150 additions and 67 deletions

View File

@ -8,7 +8,9 @@ sync-opening a ``tractor.Context`` beforehand.
# from contextlib import asynccontextmanager as acm # from contextlib import asynccontextmanager as acm
from itertools import count from itertools import count
import platform import platform
from typing import Optional from typing import (
Callable,
)
import pytest import pytest
import trio import trio
@ -69,7 +71,7 @@ _state: bool = False
@tractor.context @tractor.context
async def too_many_starteds( async def too_many_starteds(
ctx: tractor.Context, ctx: Context,
) -> None: ) -> None:
''' '''
Call ``Context.started()`` more then once (an error). Call ``Context.started()`` more then once (an error).
@ -84,7 +86,7 @@ async def too_many_starteds(
@tractor.context @tractor.context
async def not_started_but_stream_opened( async def not_started_but_stream_opened(
ctx: tractor.Context, ctx: Context,
) -> None: ) -> None:
''' '''
Enter ``Context.open_stream()`` without calling ``.started()``. Enter ``Context.open_stream()`` without calling ``.started()``.
@ -105,11 +107,15 @@ async def not_started_but_stream_opened(
], ],
ids='misuse_type={}'.format, ids='misuse_type={}'.format,
) )
def test_started_misuse(target): def test_started_misuse(
target: Callable,
debug_mode: bool,
):
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
portal = await n.start_actor( debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
target.__name__, target.__name__,
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -124,7 +130,7 @@ def test_started_misuse(target):
@tractor.context @tractor.context
async def simple_setup_teardown( async def simple_setup_teardown(
ctx: tractor.Context, ctx: Context,
data: int, data: int,
block_forever: bool = False, block_forever: bool = False,
@ -170,6 +176,7 @@ def test_simple_context(
error_parent, error_parent,
callee_blocks_forever, callee_blocks_forever,
pointlessly_open_stream, pointlessly_open_stream,
debug_mode: bool,
): ):
timeout = 1.5 if not platform.system() == 'Windows' else 4 timeout = 1.5 if not platform.system() == 'Windows' else 4
@ -177,9 +184,10 @@ def test_simple_context(
async def main(): async def main():
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with tractor.open_nursery() as nursery: async with tractor.open_nursery(
debug_mode=debug_mode,
portal = await nursery.start_actor( ) as an:
portal = await an.start_actor(
'simple_context', 'simple_context',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -260,6 +268,7 @@ def test_caller_cancels(
cancel_method: str, cancel_method: str,
chk_ctx_result_before_exit: bool, chk_ctx_result_before_exit: bool,
callee_returns_early: bool, callee_returns_early: bool,
debug_mode: bool,
): ):
''' '''
Verify that when the opening side of a context (aka the caller) Verify that when the opening side of a context (aka the caller)
@ -268,7 +277,7 @@ def test_caller_cancels(
''' '''
async def check_canceller( async def check_canceller(
ctx: tractor.Context, ctx: Context,
) -> None: ) -> None:
# should not raise yet return the remote # should not raise yet return the remote
# context cancelled error. # context cancelled error.
@ -287,8 +296,10 @@ def test_caller_cancels(
) )
async def main(): async def main():
async with tractor.open_nursery() as nursery: async with tractor.open_nursery(
portal = await nursery.start_actor( debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'simple_context', 'simple_context',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -338,7 +349,7 @@ def test_caller_cancels(
@tractor.context @tractor.context
async def close_ctx_immediately( async def close_ctx_immediately(
ctx: tractor.Context, ctx: Context,
) -> None: ) -> None:
@ -350,17 +361,33 @@ async def close_ctx_immediately(
@tractor_test @tractor_test
async def test_callee_closes_ctx_after_stream_open(): async def test_callee_closes_ctx_after_stream_open(
'callee context closes without using stream' debug_mode: bool,
):
'''
callee context closes without using stream.
async with tractor.open_nursery() as n: This should result in a msg sequence
|_<root>_
|_<fast_stream_closer>
portal = await n.start_actor( <= {'started': <Any>, 'cid': <str>}
<= {'stop': True, 'cid': <str>}
<= {'result': Any, ..}
(ignored by child)
=> {'stop': True, 'cid': <str>}
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'fast_stream_closer', 'fast_stream_closer',
enable_modules=[__name__], enable_modules=[__name__],
) )
with trio.fail_after(2): with trio.fail_after(0.5):
async with portal.open_context( async with portal.open_context(
close_ctx_immediately, close_ctx_immediately,
@ -368,10 +395,9 @@ async def test_callee_closes_ctx_after_stream_open():
# cancel_on_exit=True, # cancel_on_exit=True,
) as (ctx, sent): ) as (ctx, sent):
assert sent is None assert sent is None
with trio.fail_after(0.5): with trio.fail_after(0.4):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
# should fall through since ``StopAsyncIteration`` # should fall through since ``StopAsyncIteration``
@ -379,11 +405,14 @@ async def test_callee_closes_ctx_after_stream_open():
# a ``trio.EndOfChannel`` by # a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()`` # ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream: async for _ in stream:
# trigger failure if we DO NOT
# get an EOC!
assert 0 assert 0
else: else:
# verify stream is now closed # verify stream is now closed
try: try:
with trio.fail_after(0.3):
await stream.receive() await stream.receive()
except trio.EndOfChannel: except trio.EndOfChannel:
pass pass
@ -405,7 +434,7 @@ async def test_callee_closes_ctx_after_stream_open():
@tractor.context @tractor.context
async def expect_cancelled( async def expect_cancelled(
ctx: tractor.Context, ctx: Context,
) -> None: ) -> None:
global _state global _state
@ -434,11 +463,15 @@ async def expect_cancelled(
@tractor_test @tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream( async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool, use_ctx_cancel_method: bool,
debug_mode: bool,
): ):
'caller context closes without using stream' '''
caller context closes without using/opening stream
async with tractor.open_nursery() as an:
'''
async with tractor.open_nursery(
debug_mode=debug_mode,
) as an:
root: Actor = current_actor() root: Actor = current_actor()
portal = await an.start_actor( portal = await an.start_actor(
@ -522,11 +555,13 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
@tractor_test @tractor_test
async def test_multitask_caller_cancels_from_nonroot_task(): async def test_multitask_caller_cancels_from_nonroot_task(
debug_mode: bool,
async with tractor.open_nursery() as n: ):
async with tractor.open_nursery(
portal = await n.start_actor( debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'ctx_cancelled', 'ctx_cancelled',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -573,7 +608,7 @@ async def test_multitask_caller_cancels_from_nonroot_task():
@tractor.context @tractor.context
async def cancel_self( async def cancel_self(
ctx: tractor.Context, ctx: Context,
) -> None: ) -> None:
global _state global _state
@ -610,16 +645,20 @@ async def cancel_self(
raise RuntimeError('Context didnt cancel itself?!') raise RuntimeError('Context didnt cancel itself?!')
@tractor_test @tractor_test
async def test_callee_cancels_before_started(): async def test_callee_cancels_before_started(
debug_mode: bool,
):
''' '''
Callee calls `Context.cancel()` while streaming and caller Callee calls `Context.cancel()` while streaming and caller
sees stream terminated in `ContextCancelled`. sees stream terminated in `ContextCancelled`.
''' '''
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=debug_mode,
portal = await n.start_actor( ) as an:
portal = await an.start_actor(
'cancels_self', 'cancels_self',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -645,7 +684,7 @@ async def test_callee_cancels_before_started():
@tractor.context @tractor.context
async def never_open_stream( async def never_open_stream(
ctx: tractor.Context, ctx: Context,
) -> None: ) -> None:
''' '''
@ -659,8 +698,8 @@ async def never_open_stream(
@tractor.context @tractor.context
async def keep_sending_from_callee( async def keep_sending_from_callee(
ctx: tractor.Context, ctx: Context,
msg_buffer_size: Optional[int] = None, msg_buffer_size: int|None = None,
) -> None: ) -> None:
''' '''
@ -685,7 +724,10 @@ async def keep_sending_from_callee(
], ],
ids='overrun_condition={}'.format, ids='overrun_condition={}'.format,
) )
def test_one_end_stream_not_opened(overrun_by): def test_one_end_stream_not_opened(
overrun_by: tuple[str, int, Callable],
debug_mode: bool,
):
''' '''
This should exemplify the bug from: This should exemplify the bug from:
https://github.com/goodboy/tractor/issues/265 https://github.com/goodboy/tractor/issues/265
@ -696,8 +738,10 @@ def test_one_end_stream_not_opened(overrun_by):
buf_size = buf_size_increase + Actor.msg_buffer_size buf_size = buf_size_increase + Actor.msg_buffer_size
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
portal = await n.start_actor( debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
entrypoint.__name__, entrypoint.__name__,
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -754,7 +798,7 @@ def test_one_end_stream_not_opened(overrun_by):
@tractor.context @tractor.context
async def echo_back_sequence( async def echo_back_sequence(
ctx: tractor.Context, ctx: Context,
seq: list[int], seq: list[int],
wait_for_cancel: bool, wait_for_cancel: bool,
allow_overruns_side: str, allow_overruns_side: str,
@ -837,6 +881,7 @@ def test_maybe_allow_overruns_stream(
slow_side: str, slow_side: str,
allow_overruns_side: str, allow_overruns_side: str,
loglevel: str, loglevel: str,
debug_mode: bool,
): ):
''' '''
Demonstrate small overruns of each task back and forth Demonstrate small overruns of each task back and forth
@ -855,13 +900,14 @@ def test_maybe_allow_overruns_stream(
''' '''
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
portal = await n.start_actor( debug_mode=debug_mode,
) as an:
portal = await an.start_actor(
'callee_sends_forever', 'callee_sends_forever',
enable_modules=[__name__], enable_modules=[__name__],
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode,
# debug_mode=True,
) )
seq = list(range(10)) seq = list(range(10))
async with portal.open_context( async with portal.open_context(

View File

@ -123,7 +123,9 @@ async def error_before_started(
await peer_ctx.cancel() await peer_ctx.cancel()
def test_do_not_swallow_error_before_started_by_remote_contextcancelled(): def test_do_not_swallow_error_before_started_by_remote_contextcancelled(
debug_mode: bool,
):
''' '''
Verify that an error raised in a remote context which itself Verify that an error raised in a remote context which itself
opens YET ANOTHER remote context, which it then cancels, does not opens YET ANOTHER remote context, which it then cancels, does not
@ -132,7 +134,9 @@ def test_do_not_swallow_error_before_started_by_remote_contextcancelled():
''' '''
async def main(): async def main():
async with tractor.open_nursery() as n: async with tractor.open_nursery(
debug_mode=debug_mode,
) as n:
portal = await n.start_actor( portal = await n.start_actor(
'errorer', 'errorer',
enable_modules=[__name__], enable_modules=[__name__],
@ -225,13 +229,16 @@ async def stream_from_peer(
# NOTE: cancellation of the (sleeper) peer should always # NOTE: cancellation of the (sleeper) peer should always
# cause a `ContextCancelled` raise in this streaming # cause a `ContextCancelled` raise in this streaming
# actor. # actor.
except ContextCancelled as ctxerr: except ContextCancelled as ctxc:
err = ctxerr ctxerr = ctxc
assert peer_ctx._remote_error is ctxerr assert peer_ctx._remote_error is ctxerr
assert peer_ctx._remote_error.msgdata == ctxerr.msgdata
assert peer_ctx.canceller == ctxerr.canceller assert peer_ctx.canceller == ctxerr.canceller
# caller peer should not be the cancel requester # caller peer should not be the cancel requester
assert not ctx.cancel_called assert not ctx.cancel_called
# XXX can never be true since `._invoke` only # XXX can never be true since `._invoke` only
# sets this AFTER the nursery block this task # sets this AFTER the nursery block this task
# was started in, exits. # was started in, exits.
@ -269,9 +276,7 @@ async def stream_from_peer(
# assert ctx.canceller[0] == 'root' # assert ctx.canceller[0] == 'root'
# assert peer_ctx.canceller[0] == 'sleeper' # assert peer_ctx.canceller[0] == 'sleeper'
raise RuntimeError( raise RuntimeError('Never triggered local `ContextCancelled` ?!?')
'peer never triggered local `ContextCancelled`?'
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -280,6 +285,7 @@ async def stream_from_peer(
) )
def test_peer_canceller( def test_peer_canceller(
error_during_ctxerr_handling: bool, error_during_ctxerr_handling: bool,
debug_mode: bool,
): ):
''' '''
Verify that a cancellation triggered by an in-actor-tree peer Verify that a cancellation triggered by an in-actor-tree peer
@ -336,7 +342,7 @@ def test_peer_canceller(
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this. # NOTE: to halt the peer tasks on ctxc, uncomment this.
# debug_mode=True debug_mode=debug_mode,
) as an: ) as an:
canceller: Portal = await an.start_actor( canceller: Portal = await an.start_actor(
'canceller', 'canceller',
@ -377,7 +383,8 @@ def test_peer_canceller(
try: try:
print('PRE CONTEXT RESULT') print('PRE CONTEXT RESULT')
await sleeper_ctx.result() res = await sleeper_ctx.result()
assert res
# should never get here # should never get here
pytest.fail( pytest.fail(
@ -387,7 +394,10 @@ def test_peer_canceller(
# should always raise since this root task does # should always raise since this root task does
# not request the sleeper cancellation ;) # not request the sleeper cancellation ;)
except ContextCancelled as ctxerr: except ContextCancelled as ctxerr:
print(f'CAUGHT REMOTE CONTEXT CANCEL {ctxerr}') print(
'CAUGHT REMOTE CONTEXT CANCEL FOM\n'
f'{ctxerr}'
)
# canceller and caller peers should not # canceller and caller peers should not
# have been remotely cancelled. # have been remotely cancelled.
@ -410,16 +420,31 @@ def test_peer_canceller(
# XXX SHOULD NEVER EVER GET HERE XXX # XXX SHOULD NEVER EVER GET HERE XXX
except BaseException as berr: except BaseException as berr:
err = berr raise
pytest.fail('did not rx ctx-cancelled error?')
# XXX if needed to debug failure
# _err = berr
# await tractor.pause()
# await trio.sleep_forever()
pytest.fail(
'did not rx ctxc ?!?\n\n'
f'{berr}\n'
)
else: else:
pytest.fail('did not rx ctx-cancelled error?') pytest.fail(
'did not rx ctxc ?!?\n\n'
f'{ctxs}\n'
)
except ( except (
ContextCancelled, ContextCancelled,
RuntimeError, RuntimeError,
)as ctxerr: )as loc_err:
_err = ctxerr _loc_err = loc_err
# NOTE: the main state to check on `Context` is: # NOTE: the main state to check on `Context` is:
# - `.cancelled_caught` (maps to nursery cs) # - `.cancelled_caught` (maps to nursery cs)
@ -436,7 +461,7 @@ def test_peer_canceller(
# `ContextCancelled` inside `.open_context()` # `ContextCancelled` inside `.open_context()`
# block # block
if error_during_ctxerr_handling: if error_during_ctxerr_handling:
assert isinstance(ctxerr, RuntimeError) assert isinstance(loc_err, RuntimeError)
# NOTE: this root actor task should have # NOTE: this root actor task should have
# called `Context.cancel()` on the # called `Context.cancel()` on the
@ -472,9 +497,10 @@ def test_peer_canceller(
# CASE: standard teardown inside in `.open_context()` block # CASE: standard teardown inside in `.open_context()` block
else: else:
assert ctxerr.canceller == sleeper_ctx.canceller assert isinstance(loc_err, ContextCancelled)
assert loc_err.canceller == sleeper_ctx.canceller
assert ( assert (
ctxerr.canceller[0] loc_err.canceller[0]
== ==
sleeper_ctx.canceller[0] sleeper_ctx.canceller[0]
== ==
@ -484,7 +510,7 @@ def test_peer_canceller(
# the sleeper's remote error is the error bubbled # the sleeper's remote error is the error bubbled
# out of the context-stack above! # out of the context-stack above!
re = sleeper_ctx._remote_error re = sleeper_ctx._remote_error
assert re is ctxerr assert re is loc_err
for ctx in ctxs: for ctx in ctxs:
re: BaseException | None = ctx._remote_error re: BaseException | None = ctx._remote_error
@ -554,3 +580,14 @@ def test_peer_canceller(
assert excinfo.value.type == ContextCancelled assert excinfo.value.type == ContextCancelled
assert excinfo.value.canceller[0] == 'canceller' assert excinfo.value.canceller[0] == 'canceller'
def test_client_tree_spawns_and_cancels_service_subactor():
...
# TODO: test for the modden `mod wks open piker` bug!
# -> start actor-tree (server) that offers sub-actor spawns via
# context API
# -> start another full actor-tree (client) which requests to the first to
# spawn over its `@context` ep / api.
# -> client actor cancels the context and should exit gracefully
# and the server's spawned child should cancel and terminate!