Improve bit of tooling for `test_resource_cache.py`
Namely while what I was actually trying to solve was why `TransportClosed` was getting raised from `Portal.cancel_actor()` but still useful edge case auditing either way. Also opts into the `debug_mode` fixture with apprope timeout adjustment B)structural_dynamics_of_flow
parent
9807318e3d
commit
0f8b299b4f
|
@ -100,16 +100,29 @@ async def streamer(
|
|||
@acm
|
||||
async def open_stream() -> Awaitable[tractor.MsgStream]:
|
||||
|
||||
async with tractor.open_nursery() as tn:
|
||||
portal = await tn.start_actor('streamer', enable_modules=[__name__])
|
||||
async with (
|
||||
portal.open_context(streamer) as (ctx, first),
|
||||
ctx.open_stream() as stream,
|
||||
):
|
||||
yield stream
|
||||
try:
|
||||
async with tractor.open_nursery() as an:
|
||||
portal = await an.start_actor(
|
||||
'streamer',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
async with (
|
||||
portal.open_context(streamer) as (ctx, first),
|
||||
ctx.open_stream() as stream,
|
||||
):
|
||||
yield stream
|
||||
|
||||
await portal.cancel_actor()
|
||||
print('CANCELLED STREAMER')
|
||||
print('Cancelling streamer')
|
||||
await portal.cancel_actor()
|
||||
print('Cancelled streamer')
|
||||
|
||||
except Exception as err:
|
||||
print(
|
||||
f'`open_stream()` errored?\n'
|
||||
f'{err!r}\n'
|
||||
)
|
||||
await tractor.pause(shield=True)
|
||||
raise err
|
||||
|
||||
|
||||
@acm
|
||||
|
@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str):
|
|||
yield stream
|
||||
|
||||
|
||||
def test_open_local_sub_to_stream():
|
||||
def test_open_local_sub_to_stream(
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify a single inter-actor stream can can be fanned-out shared to
|
||||
N local tasks using ``trionics.maybe_open_context():``.
|
||||
N local tasks using `trionics.maybe_open_context()`.
|
||||
|
||||
'''
|
||||
timeout: float = 3.6 if platform.system() != "Windows" else 10
|
||||
timeout: float = 3.6
|
||||
if platform.system() == "Windows":
|
||||
timeout: float = 10
|
||||
|
||||
if debug_mode:
|
||||
timeout = 999
|
||||
|
||||
async def main():
|
||||
|
||||
full = list(range(1000))
|
||||
|
||||
async def get_sub_and_pull(taskname: str):
|
||||
|
||||
stream: tractor.MsgStream
|
||||
async with (
|
||||
maybe_open_stream(taskname) as stream,
|
||||
):
|
||||
|
@ -165,17 +187,27 @@ def test_open_local_sub_to_stream():
|
|||
assert set(seq).issubset(set(full))
|
||||
print(f'{taskname} finished')
|
||||
|
||||
with trio.fail_after(timeout):
|
||||
with trio.fail_after(timeout) as cs:
|
||||
# TODO: turns out this isn't multi-task entrant XD
|
||||
# We probably need an indepotent entry semantic?
|
||||
async with tractor.open_root_actor():
|
||||
async with tractor.open_root_actor(
|
||||
debug_mode=debug_mode,
|
||||
):
|
||||
async with (
|
||||
trio.open_nursery() as nurse,
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
for i in range(10):
|
||||
nurse.start_soon(get_sub_and_pull, f'task_{i}')
|
||||
tn.start_soon(
|
||||
get_sub_and_pull,
|
||||
f'task_{i}',
|
||||
)
|
||||
await trio.sleep(0.001)
|
||||
|
||||
print('all consumer tasks finished')
|
||||
|
||||
if cs.cancelled_caught:
|
||||
pytest.fail(
|
||||
'Should NOT time out in `open_root_actor()` ?'
|
||||
)
|
||||
|
||||
trio.run(main)
|
||||
|
|
Loading…
Reference in New Issue