diff --git a/tractor/__init__.py b/tractor/__init__.py index 938a404..18b6bc9 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -77,21 +77,33 @@ async def _invoke( coro = func(**kwargs) if inspect.isasyncgen(coro): - async for item in coro: - # TODO: can we send values back in here? - # it's gonna require a `while True:` and - # some non-blocking way to retrieve new `asend()` - # values from the channel: - # to_send = await chan.recv_nowait() - # if to_send is not None: - # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) + with trio.open_cancel_scope() as cs: + async for item in coro: + # TODO: can we send values back in here? + # it's gonna require a `while True:` and + # some non-blocking way to retrieve new `asend()` + # values from the channel: + # to_send = await chan.recv_nowait() + # if to_send is not None: + # to_yield = await coro.asend(to_send) - log.debug(f"Finished iterating {coro}") - # TODO: we should really support a proper - # `StopAsyncIteration` system here for returning a final - # value if desired - await chan.send({'stop': None, 'cid': cid}) + # XXX: massive gotcha! If the containing scope + # is cancelled and we execute the below line, + # any ``ActorNursery.__aexit__()`` WON'T be + # triggered in the underlying async gen! So we + # have to shield here (which shouldn't matter) + # in order to be sure the cancel is propagated! + cs.shield = True + await chan.send({'yield': item, 'cid': cid}) + cs.shield = False + + log.debug(f"Finished iterating {coro}") + # TODO: we should really support a proper + # `StopAsyncIteration` system here for returning a final + # value if desired + cs.shield = True + await chan.send({'stop': None, 'cid': cid}) + cs.shield = False else: if treat_as_gen: # XXX: the async-func may spawn further tasks which push @@ -102,14 +114,15 @@ async def _invoke( else: await chan.send({'return': await coro, 'cid': cid}) - task_status.started() except Exception: + log.exception("Actor errored:") if not raise_errs: await chan.send({'error': traceback.format_exc(), 'cid': cid}) - log.exception("Actor errored:") else: raise + task_status.started() + async def result_from_q(q, chan): """Process a msg from a remote actor. @@ -444,33 +457,36 @@ class Actor: self._process_messages, self._parent_chan) if self.main: - with trio.open_cancel_scope() as main_scope: - self._main_scope = main_scope - try: - if self._parent_chan: + try: + if self._parent_chan: + async with trio.open_nursery() as n: + self._main_scope = n.cancel_scope log.debug(f"Starting main task `{self.main}`") # spawned subactor so deliver "main" # task result(s) back to parent - await nursery.start( + await n.start( _invoke, 'main', self._parent_chan, self.main, {}, # treat_as_gen, raise_errs params False, True ) - else: - # run directly - we are an "unspawned actor" + else: + with trio.open_cancel_scope() as main_scope: + self._main_scope = main_scope + # run directly we are an "unspawned actor" log.debug(f"Running `{self.main}` directly") result = await self.main() - finally: - self._main_complete.set() - # tear down channel server in order to ensure - # we exit normally when the main task is done - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() - log.debug(f"Shutting down root nursery") - nursery.cancel_scope.cancel() - if main_scope.cancelled_caught: + finally: + # tear down channel server in order to ensure + # we exit normally when the main task is done + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self.cancel_server() + log.debug(f"Shutting down root nursery") + nursery.cancel_scope.cancel() + self._main_complete.set() + + if self._main_scope.cancelled_caught: log.debug("Main task was cancelled sucessfully") log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until @@ -659,6 +675,7 @@ class Portal: log.debug( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") + raise return yield_from_q() @@ -874,14 +891,19 @@ class ActorNursery: """Wait on all subactor's main routines to complete. """ if etype is not None: + # XXX: hypothetically an error could be raised and then + # a cancel signal shows up slightly after in which case the + # else block here might not complete? Should both be shielded? if etype is trio.Cancelled: - log.warn(f"{current_actor().uid} was cancelled with {etype}, " - "cancelling actor nursery") with trio.open_cancel_scope(shield=True): + log.warn( + f"{current_actor().uid} was cancelled with {etype}" + ", cancelling actor nursery") await self.cancel() else: - log.exception(f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") + log.exception( + f"{current_actor().uid} errored with {etype}, " + "cancelling actor nursery") await self.cancel() else: # XXX: this is effectively the lone cancellation/supervisor