tractor/tests/test_multi_program.py

150 lines
3.7 KiB
Python

"""
Multiple python programs invoking the runtime.
"""
from __future__ import annotations
import platform
import subprocess
import time
from typing import (
TYPE_CHECKING,
)
import pytest
import trio
import tractor
from tractor._testing import (
tractor_test,
)
from tractor import (
Actor,
Context,
Portal,
)
from .conftest import (
sig_prog,
_INT_SIGNAL,
_INT_RETURN_CODE,
)
if TYPE_CHECKING:
from tractor.msg import Aid
from tractor._addr import (
UnwrappedAddress,
)
def test_abort_on_sigint(
    daemon: subprocess.Popen,
):
    '''
    Send the interrupt signal to the daemon program and check it
    ends up with the expected `_INT_RETURN_CODE`.

    '''
    # no exit status should have been recorded for the daemon yet
    assert daemon.returncode is None

    # brief pause before delivering the signal
    time.sleep(0.1)
    sig_prog(daemon, _INT_SIGNAL)
    assert daemon.returncode == _INT_RETURN_CODE

    # XXX: oddly, couldn't get capfd.readouterr() to work here?
    # stderr is not checked on windows since it is empty when sending
    # CTRL_C_EVENT.
    on_windows: bool = platform.system() == 'Windows'
    if not on_windows:
        stderr_text: str = str(daemon.stderr.read())
        assert "KeyboardInterrupt" in stderr_text
@tractor_test
async def test_cancel_remote_arbiter(
    daemon: subprocess.Popen,
    reg_addr: UnwrappedAddress,
):
    '''
    From a non-arbiter actor, request cancellation of the remote
    registry ("arbiter") actor at `reg_addr`, then check that a new
    registry connection attempt fails with `OSError`.

    '''
    assert not tractor.current_actor().is_arbiter
    async with tractor.get_registry(reg_addr) as portal:
        await portal.cancel_actor()

    # Yield to the trio scheduler while waiting: a blocking
    # `time.sleep()` inside an async function would stall every other
    # task on this event loop for the duration of the pause.
    await trio.sleep(0.1)

    # the arbiter channel server is cancelled but not its main task
    # NOTE(review): `Popen.returncode` is only refreshed by
    # `.poll()`/`.wait()`; this asserts on the last recorded value and
    # does not itself re-query the process - confirm that is intended.
    assert daemon.returncode is None

    # no arbiter socket should exist
    with pytest.raises(OSError):
        async with tractor.get_registry(reg_addr):
            pass
def test_register_duplicate_name(
    daemon: subprocess.Popen,
    reg_addr: UnwrappedAddress,
):
    '''
    Start two subactors that share the name 'doggy' from
    a non-arbiter root and check that waiting on that name yields
    a portal to one of the two.

    '''
    async def main():
        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
        ) as an:
            assert not tractor.current_actor().is_arbiter

            # two actors, one (duplicate) name
            first_ptl = await an.start_actor('doggy')
            second_ptl = await an.start_actor('doggy')

            async with tractor.wait_for_actor('doggy') as found_ptl:
                found_uid = found_ptl.channel.uid
                assert found_uid in (
                    second_ptl.channel.uid,
                    first_ptl.channel.uid,
                )

            await an.cancel()

    # XXX, run manually since we want to start this root **after**
    # the other "daemon" program with its own root.
    trio.run(main)
@tractor.context
async def get_root_portal(
    ctx: Context,
):
    '''
    Manually connect back to the root actor (via the `._discovery`
    API) and check that its contact info is the same as that of our
    immediate parent.

    '''
    await ctx.started()

    # imported in-function, after the context has been started
    from tractor._discovery import get_root

    root_ptl: Portal
    async with get_root() as root_ptl:
        root_aid: Aid = root_ptl.chan.aid
        parent_ptl: Portal = tractor.current_actor().get_parent()

        # the root must be named as such..
        assert root_aid.name == 'root'
        # ..and must be the very same actor as our parent.
        assert parent_ptl.chan.aid == root_aid
def test_non_registrar_spawns_child(
    daemon: subprocess.Popen,
    reg_addr: UnwrappedAddress,
):
    '''
    Verify that a root actor which is not the registrar can spawn
    a subactor, and that this subactor can (manually) connect back
    to its parent, the root, without issue.

    More or less this audits the global contact info in
    `._state._runtime_vars`.

    '''
    async def main():
        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
        ) as an:
            this_actor: Actor = tractor.current_actor()
            assert not this_actor.is_registrar

            child_ptl: Portal = await an.start_actor(
                name='sub',
                enable_modules=[__name__],
            )

            # the checks live in the `get_root_portal` endpoint itself,
            # nothing further is done with the context on this side.
            async with child_ptl.open_context(
                get_root_portal,
            ) as (_ctx, _first):
                pass

            await an.cancel()

    # XXX, run manually since we want to start this root **after**
    # the other "daemon" program with its own root.
    trio.run(main)