forked from goodboy/tractor
				
			Use `async_generator`'s `aclosing()` helper
Take @njsmith's advice and properly close actor invoked async generators using `async_generator.aclosing()` instead of hacking it (as previous) with a shielded cancel scope.asyncgen_closing_fix
							parent
							
								
									2b7bbf32a1
								
							
						
					
					
						commit
						1f85f71534
					
				| 
						 | 
				
			
			@ -12,7 +12,7 @@ import traceback
 | 
			
		|||
import uuid
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
from async_generator import asynccontextmanager
 | 
			
		||||
from async_generator import asynccontextmanager, aclosing
 | 
			
		||||
 | 
			
		||||
from .ipc import Channel, _connect_chan
 | 
			
		||||
from .log import get_console_log, get_logger
 | 
			
		||||
| 
						 | 
				
			
			@ -77,8 +77,15 @@ async def _invoke(
 | 
			
		|||
            coro = func(**kwargs)
 | 
			
		||||
 | 
			
		||||
            if inspect.isasyncgen(coro):
 | 
			
		||||
                with trio.open_cancel_scope() as cs:
 | 
			
		||||
                    async for item in coro:
 | 
			
		||||
                # XXX: massive gotcha! If the containing scope
 | 
			
		||||
                # is cancelled and we execute the below line,
 | 
			
		||||
                # any ``ActorNursery.__aexit__()`` WON'T be
 | 
			
		||||
                # triggered in the underlying async gen! So we
 | 
			
		||||
                # have to properly handle the closing (aclosing)
 | 
			
		||||
                # of the async gen in order to be sure the cancel
 | 
			
		||||
                # is propagated!
 | 
			
		||||
                async with aclosing(coro) as agen:
 | 
			
		||||
                    async for item in agen:
 | 
			
		||||
                        # TODO: can we send values back in here?
 | 
			
		||||
                        # it's gonna require a `while True:` and
 | 
			
		||||
                        # some non-blocking way to retrieve new `asend()`
 | 
			
		||||
| 
						 | 
				
			
			@ -86,24 +93,13 @@ async def _invoke(
 | 
			
		|||
                        # to_send = await chan.recv_nowait()
 | 
			
		||||
                        # if to_send is not None:
 | 
			
		||||
                        #     to_yield = await coro.asend(to_send)
 | 
			
		||||
 | 
			
		||||
                        # XXX: massive gotcha! If the containing scope
 | 
			
		||||
                        # is cancelled and we execute the below line,
 | 
			
		||||
                        # any ``ActorNursery.__aexit__()`` WON'T be
 | 
			
		||||
                        # triggered in the underlying async gen! So we
 | 
			
		||||
                        # have to shield here (which shouldn't matter)
 | 
			
		||||
                        # in order to be sure the cancel is propagated!
 | 
			
		||||
                        cs.shield = True
 | 
			
		||||
                        await chan.send({'yield': item, 'cid': cid})
 | 
			
		||||
                        cs.shield = False
 | 
			
		||||
 | 
			
		||||
                    log.debug(f"Finished iterating {coro}")
 | 
			
		||||
                    # TODO: we should really support a proper
 | 
			
		||||
                    # `StopAsyncIteration` system here for returning a final
 | 
			
		||||
                    # value if desired
 | 
			
		||||
                    cs.shield = True
 | 
			
		||||
                    await chan.send({'stop': None, 'cid': cid})
 | 
			
		||||
                    cs.shield = False
 | 
			
		||||
            else:
 | 
			
		||||
                if treat_as_gen:
 | 
			
		||||
                    # XXX: the async-func may spawn further tasks which push
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue