diff --git a/tests/conftest.py b/tests/conftest.py index 9f029ee..07a455c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,9 +1,13 @@ """ ``tractor`` testing!! """ +import sys +import subprocess import os import random +import signal import platform +import time import pytest import tractor @@ -16,6 +20,19 @@ pytest_plugins = ['pytester'] _arb_addr = '127.0.0.1', random.randint(1000, 9999) +# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives +if platform.system() == 'Windows': + _KILL_SIGNAL = signal.CTRL_BREAK_EVENT + _INT_SIGNAL = signal.CTRL_C_EVENT + _INT_RETURN_CODE = 3221225786 + _PROC_SPAWN_WAIT = 2 +else: + _KILL_SIGNAL = signal.SIGKILL + _INT_SIGNAL = signal.SIGINT + _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value + _PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4 + + no_windows = pytest.mark.skipif( platform.system() == "Windows", reason="Test is unsupported on windows", @@ -89,3 +106,43 @@ def pytest_generate_tests(metafunc): methods = ['trio'] metafunc.parametrize("start_method", methods, scope='module') + + +def sig_prog(proc, sig): + "Kill the actor-process with ``sig``." + proc.send_signal(sig) + time.sleep(0.1) + if not proc.poll(): + # TODO: why sometimes does SIGINT not work on teardown? + # seems to happen only when trace logging enabled? + proc.send_signal(_KILL_SIGNAL) + ret = proc.wait() + assert ret + + +@pytest.fixture +def daemon(loglevel, testdir, arb_addr): + """Run a daemon actor as a "remote arbiter". + """ + cmdargs = [ + sys.executable, '-c', + "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})" + .format( + arb_addr, + "'{}'".format(loglevel) if loglevel else None) + ] + kwargs = dict() + if platform.system() == 'Windows': + # without this, tests hang on windows forever + kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + + proc = testdir.popen( + cmdargs, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + **kwargs, + ) + assert not proc.returncode + time.sleep(_PROC_SPAWN_WAIT) + yield proc + sig_prog(proc, _INT_SIGNAL) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index f458494..6370fe2 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -1,6 +1,12 @@ """ Actor "discovery" testing """ +import os +import signal +import platform +from functools import partial +import itertools + import pytest import tractor import trio @@ -80,3 +86,145 @@ async def test_trynamic_trio(func, start_method): print(await gretchen.result()) print(await donny.result()) print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") + + +async def stream_forever(): + for i in itertools.count(): + yield i + await trio.sleep(0.01) + + +async def cancel(use_signal, delay=0): + # hold on there sally + await trio.sleep(delay) + + # trigger cancel + if use_signal: + if platform.system() == 'Windows': + pytest.skip("SIGINT not supported on windows") + os.kill(os.getpid(), signal.SIGINT) + else: + raise KeyboardInterrupt + + +async def stream_from(portal): + async for value in await portal.result(): + print(value) + + +async def spawn_and_check_registry( + arb_addr: tuple, + use_signal: bool, + remote_arbiter: bool = False, + with_streaming: bool = False, +) -> None: + actor = tractor.current_actor() + + if remote_arbiter: + assert not actor.is_arbiter + + async with tractor.get_arbiter(*arb_addr) as portal: + if actor.is_arbiter: + async def get_reg(): + return actor._registry + extra = 1 # arbiter is local root actor + else: + get_reg = partial(portal.run, 'self', 'get_registry') + extra = 2 # local root actor + remote arbiter + + # ensure current actor is registered + registry = await get_reg() + assert actor.uid in registry + + if with_streaming: + to_run = stream_forever + else: + to_run = trio.sleep_forever + + async with trio.open_nursery() as trion: + try: + async with tractor.open_nursery() as n: + portals = {} + for i in range(3): + name = f'a{i}' + portals[name] = await n.run_in_actor(name, to_run) + + # wait on last actor to come up + async with tractor.wait_for_actor(name): + registry = await get_reg() + for uid in n._children: + assert uid in registry + + assert len(portals) + extra == len(registry) + + if with_streaming: + await trio.sleep(0.1) + + pts = list(portals.values()) + for p in pts[:-1]: + trion.start_soon(stream_from, p) + + # stream for 1 sec + trion.start_soon(cancel, use_signal, 1) + + last_p = pts[-1] + async for value in await last_p.result(): + print(value) + else: + await cancel(use_signal) + + finally: + with trio.CancelScope(shield=True): + await trio.sleep(0.5) + + # all subactors should have de-registered + registry = await get_reg() + assert len(registry) == extra + assert actor.uid in registry + + +@pytest.mark.parametrize('use_signal', [False, True]) +@pytest.mark.parametrize('with_streaming', [False, True]) +def test_subactors_unregister_on_cancel( + start_method, + use_signal, + arb_addr, + with_streaming, +): + """Verify that cancelling a nursery results in all subactors + deregistering themselves with the arbiter. + """ + with pytest.raises(KeyboardInterrupt): + tractor.run( + spawn_and_check_registry, + arb_addr, + use_signal, + False, + with_streaming, + arbiter_addr=arb_addr + ) + + +@pytest.mark.parametrize('use_signal', [False, True]) +@pytest.mark.parametrize('with_streaming', [False, True]) +def test_subactors_unregister_on_cancel_remote_daemon( + daemon, + start_method, + use_signal, + arb_addr, + with_streaming, +): + """Verify that cancelling a nursery results in all subactors + deregistering themselves with a **remote** (not in the local process + tree) arbiter. + """ + with pytest.raises(KeyboardInterrupt): + tractor.run( + spawn_and_check_registry, + arb_addr, + use_signal, + True, + with_streaming, + # XXX: required to use remote daemon! + arbiter_addr=arb_addr + ) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 20f4226..12ca3ef 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -2,64 +2,16 @@ Multiple python programs invoking ``tractor.run()`` """ import platform -import sys import time -import signal -import subprocess import pytest import tractor -from conftest import tractor_test - -# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives -if platform.system() == 'Windows': - _KILL_SIGNAL = signal.CTRL_BREAK_EVENT - _INT_SIGNAL = signal.CTRL_C_EVENT - _INT_RETURN_CODE = 3221225786 - _PROC_SPAWN_WAIT = 2 -else: - _KILL_SIGNAL = signal.SIGKILL - _INT_SIGNAL = signal.SIGINT - _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value - _PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4 - - -def sig_prog(proc, sig): - "Kill the actor-process with ``sig``." - proc.send_signal(sig) - time.sleep(0.1) - if not proc.poll(): - # TODO: why sometimes does SIGINT not work on teardown? - # seems to happen only when trace logging enabled? - proc.send_signal(_KILL_SIGNAL) - ret = proc.wait() - assert ret - - -@pytest.fixture -def daemon(loglevel, testdir, arb_addr): - cmdargs = [ - sys.executable, '-c', - "import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})" - .format( - arb_addr, - "'{}'".format(loglevel) if loglevel else None) - ] - kwargs = dict() - if platform.system() == 'Windows': - # without this, tests hang on windows forever - kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP - - proc = testdir.popen( - cmdargs, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - **kwargs, - ) - assert not proc.returncode - time.sleep(_PROC_SPAWN_WAIT) - yield proc - sig_prog(proc, _INT_SIGNAL) +from conftest import ( + tractor_test, + sig_prog, + _INT_SIGNAL, + _INT_RETURN_CODE, +) def test_abort_on_sigint(daemon): @@ -67,6 +19,7 @@ def test_abort_on_sigint(daemon): time.sleep(0.1) sig_prog(daemon, _INT_SIGNAL) assert daemon.returncode == _INT_RETURN_CODE + # XXX: oddly, couldn't get capfd.readouterr() to work here? if platform.system() != 'Windows': # don't check stderr on windows as its empty when sending CTRL_C_EVENT diff --git a/tractor/_actor.py b/tractor/_actor.py index 6b6cdef..a0fb229 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -440,7 +440,7 @@ class Actor: f"Cancelling all tasks for {chan} from {chan.uid}") for (channel, cid) in self._rpc_tasks: if channel is chan: - self._cancel_task(cid, channel) + await self._cancel_task(cid, channel) log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") @@ -678,7 +678,10 @@ class Actor: finally: if registered_with_arbiter: + # with trio.move_on_after(3) as cs: + # cs.shield = True await self._do_unreg(self._arb_addr) + # terminate actor once all it's peers (actors that connected # to it as clients) have disappeared if not self._no_more_peers.is_set(): @@ -863,6 +866,17 @@ class Arbiter(Actor): return None + async def get_registry( + self + ) -> Dict[str, Tuple[str, str]]: + """Return current name registry. + """ + # NOTE: requires ``strict_map_key=False`` to the msgpack + # unpacker since we have tuples as keys (not this makes the + # arbiter suscetible to hashdos): + # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 + return self._registry + async def wait_for_actor( self, name: str ) -> List[Tuple[str, int]]: diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 3667c90..0407896 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -23,6 +23,7 @@ async def get_arbiter( arbiter. """ actor = current_actor() + if not actor: raise RuntimeError("No actor instance has been defined yet?") @@ -38,7 +39,8 @@ async def get_arbiter( @asynccontextmanager async def find_actor( - name: str, arbiter_sockaddr: Tuple[str, int] = None + name: str, + arbiter_sockaddr: Tuple[str, int] = None ) -> typing.AsyncGenerator[Optional[Portal], None]: """Ask the arbiter to find actor(s) by name. diff --git a/tractor/_entry.py b/tractor/_entry.py index 1ccc0b5..883cc6b 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -72,8 +72,10 @@ def _trio_main( actor._async_main, parent_addr=parent_addr ) + try: trio.run(trio_main) except KeyboardInterrupt: - pass # handle it the same way trio does? - log.info(f"Actor {actor.uid} terminated") \ No newline at end of file + log.warning(f"Actor {actor.uid} received KBI") + + log.info(f"Actor {actor.uid} terminated") diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 0d24307..a3a271d 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -3,6 +3,8 @@ Inter-process comms abstractions """ import typing from typing import Any, Tuple, Optional +from functools import partial +import inspect import msgpack import trio @@ -11,6 +13,14 @@ from async_generator import asynccontextmanager from .log import get_logger log = get_logger('ipc') +# :eyeroll: +try: + import msgpack_numpy + Unpacker = msgpack_numpy.Unpacker +except ImportError: + # just plain ``msgpack`` requires tweaking key settings + Unpacker = partial(msgpack.Unpacker, strict_map_key=False) + class MsgpackStream: """A ``trio.SocketStream`` delivering ``msgpack`` formatted data. @@ -32,7 +42,10 @@ class MsgpackStream: async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ - unpacker = msgpack.Unpacker(raw=False, use_list=False) + unpacker = Unpacker( + raw=False, + use_list=False, + ) while True: try: data = await self.stream.receive_some(2**10) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index baed198..b8620c2 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -157,7 +157,6 @@ async def cancel_on_completion( @asynccontextmanager async def spawn_subactor( subactor: 'Actor', - accept_addr: Tuple[str, int], parent_addr: Tuple[str, int], ): @@ -167,8 +166,13 @@ async def spawn_subactor( # Hardcode this (instead of using ``_child.__name__`` to avoid a # double import warning: https://stackoverflow.com/a/45070583 "tractor._child", + # We provide the child's unique identifier on this exec/spawn + # line for debugging purposes when viewing the process tree from + # the OS; it otherwise can be passed via the parent channel if + # we prefer in the future (for privacy). "--uid", str(subactor.uid), + # Address the child must connect to on startup "--parent_addr", str(parent_addr) ] @@ -179,8 +183,14 @@ async def spawn_subactor( subactor.loglevel ] - async with await trio.open_process(spawn_cmd) as proc: - yield proc + proc = await trio.open_process(spawn_cmd) + yield proc + + # XXX: do this **after** cancellation/tearfown + # to avoid killing the process too early + # since trio does this internally on ``__aexit__()`` + async with proc: + log.debug(f"Terminating {proc}") async def new_proc( @@ -206,7 +216,6 @@ async def new_proc( if use_trio_run_in_process or _spawn_method == 'trio': async with spawn_subactor( subactor, - bind_addr, parent_addr, ) as proc: log.info(f"Started {proc}") diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 82d2653..746960c 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -230,7 +230,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: f"Waiting on subactors {anursery._children}" "to complete" ) - except (BaseException, Exception) as err: + except BaseException as err: # if the caller's scope errored then we activate our # one-cancels-all supervisor strategy (don't # worry more are coming). @@ -241,10 +241,11 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # the `else:` block here might not complete? # For now, shield both. with trio.CancelScope(shield=True): - if err in (trio.Cancelled, KeyboardInterrupt): + etype = type(err) + if etype in (trio.Cancelled, KeyboardInterrupt): log.warning( f"Nursery for {current_actor().uid} was " - f"cancelled with {err}") + f"cancelled with {etype}") else: log.exception( f"Nursery for {current_actor().uid} "