Add a working arbiter registry system

Every actor now registers (and unregisters) with the arbiter at
startup/teardown. For now the registry is stored in a plain `dict` in
the arbiter's memory. This makes it possible to easily coordinate actors
started as plain Python processes or via `multiprocessing`.

A whole smörgåsbord of changes was required to accomplish this:
- factor handshake steps into a func
- track *every* channel connected to an actor including multiples to the
  same remote peer (may want to optimize this later)
- handle `trio.ClosedStreamError` gracefully in the message loop
- add an `open_portal` asynccontextmanager which handles channel
  creation, handshaking, and spawning a bg task for msg processing
- add a `start_actor()` for starting in-process actors directly
- add working `get_arbiter()` and `find_actor()` public routines
- `_main` now tries an anonymous channel connect to the stated
  arbiter sockaddr and uses that to determine whether to crown itself
asyncgen_closing_fix
Tyler Goodlet 2018-07-04 12:51:04 -04:00
parent bf08310224
commit 56d3f6cffb
1 changed files with 293 additions and 191 deletions

View File

@ -21,6 +21,8 @@ log = get_logger('tractor')
# set at startup and after forks
_current_actor = None
_default_arbiter_host = '127.0.0.1'
_default_arbiter_port = 1616
class ActorFailure(Exception):
@ -103,12 +105,25 @@ async def result_from_q(q):
raise ValueError(f"{first_msg} is an invalid response packet?")
async def _do_handshake(actor, chan):
await chan.send(actor.uid)
uid = await chan.recv()
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid
class Actor:
"""The fundamental concurrency primitive.
An actor is the combination of a ``multiprocessing.Process``
executing a ``trio`` task tree, communicating with other actors
through "portals" which provide a native async API around "channels".
An *actor* is the combination of a regular Python or
``multiprocessing.Process`` executing a ``trio`` task tree, communicating
with other actors through "portals" which provide a native async API
around "channels".
"""
is_arbiter = False
@ -122,6 +137,7 @@ class Actor:
allow_rpc: bool = True,
outlive_main: bool = False,
):
self.name = name
self.uid = (name, uid or str(uuid.uuid1()))
self.rpc_module_paths = rpc_module_paths
self._mods = {}
@ -133,7 +149,7 @@ class Actor:
self._outlive_main = outlive_main
# filled in by `_async_main` after fork
self._peers = {}
self._peers = defaultdict(list)
self._no_more_peers = trio.Event()
self._no_more_peers.set()
self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
@ -148,7 +164,8 @@ class Actor:
log.debug(f"Waiting for peer {uid} to connect")
event = self._peers.setdefault(uid, trio.Event())
await event.wait()
return event, self._peers[uid]
log.debug(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1]
def load_namespaces(self):
# We load namespaces after fork since this actor may
@ -168,26 +185,33 @@ class Actor:
self._no_more_peers.clear()
chan = Channel(stream=stream)
log.info(f"New connection to us {chan}")
# send/receive initial handshake response
await chan.send(self.uid)
uid = await chan.recv()
chan.uid = uid
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
try:
uid = await _do_handshake(self, chan)
except StopAsyncIteration:
log.warn(f"Channel {chan} failed to handshake")
return
# channel tracking
event = self._peers.pop(uid, None)
chan.event = event
self._peers[uid] = chan
log.debug(f"Registered {chan} for {uid}")
# Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via
# async-rpc calls.
if event and getattr(event, 'set', None):
log.debug(f"Waking channel waiters {event.statistics()}")
event_or_chans = self._peers.pop(uid, None)
if isinstance(event_or_chans, trio.Event):
# Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via
# async-rpc calls.
log.debug(f"Waking channel waiters {event_or_chans.statistics()}")
# Alert any task waiting on this connection to come up
event.set()
event.clear() # now consumer can wait on this channel to close
event_or_chans.set()
event_or_chans.clear() # consumer can wait on channel to close
elif isinstance(event_or_chans, list):
log.warn(
f"already have channel(s) for {uid}:{event_or_chans}?"
)
# append new channel
self._peers[uid].extend(event_or_chans)
log.debug(f"Registered {chan} for {uid}")
self._peers[uid].append(chan)
# Begin channel management - respond to remote requests and
# process received reponses.
@ -197,23 +221,24 @@ class Actor:
# Drop ref to channel so it can be gc-ed and disconnected
if chan is not self._parent_chan:
log.debug(f"Releasing channel {chan}")
self._peers.pop(chan.uid, None)
chan.event.set() # signal teardown/disconnection
chans = self._peers.get(chan.uid)
chans.remove(chan)
if not chans:
log.debug(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None)
if not self._peers: # no more channels connected
self._no_more_peers.set()
log.debug(f"No more peer channels")
def _push_result(self, actorid, cid, msg):
assert actorid, f"`actorid` can't be {actorid}"
q = self.get_waitq(actorid, cid)
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
q.put_nowait(msg)
def get_waitq(self, actorid, cid):
if actorid not in self._actors2calls:
log.debug(f"Registering for results from {actorid}")
log.debug(f"Registering for callid {cid} queue results from {actorid}")
cids2qs = self._actors2calls.setdefault(actorid, {})
if cid not in cids2qs:
log.debug(f"Registering for result from call id {cid}")
return cids2qs.setdefault(cid, trio.Queue(1000))
async def send_cmd(self, chan, ns, func, kwargs):
@ -232,58 +257,66 @@ class Actor:
Process rpc requests and deliver retrieved responses from channels.
"""
log.debug(f"Entering async-rpc loop for {chan}")
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
log.debug(f"Entering msg loop for {chan}")
async with trio.open_nursery() as nursery:
async for msg in chan.aiter_recv():
if msg is None: # terminate sentinel
log.debug(f"Terminating msg loop for {chan}")
break
log.debug(f"Received msg {msg}")
cid = msg.get('cid')
if cid: # deliver response to local caller/waiter
self._push_result(chan.uid, cid, msg)
if 'error' in msg:
# TODO: need something better then this slop
raise RemoteActorError(msg['error'])
continue
else:
ns, funcname, kwargs, actorid, cid = msg['cmd']
try:
async for msg in chan.aiter_recv():
if msg is None: # terminate sentinel
log.debug(f"Terminating msg loop for {chan}")
break
log.debug(f"Received msg {msg}")
cid = msg.get('cid')
if cid: # deliver response to local caller/waiter
self._push_result(chan.uid, cid, msg)
if 'error' in msg:
# TODO: need something better then this slop
raise RemoteActorError(msg['error'])
log.debug(f"Waiting on next msg for {chan}")
continue
else:
ns, funcname, kwargs, actorid, cid = msg['cmd']
log.debug(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
# TODO: accept a sentinel which cancels this task tree?
if ns == 'self':
func = getattr(self, funcname)
else:
func = getattr(self._mods[ns], funcname)
log.debug(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(self, funcname)
else:
func = getattr(self._mods[ns], funcname)
# spin up a task for the requested function
sig = inspect.signature(func)
treat_as_gen = False
if 'chan' in sig.parameters:
assert 'cid' in sig.parameters, \
f"{func} must accept a `cid` (caller id) kwarg"
kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `chan` kwarg in its
# signature will be treated as one.
treat_as_gen = True
# spin up a task for the requested function
sig = inspect.signature(func)
treat_as_gen = False
if 'chan' in sig.parameters:
assert 'cid' in sig.parameters, \
f"{func} must accept a `cid` (caller id) kwarg"
kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `chan` kwarg in its
# signature will be treated as one.
treat_as_gen = True
nursery.start_soon(
_invoke, cid, chan, func, kwargs, treat_as_gen,
name=funcname
)
else: # channel disconnect
log.warn(f"Cancelling all tasks for {chan}")
nursery.cancel_scope.cancel()
log.debug(f"Spawning task for {func}")
nursery.start_soon(
_invoke, cid, chan, func, kwargs, treat_as_gen,
name=funcname
)
log.debug(f"Waiting on next msg for {chan}")
else: # channel disconnect
log.debug(f"{chan} disconnected")
except trio.ClosedStreamError:
log.error(f"{chan} broke")
log.debug(f"Cancelling all tasks for {chan}")
nursery.cancel_scope.cancel()
log.debug(f"Exiting msg loop for {chan}")
def _fork_main(self, accept_addr, parent_addr=None, loglevel='debug'):
def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
# after fork routine which invokes a fresh ``trio.run``
log.info(
f"Started new {ctx.current_process()} for actor {self.uid}")
@ -299,7 +332,13 @@ class Actor:
pass # handle it the same way trio does?
log.debug(f"Actor {self.uid} terminated")
async def _async_main(self, accept_addr, parent_addr=None, nursery=None):
async def _async_main(
self,
accept_addr,
arbiter_addr=(_default_arbiter_host, _default_arbiter_port),
parent_addr=None,
nursery=None
):
"""Start the channel server and main task.
A "root-most" (or "top-level") nursery for this actor is opened here
@ -325,20 +364,23 @@ class Actor:
on_reconnect=self.main
)
await chan.connect()
# initial handshake, report who we are, who they are
await chan.send(self.uid)
uid = await chan.recv()
if uid in self._peers:
log.warn(
f"already have channel for {uid} registered?"
)
await _do_handshake(self, chan)
# handle new connection back to parent optionally
# begin responding to RPC
if self._allow_rpc:
self.load_namespaces()
nursery.start_soon(self._process_messages, chan)
# handle new connection back to parent optionally
# begin responding to RPC
if self._allow_rpc:
self.load_namespaces()
if self._parent_chan:
nursery.start_soon(
self._process_messages, self._parent_chan)
# register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`")
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
name=self.name, sockaddr=self.accept_addr)
if self.main:
if self._parent_chan:
@ -374,6 +416,16 @@ class Actor:
{'error': traceback.format_exc(), 'cid': 'main'})
else:
raise
finally:
# UNregister actor from the arbiter
try:
if arbiter_addr is not None:
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
name=self.name, sockaddr=self.accept_addr)
except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter")
return result
@ -389,7 +441,6 @@ class Actor:
listening for new messages.
"""
log.debug(f"Starting tcp server on {accept_host}:{accept_port}")
async with trio.open_nursery() as nursery:
self._server_nursery = nursery
# TODO: might want to consider having a separate nursery
@ -403,8 +454,9 @@ class Actor:
port=accept_port, host=accept_host,
)
)
log.debug(
f"Started tcp server(s) on {[l.socket for l in listeners]}")
self._listeners.extend(listeners)
log.debug(f"Spawned {listeners}")
task_status.started()
def cancel(self):
@ -431,8 +483,8 @@ class Actor:
def get_parent(self):
return Portal(self._parent_chan)
def get_chan(self, actorid):
return self._peers.get(actorid)
def get_chans(self, actorid):
return self._peers[actorid]
class Arbiter(Actor):
@ -453,6 +505,14 @@ class Arbiter(Actor):
def register_actor(self, name, sockaddr):
self._registry[name].append(sockaddr)
def unregister_actor(self, name, sockaddr):
sockaddrs = self._registry.get(name)
if sockaddrs:
try:
sockaddrs.remove(sockaddr)
except ValueError:
pass
class Portal:
"""A 'portal' to a(n) (remote) ``Actor``.
@ -463,28 +523,14 @@ class Portal:
Think of this like an native async IPC API.
"""
def __init__(self, channel, event=None):
def __init__(self, channel):
self.channel = channel
self._uid = channel.uid
self._event = event
async def __aenter__(self):
await self.channel.connect()
# do the handshake
await self.channel.send(_current_actor.uid)
self._uid = uid = await self.channel.recv()
_current_actor._peers[uid] = self.channel
return self
async def aclose(self):
log.debug(f"Closing {self}")
# XXX: won't work until https://github.com/python-trio/trio/pull/460
# gets in!
await self.channel.aclose()
if self._event:
# alert the _stream_handler task that we are done with the channel
# so it can terminate / move on
self._event.set()
async def __aexit__(self, etype, value, tb):
await self.aclose()
async def run(self, ns, func, **kwargs):
"""Submit a function to be scheduled and run by actor, return its
@ -512,7 +558,7 @@ class Portal:
except KeyError:
raise RemoteActorError(msg['error'])
except GeneratorExit:
log.warn(f"Cancelling async gen call {cid} to {chan.uid}")
log.debug(f"Cancelling async gen call {cid} to {chan.uid}")
return yield_from_q()
@ -522,6 +568,39 @@ class Portal:
raise ValueError(f"Unknown msg response type: {first_msg}")
@asynccontextmanager
async def open_portal(channel, nursery=None):
"""Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle rpc message processing.
"""
actor = current_actor()
assert actor
was_connected = False
async with maybe_open_nursery(nursery) as nursery:
if not channel.connected():
await channel.connect()
was_connected = True
if channel.uid is None:
await _do_handshake(actor, channel)
if not actor.get_chans(channel.uid):
# actor is not currently managing this channel
actor._peers[channel.uid].append(channel)
nursery.start_soon(actor._process_messages, channel)
yield Portal(channel)
# cancel background msg loop task
nursery.cancel_scope.cancel()
if was_connected:
actor._peers[channel.uid].remove(channel)
await channel.aclose()
class LocalPortal:
"""A 'portal' to a local ``Actor``.
@ -590,7 +669,7 @@ class ActorNursery:
main_q = self._actor.get_waitq(actor.uid, 'main')
self._children[(name, proc.pid)] = (actor, proc, main_q)
return Portal(chan, event=event)
return Portal(chan)
async def wait(self):
@ -623,9 +702,10 @@ class ActorNursery:
else:
# send cancel cmd - likely no response from subactor
actor = self._actor
chan = actor.get_chan(subactor.uid)
if chan:
await actor.send_cmd(chan, 'self', 'cancel', {})
chans = actor.get_chans(subactor.uid)
if chans:
for chan in chans:
await actor.send_cmd(chan, 'self', 'cancel', {})
else:
log.warn(
f"Channel for {subactor.uid} is already down?")
@ -642,8 +722,10 @@ class ActorNursery:
log.info(f"{actor.uid} main task completed with {msg}")
if not actor._outlive_main:
# trigger msg loop to break
log.info(f"Signalling msg loop exit for {actor.uid}")
await self._actor.get_chan(actor.uid).send(None)
chans = self._actor.get_chans(actor.uid)
for chan in chans:
log.info(f"Signalling msg loop exit for {actor.uid}")
await chan.send(None)
if etype is not None:
log.warn(f"{current_actor().uid} errored with {etype}, "
@ -655,6 +737,7 @@ class ActorNursery:
for subactor, proc, main_q in self._children.values():
nursery.start_soon(wait_for_actor, subactor, proc, main_q)
await self.wait()
log.debug(f"Nursery teardown complete")
@ -677,117 +760,136 @@ async def open_nursery(supervisor=None, loglevel='WARNING'):
yield nursery
async def serve_local_actor(actor, nursery=None, accept_addr=(None, 0)):
class NoArbiterFound(Exception):
"Couldn't find the arbiter?"
async def start_actor(actor, host, port, arbiter_addr, nursery=None):
"""Spawn a local actor by starting a task to execute it's main
async function.
Blocks if no nursery is provided, in which case it is expected the nursery
provider is responsible for waiting on the task to complete.
"""
# assign process-local actor
global _current_actor
_current_actor = actor
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
log.info(f"Starting local {actor} @ {host}:{port}")
await actor._async_main(
accept_addr=accept_addr,
accept_addr=(host, port),
parent_addr=None,
arbiter_addr=arbiter_addr,
nursery=nursery,
)
return actor
# XXX: If spawned locally, the actor is cancelled when this
# context is complete given that there are no more active
# peer channels connected to it.
if not actor._outlive_main:
actor.cancel_server()
class NoArbiterFound:
"Couldn't find the arbiter?"
# unset module state
_current_actor = None
log.info("Completed async main")
@asynccontextmanager
async def get_arbiter(host='127.0.0.1', port=1617, main=None, **kwargs):
async def _connect_chan(host, port):
"""Attempt to connect to an arbiter's channel server.
Return the channel on success or None on failure.
"""
chan = Channel((host, port))
await chan.connect()
yield chan
await chan.aclose()
@asynccontextmanager
async def get_arbiter(host, port):
"""Return a portal instance connected to a local or remote
arbiter.
"""
actor = current_actor()
if actor and not actor.is_arbiter:
try:
# If an arbiter is already running on this host connect to it
async with Portal(Channel((host, port))) as portal:
yield portal
except OSError as err:
raise NoArbiterFound(err)
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
if actor.is_arbiter:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
if actor and actor.is_arbiter:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(actor)
else:
arbiter = Arbiter('arbiter', main=main, **kwargs)
# assign process-local actor
global _current_actor
_current_actor = arbiter
# start the arbiter in process in a new task
async with trio.open_nursery() as nursery:
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
await serve_local_actor(
arbiter, nursery=nursery, accept_addr=(host, port))
yield LocalPortal(arbiter)
# XXX: If spawned locally, the arbiter is cancelled when this
# context is complete given that there are no more active
# peer channels connected to it.
if not arbiter._outlive_main:
arbiter.cancel_server()
async with _connect_chan(host, port) as chan:
async with open_portal(chan) as arb_portal:
yield arb_portal
@asynccontextmanager
async def find_actor(name):
async def find_actor(
name,
arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port)
):
"""Ask the arbiter to find actor(s) by name.
Returns a sequence of unconnected portals for each matching actor
known to the arbiter (client code is expected to connect the portals).
"""
async with get_arbiter() as portal:
sockaddrs = await portal.run('self', 'find_actor', name=name)
portals = []
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
async with get_arbiter(*arbiter_sockaddr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just
# the first one we find
if sockaddrs:
for sockaddr in sockaddrs:
portals.append(Portal(Channel(sockaddr)))
yield portals # XXX: these are "unconnected" portals
sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield
async def _main(async_fn, args, kwargs, name):
async def _main(async_fn, args, kwargs, name, arbiter_addr):
"""Async entry point for ``tractor``.
"""
main = partial(async_fn, *args) if async_fn else None
arbiter_addr = (host, port) = arbiter_addr or (
_default_arbiter_host, _default_arbiter_port)
# make a temporary connection to see if an arbiter exists
arbiter_found = False
try:
async with _connect_chan(host, port):
arbiter_found = True
except OSError:
log.warn(f"No actor could be found @ {host}:{port}")
if arbiter_found: # we were able to connect to an arbiter
log.info(f"Arbiter seems to exist @ {host}:{port}")
# create a local actor and start up its main routine/task
actor = Actor(
name or 'anonymous',
main=main,
**kwargs
)
host, port = (_default_arbiter_host, 0)
else:
# start this local actor as the arbiter
actor = Arbiter(name or 'arbiter', main=main, **kwargs)
await start_actor(actor, host, port, arbiter_addr=arbiter_addr)
# Creates an internal nursery which shouldn't be cancelled even if
# the one opened below is (this is desirable because the arbitter should
# the one opened below is (this is desirable because the arbiter should
# stay up until a re-election process has taken place - which is not
# implemented yet FYI).
async with get_arbiter(
host=kwargs.pop('arbiter_host', '127.0.0.1'),
port=kwargs.pop('arbiter_port', 1617),
main=main,
**kwargs,
):
if not current_actor().is_arbiter:
# create a local actor and start it up its main routine
actor = Actor(
name or 'anonymous',
main=main, # main coroutine to be invoked
**kwargs
)
# this will block and yield control to the `trio` run loop
await serve_local_actor(
actor, accept_addr=kwargs.pop('accept_addr', (None, 0)))
log.info("Completed async main")
# TODO: when the local actor's main has completed we cancel?
# actor.cancel()
else:
# block waiting for the arbiter main task to complete
pass
# unset module state
global _current_actor
_current_actor = None
def run(async_fn, *args, arbiter_host=None, name='anonymous', **kwargs):
def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs):
"""Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor.
"""
return trio.run(_main, async_fn, args, kwargs, name)
return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr)