diff --git a/tractor/__init__.py b/tractor/__init__.py index 2342a0a..9658b80 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -14,7 +14,7 @@ import uuid import trio from async_generator import asynccontextmanager -from .ipc import Channel +from .ipc import Channel, _connect_chan from .log import get_console_log, get_logger ctx = mp.get_context("forkserver") @@ -245,7 +245,6 @@ class Actor: await self._process_messages(chan) finally: # Drop ref to channel so it can be gc-ed and disconnected - # if chan is not self._parent_chan: log.debug(f"Releasing channel {chan} from {chan.uid}") chans = self._peers.get(chan.uid) chans.remove(chan) @@ -450,27 +449,29 @@ class Actor: try: if self._parent_chan: log.debug(f"Starting main task `{self.main}`") - # spawned subactor so deliver "main" task result(s) - # back to parent + # spawned subactor so deliver "main" + # task result(s) back to parent await nursery.start( _invoke, 'main', self._parent_chan, self.main, {}, - False, True # treat_as_gen, raise_errs params + # treat_as_gen, raise_errs params + False, True ) else: # run directly - we are an "unspawned actor" log.debug(f"Running `{self.main}` directly") result = await self.main() - finally: + self._main_complete.set() # tear down channel server in order to ensure # we exit normally when the main task is done if not self._outlive_main: log.debug(f"Shutting down channel server") self.cancel_server() + log.debug(f"Shutting down root nursery") + nursery.cancel_scope.cancel() if main_scope.cancelled_caught: log.debug("Main task was cancelled sucessfully") - self._main_complete.set() log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. this actor is @@ -674,6 +675,9 @@ class Portal: resptype, first_msg, q = (await result_from_q(q, self.channel)) self._result = await self._return_from_resptype( 'main', resptype, first_msg, q) + log.warn( + f"Retrieved first result `{self._result}` " + f"for {self.channel.uid}") # await q.put(first_msg) # for next consumer (e.g. nursery) return self._result @@ -760,6 +764,7 @@ class ActorNursery: # We'll likely want some way to cancel all sub-actors eventually # self.cancel_scope = cancel_scope self._children = {} + self.cancelled = False async def __aenter__(self): return self @@ -867,17 +872,6 @@ class ActorNursery: async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. """ - async def wait_for_actor(actor, proc, portal): - if proc.is_alive(): - res = await portal.result() - log.info(f"{actor.uid} main task completed with {res}") - if not actor._outlive_main: - # trigger msg loop to break - chans = self._actor.get_chans(actor.uid) - for chan in chans: - log.info(f"Signalling msg loop exit for {actor.uid}") - await chan.send(None) - if etype is not None: if etype is trio.Cancelled: log.warn(f"{current_actor().uid} was cancelled with {etype}, " @@ -889,13 +883,17 @@ class ActorNursery: "cancelling actor nursery") await self.cancel() else: + # XXX: this is effectively the lone cancellation/supervisor + # strategy which exactly mimicks trio's behaviour log.debug(f"Waiting on subactors {self._children} to complete") - async with trio.open_nursery() as nursery: - for subactor, proc, portal in self._children.values(): - nursery.start_soon(wait_for_actor, subactor, proc, portal) - - await self.wait() - log.debug(f"Nursery teardown complete") + try: + await self.wait() + except Exception as err: + log.warn(f"Nursery caught {err}, cancelling") + await self.cancel() + self.cancelled = True + raise + log.debug(f"Nursery teardown complete") def current_actor() -> Actor: @@ -955,18 +953,6 @@ async def _start_actor(actor, host, port, arbiter_addr, nursery=None): return result -@asynccontextmanager -async def _connect_chan(host, port): - """Attempt to connect to an arbiter's channel server. - - Return the channel on success or ``None`` on failure. - """ - chan = Channel((host, port)) - await chan.connect() - yield chan - await chan.aclose() - - @asynccontextmanager async def get_arbiter(host, port): """Return a portal instance connected to a local or remote