forked from goodboy/tractor
1
0
Fork 0

Merge pull request #143 from goodboy/ensure_deregister

Ensure actors de-register with arbiter when cancelled during infitinite streaming.
matrix
Guillermo Rodriguez 2020-08-04 12:19:02 -03:00 committed by GitHub
commit 8da45eedf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 265 additions and 66 deletions

View File

@ -1,9 +1,13 @@
""" """
``tractor`` testing!! ``tractor`` testing!!
""" """
import sys
import subprocess
import os import os
import random import random
import signal
import platform import platform
import time
import pytest import pytest
import tractor import tractor
@ -16,6 +20,19 @@ pytest_plugins = ['pytester']
_arb_addr = '127.0.0.1', random.randint(1000, 9999) _arb_addr = '127.0.0.1', random.randint(1000, 9999)
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows':
_KILL_SIGNAL = signal.CTRL_BREAK_EVENT
_INT_SIGNAL = signal.CTRL_C_EVENT
_INT_RETURN_CODE = 3221225786
_PROC_SPAWN_WAIT = 2
else:
_KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4
no_windows = pytest.mark.skipif( no_windows = pytest.mark.skipif(
platform.system() == "Windows", platform.system() == "Windows",
reason="Test is unsupported on windows", reason="Test is unsupported on windows",
@ -89,3 +106,43 @@ def pytest_generate_tests(metafunc):
methods = ['trio'] methods = ['trio']
metafunc.parametrize("start_method", methods, scope='module') metafunc.parametrize("start_method", methods, scope='module')
def sig_prog(proc, sig):
"Kill the actor-process with ``sig``."
proc.send_signal(sig)
time.sleep(0.1)
if not proc.poll():
# TODO: why sometimes does SIGINT not work on teardown?
# seems to happen only when trace logging enabled?
proc.send_signal(_KILL_SIGNAL)
ret = proc.wait()
assert ret
@pytest.fixture
def daemon(loglevel, testdir, arb_addr):
"""Run a daemon actor as a "remote arbiter".
"""
cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})"
.format(
arb_addr,
"'{}'".format(loglevel) if loglevel else None)
]
kwargs = dict()
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = testdir.popen(
cmdargs,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
assert not proc.returncode
time.sleep(_PROC_SPAWN_WAIT)
yield proc
sig_prog(proc, _INT_SIGNAL)

View File

@ -1,6 +1,12 @@
""" """
Actor "discovery" testing Actor "discovery" testing
""" """
import os
import signal
import platform
from functools import partial
import itertools
import pytest import pytest
import tractor import tractor
import trio import trio
@ -80,3 +86,145 @@ async def test_trynamic_trio(func, start_method):
print(await gretchen.result()) print(await gretchen.result())
print(await donny.result()) print(await donny.result())
print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")
async def stream_forever():
for i in itertools.count():
yield i
await trio.sleep(0.01)
async def cancel(use_signal, delay=0):
# hold on there sally
await trio.sleep(delay)
# trigger cancel
if use_signal:
if platform.system() == 'Windows':
pytest.skip("SIGINT not supported on windows")
os.kill(os.getpid(), signal.SIGINT)
else:
raise KeyboardInterrupt
async def stream_from(portal):
async for value in await portal.result():
print(value)
async def spawn_and_check_registry(
arb_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
with_streaming: bool = False,
) -> None:
actor = tractor.current_actor()
if remote_arbiter:
assert not actor.is_arbiter
async with tractor.get_arbiter(*arb_addr) as portal:
if actor.is_arbiter:
async def get_reg():
return actor._registry
extra = 1 # arbiter is local root actor
else:
get_reg = partial(portal.run, 'self', 'get_registry')
extra = 2 # local root actor + remote arbiter
# ensure current actor is registered
registry = await get_reg()
assert actor.uid in registry
if with_streaming:
to_run = stream_forever
else:
to_run = trio.sleep_forever
async with trio.open_nursery() as trion:
try:
async with tractor.open_nursery() as n:
portals = {}
for i in range(3):
name = f'a{i}'
portals[name] = await n.run_in_actor(name, to_run)
# wait on last actor to come up
async with tractor.wait_for_actor(name):
registry = await get_reg()
for uid in n._children:
assert uid in registry
assert len(portals) + extra == len(registry)
if with_streaming:
await trio.sleep(0.1)
pts = list(portals.values())
for p in pts[:-1]:
trion.start_soon(stream_from, p)
# stream for 1 sec
trion.start_soon(cancel, use_signal, 1)
last_p = pts[-1]
async for value in await last_p.result():
print(value)
else:
await cancel(use_signal)
finally:
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
# all subactors should have de-registered
registry = await get_reg()
assert len(registry) == extra
assert actor.uid in registry
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel(
start_method,
use_signal,
arb_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
"""
with pytest.raises(KeyboardInterrupt):
tractor.run(
spawn_and_check_registry,
arb_addr,
use_signal,
False,
with_streaming,
arbiter_addr=arb_addr
)
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
start_method,
use_signal,
arb_addr,
with_streaming,
):
"""Verify that cancelling a nursery results in all subactors
deregistering themselves with a **remote** (not in the local process
tree) arbiter.
"""
with pytest.raises(KeyboardInterrupt):
tractor.run(
spawn_and_check_registry,
arb_addr,
use_signal,
True,
with_streaming,
# XXX: required to use remote daemon!
arbiter_addr=arb_addr
)

View File

@ -2,64 +2,16 @@
Multiple python programs invoking ``tractor.run()`` Multiple python programs invoking ``tractor.run()``
""" """
import platform import platform
import sys
import time import time
import signal
import subprocess
import pytest import pytest
import tractor import tractor
from conftest import tractor_test from conftest import (
tractor_test,
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives sig_prog,
if platform.system() == 'Windows': _INT_SIGNAL,
_KILL_SIGNAL = signal.CTRL_BREAK_EVENT _INT_RETURN_CODE,
_INT_SIGNAL = signal.CTRL_C_EVENT )
_INT_RETURN_CODE = 3221225786
_PROC_SPAWN_WAIT = 2
else:
_KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4
def sig_prog(proc, sig):
"Kill the actor-process with ``sig``."
proc.send_signal(sig)
time.sleep(0.1)
if not proc.poll():
# TODO: why sometimes does SIGINT not work on teardown?
# seems to happen only when trace logging enabled?
proc.send_signal(_KILL_SIGNAL)
ret = proc.wait()
assert ret
@pytest.fixture
def daemon(loglevel, testdir, arb_addr):
cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon((), arbiter_addr={}, loglevel={})"
.format(
arb_addr,
"'{}'".format(loglevel) if loglevel else None)
]
kwargs = dict()
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = testdir.popen(
cmdargs,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
assert not proc.returncode
time.sleep(_PROC_SPAWN_WAIT)
yield proc
sig_prog(proc, _INT_SIGNAL)
def test_abort_on_sigint(daemon): def test_abort_on_sigint(daemon):
@ -67,6 +19,7 @@ def test_abort_on_sigint(daemon):
time.sleep(0.1) time.sleep(0.1)
sig_prog(daemon, _INT_SIGNAL) sig_prog(daemon, _INT_SIGNAL)
assert daemon.returncode == _INT_RETURN_CODE assert daemon.returncode == _INT_RETURN_CODE
# XXX: oddly, couldn't get capfd.readouterr() to work here? # XXX: oddly, couldn't get capfd.readouterr() to work here?
if platform.system() != 'Windows': if platform.system() != 'Windows':
# don't check stderr on windows as its empty when sending CTRL_C_EVENT # don't check stderr on windows as its empty when sending CTRL_C_EVENT

View File

@ -440,7 +440,7 @@ class Actor:
f"Cancelling all tasks for {chan} from {chan.uid}") f"Cancelling all tasks for {chan} from {chan.uid}")
for (channel, cid) in self._rpc_tasks: for (channel, cid) in self._rpc_tasks:
if channel is chan: if channel is chan:
self._cancel_task(cid, channel) await self._cancel_task(cid, channel)
log.debug( log.debug(
f"Msg loop signalled to terminate for" f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}") f" {chan} from {chan.uid}")
@ -678,7 +678,10 @@ class Actor:
finally: finally:
if registered_with_arbiter: if registered_with_arbiter:
# with trio.move_on_after(3) as cs:
# cs.shield = True
await self._do_unreg(self._arb_addr) await self._do_unreg(self._arb_addr)
# terminate actor once all it's peers (actors that connected # terminate actor once all it's peers (actors that connected
# to it as clients) have disappeared # to it as clients) have disappeared
if not self._no_more_peers.is_set(): if not self._no_more_peers.is_set():
@ -863,6 +866,17 @@ class Arbiter(Actor):
return None return None
async def get_registry(
self
) -> Dict[str, Tuple[str, str]]:
"""Return current name registry.
"""
# NOTE: requires ``strict_map_key=False`` to the msgpack
# unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return self._registry
async def wait_for_actor( async def wait_for_actor(
self, name: str self, name: str
) -> List[Tuple[str, int]]: ) -> List[Tuple[str, int]]:

View File

@ -23,6 +23,7 @@ async def get_arbiter(
arbiter. arbiter.
""" """
actor = current_actor() actor = current_actor()
if not actor: if not actor:
raise RuntimeError("No actor instance has been defined yet?") raise RuntimeError("No actor instance has been defined yet?")
@ -38,7 +39,8 @@ async def get_arbiter(
@asynccontextmanager @asynccontextmanager
async def find_actor( async def find_actor(
name: str, arbiter_sockaddr: Tuple[str, int] = None name: str,
arbiter_sockaddr: Tuple[str, int] = None
) -> typing.AsyncGenerator[Optional[Portal], None]: ) -> typing.AsyncGenerator[Optional[Portal], None]:
"""Ask the arbiter to find actor(s) by name. """Ask the arbiter to find actor(s) by name.

View File

@ -72,8 +72,10 @@ def _trio_main(
actor._async_main, actor._async_main,
parent_addr=parent_addr parent_addr=parent_addr
) )
try: try:
trio.run(trio_main) trio.run(trio_main)
except KeyboardInterrupt: except KeyboardInterrupt:
pass # handle it the same way trio does? log.warning(f"Actor {actor.uid} received KBI")
log.info(f"Actor {actor.uid} terminated") log.info(f"Actor {actor.uid} terminated")

View File

@ -3,6 +3,8 @@ Inter-process comms abstractions
""" """
import typing import typing
from typing import Any, Tuple, Optional from typing import Any, Tuple, Optional
from functools import partial
import inspect
import msgpack import msgpack
import trio import trio
@ -11,6 +13,14 @@ from async_generator import asynccontextmanager
from .log import get_logger from .log import get_logger
log = get_logger('ipc') log = get_logger('ipc')
# :eyeroll:
try:
import msgpack_numpy
Unpacker = msgpack_numpy.Unpacker
except ImportError:
# just plain ``msgpack`` requires tweaking key settings
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
class MsgpackStream: class MsgpackStream:
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data. """A ``trio.SocketStream`` delivering ``msgpack`` formatted data.
@ -32,7 +42,10 @@ class MsgpackStream:
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream. """Yield packets from the underlying stream.
""" """
unpacker = msgpack.Unpacker(raw=False, use_list=False) unpacker = Unpacker(
raw=False,
use_list=False,
)
while True: while True:
try: try:
data = await self.stream.receive_some(2**10) data = await self.stream.receive_some(2**10)

View File

@ -157,7 +157,6 @@ async def cancel_on_completion(
@asynccontextmanager @asynccontextmanager
async def spawn_subactor( async def spawn_subactor(
subactor: 'Actor', subactor: 'Actor',
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
): ):
@ -167,8 +166,13 @@ async def spawn_subactor(
# Hardcode this (instead of using ``_child.__name__`` to avoid a # Hardcode this (instead of using ``_child.__name__`` to avoid a
# double import warning: https://stackoverflow.com/a/45070583 # double import warning: https://stackoverflow.com/a/45070583
"tractor._child", "tractor._child",
# We provide the child's unique identifier on this exec/spawn
# line for debugging purposes when viewing the process tree from
# the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy).
"--uid", "--uid",
str(subactor.uid), str(subactor.uid),
# Address the child must connect to on startup
"--parent_addr", "--parent_addr",
str(parent_addr) str(parent_addr)
] ]
@ -179,8 +183,14 @@ async def spawn_subactor(
subactor.loglevel subactor.loglevel
] ]
async with await trio.open_process(spawn_cmd) as proc: proc = await trio.open_process(spawn_cmd)
yield proc yield proc
# XXX: do this **after** cancellation/tearfown
# to avoid killing the process too early
# since trio does this internally on ``__aexit__()``
async with proc:
log.debug(f"Terminating {proc}")
async def new_proc( async def new_proc(
@ -206,7 +216,6 @@ async def new_proc(
if use_trio_run_in_process or _spawn_method == 'trio': if use_trio_run_in_process or _spawn_method == 'trio':
async with spawn_subactor( async with spawn_subactor(
subactor, subactor,
bind_addr,
parent_addr, parent_addr,
) as proc: ) as proc:
log.info(f"Started {proc}") log.info(f"Started {proc}")

View File

@ -230,7 +230,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
f"Waiting on subactors {anursery._children}" f"Waiting on subactors {anursery._children}"
"to complete" "to complete"
) )
except (BaseException, Exception) as err: except BaseException as err:
# if the caller's scope errored then we activate our # if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't # one-cancels-all supervisor strategy (don't
# worry more are coming). # worry more are coming).
@ -241,10 +241,11 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
# the `else:` block here might not complete? # the `else:` block here might not complete?
# For now, shield both. # For now, shield both.
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
if err in (trio.Cancelled, KeyboardInterrupt): etype = type(err)
if etype in (trio.Cancelled, KeyboardInterrupt):
log.warning( log.warning(
f"Nursery for {current_actor().uid} was " f"Nursery for {current_actor().uid} was "
f"cancelled with {err}") f"cancelled with {etype}")
else: else:
log.exception( log.exception(
f"Nursery for {current_actor().uid} " f"Nursery for {current_actor().uid} "