diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 15a2763..9d91f17 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -26,7 +26,7 @@ jobs:
run: pip install -U . --upgrade-strategy eager -r requirements-test.txt
- name: Run MyPy check
- run: mypy tractor/ --ignore-missing-imports
+ run: mypy tractor/ --ignore-missing-imports --show-traceback
# test that we can generate a software distribution and install it
# thus avoid missing file issues after packaging.
@@ -60,7 +60,11 @@ jobs:
matrix:
os: [ubuntu-latest]
python: ['3.10']
- spawn_backend: ['trio', 'mp']
+ spawn_backend: [
+ 'trio',
+ 'mp_spawn',
+ 'mp_forkserver',
+ ]
steps:
diff --git a/nooz/335.trivial.rst b/nooz/335.trivial.rst
new file mode 100644
index 0000000..1030f4d
--- /dev/null
+++ b/nooz/335.trivial.rst
@@ -0,0 +1,5 @@
+Establish an explicit "backend spawning" method table; use it from CI
+
+More clearly lays out the current set of (3) backends: ``['trio',
+'mp_spawn', 'mp_forkserver']`` and adjusts the ``._spawn.py`` internals
+as well as the test suite to accommodate.
diff --git a/requirements-test.txt b/requirements-test.txt
index c2b43c1..579b6f0 100644
--- a/requirements-test.txt
+++ b/requirements-test.txt
@@ -2,7 +2,7 @@ pytest
pytest-trio
pytest-timeout
pdbpp
-mypy<0.920
-trio_typing<0.7.0
+mypy
+trio_typing
pexpect
towncrier
diff --git a/setup.py b/setup.py
index 7a46d98..7e966ec 100755
--- a/setup.py
+++ b/setup.py
@@ -38,7 +38,6 @@ setup(
'tractor',
'tractor.experimental',
'tractor.trionics',
- 'tractor.testing',
],
install_requires=[
diff --git a/tests/conftest.py b/tests/conftest.py
index 3739eae..9b26de6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -8,15 +8,89 @@ import random
import signal
import platform
import time
+import inspect
+from functools import partial, wraps
import pytest
+import trio
import tractor
-# export for tests
-from tractor.testing import tractor_test # noqa
-
-
pytest_plugins = ['pytester']
+
+
+def tractor_test(fn):
+ """
+ Use:
+
+ @tractor_test
+ async def test_whatever():
+ await ...
+
+ If fixtures:
+
+ - ``arb_addr`` (a socket addr tuple where arbiter is listening)
+ - ``loglevel`` (logging level passed to tractor internals)
+ - ``start_method`` (subprocess spawning backend)
+
+ are defined in the `pytest` fixture space they will be automatically
+ injected to tests declaring these funcargs.
+ """
+ @wraps(fn)
+ def wrapper(
+ *args,
+ loglevel=None,
+ arb_addr=None,
+ start_method=None,
+ **kwargs
+ ):
+ # __tracebackhide__ = True
+
+ if 'arb_addr' in inspect.signature(fn).parameters:
+ # injects test suite fixture value to test as well
+ # as `run()`
+ kwargs['arb_addr'] = arb_addr
+
+ if 'loglevel' in inspect.signature(fn).parameters:
+ # allows test suites to define a 'loglevel' fixture
+ # that activates the internal logging
+ kwargs['loglevel'] = loglevel
+
+ if start_method is None:
+ if platform.system() == "Windows":
+ start_method = 'trio'
+
+ if 'start_method' in inspect.signature(fn).parameters:
+ # set of subprocess spawning backends
+ kwargs['start_method'] = start_method
+
+ if kwargs:
+
+ # use explicit root actor start
+
+ async def _main():
+ async with tractor.open_root_actor(
+ # **kwargs,
+ arbiter_addr=arb_addr,
+ loglevel=loglevel,
+ start_method=start_method,
+
+ # TODO: only enable when pytest is passed --pdb
+ # debug_mode=True,
+
+ ):
+ await fn(*args, **kwargs)
+
+ main = _main
+
+ else:
+ # use implicit root actor start
+ main = partial(fn, *args, **kwargs)
+
+ return trio.run(main)
+
+ return wrapper
+
+
_arb_addr = '127.0.0.1', random.randint(1000, 9999)
@@ -64,11 +138,7 @@ def pytest_addoption(parser):
def pytest_configure(config):
backend = config.option.spawn_backend
-
- if backend == 'mp':
- tractor._spawn.try_set_start_method('spawn')
- elif backend == 'trio':
- tractor._spawn.try_set_start_method(backend)
+ tractor._spawn.try_set_start_method(backend)
@pytest.fixture(scope='session', autouse=True)
@@ -102,24 +172,24 @@ def arb_addr():
def pytest_generate_tests(metafunc):
spawn_backend = metafunc.config.option.spawn_backend
+
if not spawn_backend:
# XXX some weird windows bug with `pytest`?
- spawn_backend = 'mp'
- assert spawn_backend in ('mp', 'trio')
+ spawn_backend = 'trio'
+ # TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
+ assert spawn_backend in (
+ 'mp_spawn',
+ 'mp_forkserver',
+ 'trio',
+ )
+
+ # NOTE: used to be used to dyanmically parametrize tests for when
+ # you just passed --spawn-backend=`mp` on the cli, but now we expect
+ # that cli input to be manually specified, BUT, maybe we'll do
+ # something like this again in the future?
if 'start_method' in metafunc.fixturenames:
- if spawn_backend == 'mp':
- from multiprocessing import get_all_start_methods
- methods = get_all_start_methods()
- if 'fork' in methods:
- # fork not available on windows, so check before
- # removing XXX: the fork method is in general
- # incompatible with trio's global scheduler state
- methods.remove('fork')
- elif spawn_backend == 'trio':
- methods = ['trio']
-
- metafunc.parametrize("start_method", methods, scope='module')
+ metafunc.parametrize("start_method", [spawn_backend], scope='module')
def sig_prog(proc, sig):
diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py
index e709531..21681f0 100644
--- a/tests/test_cancellation.py
+++ b/tests/test_cancellation.py
@@ -435,7 +435,7 @@ def test_cancel_via_SIGINT(
with trio.fail_after(2):
async with tractor.open_nursery() as tn:
await tn.start_actor('sucka')
- if spawn_backend == 'mp':
+ if 'mp' in spawn_backend:
time.sleep(0.1)
os.kill(pid, signal.SIGINT)
await trio.sleep_forever()
@@ -474,7 +474,7 @@ def test_cancel_via_SIGINT_other_task(
with trio.fail_after(timeout):
async with trio.open_nursery() as n:
await n.start(spawn_and_sleep_forever)
- if spawn_backend == 'mp':
+ if 'mp' in spawn_backend:
time.sleep(0.1)
os.kill(pid, signal.SIGINT)
diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py
index 390ca29..ababcb5 100644
--- a/tests/test_pubsub.py
+++ b/tests/test_pubsub.py
@@ -4,9 +4,10 @@ from itertools import cycle
import pytest
import trio
import tractor
-from tractor.testing import tractor_test
from tractor.experimental import msgpub
+from conftest import tractor_test
+
def test_type_checks():
diff --git a/tests/test_spawning.py b/tests/test_spawning.py
index 3e5ab1b..17798c0 100644
--- a/tests/test_spawning.py
+++ b/tests/test_spawning.py
@@ -142,7 +142,7 @@ def test_loglevel_propagated_to_subactor(
capfd,
arb_addr,
):
- if start_method == 'forkserver':
+ if start_method == 'mp_forkserver':
pytest.skip(
"a bug with `capfd` seems to make forkserver capture not work?")
diff --git a/tests/test_streaming.py b/tests/test_streaming.py
index 0d3ee1f..4e54e02 100644
--- a/tests/test_streaming.py
+++ b/tests/test_streaming.py
@@ -7,9 +7,10 @@ import platform
import trio
import tractor
-from tractor.testing import tractor_test
import pytest
+from conftest import tractor_test
+
def test_must_define_ctx():
diff --git a/tractor/_entry.py b/tractor/_entry.py
index 931b2e2..9e95fee 100644
--- a/tractor/_entry.py
+++ b/tractor/_entry.py
@@ -18,15 +18,28 @@
Sub-process entry points.
"""
+from __future__ import annotations
from functools import partial
-from typing import Any
+from typing import (
+ Any,
+ TYPE_CHECKING,
+)
import trio # type: ignore
-from .log import get_console_log, get_logger
+from .log import (
+ get_console_log,
+ get_logger,
+)
from . import _state
from .to_asyncio import run_as_asyncio_guest
-from ._runtime import async_main, Actor
+from ._runtime import (
+ async_main,
+ Actor,
+)
+
+if TYPE_CHECKING:
+ from ._spawn import SpawnMethodKey
log = get_logger(__name__)
@@ -34,10 +47,10 @@ log = get_logger(__name__)
def _mp_main(
- actor: 'Actor', # type: ignore
+ actor: Actor, # type: ignore
accept_addr: tuple[str, int],
forkserver_info: tuple[Any, Any, Any, Any, Any],
- start_method: str,
+ start_method: SpawnMethodKey,
parent_addr: tuple[str, int] = None,
infect_asyncio: bool = False,
diff --git a/tractor/_root.py b/tractor/_root.py
index 1def614..0e5b2aa 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -62,7 +62,7 @@ async def open_root_actor(
# either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default).
- start_method: Optional[str] = None,
+ start_method: Optional[_spawn.SpawnMethodKey] = None,
# enables the multi-process debugger support
debug_mode: bool = False,
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 78b3ba8..4a9f118 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -22,7 +22,12 @@ from __future__ import annotations
import sys
import platform
from typing import (
- Any, Optional, Callable, TypeVar, TYPE_CHECKING
+ Any,
+ Literal,
+ Optional,
+ Callable,
+ TypeVar,
+ TYPE_CHECKING,
)
from collections.abc import Awaitable
@@ -39,7 +44,6 @@ from ._state import (
is_root_process,
debug_mode,
)
-
from .log import get_logger
from ._portal import Portal
from ._runtime import Actor
@@ -48,6 +52,7 @@ from ._exceptions import ActorFailure
if TYPE_CHECKING:
+ from ._supervise import ActorNursery
import multiprocessing as mp
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
@@ -55,7 +60,12 @@ log = get_logger('tractor')
# placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None
-_spawn_method: str = "trio"
+SpawnMethodKey = Literal[
+ 'trio', # supported on all platforms
+ 'mp_spawn',
+ 'mp_forkserver', # posix only
+]
+_spawn_method: SpawnMethodKey = 'trio'
if platform.system() == 'Windows':
@@ -72,7 +82,10 @@ else:
await trio.lowlevel.wait_readable(proc.sentinel)
-def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
+def try_set_start_method(
+ key: SpawnMethodKey
+
+) -> Optional[mp.context.BaseContext]:
'''
Attempt to set the method for process starting, aka the "actor
spawning backend".
@@ -87,28 +100,30 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
global _ctx
global _spawn_method
- methods = mp.get_all_start_methods()
- if 'fork' in methods:
+ mp_methods = mp.get_all_start_methods()
+ if 'fork' in mp_methods:
# forking is incompatible with ``trio``s global task tree
- methods.remove('fork')
+ mp_methods.remove('fork')
- # supported on all platforms
- methods += ['trio']
+ match key:
+ case 'mp_forkserver':
+ from . import _forkserver_override
+ _forkserver_override.override_stdlib()
+ _ctx = mp.get_context('forkserver')
- if name not in methods:
- raise ValueError(
- f"Spawn method `{name}` is invalid please choose one of {methods}"
- )
- elif name == 'forkserver':
- from . import _forkserver_override
- _forkserver_override.override_stdlib()
- _ctx = mp.get_context(name)
- elif name == 'trio':
- _ctx = None
- else:
- _ctx = mp.get_context(name)
+ case 'mp_spawn':
+ _ctx = mp.get_context('spawn')
- _spawn_method = name
+ case 'trio':
+ _ctx = None
+
+ case _:
+ raise ValueError(
+ f'Spawn method `{key}` is invalid!\n'
+ f'Please choose one of {SpawnMethodKey}'
+ )
+
+ _spawn_method = key
return _ctx
@@ -247,9 +262,8 @@ async def soft_wait(
async def new_proc(
-
name: str,
- actor_nursery: 'ActorNursery', # type: ignore # noqa
+ actor_nursery: ActorNursery,
subactor: Actor,
errors: dict[tuple[str, str], Exception],
@@ -263,6 +277,41 @@ async def new_proc(
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
+) -> None:
+
+ # lookup backend spawning target
+ target = _methods[_spawn_method]
+
+ # mark the new actor with the global spawn method
+ subactor._spawn_method = _spawn_method
+
+ await target(
+ name,
+ actor_nursery,
+ subactor,
+ errors,
+ bind_addr,
+ parent_addr,
+ _runtime_vars, # run time vars
+ infect_asyncio=infect_asyncio,
+ task_status=task_status,
+ )
+
+
+async def trio_proc(
+ name: str,
+ actor_nursery: ActorNursery,
+ subactor: Actor,
+ errors: dict[tuple[str, str], Exception],
+
+ # passed through to actor main
+ bind_addr: tuple[str, int],
+ parent_addr: tuple[str, int],
+ _runtime_vars: dict[str, Any], # serialized and sent to _child
+ *,
+ infect_asyncio: bool = False,
+ task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
+
) -> None:
'''
Create a new ``Process`` using a "spawn method" as (configured using
@@ -272,178 +321,159 @@ async def new_proc(
here is to be considered the core supervision strategy.
'''
- # mark the new actor with the global spawn method
- subactor._spawn_method = _spawn_method
- uid = subactor.uid
+ spawn_cmd = [
+ sys.executable,
+ "-m",
+ # Hardcode this (instead of using ``_child.__name__`` to avoid a
+ # double import warning: https://stackoverflow.com/a/45070583
+ "tractor._child",
+ # We provide the child's unique identifier on this exec/spawn
+ # line for debugging purposes when viewing the process tree from
+ # the OS; it otherwise can be passed via the parent channel if
+ # we prefer in the future (for privacy).
+ "--uid",
+ str(subactor.uid),
+ # Address the child must connect to on startup
+ "--parent_addr",
+ str(parent_addr)
+ ]
- if _spawn_method == 'trio':
- spawn_cmd = [
- sys.executable,
- "-m",
- # Hardcode this (instead of using ``_child.__name__`` to avoid a
- # double import warning: https://stackoverflow.com/a/45070583
- "tractor._child",
- # We provide the child's unique identifier on this exec/spawn
- # line for debugging purposes when viewing the process tree from
- # the OS; it otherwise can be passed via the parent channel if
- # we prefer in the future (for privacy).
- "--uid",
- str(subactor.uid),
- # Address the child must connect to on startup
- "--parent_addr",
- str(parent_addr)
+ if subactor.loglevel:
+ spawn_cmd += [
+ "--loglevel",
+ subactor.loglevel
]
+ # Tell child to run in guest mode on top of ``asyncio`` loop
+ if infect_asyncio:
+ spawn_cmd.append("--asyncio")
- if subactor.loglevel:
- spawn_cmd += [
- "--loglevel",
- subactor.loglevel
- ]
- # Tell child to run in guest mode on top of ``asyncio`` loop
- if infect_asyncio:
- spawn_cmd.append("--asyncio")
-
- cancelled_during_spawn: bool = False
- proc: Optional[trio.Process] = None
+ cancelled_during_spawn: bool = False
+ proc: Optional[trio.Process] = None
+ try:
try:
- try:
- # TODO: needs ``trio_typing`` patch?
- proc = await trio.lowlevel.open_process(spawn_cmd) # type: ignore
+ # TODO: needs ``trio_typing`` patch?
+ proc = await trio.lowlevel.open_process( # type: ignore
+ spawn_cmd)
- log.runtime(f"Started {proc}")
+ log.runtime(f"Started {proc}")
- # wait for actor to spawn and connect back to us
- # channel should have handshake completed by the
- # local actor by the time we get a ref to it
- event, chan = await actor_nursery._actor.wait_for_peer(
- subactor.uid)
+ # wait for actor to spawn and connect back to us
+ # channel should have handshake completed by the
+ # local actor by the time we get a ref to it
+ event, chan = await actor_nursery._actor.wait_for_peer(
+ subactor.uid)
- except trio.Cancelled:
- cancelled_during_spawn = True
- # we may cancel before the child connects back in which
- # case avoid clobbering the pdb tty.
- if debug_mode():
- with trio.CancelScope(shield=True):
- # don't clobber an ongoing pdb
- if is_root_process():
- await maybe_wait_for_debugger()
-
- elif proc is not None:
- async with acquire_debug_lock(uid):
- # soft wait on the proc to terminate
- with trio.move_on_after(0.5):
- await proc.wait()
- raise
-
- # a sub-proc ref **must** exist now
- assert proc
-
- portal = Portal(chan)
- actor_nursery._children[subactor.uid] = (
- subactor, proc, portal)
-
- # send additional init params
- await chan.send({
- "_parent_main_data": subactor._parent_main_data,
- "enable_modules": subactor.enable_modules,
- "_arb_addr": subactor._arb_addr,
- "bind_host": bind_addr[0],
- "bind_port": bind_addr[1],
- "_runtime_vars": _runtime_vars,
- })
-
- # track subactor in current nursery
- curr_actor = current_actor()
- curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
-
- # resume caller at next checkpoint now that child is up
- task_status.started(portal)
-
- # wait for ActorNursery.wait() to be called
- with trio.CancelScope(shield=True):
- await actor_nursery._join_procs.wait()
-
- async with trio.open_nursery() as nursery:
- if portal in actor_nursery._cancel_after_result_on_exit:
- nursery.start_soon(
- cancel_on_completion,
- portal,
- subactor,
- errors
- )
-
- # This is a "soft" (cancellable) join/reap which
- # will remote cancel the actor on a ``trio.Cancelled``
- # condition.
- await soft_wait(
- proc,
- trio.Process.wait,
- portal
- )
-
- # cancel result waiter that may have been spawned in
- # tandem if not done already
- log.warning(
- "Cancelling existing result waiter task for "
- f"{subactor.uid}")
- nursery.cancel_scope.cancel()
-
- finally:
- # The "hard" reap since no actor zombies are allowed!
- # XXX: do this **after** cancellation/tearfown to avoid
- # killing the process too early.
- if proc:
- log.cancel(f'Hard reap sequence starting for {uid}')
+ except trio.Cancelled:
+ cancelled_during_spawn = True
+ # we may cancel before the child connects back in which
+ # case avoid clobbering the pdb tty.
+ if debug_mode():
with trio.CancelScope(shield=True):
-
# don't clobber an ongoing pdb
- if cancelled_during_spawn:
- # Try again to avoid TTY clobbering.
- async with acquire_debug_lock(uid):
+ if is_root_process():
+ await maybe_wait_for_debugger()
+
+ elif proc is not None:
+ async with acquire_debug_lock(subactor.uid):
+ # soft wait on the proc to terminate
with trio.move_on_after(0.5):
await proc.wait()
+ raise
- if is_root_process():
- await maybe_wait_for_debugger(
- child_in_debug=_runtime_vars.get(
- '_debug_mode', False),
- )
+ # a sub-proc ref **must** exist now
+ assert proc
- if proc.poll() is None:
- log.cancel(f"Attempting to hard kill {proc}")
- await do_hard_kill(proc)
-
- log.debug(f"Joined {proc}")
- else:
- log.warning('Nursery cancelled before sub-proc started')
-
- if not cancelled_during_spawn:
- # pop child entry to indicate we no longer managing this
- # subactor
- actor_nursery._children.pop(subactor.uid)
-
- else:
- # `multiprocessing`
- # async with trio.open_nursery() as nursery:
- await mp_new_proc(
- name=name,
- actor_nursery=actor_nursery,
- subactor=subactor,
- errors=errors,
-
- # passed through to actor main
- bind_addr=bind_addr,
- parent_addr=parent_addr,
- _runtime_vars=_runtime_vars,
- infect_asyncio=infect_asyncio,
- task_status=task_status,
+ portal = Portal(chan)
+ actor_nursery._children[subactor.uid] = (
+ subactor,
+ proc,
+ portal,
)
+ # send additional init params
+ await chan.send({
+ "_parent_main_data": subactor._parent_main_data,
+ "enable_modules": subactor.enable_modules,
+ "_arb_addr": subactor._arb_addr,
+ "bind_host": bind_addr[0],
+ "bind_port": bind_addr[1],
+ "_runtime_vars": _runtime_vars,
+ })
-async def mp_new_proc(
+ # track subactor in current nursery
+ curr_actor = current_actor()
+ curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
+ # resume caller at next checkpoint now that child is up
+ task_status.started(portal)
+
+ # wait for ActorNursery.wait() to be called
+ with trio.CancelScope(shield=True):
+ await actor_nursery._join_procs.wait()
+
+ async with trio.open_nursery() as nursery:
+ if portal in actor_nursery._cancel_after_result_on_exit:
+ nursery.start_soon(
+ cancel_on_completion,
+ portal,
+ subactor,
+ errors
+ )
+
+ # This is a "soft" (cancellable) join/reap which
+ # will remote cancel the actor on a ``trio.Cancelled``
+ # condition.
+ await soft_wait(
+ proc,
+ trio.Process.wait,
+ portal
+ )
+
+ # cancel result waiter that may have been spawned in
+ # tandem if not done already
+ log.warning(
+ "Cancelling existing result waiter task for "
+ f"{subactor.uid}")
+ nursery.cancel_scope.cancel()
+
+ finally:
+ # The "hard" reap since no actor zombies are allowed!
+ # XXX: do this **after** cancellation/tearfown to avoid
+ # killing the process too early.
+ if proc:
+ log.cancel(f'Hard reap sequence starting for {subactor.uid}')
+ with trio.CancelScope(shield=True):
+
+ # don't clobber an ongoing pdb
+ if cancelled_during_spawn:
+ # Try again to avoid TTY clobbering.
+ async with acquire_debug_lock(subactor.uid):
+ with trio.move_on_after(0.5):
+ await proc.wait()
+
+ if is_root_process():
+ await maybe_wait_for_debugger(
+ child_in_debug=_runtime_vars.get(
+ '_debug_mode', False),
+ )
+
+ if proc.poll() is None:
+ log.cancel(f"Attempting to hard kill {proc}")
+ await do_hard_kill(proc)
+
+ log.debug(f"Joined {proc}")
+ else:
+ log.warning('Nursery cancelled before sub-proc started')
+
+ if not cancelled_during_spawn:
+ # pop child entry to indicate we no longer managing this
+ # subactor
+ actor_nursery._children.pop(subactor.uid)
+
+
+async def mp_proc(
name: str,
- actor_nursery: 'ActorNursery', # type: ignore # noqa
+ actor_nursery: ActorNursery, # type: ignore # noqa
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
@@ -468,6 +498,7 @@ async def mp_new_proc(
assert _ctx
start_method = _ctx.get_start_method()
if start_method == 'forkserver':
+
from multiprocessing import forkserver # type: ignore
# XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer).
@@ -480,23 +511,24 @@ async def mp_new_proc(
# forkserver.set_forkserver_preload(enable_modules)
forkserver.ensure_running()
fs_info = (
- fs._forkserver_address,
- fs._forkserver_alive_fd,
+ fs._forkserver_address, # type: ignore # noqa
+ fs._forkserver_alive_fd, # type: ignore # noqa
getattr(fs, '_forkserver_pid', None),
getattr(
resource_tracker._resource_tracker, '_pid', None),
resource_tracker._resource_tracker._fd,
)
- else:
+ else: # request to forkerserver to fork a new child
assert curr_actor._forkserver_info
fs_info = (
- fs._forkserver_address,
- fs._forkserver_alive_fd,
- fs._forkserver_pid,
+ fs._forkserver_address, # type: ignore # noqa
+ fs._forkserver_alive_fd, # type: ignore # noqa
+ fs._forkserver_pid, # type: ignore # noqa
resource_tracker._resource_tracker._pid,
resource_tracker._resource_tracker._fd,
) = curr_actor._forkserver_info
else:
+ # spawn method
fs_info = (None, None, None, None, None)
proc: mp.Process = _ctx.Process( # type: ignore
@@ -505,7 +537,7 @@ async def mp_new_proc(
subactor,
bind_addr,
fs_info,
- start_method,
+ _spawn_method,
parent_addr,
infect_asyncio,
),
@@ -599,4 +631,16 @@ async def mp_new_proc(
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor
- subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+ actor_nursery._children.pop(subactor.uid)
+
+ # TODO: prolly report to ``mypy`` how this causes all sorts of
+ # false errors..
+ # subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+
+
+# proc spawning backend target map
+_methods: dict[SpawnMethodKey, Callable] = {
+ 'trio': trio_proc,
+ 'mp_spawn': mp_proc,
+ 'mp_forkserver': mp_proc,
+}
diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index 06ee38d..4708e1e 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -90,7 +90,11 @@ class ActorNursery:
self._da_nursery = da_nursery
self._children: dict[
tuple[str, str],
- tuple[Actor, mp.Process, Optional[Portal]]
+ tuple[
+ Actor,
+ trio.Process | mp.Process,
+ Optional[Portal],
+ ]
] = {}
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
diff --git a/tractor/testing/__init__.py b/tractor/testing/__init__.py
deleted file mode 100644
index 2361f99..0000000
--- a/tractor/testing/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-# tractor: structured concurrent "actors".
-# Copyright 2018-eternity Tyler Goodlet.
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-
-from ._tractor_test import tractor_test
diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py
deleted file mode 100644
index ccfe349..0000000
--- a/tractor/testing/_tractor_test.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# tractor: structured concurrent "actors".
-# Copyright 2018-eternity Tyler Goodlet.
-
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
-
-import inspect
-import platform
-from functools import partial, wraps
-
-import trio
-import tractor
-
-
-__all__ = ['tractor_test']
-
-
-def tractor_test(fn):
- """
- Use:
-
- @tractor_test
- async def test_whatever():
- await ...
-
- If fixtures:
-
- - ``arb_addr`` (a socket addr tuple where arbiter is listening)
- - ``loglevel`` (logging level passed to tractor internals)
- - ``start_method`` (subprocess spawning backend)
-
- are defined in the `pytest` fixture space they will be automatically
- injected to tests declaring these funcargs.
- """
- @wraps(fn)
- def wrapper(
- *args,
- loglevel=None,
- arb_addr=None,
- start_method=None,
- **kwargs
- ):
- # __tracebackhide__ = True
-
- if 'arb_addr' in inspect.signature(fn).parameters:
- # injects test suite fixture value to test as well
- # as `run()`
- kwargs['arb_addr'] = arb_addr
-
- if 'loglevel' in inspect.signature(fn).parameters:
- # allows test suites to define a 'loglevel' fixture
- # that activates the internal logging
- kwargs['loglevel'] = loglevel
-
- if start_method is None:
- if platform.system() == "Windows":
- start_method = 'spawn'
- else:
- start_method = 'trio'
-
- if 'start_method' in inspect.signature(fn).parameters:
- # set of subprocess spawning backends
- kwargs['start_method'] = start_method
-
- if kwargs:
-
- # use explicit root actor start
-
- async def _main():
- async with tractor.open_root_actor(
- # **kwargs,
- arbiter_addr=arb_addr,
- loglevel=loglevel,
- start_method=start_method,
-
- # TODO: only enable when pytest is passed --pdb
- # debug_mode=True,
-
- ) as actor:
- await fn(*args, **kwargs)
-
- main = _main
-
- else:
- # use implicit root actor start
- main = partial(fn, *args, **kwargs)
-
- return trio.run(main)
- # arbiter_addr=arb_addr,
- # loglevel=loglevel,
- # start_method=start_method,
- # )
-
- return wrapper