forked from goodboy/tractor
1
0
Fork 0

Merge pull request #335 from goodboy/spawn_backend_table

Spawn backend table
callable_key_maybe_open_context
goodboy 2022-10-09 21:26:28 -04:00 committed by GitHub
commit 9e6266dda3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 367 additions and 347 deletions

View File

@ -26,7 +26,7 @@ jobs:
run: pip install -U . --upgrade-strategy eager -r requirements-test.txt run: pip install -U . --upgrade-strategy eager -r requirements-test.txt
- name: Run MyPy check - name: Run MyPy check
run: mypy tractor/ --ignore-missing-imports run: mypy tractor/ --ignore-missing-imports --show-traceback
# test that we can generate a software distribution and install it # test that we can generate a software distribution and install it
# thus avoid missing file issues after packaging. # thus avoid missing file issues after packaging.
@ -60,7 +60,11 @@ jobs:
matrix: matrix:
os: [ubuntu-latest] os: [ubuntu-latest]
python: ['3.10'] python: ['3.10']
spawn_backend: ['trio', 'mp'] spawn_backend: [
'trio',
'mp_spawn',
'mp_forkserver',
]
steps: steps:

View File

@ -0,0 +1,5 @@
Establish an explicit "backend spawning" method table; use it from CI
More clearly lays out the current set of (3) backends: ``['trio',
'mp_spawn', 'mp_forkserver']`` and adjusts the ``._spawn.py`` internals
as well as the test suite to accommodate.

View File

@ -2,7 +2,7 @@ pytest
pytest-trio pytest-trio
pytest-timeout pytest-timeout
pdbpp pdbpp
mypy<0.920 mypy
trio_typing<0.7.0 trio_typing
pexpect pexpect
towncrier towncrier

View File

@ -38,7 +38,6 @@ setup(
'tractor', 'tractor',
'tractor.experimental', 'tractor.experimental',
'tractor.trionics', 'tractor.trionics',
'tractor.testing',
], ],
install_requires=[ install_requires=[

View File

@ -8,15 +8,89 @@ import random
import signal import signal
import platform import platform
import time import time
import inspect
from functools import partial, wraps
import pytest import pytest
import trio
import tractor import tractor
# export for tests
from tractor.testing import tractor_test # noqa
pytest_plugins = ['pytester'] pytest_plugins = ['pytester']
def tractor_test(fn):
"""
Use:
@tractor_test
async def test_whatever():
await ...
If fixtures:
- ``arb_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)
are defined in the `pytest` fixture space they will be automatically
injected to tests declaring these funcargs.
"""
@wraps(fn)
def wrapper(
*args,
loglevel=None,
arb_addr=None,
start_method=None,
**kwargs
):
# __tracebackhide__ = True
if 'arb_addr' in inspect.signature(fn).parameters:
# injects test suite fixture value to test as well
# as `run()`
kwargs['arb_addr'] = arb_addr
if 'loglevel' in inspect.signature(fn).parameters:
# allows test suites to define a 'loglevel' fixture
# that activates the internal logging
kwargs['loglevel'] = loglevel
if start_method is None:
if platform.system() == "Windows":
start_method = 'trio'
if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['start_method'] = start_method
if kwargs:
# use explicit root actor start
async def _main():
async with tractor.open_root_actor(
# **kwargs,
arbiter_addr=arb_addr,
loglevel=loglevel,
start_method=start_method,
# TODO: only enable when pytest is passed --pdb
# debug_mode=True,
):
await fn(*args, **kwargs)
main = _main
else:
# use implicit root actor start
main = partial(fn, *args, **kwargs)
return trio.run(main)
return wrapper
_arb_addr = '127.0.0.1', random.randint(1000, 9999) _arb_addr = '127.0.0.1', random.randint(1000, 9999)
@ -64,11 +138,7 @@ def pytest_addoption(parser):
def pytest_configure(config): def pytest_configure(config):
backend = config.option.spawn_backend backend = config.option.spawn_backend
tractor._spawn.try_set_start_method(backend)
if backend == 'mp':
tractor._spawn.try_set_start_method('spawn')
elif backend == 'trio':
tractor._spawn.try_set_start_method(backend)
@pytest.fixture(scope='session', autouse=True) @pytest.fixture(scope='session', autouse=True)
@ -102,24 +172,24 @@ def arb_addr():
def pytest_generate_tests(metafunc): def pytest_generate_tests(metafunc):
spawn_backend = metafunc.config.option.spawn_backend spawn_backend = metafunc.config.option.spawn_backend
if not spawn_backend: if not spawn_backend:
# XXX some weird windows bug with `pytest`? # XXX some weird windows bug with `pytest`?
spawn_backend = 'mp' spawn_backend = 'trio'
assert spawn_backend in ('mp', 'trio')
# TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
assert spawn_backend in (
'mp_spawn',
'mp_forkserver',
'trio',
)
# NOTE: used to be used to dyanmically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect
# that cli input to be manually specified, BUT, maybe we'll do
# something like this again in the future?
if 'start_method' in metafunc.fixturenames: if 'start_method' in metafunc.fixturenames:
if spawn_backend == 'mp': metafunc.parametrize("start_method", [spawn_backend], scope='module')
from multiprocessing import get_all_start_methods
methods = get_all_start_methods()
if 'fork' in methods:
# fork not available on windows, so check before
# removing XXX: the fork method is in general
# incompatible with trio's global scheduler state
methods.remove('fork')
elif spawn_backend == 'trio':
methods = ['trio']
metafunc.parametrize("start_method", methods, scope='module')
def sig_prog(proc, sig): def sig_prog(proc, sig):

View File

@ -435,7 +435,7 @@ def test_cancel_via_SIGINT(
with trio.fail_after(2): with trio.fail_after(2):
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as tn:
await tn.start_actor('sucka') await tn.start_actor('sucka')
if spawn_backend == 'mp': if 'mp' in spawn_backend:
time.sleep(0.1) time.sleep(0.1)
os.kill(pid, signal.SIGINT) os.kill(pid, signal.SIGINT)
await trio.sleep_forever() await trio.sleep_forever()
@ -474,7 +474,7 @@ def test_cancel_via_SIGINT_other_task(
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
await n.start(spawn_and_sleep_forever) await n.start(spawn_and_sleep_forever)
if spawn_backend == 'mp': if 'mp' in spawn_backend:
time.sleep(0.1) time.sleep(0.1)
os.kill(pid, signal.SIGINT) os.kill(pid, signal.SIGINT)

View File

@ -4,9 +4,10 @@ from itertools import cycle
import pytest import pytest
import trio import trio
import tractor import tractor
from tractor.testing import tractor_test
from tractor.experimental import msgpub from tractor.experimental import msgpub
from conftest import tractor_test
def test_type_checks(): def test_type_checks():

View File

@ -142,7 +142,7 @@ def test_loglevel_propagated_to_subactor(
capfd, capfd,
arb_addr, arb_addr,
): ):
if start_method == 'forkserver': if start_method == 'mp_forkserver':
pytest.skip( pytest.skip(
"a bug with `capfd` seems to make forkserver capture not work?") "a bug with `capfd` seems to make forkserver capture not work?")

View File

@ -7,9 +7,10 @@ import platform
import trio import trio
import tractor import tractor
from tractor.testing import tractor_test
import pytest import pytest
from conftest import tractor_test
def test_must_define_ctx(): def test_must_define_ctx():

View File

@ -18,15 +18,28 @@
Sub-process entry points. Sub-process entry points.
""" """
from __future__ import annotations
from functools import partial from functools import partial
from typing import Any from typing import (
Any,
TYPE_CHECKING,
)
import trio # type: ignore import trio # type: ignore
from .log import get_console_log, get_logger from .log import (
get_console_log,
get_logger,
)
from . import _state from . import _state
from .to_asyncio import run_as_asyncio_guest from .to_asyncio import run_as_asyncio_guest
from ._runtime import async_main, Actor from ._runtime import (
async_main,
Actor,
)
if TYPE_CHECKING:
from ._spawn import SpawnMethodKey
log = get_logger(__name__) log = get_logger(__name__)
@ -34,10 +47,10 @@ log = get_logger(__name__)
def _mp_main( def _mp_main(
actor: 'Actor', # type: ignore actor: Actor, # type: ignore
accept_addr: tuple[str, int], accept_addr: tuple[str, int],
forkserver_info: tuple[Any, Any, Any, Any, Any], forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: str, start_method: SpawnMethodKey,
parent_addr: tuple[str, int] = None, parent_addr: tuple[str, int] = None,
infect_asyncio: bool = False, infect_asyncio: bool = False,

View File

@ -62,7 +62,7 @@ async def open_root_actor(
# either the `multiprocessing` start method: # either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default). # OR `trio` (the new default).
start_method: Optional[str] = None, start_method: Optional[_spawn.SpawnMethodKey] = None,
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,

View File

@ -22,7 +22,12 @@ from __future__ import annotations
import sys import sys
import platform import platform
from typing import ( from typing import (
Any, Optional, Callable, TypeVar, TYPE_CHECKING Any,
Literal,
Optional,
Callable,
TypeVar,
TYPE_CHECKING,
) )
from collections.abc import Awaitable from collections.abc import Awaitable
@ -39,7 +44,6 @@ from ._state import (
is_root_process, is_root_process,
debug_mode, debug_mode,
) )
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._runtime import Actor from ._runtime import Actor
@ -48,6 +52,7 @@ from ._exceptions import ActorFailure
if TYPE_CHECKING: if TYPE_CHECKING:
from ._supervise import ActorNursery
import multiprocessing as mp import multiprocessing as mp
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
@ -55,7 +60,12 @@ log = get_logger('tractor')
# placeholder for an mp start context if so using that backend # placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None _ctx: Optional[mp.context.BaseContext] = None
_spawn_method: str = "trio" SpawnMethodKey = Literal[
'trio', # supported on all platforms
'mp_spawn',
'mp_forkserver', # posix only
]
_spawn_method: SpawnMethodKey = 'trio'
if platform.system() == 'Windows': if platform.system() == 'Windows':
@ -72,7 +82,10 @@ else:
await trio.lowlevel.wait_readable(proc.sentinel) await trio.lowlevel.wait_readable(proc.sentinel)
def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: def try_set_start_method(
key: SpawnMethodKey
) -> Optional[mp.context.BaseContext]:
''' '''
Attempt to set the method for process starting, aka the "actor Attempt to set the method for process starting, aka the "actor
spawning backend". spawning backend".
@ -87,28 +100,30 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
global _ctx global _ctx
global _spawn_method global _spawn_method
methods = mp.get_all_start_methods() mp_methods = mp.get_all_start_methods()
if 'fork' in methods: if 'fork' in mp_methods:
# forking is incompatible with ``trio``s global task tree # forking is incompatible with ``trio``s global task tree
methods.remove('fork') mp_methods.remove('fork')
# supported on all platforms match key:
methods += ['trio'] case 'mp_forkserver':
from . import _forkserver_override
_forkserver_override.override_stdlib()
_ctx = mp.get_context('forkserver')
if name not in methods: case 'mp_spawn':
raise ValueError( _ctx = mp.get_context('spawn')
f"Spawn method `{name}` is invalid please choose one of {methods}"
)
elif name == 'forkserver':
from . import _forkserver_override
_forkserver_override.override_stdlib()
_ctx = mp.get_context(name)
elif name == 'trio':
_ctx = None
else:
_ctx = mp.get_context(name)
_spawn_method = name case 'trio':
_ctx = None
case _:
raise ValueError(
f'Spawn method `{key}` is invalid!\n'
f'Please choose one of {SpawnMethodKey}'
)
_spawn_method = key
return _ctx return _ctx
@ -247,9 +262,8 @@ async def soft_wait(
async def new_proc( async def new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa actor_nursery: ActorNursery,
subactor: Actor, subactor: Actor,
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
@ -263,6 +277,41 @@ async def new_proc(
infect_asyncio: bool = False, infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
# lookup backend spawning target
target = _methods[_spawn_method]
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
await target(
name,
actor_nursery,
subactor,
errors,
bind_addr,
parent_addr,
_runtime_vars, # run time vars
infect_asyncio=infect_asyncio,
task_status=task_status,
)
async def trio_proc(
name: str,
actor_nursery: ActorNursery,
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
bind_addr: tuple[str, int],
parent_addr: tuple[str, int],
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
''' '''
Create a new ``Process`` using a "spawn method" as (configured using Create a new ``Process`` using a "spawn method" as (configured using
@ -272,178 +321,159 @@ async def new_proc(
here is to be considered the core supervision strategy. here is to be considered the core supervision strategy.
''' '''
# mark the new actor with the global spawn method spawn_cmd = [
subactor._spawn_method = _spawn_method sys.executable,
uid = subactor.uid "-m",
# Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# We provide the child's unique identifier on this exec/spawn
# line for debugging purposes when viewing the process tree from
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
str(parent_addr)
]
if _spawn_method == 'trio': if subactor.loglevel:
spawn_cmd = [ spawn_cmd += [
sys.executable, "--loglevel",
"-m", subactor.loglevel
# Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583
"tractor._child",
# We provide the child's unique identifier on this exec/spawn
# line for debugging purposes when viewing the process tree from
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid",
str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr",
str(parent_addr)
] ]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
if subactor.loglevel: cancelled_during_spawn: bool = False
spawn_cmd += [ proc: Optional[trio.Process] = None
"--loglevel", try:
subactor.loglevel
]
# Tell child to run in guest mode on top of ``asyncio`` loop
if infect_asyncio:
spawn_cmd.append("--asyncio")
cancelled_during_spawn: bool = False
proc: Optional[trio.Process] = None
try: try:
try: # TODO: needs ``trio_typing`` patch?
# TODO: needs ``trio_typing`` patch? proc = await trio.lowlevel.open_process( # type: ignore
proc = await trio.lowlevel.open_process(spawn_cmd) # type: ignore spawn_cmd)
log.runtime(f"Started {proc}") log.runtime(f"Started {proc}")
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer( event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid) subactor.uid)
except trio.Cancelled: except trio.Cancelled:
cancelled_during_spawn = True cancelled_during_spawn = True
# we may cancel before the child connects back in which # we may cancel before the child connects back in which
# case avoid clobbering the pdb tty. # case avoid clobbering the pdb tty.
if debug_mode(): if debug_mode():
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if is_root_process():
await maybe_wait_for_debugger()
elif proc is not None:
async with acquire_debug_lock(uid):
# soft wait on the proc to terminate
with trio.move_on_after(0.5):
await proc.wait()
raise
# a sub-proc ref **must** exist now
assert proc
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (
subactor, proc, portal)
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
})
# track subactor in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# wait for ActorNursery.wait() to be called
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()
finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {uid}')
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
if cancelled_during_spawn: if is_root_process():
# Try again to avoid TTY clobbering. await maybe_wait_for_debugger()
async with acquire_debug_lock(uid):
elif proc is not None:
async with acquire_debug_lock(subactor.uid):
# soft wait on the proc to terminate
with trio.move_on_after(0.5): with trio.move_on_after(0.5):
await proc.wait() await proc.wait()
raise
if is_root_process(): # a sub-proc ref **must** exist now
await maybe_wait_for_debugger( assert proc
child_in_debug=_runtime_vars.get(
'_debug_mode', False),
)
if proc.poll() is None: portal = Portal(chan)
log.cancel(f"Attempting to hard kill {proc}") actor_nursery._children[subactor.uid] = (
await do_hard_kill(proc) subactor,
proc,
log.debug(f"Joined {proc}") portal,
else:
log.warning('Nursery cancelled before sub-proc started')
if not cancelled_during_spawn:
# pop child entry to indicate we no longer managing this
# subactor
actor_nursery._children.pop(subactor.uid)
else:
# `multiprocessing`
# async with trio.open_nursery() as nursery:
await mp_new_proc(
name=name,
actor_nursery=actor_nursery,
subactor=subactor,
errors=errors,
# passed through to actor main
bind_addr=bind_addr,
parent_addr=parent_addr,
_runtime_vars=_runtime_vars,
infect_asyncio=infect_asyncio,
task_status=task_status,
) )
# send additional init params
await chan.send({
"_parent_main_data": subactor._parent_main_data,
"enable_modules": subactor.enable_modules,
"_arb_addr": subactor._arb_addr,
"bind_host": bind_addr[0],
"bind_port": bind_addr[1],
"_runtime_vars": _runtime_vars,
})
async def mp_new_proc( # track subactor in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up
task_status.started(portal)
# wait for ActorNursery.wait() to be called
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
cancel_on_completion,
portal,
subactor,
errors
)
# This is a "soft" (cancellable) join/reap which
# will remote cancel the actor on a ``trio.Cancelled``
# condition.
await soft_wait(
proc,
trio.Process.wait,
portal
)
# cancel result waiter that may have been spawned in
# tandem if not done already
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
nursery.cancel_scope.cancel()
finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if cancelled_during_spawn:
# Try again to avoid TTY clobbering.
async with acquire_debug_lock(subactor.uid):
with trio.move_on_after(0.5):
await proc.wait()
if is_root_process():
await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get(
'_debug_mode', False),
)
if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}")
await do_hard_kill(proc)
log.debug(f"Joined {proc}")
else:
log.warning('Nursery cancelled before sub-proc started')
if not cancelled_during_spawn:
# pop child entry to indicate we no longer managing this
# subactor
actor_nursery._children.pop(subactor.uid)
async def mp_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa actor_nursery: ActorNursery, # type: ignore # noqa
subactor: Actor, subactor: Actor,
errors: dict[tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
@ -468,6 +498,7 @@ async def mp_new_proc(
assert _ctx assert _ctx
start_method = _ctx.get_start_method() start_method = _ctx.get_start_method()
if start_method == 'forkserver': if start_method == 'forkserver':
from multiprocessing import forkserver # type: ignore from multiprocessing import forkserver # type: ignore
# XXX do our hackery on the stdlib to avoid multiple # XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer). # forkservers (one at each subproc layer).
@ -480,23 +511,24 @@ async def mp_new_proc(
# forkserver.set_forkserver_preload(enable_modules) # forkserver.set_forkserver_preload(enable_modules)
forkserver.ensure_running() forkserver.ensure_running()
fs_info = ( fs_info = (
fs._forkserver_address, fs._forkserver_address, # type: ignore # noqa
fs._forkserver_alive_fd, fs._forkserver_alive_fd, # type: ignore # noqa
getattr(fs, '_forkserver_pid', None), getattr(fs, '_forkserver_pid', None),
getattr( getattr(
resource_tracker._resource_tracker, '_pid', None), resource_tracker._resource_tracker, '_pid', None),
resource_tracker._resource_tracker._fd, resource_tracker._resource_tracker._fd,
) )
else: else: # request to forkerserver to fork a new child
assert curr_actor._forkserver_info assert curr_actor._forkserver_info
fs_info = ( fs_info = (
fs._forkserver_address, fs._forkserver_address, # type: ignore # noqa
fs._forkserver_alive_fd, fs._forkserver_alive_fd, # type: ignore # noqa
fs._forkserver_pid, fs._forkserver_pid, # type: ignore # noqa
resource_tracker._resource_tracker._pid, resource_tracker._resource_tracker._pid,
resource_tracker._resource_tracker._fd, resource_tracker._resource_tracker._fd,
) = curr_actor._forkserver_info ) = curr_actor._forkserver_info
else: else:
# spawn method
fs_info = (None, None, None, None, None) fs_info = (None, None, None, None, None)
proc: mp.Process = _ctx.Process( # type: ignore proc: mp.Process = _ctx.Process( # type: ignore
@ -505,7 +537,7 @@ async def mp_new_proc(
subactor, subactor,
bind_addr, bind_addr,
fs_info, fs_info,
start_method, _spawn_method,
parent_addr, parent_addr,
infect_asyncio, infect_asyncio,
), ),
@ -599,4 +631,16 @@ async def mp_new_proc(
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing subactor # pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid) actor_nursery._children.pop(subactor.uid)
# TODO: prolly report to ``mypy`` how this causes all sorts of
# false errors..
# subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# proc spawning backend target map
_methods: dict[SpawnMethodKey, Callable] = {
'trio': trio_proc,
'mp_spawn': mp_proc,
'mp_forkserver': mp_proc,
}

View File

@ -90,7 +90,11 @@ class ActorNursery:
self._da_nursery = da_nursery self._da_nursery = da_nursery
self._children: dict[ self._children: dict[
tuple[str, str], tuple[str, str],
tuple[Actor, mp.Process, Optional[Portal]] tuple[
Actor,
trio.Process | mp.Process,
Optional[Portal],
]
] = {} ] = {}
# portals spawned with ``run_in_actor()`` are # portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives # cancelled when their "main" result arrives

View File

@ -1,17 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from ._tractor_test import tractor_test

View File

@ -1,104 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import inspect
import platform
from functools import partial, wraps
import trio
import tractor
__all__ = ['tractor_test']
def tractor_test(fn):
"""
Use:
@tractor_test
async def test_whatever():
await ...
If fixtures:
- ``arb_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)
are defined in the `pytest` fixture space they will be automatically
injected to tests declaring these funcargs.
"""
@wraps(fn)
def wrapper(
*args,
loglevel=None,
arb_addr=None,
start_method=None,
**kwargs
):
# __tracebackhide__ = True
if 'arb_addr' in inspect.signature(fn).parameters:
# injects test suite fixture value to test as well
# as `run()`
kwargs['arb_addr'] = arb_addr
if 'loglevel' in inspect.signature(fn).parameters:
# allows test suites to define a 'loglevel' fixture
# that activates the internal logging
kwargs['loglevel'] = loglevel
if start_method is None:
if platform.system() == "Windows":
start_method = 'spawn'
else:
start_method = 'trio'
if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['start_method'] = start_method
if kwargs:
# use explicit root actor start
async def _main():
async with tractor.open_root_actor(
# **kwargs,
arbiter_addr=arb_addr,
loglevel=loglevel,
start_method=start_method,
# TODO: only enable when pytest is passed --pdb
# debug_mode=True,
) as actor:
await fn(*args, **kwargs)
main = _main
else:
# use implicit root actor start
main = partial(fn, *args, **kwargs)
return trio.run(main)
# arbiter_addr=arb_addr,
# loglevel=loglevel,
# start_method=start_method,
# )
return wrapper