forked from goodboy/tractor
1
0
Fork 0
tractor/tractor/__init__.py

183 lines
5.0 KiB
Python
Raw Normal View History

"""
tractor: An actor model micro-framework built on
``trio`` and ``multiprocessing``.
"""
import importlib
from functools import partial
2020-08-13 15:53:45 +00:00
from typing import Tuple, Any, Optional, List
import typing
2018-08-26 17:12:29 +00:00
import trio # type: ignore
2018-11-19 19:15:28 +00:00
from trio import MultiError
from . import log
from ._ipc import _connect_chan, Channel
from ._streaming import Context, stream
from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._actor import Actor, _start_actor, Arbiter
2018-07-14 20:09:05 +00:00
from ._trionics import open_nursery
from ._state import current_actor
2020-10-13 15:03:55 +00:00
from . import _state
from ._exceptions import RemoteActorError, ModuleNotExposed
from ._debug import breakpoint, post_mortem
from . import _spawn
2020-10-16 02:47:11 +00:00
from . import msg
2018-07-14 20:09:05 +00:00
# Names exported by ``from tractor import *``.
__all__ = [
    'breakpoint',
    'post_mortem',
    'current_actor',
    'find_actor',
    'get_arbiter',
    'open_nursery',
    'wait_for_actor',
    'Channel',
    'Context',
    'stream',
    'MultiError',
    'RemoteActorError',
    'ModuleNotExposed',
    'msg',
    # entry points defined in this module
    'run',
    'run_daemon',
]
2018-06-12 19:17:48 +00:00
# Default (host, port) address of the arbiter: ``run()`` uses these as the
# default for its ``arbiter_addr`` parameter and ``_main()`` falls back to
# them when it is handed a falsy address.
# NOTE(review): the previous comment here read "set at startup and after
# forks", but nothing visible in this module reassigns them - confirm.
_default_arbiter_host = '127.0.0.1'
_default_arbiter_port = 1616
2018-06-12 19:17:48 +00:00
async def _main(
    async_fn: typing.Callable[..., typing.Awaitable],
    args: Tuple,
    arbiter_addr: Tuple[str, int],
    name: Optional[str] = None,
    start_method: Optional[str] = None,
    debug_mode: bool = False,
    **kwargs,
) -> typing.Any:
    """Async entry point for ``tractor``.

    Marks this process as the root actor, probes ``arbiter_addr`` for an
    existing arbiter and then starts either a plain ``Actor`` (arbiter
    found) or an ``Arbiter`` (none found) running ``async_fn(*args)`` as
    its main task.

    Parameters:
        async_fn: async callable used as the actor's main task.
        args: positional arguments bound to ``async_fn``.
        arbiter_addr: ``(host, port)`` of the arbiter; a falsy value falls
            back to the module level defaults.
        name: actor name; defaults to ``'anonymous'`` when an arbiter was
            found and to ``'arbiter'`` otherwise.
        start_method: when not ``None`` it is handed to
            ``_spawn.try_set_start_method()``.
        debug_mode: enable the debug runtime flag and expose
            ``tractor._debug`` as an RPC module.
        **kwargs: forwarded to the ``Actor``/``Arbiter`` constructor; the
            ``loglevel`` and ``rpc_module_paths`` entries are also read
            here.

    Returns:
        Whatever ``_start_actor()`` returns.

    Raises:
        RuntimeError: if ``debug_mode`` is requested while the spawn
            method is not ``'trio'``.
    """
    logger = log.get_logger('tractor')

    # mark top most level process as root actor
    _state._runtime_vars['_is_root'] = True

    if start_method is not None:
        _spawn.try_set_start_method(start_method)

    if debug_mode and _spawn._spawn_method == 'trio':
        _state._runtime_vars['_debug_mode'] = True
        # expose internal debug module to every actor allowing
        # for use of ``await tractor.breakpoint()``.
        # Build a *new* list rather than appending in place: the list
        # found in ``kwargs`` is the caller's own object and must not be
        # mutated behind their back.
        kwargs['rpc_module_paths'] = (
            list(kwargs.get('rpc_module_paths') or ())
            + ['tractor._debug']
        )
    elif debug_mode:
        raise RuntimeError(
            "Debug mode is only supported for the `trio` backend!"
        )

    main = partial(async_fn, *args)

    arbiter_addr = (host, port) = arbiter_addr or (
        _default_arbiter_host,
        _default_arbiter_port
    )

    loglevel = kwargs.get('loglevel', log.get_loglevel())
    if loglevel is not None:
        log._default_loglevel = loglevel
        log.get_console_log(loglevel)

    # make a temporary connection to see if an arbiter exists
    arbiter_found = False

    try:
        async with _connect_chan(host, port):
            arbiter_found = True
    except OSError:
        logger.warning(f"No actor could be found @ {host}:{port}")

    # create a local actor and start up its main routine/task
    if arbiter_found:  # we were able to connect to an arbiter
        logger.info(f"Arbiter seems to exist @ {host}:{port}")

        actor = Actor(
            name or 'anonymous',
            arbiter_addr=arbiter_addr,
            **kwargs
        )
        # an arbiter already answers on ``port``: start this actor on
        # port 0 instead of the arbiter's port
        port = 0

    else:
        # start this local actor as the arbiter
        actor = Arbiter(
            name or 'arbiter', arbiter_addr=arbiter_addr, **kwargs)

    # ``Actor._async_main()`` creates an internal nursery if one is not
    # provided and thus blocks here until its main task completes.
    # Note that if the current actor is the arbiter it is desirable
    # for it to stay up indefinitely until a re-election process has
    # taken place (which is not implemented yet FYI).
    try:
        return await _start_actor(
            actor, main, host, port, arbiter_addr=arbiter_addr
        )
    finally:
        logger.info("Root actor terminated")
2018-06-12 19:17:48 +00:00
def run(
    async_fn: typing.Callable[..., typing.Awaitable],
    *args,
    name: Optional[str] = None,
    arbiter_addr: Tuple[str, int] = (
        _default_arbiter_host,
        _default_arbiter_port,
    ),
    # either the `multiprocessing` start method:
    # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
    # OR `trio` (the new default).
    start_method: Optional[str] = None,
    debug_mode: bool = False,
    **kwargs,
) -> Any:
    """Run a trio-actor async function in process.

    This is tractor's main entry and the start point for any async actor:
    ``async_fn`` and its positional ``args`` are bound, together with the
    keyword options, onto ``_main`` and the result is driven by
    ``trio.run()``, whose return value is handed back to the caller.
    """
    # ``_main`` takes the user entry point and its positional args as its
    # first two parameters; everything else travels as keywords.
    root_entry = partial(
        _main,
        async_fn,
        args,
        arbiter_addr=arbiter_addr,
        name=name,
        start_method=start_method,
        debug_mode=debug_mode,
        **kwargs,
    )
    return trio.run(root_entry)
def run_daemon(
    rpc_module_paths: List[str],
    **kwargs
) -> None:
    """Spawn daemon actor which will respond to RPC.

    This is a convenience wrapper around
    ``tractor.run(partial(trio.sleep, float('inf')))`` such that the first
    actor spawned is meant to run forever responding to RPC requests.

    Parameters:
        rpc_module_paths: importable module paths; each one is imported
            here before the actor is started and the full collection is
            forwarded to ``run()`` as the ``rpc_module_paths`` keyword.
        **kwargs: forwarded unchanged to ``run()``.

    Raises:
        Whatever ``importlib.import_module()`` raises for a path that
        cannot be imported.
    """
    # Materialise exactly once and iterate the copy: looping over the
    # original argument after ``list()`` has consumed it would silently
    # skip the import check for one-shot iterables.
    module_paths = list(rpc_module_paths)

    for path in module_paths:
        importlib.import_module(path)

    kwargs['rpc_module_paths'] = module_paths
    return run(partial(trio.sleep, float('inf')), **kwargs)