diff --git a/tests/conftest.py b/tests/conftest.py index 164dc8b..9822a65 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,16 @@ """ ``tractor`` testing!! """ +import random +from functools import partial, wraps + import pytest import tractor +_arb_addr = '127.0.0.1', random.randint(1000, 9999) + + def pytest_addoption(parser): parser.addoption("--ll", action="store", dest='loglevel', default=None, help="logging level to set when testing") @@ -16,3 +22,25 @@ def loglevel(request): level = tractor.log._default_loglevel = request.config.option.loglevel yield level tractor.log._default_loglevel = orig + + +@pytest.fixture(scope='session') +def arb_addr(): + return _arb_addr + + +def tractor_test(fn): + """ + Use: + + @tractor_test + async def test_whatever(): + await ... + """ + @wraps(fn) + def wrapper(*args, **kwargs): + # __tracebackhide__ = True + return tractor.run( + partial(fn, *args, **kwargs), arbiter_addr=_arb_addr) + + return wrapper diff --git a/tests/test_discovery.py b/tests/test_discovery.py new file mode 100644 index 0000000..3e92df4 --- /dev/null +++ b/tests/test_discovery.py @@ -0,0 +1,73 @@ +""" +Actor "discovery" testing +""" +import tractor +import trio + +from conftest import tractor_test + + +@tractor_test +async def test_reg_then_unreg(arb_addr): + actor = tractor.current_actor() + assert actor.is_arbiter + assert len(actor._registry) == 1 # only self is registered + + async with tractor.open_nursery() as n: + portal = await n.start_actor('actor', rpc_module_paths=[__name__]) + uid = portal.channel.uid + + async with tractor.get_arbiter(*arb_addr) as aportal: + # local actor should be the arbiter + assert actor is aportal.actor + + # sub-actor uid should be in the registry + await trio.sleep(0.1) # registering is async, so.. + assert uid in aportal.actor._registry + sockaddrs = actor._registry[uid] + # XXX: can we figure out what the listen addr will be? + assert sockaddrs + + await n.cancel() # tear down nursery + + await trio.sleep(0.1) + assert uid not in aportal.actor._registry + sockaddrs = actor._registry[uid] + assert not sockaddrs + + +the_line = 'Hi my name is {}' + + +async def hi(): + return the_line.format(tractor.current_actor().name) + + +async def say_hello(other_actor): + await trio.sleep(0.4) # wait for other actor to spawn + async with tractor.find_actor(other_actor) as portal: + return await portal.run(__name__, 'hi') + + +@tractor_test +async def test_trynamic_trio(): + """Main tractor entry point, the "master" process (for now + acts as the "director"). + """ + async with tractor.open_nursery() as n: + print("Alright... Action!") + + donny = await n.run_in_actor( + 'donny', + say_hello, + other_actor='gretchen', + ) + gretchen = await n.run_in_actor( + 'gretchen', + say_hello, + other_actor='donny', + ) + print(await gretchen.result()) + print(await donny.result()) + await donny.cancel_actor() + print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 84cbe36..8fbd22c 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -2,33 +2,13 @@ Actor model API testing """ import time -from functools import partial, wraps from itertools import repeat -import random import pytest import trio import tractor - -_arb_addr = '127.0.0.1', random.randint(1000, 9999) - - -def tractor_test(fn): - """ - Use: - - @tractor_test - async def test_whatever(): - await ... - """ - @wraps(fn) - def wrapper(*args, **kwargs): - __tracebackhide__ = True - return tractor.run( - partial(fn, *args), arbiter_addr=_arb_addr, **kwargs) - - return wrapper +from conftest import tractor_test @pytest.mark.trio @@ -44,7 +24,7 @@ async def test_no_arbitter(): pass -def test_local_actor_async_func(): +def test_local_actor_async_func(arb_addr): """Verify a simple async function in-process. """ nums = [] @@ -58,7 +38,7 @@ def test_local_actor_async_func(): await trio.sleep(0.1) start = time.time() - tractor.run(print_loop, arbiter_addr=_arb_addr) + tractor.run(print_loop, arbiter_addr=arb_addr) # ensure the sleeps were actually awaited assert time.time() - start >= 1 @@ -99,13 +79,13 @@ async def spawn(is_arbiter): return 10 -def test_local_arbiter_subactor_global_state(): +def test_local_arbiter_subactor_global_state(arb_addr): result = tractor.run( spawn, True, name='arbiter', statespace=statespace, - arbiter_addr=_arb_addr, + arbiter_addr=arb_addr, ) assert result == 10 @@ -153,17 +133,17 @@ async def stream_from_single_subactor(): # await nursery.cancel() -def test_stream_from_single_subactor(): +def test_stream_from_single_subactor(arb_addr): """Verify streaming from a spawned async generator. """ - tractor.run(stream_from_single_subactor, arbiter_addr=_arb_addr) + tractor.run(stream_from_single_subactor, arbiter_addr=arb_addr) async def assert_err(): assert 0 -def test_remote_error(): +def test_remote_error(arb_addr): """Verify an error raises in a subactor is propagated to the parent. """ async def main(): @@ -183,7 +163,7 @@ def test_remote_error(): with pytest.raises(tractor.RemoteActorError): # also raises - tractor.run(main, arbiter_addr=_arb_addr) + tractor.run(main, arbiter_addr=arb_addr) async def stream_forever(): @@ -238,43 +218,6 @@ async def test_one_cancels_all(): pytest.fail("Should have gotten a remote assertion error?") -the_line = 'Hi my name is {}' - - -async def hi(): - return the_line.format(tractor.current_actor().name) - - -async def say_hello(other_actor): - await trio.sleep(0.4) # wait for other actor to spawn - async with tractor.find_actor(other_actor) as portal: - return await portal.run(__name__, 'hi') - - -@tractor_test -async def test_trynamic_trio(): - """Main tractor entry point, the "master" process (for now - acts as the "director"). - """ - async with tractor.open_nursery() as n: - print("Alright... Action!") - - donny = await n.run_in_actor( - 'donny', - say_hello, - other_actor='gretchen', - ) - gretchen = await n.run_in_actor( - 'gretchen', - say_hello, - other_actor='donny', - ) - print(await gretchen.result()) - print(await donny.result()) - await donny.cancel_actor() - print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") - - def movie_theatre_question(): """A question asked in a dark theatre, in a tangent (errr, I mean different) process. @@ -335,7 +278,7 @@ def do_nothing(): pass -def test_cancel_single_subactor(): +def test_cancel_single_subactor(arb_addr): async def main(): @@ -349,7 +292,7 @@ def test_cancel_single_subactor(): # would hang otherwise await nursery.cancel() - tractor.run(main, arbiter_addr=_arb_addr) + tractor.run(main, arbiter_addr=arb_addr) async def stream_data(seed): @@ -440,19 +383,19 @@ async def cancel_after(wait): return await a_quadruple_example() -def test_a_quadruple_example(): +def test_a_quadruple_example(arb_addr): """This also serves as a kind of "we'd like to eventually be this fast test". """ - results = tractor.run(cancel_after, 2.1, arbiter_addr=_arb_addr) + results = tractor.run(cancel_after, 2.1, arbiter_addr=arb_addr) assert results @pytest.mark.parametrize('cancel_delay', list(range(1, 7))) -def test_not_fast_enough_quad(cancel_delay): +def test_not_fast_enough_quad(arb_addr, cancel_delay): """Verify we can cancel midway through the quad example and all actors cancel gracefully. """ delay = 1 + cancel_delay/10 - results = tractor.run(cancel_after, delay, arbiter_addr=_arb_addr) + results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr) assert results is None diff --git a/tractor/_actor.py b/tractor/_actor.py index fe38929..9dfe73d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -189,8 +189,7 @@ class Actor: self, stream: trio.SocketStream, ): - """ - Entry point for new inbound connections to the channel server. + """Entry point for new inbound connections to the channel server. """ self._no_more_peers.clear() chan = Channel(stream=stream) @@ -279,7 +278,7 @@ class Actor: # worked out we'll likely want to use that! log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - async for msg in chan.aiter_recv(): + async for msg in chan: if msg is None: # terminate sentinel log.debug( f"Cancelling all tasks for {chan} from {chan.uid}") @@ -435,7 +434,7 @@ class Actor: async with get_arbiter(*arbiter_addr) as arb_portal: await arb_portal.run( 'self', 'register_actor', - name=self.name, sockaddr=self.accept_addr) + uid=self.uid, sockaddr=self.accept_addr) registered_with_arbiter = True task_status.started() @@ -456,7 +455,7 @@ class Actor: try: await self._parent_chan.send( {'error': traceback.format_exc(), 'cid': 'internal'}) - except trio.ClosedStreamError: + except trio.ClosedResourceError: log.error( f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") @@ -523,7 +522,7 @@ class Actor: if arbiter_addr is not None: async with get_arbiter(*arbiter_addr) as arb_portal: await arb_portal.run( - 'self', 'unregister_actor', name=self.name) + 'self', 'unregister_actor', uid=self.uid) except OSError: log.warn(f"Unable to unregister {self.name} from arbiter") @@ -581,24 +580,30 @@ class Actor: class Arbiter(Actor): """A special actor who knows all the other actors and always has - access to the top level nursery. + access to a top level nursery. The arbiter is by default the first actor spawned on each host and is responsible for keeping track of all other actors for coordination purposes. If a new main process is launched and an arbiter is already running that arbiter will be used. """ - _registry = defaultdict(list) is_arbiter = True + def __init__(self, *args, **kwargs): + self._registry = defaultdict(list) + super().__init__(*args, **kwargs) + def find_actor(self, name): - return self._registry[name] + for uid, actor in self._registry.items(): + if name in uid: + print('found it!') + return actor - def register_actor(self, name, sockaddr): - self._registry[name].append(sockaddr) + def register_actor(self, uid, sockaddr): + self._registry[uid].append(sockaddr) - def unregister_actor(self, name): - self._registry.pop(name, None) + def unregister_actor(self, uid): + self._registry.pop(uid, None) async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): diff --git a/tractor/_ipc.py b/tractor/_ipc.py index c8a811b..2d86e56 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -87,6 +87,7 @@ class Channel: self._destaddr = destaddr or self.squeue.raddr # set after handshake - always uid of far end self.uid = None + self._agen = self._aiter_recv() def __repr__(self): if self.squeue: @@ -134,8 +135,8 @@ class Channel: async def __aexit__(self, *args): await self.aclose(*args) - async def __aiter__(self): - return self.aiter_recv() + def __aiter__(self): + return self._agen async def _reconnect(self): """Handle connection failures by polling until a reconnect can be @@ -166,7 +167,7 @@ class Channel: " for re-establishment") await trio.sleep(1) - async def aiter_recv(self): + async def _aiter_recv(self): """Async iterate items from underlying stream. """ while True: