tractor/tests/test_local.py

73 lines
1.9 KiB
Python
Raw Permalink Normal View History

"""
Arbiter and "local" actor api
"""
2018-06-12 19:23:58 +00:00
import time
import pytest
2018-06-12 19:23:58 +00:00
import trio
import tractor
Add (back) a `tractor._testing` sub-pkg Since importing from our top level `conftest.py` is not scaleable or as "future forward thinking" in terms of: - LoC-wise (it's only one file), - prevents "external" (aka non-test) example scripts from importing content easily, - seemingly(?) can't be used via abs-import if using a `[tool.pytest.ini_options]` in a `pyproject.toml` vs. a `pytest.ini`, see: https://docs.pytest.org/en/8.0.x/reference/customize.html#pyproject-toml) => Go back to having an internal "testing" pkg like `trio` (kinda) does. Deats: - move generic top level helpers into pkg-mod including the new `expect_ctxc()` (which i needed in the advanced faults testing script. - move `@tractor_test` into `._testing.pytest` sub-mod. - adjust all the helper imports to be a `from tractor._testing import <..>` Rework `test_ipc_channel_break_during_stream()` and backing script: - make test(s) pull `debug_mode` from new fixture (which is now controlled manually from `--tpdb` flag) and drop the previous parametrized input. - update logic in ^ test for "which-side-fails" cases to better match recently updated/stricter cancel/failure semantics in terms of `ClosedResouruceError` vs. `EndOfChannel` expectations. - handle `ExceptionGroup`s with expected embedded errors in test. - better pendantics around whether to expect a user simulated KBI. - for `examples/advanced_faults/ipc_failure_during_stream.py` script: - generalize ipc breakage in new `break_ipc()` with support for diff internal `trio` methods and a #TODO for future disti frameworks - only make one sub-actor task break and the other just stream. - use new `._testing.expect_ctxc()` around ctx block. - add a bit of exception handling with `print()`s around ctxc (unused except if 'msg' break method is set) and eoc cases. - don't break parent side ipc in loop any more then once after first break, checked via flag var. - add a `pre_close: bool` flag to control whether `MsgStreama.aclose()` is called *before* any ipc breakage method. Still TODO: - drop `pytest.ini` and add the alt section to `pyproject.py`. -> currently can't get `--rootdir=` opt to work.. not showing in console header. -> ^ also breaks on 'tests' `enable_modules` imports in subactors during discovery tests?
2024-03-12 19:48:20 +00:00
from tractor._testing import tractor_test
2018-06-12 19:23:58 +00:00
@pytest.mark.trio
async def test_no_runtime():
2018-06-12 19:23:58 +00:00
"""An arbitter must be established before any nurseries
can be created.
2021-02-24 19:59:48 +00:00
(In other words ``tractor.open_root_actor()`` must be engaged at
some point?)
2018-06-12 19:23:58 +00:00
"""
with pytest.raises(RuntimeError) :
async with tractor.find_actor('doggy'):
2018-06-12 19:23:58 +00:00
pass
2018-09-21 04:32:23 +00:00
@tractor_test
async def test_self_is_registered(reg_addr):
2018-09-21 04:32:23 +00:00
"Verify waiting on the arbiter to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
2021-01-09 01:40:49 +00:00
with trio.fail_after(0.2):
async with tractor.wait_for_actor('root') as portal:
assert portal.channel.uid[0] == 'root'
2018-09-21 04:32:23 +00:00
@tractor_test
async def test_self_is_registered_localportal(reg_addr):
2018-09-21 04:32:23 +00:00
"Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
async with tractor.get_arbiter(*reg_addr) as portal:
2018-09-21 04:32:23 +00:00
assert isinstance(portal, tractor._portal.LocalPortal)
2021-01-09 01:40:49 +00:00
with trio.fail_after(0.2):
2021-02-24 19:59:48 +00:00
sockaddr = await portal.run_from_ns(
'self', 'wait_for_actor', name='root')
assert sockaddr[0] == reg_addr
2018-09-21 04:32:23 +00:00
def test_local_actor_async_func(reg_addr):
2018-06-12 19:23:58 +00:00
"""Verify a simple async function in-process.
"""
nums = []
async def print_loop():
2021-02-24 19:59:48 +00:00
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
2021-02-24 19:59:48 +00:00
):
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter
for i in range(10):
nums.append(i)
await trio.sleep(0.1)
2018-06-12 19:23:58 +00:00
start = time.time()
2021-02-24 19:59:48 +00:00
trio.run(print_loop)
2018-06-12 19:23:58 +00:00
# ensure the sleeps were actually awaited
assert time.time() - start >= 1
assert nums == list(range(10))