From cd2d8c217a6342f084ace275a88b2b8cf6dc8c4d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 12:53:03 -0400 Subject: [PATCH 01/12] Test that subactors deregister on cancel --- tests/test_discovery.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index f458494..844c4a8 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -80,3 +80,43 @@ async def test_trynamic_trio(func, start_method): print(await gretchen.result()) print(await donny.result()) print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") + + +def test_subactors_unregister_on_cancel(start_method): + """Verify that cancelling a nursery results in all subactors + deregistering themselves with the arbiter. + """ + async def main(): + actor = tractor.current_actor() + assert actor.is_arbiter + registry = actor._registry + + # arbiter is registered + assert actor.uid in registry + + try: + async with tractor.open_nursery() as n: + portals = {} + for i in range(3): + name = f'a{i}' + portals[name] = await n.run_in_actor( + name, trio.sleep_forever) + + # wait on last actor to come up + async with tractor.wait_for_actor(name): + for uid in n._children: + assert uid in registry + + assert len(portals) + 1 == len(registry) + + # trigger cancel + raise KeyboardInterrupt + + finally: + # all subactors should have de-registered + await trio.sleep(0.5) + assert len(registry) == 1 + assert actor.uid in registry + + with pytest.raises(KeyboardInterrupt): + tractor.run(main) From 0d9483376db3c3b67f7fea37344ed271f3675958 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 13:01:56 -0400 Subject: [PATCH 02/12] Test cancel with SIGINT on non-windows as well --- tests/test_discovery.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 844c4a8..df4a09a 100644 --- 
a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,6 +1,10 @@ """ Actor "discovery" testing """ +import os +import signal +import platform + import pytest import tractor import trio @@ -82,7 +86,8 @@ async def test_trynamic_trio(func, start_method): print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") -def test_subactors_unregister_on_cancel(start_method): +@pytest.mark.parametrize('use_signal', [False, True]) +def test_subactors_unregister_on_cancel(start_method, use_signal): """Verify that cancelling a nursery results in all subactors deregistering themselves with the arbiter. """ @@ -110,8 +115,12 @@ def test_subactors_unregister_on_cancel(start_method): assert len(portals) + 1 == len(registry) # trigger cancel - raise KeyboardInterrupt - + if use_signal: + if platform.system() == 'Windows': + pytest.skip("SIGINT not supported on windows") + os.kill(os.getpid(), signal.SIGINT) + else: + raise KeyboardInterrupt finally: # all subactors should have de-registered await trio.sleep(0.5) From 2ccaa94c607c4bedf4747fbd3e1840901afe120d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 14:49:46 -0400 Subject: [PATCH 03/12] Move daemon fixture up to conftest --- tests/conftest.py | 57 ++++++++++++++++++++++++++++++++++ tests/test_multi_program.py | 61 +++++-------------------------------- 2 files changed, 64 insertions(+), 54 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9f029ee..07a455c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,13 @@ """ ``tractor`` testing!! """ +import sys +import subprocess import os import random +import signal import platform +import time import pytest import tractor @@ -16,6 +20,19 @@ pytest_plugins = ['pytester'] _arb_addr = '127.0.0.1', random.randint(1000, 9999) +# Sending signal.SIGINT on subprocess fails on windows. 
Use CTRL_* alternatives +if platform.system() == 'Windows': + _KILL_SIGNAL = signal.CTRL_BREAK_EVENT + _INT_SIGNAL = signal.CTRL_C_EVENT + _INT_RETURN_CODE = 3221225786 + _PROC_SPAWN_WAIT = 2 +else: + _KILL_SIGNAL = signal.SIGKILL + _INT_SIGNAL = signal.SIGINT + _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value + _PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4 + + no_windows = pytest.mark.skipif( platform.system() == "Windows", reason="Test is unsupported on windows", @@ -89,3 +106,43 @@ def pytest_generate_tests(metafunc): methods = ['trio'] metafunc.parametrize("start_method", methods, scope='module') + + +def sig_prog(proc, sig): + "Kill the actor-process with ``sig``." + proc.send_signal(sig) + time.sleep(0.1) + if not proc.poll(): + # TODO: why sometimes does SIGINT not work on teardown? + # seems to happen only when trace logging enabled? + proc.send_signal(_KILL_SIGNAL) + ret = proc.wait() + assert ret + + +@pytest.fixture +def daemon(loglevel, testdir, arb_addr): + """Run a daemon actor as a "remote arbiter". 
+ """ + cmdargs = [ + sys.executable, '-c', + "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})" + .format( + arb_addr, + "'{}'".format(loglevel) if loglevel else None) + ] + kwargs = dict() + if platform.system() == 'Windows': + # without this, tests hang on windows forever + kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + + proc = testdir.popen( + cmdargs, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs, + ) + assert not proc.returncode + time.sleep(_PROC_SPAWN_WAIT) + yield proc + sig_prog(proc, _INT_SIGNAL) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 20f4226..12ca3ef 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -2,64 +2,16 @@ Multiple python programs invoking ``tractor.run()`` """ import platform -import sys import time -import signal -import subprocess import pytest import tractor -from conftest import tractor_test - -# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives -if platform.system() == 'Windows': - _KILL_SIGNAL = signal.CTRL_BREAK_EVENT - _INT_SIGNAL = signal.CTRL_C_EVENT - _INT_RETURN_CODE = 3221225786 - _PROC_SPAWN_WAIT = 2 -else: - _KILL_SIGNAL = signal.SIGKILL - _INT_SIGNAL = signal.SIGINT - _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value - _PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4 - - -def sig_prog(proc, sig): - "Kill the actor-process with ``sig``." - proc.send_signal(sig) - time.sleep(0.1) - if not proc.poll(): - # TODO: why sometimes does SIGINT not work on teardown? - # seems to happen only when trace logging enabled? 
- proc.send_signal(_KILL_SIGNAL) - ret = proc.wait() - assert ret - - -@pytest.fixture -def daemon(loglevel, testdir, arb_addr): - cmdargs = [ - sys.executable, '-c', - "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})" - .format( - arb_addr, - "'{}'".format(loglevel) if loglevel else None) - ] - kwargs = dict() - if platform.system() == 'Windows': - # without this, tests hang on windows forever - kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP - - proc = testdir.popen( - cmdargs, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs, - ) - assert not proc.returncode - time.sleep(_PROC_SPAWN_WAIT) - yield proc - sig_prog(proc, _INT_SIGNAL) +from conftest import ( + tractor_test, + sig_prog, + _INT_SIGNAL, + _INT_RETURN_CODE, +) def test_abort_on_sigint(daemon): @@ -67,6 +19,7 @@ def test_abort_on_sigint(daemon): time.sleep(0.1) sig_prog(daemon, _INT_SIGNAL) assert daemon.returncode == _INT_RETURN_CODE + # XXX: oddly, couldn't get capfd.readouterr() to work here? if platform.system() != 'Windows': # don't check stderr on windows as its empty when sending CTRL_C_EVENT From 639299e6eb1a442fbe27e7c2a18c6678396b14b2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 15:40:41 -0400 Subject: [PATCH 04/12] Expose a `.get_registry()` method on the arbiter --- tractor/_actor.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tractor/_actor.py b/tractor/_actor.py index 6b6cdef..5934d7e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -678,7 +678,10 @@ class Actor: finally: if registered_with_arbiter: + # with trio.move_on_after(3) as cs: + # cs.shield = True await self._do_unreg(self._arb_addr) + # terminate actor once all it's peers (actors that connected # to it as clients) have disappeared if not self._no_more_peers.is_set(): @@ -863,6 +866,13 @@ class Arbiter(Actor): return None + async def get_registry( + self + ) -> Dict[str, Tuple[str, str]]: + """Return current name registry. 
+ """ + return list(self._registry) + async def wait_for_actor( self, name: str ) -> List[Tuple[str, int]]: From 699bfd18577cef9aa56251b413f7df8f3779486e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 15:41:41 -0400 Subject: [PATCH 05/12] Run unreg on cancel tests with remote arbiter as well --- tests/test_discovery.py | 72 +++++++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 13 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index df4a09a..7d6f69b 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -4,6 +4,7 @@ Actor "discovery" testing import os import signal import platform +from functools import partial import pytest import tractor @@ -86,17 +87,29 @@ async def test_trynamic_trio(func, start_method): print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") -@pytest.mark.parametrize('use_signal', [False, True]) -def test_subactors_unregister_on_cancel(start_method, use_signal): - """Verify that cancelling a nursery results in all subactors - deregistering themselves with the arbiter. 
- """ - async def main(): - actor = tractor.current_actor() - assert actor.is_arbiter - registry = actor._registry +async def spawn_and_check_registry( + arb_addr: tuple, + use_signal: bool, + remote_arbiter: bool = False, +) -> None: + actor = tractor.current_actor() - # arbiter is registered + if remote_arbiter: + assert not actor.is_arbiter + + async with tractor.get_arbiter(*arb_addr) as portal: + if actor.is_arbiter: + async def get_reg(): + return actor._registry + + extra = 1 # arbiter is local root actor + + else: + get_reg = partial(portal.run, 'self', 'get_registry') + extra = 2 # local root actor + remote arbiter + + # ensure current actor is registered + registry = await get_reg() assert actor.uid in registry try: @@ -109,10 +122,11 @@ def test_subactors_unregister_on_cancel(start_method, use_signal): # wait on last actor to come up async with tractor.wait_for_actor(name): + registry = await get_reg() for uid in n._children: assert uid in registry - assert len(portals) + 1 == len(registry) + assert len(portals) + extra == len(registry) # trigger cancel if use_signal: @@ -124,8 +138,40 @@ def test_subactors_unregister_on_cancel(start_method, use_signal): finally: # all subactors should have de-registered await trio.sleep(0.5) - assert len(registry) == 1 + registry = await get_reg() + assert len(registry) == extra assert actor.uid in registry + +@pytest.mark.parametrize('use_signal', [False, True]) +def test_subactors_unregister_on_cancel( + start_method, + use_signal, + arb_addr +): + """Verify that cancelling a nursery results in all subactors + deregistering themselves with the arbiter. 
+ """ with pytest.raises(KeyboardInterrupt): - tractor.run(main) + tractor.run(spawn_and_check_registry, arb_addr, use_signal) + + +@pytest.mark.parametrize('use_signal', [False, True]) +def test_subactors_unregister_on_cancel_remote_daemon( + daemon, + start_method, + use_signal, + arb_addr, +): + """Verify that cancelling a nursery results in all subactors + deregistering themselves with the arbiter. + """ + with pytest.raises(KeyboardInterrupt): + tractor.run( + spawn_and_check_registry, + arb_addr, + use_signal, + True, + # XXX: required to use remote daemon! + arbiter_addr=arb_addr + ) From a5279f80a781b669ea03839f1b686d5c2ca92087 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 18:24:28 -0400 Subject: [PATCH 06/12] Actually reproduce the de-registration problem This truly reproduces #141. It turns out the problem only occurs when we're cancelled in the middle of consuming "infinite streams". Good news is this tests a lot of edge cases :) --- tests/test_discovery.py | 139 +++++++++++++++++++++++++++------------- 1 file changed, 96 insertions(+), 43 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 7d6f69b..6370fe2 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -5,6 +5,7 @@ import os import signal import platform from functools import partial +import itertools import pytest import tractor @@ -87,10 +88,35 @@ async def test_trynamic_trio(func, start_method): print("CUTTTT CUUTT CUT!!?! Donny!! 
You're supposed to say...") +async def stream_forever(): + for i in itertools.count(): + yield i + await trio.sleep(0.01) + + +async def cancel(use_signal, delay=0): + # hold on there sally + await trio.sleep(delay) + + # trigger cancel + if use_signal: + if platform.system() == 'Windows': + pytest.skip("SIGINT not supported on windows") + os.kill(os.getpid(), signal.SIGINT) + else: + raise KeyboardInterrupt + + +async def stream_from(portal): + async for value in await portal.result(): + print(value) + + async def spawn_and_check_registry( arb_addr: tuple, use_signal: bool, remote_arbiter: bool = False, + with_streaming: bool = False, ) -> None: actor = tractor.current_actor() @@ -101,9 +127,7 @@ async def spawn_and_check_registry( if actor.is_arbiter: async def get_reg(): return actor._registry - extra = 1 # arbiter is local root actor - else: get_reg = partial(portal.run, 'self', 'get_registry') extra = 2 # local root actor + remote arbiter @@ -112,66 +136,95 @@ async def spawn_and_check_registry( registry = await get_reg() assert actor.uid in registry - try: - async with tractor.open_nursery() as n: - portals = {} - for i in range(3): - name = f'a{i}' - portals[name] = await n.run_in_actor( - name, trio.sleep_forever) + if with_streaming: + to_run = stream_forever + else: + to_run = trio.sleep_forever - # wait on last actor to come up - async with tractor.wait_for_actor(name): + async with trio.open_nursery() as trion: + try: + async with tractor.open_nursery() as n: + portals = {} + for i in range(3): + name = f'a{i}' + portals[name] = await n.run_in_actor(name, to_run) + + # wait on last actor to come up + async with tractor.wait_for_actor(name): + registry = await get_reg() + for uid in n._children: + assert uid in registry + + assert len(portals) + extra == len(registry) + + if with_streaming: + await trio.sleep(0.1) + + pts = list(portals.values()) + for p in pts[:-1]: + trion.start_soon(stream_from, p) + + # stream for 1 sec + trion.start_soon(cancel, 
use_signal, 1) + + last_p = pts[-1] + async for value in await last_p.result(): + print(value) + else: + await cancel(use_signal) + + finally: + with trio.CancelScope(shield=True): + await trio.sleep(0.5) + + # all subactors should have de-registered registry = await get_reg() - for uid in n._children: - assert uid in registry - - assert len(portals) + extra == len(registry) - - # trigger cancel - if use_signal: - if platform.system() == 'Windows': - pytest.skip("SIGINT not supported on windows") - os.kill(os.getpid(), signal.SIGINT) - else: - raise KeyboardInterrupt - finally: - # all subactors should have de-registered - await trio.sleep(0.5) - registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + assert len(registry) == extra + assert actor.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) +@pytest.mark.parametrize('with_streaming', [False, True]) def test_subactors_unregister_on_cancel( - start_method, - use_signal, - arb_addr -): - """Verify that cancelling a nursery results in all subactors - deregistering themselves with the arbiter. - """ - with pytest.raises(KeyboardInterrupt): - tractor.run(spawn_and_check_registry, arb_addr, use_signal) - - -@pytest.mark.parametrize('use_signal', [False, True]) -def test_subactors_unregister_on_cancel_remote_daemon( - daemon, start_method, use_signal, arb_addr, + with_streaming, ): """Verify that cancelling a nursery results in all subactors deregistering themselves with the arbiter. 
""" + with pytest.raises(KeyboardInterrupt): + tractor.run( + spawn_and_check_registry, + arb_addr, + use_signal, + False, + with_streaming, + arbiter_addr=arb_addr + ) + + +@pytest.mark.parametrize('use_signal', [False, True]) +@pytest.mark.parametrize('with_streaming', [False, True]) +def test_subactors_unregister_on_cancel_remote_daemon( + daemon, + start_method, + use_signal, + arb_addr, + with_streaming, +): + """Verify that cancelling a nursery results in all subactors + deregistering themselves with a **remote** (not in the local process + tree) arbiter. + """ with pytest.raises(KeyboardInterrupt): tractor.run( spawn_and_check_registry, arb_addr, use_signal, True, + with_streaming, # XXX: required to use remote daemon! arbiter_addr=arb_addr ) From fbd68d2d913f16bf935469f734fa210dd669b35e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 18:41:21 -0400 Subject: [PATCH 07/12] Allow for tuple keys with std `msgpack` --- tractor/_ipc.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 0d24307..a3a271d 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -3,6 +3,8 @@ Inter-process comms abstractions """ import typing from typing import Any, Tuple, Optional +from functools import partial +import inspect import msgpack import trio @@ -11,6 +13,14 @@ from async_generator import asynccontextmanager from .log import get_logger log = get_logger('ipc') +# :eyeroll: +try: + import msgpack_numpy + Unpacker = msgpack_numpy.Unpacker +except ImportError: + # just plain ``msgpack`` requires tweaking key settings + Unpacker = partial(msgpack.Unpacker, strict_map_key=False) + class MsgpackStream: """A ``trio.SocketStream`` delivering ``msgpack`` formatted data. @@ -32,7 +42,10 @@ class MsgpackStream: async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. 
""" - unpacker = msgpack.Unpacker(raw=False, use_list=False) + unpacker = Unpacker( + raw=False, + use_list=False, + ) while True: try: data = await self.stream.receive_some(2**10) From 56b81f07e58a42edbfa8a02ce6d992ff87a0513f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 18:42:23 -0400 Subject: [PATCH 08/12] Return `Dict[Tuple, Tuple]` from `.get_registry()` --- tractor/_actor.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 5934d7e..a0fb229 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -440,7 +440,7 @@ class Actor: f"Cancelling all tasks for {chan} from {chan.uid}") for (channel, cid) in self._rpc_tasks: if channel is chan: - self._cancel_task(cid, channel) + await self._cancel_task(cid, channel) log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") @@ -871,7 +871,11 @@ class Arbiter(Actor): ) -> Dict[str, Tuple[str, str]]: """Return current name registry. """ - return list(self._registry) + # NOTE: requires ``strict_map_key=False`` to the msgpack + # unpacker since we have tuples as keys (not this makes the + # arbiter suscetible to hashdos): + # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 + return self._registry async def wait_for_actor( self, name: str From a24c6bfdd2813add33de598cf9578a2591503f6d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 18:44:50 -0400 Subject: [PATCH 09/12] Correctly catch cancelled nursery case (purely for logging) --- tractor/_trionics.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 82d2653..746960c 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -230,7 +230,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: f"Waiting on subactors {anursery._children}" "to complete" ) - except (BaseException, Exception) as err: + except BaseException as 
err: # if the caller's scope errored then we activate our # one-cancels-all supervisor strategy (don't # worry more are coming). @@ -241,10 +241,11 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # the `else:` block here might not complete? # For now, shield both. with trio.CancelScope(shield=True): - if err in (trio.Cancelled, KeyboardInterrupt): + etype = type(err) + if etype in (trio.Cancelled, KeyboardInterrupt): log.warning( f"Nursery for {current_actor().uid} was " - f"cancelled with {err}") + f"cancelled with {etype}") else: log.exception( f"Nursery for {current_actor().uid} " From ae9016c06a886e87cc2382129ceac80c72e8f952 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 18:46:18 -0400 Subject: [PATCH 10/12] Log on KBI cancelled termination --- tractor/_discovery.py | 4 +++- tractor/_entry.py | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 3667c90..0407896 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -23,6 +23,7 @@ async def get_arbiter( arbiter. """ actor = current_actor() + if not actor: raise RuntimeError("No actor instance has been defined yet?") @@ -38,7 +39,8 @@ async def get_arbiter( @asynccontextmanager async def find_actor( - name: str, arbiter_sockaddr: Tuple[str, int] = None + name: str, + arbiter_sockaddr: Tuple[str, int] = None ) -> typing.AsyncGenerator[Optional[Portal], None]: """Ask the arbiter to find actor(s) by name. diff --git a/tractor/_entry.py b/tractor/_entry.py index 1ccc0b5..883cc6b 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -72,8 +72,10 @@ def _trio_main( actor._async_main, parent_addr=parent_addr ) + try: trio.run(trio_main) except KeyboardInterrupt: - pass # handle it the same way trio does? 
- log.info(f"Actor {actor.uid} terminated") \ No newline at end of file + log.warning(f"Actor {actor.uid} received KBI") + + log.info(f"Actor {actor.uid} terminated") From 4f92cfe74fb14913e07ce703c190582ec136869c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Aug 2020 18:57:00 -0400 Subject: [PATCH 11/12] Don't `.aclose` `trio` processes until the very end Trio will kill subprocesses via `Process.__aexit__()` using a `finally:` block (which, yes, will get triggered on cancellation) so we avoid that until true process "tear down" since subactors do many things during graceful shutdown (such as de-registering from the name discovery system). Oddly this only seems to be an issue during cancellation of infinite stream consumption. Resolves #141 --- tractor/_spawn.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index baed198..02d14bc 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -157,7 +157,6 @@ async def cancel_on_completion( @asynccontextmanager async def spawn_subactor( subactor: 'Actor', - accept_addr: Tuple[str, int], parent_addr: Tuple[str, int], ): @@ -167,8 +166,11 @@ async def spawn_subactor( # Hardcode this (instead of using ``_child.__name__`` to avoid a # double import warning: https://stackoverflow.com/a/45070583 "tractor._child", + # This is merely an identifier for debugging purposes when + # viewing the process tree from the OS "--uid", str(subactor.uid), + # Address the child must connect to on startup "--parent_addr", str(parent_addr) ] @@ -179,8 +181,14 @@ async def spawn_subactor( subactor.loglevel ] - async with await trio.open_process(spawn_cmd) as proc: - yield proc + proc = await trio.open_process(spawn_cmd) + yield proc + + # XXX: do this **after** cancellation/teardown + # to avoid killing the process too early + # since trio does this internally on ``__aexit__()`` + async with proc: + log.debug(f"Terminating {proc}") async def new_proc( @@ -206,7
+214,6 @@ async def new_proc( if use_trio_run_in_process or _spawn_method == 'trio': async with spawn_subactor( subactor, - bind_addr, parent_addr, ) as proc: log.info(f"Started {proc}") From 09ae51900d7ade83a937057a842747bd61a6be9a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 4 Aug 2020 09:52:49 -0400 Subject: [PATCH 12/12] Better clarify uid comment --- tractor/_spawn.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 02d14bc..b8620c2 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -166,8 +166,10 @@ async def spawn_subactor( # Hardcode this (instead of using ``_child.__name__`` to avoid a # double import warning: https://stackoverflow.com/a/45070583 "tractor._child", - # This is merely an identifier for debugging purposes when - # viewing the process tree from the OS + # We provide the child's unique identifier on this exec/spawn + # line for debugging purposes when viewing the process tree from + # the OS; it otherwise can be passed via the parent channel if + # we prefer in the future (for privacy). "--uid", str(subactor.uid), # Address the child must connect to on startup