forked from goodboy/tractor
1
0
Fork 0
tractor/tests/test_pubsub.py

293 lines
7.7 KiB
Python
Raw Permalink Normal View History

2019-01-24 03:35:04 +00:00
import time
2019-01-21 17:31:03 +00:00
from itertools import cycle
import pytest
import trio
import tractor
from tractor.experimental import msgpub
Add (back) a `tractor._testing` sub-pkg. Since importing from our top level `conftest.py` is not scalable or as "future forward thinking" in terms of: - LoC-wise (it's only one file), - prevents "external" (aka non-test) example scripts from importing content easily, - seemingly(?) can't be used via abs-import if using a `[tool.pytest.ini_options]` in a `pyproject.toml` vs. a `pytest.ini` (see: https://docs.pytest.org/en/8.0.x/reference/customize.html#pyproject-toml) => Go back to having an internal "testing" pkg like `trio` (kinda) does. Details: - move generic top level helpers into pkg-mod including the new `expect_ctxc()` (which I needed in the advanced faults testing script). - move `@tractor_test` into `._testing.pytest` sub-mod. - adjust all the helper imports to be a `from tractor._testing import <..>` Rework `test_ipc_channel_break_during_stream()` and backing script: - make test(s) pull `debug_mode` from new fixture (which is now controlled manually from `--tpdb` flag) and drop the previous parametrized input. - update logic in ^ test for "which-side-fails" cases to better match recently updated/stricter cancel/failure semantics in terms of `ClosedResourceError` vs. `EndOfChannel` expectations. - handle `ExceptionGroup`s with expected embedded errors in test. - better pedantics around whether to expect a user simulated KBI. - for `examples/advanced_faults/ipc_failure_during_stream.py` script: - generalize ipc breakage in new `break_ipc()` with support for different internal `trio` methods and a #TODO for future disti frameworks - only make one sub-actor task break and the other just stream. - use new `._testing.expect_ctxc()` around ctx block. - add a bit of exception handling with `print()`s around ctxc (unused except if 'msg' break method is set) and eoc cases. - don't break parent side ipc in loop more than once after first break, checked via flag var.
- add a `pre_close: bool` flag to control whether `MsgStream.aclose()` is called *before* any ipc breakage method. Still TODO: - drop `pytest.ini` and add the alt section to `pyproject.toml`. -> currently can't get `--rootdir=` opt to work.. not showing in console header. -> ^ also breaks on 'tests' `enable_modules` imports in subactors during discovery tests?
2024-03-12 19:48:20 +00:00
from tractor._testing import tractor_test
2022-10-09 22:03:17 +00:00
2019-01-21 17:31:03 +00:00
def test_type_checks():
    """Check that ``@msgpub`` rejects functions it cannot wrap.

    Both cases must raise ``TypeError`` when the decorator is applied
    (the ``with`` bodies contain nothing but the decorated definition):

    - an async generator whose signature has no ``get_topics`` parameter,
    - a plain function that is not an async generator.
    """
    with pytest.raises(TypeError) as err:
        @msgpub
        async def no_get_topics(yo):
            yield

    # checked outside the `with`: code after the raising statement inside
    # the block would never run.
    assert "must define a `get_topics`" in str(err.value)

    with pytest.raises(TypeError) as err:
        @msgpub
        def not_async_gen(yo):
            pass

    assert "must be an async generator function" in str(err.value)
2019-01-21 17:31:03 +00:00
def is_even(i: int) -> bool:
    """Return ``True`` when ``i`` leaves no remainder on division by 2."""
    return i % 2 == 0
# Placeholder for the topics getter: `pubber()` stores its `get_topics`
# argument here so that a test running in the same process can poll the
# currently subscribed topics. Stays `None` until `pubber()` first runs.
_get_topics = None
@msgpub
async def pubber(get_topics, seed=10):
    """Publish the ints ``0 .. seed - 1`` in a loop, keyed by parity.

    Each yielded packet is a one-item dict: ``{'even': i}`` when ``i`` is
    even, ``{'odd': i}`` otherwise. A 0.1 s sleep follows every packet.
    The generator never finishes on its own when ``seed > 0``.

    Parameters:
        get_topics: stored as-is in the module-level ``_get_topics`` so
            the tests can inspect it; not called here.
        seed: exclusive upper bound of the cycled integer range.
    """
    # ensure topic subscriptions are as expected
    global _get_topics
    _get_topics = get_topics

    for i in cycle(range(seed)):
        yield {'even' if is_even(i) else 'odd': i}
        await trio.sleep(0.1)
async def subs(
    which,
    pub_actor_name,
    seed=10,
    task_status=trio.TASK_STATUS_IGNORED,
):
    """Subscribe to ``pubber`` on a named actor and check what arrives.

    Two streams are opened one after the other through the actor's
    portal: first for the topics in ``which``, then for ``['odd']``.
    On each stream the first packet is discarded, then up to 10 more
    are consumed before the stream is closed.

    Parameters:
        which: list of topic names for the first stream. A single
            ``'even'`` checks values with ``is_even``; any other single
            topic checks for odd values; otherwise values are only
            checked to be ``int``.
        pub_actor_name: name of the actor to wait for and stream from.
        seed: forwarded to ``pubber``.
        task_status: ``started()`` is called with the first stream, so a
            ``nursery.start()`` caller receives it.
    """
    if len(which) == 1:
        if which[0] == 'even':
            pred = is_even
        else:
            def pred(i):
                return not is_even(i)
    else:
        def pred(i):
            return isinstance(i, int)

    # packets to consume from each stream (after the discarded first one)
    times = 10

    # TODO: https://github.com/goodboy/tractor/issues/207
    async with tractor.wait_for_actor(pub_actor_name) as portal:
        assert portal

        async with portal.open_stream_from(
            pubber,
            topics=which,
            seed=seed,
        ) as stream:
            task_status.started(stream)
            count = 0

            await stream.__anext__()
            async for pkt in stream:
                # only the payload values matter; topic keys are unused
                for value in pkt.values():
                    assert pred(value)
                count += 1
                if count >= times:
                    break

            await stream.aclose()

        async with portal.open_stream_from(
            pubber,
            topics=['odd'],
            seed=seed,
        ) as stream:
            await stream.__anext__()
            count = 0
            try:
                # payloads on this second stream are deliberately not
                # validated; packets are only counted.
                async for _ in stream:
                    count += 1
                    if count >= times:
                        break
            finally:
                await stream.aclose()
2019-01-21 17:31:03 +00:00
@msgpub(tasks=['one', 'two'])
async def multilock_pubber(get_topics):
    """Yield a single ``{'doggy': 10}`` packet, then finish.

    Declared with ``tasks=['one', 'two']``; ``get_topics`` is accepted
    but not used by the body.
    """
    yield {'doggy': 10}
@pytest.mark.parametrize(
    'callwith_expecterror',
    [
        (pubber, {}, TypeError),
        # missing a `topics`
        (multilock_pubber, {'ctx': None}, TypeError),
        # missing a `task_name`
        (multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError),
        # should work
        (multilock_pubber,
         {'ctx': None, 'topics': ['doggy'], 'task_name': 'one'},
         None),
    ],
)
@tractor_test
async def test_required_args(callwith_expecterror):
    """Check which keyword arguments a ``@msgpub`` function requires.

    Each parameter is a ``(func, kwargs, expected_error)`` triple. When
    an error type is given, calling ``func(**kwargs)`` directly must
    raise it. When it is ``None``, a ``'pubber'`` actor is spawned,
    ``multilock_pubber`` is streamed from it with ``kwargs``, and every
    received value must equal ``{'doggy': 10}``.
    """
    func, kwargs, err = callwith_expecterror

    if err is not None:
        with pytest.raises(err):
            await func(**kwargs)
    else:
        async with tractor.open_nursery() as n:

            portal = await n.start_actor(
                name='pubber',
                enable_modules=[__name__],
            )

            # block until the new actor can be looked up by name
            async with tractor.wait_for_actor('pubber'):
                pass

            await trio.sleep(0.5)

            async with portal.open_stream_from(
                multilock_pubber,
                **kwargs
            ) as stream:
                async for val in stream:
                    assert val == {'doggy': 10}

            await portal.cancel_actor()
2019-01-24 03:35:04 +00:00
@pytest.mark.parametrize(
    'pub_actor',
    ['streamer', 'arbiter']
)
def test_multi_actor_subs_arbiter_pub(
    loglevel,
    reg_addr,
    pub_actor,
):
    """Try out the neato @pub decorator system.

    Two subactors (``'evens'`` and ``'odds'``) run ``subs`` against a
    publisher that is either a dedicated ``'streamer'`` daemon actor or
    the root actor itself (``pub_actor == 'arbiter'``, publisher name
    ``'root'``).

    Only in the ``'arbiter'`` case does ``pubber`` run in this process,
    so only then is the module-level ``_get_topics`` polled to check
    that subscriptions appear and are dropped as subscribers are
    cancelled.
    """
    global _get_topics

    async def main():

        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
            enable_modules=[__name__],
        ) as n:

            name = 'root'

            if pub_actor == 'streamer':
                # start the publisher as a daemon
                master_portal = await n.start_actor(
                    'streamer',
                    enable_modules=[__name__],
                )
                name = 'streamer'

            even_portal = await n.run_in_actor(
                subs,
                which=['even'],
                name='evens',
                pub_actor_name=name
            )
            odd_portal = await n.run_in_actor(
                subs,
                which=['odd'],
                name='odds',
                pub_actor_name=name
            )

            async with tractor.wait_for_actor('evens'):
                # block until 2nd actor is initialized
                pass

            if pub_actor == 'arbiter':

                # wait for publisher task to be spawned in a local RPC task
                while _get_topics is None:
                    await trio.sleep(0.1)

                get_topics = _get_topics

                assert 'even' in get_topics()

            async with tractor.wait_for_actor('odds'):
                # block until 2nd actor is initialized
                pass

            if pub_actor == 'arbiter':

                # monotonic clock: elapsed-time checks must not be
                # affected by wall-clock adjustments
                start = time.monotonic()
                while 'odd' not in get_topics():
                    await trio.sleep(0.1)
                    if time.monotonic() - start > 1:
                        pytest.fail("odds subscription never arrived?")

            # TODO: how to make this work when the arbiter gets
            # a portal to itself? Currently this causes a hang
            # when the channel server is torn down due to a lingering
            # loopback channel
            #     with trio.move_on_after(1):
            #         await subs(['even', 'odd'])

            # XXX: this would cause infinite
            # blocking due to actor never terminating loop
            # await even_portal.result()

            await trio.sleep(0.5)
            await even_portal.cancel_actor()
            await trio.sleep(1)

            if pub_actor == 'arbiter':
                assert 'even' not in get_topics()

            await odd_portal.cancel_actor()

            if pub_actor == 'arbiter':
                # restart the clock: the previous `start` predates the
                # ~1.5 s of sleeps above, which would leave almost none
                # of the 2 s budget for the subscription to drop
                start = time.monotonic()
                while get_topics():
                    await trio.sleep(0.1)
                    if time.monotonic() - start > 2:
                        pytest.fail("odds subscription never dropped?")
            else:
                await master_portal.cancel_actor()

    trio.run(main)
def test_single_subactor_pub_multitask_subs(
    loglevel,
    reg_addr,
):
    """Run several ``subs`` tasks in one process against one publisher.

    A ``'streamer'`` subactor is started, then four ``subs`` tasks for
    the ``'even'`` topic are launched in a local trio nursery at 0.1 s
    intervals. The value delivered by the first task's ``started()``
    call is closed from here after the second task has been launched.
    The streamer actor is cancelled once all tasks have finished.
    """
    async def main():

        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
            enable_modules=[__name__],
        ) as n:

            portal = await n.start_actor(
                'streamer',
                enable_modules=[__name__],
            )
            async with tractor.wait_for_actor('streamer'):
                # block until 2nd actor is initialized
                pass

            async with trio.open_nursery() as tn:
                agen = await tn.start(subs, ['even'], 'streamer')

                await trio.sleep(0.1)
                tn.start_soon(subs, ['even'], 'streamer')

                # XXX this will trigger the python bug:
                # https://bugs.python.org/issue32526
                # if using async generators to wrap tractor channels
                await agen.aclose()

                await trio.sleep(0.1)
                tn.start_soon(subs, ['even'], 'streamer')
                await trio.sleep(0.1)
                tn.start_soon(subs, ['even'], 'streamer')

            await portal.cancel_actor()

    trio.run(main)