diff --git a/piker/tractor.py b/piker/tractor.py index 7ead604..0023fba 100644 --- a/piker/tractor.py +++ b/piker/tractor.py @@ -52,23 +52,24 @@ async def _invoke( """Invoke local func and return results over provided channel. """ try: - is_async_func = False + is_async_partial = False if isinstance(func, partial): - is_async_func = inspect.iscoroutinefunction(func.func) + is_async_partial = inspect.iscoroutinefunction(func.func) - if not inspect.iscoroutinefunction(func) and not is_async_func: + if not inspect.iscoroutinefunction(func) and not is_async_partial: await chan.send({'return': func(**kwargs), 'cid': cid}) else: coro = func(**kwargs) if inspect.isasyncgen(coro): - # await chan.send('gen') async for item in coro: # TODO: can we send values back in here? - # How do we do it, spawn another task? - # to_send = await chan.recv() + # it's gonna require a `while True:` and + # some non-blocking way to retrieve new `asend()` + # values from the channel: + # to_send = await chan.recv_nowait() # if to_send is not None: - # await coro.send(to_send) + # to_yield = await coro.asend(to_send) await chan.send({'yield': item, 'cid': cid}) else: if treat_as_gen: @@ -276,9 +277,13 @@ class Actor: _invoke, cid, chan, func, kwargs, treat_as_gen, name=funcname ) + else: # channel disconnect + log.warn(f"Cancelling all tasks for {chan}") + nursery.cancel_scope.cancel() + log.debug(f"Exiting msg loop for {chan}") - def _fork_main(self, accept_addr, parent_addr=None, loglevel=None): + def _fork_main(self, accept_addr, parent_addr=None, loglevel='debug'): # after fork routine which invokes a fresh ``trio.run`` log.info( f"Started new {ctx.current_process()} for actor {self.uid}") @@ -287,8 +292,11 @@ class Actor: if loglevel: get_console_log(loglevel) log.debug(f"parent_addr is {parent_addr}") - trio.run( - partial(self._async_main, accept_addr, parent_addr=parent_addr)) + try: + trio.run(partial( + self._async_main, accept_addr, parent_addr=parent_addr)) + except KeyboardInterrupt: + pass # handle it the same way trio does? log.debug(f"Actor {self.uid} terminated") async def _async_main(self, accept_addr, parent_addr=None, nursery=None): @@ -415,8 +423,10 @@ class Actor: def accept_addr(self): """Primary address to which the channel server is bound. """ - return self._listeners[0].socket.getsockname() \ - if self._listeners else None + try: + return self._listeners[0].socket.getsockname() + except OSError: + return def get_parent(self): return Portal(self._parent_chan) @@ -495,11 +505,14 @@ class Portal: async def yield_from_q(): yield first_msg['yield'] - async for msg in q: - try: - yield msg['yield'] - except KeyError: - raise RemoteActorError(msg['error']) + try: + async for msg in q: + try: + yield msg['yield'] + except KeyError: + raise RemoteActorError(msg['error']) + except GeneratorExit: + log.warn(f"Cancelling async gen call {cid} to {chan.uid}") return yield_from_q() @@ -558,6 +571,7 @@ class ActorNursery: outlive_main=outlive_main, ) parent_addr = self._actor.accept_addr + assert parent_addr proc = ctx.Process( target=actor._fork_main, args=(bind_addr, parent_addr, loglevel), @@ -611,14 +625,10 @@ class ActorNursery: actor = self._actor chan = actor.get_chan(subactor.uid) if chan: - cid, q = await actor.send_cmd( - chan, # channel lookup - 'self', - 'cancel', - {}, - ) + await actor.send_cmd(chan, 'self', 'cancel', {}) else: - log.warn(f"Channel for {subactor.uid} is already down?") + log.warn( + f"Channel for {subactor.uid} is already down?") log.debug(f"Waiting on all subactors to complete") await self.wait() log.debug(f"All subactors for {self} have terminated") @@ -637,7 +647,7 @@ class ActorNursery: if etype is not None: log.warn(f"{current_actor().uid} errored with {etype}, " - "cancelling nursery") + "cancelling actor nursery") await self.cancel() else: log.debug(f"Waiting on subactors to complete") @@ -687,7 +697,7 @@ class NoArbiterFound: @asynccontextmanager -async def get_arbiter(host='127.0.0.1', port=1616, main=None, **kwargs): +async def get_arbiter(host='127.0.0.1', port=1617, main=None, **kwargs): actor = current_actor() if actor and not actor.is_arbiter: try: @@ -698,16 +708,11 @@ async def get_arbiter(host='127.0.0.1', port=1616, main=None, **kwargs): raise NoArbiterFound(err) else: if actor and actor.is_arbiter: - # we're already the arbiter (re-entrant call from the arbiter actor) + # we're already the arbiter + # (likely a re-entrant call from the arbiter actor) yield LocalPortal(actor) else: - arbiter = Arbiter( - 'arbiter', - # rpc_module_paths=[], # the arbiter doesn't allow module rpc - # statespace={}, # global proc state vars - main=main, # main coroutine to be invoked - **kwargs, - ) + arbiter = Arbiter('arbiter', main=main, **kwargs) # assign process-local actor global _current_actor _current_actor = arbiter @@ -754,10 +759,10 @@ async def _main(async_fn, args, kwargs, name): # implemented yet FYI). async with get_arbiter( host=kwargs.pop('arbiter_host', '127.0.0.1'), - port=kwargs.pop('arbiter_port', 1616), + port=kwargs.pop('arbiter_port', 1617), main=main, **kwargs, - ) as portal: + ): if not current_actor().is_arbiter: # create a local actor and start it up its main routine actor = Actor( @@ -775,6 +780,10 @@ async def _main(async_fn, args, kwargs, name): # block waiting for the arbiter main task to complete pass + # unset module state + global _current_actor + _current_actor = None + def run(async_fn, *args, arbiter_host=None, name='anonymous', **kwargs): """Run a trio-actor async function in process.