diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e569571 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,13 @@ +language: python +python: + - '3.6' + # setup.py reading README breaks this? + # - pypy + # - nightly + +install: + - cd $TRAVIS_BUILD_DIR + - pip install . -r requirements-test.txt + +script: + - pytest tests/ diff --git a/requirements-test.txt b/requirements-test.txt index be31bc5..285c77f 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,2 +1,3 @@ pytest +pytest-trio pdbpp diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..e7b63da --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,18 @@ +""" +``tractor`` testing!! +""" +import pytest +import tractor + + +def pytest_addoption(parser): + parser.addoption("--ll", action="store", dest='loglevel', + default=None, help="logging level to set when testing") + + +@pytest.fixture(scope='session', autouse=True) +def loglevel(request): + orig = tractor._default_loglevel + level = tractor._default_loglevel = request.config.option.loglevel + yield level + tractor._default_loglevel = orig diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 410144e..0a416ef 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -2,16 +2,33 @@ Actor model API testing """ import time -from functools import partial +from functools import partial, wraps +from itertools import repeat +import random import pytest import trio import tractor -@pytest.fixture -def us_symbols(): - return ['TSLA', 'AAPL', 'CGC', 'CRON'] +_arb_addr = '127.0.0.1', random.randint(1000, 9999) + + +def tractor_test(fn): + """ + Use: + + @tractor_test + async def test_whatever(): + await ... + """ + @wraps(fn) + def wrapper(*args, **kwargs): + __tracebackhide__ = True + return tractor.run( + partial(fn, *args), arbiter_addr=_arb_addr, **kwargs) + + return wrapper @pytest.mark.trio @@ -41,18 +58,20 @@ def test_local_actor_async_func(): await trio.sleep(0.1) start = time.time() - tractor.run(print_loop) + tractor.run(print_loop, arbiter_addr=_arb_addr) # ensure the sleeps were actually awaited assert time.time() - start >= 1 assert nums == list(range(10)) +statespace = {'doggy': 10, 'kitty': 4} + + # NOTE: this func must be defined at module level in order for the # interal pickling infra of the forkserver to work async def spawn(is_arbiter): - statespace = {'doggy': 10, 'kitty': 4} - namespaces = ['piker.brokers.core'] + namespaces = [__name__] await trio.sleep(0.1) actor = tractor.current_actor() @@ -72,22 +91,33 @@ async def spawn(is_arbiter): ) assert len(nursery._children) == 1 assert portal.channel.uid in tractor.current_actor()._peers + # be sure we can still get the result + result = await portal.result() + assert result == 10 + return result else: return 10 def test_local_arbiter_subactor_global_state(): - statespace = {'doggy': 10, 'kitty': 4} - tractor.run( + result = tractor.run( spawn, True, name='arbiter', statespace=statespace, + arbiter_addr=_arb_addr, ) + assert result == 10 -async def rx_price_quotes_from_brokerd(us_symbols): - """Verify we can spawn a daemon actor and retrieve streamed price data. +async def stream_seq(sequence): + for i in sequence: + yield i + await trio.sleep(0.1) + + +async def stream_from_single_subactor(): + """Verify we can spawn a daemon actor and retrieve streamed data. 
""" async with tractor.find_actor('brokerd') as portals: if not portals: @@ -95,49 +125,341 @@ async def rx_price_quotes_from_brokerd(us_symbols): async with tractor.open_nursery() as nursery: # no brokerd actor found portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=['piker.brokers.core'], - statespace={ - 'brokers2tickersubs': {}, - 'clients': {}, - 'dtasks': set() - }, - main=None, # don't start a main func - use rpc + 'streamerd', + rpc_module_paths=[__name__], + statespace={'global_dict': {}}, + # don't start a main func - use rpc + # currently the same as outlive_main=False + main=None, ) - # gotta expose in a broker agnostic way... - # retrieve initial symbol data - # sd = await portal.run( - # 'piker.brokers.core', 'symbol_data', symbols=us_symbols) - # assert list(sd.keys()) == us_symbols + seq = range(10) - gen = await portal.run( - 'piker.brokers.core', - '_test_price_stream', - broker='robinhood', - symbols=us_symbols, + agen = await portal.run( + __name__, + 'stream_seq', # the func above + sequence=list(seq), # has to be msgpack serializable ) # it'd sure be nice to have an asyncitertools here... - async for quotes in gen: - assert quotes - for key in quotes: - assert key in us_symbols - break + iseq = iter(seq) + async for val in agen: + assert val == next(iseq) + # TODO: test breaking the loop (should it kill the + # far end?) + # break # terminate far-end async-gen # await gen.asend(None) # break # stop all spawned subactors - await nursery.cancel() - - # arbitter is cancelled here due to `find_actors()` internals - # (which internally uses `get_arbiter` which kills its channel - # server scope on exit) + await portal.cancel_actor() + # await nursery.cancel() -def test_rx_price_quotes_from_brokerd(us_symbols): - tractor.run( - rx_price_quotes_from_brokerd, - us_symbols, - name='arbiter', - ) +def test_stream_from_single_subactor(): + """Verify streaming from a spawned async generator. + """ + tractor.run(stream_from_single_subactor, arbiter_addr=_arb_addr) + + +async def assert_err(): + assert 0 + + +def test_remote_error(): + """Verify an error raises in a subactor is propagated to the parent. + """ + async def main(): + async with tractor.open_nursery() as nursery: + + portal = await nursery.start_actor('errorer', main=assert_err) + + # get result(s) from main task + try: + return await portal.result() + except tractor.RemoteActorError: + print("Look Maa that actor failed hard, hehh") + raise + except Exception: + pass + assert 0, "Remote error was not raised?" + + with pytest.raises(tractor.RemoteActorError): + # also raises + tractor.run(main, arbiter_addr=_arb_addr) + + +async def stream_forever(): + for i in repeat("I can see these little future bubble things"): + yield i + await trio.sleep(0.01) + + +@tractor_test +async def test_cancel_infinite_streamer(): + + # stream for at most 5 seconds + with trio.move_on_after(1) as cancel_scope: + async with tractor.open_nursery() as n: + portal = await n.start_actor( + f'donny', + rpc_module_paths=[__name__], + outlive_main=True + ) + async for letter in await portal.run(__name__, 'stream_forever'): + print(letter) + + assert cancel_scope.cancelled_caught + assert n.cancelled + + +@tractor_test +async def test_one_cancels_all(): + """Verify one failed actor causes all others in the nursery + to be cancelled just like in trio. + + This is the first and only supervisory strategy at the moment. 
+ """ + try: + async with tractor.open_nursery() as n: + real_actors = [] + for i in range(3): + real_actors.append(await n.start_actor( + f'actor_{i}', + rpc_module_paths=[__name__], + outlive_main=True + )) + + # start one actor that will fail immediately + await n.start_actor('extra', main=assert_err) + + # should error here with a ``RemoteActorError`` containing + # an ``AssertionError` + + except tractor.RemoteActorError: + assert n.cancelled is True + else: + pytest.fail("Should have gotten a remote assertion error?") + + +the_line = 'Hi my name is {}' + + +async def hi(): + return the_line.format(tractor.current_actor().name) + + +async def say_hello(other_actor): + await trio.sleep(0.4) # wait for other actor to spawn + async with tractor.find_actor(other_actor) as portal: + return await portal.run(__name__, 'hi') + + +@tractor_test +async def test_trynamic_trio(): + """Main tractor entry point, the "master" process (for now + acts as the "director"). + """ + async with tractor.open_nursery() as n: + print("Alright... Action!") + + donny = await n.start_actor( + 'donny', + main=partial(say_hello, 'gretchen'), + rpc_module_paths=[__name__], + outlive_main=True + ) + gretchen = await n.start_actor( + 'gretchen', + main=partial(say_hello, 'donny'), + rpc_module_paths=[__name__], + ) + print(await gretchen.result()) + print(await donny.result()) + await donny.cancel_actor() + print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...") + + +def movie_theatre_question(): + """A question asked in a dark theatre, in a tangent + (errr, I mean different) process. + """ + return 'have you ever seen a portal?' + + +@tractor_test +async def test_movie_theatre_convo(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'frank', + # enable the actor to run funcs from this current module + rpc_module_paths=[__name__], + outlive_main=True, + ) + + print(await portal.run(__name__, 'movie_theatre_question')) + # calls the subactor a 2nd time + print(await portal.run(__name__, 'movie_theatre_question')) + + # the async with will block here indefinitely waiting + # for our actor "frank" to complete, but since it's an + # "outlive_main" actor it will never end until cancelled + await portal.cancel_actor() + + +@tractor_test +async def test_movie_theatre_convo_main_task(): + async with tractor.open_nursery() as n: + portal = await n.start_actor('some_linguist', main=cellar_door) + + # The ``async with`` will unblock here since the 'some_linguist' + # actor has completed its main task ``cellar_door``. + + print(await portal.result()) + + +def cellar_door(): + return "Dang that's beautiful" + + +@tractor_test +async def test_most_beautiful_word(): + """The main ``tractor`` routine. + """ + async with tractor.open_nursery() as n: + portal = await n.start_actor('some_linguist', main=cellar_door) + + # The ``async with`` will unblock here since the 'some_linguist' + # actor has completed its main task ``cellar_door``. 
+ + print(await portal.result()) + + +def do_nothing(): + pass + + +def test_cancel_single_subactor(): + + async def main(): + + async with tractor.open_nursery() as nursery: + + portal = await nursery.start_actor( + 'nothin', rpc_module_paths=[__name__], + ) + assert (await portal.run(__name__, 'do_nothing')) is None + + # would hang otherwise + await nursery.cancel() + + tractor.run(main, arbiter_addr=_arb_addr) + + +async def stream_data(seed): + for i in range(seed): + yield i + await trio.sleep(0) # trigger scheduler + + +async def aggregate(seed): + """Ensure that the two streams we receive match but only stream + a single set of values to the parent. + """ + async with tractor.open_nursery() as nursery: + portals = [] + for i in range(1, 3): + # fork point + portal = await nursery.start_actor( + name=f'streamer_{i}', + rpc_module_paths=[__name__], + outlive_main=True, # daemonize these actors + ) + + portals.append(portal) + + q = trio.Queue(500) + + async def push_to_q(portal): + async for value in await portal.run( + __name__, 'stream_data', seed=seed + ): + # leverage trio's built-in backpressure + await q.put(value) + + await q.put(None) + print(f"FINISHED ITERATING {portal.channel.uid}") + + # spawn 2 trio tasks to collect streams and push to a local queue + async with trio.open_nursery() as n: + for portal in portals: + n.start_soon(push_to_q, portal) + + unique_vals = set() + async for value in q: + if value not in unique_vals: + unique_vals.add(value) + # yield upwards to the spawning parent actor + yield value + + if value is None: + break + + assert value in unique_vals + + print("FINISHED ITERATING in aggregator") + + await nursery.cancel() + print("WAITING on `ActorNursery` to finish") + print("AGGREGATOR COMPLETE!") + + +# @tractor_test +async def a_quadruple_example(): + # a nursery which spawns "actors" + async with tractor.open_nursery() as nursery: + + seed = int(1e3) + pre_start = time.time() + + portal = await nursery.start_actor( + name='aggregator', + # executed in the actor's "main task" immediately + main=partial(aggregate, seed), + ) + + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "main" function was called locally + result_stream = [] + async for value in await portal.result(): + result_stream.append(value) + + print(f"STREAM TIME = {time.time() - start}") + print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") + assert result_stream == list(range(seed)) + [None] + return result_stream + + +async def cancel_after(wait): + with trio.move_on_after(wait): + return await a_quadruple_example() + + +def test_a_quadruple_example(): + """Verify the *show me the code* readme example works. + """ + results = tractor.run(cancel_after, 2, arbiter_addr=_arb_addr) + assert results + + +def test_not_fast_enough_quad(): + """Verify we can cancel midway through the quad example and all actors + cancel gracefully. + + This also serves as a kind of "we'd like to eventually be this fast test". + """ + results = tractor.run(cancel_after, 1, arbiter_addr=_arb_addr) + assert results is None diff --git a/tractor/__init__.py b/tractor/__init__.py index cb7e121..938a404 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -1,5 +1,6 @@ """ -tracor: An actor model micro-framework. +tractor: An actor model micro-framework built on + ``trio`` and ``multiprocessing``. 
""" from collections import defaultdict from functools import partial @@ -13,7 +14,7 @@ import uuid import trio from async_generator import asynccontextmanager -from .ipc import Channel +from .ipc import Channel, _connect_chan from .log import get_console_log, get_logger ctx = mp.get_context("forkserver") @@ -23,6 +24,11 @@ log = get_logger('tractor') _current_actor = None _default_arbiter_host = '127.0.0.1' _default_arbiter_port = 1616 +_default_loglevel = None + + +def get_loglevel(): + return _default_loglevel class ActorFailure(Exception): @@ -55,10 +61,17 @@ async def _invoke( """ try: is_async_partial = False + is_async_gen_partial = False if isinstance(func, partial): is_async_partial = inspect.iscoroutinefunction(func.func) + is_async_gen_partial = inspect.isasyncgenfunction(func.func) - if not inspect.iscoroutinefunction(func) and not is_async_partial: + if ( + not inspect.iscoroutinefunction(func) and + not inspect.isasyncgenfunction(func) and + not is_async_partial and + not is_async_gen_partial + ): await chan.send({'return': func(**kwargs), 'cid': cid}) else: coro = func(**kwargs) @@ -73,6 +86,12 @@ async def _invoke( # if to_send is not None: # to_yield = await coro.asend(to_send) await chan.send({'yield': item, 'cid': cid}) + + log.debug(f"Finished iterating {coro}") + # TODO: we should really support a proper + # `StopAsyncIteration` system here for returning a final + # value if desired + await chan.send({'stop': None, 'cid': cid}) else: if treat_as_gen: # XXX: the async-func may spawn further tasks which push @@ -87,11 +106,12 @@ async def _invoke( except Exception: if not raise_errs: await chan.send({'error': traceback.format_exc(), 'cid': cid}) + log.exception("Actor errored:") else: raise -async def result_from_q(q): +async def result_from_q(q, chan): """Process a msg from a remote actor. """ first_msg = await q.get() @@ -100,7 +120,7 @@ async def result_from_q(q): elif 'yield' in first_msg: return 'yield', first_msg, q elif 'error' in first_msg: - raise RemoteActorError(first_msg['error']) + raise RemoteActorError(f"{chan.uid}\n" + first_msg['error']) else: raise ValueError(f"{first_msg} is an invalid response packet?") @@ -136,6 +156,8 @@ class Actor: uid: str = None, allow_rpc: bool = True, outlive_main: bool = False, + loglevel: str = None, + arbiter_addr: (str, int) = None, ): self.name = name self.uid = (name, uid or str(uuid.uuid1())) @@ -147,10 +169,15 @@ class Actor: self.statespace = statespace self._allow_rpc = allow_rpc self._outlive_main = outlive_main + self.loglevel = loglevel + self._arb_addr = arbiter_addr # filled in by `_async_main` after fork self._peers = defaultdict(list) + self._peer_connected = {} self._no_more_peers = trio.Event() + self._main_complete = trio.Event() + self._main_scope = None self._no_more_peers.set() self._actors2calls = {} # map {uids -> {callids -> waiter queues}} self._listeners = [] @@ -162,7 +189,7 @@ class Actor: ``uid``. 
""" log.debug(f"Waiting for peer {uid} to connect") - event = self._peers.setdefault(uid, trio.Event()) + event = self._peer_connected.setdefault(uid, trio.Event()) await event.wait() log.debug(f"{uid} successfully connected back to us") return event, self._peers[uid][-1] @@ -194,23 +221,22 @@ class Actor: return # channel tracking - event_or_chans = self._peers.pop(uid, None) - if isinstance(event_or_chans, trio.Event): + event = self._peer_connected.pop(uid, None) + if event: # Instructing connection: this is likely a new channel to # a recently spawned actor which we'd like to control via # async-rpc calls. - log.debug(f"Waking channel waiters {event_or_chans.statistics()}") + log.debug(f"Waking channel waiters {event.statistics()}") # Alert any task waiting on this connection to come up - event_or_chans.set() - event_or_chans.clear() # consumer can wait on channel to close - elif isinstance(event_or_chans, list): - log.warn( - f"already have channel(s) for {uid}:{event_or_chans}?" - ) - # append new channel - self._peers[uid].extend(event_or_chans) + event.set() + chans = self._peers[uid] + if chans: + log.warn( + f"already have channel(s) for {uid}:{chans}?" + ) log.debug(f"Registered {chan} for {uid}") + # append new channel self._peers[uid].append(chan) # Begin channel management - respond to remote requests and @@ -219,29 +245,40 @@ class Actor: await self._process_messages(chan) finally: # Drop ref to channel so it can be gc-ed and disconnected - if chan is not self._parent_chan: - log.debug(f"Releasing channel {chan}") - chans = self._peers.get(chan.uid) - chans.remove(chan) - if not chans: - log.debug(f"No more channels for {chan.uid}") - self._peers.pop(chan.uid, None) - if not self._peers: # no more channels connected - self._no_more_peers.set() - log.debug(f"No more peer channels") + log.debug(f"Releasing channel {chan} from {chan.uid}") + chans = self._peers.get(chan.uid) + chans.remove(chan) + if not chans: + log.debug(f"No more channels for {chan.uid}") + self._peers.pop(chan.uid, None) + if not self._actors2calls.get(chan.uid, {}).get('main'): + # fake a "main task" result for any waiting + # nurseries/portals + log.debug(f"Faking result for {chan} from {chan.uid}") + q = self.get_waitq(chan.uid, 'main') + q.put_nowait({'return': None, 'cid': 'main'}) - def _push_result(self, actorid, cid, msg): + log.debug(f"Peers is {self._peers}") + + if not self._peers: # no more channels connected + self._no_more_peers.set() + log.debug(f"Signalling no more peer channels") + + # XXX: is this necessary? + if chan.connected(): + log.debug(f"Disconnecting channel {chan}") + await chan.send(None) + await chan.aclose() + + async def _push_result(self, actorid, cid, msg): assert actorid, f"`actorid` can't be {actorid}" q = self.get_waitq(actorid, cid) log.debug(f"Delivering {msg} from {actorid} to caller {cid}") - waiters = q.statistics().tasks_waiting_get - if not waiters: - log.warn( - f"No tasks are currently waiting for results from call {cid}?") - q.put_nowait(msg) + # maintain backpressure + await q.put(msg) def get_waitq(self, actorid, cid): - log.debug(f"Registering for callid {cid} queue results from {actorid}") + log.debug(f"Getting result queue for {actorid} cid {cid}") cids2qs = self._actors2calls.setdefault(actorid, {}) return cids2qs.setdefault(cid, trio.Queue(1000)) @@ -263,23 +300,24 @@ class Actor: """ # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! 
-        log.debug(f"Entering msg loop for {chan}") +        log.debug(f"Entering msg loop for {chan} from {chan.uid}") async with trio.open_nursery() as nursery: try: async for msg in chan.aiter_recv(): if msg is None:  # terminate sentinel -                        log.debug(f"Cancelling all tasks for {chan}") +                        log.debug( +                            f"Cancelling all tasks for {chan} from {chan.uid}") nursery.cancel_scope.cancel() -                        log.debug(f"Terminating msg loop for {chan}") +                        log.debug( +                            f"Msg loop signalled to terminate for" +                            f" {chan} from {chan.uid}") break -                    log.debug(f"Received msg {msg}") +                    log.debug(f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') if cid: # deliver response to local caller/waiter -                        self._push_result(chan.uid, cid, msg) -                        if 'error' in msg: -                            # TODO: need something better then this slop -                            raise RemoteActorError(msg['error']) -                        log.debug(f"Waiting on next msg for {chan}") +                        await self._push_result(chan.uid, cid, msg) +                        log.debug( +                            f"Waiting on next msg for {chan} from {chan.uid}") continue else: ns, funcname, kwargs, actorid, cid = msg['cmd'] @@ -312,22 +350,24 @@ class Actor: _invoke, cid, chan, func, kwargs, treat_as_gen, name=funcname ) -                    log.debug(f"Waiting on next msg for {chan}") +                    log.debug( +                        f"Waiting on next msg for {chan} from {chan.uid}") else:  # channel disconnect -                    log.debug(f"{chan} disconnected") +                    log.debug(f"{chan} from {chan.uid} disconnected") except trio.ClosedStreamError: -            log.error(f"{chan} broke") +            log.error(f"{chan} from {chan.uid} broke") -        log.debug(f"Exiting msg loop for {chan}") +        log.debug(f"Exiting msg loop for {chan} from {chan.uid}") -    def _fork_main(self, accept_addr, parent_addr=None, loglevel=None): +    def _fork_main(self, accept_addr, parent_addr=None): # after fork routine which invokes a fresh ``trio.run`` +        # log.warn("Log level after fork is {self.loglevel}") +        if self.loglevel is not None: +            get_console_log(self.loglevel) log.info( f"Started new {ctx.current_process()} for actor {self.uid}") global _current_actor _current_actor = self -        if loglevel: -            get_console_log(loglevel) log.debug(f"parent_addr is {parent_addr}") try: trio.run(partial( @@ -339,16 +379,19 @@ class Actor: async def _async_main( self, accept_addr, -        arbiter_addr=(_default_arbiter_host, _default_arbiter_port), +        arbiter_addr=None, parent_addr=None, nursery=None ): -        """Start the channel server and main task. +        """Start the channel server, maybe connect back to the parent, and +        start the main task. A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. """ result = None +        arbiter_addr = arbiter_addr or self._arb_addr +        registered_with_arbiter = False try: async with maybe_open_nursery(nursery) as nursery: self._root_nursery = nursery @@ -359,17 +402,38 @@ class Actor: self._serve_forever, accept_host=host, accept_port=port) ) +                # XXX: I wonder if a better name is maybe "requester" +                # since I don't think the notion of a "parent" actor +                # necessarily sticks given that eventually we want +                # ``'MainProcess'`` (the actor who initially starts the +                # forkserver) to be the only one who is +                # allowed to spawn new processes per Python program. 
if parent_addr is not None: -                    # Connect back to the parent actor and conduct initial -                    # handshake (From this point on if we error ship the -                    # exception back to the parent actor) -                    chan = self._parent_chan = Channel( -                        destaddr=parent_addr, -                        on_reconnect=self.main -                    ) -                    await chan.connect() -                    # initial handshake, report who we are, who they are -                    await _do_handshake(self, chan) +                    try: +                        # Connect back to the parent actor and conduct initial +                        # handshake (From this point on if we error ship the +                        # exception back to the parent actor) +                        chan = self._parent_chan = Channel( +                            destaddr=parent_addr, +                            on_reconnect=self.main +                        ) +                        await chan.connect() +                        # initial handshake, report who we are, who they are +                        await _do_handshake(self, chan) +                    except OSError:  # failed to connect +                        log.warn( +                            f"Failed to connect to parent @ {parent_addr}," +                            " closing server") +                        self.cancel_server() +                        self._parent_chan = None + +                # register with the arbiter if we're told its addr +                log.debug(f"Registering {self} for role `{self.name}`") +                async with get_arbiter(*arbiter_addr) as arb_portal: +                    await arb_portal.run( +                        'self', 'register_actor', +                        name=self.name, sockaddr=self.accept_addr) +                registered_with_arbiter = True # handle new connection back to parent optionally # begin responding to RPC @@ -379,57 +443,69 @@ class Actor: nursery.start_soon( self._process_messages, self._parent_chan) -                # register with the arbiter if we're told its addr -                log.debug(f"Registering {self} for role `{self.name}`") -                async with get_arbiter(*arbiter_addr) as arb_portal: -                    await arb_portal.run( -                        'self', 'register_actor', -                        name=self.name, sockaddr=self.accept_addr) -             if self.main: -                if self._parent_chan: -                    log.debug(f"Starting main task `{self.main}`") -                    # start "main" routine in a task -                    await nursery.start( -                        _invoke, 'main', self._parent_chan, self.main, {}, -                        False, True  # treat_as_gen, raise_errs params -                    ) -                else: -                    # run directly -                    log.debug(f"Running `{self.main}` directly") -                    result = await self.main() - -                # terminate local in-proc once its main completes -                log.debug( -                    f"Waiting for remaining peers {self._peers} to clear") -                await self._no_more_peers.wait() -                log.debug(f"All peer channels are complete") - -                # tear down channel server -                if not self._outlive_main: -                    log.debug(f"Shutting down channel server") -                    self.cancel_server() - +            with trio.open_cancel_scope() as main_scope: +                self._main_scope = main_scope +                try: +                    if self._parent_chan: +                        log.debug(f"Starting main task `{self.main}`") +                        # spawned subactor so deliver "main" +                        # task result(s) back to parent +                        await nursery.start( +                            _invoke, 'main', +                            self._parent_chan, self.main, {}, +                            # treat_as_gen, raise_errs params +                            False, True +                        ) +                    else: +                        # run directly - we are an "unspawned actor" +                        log.debug(f"Running `{self.main}` directly") +                        result = await self.main() +                finally: +                    self._main_complete.set() +                    # tear down channel server in order to ensure +                    # we exit normally when the main task is done +                    if not self._outlive_main: +                        log.debug(f"Shutting down channel server") +                        self.cancel_server() +                        log.debug(f"Shutting down root nursery") +                        nursery.cancel_scope.cancel() +                if main_scope.cancelled_caught: +                    log.debug("Main task was cancelled successfully") +            log.debug("Waiting on root nursery to complete") # blocks here as expected if no nursery was provided until # the channel server is killed (i.e. 
this actor is # cancelled or signalled by the parent actor) except Exception: if self._parent_chan: -                log.exception("Actor errored:") -                await self._parent_chan.send( -                    {'error': traceback.format_exc(), 'cid': 'main'}) +                try: +                    await self._parent_chan.send( +                        {'error': traceback.format_exc(), 'cid': 'main'}) +                except trio.ClosedStreamError: +                    log.error( +                        f"Failed to ship error to parent " +                        f"{self._parent_chan.uid}, channel was closed") +                log.exception("Actor errored:") + +            if not registered_with_arbiter: +                log.exception( +                    f"Failed to register with arbiter @ {arbiter_addr}") else: raise finally: -            # UNregister actor from the arbiter -            try: -                if arbiter_addr is not None: -                    async with get_arbiter(*arbiter_addr) as arb_portal: -                        await arb_portal.run( -                            'self', 'register_actor', -                            name=self.name, sockaddr=self.accept_addr) -            except OSError: -                log.warn(f"Unable to unregister {self.name} from arbiter") +            await self._do_unreg(arbiter_addr) +            # terminate actor once all its peers (actors that connected +            # to it as clients) have disappeared +            if not self._no_more_peers.is_set(): +                log.debug( +                    f"Waiting for remaining peers {self._peers} to clear") +                await self._no_more_peers.wait() +            log.debug(f"All peer channels are complete") + +            # tear down channel server no matter what since we errored +            # or completed +            log.debug(f"Shutting down channel server") +            self.cancel_server() return result @@ -441,9 +517,10 @@ class Actor: accept_port=0, task_status=trio.TASK_STATUS_IGNORED ): -        """Main coroutine: connect back to the parent, spawn main task, begin -        listening for new messages. +        """Start the channel server, begin listening for new connections. +        This will cause an actor to continue living (blocking) until +        ``cancel_server()`` is called. """ async with trio.open_nursery() as nursery: self._server_nursery = nursery @@ -454,6 +531,8 @@ class Actor: partial( trio.serve_tcp, self._stream_handler, +                    # new connections will stay alive even if this server +                    # is cancelled handler_nursery=self._root_nursery, port=accept_port, host=accept_host, ) @@ -463,10 +542,25 @@ class Actor: self._listeners.extend(listeners) task_status.started() -    def cancel(self): +    async def _do_unreg(self, arbiter_addr): +        # UNregister actor from the arbiter +        try: +            if arbiter_addr is not None: +                async with get_arbiter(*arbiter_addr) as arb_portal: +                    await arb_portal.run( +                        'self', 'unregister_actor', name=self.name) +        except OSError: +            log.warn(f"Unable to unregister {self.name} from arbiter") + +    async def cancel(self): """This cancels the internal root-most nursery thereby gracefully cancelling (for all intents and purposes) this actor. 
""" + self.cancel_server() + if self._main_scope: + self._main_scope.cancel() + log.debug("Waiting on main task to complete") + await self._main_complete.wait() self._root_nursery.cancel_scope.cancel() def cancel_server(self): @@ -509,13 +603,8 @@ class Arbiter(Actor): def register_actor(self, name, sockaddr): self._registry[name].append(sockaddr) - def unregister_actor(self, name, sockaddr): - sockaddrs = self._registry.get(name) - if sockaddrs: - try: - sockaddrs.remove(sockaddr) - except ValueError: - pass + def unregister_actor(self, name): + self._registry.pop(name, None) class Portal: @@ -529,6 +618,7 @@ class Portal: """ def __init__(self, channel): self.channel = channel + self._result = None async def aclose(self): log.debug(f"Closing {self}") @@ -543,13 +633,14 @@ class Portal: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - chan = self.channel - # ship a function call request to the remote actor actor = current_actor() + # ship a function call request to the remote actor + cid, q = await actor.send_cmd(self.channel, ns, func, kwargs) + # wait on first response msg and handle + return await self._return_from_resptype( + cid, *(await result_from_q(q, self.channel))) - cid, q = await actor.send_cmd(chan, ns, func, kwargs) - # wait on first response msg - resptype, first_msg, q = await result_from_q(q) + async def _return_from_resptype(self, cid, resptype, first_msg, q): if resptype == 'yield': @@ -560,9 +651,14 @@ class Portal: try: yield msg['yield'] except KeyError: - raise RemoteActorError(msg['error']) + if 'stop' in msg: + break # far end async gen terminated + else: + raise RemoteActorError(msg['error']) except GeneratorExit: - log.debug(f"Cancelling async gen call {cid} to {chan.uid}") + log.debug( + f"Cancelling async gen call {cid} to " + f"{self.channel.uid}") return yield_from_q() @@ -571,12 +667,49 @@ class Portal: else: raise ValueError(f"Unknown msg response type: {first_msg}") + async def result(self): + """Return the result(s) from the remote actor's "main" task. + """ + if self._result is None: + q = current_actor().get_waitq(self.channel.uid, 'main') + resptype, first_msg, q = (await result_from_q(q, self.channel)) + self._result = await self._return_from_resptype( + 'main', resptype, first_msg, q) + log.warn( + f"Retrieved first result `{self._result}` " + f"for {self.channel.uid}") + # await q.put(first_msg) # for next consumer (e.g. nursery) + return self._result + + async def close(self): + # trigger remote msg loop `break` + chan = self.channel + log.debug(f"Closing portal for {chan} to {chan.uid}") + await self.channel.send(None) + + async def cancel_actor(self): + """Cancel the actor on the other end of this portal. + """ + log.warn( + f"Sending cancel request to {self.channel.uid} on " + f"{self.channel}") + try: + with trio.move_on_after(0.1) as cancel_scope: + cancel_scope.shield = True + # send cancel cmd - might not get response + await self.run('self', 'cancel') + return True + except trio.ClosedStreamError: + log.warn( + f"{self.channel} for {self.channel.uid} was already closed?") + return False + @asynccontextmanager async def open_portal(channel, nursery=None): """Open a ``Portal`` through the provided ``channel``. - Spawns a background task to handle rpc message processing. + Spawns a background task to handle message processing. 
""" actor = current_actor() assert actor @@ -591,17 +724,17 @@ async def open_portal(channel, nursery=None): if channel.uid is None: await _do_handshake(actor, channel) - if not actor.get_chans(channel.uid): - # actor is not currently managing this channel - actor._peers[channel.uid].append(channel) - nursery.start_soon(actor._process_messages, channel) - yield Portal(channel) + portal = Portal(channel) + yield portal + + # cancel remote channel-msg loop + if channel.connected(): + await portal.close() # cancel background msg loop task nursery.cancel_scope.cancel() if was_connected: - actor._peers[channel.uid].remove(channel) await channel.aclose() @@ -626,11 +759,12 @@ class ActorNursery: """Spawn scoped subprocess actors. """ def __init__(self, actor, supervisor=None): - self.supervisor = supervisor + self.supervisor = supervisor # TODO self._actor = actor # We'll likely want some way to cancel all sub-actors eventually # self.cancel_scope = cancel_scope self._children = {} + self.cancelled = False async def __aenter__(self): return self @@ -643,56 +777,81 @@ class ActorNursery: statespace=None, rpc_module_paths=None, outlive_main=False, # sub-actors die when their main task completes - loglevel=None, # set console logging per subactor + loglevel=None, # set log level per subactor ): + loglevel = loglevel or self._actor.loglevel or get_loglevel() actor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths, + rpc_module_paths=rpc_module_paths or [], statespace=statespace, # global proc state vars main=main, # main coroutine to be invoked outlive_main=outlive_main, + loglevel=loglevel, + arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr assert parent_addr proc = ctx.Process( target=actor._fork_main, - args=(bind_addr, parent_addr, loglevel), - daemon=True, + args=(bind_addr, parent_addr), + # daemon=True, name=name, ) proc.start() if not proc.is_alive(): raise ActorFailure("Couldn't start sub-actor?") + log.info(f"Started {proc}") # wait for actor to spawn and connect back to us # channel should have handshake completed by the # local actor by the time we get a ref to it event, chan = await self._actor.wait_for_peer(actor.uid) - # channel is up, get queue which delivers result from main routine - main_q = self._actor.get_waitq(actor.uid, 'main') - self._children[(name, proc.pid)] = (actor, proc, main_q) - - return Portal(chan) + portal = Portal(chan) + self._children[(name, proc.pid)] = (actor, proc, portal) + return portal async def wait(self): - - async def wait_for_proc(proc): + """Wait for all subactors to complete. + """ + async def wait_for_proc(proc, actor, portal): # TODO: timeout block here? 
if proc.is_alive(): await trio.hazmat.wait_readable(proc.sentinel) # please god don't hang proc.join() log.debug(f"Joined {proc}") +            event = self._actor._peers.get(actor.uid) +            if isinstance(event, trio.Event): +                event.set() +                log.warn( +                    f"Cancelled `wait_for_peer()` call since {actor.uid}" +                    f" is already dead!") +            if not portal._result: +                log.debug(f"Faking result for {actor.uid}") +                q = self._actor.get_waitq(actor.uid, 'main') +                q.put_nowait({'return': None, 'cid': 'main'}) + +        async def wait_for_result(portal): +            if portal.channel.connected(): +                log.debug(f"Waiting on final result from {subactor.uid}") +                await portal.result() # unblocks when all waiter tasks have completed async with trio.open_nursery() as nursery: -            for subactor, proc, main_q in self._children.values(): -                nursery.start_soon(wait_for_proc, proc) +            for subactor, proc, portal in self._children.values(): +                nursery.start_soon(wait_for_proc, proc, subactor, portal) +                nursery.start_soon(wait_for_result, portal) async def cancel(self, hard_kill=False): +        """Cancel this nursery by instructing each subactor to cancel +        itself and wait for all subprocesses to terminate. + +        If ``hard_kill`` is set to ``True`` then kill the processes +        directly without any far end graceful ``trio`` cancellation. +        """ log.debug(f"Cancelling nursery") -        for subactor, proc, main_q in self._children.values(): +        for subactor, proc, portal in self._children.values(): if proc is mp.current_process(): # XXX: does this even make sense? await subactor.cancel() @@ -700,49 +859,41 @@ class ActorNursery: if hard_kill: log.warn(f"Hard killing subactors {self._children}") proc.terminate() -                    # send KeyBoardInterrupt (trio abort signal) to underlying -                    # sub-actors +                    # XXX: doesn't seem to work? +                    # send KeyboardInterrupt (trio abort signal) to sub-actors # os.kill(proc.pid, signal.SIGINT) else: -                    # send cancel cmd - likely no response from subactor -                    actor = self._actor -                    chans = actor.get_chans(subactor.uid) -                    if chans: -                        for chan in chans: -                            await actor.send_cmd(chan, 'self', 'cancel', {}) -                    else: -                        log.warn( -                            f"Channel for {subactor.uid} is already down?") +                    await portal.cancel_actor() + log.debug(f"Waiting on all subactors to complete") await self.wait() +        self.cancelled = True log.debug(f"All subactors for {self} have terminated") async def __aexit__(self, etype, value, tb): """Wait on all subactor's main routines to complete. 
""" - async def wait_for_actor(actor, proc, q): - if proc.is_alive(): - ret_type, msg, q = await result_from_q(q) - log.info(f"{actor.uid} main task completed with {msg}") - if not actor._outlive_main: - # trigger msg loop to break - chans = self._actor.get_chans(actor.uid) - for chan in chans: - log.info(f"Signalling msg loop exit for {actor.uid}") - await chan.send(None) - if etype is not None: - log.warn(f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") - await self.cancel() + if etype is trio.Cancelled: + log.warn(f"{current_actor().uid} was cancelled with {etype}, " + "cancelling actor nursery") + with trio.open_cancel_scope(shield=True): + await self.cancel() + else: + log.exception(f"{current_actor().uid} errored with {etype}, " + "cancelling actor nursery") + await self.cancel() else: - log.debug(f"Waiting on subactors to complete") - async with trio.open_nursery() as nursery: - for subactor, proc, main_q in self._children.values(): - nursery.start_soon(wait_for_actor, subactor, proc, main_q) - - await self.wait() - log.debug(f"Nursery teardown complete") + # XXX: this is effectively the lone cancellation/supervisor + # strategy which exactly mimicks trio's behaviour + log.debug(f"Waiting on subactors {self._children} to complete") + try: + await self.wait() + except Exception as err: + log.warn(f"Nursery caught {err}, cancelling") + await self.cancel() + raise + log.debug(f"Nursery teardown complete") def current_actor() -> Actor: @@ -752,7 +903,7 @@ def current_actor() -> Actor: @asynccontextmanager -async def open_nursery(supervisor=None, loglevel='WARNING'): +async def open_nursery(supervisor=None): """Create and yield a new ``ActorNursery``. """ actor = current_actor() @@ -768,7 +919,7 @@ class NoArbiterFound(Exception): "Couldn't find the arbiter?" -async def start_actor(actor, host, port, arbiter_addr, nursery=None): +async def _start_actor(actor, host, port, arbiter_addr, nursery=None): """Spawn a local actor by starting a task to execute it's main async function. @@ -783,7 +934,7 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None): # NOTE: this won't block since we provide the nursery log.info(f"Starting local {actor} @ {host}:{port}") - await actor._async_main( + result = await actor._async_main( accept_addr=(host, port), parent_addr=None, arbiter_addr=arbiter_addr, @@ -799,16 +950,7 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None): _current_actor = None log.info("Completed async main") - -@asynccontextmanager -async def _connect_chan(host, port): - """Attempt to connect to an arbiter's channel server. - Return the channel on success or None on failure. - """ - chan = Channel((host, port)) - await chan.connect() - yield chan - await chan.aclose() + return result @asynccontextmanager @@ -833,28 +975,28 @@ async def get_arbiter(host, port): @asynccontextmanager async def find_actor( name, - arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port) + arbiter_sockaddr=None, ): """Ask the arbiter to find actor(s) by name. - Returns a sequence of unconnected portals for each matching actor - known to the arbiter (client code is expected to connect the portals). + Returns a connected portal to the last registered matching actor + known to the arbiter. 
""" actor = current_actor() if not actor: raise RuntimeError("No actor instance has been defined yet?") - async with get_arbiter(*arbiter_sockaddr) as arb_portal: + async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal: sockaddrs = await arb_portal.run('self', 'find_actor', name=name) # TODO: return portals to all available actors - for now just - # the first one we find + # the last one that registered if sockaddrs: sockaddr = sockaddrs[-1] async with _connect_chan(*sockaddr) as chan: async with open_portal(chan) as portal: yield portal else: - yield + yield None async def _main(async_fn, args, kwargs, name, arbiter_addr): @@ -863,6 +1005,8 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): main = partial(async_fn, *args) if async_fn else None arbiter_addr = (host, port) = arbiter_addr or ( _default_arbiter_host, _default_arbiter_port) + get_console_log(kwargs.get('loglevel', get_loglevel())) + # make a temporary connection to see if an arbiter exists arbiter_found = False try: @@ -871,27 +1015,37 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr): except OSError: log.warn(f"No actor could be found @ {host}:{port}") + # create a local actor and start up its main routine/task if arbiter_found: # we were able to connect to an arbiter log.info(f"Arbiter seems to exist @ {host}:{port}") - # create a local actor and start up its main routine/task actor = Actor( name or 'anonymous', main=main, + arbiter_addr=arbiter_addr, **kwargs ) - host, port = (_default_arbiter_host, 0) + host, port = (host, 0) else: # start this local actor as the arbiter - actor = Arbiter(name or 'arbiter', main=main, **kwargs) + # this should eventually get passed `outlive_main=True`? + actor = Arbiter( + name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs) - await start_actor(actor, host, port, arbiter_addr=arbiter_addr) - # Creates an internal nursery which shouldn't be cancelled even if - # the one opened below is (this is desirable because the arbiter should - # stay up until a re-election process has taken place - which is not - # implemented yet FYI). + # ``Actor._async_main()`` creates an internal nursery if one is not + # provided and thus blocks here until it's main task completes. + # Note that if the current actor is the arbiter it is desirable + # for it to stay up indefinitely until a re-election process has + # taken place - which is not implemented yet FYI). + return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr) -def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): +def run( + async_fn, + *args, + name=None, + arbiter_addr=(_default_arbiter_host, _default_arbiter_port), + **kwargs +): """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. diff --git a/tractor/ipc.py b/tractor/ipc.py index 7bc647c..56c09db 100644 --- a/tractor/ipc.py +++ b/tractor/ipc.py @@ -5,6 +5,7 @@ from typing import Coroutine, Tuple import msgpack import trio +from async_generator import asynccontextmanager from .log import get_logger log = get_logger('ipc') @@ -189,3 +190,14 @@ class Channel: def connected(self): return self.squeue.connected() if self.squeue else False + + +@asynccontextmanager +async def _connect_chan(host, port): + """Create and connect a channel with disconnect on + context manager teardown. + """ + chan = Channel((host, port)) + await chan.connect() + yield chan + await chan.aclose()