diff --git a/nooz/248.misc.rst b/nooz/248.misc.rst new file mode 100644 index 0000000..453c829 --- /dev/null +++ b/nooz/248.misc.rst @@ -0,0 +1,8 @@ +Adjust the `tractor._spawn.soft_wait()` strategy to avoid sending an +actor cancel request (via `Portal.cancel_actor()`) if either the child +process is detected as having terminated or the IPC channel is detected +to be closed. + +This ensures (even) more deterministic inter-actor cancellation by +avoiding the timeout condition where possible when a whild never +sucessfully spawned, crashed, or became un-contactable over IPC. diff --git a/setup.py b/setup.py index 14a65f7..b17bb57 100755 --- a/setup.py +++ b/setup.py @@ -24,7 +24,7 @@ with open('docs/README.rst', encoding='utf-8') as f: setup( name="tractor", - version='0.1.0a4', # alpha zone + version='0.1.0a5.dev', # alpha zone description='structured concurrrent "actors"', long_description=readme, license='GPLv3', diff --git a/tractor/_spawn.py b/tractor/_spawn.py index ead91df..3d7e6b1 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -82,14 +82,16 @@ else: def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: - """Attempt to set the method for process starting, aka the "actor + ''' + Attempt to set the method for process starting, aka the "actor spawning backend". If the desired method is not supported this function will error. On Windows only the ``multiprocessing`` "spawn" method is offered besides the default ``trio`` which uses async wrapping around ``subprocess.Popen``. - """ + + ''' global _ctx global _spawn_method @@ -218,7 +220,9 @@ async def soft_wait( # ``trio.Process.__aexit__()`` (it tears down stdio # which will kill any waiting remote pdb trace). # This is a "soft" (cancellable) join/reap. + uid = portal.channel.uid try: + log.cancel(f'Soft waiting on actor:\n{uid}') await wait_func(proc) except trio.Cancelled: # if cancelled during a soft wait, cancel the child @@ -226,8 +230,26 @@ async def soft_wait( # below. This means we try to do a graceful teardown # via sending a cancel message before getting out # zombie killing tools. - with trio.CancelScope(shield=True): + async with trio.open_nursery() as n: + n.cancel_scope.shield = True + + async def cancel_on_proc_deth(): + ''' + Cancel the actor cancel request if we detect that + that the process terminated. + + ''' + await wait_func(proc) + n.cancel_scope.cancel() + + n.start_soon(cancel_on_proc_deth) await portal.cancel_actor() + + if proc.poll() is None: # type: ignore + log.warning( + f'Process still alive after cancel request:\n{uid}') + + n.cancel_scope.cancel() raise @@ -373,9 +395,8 @@ async def new_proc( # The "hard" reap since no actor zombies are allowed! # XXX: do this **after** cancellation/tearfown to avoid # killing the process too early. - log.cancel(f'Hard reap sequence starting for {uid}') - if proc: + log.cancel(f'Hard reap sequence starting for {uid}') with trio.CancelScope(shield=True): # don't clobber an ongoing pdb @@ -483,6 +504,7 @@ async def mp_new_proc( # daemon=True, name=name, ) + # `multiprocessing` only (since no async interface): # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait @@ -501,6 +523,11 @@ async def mp_new_proc( # local actor by the time we get a ref to it event, chan = await actor_nursery._actor.wait_for_peer( subactor.uid) + + # XXX: monkey patch poll API to match the ``subprocess`` API.. + # not sure why they don't expose this but kk. + proc.poll = lambda: proc.exitcode # type: ignore + # except: # TODO: in the case we were cancelled before the sub-proc # registered itself back we must be sure to try and clean diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 3834983..76f6467 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -170,7 +170,6 @@ async def maybe_open_context( await _Cache.lock.acquire() ctx_key = (id(acm_func), key or tuple(kwargs.items())) - print(ctx_key) value = None try: @@ -180,7 +179,7 @@ async def maybe_open_context( value = _Cache.values[ctx_key] except KeyError: - log.info(f'Allocating new resource for {ctx_key}') + log.info(f'Allocating new {acm_func} for {ctx_key}') mngr = acm_func(**kwargs) # TODO: avoid pulling from ``tractor`` internals and