diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a96d9ef..d954d79 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,6 +1,11 @@ name: CI -on: push +on: + # any time someone pushes a new branch to origin + push: + + # Allows you to run this workflow manually from the Actions tab + workflow_dispatch: jobs: diff --git a/nooz/295.misc.rst b/nooz/295.misc.rst index 1724e06..74a0f23 100644 --- a/nooz/295.misc.rst +++ b/nooz/295.misc.rst @@ -1,3 +1,3 @@ -Add an experimental `tractor.msg.NamespacePath` type for passing Python +Add an experimental ``tractor.msg.NamespacePath`` type for passing Python objects by "reference" through a ``str``-subtype message and using the new ``pkgutil.resolve_name()`` for reference loading. diff --git a/nooz/300.misc.rst b/nooz/300.misc.rst index 0fe5c38..4953a68 100644 --- a/nooz/300.misc.rst +++ b/nooz/300.misc.rst @@ -1,3 +1,3 @@ -Update to and pin latest `msgpack` (1.0.3) and `msgspec` (0.4.0) +Update to and pin latest ``msgpack`` (1.0.3) and ``msgspec`` (0.4.0) both of which required adjustments for backwards imcompatible API tweaks. diff --git a/nooz/303.misc.rst b/nooz/303.misc.rst new file mode 100644 index 0000000..9464fd5 --- /dev/null +++ b/nooz/303.misc.rst @@ -0,0 +1,4 @@ +Fence off ``multiprocessing`` imports until absolutely necessary in an +effort to avoid "resource tracker" spawning side effects that seem to +have varying degrees of unreliability per Python release. Port to new +``msgspec.DecodeError``. 
diff --git a/tractor/_ipc.py b/tractor/_ipc.py index b996c5f..4d62f1d 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -264,7 +264,7 @@ class MsgspecTCPStream(MsgpackTCPStream): try: yield self.decode(msg_bytes) except ( - msgspec.DecodingError, + msgspec.DecodeError, UnicodeDecodeError, ): if not last_decode_failed: diff --git a/tractor/_mp_fixup_main.py b/tractor/_mp_fixup_main.py index 9d0352b..11d5f1c 100644 --- a/tractor/_mp_fixup_main.py +++ b/tractor/_mp_fixup_main.py @@ -18,9 +18,9 @@ Helpers pulled mostly verbatim from ``multiprocessing.spawn`` to aid with "fixing up" the ``__main__`` module in subprocesses. -These helpers are needed for any spawing backend that doesn't already handle this. -For example when using ``trio_run_in_process`` it is needed but obviously not when -we're already using ``multiprocessing``. +These helpers are needed for any spawning backend that doesn't already +handle this. For example when using ``trio_run_in_process`` it is needed +but obviously not when we're already using ``multiprocessing``. """ import os @@ -28,13 +28,12 @@ import sys import platform import types import runpy -from typing import Dict ORIGINAL_DIR = os.path.abspath(os.getcwd()) -def _mp_figure_out_main() -> Dict[str, str]: +def _mp_figure_out_main() -> dict[str, str]: """Taken from ``multiprocessing.spawn.get_preparation_data()``. Retrieve parent actor `__main__` module data. diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 3d7e6b1..06f6532 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -18,30 +18,17 @@ Machinery for actor process spawning using multiple backends. 
""" +from __future__ import annotations import sys -import multiprocessing as mp import platform from typing import ( - Any, Dict, Optional, Callable, - TypeVar, + Any, Optional, Callable, TypeVar, TYPE_CHECKING ) from collections.abc import Awaitable import trio from trio_typing import TaskStatus -try: - from multiprocessing import semaphore_tracker # type: ignore - resource_tracker = semaphore_tracker - resource_tracker._resource_tracker = resource_tracker._semaphore_tracker -except ImportError: - # 3.8 introduces a more general version that also tracks shared mems - from multiprocessing import resource_tracker # type: ignore - -from multiprocessing import forkserver # type: ignore -from typing import Tuple - -from . import _forkserver_override from ._debug import ( maybe_wait_for_debugger, acquire_debug_lock, @@ -60,8 +47,11 @@ from ._entry import _mp_main from ._exceptions import ActorFailure +if TYPE_CHECKING: + import multiprocessing as mp + ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) + log = get_logger('tractor') -ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) # placeholder for an mp start context if so using that backend _ctx: Optional[mp.context.BaseContext] = None @@ -70,6 +60,7 @@ _spawn_method: str = "trio" if platform.system() == 'Windows': + import multiprocessing as mp _ctx = mp.get_context("spawn") async def proc_waiter(proc: mp.Process) -> None: @@ -92,6 +83,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: ``subprocess.Popen``. ''' + import multiprocessing as mp global _ctx global _spawn_method @@ -108,6 +100,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: f"Spawn method `{name}` is invalid please choose one of {methods}" ) elif name == 'forkserver': + from . 
import _forkserver_override _forkserver_override.override_stdlib() _ctx = mp.get_context(name) elif name == 'trio': @@ -155,7 +148,7 @@ async def cancel_on_completion( portal: Portal, actor: Actor, - errors: Dict[Tuple[str, str], Exception], + errors: dict[tuple[str, str], Exception], ) -> None: ''' @@ -258,12 +251,12 @@ async def new_proc( name: str, actor_nursery: 'ActorNursery', # type: ignore # noqa subactor: Actor, - errors: Dict[Tuple[str, str], Exception], + errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: Tuple[str, int], - parent_addr: Tuple[str, int], - _runtime_vars: Dict[str, Any], # serialized and sent to _child + bind_addr: tuple[str, int], + parent_addr: tuple[str, int], + _runtime_vars: dict[str, Any], # serialized and sent to _child *, @@ -408,7 +401,8 @@ async def new_proc( if is_root_process(): await maybe_wait_for_debugger( - child_in_debug=_runtime_vars.get('_debug_mode', False), + child_in_debug=_runtime_vars.get( + '_debug_mode', False), ) if proc.poll() is None: @@ -447,20 +441,30 @@ async def mp_new_proc( name: str, actor_nursery: 'ActorNursery', # type: ignore # noqa subactor: Actor, - errors: Dict[Tuple[str, str], Exception], + errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addr: Tuple[str, int], - parent_addr: Tuple[str, int], - _runtime_vars: Dict[str, Any], # serialized and sent to _child + bind_addr: tuple[str, int], + parent_addr: tuple[str, int], + _runtime_vars: dict[str, Any], # serialized and sent to _child *, infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: + # uggh zone + try: + from multiprocessing import semaphore_tracker # type: ignore + resource_tracker = semaphore_tracker + resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa + except ImportError: + # 3.8 introduces a more general version that also tracks shared mems + from multiprocessing import resource_tracker # type: ignore 
+ assert _ctx start_method = _ctx.get_start_method() if start_method == 'forkserver': + from multiprocessing import forkserver # type: ignore # XXX do our hackery on the stdlib to avoid multiple # forkservers (one at each subproc layer). fs = forkserver._forkserver diff --git a/tractor/_state.py b/tractor/_state.py index 919c0cf..073bc99 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -20,7 +20,6 @@ Per process state """ from typing import Optional, Dict, Any from collections.abc import Mapping -import multiprocessing as mp import trio @@ -71,6 +70,7 @@ class ActorContextInfo(Mapping): def is_main_process() -> bool: """Bool determining if this actor is running in the top-most process. """ + import multiprocessing as mp return mp.current_process().name == 'MainProcess' diff --git a/tractor/_supervise.py b/tractor/_supervise.py index f2d907d..958d445 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -20,8 +20,7 @@ """ from functools import partial import inspect -import multiprocessing as mp -from typing import Tuple, List, Dict, Optional +from typing import Tuple, List, Dict, Optional, TYPE_CHECKING import typing import warnings @@ -39,6 +38,9 @@ from . import _state from . import _spawn +if TYPE_CHECKING: + import multiprocessing as mp + log = get_logger(__name__) _default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)