Add a multi-task subscriber test
In combination with `.aclose()`-ing the async gen instance returned from `Portal.run()` this demonstrates the python bug: https://bugs.python.org/issue32526 I've commented out the line that triggers the bug for now since this case provides motivation for adding our own `trio.abc.ReceiveMemoryChannel` implementation to be used instead of async gens directly (returned from `Portal.run()`) since the latter is **not** task safe.trio_memchans
parent
61680b3729
commit
41c202db68
|
@ -4,7 +4,6 @@ from itertools import cycle
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from async_generator import aclosing
|
|
||||||
from tractor.testing import tractor_test
|
from tractor.testing import tractor_test
|
||||||
|
|
||||||
|
|
||||||
|
@ -30,10 +29,10 @@ def is_even(i):
|
||||||
|
|
||||||
|
|
||||||
@tractor.msg.pub
|
@tractor.msg.pub
|
||||||
async def pubber(get_topics):
|
async def pubber(get_topics, seed=10):
|
||||||
ss = tractor.current_actor().statespace
|
ss = tractor.current_actor().statespace
|
||||||
|
|
||||||
for i in cycle(range(10)):
|
for i in cycle(range(seed)):
|
||||||
|
|
||||||
# ensure topic subscriptions are as expected
|
# ensure topic subscriptions are as expected
|
||||||
ss['get_topics'] = get_topics
|
ss['get_topics'] = get_topics
|
||||||
|
@ -42,7 +41,11 @@ async def pubber(get_topics):
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
async def subs(which, pub_actor_name):
|
async def subs(
|
||||||
|
which, pub_actor_name, seed=10,
|
||||||
|
portal=None,
|
||||||
|
task_status=trio.TASK_STATUS_IGNORED,
|
||||||
|
):
|
||||||
if len(which) == 1:
|
if len(which) == 1:
|
||||||
if which[0] == 'even':
|
if which[0] == 'even':
|
||||||
pred = is_even
|
pred = is_even
|
||||||
|
@ -54,11 +57,43 @@ async def subs(which, pub_actor_name):
|
||||||
return isinstance(i, int)
|
return isinstance(i, int)
|
||||||
|
|
||||||
async with tractor.find_actor(pub_actor_name) as portal:
|
async with tractor.find_actor(pub_actor_name) as portal:
|
||||||
agen = await portal.run(__name__, 'pubber', topics=which)
|
agen = await portal.run(
|
||||||
async with aclosing(agen) as agen:
|
__name__, 'pubber',
|
||||||
|
topics=which,
|
||||||
|
seed=seed,
|
||||||
|
)
|
||||||
|
task_status.started(agen)
|
||||||
|
times = 10
|
||||||
|
count = 0
|
||||||
|
await agen.__anext__()
|
||||||
|
async for pkt in agen:
|
||||||
|
for topic, value in pkt.items():
|
||||||
|
assert pred(value)
|
||||||
|
count += 1
|
||||||
|
if count >= times:
|
||||||
|
break
|
||||||
|
|
||||||
|
await agen.aclose()
|
||||||
|
|
||||||
|
agen = await portal.run(
|
||||||
|
__name__, 'pubber',
|
||||||
|
topics=['odd'],
|
||||||
|
seed=seed,
|
||||||
|
)
|
||||||
|
|
||||||
|
await agen.__anext__()
|
||||||
|
count = 0
|
||||||
|
# async with aclosing(agen) as agen:
|
||||||
|
try:
|
||||||
async for pkt in agen:
|
async for pkt in agen:
|
||||||
for topic, value in pkt.items():
|
for topic, value in pkt.items():
|
||||||
assert pred(value)
|
pass
|
||||||
|
# assert pred(value)
|
||||||
|
count += 1
|
||||||
|
if count >= times:
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
await agen.aclose()
|
||||||
|
|
||||||
|
|
||||||
@tractor.msg.pub(tasks=['one', 'two'])
|
@tractor.msg.pub(tasks=['one', 'two'])
|
||||||
|
@ -101,7 +136,7 @@ async def test_required_args(callwith_expecterror):
|
||||||
'pub_actor',
|
'pub_actor',
|
||||||
['streamer', 'arbiter']
|
['streamer', 'arbiter']
|
||||||
)
|
)
|
||||||
def test_pubsub_multi_actor_subs(
|
def test_multi_actor_subs_arbiter_pub(
|
||||||
loglevel,
|
loglevel,
|
||||||
arb_addr,
|
arb_addr,
|
||||||
pub_actor,
|
pub_actor,
|
||||||
|
@ -115,7 +150,7 @@ def test_pubsub_multi_actor_subs(
|
||||||
|
|
||||||
name = 'arbiter'
|
name = 'arbiter'
|
||||||
|
|
||||||
if pub_actor is 'streamer':
|
if pub_actor == 'streamer':
|
||||||
# start the publisher as a daemon
|
# start the publisher as a daemon
|
||||||
master_portal = await n.start_actor(
|
master_portal = await n.start_actor(
|
||||||
'streamer',
|
'streamer',
|
||||||
|
@ -131,7 +166,7 @@ def test_pubsub_multi_actor_subs(
|
||||||
# block until 2nd actor is initialized
|
# block until 2nd actor is initialized
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if pub_actor is 'arbiter':
|
if pub_actor == 'arbiter':
|
||||||
# wait for publisher task to be spawned in a local RPC task
|
# wait for publisher task to be spawned in a local RPC task
|
||||||
while not ss.get('get_topics'):
|
while not ss.get('get_topics'):
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
@ -144,7 +179,7 @@ def test_pubsub_multi_actor_subs(
|
||||||
# block until 2nd actor is initialized
|
# block until 2nd actor is initialized
|
||||||
pass
|
pass
|
||||||
|
|
||||||
if pub_actor is 'arbiter':
|
if pub_actor == 'arbiter':
|
||||||
start = time.time()
|
start = time.time()
|
||||||
while 'odd' not in get_topics():
|
while 'odd' not in get_topics():
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
|
@ -166,13 +201,13 @@ def test_pubsub_multi_actor_subs(
|
||||||
await even_portal.cancel_actor()
|
await even_portal.cancel_actor()
|
||||||
await trio.sleep(0.5)
|
await trio.sleep(0.5)
|
||||||
|
|
||||||
if pub_actor is 'arbiter':
|
if pub_actor == 'arbiter':
|
||||||
assert 'even' not in get_topics()
|
assert 'even' not in get_topics()
|
||||||
|
|
||||||
await odd_portal.cancel_actor()
|
await odd_portal.cancel_actor()
|
||||||
await trio.sleep(1)
|
await trio.sleep(1)
|
||||||
|
|
||||||
if pub_actor is 'arbiter':
|
if pub_actor == 'arbiter':
|
||||||
while get_topics():
|
while get_topics():
|
||||||
await trio.sleep(0.1)
|
await trio.sleep(0.1)
|
||||||
if time.time() - start > 1:
|
if time.time() - start > 1:
|
||||||
|
@ -185,3 +220,43 @@ def test_pubsub_multi_actor_subs(
|
||||||
arbiter_addr=arb_addr,
|
arbiter_addr=arb_addr,
|
||||||
rpc_module_paths=[__name__],
|
rpc_module_paths=[__name__],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_single_subactor_pub_multitask_subs(
|
||||||
|
loglevel,
|
||||||
|
arb_addr,
|
||||||
|
):
|
||||||
|
async def main():
|
||||||
|
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
|
portal = await n.start_actor(
|
||||||
|
'streamer',
|
||||||
|
rpc_module_paths=[__name__],
|
||||||
|
)
|
||||||
|
async with tractor.wait_for_actor('streamer'):
|
||||||
|
# block until 2nd actor is initialized
|
||||||
|
pass
|
||||||
|
|
||||||
|
async with trio.open_nursery() as tn:
|
||||||
|
agen = await tn.start(subs, ['even'], 'streamer')
|
||||||
|
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
tn.start_soon(subs, ['even'], 'streamer')
|
||||||
|
|
||||||
|
# XXX this will trigger the python bug:
|
||||||
|
# https://bugs.python.org/issue32526
|
||||||
|
# await agen.aclose()
|
||||||
|
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
tn.start_soon(subs, ['even'], 'streamer')
|
||||||
|
await trio.sleep(0.1)
|
||||||
|
tn.start_soon(subs, ['even'], 'streamer')
|
||||||
|
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
tractor.run(
|
||||||
|
main,
|
||||||
|
arbiter_addr=arb_addr,
|
||||||
|
rpc_module_paths=[__name__],
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue