Merge pull request #322 from goodboy/we_bein_all_matchy

3.10 and friends
lifetime_stack_tests
goodboy 2022-09-15 23:49:34 -04:00 committed by GitHub
commit 368e9f3f7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 623 additions and 578 deletions

View File

@ -396,7 +396,7 @@ tasks spawned via multiple RPC calls to an actor can modify
# a per process cache
_actor_cache: Dict[str, bool] = {}
_actor_cache: dict[str, bool] = {}
def ping_endpoints(endpoints: List[str]):

View File

@ -9,7 +9,7 @@ is ``tractor``'s channels.
"""
from contextlib import asynccontextmanager
from typing import List, Callable
from typing import Callable
import itertools
import math
import time
@ -71,8 +71,8 @@ async def worker_pool(workers=4):
async def _map(
worker_func: Callable[[int], bool],
sequence: List[int]
) -> List[bool]:
sequence: list[int]
) -> list[bool]:
# define an async (local) task to collect results from workers
async def send_result(func, value, portal):

View File

@ -0,0 +1,16 @@
Strictly support Python 3.10+, start runtime machinery reorg
Since we want to push forward using the new `match:` syntax for our
internal RPC-msg loops, we officially drop 3.9 support for the next
release which should coincide well with the first release of 3.11.
This patch set also officially removes the ``tractor.run()`` API (which
has been deprecated for some time) as well as starts an initial re-org
of the internal runtime core by:
- renaming ``tractor._actor`` -> ``._runtime``
- moving the ``._runtime.ActorActor._process_messages()`` and
``._async_main()`` to be module level singleton-task-functions since
they are only started once for each connection and actor spawn
respectively; this internal API thus looks more similar to (at the
time of writing) the ``trio``-internals in ``trio._core._run``.
- officially remove ``tractor.run()``, now deprecated for some time.

View File

@ -25,7 +25,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup(
name="tractor",
version='0.1.0a5', # alpha zone
version='0.1.0a6dev0', # alpha zone
description='structured concurrrent "actors"',
long_description=readme,
license='AGPLv3',
@ -55,11 +55,13 @@ setup(
'colorlog',
'wrapt',
# serialization
'msgspec',
# pip ref docs on these specs:
# https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples
# and pep:
# https://peps.python.org/pep-0440/#version-specifiers
'pdbpp <= 0.10.1; python_version < "3.10"',
# windows deps workaround for ``pdbpp``
# https://github.com/pdbpp/pdbpp/issues/498
@ -71,9 +73,6 @@ setup(
# we need a specific patch on master atm.
'pdbpp @ git+https://github.com/pdbpp/pdbpp@76c4be5#egg=pdbpp ; python_version > "3.9"', # noqa: E501
# serialization
'msgspec >= "0.4.0"'
],
tests_require=['pytest'],
python_requires=">=3.9",
@ -94,7 +93,6 @@ setup(
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",

View File

@ -5,7 +5,6 @@ Advanced streaming patterns using bidirectional streams and contexts.
from collections import Counter
import itertools
import platform
from typing import Set, Dict, List
import trio
import tractor
@ -15,7 +14,7 @@ def is_win():
return platform.system() == 'Windows'
_registry: Dict[str, Set[tractor.ReceiveMsgStream]] = {
_registry: dict[str, set[tractor.ReceiveMsgStream]] = {
'even': set(),
'odd': set(),
}
@ -77,7 +76,7 @@ async def subscribe(
async def consumer(
subs: List[str],
subs: list[str],
) -> None:

View File

@ -571,7 +571,7 @@ def test_one_end_stream_not_opened(overrun_by):
'''
overrunner, buf_size_increase, entrypoint = overrun_by
from tractor._actor import Actor
from tractor._runtime import Actor
buf_size = buf_size_increase + Actor.msg_buffer_size
async def main():

View File

@ -23,13 +23,6 @@ async def test_no_arbitter():
pass
def test_no_main():
"""An async function **must** be passed to ``tractor.run()``.
"""
with pytest.raises(TypeError):
tractor.run(None)
@tractor_test
async def test_self_is_registered(arb_addr):
"Verify waiting on the arbiter to register itself using the standard api."

View File

@ -1,7 +1,7 @@
"""
Spawning basics
"""
from typing import Dict, Tuple, Optional
from typing import Optional
import pytest
import trio
@ -14,8 +14,8 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4}
async def spawn(
is_arbiter: bool,
data: Dict,
arb_addr: Tuple[str, int],
data: dict,
arb_addr: tuple[str, int],
):
namespaces = [__name__]

View File

@ -6,7 +6,7 @@ from contextlib import asynccontextmanager
from functools import partial
from itertools import cycle
import time
from typing import Optional, List, Tuple
from typing import Optional
import pytest
import trio
@ -62,8 +62,8 @@ async def ensure_sequence(
@asynccontextmanager
async def open_sequence_streamer(
sequence: List[int],
arb_addr: Tuple[str, int],
sequence: list[int],
arb_addr: tuple[str, int],
start_method: str,
) -> tractor.MsgStream:

View File

@ -36,7 +36,10 @@ from ._discovery import (
query_actor,
)
from ._supervise import open_nursery
from ._state import current_actor, is_root_process
from ._state import (
current_actor,
is_root_process,
)
from ._exceptions import (
RemoteActorError,
ModuleNotExposed,
@ -44,11 +47,16 @@ from ._exceptions import (
)
from ._debug import breakpoint, post_mortem
from . import msg
from ._root import run, run_daemon, open_root_actor
from ._root import (
run_daemon,
open_root_actor,
)
from ._portal import Portal
from ._runtime import Actor
__all__ = [
'Actor',
'Channel',
'Context',
'ContextCancelled',
@ -70,7 +78,6 @@ __all__ = [
'open_root_actor',
'post_mortem',
'query_actor',
'run',
'run_daemon',
'stream',
'to_asyncio',

View File

@ -24,7 +24,7 @@ import argparse
from ast import literal_eval
from ._actor import Actor
from ._runtime import Actor
from ._entry import _trio_main

View File

@ -25,7 +25,6 @@ import signal
from functools import partial
from contextlib import asynccontextmanager as acm
from typing import (
Tuple,
Optional,
Callable,
AsyncIterator,
@ -74,7 +73,7 @@ class Lock:
local_task_in_debug: Optional[str] = None
# actor tree-wide actor uid that supposedly has the tty lock
global_actor_in_debug: Optional[Tuple[str, str]] = None
global_actor_in_debug: Optional[tuple[str, str]] = None
local_pdb_complete: Optional[trio.Event] = None
no_remote_has_tty: Optional[trio.Event] = None
@ -172,7 +171,7 @@ class MultiActorPdb(pdbpp.Pdb):
@acm
async def _acquire_debug_lock_from_root_task(
uid: Tuple[str, str]
uid: tuple[str, str]
) -> AsyncIterator[trio.StrictFIFOLock]:
'''
@ -252,7 +251,7 @@ async def _acquire_debug_lock_from_root_task(
async def lock_tty_for_child(
ctx: tractor.Context,
subactor_uid: Tuple[str, str]
subactor_uid: tuple[str, str]
) -> str:
'''
@ -302,7 +301,7 @@ async def lock_tty_for_child(
async def wait_for_parent_stdin_hijack(
actor_uid: Tuple[str, str],
actor_uid: tuple[str, str],
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED
):
'''
@ -643,7 +642,7 @@ def shield_sigint(
def _set_trace(
actor: Optional[tractor._actor.Actor] = None,
actor: Optional[tractor.Actor] = None,
pdb: Optional[MultiActorPdb] = None,
):
__tracebackhide__ = True
@ -676,7 +675,7 @@ breakpoint = partial(
def _post_mortem(
actor: tractor._actor.Actor,
actor: tractor.Actor,
pdb: MultiActorPdb,
) -> None:
@ -732,7 +731,7 @@ async def _maybe_enter_pm(err):
@acm
async def acquire_debug_lock(
subactor_uid: Tuple[str, str],
subactor_uid: tuple[str, str],
) -> AsyncGenerator[None, tuple]:
'''
Grab root's debug lock on entry, release on exit.

View File

@ -18,7 +18,11 @@
Actor discovery API.
"""
from typing import Tuple, Optional, Union, AsyncGenerator
from typing import (
Optional,
Union,
AsyncGenerator,
)
from contextlib import asynccontextmanager as acm
from ._ipc import _connect_chan, Channel
@ -104,7 +108,7 @@ async def query_actor(
@acm
async def find_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None
arbiter_sockaddr: tuple[str, int] = None
) -> AsyncGenerator[Optional[Portal], None]:
'''
@ -130,7 +134,7 @@ async def find_actor(
@acm
async def wait_for_actor(
name: str,
arbiter_sockaddr: Tuple[str, int] = None
arbiter_sockaddr: tuple[str, int] = None
) -> AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.

View File

@ -19,14 +19,14 @@ Sub-process entry points.
"""
from functools import partial
from typing import Tuple, Any
import signal
from typing import Any
import trio # type: ignore
from .log import get_console_log, get_logger
from . import _state
from .to_asyncio import run_as_asyncio_guest
from ._runtime import async_main, Actor
log = get_logger(__name__)
@ -35,10 +35,10 @@ log = get_logger(__name__)
def _mp_main(
actor: 'Actor', # type: ignore
accept_addr: Tuple[str, int],
forkserver_info: Tuple[Any, Any, Any, Any, Any],
accept_addr: tuple[str, int],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: str,
parent_addr: Tuple[str, int] = None,
parent_addr: tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
@ -63,7 +63,8 @@ def _mp_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
actor._async_main,
async_main,
actor,
accept_addr,
parent_addr=parent_addr
)
@ -82,9 +83,9 @@ def _mp_main(
def _trio_main(
actor: 'Actor', # type: ignore
actor: Actor, # type: ignore
*,
parent_addr: Tuple[str, int] = None,
parent_addr: tuple[str, int] = None,
infect_asyncio: bool = False,
) -> None:
@ -106,7 +107,8 @@ def _trio_main(
log.debug(f"parent_addr is {parent_addr}")
trio_main = partial(
actor._async_main,
async_main,
actor,
parent_addr=parent_addr
)

View File

@ -18,7 +18,11 @@
Our classy exception set.
"""
from typing import Dict, Any, Optional, Type
from typing import (
Any,
Optional,
Type,
)
import importlib
import builtins
import traceback
@ -95,7 +99,7 @@ def pack_error(
exc: BaseException,
tb=None,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Create an "error message" for tranmission over
a channel (aka the wire).
"""
@ -114,7 +118,7 @@ def pack_error(
def unpack_error(
msg: Dict[str, Any],
msg: dict[str, Any],
chan=None,
err_type=RemoteActorError

View File

@ -611,9 +611,11 @@ async def open_portal(
msg_loop_cs: Optional[trio.CancelScope] = None
if start_msg_loop:
from ._runtime import process_messages
msg_loop_cs = await nursery.start(
partial(
actor._process_messages,
process_messages,
actor,
channel,
# if the local task is cancelled we want to keep
# the msg loop running until our block ends

View File

@ -23,13 +23,15 @@ from functools import partial
import importlib
import logging
import os
from typing import Tuple, Optional, List, Any
from typing import (
Optional,
)
import typing
import warnings
import trio
from ._actor import Actor, Arbiter
from ._runtime import Actor, Arbiter, async_main
from . import _debug
from . import _spawn
from . import _state
@ -50,7 +52,7 @@ logger = log.get_logger('tractor')
async def open_root_actor(
# defaults are above
arbiter_addr: Optional[Tuple[str, int]] = (
arbiter_addr: Optional[tuple[str, int]] = (
_default_arbiter_host,
_default_arbiter_port,
),
@ -68,8 +70,8 @@ async def open_root_actor(
# internal logging
loglevel: Optional[str] = None,
enable_modules: Optional[List] = None,
rpc_module_paths: Optional[List] = None,
enable_modules: Optional[list] = None,
rpc_module_paths: Optional[list] = None,
) -> typing.Any:
"""Async entry point for ``tractor``.
@ -188,13 +190,14 @@ async def open_root_actor(
# start the actor runtime in a new task
async with trio.open_nursery() as nursery:
# ``Actor._async_main()`` creates an internal nursery and
# ``_runtime.async_main()`` creates an internal nursery and
# thus blocks here until the entire underlying actor tree has
# terminated thereby conducting structured concurrency.
await nursery.start(
partial(
actor._async_main,
async_main,
actor,
accept_addr=(host, port),
parent_addr=None
)
@ -229,28 +232,35 @@ async def open_root_actor(
logger.runtime("Root actor terminated")
def run(
# target
async_fn: typing.Callable[..., typing.Awaitable],
*args,
def run_daemon(
enable_modules: list[str],
# runtime kwargs
name: Optional[str] = 'root',
arbiter_addr: Tuple[str, int] = (
arbiter_addr: tuple[str, int] = (
_default_arbiter_host,
_default_arbiter_port,
),
start_method: Optional[str] = None,
debug_mode: bool = False,
**kwargs,
**kwargs
) -> Any:
"""Run a trio-actor async function in process.
) -> None:
'''
Spawn daemon actor which will respond to RPC; the main task simply
starts the runtime and then sleeps forever.
This is a very minimal convenience wrapper around starting
a "run-until-cancelled" root actor which can be started with a set
of enabled modules for RPC request handling.
'''
kwargs['enable_modules'] = list(enable_modules)
for path in enable_modules:
importlib.import_module(path)
This is tractor's main entry and the start point for any async actor.
"""
async def _main():
async with open_root_actor(
@ -260,35 +270,6 @@ def run(
debug_mode=debug_mode,
**kwargs,
):
return await trio.sleep_forever()
return await async_fn(*args)
warnings.warn(
"`tractor.run()` is now deprecated. `tractor` now"
" implicitly starts the root actor on first actor nursery"
" use. If you want to start the root actor manually, use"
" `tractor.open_root_actor()`.",
DeprecationWarning,
stacklevel=2,
)
return trio.run(_main)
def run_daemon(
enable_modules: list[str],
**kwargs
) -> None:
'''
Spawn daemon actor which will respond to RPC.
This is a convenience wrapper around
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
is meant to run forever responding to RPC requests.
'''
kwargs['enable_modules'] = list(enable_modules)
for path in enable_modules:
importlib.import_module(path)
return run(partial(trio.sleep, float('inf')), **kwargs)

File diff suppressed because it is too large Load Diff

View File

@ -42,7 +42,7 @@ from ._state import (
from .log import get_logger
from ._portal import Portal
from ._actor import Actor
from ._runtime import Actor
from ._entry import _mp_main
from ._exceptions import ActorFailure

View File

@ -18,7 +18,10 @@
Per process state
"""
from typing import Optional, Dict, Any
from typing import (
Optional,
Any,
)
from collections.abc import Mapping
import trio
@ -27,7 +30,7 @@ from ._exceptions import NoRuntime
_current_actor: Optional['Actor'] = None # type: ignore # noqa
_runtime_vars: Dict[str, Any] = {
_runtime_vars: dict[str, Any] = {
'_debug_mode': False,
'_is_root': False,
'_root_mailbox': (None, None)

View File

@ -23,8 +23,10 @@ import inspect
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import (
Any, Optional, Callable,
AsyncGenerator, Dict,
Any,
Optional,
Callable,
AsyncGenerator,
AsyncIterator
)
@ -393,7 +395,7 @@ class Context:
async def _maybe_raise_from_remote_msg(
self,
msg: Dict[str, Any],
msg: dict[str, Any],
) -> None:
'''

View File

@ -20,7 +20,10 @@
"""
from functools import partial
import inspect
from typing import Tuple, List, Dict, Optional, TYPE_CHECKING
from typing import (
Optional,
TYPE_CHECKING,
)
import typing
import warnings
@ -30,7 +33,7 @@ from async_generator import asynccontextmanager
from ._debug import maybe_wait_for_debugger
from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
from ._actor import Actor
from ._runtime import Actor
from ._portal import Portal
from ._exceptions import is_multi_cancelled
from ._root import open_root_actor
@ -43,7 +46,7 @@ if TYPE_CHECKING:
log = get_logger(__name__)
_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0)
_default_bind_addr: tuple[str, int] = ('127.0.0.1', 0)
class ActorNursery:
@ -79,15 +82,15 @@ class ActorNursery:
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: Dict[Tuple[str, str], Exception],
errors: dict[tuple[str, str], Exception],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
self._ria_nursery = ria_nursery
self._da_nursery = da_nursery
self._children: Dict[
Tuple[str, str],
Tuple[Actor, mp.Process, Optional[Portal]]
self._children: dict[
tuple[str, str],
tuple[Actor, mp.Process, Optional[Portal]]
] = {}
# portals spawned with ``run_in_actor()`` are
# cancelled when their "main" result arrives
@ -102,9 +105,9 @@ class ActorNursery:
self,
name: str,
*,
bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: List[str] = None,
enable_modules: List[str] = None,
bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: list[str] = None,
enable_modules: list[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
debug_mode: Optional[bool] = None,
@ -173,9 +176,9 @@ class ActorNursery:
*,
name: Optional[str] = None,
bind_addr: Tuple[str, int] = _default_bind_addr,
rpc_module_paths: Optional[List[str]] = None,
enable_modules: List[str] = None,
bind_addr: tuple[str, int] = _default_bind_addr,
rpc_module_paths: Optional[list[str]] = None,
enable_modules: list[str] = None,
loglevel: str = None, # set log level per subactor
infect_asyncio: bool = False,
@ -293,7 +296,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
) -> typing.AsyncGenerator[ActorNursery, None]:
# the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
errors: dict[tuple[str, str], Exception] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for

View File

@ -26,7 +26,10 @@ support provided by ``tractor.Context.open_stream()`` and friends.
from __future__ import annotations
import inspect
import typing
from typing import Dict, Any, Set, Callable, List, Tuple
from typing import (
Any,
Callable,
)
from functools import partial
from async_generator import aclosing
@ -44,7 +47,7 @@ log = get_logger('messaging')
async def fan_out_to_ctxs(
pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy
topics2ctxs: Dict[str, list],
topics2ctxs: dict[str, list],
packetizer: typing.Callable = None,
) -> None:
'''
@ -61,7 +64,7 @@ async def fan_out_to_ctxs(
async for published in pub_gen:
ctx_payloads: List[Tuple[Context, Any]] = []
ctx_payloads: list[tuple[Context, Any]] = []
for topic, data in published.items():
log.debug(f"publishing {topic, data}")
@ -103,8 +106,8 @@ async def fan_out_to_ctxs(
def modify_subs(
topics2ctxs: Dict[str, List[Context]],
topics: Set[str],
topics2ctxs: dict[str, list[Context]],
topics: set[str],
ctx: Context,
) -> None:
@ -136,20 +139,20 @@ def modify_subs(
topics2ctxs.pop(topic)
_pub_state: Dict[str, dict] = {}
_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {}
_pub_state: dict[str, dict] = {}
_pubtask2lock: dict[str, trio.StrictFIFOLock] = {}
def pub(
wrapped: typing.Callable = None,
*,
tasks: Set[str] = set(),
tasks: set[str] = set(),
):
"""Publisher async generator decorator.
A publisher can be called multiple times from different actors but
will only spawn a finite set of internal tasks to stream values to
each caller. The ``tasks: Set[str]`` argument to the decorator
each caller. The ``tasks: set[str]`` argument to the decorator
specifies the names of the mutex set of publisher tasks. When the
publisher function is called, an argument ``task_name`` must be
passed to specify which task (of the set named in ``tasks``) should
@ -158,9 +161,9 @@ def pub(
necessary.
Values yielded from the decorated async generator must be
``Dict[str, Dict[str, Any]]`` where the fist level key is the topic
``dict[str, dict[str, Any]]`` where the fist level key is the topic
string and determines which subscription the packet will be
delivered to and the value is a packet ``Dict[str, Any]`` by default
delivered to and the value is a packet ``dict[str, Any]`` by default
of the form:
.. ::python
@ -186,7 +189,7 @@ def pub(
The publisher must be called passing in the following arguments:
- ``topics: Set[str]`` the topic sequence or "subscriptions"
- ``topics: set[str]`` the topic sequence or "subscriptions"
- ``task_name: str`` the task to use (if ``tasks`` was passed)
- ``ctx: Context`` the tractor context (only needed if calling the
pub func without a nursery, otherwise this is provided implicitly)
@ -231,7 +234,7 @@ def pub(
if wrapped is None:
return partial(pub, tasks=tasks)
task2lock: Dict[str, trio.StrictFIFOLock] = {}
task2lock: dict[str, trio.StrictFIFOLock] = {}
for name in tasks:
task2lock[name] = trio.StrictFIFOLock()
@ -243,7 +246,7 @@ def pub(
# `wrapt` docs
async def _execute(
ctx: Context,
topics: Set[str],
topics: set[str],
*args,
# *,
task_name: str = None, # default: only one task allocated

View File

@ -24,7 +24,7 @@ Built-in messaging patterns, types, APIs and helpers.
# ``pkgutil.resolve_name()`` internally uses
# ``importlib.import_module()`` which can be filtered by inserting
# a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before
# entering the ``Actor._process_messages()`` loop).
# entering the ``_runtime.process_messages()`` loop).
# - https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645
# - https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules
# - https://stackoverflow.com/a/63320902

View File

@ -102,6 +102,8 @@ async def gather_contexts(
# deliver control once all managers have started up
await all_entered.wait()
# NOTE: order *should* be preserved in the output values
# since ``dict``s are now implicitly ordered.
yield tuple(unwrapped.values())
# we don't need a try/finally since cancellation will be triggered