diff --git a/tractor/__init__.py b/tractor/__init__.py
index 6c0fde7..8da16f0 100644
--- a/tractor/__init__.py
+++ b/tractor/__init__.py
@@ -23,6 +23,7 @@ log = get_logger('tractor')
 _current_actor = None
 _default_arbiter_host = '127.0.0.1'
 _default_arbiter_port = 1616
+_default_loglevel = None
 
 
 class ActorFailure(Exception):
@@ -55,12 +56,16 @@ async def _invoke(
     """
     try:
         is_async_partial = False
+        is_async_gen_partial = False
         if isinstance(func, partial):
             is_async_partial = inspect.iscoroutinefunction(func.func)
+            is_async_gen_partial = inspect.isasyncgenfunction(func.func)
 
         if (
-            not inspect.iscoroutinefunction(func) and not is_async_partial
-            and not inspect.isasyncgenfunction(func)
+            not inspect.iscoroutinefunction(func) and
+            not inspect.isasyncgenfunction(func) and
+            not is_async_partial and
+            not is_async_gen_partial
         ):
             await chan.send({'return': func(**kwargs), 'cid': cid})
         else:
@@ -76,6 +81,12 @@
                     # if to_send is not None:
                     #     to_yield = await coro.asend(to_send)
                     await chan.send({'yield': item, 'cid': cid})
+
+                log.warn(f"Finished iterating {coro}")
+                # TODO: we should really support a proper
+                # `StopAsyncIteration` system here for returning a final
+                # value if desired
+                await chan.send({'stop': None, 'cid': cid})
             else:
                 if treat_as_gen:
                     # XXX: the async-func may spawn further tasks which push
@@ -139,6 +150,7 @@ class Actor:
         uid: str = None,
         allow_rpc: bool = True,
         outlive_main: bool = False,
+        loglevel: str = None,
     ):
         self.name = name
         self.uid = (name, uid or str(uuid.uuid1()))
@@ -150,6 +162,7 @@ class Actor:
         self.statespace = statespace
         self._allow_rpc = allow_rpc
         self._outlive_main = outlive_main
+        self.loglevel = loglevel
 
         # filled in by `_async_main` after fork
         self._peers = defaultdict(list)
@@ -226,6 +239,10 @@ class Actor:
             log.debug(f"Releasing channel {chan}")
             chans = self._peers.get(chan.uid)
             chans.remove(chan)
+            if chan.connected():
+                log.debug(f"Disconnecting channel {chan}")
+                await chan.send(None)
+                await chan.aclose()
             if not chans:
                 log.debug(f"No more channels for {chan.uid}")
                 self._peers.pop(chan.uid, None)
@@ -233,15 +250,12 @@ class Actor:
                 self._no_more_peers.set()
                 log.debug(f"No more peer channels")
 
-    def _push_result(self, actorid, cid, msg):
+    async def _push_result(self, actorid, cid, msg):
        assert actorid, f"`actorid` can't be {actorid}"
        q = self.get_waitq(actorid, cid)
        log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
-        waiters = q.statistics().tasks_waiting_get
-        if not waiters:
-            log.warn(
-                f"No tasks are currently waiting for results from call {cid}?")
-        q.put_nowait(msg)
+        # maintain backpressure
+        await q.put(msg)
 
     def get_waitq(self, actorid, cid):
         log.debug(f"Registering for callid {cid} queue results from {actorid}")
@@ -266,20 +280,23 @@ class Actor:
         """
         # TODO: once https://github.com/python-trio/trio/issues/467 gets
         # worked out we'll likely want to use that!
- log.debug(f"Entering msg loop for {chan}") + log.debug(f"Entering msg loop for {chan} from {chan.uid}") async with trio.open_nursery() as nursery: try: async for msg in chan.aiter_recv(): if msg is None: # terminate sentinel - log.debug(f"Cancelling all tasks for {chan}") + log.debug( + f"Cancelling all tasks for {chan} from {chan.uid}") nursery.cancel_scope.cancel() - log.debug(f"Terminating msg loop for {chan}") + log.debug( + f"Terminating msg loop for {chan} from {chan.uid}") break - log.debug(f"Received msg {msg}") + log.debug(f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') if cid: # deliver response to local caller/waiter - self._push_result(chan.uid, cid, msg) - log.debug(f"Waiting on next msg for {chan}") + await self._push_result(chan.uid, cid, msg) + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") continue else: ns, funcname, kwargs, actorid, cid = msg['cmd'] @@ -312,22 +329,23 @@ class Actor: _invoke, cid, chan, func, kwargs, treat_as_gen, name=funcname ) - log.debug(f"Waiting on next msg for {chan}") + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") else: # channel disconnect - log.debug(f"{chan} disconnected") + log.debug(f"{chan} from {chan.uid} disconnected") except trio.ClosedStreamError: - log.error(f"{chan} broke") + log.error(f"{chan} form {chan.uid} broke") - log.debug(f"Exiting msg loop for {chan}") + log.debug(f"Exiting msg loop for {chan} from {chan.uid}") - def _fork_main(self, accept_addr, parent_addr=None, loglevel=None): + def _fork_main(self, accept_addr, parent_addr=None): # after fork routine which invokes a fresh ``trio.run`` + if self.loglevel: + get_console_log(self.loglevel) log.info( f"Started new {ctx.current_process()} for actor {self.uid}") global _current_actor _current_actor = self - if loglevel: - get_console_log(loglevel) log.debug(f"parent_addr is {parent_addr}") try: trio.run(partial( @@ -372,6 +390,13 @@ class Actor: # initial handshake, report who we are, who they are await _do_handshake(self, chan) + # register with the arbiter if we're told its addr + log.debug(f"Registering {self} for role `{self.name}`") + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'register_actor', + name=self.name, sockaddr=self.accept_addr) + # handle new connection back to parent optionally # begin responding to RPC if self._allow_rpc: @@ -380,20 +405,14 @@ class Actor: nursery.start_soon( self._process_messages, self._parent_chan) - # register with the arbiter if we're told its addr - log.debug(f"Registering {self} for role `{self.name}`") - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'register_actor', - name=self.name, sockaddr=self.accept_addr) - if self.main: try: if self._parent_chan: log.debug(f"Starting main task `{self.main}`") # start "main" routine in a task await nursery.start( - _invoke, 'main', self._parent_chan, self.main, {}, + _invoke, 'main', + self._parent_chan, self.main, {}, False, True # treat_as_gen, raise_errs params ) else: @@ -412,9 +431,14 @@ class Actor: # cancelled or signalled by the parent actor) except Exception: if self._parent_chan: + try: + await self._parent_chan.send( + {'error': traceback.format_exc(), 'cid': 'main'}) + except trio.ClosedStreamError: + log.error( + f"Failed to ship error to parent " + f"{self._parent_chan.uid}, channel was closed") log.exception("Actor errored:") - await self._parent_chan.send( - {'error': traceback.format_exc(), 'cid': 'main'}) else: raise finally: @@ -423,7 +447,7 @@ 
@@ -423,7 +447,7 @@ class Actor:
                 if arbiter_addr is not None:
                     async with get_arbiter(*arbiter_addr) as arb_portal:
                         await arb_portal.run(
-                            'self', 'register_actor',
+                            'self', 'unregister_actor',
                             name=self.name, sockaddr=self.accept_addr)
             except OSError:
                 log.warn(f"Unable to unregister {self.name} from arbiter")
@@ -434,6 +458,11 @@ class Actor:
             await self._no_more_peers.wait()
             log.debug(f"All peer channels are complete")
 
+            # tear down channel server
+            if not self._outlive_main:
+                log.debug(f"Shutting down channel server")
+                self.cancel_server()
+
         return result
 
     async def _serve_forever(
@@ -473,6 +502,7 @@ class Actor:
         """This cancels the internal root-most nursery thereby gracefully
         cancelling (for all intents and purposes) this actor.
         """
+        self.cancel_server()
         self._root_nursery.cancel_scope.cancel()
 
     def cancel_server(self):
@@ -535,6 +565,7 @@ class Portal:
     """
     def __init__(self, channel):
         self.channel = channel
+        self._result = None
 
     async def aclose(self):
         log.debug(f"Closing {self}")
@@ -566,11 +597,14 @@ class Portal:
                     try:
                         yield msg['yield']
                     except KeyError:
-                        raise RemoteActorError(msg['error'])
+                        if 'stop' in msg:
+                            break  # far end async gen terminated
+                        else:
+                            raise RemoteActorError(msg['error'])
             except GeneratorExit:
                 log.debug(
                     f"Cancelling async gen call {cid} to "
-                    "{self.channel.uid}")
+                    f"{self.channel.uid}")
 
         return yield_from_q()
 
@@ -582,12 +616,19 @@ class Portal:
     async def result(self):
         """Return the result(s) from the remote actor's "main" task.
         """
-        q = current_actor().get_waitq(self.channel.uid, 'main')
-        first_msg = (await result_from_q(q))
-        val = await self._return_from_resptype(
-            'main', *first_msg)
-        await q.put(first_msg)  # for next consumer (e.g. nursery)
-        return val
+        if self._result is None:
+            q = current_actor().get_waitq(self.channel.uid, 'main')
+            resptype, first_msg, q = (await result_from_q(q))
+            self._result = await self._return_from_resptype(
+                'main', resptype, first_msg, q)
+            # await q.put(first_msg)  # for next consumer (e.g. nursery)
+        return self._result
+
+    async def close(self):
+        # trigger remote msg loop `break`
+        chan = self.channel
+        log.debug(f"Closing portal for {chan} to {chan.uid}")
+        await self.channel.send(None)
 
 
 @asynccontextmanager
@@ -614,7 +655,12 @@ async def open_portal(channel, nursery=None):
         actor._peers[channel.uid].append(channel)
         nursery.start_soon(actor._process_messages, channel)
 
-        yield Portal(channel)
+        portal = Portal(channel)
+        yield portal
+
+        # cancel remote channel-msg loop
+        if channel.connected():
+            await portal.close()
 
         # cancel background msg loop task
         nursery.cancel_scope.cancel()
@@ -644,7 +690,7 @@ class ActorNursery:
     """Spawn scoped subprocess actors.
""" def __init__(self, actor, supervisor=None): - self.supervisor = supervisor + self.supervisor = supervisor # TODO self._actor = actor # We'll likely want some way to cancel all sub-actors eventually # self.cancel_scope = cancel_scope @@ -661,7 +707,7 @@ class ActorNursery: statespace=None, rpc_module_paths=None, outlive_main=False, # sub-actors die when their main task completes - loglevel=None, # set log level per subactor + loglevel=_default_loglevel, # set log level per subactor ): actor = Actor( name, @@ -670,19 +716,21 @@ class ActorNursery: statespace=statespace, # global proc state vars main=main, # main coroutine to be invoked outlive_main=outlive_main, + loglevel=loglevel, ) parent_addr = self._actor.accept_addr assert parent_addr proc = ctx.Process( target=actor._fork_main, - args=(bind_addr, parent_addr, loglevel), - daemon=True, + args=(bind_addr, parent_addr), + # daemon=True, name=name, ) proc.start() if not proc.is_alive(): raise ActorFailure("Couldn't start sub-actor?") + log.info(f"Started {proc}") # wait for actor to spawn and connect back to us # channel should have handshake completed by the # local actor by the time we get a ref to it @@ -709,6 +757,12 @@ class ActorNursery: nursery.start_soon(wait_for_proc, proc) async def cancel(self, hard_kill=False): + """Cancel this nursery by instructing each subactor to cancel + iteslf and wait for all subprocesses to terminate. + + If ``hard_killl`` is set to ``True`` then kill the processes + directly without any far end graceful ``trio`` cancellation. + """ log.debug(f"Cancelling nursery") for subactor, proc, main_q in self._children.values(): if proc is mp.current_process(): @@ -718,8 +772,8 @@ class ActorNursery: if hard_kill: log.warn(f"Hard killing subactors {self._children}") proc.terminate() - # send KeyBoardInterrupt (trio abort signal) to underlying - # sub-actors + # XXX: doesn't seem to work? + # send KeyBoardInterrupt (trio abort signal) to sub-actors # os.kill(proc.pid, signal.SIGINT) else: # send cancel cmd - likely no response from subactor @@ -750,11 +804,11 @@ class ActorNursery: await chan.send(None) if etype is not None: - log.warn(f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") + log.exception(f"{current_actor().uid} errored with {etype}, " + "cancelling actor nursery") await self.cancel() else: - log.debug(f"Waiting on subactors to complete") + log.debug(f"Waiting on subactors {self._children} to complete") async with trio.open_nursery() as nursery: for subactor, proc, main_q in self._children.values(): nursery.start_soon(wait_for_actor, subactor, proc, main_q) @@ -786,7 +840,7 @@ class NoArbiterFound(Exception): "Couldn't find the arbiter?" -async def start_actor(actor, host, port, arbiter_addr, nursery=None): +async def _start_actor(actor, host, port, arbiter_addr, nursery=None): """Spawn a local actor by starting a task to execute it's main async function. 
@@ -884,6 +938,8 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
     main = partial(async_fn, *args) if async_fn else None
     arbiter_addr = (host, port) = arbiter_addr or (
         _default_arbiter_host, _default_arbiter_port)
+    get_console_log(kwargs.get('loglevel', _default_loglevel))
+
     # make a temporary connection to see if an arbiter exists
     arbiter_found = False
     try:
@@ -911,7 +967,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
     # Note that if the current actor is the arbiter it is desirable
     # for it to stay up indefinitely until a re-election process has
     # taken place - which is not implemented yet FYI).
-    return await start_actor(actor, host, port, arbiter_addr=arbiter_addr)
+    return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr)
 
 
 def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs):
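
Reviewer note: the hunks above juggle a handful of inter-actor message variants. The sketch below is not part of the patch; it just collects the shapes exactly as they appear in this diff, with the `{'stop': ...}` variant being the new addition used to signal async-gen completion.

# Reference sketch (reviewer aid, not part of the patch): the wire-level
# message shapes used by the msg loop in the hunks above; only 'stop' is new.
import traceback

cid = 'main'  # call id used to route each response back to its waiting queue

cmd_msg = {'cmd': ('ns', 'funcname', {}, ('actor_name', 'uid'), cid)}  # RPC request
return_msg = {'return': 42, 'cid': cid}      # single result from a plain function
yield_msg = {'yield': 42, 'cid': cid}        # one item streamed from a remote async gen
stop_msg = {'stop': None, 'cid': cid}        # NEW: remote async gen finished iterating
error_msg = {'error': traceback.format_exc(), 'cid': cid}  # remote traceback as text
terminate_sentinel = None  # a bare `None` on a channel breaks the remote msg loop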
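
A hedged usage sketch of the loglevel plumbing and the new stream-termination path: `run()` forwards a `loglevel` keyword through `_main()` into `get_console_log()`, `ActorNursery.start_actor()` now hands its `loglevel` to the `Actor` constructor, and `_fork_main()` applies it in the child right after the fork, before `trio.run()`. The `open_nursery()` factory, the portal returned by `start_actor()`, iterating `await portal.run()` for an async-gen target, and the `mymod` module name are assumptions based on the surrounding module, not on hunks in this diff.

# Hedged sketch, not from this diff: assumes `tractor.open_nursery()` exists,
# that `ActorNursery.start_actor()` returns a `Portal`, and that awaiting
# `portal.run()` on an async-gen target yields an async iterator.
import tractor


# contents of a hypothetical module `mymod`, importable by the subactor:
async def stream_squares(limit=5):
    # runs in the subactor; each value ships back as a {'yield': ...} msg and
    # the new {'stop': ...} msg tells the calling portal the stream is done
    for i in range(limit):
        yield i ** 2


async def main():
    async with tractor.open_nursery() as nursery:  # assumed factory
        portal = await nursery.start_actor(
            'streamer',
            rpc_module_paths=['mymod'],  # hypothetical module exposing stream_squares
            outlive_main=True,   # keep serving RPC, no "main" coroutine given
            loglevel='debug',    # applied in the child via get_console_log() after fork
        )
        async for sq in await portal.run('mymod', 'stream_squares', limit=4):
            print(sq)
        # nursery teardown ships the `None` sentinel which breaks the
        # subactor's msg loop (see the __aexit__ hunk above)


if __name__ == '__main__':
    tractor.run(main, name='parent', loglevel='info')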