Maybe wait for actor result(s) after proc join
parent
d4da80c558
commit
f8111e51cd
|
@ -137,30 +137,11 @@ class ActorNursery:
|
||||||
async def wait(self):
|
async def wait(self):
|
||||||
"""Wait for all subactors to complete.
|
"""Wait for all subactors to complete.
|
||||||
"""
|
"""
|
||||||
async def wait_for_proc(proc, actor, portal, cancel_scope):
|
async def maybe_consume_result(portal, actor):
|
||||||
# TODO: timeout block here?
|
if (
|
||||||
if proc.is_alive():
|
portal in self._cancel_after_result_on_exit and
|
||||||
await trio.hazmat.wait_readable(proc.sentinel)
|
(portal._result is None and portal._exc is None)
|
||||||
# please god don't hang
|
):
|
||||||
proc.join()
|
|
||||||
log.debug(f"Joined {proc}")
|
|
||||||
self._children.pop(actor.uid)
|
|
||||||
# proc terminated, cancel result waiter
|
|
||||||
if cancel_scope:
|
|
||||||
log.warn(
|
|
||||||
f"Cancelling existing result waiter task for {actor.uid}")
|
|
||||||
cancel_scope.cancel()
|
|
||||||
|
|
||||||
async def wait_for_result(
|
|
||||||
portal, actor,
|
|
||||||
task_status=trio.TASK_STATUS_IGNORED,
|
|
||||||
):
|
|
||||||
# cancel the actor gracefully
|
|
||||||
with trio.open_cancel_scope() as cs:
|
|
||||||
task_status.started(cs)
|
|
||||||
log.info(f"Cancelling {portal.channel.uid} gracefully")
|
|
||||||
await portal.cancel_actor()
|
|
||||||
|
|
||||||
log.debug(f"Waiting on final result from {subactor.uid}")
|
log.debug(f"Waiting on final result from {subactor.uid}")
|
||||||
res = await portal.result()
|
res = await portal.result()
|
||||||
# if it's an async-gen then we should alert the user
|
# if it's an async-gen then we should alert the user
|
||||||
|
@ -173,6 +154,33 @@ class ActorNursery:
|
||||||
async for item in agen:
|
async for item in agen:
|
||||||
log.debug(f"Consuming item {item}")
|
log.debug(f"Consuming item {item}")
|
||||||
|
|
||||||
|
async def wait_for_proc(proc, actor, portal, cancel_scope):
|
||||||
|
# TODO: timeout block here?
|
||||||
|
if proc.is_alive():
|
||||||
|
await trio.hazmat.wait_readable(proc.sentinel)
|
||||||
|
# please god don't hang
|
||||||
|
proc.join()
|
||||||
|
log.debug(f"Joined {proc}")
|
||||||
|
await maybe_consume_result(portal, actor)
|
||||||
|
|
||||||
|
self._children.pop(actor.uid)
|
||||||
|
# proc terminated, cancel result waiter
|
||||||
|
if cancel_scope:
|
||||||
|
log.warn(
|
||||||
|
f"Cancelling existing result waiter task for {actor.uid}")
|
||||||
|
cancel_scope.cancel()
|
||||||
|
|
||||||
|
async def wait_for_actor(
|
||||||
|
portal, actor,
|
||||||
|
task_status=trio.TASK_STATUS_IGNORED,
|
||||||
|
):
|
||||||
|
# cancel the actor gracefully
|
||||||
|
with trio.open_cancel_scope() as cs:
|
||||||
|
task_status.started(cs)
|
||||||
|
await maybe_consume_result(portal, actor)
|
||||||
|
log.info(f"Cancelling {portal.channel.uid} gracefully")
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
log.warn("Result waiter was cancelled")
|
log.warn("Result waiter was cancelled")
|
||||||
|
|
||||||
|
@ -182,7 +190,7 @@ class ActorNursery:
|
||||||
for subactor, proc, portal in children.values():
|
for subactor, proc, portal in children.values():
|
||||||
cs = None
|
cs = None
|
||||||
if portal in self._cancel_after_result_on_exit:
|
if portal in self._cancel_after_result_on_exit:
|
||||||
cs = await nursery.start(wait_for_result, portal, subactor)
|
cs = await nursery.start(wait_for_actor, portal, subactor)
|
||||||
nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
|
nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
|
||||||
|
|
||||||
async def cancel(self, hard_kill=False):
|
async def cancel(self, hard_kill=False):
|
||||||
|
|
Loading…
Reference in New Issue