183 lines
4.9 KiB
Python
183 lines
4.9 KiB
Python
'''
Async context manager cache api testing: ``trionics.maybe_open_context():``

'''
|
|
from contextlib import asynccontextmanager as acm
import platform
from typing import AsyncIterator, Awaitable, Optional

import pytest
import trio
import tractor
|
|
|
|
|
|
# Module-level counter incremented/decremented by
# ``maybe_increment_counter()``; ``test_resource_only_entered_once()``
# resets it to 0 and asserts on the value that manager yields.
_resource: int = 0
|
|
|
|
|
|
@acm
async def maybe_increment_counter(task_name: str):
    '''
    Increment the module-level ``_resource`` counter for the duration
    of the ``async with`` block and yield its post-increment value.

    ``task_name`` is not read in the body; it only exists so callers
    can vary (or share) the keyword arguments they enter with.

    The decrement runs in a ``finally:`` so the counter is restored
    even when the managed body raises or the task is cancelled at
    either checkpoint, instead of leaking an increment.

    '''
    global _resource

    _resource += 1
    try:
        await trio.lowlevel.checkpoint()
        yield _resource
        await trio.lowlevel.checkpoint()
    finally:
        # always undo the increment, whatever way we are exited
        _resource -= 1
|
|
|
|
|
|
@pytest.mark.parametrize(
    'key_on',
    ['key_value', 'kwargs'],
    ids="key_on={}".format,
)
def test_resource_only_entered_once(key_on):
    '''
    Start 10 tasks that each enter ``maybe_increment_counter()`` via
    ``trionics.maybe_open_context()`` and assert that every one of
    them is handed a counter value of ``1``.

    ``key_on`` picks what the tasks have in common:

    - ``'key_value'``: an explicit shared ``key`` is passed while each
      task supplies its own ``task_name`` kwarg,
    - ``'kwargs'``: ``key`` is ``None`` and every task passes the
      same ``kwargs``.

    '''
    global _resource
    # start every parametrized run from a known counter value
    _resource = 0

    key = 'some_common_key' if key_on == 'key_value' else None

    async def main():
        # NOTE: this flag is only ever written, nothing reads it (yet).
        cache_active: bool = False

        async def enter_cached_mngr(name: str):
            nonlocal cache_active

            if key_on == 'kwargs':
                # make a common kwargs input to key on it
                kwargs = {'task_name': 'same_task_name'}
                assert key is None
            else:
                # different task names per task will be used
                kwargs = {'task_name': name}

            async with tractor.trionics.maybe_open_context(
                maybe_increment_counter,
                kwargs=kwargs,
                key=key,
            ) as (cache_hit, resource):
                if cache_hit:
                    try:
                        cache_active = True
                        assert resource == 1
                        await trio.sleep_forever()
                    finally:
                        cache_active = False
                else:
                    assert resource == 1
                    await trio.sleep_forever()

        # every task sleeps forever, so bound the whole run and let
        # the timeout cancel them.
        with trio.move_on_after(0.5):
            async with (
                tractor.open_root_actor(),
                trio.open_nursery() as n,
            ):
                for i in range(10):
                    n.start_soon(enter_cached_mngr, f'task_{i}')
                    await trio.sleep(0.001)

    trio.run(main)
|
|
|
|
|
|
@tractor.context
async def streamer(
    ctx: tractor.Context,
    seq: Optional[list[int]] = None,
) -> None:
    '''
    Context endpoint which signals ``ctx.started()``, opens a stream
    on ``ctx`` and sends each value of ``seq`` over it, sleeping 1ms
    between sends.

    ``seq`` defaults to ``list(range(1000))``; the default is built
    per call rather than shared as a mutable default argument.

    '''
    if seq is None:
        seq = list(range(1000))

    await ctx.started()
    async with ctx.open_stream() as stream:
        for val in seq:
            await stream.send(val)
            await trio.sleep(0.001)

    print('producer finished')
|
|
|
|
|
|
@acm
async def open_stream() -> AsyncIterator[tractor.MsgStream]:
    '''
    Start an actor named ``'streamer'`` with this module enabled, open
    a context running ``streamer()`` in it and yield the stream opened
    on that context.

    Once the stream and context blocks have exited the actor is
    cancelled via its portal before the actor nursery is left.

    '''
    async with tractor.open_nursery() as tn:
        portal = await tn.start_actor('streamer', enable_modules=[__name__])
        async with (
            # the value delivered alongside ``ctx`` is not used here
            portal.open_context(streamer) as (ctx, _first),
            ctx.open_stream() as stream,
        ):
            yield stream

        await portal.cancel_actor()
    print('CANCELLED STREAMER')
|
|
|
|
|
|
@acm
async def maybe_open_stream(taskname: str):
    '''
    Enter ``open_stream()`` through ``trionics.maybe_open_context()``
    and yield something to receive from.

    On a cache miss the stream itself is yielded; on a cache hit a
    new subscription obtained from ``stream.subscribe()`` is yielded
    instead.

    '''
    # NOTE: all secondary tasks should cache hit on the same key
    shared_stream_cm = tractor.trionics.maybe_open_context(
        acm_func=open_stream,
    )
    async with shared_stream_cm as (cache_hit, stream):

        if not cache_hit:
            # this task allocated the feed: hand out the actual stream
            yield stream
            return

        print(f'{taskname} loaded from cache')

        # the feed was already allocated by the first task that
        # entered, so add a new broadcast subscription for it.
        async with stream.subscribe() as bstream:
            yield bstream
|
|
|
|
|
|
def test_open_local_sub_to_stream():
    '''
    Verify a single inter-actor stream can be fanned-out and shared
    with N local tasks using ``trionics.maybe_open_context():``.

    '''
    timeout = 10 if platform.system() == "Windows" else 3

    async def main():

        # every value a consumer is allowed to observe
        expected = set(range(1000))

        async def get_sub_and_pull(taskname: str):
            async with maybe_open_stream(taskname) as stream:
                # only the task with a '0' in its name is expected to
                # hold the stream itself, the rest get a broadcast
                # receiver.
                expected_type = (
                    tractor.MsgStream
                    if '0' in taskname
                    else tractor.trionics.BroadcastReceiver
                )
                assert isinstance(stream, expected_type)

                first = await stream.receive()
                print(f'{taskname} started with value {first}')

                # drain whatever is left after the first value
                received = [msg async for msg in stream]

                assert expected.issuperset(received)
            print(f'{taskname} finished')

        with trio.fail_after(timeout):
            # TODO: turns out this isn't multi-task entrant XD
            # We probably need an idempotent entry semantic?
            async with tractor.open_root_actor():
                async with trio.open_nursery() as nurse:
                    for idx in range(10):
                        nurse.start_soon(get_sub_and_pull, f'task_{idx}')
                        await trio.sleep(0.001)

                print('all consumer tasks finished')

    trio.run(main)
|