diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index d3859814..bdcdd6c9 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -100,16 +100,29 @@ async def streamer( @acm async def open_stream() -> Awaitable[tractor.MsgStream]: - async with tractor.open_nursery() as tn: - portal = await tn.start_actor('streamer', enable_modules=[__name__]) - async with ( - portal.open_context(streamer) as (ctx, first), - ctx.open_stream() as stream, - ): - yield stream + try: + async with tractor.open_nursery() as an: + portal = await an.start_actor( + 'streamer', + enable_modules=[__name__], + ) + async with ( + portal.open_context(streamer) as (ctx, first), + ctx.open_stream() as stream, + ): + yield stream - await portal.cancel_actor() - print('CANCELLED STREAMER') + print('Cancelling streamer') + await portal.cancel_actor() + print('Cancelled streamer') + + except Exception as err: + print( + f'`open_stream()` errored?\n' + f'{err!r}\n' + ) + await tractor.pause(shield=True) + raise err @acm @@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str): yield stream -def test_open_local_sub_to_stream(): +def test_open_local_sub_to_stream( + debug_mode: bool, +): ''' Verify a single inter-actor stream can can be fanned-out shared to - N local tasks using ``trionics.maybe_open_context():``. + N local tasks using `trionics.maybe_open_context()`. ''' - timeout: float = 3.6 if platform.system() != "Windows" else 10 + timeout: float = 3.6 + if platform.system() == "Windows": + timeout: float = 10 + + if debug_mode: + timeout = 999 async def main(): full = list(range(1000)) async def get_sub_and_pull(taskname: str): + + stream: tractor.MsgStream async with ( maybe_open_stream(taskname) as stream, ): @@ -165,17 +187,27 @@ def test_open_local_sub_to_stream(): assert set(seq).issubset(set(full)) print(f'{taskname} finished') - with trio.fail_after(timeout): + with trio.fail_after(timeout) as cs: # TODO: turns out this isn't multi-task entrant XD # We probably need an indepotent entry semantic? - async with tractor.open_root_actor(): + async with tractor.open_root_actor( + debug_mode=debug_mode, + ): async with ( - trio.open_nursery() as nurse, + trio.open_nursery() as tn, ): for i in range(10): - nurse.start_soon(get_sub_and_pull, f'task_{i}') + tn.start_soon( + get_sub_and_pull, + f'task_{i}', + ) await trio.sleep(0.001) print('all consumer tasks finished') + if cs.cancelled_caught: + pytest.fail( + 'Should NOT time out in `open_root_actor()` ?' + ) + trio.run(main)