From bd14cbe0828214bcc4248b2dcec621e4c3806cbb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 4 Aug 2018 17:59:10 -0400 Subject: [PATCH 1/4] Port to trio's new resource error --- tractor/_actor.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index fe38929..1aa7efd 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -189,8 +189,7 @@ class Actor: self, stream: trio.SocketStream, ): - """ - Entry point for new inbound connections to the channel server. + """Entry point for new inbound connections to the channel server. """ self._no_more_peers.clear() chan = Channel(stream=stream) @@ -456,7 +455,7 @@ class Actor: try: await self._parent_chan.send( {'error': traceback.format_exc(), 'cid': 'internal'}) - except trio.ClosedStreamError: + except trio.ClosedResourceError: log.error( f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") From 758fbc6790bf3bb95d4255ebe88da102d3bcff65 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Aug 2018 13:40:37 -0400 Subject: [PATCH 2/4] Drop `Channel.aiter_recv()` Internalize the implementation of this and expect client code to iterate the `Channel` directly. Resolves #16 --- tractor/_actor.py | 2 +- tractor/_ipc.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 1aa7efd..871fcc2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -278,7 +278,7 @@ class Actor: # worked out we'll likely want to use that! log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - async for msg in chan.aiter_recv(): + async for msg in chan: if msg is None: # terminate sentinel log.debug( f"Cancelling all tasks for {chan} from {chan.uid}") diff --git a/tractor/_ipc.py b/tractor/_ipc.py index c8a811b..2d86e56 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -87,6 +87,7 @@ class Channel: self._destaddr = destaddr or self.squeue.raddr # set after handshake - always uid of far end self.uid = None + self._agen = self._aiter_recv() def __repr__(self): if self.squeue: @@ -134,8 +135,8 @@ class Channel: async def __aexit__(self, *args): await self.aclose(*args) - async def __aiter__(self): - return self.aiter_recv() + def __aiter__(self): + return self._agen async def _reconnect(self): """Handle connection failures by polling until a reconnect can be @@ -166,7 +167,7 @@ class Channel: " for re-establishment") await trio.sleep(1) - async def aiter_recv(self): + async def _aiter_recv(self): """Async iterate items from underlying stream. """ while True: From 1bd5582d8ab30a13315b93e0b62130ae4474b300 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 26 Jul 2018 17:29:23 -0400 Subject: [PATCH 3/4] Register each actor using its unique ID tuple This allows for registering more then one actor with the same "name" when you have multiple actors fulfilling the same role. Eventually we'll need support for looking up all actors registered under a given "service name" (or whatever we decide to call it). Also, a fix to the arbiter such that each new instance refers to a separate `_registry` dict (found an issue with duplicate names during testing). Resolves #7 --- tractor/_actor.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 871fcc2..9dfe73d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -434,7 +434,7 @@ class Actor: async with get_arbiter(*arbiter_addr) as arb_portal: await arb_portal.run( 'self', 'register_actor', - name=self.name, sockaddr=self.accept_addr) + uid=self.uid, sockaddr=self.accept_addr) registered_with_arbiter = True task_status.started() @@ -522,7 +522,7 @@ class Actor: if arbiter_addr is not None: async with get_arbiter(*arbiter_addr) as arb_portal: await arb_portal.run( - 'self', 'unregister_actor', name=self.name) + 'self', 'unregister_actor', uid=self.uid) except OSError: log.warn(f"Unable to unregister {self.name} from arbiter") @@ -580,24 +580,30 @@ class Actor: class Arbiter(Actor): """A special actor who knows all the other actors and always has - access to the top level nursery. + access to a top level nursery. The arbiter is by default the first actor spawned on each host and is responsible for keeping track of all other actors for coordination purposes. If a new main process is launched and an arbiter is already running that arbiter will be used. """ - _registry = defaultdict(list) is_arbiter = True + def __init__(self, *args, **kwargs): + self._registry = defaultdict(list) + super().__init__(*args, **kwargs) + def find_actor(self, name): - return self._registry[name] + for uid, actor in self._registry.items(): + if name in uid: + print('found it!') + return actor - def register_actor(self, name, sockaddr): - self._registry[name].append(sockaddr) + def register_actor(self, uid, sockaddr): + self._registry[uid].append(sockaddr) - def unregister_actor(self, name): - self._registry.pop(name, None) + def unregister_actor(self, uid): + self._registry.pop(uid, None) async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): From e4ef973be9c218edad153b5933165bca88985e69 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Aug 2018 14:30:25 -0400 Subject: [PATCH 4/4] Add discovery testing Add a new test to verify actors register with their `.uid` tuple as per #7. Break off relevant "discovery" tests into a new test module. --- tests/conftest.py | 28 +++++++++++++ tests/test_discovery.py | 73 ++++++++++++++++++++++++++++++++++ tests/test_tractor.py | 87 +++++++---------------------------------- 3 files changed, 116 insertions(+), 72 deletions(-) create mode 100644 tests/test_discovery.py diff --git a/tests/conftest.py b/tests/conftest.py index 164dc8b..9822a65 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,16 @@ """ ``tractor`` testing!! """ +import random +from functools import partial, wraps + import pytest import tractor +_arb_addr = '127.0.0.1', random.randint(1000, 9999) + + def pytest_addoption(parser): parser.addoption("--ll", action="store", dest='loglevel', default=None, help="logging level to set when testing") @@ -16,3 +22,25 @@ def loglevel(request): level = tractor.log._default_loglevel = request.config.option.loglevel yield level tractor.log._default_loglevel = orig + + +@pytest.fixture(scope='session') +def arb_addr(): + return _arb_addr + + +def tractor_test(fn): + """ + Use: + + @tractor_test + async def test_whatever(): + await ... + """ + @wraps(fn) + def wrapper(*args, **kwargs): + # __tracebackhide__ = True + return tractor.run( + partial(fn, *args, **kwargs), arbiter_addr=_arb_addr) + + return wrapper diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 100644 index 0000000..3e92df4 --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,73 @@ +""" +Actor "discovery" testing +""" +import tractor +import trio + +from conftest import tractor_test + + +@tractor_test +async def test_reg_then_unreg(arb_addr): + actor = tractor.current_actor() + assert actor.is_arbiter + assert len(actor._registry) == 1 # only self is registered + + async with tractor.open_nursery() as n: + portal = await n.start_actor('actor', rpc_module_paths=[__name__]) + uid = portal.channel.uid + + async with tractor.get_arbiter(*arb_addr) as aportal: + # local actor should be the arbiter + assert actor is aportal.actor + + # sub-actor uid should be in the registry + await trio.sleep(0.1) # registering is async, so.. + assert uid in aportal.actor._registry + sockaddrs = actor._registry[uid] + # XXX: can we figure out what the listen addr will be? + assert sockaddrs + + await n.cancel() # tear down nursery + + await trio.sleep(0.1) + assert uid not in aportal.actor._registry + sockaddrs = actor._registry[uid] + assert not sockaddrs + + +the_line = 'Hi my name is {}' + + +async def hi(): + return the_line.format(tractor.current_actor().name) + + +async def say_hello(other_actor): + await trio.sleep(0.4) # wait for other actor to spawn + async with tractor.find_actor(other_actor) as portal: + return await portal.run(__name__, 'hi') + + +@tractor_test +async def test_trynamic_trio(): + """Main tractor entry point, the "master" process (for now + acts as the "director"). + """ + async with tractor.open_nursery() as n: + print("Alright... Action!") + + donny = await n.run_in_actor( + 'donny', + say_hello, + other_actor='gretchen', + ) + gretchen = await n.run_in_actor( + 'gretchen', + say_hello, + other_actor='donny', + ) + print(await gretchen.result()) + print(await donny.result()) + await donny.cancel_actor() + print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 84cbe36..8fbd22c 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -2,33 +2,13 @@ Actor model API testing """ import time -from functools import partial, wraps from itertools import repeat -import random import pytest import trio import tractor - -_arb_addr = '127.0.0.1', random.randint(1000, 9999) - - -def tractor_test(fn): - """ - Use: - - @tractor_test - async def test_whatever(): - await ... - """ - @wraps(fn) - def wrapper(*args, **kwargs): - __tracebackhide__ = True - return tractor.run( - partial(fn, *args), arbiter_addr=_arb_addr, **kwargs) - - return wrapper +from conftest import tractor_test @pytest.mark.trio @@ -44,7 +24,7 @@ async def test_no_arbitter(): pass -def test_local_actor_async_func(): +def test_local_actor_async_func(arb_addr): """Verify a simple async function in-process. """ nums = [] @@ -58,7 +38,7 @@ def test_local_actor_async_func(): await trio.sleep(0.1) start = time.time() - tractor.run(print_loop, arbiter_addr=_arb_addr) + tractor.run(print_loop, arbiter_addr=arb_addr) # ensure the sleeps were actually awaited assert time.time() - start >= 1 @@ -99,13 +79,13 @@ async def spawn(is_arbiter): return 10 -def test_local_arbiter_subactor_global_state(): +def test_local_arbiter_subactor_global_state(arb_addr): result = tractor.run( spawn, True, name='arbiter', statespace=statespace, - arbiter_addr=_arb_addr, + arbiter_addr=arb_addr, ) assert result == 10 @@ -153,17 +133,17 @@ async def stream_from_single_subactor(): # await nursery.cancel() -def test_stream_from_single_subactor(): +def test_stream_from_single_subactor(arb_addr): """Verify streaming from a spawned async generator. """ - tractor.run(stream_from_single_subactor, arbiter_addr=_arb_addr) + tractor.run(stream_from_single_subactor, arbiter_addr=arb_addr) async def assert_err(): assert 0 -def test_remote_error(): +def test_remote_error(arb_addr): """Verify an error raises in a subactor is propagated to the parent. """ async def main(): @@ -183,7 +163,7 @@ def test_remote_error(): with pytest.raises(tractor.RemoteActorError): # also raises - tractor.run(main, arbiter_addr=_arb_addr) + tractor.run(main, arbiter_addr=arb_addr) async def stream_forever(): @@ -238,43 +218,6 @@ async def test_one_cancels_all(): pytest.fail("Should have gotten a remote assertion error?") -the_line = 'Hi my name is {}' - - -async def hi(): - return the_line.format(tractor.current_actor().name) - - -async def say_hello(other_actor): - await trio.sleep(0.4) # wait for other actor to spawn - async with tractor.find_actor(other_actor) as portal: - return await portal.run(__name__, 'hi') - - -@tractor_test -async def test_trynamic_trio(): - """Main tractor entry point, the "master" process (for now - acts as the "director"). - """ - async with tractor.open_nursery() as n: - print("Alright... Action!") - - donny = await n.run_in_actor( - 'donny', - say_hello, - other_actor='gretchen', - ) - gretchen = await n.run_in_actor( - 'gretchen', - say_hello, - other_actor='donny', - ) - print(await gretchen.result()) - print(await donny.result()) - await donny.cancel_actor() - print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") - - def movie_theatre_question(): """A question asked in a dark theatre, in a tangent (errr, I mean different) process. @@ -335,7 +278,7 @@ def do_nothing(): pass -def test_cancel_single_subactor(): +def test_cancel_single_subactor(arb_addr): async def main(): @@ -349,7 +292,7 @@ def test_cancel_single_subactor(): # would hang otherwise await nursery.cancel() - tractor.run(main, arbiter_addr=_arb_addr) + tractor.run(main, arbiter_addr=arb_addr) async def stream_data(seed): @@ -440,19 +383,19 @@ async def cancel_after(wait): return await a_quadruple_example() -def test_a_quadruple_example(): +def test_a_quadruple_example(arb_addr): """This also serves as a kind of "we'd like to eventually be this fast test". """ - results = tractor.run(cancel_after, 2.1, arbiter_addr=_arb_addr) + results = tractor.run(cancel_after, 2.1, arbiter_addr=arb_addr) assert results @pytest.mark.parametrize('cancel_delay', list(range(1, 7))) -def test_not_fast_enough_quad(cancel_delay): +def test_not_fast_enough_quad(arb_addr, cancel_delay): """Verify we can cancel midway through the quad example and all actors cancel gracefully. """ delay = 1 + cancel_delay/10 - results = tractor.run(cancel_after, delay, arbiter_addr=_arb_addr) + results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr) assert results is None