One more super subtle cancellation fix

See python-trio/trio#455 for the deats...
asyncgen_closing_fix
Tyler Goodlet 2018-07-13 17:49:41 -04:00
parent c326a90484
commit 2b7bbf32a1
1 changed files with 59 additions and 37 deletions

View File

@ -77,21 +77,33 @@ async def _invoke(
coro = func(**kwargs) coro = func(**kwargs)
if inspect.isasyncgen(coro): if inspect.isasyncgen(coro):
async for item in coro: with trio.open_cancel_scope() as cs:
# TODO: can we send values back in here? async for item in coro:
# it's gonna require a `while True:` and # TODO: can we send values back in here?
# some non-blocking way to retrieve new `asend()` # it's gonna require a `while True:` and
# values from the channel: # some non-blocking way to retrieve new `asend()`
# to_send = await chan.recv_nowait() # values from the channel:
# if to_send is not None: # to_send = await chan.recv_nowait()
# to_yield = await coro.asend(to_send) # if to_send is not None:
await chan.send({'yield': item, 'cid': cid}) # to_yield = await coro.asend(to_send)
log.debug(f"Finished iterating {coro}") # XXX: massive gotcha! If the containing scope
# TODO: we should really support a proper # is cancelled and we execute the below line,
# `StopAsyncIteration` system here for returning a final # any ``ActorNursery.__aexit__()`` WON'T be
# value if desired # triggered in the underlying async gen! So we
await chan.send({'stop': None, 'cid': cid}) # have to shield here (which shouldn't matter)
# in order to be sure the cancel is propagated!
cs.shield = True
await chan.send({'yield': item, 'cid': cid})
cs.shield = False
log.debug(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
cs.shield = True
await chan.send({'stop': None, 'cid': cid})
cs.shield = False
else: else:
if treat_as_gen: if treat_as_gen:
# XXX: the async-func may spawn further tasks which push # XXX: the async-func may spawn further tasks which push
@ -102,14 +114,15 @@ async def _invoke(
else: else:
await chan.send({'return': await coro, 'cid': cid}) await chan.send({'return': await coro, 'cid': cid})
task_status.started()
except Exception: except Exception:
log.exception("Actor errored:")
if not raise_errs: if not raise_errs:
await chan.send({'error': traceback.format_exc(), 'cid': cid}) await chan.send({'error': traceback.format_exc(), 'cid': cid})
log.exception("Actor errored:")
else: else:
raise raise
task_status.started()
async def result_from_q(q, chan): async def result_from_q(q, chan):
"""Process a msg from a remote actor. """Process a msg from a remote actor.
@ -444,33 +457,36 @@ class Actor:
self._process_messages, self._parent_chan) self._process_messages, self._parent_chan)
if self.main: if self.main:
with trio.open_cancel_scope() as main_scope: try:
self._main_scope = main_scope if self._parent_chan:
try: async with trio.open_nursery() as n:
if self._parent_chan: self._main_scope = n.cancel_scope
log.debug(f"Starting main task `{self.main}`") log.debug(f"Starting main task `{self.main}`")
# spawned subactor so deliver "main" # spawned subactor so deliver "main"
# task result(s) back to parent # task result(s) back to parent
await nursery.start( await n.start(
_invoke, 'main', _invoke, 'main',
self._parent_chan, self.main, {}, self._parent_chan, self.main, {},
# treat_as_gen, raise_errs params # treat_as_gen, raise_errs params
False, True False, True
) )
else: else:
# run directly - we are an "unspawned actor" with trio.open_cancel_scope() as main_scope:
self._main_scope = main_scope
# run directly we are an "unspawned actor"
log.debug(f"Running `{self.main}` directly") log.debug(f"Running `{self.main}` directly")
result = await self.main() result = await self.main()
finally: finally:
self._main_complete.set() # tear down channel server in order to ensure
# tear down channel server in order to ensure # we exit normally when the main task is done
# we exit normally when the main task is done if not self._outlive_main:
if not self._outlive_main: log.debug(f"Shutting down channel server")
log.debug(f"Shutting down channel server") self.cancel_server()
self.cancel_server() log.debug(f"Shutting down root nursery")
log.debug(f"Shutting down root nursery") nursery.cancel_scope.cancel()
nursery.cancel_scope.cancel() self._main_complete.set()
if main_scope.cancelled_caught:
if self._main_scope.cancelled_caught:
log.debug("Main task was cancelled sucessfully") log.debug("Main task was cancelled sucessfully")
log.debug("Waiting on root nursery to complete") log.debug("Waiting on root nursery to complete")
# blocks here as expected if no nursery was provided until # blocks here as expected if no nursery was provided until
@ -659,6 +675,7 @@ class Portal:
log.debug( log.debug(
f"Cancelling async gen call {cid} to " f"Cancelling async gen call {cid} to "
f"{self.channel.uid}") f"{self.channel.uid}")
raise
return yield_from_q() return yield_from_q()
@ -874,14 +891,19 @@ class ActorNursery:
"""Wait on all subactor's main routines to complete. """Wait on all subactor's main routines to complete.
""" """
if etype is not None: if etype is not None:
# XXX: hypothetically an error could be raised and then
# a cancel signal shows up slightly after in which case the
# else block here might not complete? Should both be shielded?
if etype is trio.Cancelled: if etype is trio.Cancelled:
log.warn(f"{current_actor().uid} was cancelled with {etype}, "
"cancelling actor nursery")
with trio.open_cancel_scope(shield=True): with trio.open_cancel_scope(shield=True):
log.warn(
f"{current_actor().uid} was cancelled with {etype}"
", cancelling actor nursery")
await self.cancel() await self.cancel()
else: else:
log.exception(f"{current_actor().uid} errored with {etype}, " log.exception(
"cancelling actor nursery") f"{current_actor().uid} errored with {etype}, "
"cancelling actor nursery")
await self.cancel() await self.cancel()
else: else:
# XXX: this is effectively the lone cancellation/supervisor # XXX: this is effectively the lone cancellation/supervisor