Tool-up `test_resource_cache.test_open_local_sub_to_stream`
Since I recently discovered a very subtle race-case that can sometimes cause the suite to hang, seemingly due to the `an: ActorNursery` allocated *behind* the `.trionics.maybe_open_context()` usage; this can result in never cancelling the 'streamer' subactor despite the `main()` timeout-guard? This led me to dig in and find that the underlying issue was 2-fold, - our `BroadcastReceiver` termination-mgmt semantics in `MsgStream.subscribe()` can result in the first subscribing task to always keep the `MsgStream._broadcaster` instance allocated; it's never `.aclose()`ed, which makes it tough to determine (and thus trace) when all subscriber-tasks are actually complete and exited-from-`.subscribe()`.. - i was shield waiting `.ipc._server.Server.wait_for_no_more_peers()` in `._runtime.async_main()`'s shutdown sequence which would then compound the issue resulting in a SIGINT-shielded hang.. the worst kind XD Actual changes here are just styling, printing, and some mucking with passing the `an`-ref up to the parent task in the root-actor where i was doing a conditional `ActorNursery.cancel()` to mk sure that was actually the problem. Presuming this is fixed the `.pause()` i left unmasked should never hit.multicast_revertable_streams
parent
20628cc0b8
commit
285ebba4b1
|
@ -72,11 +72,13 @@ def test_resource_only_entered_once(key_on):
|
|||
with trio.move_on_after(0.5):
|
||||
async with (
|
||||
tractor.open_root_actor(),
|
||||
trio.open_nursery() as n,
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
|
||||
for i in range(10):
|
||||
n.start_soon(enter_cached_mngr, f'task_{i}')
|
||||
tn.start_soon(
|
||||
enter_cached_mngr,
|
||||
f'task_{i}',
|
||||
)
|
||||
await trio.sleep(0.001)
|
||||
|
||||
trio.run(main)
|
||||
|
@ -98,21 +100,32 @@ async def streamer(
|
|||
|
||||
|
||||
@acm
|
||||
async def open_stream() -> Awaitable[tractor.MsgStream]:
|
||||
|
||||
async def open_stream() -> Awaitable[
|
||||
tuple[
|
||||
tractor.ActorNursery,
|
||||
tractor.MsgStream,
|
||||
]
|
||||
]:
|
||||
try:
|
||||
async with tractor.open_nursery() as an:
|
||||
portal = await an.start_actor(
|
||||
'streamer',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
try:
|
||||
async with (
|
||||
portal.open_context(streamer) as (ctx, first),
|
||||
ctx.open_stream() as stream,
|
||||
):
|
||||
yield stream
|
||||
print('Entered open_stream() caller')
|
||||
yield an, stream
|
||||
print('Exited open_stream() caller')
|
||||
# await tractor.pause(shield=True)
|
||||
|
||||
finally:
|
||||
print('Cancelling streamer')
|
||||
# await tractor.pause(shield=True)
|
||||
with trio.CancelScope(shield=True):
|
||||
await portal.cancel_actor()
|
||||
print('Cancelled streamer')
|
||||
|
||||
|
@ -130,8 +143,10 @@ async def maybe_open_stream(taskname: str):
|
|||
async with tractor.trionics.maybe_open_context(
|
||||
# NOTE: all secondary tasks should cache hit on the same key
|
||||
acm_func=open_stream,
|
||||
) as (cache_hit, stream):
|
||||
|
||||
) as (
|
||||
cache_hit,
|
||||
(an, stream)
|
||||
):
|
||||
if cache_hit:
|
||||
print(f'{taskname} loaded from cache')
|
||||
|
||||
|
@ -139,10 +154,32 @@ async def maybe_open_stream(taskname: str):
|
|||
# if this feed is already allocated by the first
|
||||
# task that entereed
|
||||
async with stream.subscribe() as bstream:
|
||||
yield bstream
|
||||
yield an, bstream
|
||||
print(
|
||||
f'cached task exited\n'
|
||||
f')>\n'
|
||||
f' |_{taskname}\n'
|
||||
)
|
||||
else:
|
||||
# yield the actual stream
|
||||
yield stream
|
||||
try:
|
||||
yield an, stream
|
||||
finally:
|
||||
print(
|
||||
f'NON-cached task exited\n'
|
||||
f')>\n'
|
||||
f' |_{taskname}\n'
|
||||
)
|
||||
|
||||
bstream = stream._broadcaster
|
||||
if (
|
||||
not bstream.state.subs
|
||||
and
|
||||
not cache_hit
|
||||
):
|
||||
await tractor.pause(shield=True)
|
||||
# await an.cancel()
|
||||
|
||||
|
||||
|
||||
def test_open_local_sub_to_stream(
|
||||
|
@ -159,16 +196,24 @@ def test_open_local_sub_to_stream(
|
|||
|
||||
if debug_mode:
|
||||
timeout = 999
|
||||
print(f'IN debug_mode, setting large timeout={timeout!r}..')
|
||||
|
||||
async def main():
|
||||
|
||||
full = list(range(1000))
|
||||
an: tractor.ActorNursery|None = None
|
||||
num_tasks: int = 10
|
||||
|
||||
async def get_sub_and_pull(taskname: str):
|
||||
|
||||
nonlocal an
|
||||
|
||||
stream: tractor.MsgStream
|
||||
async with (
|
||||
maybe_open_stream(taskname) as stream,
|
||||
maybe_open_stream(taskname) as (
|
||||
an,
|
||||
stream,
|
||||
),
|
||||
):
|
||||
if '0' in taskname:
|
||||
assert isinstance(stream, tractor.MsgStream)
|
||||
|
@ -180,34 +225,58 @@ def test_open_local_sub_to_stream(
|
|||
|
||||
first = await stream.receive()
|
||||
print(f'{taskname} started with value {first}')
|
||||
seq = []
|
||||
seq: list[int] = []
|
||||
async for msg in stream:
|
||||
seq.append(msg)
|
||||
|
||||
assert set(seq).issubset(set(full))
|
||||
|
||||
# end of @acm block
|
||||
print(f'{taskname} finished')
|
||||
|
||||
root: tractor.Actor
|
||||
with trio.fail_after(timeout) as cs:
|
||||
# TODO: turns out this isn't multi-task entrant XD
|
||||
# We probably need an indepotent entry semantic?
|
||||
async with tractor.open_root_actor(
|
||||
debug_mode=debug_mode,
|
||||
):
|
||||
# maybe_enable_greenback=True,
|
||||
#
|
||||
# ^TODO? doesn't seem to mk breakpoint() usage work
|
||||
# bc each bg task needs to open a portal??
|
||||
# - [ ] we should consider making this part of
|
||||
# our taskman defaults?
|
||||
# |_see https://github.com/goodboy/tractor/pull/363
|
||||
#
|
||||
) as root:
|
||||
assert root.is_registrar
|
||||
|
||||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
for i in range(10):
|
||||
for i in range(num_tasks):
|
||||
tn.start_soon(
|
||||
get_sub_and_pull,
|
||||
f'task_{i}',
|
||||
)
|
||||
await trio.sleep(0.001)
|
||||
|
||||
print('all consumer tasks finished')
|
||||
print('all consumer tasks finished.')
|
||||
|
||||
# ensure actor-nursery is shutdown or we might
|
||||
# hang here..?
|
||||
# if root.ipc_server.has_peers():
|
||||
# await tractor.pause()
|
||||
# await an.cancel()
|
||||
|
||||
# end of runtime scope
|
||||
print('root actor terminated.')
|
||||
|
||||
if cs.cancelled_caught:
|
||||
pytest.fail(
|
||||
'Should NOT time out in `open_root_actor()` ?'
|
||||
)
|
||||
|
||||
print('exiting main.')
|
||||
|
||||
trio.run(main)
|
||||
|
|
Loading…
Reference in New Issue