Re-org root actor startup into context manager

This begins the move to dropping support for `tractor.run()` which we
don't really need since the runtime is started (as it always has been)
from a new sub-task / nursery. Instead this introduces starting the
actor tree through a `open_root_actor()` async context manager which
we'll likely implicitly call (from the root) on the first use of an
actor nursery.

Drop `_actor._start_actor()` and factor its contents into this new api.
Make `run()` and `run_daemon()` use `open_root_actor()` until we decide
to remove them.

Relates to #168 and #177
drop_tractor_run
Tyler Goodlet 2020-12-27 10:55:00 -05:00
parent f427c98cf6
commit f05534e472
4 changed files with 211 additions and 206 deletions

View File

@ -2,181 +2,38 @@
tractor: An actor model micro-framework built on
``trio`` and ``multiprocessing``.
"""
import importlib
from functools import partial
from typing import Tuple, Any, Optional, List
import typing
import trio # type: ignore
from trio import MultiError
from . import log
from ._ipc import _connect_chan, Channel
from ._ipc import Channel
from ._streaming import Context, stream
from ._discovery import get_arbiter, find_actor, wait_for_actor
from ._actor import Actor, _start_actor, Arbiter
from ._trionics import open_nursery
from ._state import current_actor, is_root_process
from . import _state
from ._exceptions import RemoteActorError, ModuleNotExposed
from ._debug import breakpoint, post_mortem
from . import _spawn
from . import msg
from ._root import run, run_daemon, open_root_actor
__all__ = [
'Channel',
'Context',
'ModuleNotExposed',
'MultiError',
'RemoteActorError',
'breakpoint',
'post_mortem',
'current_actor',
'find_actor',
'get_arbiter',
'is_root_process',
'msg',
'open_nursery',
'wait_for_actor',
'Channel',
'Context',
'open_root_actor',
'post_mortem',
'run',
'run_daemon',
'stream',
'MultiError',
'RemoteActorError',
'ModuleNotExposed',
'msg'
'wait_for_actor',
'to_asyncio',
'wait_for_actor',
]
# set at startup and after forks
_default_arbiter_host = '127.0.0.1'
_default_arbiter_port = 1616
async def _main(
async_fn: typing.Callable[..., typing.Awaitable],
args: Tuple,
arbiter_addr: Tuple[str, int],
name: Optional[str] = None,
start_method: Optional[str] = None,
debug_mode: bool = False,
**kwargs,
) -> typing.Any:
"""Async entry point for ``tractor``.
"""
logger = log.get_logger('tractor')
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
if start_method is not None:
_spawn.try_set_start_method(start_method)
if debug_mode and _spawn._spawn_method == 'trio':
_state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
kwargs.setdefault('rpc_module_paths', []).append('tractor._debug')
elif debug_mode:
raise RuntimeError(
"Debug mode is only supported for the `trio` backend!"
)
main = partial(async_fn, *args)
arbiter_addr = (host, port) = arbiter_addr or (
_default_arbiter_host,
_default_arbiter_port
)
loglevel = kwargs.get('loglevel', log.get_loglevel())
if loglevel is not None:
log._default_loglevel = loglevel
log.get_console_log(loglevel)
# make a temporary connection to see if an arbiter exists
arbiter_found = False
try:
async with _connect_chan(host, port):
arbiter_found = True
except OSError:
logger.warning(f"No actor could be found @ {host}:{port}")
# create a local actor and start up its main routine/task
if arbiter_found: # we were able to connect to an arbiter
logger.info(f"Arbiter seems to exist @ {host}:{port}")
actor = Actor(
name or 'anonymous',
arbiter_addr=arbiter_addr,
**kwargs
)
host, port = (host, 0)
else:
# start this local actor as the arbiter
actor = Arbiter(
name or 'arbiter', arbiter_addr=arbiter_addr, **kwargs)
# ``Actor._async_main()`` creates an internal nursery if one is not
# provided and thus blocks here until it's main task completes.
# Note that if the current actor is the arbiter it is desirable
# for it to stay up indefinitely until a re-election process has
# taken place - which is not implemented yet FYI).
try:
return await _start_actor(
actor, main, host, port, arbiter_addr=arbiter_addr
)
finally:
logger.info("Root actor terminated")
def run(
async_fn: typing.Callable[..., typing.Awaitable],
*args,
name: Optional[str] = None,
arbiter_addr: Tuple[str, int] = (
_default_arbiter_host,
_default_arbiter_port,
),
# either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default).
start_method: Optional[str] = None,
debug_mode: bool = False,
**kwargs,
) -> Any:
"""Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor.
"""
return trio.run(
partial(
# our entry
_main,
# user entry point
async_fn,
args,
# global kwargs
arbiter_addr=arbiter_addr,
name=name,
start_method=start_method,
debug_mode=debug_mode,
**kwargs,
)
)
def run_daemon(
rpc_module_paths: List[str],
**kwargs
) -> None:
"""Spawn daemon actor which will respond to RPC.
This is a convenience wrapper around
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
is meant to run forever responding to RPC requests.
"""
kwargs['rpc_module_paths'] = list(rpc_module_paths)
for path in rpc_module_paths:
importlib.import_module(path)
return run(partial(trio.sleep, float('inf')), **kwargs)

View File

@ -1076,49 +1076,3 @@ class Arbiter(Actor):
def unregister_actor(self, uid: Tuple[str, str]) -> None:
self._registry.pop(uid, None)
async def _start_actor(
actor: Actor,
main: typing.Callable[..., typing.Awaitable],
host: str,
port: int,
arbiter_addr: Tuple[str, int],
nursery: trio.Nursery = None
) -> Any:
"""Spawn a local actor by starting a task to execute it's main async
function.
Blocks if no nursery is provided, in which case it is expected the nursery
provider is responsible for waiting on the task to complete.
"""
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
log.info(f"Starting local {actor} @ {host}:{port}")
async with trio.open_nursery() as nursery:
await nursery.start(
partial(
actor._async_main,
accept_addr=(host, port),
parent_addr=None
)
)
try:
result = await main()
except (Exception, trio.MultiError) as err:
log.exception("Actor crashed:")
await _debug._maybe_enter_pm(err)
raise
finally:
await actor.cancel()
# unset module state
_state._current_actor = None
log.info("Completed async main")
return result

View File

@ -1,5 +1,5 @@
"""
Process entry points.
Sub-process entry points.
"""
from functools import partial
from typing import Tuple, Any

194
tractor/_root.py 100644
View File

@ -0,0 +1,194 @@
"""
Root actor runtime ignition(s).
"""
from contextlib import asynccontextmanager
from functools import partial
import importlib
from typing import Tuple, Optional, List, Any
import typing
import trio
from ._actor import Actor, Arbiter
from . import _debug
from . import _spawn
from . import _state
from . import log
from ._ipc import _connect_chan
# set at startup and after forks
_default_arbiter_host = '127.0.0.1'
_default_arbiter_port = 1616
logger = log.get_logger('tractor')
@asynccontextmanager
async def open_root_actor(
arbiter_addr: Tuple[str, int],
name: Optional[str] = None,
start_method: Optional[str] = None,
debug_mode: bool = False,
**kwargs,
) -> typing.Any:
"""Async entry point for ``tractor``.
"""
# mark top most level process as root actor
_state._runtime_vars['_is_root'] = True
if start_method is not None:
_spawn.try_set_start_method(start_method)
if debug_mode and _spawn._spawn_method == 'trio':
_state._runtime_vars['_debug_mode'] = True
# expose internal debug module to every actor allowing
# for use of ``await tractor.breakpoint()``
kwargs.setdefault('rpc_module_paths', []).append('tractor._debug')
elif debug_mode:
raise RuntimeError(
"Debug mode is only supported for the `trio` backend!"
)
arbiter_addr = (host, port) = arbiter_addr or (
_default_arbiter_host,
_default_arbiter_port
)
loglevel = kwargs.get('loglevel', log.get_loglevel())
if loglevel is not None:
log._default_loglevel = loglevel
log.get_console_log(loglevel)
# make a temporary connection to see if an arbiter exists
arbiter_found = False
try:
async with _connect_chan(host, port):
arbiter_found = True
except OSError:
logger.warning(f"No actor could be found @ {host}:{port}")
# create a local actor and start up its main routine/task
if arbiter_found: # we were able to connect to an arbiter
logger.info(f"Arbiter seems to exist @ {host}:{port}")
actor = Actor(
name or 'anonymous',
arbiter_addr=arbiter_addr,
**kwargs
)
host, port = (host, 0)
else:
# start this local actor as the arbiter
# Note that if the current actor is the arbiter it is desirable
# for it to stay up indefinitely until a re-election process has
# taken place - which is not implemented yet FYI).
actor = Arbiter(
name or 'arbiter',
arbiter_addr=arbiter_addr,
**kwargs
)
try:
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
logger.info(f"Starting local {actor} @ {host}:{port}")
# start the actor runtime in a new task
async with trio.open_nursery() as nursery:
# ``Actor._async_main()`` creates an internal nursery and
# thus blocks here until the entire underlying actor tree has
# terminated thereby conducting structured concurrency.
await nursery.start(
partial(
actor._async_main,
accept_addr=(host, port),
parent_addr=None
)
)
try:
yield actor
# result = await main()
except (Exception, trio.MultiError) as err:
logger.exception("Actor crashed:")
await _debug._maybe_enter_pm(err)
raise
finally:
logger.info("Shutting down root actor")
await actor.cancel()
finally:
_state._current_actor = None
logger.info("Root actor terminated")
def run(
# target
async_fn: typing.Callable[..., typing.Awaitable],
*args,
# runtime kwargs
name: Optional[str] = None,
arbiter_addr: Tuple[str, int] = (
_default_arbiter_host,
_default_arbiter_port,
),
# either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# OR `trio` (the new default).
start_method: Optional[str] = None,
debug_mode: bool = False,
**kwargs,
) -> Any:
"""Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor.
"""
async def _main():
async with open_root_actor(
arbiter_addr=arbiter_addr,
name=name,
start_method=start_method,
debug_mode=debug_mode,
**kwargs,
):
return await async_fn(*args)
return trio.run(_main)
def run_daemon(
rpc_module_paths: List[str],
**kwargs
) -> None:
"""Spawn daemon actor which will respond to RPC.
This is a convenience wrapper around
``tractor.run(trio.sleep(float('inf')))`` such that the first actor spawned
is meant to run forever responding to RPC requests.
"""
kwargs['rpc_module_paths'] = list(rpc_module_paths)
for path in rpc_module_paths:
importlib.import_module(path)
return run(partial(trio.sleep, float('inf')), **kwargs)