Cancel result waiter once proc terminates

wait_for_actor
Tyler Goodlet 2018-08-14 22:51:37 -04:00
parent b1f17dea1f
commit 09e3a94060
2 changed files with 38 additions and 24 deletions

View File

@ -148,7 +148,9 @@ class Portal:
"""Return the result(s) from the remote actor's "main" task. """Return the result(s) from the remote actor's "main" task.
""" """
if self._expect_result is None: if self._expect_result is None:
raise RuntimeError("This portal is not expecting a final result?") raise RuntimeError(
f"Portal for {self.channel.uid} is not expecting a final"
"result?")
elif self._result is None: elif self._result is None:
self._result = await self._return_from_resptype( self._result = await self._return_from_resptype(
*self._expect_result *self._expect_result

View File

@ -126,18 +126,18 @@ class ActorNursery:
bind_addr=bind_addr, bind_addr=bind_addr,
statespace=statespace, statespace=statespace,
) )
self._cancel_after_result_on_exit.add(portal)
await portal._submit_for_result( await portal._submit_for_result(
mod_path, mod_path,
fn.__name__, fn.__name__,
**kwargs **kwargs
) )
self._cancel_after_result_on_exit.add(portal)
return portal return portal
async def wait(self): async def wait(self):
"""Wait for all subactors to complete. """Wait for all subactors to complete.
""" """
async def wait_for_proc(proc, actor, portal): async def wait_for_proc(proc, actor, portal, cancel_scope):
# TODO: timeout block here? # TODO: timeout block here?
if proc.is_alive(): if proc.is_alive():
await trio.hazmat.wait_readable(proc.sentinel) await trio.hazmat.wait_readable(proc.sentinel)
@ -145,9 +145,19 @@ class ActorNursery:
proc.join() proc.join()
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
self._children.pop(actor.uid) self._children.pop(actor.uid)
# proc terminated, cancel result waiter
if cancel_scope:
log.warn(
f"Cancelling existing result waiter task for {actor.uid}")
cancel_scope.cancel()
async def wait_for_result(portal, actor): async def wait_for_result(
portal, actor,
task_status=trio.TASK_STATUS_IGNORED,
):
# cancel the actor gracefully # cancel the actor gracefully
with trio.open_cancel_scope() as cs:
task_status.started(cs)
log.info(f"Cancelling {portal.channel.uid} gracefully") log.info(f"Cancelling {portal.channel.uid} gracefully")
await portal.cancel_actor() await portal.cancel_actor()
@ -163,15 +173,17 @@ class ActorNursery:
async for item in agen: async for item in agen:
log.debug(f"Consuming item {item}") log.debug(f"Consuming item {item}")
if cs.cancelled_caught:
log.warn("Result waiter was cancelled")
# unblocks when all waiter tasks have completed # unblocks when all waiter tasks have completed
children = self._children.copy() children = self._children.copy()
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
for subactor, proc, portal in children.values(): for subactor, proc, portal in children.values():
nursery.start_soon(wait_for_proc, proc, subactor, portal) cs = None
if proc.is_alive() and ( if portal in self._cancel_after_result_on_exit:
portal in self._cancel_after_result_on_exit cs = await nursery.start(wait_for_result, portal, subactor)
): nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
nursery.start_soon(wait_for_result, portal, subactor)
async def cancel(self, hard_kill=False): async def cancel(self, hard_kill=False):
"""Cancel this nursery by instructing each subactor to cancel """Cancel this nursery by instructing each subactor to cancel