tractor/tractor/_actor.py

902 lines
34 KiB
Python
Raw Normal View History

2018-07-14 20:09:05 +00:00
"""
Actor primitives and helpers
"""
from collections import defaultdict
from functools import partial
from itertools import chain
import importlib
import inspect
2018-07-14 20:09:05 +00:00
import uuid
2018-08-31 21:16:24 +00:00
import typing
from typing import Dict, List, Tuple, Any, Optional, Union
2018-07-14 20:09:05 +00:00
2018-08-26 17:12:29 +00:00
import trio # type: ignore
2018-07-14 20:09:05 +00:00
from async_generator import asynccontextmanager, aclosing
from ._ipc import Channel, _connect_chan, Context
2018-07-14 20:09:05 +00:00
from .log import get_console_log, get_logger
from ._exceptions import (
pack_error,
unpack_error,
ModuleNotExposed
)
from ._portal import (
Portal,
open_portal,
_do_handshake,
LocalPortal,
)
2018-07-14 20:09:05 +00:00
from . import _state
from ._state import current_actor
log = get_logger('tractor')
class ActorFailure(Exception):
"General actor failure"
async def _invoke(
actor: 'Actor',
cid: str,
chan: Channel,
func: typing.Callable,
2018-08-31 21:16:24 +00:00
kwargs: Dict[str, Any],
2018-07-14 20:09:05 +00:00
task_status=trio.TASK_STATUS_IGNORED
):
"""Invoke local func and return results over provided channel.
"""
sig = inspect.signature(func)
treat_as_gen = False
cs = None
ctx = Context(chan, cid)
if 'ctx' in sig.parameters:
kwargs['ctx'] = ctx
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `ctx` kwarg in its
# signature will be treated as one.
treat_as_gen = True
2018-07-14 20:09:05 +00:00
try:
is_async_partial = False
is_async_gen_partial = False
if isinstance(func, partial):
is_async_partial = inspect.iscoroutinefunction(func.func)
is_async_gen_partial = inspect.isasyncgenfunction(func.func)
if (
not inspect.iscoroutinefunction(func) and
not inspect.isasyncgenfunction(func) and
not is_async_partial and
not is_async_gen_partial
):
2018-08-01 19:15:18 +00:00
await chan.send({'functype': 'function', 'cid': cid})
with trio.open_cancel_scope() as cs:
task_status.started(cs)
await chan.send({'return': func(**kwargs), 'cid': cid})
2018-07-14 20:09:05 +00:00
else:
coro = func(**kwargs)
if inspect.isasyncgen(coro):
2018-08-01 19:15:18 +00:00
await chan.send({'functype': 'asyncgen', 'cid': cid})
2018-07-14 20:09:05 +00:00
# XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be
# triggered in the underlying async gen! So we
# have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel
# is propagated!
2018-08-01 19:15:18 +00:00
with trio.open_cancel_scope() as cs:
task_status.started(cs)
async with aclosing(coro) as agen:
async for item in agen:
# TODO: can we send values back in here?
# it's gonna require a `while True:` and
# some non-blocking way to retrieve new `asend()`
# values from the channel:
# to_send = await chan.recv_nowait()
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})
log.debug(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({'stop': True, 'cid': cid})
2018-07-14 20:09:05 +00:00
else:
if treat_as_gen:
2018-08-01 19:15:18 +00:00
await chan.send({'functype': 'asyncgen', 'cid': cid})
2018-07-14 20:09:05 +00:00
# XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
2018-08-01 19:15:18 +00:00
with trio.open_cancel_scope() as cs:
task_status.started(cs)
await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': True, 'cid': cid})
2018-07-14 20:09:05 +00:00
else:
2018-08-01 19:15:18 +00:00
await chan.send({'functype': 'asyncfunction', 'cid': cid})
with trio.open_cancel_scope() as cs:
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})
except Exception as err:
2018-08-01 19:15:18 +00:00
# always ship errors back to caller
2018-07-14 20:09:05 +00:00
log.exception("Actor errored:")
err_msg = pack_error(err)
err_msg['cid'] = cid
try:
await chan.send(err_msg)
except trio.ClosedResourceError:
log.exception(
f"Failed to ship error to caller @ {chan.uid}")
if cs is None:
# error is from above code not from rpc invocation
task_status.started(err)
2018-08-01 19:15:18 +00:00
finally:
# RPC task bookeeping
try:
scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
is_complete.set()
except KeyError:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warn(
f"Task {func} was likely cancelled before it was started")
2018-07-14 20:09:05 +00:00
2018-08-01 19:15:18 +00:00
if not actor._rpc_tasks:
log.info(f"All RPC tasks have completed")
actor._no_more_rpc_tasks.set()
2018-07-14 20:09:05 +00:00
class Actor:
"""The fundamental concurrency primitive.
An *actor* is the combination of a regular Python or
``multiprocessing.Process`` executing a ``trio`` task tree, communicating
with other actors through "portals" which provide a native async API
around "channels".
"""
is_arbiter = False
def __init__(
self,
name: str,
2018-08-31 21:16:24 +00:00
rpc_module_paths: List[str] = [],
statespace: Optional[Dict[str, Any]] = None,
2018-07-14 20:09:05 +00:00
uid: str = None,
loglevel: str = None,
2018-08-31 21:16:24 +00:00
arbiter_addr: Optional[Tuple[str, int]] = None,
) -> None:
2018-07-14 20:09:05 +00:00
self.name = name
self.uid = (name, uid or str(uuid.uuid1()))
self.rpc_module_paths = rpc_module_paths
2018-08-31 21:16:24 +00:00
self._mods: dict = {}
2018-07-14 20:09:05 +00:00
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
2018-08-31 21:16:24 +00:00
self.statespace = statespace or {}
2018-07-14 20:09:05 +00:00
self.loglevel = loglevel
self._arb_addr = arbiter_addr
# filled in by `_async_main` after fork
2018-08-31 21:16:24 +00:00
self._root_nursery: trio._core._run.Nursery = None
self._server_nursery: trio._core._run.Nursery = None
self._peers: defaultdict = defaultdict(list)
self._peer_connected: dict = {}
2018-07-14 20:09:05 +00:00
self._no_more_peers = trio.Event()
self._no_more_peers.set()
2018-08-01 19:15:18 +00:00
self._no_more_rpc_tasks = trio.Event()
self._no_more_rpc_tasks.set()
# (chan, cid) -> (cancel_scope, func)
2018-08-31 21:16:24 +00:00
self._rpc_tasks: Dict[
Tuple[Channel, str],
Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event]
2018-08-31 21:16:24 +00:00
] = {}
# map {uids -> {callids -> waiter queues}}
2019-02-15 21:23:58 +00:00
self._cids2qs: Dict[
Tuple[Tuple[str, str], str],
trio.abc.SendChannel[Any]] = {}
2018-08-31 21:16:24 +00:00
self._listeners: List[trio.abc.Listener] = []
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None
2018-07-14 20:09:05 +00:00
2018-08-26 17:12:29 +00:00
async def wait_for_peer(
2018-08-31 21:16:24 +00:00
self, uid: Tuple[str, str]
) -> Tuple[trio.Event, Channel]:
2018-07-14 20:09:05 +00:00
"""Wait for a connection back from a spawned actor with a given
``uid``.
"""
log.debug(f"Waiting for peer {uid} to connect")
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.debug(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1]
def load_modules(self) -> None:
"""Load allowed RPC modules locally (after fork).
Since this actor may be spawned on a different machine from
the original nursery we need to try and load the local module
code (if it exists).
"""
2018-07-14 20:09:05 +00:00
for path in self.rpc_module_paths:
log.debug(f"Attempting to import {path}")
2018-07-14 20:09:05 +00:00
self._mods[path] = importlib.import_module(path)
def _get_rpc_func(self, ns, funcname):
try:
return getattr(self._mods[ns], funcname)
except KeyError as err:
raise ModuleNotExposed(*err.args)
2018-07-14 20:09:05 +00:00
async def _stream_handler(
self,
stream: trio.SocketStream,
) -> None:
2018-08-04 21:59:10 +00:00
"""Entry point for new inbound connections to the channel server.
2018-07-14 20:09:05 +00:00
"""
self._no_more_peers.clear()
chan = Channel(stream=stream)
log.info(f"New connection to us {chan}")
# send/receive initial handshake response
try:
uid = await _do_handshake(self, chan)
except StopAsyncIteration:
2018-09-10 19:19:49 +00:00
log.warning(f"Channel {chan} failed to handshake")
2018-07-14 20:09:05 +00:00
return
# channel tracking
event = self._peer_connected.pop(uid, None)
if event:
# Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via
# async-rpc calls.
log.debug(f"Waking channel waiters {event.statistics()}")
# Alert any task waiting on this connection to come up
event.set()
chans = self._peers[uid]
if chans:
2018-09-10 19:19:49 +00:00
log.warning(
2018-07-14 20:09:05 +00:00
f"already have channel(s) for {uid}:{chans}?"
)
log.trace(f"Registered {chan} for {uid}") # type: ignore
2018-07-14 20:09:05 +00:00
# append new channel
self._peers[uid].append(chan)
# Begin channel management - respond to remote requests and
# process received reponses.
try:
await self._process_messages(chan)
finally:
# Drop ref to channel so it can be gc-ed and disconnected
log.debug(f"Releasing channel {chan} from {chan.uid}")
chans = self._peers.get(chan.uid)
chans.remove(chan)
if not chans:
log.debug(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None)
log.debug(f"Peers is {self._peers}")
if not self._peers: # no more channels connected
self._no_more_peers.set()
log.debug(f"Signalling no more peer channels")
2018-08-01 19:15:18 +00:00
# # XXX: is this necessary (GC should do it?)
2018-07-14 20:09:05 +00:00
if chan.connected():
log.debug(f"Disconnecting channel {chan}")
try:
# send our msg loop terminate sentinel
await chan.send(None)
# await chan.aclose()
except trio.BrokenResourceError:
log.exception(
f"Channel for {chan.uid} was already zonked..")
2018-07-14 20:09:05 +00:00
2019-02-15 21:23:58 +00:00
async def _push_result(
self,
actorid: Tuple[str, str],
msg: Dict[str, Any],
) -> None:
"""Push an RPC result to the local consumer's queue.
"""
2018-07-14 20:09:05 +00:00
assert actorid, f"`actorid` can't be {actorid}"
2019-02-15 21:23:58 +00:00
cid = msg['cid']
send_chan = self._cids2qs[(actorid, cid)]
assert send_chan.cid == cid
if 'stop' in msg:
log.debug(f"{send_chan} was terminated at remote end")
return await send_chan.aclose()
2019-02-15 21:23:58 +00:00
try:
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
2019-02-15 21:23:58 +00:00
# maintain backpressure
await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{send_chan} consumer is already closed")
def get_memchans(
2018-08-31 21:16:24 +00:00
self,
actorid: Tuple[str, str],
cid: str
2019-02-15 21:23:58 +00:00
) -> trio.abc.ReceiveChannel:
2018-07-14 20:09:05 +00:00
log.debug(f"Getting result queue for {actorid} cid {cid}")
2019-02-15 21:23:58 +00:00
try:
recv_chan = self._cids2qs[(actorid, cid)]
except KeyError:
send_chan, recv_chan = trio.open_memory_channel(1000)
send_chan.cid = cid
self._cids2qs[(actorid, cid)] = send_chan
return recv_chan
2018-07-14 20:09:05 +00:00
async def send_cmd(
2019-02-15 21:23:58 +00:00
self,
chan: Channel,
ns: str,
func: str,
kwargs: dict
) -> Tuple[str, trio.abc.ReceiveChannel]:
2018-07-14 20:09:05 +00:00
"""Send a ``'cmd'`` message to a remote actor and return a
caller id and a ``trio.Queue`` that can be used to wait for
responses delivered by the local message processing loop.
"""
cid = str(uuid.uuid1())
2018-08-31 21:16:24 +00:00
assert chan.uid
2019-02-15 21:23:58 +00:00
recv_chan = self.get_memchans(chan.uid, cid)
2018-07-14 20:09:05 +00:00
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
2019-02-15 21:23:58 +00:00
return cid, recv_chan
2018-07-14 20:09:05 +00:00
async def _process_messages(
self, chan: Channel,
treat_as_gen: bool = False,
shield: bool = False,
task_status=trio.TASK_STATUS_IGNORED,
) -> None:
"""Process messages for the channel async-RPC style.
2018-07-14 20:09:05 +00:00
Receive multiplexed RPC requests and deliver responses over ``chan``.
2018-07-14 20:09:05 +00:00
"""
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
msg = None
2018-07-14 20:09:05 +00:00
log.debug(f"Entering msg loop for {chan} from {chan.uid}")
2018-08-01 19:15:18 +00:00
try:
# internal scope allows for keeping this message
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
# a locally spawned task)
with trio.open_cancel_scope(shield=shield) as cs:
task_status.started(cs)
async for msg in chan:
if msg is None: # loop terminate sentinel
log.debug(
f"Cancelling all tasks for {chan} from {chan.uid}")
for (channel, cid) in self._rpc_tasks:
if channel is chan:
self.cancel_task(cid, Context(channel, cid))
log.debug(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break
2019-02-15 21:23:58 +00:00
log.trace(f"Received msg {msg} from {chan.uid}")
if msg.get('cid'):
# deliver response to local caller/waiter
2019-02-15 21:23:58 +00:00
await self._push_result(chan.uid, msg)
log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
2018-08-01 19:15:18 +00:00
# process command request
try:
ns, funcname, kwargs, actorid, cid = msg['cmd']
except KeyError:
# This is the non-rpc error case, that is, an
# error **not** raised inside a call to ``_invoke()``
# (i.e. no cid was provided in the msg - see above).
# Push this error to all local channel consumers
# (normally portals) by marking the channel as errored
assert chan.uid
exc = unpack_error(msg, chan=chan)
chan._exc = exc
raise exc
2018-08-01 19:15:18 +00:00
log.debug(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(self, funcname)
else:
# complain to client about restricted modules
try:
func = self._get_rpc_func(ns, funcname)
except (ModuleNotExposed, AttributeError) as err:
err_msg = pack_error(err)
err_msg['cid'] = cid
await chan.send(err_msg)
continue
# spin up a task for the requested function
log.debug(f"Spawning task for {func}")
cs = await self._root_nursery.start(
_invoke, self, cid, chan, func, kwargs,
name=funcname
)
# never allow cancelling cancel requests (results in
# deadlock and other weird behaviour)
if func != self.cancel:
if isinstance(cs, Exception):
log.warn(f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._no_more_rpc_tasks.clear()
log.info(f"RPC func is {func}")
# store cancel scope such that the rpc task can be
# cancelled gracefully if requested
self._rpc_tasks[(chan, cid)] = (
cs, func, trio.Event())
log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
else:
# channel disconnect
log.debug(f"{chan} from {chan.uid} disconnected")
2018-08-01 19:15:18 +00:00
except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke")
except Exception as err:
# ship any "internal" exception (i.e. one from internal machinery
# not from an rpc task) to parent
log.exception("Actor errored:")
if self._parent_chan:
await self._parent_chan.send(pack_error(err))
raise
# if this is the `MainProcess` we expect the error broadcasting
# above to trigger an error at consuming portal "checkpoints"
except trio.Cancelled:
# debugging only
log.debug("Msg loop was cancelled")
raise
2018-08-01 19:15:18 +00:00
finally:
log.debug(
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
2018-07-14 20:09:05 +00:00
def _fork_main(
2018-08-31 21:16:24 +00:00
self,
accept_addr: Tuple[str, int],
forkserver_info: Tuple[Any, Any, Any, Any, Any],
parent_addr: Tuple[str, int] = None
) -> None:
2018-11-09 06:40:12 +00:00
"""The routine called *after fork* which invokes a fresh ``trio.run``
"""
self._forkserver_info = forkserver_info
2018-07-14 20:09:05 +00:00
from ._trionics import ctx
if self.loglevel is not None:
2019-01-16 22:09:30 +00:00
log.info(
f"Setting loglevel for {self.uid} to {self.loglevel}")
2018-07-14 20:09:05 +00:00
get_console_log(self.loglevel)
log.info(
2018-08-01 19:15:18 +00:00
f"Started new {ctx.current_process()} for {self.uid}")
2018-07-14 20:09:05 +00:00
_state._current_actor = self
log.debug(f"parent_addr is {parent_addr}")
try:
trio.run(partial(
self._async_main, accept_addr, parent_addr=parent_addr))
except KeyboardInterrupt:
pass # handle it the same way trio does?
2018-08-01 19:15:18 +00:00
log.info(f"Actor {self.uid} terminated")
2018-07-14 20:09:05 +00:00
async def _async_main(
self,
2018-08-31 21:16:24 +00:00
accept_addr: Tuple[str, int],
arbiter_addr: Optional[Tuple[str, int]] = None,
parent_addr: Optional[Tuple[str, int]] = None,
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
2018-07-14 20:09:05 +00:00
"""Start the channel server, maybe connect back to the parent, and
start the main task.
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
"""
arbiter_addr = arbiter_addr or self._arb_addr
registered_with_arbiter = False
try:
async with trio.open_nursery() as nursery:
2018-07-14 20:09:05 +00:00
self._root_nursery = nursery
# Startup up channel server
host, port = accept_addr
await nursery.start(partial(
self._serve_forever, accept_host=host, accept_port=port)
)
if parent_addr is not None:
try:
# Connect back to the parent actor and conduct initial
2018-11-09 06:40:12 +00:00
# handshake (From this point on if we error, ship the
2018-07-14 20:09:05 +00:00
# exception back to the parent actor)
chan = self._parent_chan = Channel(
destaddr=parent_addr,
)
await chan.connect()
# initial handshake, report who we are, who they are
await _do_handshake(self, chan)
except OSError: # failed to connect
2018-09-10 19:19:49 +00:00
log.warning(
2018-07-14 20:09:05 +00:00
f"Failed to connect to parent @ {parent_addr},"
" closing server")
2018-08-01 19:15:18 +00:00
await self.cancel()
2018-07-14 20:09:05 +00:00
self._parent_chan = None
# handle new connection back to parent
nursery.start_soon(
self._process_messages, self._parent_chan)
# load exposed/allowed RPC modules
# XXX: do this **after** establishing connection to parent
# so that import errors are properly propagated upwards
self.load_modules()
2018-07-14 20:09:05 +00:00
# register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`")
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
uid=self.uid, sockaddr=self.accept_addr)
2018-07-14 20:09:05 +00:00
registered_with_arbiter = True
2018-08-01 19:15:18 +00:00
task_status.started()
2018-07-14 20:09:05 +00:00
log.debug("Waiting on root nursery to complete")
2018-11-09 06:40:12 +00:00
# blocks here as expected until the channel server is
# killed (i.e. this actor is cancelled or signalled by the parent)
except Exception as err:
if not registered_with_arbiter:
log.exception(
f"Actor errored and failed to register with arbiter "
f"@ {arbiter_addr}")
2018-07-14 20:09:05 +00:00
if self._parent_chan:
try:
# internal error so ship to parent without cid
await self._parent_chan.send(pack_error(err))
2018-08-04 21:59:10 +00:00
except trio.ClosedResourceError:
2018-07-14 20:09:05 +00:00
log.error(
f"Failed to ship error to parent "
f"{self._parent_chan.uid}, channel was closed")
log.exception("Actor errored:")
else:
# XXX wait, why?
# causes a hang if I always raise..
2018-07-14 20:09:05 +00:00
raise
2018-07-14 20:09:05 +00:00
finally:
if registered_with_arbiter:
await self._do_unreg(arbiter_addr)
2018-07-14 20:09:05 +00:00
# terminate actor once all it's peers (actors that connected
# to it as clients) have disappeared
if not self._no_more_peers.is_set():
2018-08-01 19:15:18 +00:00
if any(
chan.connected() for chan in chain(*self._peers.values())
):
log.debug(
f"Waiting for remaining peers {self._peers} to clear")
await self._no_more_peers.wait()
2018-07-14 20:09:05 +00:00
log.debug(f"All peer channels are complete")
# tear down channel server no matter what since we errored
# or completed
self.cancel_server()
async def _serve_forever(
self,
*,
# (host, port) to bind for channel server
2018-08-31 21:16:24 +00:00
accept_host: Tuple[str, int] = None,
accept_port: int = 0,
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
2018-07-14 20:09:05 +00:00
"""Start the channel server, begin listening for new connections.
This will cause an actor to continue living (blocking) until
``cancel_server()`` is called.
"""
async with trio.open_nursery() as nursery:
self._server_nursery = nursery
# TODO: might want to consider having a separate nursery
# for the stream handler such that the server can be cancelled
# whilst leaving existing channels up
listeners = await nursery.start(
partial(
trio.serve_tcp,
self._stream_handler,
# new connections will stay alive even if this server
# is cancelled
handler_nursery=self._root_nursery,
port=accept_port, host=accept_host,
)
)
log.debug(
f"Started tcp server(s) on {[l.socket for l in listeners]}")
self._listeners.extend(listeners)
task_status.started()
2018-08-31 21:16:24 +00:00
async def _do_unreg(self, arbiter_addr: Optional[Tuple[str, int]]) -> None:
2018-07-14 20:09:05 +00:00
# UNregister actor from the arbiter
try:
if arbiter_addr is not None:
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'unregister_actor', uid=self.uid)
2018-07-14 20:09:05 +00:00
except OSError:
2018-09-10 19:19:49 +00:00
log.warning(f"Unable to unregister {self.name} from arbiter")
2018-07-14 20:09:05 +00:00
async def cancel(self) -> None:
2018-08-01 19:15:18 +00:00
"""Cancel this actor.
The sequence in order is:
- cancelling all rpc tasks
- cancelling the channel server
- cancel the "root" nursery
2018-07-14 20:09:05 +00:00
"""
2018-08-01 19:15:18 +00:00
# cancel all ongoing rpc tasks
await self.cancel_rpc_tasks()
2018-07-14 20:09:05 +00:00
self.cancel_server()
self._root_nursery.cancel_scope.cancel()
async def cancel_task(self, cid, ctx):
"""Cancel a local task.
Note this method will be treated as a streaming funciton
by remote actor-callers due to the declaration of ``ctx``
in the signature (for now).
"""
# right now this is only implicitly called by
# streaming IPC but it should be called
# to cancel any remotely spawned task
chan = ctx.chan
try:
# this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from this channel
scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)]
except KeyError:
log.warning(f"{cid} has already completed/terminated?")
return
log.debug(
f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
# don't allow cancelling this function mid-execution
# (is this necessary?)
if func is self.cancel_task:
return
scope.cancel()
# wait for _invoke to mark the task complete
await is_complete.wait()
log.debug(
f"Sucessfully cancelled task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")
async def cancel_rpc_tasks(self) -> None:
2018-08-01 19:15:18 +00:00
"""Cancel all existing RPC responder tasks using the cancel scope
registered for each.
"""
2018-08-31 21:16:24 +00:00
tasks = self._rpc_tasks
log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks} ")
for (chan, cid) in tasks.copy():
# TODO: this should really done in a nursery batch
await self.cancel_task(cid, Context(chan, cid))
# if tasks:
log.info(
f"Waiting for remaining rpc tasks to complete {tasks}")
await self._no_more_rpc_tasks.wait()
2018-08-01 19:15:18 +00:00
def cancel_server(self) -> None:
2018-07-14 20:09:05 +00:00
"""Cancel the internal channel server nursery thereby
preventing any new inbound connections from being established.
"""
2018-08-01 19:15:18 +00:00
log.debug("Shutting down channel server")
2018-07-14 20:09:05 +00:00
self._server_nursery.cancel_scope.cancel()
@property
2018-08-31 21:16:24 +00:00
def accept_addr(self) -> Optional[Tuple[str, int]]:
2018-07-14 20:09:05 +00:00
"""Primary address to which the channel server is bound.
"""
try:
return self._listeners[0].socket.getsockname()
except OSError:
2018-08-31 21:16:24 +00:00
return None
2018-07-14 20:09:05 +00:00
def get_parent(self) -> Portal:
"""Return a portal to our parent actor."""
2018-08-31 21:16:24 +00:00
assert self._parent_chan, "No parent channel for this actor?"
2018-07-14 20:09:05 +00:00
return Portal(self._parent_chan)
2018-08-31 21:16:24 +00:00
def get_chans(self, uid: Tuple[str, str]) -> List[Channel]:
"""Return all channels to the actor with provided uid."""
return self._peers[uid]
2018-07-14 20:09:05 +00:00
class Arbiter(Actor):
"""A special actor who knows all the other actors and always has
access to a top level nursery.
2018-07-14 20:09:05 +00:00
The arbiter is by default the first actor spawned on each host
and is responsible for keeping track of all other actors for
coordination purposes. If a new main process is launched and an
arbiter is already running that arbiter will be used.
"""
is_arbiter = True
def __init__(self, *args, **kwargs):
self._registry = defaultdict(list)
self._waiters = {}
super().__init__(*args, **kwargs)
2018-08-31 21:16:24 +00:00
def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
for uid, sockaddr in self._registry.items():
if name in uid:
return sockaddr
2018-08-31 21:16:24 +00:00
return None
2018-08-26 17:12:29 +00:00
async def wait_for_actor(
self, name: str
2018-08-31 21:16:24 +00:00
) -> List[Tuple[str, int]]:
"""Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently
registered.
"""
sockaddrs = []
for (aname, _), sockaddr in self._registry.items():
if name == aname:
sockaddrs.append(sockaddr)
if not sockaddrs:
waiter = trio.Event()
self._waiters.setdefault(name, []).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
sockaddrs.append(self._registry[uid])
return sockaddrs
2018-07-14 20:09:05 +00:00
2018-08-26 17:12:29 +00:00
def register_actor(
2018-08-31 21:16:24 +00:00
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
2018-08-26 17:12:29 +00:00
) -> None:
name, uuid = uid
self._registry[uid] = sockaddr
# pop and signal all waiter events
events = self._waiters.pop(name, ())
self._waiters.setdefault(name, []).append(uid)
for event in events:
if isinstance(event, trio.Event):
event.set()
2018-07-14 20:09:05 +00:00
2018-08-31 21:16:24 +00:00
def unregister_actor(self, uid: Tuple[str, str]) -> None:
self._registry.pop(uid, None)
2018-07-14 20:09:05 +00:00
async def _start_actor(
actor: Actor,
2018-08-31 21:16:24 +00:00
main: typing.Callable[..., typing.Awaitable],
host: str,
port: int,
2018-08-31 21:16:24 +00:00
arbiter_addr: Tuple[str, int],
nursery: trio._core._run.Nursery = None
):
"""Spawn a local actor by starting a task to execute it's main async
function.
2018-07-14 20:09:05 +00:00
Blocks if no nursery is provided, in which case it is expected the nursery
provider is responsible for waiting on the task to complete.
"""
# assign process-local actor
_state._current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
log.info(f"Starting local {actor} @ {host}:{port}")
2018-08-01 19:15:18 +00:00
async with trio.open_nursery() as nursery:
await nursery.start(
partial(
actor._async_main,
accept_addr=(host, port),
parent_addr=None,
arbiter_addr=arbiter_addr,
)
)
result = await main()
# XXX: the actor is cancelled when this context is complete
# given that there are no more active peer channels connected
actor.cancel_server()
2018-07-14 20:09:05 +00:00
# unset module state
_state._current_actor = None
log.info("Completed async main")
return result
@asynccontextmanager
2018-08-31 21:16:24 +00:00
async def get_arbiter(
host: str, port: int
) -> typing.AsyncGenerator[Union[Portal, LocalPortal], None]:
2018-07-14 20:09:05 +00:00
"""Return a portal instance connected to a local or remote
arbiter.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal:
yield arb_portal
@asynccontextmanager
2018-08-26 17:12:29 +00:00
async def find_actor(
2018-08-31 21:16:24 +00:00
name: str, arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Optional[Portal], None]:
2018-07-14 20:09:05 +00:00
"""Ask the arbiter to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddr = await arb_portal.run('self', 'find_actor', name=name)
2018-07-14 20:09:05 +00:00
# TODO: return portals to all available actors - for now just
# the last one that registered
if name == 'arbiter' and actor.is_arbiter:
raise RuntimeError("The current actor is the arbiter")
elif sockaddr:
2018-07-14 20:09:05 +00:00
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield None
@asynccontextmanager
async def wait_for_actor(
name: str,
2018-08-31 21:16:24 +00:00
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Portal, None]:
"""Wait on an actor to register with the arbiter.
A portal to the first registered actor is returned.
"""
actor = current_actor()
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'wait_for_actor', name=name)
sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal