forked from goodboy/tractor
1
0
Fork 0

Merge pull request #303 from goodboy/fence_mp

Fence mp
sort_subs_results_infected_aio
goodboy 2022-04-12 10:13:57 -04:00 committed by GitHub
commit 1109d96263
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 52 additions and 38 deletions

View File

@ -1,6 +1,11 @@
name: CI name: CI
on: push on:
# any time someone pushes a new branch to origin
push:
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:
jobs: jobs:

View File

@ -1,3 +1,3 @@
Add an experimental `tractor.msg.NamespacePath` type for passing Python Add an experimental ``tractor.msg.NamespacePath`` type for passing Python
objects by "reference" through a ``str``-subtype message and using the objects by "reference" through a ``str``-subtype message and using the
new ``pkgutil.resolve_name()`` for reference loading. new ``pkgutil.resolve_name()`` for reference loading.

View File

@ -1,3 +1,3 @@
Update to and pin latest `msgpack` (1.0.3) and `msgspec` (0.4.0) Update to and pin latest ``msgpack`` (1.0.3) and ``msgspec`` (0.4.0)
both of which required adjustments for backwards imcompatible API both of which required adjustments for backwards imcompatible API
tweaks. tweaks.

View File

@ -0,0 +1,4 @@
Fence off ``multiprocessing`` imports until absolutely necessary in an
effort to avoid "resource tracker" spawning side effects that seem to
have varying degrees of unreliability per Python release. Port to new
``msgspec.DecodeError``.

View File

@ -264,7 +264,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
try: try:
yield self.decode(msg_bytes) yield self.decode(msg_bytes)
except ( except (
msgspec.DecodingError, msgspec.DecodeError,
UnicodeDecodeError, UnicodeDecodeError,
): ):
if not last_decode_failed: if not last_decode_failed:

View File

@ -18,9 +18,9 @@
Helpers pulled mostly verbatim from ``multiprocessing.spawn`` Helpers pulled mostly verbatim from ``multiprocessing.spawn``
to aid with "fixing up" the ``__main__`` module in subprocesses. to aid with "fixing up" the ``__main__`` module in subprocesses.
These helpers are needed for any spawing backend that doesn't already handle this. These helpers are needed for any spawing backend that doesn't already
For example when using ``trio_run_in_process`` it is needed but obviously not when handle this. For example when using ``trio_run_in_process`` it is needed
we're already using ``multiprocessing``. but obviously not when we're already using ``multiprocessing``.
""" """
import os import os
@ -28,13 +28,12 @@ import sys
import platform import platform
import types import types
import runpy import runpy
from typing import Dict
ORIGINAL_DIR = os.path.abspath(os.getcwd()) ORIGINAL_DIR = os.path.abspath(os.getcwd())
def _mp_figure_out_main() -> Dict[str, str]: def _mp_figure_out_main() -> dict[str, str]:
"""Taken from ``multiprocessing.spawn.get_preparation_data()``. """Taken from ``multiprocessing.spawn.get_preparation_data()``.
Retrieve parent actor `__main__` module data. Retrieve parent actor `__main__` module data.

View File

@ -18,30 +18,17 @@
Machinery for actor process spawning using multiple backends. Machinery for actor process spawning using multiple backends.
""" """
from __future__ import annotations
import sys import sys
import multiprocessing as mp
import platform import platform
from typing import ( from typing import (
Any, Dict, Optional, Callable, Any, Optional, Callable, TypeVar, TYPE_CHECKING
TypeVar,
) )
from collections.abc import Awaitable from collections.abc import Awaitable
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override
from ._debug import ( from ._debug import (
maybe_wait_for_debugger, maybe_wait_for_debugger,
acquire_debug_lock, acquire_debug_lock,
@ -60,9 +47,12 @@ from ._entry import _mp_main
from ._exceptions import ActorFailure from ._exceptions import ActorFailure
log = get_logger('tractor') if TYPE_CHECKING:
import multiprocessing as mp
ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
log = get_logger('tractor')
# placeholder for an mp start context if so using that backend # placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None _ctx: Optional[mp.context.BaseContext] = None
_spawn_method: str = "trio" _spawn_method: str = "trio"
@ -70,6 +60,7 @@ _spawn_method: str = "trio"
if platform.system() == 'Windows': if platform.system() == 'Windows':
import multiprocessing as mp
_ctx = mp.get_context("spawn") _ctx = mp.get_context("spawn")
async def proc_waiter(proc: mp.Process) -> None: async def proc_waiter(proc: mp.Process) -> None:
@ -92,6 +83,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
``subprocess.Popen``. ``subprocess.Popen``.
''' '''
import multiprocessing as mp
global _ctx global _ctx
global _spawn_method global _spawn_method
@ -108,6 +100,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
f"Spawn method `{name}` is invalid please choose one of {methods}" f"Spawn method `{name}` is invalid please choose one of {methods}"
) )
elif name == 'forkserver': elif name == 'forkserver':
from . import _forkserver_override
_forkserver_override.override_stdlib() _forkserver_override.override_stdlib()
_ctx = mp.get_context(name) _ctx = mp.get_context(name)
elif name == 'trio': elif name == 'trio':
@ -155,7 +148,7 @@ async def cancel_on_completion(
portal: Portal, portal: Portal,
actor: Actor, actor: Actor,
errors: Dict[Tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
) -> None: ) -> None:
''' '''
@ -258,12 +251,12 @@ async def new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor, subactor: Actor,
errors: Dict[Tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addr: Tuple[str, int], bind_addr: tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
@ -408,7 +401,8 @@ async def new_proc(
if is_root_process(): if is_root_process():
await maybe_wait_for_debugger( await maybe_wait_for_debugger(
child_in_debug=_runtime_vars.get('_debug_mode', False), child_in_debug=_runtime_vars.get(
'_debug_mode', False),
) )
if proc.poll() is None: if proc.poll() is None:
@ -447,20 +441,30 @@ async def mp_new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor, subactor: Actor,
errors: Dict[Tuple[str, str], Exception], errors: dict[tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addr: Tuple[str, int], bind_addr: tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: dict[str, Any], # serialized and sent to _child
*, *,
infect_asyncio: bool = False, infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
# uggh zone
try:
from multiprocessing import semaphore_tracker # type: ignore
resource_tracker = semaphore_tracker
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
except ImportError:
# 3.8 introduces a more general version that also tracks shared mems
from multiprocessing import resource_tracker # type: ignore
assert _ctx assert _ctx
start_method = _ctx.get_start_method() start_method = _ctx.get_start_method()
if start_method == 'forkserver': if start_method == 'forkserver':
from multiprocessing import forkserver # type: ignore
# XXX do our hackery on the stdlib to avoid multiple # XXX do our hackery on the stdlib to avoid multiple
# forkservers (one at each subproc layer). # forkservers (one at each subproc layer).
fs = forkserver._forkserver fs = forkserver._forkserver

View File

@ -20,7 +20,6 @@ Per process state
""" """
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
from collections.abc import Mapping from collections.abc import Mapping
import multiprocessing as mp
import trio import trio
@ -71,6 +70,7 @@ class ActorContextInfo(Mapping):
def is_main_process() -> bool: def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process. """Bool determining if this actor is running in the top-most process.
""" """
import multiprocessing as mp
return mp.current_process().name == 'MainProcess' return mp.current_process().name == 'MainProcess'

View File

@ -20,8 +20,7 @@
""" """
from functools import partial from functools import partial
import inspect import inspect
import multiprocessing as mp from typing import Tuple, List, Dict, Optional, TYPE_CHECKING
from typing import Tuple, List, Dict, Optional
import typing import typing
import warnings import warnings
@ -39,6 +38,9 @@ from . import _state
from . import _spawn from . import _spawn
if TYPE_CHECKING:
import multiprocessing as mp
log = get_logger(__name__) log = get_logger(__name__)
_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0) _default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)