Compare commits
	
		
			14 Commits 
		
	
	
		
			ba384ca83d
			...
			2248ffb74f
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						2248ffb74f | |
| 
							
							
								 | 
						1eb0d785a8 | |
| 
							
							
								 | 
						98d0ca88e5 | |
| 
							
							
								 | 
						37f843a128 | |
| 
							
							
								 | 
						29cd2ddbac | |
| 
							
							
								 | 
						295b06511b | |
| 
							
							
								 | 
						1e6b5b3f0a | |
| 
							
							
								 | 
						36ddb85197 | |
| 
							
							
								 | 
						d6b0ddecd7 | |
| 
							
							
								 | 
						9e5475391c | |
| 
							
							
								 | 
						ef7ed7ac6f | |
| 
							
							
								 | 
						d8094f4420 | |
| 
							
							
								 | 
						d7b12735a8 | |
| 
							
							
								 | 
						47107e44ed | 
| 
						 | 
				
			
			@ -1,8 +1,5 @@
 | 
			
		|||
|logo| ``tractor``: distributed structurred concurrency
 | 
			
		||||
 | 
			
		||||
|gh_actions|
 | 
			
		||||
|docs|
 | 
			
		||||
 | 
			
		||||
``tractor`` is a `structured concurrency`_ (SC), multi-processing_ runtime built on trio_.
 | 
			
		||||
 | 
			
		||||
Fundamentally, ``tractor`` provides parallelism via
 | 
			
		||||
| 
						 | 
				
			
			@ -66,6 +63,13 @@ Features
 | 
			
		|||
  - (WIP) a ``TaskMngr``: one-cancels-one style nursery supervisor.
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
Status of `main` / infra
 | 
			
		||||
------------------------
 | 
			
		||||
 | 
			
		||||
- |gh_actions|
 | 
			
		||||
- |docs|
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
Install
 | 
			
		||||
-------
 | 
			
		||||
``tractor`` is still in a *alpha-near-beta-stage* for many
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -120,6 +120,7 @@ async def main(
 | 
			
		|||
    break_parent_ipc_after: int|bool = False,
 | 
			
		||||
    break_child_ipc_after: int|bool = False,
 | 
			
		||||
    pre_close: bool = False,
 | 
			
		||||
    tpt_proto: str = 'tcp',
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -131,6 +132,7 @@ async def main(
 | 
			
		|||
            # a hang since it never engages due to broken IPC
 | 
			
		||||
            debug_mode=debug_mode,
 | 
			
		||||
            loglevel=loglevel,
 | 
			
		||||
            enable_transports=[tpt_proto],
 | 
			
		||||
 | 
			
		||||
        ) as an,
 | 
			
		||||
    ):
 | 
			
		||||
| 
						 | 
				
			
			@ -145,7 +147,8 @@ async def main(
 | 
			
		|||
            _testing.expect_ctxc(
 | 
			
		||||
                yay=(
 | 
			
		||||
                    break_parent_ipc_after
 | 
			
		||||
                    or break_child_ipc_after
 | 
			
		||||
                    or
 | 
			
		||||
                    break_child_ipc_after
 | 
			
		||||
                ),
 | 
			
		||||
                # TODO: we CAN'T remove this right?
 | 
			
		||||
                # since we need the ctxc to bubble up from either
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -24,10 +24,9 @@ async def spawn_until(depth=0):
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
async def main():
 | 
			
		||||
    """The main ``tractor`` routine.
 | 
			
		||||
 | 
			
		||||
    The process tree should look as approximately as follows when the debugger
 | 
			
		||||
    first engages:
 | 
			
		||||
    '''
 | 
			
		||||
    The process tree should look as approximately as follows when the
 | 
			
		||||
    debugger first engages:
 | 
			
		||||
 | 
			
		||||
    python examples/debugging/multi_nested_subactors_bp_forever.py
 | 
			
		||||
    ├─ python -m tractor._child --uid ('spawner1', '7eab8462 ...)
 | 
			
		||||
| 
						 | 
				
			
			@ -37,10 +36,11 @@ async def main():
 | 
			
		|||
    └─ python -m tractor._child --uid ('spawner0', '1d42012b ...)
 | 
			
		||||
       └─ python -m tractor._child --uid ('name_error', '6c2733b8 ...)
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    '''
 | 
			
		||||
    async with tractor.open_nursery(
 | 
			
		||||
        debug_mode=True,
 | 
			
		||||
        loglevel='warning'
 | 
			
		||||
        loglevel='devx',
 | 
			
		||||
        enable_transports=['uds'],
 | 
			
		||||
    ) as n:
 | 
			
		||||
 | 
			
		||||
        # spawn both actors
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,6 +37,7 @@ async def main(
 | 
			
		|||
            enable_stack_on_sig=True,
 | 
			
		||||
            # maybe_enable_greenback=False,
 | 
			
		||||
            loglevel='devx',
 | 
			
		||||
            enable_transports=['uds'],
 | 
			
		||||
        ) as an,
 | 
			
		||||
    ):
 | 
			
		||||
        ptl: tractor.Portal  = await an.start_actor(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,24 +1,27 @@
 | 
			
		|||
"""
 | 
			
		||||
``tractor`` testing!!
 | 
			
		||||
Top level of the testing suites!
 | 
			
		||||
 | 
			
		||||
"""
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
import sys
 | 
			
		||||
import subprocess
 | 
			
		||||
import os
 | 
			
		||||
import random
 | 
			
		||||
import signal
 | 
			
		||||
import platform
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
import tractor
 | 
			
		||||
from tractor._testing import (
 | 
			
		||||
    examples_dir as examples_dir,
 | 
			
		||||
    tractor_test as tractor_test,
 | 
			
		||||
    expect_ctxc as expect_ctxc,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
# TODO: include wtv plugin(s) we build in `._testing.pytest`?
 | 
			
		||||
pytest_plugins = ['pytester']
 | 
			
		||||
pytest_plugins: list[str] = [
 | 
			
		||||
    'pytester',
 | 
			
		||||
    'tractor._testing.pytest',
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
 | 
			
		||||
if platform.system() == 'Windows':
 | 
			
		||||
| 
						 | 
				
			
			@ -30,7 +33,11 @@ else:
 | 
			
		|||
    _KILL_SIGNAL = signal.SIGKILL
 | 
			
		||||
    _INT_SIGNAL = signal.SIGINT
 | 
			
		||||
    _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
 | 
			
		||||
    _PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4
 | 
			
		||||
    _PROC_SPAWN_WAIT = (
 | 
			
		||||
        0.6
 | 
			
		||||
        if sys.version_info < (3, 7)
 | 
			
		||||
        else 0.4
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
no_windows = pytest.mark.skipif(
 | 
			
		||||
| 
						 | 
				
			
			@ -39,7 +46,12 @@ no_windows = pytest.mark.skipif(
 | 
			
		|||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pytest_addoption(parser):
 | 
			
		||||
def pytest_addoption(
 | 
			
		||||
    parser: pytest.Parser,
 | 
			
		||||
):
 | 
			
		||||
    # ?TODO? should this be exposed from our `._testing.pytest`
 | 
			
		||||
    # plugin or should we make it more explicit with `--tl` for
 | 
			
		||||
    # tractor logging like we do in other client projects?
 | 
			
		||||
    parser.addoption(
 | 
			
		||||
        "--ll",
 | 
			
		||||
        action="store",
 | 
			
		||||
| 
						 | 
				
			
			@ -47,42 +59,10 @@ def pytest_addoption(parser):
 | 
			
		|||
        default='ERROR', help="logging level to set when testing"
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    parser.addoption(
 | 
			
		||||
        "--spawn-backend",
 | 
			
		||||
        action="store",
 | 
			
		||||
        dest='spawn_backend',
 | 
			
		||||
        default='trio',
 | 
			
		||||
        help="Processing spawning backend to use for test run",
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    parser.addoption(
 | 
			
		||||
        "--tpdb", "--debug-mode",
 | 
			
		||||
        action="store_true",
 | 
			
		||||
        dest='tractor_debug_mode',
 | 
			
		||||
        # default=False,
 | 
			
		||||
        help=(
 | 
			
		||||
            'Enable a flag that can be used by tests to to set the '
 | 
			
		||||
            '`debug_mode: bool` for engaging the internal '
 | 
			
		||||
            'multi-proc debugger sys.'
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pytest_configure(config):
 | 
			
		||||
    backend = config.option.spawn_backend
 | 
			
		||||
    tractor._spawn.try_set_start_method(backend)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='session')
 | 
			
		||||
def debug_mode(request):
 | 
			
		||||
    debug_mode: bool = request.config.option.tractor_debug_mode
 | 
			
		||||
    # if debug_mode:
 | 
			
		||||
    #     breakpoint()
 | 
			
		||||
    return debug_mode
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='session', autouse=True)
 | 
			
		||||
def loglevel(request):
 | 
			
		||||
    import tractor
 | 
			
		||||
    orig = tractor.log._default_loglevel
 | 
			
		||||
    level = tractor.log._default_loglevel = request.config.option.loglevel
 | 
			
		||||
    tractor.log.get_console_log(level)
 | 
			
		||||
| 
						 | 
				
			
			@ -90,106 +70,44 @@ def loglevel(request):
 | 
			
		|||
    tractor.log._default_loglevel = orig
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='session')
 | 
			
		||||
def spawn_backend(request) -> str:
 | 
			
		||||
    return request.config.option.spawn_backend
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# @pytest.fixture(scope='function', autouse=True)
 | 
			
		||||
# def debug_enabled(request) -> str:
 | 
			
		||||
#     from tractor import _state
 | 
			
		||||
#     if _state._runtime_vars['_debug_mode']:
 | 
			
		||||
#         breakpoint()
 | 
			
		||||
 | 
			
		||||
_ci_env: bool = os.environ.get('CI', False)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='session')
 | 
			
		||||
def ci_env() -> bool:
 | 
			
		||||
    '''
 | 
			
		||||
    Detect CI envoirment.
 | 
			
		||||
    Detect CI environment.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    return _ci_env
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: also move this to `._testing` for now?
 | 
			
		||||
# -[ ] possibly generalize and re-use for multi-tree spawning
 | 
			
		||||
#    along with the new stuff for multi-addrs in distribute_dis
 | 
			
		||||
#    branch?
 | 
			
		||||
#
 | 
			
		||||
# choose randomly at import time
 | 
			
		||||
_reg_addr: tuple[str, int] = (
 | 
			
		||||
    '127.0.0.1',
 | 
			
		||||
    random.randint(1000, 9999),
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='session')
 | 
			
		||||
def reg_addr() -> tuple[str, int]:
 | 
			
		||||
 | 
			
		||||
    # globally override the runtime to the per-test-session-dynamic
 | 
			
		||||
    # addr so that all tests never conflict with any other actor
 | 
			
		||||
    # tree using the default.
 | 
			
		||||
    from tractor import _root
 | 
			
		||||
    _root._default_lo_addrs = [_reg_addr]
 | 
			
		||||
 | 
			
		||||
    return _reg_addr
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pytest_generate_tests(metafunc):
 | 
			
		||||
    spawn_backend = metafunc.config.option.spawn_backend
 | 
			
		||||
 | 
			
		||||
    if not spawn_backend:
 | 
			
		||||
        # XXX some weird windows bug with `pytest`?
 | 
			
		||||
        spawn_backend = 'trio'
 | 
			
		||||
 | 
			
		||||
    # TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
 | 
			
		||||
    assert spawn_backend in (
 | 
			
		||||
        'mp_spawn',
 | 
			
		||||
        'mp_forkserver',
 | 
			
		||||
        'trio',
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # NOTE: used to be used to dyanmically parametrize tests for when
 | 
			
		||||
    # you just passed --spawn-backend=`mp` on the cli, but now we expect
 | 
			
		||||
    # that cli input to be manually specified, BUT, maybe we'll do
 | 
			
		||||
    # something like this again in the future?
 | 
			
		||||
    if 'start_method' in metafunc.fixturenames:
 | 
			
		||||
        metafunc.parametrize("start_method", [spawn_backend], scope='module')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: a way to let test scripts (like from `examples/`)
 | 
			
		||||
# guarantee they won't registry addr collide!
 | 
			
		||||
# @pytest.fixture
 | 
			
		||||
# def open_test_runtime(
 | 
			
		||||
#     reg_addr: tuple,
 | 
			
		||||
# ) -> AsyncContextManager:
 | 
			
		||||
#     return partial(
 | 
			
		||||
#         tractor.open_nursery,
 | 
			
		||||
#         registry_addrs=[reg_addr],
 | 
			
		||||
#     )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def sig_prog(proc, sig):
 | 
			
		||||
def sig_prog(
 | 
			
		||||
    proc: subprocess.Popen,
 | 
			
		||||
    sig: int,
 | 
			
		||||
    canc_timeout: float = 0.1,
 | 
			
		||||
) -> int:
 | 
			
		||||
    "Kill the actor-process with ``sig``."
 | 
			
		||||
    proc.send_signal(sig)
 | 
			
		||||
    time.sleep(0.1)
 | 
			
		||||
    time.sleep(canc_timeout)
 | 
			
		||||
    if not proc.poll():
 | 
			
		||||
        # TODO: why sometimes does SIGINT not work on teardown?
 | 
			
		||||
        # seems to happen only when trace logging enabled?
 | 
			
		||||
        proc.send_signal(_KILL_SIGNAL)
 | 
			
		||||
    ret = proc.wait()
 | 
			
		||||
    ret: int = proc.wait()
 | 
			
		||||
    assert ret
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: factor into @cm and move to `._testing`?
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def daemon(
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    loglevel: str,
 | 
			
		||||
    testdir,
 | 
			
		||||
    reg_addr: tuple[str, int],
 | 
			
		||||
):
 | 
			
		||||
    tpt_proto: str,
 | 
			
		||||
 | 
			
		||||
) -> subprocess.Popen:
 | 
			
		||||
    '''
 | 
			
		||||
    Run a daemon root actor as a separate actor-process tree and
 | 
			
		||||
    "remote registrar" for discovery-protocol related tests.
 | 
			
		||||
| 
						 | 
				
			
			@ -200,28 +118,100 @@ def daemon(
 | 
			
		|||
        loglevel: str = 'info'
 | 
			
		||||
 | 
			
		||||
    code: str = (
 | 
			
		||||
            "import tractor; "
 | 
			
		||||
            "tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
 | 
			
		||||
        "import tractor; "
 | 
			
		||||
        "tractor.run_daemon([], "
 | 
			
		||||
        "registry_addrs={reg_addrs}, "
 | 
			
		||||
        "debug_mode={debug_mode}, "
 | 
			
		||||
        "loglevel={ll})"
 | 
			
		||||
    ).format(
 | 
			
		||||
        reg_addrs=str([reg_addr]),
 | 
			
		||||
        ll="'{}'".format(loglevel) if loglevel else None,
 | 
			
		||||
        debug_mode=debug_mode,
 | 
			
		||||
    )
 | 
			
		||||
    cmd: list[str] = [
 | 
			
		||||
        sys.executable,
 | 
			
		||||
        '-c', code,
 | 
			
		||||
    ]
 | 
			
		||||
    # breakpoint()
 | 
			
		||||
    kwargs = {}
 | 
			
		||||
    if platform.system() == 'Windows':
 | 
			
		||||
        # without this, tests hang on windows forever
 | 
			
		||||
        kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
 | 
			
		||||
 | 
			
		||||
    proc = testdir.popen(
 | 
			
		||||
    proc: subprocess.Popen = testdir.popen(
 | 
			
		||||
        cmd,
 | 
			
		||||
        stdout=subprocess.PIPE,
 | 
			
		||||
        stderr=subprocess.PIPE,
 | 
			
		||||
        **kwargs,
 | 
			
		||||
    )
 | 
			
		||||
    assert not proc.returncode
 | 
			
		||||
 | 
			
		||||
    # UDS sockets are **really** fast to bind()/listen()/connect()
 | 
			
		||||
    # so it's often required that we delay a bit more starting
 | 
			
		||||
    # the first actor-tree..
 | 
			
		||||
    if tpt_proto == 'uds':
 | 
			
		||||
        global _PROC_SPAWN_WAIT
 | 
			
		||||
        _PROC_SPAWN_WAIT = 0.6
 | 
			
		||||
 | 
			
		||||
    time.sleep(_PROC_SPAWN_WAIT)
 | 
			
		||||
 | 
			
		||||
    assert not proc.returncode
 | 
			
		||||
    yield proc
 | 
			
		||||
    sig_prog(proc, _INT_SIGNAL)
 | 
			
		||||
 | 
			
		||||
    # XXX! yeah.. just be reaaal careful with this bc sometimes it
 | 
			
		||||
    # can lock up on the `_io.BufferedReader` and hang..
 | 
			
		||||
    stderr: str = proc.stderr.read().decode()
 | 
			
		||||
    if stderr:
 | 
			
		||||
        print(
 | 
			
		||||
            f'Daemon actor tree produced STDERR:\n'
 | 
			
		||||
            f'{proc.args}\n'
 | 
			
		||||
            f'\n'
 | 
			
		||||
            f'{stderr}\n'
 | 
			
		||||
        )
 | 
			
		||||
    if proc.returncode != -2:
 | 
			
		||||
        raise RuntimeError(
 | 
			
		||||
            'Daemon actor tree failed !?\n'
 | 
			
		||||
            f'{proc.args}\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# @pytest.fixture(autouse=True)
 | 
			
		||||
# def shared_last_failed(pytestconfig):
 | 
			
		||||
#     val = pytestconfig.cache.get("example/value", None)
 | 
			
		||||
#     breakpoint()
 | 
			
		||||
#     if val is None:
 | 
			
		||||
#         pytestconfig.cache.set("example/value", val)
 | 
			
		||||
#     return val
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: a way to let test scripts (like from `examples/`)
 | 
			
		||||
# guarantee they won't `registry_addrs` collide!
 | 
			
		||||
# -[ ] maybe use some kinda standard `def main()` arg-spec that
 | 
			
		||||
#     we can introspect from a fixture that is called from the test
 | 
			
		||||
#     body?
 | 
			
		||||
# -[ ] test and figure out typing for below prototype! Bp
 | 
			
		||||
#
 | 
			
		||||
# @pytest.fixture
 | 
			
		||||
# def set_script_runtime_args(
 | 
			
		||||
#     reg_addr: tuple,
 | 
			
		||||
# ) -> Callable[[...], None]:
 | 
			
		||||
 | 
			
		||||
#     def import_n_partial_in_args_n_triorun(
 | 
			
		||||
#         script: Path,  # under examples?
 | 
			
		||||
#         **runtime_args,
 | 
			
		||||
#     ) -> Callable[[], Any]:  # a `partial`-ed equiv of `trio.run()`
 | 
			
		||||
 | 
			
		||||
#         # NOTE, below is taken from
 | 
			
		||||
#         # `.test_advanced_faults.test_ipc_channel_break_during_stream`
 | 
			
		||||
#         mod: ModuleType = import_path(
 | 
			
		||||
#             examples_dir() / 'advanced_faults'
 | 
			
		||||
#             / 'ipc_failure_during_stream.py',
 | 
			
		||||
#             root=examples_dir(),
 | 
			
		||||
#             consider_namespace_packages=False,
 | 
			
		||||
#         )
 | 
			
		||||
#         return partial(
 | 
			
		||||
#             trio.run,
 | 
			
		||||
#             partial(
 | 
			
		||||
#                 mod.main,
 | 
			
		||||
#                 **runtime_args,
 | 
			
		||||
#             )
 | 
			
		||||
#         )
 | 
			
		||||
#     return import_n_partial_in_args_n_triorun
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,4 @@
 | 
			
		|||
'''
 | 
			
		||||
`tractor.ipc` subsystem(s)/unit testing suites.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,95 @@
 | 
			
		|||
'''
 | 
			
		||||
Verify the `enable_transports` param drives various
 | 
			
		||||
per-root/sub-actor IPC endpoint/server settings.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
import trio
 | 
			
		||||
import tractor
 | 
			
		||||
from tractor import (
 | 
			
		||||
    Actor,
 | 
			
		||||
    Portal,
 | 
			
		||||
    ipc,
 | 
			
		||||
    msg,
 | 
			
		||||
    _state,
 | 
			
		||||
    _addr,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def chk_tpts(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
    tpt_proto_key: str,
 | 
			
		||||
):
 | 
			
		||||
    rtvars = _state._runtime_vars
 | 
			
		||||
    assert (
 | 
			
		||||
        tpt_proto_key
 | 
			
		||||
        in
 | 
			
		||||
        rtvars['_enable_tpts']
 | 
			
		||||
    )
 | 
			
		||||
    actor: Actor = tractor.current_actor()
 | 
			
		||||
    spec: msg.types.SpawnSpec = actor._spawn_spec
 | 
			
		||||
    assert spec._runtime_vars == rtvars
 | 
			
		||||
 | 
			
		||||
    # ensure individual IPC ep-addr types
 | 
			
		||||
    serv: ipc._server.Server = actor.ipc_server
 | 
			
		||||
    addr: ipc._types.Address
 | 
			
		||||
    for addr in serv.addrs:
 | 
			
		||||
        assert addr.proto_key == tpt_proto_key
 | 
			
		||||
 | 
			
		||||
    # Actor delegate-props enforcement
 | 
			
		||||
    assert (
 | 
			
		||||
        actor.accept_addrs
 | 
			
		||||
        ==
 | 
			
		||||
        serv.accept_addrs
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    await ctx.started(serv.accept_addrs)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO, parametrize over mis-matched-proto-typed `registry_addrs`
 | 
			
		||||
# since i seems to work in `piker` but not exactly sure if both tcp
 | 
			
		||||
# & uds are being deployed then?
 | 
			
		||||
#
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'tpt_proto_key',
 | 
			
		||||
    ['tcp', 'uds'],
 | 
			
		||||
    ids=lambda item: f'ipc_tpt={item!r}'
 | 
			
		||||
)
 | 
			
		||||
def test_root_passes_tpt_to_sub(
 | 
			
		||||
    tpt_proto_key: str,
 | 
			
		||||
    reg_addr: tuple,
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
):
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery(
 | 
			
		||||
            enable_transports=[tpt_proto_key],
 | 
			
		||||
            registry_addrs=[reg_addr],
 | 
			
		||||
            debug_mode=debug_mode,
 | 
			
		||||
        ) as an:
 | 
			
		||||
 | 
			
		||||
            assert (
 | 
			
		||||
                tpt_proto_key
 | 
			
		||||
                in
 | 
			
		||||
                _state._runtime_vars['_enable_tpts']
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            ptl: Portal = await an.start_actor(
 | 
			
		||||
                name='sub',
 | 
			
		||||
                enable_modules=[__name__],
 | 
			
		||||
            )
 | 
			
		||||
            async with ptl.open_context(
 | 
			
		||||
                chk_tpts,
 | 
			
		||||
                tpt_proto_key=tpt_proto_key,
 | 
			
		||||
            ) as (ctx, accept_addrs):
 | 
			
		||||
 | 
			
		||||
                uw_addr: tuple
 | 
			
		||||
                for uw_addr in accept_addrs:
 | 
			
		||||
                    addr = _addr.wrap_address(uw_addr)
 | 
			
		||||
                    assert addr.is_valid
 | 
			
		||||
 | 
			
		||||
            # shudown sub-actor(s)
 | 
			
		||||
            await an.cancel()
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,72 @@
 | 
			
		|||
'''
 | 
			
		||||
High-level `.ipc._server` unit tests.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
import trio
 | 
			
		||||
from tractor import (
 | 
			
		||||
    devx,
 | 
			
		||||
    ipc,
 | 
			
		||||
    log,
 | 
			
		||||
)
 | 
			
		||||
from tractor._testing.addr import (
 | 
			
		||||
    get_rando_addr,
 | 
			
		||||
)
 | 
			
		||||
# TODO, use/check-roundtripping with some of these wrapper types?
 | 
			
		||||
#
 | 
			
		||||
# from .._addr import Address
 | 
			
		||||
# from ._chan import Channel
 | 
			
		||||
# from ._transport import MsgTransport
 | 
			
		||||
# from ._uds import UDSAddress
 | 
			
		||||
# from ._tcp import TCPAddress
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    '_tpt_proto',
 | 
			
		||||
    ['uds', 'tcp']
 | 
			
		||||
)
 | 
			
		||||
def test_basic_ipc_server(
 | 
			
		||||
    _tpt_proto: str,
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    loglevel: str,
 | 
			
		||||
):
 | 
			
		||||
 | 
			
		||||
    # so we see the socket-listener reporting on console
 | 
			
		||||
    log.get_console_log("INFO")
 | 
			
		||||
 | 
			
		||||
    rando_addr: tuple = get_rando_addr(
 | 
			
		||||
        tpt_proto=_tpt_proto,
 | 
			
		||||
    )
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with ipc._server.open_ipc_server() as server:
 | 
			
		||||
 | 
			
		||||
            assert (
 | 
			
		||||
                server._parent_tn
 | 
			
		||||
                and
 | 
			
		||||
                server._parent_tn is server._stream_handler_tn
 | 
			
		||||
            )
 | 
			
		||||
            assert server._no_more_peers.is_set()
 | 
			
		||||
 | 
			
		||||
            eps: list[ipc._server.Endpoint] = await server.listen_on(
 | 
			
		||||
                accept_addrs=[rando_addr],
 | 
			
		||||
                stream_handler_nursery=None,
 | 
			
		||||
            )
 | 
			
		||||
            assert (
 | 
			
		||||
                len(eps) == 1
 | 
			
		||||
                and
 | 
			
		||||
                (ep := eps[0])._listener
 | 
			
		||||
                and
 | 
			
		||||
                not ep.peer_tpts
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            server._parent_tn.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
        # !TODO! actually make a bg-task connection from a client
 | 
			
		||||
        # using `ipc._chan._connect_chan()`
 | 
			
		||||
 | 
			
		||||
    with devx.maybe_open_crash_handler(
 | 
			
		||||
        pdb=debug_mode,
 | 
			
		||||
    ):
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			@ -10,6 +10,9 @@ import pytest
 | 
			
		|||
from _pytest.pathlib import import_path
 | 
			
		||||
import trio
 | 
			
		||||
import tractor
 | 
			
		||||
from tractor import (
 | 
			
		||||
    TransportClosed,
 | 
			
		||||
)
 | 
			
		||||
from tractor._testing import (
 | 
			
		||||
    examples_dir,
 | 
			
		||||
    break_ipc,
 | 
			
		||||
| 
						 | 
				
			
			@ -74,6 +77,7 @@ def test_ipc_channel_break_during_stream(
 | 
			
		|||
    spawn_backend: str,
 | 
			
		||||
    ipc_break: dict|None,
 | 
			
		||||
    pre_aclose_msgstream: bool,
 | 
			
		||||
    tpt_proto: str,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Ensure we can have an IPC channel break its connection during
 | 
			
		||||
| 
						 | 
				
			
			@ -91,7 +95,7 @@ def test_ipc_channel_break_during_stream(
 | 
			
		|||
        # non-`trio` spawners should never hit the hang condition that
 | 
			
		||||
        # requires the user to do ctl-c to cancel the actor tree.
 | 
			
		||||
        # expect_final_exc = trio.ClosedResourceError
 | 
			
		||||
        expect_final_exc = tractor.TransportClosed
 | 
			
		||||
        expect_final_exc = TransportClosed
 | 
			
		||||
 | 
			
		||||
    mod: ModuleType = import_path(
 | 
			
		||||
        examples_dir() / 'advanced_faults'
 | 
			
		||||
| 
						 | 
				
			
			@ -104,6 +108,8 @@ def test_ipc_channel_break_during_stream(
 | 
			
		|||
    # period" wherein the user eventually hits ctl-c to kill the
 | 
			
		||||
    # root-actor tree.
 | 
			
		||||
    expect_final_exc: BaseException = KeyboardInterrupt
 | 
			
		||||
    expect_final_cause: BaseException|None = None
 | 
			
		||||
 | 
			
		||||
    if (
 | 
			
		||||
        # only expect EoC if trans is broken on the child side,
 | 
			
		||||
        ipc_break['break_child_ipc_after'] is not False
 | 
			
		||||
| 
						 | 
				
			
			@ -138,6 +144,9 @@ def test_ipc_channel_break_during_stream(
 | 
			
		|||
        # a user sending ctl-c by raising a KBI.
 | 
			
		||||
        if pre_aclose_msgstream:
 | 
			
		||||
            expect_final_exc = KeyboardInterrupt
 | 
			
		||||
            if tpt_proto == 'uds':
 | 
			
		||||
                expect_final_exc = TransportClosed
 | 
			
		||||
                expect_final_cause = trio.BrokenResourceError
 | 
			
		||||
 | 
			
		||||
            # XXX OLD XXX
 | 
			
		||||
            # if child calls `MsgStream.aclose()` then expect EoC.
 | 
			
		||||
| 
						 | 
				
			
			@ -157,6 +166,10 @@ def test_ipc_channel_break_during_stream(
 | 
			
		|||
        if pre_aclose_msgstream:
 | 
			
		||||
            expect_final_exc = KeyboardInterrupt
 | 
			
		||||
 | 
			
		||||
            if tpt_proto == 'uds':
 | 
			
		||||
                expect_final_exc = TransportClosed
 | 
			
		||||
                expect_final_cause = trio.BrokenResourceError
 | 
			
		||||
 | 
			
		||||
    # NOTE when the parent IPC side dies (even if the child does as well
 | 
			
		||||
    # but the child fails BEFORE the parent) we always expect the
 | 
			
		||||
    # IPC layer to raise a closed-resource, NEVER do we expect
 | 
			
		||||
| 
						 | 
				
			
			@ -169,8 +182,8 @@ def test_ipc_channel_break_during_stream(
 | 
			
		|||
        and
 | 
			
		||||
        ipc_break['break_child_ipc_after'] is False
 | 
			
		||||
    ):
 | 
			
		||||
        # expect_final_exc = trio.ClosedResourceError
 | 
			
		||||
        expect_final_exc = tractor.TransportClosed
 | 
			
		||||
        expect_final_cause = trio.ClosedResourceError
 | 
			
		||||
 | 
			
		||||
    # BOTH but, PARENT breaks FIRST
 | 
			
		||||
    elif (
 | 
			
		||||
| 
						 | 
				
			
			@ -181,8 +194,8 @@ def test_ipc_channel_break_during_stream(
 | 
			
		|||
            ipc_break['break_parent_ipc_after']
 | 
			
		||||
        )
 | 
			
		||||
    ):
 | 
			
		||||
        # expect_final_exc = trio.ClosedResourceError
 | 
			
		||||
        expect_final_exc = tractor.TransportClosed
 | 
			
		||||
        expect_final_cause = trio.ClosedResourceError
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(
 | 
			
		||||
        expected_exception=(
 | 
			
		||||
| 
						 | 
				
			
			@ -198,6 +211,7 @@ def test_ipc_channel_break_during_stream(
 | 
			
		|||
                    start_method=spawn_backend,
 | 
			
		||||
                    loglevel=loglevel,
 | 
			
		||||
                    pre_close=pre_aclose_msgstream,
 | 
			
		||||
                    tpt_proto=tpt_proto,
 | 
			
		||||
                    **ipc_break,
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
| 
						 | 
				
			
			@ -220,10 +234,15 @@ def test_ipc_channel_break_during_stream(
 | 
			
		|||
                )
 | 
			
		||||
            cause: Exception = tc.__cause__
 | 
			
		||||
            assert (
 | 
			
		||||
                type(cause) is trio.ClosedResourceError
 | 
			
		||||
                and
 | 
			
		||||
                cause.args[0] == 'another task closed this fd'
 | 
			
		||||
                # type(cause) is trio.ClosedResourceError
 | 
			
		||||
                type(cause) is expect_final_cause
 | 
			
		||||
 | 
			
		||||
                # TODO, should we expect a certain exc-message (per
 | 
			
		||||
                # tpt) as well??
 | 
			
		||||
                # and
 | 
			
		||||
                # cause.args[0] == 'another task closed this fd'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
    # get raw instance from pytest wrapper
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -7,7 +7,9 @@ import platform
 | 
			
		|||
from functools import partial
 | 
			
		||||
import itertools
 | 
			
		||||
 | 
			
		||||
import psutil
 | 
			
		||||
import pytest
 | 
			
		||||
import subprocess
 | 
			
		||||
import tractor
 | 
			
		||||
from tractor._testing import tractor_test
 | 
			
		||||
import trio
 | 
			
		||||
| 
						 | 
				
			
			@ -152,13 +154,23 @@ async def unpack_reg(actor_or_portal):
 | 
			
		|||
async def spawn_and_check_registry(
 | 
			
		||||
    reg_addr: tuple,
 | 
			
		||||
    use_signal: bool,
 | 
			
		||||
    debug_mode: bool = False,
 | 
			
		||||
    remote_arbiter: bool = False,
 | 
			
		||||
    with_streaming: bool = False,
 | 
			
		||||
    maybe_daemon: tuple[
 | 
			
		||||
        subprocess.Popen,
 | 
			
		||||
        psutil.Process,
 | 
			
		||||
    ]|None = None,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    if maybe_daemon:
 | 
			
		||||
        popen, proc = maybe_daemon
 | 
			
		||||
        # breakpoint()
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_root_actor(
 | 
			
		||||
        registry_addrs=[reg_addr],
 | 
			
		||||
        debug_mode=debug_mode,
 | 
			
		||||
    ):
 | 
			
		||||
        async with tractor.get_registry(reg_addr) as portal:
 | 
			
		||||
            # runtime needs to be up to call this
 | 
			
		||||
| 
						 | 
				
			
			@ -176,11 +188,11 @@ async def spawn_and_check_registry(
 | 
			
		|||
                extra = 2  # local root actor + remote arbiter
 | 
			
		||||
 | 
			
		||||
            # ensure current actor is registered
 | 
			
		||||
            registry = await get_reg()
 | 
			
		||||
            registry: dict = await get_reg()
 | 
			
		||||
            assert actor.uid in registry
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                async with tractor.open_nursery() as n:
 | 
			
		||||
                async with tractor.open_nursery() as an:
 | 
			
		||||
                    async with trio.open_nursery(
 | 
			
		||||
                        strict_exception_groups=False,
 | 
			
		||||
                    ) as trion:
 | 
			
		||||
| 
						 | 
				
			
			@ -189,17 +201,17 @@ async def spawn_and_check_registry(
 | 
			
		|||
                        for i in range(3):
 | 
			
		||||
                            name = f'a{i}'
 | 
			
		||||
                            if with_streaming:
 | 
			
		||||
                                portals[name] = await n.start_actor(
 | 
			
		||||
                                portals[name] = await an.start_actor(
 | 
			
		||||
                                    name=name, enable_modules=[__name__])
 | 
			
		||||
 | 
			
		||||
                            else:  # no streaming
 | 
			
		||||
                                portals[name] = await n.run_in_actor(
 | 
			
		||||
                                portals[name] = await an.run_in_actor(
 | 
			
		||||
                                    trio.sleep_forever, name=name)
 | 
			
		||||
 | 
			
		||||
                        # wait on last actor to come up
 | 
			
		||||
                        async with tractor.wait_for_actor(name):
 | 
			
		||||
                            registry = await get_reg()
 | 
			
		||||
                            for uid in n._children:
 | 
			
		||||
                            for uid in an._children:
 | 
			
		||||
                                assert uid in registry
 | 
			
		||||
 | 
			
		||||
                        assert len(portals) + extra == len(registry)
 | 
			
		||||
| 
						 | 
				
			
			@ -232,6 +244,7 @@ async def spawn_and_check_registry(
 | 
			
		|||
@pytest.mark.parametrize('use_signal', [False, True])
 | 
			
		||||
@pytest.mark.parametrize('with_streaming', [False, True])
 | 
			
		||||
def test_subactors_unregister_on_cancel(
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    start_method,
 | 
			
		||||
    use_signal,
 | 
			
		||||
    reg_addr,
 | 
			
		||||
| 
						 | 
				
			
			@ -248,6 +261,7 @@ def test_subactors_unregister_on_cancel(
 | 
			
		|||
                spawn_and_check_registry,
 | 
			
		||||
                reg_addr,
 | 
			
		||||
                use_signal,
 | 
			
		||||
                debug_mode=debug_mode,
 | 
			
		||||
                remote_arbiter=False,
 | 
			
		||||
                with_streaming=with_streaming,
 | 
			
		||||
            ),
 | 
			
		||||
| 
						 | 
				
			
			@ -257,7 +271,8 @@ def test_subactors_unregister_on_cancel(
 | 
			
		|||
@pytest.mark.parametrize('use_signal', [False, True])
 | 
			
		||||
@pytest.mark.parametrize('with_streaming', [False, True])
 | 
			
		||||
def test_subactors_unregister_on_cancel_remote_daemon(
 | 
			
		||||
    daemon,
 | 
			
		||||
    daemon: subprocess.Popen,
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    start_method,
 | 
			
		||||
    use_signal,
 | 
			
		||||
    reg_addr,
 | 
			
		||||
| 
						 | 
				
			
			@ -273,8 +288,13 @@ def test_subactors_unregister_on_cancel_remote_daemon(
 | 
			
		|||
                spawn_and_check_registry,
 | 
			
		||||
                reg_addr,
 | 
			
		||||
                use_signal,
 | 
			
		||||
                debug_mode=debug_mode,
 | 
			
		||||
                remote_arbiter=True,
 | 
			
		||||
                with_streaming=with_streaming,
 | 
			
		||||
                maybe_daemon=(
 | 
			
		||||
                    daemon,
 | 
			
		||||
                    psutil.Process(daemon.pid)
 | 
			
		||||
                ),
 | 
			
		||||
            ),
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -373,7 +393,7 @@ def test_close_channel_explicit(
 | 
			
		|||
 | 
			
		||||
@pytest.mark.parametrize('use_signal', [False, True])
 | 
			
		||||
def test_close_channel_explicit_remote_arbiter(
 | 
			
		||||
    daemon,
 | 
			
		||||
    daemon: subprocess.Popen,
 | 
			
		||||
    start_method,
 | 
			
		||||
    use_signal,
 | 
			
		||||
    reg_addr,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -100,16 +100,29 @@ async def streamer(
 | 
			
		|||
@acm
 | 
			
		||||
async def open_stream() -> Awaitable[tractor.MsgStream]:
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_nursery() as tn:
 | 
			
		||||
        portal = await tn.start_actor('streamer', enable_modules=[__name__])
 | 
			
		||||
        async with (
 | 
			
		||||
            portal.open_context(streamer) as (ctx, first),
 | 
			
		||||
            ctx.open_stream() as stream,
 | 
			
		||||
        ):
 | 
			
		||||
            yield stream
 | 
			
		||||
    try:
 | 
			
		||||
        async with tractor.open_nursery() as an:
 | 
			
		||||
            portal = await an.start_actor(
 | 
			
		||||
                'streamer',
 | 
			
		||||
                enable_modules=[__name__],
 | 
			
		||||
            )
 | 
			
		||||
            async with (
 | 
			
		||||
                portal.open_context(streamer) as (ctx, first),
 | 
			
		||||
                ctx.open_stream() as stream,
 | 
			
		||||
            ):
 | 
			
		||||
                yield stream
 | 
			
		||||
 | 
			
		||||
        await portal.cancel_actor()
 | 
			
		||||
    print('CANCELLED STREAMER')
 | 
			
		||||
            print('Cancelling streamer')
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
            print('Cancelled streamer')
 | 
			
		||||
 | 
			
		||||
    except Exception as err:
 | 
			
		||||
        print(
 | 
			
		||||
            f'`open_stream()` errored?\n'
 | 
			
		||||
            f'{err!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
        await tractor.pause(shield=True)
 | 
			
		||||
        raise err
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
| 
						 | 
				
			
			@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str):
 | 
			
		|||
            yield stream
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_open_local_sub_to_stream():
 | 
			
		||||
def test_open_local_sub_to_stream(
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Verify a single inter-actor stream can can be fanned-out shared to
 | 
			
		||||
    N local tasks using ``trionics.maybe_open_context():``.
 | 
			
		||||
    N local tasks using `trionics.maybe_open_context()`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    timeout: float = 3.6 if platform.system() != "Windows" else 10
 | 
			
		||||
    timeout: float = 3.6
 | 
			
		||||
    if platform.system() == "Windows":
 | 
			
		||||
        timeout: float = 10
 | 
			
		||||
 | 
			
		||||
    if debug_mode:
 | 
			
		||||
        timeout = 999
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
 | 
			
		||||
        full = list(range(1000))
 | 
			
		||||
 | 
			
		||||
        async def get_sub_and_pull(taskname: str):
 | 
			
		||||
 | 
			
		||||
            stream: tractor.MsgStream
 | 
			
		||||
            async with (
 | 
			
		||||
                maybe_open_stream(taskname) as stream,
 | 
			
		||||
            ):
 | 
			
		||||
| 
						 | 
				
			
			@ -165,17 +187,27 @@ def test_open_local_sub_to_stream():
 | 
			
		|||
                assert set(seq).issubset(set(full))
 | 
			
		||||
            print(f'{taskname} finished')
 | 
			
		||||
 | 
			
		||||
        with trio.fail_after(timeout):
 | 
			
		||||
        with trio.fail_after(timeout) as cs:
 | 
			
		||||
            # TODO: turns out this isn't multi-task entrant XD
 | 
			
		||||
            # We probably need an indepotent entry semantic?
 | 
			
		||||
            async with tractor.open_root_actor():
 | 
			
		||||
            async with tractor.open_root_actor(
 | 
			
		||||
                debug_mode=debug_mode,
 | 
			
		||||
            ):
 | 
			
		||||
                async with (
 | 
			
		||||
                    trio.open_nursery() as nurse,
 | 
			
		||||
                    trio.open_nursery() as tn,
 | 
			
		||||
                ):
 | 
			
		||||
                    for i in range(10):
 | 
			
		||||
                        nurse.start_soon(get_sub_and_pull, f'task_{i}')
 | 
			
		||||
                        tn.start_soon(
 | 
			
		||||
                            get_sub_and_pull,
 | 
			
		||||
                            f'task_{i}',
 | 
			
		||||
                        )
 | 
			
		||||
                        await trio.sleep(0.001)
 | 
			
		||||
 | 
			
		||||
                print('all consumer tasks finished')
 | 
			
		||||
 | 
			
		||||
        if cs.cancelled_caught:
 | 
			
		||||
            pytest.fail(
 | 
			
		||||
                'Should NOT time out in `open_root_actor()` ?'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -180,7 +180,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
 | 
			
		|||
        with tractor.devx.maybe_open_crash_handler(
 | 
			
		||||
            pdb=debug_mode,
 | 
			
		||||
        ) as bxerr:
 | 
			
		||||
            assert not bxerr.value
 | 
			
		||||
            if bxerr:
 | 
			
		||||
                assert not bxerr.value
 | 
			
		||||
 | 
			
		||||
            async with (
 | 
			
		||||
                wraps_tn_that_always_cancels() as tn,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,70 @@
 | 
			
		|||
# tractor: structured concurrent "actors".
 | 
			
		||||
# Copyright 2018-eternity Tyler Goodlet.
 | 
			
		||||
 | 
			
		||||
# This program is free software: you can redistribute it and/or modify
 | 
			
		||||
# it under the terms of the GNU Affero General Public License as published by
 | 
			
		||||
# the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
# (at your option) any later version.
 | 
			
		||||
 | 
			
		||||
# This program is distributed in the hope that it will be useful,
 | 
			
		||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
# GNU Affero General Public License for more details.
 | 
			
		||||
 | 
			
		||||
# You should have received a copy of the GNU Affero General Public License
 | 
			
		||||
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
Random IPC addr generation for isolating
 | 
			
		||||
the discovery space between test sessions.
 | 
			
		||||
 | 
			
		||||
Might be eventually useful to expose as a util set from
 | 
			
		||||
our `tractor.discovery` subsys?
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
import random
 | 
			
		||||
from typing import (
 | 
			
		||||
    Type,
 | 
			
		||||
)
 | 
			
		||||
from tractor import (
 | 
			
		||||
    _addr,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_rando_addr(
 | 
			
		||||
    tpt_proto: str,
 | 
			
		||||
    *,
 | 
			
		||||
 | 
			
		||||
    # choose random port at import time
 | 
			
		||||
    _rando_port: str = random.randint(1000, 9999)
 | 
			
		||||
 | 
			
		||||
) -> tuple[str, str|int]:
 | 
			
		||||
    '''
 | 
			
		||||
    Used to globally override the runtime to the
 | 
			
		||||
    per-test-session-dynamic addr so that all tests never conflict
 | 
			
		||||
    with any other actor tree using the default.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto]
 | 
			
		||||
    def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
 | 
			
		||||
 | 
			
		||||
    # this is the "unwrapped" form expected to be passed to
 | 
			
		||||
    # `.open_root_actor()` by test body.
 | 
			
		||||
    testrun_reg_addr: tuple[str, int|str]
 | 
			
		||||
    match tpt_proto:
 | 
			
		||||
        case 'tcp':
 | 
			
		||||
            testrun_reg_addr = (
 | 
			
		||||
                addr_type.def_bindspace,
 | 
			
		||||
                _rando_port,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # NOTE, file-name uniqueness (no-collisions) will be based on
 | 
			
		||||
        # the runtime-directory and root (pytest-proc's) pid.
 | 
			
		||||
        case 'uds':
 | 
			
		||||
            testrun_reg_addr = addr_type.get_random().unwrap()
 | 
			
		||||
 | 
			
		||||
    # XXX, as sanity it should never the same as the default for the
 | 
			
		||||
    # host-singleton registry actor.
 | 
			
		||||
    assert def_reg_addr != testrun_reg_addr
 | 
			
		||||
 | 
			
		||||
    return testrun_reg_addr
 | 
			
		||||
| 
						 | 
				
			
			@ -26,29 +26,46 @@ from functools import (
 | 
			
		|||
import inspect
 | 
			
		||||
import platform
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
import tractor
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def tractor_test(fn):
 | 
			
		||||
    '''
 | 
			
		||||
    Decorator for async test funcs to present them as "native"
 | 
			
		||||
    looking sync funcs runnable by `pytest` using `trio.run()`.
 | 
			
		||||
    Decorator for async test fns to decorator-wrap them as "native"
 | 
			
		||||
    looking sync funcs runnable by `pytest` and auto invoked with
 | 
			
		||||
    `trio.run()` (much like the `pytest-trio` plugin's approach).
 | 
			
		||||
 | 
			
		||||
    Use:
 | 
			
		||||
    Further the test fn body will be invoked AFTER booting the actor
 | 
			
		||||
    runtime, i.e. from inside a `tractor.open_root_actor()` block AND
 | 
			
		||||
    with various runtime and tooling parameters implicitly passed as
 | 
			
		||||
    requested by by the test session's config; see immediately below.
 | 
			
		||||
 | 
			
		||||
    @tractor_test
 | 
			
		||||
    async def test_whatever():
 | 
			
		||||
        await ...
 | 
			
		||||
    Basic deco use:
 | 
			
		||||
    ---------------
 | 
			
		||||
 | 
			
		||||
    If fixtures:
 | 
			
		||||
      @tractor_test
 | 
			
		||||
      async def test_whatever():
 | 
			
		||||
          await ...
 | 
			
		||||
 | 
			
		||||
        - ``reg_addr`` (a socket addr tuple where arbiter is listening)
 | 
			
		||||
        - ``loglevel`` (logging level passed to tractor internals)
 | 
			
		||||
        - ``start_method`` (subprocess spawning backend)
 | 
			
		||||
 | 
			
		||||
    are defined in the `pytest` fixture space they will be automatically
 | 
			
		||||
    injected to tests declaring these funcargs.
 | 
			
		||||
    Runtime config via special fixtures:
 | 
			
		||||
    ------------------------------------
 | 
			
		||||
    If any of the following fixture are requested by the wrapped test
 | 
			
		||||
    fn (via normal func-args declaration),
 | 
			
		||||
 | 
			
		||||
    - `reg_addr` (a socket addr tuple where arbiter is listening)
 | 
			
		||||
    - `loglevel` (logging level passed to tractor internals)
 | 
			
		||||
    - `start_method` (subprocess spawning backend)
 | 
			
		||||
 | 
			
		||||
    (TODO support)
 | 
			
		||||
    - `tpt_proto` (IPC transport protocol key)
 | 
			
		||||
 | 
			
		||||
    they will be automatically injected to each test as normally
 | 
			
		||||
    expected as well as passed to the initial
 | 
			
		||||
    `tractor.open_root_actor()` funcargs.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    @wraps(fn)
 | 
			
		||||
    def wrapper(
 | 
			
		||||
| 
						 | 
				
			
			@ -111,3 +128,164 @@ def tractor_test(fn):
 | 
			
		|||
        return trio.run(main)
 | 
			
		||||
 | 
			
		||||
    return wrapper
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pytest_addoption(
 | 
			
		||||
    parser: pytest.Parser,
 | 
			
		||||
):
 | 
			
		||||
    # parser.addoption(
 | 
			
		||||
    #     "--ll",
 | 
			
		||||
    #     action="store",
 | 
			
		||||
    #     dest='loglevel',
 | 
			
		||||
    #     default='ERROR', help="logging level to set when testing"
 | 
			
		||||
    # )
 | 
			
		||||
 | 
			
		||||
    parser.addoption(
 | 
			
		||||
        "--spawn-backend",
 | 
			
		||||
        action="store",
 | 
			
		||||
        dest='spawn_backend',
 | 
			
		||||
        default='trio',
 | 
			
		||||
        help="Processing spawning backend to use for test run",
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    parser.addoption(
 | 
			
		||||
        "--tpdb",
 | 
			
		||||
        "--debug-mode",
 | 
			
		||||
        action="store_true",
 | 
			
		||||
        dest='tractor_debug_mode',
 | 
			
		||||
        # default=False,
 | 
			
		||||
        help=(
 | 
			
		||||
            'Enable a flag that can be used by tests to to set the '
 | 
			
		||||
            '`debug_mode: bool` for engaging the internal '
 | 
			
		||||
            'multi-proc debugger sys.'
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # provide which IPC transport protocols opting-in test suites
 | 
			
		||||
    # should accumulatively run against.
 | 
			
		||||
    parser.addoption(
 | 
			
		||||
        "--tpt-proto",
 | 
			
		||||
        nargs='+',  # accumulate-multiple-args
 | 
			
		||||
        action="store",
 | 
			
		||||
        dest='tpt_protos',
 | 
			
		||||
        default=['tcp'],
 | 
			
		||||
        help="Transport protocol to use under the `tractor.ipc.Channel`",
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pytest_configure(config):
 | 
			
		||||
    backend = config.option.spawn_backend
 | 
			
		||||
    tractor._spawn.try_set_start_method(backend)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='session')
 | 
			
		||||
def debug_mode(request) -> bool:
 | 
			
		||||
    '''
 | 
			
		||||
    Flag state for whether `--tpdb` (for `tractor`-py-debugger)
 | 
			
		||||
    was passed to the test run.
 | 
			
		||||
 | 
			
		||||
    Normally tests should pass this directly to `.open_root_actor()`
 | 
			
		||||
    to allow the user to opt into suite-wide crash handling.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    debug_mode: bool = request.config.option.tractor_debug_mode
 | 
			
		||||
    return debug_mode
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='session')
 | 
			
		||||
def spawn_backend(request) -> str:
 | 
			
		||||
    return request.config.option.spawn_backend
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='session')
 | 
			
		||||
def tpt_protos(request) -> list[str]:
 | 
			
		||||
 | 
			
		||||
    # allow quoting on CLI
 | 
			
		||||
    proto_keys: list[str] = [
 | 
			
		||||
        proto_key.replace('"', '').replace("'", "")
 | 
			
		||||
        for proto_key in request.config.option.tpt_protos
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    # ?TODO, eventually support multiple protos per test-sesh?
 | 
			
		||||
    if len(proto_keys) > 1:
 | 
			
		||||
        pytest.fail(
 | 
			
		||||
            'We only support one `--tpt-proto <key>` atm!\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # XXX ensure we support the protocol by name via lookup!
 | 
			
		||||
    for proto_key in proto_keys:
 | 
			
		||||
        addr_type = tractor._addr._address_types[proto_key]
 | 
			
		||||
        assert addr_type.proto_key == proto_key
 | 
			
		||||
 | 
			
		||||
    yield proto_keys
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(
 | 
			
		||||
    scope='session',
 | 
			
		||||
    autouse=True,
 | 
			
		||||
)
 | 
			
		||||
def tpt_proto(
 | 
			
		||||
    tpt_protos: list[str],
 | 
			
		||||
) -> str:
 | 
			
		||||
    proto_key: str = tpt_protos[0]
 | 
			
		||||
 | 
			
		||||
    from tractor import _state
 | 
			
		||||
    if _state._def_tpt_proto != proto_key:
 | 
			
		||||
        _state._def_tpt_proto = proto_key
 | 
			
		||||
 | 
			
		||||
    yield proto_key
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='session')
 | 
			
		||||
def reg_addr(
 | 
			
		||||
    tpt_proto: str,
 | 
			
		||||
) -> tuple[str, int|str]:
 | 
			
		||||
    '''
 | 
			
		||||
    Deliver a test-sesh unique registry address such
 | 
			
		||||
    that each run's (tests which use this fixture) will
 | 
			
		||||
    have no conflicts/cross-talk when running simultaneously
 | 
			
		||||
    nor will interfere with other live `tractor` apps active
 | 
			
		||||
    on the same network-host (namespace).
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    from tractor._testing.addr import get_rando_addr
 | 
			
		||||
    return get_rando_addr(
 | 
			
		||||
        tpt_proto=tpt_proto,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def pytest_generate_tests(
 | 
			
		||||
    metafunc: pytest.Metafunc,
 | 
			
		||||
):
 | 
			
		||||
    spawn_backend: str = metafunc.config.option.spawn_backend
 | 
			
		||||
 | 
			
		||||
    if not spawn_backend:
 | 
			
		||||
        # XXX some weird windows bug with `pytest`?
 | 
			
		||||
        spawn_backend = 'trio'
 | 
			
		||||
 | 
			
		||||
    # TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
 | 
			
		||||
    assert spawn_backend in (
 | 
			
		||||
        'mp_spawn',
 | 
			
		||||
        'mp_forkserver',
 | 
			
		||||
        'trio',
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # NOTE: used-to-be-used-to dyanmically parametrize tests for when
 | 
			
		||||
    # you just passed --spawn-backend=`mp` on the cli, but now we expect
 | 
			
		||||
    # that cli input to be manually specified, BUT, maybe we'll do
 | 
			
		||||
    # something like this again in the future?
 | 
			
		||||
    if 'start_method' in metafunc.fixturenames:
 | 
			
		||||
        metafunc.parametrize(
 | 
			
		||||
            "start_method",
 | 
			
		||||
            [spawn_backend],
 | 
			
		||||
            scope='module',
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # TODO, parametrize any `tpt_proto: str` declaring tests!
 | 
			
		||||
    # proto_tpts: list[str] = metafunc.config.option.proto_tpts
 | 
			
		||||
    # if 'tpt_proto' in metafunc.fixturenames:
 | 
			
		||||
    #     metafunc.parametrize(
 | 
			
		||||
    #         'tpt_proto',
 | 
			
		||||
    #         proto_tpts,  # TODO, double check this list usage!
 | 
			
		||||
    #         scope='module',
 | 
			
		||||
    #     )
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -289,7 +289,7 @@ async def maybe_wait_on_canced_subs(
 | 
			
		|||
#
 | 
			
		||||
# -[x] maybe change to mod-func and rename for implied
 | 
			
		||||
#    multi-transport semantics?
 | 
			
		||||
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
 | 
			
		||||
# -[ ] register each stream/tpt/chan with the owning `Endpoint`
 | 
			
		||||
#     so that we can query per tpt all peer contact infos?
 | 
			
		||||
#  |_[ ] possibly provide a global viewing via a
 | 
			
		||||
#        `collections.ChainMap`?
 | 
			
		||||
| 
						 | 
				
			
			@ -309,7 +309,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
    any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
 | 
			
		||||
    such that it is invoked as,
 | 
			
		||||
 | 
			
		||||
      IPCEndpoint.stream_handler_tn.start_soon(
 | 
			
		||||
      Endpoint.stream_handler_tn.start_soon(
 | 
			
		||||
          handle_stream,
 | 
			
		||||
          stream,
 | 
			
		||||
      )
 | 
			
		||||
| 
						 | 
				
			
			@ -577,7 +577,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
    # finally block closure
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IPCEndpoint(Struct):
 | 
			
		||||
class Endpoint(Struct):
 | 
			
		||||
    '''
 | 
			
		||||
    An instance of an IPC "bound" address where the lifetime of the
 | 
			
		||||
    "ability to accept connections" (from clients) and then handle
 | 
			
		||||
| 
						 | 
				
			
			@ -636,7 +636,7 @@ class IPCEndpoint(Struct):
 | 
			
		|||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IPCServer(Struct):
 | 
			
		||||
class Server(Struct):
 | 
			
		||||
    _parent_tn: Nursery
 | 
			
		||||
    _stream_handler_tn: Nursery
 | 
			
		||||
    # level-triggered sig for whether "no peers are currently
 | 
			
		||||
| 
						 | 
				
			
			@ -644,7 +644,7 @@ class IPCServer(Struct):
 | 
			
		|||
    # initialized with `.is_set() == True`.
 | 
			
		||||
    _no_more_peers: trio.Event
 | 
			
		||||
 | 
			
		||||
    _endpoints: list[IPCEndpoint] = []
 | 
			
		||||
    _endpoints: list[Endpoint] = []
 | 
			
		||||
 | 
			
		||||
    # connection tracking & mgmt
 | 
			
		||||
    _peers: defaultdict[
 | 
			
		||||
| 
						 | 
				
			
			@ -659,10 +659,10 @@ class IPCServer(Struct):
 | 
			
		|||
    # syncs for setup/teardown sequences
 | 
			
		||||
    _shutdown: trio.Event|None = None
 | 
			
		||||
 | 
			
		||||
    # TODO, maybe just make `._endpoints: list[IPCEndpoint]` and
 | 
			
		||||
    # TODO, maybe just make `._endpoints: list[Endpoint]` and
 | 
			
		||||
    # provide dict-views onto it?
 | 
			
		||||
    # @property
 | 
			
		||||
    # def addrs2eps(self) -> dict[Address, IPCEndpoint]:
 | 
			
		||||
    # def addrs2eps(self) -> dict[Address, Endpoint]:
 | 
			
		||||
    #     ...
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
| 
						 | 
				
			
			@ -708,7 +708,7 @@ class IPCServer(Struct):
 | 
			
		|||
            await self._shutdown.wait()
 | 
			
		||||
        else:
 | 
			
		||||
            tpt_protos: list[str] = []
 | 
			
		||||
            ep: IPCEndpoint
 | 
			
		||||
            ep: Endpoint
 | 
			
		||||
            for ep in self._endpoints:
 | 
			
		||||
                tpt_protos.append(ep.addr.proto_key)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -790,7 +790,7 @@ class IPCServer(Struct):
 | 
			
		|||
 | 
			
		||||
    def epsdict(self) -> dict[
 | 
			
		||||
        Address,
 | 
			
		||||
        IPCEndpoint,
 | 
			
		||||
        Endpoint,
 | 
			
		||||
    ]:
 | 
			
		||||
        return {
 | 
			
		||||
            ep.addr: ep
 | 
			
		||||
| 
						 | 
				
			
			@ -804,7 +804,7 @@ class IPCServer(Struct):
 | 
			
		|||
        return ev.is_set()
 | 
			
		||||
 | 
			
		||||
    def pformat(self) -> str:
 | 
			
		||||
        eps: list[IPCEndpoint] = self._endpoints
 | 
			
		||||
        eps: list[Endpoint] = self._endpoints
 | 
			
		||||
 | 
			
		||||
        state_repr: str = (
 | 
			
		||||
            f'{len(eps)!r} IPC-endpoints active'
 | 
			
		||||
| 
						 | 
				
			
			@ -835,13 +835,13 @@ class IPCServer(Struct):
 | 
			
		|||
 | 
			
		||||
    # TODO? maybe allow shutting down a `.listen_on()`s worth of
 | 
			
		||||
    # listeners by cancelling the corresponding
 | 
			
		||||
    # `IPCEndpoint._listen_tn` only ?
 | 
			
		||||
    # `Endpoint._listen_tn` only ?
 | 
			
		||||
    # -[ ] in theory you could use this to
 | 
			
		||||
    #     "boot-and-wait-for-reconnect" of all current and connecting
 | 
			
		||||
    #     peers?
 | 
			
		||||
    #  |_ would require that the stream-handler is intercepted so we
 | 
			
		||||
    #     can intercept every `MsgTransport` (stream) and track per
 | 
			
		||||
    #     `IPCEndpoint` likely?
 | 
			
		||||
    #     `Endpoint` likely?
 | 
			
		||||
    #
 | 
			
		||||
    # async def unlisten(
 | 
			
		||||
    #     self,
 | 
			
		||||
| 
						 | 
				
			
			@ -854,7 +854,7 @@ class IPCServer(Struct):
 | 
			
		|||
        *,
 | 
			
		||||
        accept_addrs: list[tuple[str, int|str]]|None = None,
 | 
			
		||||
        stream_handler_nursery: Nursery|None = None,
 | 
			
		||||
    ) -> list[IPCEndpoint]:
 | 
			
		||||
    ) -> list[Endpoint]:
 | 
			
		||||
        '''
 | 
			
		||||
        Start `SocketListeners` (i.e. bind and call `socket.listen()`)
 | 
			
		||||
        for all IPC-transport-protocol specific `Address`-types
 | 
			
		||||
| 
						 | 
				
			
			@ -888,7 +888,7 @@ class IPCServer(Struct):
 | 
			
		|||
            f'Binding to endpoints for,\n'
 | 
			
		||||
            f'{accept_addrs}\n'
 | 
			
		||||
        )
 | 
			
		||||
        eps: list[IPCEndpoint] = await self._parent_tn.start(
 | 
			
		||||
        eps: list[Endpoint] = await self._parent_tn.start(
 | 
			
		||||
            partial(
 | 
			
		||||
                _serve_ipc_eps,
 | 
			
		||||
                server=self,
 | 
			
		||||
| 
						 | 
				
			
			@ -904,7 +904,7 @@ class IPCServer(Struct):
 | 
			
		|||
        self._endpoints.extend(eps)
 | 
			
		||||
        # XXX, just a little bit of sanity
 | 
			
		||||
        group_tn: Nursery|None = None
 | 
			
		||||
        ep: IPCEndpoint
 | 
			
		||||
        ep: Endpoint
 | 
			
		||||
        for ep in eps:
 | 
			
		||||
            if ep.addr not in self.addrs:
 | 
			
		||||
                breakpoint()
 | 
			
		||||
| 
						 | 
				
			
			@ -917,6 +917,10 @@ class IPCServer(Struct):
 | 
			
		|||
        return eps
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# alias until we decide on final naming
 | 
			
		||||
IPCServer = Server
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def _serve_ipc_eps(
 | 
			
		||||
    *,
 | 
			
		||||
    server: IPCServer,
 | 
			
		||||
| 
						 | 
				
			
			@ -941,12 +945,12 @@ async def _serve_ipc_eps(
 | 
			
		|||
        listen_tn: Nursery
 | 
			
		||||
        async with trio.open_nursery() as listen_tn:
 | 
			
		||||
 | 
			
		||||
            eps: list[IPCEndpoint] = []
 | 
			
		||||
            eps: list[Endpoint] = []
 | 
			
		||||
            # XXX NOTE, required to call `serve_listeners()` below.
 | 
			
		||||
            # ?TODO, maybe just pass `list(eps.values()` tho?
 | 
			
		||||
            listeners: list[trio.abc.Listener] = []
 | 
			
		||||
            for addr in listen_addrs:
 | 
			
		||||
                ep = IPCEndpoint(
 | 
			
		||||
                ep = Endpoint(
 | 
			
		||||
                    addr=addr,
 | 
			
		||||
                    listen_tn=listen_tn,
 | 
			
		||||
                    stream_handler_tn=stream_handler_tn,
 | 
			
		||||
| 
						 | 
				
			
			@ -1010,7 +1014,7 @@ async def _serve_ipc_eps(
 | 
			
		|||
    finally:
 | 
			
		||||
        if eps:
 | 
			
		||||
            addr: Address
 | 
			
		||||
            ep: IPCEndpoint
 | 
			
		||||
            ep: Endpoint
 | 
			
		||||
            for addr, ep in server.epsdict().items():
 | 
			
		||||
                ep.close_listener()
 | 
			
		||||
                server._endpoints.remove(ep)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue