forked from goodboy/tractor
1
0
Fork 0
tractor/tractor/__init__.py

1072 lines
39 KiB
Python
Raw Normal View History

"""
tractor: An actor model micro-framework built on
``trio`` and ``multiprocessing``.
"""
from collections import defaultdict
from functools import partial
from typing import Coroutine
import importlib
import inspect
import multiprocessing as mp
import traceback
import uuid
import trio
from async_generator import asynccontextmanager, aclosing
from .ipc import Channel, _connect_chan
2018-06-12 19:17:48 +00:00
from .log import get_console_log, get_logger
# multiprocessing start-method context used to spawn every sub-actor process
ctx = mp.get_context("forkserver")

log = get_logger('tractor')

# the actor instance living in this process; set at startup and after forks
_current_actor = None

# fallbacks used when no arbiter address / log level is given explicitly
_default_arbiter_host, _default_arbiter_port = '127.0.0.1', 1616
_default_loglevel = None
def get_loglevel():
    """Return the module-wide default log level (``_default_loglevel``)."""
    default_level = _default_loglevel
    return default_level
class ActorFailure(Exception):
    """Base class for all actor failures."""


class RemoteActorError(ActorFailure):
    """An exception raised in a remote actor, re-raised locally.

    The message carries the remote traceback text.
    """
@asynccontextmanager
async def maybe_open_nursery(nursery=None):
    """Yield ``nursery`` if one was supplied, otherwise open a fresh one.

    When a new ``trio`` nursery is opened here, leaving the context blocks
    until every task started in it has finished, as trio nurseries do.
    A caller-supplied nursery is yielded untouched and is not waited on.
    """
    if nursery is None:
        async with trio.open_nursery() as fresh_nursery:
            yield fresh_nursery
    else:
        yield nursery
async def _invoke(
    cid, chan, func, kwargs,
    treat_as_gen=False, raise_errs=False,
    task_status=trio.TASK_STATUS_IGNORED
):
    """Invoke ``func(**kwargs)`` locally and ship the outcome over ``chan``.

    Every packet sent carries ``cid`` so the far end can route it:

    - plain (sync) callables: a single ``{'return': value}`` packet;
    - async generators: one ``{'yield': item}`` packet per item, followed
      by a ``{'stop': None}`` packet;
    - coroutines: a single ``{'return': value}`` packet. If
      ``treat_as_gen`` is set, the coroutine is only awaited and is
      expected to push its own response packets.
    - on error: an ``{'error': <formatted traceback>}`` packet. If
      ``raise_errs`` is true, the exception is re-raised instead.

    ``task_status.started()`` is called only once the invocation has fully
    completed. It is not called at all if the error is re-raised.
    """
    try:
        # ``functools.partial`` objects are judged by the callable they wrap
        wrapped = func.func if isinstance(func, partial) else None
        is_async = (
            inspect.iscoroutinefunction(func)
            or inspect.isasyncgenfunction(func)
            or (
                wrapped is not None
                and (
                    inspect.iscoroutinefunction(wrapped)
                    or inspect.isasyncgenfunction(wrapped)
                )
            )
        )

        if not is_async:
            # synchronous callable: run it inline and ship its return value
            await chan.send({'return': func(**kwargs), 'cid': cid})
        else:
            coro = func(**kwargs)
            if inspect.isasyncgen(coro):
                # XXX: massive gotcha! If the containing scope
                # is cancelled and we execute the below line,
                # any ``ActorNursery.__aexit__()`` WON'T be
                # triggered in the underlying async gen! So we
                # have to properly handle the closing (aclosing)
                # of the async gen in order to be sure the cancel
                # is propagated!
                async with aclosing(coro) as agen:
                    async for item in agen:
                        # TODO: can we send values back in here?
                        # it's gonna require a `while True:` and
                        # some non-blocking way to retrieve new `asend()`
                        # values from the channel:
                        # to_send = await chan.recv_nowait()
                        # if to_send is not None:
                        #     to_yield = await coro.asend(to_send)
                        await chan.send({'yield': item, 'cid': cid})

                log.debug(f"Finished iterating {coro}")
                # TODO: we should really support a proper
                # `StopAsyncIteration` system here for returning a final
                # value if desired
                await chan.send({'stop': None, 'cid': cid})
            elif treat_as_gen:
                # XXX: the async-func may spawn further tasks which push
                # back values like an async-generator would but must
                # manualy construct the response dict-packet-responses as
                # above
                await coro
            else:
                await chan.send({'return': await coro, 'cid': cid})
    except Exception:
        log.exception("Actor errored:")
        if raise_errs:
            raise
        await chan.send({'error': traceback.format_exc(), 'cid': cid})

    task_status.started()
async def result_from_q(q, chan):
    """Wait for the first response packet on ``q`` and classify it.

    Returns a ``(resptype, first_msg, q)`` triple, where ``resptype`` is
    ``'return'`` or ``'yield'``. Raises ``RemoteActorError`` for an
    ``'error'`` packet (prefixed with ``chan.uid``) and ``ValueError`` for
    anything else.
    """
    first_msg = await q.get()
    # checked in priority order: 'return' wins over 'yield'
    for resptype in ('return', 'yield'):
        if resptype in first_msg:
            return resptype, first_msg, q
    if 'error' in first_msg:
        raise RemoteActorError(f"{chan.uid}\n" + first_msg['error'])
    raise ValueError(f"{first_msg} is an invalid response packet?")
async def _do_handshake(actor, chan):
await chan.send(actor.uid)
uid = await chan.recv()
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid
class Actor:
    """The fundamental concurrency primitive.

    An *actor* is the combination of a regular Python or
    ``multiprocessing.Process`` executing a ``trio`` task tree, communicating
    with other actors through "portals" which provide a native async API
    around "channels".
    """
    is_arbiter = False

    def __init__(
        self,
        name: str,
        main: Coroutine = None,
        rpc_module_paths: [str] = None,
        statespace: dict = None,
        uid: str = None,
        allow_rpc: bool = True,
        outlive_main: bool = False,
        loglevel: str = None,
        arbiter_addr: (str, int) = None,
    ):
        """
        :param name: actor name; also the first element of ``self.uid``.
        :param main: async callable run as this actor's "main" task.
        :param rpc_module_paths: module paths imported by
            ``load_namespaces()`` whose functions may be invoked remotely.
            A new empty list is used when omitted or ``None``.
        :param statespace: arbitrary per-actor state. A new empty dict is
            used when omitted or ``None``.
        :param uid: unique id suffix; a ``uuid1`` string when omitted.
        :param allow_rpc: load namespaces and serve RPC requests.
        :param outlive_main: keep the channel server (and root nursery)
            running after the main task completes.
        :param loglevel: console log level applied after fork.
        :param arbiter_addr: ``(host, port)`` of the arbiter to register with.
        """
        self.name = name
        self.uid = (name, uid or str(uuid.uuid1()))
        # Fresh containers per instance. The previous ``[]`` / ``{}``
        # defaults were evaluated once and shared by every actor built
        # without explicit arguments.
        self.rpc_module_paths = (
            rpc_module_paths if rpc_module_paths is not None else [])
        self._mods = {}
        self.main = main
        # TODO: consider making this a dynamically defined
        # @dataclass once we get py3.7
        self.statespace = statespace if statespace is not None else {}
        self._allow_rpc = allow_rpc
        self._outlive_main = outlive_main
        self.loglevel = loglevel
        self._arb_addr = arbiter_addr

        # filled in by `_async_main` after fork
        self._peers = defaultdict(list)  # {uid: [Channel, ...]}
        self._peer_connected = {}  # {uid: trio.Event} for `wait_for_peer()`
        self._no_more_peers = trio.Event()
        self._main_complete = trio.Event()
        self._main_scope = None
        # nobody is connected yet
        self._no_more_peers.set()
        self._actors2calls = {}  # map {uids -> {callids -> waiter queues}}
        self._listeners = []
        self._parent_chan = None
        self._accept_host = None

    async def wait_for_peer(self, uid):
        """Wait for a connection back from a spawned actor with a given
        ``uid``.

        Returns the ``trio.Event`` that was waited on and the most recently
        registered channel for ``uid``.
        """
        log.debug(f"Waiting for peer {uid} to connect")
        event = self._peer_connected.setdefault(uid, trio.Event())
        await event.wait()
        log.debug(f"{uid} successfully connected back to us")
        return event, self._peers[uid][-1]

    def load_namespaces(self):
        """Import every module in ``rpc_module_paths`` into ``self._mods``."""
        # We load namespaces after fork since this actor may
        # be spawned on a different machine from the original nursery
        # and we need to try and load the local module code (if it
        # exists)
        for path in self.rpc_module_paths:
            self._mods[path] = importlib.import_module(path)

    async def _stream_handler(
        self,
        stream: trio.SocketStream,
    ):
        """
        Entry point for new inbound connections to the channel server.

        Handshakes, registers the channel in ``self._peers``, runs the
        message loop, and on exit deregisters the channel. On exit it may
        also fake a "main" result and set ``_no_more_peers``.
        """
        self._no_more_peers.clear()
        chan = Channel(stream=stream)
        log.info(f"New connection to us {chan}")

        # send/receive initial handshake response
        try:
            uid = await _do_handshake(self, chan)
        except StopAsyncIteration:
            log.warning(f"Channel {chan} failed to handshake")
            return

        # channel tracking
        event = self._peer_connected.pop(uid, None)
        if event:
            # Instructing connection: this is likely a new channel to
            # a recently spawned actor which we'd like to control via
            # async-rpc calls.
            log.debug(f"Waking channel waiters {event.statistics()}")
            # Alert any task waiting on this connection to come up
            event.set()

        chans = self._peers[uid]
        if chans:
            log.warning(
                f"already have channel(s) for {uid}:{chans}?"
            )
        log.debug(f"Registered {chan} for {uid}")
        # append new channel
        self._peers[uid].append(chan)

        # Begin channel management - respond to remote requests and
        # process received reponses.
        try:
            await self._process_messages(chan)
        finally:
            # Drop ref to channel so it can be gc-ed and disconnected
            log.debug(f"Releasing channel {chan} from {chan.uid}")
            chans = self._peers.get(chan.uid)
            chans.remove(chan)
            if not chans:
                log.debug(f"No more channels for {chan.uid}")
                self._peers.pop(chan.uid, None)
            if not self._actors2calls.get(chan.uid, {}).get('main'):
                # fake a "main task" result for any waiting
                # nurseries/portals
                log.debug(f"Faking result for {chan} from {chan.uid}")
                q = self.get_waitq(chan.uid, 'main')
                q.put_nowait({'return': None, 'cid': 'main'})

            log.debug(f"Peers is {self._peers}")
            if not self._peers:  # no more channels connected
                self._no_more_peers.set()
                log.debug("Signalling no more peer channels")

            # XXX: is this necessary?
            if chan.connected():
                log.debug(f"Disconnecting channel {chan}")
                await chan.send(None)
                await chan.aclose()

    async def _push_result(self, actorid, cid, msg):
        """Deliver ``msg`` to the wait queue of caller ``cid``."""
        assert actorid, f"`actorid` can't be {actorid}"
        q = self.get_waitq(actorid, cid)
        log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
        # maintain backpressure: ``put`` waits while the queue is full
        await q.put(msg)

    def get_waitq(self, actorid, cid):
        """Return (creating on first use) the response queue for
        ``(actorid, cid)``."""
        log.debug(f"Getting result queue for {actorid} cid {cid}")
        cids2qs = self._actors2calls.setdefault(actorid, {})
        q = cids2qs.get(cid)
        if q is None:
            # only allocate a queue when one doesn't already exist
            q = cids2qs[cid] = trio.Queue(1000)
        return q

    async def send_cmd(self, chan, ns, func, kwargs):
        """Send a ``'cmd'`` message to a remote actor and return a
        caller id and a ``trio.Queue`` that can be used to wait for
        responses delivered by the local message processing loop.
        """
        cid = str(uuid.uuid1())
        q = self.get_waitq(chan.uid, cid)
        log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
        await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
        return cid, q

    async def _process_messages(self, chan, treat_as_gen=False):
        """Process messages async-RPC style.

        Process rpc requests and deliver retrieved responses from channels.
        A ``None`` message terminates the loop and cancels every task it
        spawned. (The ``treat_as_gen`` argument is recomputed per request
        and otherwise unused.)
        """
        # TODO: once https://github.com/python-trio/trio/issues/467 gets
        # worked out we'll likely want to use that!
        log.debug(f"Entering msg loop for {chan} from {chan.uid}")
        async with trio.open_nursery() as nursery:
            try:
                async for msg in chan.aiter_recv():
                    if msg is None:  # terminate sentinel
                        log.debug(
                            f"Cancelling all tasks for {chan} from {chan.uid}")
                        nursery.cancel_scope.cancel()
                        log.debug(
                            f"Msg loop signalled to terminate for"
                            f" {chan} from {chan.uid}")
                        break

                    log.debug(f"Received msg {msg} from {chan.uid}")
                    cid = msg.get('cid')
                    if cid:  # deliver response to local caller/waiter
                        await self._push_result(chan.uid, cid, msg)
                        log.debug(
                            f"Waiting on next msg for {chan} from {chan.uid}")
                        continue

                    # otherwise it's an rpc request
                    ns, funcname, kwargs, actorid, cid = msg['cmd']
                    log.debug(
                        f"Processing request from {actorid}\n"
                        f"{ns}.{funcname}({kwargs})")
                    if ns == 'self':
                        func = getattr(self, funcname)
                    else:
                        func = getattr(self._mods[ns], funcname)

                    # spin up a task for the requested function
                    sig = inspect.signature(func)
                    treat_as_gen = False
                    if 'chan' in sig.parameters:
                        assert 'cid' in sig.parameters, \
                            f"{func} must accept a `cid` (caller id) kwarg"
                        kwargs['chan'] = chan
                        kwargs['cid'] = cid
                        # TODO: eventually we want to be more stringent
                        # about what is considered a far-end async-generator.
                        # Right now both actual async gens and any async
                        # function which declares a `chan` kwarg in its
                        # signature will be treated as one.
                        treat_as_gen = True

                    log.debug(f"Spawning task for {func}")
                    nursery.start_soon(
                        _invoke, cid, chan, func, kwargs, treat_as_gen,
                        name=funcname
                    )
                    log.debug(
                        f"Waiting on next msg for {chan} from {chan.uid}")
                else:  # channel disconnect
                    log.debug(f"{chan} from {chan.uid} disconnected")
            except trio.ClosedStreamError:
                log.error(f"{chan} from {chan.uid} broke")

        log.debug(f"Exiting msg loop for {chan} from {chan.uid}")

    def _fork_main(self, accept_addr, parent_addr=None):
        """Subprocess entry point: set this actor as the process-local
        actor and run ``_async_main`` under a fresh ``trio.run``."""
        # after fork routine which invokes a fresh ``trio.run``
        # log.warn("Log level after fork is {self.loglevel}")
        if self.loglevel is not None:
            get_console_log(self.loglevel)
        log.info(
            f"Started new {ctx.current_process()} for actor {self.uid}")
        global _current_actor
        _current_actor = self
        log.debug(f"parent_addr is {parent_addr}")
        try:
            trio.run(partial(
                self._async_main, accept_addr, parent_addr=parent_addr))
        except KeyboardInterrupt:
            pass  # handle it the same way trio does?
        log.debug(f"Actor {self.uid} terminated")

    async def _async_main(
        self,
        accept_addr,
        arbiter_addr=None,
        parent_addr=None,
        nursery=None
    ):
        """Start the channel server, maybe connect back to the parent, and
        start the main task.

        A "root-most" (or "top-level") nursery for this actor is opened here
        and when cancelled effectively cancels the actor.

        Returns the main task's result when it is run directly (no parent
        channel), otherwise ``None``.
        """
        result = None
        arbiter_addr = arbiter_addr or self._arb_addr
        registered_with_arbiter = False
        try:
            async with maybe_open_nursery(nursery) as nursery:
                self._root_nursery = nursery

                # Startup up channel server
                host, port = accept_addr
                await nursery.start(partial(
                    self._serve_forever, accept_host=host, accept_port=port)
                )

                # XXX: I wonder if a better name is maybe "requester"
                # since I don't think the notion of a "parent" actor
                # necessarily sticks given that eventually we want
                # ``'MainProcess'`` (the actor who initially starts the
                # forkserver) to eventually be the only one who is
                # allowed to spawn new processes per Python program.
                if parent_addr is not None:
                    try:
                        # Connect back to the parent actor and conduct initial
                        # handshake (From this point on if we error ship the
                        # exception back to the parent actor)
                        chan = self._parent_chan = Channel(
                            destaddr=parent_addr,
                            on_reconnect=self.main
                        )
                        await chan.connect()
                        # initial handshake, report who we are, who they are
                        await _do_handshake(self, chan)
                    except OSError:  # failed to connect
                        log.warning(
                            f"Failed to connect to parent @ {parent_addr},"
                            " closing server")
                        self.cancel_server()
                        self._parent_chan = None

                # register with the arbiter if we're told its addr
                log.debug(f"Registering {self} for role `{self.name}`")
                async with get_arbiter(*arbiter_addr) as arb_portal:
                    await arb_portal.run(
                        'self', 'register_actor',
                        name=self.name, sockaddr=self.accept_addr)
                    registered_with_arbiter = True

                # handle new connection back to parent optionally
                # begin responding to RPC
                if self._allow_rpc:
                    self.load_namespaces()
                    if self._parent_chan:
                        nursery.start_soon(
                            self._process_messages, self._parent_chan)

                if self.main:
                    try:
                        if self._parent_chan:
                            async with trio.open_nursery() as n:
                                self._main_scope = n.cancel_scope
                                log.debug(f"Starting main task `{self.main}`")
                                # spawned subactor so deliver "main"
                                # task result(s) back to parent
                                await n.start(
                                    _invoke, 'main',
                                    self._parent_chan, self.main, {},
                                    # treat_as_gen, raise_errs params
                                    False, True
                                )
                        else:
                            with trio.open_cancel_scope() as main_scope:
                                self._main_scope = main_scope
                                # run directly we are an "unspawned actor"
                                log.debug(f"Running `{self.main}` directly")
                                result = await self.main()
                    finally:
                        # tear down channel server in order to ensure
                        # we exit normally when the main task is done
                        if not self._outlive_main:
                            log.debug("Shutting down channel server")
                            self.cancel_server()
                            log.debug("Shutting down root nursery")
                            nursery.cancel_scope.cancel()
                        self._main_complete.set()

                    if self._main_scope.cancelled_caught:
                        log.debug("Main task was cancelled successfully")
                log.debug("Waiting on root nursery to complete")
            # blocks here as expected if no nursery was provided until
            # the channel server is killed (i.e. this actor is
            # cancelled or signalled by the parent actor)
        except Exception:
            if self._parent_chan:
                try:
                    await self._parent_chan.send(
                        {'error': traceback.format_exc(), 'cid': 'main'})
                except trio.ClosedStreamError:
                    log.error(
                        f"Failed to ship error to parent "
                        f"{self._parent_chan.uid}, channel was closed")
                    log.exception("Actor errored:")

            # errors raised before arbiter registration completed are
            # logged and swallowed; later errors propagate
            if not registered_with_arbiter:
                log.exception(
                    f"Failed to register with arbiter @ {arbiter_addr}")
            else:
                raise
        finally:
            await self._do_unreg(arbiter_addr)
            # terminate actor once all it's peers (actors that connected
            # to it as clients) have disappeared
            if not self._no_more_peers.is_set():
                log.debug(
                    f"Waiting for remaining peers {self._peers} to clear")
                await self._no_more_peers.wait()
            log.debug("All peer channels are complete")

            # tear down channel server no matter what since we errored
            # or completed
            log.debug("Shutting down channel server")
            self.cancel_server()

        return result

    async def _serve_forever(
        self,
        *,
        # (host, port) to bind for channel server
        accept_host=None,
        accept_port=0,
        task_status=trio.TASK_STATUS_IGNORED
    ):
        """Start the channel server, begin listening for new connections.

        This will cause an actor to continue living (blocking) until
        ``cancel_server()`` is called.
        """
        async with trio.open_nursery() as nursery:
            self._server_nursery = nursery
            # TODO: might want to consider having a separate nursery
            # for the stream handler such that the server can be cancelled
            # whilst leaving existing channels up
            listeners = await nursery.start(
                partial(
                    trio.serve_tcp,
                    self._stream_handler,
                    # new connections will stay alive even if this server
                    # is cancelled
                    handler_nursery=self._root_nursery,
                    port=accept_port, host=accept_host,
                )
            )
            log.debug(
                f"Started tcp server(s) on {[l.socket for l in listeners]}")
            self._listeners.extend(listeners)
            task_status.started()

    async def _do_unreg(self, arbiter_addr):
        """Unregister this actor's name from the arbiter, if an address is
        known. Connection failures are logged, not raised."""
        try:
            if arbiter_addr is not None:
                async with get_arbiter(*arbiter_addr) as arb_portal:
                    await arb_portal.run(
                        'self', 'unregister_actor', name=self.name)
        except OSError:
            log.warning(f"Unable to unregister {self.name} from arbiter")

    async def cancel(self):
        """This cancels the internal root-most nursery thereby gracefully
        cancelling (for all intents and purposes) this actor.
        """
        self.cancel_server()
        if self._main_scope:
            self._main_scope.cancel()
            log.debug("Waiting on main task to complete")
            await self._main_complete.wait()
        self._root_nursery.cancel_scope.cancel()

    def cancel_server(self):
        """Cancel the internal channel server nursery thereby
        preventing any new inbound connections from being established.
        """
        self._server_nursery.cancel_scope.cancel()

    @property
    def accept_addr(self):
        """Primary address to which the channel server is bound.

        ``None`` if no listener has been started yet, or if its socket can
        no longer report a name.
        """
        try:
            return self._listeners[0].socket.getsockname()
        except (IndexError, OSError):
            # IndexError: server not started yet (no listeners)
            return None

    def get_parent(self):
        """Return a ``Portal`` to the parent actor's channel."""
        return Portal(self._parent_chan)

    def get_chans(self, actorid):
        """Return the list of channels registered for ``actorid``."""
        return self._peers[actorid]
class Arbiter(Actor):
    """A special actor who knows all the other actors and always has
    access to the top level nursery.

    The arbiter is by default the first actor spawned on each host
    and is responsible for keeping track of all other actors for
    coordination purposes. If a new main process is launched and an
    arbiter is already running that arbiter will be used.
    """
    is_arbiter = True

    def __init__(self, *args, **kwargs):
        # Per-instance registry ({name: [sockaddr, ...]}). A class-level
        # mapping would be shared by every arbiter created in this
        # process, so stale entries (e.g. from hard-killed actors) would
        # leak into later ``tractor.run()`` invocations.
        self._registry = defaultdict(list)
        super().__init__(*args, **kwargs)

    def find_actor(self, name):
        """Return the socket addresses registered under ``name``, oldest
        first. Returns an empty list if none are registered; the lookup
        never creates a registry entry."""
        return self._registry.get(name, [])

    def register_actor(self, name, sockaddr):
        """Record ``sockaddr`` as an address for actor ``name``."""
        self._registry[name].append(sockaddr)

    def unregister_actor(self, name):
        """Forget every address registered under ``name``."""
        self._registry.pop(name, None)
class Portal:
    """A 'portal' to a(n) (remote) ``Actor``.

    Allows for invoking remote routines and receiving results through an
    underlying ``tractor.Channel`` as though the remote (async)
    function / generator was invoked locally.

    Think of this like an native async IPC API.
    """
    def __init__(self, channel):
        self.channel = channel
        self._result = None
        # Whether ``result()`` has already fetched the "main" result. This
        # is tracked separately from ``_result`` because ``None`` is a
        # perfectly valid result and must not trigger a second fetch.
        self._result_retrieved = False

    async def aclose(self):
        log.debug(f"Closing {self}")
        # XXX: won't work until https://github.com/python-trio/trio/pull/460
        # gets in!
        await self.channel.aclose()

    async def run(self, ns, func, **kwargs):
        """Submit a function to be scheduled and run by actor, return its
        (stream of) result(s).

        Returns the value for a ``'return'`` response, or an async
        generator for a ``'yield'`` (streaming) response.
        """
        # TODO: not this needs some serious work and thinking about how
        # to make async-generators the fundamental IPC API over channels!
        # (think `yield from`, `gen.send()`, and functional reactive stuff)
        actor = current_actor()
        # ship a function call request to the remote actor
        cid, q = await actor.send_cmd(self.channel, ns, func, kwargs)
        # wait on first response msg and handle
        return await self._return_from_resptype(
            cid, *(await result_from_q(q, self.channel)))

    async def _return_from_resptype(self, cid, resptype, first_msg, q):
        """Turn a classified first response into a value or async gen."""
        if resptype == 'yield':

            async def yield_from_q():
                yield first_msg['yield']
                try:
                    async for msg in q:
                        if 'yield' in msg:
                            yield msg['yield']
                        elif 'stop' in msg:
                            break  # far end async gen terminated
                        elif 'error' in msg:
                            # same uid-prefixed format as ``result_from_q``
                            raise RemoteActorError(
                                f"{self.channel.uid}\n" + msg['error'])
                        else:
                            raise ValueError(
                                f"{msg} is an invalid response packet?")
                except GeneratorExit:
                    log.debug(
                        f"Cancelling async gen call {cid} to "
                        f"{self.channel.uid}")
                    raise

            return yield_from_q()

        elif resptype == 'return':
            return first_msg['return']
        else:
            raise ValueError(f"Unknown msg response type: {first_msg}")

    async def result(self):
        """Return the result(s) from the remote actor's "main" task.

        The result is fetched once and cached; later calls return the
        cached value even if it is ``None``.
        """
        if not self._result_retrieved:
            q = current_actor().get_waitq(self.channel.uid, 'main')
            resptype, first_msg, q = (await result_from_q(q, self.channel))
            self._result = await self._return_from_resptype(
                'main', resptype, first_msg, q)
            self._result_retrieved = True
            log.warning(
                f"Retrieved first result `{self._result}` "
                f"for {self.channel.uid}")
            # await q.put(first_msg)  # for next consumer (e.g. nursery)
        return self._result

    async def close(self):
        """Send the ``None`` sentinel which makes the remote msg loop
        ``break``."""
        chan = self.channel
        log.debug(f"Closing portal for {chan} to {chan.uid}")
        await chan.send(None)

    async def cancel_actor(self):
        """Cancel the actor on the other end of this portal.

        Returns ``True`` if the remote ``cancel`` call returned within the
        (shielded) 0.1s grace period. Returns ``False`` if the channel was
        already closed or no response arrived in time.
        """
        log.warning(
            f"Sending cancel request to {self.channel.uid} on "
            f"{self.channel}")
        try:
            with trio.move_on_after(0.1) as cancel_scope:
                cancel_scope.shield = True
                # send cancel cmd - might not get response
                await self.run('self', 'cancel')
                return True
        except trio.ClosedStreamError:
            log.warning(
                f"{self.channel} for {self.channel.uid} was already closed?")
            return False
        # the grace period expired without a response
        return False
@asynccontextmanager
async def open_portal(channel, nursery=None):
    """Open a ``Portal`` through the provided ``channel``.

    Spawns a background task to handle message processing. The channel is
    connected and/or handshaked first if needed. On exit (including when
    the body raises), the remote msg loop is told to stop and the local
    msg loop is cancelled. The channel is closed if it was connected here.

    NOTE(review): when a ``nursery`` is supplied, exiting cancels that
    whole nursery's scope, not just the msg loop task.

    Raises ``RuntimeError`` if no actor has been started in this process.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    was_connected = False

    async with maybe_open_nursery(nursery) as nursery:
        if not channel.connected():
            await channel.connect()
            was_connected = True

        if channel.uid is None:
            await _do_handshake(actor, channel)

        nursery.start_soon(actor._process_messages, channel)
        portal = Portal(channel)
        try:
            yield portal
        finally:
            # cancel remote channel-msg loop
            if channel.connected():
                await portal.close()

            # cancel background msg loop task
            nursery.cancel_scope.cancel()
            if was_connected:
                await channel.aclose()
class LocalPortal:
    """A ``Portal`` look-alike backed by an in-process ``Actor``.

    Provides the same ``run()`` call shape as a normal portal, but invokes
    the target function directly in this process instead of over a
    channel.
    """
    def __init__(self, actor):
        self.actor = actor

    async def run(self, ns, func, **kwargs):
        """Call ``func`` and return its result.

        ``ns == 'self'`` looks ``func`` up on the wrapped actor. Any other
        ``ns`` is imported as a module and ``func`` is looked up on it.
        """
        if ns == 'self':
            target = self.actor
        else:
            target = importlib.import_module(ns)
        method = getattr(target, func)
        return method(**kwargs)
class ActorNursery:
    """Spawn scoped subprocess actors.

    Used as an async context manager: on a clean exit it waits for every
    spawned sub-actor to finish. On an error or cancellation it cancels
    them all.
    """
    def __init__(self, actor, supervisor=None):
        self.supervisor = supervisor  # TODO
        self._actor = actor
        # We'll likely want some way to cancel all sub-actors eventually
        # self.cancel_scope = cancel_scope
        self._children = {}  # {(name, pid): (actor, proc, portal)}
        self.cancelled = False

    async def __aenter__(self):
        return self

    async def start_actor(
        self,
        name: str,
        main=None,
        bind_addr=('127.0.0.1', 0),
        statespace=None,
        rpc_module_paths=None,
        outlive_main=False,  # sub-actors die when their main task completes
        loglevel=None,  # set log level per subactor
    ):
        """Spawn a sub-actor process and return a ``Portal`` to it once it
        has connected back to this actor."""
        loglevel = loglevel or self._actor.loglevel or get_loglevel()
        actor = Actor(
            name,
            # modules allowed to invoked funcs from
            rpc_module_paths=rpc_module_paths or [],
            statespace=statespace,  # global proc state vars
            main=main,  # main coroutine to be invoked
            outlive_main=outlive_main,
            loglevel=loglevel,
            arbiter_addr=current_actor()._arb_addr,
        )
        parent_addr = self._actor.accept_addr
        assert parent_addr
        proc = ctx.Process(
            target=actor._fork_main,
            args=(bind_addr, parent_addr),
            # daemon=True,
            name=name,
        )
        proc.start()
        if not proc.is_alive():
            raise ActorFailure("Couldn't start sub-actor?")

        log.info(f"Started {proc}")
        # wait for actor to spawn and connect back to us
        # channel should have handshake completed by the
        # local actor by the time we get a ref to it
        event, chan = await self._actor.wait_for_peer(actor.uid)
        portal = Portal(chan)
        self._children[(name, proc.pid)] = (actor, proc, portal)
        return portal

    async def wait(self):
        """Wait for all subactors to complete.
        """
        async def wait_for_proc(proc, actor, portal):
            # TODO: timeout block here?
            if proc.is_alive():
                await trio.hazmat.wait_readable(proc.sentinel)
            # please god don't hang
            proc.join()
            log.debug(f"Joined {proc}")
            # NOTE(review): ``_peers`` maps uids to lists of channels, so
            # this isinstance check looks like it never matches; the
            # per-uid connect events live in ``_peer_connected``. Confirm
            # intent before changing.
            event = self._actor._peers.get(actor.uid)
            if isinstance(event, trio.Event):
                event.set()
                log.warning(
                    f"Cancelled `wait_for_peer()` call since {actor.uid}"
                    f" is already dead!")
            if not portal._result:
                # unblock any ``portal.result()`` waiter now the proc is gone
                log.debug(f"Faking result for {actor.uid}")
                q = self._actor.get_waitq(actor.uid, 'main')
                q.put_nowait({'return': None, 'cid': 'main'})

        async def wait_for_result(portal):
            if portal.channel.connected():
                # use the portal's own peer uid: a reference to the loop
                # variable in ``wait()`` would be late-bound and always
                # report the last child
                log.debug(
                    f"Waiting on final result from {portal.channel.uid}")
                await portal.result()

        # unblocks when all waiter tasks have completed
        async with trio.open_nursery() as nursery:
            for subactor, proc, portal in self._children.values():
                nursery.start_soon(wait_for_proc, proc, subactor, portal)
                nursery.start_soon(wait_for_result, portal)

    async def cancel(self, hard_kill=False):
        """Cancel this nursery by instructing each subactor to cancel
        itself and wait for all subprocesses to terminate.

        If ``hard_kill`` is set to ``True`` then kill the processes
        directly without any far end graceful ``trio`` cancellation.
        """
        log.debug("Cancelling nursery")
        for subactor, proc, portal in self._children.values():
            if proc is mp.current_process():
                # XXX: does this even make sense?
                await subactor.cancel()
            else:
                if hard_kill:
                    log.warning(f"Hard killing subactors {self._children}")
                    proc.terminate()
                    # XXX: doesn't seem to work?
                    # send KeyBoardInterrupt (trio abort signal) to sub-actors
                    # os.kill(proc.pid, signal.SIGINT)
                else:
                    await portal.cancel_actor()

        log.debug("Waiting on all subactors to complete")
        await self.wait()
        self.cancelled = True
        log.debug(f"All subactors for {self} have terminated")

    async def __aexit__(self, etype, value, tb):
        """Wait on all subactor's main routines to complete.

        On an exception (or cancellation) the sub-actors are cancelled
        instead. The exception is never suppressed.
        """
        if etype is not None:
            # XXX: hypothetically an error could be raised and then
            # a cancel signal shows up slightly after in which case the
            # else block here might not complete? Should both be shielded?
            if etype is trio.Cancelled:
                with trio.open_cancel_scope(shield=True):
                    log.warning(
                        f"{current_actor().uid} was cancelled with {etype}"
                        ", cancelling actor nursery")
                    await self.cancel()
            else:
                log.exception(
                    f"{current_actor().uid} errored with {etype}, "
                    "cancelling actor nursery")
                await self.cancel()
        else:
            # XXX: this is effectively the lone cancellation/supervisor
            # strategy which exactly mimicks trio's behaviour
            log.debug(f"Waiting on subactors {self._children} to complete")
            try:
                await self.wait()
            except Exception as err:
                log.warning(f"Nursery caught {err}, cancelling")
                await self.cancel()
                raise
        log.debug("Nursery teardown complete")
def current_actor() -> Actor:
    """Return the ``Actor`` registered for this process.

    ``None`` before any actor has been started here, and after
    ``_start_actor`` has finished.
    """
    actor = _current_actor
    return actor
@asynccontextmanager
async def open_nursery(supervisor=None):
    """Create and yield a new ``ActorNursery`` bound to the current actor.

    Raises ``RuntimeError`` if no actor has been started in this process.
    """
    actor = current_actor()
    if actor:
        # TODO: figure out supervisors from erlang
        async with ActorNursery(actor, supervisor) as nursery:
            yield nursery
    else:
        raise RuntimeError("No actor instance has been defined yet?")
class NoArbiterFound(Exception):
    """No arbiter could be located."""
async def _start_actor(actor, host, port, arbiter_addr, nursery=None):
    """Spawn a local actor by starting a task to execute it's main
    async function.

    Blocks if no nursery is provided, in which case it is expected the nursery
    provider is responsible for waiting on the task to complete.

    ``actor`` is installed as the process-local actor for the duration of
    the call. It is always uninstalled afterwards, even if it errors.
    Returns whatever ``actor._async_main`` returns.
    """
    # assign process-local actor
    global _current_actor
    _current_actor = actor

    # start local channel-server and fake the portal API
    # NOTE: this won't block since we provide the nursery
    log.info(f"Starting local {actor} @ {host}:{port}")
    try:
        result = await actor._async_main(
            accept_addr=(host, port),
            parent_addr=None,
            arbiter_addr=arbiter_addr,
            nursery=nursery,
        )
        # XXX: If spawned locally, the actor is cancelled when this
        # context is complete given that there are no more active
        # peer channels connected to it.
        if not actor._outlive_main:
            actor.cancel_server()
    finally:
        # unset module state even on error, so a later ``run()`` in this
        # process doesn't observe a stale actor via ``current_actor()``
        _current_actor = None

    log.info("Completed async main")
    return result
@asynccontextmanager
async def get_arbiter(host, port):
    """Yield a portal to the arbiter at ``(host, port)``.

    If the current actor *is* the arbiter, a ``LocalPortal`` wrapping it is
    yielded instead of opening a connection. Raises ``RuntimeError`` if no
    actor has been started in this process.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    if not actor.is_arbiter:
        async with _connect_chan(host, port) as chan:
            async with open_portal(chan) as arb_portal:
                yield arb_portal
    else:
        # we're already the arbiter
        # (likely a re-entrant call from the arbiter actor)
        yield LocalPortal(actor)
@asynccontextmanager
async def find_actor(
    name,
    arbiter_sockaddr=None,
):
    """Ask the arbiter to find actor(s) by name.

    Yields a connected portal to the most recently registered actor with
    that name, or ``None`` if the arbiter knows of none. Uses
    ``arbiter_sockaddr`` if given, else the current actor's arbiter
    address.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    arb_addr = arbiter_sockaddr or actor._arb_addr
    async with get_arbiter(*arb_addr) as arb_portal:
        sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
        # TODO: return portals to all available actors - for now just
        # the last one that registered
        if not sockaddrs:
            yield None
        else:
            latest = sockaddrs[-1]
            async with _connect_chan(*latest) as chan:
                async with open_portal(chan) as portal:
                    yield portal
async def _main(async_fn, args, kwargs, name, arbiter_addr):
    """Async entry point for ``tractor``.

    Probes ``arbiter_addr`` (or the module defaults). If an arbiter
    answers, a regular ``Actor`` is started on a random port of that host.
    Otherwise this process becomes the ``Arbiter`` itself. Either way the
    actor runs ``async_fn(*args)`` as its main task.
    """
    main = None if not async_fn else partial(async_fn, *args)

    if not arbiter_addr:
        arbiter_addr = (_default_arbiter_host, _default_arbiter_port)
    host, port = arbiter_addr

    get_console_log(kwargs.get('loglevel', get_loglevel()))

    # make a temporary connection to see if an arbiter exists
    arbiter_found = False
    try:
        async with _connect_chan(host, port):
            arbiter_found = True
    except OSError:
        log.warn(f"No actor could be found @ {host}:{port}")

    # create a local actor and start up its main routine/task
    if not arbiter_found:
        # start this local actor as the arbiter
        # this should eventually get passed `outlive_main=True`?
        actor = Arbiter(
            name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs)
    else:
        # we were able to connect to an arbiter
        log.info(f"Arbiter seems to exist @ {host}:{port}")
        actor = Actor(
            name or 'anonymous',
            main=main,
            arbiter_addr=arbiter_addr,
            **kwargs
        )
        # bind to any free port on the arbiter's host
        port = 0

    # ``Actor._async_main()`` creates an internal nursery if one is not
    # provided and thus blocks here until it's main task completes.
    # Note that if the current actor is the arbiter it is desirable
    # for it to stay up indefinitely until a re-election process has
    # taken place - which is not implemented yet FYI).
    return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr)
def run(
    async_fn,
    *args,
    name=None,
    arbiter_addr=(_default_arbiter_host, _default_arbiter_port),
    **kwargs
):
    """Run a trio-actor async function in process.

    This is tractor's main entry and the start point for any async actor.
    Extra keyword arguments are forwarded to the ``Actor`` constructor.
    Returns the main task's result.
    """
    main_args = (async_fn, args, kwargs, name, arbiter_addr)
    return trio.run(_main, *main_args)