forked from goodboy/tractor
1
0
Fork 0

Drop the "main" task via kwarg idea

Stop worrying about a "main task" in each actor and instead add an
additional `ActorNursery.run_in_actor()` method which wraps calls
to create an actor and run a lone RPC task inside it. Note this
adjusts the public API of `ActorNursery.start_actor()` to drop
its `main` kwarg.

The dirty deats of making this possible:
- each spawned RPC task is now tracked with a specific cancel scope such
  that when the actor is cancelled all ongoing responders are cancelled
  before any IPC/channel machinery is closed (turns out that spawning
  new actors from `outlive_main=True` actors was probably borked before
  finally getting this working).
- make each initial RPC response be a packet which describes the
  `functype` (eg. `{'functype': 'asyncfunction'}`) allowing for async
  calls/submissions by client actors (this was required to make
  `run_in_actor()` work - `Portal._submit()` is the new async method).
- hooray we can stop faking "main task" results for daemon actors
- add better handling/raising of internal errors caught in the bowels of
  the `Actor` itself.
- drop the rpc spawning nursery; just use the `Actor._root_nursery`
- only wait on `_no_more_peers` if there are existing peer channels that
  are actually still connected.
- an `ActorNursery.__aexit__()` now implicitly waits on `Portal.result()` on close
  for each `run_in_actor()` spawned actor.
- handle cancelling partial started actors which haven't yet connected
  back to the parent

Resolves #24
drop_main_kwarg
Tyler Goodlet 2018-08-01 15:15:18 -04:00
parent f726bd81da
commit bb13b79df5
4 changed files with 380 additions and 256 deletions

View File

@ -49,23 +49,22 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
log.info(f"Arbiter seems to exist @ {host}:{port}") log.info(f"Arbiter seems to exist @ {host}:{port}")
actor = Actor( actor = Actor(
name or 'anonymous', name or 'anonymous',
main=main,
arbiter_addr=arbiter_addr, arbiter_addr=arbiter_addr,
**kwargs **kwargs
) )
host, port = (host, 0) host, port = (host, 0)
else: else:
# start this local actor as the arbiter # start this local actor as the arbiter
# this should eventually get passed `outlive_main=True`?
actor = Arbiter( actor = Arbiter(
name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs) name or 'arbiter', arbiter_addr=arbiter_addr, **kwargs)
# ``Actor._async_main()`` creates an internal nursery if one is not # ``Actor._async_main()`` creates an internal nursery if one is not
# provided and thus blocks here until it's main task completes. # provided and thus blocks here until it's main task completes.
# Note that if the current actor is the arbiter it is desirable # Note that if the current actor is the arbiter it is desirable
# for it to stay up indefinitely until a re-election process has # for it to stay up indefinitely until a re-election process has
# taken place - which is not implemented yet FYI). # taken place - which is not implemented yet FYI).
return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr) return await _start_actor(
actor, main, host, port, arbiter_addr=arbiter_addr)
def run( def run(

View File

@ -5,9 +5,9 @@ import inspect
import importlib import importlib
from collections import defaultdict from collections import defaultdict
from functools import partial from functools import partial
from typing import Coroutine
import traceback import traceback
import uuid import uuid
from itertools import chain
import trio import trio
from async_generator import asynccontextmanager, aclosing from async_generator import asynccontextmanager, aclosing
@ -27,9 +27,13 @@ class ActorFailure(Exception):
"General actor failure" "General actor failure"
class InternalActorError(RuntimeError):
"Actor primitive internals failure"
async def _invoke( async def _invoke(
cid, chan, func, kwargs, actor, cid, chan, func, kwargs,
treat_as_gen=False, raise_errs=False, treat_as_gen=False,
task_status=trio.TASK_STATUS_IGNORED task_status=trio.TASK_STATUS_IGNORED
): ):
"""Invoke local func and return results over provided channel. """Invoke local func and return results over provided channel.
@ -47,11 +51,15 @@ async def _invoke(
not is_async_partial and not is_async_partial and
not is_async_gen_partial not is_async_gen_partial
): ):
await chan.send({'return': func(**kwargs), 'cid': cid}) await chan.send({'functype': 'function', 'cid': cid})
with trio.open_cancel_scope() as cs:
task_status.started(cs)
await chan.send({'return': func(**kwargs), 'cid': cid})
else: else:
coro = func(**kwargs) coro = func(**kwargs)
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: massive gotcha! If the containing scope # XXX: massive gotcha! If the containing scope
# is cancelled and we execute the below line, # is cancelled and we execute the below line,
# any ``ActorNursery.__aexit__()`` WON'T be # any ``ActorNursery.__aexit__()`` WON'T be
@ -59,40 +67,55 @@ async def _invoke(
# have to properly handle the closing (aclosing) # have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel # of the async gen in order to be sure the cancel
# is propagated! # is propagated!
async with aclosing(coro) as agen: with trio.open_cancel_scope() as cs:
async for item in agen: task_status.started(cs)
# TODO: can we send values back in here? async with aclosing(coro) as agen:
# it's gonna require a `while True:` and async for item in agen:
# some non-blocking way to retrieve new `asend()` # TODO: can we send values back in here?
# values from the channel: # it's gonna require a `while True:` and
# to_send = await chan.recv_nowait() # some non-blocking way to retrieve new `asend()`
# if to_send is not None: # values from the channel:
# to_yield = await coro.asend(to_send) # to_send = await chan.recv_nowait()
await chan.send({'yield': item, 'cid': cid}) # if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})
log.debug(f"Finished iterating {coro}") log.debug(f"Finished iterating {coro}")
# TODO: we should really support a proper # TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final # `StopAsyncIteration` system here for returning a final
# value if desired # value if desired
await chan.send({'stop': None, 'cid': cid}) await chan.send({'stop': None, 'cid': cid})
else: else:
if treat_as_gen: if treat_as_gen:
await chan.send({'functype': 'asyncgen', 'cid': cid})
# XXX: the async-func may spawn further tasks which push # XXX: the async-func may spawn further tasks which push
# back values like an async-generator would but must # back values like an async-generator would but must
# manualy construct the response dict-packet-responses as # manualy construct the response dict-packet-responses as
# above # above
await coro with trio.open_cancel_scope() as cs:
task_status.started(cs)
await coro
else: else:
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'functype': 'asyncfunction', 'cid': cid})
with trio.open_cancel_scope() as cs:
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})
except Exception: except Exception:
# always ship errors back to caller
log.exception("Actor errored:") log.exception("Actor errored:")
if not raise_errs: await chan.send({'error': traceback.format_exc(), 'cid': cid})
await chan.send({'error': traceback.format_exc(), 'cid': cid}) finally:
else: # RPC task bookeeping
raise tasks = actor._rpc_tasks.get(chan, None)
if tasks:
tasks.remove((cs, func))
task_status.started() if not tasks:
actor._rpc_tasks.pop(chan, None)
if not actor._rpc_tasks:
log.info(f"All RPC tasks have completed")
actor._no_more_rpc_tasks.set()
class Actor: class Actor:
@ -108,12 +131,10 @@ class Actor:
def __init__( def __init__(
self, self,
name: str, name: str,
main: Coroutine = None,
rpc_module_paths: [str] = [], rpc_module_paths: [str] = [],
statespace: dict = {}, statespace: dict = {},
uid: str = None, uid: str = None,
allow_rpc: bool = True, allow_rpc: bool = True,
outlive_main: bool = False,
loglevel: str = None, loglevel: str = None,
arbiter_addr: (str, int) = None, arbiter_addr: (str, int) = None,
): ):
@ -121,22 +142,25 @@ class Actor:
self.uid = (name, uid or str(uuid.uuid1())) self.uid = (name, uid or str(uuid.uuid1()))
self.rpc_module_paths = rpc_module_paths self.rpc_module_paths = rpc_module_paths
self._mods = {} self._mods = {}
self.main = main
# TODO: consider making this a dynamically defined # TODO: consider making this a dynamically defined
# @dataclass once we get py3.7 # @dataclass once we get py3.7
self.statespace = statespace self.statespace = statespace
self._allow_rpc = allow_rpc self._allow_rpc = allow_rpc
self._outlive_main = outlive_main
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = arbiter_addr self._arb_addr = arbiter_addr
# filled in by `_async_main` after fork # filled in by `_async_main` after fork
self._root_nursery = None
self._server_nursery = None
self._peers = defaultdict(list) self._peers = defaultdict(list)
self._peer_connected = {} self._peer_connected = {}
self._no_more_peers = trio.Event() self._no_more_peers = trio.Event()
self._main_complete = trio.Event()
self._main_scope = None
self._no_more_peers.set() self._no_more_peers.set()
self._no_more_rpc_tasks = trio.Event()
self._no_more_rpc_tasks.set()
self._rpc_tasks = {}
self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
self._listeners = [] self._listeners = []
self._parent_chan = None self._parent_chan = None
@ -209,12 +233,6 @@ class Actor:
if not chans: if not chans:
log.debug(f"No more channels for {chan.uid}") log.debug(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None) self._peers.pop(chan.uid, None)
if not self._actors2calls.get(chan.uid, {}).get('main'):
# fake a "main task" result for any waiting
# nurseries/portals
log.debug(f"Faking result for {chan} from {chan.uid}")
q = self.get_waitq(chan.uid, 'main')
q.put_nowait({'return': None, 'cid': 'main'})
log.debug(f"Peers is {self._peers}") log.debug(f"Peers is {self._peers}")
@ -222,7 +240,7 @@ class Actor:
self._no_more_peers.set() self._no_more_peers.set()
log.debug(f"Signalling no more peer channels") log.debug(f"Signalling no more peer channels")
# XXX: is this necessary? # # XXX: is this necessary (GC should do it?)
if chan.connected(): if chan.connected():
log.debug(f"Disconnecting channel {chan}") log.debug(f"Disconnecting channel {chan}")
await chan.send(None) await chan.send(None)
@ -259,63 +277,91 @@ class Actor:
# TODO: once https://github.com/python-trio/trio/issues/467 gets # TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that! # worked out we'll likely want to use that!
log.debug(f"Entering msg loop for {chan} from {chan.uid}") log.debug(f"Entering msg loop for {chan} from {chan.uid}")
async with trio.open_nursery() as nursery: try:
try: async for msg in chan.aiter_recv():
async for msg in chan.aiter_recv(): if msg is None: # terminate sentinel
if msg is None: # terminate sentinel log.debug(
log.debug( f"Cancelling all tasks for {chan} from {chan.uid}")
f"Cancelling all tasks for {chan} from {chan.uid}") scopes = self._rpc_tasks.pop(chan, None)
nursery.cancel_scope.cancel() if scopes:
log.debug( for scope, func in scopes:
f"Msg loop signalled to terminate for" scope.cancel()
f" {chan} from {chan.uid}")
break
log.debug(f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid')
if cid: # deliver response to local caller/waiter
await self._push_result(chan.uid, cid, msg)
log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
else:
ns, funcname, kwargs, actorid, cid = msg['cmd']
log.debug( log.debug(
f"Processing request from {actorid}\n" f"Msg loop signalled to terminate for"
f"{ns}.{funcname}({kwargs})") f" {chan} from {chan.uid}")
if ns == 'self': break
func = getattr(self, funcname) log.debug(f"Received msg {msg} from {chan.uid}")
else: cid = msg.get('cid')
func = getattr(self._mods[ns], funcname) if cid:
if cid == 'internal': # internal actor error
# import pdb; pdb.set_trace()
raise InternalActorError(
f"{chan.uid}\n" + msg['error'])
# spin up a task for the requested function # deliver response to local caller/waiter
sig = inspect.signature(func) await self._push_result(chan.uid, cid, msg)
treat_as_gen = False
if 'chan' in sig.parameters:
assert 'cid' in sig.parameters, \
f"{func} must accept a `cid` (caller id) kwarg"
kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `chan` kwarg in its
# signature will be treated as one.
treat_as_gen = True
log.debug(f"Spawning task for {func}")
nursery.start_soon(
_invoke, cid, chan, func, kwargs, treat_as_gen,
name=funcname
)
log.debug( log.debug(
f"Waiting on next msg for {chan} from {chan.uid}") f"Waiting on next msg for {chan} from {chan.uid}")
else: # channel disconnect continue
log.debug(f"{chan} from {chan.uid} disconnected")
except trio.ClosedStreamError:
log.error(f"{chan} form {chan.uid} broke")
log.debug(f"Exiting msg loop for {chan} from {chan.uid}") # process command request
ns, funcname, kwargs, actorid, cid = msg['cmd']
log.debug(
f"Processing request from {actorid}\n"
f"{ns}.{funcname}({kwargs})")
if ns == 'self':
func = getattr(self, funcname)
else:
func = getattr(self._mods[ns], funcname)
# spin up a task for the requested function
sig = inspect.signature(func)
treat_as_gen = False
if 'chan' in sig.parameters:
assert 'cid' in sig.parameters, \
f"{func} must accept a `cid` (caller id) kwarg"
kwargs['chan'] = chan
kwargs['cid'] = cid
# TODO: eventually we want to be more stringent
# about what is considered a far-end async-generator.
# Right now both actual async gens and any async
# function which declares a `chan` kwarg in its
# signature will be treated as one.
treat_as_gen = True
log.debug(f"Spawning task for {func}")
cs = await self._root_nursery.start(
_invoke, self, cid, chan, func, kwargs, treat_as_gen,
name=funcname
)
# never allow cancelling cancel requests (results in
# deadlock and other weird behaviour)
if func != self.cancel:
self._no_more_rpc_tasks.clear()
log.info(f"RPC func is {func}")
self._rpc_tasks.setdefault(chan, []).append((cs, func))
log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
else: # channel disconnect
log.debug(f"{chan} from {chan.uid} disconnected")
except InternalActorError:
# ship internal errors upwards
if self._parent_chan:
await self._parent_chan.send(
{'error': traceback.format_exc(), 'cid': 'internal'})
raise
except trio.ClosedResourceError:
log.error(f"{chan} form {chan.uid} broke")
except Exception:
# ship exception (from above code) to peer as an internal error
await chan.send(
{'error': traceback.format_exc(), 'cid': 'internal'})
raise
finally:
log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
def _fork_main(self, accept_addr, parent_addr=None): def _fork_main(self, accept_addr, parent_addr=None):
# after fork routine which invokes a fresh ``trio.run`` # after fork routine which invokes a fresh ``trio.run``
@ -324,7 +370,7 @@ class Actor:
if self.loglevel is not None: if self.loglevel is not None:
get_console_log(self.loglevel) get_console_log(self.loglevel)
log.info( log.info(
f"Started new {ctx.current_process()} for actor {self.uid}") f"Started new {ctx.current_process()} for {self.uid}")
_state._current_actor = self _state._current_actor = self
log.debug(f"parent_addr is {parent_addr}") log.debug(f"parent_addr is {parent_addr}")
try: try:
@ -332,14 +378,15 @@ class Actor:
self._async_main, accept_addr, parent_addr=parent_addr)) self._async_main, accept_addr, parent_addr=parent_addr))
except KeyboardInterrupt: except KeyboardInterrupt:
pass # handle it the same way trio does? pass # handle it the same way trio does?
log.debug(f"Actor {self.uid} terminated") log.info(f"Actor {self.uid} terminated")
async def _async_main( async def _async_main(
self, self,
accept_addr, accept_addr,
arbiter_addr=None, arbiter_addr=None,
parent_addr=None, parent_addr=None,
nursery=None nursery=None,
task_status=trio.TASK_STATUS_IGNORED,
): ):
"""Start the channel server, maybe connect back to the parent, and """Start the channel server, maybe connect back to the parent, and
start the main task. start the main task.
@ -347,7 +394,6 @@ class Actor:
A "root-most" (or "top-level") nursery for this actor is opened here A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor. and when cancelled effectively cancels the actor.
""" """
result = None
arbiter_addr = arbiter_addr or self._arb_addr arbiter_addr = arbiter_addr or self._arb_addr
registered_with_arbiter = False registered_with_arbiter = False
try: try:
@ -373,7 +419,6 @@ class Actor:
# exception back to the parent actor) # exception back to the parent actor)
chan = self._parent_chan = Channel( chan = self._parent_chan = Channel(
destaddr=parent_addr, destaddr=parent_addr,
on_reconnect=self.main
) )
await chan.connect() await chan.connect()
# initial handshake, report who we are, who they are # initial handshake, report who we are, who they are
@ -382,7 +427,7 @@ class Actor:
log.warn( log.warn(
f"Failed to connect to parent @ {parent_addr}," f"Failed to connect to parent @ {parent_addr},"
" closing server") " closing server")
self.cancel_server() await self.cancel()
self._parent_chan = None self._parent_chan = None
# register with the arbiter if we're told its addr # register with the arbiter if we're told its addr
@ -393,6 +438,7 @@ class Actor:
name=self.name, sockaddr=self.accept_addr) name=self.name, sockaddr=self.accept_addr)
registered_with_arbiter = True registered_with_arbiter = True
task_status.started()
# handle new connection back to parent optionally # handle new connection back to parent optionally
# begin responding to RPC # begin responding to RPC
if self._allow_rpc: if self._allow_rpc:
@ -401,38 +447,6 @@ class Actor:
nursery.start_soon( nursery.start_soon(
self._process_messages, self._parent_chan) self._process_messages, self._parent_chan)
if self.main:
try:
if self._parent_chan:
async with trio.open_nursery() as n:
self._main_scope = n.cancel_scope
log.debug(f"Starting main task `{self.main}`")
# spawned subactor so deliver "main"
# task result(s) back to parent
await n.start(
_invoke, 'main',
self._parent_chan, self.main, {},
# treat_as_gen, raise_errs params
False, True
)
else:
with trio.open_cancel_scope() as main_scope:
self._main_scope = main_scope
# run directly we are an "unspawned actor"
log.debug(f"Running `{self.main}` directly")
result = await self.main()
finally:
# tear down channel server in order to ensure
# we exit normally when the main task is done
if not self._outlive_main:
log.debug(f"Shutting down channel server")
self.cancel_server()
log.debug(f"Shutting down root nursery")
nursery.cancel_scope.cancel()
self._main_complete.set()
if self._main_scope.cancelled_caught:
log.debug("Main task was cancelled sucessfully")
log.debug("Waiting on root nursery to complete") log.debug("Waiting on root nursery to complete")
# blocks here as expected if no nursery was provided until # blocks here as expected if no nursery was provided until
# the channel server is killed (i.e. this actor is # the channel server is killed (i.e. this actor is
@ -441,7 +455,7 @@ class Actor:
if self._parent_chan: if self._parent_chan:
try: try:
await self._parent_chan.send( await self._parent_chan.send(
{'error': traceback.format_exc(), 'cid': 'main'}) {'error': traceback.format_exc(), 'cid': 'internal'})
except trio.ClosedStreamError: except trio.ClosedStreamError:
log.error( log.error(
f"Failed to ship error to parent " f"Failed to ship error to parent "
@ -458,18 +472,18 @@ class Actor:
# terminate actor once all it's peers (actors that connected # terminate actor once all it's peers (actors that connected
# to it as clients) have disappeared # to it as clients) have disappeared
if not self._no_more_peers.is_set(): if not self._no_more_peers.is_set():
log.debug( if any(
f"Waiting for remaining peers {self._peers} to clear") chan.connected() for chan in chain(*self._peers.values())
await self._no_more_peers.wait() ):
log.debug(
f"Waiting for remaining peers {self._peers} to clear")
await self._no_more_peers.wait()
log.debug(f"All peer channels are complete") log.debug(f"All peer channels are complete")
# tear down channel server no matter what since we errored # tear down channel server no matter what since we errored
# or completed # or completed
log.debug(f"Shutting down channel server")
self.cancel_server() self.cancel_server()
return result
async def _serve_forever( async def _serve_forever(
self, self,
*, *,
@ -514,20 +528,39 @@ class Actor:
log.warn(f"Unable to unregister {self.name} from arbiter") log.warn(f"Unable to unregister {self.name} from arbiter")
async def cancel(self): async def cancel(self):
"""This cancels the internal root-most nursery thereby gracefully """Cancel this actor.
cancelling (for all intents and purposes) this actor.
The sequence in order is:
- cancelling all rpc tasks
- cancelling the channel server
- cancel the "root" nursery
""" """
# cancel all ongoing rpc tasks
await self.cancel_rpc_tasks()
self.cancel_server() self.cancel_server()
if self._main_scope:
self._main_scope.cancel()
log.debug("Waiting on main task to complete")
await self._main_complete.wait()
self._root_nursery.cancel_scope.cancel() self._root_nursery.cancel_scope.cancel()
async def cancel_rpc_tasks(self):
"""Cancel all existing RPC responder tasks using the cancel scope
registered for each.
"""
scopes = self._rpc_tasks
log.info(f"Cancelling all {len(scopes)} rpc tasks:\n{scopes}")
for chan, scopes in scopes.items():
log.debug(f"Cancelling all tasks for {chan.uid}")
for scope, func in scopes:
log.debug(f"Cancelling task for {func}")
scope.cancel()
if scopes:
log.info(
f"Waiting for remaining rpc tasks to complete {scopes}")
await self._no_more_rpc_tasks.wait()
def cancel_server(self): def cancel_server(self):
"""Cancel the internal channel server nursery thereby """Cancel the internal channel server nursery thereby
preventing any new inbound connections from being established. preventing any new inbound connections from being established.
""" """
log.debug("Shutting down channel server")
self._server_nursery.cancel_scope.cancel() self._server_nursery.cancel_scope.cancel()
@property @property
@ -568,7 +601,7 @@ class Arbiter(Actor):
self._registry.pop(name, None) self._registry.pop(name, None)
async def _start_actor(actor, host, port, arbiter_addr, nursery=None): async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
"""Spawn a local actor by starting a task to execute it's main """Spawn a local actor by starting a task to execute it's main
async function. async function.
@ -582,16 +615,19 @@ async def _start_actor(actor, host, port, arbiter_addr, nursery=None):
# NOTE: this won't block since we provide the nursery # NOTE: this won't block since we provide the nursery
log.info(f"Starting local {actor} @ {host}:{port}") log.info(f"Starting local {actor} @ {host}:{port}")
result = await actor._async_main( async with trio.open_nursery() as nursery:
accept_addr=(host, port), await nursery.start(
parent_addr=None, partial(
arbiter_addr=arbiter_addr, actor._async_main,
nursery=nursery, accept_addr=(host, port),
) parent_addr=None,
# XXX: If spawned locally, the actor is cancelled when this arbiter_addr=arbiter_addr,
# context is complete given that there are no more active )
# peer channels connected to it. )
if not actor._outlive_main: result = await main()
# XXX: If spawned locally, the actor is cancelled when this
# context is complete given that there are no more active
# peer channels connected to it.
actor.cancel_server() actor.cancel_server()
# unset module state # unset module state

View File

@ -42,20 +42,6 @@ async def _do_handshake(actor, chan):
return uid return uid
async def result_from_q(q, chan):
"""Process a msg from a remote actor.
"""
first_msg = await q.get()
if 'return' in first_msg:
return 'return', first_msg, q
elif 'yield' in first_msg:
return 'yield', first_msg, q
elif 'error' in first_msg:
raise RemoteActorError(f"{chan.uid}\n" + first_msg['error'])
else:
raise ValueError(f"{first_msg} is an invalid response packet?")
class Portal: class Portal:
"""A 'portal' to a(n) (remote) ``Actor``. """A 'portal' to a(n) (remote) ``Actor``.
@ -67,7 +53,12 @@ class Portal:
""" """
def __init__(self, channel): def __init__(self, channel):
self.channel = channel self.channel = channel
# when this is set to a tuple returned from ``_submit()`` then
# it is expected that ``result()`` will be awaited at some point
# during the portal's lifetime
self._result = None self._result = None
self._expect_result = None
self._errored = False
async def aclose(self): async def aclose(self):
log.debug(f"Closing {self}") log.debug(f"Closing {self}")
@ -75,26 +66,56 @@ class Portal:
# gets in! # gets in!
await self.channel.aclose() await self.channel.aclose()
async def run(self, ns, func, **kwargs): async def _submit(self, ns, func, **kwargs):
"""Submit a function to be scheduled and run by actor, return its """Submit a function to be scheduled and run by actor, return the
(stream of) result(s). associated caller id, response queue, response type str,
first message packet as a tuple.
This is an async call.
""" """
# ship a function call request to the remote actor
cid, q = await current_actor().send_cmd(self.channel, ns, func, kwargs)
# wait on first response msg and handle (this should be
# in an immediate response)
first_msg = await q.get()
functype = first_msg.get('functype')
if functype == 'function' or functype == 'asyncfunction':
resp_type = 'return'
elif functype == 'asyncgen':
resp_type = 'yield'
elif 'error' in first_msg:
raise RemoteActorError(
f"{self.channel.uid}\n" + first_msg['error'])
else:
raise ValueError(f"{first_msg} is an invalid response packet?")
return cid, q, resp_type, first_msg
async def _submit_for_result(self, ns, func, **kwargs):
assert self._expect_result is None, \
"A pending main result has already been submitted"
self._expect_result = await self._submit(ns, func, **kwargs)
async def run(self, ns, func, **kwargs):
"""Submit a function to be scheduled and run by actor, wrap and return
its (stream of) result(s).
This is a blocking call.
"""
return await self._return_from_resptype(
*(await self._submit(ns, func, **kwargs))
)
async def _return_from_resptype(self, cid, q, resptype, first_msg):
# TODO: not this needs some serious work and thinking about how # TODO: not this needs some serious work and thinking about how
# to make async-generators the fundamental IPC API over channels! # to make async-generators the fundamental IPC API over channels!
# (think `yield from`, `gen.send()`, and functional reactive stuff) # (think `yield from`, `gen.send()`, and functional reactive stuff)
actor = current_actor()
# ship a function call request to the remote actor
cid, q = await actor.send_cmd(self.channel, ns, func, kwargs)
# wait on first response msg and handle
return await self._return_from_resptype(
cid, *(await result_from_q(q, self.channel)))
async def _return_from_resptype(self, cid, resptype, first_msg, q):
if resptype == 'yield': if resptype == 'yield':
async def yield_from_q(): async def yield_from_q():
yield first_msg['yield']
try: try:
async for msg in q: async for msg in q:
try: try:
@ -103,8 +124,9 @@ class Portal:
if 'stop' in msg: if 'stop' in msg:
break # far end async gen terminated break # far end async gen terminated
else: else:
raise RemoteActorError(msg['error']) raise RemoteActorError(
except GeneratorExit: f"{self.channel.uid}\n" + msg['error'])
except StopAsyncIteration:
log.debug( log.debug(
f"Cancelling async gen call {cid} to " f"Cancelling async gen call {cid} to "
f"{self.channel.uid}") f"{self.channel.uid}")
@ -113,22 +135,24 @@ class Portal:
return yield_from_q() return yield_from_q()
elif resptype == 'return': elif resptype == 'return':
return first_msg['return'] msg = await q.get()
try:
return msg['return']
except KeyError:
raise RemoteActorError(
f"{self.channel.uid}\n" + msg['error'])
else: else:
raise ValueError(f"Unknown msg response type: {first_msg}") raise ValueError(f"Unknown msg response type: {first_msg}")
async def result(self): async def result(self):
"""Return the result(s) from the remote actor's "main" task. """Return the result(s) from the remote actor's "main" task.
""" """
if self._result is None: if self._expect_result is None:
q = current_actor().get_waitq(self.channel.uid, 'main') raise RuntimeError("This portal is not expecting a final result?")
resptype, first_msg, q = (await result_from_q(q, self.channel)) elif self._result is None:
self._result = await self._return_from_resptype( self._result = await self._return_from_resptype(
'main', resptype, first_msg, q) *self._expect_result
log.warn( )
f"Retrieved first result `{self._result}` "
f"for {self.channel.uid}")
# await q.put(first_msg) # for next consumer (e.g. nursery)
return self._result return self._result
async def close(self): async def close(self):
@ -153,7 +177,9 @@ class Portal:
log.warn( log.warn(
f"{self.channel} for {self.channel.uid} was already closed?") f"{self.channel} for {self.channel.uid} was already closed?")
return False return False
else:
log.warn(f"May have failed to cancel {self.channel.uid}")
return False
class LocalPortal: class LocalPortal:
"""A 'portal' to a local ``Actor``. """A 'portal' to a local ``Actor``.

View File

@ -2,9 +2,10 @@
``trio`` inspired apis and helpers ``trio`` inspired apis and helpers
""" """
import multiprocessing as mp import multiprocessing as mp
import inspect
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager, aclosing
from ._state import current_actor from ._state import current_actor
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
@ -22,9 +23,9 @@ class ActorNursery:
def __init__(self, actor, supervisor=None): def __init__(self, actor, supervisor=None):
self.supervisor = supervisor # TODO self.supervisor = supervisor # TODO
self._actor = actor self._actor = actor
# We'll likely want some way to cancel all sub-actors eventually
# self.cancel_scope = cancel_scope
self._children = {} self._children = {}
# portals spawned with ``run_in_actor()``
self._cancel_after_result_on_exit = set()
self.cancelled = False self.cancelled = False
async def __aenter__(self): async def __aenter__(self):
@ -33,11 +34,9 @@ class ActorNursery:
async def start_actor( async def start_actor(
self, self,
name: str, name: str,
main=None,
bind_addr=('127.0.0.1', 0), bind_addr=('127.0.0.1', 0),
statespace=None, statespace=None,
rpc_module_paths=None, rpc_module_paths=None,
outlive_main=False, # sub-actors die when their main task completes
loglevel=None, # set log level per subactor loglevel=None, # set log level per subactor
): ):
loglevel = loglevel or self._actor.loglevel or get_loglevel() loglevel = loglevel or self._actor.loglevel or get_loglevel()
@ -46,8 +45,6 @@ class ActorNursery:
# modules allowed to invoked funcs from # modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths or [], rpc_module_paths=rpc_module_paths or [],
statespace=statespace, # global proc state vars statespace=statespace, # global proc state vars
main=main, # main coroutine to be invoked
outlive_main=outlive_main,
loglevel=loglevel, loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr, arbiter_addr=current_actor()._arb_addr,
) )
@ -59,6 +56,11 @@ class ActorNursery:
# daemon=True, # daemon=True,
name=name, name=name,
) )
# register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request
self._children[actor.uid] = [actor, proc, None]
proc.start() proc.start()
if not proc.is_alive(): if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?") raise ActorFailure("Couldn't start sub-actor?")
@ -69,7 +71,39 @@ class ActorNursery:
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await self._actor.wait_for_peer(actor.uid) event, chan = await self._actor.wait_for_peer(actor.uid)
portal = Portal(chan) portal = Portal(chan)
self._children[(name, proc.pid)] = (actor, proc, portal) self._children[actor.uid][2] = portal
return portal
async def run_in_actor(
self,
name,
fn,
bind_addr=('127.0.0.1', 0),
rpc_module_paths=None,
statespace=None,
loglevel=None, # set log level per subactor
**kwargs, # explicit args to ``fn``
):
"""Spawn a new actor, run a lone task, then terminate the actor and
return its result.
Actors spawned using this method are kept alive at nursery teardown
until the task spawned by executing ``fn`` completes at which point
the actor is terminated.
"""
mod_path = fn.__module__
portal = await self.start_actor(
name,
rpc_module_paths=[mod_path],
bind_addr=bind_addr,
statespace=statespace,
)
await portal._submit_for_result(
mod_path,
fn.__name__,
**kwargs
)
self._cancel_after_result_on_exit.add(portal)
return portal return portal
async def wait(self): async def wait(self):
@ -82,27 +116,34 @@ class ActorNursery:
# please god don't hang # please god don't hang
proc.join() proc.join()
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
event = self._actor._peers.get(actor.uid) self._children.pop(actor.uid)
if isinstance(event, trio.Event):
event.set()
log.warn(
f"Cancelled `wait_for_peer()` call since {actor.uid}"
f" is already dead!")
if not portal._result:
log.debug(f"Faking result for {actor.uid}")
q = self._actor.get_waitq(actor.uid, 'main')
q.put_nowait({'return': None, 'cid': 'main'})
async def wait_for_result(portal): async def wait_for_result(portal, actor):
if portal.channel.connected(): # cancel the actor gracefully
log.debug(f"Waiting on final result from {subactor.uid}") log.info(f"Cancelling {portal.channel.uid} gracefully")
await portal.result() await portal.cancel_actor()
log.debug(f"Waiting on final result from {subactor.uid}")
res = await portal.result()
# if it's an async-gen then we should alert the user
# that we're cancelling it
if inspect.isasyncgen(res):
log.warn(
f"Blindly consuming asyncgen for {actor.uid}")
with trio.fail_after(1):
async with aclosing(res) as agen:
async for item in agen:
log.debug(f"Consuming item {item}")
# unblocks when all waiter tasks have completed # unblocks when all waiter tasks have completed
children = self._children.copy()
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values(): for subactor, proc, portal in children.values():
nursery.start_soon(wait_for_proc, proc, subactor, portal) nursery.start_soon(wait_for_proc, proc, subactor, portal)
nursery.start_soon(wait_for_result, portal) if proc.is_alive() and (
portal in self._cancel_after_result_on_exit
):
nursery.start_soon(wait_for_result, portal, subactor)
async def cancel(self, hard_kill=False): async def cancel(self, hard_kill=False):
"""Cancel this nursery by instructing each subactor to cancel """Cancel this nursery by instructing each subactor to cancel
@ -111,20 +152,37 @@ class ActorNursery:
If ``hard_killl`` is set to ``True`` then kill the processes If ``hard_killl`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation. directly without any far end graceful ``trio`` cancellation.
""" """
def do_hard_kill(proc):
log.warn(f"Hard killing subactors {self._children}")
proc.terminate()
# XXX: below doesn't seem to work?
# send KeyBoardInterrupt (trio abort signal) to sub-actors
# os.kill(proc.pid, signal.SIGINT)
log.debug(f"Cancelling nursery") log.debug(f"Cancelling nursery")
for subactor, proc, portal in self._children.values(): with trio.fail_after(3):
if proc is mp.current_process(): async with trio.open_nursery() as n:
# XXX: does this even make sense? for subactor, proc, portal in self._children.values():
await subactor.cancel() if hard_kill:
else: do_hard_kill(proc)
if hard_kill: else:
log.warn(f"Hard killing subactors {self._children}") if portal is None: # actor hasn't fully spawned yet
proc.terminate() event = self._actor._peer_connected[subactor.uid]
# XXX: doesn't seem to work? log.warn(
# send KeyBoardInterrupt (trio abort signal) to sub-actors f"{subactor.uid} wasn't finished spawning?")
# os.kill(proc.pid, signal.SIGINT) await event.wait()
else: # channel/portal should now be up
await portal.cancel_actor() _, _, portal = self._children[subactor.uid]
if portal is None:
# cancelled while waiting on the event?
chan = self._actor._peers[subactor.uid][-1]
if chan:
portal = Portal(chan)
else: # there's no other choice left
do_hard_kill(proc)
# spawn cancel tasks async
n.start_soon(portal.cancel_actor)
log.debug(f"Waiting on all subactors to complete") log.debug(f"Waiting on all subactors to complete")
await self.wait() await self.wait()
@ -134,32 +192,37 @@ class ActorNursery:
async def __aexit__(self, etype, value, tb): async def __aexit__(self, etype, value, tb):
"""Wait on all subactor's main routines to complete. """Wait on all subactor's main routines to complete.
""" """
if etype is not None: try:
# XXX: hypothetically an error could be raised and then if etype is not None:
# a cancel signal shows up slightly after in which case the # XXX: hypothetically an error could be raised and then
# else block here might not complete? Should both be shielded? # a cancel signal shows up slightly after in which case the
if etype is trio.Cancelled: # else block here might not complete? Should both be shielded?
with trio.open_cancel_scope(shield=True): with trio.open_cancel_scope(shield=True):
log.warn( if etype is trio.Cancelled:
f"{current_actor().uid} was cancelled with {etype}" log.warn(
", cancelling actor nursery") f"{current_actor().uid} was cancelled with {etype}"
await self.cancel() ", cancelling actor nursery")
await self.cancel()
else:
log.exception(
f"{current_actor().uid} errored with {etype}, "
"cancelling actor nursery")
await self.cancel()
else: else:
log.exception( # XXX: this is effectively the lone cancellation/supervisor
f"{current_actor().uid} errored with {etype}, " # strategy which exactly mimicks trio's behaviour
"cancelling actor nursery") log.debug(f"Waiting on subactors {self._children} to complete")
await self.cancel() try:
else: await self.wait()
# XXX: this is effectively the lone cancellation/supervisor except Exception as err:
# strategy which exactly mimicks trio's behaviour log.warn(f"Nursery caught {err}, cancelling")
log.debug(f"Waiting on subactors {self._children} to complete") await self.cancel()
try: raise
await self.wait() log.debug(f"Nursery teardown complete")
except Exception as err: except Exception:
log.warn(f"Nursery caught {err}, cancelling") log.exception("Error on nursery exit:")
await self.cancel() await self.wait()
raise raise
log.debug(f"Nursery teardown complete")
@asynccontextmanager @asynccontextmanager