Uhhh make everything better

asyncgen_closing_fix
Tyler Goodlet 2018-06-12 15:17:48 -04:00
parent 03c57ceece
commit f36bd0f188
1 changed files with 243 additions and 98 deletions

View File

@ -13,10 +13,13 @@ import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from .ipc import Channel from .ipc import Channel
from .log import get_console_log from .log import get_console_log, get_logger
ctx = mp.get_context("forkserver") ctx = mp.get_context("forkserver")
log = get_logger('tractor')
# for debugging
log = get_console_log('debug') log = get_console_log('debug')
@ -28,8 +31,17 @@ class ActorFailure(Exception):
_current_actor = None _current_actor = None
def current_actor(): @asynccontextmanager
return _current_actor async def maybe_open_nursery(nursery=None):
"""Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
"""
if nursery is not None:
yield nursery
else:
async with trio.open_nursery() as nursery:
yield nursery
class Actor: class Actor:
@ -39,23 +51,25 @@ class Actor:
executing a ``trio`` task tree, communicating with other actors executing a ``trio`` task tree, communicating with other actors
through "portals" which provide a native async API around "channels". through "portals" which provide a native async API around "channels".
""" """
is_arbitter = False is_arbiter = False
def __init__( def __init__(
self, self,
name: str, name: str,
uuid: str,
namespaces: [str], namespaces: [str],
main: Coroutine, main: Coroutine,
statespace: dict, statespace: dict,
uid: str = None,
allow_rpc: bool = True,
): ):
self.uid = (name, uuid) self.uid = (name, uid or str(uuid.uuid1()))
self.namespaces = namespaces self.namespaces = namespaces
self._mods = {} self._mods = {}
self.main = main self.main = main
# TODO: consider making this a dynamically defined # TODO: consider making this a dynamically defined
# @dataclass once we get py3.7 # @dataclass once we get py3.7
self.statespace = statespace self.statespace = statespace
self._allow_rpc = allow_rpc
# filled in by `_async_main` after fork # filled in by `_async_main` after fork
self._peers = {} self._peers = {}
@ -107,7 +121,12 @@ class Actor:
self._peers[uid] = chan self._peers[uid] = chan
log.info(f"Registered {chan} for {uid}") log.info(f"Registered {chan} for {uid}")
log.debug(f"Retrieved event {event}") log.debug(f"Retrieved event {event}")
# Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via
# async-rpc calls.
if event and getattr(event, 'set', None): if event and getattr(event, 'set', None):
log.info(f"Waking waiters of {event.statistics()}") log.info(f"Waking waiters of {event.statistics()}")
# Alert any task waiting on this connection to come up # Alert any task waiting on this connection to come up
# and don't manage channel messages as some external task is # and don't manage channel messages as some external task is
@ -115,11 +134,17 @@ class Actor:
# (usually an actor nursery) # (usually an actor nursery)
event.set() event.set()
event.clear() event.clear()
# wait for channel consumer (usually a portal) to be # wait for channel consumer (usually a portal) to be
# done with the channel # done with the channel
await event.wait() await event.wait()
# Drop ref to channel so it can be gc-ed
self._peers.pop(self._uid, None)
# Remote controlled connection, we are likely a subactor
# being told what to do so manage the channel with async-rpc
else: else:
# manage the channel internally
await self._process_messages(chan) await self._process_messages(chan)
async def _process_messages(self, chan, treat_as_gen=False): async def _process_messages(self, chan, treat_as_gen=False):
@ -153,9 +178,9 @@ class Actor:
log.debug(f"Entering async-rpc loop for {chan.laddr}->{chan.raddr}") log.debug(f"Entering async-rpc loop for {chan.laddr}->{chan.raddr}")
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
async for ns, funcname, kwargs, callerid in chan.aiter_recv(): async for ns, funcname, kwargs, actorid in chan.aiter_recv():
log.debug( log.debug(
f"Processing request from {callerid}\n" f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})") f"{ns}.{funcname}({kwargs})")
# TODO: accept a sentinel which cancels this task tree? # TODO: accept a sentinel which cancels this task tree?
if ns == 'self': if ns == 'self':
@ -174,78 +199,98 @@ class Actor:
# function which declares a `chan` kwarg in its # function which declares a `chan` kwarg in its
# signature will be treated as one. # signature will be treated as one.
treat_as_gen = True treat_as_gen = True
nursery.start_soon(invoke, func, kwargs, name=funcname) nursery.start_soon(invoke, func, kwargs, name=funcname)
def _fork_main(self, host, parent_addr=None): def _fork_main(self, accept_addr, parent_addr=None):
# after fork routine which invokes a new ``trio.run`` # after fork routine which invokes a fresh ``trio.run``
log.info(f"self._peers are {self._peers}") log.info(f"self._peers are {self._peers}")
log.info( log.info(
f"Started new {ctx.current_process()} for actor {self.uid}") f"Started new {ctx.current_process()} for actor {self.uid}")
global _current_actor global _current_actor
_current_actor = self _current_actor = self
log.debug(f"parent_addr is {parent_addr}") log.debug(f"parent_addr is {parent_addr}")
trio.run(self._async_main, host, parent_addr) trio.run(
partial(self._async_main, accept_addr, parent_addr=parent_addr))
log.debug(f"Actor {self.uid} terminated") log.debug(f"Actor {self.uid} terminated")
async def _async_main( async def _async_main(self, accept_addr, parent_addr=None, nursery=None):
self, accept_host, parent_addr, *, connect_to_parent=True, """Start the channel server and main task.
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
"""
async with maybe_open_nursery(nursery) as nursery:
self._root_nursery = nursery
# Start up the channel server, optionally begin serving RPC
# requests from the parent.
host, port = accept_addr
await self._serve_forever(
nursery, accept_host=host, accept_port=port,
parent_addr=parent_addr
)
# start "main" routine in a task
if self.main:
await self.main(self)
# blocks here as expected if no nursery was provided until
# the channel server is killed
async def _serve_forever(
self,
nursery, # spawns main func and channel server
*,
# (host, port) to bind for channel server
accept_host=None,
accept_port=0,
parent_addr=None,
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
"""Main coroutine: connect back to the parent, spawn main task, begin """Main coroutine: connect back to the parent, spawn main task, begin
listening for new messages. listening for new messages.
A "root-most" (or "top-level") nursery is created here and when
cancelled effectively cancels the actor.
""" """
if accept_host is None: log.debug(f"Starting tcp server on {accept_host}:{accept_port}")
# use same host addr as parent for tcp server listeners = await nursery.start(
accept_host, port = parent_addr partial(
else: trio.serve_tcp,
self.load_namespaces() self._stream_handler,
port = 0 handler_nursery=nursery,
port=accept_port, host=accept_host,
async with trio.open_nursery() as nursery:
self._root_nursery = nursery
log.debug(f"Starting tcp server on {accept_host}:{port}")
listeners = await nursery.start(
partial(
trio.serve_tcp,
self._stream_handler,
handler_nursery=nursery,
port=port, host=accept_host,
)
) )
self._listeners.extend(listeners) )
log.debug(f"Spawned {listeners}") self._listeners.extend(listeners)
log.debug(f"Spawned {listeners}")
if connect_to_parent: if parent_addr is not None:
# Connect back to the parent actor and conduct initial # Connect back to the parent actor and conduct initial
# handshake (From this point on if we error ship the # handshake (From this point on if we error ship the
# exception back to the parent actor) # exception back to the parent actor)
chan = self._parent_chan = Channel( chan = self._parent_chan = Channel(
destaddr=parent_addr, destaddr=parent_addr,
on_reconnect=self.main on_reconnect=self.main
)
await chan.connect()
# initial handshake, report who we are, figure out who they are
await chan.send(self.uid)
uid = await chan.recv()
if uid in self._peers:
log.warn(
f"already have channel for {uid} registered?"
) )
await chan.connect() else:
self._peers[uid] = chan
# initial handshake, report who we are, figure out who they are # handle new connection back to parent
await chan.send(self.uid) if self._allow_rpc:
uid = await chan.recv() self.load_namespaces()
if uid in self._peers:
log.warn(
f"already have channel for {uid} registered?"
)
else:
self._peers[uid] = chan
# handle new connection back to parent
nursery.start_soon(self._process_messages, chan) nursery.start_soon(self._process_messages, chan)
if self.main: # when launched in-process, trigger awaiter's completion
nursery.start_soon(self.main) task_status.started()
# when launched in-process, trigger awaiter's completion
task_status.started()
def cancel(self): def cancel(self):
"""This cancels the internal root-most nursery thereby gracefully """This cancels the internal root-most nursery thereby gracefully
@ -253,6 +298,16 @@ class Actor:
""" """
self._root_nursery.cancel_scope.cancel() self._root_nursery.cancel_scope.cancel()
@property
def accept_addr(self):
"""Primary address to which the channel server is bound.
"""
return self._listeners[0].socket.getsockname() \
if self._listeners else None
def get_parent(self):
return Portal(self._parent_chan)
class Arbiter(Actor): class Arbiter(Actor):
"""A special actor who knows all the other actors and always has """A special actor who knows all the other actors and always has
@ -264,9 +319,9 @@ class Arbiter(Actor):
arbiter is already running that arbiter will be used. arbiter is already running that arbiter will be used.
""" """
_registry = defaultdict(list) _registry = defaultdict(list)
is_arbitter = True is_arbiter = True
def find_actors(self, name): def find_actor(self, name):
return self._registry[name] return self._registry[name]
def register_actor(self, name, sockaddr): def register_actor(self, name, sockaddr):
@ -278,8 +333,9 @@ class Portal:
Allows for invoking remote routines and receiving results through an Allows for invoking remote routines and receiving results through an
underlying ``tractor.Channel`` as though the remote (async) underlying ``tractor.Channel`` as though the remote (async)
function / generator was invoked locally. This of this like an async-native function / generator was invoked locally.
IPC API.
Think of this like a native async IPC API.
""" """
def __init__(self, channel, event=None): def __init__(self, channel, event=None):
self.channel = channel self.channel = channel
@ -295,8 +351,6 @@ class Portal:
return self return self
async def aclose(self): async def aclose(self):
# drop ref to channel so it can be gc-ed
_current_actor._peers.pop(self._uid, None)
await self.channel.aclose() await self.channel.aclose()
if self._event: if self._event:
# alert the _stream_handler task that we are done with the channel # alert the _stream_handler task that we are done with the channel
@ -324,12 +378,29 @@ class Portal:
return await chan.recv() return await chan.recv()
class LocalPortal:
"""A 'portal' to a local ``Actor``.
A compatibility shim for normal portals but for invoking functions
using an in process actor instance.
"""
def __init__(self, actor):
self.actor = actor
async def run(self, ns, func, **kwargs):
"""Run a requested function locally and return its result.
"""
obj = self.actor if ns == 'self' else importlib.import_module(ns)
func = getattr(obj, func)
return func(**kwargs)
class ActorNursery: class ActorNursery:
"""Spawn scoped subprocess actors. """Spawn scoped subprocess actors.
""" """
def __init__(self, supervisor=None): def __init__(self, parent_actor, supervisor=None):
self.supervisor = supervisor self.supervisor = supervisor
self._parent = _current_actor self._parent_actor = parent_actor
# We'll likely want some way to cancel all sub-actors eventually # We'll likely want some way to cancel all sub-actors eventually
# self.cancel_scope = cancel_scope # self.cancel_scope = cancel_scope
self._children = {} self._children = {}
@ -339,23 +410,20 @@ class ActorNursery:
async def start_actor( async def start_actor(
self, name, module_paths, self, name, module_paths,
host='127.0.0.1', bind_addr=('127.0.0.1', 0),
statespace=None, statespace=None,
main=None, main=None,
loglevel='WARNING',
): ):
uid = str(uuid.uuid1())
actor = Actor( actor = Actor(
name, name,
uid,
module_paths, # modules allowed to invoked funcs from module_paths, # modules allowed to invoked funcs from
statespace=statespace, # global proc state vars statespace=statespace, # global proc state vars
main=main, # main coroutine to be invoked main=main, # main coroutine to be invoked
) )
accept_addr = _current_actor._listeners[0].socket.getsockname() parent_addr = self._parent_actor.accept_addr
proc = ctx.Process( proc = ctx.Process(
target=actor._fork_main, target=actor._fork_main,
args=(host, accept_addr), args=(bind_addr, parent_addr),
daemon=True, daemon=True,
name=name, name=name,
) )
@ -366,7 +434,7 @@ class ActorNursery:
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
if proc.is_alive(): if proc.is_alive():
event, chan = await _current_actor.wait_for_peer(actor.uid) event, chan = await self._parent_actor.wait_for_peer(actor.uid)
else: else:
raise ActorFailure("Couldn't start sub-actor?") raise ActorFailure("Couldn't start sub-actor?")
@ -397,53 +465,130 @@ class ActorNursery:
await self.cancel() await self.cancel()
def current_actor() -> Actor:
"""Get the process-local actor instance.
"""
return _current_actor
@asynccontextmanager @asynccontextmanager
async def open_nursery(supervisor=None, loglevel='WARNING'): async def open_nursery(supervisor=None, loglevel='WARNING'):
"""Create and yield a new ``ActorNursery``. """Create and yield a new ``ActorNursery``.
""" """
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
# TODO: figure out supervisors from erlang # TODO: figure out supervisors from erlang
async with ActorNursery(supervisor) as nursery: async with ActorNursery(current_actor(), supervisor) as nursery:
yield nursery yield nursery
async def serve_local_actor(actor, nursery=None, accept_addr=(None, 0)):
"""Spawn a local actor by starting a task to execute its main
async function.
Blocks if no nursery is provided; otherwise it is expected that the nursery
provider is responsible for waiting on the task to complete.
"""
await actor._async_main(
accept_addr=accept_addr,
parent_addr=None,
nursery=nursery,
)
return actor
class NoArbiterFound:
"Couldn't find the arbiter?"
@asynccontextmanager @asynccontextmanager
async def get_arbiter(host='127.0.0.1', port=1616, main=None): async def get_arbiter(host='127.0.0.1', port=1616, main=None):
try: actor = current_actor()
async with Portal(Channel((host, port))) as portal: if actor and not actor.is_arbiter:
yield portal try:
except OSError: # If an arbiter is already running on this host connect to it
# no arbitter found on this host so start one in-process async with Portal(Channel((host, port))) as portal:
uid = str(uuid.uuid1()) yield portal
arbitter = Arbiter( except OSError as err:
raise NoArbiterFound(err)
else:
# no arbiter found on this host so start one in-process
arbiter = Arbiter(
'arbiter', 'arbiter',
uid, namespaces=[], # the arbiter doesn't allow module rpc
namespaces=[], # the arbitter doesn't allow module rpc
statespace={}, # global proc state vars statespace={}, # global proc state vars
main=main, # main coroutine to be invoked main=main, # main coroutine to be invoked
) )
global _current_actor
_current_actor = arbitter
async with trio.open_nursery() as nursery:
await nursery.start(
partial(arbitter._async_main, None,
(host, port), connect_to_parent=False)
)
async with Portal(Channel((host, port))) as portal:
yield portal
# the arbitter is cancelled when this context is complete # assign process-local actor
global _current_actor
_current_actor = arbiter
# start the arbiter in process in a new task
async with trio.open_nursery() as nursery:
# start local channel-server and fake the portal API
# NOTE: this won't block since we provide the nursery
await serve_local_actor(
arbiter, nursery=nursery, accept_addr=(host, port))
yield LocalPortal(arbiter)
# If spawned locally, the arbiter is cancelled when this context
# is complete (i.e. the underlying context manager block completes)
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()
@asynccontextmanager @asynccontextmanager
async def find_actors(role): async def find_actor(name):
"""Ask the arbiter to find actor(s) by name.
Returns a sequence of unconnected portals for each matching actor
known to the arbiter (client code is expected to connect the portals).
"""
async with get_arbiter() as portal: async with get_arbiter() as portal:
sockaddrs = await portal.run('self', 'find_actors', name=role) sockaddrs = await portal.run('self', 'find_actor', name=name)
portals = [] portals = []
if sockaddrs: if sockaddrs:
for sockaddr in sockaddrs: for sockaddr in sockaddrs:
portals.append(Portal(Channel(sockaddr))) portals.append(Portal(Channel(sockaddr)))
yield portals # XXX: these are "unconnected" portals yield portals # XXX: these are "unconnected" portals
async def _main(async_fn, args, kwargs, name):
# Creates an internal nursery which shouldn't be cancelled even if
# the one opened below is (this is desirable because the arbiter should
# stay up until a re-election process has taken place - which is not
# implemented yet FYI).
async with get_arbiter(
host=kwargs.get('arbiter_host', '127.0.0.1'),
port=kwargs.get('arbiter_port', 1616),
main=partial(async_fn, *args, **kwargs)
) as portal:
if not current_actor().is_arbiter:
# create a local actor and start up its main routine
actor = Actor(
name or 'anonymous',
# namespaces=kwargs.get('namespaces'),
# statespace=kwargs.get('statespace'),
# main=async_fn, # main coroutine to be invoked
**kwargs
)
# this will block and yield control to the `trio` run loop
await serve_local_actor(
actor, accept_addr=kwargs.get('accept_addr', (None, 0)))
log.info("Completed async main")
else: else:
yield None # block waiting for the arbiter main task to complete
pass
def run(async_fn, *args, arbiter_host=None, name='anonymous', **kwargs):
"""Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor.
"""
return trio.run(_main, async_fn, args, kwargs, name)