Add onc-cancels-all strategy to actor nursery

asyncgen_closing_fix
Tyler Goodlet 2018-07-11 18:09:38 -04:00
parent 25852794a8
commit 1ade5c5fbb
1 changed files with 22 additions and 36 deletions

View File

@ -14,7 +14,7 @@ import uuid
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from .ipc import Channel from .ipc import Channel, _connect_chan
from .log import get_console_log, get_logger from .log import get_console_log, get_logger
ctx = mp.get_context("forkserver") ctx = mp.get_context("forkserver")
@ -245,7 +245,6 @@ class Actor:
await self._process_messages(chan) await self._process_messages(chan)
finally: finally:
# Drop ref to channel so it can be gc-ed and disconnected # Drop ref to channel so it can be gc-ed and disconnected
# if chan is not self._parent_chan:
log.debug(f"Releasing channel {chan} from {chan.uid}") log.debug(f"Releasing channel {chan} from {chan.uid}")
chans = self._peers.get(chan.uid) chans = self._peers.get(chan.uid)
chans.remove(chan) chans.remove(chan)
@ -450,27 +449,29 @@ class Actor:
try: try:
if self._parent_chan: if self._parent_chan:
log.debug(f"Starting main task `{self.main}`") log.debug(f"Starting main task `{self.main}`")
# spawned subactor so deliver "main" task result(s) # spawned subactor so deliver "main"
# back to parent # task result(s) back to parent
await nursery.start( await nursery.start(
_invoke, 'main', _invoke, 'main',
self._parent_chan, self.main, {}, self._parent_chan, self.main, {},
False, True # treat_as_gen, raise_errs params # treat_as_gen, raise_errs params
False, True
) )
else: else:
# run directly - we are an "unspawned actor" # run directly - we are an "unspawned actor"
log.debug(f"Running `{self.main}` directly") log.debug(f"Running `{self.main}` directly")
result = await self.main() result = await self.main()
finally: finally:
self._main_complete.set()
# tear down channel server in order to ensure # tear down channel server in order to ensure
# we exit normally when the main task is done # we exit normally when the main task is done
if not self._outlive_main: if not self._outlive_main:
log.debug(f"Shutting down channel server") log.debug(f"Shutting down channel server")
self.cancel_server() self.cancel_server()
log.debug(f"Shutting down root nursery")
nursery.cancel_scope.cancel()
if main_scope.cancelled_caught: if main_scope.cancelled_caught:
log.debug("Main task was cancelled sucessfully") log.debug("Main task was cancelled sucessfully")
self._main_complete.set()
log.debug("Waiting on root nursery to complete") log.debug("Waiting on root nursery to complete")
# blocks here as expected if no nursery was provided until # blocks here as expected if no nursery was provided until
# the channel server is killed (i.e. this actor is # the channel server is killed (i.e. this actor is
@ -674,6 +675,9 @@ class Portal:
resptype, first_msg, q = (await result_from_q(q, self.channel)) resptype, first_msg, q = (await result_from_q(q, self.channel))
self._result = await self._return_from_resptype( self._result = await self._return_from_resptype(
'main', resptype, first_msg, q) 'main', resptype, first_msg, q)
log.warn(
f"Retrieved first result `{self._result}` "
f"for {self.channel.uid}")
# await q.put(first_msg) # for next consumer (e.g. nursery) # await q.put(first_msg) # for next consumer (e.g. nursery)
return self._result return self._result
@ -760,6 +764,7 @@ class ActorNursery:
# We'll likely want some way to cancel all sub-actors eventually # We'll likely want some way to cancel all sub-actors eventually
# self.cancel_scope = cancel_scope # self.cancel_scope = cancel_scope
self._children = {} self._children = {}
self.cancelled = False
async def __aenter__(self): async def __aenter__(self):
return self return self
@ -867,17 +872,6 @@ class ActorNursery:
async def __aexit__(self, etype, value, tb): async def __aexit__(self, etype, value, tb):
"""Wait on all subactor's main routines to complete. """Wait on all subactor's main routines to complete.
""" """
async def wait_for_actor(actor, proc, portal):
if proc.is_alive():
res = await portal.result()
log.info(f"{actor.uid} main task completed with {res}")
if not actor._outlive_main:
# trigger msg loop to break
chans = self._actor.get_chans(actor.uid)
for chan in chans:
log.info(f"Signalling msg loop exit for {actor.uid}")
await chan.send(None)
if etype is not None: if etype is not None:
if etype is trio.Cancelled: if etype is trio.Cancelled:
log.warn(f"{current_actor().uid} was cancelled with {etype}, " log.warn(f"{current_actor().uid} was cancelled with {etype}, "
@ -889,13 +883,17 @@ class ActorNursery:
"cancelling actor nursery") "cancelling actor nursery")
await self.cancel() await self.cancel()
else: else:
# XXX: this is effectively the lone cancellation/supervisor
# strategy which exactly mimicks trio's behaviour
log.debug(f"Waiting on subactors {self._children} to complete") log.debug(f"Waiting on subactors {self._children} to complete")
async with trio.open_nursery() as nursery: try:
for subactor, proc, portal in self._children.values(): await self.wait()
nursery.start_soon(wait_for_actor, subactor, proc, portal) except Exception as err:
log.warn(f"Nursery caught {err}, cancelling")
await self.wait() await self.cancel()
log.debug(f"Nursery teardown complete") self.cancelled = True
raise
log.debug(f"Nursery teardown complete")
def current_actor() -> Actor: def current_actor() -> Actor:
@ -955,18 +953,6 @@ async def _start_actor(actor, host, port, arbiter_addr, nursery=None):
return result return result
@asynccontextmanager
async def _connect_chan(host, port):
"""Attempt to connect to an arbiter's channel server.
Return the channel on success or ``None`` on failure.
"""
chan = Channel((host, port))
await chan.connect()
yield chan
await chan.aclose()
@asynccontextmanager @asynccontextmanager
async def get_arbiter(host, port): async def get_arbiter(host, port):
"""Return a portal instance connected to a local or remote """Return a portal instance connected to a local or remote