forked from goodboy/tractor
Don't alert nursery on expected cancel result
parent
c01d2f8aea
commit
4d30e25591
|
@ -31,8 +31,12 @@ from .log import get_logger
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._actor import Actor
|
from ._actor import Actor
|
||||||
from ._entry import _mp_main
|
from ._entry import _mp_main
|
||||||
from ._exceptions import ActorFailure, RemoteActorError
|
from ._exceptions import (
|
||||||
from ._debug import maybe_wait_for_debugger
|
ActorFailure,
|
||||||
|
RemoteActorError,
|
||||||
|
ContextCancelled,
|
||||||
|
)
|
||||||
|
from ._debug import maybe_wait_for_debugger, breakpoint
|
||||||
|
|
||||||
|
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
@ -120,12 +124,12 @@ async def result_from_portal(
|
||||||
# a MultiError and we still send out a cancel request
|
# a MultiError and we still send out a cancel request
|
||||||
# result = await exhaust_portal(portal, actor)
|
# result = await exhaust_portal(portal, actor)
|
||||||
try:
|
try:
|
||||||
log.debug(f"Waiting on final result from {actor.uid}")
|
log.info(f"Waiting on final result from {actor.uid}")
|
||||||
|
|
||||||
# XXX: streams should never be reaped here since they should
|
# XXX: streams should never be reaped here since they should
|
||||||
# always be established and shutdown using a context manager api
|
# always be established and shutdown using a context manager api
|
||||||
result = await portal.result()
|
result = await portal.result()
|
||||||
log.debug(f"Returning final result: {result}")
|
log.info(f"Returning final result: {result}")
|
||||||
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# we reraise in the parent task via a ``trio.MultiError``
|
# we reraise in the parent task via a ``trio.MultiError``
|
||||||
|
@ -204,6 +208,7 @@ async def do_hard_kill(
|
||||||
async def reap_proc(
|
async def reap_proc(
|
||||||
|
|
||||||
proc: trio.Process,
|
proc: trio.Process,
|
||||||
|
uid: tuple[str, str],
|
||||||
terminate_after: float = float('inf'),
|
terminate_after: float = float('inf'),
|
||||||
hard_kill_after: int = 0.1,
|
hard_kill_after: int = 0.1,
|
||||||
|
|
||||||
|
@ -214,6 +219,7 @@ async def reap_proc(
|
||||||
# killing. This is a "light" (cancellable) join,
|
# killing. This is a "light" (cancellable) join,
|
||||||
# the hard join is below after timeout
|
# the hard join is below after timeout
|
||||||
await proc.wait()
|
await proc.wait()
|
||||||
|
log.info(f'{uid} terminated gracefully')
|
||||||
|
|
||||||
if cs.cancelled_caught and terminate_after is not float('inf'):
|
if cs.cancelled_caught and terminate_after is not float('inf'):
|
||||||
# Always "hard" join lingering sub procs since no
|
# Always "hard" join lingering sub procs since no
|
||||||
|
@ -398,6 +404,8 @@ async def new_proc(
|
||||||
if portal.channel.connected() and ria:
|
if portal.channel.connected() and ria:
|
||||||
|
|
||||||
# we wait for result and cancel on completion
|
# we wait for result and cancel on completion
|
||||||
|
# if uid[0] == 'odds':
|
||||||
|
# await breakpoint()
|
||||||
await result_from_portal(
|
await result_from_portal(
|
||||||
portal,
|
portal,
|
||||||
subactor,
|
subactor,
|
||||||
|
@ -414,11 +422,19 @@ async def new_proc(
|
||||||
# )
|
# )
|
||||||
|
|
||||||
# soft & cancellable
|
# soft & cancellable
|
||||||
await reap_proc(proc)
|
await reap_proc(proc, uid)
|
||||||
|
|
||||||
# # if proc terminates before portal result
|
# # if proc terminates before portal result
|
||||||
# if cancel_scope:
|
# if cancel_scope:
|
||||||
# cancel_scope.cancel()
|
# cancel_scope.cancel()
|
||||||
|
except (
|
||||||
|
ContextCancelled,
|
||||||
|
) as err:
|
||||||
|
if portal.cancel_called:
|
||||||
|
log.cancel('{uid} received expected cancel')
|
||||||
|
|
||||||
|
# soft & cancellable
|
||||||
|
await reap_proc(proc, uid, terminate_after=0.1)
|
||||||
|
|
||||||
except (
|
except (
|
||||||
RemoteActorError,
|
RemoteActorError,
|
||||||
|
@ -471,6 +487,7 @@ async def new_proc(
|
||||||
|
|
||||||
await reap_proc(
|
await reap_proc(
|
||||||
proc,
|
proc,
|
||||||
|
uid,
|
||||||
# this is the same as previous timeout
|
# this is the same as previous timeout
|
||||||
# setting before rewriting this spawn
|
# setting before rewriting this spawn
|
||||||
# section
|
# section
|
||||||
|
@ -488,6 +505,7 @@ async def new_proc(
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await reap_proc(
|
await reap_proc(
|
||||||
proc,
|
proc,
|
||||||
|
uid,
|
||||||
terminate_after=0.1,
|
terminate_after=0.1,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue