2018-06-07 04:26:49 +00:00
|
|
|
"""
|
|
|
|
tracor: An actor model micro-framework.
|
|
|
|
"""
|
2018-06-19 19:30:50 +00:00
|
|
|
from collections import defaultdict
|
2018-06-07 04:26:49 +00:00
|
|
|
from functools import partial
|
|
|
|
from typing import Coroutine
|
2018-06-19 19:30:50 +00:00
|
|
|
import importlib
|
|
|
|
import inspect
|
|
|
|
import multiprocessing as mp
|
|
|
|
import traceback
|
|
|
|
import uuid
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
import trio
|
|
|
|
from async_generator import asynccontextmanager
|
|
|
|
|
|
|
|
from .ipc import Channel
|
2018-06-12 19:17:48 +00:00
|
|
|
from .log import get_console_log, get_logger
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
# Use the "forkserver" start method: children are spawned from a clean
# helper process instead of forking the trio-running parent directly.
ctx = mp.get_context("forkserver")

log = get_logger('tractor')

# set at startup and after forks (see ``Actor._fork_main``)
_current_actor = None

# default (host, port) where an ``Arbiter`` is expected to be listening
_default_arbiter_host = '127.0.0.1'
_default_arbiter_port = 1616
|
2018-06-19 19:30:50 +00:00
|
|
|
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
class ActorFailure(Exception):
    """General actor failure.

    Raised locally, e.g. when a spawned sub-actor process fails to
    start (see ``ActorNursery.start_actor``).
    """
|
|
|
|
|
|
|
|
|
2018-06-19 19:30:50 +00:00
|
|
|
class RemoteActorError(ActorFailure):
    """Remote actor exception bundled locally.

    Wraps the formatted-traceback string delivered in a remote actor's
    ``{'error': ...}`` response packet (see ``result_from_q``).
    """
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
|
2018-06-12 19:17:48 +00:00
|
|
|
@asynccontextmanager
async def maybe_open_nursery(nursery=None):
    """Yield ``nursery`` as-is, or open and yield a fresh one.

    When no nursery is passed, a new ``trio`` nursery is opened for the
    duration of the ``with`` block; exiting then blocks until all child
    tasks complete, per standard nursery semantics.
    """
    if nursery is None:
        async with trio.open_nursery() as fresh_nursery:
            yield fresh_nursery
    else:
        yield nursery
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
|
2018-06-19 19:30:50 +00:00
|
|
|
async def _invoke(
    cid, chan, func, kwargs,
    treat_as_gen=False, raise_errs=False,
    task_status=trio.TASK_STATUS_IGNORED
):
    """Invoke local func and return results over provided channel.

    Responses are shipped as dict packets tagged with the caller id
    ``cid``: ``{'return': ...}`` for plain results, one ``{'yield': ...}``
    per item for async generators, and ``{'error': <traceback str>}``
    when the call raises and ``raise_errs`` is False.
    """
    try:
        # `partial`-wrapped callables hide their coroutine-ness from
        # `inspect.iscoroutinefunction()`; peek at the wrapped func.
        is_async_partial = False
        if isinstance(func, partial):
            is_async_partial = inspect.iscoroutinefunction(func.func)

        if not inspect.iscoroutinefunction(func) and not is_async_partial:
            # plain sync function: call it and ship the result directly
            await chan.send({'return': func(**kwargs), 'cid': cid})
        else:
            coro = func(**kwargs)

            if inspect.isasyncgen(coro):
                async for item in coro:
                    # TODO: can we send values back in here?
                    # it's gonna require a `while True:` and
                    # some non-blocking way to retrieve new `asend()`
                    # values from the channel:
                    # to_send = await chan.recv_nowait()
                    # if to_send is not None:
                    #     to_yield = await coro.asend(to_send)
                    await chan.send({'yield': item, 'cid': cid})
            else:
                if treat_as_gen:
                    # XXX: the async-func may spawn further tasks which push
                    # back values like an async-generator would but must
                    # manually construct the response dict-packet-responses as
                    # above
                    await coro
                else:
                    await chan.send({'return': await coro, 'cid': cid})

        # NOTE(review): started() only fires *after* the invocation has
        # fully completed, so a `nursery.start()` caller blocks for the
        # entire call — confirm this ordering is intended.
        task_status.started()
    except Exception:
        if not raise_errs:
            # ship the formatted traceback back to the remote caller
            await chan.send({'error': traceback.format_exc(), 'cid': cid})
        else:
            raise
|
|
|
|
|
2018-06-23 03:30:55 +00:00
|
|
|
|
|
|
|
async def result_from_q(q):
    """Pull the first response msg off ``q`` and classify it.

    Returns a ``(resptype, first_msg, q)`` triple where ``resptype``
    is ``'return'`` or ``'yield'``.  An ``'error'`` packet is re-raised
    locally as ``RemoteActorError``; any other shape is a protocol
    violation.
    """
    first_msg = await q.get()
    for resptype in ('return', 'yield'):
        if resptype in first_msg:
            return resptype, first_msg, q
    if 'error' in first_msg:
        raise RemoteActorError(first_msg['error'])
    raise ValueError(f"{first_msg} is an invalid response packet?")
|
|
|
|
|
|
|
|
|
2018-07-04 16:51:04 +00:00
|
|
|
async def _do_handshake(actor, chan):
    """Exchange ``uid``s with the peer on ``chan``.

    Sends our own uid first, then validates and records the uid the
    remote end reports back, stashing it on the channel itself.
    """
    await chan.send(actor.uid)
    peer_uid = await chan.recv()

    if not isinstance(peer_uid, tuple):
        raise ValueError(f"{peer_uid} is not a valid uid?!")

    chan.uid = peer_uid
    log.info(f"Handshake with actor {peer_uid}@{chan.raddr} complete")
    return peer_uid
|
|
|
|
|
|
|
|
|
2018-06-07 04:26:49 +00:00
|
|
|
class Actor:
    """The fundamental concurrency primitive.

    An *actor* is the combination of a regular Python or
    ``multiprocessing.Process`` executing a ``trio`` task tree, communicating
    with other actors through "portals" which provide a native async API
    around "channels".
    """
    # overridden by ``Arbiter``; selects arbiter-specific behavior elsewhere
    is_arbiter = False

    def __init__(
        self,
        name: str,
        main: Coroutine = None,
        rpc_module_paths: [str] = None,
        statespace: dict = None,
        uid: str = None,
        allow_rpc: bool = True,
        outlive_main: bool = False,
    ):
        """Create a (not yet running) actor.

        :param name: human readable role name; combined with a uuid to
            form this actor's unique ``uid``.
        :param main: optional "main" coroutine run by ``_async_main``.
        :param rpc_module_paths: module paths remote callers may invoke
            functions from (imported post-fork in ``load_namespaces``).
        :param statespace: per-process shared state mapping.
        :param uid: explicit unique id (a fresh uuid1 when omitted).
        :param allow_rpc: whether to process inbound RPC requests.
        :param outlive_main: keep the channel server alive after the
            main task completes.
        """
        self.name = name
        self.uid = (name, uid or str(uuid.uuid1()))
        # normalize falsy values to fresh containers: avoids the shared
        # mutable-default pitfall and also fixes iteration over ``None``
        # in ``load_namespaces()`` when a spawner passes
        # ``rpc_module_paths=None`` (as ``ActorNursery.start_actor`` does
        # by default).
        self.rpc_module_paths = rpc_module_paths or []
        self._mods = {}
        self.main = main
        # TODO: consider making this a dynamically defined
        # @dataclass once we get py3.7
        self.statespace = statespace or {}
        self._allow_rpc = allow_rpc
        self._outlive_main = outlive_main

        # filled in by `_async_main` after fork
        # NOTE: values are normally lists of ``Channel``s but
        # ``wait_for_peer()`` may temporarily store a ``trio.Event``
        # under a uid until the peer connects back (see
        # ``_stream_handler``).
        self._peers = defaultdict(list)
        self._no_more_peers = trio.Event()
        self._no_more_peers.set()  # no peers yet, so "no more peers" holds
        self._actors2calls = {}  # map {uids -> {callids -> waiter queues}}
        self._listeners = []
        self._parent_chan = None
        self._accept_host = None

    async def wait_for_peer(self, uid):
        """Wait for a connection back from a spawned actor with a given
        ``uid`` and return the triggering event plus the newest channel.
        """
        log.debug(f"Waiting for peer {uid} to connect")
        # plant an Event under the uid; ``_stream_handler`` sets it when
        # the peer's inbound connection completes its handshake
        event = self._peers.setdefault(uid, trio.Event())
        await event.wait()
        log.debug(f"{uid} successfully connected back to us")
        return event, self._peers[uid][-1]

    def load_namespaces(self):
        """Import all modules remote callers are allowed to invoke
        functions from, caching them in ``self._mods``.
        """
        # We load namespaces after fork since this actor may
        # be spawned on a different machine from the original nursery
        # and we need to try and load the local module code (if it
        # exists)
        for path in self.rpc_module_paths:
            self._mods[path] = importlib.import_module(path)

    async def _stream_handler(
        self,
        stream: trio.SocketStream,
    ):
        """Entry point for new inbound connections to the channel server.

        Performs the uid handshake, registers the new ``Channel`` in
        ``self._peers`` (waking any ``wait_for_peer()`` waiters), then
        runs the RPC message loop until the channel goes down.
        """
        self._no_more_peers.clear()
        chan = Channel(stream=stream)
        log.info(f"New connection to us {chan}")

        # send/receive initial handshake response
        try:
            uid = await _do_handshake(self, chan)
        except StopAsyncIteration:
            # peer hung up before/while handshaking
            log.warn(f"Channel {chan} failed to handshake")
            return

        # channel tracking
        event_or_chans = self._peers.pop(uid, None)
        if isinstance(event_or_chans, trio.Event):
            # Instructing connection: this is likely a new channel to
            # a recently spawned actor which we'd like to control via
            # async-rpc calls.
            log.debug(f"Waking channel waiters {event_or_chans.statistics()}")
            # Alert any task waiting on this connection to come up
            event_or_chans.set()
            event_or_chans.clear()  # consumer can wait on channel to close
        elif isinstance(event_or_chans, list):
            log.warn(
                f"already have channel(s) for {uid}:{event_or_chans}?"
            )
            # append new channel
            self._peers[uid].extend(event_or_chans)

        log.debug(f"Registered {chan} for {uid}")
        self._peers[uid].append(chan)

        # Begin channel management - respond to remote requests and
        # process received reponses.
        try:
            await self._process_messages(chan)
        finally:
            # Drop ref to channel so it can be gc-ed and disconnected
            if chan is not self._parent_chan:
                log.debug(f"Releasing channel {chan}")
                chans = self._peers.get(chan.uid)
                chans.remove(chan)
                if not chans:
                    log.debug(f"No more channels for {chan.uid}")
                    self._peers.pop(chan.uid, None)
                if not self._peers:  # no more channels connected
                    self._no_more_peers.set()
                    log.debug(f"No more peer channels")

    def _push_result(self, actorid, cid, msg):
        """Deliver a response ``msg`` from ``actorid`` onto the local
        waiter queue registered for call id ``cid``.
        """
        assert actorid, f"`actorid` can't be {actorid}"
        q = self.get_waitq(actorid, cid)
        log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
        waiters = q.statistics().tasks_waiting_get
        if not waiters:
            log.warn(
                f"No tasks are currently waiting for results from call {cid}?")
        q.put_nowait(msg)

    def get_waitq(self, actorid, cid):
        """Return (creating if needed) the response queue for the
        (``actorid``, ``cid``) call pair.
        """
        log.debug(f"Registering for callid {cid} queue results from {actorid}")
        cids2qs = self._actors2calls.setdefault(actorid, {})
        # bounded queue so a runaway producer can't grow memory unboundedly
        return cids2qs.setdefault(cid, trio.Queue(1000))

    async def send_cmd(self, chan, ns, func, kwargs):
        """Send a ``'cmd'`` message to a remote actor and return a
        caller id and a ``trio.Queue`` that can be used to wait for
        responses delivered by the local message processing loop.
        """
        cid = str(uuid.uuid1())
        q = self.get_waitq(chan.uid, cid)
        log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
        await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
        return cid, q

    async def _process_messages(self, chan, treat_as_gen=False):
        """Process messages async-RPC style.

        Process rpc requests and deliver retrieved responses from channels.
        A ``None`` message acts as the terminate sentinel for the loop.

        NOTE(review): the ``treat_as_gen`` parameter is unconditionally
        reassigned inside the loop and so is effectively unused — confirm
        whether it can be dropped.
        """
        # TODO: once https://github.com/python-trio/trio/issues/467 gets
        # worked out we'll likely want to use that!
        log.debug(f"Entering msg loop for {chan}")
        async with trio.open_nursery() as nursery:
            try:
                async for msg in chan.aiter_recv():
                    if msg is None:  # terminate sentinel
                        log.debug(f"Cancelling all tasks for {chan}")
                        nursery.cancel_scope.cancel()
                        log.debug(f"Terminating msg loop for {chan}")
                        break
                    log.debug(f"Received msg {msg}")
                    cid = msg.get('cid')
                    if cid:  # deliver response to local caller/waiter
                        self._push_result(chan.uid, cid, msg)
                        if 'error' in msg:
                            # TODO: need something better then this slop
                            raise RemoteActorError(msg['error'])
                        log.debug(f"Waiting on next msg for {chan}")
                        continue
                    else:
                        # inbound RPC request packet
                        ns, funcname, kwargs, actorid, cid = msg['cmd']

                    log.debug(
                        f"Processing request from {actorid}\n"
                        f"{ns}.{funcname}({kwargs})")
                    if ns == 'self':
                        # call a method on this very actor (e.g. 'cancel')
                        func = getattr(self, funcname)
                    else:
                        # call into one of the allowed rpc modules
                        func = getattr(self._mods[ns], funcname)

                    # spin up a task for the requested function
                    sig = inspect.signature(func)
                    treat_as_gen = False
                    if 'chan' in sig.parameters:
                        assert 'cid' in sig.parameters, \
                            f"{func} must accept a `cid` (caller id) kwarg"
                        kwargs['chan'] = chan
                        kwargs['cid'] = cid
                        # TODO: eventually we want to be more stringent
                        # about what is considered a far-end async-generator.
                        # Right now both actual async gens and any async
                        # function which declares a `chan` kwarg in its
                        # signature will be treated as one.
                        treat_as_gen = True

                    log.debug(f"Spawning task for {func}")
                    nursery.start_soon(
                        _invoke, cid, chan, func, kwargs, treat_as_gen,
                        name=funcname
                    )
                    log.debug(f"Waiting on next msg for {chan}")
                else:  # channel disconnect
                    log.debug(f"{chan} disconnected")
            except trio.ClosedStreamError:
                log.error(f"{chan} broke")

        log.debug(f"Exiting msg loop for {chan}")

    def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
        """Post-fork entry point: set the process-local actor global and
        run ``_async_main`` under a fresh ``trio.run``.
        """
        # after fork routine which invokes a fresh ``trio.run``
        log.info(
            f"Started new {ctx.current_process()} for actor {self.uid}")
        global _current_actor
        _current_actor = self
        if loglevel:
            get_console_log(loglevel)
        log.debug(f"parent_addr is {parent_addr}")
        try:
            trio.run(partial(
                self._async_main, accept_addr, parent_addr=parent_addr))
        except KeyboardInterrupt:
            pass  # handle it the same way trio does?
        log.debug(f"Actor {self.uid} terminated")

    async def _async_main(
        self,
        accept_addr,
        arbiter_addr=(_default_arbiter_host, _default_arbiter_port),
        parent_addr=None,
        nursery=None
    ):
        """Start the channel server and main task.

        A "root-most" (or "top-level") nursery for this actor is opened here
        and when cancelled effectively cancels the actor.

        :returns: the result of ``self.main()`` when run in-process,
            else ``None``.
        """
        result = None
        try:
            async with maybe_open_nursery(nursery) as nursery:
                self._root_nursery = nursery

                # Startup up channel server
                host, port = accept_addr
                await nursery.start(partial(
                    self._serve_forever, accept_host=host, accept_port=port)
                )

                if parent_addr is not None:
                    # Connect back to the parent actor and conduct initial
                    # handshake (From this point on if we error ship the
                    # exception back to the parent actor)
                    chan = self._parent_chan = Channel(
                        destaddr=parent_addr,
                        on_reconnect=self.main
                    )
                    await chan.connect()
                    # initial handshake, report who we are, who they are
                    await _do_handshake(self, chan)

                # handle new connection back to parent optionally
                # begin responding to RPC
                if self._allow_rpc:
                    self.load_namespaces()
                    if self._parent_chan:
                        nursery.start_soon(
                            self._process_messages, self._parent_chan)

                # register with the arbiter if we're told its addr
                log.debug(f"Registering {self} for role `{self.name}`")
                async with get_arbiter(*arbiter_addr) as arb_portal:
                    await arb_portal.run(
                        'self', 'register_actor',
                        name=self.name, sockaddr=self.accept_addr)

                if self.main:
                    if self._parent_chan:
                        log.debug(f"Starting main task `{self.main}`")
                        # start "main" routine in a task so its result is
                        # shipped to the parent via the 'main' call id
                        await nursery.start(
                            _invoke, 'main', self._parent_chan, self.main, {},
                            False, True  # treat_as_gen, raise_errs params
                        )
                    else:
                        # run directly
                        log.debug(f"Running `{self.main}` directly")
                        result = await self.main()

                    # terminate local in-proc once its main completes
                    log.debug(
                        f"Waiting for remaining peers {self._peers} to clear")
                    await self._no_more_peers.wait()
                    log.debug(f"All peer channels are complete")

                    # tear down channel server
                    if not self._outlive_main:
                        log.debug(f"Shutting down channel server")
                        self.cancel_server()

                # blocks here as expected if no nursery was provided until
                # the channel server is killed (i.e. this actor is
                # cancelled or signalled by the parent actor)
        except Exception:
            if self._parent_chan:
                log.exception("Actor errored:")
                # forward the failure to the parent under the 'main' cid
                await self._parent_chan.send(
                    {'error': traceback.format_exc(), 'cid': 'main'})
            else:
                raise
        finally:
            # UNregister actor from the arbiter
            try:
                if arbiter_addr is not None:
                    async with get_arbiter(*arbiter_addr) as arb_portal:
                        # BUGFIX: previously this (re-)invoked
                        # 'register_actor' which re-registered the dying
                        # actor instead of removing it; 'unregister_actor'
                        # is the matching ``Arbiter`` method.
                        await arb_portal.run(
                            'self', 'unregister_actor',
                            name=self.name, sockaddr=self.accept_addr)
            except OSError:
                log.warn(f"Unable to unregister {self.name} from arbiter")

        return result

    async def _serve_forever(
        self,
        *,
        # (host, port) to bind for channel server
        accept_host=None,
        accept_port=0,
        task_status=trio.TASK_STATUS_IGNORED
    ):
        """Main coroutine: connect back to the parent, spawn main task, begin
        listening for new messages.
        """
        async with trio.open_nursery() as nursery:
            self._server_nursery = nursery
            # TODO: might want to consider having a separate nursery
            # for the stream handler such that the server can be cancelled
            # whilst leaving existing channels up
            listeners = await nursery.start(
                partial(
                    trio.serve_tcp,
                    self._stream_handler,
                    handler_nursery=self._root_nursery,
                    port=accept_port, host=accept_host,
                )
            )
            log.debug(
                f"Started tcp server(s) on {[l.socket for l in listeners]}")
            self._listeners.extend(listeners)
            # signal ``nursery.start()`` caller that we're accepting
            task_status.started()

    def cancel(self):
        """This cancels the internal root-most nursery thereby gracefully
        cancelling (for all intents and purposes) this actor.
        """
        self._root_nursery.cancel_scope.cancel()

    def cancel_server(self):
        """Cancel the internal channel server nursery thereby
        preventing any new inbound connections from being established.
        """
        self._server_nursery.cancel_scope.cancel()

    @property
    def accept_addr(self):
        """Primary address to which the channel server is bound,
        or ``None`` if it can't be queried.

        NOTE(review): indexing an empty ``self._listeners`` raises
        ``IndexError`` which is *not* caught here — confirm callers
        always check the server is up first.
        """
        try:
            return self._listeners[0].socket.getsockname()
        except OSError:
            return

    def get_parent(self):
        """Return a ``Portal`` wrapping the channel to our parent actor."""
        return Portal(self._parent_chan)

    def get_chans(self, actorid):
        """Return the list of live channels for peer ``actorid``."""
        return self._peers[actorid]
|
2018-06-23 17:48:04 +00:00
|
|
|
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
class Arbiter(Actor):
    """A special actor who knows all the other actors and always has
    access to the top level nursery.

    The arbiter is by default the first actor spawned on each host
    and is responsible for keeping track of all other actors for
    coordination purposes. If a new main process is launched and an
    arbiter is already running that arbiter will be used.
    """
    # class-level registry: every in-process instance shares it
    _registry = defaultdict(list)
    is_arbiter = True

    def find_actor(self, name):
        """Return the socket addrs registered under ``name``."""
        return self._registry[name]

    def register_actor(self, name, sockaddr):
        """Record ``sockaddr`` as serving the actor role ``name``."""
        self._registry[name].append(sockaddr)

    def unregister_actor(self, name, sockaddr):
        """Drop ``sockaddr`` from ``name``'s entry if present."""
        addrs = self._registry.get(name)
        if addrs and sockaddr in addrs:
            addrs.remove(sockaddr)
|
|
|
|
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
class Portal:
    """A 'portal' to a(n) (remote) ``Actor``.

    Allows for invoking remote routines and receiving results through an
    underlying ``tractor.Channel`` as though the remote (async)
    function / generator was invoked locally.

    Think of this like a native async IPC API.
    """
    def __init__(self, channel):
        # the ``Channel`` connected to the (remote) actor
        self.channel = channel

    async def aclose(self):
        """Close the underlying channel."""
        log.debug(f"Closing {self}")
        # XXX: won't work until https://github.com/python-trio/trio/pull/460
        # gets in!
        await self.channel.aclose()

    async def run(self, ns, func, **kwargs):
        """Submit a function to be scheduled and run by actor, return its
        (stream of) result(s).

        Returns either the single ``'return'`` value or, for remote
        async generators, a local async generator streaming each
        ``'yield'`` packet's payload.
        """
        # TODO: note, this needs some serious work and thinking about how
        # to make async-generators the fundamental IPC API over channels!
        # (think `yield from`, `gen.send()`, and functional reactive stuff)
        chan = self.channel
        # ship a function call request to the remote actor
        actor = current_actor()

        cid, q = await actor.send_cmd(chan, ns, func, kwargs)
        # wait on first response msg
        resptype, first_msg, q = await result_from_q(q)

        if resptype == 'yield':

            async def yield_from_q():
                # re-emit the first item we already pulled off the queue
                yield first_msg['yield']
                try:
                    async for msg in q:
                        try:
                            yield msg['yield']
                        except KeyError:
                            # no 'yield' key => expect an 'error' packet
                            # NOTE(review): a missing 'error' key here
                            # would raise a bare KeyError — confirm all
                            # packets carry one of the two keys.
                            raise RemoteActorError(msg['error'])
                except GeneratorExit:
                    # consumer stopped iterating early
                    log.debug(f"Cancelling async gen call {cid} to {chan.uid}")

            return yield_from_q()

        elif resptype == 'return':
            return first_msg['return']
        else:
            raise ValueError(f"Unknown msg response type: {first_msg}")
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
|
2018-07-04 16:51:04 +00:00
|
|
|
@asynccontextmanager
async def open_portal(channel, nursery=None):
    """Open a ``Portal`` through the provided ``channel``.

    Spawns a background task to handle rpc message processing.
    Connects and handshakes the channel first if needed; on exit the
    msg loop task is cancelled and, if we opened the connection here,
    the channel is deregistered and closed.
    """
    actor = current_actor()
    assert actor
    was_connected = False

    async with maybe_open_nursery(nursery) as nursery:

        if not channel.connected():
            await channel.connect()
            was_connected = True

        if channel.uid is None:
            # peer uid unknown yet: exchange uids now
            await _do_handshake(actor, channel)

        if not actor.get_chans(channel.uid):
            # actor is not currently managing this channel
            actor._peers[channel.uid].append(channel)

        # background task delivering responses onto waiter queues
        nursery.start_soon(actor._process_messages, channel)
        yield Portal(channel)

        # cancel background msg loop task
        nursery.cancel_scope.cancel()
        if was_connected:
            # we created this connection: tear it down symmetrically
            actor._peers[channel.uid].remove(channel)
            await channel.aclose()
|
|
|
|
|
|
|
|
|
2018-06-12 19:17:48 +00:00
|
|
|
class LocalPortal:
    """A 'portal' to a local ``Actor``.

    A compatibility shim for normal portals but for invoking functions
    using an in process actor instance.
    """
    def __init__(self, actor):
        self.actor = actor

    async def run(self, ns, func, **kwargs):
        """Run a requested function locally and return it's result."""
        if ns == 'self':
            target = self.actor
        else:
            target = importlib.import_module(ns)
        return getattr(target, func)(**kwargs)
|
|
|
|
|
|
|
|
|
2018-06-07 04:26:49 +00:00
|
|
|
class ActorNursery:
    """Spawn scoped subprocess actors.

    Used as an async context manager; ``__aexit__`` waits for (or on
    error, cancels) all spawned sub-actors.
    """
    def __init__(self, actor, supervisor=None):
        # TODO: supervision strategies are not implemented yet
        self.supervisor = supervisor
        # the local actor doing the spawning
        self._actor = actor
        # We'll likely want some way to cancel all sub-actors eventually
        # self.cancel_scope = cancel_scope
        # map of (name, pid) -> (Actor, Process, main-result queue)
        self._children = {}

    async def __aenter__(self):
        return self

    async def start_actor(
        self,
        name: str,
        main=None,
        bind_addr=('127.0.0.1', 0),
        statespace=None,
        rpc_module_paths=None,
        outlive_main=False,  # sub-actors die when their main task completes
        loglevel=None,  # set console logging per subactor
    ):
        """Fork a new sub-actor process and return a ``Portal`` to it
        once it has connected back to us.
        """
        actor = Actor(
            name,
            # modules allowed to invoked funcs from
            rpc_module_paths=rpc_module_paths,
            statespace=statespace,  # global proc state vars
            main=main,  # main coroutine to be invoked
            outlive_main=outlive_main,
        )
        parent_addr = self._actor.accept_addr
        assert parent_addr
        proc = ctx.Process(
            target=actor._fork_main,
            args=(bind_addr, parent_addr, loglevel),
            daemon=True,
            name=name,
        )
        proc.start()
        if not proc.is_alive():
            raise ActorFailure("Couldn't start sub-actor?")

        # wait for actor to spawn and connect back to us
        # channel should have handshake completed by the
        # local actor by the time we get a ref to it
        event, chan = await self._actor.wait_for_peer(actor.uid)
        # channel is up, get queue which delivers result from main routine
        main_q = self._actor.get_waitq(actor.uid, 'main')
        self._children[(name, proc.pid)] = (actor, proc, main_q)

        return Portal(chan)

    async def wait(self):
        """Block until every child process has exited and been joined."""

        async def wait_for_proc(proc):
            # TODO: timeout block here?
            if proc.is_alive():
                # wait (without blocking trio) for the process sentinel fd
                await trio.hazmat.wait_readable(proc.sentinel)
            # please god don't hang
            proc.join()
            log.debug(f"Joined {proc}")

        # unblocks when all waiter tasks have completed
        async with trio.open_nursery() as nursery:
            for subactor, proc, main_q in self._children.values():
                nursery.start_soon(wait_for_proc, proc)

    async def cancel(self, hard_kill=False):
        """Cancel all child actors: hard-terminate the processes when
        ``hard_kill`` else send each a remote ``cancel`` command, then
        wait for everything to exit.
        """
        log.debug(f"Cancelling nursery")
        for subactor, proc, main_q in self._children.values():
            if proc is mp.current_process():
                # XXX: does this even make sense?
                # NOTE(review): ``Actor.cancel`` is a sync method; if this
                # branch ever runs, ``await``ing its ``None`` return would
                # raise ``TypeError`` — confirm.
                await subactor.cancel()
            else:
                if hard_kill:
                    log.warn(f"Hard killing subactors {self._children}")
                    proc.terminate()
                    # send KeyBoardInterrupt (trio abort signal) to underlying
                    # sub-actors
                    # os.kill(proc.pid, signal.SIGINT)
                else:
                    # send cancel cmd - likely no response from subactor
                    actor = self._actor
                    chans = actor.get_chans(subactor.uid)
                    if chans:
                        for chan in chans:
                            await actor.send_cmd(chan, 'self', 'cancel', {})
                    else:
                        log.warn(
                            f"Channel for {subactor.uid} is already down?")
        log.debug(f"Waiting on all subactors to complete")
        await self.wait()
        log.debug(f"All subactors for {self} have terminated")

    async def __aexit__(self, etype, value, tb):
        """Wait on all subactor's main routines to complete.
        """
        async def wait_for_actor(actor, proc, q):
            # wait for the child's 'main' result packet then, unless it
            # is configured to outlive its main task, signal its msg
            # loop to exit by sending the ``None`` sentinel
            if proc.is_alive():
                ret_type, msg, q = await result_from_q(q)
                log.info(f"{actor.uid} main task completed with {msg}")
                if not actor._outlive_main:
                    # trigger msg loop to break
                    chans = self._actor.get_chans(actor.uid)
                    for chan in chans:
                        log.info(f"Signalling msg loop exit for {actor.uid}")
                        await chan.send(None)

        if etype is not None:
            # an error escaped the ``with`` block: tear everything down
            log.warn(f"{current_actor().uid} errored with {etype}, "
                     "cancelling actor nursery")
            await self.cancel()
        else:
            log.debug(f"Waiting on subactors to complete")
            async with trio.open_nursery() as nursery:
                for subactor, proc, main_q in self._children.values():
                    nursery.start_soon(wait_for_actor, subactor, proc, main_q)

            await self.wait()
        log.debug(f"Nursery teardown complete")
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
|
2018-06-12 19:17:48 +00:00
|
|
|
def current_actor() -> Actor:
    """Get the process-local actor instance.

    Returns the module-global ``_current_actor``, which is assigned by
    ``start_actor()`` and reset to ``None`` on its teardown — so this
    returns ``None`` when called before any actor has been started in
    this process.
    """
    return _current_actor
|
|
|
|
|
|
|
|
|
2018-06-07 04:26:49 +00:00
|
|
|
@asynccontextmanager
async def open_nursery(supervisor=None, loglevel='WARNING'):
    """Create and yield a new ``ActorNursery``.

    :param supervisor: supervision strategy; currently unused
        (see the TODO below).
    :param loglevel: accepted for API compatibility but currently
        ignored by this function.
    :raises RuntimeError: if no process-local actor has been started
        yet (i.e. ``current_actor()`` returns ``None``).
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    # TODO: figure out supervisors from erlang
    # reuse the actor we already validated above instead of re-reading
    # the module global
    async with ActorNursery(actor, supervisor) as nursery:
        yield nursery
|
|
|
|
|
|
|
|
|
2018-07-04 16:51:04 +00:00
|
|
|
class NoArbiterFound(Exception):
    """Raised when an arbiter could not be contacted at the expected
    address.
    """
|
|
|
|
|
|
|
|
|
|
|
|
async def start_actor(actor, host, port, arbiter_addr, nursery=None):
    """Spawn a local actor by starting a task to execute its main
    async function.

    Blocks if no nursery is provided, in which case it is expected the nursery
    provider is responsible for waiting on the task to complete.

    :param actor: the ``Actor`` instance to run in this process
    :param host: address for this actor's channel server
    :param port: port for this actor's channel server
    :param arbiter_addr: ``(host, port)`` of the arbiter to register with
    :param nursery: optional externally-managed nursery; when provided
        ``_async_main`` does not block here
    """
    # assign process-local actor
    global _current_actor
    _current_actor = actor

    # start local channel-server and fake the portal API
    # NOTE: this won't block since we provide the nursery
    log.info(f"Starting local {actor} @ {host}:{port}")

    try:
        await actor._async_main(
            accept_addr=(host, port),
            parent_addr=None,
            arbiter_addr=arbiter_addr,
            nursery=nursery,
        )
        # XXX: If spawned locally, the actor is cancelled when this
        # context is complete given that there are no more active
        # peer channels connected to it.
        if not actor._outlive_main:
            actor.cancel_server()
    finally:
        # unset module state even if ``_async_main`` errored, so a later
        # start in this process never sees a stale actor
        _current_actor = None
    log.info("Completed async main")
|
2018-06-12 19:17:48 +00:00
|
|
|
|
|
|
|
|
2018-06-07 04:26:49 +00:00
|
|
|
@asynccontextmanager
async def _connect_chan(host, port):
    """Attempt to connect to an arbiter's channel server.

    Yields the connected ``Channel``; the connection attempt itself may
    raise (typically ``OSError``). The channel is always closed on exit,
    even when the body of the ``async with`` block raises.
    """
    chan = Channel((host, port))
    await chan.connect()
    try:
        yield chan
    finally:
        # guarantee cleanup on error in the managed block (the original
        # leaked the channel if the body raised)
        await chan.aclose()
|
2018-06-07 04:26:49 +00:00
|
|
|
|
2018-06-12 19:17:48 +00:00
|
|
|
|
2018-07-04 16:51:04 +00:00
|
|
|
@asynccontextmanager
async def get_arbiter(host, port):
    """Yield a portal connected to the arbiter at ``(host, port)``.

    When the calling actor *is* the arbiter (a re-entrant call from
    within the arbiter actor itself), a ``LocalPortal`` wrapping it is
    yielded instead of opening a real channel.

    :raises RuntimeError: if no process-local actor exists yet.
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    if not actor.is_arbiter:
        # out-of-process arbiter: connect a channel and wrap it in a
        # portal for the caller
        async with _connect_chan(host, port) as chan:
            async with open_portal(chan) as arb_portal:
                yield arb_portal
    else:
        # we're already the arbiter
        # (likely a re-entrant call from the arbiter actor)
        yield LocalPortal(actor)
|
2018-06-07 04:26:49 +00:00
|
|
|
|
|
|
|
|
|
|
|
@asynccontextmanager
async def find_actor(
    name,
    arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port)
):
    """Ask the arbiter to find actor(s) by name.

    Yields a *connected* portal to the last matching actor the arbiter
    knows of, or ``None`` when no actor registered under ``name`` is
    found. (NOTE(review): this does not currently return a sequence of
    unconnected portals — see the TODO below.)

    :param name: registered actor name to look up
    :param arbiter_sockaddr: ``(host, port)`` of the arbiter
    :raises RuntimeError: if no process-local actor exists yet
    """
    actor = current_actor()
    if not actor:
        raise RuntimeError("No actor instance has been defined yet?")

    async with get_arbiter(*arbiter_sockaddr) as arb_portal:
        sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
        # TODO: return portals to all available actors - for now just
        # the last address the arbiter returned
        if sockaddrs:
            sockaddr = sockaddrs[-1]
            async with _connect_chan(*sockaddr) as chan:
                async with open_portal(chan) as portal:
                    yield portal
        else:
            # no matching actor registered with the arbiter
            yield
|
2018-06-12 19:17:48 +00:00
|
|
|
|
|
|
|
|
2018-07-04 16:51:04 +00:00
|
|
|
async def _main(async_fn, args, kwargs, name, arbiter_addr):
    """Async entry point for ``tractor``.

    Probes ``arbiter_addr`` for a running arbiter: if one responds this
    process becomes a plain actor bound to an ephemeral port; otherwise
    this process becomes the arbiter itself.
    """
    # bind the user's "main" coroutine fn with its positional args;
    # note ``kwargs`` is forwarded to the Actor constructor below,
    # NOT to ``async_fn``
    main = partial(async_fn, *args) if async_fn else None
    arbiter_addr = (host, port) = arbiter_addr or (
        _default_arbiter_host, _default_arbiter_port)
    # make a temporary connection to see if an arbiter exists
    arbiter_found = False
    try:
        async with _connect_chan(host, port):
            arbiter_found = True
    except OSError:
        # nothing listening there - we'll become the arbiter below
        log.warn(f"No actor could be found @ {host}:{port}")

    if arbiter_found:  # we were able to connect to an arbiter
        log.info(f"Arbiter seems to exist @ {host}:{port}")
        # create a local actor and start up its main routine/task
        actor = Actor(
            name or 'anonymous',
            main=main,
            **kwargs
        )
        # bind our own channel server to an OS-chosen ephemeral port
        host, port = (_default_arbiter_host, 0)
    else:
        # start this local actor as the arbiter
        actor = Arbiter(name or 'arbiter', main=main, **kwargs)

    # runs until the actor's main routine (and channel server) complete
    await start_actor(actor, host, port, arbiter_addr=arbiter_addr)
    # Creates an internal nursery which shouldn't be cancelled even if
    # the one opened below is (this is desirable because the arbiter should
    # stay up until a re-election process has taken place - which is not
    # implemented yet FYI).
|
2018-06-27 15:34:22 +00:00
|
|
|
|
2018-06-12 19:17:48 +00:00
|
|
|
|
2018-07-04 16:51:04 +00:00
|
|
|
def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs):
    """Run a trio-actor async function in process.

    This is tractor's main entry and the start point for any async actor.
    """
    # bundle all bootstrap arguments into the internal async entry point
    # and hand it to trio's scheduler
    entry = partial(_main, async_fn, args, kwargs, name, arbiter_addr)
    return trio.run(entry)
|