
Merge pull request #10 from tgoodlet/initial_tests_and_ci

Initial tests and ci
goodboy 2018-07-12 00:15:58 -04:00 committed by GitHub
commit c326a90484
6 changed files with 768 additions and 248 deletions

.travis.yml 100644 (new file, +13 lines)

@ -0,0 +1,13 @@
language: python
python:
- '3.6'
# setup.py reading README breaks this?
# - pypy
# - nightly
install:
- cd $TRAVIS_BUILD_DIR
- pip install . -r requirements-test.txt
script:
- pytest tests/
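# Local reproduction sketch (not part of this commit): the CI steps above are
# equivalent to running, from a checkout:
#
#   pip install . -r requirements-test.txt
#   pytest tests/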

requirements-test.txt

@ -1,2 +1,3 @@
pytest
pytest-trio
pdbpp

tests/conftest.py 100644 (new file, +18 lines)

@ -0,0 +1,18 @@
"""
``tractor`` testing!!
"""
import pytest
import tractor
def pytest_addoption(parser):
parser.addoption("--ll", action="store", dest='loglevel',
default=None, help="logging level to set when testing")
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
orig = tractor._default_loglevel
level = tractor._default_loglevel = request.config.option.loglevel
yield level
tractor._default_loglevel = orig
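# Illustrative usage (assumed, not part of this commit): the ``--ll`` option
# added above can be passed on the command line, e.g.
#
#   pytest tests/ --ll debug
#
# Because the fixture is session scoped and ``autouse``, it overrides
# ``tractor._default_loglevel`` for the whole run and restores the original
# value afterwards. A test can also request the fixture explicitly:
#
#   def test_loglevel_plumbing(loglevel):   # hypothetical test
#       assert loglevel in (None, 'debug')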


@ -2,16 +2,33 @@
Actor model API testing
"""
import time
from functools import partial
from functools import partial, wraps
from itertools import repeat
import random
import pytest
import trio
import tractor
@pytest.fixture
def us_symbols():
return ['TSLA', 'AAPL', 'CGC', 'CRON']
_arb_addr = '127.0.0.1', random.randint(1000, 9999)
def tractor_test(fn):
"""
Use:
@tractor_test
async def test_whatever():
await ...
"""
@wraps(fn)
def wrapper(*args, **kwargs):
__tracebackhide__ = True
return tractor.run(
partial(fn, *args), arbiter_addr=_arb_addr, **kwargs)
return wrapper
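# Example use of the decorator above (hypothetical test, not part of this
# commit): the wrapped coroutine is handed to ``tractor.run()`` with the
# session's random ``_arb_addr``, so its body executes inside a live actor:
#
#   @tractor_test
#   async def test_runs_inside_an_actor():
#       assert tractor.current_actor() is not None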
@pytest.mark.trio
@ -41,18 +58,20 @@ def test_local_actor_async_func():
await trio.sleep(0.1)
start = time.time()
tractor.run(print_loop)
tractor.run(print_loop, arbiter_addr=_arb_addr)
# ensure the sleeps were actually awaited
assert time.time() - start >= 1
assert nums == list(range(10))
statespace = {'doggy': 10, 'kitty': 4}
# NOTE: this func must be defined at module level in order for the
# internal pickling infra of the forkserver to work
async def spawn(is_arbiter):
statespace = {'doggy': 10, 'kitty': 4}
namespaces = ['piker.brokers.core']
namespaces = [__name__]
await trio.sleep(0.1)
actor = tractor.current_actor()
@ -72,22 +91,33 @@ async def spawn(is_arbiter):
)
assert len(nursery._children) == 1
assert portal.channel.uid in tractor.current_actor()._peers
# be sure we can still get the result
result = await portal.result()
assert result == 10
return result
else:
return 10
def test_local_arbiter_subactor_global_state():
statespace = {'doggy': 10, 'kitty': 4}
tractor.run(
result = tractor.run(
spawn,
True,
name='arbiter',
statespace=statespace,
arbiter_addr=_arb_addr,
)
assert result == 10
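# For context (inferred from ``Actor.__init__`` in this same commit, not shown
# in the test): ``statespace`` is stored on the spawned actor as per-process
# "global" state, so inside ``spawn()`` it can be checked roughly as:
#
#   assert tractor.current_actor().statespace == statespace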
async def rx_price_quotes_from_brokerd(us_symbols):
"""Verify we can spawn a daemon actor and retrieve streamed price data.
async def stream_seq(sequence):
for i in sequence:
yield i
await trio.sleep(0.1)
async def stream_from_single_subactor():
"""Verify we can spawn a daemon actor and retrieve streamed data.
"""
async with tractor.find_actor('brokerd') as portals:
if not portals:
@ -95,49 +125,341 @@ async def rx_price_quotes_from_brokerd(us_symbols):
async with tractor.open_nursery() as nursery:
# no brokerd actor found
portal = await nursery.start_actor(
'brokerd',
rpc_module_paths=['piker.brokers.core'],
statespace={
'brokers2tickersubs': {},
'clients': {},
'dtasks': set()
},
main=None, # don't start a main func - use rpc
'streamerd',
rpc_module_paths=[__name__],
statespace={'global_dict': {}},
# don't start a main func - use rpc
# currently the same as outlive_main=False
main=None,
)
# gotta expose in a broker agnostic way...
# retrieve initial symbol data
# sd = await portal.run(
# 'piker.brokers.core', 'symbol_data', symbols=us_symbols)
# assert list(sd.keys()) == us_symbols
seq = range(10)
gen = await portal.run(
'piker.brokers.core',
'_test_price_stream',
broker='robinhood',
symbols=us_symbols,
agen = await portal.run(
__name__,
'stream_seq', # the func above
sequence=list(seq), # has to be msgpack serializable
)
# it'd sure be nice to have an asyncitertools here...
async for quotes in gen:
assert quotes
for key in quotes:
assert key in us_symbols
break
iseq = iter(seq)
async for val in agen:
assert val == next(iseq)
# TODO: test breaking the loop (should it kill the
# far end?)
# break
# terminate far-end async-gen
# await gen.asend(None)
# break
# stop all spawned subactors
await portal.cancel_actor()
# await nursery.cancel()
def test_stream_from_single_subactor():
"""Verify streaming from a spawned async generator.
"""
tractor.run(stream_from_single_subactor, arbiter_addr=_arb_addr)
async def assert_err():
assert 0
def test_remote_error():
"""Verify an error raises in a subactor is propagated to the parent.
"""
async def main():
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor('errorer', main=assert_err)
# get result(s) from main task
try:
return await portal.result()
except tractor.RemoteActorError:
print("Look Maa that actor failed hard, hehh")
raise
except Exception:
pass
assert 0, "Remote error was not raised?"
with pytest.raises(tractor.RemoteActorError):
# also raises
tractor.run(main, arbiter_addr=_arb_addr)
async def stream_forever():
for i in repeat("I can see these little future bubble things"):
yield i
await trio.sleep(0.01)
@tractor_test
async def test_cancel_infinite_streamer():
# stream for at most 5 seconds
with trio.move_on_after(1) as cancel_scope:
async with tractor.open_nursery() as n:
portal = await n.start_actor(
f'donny',
rpc_module_paths=[__name__],
outlive_main=True
)
async for letter in await portal.run(__name__, 'stream_forever'):
print(letter)
assert cancel_scope.cancelled_caught
assert n.cancelled
@tractor_test
async def test_one_cancels_all():
"""Verify one failed actor causes all others in the nursery
to be cancelled just like in trio.
This is the first and only supervisory strategy at the moment.
"""
try:
async with tractor.open_nursery() as n:
real_actors = []
for i in range(3):
real_actors.append(await n.start_actor(
f'actor_{i}',
rpc_module_paths=[__name__],
outlive_main=True
))
# start one actor that will fail immediately
await n.start_actor('extra', main=assert_err)
# should error here with a ``RemoteActorError`` containing
# an ``AssertionError``
except tractor.RemoteActorError:
assert n.cancelled is True
else:
pytest.fail("Should have gotten a remote assertion error?")
the_line = 'Hi my name is {}'
async def hi():
return the_line.format(tractor.current_actor().name)
async def say_hello(other_actor):
await trio.sleep(0.4) # wait for other actor to spawn
async with tractor.find_actor(other_actor) as portal:
return await portal.run(__name__, 'hi')
@tractor_test
async def test_trynamic_trio():
"""Main tractor entry point, the "master" process (for now
acts as the "director").
"""
async with tractor.open_nursery() as n:
print("Alright... Action!")
donny = await n.start_actor(
'donny',
main=partial(say_hello, 'gretchen'),
rpc_module_paths=[__name__],
outlive_main=True
)
gretchen = await n.start_actor(
'gretchen',
main=partial(say_hello, 'donny'),
rpc_module_paths=[__name__],
)
print(await gretchen.result())
print(await donny.result())
await donny.cancel_actor()
print("CUTTTT CUUTT CUT!!?! Donny!! You're supposed to say...")
def movie_theatre_question():
"""A question asked in a dark theatre, in a tangent
(errr, I mean different) process.
"""
return 'have you ever seen a portal?'
@tractor_test
async def test_movie_theatre_convo():
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'frank',
# enable the actor to run funcs from this current module
rpc_module_paths=[__name__],
outlive_main=True,
)
print(await portal.run(__name__, 'movie_theatre_question'))
# calls the subactor a 2nd time
print(await portal.run(__name__, 'movie_theatre_question'))
# the async with will block here indefinitely waiting
# for our actor "frank" to complete, but since it's an
# "outlive_main" actor it will never end until cancelled
await portal.cancel_actor()
@tractor_test
async def test_movie_theatre_convo_main_task():
async with tractor.open_nursery() as n:
portal = await n.start_actor('some_linguist', main=cellar_door)
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.
print(await portal.result())
def cellar_door():
return "Dang that's beautiful"
@tractor_test
async def test_most_beautiful_word():
"""The main ``tractor`` routine.
"""
async with tractor.open_nursery() as n:
portal = await n.start_actor('some_linguist', main=cellar_door)
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.
print(await portal.result())
def do_nothing():
pass
def test_cancel_single_subactor():
async def main():
async with tractor.open_nursery() as nursery:
portal = await nursery.start_actor(
'nothin', rpc_module_paths=[__name__],
)
assert (await portal.run(__name__, 'do_nothing')) is None
# would hang otherwise
await nursery.cancel()
# arbiter is cancelled here due to `find_actor()` internals
# (which internally uses `get_arbiter` which kills its channel
# server scope on exit)
tractor.run(main, arbiter_addr=_arb_addr)
def test_rx_price_quotes_from_brokerd(us_symbols):
tractor.run(
rx_price_quotes_from_brokerd,
us_symbols,
name='arbiter',
async def stream_data(seed):
for i in range(seed):
yield i
await trio.sleep(0) # trigger scheduler
async def aggregate(seed):
"""Ensure that the two streams we receive match but only stream
a single set of values to the parent.
"""
async with tractor.open_nursery() as nursery:
portals = []
for i in range(1, 3):
# fork point
portal = await nursery.start_actor(
name=f'streamer_{i}',
rpc_module_paths=[__name__],
outlive_main=True, # daemonize these actors
)
portals.append(portal)
q = trio.Queue(500)
async def push_to_q(portal):
async for value in await portal.run(
__name__, 'stream_data', seed=seed
):
# leverage trio's built-in backpressure
await q.put(value)
await q.put(None)
print(f"FINISHED ITERATING {portal.channel.uid}")
# spawn 2 trio tasks to collect streams and push to a local queue
async with trio.open_nursery() as n:
for portal in portals:
n.start_soon(push_to_q, portal)
unique_vals = set()
async for value in q:
if value not in unique_vals:
unique_vals.add(value)
# yield upwards to the spawning parent actor
yield value
if value is None:
break
assert value in unique_vals
print("FINISHED ITERATING in aggregator")
await nursery.cancel()
print("WAITING on `ActorNursery` to finish")
print("AGGREGATOR COMPLETE!")
# @tractor_test
async def a_quadruple_example():
# a nursery which spawns "actors"
async with tractor.open_nursery() as nursery:
seed = int(1e3)
pre_start = time.time()
portal = await nursery.start_actor(
name='aggregator',
# executed in the actor's "main task" immediately
main=partial(aggregate, seed),
)
start = time.time()
# the portal call returns exactly what you'd expect
# as if the remote "main" function was called locally
result_stream = []
async for value in await portal.result():
result_stream.append(value)
print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
assert result_stream == list(range(seed)) + [None]
return result_stream
async def cancel_after(wait):
with trio.move_on_after(wait):
return await a_quadruple_example()
def test_a_quadruple_example():
"""Verify the *show me the code* readme example works.
"""
results = tractor.run(cancel_after, 2, arbiter_addr=_arb_addr)
assert results
def test_not_fast_enough_quad():
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
This also serves as a kind of "we'd like to eventually be this fast" test.
"""
results = tractor.run(cancel_after, 1, arbiter_addr=_arb_addr)
assert results is None

tractor/__init__.py

@ -1,5 +1,6 @@
"""
tracor: An actor model micro-framework.
tractor: An actor model micro-framework built on
``trio`` and ``multiprocessing``.
"""
from collections import defaultdict
from functools import partial
@ -13,7 +14,7 @@ import uuid
import trio
from async_generator import asynccontextmanager
from .ipc import Channel
from .ipc import Channel, _connect_chan
from .log import get_console_log, get_logger
ctx = mp.get_context("forkserver")
@ -23,6 +24,11 @@ log = get_logger('tractor')
_current_actor = None
_default_arbiter_host = '127.0.0.1'
_default_arbiter_port = 1616
_default_loglevel = None
def get_loglevel():
return _default_loglevel
class ActorFailure(Exception):
@ -55,10 +61,17 @@ async def _invoke(
"""
try:
is_async_partial = False
is_async_gen_partial = False
if isinstance(func, partial):
is_async_partial = inspect.iscoroutinefunction(func.func)
is_async_gen_partial = inspect.isasyncgenfunction(func.func)
if not inspect.iscoroutinefunction(func) and not is_async_partial:
if (
not inspect.iscoroutinefunction(func) and
not inspect.isasyncgenfunction(func) and
not is_async_partial and
not is_async_gen_partial
):
await chan.send({'return': func(**kwargs), 'cid': cid})
else:
coro = func(**kwargs)
@ -73,6 +86,12 @@ async def _invoke(
# if to_send is not None:
# to_yield = await coro.asend(to_send)
await chan.send({'yield': item, 'cid': cid})
log.debug(f"Finished iterating {coro}")
# TODO: we should really support a proper
# `StopAsyncIteration` system here for returning a final
# value if desired
await chan.send({'stop': None, 'cid': cid})
else:
if treat_as_gen:
# XXX: the async-func may spawn further tasks which push
@ -87,11 +106,12 @@ async def _invoke(
except Exception:
if not raise_errs:
await chan.send({'error': traceback.format_exc(), 'cid': cid})
log.exception("Actor errored:")
else:
raise
async def result_from_q(q):
async def result_from_q(q, chan):
"""Process a msg from a remote actor.
"""
first_msg = await q.get()
@ -100,7 +120,7 @@ async def result_from_q(q):
elif 'yield' in first_msg:
return 'yield', first_msg, q
elif 'error' in first_msg:
raise RemoteActorError(first_msg['error'])
raise RemoteActorError(f"{chan.uid}\n" + first_msg['error'])
else:
raise ValueError(f"{first_msg} is an invalid response packet?")
@ -136,6 +156,8 @@ class Actor:
uid: str = None,
allow_rpc: bool = True,
outlive_main: bool = False,
loglevel: str = None,
arbiter_addr: (str, int) = None,
):
self.name = name
self.uid = (name, uid or str(uuid.uuid1()))
@ -147,10 +169,15 @@ class Actor:
self.statespace = statespace
self._allow_rpc = allow_rpc
self._outlive_main = outlive_main
self.loglevel = loglevel
self._arb_addr = arbiter_addr
# filled in by `_async_main` after fork
self._peers = defaultdict(list)
self._peer_connected = {}
self._no_more_peers = trio.Event()
self._main_complete = trio.Event()
self._main_scope = None
self._no_more_peers.set()
self._actors2calls = {} # map {uids -> {callids -> waiter queues}}
self._listeners = []
@ -162,7 +189,7 @@ class Actor:
``uid``.
"""
log.debug(f"Waiting for peer {uid} to connect")
event = self._peers.setdefault(uid, trio.Event())
event = self._peer_connected.setdefault(uid, trio.Event())
await event.wait()
log.debug(f"{uid} successfully connected back to us")
return event, self._peers[uid][-1]
@ -194,23 +221,22 @@ class Actor:
return
# channel tracking
event_or_chans = self._peers.pop(uid, None)
if isinstance(event_or_chans, trio.Event):
event = self._peer_connected.pop(uid, None)
if event:
# Instructing connection: this is likely a new channel to
# a recently spawned actor which we'd like to control via
# async-rpc calls.
log.debug(f"Waking channel waiters {event_or_chans.statistics()}")
log.debug(f"Waking channel waiters {event.statistics()}")
# Alert any task waiting on this connection to come up
event_or_chans.set()
event_or_chans.clear() # consumer can wait on channel to close
elif isinstance(event_or_chans, list):
log.warn(
f"already have channel(s) for {uid}:{event_or_chans}?"
)
# append new channel
self._peers[uid].extend(event_or_chans)
event.set()
chans = self._peers[uid]
if chans:
log.warn(
f"already have channel(s) for {uid}:{chans}?"
)
log.debug(f"Registered {chan} for {uid}")
# append new channel
self._peers[uid].append(chan)
# Begin channel management - respond to remote requests and
@ -219,29 +245,40 @@ class Actor:
await self._process_messages(chan)
finally:
# Drop ref to channel so it can be gc-ed and disconnected
if chan is not self._parent_chan:
log.debug(f"Releasing channel {chan}")
log.debug(f"Releasing channel {chan} from {chan.uid}")
chans = self._peers.get(chan.uid)
chans.remove(chan)
if not chans:
log.debug(f"No more channels for {chan.uid}")
self._peers.pop(chan.uid, None)
if not self._actors2calls.get(chan.uid, {}).get('main'):
# fake a "main task" result for any waiting
# nurseries/portals
log.debug(f"Faking result for {chan} from {chan.uid}")
q = self.get_waitq(chan.uid, 'main')
q.put_nowait({'return': None, 'cid': 'main'})
log.debug(f"Peers is {self._peers}")
if not self._peers: # no more channels connected
self._no_more_peers.set()
log.debug(f"No more peer channels")
log.debug(f"Signalling no more peer channels")
def _push_result(self, actorid, cid, msg):
# XXX: is this necessary?
if chan.connected():
log.debug(f"Disconnecting channel {chan}")
await chan.send(None)
await chan.aclose()
async def _push_result(self, actorid, cid, msg):
assert actorid, f"`actorid` can't be {actorid}"
q = self.get_waitq(actorid, cid)
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
waiters = q.statistics().tasks_waiting_get
if not waiters:
log.warn(
f"No tasks are currently waiting for results from call {cid}?")
q.put_nowait(msg)
# maintain backpressure
await q.put(msg)
def get_waitq(self, actorid, cid):
log.debug(f"Registering for callid {cid} queue results from {actorid}")
log.debug(f"Getting result queue for {actorid} cid {cid}")
cids2qs = self._actors2calls.setdefault(actorid, {})
return cids2qs.setdefault(cid, trio.Queue(1000))
@ -263,23 +300,24 @@ class Actor:
"""
# TODO: once https://github.com/python-trio/trio/issues/467 gets
# worked out we'll likely want to use that!
log.debug(f"Entering msg loop for {chan}")
log.debug(f"Entering msg loop for {chan} from {chan.uid}")
async with trio.open_nursery() as nursery:
try:
async for msg in chan.aiter_recv():
if msg is None: # terminate sentinel
log.debug(f"Cancelling all tasks for {chan}")
log.debug(
f"Cancelling all tasks for {chan} from {chan.uid}")
nursery.cancel_scope.cancel()
log.debug(f"Terminating msg loop for {chan}")
log.debug(
f"Msg loop signalled to terminate for"
f" {chan} from {chan.uid}")
break
log.debug(f"Received msg {msg}")
log.debug(f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid')
if cid: # deliver response to local caller/waiter
self._push_result(chan.uid, cid, msg)
if 'error' in msg:
# TODO: need something better then this slop
raise RemoteActorError(msg['error'])
log.debug(f"Waiting on next msg for {chan}")
await self._push_result(chan.uid, cid, msg)
log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
else:
ns, funcname, kwargs, actorid, cid = msg['cmd']
@ -312,22 +350,24 @@ class Actor:
_invoke, cid, chan, func, kwargs, treat_as_gen,
name=funcname
)
log.debug(f"Waiting on next msg for {chan}")
log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
else: # channel disconnect
log.debug(f"{chan} disconnected")
log.debug(f"{chan} from {chan.uid} disconnected")
except trio.ClosedStreamError:
log.error(f"{chan} broke")
log.error(f"{chan} form {chan.uid} broke")
log.debug(f"Exiting msg loop for {chan}")
log.debug(f"Exiting msg loop for {chan} from {chan.uid}")
def _fork_main(self, accept_addr, parent_addr=None, loglevel=None):
def _fork_main(self, accept_addr, parent_addr=None):
# after fork routine which invokes a fresh ``trio.run``
# log.warn("Log level after fork is {self.loglevel}")
if self.loglevel is not None:
get_console_log(self.loglevel)
log.info(
f"Started new {ctx.current_process()} for actor {self.uid}")
global _current_actor
_current_actor = self
if loglevel:
get_console_log(loglevel)
log.debug(f"parent_addr is {parent_addr}")
try:
trio.run(partial(
@ -339,16 +379,19 @@ class Actor:
async def _async_main(
self,
accept_addr,
arbiter_addr=(_default_arbiter_host, _default_arbiter_port),
arbiter_addr=None,
parent_addr=None,
nursery=None
):
"""Start the channel server and main task.
"""Start the channel server, maybe connect back to the parent, and
start the main task.
A "root-most" (or "top-level") nursery for this actor is opened here
and when cancelled effectively cancels the actor.
"""
result = None
arbiter_addr = arbiter_addr or self._arb_addr
registered_with_arbiter = False
try:
async with maybe_open_nursery(nursery) as nursery:
self._root_nursery = nursery
@ -359,7 +402,14 @@ class Actor:
self._serve_forever, accept_host=host, accept_port=port)
)
# XXX: I wonder if a better name is maybe "requester"
# since I don't think the notion of a "parent" actor
# necessarily sticks given that eventually we want
# ``'MainProcess'`` (the actor who initially starts the
# forkserver) to eventually be the only one who is
# allowed to spawn new processes per Python program.
if parent_addr is not None:
try:
# Connect back to the parent actor and conduct initial
# handshake (From this point on if we error ship the
# exception back to the parent actor)
@ -370,6 +420,20 @@ class Actor:
await chan.connect()
# initial handshake, report who we are, who they are
await _do_handshake(self, chan)
except OSError: # failed to connect
log.warn(
f"Failed to connect to parent @ {parent_addr},"
" closing server")
self.cancel_server()
self._parent_chan = None
# register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`")
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
name=self.name, sockaddr=self.accept_addr)
registered_with_arbiter = True
# handle new connection back to parent optionally
# begin responding to RPC
@ -379,57 +443,69 @@ class Actor:
nursery.start_soon(
self._process_messages, self._parent_chan)
# register with the arbiter if we're told its addr
log.debug(f"Registering {self} for role `{self.name}`")
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
name=self.name, sockaddr=self.accept_addr)
if self.main:
with trio.open_cancel_scope() as main_scope:
self._main_scope = main_scope
try:
if self._parent_chan:
log.debug(f"Starting main task `{self.main}`")
# start "main" routine in a task
# spawned subactor so deliver "main"
# task result(s) back to parent
await nursery.start(
_invoke, 'main', self._parent_chan, self.main, {},
False, True # treat_as_gen, raise_errs params
_invoke, 'main',
self._parent_chan, self.main, {},
# treat_as_gen, raise_errs params
False, True
)
else:
# run directly
# run directly - we are an "unspawned actor"
log.debug(f"Running `{self.main}` directly")
result = await self.main()
# terminate local in-proc once its main completes
log.debug(
f"Waiting for remaining peers {self._peers} to clear")
await self._no_more_peers.wait()
log.debug(f"All peer channels are complete")
# tear down channel server
finally:
self._main_complete.set()
# tear down channel server in order to ensure
# we exit normally when the main task is done
if not self._outlive_main:
log.debug(f"Shutting down channel server")
self.cancel_server()
log.debug(f"Shutting down root nursery")
nursery.cancel_scope.cancel()
if main_scope.cancelled_caught:
log.debug("Main task was cancelled sucessfully")
log.debug("Waiting on root nursery to complete")
# blocks here as expected if no nursery was provided until
# the channel server is killed (i.e. this actor is
# cancelled or signalled by the parent actor)
except Exception:
if self._parent_chan:
log.exception("Actor errored:")
try:
await self._parent_chan.send(
{'error': traceback.format_exc(), 'cid': 'main'})
except trio.ClosedStreamError:
log.error(
f"Failed to ship error to parent "
f"{self._parent_chan.uid}, channel was closed")
log.exception("Actor errored:")
if not registered_with_arbiter:
log.exception(
f"Failed to register with arbiter @ {arbiter_addr}")
else:
raise
finally:
# UNregister actor from the arbiter
try:
if arbiter_addr is not None:
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'register_actor',
name=self.name, sockaddr=self.accept_addr)
except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter")
await self._do_unreg(arbiter_addr)
# terminate actor once all it's peers (actors that connected
# to it as clients) have disappeared
if not self._no_more_peers.is_set():
log.debug(
f"Waiting for remaining peers {self._peers} to clear")
await self._no_more_peers.wait()
log.debug(f"All peer channels are complete")
# tear down channel server no matter what since we errored
# or completed
log.debug(f"Shutting down channel server")
self.cancel_server()
return result
@ -441,9 +517,10 @@ class Actor:
accept_port=0,
task_status=trio.TASK_STATUS_IGNORED
):
"""Main coroutine: connect back to the parent, spawn main task, begin
listening for new messages.
"""Start the channel server, begin listening for new connections.
This will cause an actor to continue living (blocking) until
``cancel_server()`` is called.
"""
async with trio.open_nursery() as nursery:
self._server_nursery = nursery
@ -454,6 +531,8 @@ class Actor:
partial(
trio.serve_tcp,
self._stream_handler,
# new connections will stay alive even if this server
# is cancelled
handler_nursery=self._root_nursery,
port=accept_port, host=accept_host,
)
@ -463,10 +542,25 @@ class Actor:
self._listeners.extend(listeners)
task_status.started()
def cancel(self):
async def _do_unreg(self, arbiter_addr):
# UNregister actor from the arbiter
try:
if arbiter_addr is not None:
async with get_arbiter(*arbiter_addr) as arb_portal:
await arb_portal.run(
'self', 'unregister_actor', name=self.name)
except OSError:
log.warn(f"Unable to unregister {self.name} from arbiter")
async def cancel(self):
"""This cancels the internal root-most nursery thereby gracefully
cancelling (for all intents and purposes) this actor.
"""
self.cancel_server()
if self._main_scope:
self._main_scope.cancel()
log.debug("Waiting on main task to complete")
await self._main_complete.wait()
self._root_nursery.cancel_scope.cancel()
def cancel_server(self):
@ -509,13 +603,8 @@ class Arbiter(Actor):
def register_actor(self, name, sockaddr):
self._registry[name].append(sockaddr)
def unregister_actor(self, name, sockaddr):
sockaddrs = self._registry.get(name)
if sockaddrs:
try:
sockaddrs.remove(sockaddr)
except ValueError:
pass
def unregister_actor(self, name):
self._registry.pop(name, None)
class Portal:
@ -529,6 +618,7 @@ class Portal:
"""
def __init__(self, channel):
self.channel = channel
self._result = None
async def aclose(self):
log.debug(f"Closing {self}")
@ -543,13 +633,14 @@ class Portal:
# TODO: note this needs some serious work and thinking about how
# to make async-generators the fundamental IPC API over channels!
# (think `yield from`, `gen.send()`, and functional reactive stuff)
chan = self.channel
# ship a function call request to the remote actor
actor = current_actor()
# ship a function call request to the remote actor
cid, q = await actor.send_cmd(self.channel, ns, func, kwargs)
# wait on first response msg and handle
return await self._return_from_resptype(
cid, *(await result_from_q(q, self.channel)))
cid, q = await actor.send_cmd(chan, ns, func, kwargs)
# wait on first response msg
resptype, first_msg, q = await result_from_q(q)
async def _return_from_resptype(self, cid, resptype, first_msg, q):
if resptype == 'yield':
@ -560,9 +651,14 @@ class Portal:
try:
yield msg['yield']
except KeyError:
if 'stop' in msg:
break # far end async gen terminated
else:
raise RemoteActorError(msg['error'])
except GeneratorExit:
log.debug(f"Cancelling async gen call {cid} to {chan.uid}")
log.debug(
f"Cancelling async gen call {cid} to "
f"{self.channel.uid}")
return yield_from_q()
@ -571,12 +667,49 @@ class Portal:
else:
raise ValueError(f"Unknown msg response type: {first_msg}")
async def result(self):
"""Return the result(s) from the remote actor's "main" task.
"""
if self._result is None:
q = current_actor().get_waitq(self.channel.uid, 'main')
resptype, first_msg, q = (await result_from_q(q, self.channel))
self._result = await self._return_from_resptype(
'main', resptype, first_msg, q)
log.warn(
f"Retrieved first result `{self._result}` "
f"for {self.channel.uid}")
# await q.put(first_msg) # for next consumer (e.g. nursery)
return self._result
async def close(self):
# trigger remote msg loop `break`
chan = self.channel
log.debug(f"Closing portal for {chan} to {chan.uid}")
await self.channel.send(None)
async def cancel_actor(self):
"""Cancel the actor on the other end of this portal.
"""
log.warn(
f"Sending cancel request to {self.channel.uid} on "
f"{self.channel}")
try:
with trio.move_on_after(0.1) as cancel_scope:
cancel_scope.shield = True
# send cancel cmd - might not get response
await self.run('self', 'cancel')
return True
except trio.ClosedStreamError:
log.warn(
f"{self.channel} for {self.channel.uid} was already closed?")
return False
@asynccontextmanager
async def open_portal(channel, nursery=None):
"""Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle rpc message processing.
Spawns a background task to handle message processing.
"""
actor = current_actor()
assert actor
@ -591,17 +724,17 @@ async def open_portal(channel, nursery=None):
if channel.uid is None:
await _do_handshake(actor, channel)
if not actor.get_chans(channel.uid):
# actor is not currently managing this channel
actor._peers[channel.uid].append(channel)
nursery.start_soon(actor._process_messages, channel)
yield Portal(channel)
portal = Portal(channel)
yield portal
# cancel remote channel-msg loop
if channel.connected():
await portal.close()
# cancel background msg loop task
nursery.cancel_scope.cancel()
if was_connected:
actor._peers[channel.uid].remove(channel)
await channel.aclose()
@ -626,11 +759,12 @@ class ActorNursery:
"""Spawn scoped subprocess actors.
"""
def __init__(self, actor, supervisor=None):
self.supervisor = supervisor
self.supervisor = supervisor # TODO
self._actor = actor
# We'll likely want some way to cancel all sub-actors eventually
# self.cancel_scope = cancel_scope
self._children = {}
self.cancelled = False
async def __aenter__(self):
return self
@ -643,56 +777,81 @@ class ActorNursery:
statespace=None,
rpc_module_paths=None,
outlive_main=False, # sub-actors die when their main task completes
loglevel=None, # set console logging per subactor
loglevel=None, # set log level per subactor
):
loglevel = loglevel or self._actor.loglevel or get_loglevel()
actor = Actor(
name,
# modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths,
rpc_module_paths=rpc_module_paths or [],
statespace=statespace, # global proc state vars
main=main, # main coroutine to be invoked
outlive_main=outlive_main,
loglevel=loglevel,
arbiter_addr=current_actor()._arb_addr,
)
parent_addr = self._actor.accept_addr
assert parent_addr
proc = ctx.Process(
target=actor._fork_main,
args=(bind_addr, parent_addr, loglevel),
daemon=True,
args=(bind_addr, parent_addr),
# daemon=True,
name=name,
)
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
log.info(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await self._actor.wait_for_peer(actor.uid)
# channel is up, get queue which delivers result from main routine
main_q = self._actor.get_waitq(actor.uid, 'main')
self._children[(name, proc.pid)] = (actor, proc, main_q)
return Portal(chan)
portal = Portal(chan)
self._children[(name, proc.pid)] = (actor, proc, portal)
return portal
async def wait(self):
async def wait_for_proc(proc):
"""Wait for all subactors to complete.
"""
async def wait_for_proc(proc, actor, portal):
# TODO: timeout block here?
if proc.is_alive():
await trio.hazmat.wait_readable(proc.sentinel)
# please god don't hang
proc.join()
log.debug(f"Joined {proc}")
event = self._actor._peers.get(actor.uid)
if isinstance(event, trio.Event):
event.set()
log.warn(
f"Cancelled `wait_for_peer()` call since {actor.uid}"
f" is already dead!")
if not portal._result:
log.debug(f"Faking result for {actor.uid}")
q = self._actor.get_waitq(actor.uid, 'main')
q.put_nowait({'return': None, 'cid': 'main'})
async def wait_for_result(portal):
if portal.channel.connected():
log.debug(f"Waiting on final result from {subactor.uid}")
await portal.result()
# unblocks when all waiter tasks have completed
async with trio.open_nursery() as nursery:
for subactor, proc, main_q in self._children.values():
nursery.start_soon(wait_for_proc, proc)
for subactor, proc, portal in self._children.values():
nursery.start_soon(wait_for_proc, proc, subactor, portal)
nursery.start_soon(wait_for_result, portal)
async def cancel(self, hard_kill=False):
"""Cancel this nursery by instructing each subactor to cancel
itself and wait for all subprocesses to terminate.
If ``hard_kill`` is set to ``True`` then kill the processes
directly without any far end graceful ``trio`` cancellation.
"""
log.debug(f"Cancelling nursery")
for subactor, proc, main_q in self._children.values():
for subactor, proc, portal in self._children.values():
if proc is mp.current_process():
# XXX: does this even make sense?
await subactor.cancel()
@ -700,48 +859,40 @@ class ActorNursery:
if hard_kill:
log.warn(f"Hard killing subactors {self._children}")
proc.terminate()
# send KeyBoardInterrupt (trio abort signal) to underlying
# sub-actors
# XXX: doesn't seem to work?
# send KeyboardInterrupt (trio abort signal) to sub-actors
# os.kill(proc.pid, signal.SIGINT)
else:
# send cancel cmd - likely no response from subactor
actor = self._actor
chans = actor.get_chans(subactor.uid)
if chans:
for chan in chans:
await actor.send_cmd(chan, 'self', 'cancel', {})
else:
log.warn(
f"Channel for {subactor.uid} is already down?")
await portal.cancel_actor()
log.debug(f"Waiting on all subactors to complete")
await self.wait()
self.cancelled = True
log.debug(f"All subactors for {self} have terminated")
async def __aexit__(self, etype, value, tb):
"""Wait on all subactor's main routines to complete.
"""
async def wait_for_actor(actor, proc, q):
if proc.is_alive():
ret_type, msg, q = await result_from_q(q)
log.info(f"{actor.uid} main task completed with {msg}")
if not actor._outlive_main:
# trigger msg loop to break
chans = self._actor.get_chans(actor.uid)
for chan in chans:
log.info(f"Signalling msg loop exit for {actor.uid}")
await chan.send(None)
if etype is not None:
log.warn(f"{current_actor().uid} errored with {etype}, "
if etype is trio.Cancelled:
log.warn(f"{current_actor().uid} was cancelled with {etype}, "
"cancelling actor nursery")
with trio.open_cancel_scope(shield=True):
await self.cancel()
else:
log.exception(f"{current_actor().uid} errored with {etype}, "
"cancelling actor nursery")
await self.cancel()
else:
log.debug(f"Waiting on subactors to complete")
async with trio.open_nursery() as nursery:
for subactor, proc, main_q in self._children.values():
nursery.start_soon(wait_for_actor, subactor, proc, main_q)
# XXX: this is effectively the lone cancellation/supervisor
# strategy which exactly mimics trio's behaviour
log.debug(f"Waiting on subactors {self._children} to complete")
try:
await self.wait()
except Exception as err:
log.warn(f"Nursery caught {err}, cancelling")
await self.cancel()
raise
log.debug(f"Nursery teardown complete")
@ -752,7 +903,7 @@ def current_actor() -> Actor:
@asynccontextmanager
async def open_nursery(supervisor=None, loglevel='WARNING'):
async def open_nursery(supervisor=None):
"""Create and yield a new ``ActorNursery``.
"""
actor = current_actor()
@ -768,7 +919,7 @@ class NoArbiterFound(Exception):
"Couldn't find the arbiter?"
async def start_actor(actor, host, port, arbiter_addr, nursery=None):
async def _start_actor(actor, host, port, arbiter_addr, nursery=None):
"""Spawn a local actor by starting a task to execute it's main
async function.
@ -783,7 +934,7 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None):
# NOTE: this won't block since we provide the nursery
log.info(f"Starting local {actor} @ {host}:{port}")
await actor._async_main(
result = await actor._async_main(
accept_addr=(host, port),
parent_addr=None,
arbiter_addr=arbiter_addr,
@ -799,16 +950,7 @@ async def start_actor(actor, host, port, arbiter_addr, nursery=None):
_current_actor = None
log.info("Completed async main")
@asynccontextmanager
async def _connect_chan(host, port):
"""Attempt to connect to an arbiter's channel server.
Return the channel on success or None on failure.
"""
chan = Channel((host, port))
await chan.connect()
yield chan
await chan.aclose()
return result
@asynccontextmanager
@ -833,28 +975,28 @@ async def get_arbiter(host, port):
@asynccontextmanager
async def find_actor(
name,
arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port)
arbiter_sockaddr=None,
):
"""Ask the arbiter to find actor(s) by name.
Returns a sequence of unconnected portals for each matching actor
known to the arbiter (client code is expected to connect the portals).
Returns a connected portal to the last registered matching actor
known to the arbiter.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
async with get_arbiter(*arbiter_sockaddr) as arb_portal:
async with get_arbiter(*arbiter_sockaddr or actor._arb_addr) as arb_portal:
sockaddrs = await arb_portal.run('self', 'find_actor', name=name)
# TODO: return portals to all available actors - for now just
# the first one we find
# the last one that registered
if sockaddrs:
sockaddr = sockaddrs[-1]
async with _connect_chan(*sockaddr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
yield
yield None
async def _main(async_fn, args, kwargs, name, arbiter_addr):
@ -863,6 +1005,8 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
main = partial(async_fn, *args) if async_fn else None
arbiter_addr = (host, port) = arbiter_addr or (
_default_arbiter_host, _default_arbiter_port)
get_console_log(kwargs.get('loglevel', get_loglevel()))
# make a temporary connection to see if an arbiter exists
arbiter_found = False
try:
@ -871,27 +1015,37 @@ async def _main(async_fn, args, kwargs, name, arbiter_addr):
except OSError:
log.warn(f"No actor could be found @ {host}:{port}")
# create a local actor and start up its main routine/task
if arbiter_found: # we were able to connect to an arbiter
log.info(f"Arbiter seems to exist @ {host}:{port}")
# create a local actor and start up its main routine/task
actor = Actor(
name or 'anonymous',
main=main,
arbiter_addr=arbiter_addr,
**kwargs
)
host, port = (_default_arbiter_host, 0)
host, port = (host, 0)
else:
# start this local actor as the arbiter
actor = Arbiter(name or 'arbiter', main=main, **kwargs)
# this should eventually get passed `outlive_main=True`?
actor = Arbiter(
name or 'arbiter', main=main, arbiter_addr=arbiter_addr, **kwargs)
await start_actor(actor, host, port, arbiter_addr=arbiter_addr)
# Creates an internal nursery which shouldn't be cancelled even if
# the one opened below is (this is desirable because the arbiter should
# stay up until a re-election process has taken place - which is not
# implemented yet FYI).
# ``Actor._async_main()`` creates an internal nursery if one is not
# provided and thus blocks here until its main task completes.
# Note that if the current actor is the arbiter it is desirable
# for it to stay up indefinitely until a re-election process has
# taken place - which is not implemented yet, FYI.
return await _start_actor(actor, host, port, arbiter_addr=arbiter_addr)
def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs):
def run(
async_fn,
*args,
name=None,
arbiter_addr=(_default_arbiter_host, _default_arbiter_port),
**kwargs
):
"""Run a trio-actor async function in process.
This is tractor's main entry and the start point for any async actor.
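# Illustrative entry point usage (not part of this commit): ``run()`` blocks
# until the actor tree completes and returns the final result, roughly:
#
#   import tractor
#
#   async def main():
#       async with tractor.open_nursery() as n:
#           ...   # spawn sub-actors here
#
#   result = tractor.run(main, name='root', arbiter_addr=('127.0.0.1', 1616))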

tractor/ipc.py

@ -5,6 +5,7 @@ from typing import Coroutine, Tuple
import msgpack
import trio
from async_generator import asynccontextmanager
from .log import get_logger
log = get_logger('ipc')
@ -189,3 +190,14 @@ class Channel:
def connected(self):
return self.squeue.connected() if self.squeue else False
@asynccontextmanager
async def _connect_chan(host, port):
"""Create and connect a channel with disconnect on
context manager teardown.
"""
chan = Channel((host, port))
await chan.connect()
yield chan
await chan.aclose()
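# Illustrative usage (mirrors ``find_actor()`` in ``tractor/__init__.py``, not
# part of this commit): make a short-lived connection to a peer's channel
# server and wrap it in a portal:
#
#   async with _connect_chan('127.0.0.1', 1616) as chan:
#       async with open_portal(chan) as portal:
#           await portal.run('self', 'find_actor', name='some_actor')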