Merge pull request #28 from tgoodlet/reg_with_uid

Reg with uid and drop aiter_recv()
attrs_it_up
goodboy 2018-08-07 15:07:34 -04:00 committed by GitHub
commit a51fbbcf9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 138 additions and 88 deletions

View File

@ -1,10 +1,16 @@
""" """
``tractor`` testing!! ``tractor`` testing!!
""" """
import random
from functools import partial, wraps
import pytest import pytest
import tractor import tractor
_arb_addr = '127.0.0.1', random.randint(1000, 9999)
def pytest_addoption(parser): def pytest_addoption(parser):
parser.addoption("--ll", action="store", dest='loglevel', parser.addoption("--ll", action="store", dest='loglevel',
default=None, help="logging level to set when testing") default=None, help="logging level to set when testing")
@ -16,3 +22,25 @@ def loglevel(request):
level = tractor.log._default_loglevel = request.config.option.loglevel level = tractor.log._default_loglevel = request.config.option.loglevel
yield level yield level
tractor.log._default_loglevel = orig tractor.log._default_loglevel = orig
@pytest.fixture(scope='session')
def arb_addr():
return _arb_addr
def tractor_test(fn):
"""
Use:
@tractor_test
async def test_whatever():
await ...
"""
@wraps(fn)
def wrapper(*args, **kwargs):
# __tracebackhide__ = True
return tractor.run(
partial(fn, *args, **kwargs), arbiter_addr=_arb_addr)
return wrapper

View File

@ -0,0 +1,73 @@
"""
Actor "discovery" testing
"""
import tractor
import trio
from conftest import tractor_test
@tractor_test
async def test_reg_then_unreg(arb_addr):
actor = tractor.current_actor()
assert actor.is_arbiter
assert len(actor._registry) == 1 # only self is registered
async with tractor.open_nursery() as n:
portal = await n.start_actor('actor', rpc_module_paths=[__name__])
uid = portal.channel.uid
async with tractor.get_arbiter(*arb_addr) as aportal:
# local actor should be the arbiter
assert actor is aportal.actor
# sub-actor uid should be in the registry
await trio.sleep(0.1) # registering is async, so..
assert uid in aportal.actor._registry
sockaddrs = actor._registry[uid]
# XXX: can we figure out what the listen addr will be?
assert sockaddrs
await n.cancel() # tear down nursery
await trio.sleep(0.1)
assert uid not in aportal.actor._registry
sockaddrs = actor._registry[uid]
assert not sockaddrs
the_line = 'Hi my name is {}'
async def hi():
return the_line.format(tractor.current_actor().name)
async def say_hello(other_actor):
await trio.sleep(0.4) # wait for other actor to spawn
async with tractor.find_actor(other_actor) as portal:
return await portal.run(__name__, 'hi')
@tractor_test
async def test_trynamic_trio():
"""Main tractor entry point, the "master" process (for now
acts as the "director").
"""
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.run_in_actor(
'donny',
say_hello,
other_actor='gretchen',
)
gretchen = await n.run_in_actor(
'gretchen',
say_hello,
other_actor='donny',
)
print(await gretchen.result())
print(await donny.result())
await donny.cancel_actor()
print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")

View File

@ -2,33 +2,13 @@
Actor model API testing Actor model API testing
""" """
import time import time
from functools import partial, wraps
from itertools import repeat from itertools import repeat
import random
import pytest import pytest
import trio import trio
import tractor import tractor
from conftest import tractor_test
_arb_addr = '127.0.0.1', random.randint(1000, 9999)
def tractor_test(fn):
"""
Use:
@tractor_test
async def test_whatever():
await ...
"""
@wraps(fn)
def wrapper(*args, **kwargs):
__tracebackhide__ = True
return tractor.run(
partial(fn, *args), arbiter_addr=_arb_addr, **kwargs)
return wrapper
@pytest.mark.trio @pytest.mark.trio
@ -44,7 +24,7 @@ async def test_no_arbitter():
pass pass
def test_local_actor_async_func(): def test_local_actor_async_func(arb_addr):
"""Verify a simple async function in-process. """Verify a simple async function in-process.
""" """
nums = [] nums = []
@ -58,7 +38,7 @@ def test_local_actor_async_func():
await trio.sleep(0.1) await trio.sleep(0.1)
start = time.time() start = time.time()
tractor.run(print_loop, arbiter_addr=_arb_addr) tractor.run(print_loop, arbiter_addr=arb_addr)
# ensure the sleeps were actually awaited # ensure the sleeps were actually awaited
assert time.time() - start >= 1 assert time.time() - start >= 1
@ -99,13 +79,13 @@ async def spawn(is_arbiter):
return 10 return 10
def test_local_arbiter_subactor_global_state(): def test_local_arbiter_subactor_global_state(arb_addr):
result = tractor.run( result = tractor.run(
spawn, spawn,
True, True,
name='arbiter', name='arbiter',
statespace=statespace, statespace=statespace,
arbiter_addr=_arb_addr, arbiter_addr=arb_addr,
) )
assert result == 10 assert result == 10
@ -153,17 +133,17 @@ async def stream_from_single_subactor():
# await nursery.cancel() # await nursery.cancel()
def test_stream_from_single_subactor(): def test_stream_from_single_subactor(arb_addr):
"""Verify streaming from a spawned async generator. """Verify streaming from a spawned async generator.
""" """
tractor.run(stream_from_single_subactor, arbiter_addr=_arb_addr) tractor.run(stream_from_single_subactor, arbiter_addr=arb_addr)
async def assert_err(): async def assert_err():
assert 0 assert 0
def test_remote_error(): def test_remote_error(arb_addr):
"""Verify an error raises in a subactor is propagated to the parent. """Verify an error raises in a subactor is propagated to the parent.
""" """
async def main(): async def main():
@ -183,7 +163,7 @@ def test_remote_error():
with pytest.raises(tractor.RemoteActorError): with pytest.raises(tractor.RemoteActorError):
# also raises # also raises
tractor.run(main, arbiter_addr=_arb_addr) tractor.run(main, arbiter_addr=arb_addr)
async def stream_forever(): async def stream_forever():
@ -238,43 +218,6 @@ async def test_one_cancels_all():
pytest.fail("Should have gotten a remote assertion error?") pytest.fail("Should have gotten a remote assertion error?")
the_line = 'Hi my name is {}'
async def hi():
return the_line.format(tractor.current_actor().name)
async def say_hello(other_actor):
await trio.sleep(0.4) # wait for other actor to spawn
async with tractor.find_actor(other_actor) as portal:
return await portal.run(__name__, 'hi')
@tractor_test
async def test_trynamic_trio():
"""Main tractor entry point, the "master" process (for now
acts as the "director").
"""
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.run_in_actor(
'donny',
say_hello,
other_actor='gretchen',
)
gretchen = await n.run_in_actor(
'gretchen',
say_hello,
other_actor='donny',
)
print(await gretchen.result())
print(await donny.result())
await donny.cancel_actor()
print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")
def movie_theatre_question(): def movie_theatre_question():
"""A question asked in a dark theatre, in a tangent """A question asked in a dark theatre, in a tangent
(errr, I mean different) process. (errr, I mean different) process.
@ -335,7 +278,7 @@ def do_nothing():
pass pass
def test_cancel_single_subactor(): def test_cancel_single_subactor(arb_addr):
async def main(): async def main():
@ -349,7 +292,7 @@ def test_cancel_single_subactor():
# would hang otherwise # would hang otherwise
await nursery.cancel() await nursery.cancel()
tractor.run(main, arbiter_addr=_arb_addr) tractor.run(main, arbiter_addr=arb_addr)
async def stream_data(seed): async def stream_data(seed):
@ -440,19 +383,19 @@ async def cancel_after(wait):
return await a_quadruple_example() return await a_quadruple_example()
def test_a_quadruple_example(): def test_a_quadruple_example(arb_addr):
"""This also serves as a kind of "we'd like to eventually be this """This also serves as a kind of "we'd like to eventually be this
fast test". fast test".
""" """
results = tractor.run(cancel_after, 2.1, arbiter_addr=_arb_addr) results = tractor.run(cancel_after, 2.1, arbiter_addr=arb_addr)
assert results assert results
@pytest.mark.parametrize('cancel_delay', list(range(1, 7))) @pytest.mark.parametrize('cancel_delay', list(range(1, 7)))
def test_not_fast_enough_quad(cancel_delay): def test_not_fast_enough_quad(arb_addr, cancel_delay):
"""Verify we can cancel midway through the quad example and all actors """Verify we can cancel midway through the quad example and all actors
cancel gracefully. cancel gracefully.
""" """
delay = 1 + cancel_delay/10 delay = 1 + cancel_delay/10
results = tractor.run(cancel_after, delay, arbiter_addr=_arb_addr) results = tractor.run(cancel_after, delay, arbiter_addr=arb_addr)
assert results is None assert results is None

View File

@ -189,8 +189,7 @@ class Actor:
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
): ):
""" """Entry point for new inbound connections to the channel server.
Entry point for new inbound connections to the channel server.
""" """
self._no_more_peers.clear() self._no_more_peers.clear()
chan = Channel(stream=stream) chan = Channel(stream=stream)
@ -279,7 +278,7 @@ class Actor:
# worked out we'll likely want to use that! # worked out we'll likely want to use that!
log.debug(f"Entering msg loop for {chan} from {chan.uid}") log.debug(f"Entering msg loop for {chan} from {chan.uid}")
try: try:
async for msg in chan.aiter_recv(): async for msg in chan:
if msg is None: # terminate sentinel if msg is None: # terminate sentinel
log.debug( log.debug(
f"Cancelling all tasks for {chan} from {chan.uid}") f"Cancelling all tasks for {chan} from {chan.uid}")
@ -435,7 +434,7 @@ class Actor:
async with get_arbiter(*arbiter_addr) as arb_portal: async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run( await arb_portal.run(
'self', 'register_actor', 'self', 'register_actor',
name=self.name, sockaddr=self.accept_addr) uid=self.uid, sockaddr=self.accept_addr)
registered_with_arbiter = True registered_with_arbiter = True
task_status.started() task_status.started()
@ -456,7 +455,7 @@ class Actor:
try: try:
await self._parent_chan.send( await self._parent_chan.send(
{'error': traceback.format_exc(), 'cid': 'internal'}) {'error': traceback.format_exc(), 'cid': 'internal'})
except trio.ClosedStreamError: except trio.ClosedResourceError:
log.error( log.error(
f"Failed to ship error to parent " f"Failed to ship error to parent "
f"{self._parent_chan.uid}, channel was closed") f"{self._parent_chan.uid}, channel was closed")
@ -523,7 +522,7 @@ class Actor:
if arbiter_addr is not None: if arbiter_addr is not None:
async with get_arbiter(*arbiter_addr) as arb_portal: async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run( await arb_portal.run(
'self', 'unregister_actor', name=self.name) 'self', 'unregister_actor', uid=self.uid)
except OSError: except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter") log.warn(f"Unable to unregister {self.name} from arbiter")
@ -581,24 +580,30 @@ class Actor:
class Arbiter(Actor): class Arbiter(Actor):
"""A special actor who knows all the other actors and always has """A special actor who knows all the other actors and always has
access to the top level nursery. access to a top level nursery.
The arbiter is by default the first actor spawned on each host The arbiter is by default the first actor spawned on each host
and is responsible for keeping track of all other actors for and is responsible for keeping track of all other actors for
coordination purposes. If a new main process is launched and an coordination purposes. If a new main process is launched and an
arbiter is already running that arbiter will be used. arbiter is already running that arbiter will be used.
""" """
_registry = defaultdict(list)
is_arbiter = True is_arbiter = True
def __init__(self, *args, **kwargs):
self._registry = defaultdict(list)
super().__init__(*args, **kwargs)
def find_actor(self, name): def find_actor(self, name):
return self._registry[name] for uid, actor in self._registry.items():
if name in uid:
print('found it!')
return actor
def register_actor(self, name, sockaddr): def register_actor(self, uid, sockaddr):
self._registry[name].append(sockaddr) self._registry[uid].append(sockaddr)
def unregister_actor(self, name): def unregister_actor(self, uid):
self._registry.pop(name, None) self._registry.pop(uid, None)
async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None): async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):

View File

@ -87,6 +87,7 @@ class Channel:
self._destaddr = destaddr or self.squeue.raddr self._destaddr = destaddr or self.squeue.raddr
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid = None self.uid = None
self._agen = self._aiter_recv()
def __repr__(self): def __repr__(self):
if self.squeue: if self.squeue:
@ -134,8 +135,8 @@ class Channel:
async def __aexit__(self, *args): async def __aexit__(self, *args):
await self.aclose(*args) await self.aclose(*args)
async def __aiter__(self): def __aiter__(self):
return self.aiter_recv() return self._agen
async def _reconnect(self): async def _reconnect(self):
"""Handle connection failures by polling until a reconnect can be """Handle connection failures by polling until a reconnect can be
@ -166,7 +167,7 @@ class Channel:
" for re-establishment") " for re-establishment")
await trio.sleep(1) await trio.sleep(1)
async def aiter_recv(self): async def _aiter_recv(self):
"""Async iterate items from underlying stream. """Async iterate items from underlying stream.
""" """
while True: while True: