Add a separate cancel scope for the main task

Cancellation requires that each actor cancel it's spawned subactors
before cancelling its own root (nursery's) cancel scope to avoid breaking
channel connections before kill commands (`Actor.cancel()`) have been sent
off to peers. To solve this, ensure each main task is cancelled to
completion first (which will guarantee that all actor nurseries have
completed their cancellation steps) before cancelling the actor's "core"
tasks under the "root" scope.
asyncgen_closing_fix
Tyler Goodlet 2018-07-11 00:20:50 -04:00
parent 1854471992
commit 209a6a2096
1 changed files with 37 additions and 23 deletions

View File

@ -176,6 +176,8 @@ class Actor:
self._peers = defaultdict(list) self._peers = defaultdict(list)
self._peer_connected = {} self._peer_connected = {}
self._no_more_peers = trio.Event() self._no_more_peers = trio.Event()
self._main_complete = trio.Event()
self._main_scope = None
self._no_more_peers.set() self._no_more_peers.set()
self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
self._listeners = [] self._listeners = []
@ -443,28 +445,32 @@ class Actor:
self._process_messages, self._parent_chan) self._process_messages, self._parent_chan)
if self.main: if self.main:
try: with trio.open_cancel_scope() as main_scope:
if self._parent_chan: self._main_scope = main_scope
log.debug(f"Starting main task `{self.main}`") try:
# spawned subactor so deliver "main" task result(s) if self._parent_chan:
# back to parent log.debug(f"Starting main task `{self.main}`")
await nursery.start( # spawned subactor so deliver "main" task result(s)
_invoke, 'main', # back to parent
self._parent_chan, self.main, {}, await nursery.start(
False, True # treat_as_gen, raise_errs params _invoke, 'main',
) self._parent_chan, self.main, {},
else: False, True # treat_as_gen, raise_errs params
# run directly - we are an "unspawned actor" )
log.debug(f"Running `{self.main}` directly") else:
result = await self.main() # run directly - we are an "unspawned actor"
log.debug(f"Running `{self.main}` directly")
finally: result = await self.main()
# tear down channel server in order to ensure
# we exit normally when the main task is done
if not self._outlive_main:
log.debug(f"Shutting down channel server")
self.cancel_server()
finally:
# tear down channel server in order to ensure
# we exit normally when the main task is done
if not self._outlive_main:
log.debug(f"Shutting down channel server")
self.cancel_server()
if main_scope.cancelled_caught:
log.debug("Main task was cancelled sucessfully")
self._main_complete.set()
log.debug("Waiting on root nursery to complete") log.debug("Waiting on root nursery to complete")
# blocks here as expected if no nursery was provided until # blocks here as expected if no nursery was provided until
# the channel server is killed (i.e. this actor is # the channel server is killed (i.e. this actor is
@ -550,6 +556,10 @@ class Actor:
cancelling (for all intents and purposes) this actor. cancelling (for all intents and purposes) this actor.
""" """
self.cancel_server() self.cancel_server()
if self._main_scope:
self._main_scope.cancel()
log.debug("Waiting on main task to complete")
await self._main_complete.wait()
self._root_nursery.cancel_scope.cancel() self._root_nursery.cancel_scope.cancel()
def cancel_server(self): def cancel_server(self):
@ -684,9 +694,11 @@ class Portal:
cancel_scope.shield = True cancel_scope.shield = True
# send cancel cmd - might not get response # send cancel cmd - might not get response
await self.run('self', 'cancel') await self.run('self', 'cancel')
return True
except trio.ClosedStreamError: except trio.ClosedStreamError:
log.warn( log.warn(
f"{self.channel} for {self.channel.uid} was alreaday closed?") f"{self.channel} for {self.channel.uid} was already closed?")
return False
@asynccontextmanager @asynccontextmanager
@ -795,7 +807,8 @@ class ActorNursery:
return portal return portal
async def wait(self): async def wait(self):
"""Wait for all subactors to complete.
"""
async def wait_for_proc(proc, actor, portal): async def wait_for_proc(proc, actor, portal):
# TODO: timeout block here? # TODO: timeout block here?
if proc.is_alive(): if proc.is_alive():
@ -1022,6 +1035,7 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
actor = Actor( actor = Actor(
name or 'anonymous', name or 'anonymous',
main=main, main=main,
arbiter_addr=arbiter_addr,
**kwargs **kwargs
) )
host, port = (host, 0) host, port = (host, 0)