Merge pull request #284 from goodboy/maybe_cancel_the_cancel_

Cancel the `.cancel_actor()` request on proc death
experimental_subpkg
goodboy 2022-01-21 14:22:48 -05:00 committed by GitHub
commit 0b51ebfe11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 42 additions and 8 deletions

View File

@ -0,0 +1,8 @@
Adjust the `tractor._spawn.soft_wait()` strategy to avoid sending an
actor cancel request (via `Portal.cancel_actor()`) if either the child
process is detected as having terminated or the IPC channel is detected
to be closed.
This ensures (even) more deterministic inter-actor cancellation by
avoiding the timeout condition where possible when a child never
successfully spawned, crashed, or became un-contactable over IPC.

View File

@ -24,7 +24,7 @@ with open('docs/README.rst', encoding='utf-8') as f:
setup( setup(
name="tractor", name="tractor",
version='0.1.0a4', # alpha zone version='0.1.0a5.dev', # alpha zone
description='structured concurrrent "actors"', description='structured concurrrent "actors"',
long_description=readme, long_description=readme,
license='GPLv3', license='GPLv3',

View File

@ -82,14 +82,16 @@ else:
def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
"""Attempt to set the method for process starting, aka the "actor '''
Attempt to set the method for process starting, aka the "actor
spawning backend". spawning backend".
If the desired method is not supported this function will error. If the desired method is not supported this function will error.
On Windows only the ``multiprocessing`` "spawn" method is offered On Windows only the ``multiprocessing`` "spawn" method is offered
besides the default ``trio`` which uses async wrapping around besides the default ``trio`` which uses async wrapping around
``subprocess.Popen``. ``subprocess.Popen``.
"""
'''
global _ctx global _ctx
global _spawn_method global _spawn_method
@ -218,7 +220,9 @@ async def soft_wait(
# ``trio.Process.__aexit__()`` (it tears down stdio # ``trio.Process.__aexit__()`` (it tears down stdio
# which will kill any waiting remote pdb trace). # which will kill any waiting remote pdb trace).
# This is a "soft" (cancellable) join/reap. # This is a "soft" (cancellable) join/reap.
uid = portal.channel.uid
try: try:
log.cancel(f'Soft waiting on actor:\n{uid}')
await wait_func(proc) await wait_func(proc)
except trio.Cancelled: except trio.Cancelled:
# if cancelled during a soft wait, cancel the child # if cancelled during a soft wait, cancel the child
@ -226,8 +230,26 @@ async def soft_wait(
# below. This means we try to do a graceful teardown # below. This means we try to do a graceful teardown
# via sending a cancel message before getting out # via sending a cancel message before getting out
# zombie killing tools. # zombie killing tools.
with trio.CancelScope(shield=True): async with trio.open_nursery() as n:
n.cancel_scope.shield = True
async def cancel_on_proc_deth():
'''
Cancel the actor cancel request if we detect that
that the process terminated.
'''
await wait_func(proc)
n.cancel_scope.cancel()
n.start_soon(cancel_on_proc_deth)
await portal.cancel_actor() await portal.cancel_actor()
if proc.poll() is None: # type: ignore
log.warning(
f'Process still alive after cancel request:\n{uid}')
n.cancel_scope.cancel()
raise raise
@ -373,9 +395,8 @@ async def new_proc(
# The "hard" reap since no actor zombies are allowed! # The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid # XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early. # killing the process too early.
log.cancel(f'Hard reap sequence starting for {uid}')
if proc: if proc:
log.cancel(f'Hard reap sequence starting for {uid}')
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
@ -483,6 +504,7 @@ async def mp_new_proc(
# daemon=True, # daemon=True,
name=name, name=name,
) )
# `multiprocessing` only (since no async interface): # `multiprocessing` only (since no async interface):
# register the process before start in case we get a cancel # register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait # request before the actor has fully spawned - then we can wait
@ -501,6 +523,11 @@ async def mp_new_proc(
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer( event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid) subactor.uid)
# XXX: monkey patch poll API to match the ``subprocess`` API..
# not sure why they don't expose this but kk.
proc.poll = lambda: proc.exitcode # type: ignore
# except: # except:
# TODO: in the case we were cancelled before the sub-proc # TODO: in the case we were cancelled before the sub-proc
# registered itself back we must be sure to try and clean # registered itself back we must be sure to try and clean

View File

@ -170,7 +170,6 @@ async def maybe_open_context(
await _Cache.lock.acquire() await _Cache.lock.acquire()
ctx_key = (id(acm_func), key or tuple(kwargs.items())) ctx_key = (id(acm_func), key or tuple(kwargs.items()))
print(ctx_key)
value = None value = None
try: try:
@ -180,7 +179,7 @@ async def maybe_open_context(
value = _Cache.values[ctx_key] value = _Cache.values[ctx_key]
except KeyError: except KeyError:
log.info(f'Allocating new resource for {ctx_key}') log.info(f'Allocating new {acm_func} for {ctx_key}')
mngr = acm_func(**kwargs) mngr = acm_func(**kwargs)
# TODO: avoid pulling from ``tractor`` internals and # TODO: avoid pulling from ``tractor`` internals and