From 49573c9a03920c825ef8e637c20a383f5cc22ade Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Jul 2018 15:06:42 -0400 Subject: [PATCH] More fixes to do cancellation correctly Here is a bunch of code tightening to make sure cancellation works even if recently spawned actors haven't fully started up and the parent is cancelled. The fixes include: - passing the arbiter socket address to each actor - ensure all spawned actors respect the spawner's log level - handle process versus final `portal.result()` teardown in multiple tasks such that if a proc dies before shipping a result we don't wait - more detailed debug logging in teardown code paths - don't store peer connected events in the same `dict` as the peer channels - if necessary fake main task results on peer channel disconnect - warn when a `trio.Cancelled` is what causes a nursery to bail, otherwise log an error - store the subactor portal in the nursery for teardown purposes - add dedicated `Portal.cancel_actor()` which acts as a "hard cancel" and never blocks (indefinitely) - add `Arbiter.unregister_actor()` so it's more explicit what's being requested --- tractor/__init__.py | 305 +++++++++++++++++++++++++++----------------- 1 file changed, 190 insertions(+), 115 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 8da16f0..22fe303 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -1,5 +1,6 @@ """ -tracor: An actor model micro-framework. +tractor: An actor model micro-framework built on + ``trio`` and ``multiprocessing``. 
""" from collections import defaultdict from functools import partial @@ -26,6 +27,10 @@ _default_arbiter_port = 1616 _default_loglevel = None +def get_loglevel(): + return _default_loglevel + + class ActorFailure(Exception): "General actor failure" @@ -82,7 +87,7 @@ async def _invoke( # to_yield = await coro.asend(to_send) await chan.send({'yield': item, 'cid': cid}) - log.warn(f"Finished iterating {coro}") + log.debug(f"Finished iterating {coro}") # TODO: we should really support a proper # `StopAsyncIteration` system here for returning a final # value if desired @@ -101,11 +106,12 @@ async def _invoke( except Exception: if not raise_errs: await chan.send({'error': traceback.format_exc(), 'cid': cid}) + log.exception("Actor errored:") else: raise -async def result_from_q(q): +async def result_from_q(q, chan): """Process a msg from a remote actor. """ first_msg = await q.get() @@ -114,7 +120,7 @@ async def result_from_q(q): elif 'yield' in first_msg: return 'yield', first_msg, q elif 'error' in first_msg: - raise RemoteActorError(first_msg['error']) + raise RemoteActorError(f"{chan.uid}\n" + first_msg['error']) else: raise ValueError(f"{first_msg} is an invalid response packet?") @@ -151,6 +157,7 @@ class Actor: allow_rpc: bool = True, outlive_main: bool = False, loglevel: str = None, + arbiter_addr: (str, int) = None, ): self.name = name self.uid = (name, uid or str(uuid.uuid1())) @@ -163,9 +170,11 @@ class Actor: self._allow_rpc = allow_rpc self._outlive_main = outlive_main self.loglevel = loglevel + self._arb_addr = arbiter_addr # filled in by `_async_main` after fork self._peers = defaultdict(list) + self._peer_connected = {} self._no_more_peers = trio.Event() self._no_more_peers.set() self._actors2calls = {} # map {uids -> {callids -> waiter queues}} @@ -178,7 +187,7 @@ class Actor: ``uid``. 
""" log.debug(f"Waiting for peer {uid} to connect") - event = self._peers.setdefault(uid, trio.Event()) + event = self._peer_connected.setdefault(uid, trio.Event()) await event.wait() log.debug(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] @@ -210,23 +219,22 @@ class Actor: return # channel tracking - event_or_chans = self._peers.pop(uid, None) - if isinstance(event_or_chans, trio.Event): + event = self._peer_connected.pop(uid, None) + if event: # Instructing connection: this is likely a new channel to # a recently spawned actor which we'd like to control via # async-rpc calls. - log.debug(f"Waking channel waiters {event_or_chans.statistics()}") + log.debug(f"Waking channel waiters {event.statistics()}") # Alert any task waiting on this connection to come up - event_or_chans.set() - event_or_chans.clear() # consumer can wait on channel to close - elif isinstance(event_or_chans, list): - log.warn( - f"already have channel(s) for {uid}:{event_or_chans}?" - ) - # append new channel - self._peers[uid].extend(event_or_chans) + event.set() + chans = self._peers[uid] + if chans: + log.warn( + f"already have channel(s) for {uid}:{chans}?" 
+ ) log.debug(f"Registered {chan} for {uid}") + # append new channel self._peers[uid].append(chan) # Begin channel management - respond to remote requests and @@ -235,20 +243,31 @@ class Actor: await self._process_messages(chan) finally: # Drop ref to channel so it can be gc-ed and disconnected - if chan is not self._parent_chan: - log.debug(f"Releasing channel {chan}") - chans = self._peers.get(chan.uid) - chans.remove(chan) - if chan.connected(): - log.debug(f"Disconnecting channel {chan}") - await chan.send(None) - await chan.aclose() - if not chans: - log.debug(f"No more channels for {chan.uid}") - self._peers.pop(chan.uid, None) - if not self._peers: # no more channels connected - self._no_more_peers.set() - log.debug(f"No more peer channels") + # if chan is not self._parent_chan: + log.debug(f"Releasing channel {chan} from {chan.uid}") + chans = self._peers.get(chan.uid) + chans.remove(chan) + if not chans: + log.debug(f"No more channels for {chan.uid}") + self._peers.pop(chan.uid, None) + if not self._actors2calls.get(chan.uid, {}).get('main'): + # fake a "main task" result for any waiting + # nurseries/portals + log.debug(f"Faking result for {chan} from {chan.uid}") + q = self.get_waitq(chan.uid, 'main') + q.put_nowait({'return': None, 'cid': 'main'}) + + log.debug(f"Peers is {self._peers}") + + if not self._peers: # no more channels connected + self._no_more_peers.set() + log.debug(f"Signalling no more peer channels") + + # XXX: is this necessary? 
+ if chan.connected(): + log.debug(f"Disconnecting channel {chan}") + await chan.send(None) + await chan.aclose() async def _push_result(self, actorid, cid, msg): assert actorid, f"`actorid` can't be {actorid}" @@ -258,7 +277,7 @@ class Actor: await q.put(msg) def get_waitq(self, actorid, cid): - log.debug(f"Registering for callid {cid} queue results from {actorid}") + log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) return cids2qs.setdefault(cid, trio.Queue(1000)) @@ -289,7 +308,8 @@ class Actor: f"Cancelling all tasks for {chan} from {chan.uid}") nursery.cancel_scope.cancel() log.debug( - f"Terminating msg loop for {chan} from {chan.uid}") + f"Msg loop signalled to terminate for" + f" {chan} from {chan.uid}") break log.debug(f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') @@ -340,7 +360,8 @@ class Actor: def _fork_main(self, accept_addr, parent_addr=None): # after fork routine which invokes a fresh ``trio.run`` - if self.loglevel: + # log.warn("Log level after fork is {self.loglevel}") + if self.loglevel is not None: get_console_log(self.loglevel) log.info( f"Started new {ctx.current_process()} for actor {self.uid}") @@ -357,7 +378,7 @@ class Actor: async def _async_main( self, accept_addr, - arbiter_addr=(_default_arbiter_host, _default_arbiter_port), + arbiter_addr=None, parent_addr=None, nursery=None ): @@ -368,6 +389,8 @@ class Actor: and when cancelled effectively cancels the actor. 
""" result = None + arbiter_addr = arbiter_addr or self._arb_addr + registered_with_arbiter = False try: async with maybe_open_nursery(nursery) as nursery: self._root_nursery = nursery @@ -378,17 +401,30 @@ class Actor: self._serve_forever, accept_host=host, accept_port=port) ) + # XXX: I wonder if a better name is maybe "requester" + # since I don't think the notion of a "parent" actor + # necessarily sticks given that eventually we want + # ``'MainProcess'`` (the actor who initially starts the + # forkserver) to eventually be the only one who is + # allowed to spawn new processes per Python program. if parent_addr is not None: - # Connect back to the parent actor and conduct initial - # handshake (From this point on if we error ship the - # exception back to the parent actor) - chan = self._parent_chan = Channel( - destaddr=parent_addr, - on_reconnect=self.main - ) - await chan.connect() - # initial handshake, report who we are, who they are - await _do_handshake(self, chan) + try: + # Connect back to the parent actor and conduct initial + # handshake (From this point on if we error ship the + # exception back to the parent actor) + chan = self._parent_chan = Channel( + destaddr=parent_addr, + on_reconnect=self.main + ) + await chan.connect() + # initial handshake, report who we are, who they are + await _do_handshake(self, chan) + except OSError: # failed to connect + log.warn( + f"Failed to connect to parent @ {parent_addr}," + " closing server") + self.cancel_server() + self._parent_chan = None # register with the arbiter if we're told its addr log.debug(f"Registering {self} for role `{self.name}`") @@ -396,6 +432,7 @@ class Actor: await arb_portal.run( 'self', 'register_actor', name=self.name, sockaddr=self.accept_addr) + registered_with_arbiter = True # handle new connection back to parent optionally # begin responding to RPC @@ -409,23 +446,26 @@ class Actor: try: if self._parent_chan: log.debug(f"Starting main task `{self.main}`") - # start "main" routine 
in a task + # spawned subactor so deliver "main" task result(s) + # back to parent await nursery.start( _invoke, 'main', self._parent_chan, self.main, {}, False, True # treat_as_gen, raise_errs params ) else: - # run directly + # run directly - we are an "unspawned actor" log.debug(f"Running `{self.main}` directly") result = await self.main() finally: - # tear down channel server + # tear down channel server in order to ensure + # we exit normally when the main task is done if not self._outlive_main: log.debug(f"Shutting down channel server") self.cancel_server() + log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is # cancelled or signalled by the parent actor) @@ -438,30 +478,27 @@ class Actor: log.error( f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") - log.exception("Actor errored:") + log.exception("Actor errored:") + + if not registered_with_arbiter: + log.exception( + f"Failed to register with arbiter @ {arbiter_addr}") else: raise finally: - # UNregister actor from the arbiter - try: - if arbiter_addr is not None: - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'unregister_actor', - name=self.name, sockaddr=self.accept_addr) - except OSError: - log.warn(f"Unable to unregister {self.name} from arbiter") - - # terminate local in-proc once its main completes - log.debug( - f"Waiting for remaining peers {self._peers} to clear") - await self._no_more_peers.wait() + await self._do_unreg(arbiter_addr) + # terminate actor once all it's peers (actors that connected + # to it as clients) have disappeared + if not self._no_more_peers.is_set(): + log.debug( + f"Waiting for remaining peers {self._peers} to clear") + await self._no_more_peers.wait() log.debug(f"All peer channels are complete") - # tear down channel server - if not self._outlive_main: - log.debug(f"Shutting down channel 
server") - self.cancel_server() + # tear down channel server no matter what since we errored + # or completed + log.debug(f"Shutting down channel server") + self.cancel_server() return result @@ -498,7 +535,17 @@ class Actor: self._listeners.extend(listeners) task_status.started() - def cancel(self): + async def _do_unreg(self, arbiter_addr): + # UNregister actor from the arbiter + try: + if arbiter_addr is not None: + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'unregister_actor', name=self.name) + except OSError: + log.warn(f"Unable to unregister {self.name} from arbiter") + + async def cancel(self): """This cancels the internal root-most nursery thereby gracefully cancelling (for all intents and purposes) this actor. """ @@ -545,13 +592,8 @@ class Arbiter(Actor): def register_actor(self, name, sockaddr): self._registry[name].append(sockaddr) - def unregister_actor(self, name, sockaddr): - sockaddrs = self._registry.get(name) - if sockaddrs: - try: - sockaddrs.remove(sockaddr) - except ValueError: - pass + def unregister_actor(self, name): + self._registry.pop(name, None) class Portal: @@ -584,7 +626,8 @@ class Portal: # ship a function call request to the remote actor cid, q = await actor.send_cmd(self.channel, ns, func, kwargs) # wait on first response msg and handle - return await self._return_from_resptype(cid, *(await result_from_q(q))) + return await self._return_from_resptype( + cid, *(await result_from_q(q, self.channel))) async def _return_from_resptype(self, cid, resptype, first_msg, q): @@ -618,7 +661,7 @@ class Portal: """ if self._result is None: q = current_actor().get_waitq(self.channel.uid, 'main') - resptype, first_msg, q = (await result_from_q(q)) + resptype, first_msg, q = (await result_from_q(q, self.channel)) self._result = await self._return_from_resptype( 'main', resptype, first_msg, q) # await q.put(first_msg) # for next consumer (e.g. 
nursery) @@ -630,6 +673,21 @@ class Portal: log.debug(f"Closing portal for {chan} to {chan.uid}") await self.channel.send(None) + async def cancel_actor(self): + """Cancel the actor on the other end of this portal. + """ + log.warn( + f"Sending cancel request to {self.channel.uid} on " + f"{self.channel}") + try: + with trio.move_on_after(0.1) as cancel_scope: + cancel_scope.shield = True + # send cancel cmd - might not get response + await self.run('self', 'cancel') + except trio.ClosedStreamError: + log.warn( + f"{self.channel} for {self.channel.uid} was alreaday closed?") + @asynccontextmanager async def open_portal(channel, nursery=None): @@ -650,10 +708,6 @@ async def open_portal(channel, nursery=None): if channel.uid is None: await _do_handshake(actor, channel) - if not actor.get_chans(channel.uid): - # actor is not currently managing this channel - actor._peers[channel.uid].append(channel) - nursery.start_soon(actor._process_messages, channel) portal = Portal(channel) yield portal @@ -665,7 +719,6 @@ async def open_portal(channel, nursery=None): # cancel background msg loop task nursery.cancel_scope.cancel() if was_connected: - actor._peers[channel.uid].remove(channel) await channel.aclose() @@ -707,8 +760,9 @@ class ActorNursery: statespace=None, rpc_module_paths=None, outlive_main=False, # sub-actors die when their main task completes - loglevel=_default_loglevel, # set log level per subactor + loglevel=None, # set log level per subactor ): + loglevel = loglevel or self._actor.loglevel or get_loglevel() actor = Actor( name, # modules allowed to invoked funcs from @@ -717,6 +771,7 @@ class ActorNursery: main=main, # main coroutine to be invoked outlive_main=outlive_main, loglevel=loglevel, + arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr assert parent_addr @@ -735,26 +790,40 @@ class ActorNursery: # channel should have handshake completed by the # local actor by the time we get a ref to it event, chan = await 
self._actor.wait_for_peer(actor.uid) - # channel is up, get queue which delivers result from main routine - main_q = self._actor.get_waitq(actor.uid, 'main') - self._children[(name, proc.pid)] = (actor, proc, main_q) - - return Portal(chan) + portal = Portal(chan) + self._children[(name, proc.pid)] = (actor, proc, portal) + return portal async def wait(self): - async def wait_for_proc(proc): + async def wait_for_proc(proc, actor, portal): # TODO: timeout block here? if proc.is_alive(): await trio.hazmat.wait_readable(proc.sentinel) # please god don't hang proc.join() log.debug(f"Joined {proc}") + event = self._actor._peers.get(actor.uid) + if isinstance(event, trio.Event): + event.set() + log.warn( + f"Cancelled `wait_for_peer()` call since {actor.uid}" + f" is already dead!") + if not portal._result: + log.debug(f"Faking result for {actor.uid}") + q = self._actor.get_waitq(actor.uid, 'main') + q.put_nowait({'return': None, 'cid': 'main'}) + + async def wait_for_result(portal): + if portal.channel.connected(): + log.debug(f"Waiting on final result from {subactor.uid}") + await portal.result() # unblocks when all waiter tasks have completed async with trio.open_nursery() as nursery: - for subactor, proc, main_q in self._children.values(): - nursery.start_soon(wait_for_proc, proc) + for subactor, proc, portal in self._children.values(): + nursery.start_soon(wait_for_proc, proc, subactor, portal) + nursery.start_soon(wait_for_result, portal) async def cancel(self, hard_kill=False): """Cancel this nursery by instructing each subactor to cancel @@ -764,7 +833,7 @@ class ActorNursery: directly without any far end graceful ``trio`` cancellation. """ log.debug(f"Cancelling nursery") - for subactor, proc, main_q in self._children.values(): + for subactor, proc, portal in self._children.values(): if proc is mp.current_process(): # XXX: does this even make sense? 
await subactor.cancel() @@ -776,15 +845,8 @@ class ActorNursery: # send KeyBoardInterrupt (trio abort signal) to sub-actors # os.kill(proc.pid, signal.SIGINT) else: - # send cancel cmd - likely no response from subactor - actor = self._actor - chans = actor.get_chans(subactor.uid) - if chans: - for chan in chans: - await actor.send_cmd(chan, 'self', 'cancel', {}) - else: - log.warn( - f"Channel for {subactor.uid} is already down?") + await portal.cancel_actor() + log.debug(f"Waiting on all subactors to complete") await self.wait() log.debug(f"All subactors for {self} have terminated") @@ -792,10 +854,10 @@ class ActorNursery: async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. """ - async def wait_for_actor(actor, proc, q): + async def wait_for_actor(actor, proc, portal): if proc.is_alive(): - ret_type, msg, q = await result_from_q(q) - log.info(f"{actor.uid} main task completed with {msg}") + res = await portal.result() + log.info(f"{actor.uid} main task completed with {res}") if not actor._outlive_main: # trigger msg loop to break chans = self._actor.get_chans(actor.uid) @@ -804,14 +866,20 @@ class ActorNursery: await chan.send(None) if etype is not None: - log.exception(f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") - await self.cancel() + if etype is trio.Cancelled: + log.warn(f"{current_actor().uid} was cancelled with {etype}, " + "cancelling actor nursery") + with trio.open_cancel_scope(shield=True): + await self.cancel() + else: + log.exception(f"{current_actor().uid} errored with {etype}, " + "cancelling actor nursery") + await self.cancel() else: log.debug(f"Waiting on subactors {self._children} to complete") async with trio.open_nursery() as nursery: - for subactor, proc, main_q in self._children.values(): - nursery.start_soon(wait_for_actor, subactor, proc, main_q) + for subactor, proc, portal in self._children.values(): + nursery.start_soon(wait_for_actor, subactor, proc, 
portal) await self.wait() log.debug(f"Nursery teardown complete") @@ -824,7 +892,7 @@ def current_actor() -> Actor: @asynccontextmanager -async def open_nursery(supervisor=None, loglevel='WARNING'): +async def open_nursery(supervisor=None): """Create and yield a new ``ActorNursery``. """ actor = current_actor() @@ -908,7 +976,7 @@ async def get_arbiter(host, port): @asynccontextmanager async def find_actor( name, - arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port) + arbiter_sockaddr=None, ): """Ask the arbiter to find actor(s) by name. @@ -919,7 +987,7 @@ async def find_actor( if not actor: raise RuntimeError("No actor instance has been defined yet?") - async with get_arbiter(*arbiter_sockaddr) as arb_portal: + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: sockaddrs = await arb_portal.run('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just # the last one that registered @@ -938,7 +1006,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): main = partial(async_fn, *args) if async_fn else None arbiter_addr = (host, port) = arbiter_addr or ( _default_arbiter_host, _default_arbiter_port) - get_console_log(kwargs.get('loglevel', _default_loglevel)) + get_console_log(kwargs.get('loglevel', get_loglevel())) # make a temporary connection to see if an arbiter exists arbiter_found = False @@ -956,11 +1024,12 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): main=main, **kwargs ) - host, port = (_default_arbiter_host, 0) + host, port = (host, 0) else: # start this local actor as the arbiter # this should eventually get passed `outlive_main=True`? - actor = Arbiter(name or 'arbiter', main=main, **kwargs) + actor = Arbiter( + name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs) # ``Actor._async_main()`` creates an internal nursery if one is not # provided and thus blocks here until it's main task completes. 
@@ -970,7 +1039,13 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr) -def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): +def run( + async_fn, + *args, + name=None, + arbiter_addr=(_default_arbiter_host, _default_arbiter_port), + **kwargs +): """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor.