diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py new file mode 100644 index 0000000..48ca24e --- /dev/null +++ b/tests/test_resource_cache.py @@ -0,0 +1,104 @@ +''' +Async context manager cache api testing: ``trionics.maybe_open_context():`` + +''' +from contextlib import asynccontextmanager as acm +from typing import Awaitable + +import trio +import tractor + + +@tractor.context +async def streamer( + ctx: tractor.Context, + seq: list[int] = list(range(1000)), +) -> None: + + await ctx.started() + async with ctx.open_stream() as stream: + for val in seq: + await stream.send(val) + await trio.sleep(0.001) + + +@acm +async def open_stream() -> Awaitable[tractor.MsgStream]: + + async with tractor.open_nursery() as tn: + portal = await tn.start_actor('streamer', enable_modules=[__name__]) + async with ( + portal.open_context(streamer) as (ctx, first), + ctx.open_stream() as stream, + ): + yield stream + breakpoint() + + print('CANCELLING STREAMER') + await portal.cancel_actor() + + +@acm +async def maybe_open_stream(taskname: str): + async with tractor.trionics.maybe_open_context( + # NOTE: all secondary tasks should cache hit on the same key + key='stream', + mngr=open_stream(), + ) as (cache_hit, stream): + + if cache_hit: + print(f'{taskname} loaded from cache') + + # add a new broadcast subscription for the quote stream + # if this feed is already allocated by the first + # task that entereed + async with stream.subscribe() as bstream: + yield bstream + else: + # yield the actual stream + yield stream + + +def test_open_local_sub_to_stream(): + ''' + Verify a single inter-actor stream can can be fanned-out shared to + N local tasks using ``trionics.maybe_open_context():``. + + ''' + async def main(): + + full = list(range(1000)) + + async def get_sub_and_pull(taskname: str): + async with ( + maybe_open_stream(taskname) as stream, + ): + if '0' in taskname: + assert isinstance(stream, tractor.MsgStream) + else: + assert isinstance( + stream, + tractor.trionics.BroadcastReceiver + ) + + first = await stream.receive() + print(f'{taskname} started with value {first}') + seq = [] + async for msg in stream: + seq.append(msg) + print(f'{taskname} received {msg}') + + assert set(seq).issubset(set(full)) + print(f'{taskname} finished') + + # TODO: turns out this isn't multi-task entrant XD + # We probably need an indepotent entry semantic? + async with tractor.open_root_actor(): + async with ( + trio.open_nursery() as nurse, + ): + for i in range(10): + nurse.start_soon(get_sub_and_pull, f'task_{i}') + await trio.sleep(0.001) + + trio.run(main)