From 1f85f7153428ec9a1e928e4f5fb0c21939e8d427 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Jul 2018 22:18:08 -0400 Subject: [PATCH] Use `async_generator`'s `aclosing()` helper Take @njsmith's advice and properly close actor invoked async generators using `async_generator.aclosing()` instead of hacking it (as previous) with a shielded cancel scope. --- tractor/__init__.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 18b6bc9..1b10913 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -12,7 +12,7 @@ import traceback import uuid import trio -from async_generator import asynccontextmanager +from async_generator import asynccontextmanager, aclosing from .ipc import Channel, _connect_chan from .log import get_console_log, get_logger @@ -77,8 +77,15 @@ async def _invoke( coro = func(**kwargs) if inspect.isasyncgen(coro): - with trio.open_cancel_scope() as cs: - async for item in coro: + # XXX: massive gotcha! If the containing scope + # is cancelled and we execute the below line, + # any ``ActorNursery.__aexit__()`` WON'T be + # triggered in the underlying async gen! So we + # have to properly handle the closing (aclosing) + # of the async gen in order to be sure the cancel + # is propagated! + async with aclosing(coro) as agen: + async for item in agen: # TODO: can we send values back in here? # it's gonna require a `while True:` and # some non-blocking way to retrieve new `asend()` @@ -86,24 +93,13 @@ async def _invoke( # to_send = await chan.recv_nowait() # if to_send is not None: # to_yield = await coro.asend(to_send) - - # XXX: massive gotcha! If the containing scope - # is cancelled and we execute the below line, - # any ``ActorNursery.__aexit__()`` WON'T be - # triggered in the underlying async gen! So we - # have to shield here (which shouldn't matter) - # in order to be sure the cancel is propagated! - cs.shield = True await chan.send({'yield': item, 'cid': cid}) - cs.shield = False log.debug(f"Finished iterating {coro}") # TODO: we should really support a proper # `StopAsyncIteration` system here for returning a final # value if desired - cs.shield = True await chan.send({'stop': None, 'cid': cid}) - cs.shield = False else: if treat_as_gen: # XXX: the async-func may spawn further tasks which push