From 36fd75e217a64b801ad473694e2164695c632e6c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 6 Jul 2018 02:36:21 -0400 Subject: [PATCH] Fix some bugs to get tests working Fix quite a few little bugs: - async gen func detection in `_invoke()` - always cancel channel server on main task exit - wait for remaining channel peers after unsub from arbiter - return result from main task(s) all the way up to `tractor.run()` Also add a `Portal.result()` for getting the final result(s) from the actor's main task and fix up a bunch of docs. --- tractor/__init__.py | 121 ++++++++++++++++++++++++++------------------ 1 file changed, 72 insertions(+), 49 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index cb7e121..6c0fde7 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -58,7 +58,10 @@ async def _invoke( if isinstance(func, partial): is_async_partial = inspect.iscoroutinefunction(func.func) - if not inspect.iscoroutinefunction(func) and not is_async_partial: + if ( + not inspect.iscoroutinefunction(func) and not is_async_partial + and not inspect.isasyncgenfunction(func) + ): await chan.send({'return': func(**kwargs), 'cid': cid}) else: coro = func(**kwargs) @@ -276,9 +279,6 @@ class Actor: cid = msg.get('cid') if cid: # deliver response to local caller/waiter self._push_result(chan.uid, cid, msg) - if 'error' in msg: - # TODO: need something better then this slop - raise RemoteActorError(msg['error']) log.debug(f"Waiting on next msg for {chan}") continue else: @@ -343,7 +343,8 @@ class Actor: parent_addr=None, nursery=None ): - """Start the channel server and main task. + """Start the channel server, maybe connect back to the parent, and + start the main task. A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. @@ -387,28 +388,24 @@ class Actor: name=self.name, sockaddr=self.accept_addr) if self.main: - if self._parent_chan: - log.debug(f"Starting main task `{self.main}`") - # start "main" routine in a task - await nursery.start( - _invoke, 'main', self._parent_chan, self.main, {}, - False, True # treat_as_gen, raise_errs params - ) - else: - # run directly - log.debug(f"Running `{self.main}` directly") - result = await self.main() + try: + if self._parent_chan: + log.debug(f"Starting main task `{self.main}`") + # start "main" routine in a task + await nursery.start( + _invoke, 'main', self._parent_chan, self.main, {}, + False, True # treat_as_gen, raise_errs params + ) + else: + # run directly + log.debug(f"Running `{self.main}` directly") + result = await self.main() - # terminate local in-proc once its main completes - log.debug( - f"Waiting for remaining peers {self._peers} to clear") - await self._no_more_peers.wait() - log.debug(f"All peer channels are complete") - - # tear down channel server - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() + finally: + # tear down channel server + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self.cancel_server() # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -431,6 +428,12 @@ class Actor: except OSError: log.warn(f"Unable to unregister {self.name} from arbiter") + # terminate local in-proc once its main completes + log.debug( + f"Waiting for remaining peers {self._peers} to clear") + await self._no_more_peers.wait() + log.debug(f"All peer channels are complete") + return result async def _serve_forever( @@ -441,9 +444,10 @@ class Actor: accept_port=0, task_status=trio.TASK_STATUS_IGNORED ): - """Main coroutine: connect back to the parent, spawn main task, begin - listening for new messages. + """Start the channel server, begin listening for new connections. + This will cause an actor to continue living (blocking) until + ``cancel_server()`` is called. """ async with trio.open_nursery() as nursery: self._server_nursery = nursery @@ -454,6 +458,8 @@ class Actor: partial( trio.serve_tcp, self._stream_handler, + # new connections will stay alive even if this server + # is cancelled handler_nursery=self._root_nursery, port=accept_port, host=accept_host, ) @@ -543,13 +549,13 @@ class Portal: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - chan = self.channel - # ship a function call request to the remote actor actor = current_actor() + # ship a function call request to the remote actor + cid, q = await actor.send_cmd(self.channel, ns, func, kwargs) + # wait on first response msg and handle + return await self._return_from_resptype(cid, *(await result_from_q(q))) - cid, q = await actor.send_cmd(chan, ns, func, kwargs) - # wait on first response msg - resptype, first_msg, q = await result_from_q(q) + async def _return_from_resptype(self, cid, resptype, first_msg, q): if resptype == 'yield': @@ -562,7 +568,9 @@ class Portal: except KeyError: raise RemoteActorError(msg['error']) except GeneratorExit: - log.debug(f"Cancelling async gen call {cid} to {chan.uid}") + log.debug( + f"Cancelling async gen call {cid} to " + "{self.channel.uid}") return yield_from_q() @@ -571,12 +579,22 @@ class Portal: else: raise ValueError(f"Unknown msg response type: {first_msg}") + async def result(self): + """Return the result(s) from the remote actor's "main" task. + """ + q = current_actor().get_waitq(self.channel.uid, 'main') + first_msg = (await result_from_q(q)) + val = await self._return_from_resptype( + 'main', *first_msg) + await q.put(first_msg) # for next consumer (e.g. nursery) + return val + @asynccontextmanager async def open_portal(channel, nursery=None): """Open a ``Portal`` through the provided ``channel``. - Spawns a background task to handle rpc message processing. + Spawns a background task to handle message processing. """ actor = current_actor() assert actor @@ -643,12 +661,12 @@ class ActorNursery: statespace=None, rpc_module_paths=None, outlive_main=False, # sub-actors die when their main task completes - loglevel=None, # set console logging per subactor + loglevel=None, # set log level per subactor ): actor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths, + rpc_module_paths=rpc_module_paths or [], statespace=statespace, # global proc state vars main=main, # main coroutine to be invoked outlive_main=outlive_main, @@ -783,7 +801,7 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None): # NOTE: this won't block since we provide the nursery log.info(f"Starting local {actor} @ {host}:{port}") - await actor._async_main( + result = await actor._async_main( accept_addr=(host, port), parent_addr=None, arbiter_addr=arbiter_addr, @@ -799,11 +817,14 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None): _current_actor = None log.info("Completed async main") + return result + @asynccontextmanager async def _connect_chan(host, port): """Attempt to connect to an arbiter's channel server. - Return the channel on success or None on failure. + + Return the channel on success or ``None`` on failure. """ chan = Channel((host, port)) await chan.connect() @@ -837,8 +858,8 @@ async def find_actor( ): """Ask the arbiter to find actor(s) by name. - Returns a sequence of unconnected portals for each matching actor - known to the arbiter (client code is expected to connect the portals). + Returns a connected portal to the last registered matching actor + known to the arbiter. """ actor = current_actor() if not actor: @@ -847,14 +868,14 @@ async def find_actor( async with get_arbiter(*arbiter_sockaddr) as arb_portal: sockaddrs = await arb_portal.run('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just - # the first one we find + # the last one that registered if sockaddrs: sockaddr = sockaddrs[-1] async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal else: - yield + yield None async def _main(async_fn, args, kwargs, name, arbiter_addr): @@ -871,9 +892,9 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): except OSError: log.warn(f"No actor could be found @ {host}:{port}") + # create a local actor and start up its main routine/task if arbiter_found: # we were able to connect to an arbiter log.info(f"Arbiter seems to exist @ {host}:{port}") - # create a local actor and start up its main routine/task actor = Actor( name or 'anonymous', main=main, @@ -882,13 +903,15 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): host, port = (_default_arbiter_host, 0) else: # start this local actor as the arbiter + # this should eventually get passed `outlive_main=True`? actor = Arbiter(name or 'arbiter', main=main, **kwargs) - await start_actor(actor, host, port, arbiter_addr=arbiter_addr) - # Creates an internal nursery which shouldn't be cancelled even if - # the one opened below is (this is desirable because the arbiter should - # stay up until a re-election process has taken place - which is not - # implemented yet FYI). + # ``Actor._async_main()`` creates an internal nursery if one is not + # provided and thus blocks here until it's main task completes. + # Note that if the current actor is the arbiter it is desirable + # for it to stay up indefinitely until a re-election process has + # taken place - which is not implemented yet FYI). + return await start_actor(actor, host, port, arbiter_addr=arbiter_addr) def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs):