diff --git a/tractor/__init__.py b/tractor/__init__.py index 3426b81..515f8dc 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -49,23 +49,22 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): log.info(f"Arbiter seems to exist @ {host}:{port}") actor = Actor( name or 'anonymous', - main=main, arbiter_addr=arbiter_addr, **kwargs ) host, port = (host, 0) else: # start this local actor as the arbiter - # this should eventually get passed `outlive_main=True`? actor = Arbiter( - name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs) + name or 'arbiter', arbiter_addr=arbiter_addr, **kwargs) # ``Actor._async_main()`` creates an internal nursery if one is not # provided and thus blocks here until it's main task completes. # Note that if the current actor is the arbiter it is desirable # for it to stay up indefinitely until a re-election process has # taken place - which is not implemented yet FYI). - return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr) + return await _start_actor( + actor, main, host, port, arbiter_addr=arbiter_addr) def run( diff --git a/tractor/_actor.py b/tractor/_actor.py index 3c4acb2..fe99c73 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -5,9 +5,9 @@ import inspect import importlib from collections import defaultdict from functools import partial -from typing import Coroutine import traceback import uuid +from itertools import chain import trio from async_generator import asynccontextmanager, aclosing @@ -27,9 +27,13 @@ class ActorFailure(Exception): "General actor failure" +class InternalActorError(RuntimeError): + "Actor primitive internals failure" + + async def _invoke( - cid, chan, func, kwargs, - treat_as_gen=False, raise_errs=False, + actor, cid, chan, func, kwargs, + treat_as_gen=False, task_status=trio.TASK_STATUS_IGNORED ): """Invoke local func and return results over provided channel. @@ -47,11 +51,15 @@ async def _invoke( not is_async_partial and not is_async_gen_partial ): - await chan.send({'return': func(**kwargs), 'cid': cid}) + await chan.send({'functype': 'function', 'cid': cid}) + with trio.open_cancel_scope() as cs: + task_status.started(cs) + await chan.send({'return': func(**kwargs), 'cid': cid}) else: coro = func(**kwargs) if inspect.isasyncgen(coro): + await chan.send({'functype': 'asyncgen', 'cid': cid}) # XXX: massive gotcha! If the containing scope # is cancelled and we execute the below line, # any ``ActorNursery.__aexit__()`` WON'T be @@ -59,40 +67,55 @@ async def _invoke( # have to properly handle the closing (aclosing) # of the async gen in order to be sure the cancel # is propagated! - async with aclosing(coro) as agen: - async for item in agen: - # TODO: can we send values back in here? - # it's gonna require a `while True:` and - # some non-blocking way to retrieve new `asend()` - # values from the channel: - # to_send = await chan.recv_nowait() - # if to_send is not None: - # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) + with trio.open_cancel_scope() as cs: + task_status.started(cs) + async with aclosing(coro) as agen: + async for item in agen: + # TODO: can we send values back in here? + # it's gonna require a `while True:` and + # some non-blocking way to retrieve new `asend()` + # values from the channel: + # to_send = await chan.recv_nowait() + # if to_send is not None: + # to_yield = await coro.asend(to_send) + await chan.send({'yield': item, 'cid': cid}) - log.debug(f"Finished iterating {coro}") - # TODO: we should really support a proper - # `StopAsyncIteration` system here for returning a final - # value if desired - await chan.send({'stop': None, 'cid': cid}) + log.debug(f"Finished iterating {coro}") + # TODO: we should really support a proper + # `StopAsyncIteration` system here for returning a final + # value if desired + await chan.send({'stop': None, 'cid': cid}) else: if treat_as_gen: + await chan.send({'functype': 'asyncgen', 'cid': cid}) # XXX: the async-func may spawn further tasks which push # back values like an async-generator would but must # manualy construct the response dict-packet-responses as # above - await coro + with trio.open_cancel_scope() as cs: + task_status.started(cs) + await coro else: - await chan.send({'return': await coro, 'cid': cid}) - + await chan.send({'functype': 'asyncfunction', 'cid': cid}) + with trio.open_cancel_scope() as cs: + task_status.started(cs) + await chan.send({'return': await coro, 'cid': cid}) except Exception: + # always ship errors back to caller log.exception("Actor errored:") - if not raise_errs: - await chan.send({'error': traceback.format_exc(), 'cid': cid}) - else: - raise + await chan.send({'error': traceback.format_exc(), 'cid': cid}) + finally: + # RPC task bookeeping + tasks = actor._rpc_tasks.get(chan, None) + if tasks: + tasks.remove((cs, func)) - task_status.started() + if not tasks: + actor._rpc_tasks.pop(chan, None) + + if not actor._rpc_tasks: + log.info(f"All RPC tasks have completed") + actor._no_more_rpc_tasks.set() class Actor: @@ -108,12 +131,10 @@ class Actor: def __init__( self, name: str, - main: Coroutine = None, rpc_module_paths: [str] = [], statespace: dict = {}, uid: str = None, allow_rpc: bool = True, - outlive_main: bool = False, loglevel: str = None, arbiter_addr: (str, int) = None, ): @@ -121,22 +142,25 @@ class Actor: self.uid = (name, uid or str(uuid.uuid1())) self.rpc_module_paths = rpc_module_paths self._mods = {} - self.main = main # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.statespace = statespace self._allow_rpc = allow_rpc - self._outlive_main = outlive_main self.loglevel = loglevel self._arb_addr = arbiter_addr # filled in by `_async_main` after fork + self._root_nursery = None + self._server_nursery = None self._peers = defaultdict(list) self._peer_connected = {} self._no_more_peers = trio.Event() - self._main_complete = trio.Event() - self._main_scope = None self._no_more_peers.set() + + self._no_more_rpc_tasks = trio.Event() + self._no_more_rpc_tasks.set() + self._rpc_tasks = {} + self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._listeners = [] self._parent_chan = None @@ -209,12 +233,6 @@ class Actor: if not chans: log.debug(f"No more channels for {chan.uid}") self._peers.pop(chan.uid, None) - if not self._actors2calls.get(chan.uid, {}).get('main'): - # fake a "main task" result for any waiting - # nurseries/portals - log.debug(f"Faking result for {chan} from {chan.uid}") - q = self.get_waitq(chan.uid, 'main') - q.put_nowait({'return': None, 'cid': 'main'}) log.debug(f"Peers is {self._peers}") @@ -222,7 +240,7 @@ class Actor: self._no_more_peers.set() log.debug(f"Signalling no more peer channels") - # XXX: is this necessary? + # # XXX: is this necessary (GC should do it?) if chan.connected(): log.debug(f"Disconnecting channel {chan}") await chan.send(None) @@ -259,63 +277,91 @@ class Actor: # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! log.debug(f"Entering msg loop for {chan} from {chan.uid}") - async with trio.open_nursery() as nursery: - try: - async for msg in chan.aiter_recv(): - if msg is None: # terminate sentinel - log.debug( - f"Cancelling all tasks for {chan} from {chan.uid}") - nursery.cancel_scope.cancel() - log.debug( - f"Msg loop signalled to terminate for" - f" {chan} from {chan.uid}") - break - log.debug(f"Received msg {msg} from {chan.uid}") - cid = msg.get('cid') - if cid: # deliver response to local caller/waiter - await self._push_result(chan.uid, cid, msg) - log.debug( - f"Waiting on next msg for {chan} from {chan.uid}") - continue - else: - ns, funcname, kwargs, actorid, cid = msg['cmd'] + try: + async for msg in chan.aiter_recv(): + if msg is None: # terminate sentinel + log.debug( + f"Cancelling all tasks for {chan} from {chan.uid}") + scopes = self._rpc_tasks.pop(chan, None) + if scopes: + for scope, func in scopes: + scope.cancel() log.debug( - f"Processing request from {actorid}\n" - f"{ns}.{funcname}({kwargs})") - if ns == 'self': - func = getattr(self, funcname) - else: - func = getattr(self._mods[ns], funcname) + f"Msg loop signalled to terminate for" + f" {chan} from {chan.uid}") + break + log.debug(f"Received msg {msg} from {chan.uid}") + cid = msg.get('cid') + if cid: + if cid == 'internal': # internal actor error + # import pdb; pdb.set_trace() + raise InternalActorError( + f"{chan.uid}\n" + msg['error']) - # spin up a task for the requested function - sig = inspect.signature(func) - treat_as_gen = False - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its - # signature will be treated as one. - treat_as_gen = True - - log.debug(f"Spawning task for {func}") - nursery.start_soon( - _invoke, cid, chan, func, kwargs, treat_as_gen, - name=funcname - ) + # deliver response to local caller/waiter + await self._push_result(chan.uid, cid, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") - else: # channel disconnect - log.debug(f"{chan} from {chan.uid} disconnected") - except trio.ClosedStreamError: - log.error(f"{chan} form {chan.uid} broke") + continue - log.debug(f"Exiting msg loop for {chan} from {chan.uid}") + # process command request + ns, funcname, kwargs, actorid, cid = msg['cmd'] + + log.debug( + f"Processing request from {actorid}\n" + f"{ns}.{funcname}({kwargs})") + if ns == 'self': + func = getattr(self, funcname) + else: + func = getattr(self._mods[ns], funcname) + + # spin up a task for the requested function + sig = inspect.signature(func) + treat_as_gen = False + if 'chan' in sig.parameters: + assert 'cid' in sig.parameters, \ + f"{func} must accept a `cid` (caller id) kwarg" + kwargs['chan'] = chan + kwargs['cid'] = cid + # TODO: eventually we want to be more stringent + # about what is considered a far-end async-generator. + # Right now both actual async gens and any async + # function which declares a `chan` kwarg in its + # signature will be treated as one. + treat_as_gen = True + + log.debug(f"Spawning task for {func}") + cs = await self._root_nursery.start( + _invoke, self, cid, chan, func, kwargs, treat_as_gen, + name=funcname + ) + # never allow cancelling cancel requests (results in + # deadlock and other weird behaviour) + if func != self.cancel: + self._no_more_rpc_tasks.clear() + log.info(f"RPC func is {func}") + self._rpc_tasks.setdefault(chan, []).append((cs, func)) + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") + else: # channel disconnect + log.debug(f"{chan} from {chan.uid} disconnected") + + except InternalActorError: + # ship internal errors upwards + if self._parent_chan: + await self._parent_chan.send( + {'error': traceback.format_exc(), 'cid': 'internal'}) + raise + except trio.ClosedResourceError: + log.error(f"{chan} form {chan.uid} broke") + except Exception: + # ship exception (from above code) to peer as an internal error + await chan.send( + {'error': traceback.format_exc(), 'cid': 'internal'}) + raise + finally: + log.debug(f"Exiting msg loop for {chan} from {chan.uid}") def _fork_main(self, accept_addr, parent_addr=None): # after fork routine which invokes a fresh ``trio.run`` @@ -324,7 +370,7 @@ class Actor: if self.loglevel is not None: get_console_log(self.loglevel) log.info( - f"Started new {ctx.current_process()} for actor {self.uid}") + f"Started new {ctx.current_process()} for {self.uid}") _state._current_actor = self log.debug(f"parent_addr is {parent_addr}") try: @@ -332,14 +378,15 @@ class Actor: self._async_main, accept_addr, parent_addr=parent_addr)) except KeyboardInterrupt: pass # handle it the same way trio does? - log.debug(f"Actor {self.uid} terminated") + log.info(f"Actor {self.uid} terminated") async def _async_main( self, accept_addr, arbiter_addr=None, parent_addr=None, - nursery=None + nursery=None, + task_status=trio.TASK_STATUS_IGNORED, ): """Start the channel server, maybe connect back to the parent, and start the main task. @@ -347,7 +394,6 @@ class Actor: A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ - result = None arbiter_addr = arbiter_addr or self._arb_addr registered_with_arbiter = False try: @@ -373,7 +419,6 @@ class Actor: # exception back to the parent actor) chan = self._parent_chan = Channel( destaddr=parent_addr, - on_reconnect=self.main ) await chan.connect() # initial handshake, report who we are, who they are @@ -382,7 +427,7 @@ class Actor: log.warn( f"Failed to connect to parent @ {parent_addr}," " closing server") - self.cancel_server() + await self.cancel() self._parent_chan = None # register with the arbiter if we're told its addr @@ -393,6 +438,7 @@ class Actor: name=self.name, sockaddr=self.accept_addr) registered_with_arbiter = True + task_status.started() # handle new connection back to parent optionally # begin responding to RPC if self._allow_rpc: @@ -401,38 +447,6 @@ class Actor: nursery.start_soon( self._process_messages, self._parent_chan) - if self.main: - try: - if self._parent_chan: - async with trio.open_nursery() as n: - self._main_scope = n.cancel_scope - log.debug(f"Starting main task `{self.main}`") - # spawned subactor so deliver "main" - # task result(s) back to parent - await n.start( - _invoke, 'main', - self._parent_chan, self.main, {}, - # treat_as_gen, raise_errs params - False, True - ) - else: - with trio.open_cancel_scope() as main_scope: - self._main_scope = main_scope - # run directly we are an "unspawned actor" - log.debug(f"Running `{self.main}` directly") - result = await self.main() - finally: - # tear down channel server in order to ensure - # we exit normally when the main task is done - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() - log.debug(f"Shutting down root nursery") - nursery.cancel_scope.cancel() - self._main_complete.set() - - if self._main_scope.cancelled_caught: - log.debug("Main task was cancelled sucessfully") log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -441,7 +455,7 @@ class Actor: if self._parent_chan: try: await self._parent_chan.send( - {'error': traceback.format_exc(), 'cid': 'main'}) + {'error': traceback.format_exc(), 'cid': 'internal'}) except trio.ClosedStreamError: log.error( f"Failed to ship error to parent " @@ -458,18 +472,18 @@ class Actor: # terminate actor once all it's peers (actors that connected # to it as clients) have disappeared if not self._no_more_peers.is_set(): - log.debug( - f"Waiting for remaining peers {self._peers} to clear") - await self._no_more_peers.wait() + if any( + chan.connected() for chan in chain(*self._peers.values()) + ): + log.debug( + f"Waiting for remaining peers {self._peers} to clear") + await self._no_more_peers.wait() log.debug(f"All peer channels are complete") # tear down channel server no matter what since we errored # or completed - log.debug(f"Shutting down channel server") self.cancel_server() - return result - async def _serve_forever( self, *, @@ -514,20 +528,39 @@ class Actor: log.warn(f"Unable to unregister {self.name} from arbiter") async def cancel(self): - """This cancels the internal root-most nursery thereby gracefully - cancelling (for all intents and purposes) this actor. + """Cancel this actor. + + The sequence in order is: + - cancelling all rpc tasks + - cancelling the channel server + - cancel the "root" nursery """ + # cancel all ongoing rpc tasks + await self.cancel_rpc_tasks() self.cancel_server() - if self._main_scope: - self._main_scope.cancel() - log.debug("Waiting on main task to complete") - await self._main_complete.wait() self._root_nursery.cancel_scope.cancel() + async def cancel_rpc_tasks(self): + """Cancel all existing RPC responder tasks using the cancel scope + registered for each. + """ + scopes = self._rpc_tasks + log.info(f"Cancelling all {len(scopes)} rpc tasks:\n{scopes}") + for chan, scopes in scopes.items(): + log.debug(f"Cancelling all tasks for {chan.uid}") + for scope, func in scopes: + log.debug(f"Cancelling task for {func}") + scope.cancel() + if scopes: + log.info( + f"Waiting for remaining rpc tasks to complete {scopes}") + await self._no_more_rpc_tasks.wait() + def cancel_server(self): """Cancel the internal channel server nursery thereby preventing any new inbound connections from being established. """ + log.debug("Shutting down channel server") self._server_nursery.cancel_scope.cancel() @property @@ -568,7 +601,7 @@ class Arbiter(Actor): self._registry.pop(name, None) -async def _start_actor(actor, host, port, arbiter_addr, nursery=None): +async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): """Spawn a local actor by starting a task to execute it's main async function. @@ -582,16 +615,19 @@ async def _start_actor(actor, host, port, arbiter_addr, nursery=None): # NOTE: this won't block since we provide the nursery log.info(f"Starting local {actor} @ {host}:{port}") - result = await actor._async_main( - accept_addr=(host, port), - parent_addr=None, - arbiter_addr=arbiter_addr, - nursery=nursery, - ) - # XXX: If spawned locally, the actor is cancelled when this - # context is complete given that there are no more active - # peer channels connected to it. - if not actor._outlive_main: + async with trio.open_nursery() as nursery: + await nursery.start( + partial( + actor._async_main, + accept_addr=(host, port), + parent_addr=None, + arbiter_addr=arbiter_addr, + ) + ) + result = await main() + # XXX: If spawned locally, the actor is cancelled when this + # context is complete given that there are no more active + # peer channels connected to it. actor.cancel_server() # unset module state diff --git a/tractor/_portal.py b/tractor/_portal.py index b324069..ebae431 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -42,20 +42,6 @@ async def _do_handshake(actor, chan): return uid -async def result_from_q(q, chan): - """Process a msg from a remote actor. - """ - first_msg = await q.get() - if 'return' in first_msg: - return 'return', first_msg, q - elif 'yield' in first_msg: - return 'yield', first_msg, q - elif 'error' in first_msg: - raise RemoteActorError(f"{chan.uid}\n" + first_msg['error']) - else: - raise ValueError(f"{first_msg} is an invalid response packet?") - - class Portal: """A 'portal' to a(n) (remote) ``Actor``. @@ -67,7 +53,12 @@ class Portal: """ def __init__(self, channel): self.channel = channel + # when this is set to a tuple returned from ``_submit()`` then + # it is expected that ``result()`` will be awaited at some point + # during the portal's lifetime self._result = None + self._expect_result = None + self._errored = False async def aclose(self): log.debug(f"Closing {self}") @@ -75,26 +66,56 @@ class Portal: # gets in! await self.channel.aclose() - async def run(self, ns, func, **kwargs): - """Submit a function to be scheduled and run by actor, return its - (stream of) result(s). + async def _submit(self, ns, func, **kwargs): + """Submit a function to be scheduled and run by actor, return the + associated caller id, response queue, response type str, + first message packet as a tuple. + + This is an async call. """ + # ship a function call request to the remote actor + cid, q = await current_actor().send_cmd(self.channel, ns, func, kwargs) + + # wait on first response msg and handle (this should be + # in an immediate response) + first_msg = await q.get() + functype = first_msg.get('functype') + + if functype == 'function' or functype == 'asyncfunction': + resp_type = 'return' + elif functype == 'asyncgen': + resp_type = 'yield' + elif 'error' in first_msg: + raise RemoteActorError( + f"{self.channel.uid}\n" + first_msg['error']) + else: + raise ValueError(f"{first_msg} is an invalid response packet?") + + return cid, q, resp_type, first_msg + + async def _submit_for_result(self, ns, func, **kwargs): + assert self._expect_result is None, \ + "A pending main result has already been submitted" + self._expect_result = await self._submit(ns, func, **kwargs) + + async def run(self, ns, func, **kwargs): + """Submit a function to be scheduled and run by actor, wrap and return + its (stream of) result(s). + + This is a blocking call. + """ + return await self._return_from_resptype( + *(await self._submit(ns, func, **kwargs)) + ) + + async def _return_from_resptype(self, cid, q, resptype, first_msg): # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - actor = current_actor() - # ship a function call request to the remote actor - cid, q = await actor.send_cmd(self.channel, ns, func, kwargs) - # wait on first response msg and handle - return await self._return_from_resptype( - cid, *(await result_from_q(q, self.channel))) - - async def _return_from_resptype(self, cid, resptype, first_msg, q): if resptype == 'yield': async def yield_from_q(): - yield first_msg['yield'] try: async for msg in q: try: @@ -103,8 +124,9 @@ class Portal: if 'stop' in msg: break # far end async gen terminated else: - raise RemoteActorError(msg['error']) - except GeneratorExit: + raise RemoteActorError( + f"{self.channel.uid}\n" + msg['error']) + except StopAsyncIteration: log.debug( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") @@ -113,22 +135,24 @@ class Portal: return yield_from_q() elif resptype == 'return': - return first_msg['return'] + msg = await q.get() + try: + return msg['return'] + except KeyError: + raise RemoteActorError( + f"{self.channel.uid}\n" + msg['error']) else: raise ValueError(f"Unknown msg response type: {first_msg}") async def result(self): """Return the result(s) from the remote actor's "main" task. """ - if self._result is None: - q = current_actor().get_waitq(self.channel.uid, 'main') - resptype, first_msg, q = (await result_from_q(q, self.channel)) + if self._expect_result is None: + raise RuntimeError("This portal is not expecting a final result?") + elif self._result is None: self._result = await self._return_from_resptype( - 'main', resptype, first_msg, q) - log.warn( - f"Retrieved first result `{self._result}` " - f"for {self.channel.uid}") - # await q.put(first_msg) # for next consumer (e.g. nursery) + *self._expect_result + ) return self._result async def close(self): @@ -153,7 +177,9 @@ class Portal: log.warn( f"{self.channel} for {self.channel.uid} was already closed?") return False - + else: + log.warn(f"May have failed to cancel {self.channel.uid}") + return False class LocalPortal: """A 'portal' to a local ``Actor``. diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 08856f9..aa9becb 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -2,9 +2,10 @@ ``trio`` inspired apis and helpers """ import multiprocessing as mp +import inspect import trio -from async_generator import asynccontextmanager +from async_generator import asynccontextmanager, aclosing from ._state import current_actor from .log import get_logger, get_loglevel @@ -22,9 +23,9 @@ class ActorNursery: def __init__(self, actor, supervisor=None): self.supervisor = supervisor # TODO self._actor = actor - # We'll likely want some way to cancel all sub-actors eventually - # self.cancel_scope = cancel_scope self._children = {} + # portals spawned with ``run_in_actor()`` + self._cancel_after_result_on_exit = set() self.cancelled = False async def __aenter__(self): @@ -33,11 +34,9 @@ class ActorNursery: async def start_actor( self, name: str, - main=None, bind_addr=('127.0.0.1', 0), statespace=None, rpc_module_paths=None, - outlive_main=False, # sub-actors die when their main task completes loglevel=None, # set log level per subactor ): loglevel = loglevel or self._actor.loglevel or get_loglevel() @@ -46,8 +45,6 @@ class ActorNursery: # modules allowed to invoked funcs from rpc_module_paths=rpc_module_paths or [], statespace=statespace, # global proc state vars - main=main, # main coroutine to be invoked - outlive_main=outlive_main, loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, ) @@ -59,6 +56,11 @@ class ActorNursery: # daemon=True, name=name, ) + # register the process before start in case we get a cancel + # request before the actor has fully spawned - then we can wait + # for it to fully come up before sending a cancel request + self._children[actor.uid] = [actor, proc, None] + proc.start() if not proc.is_alive(): raise ActorFailure("Couldn't start sub-actor?") @@ -69,7 +71,39 @@ class ActorNursery: # local actor by the time we get a ref to it event, chan = await self._actor.wait_for_peer(actor.uid) portal = Portal(chan) - self._children[(name, proc.pid)] = (actor, proc, portal) + self._children[actor.uid][2] = portal + return portal + + async def run_in_actor( + self, + name, + fn, + bind_addr=('127.0.0.1', 0), + rpc_module_paths=None, + statespace=None, + loglevel=None, # set log level per subactor + **kwargs, # explicit args to ``fn`` + ): + """Spawn a new actor, run a lone task, then terminate the actor and + return its result. + + Actors spawned using this method are kept alive at nursery teardown + until the task spawned by executing ``fn`` completes at which point + the actor is terminated. + """ + mod_path = fn.__module__ + portal = await self.start_actor( + name, + rpc_module_paths=[mod_path], + bind_addr=bind_addr, + statespace=statespace, + ) + await portal._submit_for_result( + mod_path, + fn.__name__, + **kwargs + ) + self._cancel_after_result_on_exit.add(portal) return portal async def wait(self): @@ -82,27 +116,34 @@ class ActorNursery: # please god don't hang proc.join() log.debug(f"Joined {proc}") - event = self._actor._peers.get(actor.uid) - if isinstance(event, trio.Event): - event.set() - log.warn( - f"Cancelled `wait_for_peer()` call since {actor.uid}" - f" is already dead!") - if not portal._result: - log.debug(f"Faking result for {actor.uid}") - q = self._actor.get_waitq(actor.uid, 'main') - q.put_nowait({'return': None, 'cid': 'main'}) + self._children.pop(actor.uid) - async def wait_for_result(portal): - if portal.channel.connected(): - log.debug(f"Waiting on final result from {subactor.uid}") - await portal.result() + async def wait_for_result(portal, actor): + # cancel the actor gracefully + log.info(f"Cancelling {portal.channel.uid} gracefully") + await portal.cancel_actor() + + log.debug(f"Waiting on final result from {subactor.uid}") + res = await portal.result() + # if it's an async-gen then we should alert the user + # that we're cancelling it + if inspect.isasyncgen(res): + log.warn( + f"Blindly consuming asyncgen for {actor.uid}") + with trio.fail_after(1): + async with aclosing(res) as agen: + async for item in agen: + log.debug(f"Consuming item {item}") # unblocks when all waiter tasks have completed + children = self._children.copy() async with trio.open_nursery() as nursery: - for subactor, proc, portal in self._children.values(): + for subactor, proc, portal in children.values(): nursery.start_soon(wait_for_proc, proc, subactor, portal) - nursery.start_soon(wait_for_result, portal) + if proc.is_alive() and ( + portal in self._cancel_after_result_on_exit + ): + nursery.start_soon(wait_for_result, portal, subactor) async def cancel(self, hard_kill=False): """Cancel this nursery by instructing each subactor to cancel @@ -111,20 +152,37 @@ class ActorNursery: If ``hard_killl`` is set to ``True`` then kill the processes directly without any far end graceful ``trio`` cancellation. """ + def do_hard_kill(proc): + log.warn(f"Hard killing subactors {self._children}") + proc.terminate() + # XXX: below doesn't seem to work? + # send KeyBoardInterrupt (trio abort signal) to sub-actors + # os.kill(proc.pid, signal.SIGINT) + log.debug(f"Cancelling nursery") - for subactor, proc, portal in self._children.values(): - if proc is mp.current_process(): - # XXX: does this even make sense? - await subactor.cancel() - else: - if hard_kill: - log.warn(f"Hard killing subactors {self._children}") - proc.terminate() - # XXX: doesn't seem to work? - # send KeyBoardInterrupt (trio abort signal) to sub-actors - # os.kill(proc.pid, signal.SIGINT) - else: - await portal.cancel_actor() + with trio.fail_after(3): + async with trio.open_nursery() as n: + for subactor, proc, portal in self._children.values(): + if hard_kill: + do_hard_kill(proc) + else: + if portal is None: # actor hasn't fully spawned yet + event = self._actor._peer_connected[subactor.uid] + log.warn( + f"{subactor.uid} wasn't finished spawning?") + await event.wait() + # channel/portal should now be up + _, _, portal = self._children[subactor.uid] + if portal is None: + # cancelled while waiting on the event? + chan = self._actor._peers[subactor.uid][-1] + if chan: + portal = Portal(chan) + else: # there's no other choice left + do_hard_kill(proc) + + # spawn cancel tasks async + n.start_soon(portal.cancel_actor) log.debug(f"Waiting on all subactors to complete") await self.wait() @@ -134,32 +192,37 @@ class ActorNursery: async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. """ - if etype is not None: - # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case the - # else block here might not complete? Should both be shielded? - if etype is trio.Cancelled: + try: + if etype is not None: + # XXX: hypothetically an error could be raised and then + # a cancel signal shows up slightly after in which case the + # else block here might not complete? Should both be shielded? with trio.open_cancel_scope(shield=True): - log.warn( - f"{current_actor().uid} was cancelled with {etype}" - ", cancelling actor nursery") - await self.cancel() + if etype is trio.Cancelled: + log.warn( + f"{current_actor().uid} was cancelled with {etype}" + ", cancelling actor nursery") + await self.cancel() + else: + log.exception( + f"{current_actor().uid} errored with {etype}, " + "cancelling actor nursery") + await self.cancel() else: - log.exception( - f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") - await self.cancel() - else: - # XXX: this is effectively the lone cancellation/supervisor - # strategy which exactly mimicks trio's behaviour - log.debug(f"Waiting on subactors {self._children} to complete") - try: - await self.wait() - except Exception as err: - log.warn(f"Nursery caught {err}, cancelling") - await self.cancel() - raise - log.debug(f"Nursery teardown complete") + # XXX: this is effectively the lone cancellation/supervisor + # strategy which exactly mimicks trio's behaviour + log.debug(f"Waiting on subactors {self._children} to complete") + try: + await self.wait() + except Exception as err: + log.warn(f"Nursery caught {err}, cancelling") + await self.cancel() + raise + log.debug(f"Nursery teardown complete") + except Exception: + log.exception("Error on nursery exit:") + await self.wait() + raise @asynccontextmanager