forked from goodboy/tractor
1
0
Fork 0

Merge pull request #56 from tgoodlet/trio_memchans

Use trio memory channels!
docs_example_fixes
goodboy 2019-02-20 21:24:47 -05:00 committed by GitHub
commit a927966170
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 367 additions and 199 deletions

View File

@ -371,16 +371,16 @@ and print the results to your screen:
portals.append(portal)
q = trio.Queue(500)
send_chan, recv_chan = trio.open_memory_channel(500)
async def push_to_q(portal):
async for value in await portal.run(
__name__, 'stream_data', seed=seed
):
# leverage trio's built-in backpressure
await q.put(value)
await send_chan.send(value)
await q.put(None)
await send_chan.send(None)
print(f"FINISHED ITERATING {portal.channel.uid}")
# spawn 2 trio tasks to collect streams and push to a local queue
@ -389,7 +389,7 @@ and print the results to your screen:
n.start_soon(push_to_q, portal)
unique_vals = set()
async for value in q:
async for value in recv_chan:
if value not in unique_vals:
unique_vals.add(value)
# yield upwards to the spawning parent actor

View File

@ -40,7 +40,7 @@ setup(
install_requires=[
'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt'],
tests_require=['pytest'],
python_requires=">=3.6",
python_requires=">=3.7",
keywords=[
"async", "concurrency", "actor model", "distributed",
'trio', 'multiprocessing'

View File

@ -70,7 +70,7 @@ async def test_cancel_remote_arbiter(daemon, arb_addr):
pass
async def test_register_duplicate_name(daemon, arb_addr):
def test_register_duplicate_name(daemon, arb_addr):
async def main():
assert not tractor.current_actor().is_arbiter
@ -85,4 +85,4 @@ async def test_register_duplicate_name(daemon, arb_addr):
# run it manually since we want to start **after**
# the other "daemon" program
tractor.run(main, arb_addr=arbiter_addr)
tractor.run(main, arbiter_addr=arb_addr)

View File

@ -4,7 +4,6 @@ from itertools import cycle
import pytest
import trio
import tractor
from async_generator import aclosing
from tractor.testing import tractor_test
@ -30,10 +29,10 @@ def is_even(i):
@tractor.msg.pub
async def pubber(get_topics):
async def pubber(get_topics, seed=10):
ss = tractor.current_actor().statespace
for i in cycle(range(10)):
for i in cycle(range(seed)):
# ensure topic subscriptions are as expected
ss['get_topics'] = get_topics
@ -42,7 +41,11 @@ async def pubber(get_topics):
await trio.sleep(0.1)
async def subs(which, pub_actor_name):
async def subs(
which, pub_actor_name, seed=10,
portal=None,
task_status=trio.TASK_STATUS_IGNORED,
):
if len(which) == 1:
if which[0] == 'even':
pred = is_even
@ -54,11 +57,43 @@ async def subs(which, pub_actor_name):
return isinstance(i, int)
async with tractor.find_actor(pub_actor_name) as portal:
agen = await portal.run(__name__, 'pubber', topics=which)
async with aclosing(agen) as agen:
async for pkt in agen:
stream = await portal.run(
__name__, 'pubber',
topics=which,
seed=seed,
)
task_status.started(stream)
times = 10
count = 0
await stream.__anext__()
async for pkt in stream:
for topic, value in pkt.items():
assert pred(value)
count += 1
if count >= times:
break
await stream.aclose()
stream = await portal.run(
__name__, 'pubber',
topics=['odd'],
seed=seed,
)
await stream.__anext__()
count = 0
# async with aclosing(stream) as stream:
try:
async for pkt in stream:
for topic, value in pkt.items():
pass
# assert pred(value)
count += 1
if count >= times:
break
finally:
await stream.aclose()
@tractor.msg.pub(tasks=['one', 'two'])
@ -76,7 +111,7 @@ async def multilock_pubber(get_topics):
(multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError),
# should work
(multilock_pubber,
{'ctx': None, 'topics': ['topic1'], 'task_name': 'one'},
{'ctx': None, 'topics': ['doggy'], 'task_name': 'one'},
None),
],
)
@ -91,7 +126,12 @@ async def test_required_args(callwith_expecterror):
async with tractor.open_nursery() as n:
# await func(**kwargs)
portal = await n.run_in_actor(
'sub', multilock_pubber, **kwargs)
'pubber', multilock_pubber, **kwargs)
async with tractor.wait_for_actor('pubber'):
pass
await trio.sleep(0.5)
async for val in await portal.result():
assert val == {'doggy': 10}
@ -101,7 +141,7 @@ async def test_required_args(callwith_expecterror):
'pub_actor',
['streamer', 'arbiter']
)
def test_pubsub_multi_actor_subs(
def test_multi_actor_subs_arbiter_pub(
loglevel,
arb_addr,
pub_actor,
@ -115,7 +155,7 @@ def test_pubsub_multi_actor_subs(
name = 'arbiter'
if pub_actor is 'streamer':
if pub_actor == 'streamer':
# start the publisher as a daemon
master_portal = await n.start_actor(
'streamer',
@ -131,7 +171,7 @@ def test_pubsub_multi_actor_subs(
# block until 2nd actor is initialized
pass
if pub_actor is 'arbiter':
if pub_actor == 'arbiter':
# wait for publisher task to be spawned in a local RPC task
while not ss.get('get_topics'):
await trio.sleep(0.1)
@ -144,7 +184,7 @@ def test_pubsub_multi_actor_subs(
# block until 2nd actor is initialized
pass
if pub_actor is 'arbiter':
if pub_actor == 'arbiter':
start = time.time()
while 'odd' not in get_topics():
await trio.sleep(0.1)
@ -166,13 +206,13 @@ def test_pubsub_multi_actor_subs(
await even_portal.cancel_actor()
await trio.sleep(0.5)
if pub_actor is 'arbiter':
if pub_actor == 'arbiter':
assert 'even' not in get_topics()
await odd_portal.cancel_actor()
await trio.sleep(1)
if pub_actor is 'arbiter':
if pub_actor == 'arbiter':
while get_topics():
await trio.sleep(0.1)
if time.time() - start > 1:
@ -185,3 +225,44 @@ def test_pubsub_multi_actor_subs(
arbiter_addr=arb_addr,
rpc_module_paths=[__name__],
)
def test_single_subactor_pub_multitask_subs(
loglevel,
arb_addr,
):
async def main():
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'streamer',
rpc_module_paths=[__name__],
)
async with tractor.wait_for_actor('streamer'):
# block until 2nd actor is initialized
pass
async with trio.open_nursery() as tn:
agen = await tn.start(subs, ['even'], 'streamer')
await trio.sleep(0.1)
tn.start_soon(subs, ['even'], 'streamer')
# XXX this will trigger the python bug:
# https://bugs.python.org/issue32526
# if using async generators to wrap tractor channels
await agen.aclose()
await trio.sleep(0.1)
tn.start_soon(subs, ['even'], 'streamer')
await trio.sleep(0.1)
tn.start_soon(subs, ['even'], 'streamer')
await portal.cancel_actor()
tractor.run(
main,
arbiter_addr=arb_addr,
rpc_module_paths=[__name__],
)

View File

@ -14,7 +14,7 @@ async def stream_seq(sequence):
await trio.sleep(0.1)
# block indefinitely waiting to be cancelled by ``aclose()`` call
with trio.open_cancel_scope() as cs:
with trio.CancelScope() as cs:
await trio.sleep(float('inf'))
assert 0
assert cs.cancelled_caught
@ -91,25 +91,25 @@ async def aggregate(seed):
portals.append(portal)
q = trio.Queue(500)
send_chan, recv_chan = trio.open_memory_channel(500)
async def push_to_q(portal):
async def push_to_chan(portal):
async for value in await portal.run(
__name__, 'stream_data', seed=seed
):
# leverage trio's built-in backpressure
await q.put(value)
await send_chan.send(value)
await q.put(None)
await send_chan.send(None)
print(f"FINISHED ITERATING {portal.channel.uid}")
# spawn 2 trio tasks to collect streams and push to a local queue
async with trio.open_nursery() as n:
for portal in portals:
n.start_soon(push_to_q, portal)
n.start_soon(push_to_chan, portal)
unique_vals = set()
async for value in q:
async for value in recv_chan:
if value not in unique_vals:
unique_vals.add(value)
# yield upwards to the spawning parent actor

View File

@ -73,7 +73,7 @@ async def _invoke(
not is_async_gen_partial
):
await chan.send({'functype': 'function', 'cid': cid})
with trio.open_cancel_scope() as cs:
with trio.CancelScope() as cs:
task_status.started(cs)
await chan.send({'return': func(**kwargs), 'cid': cid})
else:
@ -88,7 +88,7 @@ async def _invoke(
# have to properly handle the closing (aclosing)
# of the async gen in order to be sure the cancel
# is propagated!
with trio.open_cancel_scope() as cs:
with trio.CancelScope() as cs:
task_status.started(cs)
async with aclosing(coro) as agen:
async for item in agen:
@ -113,7 +113,7 @@ async def _invoke(
# back values like an async-generator would but must
# manualy construct the response dict-packet-responses as
# above
with trio.open_cancel_scope() as cs:
with trio.CancelScope() as cs:
task_status.started(cs)
await coro
if not cs.cancelled_caught:
@ -122,7 +122,7 @@ async def _invoke(
await chan.send({'stop': True, 'cid': cid})
else:
await chan.send({'functype': 'asyncfunction', 'cid': cid})
with trio.open_cancel_scope() as cs:
with trio.CancelScope() as cs:
task_status.started(cs)
await chan.send({'return': await coro, 'cid': cid})
except Exception as err:
@ -199,7 +199,9 @@ class Actor:
Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event]
] = {}
# map {uids -> {callids -> waiter queues}}
self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {}
self._cids2qs: Dict[
Tuple[Tuple[str, str], str],
trio.abc.SendChannel[Any]] = {}
self._listeners: List[trio.abc.Listener] = []
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None
@ -299,37 +301,62 @@ class Actor:
log.exception(
f"Channel for {chan.uid} was already zonked..")
async def _push_result(self, actorid, cid: str, msg: dict) -> None:
async def _push_result(
self,
chan: Channel,
msg: Dict[str, Any],
) -> None:
"""Push an RPC result to the local consumer's queue.
"""
actorid = chan.uid
assert actorid, f"`actorid` can't be {actorid}"
q = self.get_waitq(actorid, cid)
cid = msg['cid']
send_chan = self._cids2qs[(actorid, cid)]
assert send_chan.cid == cid
if 'stop' in msg:
log.debug(f"{send_chan} was terminated at remote end")
return await send_chan.aclose()
try:
log.debug(f"Delivering {msg} from {actorid} to caller {cid}")
# maintain backpressure
await q.put(msg)
await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{send_chan} consumer is already closed")
def get_waitq(
def get_memchans(
self,
actorid: Tuple[str, str],
cid: str
) -> trio.Queue:
) -> trio.abc.ReceiveChannel:
log.debug(f"Getting result queue for {actorid} cid {cid}")
cids2qs = self._actors2calls.setdefault(actorid, {})
return cids2qs.setdefault(cid, trio.Queue(1000))
try:
recv_chan = self._cids2qs[(actorid, cid)]
except KeyError:
send_chan, recv_chan = trio.open_memory_channel(1000)
send_chan.cid = cid
self._cids2qs[(actorid, cid)] = send_chan
return recv_chan
async def send_cmd(
self, chan: Channel, ns: str, func: str, kwargs: dict
) -> Tuple[str, trio.Queue]:
self,
chan: Channel,
ns: str,
func: str,
kwargs: dict
) -> Tuple[str, trio.abc.ReceiveChannel]:
"""Send a ``'cmd'`` message to a remote actor and return a
caller id and a ``trio.Queue`` that can be used to wait for
responses delivered by the local message processing loop.
"""
cid = str(uuid.uuid1())
assert chan.uid
q = self.get_waitq(chan.uid, cid)
recv_chan = self.get_memchans(chan.uid, cid)
log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
return cid, q
return cid, recv_chan
async def _process_messages(
self, chan: Channel,
@ -350,7 +377,7 @@ class Actor:
# loop running despite the current task having been
# cancelled (eg. `open_portal()` may call this method from
# a locally spawned task)
with trio.open_cancel_scope(shield=shield) as cs:
with trio.CancelScope(shield=shield) as cs:
task_status.started(cs)
async for msg in chan:
if msg is None: # loop terminate sentinel
@ -364,11 +391,10 @@ class Actor:
f" {chan} from {chan.uid}")
break
log.debug(f"Received msg {msg} from {chan.uid}")
cid = msg.get('cid')
if cid:
log.trace(f"Received msg {msg} from {chan.uid}") # type: ignore
if msg.get('cid'):
# deliver response to local caller/waiter
await self._push_result(chan.uid, cid, msg)
await self._push_result(chan, msg)
log.debug(
f"Waiting on next msg for {chan} from {chan.uid}")
continue
@ -643,9 +669,14 @@ class Actor:
# streaming IPC but it should be called
# to cancel any remotely spawned task
chan = ctx.chan
# the ``dict.get()`` ensures the requested task to be cancelled
# was indeed spawned by a request from this channel
try:
# this ctx based lookup ensures the requested task to
# be cancelled was indeed spawned by a request from this channel
scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)]
except KeyError:
log.warning(f"{cid} has already completed/terminated?")
return
log.debug(
f"Cancelling task:\ncid: {cid}\nfunc: {func}\n"
f"peer: {chan.uid}\n")

View File

@ -9,7 +9,7 @@ from functools import partial
from dataclasses import dataclass
import trio
from async_generator import asynccontextmanager, aclosing
from async_generator import asynccontextmanager
from ._state import current_actor
from ._ipc import Channel
@ -48,6 +48,91 @@ async def _do_handshake(
return uid
class StreamReceiveChannel(trio.abc.ReceiveChannel):
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
special behaviour for signalling stream termination across an
inter-actor ``Channel``. This is the type returned to a local task
which invoked a remote streaming function using `Portal.run()`.
Termination rules:
- if the local task signals stop iteration a cancel signal is
relayed to the remote task indicating to stop streaming
- if the remote task signals the end of a stream, raise a
``StopAsyncIteration`` to terminate the local ``async for``
"""
def __init__(
self,
cid: str,
rx_chan: trio.abc.ReceiveChannel,
portal: 'Portal',
) -> None:
self._cid = cid
self._rx_chan = rx_chan
self._portal = portal
# delegate directly to underlying mem channel
def receive_nowait(self):
return self._rx_chan.receive_nowait()
async def receive(self):
try:
msg = await self._rx_chan.receive()
return msg['yield']
except trio.ClosedResourceError:
# when the send is closed we assume the stream has
# terminated and signal this local iterator to stop
await self.aclose()
raise StopAsyncIteration
except trio.Cancelled:
# relay cancels to the remote task
await self.aclose()
raise
except KeyError:
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?")
raise unpack_error(msg, self._portal.channel)
async def aclose(self):
"""Cancel associate remote actor task on close
as well as the local memory channel.
"""
if self._rx_chan._closed:
log.warning(f"{self} is already closed")
return
cid = self._cid
with trio.move_on_after(0.5) as cs:
cs.shield = True
log.warning(
f"Cancelling stream {cid} to "
f"{self._portal.channel.uid}")
# TODO: yeah.. it'd be nice if this was just an
# async func on the far end. Gotta figure out a
# better way then implicitly feeding the ctx
# to declaring functions; likely a decorator
# system.
rchan = await self._portal.run(
'self', 'cancel_task', cid=cid)
async for _ in rchan:
pass
if cs.cancelled_caught:
# XXX: there's no way to know if the remote task was indeed
# cancelled in the case where the connection is broken or
# some other network error occurred.
if not self._portal.channel.connected():
log.warning(
"May have failed to cancel remote task "
f"{cid} for {self._portal.channel.uid}")
with trio.CancelScope(shield=True):
await self._rx_chan.aclose()
def clone(self):
return self
class Portal:
"""A 'portal' to a(n) (remote) ``Actor``.
@ -67,17 +152,14 @@ class Portal:
self._expect_result: Optional[
Tuple[str, Any, str, Dict[str, Any]]
] = None
self._agens: Set[typing.AsyncGenerator] = set()
async def aclose(self) -> None:
log.debug(f"Closing {self}")
# XXX: won't work until https://github.com/python-trio/trio/pull/460
# gets in!
await self.channel.aclose()
self._streams: Set[StreamReceiveChannel] = set()
async def _submit(
self, ns: str, func: str, **kwargs
) -> Tuple[str, trio.Queue, str, Dict[str, Any]]:
self,
ns: str,
func: str,
kwargs,
) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]:
"""Submit a function to be scheduled and run by actor, return the
associated caller id, response queue, response type str,
first message packet as a tuple.
@ -85,11 +167,13 @@ class Portal:
This is an async call.
"""
# ship a function call request to the remote actor
cid, q = await current_actor().send_cmd(self.channel, ns, func, kwargs)
cid, recv_chan = await current_actor().send_cmd(
self.channel, ns, func, kwargs)
# wait on first response msg and handle (this should be
# in an immediate response)
first_msg = await q.get()
first_msg = await recv_chan.receive()
functype = first_msg.get('functype')
if functype == 'function' or functype == 'asyncfunction':
@ -101,12 +185,12 @@ class Portal:
else:
raise ValueError(f"{first_msg} is an invalid response packet?")
return cid, q, resp_type, first_msg
return cid, recv_chan, resp_type, first_msg
async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None:
assert self._expect_result is None, \
"A pending main result has already been submitted"
self._expect_result = await self._submit(ns, func, **kwargs)
self._expect_result = await self._submit(ns, func, kwargs)
async def run(self, ns: str, func: str, **kwargs) -> Any:
"""Submit a remote function to be scheduled and run by actor,
@ -116,62 +200,26 @@ class Portal:
remote rpc task or a local async generator instance.
"""
return await self._return_from_resptype(
*(await self._submit(ns, func, **kwargs))
*(await self._submit(ns, func, kwargs))
)
async def _return_from_resptype(
self, cid: str, q: trio.Queue, resptype: str, first_msg: dict
self,
cid: str,
recv_chan: trio.abc.ReceiveChannel,
resptype: str,
first_msg: dict
) -> Any:
# TODO: not this needs some serious work and thinking about how
# to make async-generators the fundamental IPC API over channels!
# (think `yield from`, `gen.send()`, and functional reactive stuff)
if resptype == 'yield': # stream response
async def yield_from_q():
try:
async for msg in q:
try:
yield msg['yield']
except KeyError:
if 'stop' in msg:
break # far end async gen terminated
else:
# internal error should never get here
assert msg.get('cid'), (
"Received internal error at portal?")
raise unpack_error(msg, self.channel)
except (GeneratorExit, trio.Cancelled):
log.warning(
f"Cancelling async gen call {cid} to "
f"{self.channel.uid}")
with trio.move_on_after(0.5) as cs:
cs.shield = True
# TODO: yeah.. it'd be nice if this was just an
# async func on the far end. Gotta figure out a
# better way then implicitly feeding the ctx
# to declaring functions; likely a decorator
# sytsem.
agen = await self.run('self', 'cancel_task', cid=cid)
async with aclosing(agen) as agen:
async for _ in agen:
pass
if cs.cancelled_caught:
if not self.channel.connected():
log.warning(
"May have failed to cancel remote task "
f"{cid} for {self.channel.uid}")
raise
# TODO: use AsyncExitStack to aclose() all agens
# on teardown
agen = yield_from_q()
self._agens.add(agen)
return agen
rchan = StreamReceiveChannel(cid, recv_chan, self)
self._streams.add(rchan)
return rchan
elif resptype == 'return': # single response
msg = await q.get()
msg = await recv_chan.receive()
try:
return msg['return']
except KeyError:
@ -217,13 +265,17 @@ class Portal:
async def _cancel_streams(self):
# terminate all locally running async generator
# IPC calls
if self._agens:
if self._streams:
log.warning(
f"Cancelling all streams with {self.channel.uid}")
for agen in self._agens:
await agen.aclose()
for stream in self._streams.copy():
await stream.aclose()
async def close(self) -> None:
async def aclose(self) -> None:
log.debug(f"Closing {self}")
# TODO: once we move to implementing our own `ReceiveChannel`
# (including remote task cancellation inside its `.aclose()`)
# we'll need to .aclose all those channels here
await self._cancel_streams()
async def cancel_actor(self) -> bool:
@ -308,7 +360,7 @@ async def open_portal(
try:
yield portal
finally:
await portal.close()
await portal.aclose()
if was_connected:
# cancel remote channel-msg loop

View File

@ -1,6 +1,7 @@
"""
Per process state
"""
import multiprocessing as mp
from typing import Optional
@ -13,3 +14,9 @@ def current_actor() -> 'Actor': # type: ignore
if not _current_actor:
raise RuntimeError("No actor instance has been defined yet?")
return _current_actor
def is_main_process():
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'

View File

@ -185,7 +185,7 @@ class ActorNursery:
Should only be called for actors spawned with `run_in_actor()`.
"""
with trio.open_cancel_scope() as cs:
with trio.CancelScope() as cs:
task_status.started(cs)
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
@ -316,7 +316,7 @@ class ActorNursery:
# a cancel signal shows up slightly after in which case
# the `else:` block here might not complete?
# For now, shield both.
with trio.open_cancel_scope(shield=True):
with trio.CancelScope(shield=True):
if etype in (trio.Cancelled, KeyboardInterrupt):
log.warning(
f"Nursery for {current_actor().uid} was "
@ -353,11 +353,5 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
raise RuntimeError("No actor instance has been defined yet?")
# TODO: figure out supervisors from erlang
async with ActorNursery(current_actor()) as nursery:
async with ActorNursery(actor) as nursery:
yield nursery
def is_main_process():
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'

View File

@ -43,6 +43,9 @@ async def fan_out_to_ctxs(
for ctx in topics2ctxs.get(topic, set()):
ctx_payloads.setdefault(ctx, {}).update(packet),
if not ctx_payloads:
log.debug(f"Unconsumed values:\n{published}")
# deliver to each subscriber (fan out)
if ctx_payloads:
for ctx, payload in ctx_payloads.items():
@ -50,7 +53,7 @@ async def fan_out_to_ctxs(
await ctx.send_yield(payload)
except (
# That's right, anything you can think of...
trio.ClosedStreamError, ConnectionResetError,
trio.ClosedResourceError, ConnectionResetError,
ConnectionRefusedError,
):
log.warning(f"{ctx.chan} went down?")