diff --git a/tractor/__init__.py b/tractor/__init__.py index f9a1730..7e6f800 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -2,181 +2,38 @@ tractor: An actor model micro-framework built on ``trio`` and ``multiprocessing``. """ -import importlib -from functools import partial -from typing import Tuple, Any, Optional, List -import typing - -import trio # type: ignore from trio import MultiError -from . import log -from ._ipc import _connect_chan, Channel +from ._ipc import Channel from ._streaming import Context, stream from ._discovery import get_arbiter, find_actor, wait_for_actor -from ._actor import Actor, _start_actor, Arbiter from ._trionics import open_nursery from ._state import current_actor, is_root_process -from . import _state from ._exceptions import RemoteActorError, ModuleNotExposed from ._debug import breakpoint, post_mortem -from . import _spawn from . import msg +from ._root import run, run_daemon, open_root_actor __all__ = [ + 'Channel', + 'Context', + 'ModuleNotExposed', + 'MultiError', + 'RemoteActorError', 'breakpoint', - 'post_mortem', 'current_actor', 'find_actor', 'get_arbiter', + 'is_root_process', + 'msg', 'open_nursery', - 'wait_for_actor', - 'Channel', - 'Context', + 'open_root_actor', + 'post_mortem', + 'run', + 'run_daemon', 'stream', - 'MultiError', - 'RemoteActorError', - 'ModuleNotExposed', - 'msg' + 'wait_for_actor', + 'to_asyncio', + 'wait_for_actor', ] - - -# set at startup and after forks -_default_arbiter_host = '127.0.0.1' -_default_arbiter_port = 1616 - - -async def _main( - async_fn: typing.Callable[..., typing.Awaitable], - args: Tuple, - arbiter_addr: Tuple[str, int], - name: Optional[str] = None, - start_method: Optional[str] = None, - debug_mode: bool = False, - **kwargs, -) -> typing.Any: - """Async entry point for ``tractor``. - """ - logger = log.get_logger('tractor') - - # mark top most level process as root actor - _state._runtime_vars['_is_root'] = True - - if start_method is not None: - _spawn.try_set_start_method(start_method) - - if debug_mode and _spawn._spawn_method == 'trio': - _state._runtime_vars['_debug_mode'] = True - - # expose internal debug module to every actor allowing - # for use of ``await tractor.breakpoint()`` - kwargs.setdefault('rpc_module_paths', []).append('tractor._debug') - - elif debug_mode: - raise RuntimeError( - "Debug mode is only supported for the `trio` backend!" - ) - - main = partial(async_fn, *args) - - arbiter_addr = (host, port) = arbiter_addr or ( - _default_arbiter_host, - _default_arbiter_port - ) - - loglevel = kwargs.get('loglevel', log.get_loglevel()) - if loglevel is not None: - log._default_loglevel = loglevel - log.get_console_log(loglevel) - - # make a temporary connection to see if an arbiter exists - arbiter_found = False - try: - async with _connect_chan(host, port): - arbiter_found = True - except OSError: - logger.warning(f"No actor could be found @ {host}:{port}") - - # create a local actor and start up its main routine/task - if arbiter_found: # we were able to connect to an arbiter - logger.info(f"Arbiter seems to exist @ {host}:{port}") - actor = Actor( - name or 'anonymous', - arbiter_addr=arbiter_addr, - **kwargs - ) - host, port = (host, 0) - else: - # start this local actor as the arbiter - actor = Arbiter( - name or 'arbiter', arbiter_addr=arbiter_addr, **kwargs) - - # ``Actor._async_main()`` creates an internal nursery if one is not - # provided and thus blocks here until it's main task completes. - # Note that if the current actor is the arbiter it is desirable - # for it to stay up indefinitely until a re-election process has - # taken place - which is not implemented yet FYI). - - try: - return await _start_actor( - actor, main, host, port, arbiter_addr=arbiter_addr - ) - finally: - logger.info("Root actor terminated") - - -def run( - async_fn: typing.Callable[..., typing.Awaitable], - *args, - name: Optional[str] = None, - arbiter_addr: Tuple[str, int] = ( - _default_arbiter_host, - _default_arbiter_port, - ), - # either the `multiprocessing` start method: - # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods - # OR `trio` (the new default). - start_method: Optional[str] = None, - debug_mode: bool = False, - **kwargs, -) -> Any: - """Run a trio-actor async function in process. - - This is tractor's main entry and the start point for any async actor. - """ - return trio.run( - partial( - # our entry - _main, - - # user entry point - async_fn, - args, - - # global kwargs - arbiter_addr=arbiter_addr, - name=name, - start_method=start_method, - debug_mode=debug_mode, - **kwargs, - ) - ) - - -def run_daemon( - rpc_module_paths: List[str], - **kwargs -) -> None: - """Spawn daemon actor which will respond to RPC. - - This is a convenience wrapper around - ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned - is meant to run forever responding to RPC requests. - """ - kwargs['rpc_module_paths'] = list(rpc_module_paths) - - for path in rpc_module_paths: - importlib.import_module(path) - - return run(partial(trio.sleep, float('inf')), **kwargs) diff --git a/tractor/_actor.py b/tractor/_actor.py index 3a53ed9..3656889 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -1076,49 +1076,3 @@ class Arbiter(Actor): def unregister_actor(self, uid: Tuple[str, str]) -> None: self._registry.pop(uid, None) - - -async def _start_actor( - actor: Actor, - main: typing.Callable[..., typing.Awaitable], - host: str, - port: int, - arbiter_addr: Tuple[str, int], - nursery: trio.Nursery = None -) -> Any: - """Spawn a local actor by starting a task to execute it's main async - function. - - Blocks if no nursery is provided, in which case it is expected the nursery - provider is responsible for waiting on the task to complete. - """ - # assign process-local actor - _state._current_actor = actor - - # start local channel-server and fake the portal API - # NOTE: this won't block since we provide the nursery - log.info(f"Starting local {actor} @ {host}:{port}") - - async with trio.open_nursery() as nursery: - await nursery.start( - partial( - actor._async_main, - accept_addr=(host, port), - parent_addr=None - ) - ) - try: - result = await main() - except (Exception, trio.MultiError) as err: - log.exception("Actor crashed:") - await _debug._maybe_enter_pm(err) - - raise - finally: - await actor.cancel() - - # unset module state - _state._current_actor = None - log.info("Completed async main") - - return result diff --git a/tractor/_entry.py b/tractor/_entry.py index 0faf486..670f03c 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -1,5 +1,5 @@ """ -Process entry points. +Sub-process entry points. """ from functools import partial from typing import Tuple, Any diff --git a/tractor/_root.py b/tractor/_root.py new file mode 100644 index 0000000..7c84af1 --- /dev/null +++ b/tractor/_root.py @@ -0,0 +1,212 @@ +""" +Root actor runtime ignition(s). +""" +from contextlib import asynccontextmanager +from functools import partial +import importlib +from typing import Tuple, Optional, List, Any +import typing +import warnings + +import trio + +from ._actor import Actor, Arbiter +from . import _debug +from . import _spawn +from . import _state +from . import log +from ._ipc import _connect_chan + + +# set at startup and after forks +_default_arbiter_host = '127.0.0.1' +_default_arbiter_port = 1616 + + +logger = log.get_logger('tractor') + + +@asynccontextmanager +async def open_root_actor( + + # defaults are above + arbiter_addr: Tuple[str, int], + + name: Optional[str] = 'root', + + # either the `multiprocessing` start method: + # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods + # OR `trio` (the new default). + start_method: Optional[str] = None, + + # enables the multi-process debugger support + debug_mode: bool = False, + + **kwargs, +) -> typing.Any: + """Async entry point for ``tractor``. + + """ + # mark top most level process as root actor + _state._runtime_vars['_is_root'] = True + + if start_method is not None: + _spawn.try_set_start_method(start_method) + + if debug_mode and _spawn._spawn_method == 'trio': + _state._runtime_vars['_debug_mode'] = True + + # expose internal debug module to every actor allowing + # for use of ``await tractor.breakpoint()`` + kwargs.setdefault('rpc_module_paths', []).append('tractor._debug') + + elif debug_mode: + raise RuntimeError( + "Debug mode is only supported for the `trio` backend!" + ) + + arbiter_addr = (host, port) = arbiter_addr or ( + _default_arbiter_host, + _default_arbiter_port + ) + + loglevel = kwargs.get('loglevel', log.get_loglevel()) + if loglevel is not None: + log._default_loglevel = loglevel + log.get_console_log(loglevel) + + # make a temporary connection to see if an arbiter exists + arbiter_found = False + + try: + async with _connect_chan(host, port): + arbiter_found = True + + except OSError: + logger.warning(f"No actor could be found @ {host}:{port}") + + # create a local actor and start up its main routine/task + if arbiter_found: + + # we were able to connect to an arbiter + logger.info(f"Arbiter seems to exist @ {host}:{port}") + + actor = Actor( + name or 'anonymous', + arbiter_addr=arbiter_addr, + **kwargs + ) + host, port = (host, 0) + + else: + # start this local actor as the arbiter + + # Note that if the current actor is the arbiter it is desirable + # for it to stay up indefinitely until a re-election process has + # taken place - which is not implemented yet FYI). + + actor = Arbiter( + name or 'arbiter', + arbiter_addr=arbiter_addr, + **kwargs + ) + + try: + # assign process-local actor + _state._current_actor = actor + + # start local channel-server and fake the portal API + # NOTE: this won't block since we provide the nursery + logger.info(f"Starting local {actor} @ {host}:{port}") + + # start the actor runtime in a new task + async with trio.open_nursery() as nursery: + + # ``Actor._async_main()`` creates an internal nursery and + # thus blocks here until the entire underlying actor tree has + # terminated thereby conducting structured concurrency. + + await nursery.start( + partial( + actor._async_main, + accept_addr=(host, port), + parent_addr=None + ) + ) + try: + yield actor + # result = await main() + except (Exception, trio.MultiError) as err: + logger.exception("Actor crashed:") + await _debug._maybe_enter_pm(err) + + raise + finally: + logger.info("Shutting down root actor") + await actor.cancel() + finally: + _state._current_actor = None + logger.info("Root actor terminated") + + +def run( + + # target + async_fn: typing.Callable[..., typing.Awaitable], + *args, + + # runtime kwargs + name: Optional[str] = None, + arbiter_addr: Tuple[str, int] = ( + _default_arbiter_host, + _default_arbiter_port, + ), + + start_method: Optional[str] = None, + debug_mode: bool = False, + **kwargs, + +) -> Any: + """Run a trio-actor async function in process. + + This is tractor's main entry and the start point for any async actor. + """ + async def _main(): + + async with open_root_actor( + arbiter_addr=arbiter_addr, + name=name, + start_method=start_method, + debug_mode=debug_mode, + **kwargs, + ): + + return await async_fn(*args) + + warnings.warn( + "`tractor.run()` is now deprecated. `tractor` now" + " implicitly starts the root actor on first actor nursery" + " use. If you want to start the root actor manually, use" + " `tractor.open_root_actor()`.", + DeprecationWarning, + stacklevel=2, + ) + return trio.run(_main) + + +def run_daemon( + rpc_module_paths: List[str], + **kwargs +) -> None: + """Spawn daemon actor which will respond to RPC. + + This is a convenience wrapper around + ``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned + is meant to run forever responding to RPC requests. + """ + kwargs['rpc_module_paths'] = list(rpc_module_paths) + + for path in rpc_module_paths: + importlib.import_module(path) + + return run(partial(trio.sleep, float('inf')), **kwargs)