diff --git a/docs/index.rst b/docs/index.rst index 2bdf993..4ff198f 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -396,7 +396,7 @@ tasks spawned via multiple RPC calls to an actor can modify # a per process cache - _actor_cache: Dict[str, bool] = {} + _actor_cache: dict[str, bool] = {} def ping_endpoints(endpoints: List[str]): diff --git a/examples/parallelism/concurrent_actors_primes.py b/examples/parallelism/concurrent_actors_primes.py index a7a7396..feaaca7 100644 --- a/examples/parallelism/concurrent_actors_primes.py +++ b/examples/parallelism/concurrent_actors_primes.py @@ -9,7 +9,7 @@ is ``tractor``'s channels. """ from contextlib import asynccontextmanager -from typing import List, Callable +from typing import Callable import itertools import math import time @@ -71,8 +71,8 @@ async def worker_pool(workers=4): async def _map( worker_func: Callable[[int], bool], - sequence: List[int] - ) -> List[bool]: + sequence: list[int] + ) -> list[bool]: # define an async (local) task to collect results from workers async def send_result(func, value, portal): diff --git a/nooz/322.trivial.rst b/nooz/322.trivial.rst new file mode 100644 index 0000000..a9697a9 --- /dev/null +++ b/nooz/322.trivial.rst @@ -0,0 +1,16 @@ +Strictly support Python 3.10+, start runtime machinery reorg + +Since we want to push forward using the new `match:` syntax for our +internal RPC-msg loops, we officially drop 3.9 support for the next +release which should coincide well with the first release of 3.11. + +This patch set also officially removes the ``tractor.run()`` API (which +has been deprecated for some time) as well as starts an initial re-org +of the internal runtime core by: +- renaming ``tractor._actor`` -> ``._runtime`` +- moving the ``._runtime.ActorActor._process_messages()`` and + ``._async_main()`` to be module level singleton-task-functions since + they are only started once for each connection and actor spawn + respectively; this internal API thus looks more similar to (at the + time of writing) the ``trio``-internals in ``trio._core._run``. +- officially remove ``tractor.run()``, now deprecated for some time. diff --git a/setup.py b/setup.py index 0ecbbb4..f27a20d 100755 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ with open('docs/README.rst', encoding='utf-8') as f: setup( name="tractor", - version='0.1.0a5', # alpha zone + version='0.1.0a6dev0', # alpha zone description='structured concurrrent "actors"', long_description=readme, license='AGPLv3', @@ -55,11 +55,13 @@ setup( 'colorlog', 'wrapt', + # serialization + 'msgspec', + # pip ref docs on these specs: # https://pip.pypa.io/en/stable/reference/requirement-specifiers/#examples # and pep: # https://peps.python.org/pep-0440/#version-specifiers - 'pdbpp <= 0.10.1; python_version < "3.10"', # windows deps workaround for ``pdbpp`` # https://github.com/pdbpp/pdbpp/issues/498 @@ -71,9 +73,6 @@ setup( # we need a specific patch on master atm. 
'pdbpp @ git+https://github.com/pdbpp/pdbpp@76c4be5#egg=pdbpp ; python_version > "3.9"', # noqa: E501 - # serialization - 'msgspec >= "0.4.0"' - ], tests_require=['pytest'], python_requires=">=3.9", @@ -94,7 +93,6 @@ setup( "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Intended Audience :: Science/Research", "Intended Audience :: Developers", diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index d38f243..99414a5 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -5,7 +5,6 @@ Advanced streaming patterns using bidirectional streams and contexts. from collections import Counter import itertools import platform -from typing import Set, Dict, List import trio import tractor @@ -15,7 +14,7 @@ def is_win(): return platform.system() == 'Windows' -_registry: Dict[str, Set[tractor.ReceiveMsgStream]] = { +_registry: dict[str, set[tractor.ReceiveMsgStream]] = { 'even': set(), 'odd': set(), } @@ -77,7 +76,7 @@ async def subscribe( async def consumer( - subs: List[str], + subs: list[str], ) -> None: diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index b240c19..c92c440 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -571,7 +571,7 @@ def test_one_end_stream_not_opened(overrun_by): ''' overrunner, buf_size_increase, entrypoint = overrun_by - from tractor._actor import Actor + from tractor._runtime import Actor buf_size = buf_size_increase + Actor.msg_buffer_size async def main(): diff --git a/tests/test_local.py b/tests/test_local.py index 016df3f..47a7c43 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -23,13 +23,6 @@ async def test_no_arbitter(): pass -def test_no_main(): - """An async function **must** be passed to ``tractor.run()``. - """ - with pytest.raises(TypeError): - tractor.run(None) - - @tractor_test async def test_self_is_registered(arb_addr): "Verify waiting on the arbiter to register itself using the standard api." 
diff --git a/tests/test_spawning.py b/tests/test_spawning.py index e624da3..f1679f6 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -1,7 +1,7 @@ """ Spawning basics """ -from typing import Dict, Tuple, Optional +from typing import Optional import pytest import trio @@ -14,8 +14,8 @@ data_to_pass_down = {'doggy': 10, 'kitty': 4} async def spawn( is_arbiter: bool, - data: Dict, - arb_addr: Tuple[str, int], + data: dict, + arb_addr: tuple[str, int], ): namespaces = [__name__] diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 9b4258e..1e2f6b4 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -6,7 +6,7 @@ from contextlib import asynccontextmanager from functools import partial from itertools import cycle import time -from typing import Optional, List, Tuple +from typing import Optional import pytest import trio @@ -62,8 +62,8 @@ async def ensure_sequence( @asynccontextmanager async def open_sequence_streamer( - sequence: List[int], - arb_addr: Tuple[str, int], + sequence: list[int], + arb_addr: tuple[str, int], start_method: str, ) -> tractor.MsgStream: diff --git a/tractor/__init__.py b/tractor/__init__.py index 85d759c..c4d40f8 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -36,7 +36,10 @@ from ._discovery import ( query_actor, ) from ._supervise import open_nursery -from ._state import current_actor, is_root_process +from ._state import ( + current_actor, + is_root_process, +) from ._exceptions import ( RemoteActorError, ModuleNotExposed, @@ -44,11 +47,16 @@ from ._exceptions import ( ) from ._debug import breakpoint, post_mortem from . import msg -from ._root import run, run_daemon, open_root_actor +from ._root import ( + run_daemon, + open_root_actor, +) from ._portal import Portal +from ._runtime import Actor __all__ = [ + 'Actor', 'Channel', 'Context', 'ContextCancelled', @@ -70,7 +78,6 @@ __all__ = [ 'open_root_actor', 'post_mortem', 'query_actor', - 'run', 'run_daemon', 'stream', 'to_asyncio', diff --git a/tractor/_child.py b/tractor/_child.py index 7790731..91aaec4 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -24,7 +24,7 @@ import argparse from ast import literal_eval -from ._actor import Actor +from ._runtime import Actor from ._entry import _trio_main diff --git a/tractor/_debug.py b/tractor/_debug.py index 9f8a704..46b69f0 100644 --- a/tractor/_debug.py +++ b/tractor/_debug.py @@ -25,7 +25,6 @@ import signal from functools import partial from contextlib import asynccontextmanager as acm from typing import ( - Tuple, Optional, Callable, AsyncIterator, @@ -74,7 +73,7 @@ class Lock: local_task_in_debug: Optional[str] = None # actor tree-wide actor uid that supposedly has the tty lock - global_actor_in_debug: Optional[Tuple[str, str]] = None + global_actor_in_debug: Optional[tuple[str, str]] = None local_pdb_complete: Optional[trio.Event] = None no_remote_has_tty: Optional[trio.Event] = None @@ -172,7 +171,7 @@ class MultiActorPdb(pdbpp.Pdb): @acm async def _acquire_debug_lock_from_root_task( - uid: Tuple[str, str] + uid: tuple[str, str] ) -> AsyncIterator[trio.StrictFIFOLock]: ''' @@ -252,7 +251,7 @@ async def _acquire_debug_lock_from_root_task( async def lock_tty_for_child( ctx: tractor.Context, - subactor_uid: Tuple[str, str] + subactor_uid: tuple[str, str] ) -> str: ''' @@ -302,7 +301,7 @@ async def lock_tty_for_child( async def wait_for_parent_stdin_hijack( - actor_uid: Tuple[str, str], + actor_uid: tuple[str, str], task_status: TaskStatus[trio.CancelScope] = 
trio.TASK_STATUS_IGNORED ): ''' @@ -643,7 +642,7 @@ def shield_sigint( def _set_trace( - actor: Optional[tractor._actor.Actor] = None, + actor: Optional[tractor.Actor] = None, pdb: Optional[MultiActorPdb] = None, ): __tracebackhide__ = True @@ -676,7 +675,7 @@ breakpoint = partial( def _post_mortem( - actor: tractor._actor.Actor, + actor: tractor.Actor, pdb: MultiActorPdb, ) -> None: @@ -732,7 +731,7 @@ async def _maybe_enter_pm(err): @acm async def acquire_debug_lock( - subactor_uid: Tuple[str, str], + subactor_uid: tuple[str, str], ) -> AsyncGenerator[None, tuple]: ''' Grab root's debug lock on entry, release on exit. diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 25951b3..58aeb3e 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -18,7 +18,11 @@ Actor discovery API. """ -from typing import Tuple, Optional, Union, AsyncGenerator +from typing import ( + Optional, + Union, + AsyncGenerator, +) from contextlib import asynccontextmanager as acm from ._ipc import _connect_chan, Channel @@ -104,7 +108,7 @@ async def query_actor( @acm async def find_actor( name: str, - arbiter_sockaddr: Tuple[str, int] = None + arbiter_sockaddr: tuple[str, int] = None ) -> AsyncGenerator[Optional[Portal], None]: ''' @@ -130,7 +134,7 @@ async def find_actor( @acm async def wait_for_actor( name: str, - arbiter_sockaddr: Tuple[str, int] = None + arbiter_sockaddr: tuple[str, int] = None ) -> AsyncGenerator[Portal, None]: """Wait on an actor to register with the arbiter. diff --git a/tractor/_entry.py b/tractor/_entry.py index c860f2b..931b2e2 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -19,14 +19,14 @@ Sub-process entry points. """ from functools import partial -from typing import Tuple, Any -import signal +from typing import Any import trio # type: ignore from .log import get_console_log, get_logger from . import _state from .to_asyncio import run_as_asyncio_guest +from ._runtime import async_main, Actor log = get_logger(__name__) @@ -35,10 +35,10 @@ log = get_logger(__name__) def _mp_main( actor: 'Actor', # type: ignore - accept_addr: Tuple[str, int], - forkserver_info: Tuple[Any, Any, Any, Any, Any], + accept_addr: tuple[str, int], + forkserver_info: tuple[Any, Any, Any, Any, Any], start_method: str, - parent_addr: Tuple[str, int] = None, + parent_addr: tuple[str, int] = None, infect_asyncio: bool = False, ) -> None: @@ -63,7 +63,8 @@ def _mp_main( log.debug(f"parent_addr is {parent_addr}") trio_main = partial( - actor._async_main, + async_main, + actor, accept_addr, parent_addr=parent_addr ) @@ -82,9 +83,9 @@ def _mp_main( def _trio_main( - actor: 'Actor', # type: ignore + actor: Actor, # type: ignore *, - parent_addr: Tuple[str, int] = None, + parent_addr: tuple[str, int] = None, infect_asyncio: bool = False, ) -> None: @@ -106,7 +107,8 @@ def _trio_main( log.debug(f"parent_addr is {parent_addr}") trio_main = partial( - actor._async_main, + async_main, + actor, parent_addr=parent_addr ) diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index f3beb5a..9ce59e8 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -18,7 +18,11 @@ Our classy exception set. """ -from typing import Dict, Any, Optional, Type +from typing import ( + Any, + Optional, + Type, +) import importlib import builtins import traceback @@ -95,7 +99,7 @@ def pack_error( exc: BaseException, tb=None, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Create an "error message" for tranmission over a channel (aka the wire). 
""" @@ -114,7 +118,7 @@ def pack_error( def unpack_error( - msg: Dict[str, Any], + msg: dict[str, Any], chan=None, err_type=RemoteActorError diff --git a/tractor/_portal.py b/tractor/_portal.py index c7c8700..94a285b 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -611,9 +611,11 @@ async def open_portal( msg_loop_cs: Optional[trio.CancelScope] = None if start_msg_loop: + from ._runtime import process_messages msg_loop_cs = await nursery.start( partial( - actor._process_messages, + process_messages, + actor, channel, # if the local task is cancelled we want to keep # the msg loop running until our block ends diff --git a/tractor/_root.py b/tractor/_root.py index b2bfbfc..1def614 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -23,13 +23,15 @@ from functools import partial import importlib import logging import os -from typing import Tuple, Optional, List, Any +from typing import ( + Optional, +) import typing import warnings import trio -from ._actor import Actor, Arbiter +from ._runtime import Actor, Arbiter, async_main from . import _debug from . import _spawn from . import _state @@ -50,7 +52,7 @@ logger = log.get_logger('tractor') async def open_root_actor( # defaults are above - arbiter_addr: Optional[Tuple[str, int]] = ( + arbiter_addr: Optional[tuple[str, int]] = ( _default_arbiter_host, _default_arbiter_port, ), @@ -68,8 +70,8 @@ async def open_root_actor( # internal logging loglevel: Optional[str] = None, - enable_modules: Optional[List] = None, - rpc_module_paths: Optional[List] = None, + enable_modules: Optional[list] = None, + rpc_module_paths: Optional[list] = None, ) -> typing.Any: """Async entry point for ``tractor``. @@ -188,13 +190,14 @@ async def open_root_actor( # start the actor runtime in a new task async with trio.open_nursery() as nursery: - # ``Actor._async_main()`` creates an internal nursery and + # ``_runtime.async_main()`` creates an internal nursery and # thus blocks here until the entire underlying actor tree has # terminated thereby conducting structured concurrency. await nursery.start( partial( - actor._async_main, + async_main, + actor, accept_addr=(host, port), parent_addr=None ) @@ -229,28 +232,35 @@ async def open_root_actor( logger.runtime("Root actor terminated") -def run( - - # target - async_fn: typing.Callable[..., typing.Awaitable], - *args, +def run_daemon( + enable_modules: list[str], # runtime kwargs name: Optional[str] = 'root', - arbiter_addr: Tuple[str, int] = ( + arbiter_addr: tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port, ), start_method: Optional[str] = None, debug_mode: bool = False, - **kwargs, + **kwargs -) -> Any: - """Run a trio-actor async function in process. +) -> None: + ''' + Spawn daemon actor which will respond to RPC; the main task simply + starts the runtime and then sleeps forever. + + This is a very minimal convenience wrapper around starting + a "run-until-cancelled" root actor which can be started with a set + of enabled modules for RPC request handling. + + ''' + kwargs['enable_modules'] = list(enable_modules) + + for path in enable_modules: + importlib.import_module(path) - This is tractor's main entry and the start point for any async actor. - """ async def _main(): async with open_root_actor( @@ -260,35 +270,6 @@ def run( debug_mode=debug_mode, **kwargs, ): + return await trio.sleep_forever() - return await async_fn(*args) - - warnings.warn( - "`tractor.run()` is now deprecated. `tractor` now" - " implicitly starts the root actor on first actor nursery" - " use. 
If you want to start the root actor manually, use" - " `tractor.open_root_actor()`.", - DeprecationWarning, - stacklevel=2, - ) return trio.run(_main) - - -def run_daemon( - enable_modules: list[str], - **kwargs -) -> None: - ''' - Spawn daemon actor which will respond to RPC. - - This is a convenience wrapper around - ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned - is meant to run forever responding to RPC requests. - - ''' - kwargs['enable_modules'] = list(enable_modules) - - for path in enable_modules: - importlib.import_module(path) - - return run(partial(trio.sleep, float('inf')), **kwargs) diff --git a/tractor/_actor.py b/tractor/_runtime.py similarity index 70% rename from tractor/_actor.py rename to tractor/_runtime.py index f3fb0d9..4634375 100644 --- a/tractor/_actor.py +++ b/tractor/_runtime.py @@ -391,7 +391,7 @@ class Actor: is_arbiter: bool = False msg_buffer_size: int = 2**6 - # nursery placeholders filled in by `_async_main()` after fork + # nursery placeholders filled in by `async_main()` after fork _root_n: Optional[trio.Nursery] = None _service_n: Optional[trio.Nursery] = None _server_n: Optional[trio.Nursery] = None @@ -416,9 +416,11 @@ class Actor: arbiter_addr: Optional[tuple[str, int]] = None, spawn_method: Optional[str] = None ) -> None: - """This constructor is called in the parent actor **before** the spawning + ''' + This constructor is called in the parent actor **before** the spawning phase (aka before a new process is executed). - """ + + ''' self.name = name self.uid = (name, uid or str(uuid.uuid4())) @@ -439,9 +441,6 @@ class Actor: self.enable_modules = mods self._mods: dict[str, ModuleType] = {} - - # TODO: consider making this a dynamically defined - # @dataclass once we get py3.7 self.loglevel = loglevel self._arb_addr = ( @@ -482,9 +481,11 @@ class Actor: async def wait_for_peer( self, uid: tuple[str, str] ) -> tuple[trio.Event, Channel]: - """Wait for a connection back from a spawned actor with a given + ''' + Wait for a connection back from a spawned actor with a given ``uid``. - """ + + ''' log.runtime(f"Waiting for peer {uid} to connect") event = self._peer_connected.setdefault(uid, trio.Event()) await event.wait() @@ -492,12 +493,14 @@ class Actor: return event, self._peers[uid][-1] def load_modules(self) -> None: - """Load allowed RPC modules locally (after fork). + ''' + Load allowed RPC modules locally (after fork). Since this actor may be spawned on a different machine from the original nursery we need to try and load the local module code (if it exists). - """ + + ''' try: if self._spawn_method == 'trio': parent_data = self._parent_main_data @@ -614,7 +617,7 @@ class Actor: # Begin channel management - respond to remote requests and # process received reponses. try: - disconnected = await self._process_messages(chan) + disconnected = await process_messages(self, chan) except ( trio.Cancelled, @@ -884,227 +887,6 @@ class Actor: ctx._remote_func_type = functype return ctx - async def _process_messages( - self, - chan: Channel, - shield: bool = False, - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - - ) -> bool: - ''' - Process messages for the channel async-RPC style. - - Receive multiplexed RPC requests and deliver responses over ``chan``. - - ''' - # TODO: once https://github.com/python-trio/trio/issues/467 gets - # worked out we'll likely want to use that! 
- msg = None - nursery_cancelled_before_task: bool = False - - log.runtime(f"Entering msg loop for {chan} from {chan.uid}") - try: - with trio.CancelScope(shield=shield) as loop_cs: - # this internal scope allows for keeping this message - # loop running despite the current task having been - # cancelled (eg. `open_portal()` may call this method from - # a locally spawned task) and recieve this scope using - # ``scope = Nursery.start()`` - task_status.started(loop_cs) - async for msg in chan: - - if msg is None: # loop terminate sentinel - - log.cancel( - f"Channel to {chan.uid} terminated?\n" - "Cancelling all associated tasks..") - - for (channel, cid) in self._rpc_tasks.copy(): - if channel is chan: - await self._cancel_task(cid, channel) - - log.runtime( - f"Msg loop signalled to terminate for" - f" {chan} from {chan.uid}") - - break - - log.transport( # type: ignore - f"Received msg {msg} from {chan.uid}") - - cid = msg.get('cid') - if cid: - # deliver response to local caller/waiter - await self._push_result(chan, cid, msg) - - log.runtime( - f"Waiting on next msg for {chan} from {chan.uid}") - continue - - # process command request - try: - ns, funcname, kwargs, actorid, cid = msg['cmd'] - except KeyError: - # This is the non-rpc error case, that is, an - # error **not** raised inside a call to ``_invoke()`` - # (i.e. no cid was provided in the msg - see above). - # Push this error to all local channel consumers - # (normally portals) by marking the channel as errored - assert chan.uid - exc = unpack_error(msg, chan=chan) - chan._exc = exc - raise exc - - log.runtime( - f"Processing request from {actorid}\n" - f"{ns}.{funcname}({kwargs})") - - if ns == 'self': - func = getattr(self, funcname) - - if funcname == 'cancel': - - # don't start entire actor runtime - # cancellation if this actor is in debug - # mode - pdb_complete = _debug.Lock.local_pdb_complete - if pdb_complete: - await pdb_complete.wait() - - # we immediately start the runtime machinery - # shutdown - with trio.CancelScope(shield=True): - # self.cancel() was called so kill this - # msg loop and break out into - # ``_async_main()`` - log.cancel( - f"Actor {self.uid} was remotely cancelled " - f"by {chan.uid}" - ) - await _invoke( - self, cid, chan, func, kwargs, is_rpc=False - ) - - loop_cs.cancel() - break - - if funcname == '_cancel_task': - - # we immediately start the runtime machinery - # shutdown - with trio.CancelScope(shield=True): - # self.cancel() was called so kill this - # msg loop and break out into - # ``_async_main()`` - kwargs['chan'] = chan - log.cancel( - f'Remote request to cancel task\n' - f'remote actor: {chan.uid}\n' - f'task: {cid}' - ) - try: - await _invoke( - self, - cid, - chan, - func, - kwargs, - is_rpc=False, - ) - except BaseException: - log.exception("failed to cancel task?") - - continue - else: - # complain to client about restricted modules - try: - func = self._get_rpc_func(ns, funcname) - except (ModuleNotExposed, AttributeError) as err: - err_msg = pack_error(err) - err_msg['cid'] = cid - await chan.send(err_msg) - continue - - # spin up a task for the requested function - log.runtime(f"Spawning task for {func}") - assert self._service_n - try: - cs = await self._service_n.start( - partial(_invoke, self, cid, chan, func, kwargs), - name=funcname, - ) - except (RuntimeError, trio.MultiError): - # avoid reporting a benign race condition - # during actor runtime teardown. 
- nursery_cancelled_before_task = True - break - - # never allow cancelling cancel requests (results in - # deadlock and other weird behaviour) - # if func != self.cancel: - if isinstance(cs, Exception): - log.warning( - f"Task for RPC func {func} failed with" - f"{cs}") - else: - # mark that we have ongoing rpc tasks - self._ongoing_rpc_tasks = trio.Event() - log.runtime(f"RPC func is {func}") - # store cancel scope such that the rpc task can be - # cancelled gracefully if requested - self._rpc_tasks[(chan, cid)] = ( - cs, func, trio.Event()) - - log.runtime( - f"Waiting on next msg for {chan} from {chan.uid}") - - # end of async for, channel disconnect vis - # ``trio.EndOfChannel`` - log.runtime( - f"{chan} for {chan.uid} disconnected, cancelling tasks" - ) - await self.cancel_rpc_tasks(chan) - - except ( - TransportClosed, - ): - # channels "breaking" (for TCP streams by EOF or 104 - # connection-reset) is ok since we don't have a teardown - # handshake for them (yet) and instead we simply bail out of - # the message loop and expect the teardown sequence to clean - # up. - log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') - - # transport **was** disconnected - return True - - except (Exception, trio.MultiError) as err: - if nursery_cancelled_before_task: - sn = self._service_n - assert sn and sn.cancel_scope.cancel_called - log.cancel( - f'Service nursery cancelled before it handled {funcname}' - ) - else: - # ship any "internal" exception (i.e. one from internal - # machinery not from an rpc task) to parent - log.exception("Actor errored:") - if self._parent_chan: - await try_ship_error_to_parent(self._parent_chan, err) - - # if this is the `MainProcess` we expect the error broadcasting - # above to trigger an error at consuming portal "checkpoints" - raise - - finally: - # msg debugging for when he machinery is brokey - log.runtime( - f"Exiting msg loop for {chan} from {chan.uid} " - f"with last msg:\n{msg}") - - # transport **was not** disconnected - return False - async def _from_parent( self, parent_addr: Optional[tuple[str, int]], @@ -1161,198 +943,6 @@ class Actor: await self.cancel() raise - async def _async_main( - self, - accept_addr: Optional[tuple[str, int]] = None, - - # XXX: currently ``parent_addr`` is only needed for the - # ``multiprocessing`` backend (which pickles state sent to - # the child instead of relaying it over the connect-back - # channel). Once that backend is removed we can likely just - # change this to a simple ``is_subactor: bool`` which will - # be False when running as root actor and True when as - # a subactor. - parent_addr: Optional[tuple[str, int]] = None, - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, - - ) -> None: - """ - Start the channel server, maybe connect back to the parent, and - start the main task. - - A "root-most" (or "top-level") nursery for this actor is opened here - and when cancelled effectively cancels the actor. 
- - """ - registered_with_arbiter = False - try: - - # establish primary connection with immediate parent - self._parent_chan = None - if parent_addr is not None: - - self._parent_chan, accept_addr_rent = await self._from_parent( - parent_addr) - - # either it's passed in because we're not a child - # or because we're running in mp mode - if accept_addr_rent is not None: - accept_addr = accept_addr_rent - - # load exposed/allowed RPC modules - # XXX: do this **after** establishing a channel to the parent - # but **before** starting the message loop for that channel - # such that import errors are properly propagated upwards - self.load_modules() - - # The "root" nursery ensures the channel with the immediate - # parent is kept alive as a resilient service until - # cancellation steps have (mostly) occurred in - # a deterministic way. - async with trio.open_nursery() as root_nursery: - self._root_n = root_nursery - assert self._root_n - - async with trio.open_nursery() as service_nursery: - # This nursery is used to handle all inbound - # connections to us such that if the TCP server - # is killed, connections can continue to process - # in the background until this nursery is cancelled. - self._service_n = service_nursery - assert self._service_n - - # Startup up the channel server with, - # - subactor: the bind address is sent by our parent - # over our established channel - # - root actor: the ``accept_addr`` passed to this method - assert accept_addr - host, port = accept_addr - - self._server_n = await service_nursery.start( - partial( - self._serve_forever, - service_nursery, - accept_host=host, - accept_port=port - ) - ) - accept_addr = self.accept_addr - if _state._runtime_vars['_is_root']: - _state._runtime_vars['_root_mailbox'] = accept_addr - - # Register with the arbiter if we're told its addr - log.runtime(f"Registering {self} for role `{self.name}`") - assert isinstance(self._arb_addr, tuple) - - async with get_arbiter(*self._arb_addr) as arb_portal: - await arb_portal.run_from_ns( - 'self', - 'register_actor', - uid=self.uid, - sockaddr=accept_addr, - ) - - registered_with_arbiter = True - - # init steps complete - task_status.started() - - # Begin handling our new connection back to our - # parent. This is done last since we don't want to - # start processing parent requests until our channel - # server is 100% up and running. - if self._parent_chan: - await root_nursery.start( - partial( - self._process_messages, - self._parent_chan, - shield=True, - ) - ) - log.runtime("Waiting on service nursery to complete") - log.runtime("Service nursery complete") - log.runtime("Waiting on root nursery to complete") - - # Blocks here as expected until the root nursery is - # killed (i.e. this actor is cancelled or signalled by the parent) - except Exception as err: - log.info("Closing all actor lifetime contexts") - _lifetime_stack.close() - - if not registered_with_arbiter: - # TODO: I guess we could try to connect back - # to the parent through a channel and engage a debugger - # once we have that all working with std streams locking? - log.exception( - f"Actor errored and failed to register with arbiter " - f"@ {self._arb_addr}?") - log.error( - "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" - "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" - "\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n" - "\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n" - ) - - if self._parent_chan: - await try_ship_error_to_parent(self._parent_chan, err) - - # always! 
- log.exception("Actor errored:") - raise - - finally: - log.info("Runtime nursery complete") - - # tear down all lifetime contexts if not in guest mode - # XXX: should this just be in the entrypoint? - log.info("Closing all actor lifetime contexts") - - # TODO: we can't actually do this bc the debugger - # uses the _service_n to spawn the lock task, BUT, - # in theory if we had the root nursery surround this finally - # block it might be actually possible to debug THIS - # machinery in the same way as user task code? - # if self.name == 'brokerd.ib': - # with trio.CancelScope(shield=True): - # await _debug.breakpoint() - - _lifetime_stack.close() - - # Unregister actor from the arbiter - if registered_with_arbiter and ( - self._arb_addr is not None - ): - failed = False - with trio.move_on_after(0.5) as cs: - cs.shield = True - try: - async with get_arbiter(*self._arb_addr) as arb_portal: - await arb_portal.run_from_ns( - 'self', - 'unregister_actor', - uid=self.uid - ) - except OSError: - failed = True - if cs.cancelled_caught: - failed = True - if failed: - log.warning( - f"Failed to unregister {self.name} from arbiter") - - # Ensure all peers (actors connected to us as clients) are finished - if not self._no_more_peers.is_set(): - if any( - chan.connected() for chan in chain(*self._peers.values()) - ): - log.runtime( - f"Waiting for remaining peers {self._peers} to clear") - with trio.CancelScope(shield=True): - await self._no_more_peers.wait() - log.runtime("All peer channels are complete") - - log.runtime("Runtime completed") - async def _serve_forever( self, handler_nursery: trio.Nursery, @@ -1362,11 +952,13 @@ class Actor: accept_port: int = 0, task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: - """Start the channel server, begin listening for new connections. + ''' + Start the channel server, begin listening for new connections. This will cause an actor to continue living (blocking) until ``cancel_server()`` is called. - """ + + ''' self._server_down = trio.Event() try: async with trio.open_nursery() as server_n: @@ -1391,16 +983,19 @@ class Actor: self._server_down.set() def cancel_soon(self) -> None: - """Cancel this actor asap; can be called from a sync context. + ''' + Cancel this actor asap; can be called from a sync context. Schedules `.cancel()` to be run immediately just like when cancelled by the parent. - """ + + ''' assert self._service_n self._service_n.start_soon(self.cancel) async def cancel(self) -> bool: - """Cancel this actor's runtime. + ''' + Cancel this actor's runtime. The "deterministic" teardown sequence in order is: - cancel all ongoing rpc tasks by cancel scope @@ -1409,7 +1004,8 @@ class Actor: - cancel the "service" nursery reponsible for spawning new rpc tasks - return control the parent channel message loop - """ + + ''' log.cancel(f"{self.uid} is trying to cancel") self._cancel_called = True @@ -1495,9 +1091,11 @@ class Actor: self, only_chan: Optional[Channel] = None, ) -> None: - """Cancel all existing RPC responder tasks using the cancel scope + ''' + Cancel all existing RPC responder tasks using the cancel scope registered for each. 
- """ + + ''' tasks = self._rpc_tasks if tasks: log.cancel(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ") @@ -1518,27 +1116,37 @@ class Actor: await self._ongoing_rpc_tasks.wait() def cancel_server(self) -> None: - """Cancel the internal channel server nursery thereby + ''' + Cancel the internal channel server nursery thereby preventing any new inbound connections from being established. - """ + + ''' if self._server_n: log.runtime("Shutting down channel server") self._server_n.cancel_scope.cancel() @property def accept_addr(self) -> Optional[tuple[str, int]]: - """Primary address to which the channel server is bound. - """ + ''' + Primary address to which the channel server is bound. + + ''' # throws OSError on failure return self._listeners[0].socket.getsockname() # type: ignore def get_parent(self) -> Portal: - """Return a portal to our parent actor.""" + ''' + Return a portal to our parent actor. + + ''' assert self._parent_chan, "No parent channel for this actor?" return Portal(self._parent_chan) def get_chans(self, uid: tuple[str, str]) -> list[Channel]: - """Return all channels to the actor with provided uid.""" + ''' + Return all channels to the actor with provided uid. + + ''' return self._peers[uid] async def _do_handshake( @@ -1546,11 +1154,13 @@ class Actor: chan: Channel ) -> tuple[str, str]: - """Exchange (name, UUIDs) identifiers as the first communication step. + ''' + Exchange (name, UUIDs) identifiers as the first communication step. These are essentially the "mailbox addresses" found in actor model parlance. - """ + + ''' await chan.send(self.uid) value = await chan.recv() uid: tuple[str, str] = (str(value[0]), str(value[1])) @@ -1566,6 +1176,423 @@ class Actor: return self._infected_aio +async def async_main( + actor: Actor, + accept_addr: Optional[tuple[str, int]] = None, + + # XXX: currently ``parent_addr`` is only needed for the + # ``multiprocessing`` backend (which pickles state sent to + # the child instead of relaying it over the connect-back + # channel). Once that backend is removed we can likely just + # change this to a simple ``is_subactor: bool`` which will + # be False when running as root actor and True when as + # a subactor. + parent_addr: Optional[tuple[str, int]] = None, + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Actor runtime entrypoint; start the IPC channel server, maybe connect + back to the parent, and startup all core machinery tasks. + + A "root-most" (or "top-level") nursery for this actor is opened here + and when cancelled effectively cancels the actor. + + ''' + registered_with_arbiter = False + try: + + # establish primary connection with immediate parent + actor._parent_chan = None + if parent_addr is not None: + + actor._parent_chan, accept_addr_rent = await actor._from_parent( + parent_addr) + + # either it's passed in because we're not a child + # or because we're running in mp mode + if accept_addr_rent is not None: + accept_addr = accept_addr_rent + + # load exposed/allowed RPC modules + # XXX: do this **after** establishing a channel to the parent + # but **before** starting the message loop for that channel + # such that import errors are properly propagated upwards + actor.load_modules() + + # The "root" nursery ensures the channel with the immediate + # parent is kept alive as a resilient service until + # cancellation steps have (mostly) occurred in + # a deterministic way. 
+ async with trio.open_nursery() as root_nursery: + actor._root_n = root_nursery + assert actor._root_n + + async with trio.open_nursery() as service_nursery: + # This nursery is used to handle all inbound + # connections to us such that if the TCP server + # is killed, connections can continue to process + # in the background until this nursery is cancelled. + actor._service_n = service_nursery + assert actor._service_n + + # Startup up the channel server with, + # - subactor: the bind address is sent by our parent + # over our established channel + # - root actor: the ``accept_addr`` passed to this method + assert accept_addr + host, port = accept_addr + + actor._server_n = await service_nursery.start( + partial( + actor._serve_forever, + service_nursery, + accept_host=host, + accept_port=port + ) + ) + accept_addr = actor.accept_addr + if _state._runtime_vars['_is_root']: + _state._runtime_vars['_root_mailbox'] = accept_addr + + # Register with the arbiter if we're told its addr + log.runtime(f"Registering {actor} for role `{actor.name}`") + assert isinstance(actor._arb_addr, tuple) + + async with get_arbiter(*actor._arb_addr) as arb_portal: + await arb_portal.run_from_ns( + 'self', + 'register_actor', + uid=actor.uid, + sockaddr=accept_addr, + ) + + registered_with_arbiter = True + + # init steps complete + task_status.started() + + # Begin handling our new connection back to our + # parent. This is done last since we don't want to + # start processing parent requests until our channel + # server is 100% up and running. + if actor._parent_chan: + await root_nursery.start( + partial( + process_messages, + actor, + actor._parent_chan, + shield=True, + ) + ) + log.runtime("Waiting on service nursery to complete") + log.runtime("Service nursery complete") + log.runtime("Waiting on root nursery to complete") + + # Blocks here as expected until the root nursery is + # killed (i.e. this actor is cancelled or signalled by the parent) + except Exception as err: + log.info("Closing all actor lifetime contexts") + _lifetime_stack.close() + + if not registered_with_arbiter: + # TODO: I guess we could try to connect back + # to the parent through a channel and engage a debugger + # once we have that all working with std streams locking? + log.exception( + f"Actor errored and failed to register with arbiter " + f"@ {actor._arb_addr}?") + log.error( + "\n\n\t^^^ THIS IS PROBABLY A TRACTOR BUGGGGG!!! ^^^\n" + "\tCALMLY CALL THE AUTHORITIES AND HIDE YOUR CHILDREN.\n\n" + "\tYOUR PARENT CODE IS GOING TO KEEP WORKING FINE!!!\n" + "\tTHIS IS HOW RELIABlE SYSTEMS ARE SUPPOSED TO WORK!?!?\n" + ) + + if actor._parent_chan: + await try_ship_error_to_parent(actor._parent_chan, err) + + # always! + log.exception("Actor errored:") + raise + + finally: + log.info("Runtime nursery complete") + + # tear down all lifetime contexts if not in guest mode + # XXX: should this just be in the entrypoint? + log.info("Closing all actor lifetime contexts") + + # TODO: we can't actually do this bc the debugger + # uses the _service_n to spawn the lock task, BUT, + # in theory if we had the root nursery surround this finally + # block it might be actually possible to debug THIS + # machinery in the same way as user task code? 
+ # if actor.name == 'brokerd.ib': + # with trio.CancelScope(shield=True): + # await _debug.breakpoint() + + _lifetime_stack.close() + + # Unregister actor from the arbiter + if registered_with_arbiter and ( + actor._arb_addr is not None + ): + failed = False + with trio.move_on_after(0.5) as cs: + cs.shield = True + try: + async with get_arbiter(*actor._arb_addr) as arb_portal: + await arb_portal.run_from_ns( + 'self', + 'unregister_actor', + uid=actor.uid + ) + except OSError: + failed = True + if cs.cancelled_caught: + failed = True + if failed: + log.warning( + f"Failed to unregister {actor.name} from arbiter") + + # Ensure all peers (actors connected to us as clients) are finished + if not actor._no_more_peers.is_set(): + if any( + chan.connected() for chan in chain(*actor._peers.values()) + ): + log.runtime( + f"Waiting for remaining peers {actor._peers} to clear") + with trio.CancelScope(shield=True): + await actor._no_more_peers.wait() + log.runtime("All peer channels are complete") + + log.runtime("Runtime completed") + + +async def process_messages( + actor: Actor, + chan: Channel, + shield: bool = False, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> bool: + ''' + Process messages for the IPC transport channel async-RPC style. + + Receive multiplexed RPC requests, spawn handler tasks and deliver + responses over or boxed errors back to the "caller" task. + + ''' + # TODO: once https://github.com/python-trio/trio/issues/467 gets + # worked out we'll likely want to use that! + msg = None + nursery_cancelled_before_task: bool = False + + log.runtime(f"Entering msg loop for {chan} from {chan.uid}") + try: + with trio.CancelScope(shield=shield) as loop_cs: + # this internal scope allows for keeping this message + # loop running despite the current task having been + # cancelled (eg. `open_portal()` may call this method from + # a locally spawned task) and recieve this scope using + # ``scope = Nursery.start()`` + task_status.started(loop_cs) + async for msg in chan: + + if msg is None: # loop terminate sentinel + + log.cancel( + f"Channel to {chan.uid} terminated?\n" + "Cancelling all associated tasks..") + + for (channel, cid) in actor._rpc_tasks.copy(): + if channel is chan: + await actor._cancel_task(cid, channel) + + log.runtime( + f"Msg loop signalled to terminate for" + f" {chan} from {chan.uid}") + + break + + log.transport( # type: ignore + f"Received msg {msg} from {chan.uid}") + + cid = msg.get('cid') + if cid: + # deliver response to local caller/waiter + await actor._push_result(chan, cid, msg) + + log.runtime( + f"Waiting on next msg for {chan} from {chan.uid}") + continue + + # process command request + try: + ns, funcname, kwargs, actorid, cid = msg['cmd'] + except KeyError: + # This is the non-rpc error case, that is, an + # error **not** raised inside a call to ``_invoke()`` + # (i.e. no cid was provided in the msg - see above). 
+ # Push this error to all local channel consumers + # (normally portals) by marking the channel as errored + assert chan.uid + exc = unpack_error(msg, chan=chan) + chan._exc = exc + raise exc + + log.runtime( + f"Processing request from {actorid}\n" + f"{ns}.{funcname}({kwargs})") + + if ns == 'self': + func = getattr(actor, funcname) + + if funcname == 'cancel': + + # don't start entire actor runtime + # cancellation if this actor is in debug + # mode + pdb_complete = _debug.Lock.local_pdb_complete + if pdb_complete: + await pdb_complete.wait() + + # we immediately start the runtime machinery + # shutdown + with trio.CancelScope(shield=True): + # actor.cancel() was called so kill this + # msg loop and break out into + # ``async_main()`` + log.cancel( + f"Actor {actor.uid} was remotely cancelled " + f"by {chan.uid}" + ) + await _invoke( + actor, cid, chan, func, kwargs, is_rpc=False + ) + + loop_cs.cancel() + break + + if funcname == '_cancel_task': + + # we immediately start the runtime machinery + # shutdown + with trio.CancelScope(shield=True): + # actor.cancel() was called so kill this + # msg loop and break out into + # ``async_main()`` + kwargs['chan'] = chan + log.cancel( + f'Remote request to cancel task\n' + f'remote actor: {chan.uid}\n' + f'task: {cid}' + ) + try: + await _invoke( + actor, + cid, + chan, + func, + kwargs, + is_rpc=False, + ) + except BaseException: + log.exception("failed to cancel task?") + + continue + else: + # complain to client about restricted modules + try: + func = actor._get_rpc_func(ns, funcname) + except (ModuleNotExposed, AttributeError) as err: + err_msg = pack_error(err) + err_msg['cid'] = cid + await chan.send(err_msg) + continue + + # spin up a task for the requested function + log.runtime(f"Spawning task for {func}") + assert actor._service_n + try: + cs = await actor._service_n.start( + partial(_invoke, actor, cid, chan, func, kwargs), + name=funcname, + ) + except (RuntimeError, trio.MultiError): + # avoid reporting a benign race condition + # during actor runtime teardown. + nursery_cancelled_before_task = True + break + + # never allow cancelling cancel requests (results in + # deadlock and other weird behaviour) + # if func != actor.cancel: + if isinstance(cs, Exception): + log.warning( + f"Task for RPC func {func} failed with" + f"{cs}") + else: + # mark that we have ongoing rpc tasks + actor._ongoing_rpc_tasks = trio.Event() + log.runtime(f"RPC func is {func}") + # store cancel scope such that the rpc task can be + # cancelled gracefully if requested + actor._rpc_tasks[(chan, cid)] = ( + cs, func, trio.Event()) + + log.runtime( + f"Waiting on next msg for {chan} from {chan.uid}") + + # end of async for, channel disconnect vis + # ``trio.EndOfChannel`` + log.runtime( + f"{chan} for {chan.uid} disconnected, cancelling tasks" + ) + await actor.cancel_rpc_tasks(chan) + + except ( + TransportClosed, + ): + # channels "breaking" (for TCP streams by EOF or 104 + # connection-reset) is ok since we don't have a teardown + # handshake for them (yet) and instead we simply bail out of + # the message loop and expect the teardown sequence to clean + # up. + log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') + + # transport **was** disconnected + return True + + except (Exception, trio.MultiError) as err: + if nursery_cancelled_before_task: + sn = actor._service_n + assert sn and sn.cancel_scope.cancel_called + log.cancel( + f'Service nursery cancelled before it handled {funcname}' + ) + else: + # ship any "internal" exception (i.e. 
one from internal + # machinery not from an rpc task) to parent + log.exception("Actor errored:") + if actor._parent_chan: + await try_ship_error_to_parent(actor._parent_chan, err) + + # if this is the `MainProcess` we expect the error broadcasting + # above to trigger an error at consuming portal "checkpoints" + raise + + finally: + # msg debugging for when he machinery is brokey + log.runtime( + f"Exiting msg loop for {chan} from {chan.uid} " + f"with last msg:\n{msg}") + + # transport **was not** disconnected + return False + + class Arbiter(Actor): ''' A special actor who knows all the other actors and always has diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 4230bfb..78b3ba8 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -42,7 +42,7 @@ from ._state import ( from .log import get_logger from ._portal import Portal -from ._actor import Actor +from ._runtime import Actor from ._entry import _mp_main from ._exceptions import ActorFailure diff --git a/tractor/_state.py b/tractor/_state.py index 073bc99..c0c957b 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -18,7 +18,10 @@ Per process state """ -from typing import Optional, Dict, Any +from typing import ( + Optional, + Any, +) from collections.abc import Mapping import trio @@ -27,7 +30,7 @@ from ._exceptions import NoRuntime _current_actor: Optional['Actor'] = None # type: ignore # noqa -_runtime_vars: Dict[str, Any] = { +_runtime_vars: dict[str, Any] = { '_debug_mode': False, '_is_root': False, '_root_mailbox': (None, None) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 34bc0a1..4500ec0 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -23,8 +23,10 @@ import inspect from contextlib import asynccontextmanager from dataclasses import dataclass from typing import ( - Any, Optional, Callable, - AsyncGenerator, Dict, + Any, + Optional, + Callable, + AsyncGenerator, AsyncIterator ) @@ -393,7 +395,7 @@ class Context: async def _maybe_raise_from_remote_msg( self, - msg: Dict[str, Any], + msg: dict[str, Any], ) -> None: ''' diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 958d445..06ee38d 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -20,7 +20,10 @@ """ from functools import partial import inspect -from typing import Tuple, List, Dict, Optional, TYPE_CHECKING +from typing import ( + Optional, + TYPE_CHECKING, +) import typing import warnings @@ -30,7 +33,7 @@ from async_generator import asynccontextmanager from ._debug import maybe_wait_for_debugger from ._state import current_actor, is_main_process from .log import get_logger, get_loglevel -from ._actor import Actor +from ._runtime import Actor from ._portal import Portal from ._exceptions import is_multi_cancelled from ._root import open_root_actor @@ -43,7 +46,7 @@ if TYPE_CHECKING: log = get_logger(__name__) -_default_bind_addr: Tuple[str, int] = ('127.0.0.1', 0) +_default_bind_addr: tuple[str, int] = ('127.0.0.1', 0) class ActorNursery: @@ -79,15 +82,15 @@ class ActorNursery: actor: Actor, ria_nursery: trio.Nursery, da_nursery: trio.Nursery, - errors: Dict[Tuple[str, str], Exception], + errors: dict[tuple[str, str], Exception], ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor self._ria_nursery = ria_nursery self._da_nursery = da_nursery - self._children: Dict[ - Tuple[str, str], - Tuple[Actor, mp.Process, Optional[Portal]] + self._children: dict[ + tuple[str, str], + tuple[Actor, mp.Process, Optional[Portal]] ] = {} # portals spawned with ``run_in_actor()`` are # 
cancelled when their "main" result arrives @@ -102,9 +105,9 @@ class ActorNursery: self, name: str, *, - bind_addr: Tuple[str, int] = _default_bind_addr, - rpc_module_paths: List[str] = None, - enable_modules: List[str] = None, + bind_addr: tuple[str, int] = _default_bind_addr, + rpc_module_paths: list[str] = None, + enable_modules: list[str] = None, loglevel: str = None, # set log level per subactor nursery: trio.Nursery = None, debug_mode: Optional[bool] = None, @@ -173,9 +176,9 @@ class ActorNursery: *, name: Optional[str] = None, - bind_addr: Tuple[str, int] = _default_bind_addr, - rpc_module_paths: Optional[List[str]] = None, - enable_modules: List[str] = None, + bind_addr: tuple[str, int] = _default_bind_addr, + rpc_module_paths: Optional[list[str]] = None, + enable_modules: list[str] = None, loglevel: str = None, # set log level per subactor infect_asyncio: bool = False, @@ -293,7 +296,7 @@ async def _open_and_supervise_one_cancels_all_nursery( ) -> typing.AsyncGenerator[ActorNursery, None]: # the collection of errors retreived from spawned sub-actors - errors: Dict[Tuple[str, str], Exception] = {} + errors: dict[tuple[str, str], Exception] = {} # This is the outermost level "deamon actor" nursery. It is awaited # **after** the below inner "run in actor nursery". This allows for diff --git a/tractor/experimental/_pubsub.py b/tractor/experimental/_pubsub.py index 7a8ec37..0481f77 100644 --- a/tractor/experimental/_pubsub.py +++ b/tractor/experimental/_pubsub.py @@ -26,7 +26,10 @@ support provided by ``tractor.Context.open_stream()`` and friends. from __future__ import annotations import inspect import typing -from typing import Dict, Any, Set, Callable, List, Tuple +from typing import ( + Any, + Callable, +) from functools import partial from async_generator import aclosing @@ -44,7 +47,7 @@ log = get_logger('messaging') async def fan_out_to_ctxs( pub_async_gen_func: typing.Callable, # it's an async gen ... gd mypy - topics2ctxs: Dict[str, list], + topics2ctxs: dict[str, list], packetizer: typing.Callable = None, ) -> None: ''' @@ -61,7 +64,7 @@ async def fan_out_to_ctxs( async for published in pub_gen: - ctx_payloads: List[Tuple[Context, Any]] = [] + ctx_payloads: list[tuple[Context, Any]] = [] for topic, data in published.items(): log.debug(f"publishing {topic, data}") @@ -103,8 +106,8 @@ async def fan_out_to_ctxs( def modify_subs( - topics2ctxs: Dict[str, List[Context]], - topics: Set[str], + topics2ctxs: dict[str, list[Context]], + topics: set[str], ctx: Context, ) -> None: @@ -136,20 +139,20 @@ def modify_subs( topics2ctxs.pop(topic) -_pub_state: Dict[str, dict] = {} -_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {} +_pub_state: dict[str, dict] = {} +_pubtask2lock: dict[str, trio.StrictFIFOLock] = {} def pub( wrapped: typing.Callable = None, *, - tasks: Set[str] = set(), + tasks: set[str] = set(), ): """Publisher async generator decorator. A publisher can be called multiple times from different actors but will only spawn a finite set of internal tasks to stream values to - each caller. The ``tasks: Set[str]`` argument to the decorator + each caller. The ``tasks: set[str]`` argument to the decorator specifies the names of the mutex set of publisher tasks. When the publisher function is called, an argument ``task_name`` must be passed to specify which task (of the set named in ``tasks``) should @@ -158,9 +161,9 @@ def pub( necessary. 
Values yielded from the decorated async generator must be - ``Dict[str, Dict[str, Any]]`` where the fist level key is the topic + ``dict[str, dict[str, Any]]`` where the fist level key is the topic string and determines which subscription the packet will be - delivered to and the value is a packet ``Dict[str, Any]`` by default + delivered to and the value is a packet ``dict[str, Any]`` by default of the form: .. ::python @@ -186,7 +189,7 @@ def pub( The publisher must be called passing in the following arguments: - - ``topics: Set[str]`` the topic sequence or "subscriptions" + - ``topics: set[str]`` the topic sequence or "subscriptions" - ``task_name: str`` the task to use (if ``tasks`` was passed) - ``ctx: Context`` the tractor context (only needed if calling the pub func without a nursery, otherwise this is provided implicitly) @@ -231,7 +234,7 @@ def pub( if wrapped is None: return partial(pub, tasks=tasks) - task2lock: Dict[str, trio.StrictFIFOLock] = {} + task2lock: dict[str, trio.StrictFIFOLock] = {} for name in tasks: task2lock[name] = trio.StrictFIFOLock() @@ -243,7 +246,7 @@ def pub( # `wrapt` docs async def _execute( ctx: Context, - topics: Set[str], + topics: set[str], *args, # *, task_name: str = None, # default: only one task allocated diff --git a/tractor/msg.py b/tractor/msg.py index 138718b..9af3ccd 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -24,7 +24,7 @@ Built-in messaging patterns, types, APIs and helpers. # ``pkgutil.resolve_name()`` internally uses # ``importlib.import_module()`` which can be filtered by inserting # a ``MetaPathFinder`` into ``sys.meta_path`` (which we could do before -# entering the ``Actor._process_messages()`` loop). +# entering the ``_runtime.process_messages()`` loop). # - https://github.com/python/cpython/blob/main/Lib/pkgutil.py#L645 # - https://stackoverflow.com/questions/1350466/preventing-python-code-from-importing-certain-modules # - https://stackoverflow.com/a/63320902 diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index ab4b735..5473a04 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -102,6 +102,8 @@ async def gather_contexts( # deliver control once all managers have started up await all_entered.wait() + # NOTE: order *should* be preserved in the output values + # since ``dict``s are now implicitly ordered. yield tuple(unwrapped.values()) # we don't need a try/finally since cancellation will be triggered
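The news fragment above motivates the 3.10 bump with the new ``match:`` syntax for the internal RPC-msg loops, but this patch only relocates the loop into ``_runtime.process_messages()`` without converting it. Purely as an illustration of that direction, here is a hypothetical dispatcher over the msg shapes the loop already handles (the ``dispatch()`` helper is made up for this sketch and is not part of ``tractor``)::

    from typing import Any

    def dispatch(msg: dict[str, Any] | None) -> str:
        match msg:
            case None:
                # loop-terminate sentinel handled at the top of the loop
                return 'terminate'

            case {'cid': cid} if cid:
                # response destined for a local caller/waiter task
                return f'deliver response for cid {cid}'

            case {'cmd': [ns, funcname, kwargs, actorid, cid]}:
                # inbound RPC request to be spawned as a task
                return f'invoke {ns}.{funcname}() for {actorid}'

            case _:
                # non-rpc error case: no cid and no cmd, treat as
                # a boxed remote error
                return 'error'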