From b42e118e89f363257a257e9d9e9d95a27b9750c6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Feb 2019 13:08:37 -0500 Subject: [PATCH 01/12] Go 3.7 since dataclasses --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 885d992..70725a7 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ setup( install_requires=[ 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt'], tests_require=['pytest'], - python_requires=">=3.6", + python_requires=">=3.7", keywords=[ "async", "concurrency", "actor model", "distributed", 'trio', 'multiprocessing' From f44ac4528ab291757518d0151f6ba9b24a2cfe22 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Feb 2019 16:23:58 -0500 Subject: [PATCH 02/12] Use mem chan in actor core --- tractor/_actor.py | 56 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 39 insertions(+), 17 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index ceee16b..2a8ec66 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -199,7 +199,9 @@ class Actor: Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} - self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} + self._cids2qs: Dict[ + Tuple[Tuple[str, str], str], + trio.abc.SendChannel[Any]] = {} self._listeners: List[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None @@ -299,37 +301,58 @@ class Actor: log.exception( f"Channel for {chan.uid} was already zonked..") - async def _push_result(self, actorid, cid: str, msg: dict) -> None: + async def _push_result( + self, + actorid: Tuple[str, str], + msg: Dict[str, Any], + ) -> None: """Push an RPC result to the local consumer's queue. """ assert actorid, f"`actorid` can't be {actorid}" - q = self.get_waitq(actorid, cid) + cid = msg['cid'] + send_chan = self._cids2qs[(actorid, cid)] + assert send_chan.cid == cid log.debug(f"Delivering {msg} from {actorid} to caller {cid}") - # maintain backpressure - await q.put(msg) + try: + # maintain backpressure + await send_chan.send(msg) + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") - def get_waitq( + def get_memchans( self, actorid: Tuple[str, str], cid: str - ) -> trio.Queue: + ) -> trio.abc.ReceiveChannel: log.debug(f"Getting result queue for {actorid} cid {cid}") - cids2qs = self._actors2calls.setdefault(actorid, {}) - return cids2qs.setdefault(cid, trio.Queue(1000)) + try: + recv_chan = self._cids2qs[(actorid, cid)] + except KeyError: + send_chan, recv_chan = trio.open_memory_channel(1000) + send_chan.cid = cid + self._cids2qs[(actorid, cid)] = send_chan + + return recv_chan async def send_cmd( - self, chan: Channel, ns: str, func: str, kwargs: dict - ) -> Tuple[str, trio.Queue]: + self, + chan: Channel, + ns: str, + func: str, + kwargs: dict + ) -> Tuple[str, trio.abc.ReceiveChannel]: """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. 
""" cid = str(uuid.uuid1()) assert chan.uid - q = self.get_waitq(chan.uid, cid) + recv_chan = self.get_memchans(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return cid, q + return cid, recv_chan async def _process_messages( self, chan: Channel, @@ -364,11 +387,10 @@ class Actor: f" {chan} from {chan.uid}") break - log.debug(f"Received msg {msg} from {chan.uid}") - cid = msg.get('cid') - if cid: + log.trace(f"Received msg {msg} from {chan.uid}") + if msg.get('cid'): # deliver response to local caller/waiter - await self._push_result(chan.uid, cid, msg) + await self._push_result(chan.uid, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue From 61680b3729f8f0929a2ad78074352d1f9f04969a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Feb 2019 16:27:18 -0500 Subject: [PATCH 03/12] Use a receive mem channel inside portals For now stop `.aclose()`-ing all async gens on portal close since it can cause hangs and other weird behaviour if another task operates on the same instance. See https://bugs.python.org/issue32526. --- tractor/_portal.py | 129 +++++++++++++++++++++++++-------------------- 1 file changed, 71 insertions(+), 58 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 2683d88..8a1b562 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -36,7 +36,7 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): async def _do_handshake( actor: 'Actor', # type: ignore chan: Channel -)-> Any: +) -> Any: await chan.send(actor.uid) uid: Tuple[str, str] = await chan.recv() @@ -76,8 +76,11 @@ class Portal: await self.channel.aclose() async def _submit( - self, ns: str, func: str, **kwargs - ) -> Tuple[str, trio.Queue, str, Dict[str, Any]]: + self, + ns: str, + func: str, + kwargs, + ) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -85,11 +88,13 @@ class Portal: This is an async call. """ # ship a function call request to the remote actor - cid, q = await current_actor().send_cmd(self.channel, ns, func, kwargs) + cid, recv_chan = await current_actor().send_cmd( + self.channel, ns, func, kwargs) # wait on first response msg and handle (this should be # in an immediate response) - first_msg = await q.get() + + first_msg = await recv_chan.receive() functype = first_msg.get('functype') if functype == 'function' or functype == 'asyncfunction': @@ -101,12 +106,12 @@ class Portal: else: raise ValueError(f"{first_msg} is an invalid response packet?") - return cid, q, resp_type, first_msg + return cid, recv_chan, resp_type, first_msg async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" - self._expect_result = await self._submit(ns, func, **kwargs) + self._expect_result = await self._submit(ns, func, kwargs) async def run(self, ns: str, func: str, **kwargs) -> Any: """Submit a remote function to be scheduled and run by actor, @@ -116,62 +121,67 @@ class Portal: remote rpc task or a local async generator instance. 
""" return await self._return_from_resptype( - *(await self._submit(ns, func, **kwargs)) + *(await self._submit(ns, func, kwargs)) ) async def _return_from_resptype( - self, cid: str, q: trio.Queue, resptype: str, first_msg: dict + self, + cid: str, + recv_chan: trio.abc.ReceiveChannel, + resptype: str, + first_msg: dict ) -> Any: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - if resptype == 'yield': # stream response - async def yield_from_q(): - try: - async for msg in q: - try: - yield msg['yield'] - except KeyError: - if 'stop' in msg: - break # far end async gen terminated - else: - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?") - raise unpack_error(msg, self.channel) + async def yield_from_recvchan(): + async with recv_chan: + try: + async for msg in recv_chan: + try: + yield msg['yield'] + except KeyError: + if 'stop' in msg: + break # far end async gen terminated + else: + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + raise unpack_error(msg, self.channel) - except (GeneratorExit, trio.Cancelled): - log.warning( - f"Cancelling async gen call {cid} to " - f"{self.channel.uid}") - with trio.move_on_after(0.5) as cs: - cs.shield = True - # TODO: yeah.. it'd be nice if this was just an - # async func on the far end. Gotta figure out a - # better way then implicitly feeding the ctx - # to declaring functions; likely a decorator - # sytsem. - agen = await self.run('self', 'cancel_task', cid=cid) - async with aclosing(agen) as agen: - async for _ in agen: - pass - if cs.cancelled_caught: - if not self.channel.connected(): - log.warning( - "May have failed to cancel remote task " - f"{cid} for {self.channel.uid}") - raise + except (GeneratorExit, trio.Cancelled) as err: + log.warning( + f"Cancelling async gen call {cid} to " + f"{self.channel.uid}") + with trio.move_on_after(0.5) as cs: + cs.shield = True + # TODO: yeah.. it'd be nice if this was just an + # async func on the far end. Gotta figure out a + # better way then implicitly feeding the ctx + # to declaring functions; likely a decorator + # system. 
+ agen = await self.run( + 'self', 'cancel_task', cid=cid) + async with aclosing(agen) as agen: + async for _ in agen: + pass + + if cs.cancelled_caught: + if not self.channel.connected(): + log.warning( + "May have failed to cancel remote task " + f"{cid} for {self.channel.uid}") # TODO: use AsyncExitStack to aclose() all agens # on teardown - agen = yield_from_q() + agen = yield_from_recvchan() self._agens.add(agen) return agen elif resptype == 'return': # single response - msg = await q.get() + msg = await recv_chan.receive() try: return msg['return'] except KeyError: @@ -214,17 +224,20 @@ class Portal: return self._result - async def _cancel_streams(self): - # terminate all locally running async generator - # IPC calls - if self._agens: - log.warning( - f"Cancelling all streams with {self.channel.uid}") - for agen in self._agens: - await agen.aclose() + # async def _cancel_streams(self): + # # terminate all locally running async generator + # # IPC calls + # if self._agens: + # log.warning( + # f"Cancelling all streams with {self.channel.uid}") + # for agen in self._agens: + # await agen.aclose() - async def close(self) -> None: - await self._cancel_streams() + async def aclose(self) -> None: + # TODO: once we move to implementing our own `ReceiveChannel` + # (including remote task cancellation inside its `.aclose()`) + # we'll need to .aclose all those channels here + pass async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. @@ -233,7 +246,7 @@ class Portal: log.warning("This portal is already closed can't cancel") return False - await self._cancel_streams() + # await self._cancel_streams() log.warning( f"Sending actor cancel request to {self.channel.uid} on " @@ -308,7 +321,7 @@ async def open_portal( try: yield portal finally: - await portal.close() + await portal.aclose() if was_connected: # cancel remote channel-msg loop From 41c202db68a94d244d6677d9539f75201b315074 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Feb 2019 17:04:41 -0500 Subject: [PATCH 04/12] Add a multi-task subscriber test In combination with `.aclose()`-ing the async gen instance returned from `Portal.run()` this demonstrates the python bug: https://bugs.python.org/issue32526 I've commented out the line that triggers the bug for now since this case provides motivation for adding our own `trio.abc.ReceiveMemoryChannel` implementation to be used instead of async gens directly (returned from `Portal.run()`) since the latter is **not** task safe. 
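For reference, a minimal standalone sketch (not part of this changeset; ``ticks()`` and the task layout are made up for illustration) of the kind of multi-task misuse the new test pokes at -- one task iterating an async generator while a second task ``.aclose()``-es the very same instance:

    import trio

    async def ticks():
        # stand-in for the stream a ``Portal.run()`` call hands back
        i = 0
        while True:
            yield i
            i += 1
            await trio.sleep(0.1)

    async def main():
        agen = ticks()

        async def consume():
            # task A iterates the shared generator instance
            async for value in agen:
                if value >= 3:
                    break

        async with trio.open_nursery() as n:
            n.start_soon(consume)
            await trio.sleep(0.25)
            # task B closes the *same* instance while task A may still be
            # suspended inside it; per bpo-32526 this can hang or error out
            # at a surprising point instead of closing cleanly
            await agen.aclose()

    trio.run(main)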
--- tests/test_pubsub.py | 101 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 88 insertions(+), 13 deletions(-) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index f0c4beb..745731f 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -4,7 +4,6 @@ from itertools import cycle import pytest import trio import tractor -from async_generator import aclosing from tractor.testing import tractor_test @@ -30,10 +29,10 @@ def is_even(i): @tractor.msg.pub -async def pubber(get_topics): +async def pubber(get_topics, seed=10): ss = tractor.current_actor().statespace - for i in cycle(range(10)): + for i in cycle(range(seed)): # ensure topic subscriptions are as expected ss['get_topics'] = get_topics @@ -42,7 +41,11 @@ async def pubber(get_topics): await trio.sleep(0.1) -async def subs(which, pub_actor_name): +async def subs( + which, pub_actor_name, seed=10, + portal=None, + task_status=trio.TASK_STATUS_IGNORED, +): if len(which) == 1: if which[0] == 'even': pred = is_even @@ -54,11 +57,43 @@ async def subs(which, pub_actor_name): return isinstance(i, int) async with tractor.find_actor(pub_actor_name) as portal: - agen = await portal.run(__name__, 'pubber', topics=which) - async with aclosing(agen) as agen: + agen = await portal.run( + __name__, 'pubber', + topics=which, + seed=seed, + ) + task_status.started(agen) + times = 10 + count = 0 + await agen.__anext__() + async for pkt in agen: + for topic, value in pkt.items(): + assert pred(value) + count += 1 + if count >= times: + break + + await agen.aclose() + + agen = await portal.run( + __name__, 'pubber', + topics=['odd'], + seed=seed, + ) + + await agen.__anext__() + count = 0 + # async with aclosing(agen) as agen: + try: async for pkt in agen: for topic, value in pkt.items(): - assert pred(value) + pass + # assert pred(value) + count += 1 + if count >= times: + break + finally: + await agen.aclose() @tractor.msg.pub(tasks=['one', 'two']) @@ -101,7 +136,7 @@ async def test_required_args(callwith_expecterror): 'pub_actor', ['streamer', 'arbiter'] ) -def test_pubsub_multi_actor_subs( +def test_multi_actor_subs_arbiter_pub( loglevel, arb_addr, pub_actor, @@ -115,7 +150,7 @@ def test_pubsub_multi_actor_subs( name = 'arbiter' - if pub_actor is 'streamer': + if pub_actor == 'streamer': # start the publisher as a daemon master_portal = await n.start_actor( 'streamer', @@ -131,7 +166,7 @@ def test_pubsub_multi_actor_subs( # block until 2nd actor is initialized pass - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': # wait for publisher task to be spawned in a local RPC task while not ss.get('get_topics'): await trio.sleep(0.1) @@ -144,7 +179,7 @@ def test_pubsub_multi_actor_subs( # block until 2nd actor is initialized pass - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': start = time.time() while 'odd' not in get_topics(): await trio.sleep(0.1) @@ -166,13 +201,13 @@ def test_pubsub_multi_actor_subs( await even_portal.cancel_actor() await trio.sleep(0.5) - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': assert 'even' not in get_topics() await odd_portal.cancel_actor() await trio.sleep(1) - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': while get_topics(): await trio.sleep(0.1) if time.time() - start > 1: @@ -185,3 +220,43 @@ def test_pubsub_multi_actor_subs( arbiter_addr=arb_addr, rpc_module_paths=[__name__], ) + + +def test_single_subactor_pub_multitask_subs( + loglevel, + arb_addr, +): + async def main(): + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 
'streamer', + rpc_module_paths=[__name__], + ) + async with tractor.wait_for_actor('streamer'): + # block until 2nd actor is initialized + pass + + async with trio.open_nursery() as tn: + agen = await tn.start(subs, ['even'], 'streamer') + + await trio.sleep(0.1) + tn.start_soon(subs, ['even'], 'streamer') + + # XXX this will trigger the python bug: + # https://bugs.python.org/issue32526 + # await agen.aclose() + + await trio.sleep(0.1) + tn.start_soon(subs, ['even'], 'streamer') + await trio.sleep(0.1) + tn.start_soon(subs, ['even'], 'streamer') + + await portal.cancel_actor() + + tractor.run( + main, + arbiter_addr=arb_addr, + rpc_module_paths=[__name__], + ) From 51f082fff77e4c98f2be7e67b2eb962d9c5160f0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Feb 2019 17:10:57 -0500 Subject: [PATCH 05/12] Use mem chan in streaming tests --- tests/test_streaming.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index e25a000..55b4bee 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -91,25 +91,25 @@ async def aggregate(seed): portals.append(portal) - q = trio.Queue(500) + send_chan, recv_chan = trio.open_memory_channel(500) - async def push_to_q(portal): + async def push_to_chan(portal): async for value in await portal.run( __name__, 'stream_data', seed=seed ): # leverage trio's built-in backpressure - await q.put(value) + await send_chan.send(value) - await q.put(None) + await send_chan.send(None) print(f"FINISHED ITERATING {portal.channel.uid}") # spawn 2 trio tasks to collect streams and push to a local queue async with trio.open_nursery() as n: for portal in portals: - n.start_soon(push_to_q, portal) + n.start_soon(push_to_chan, portal) unique_vals = set() - async for value in q: + async for value in recv_chan: if value not in unique_vals: unique_vals.add(value) # yield upwards to the spawning parent actor From b91d13cfeab1b33b6cdac2f8a11f1989e6f3e945 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Feb 2019 17:11:26 -0500 Subject: [PATCH 06/12] Use local actor var --- tractor/_trionics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 44f8dcd..894f3ec 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -353,7 +353,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: raise RuntimeError("No actor instance has been defined yet?") # TODO: figure out supervisors from erlang - async with ActorNursery(current_actor()) as nursery: + async with ActorNursery(actor) as nursery: yield nursery From 616192d85394f930b528d9fbb38928be9a1857b2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Feb 2019 21:59:42 -0500 Subject: [PATCH 07/12] Don't use async gen functions for the stream API As mentioned in prior commits there's currently a bug in Python that make async gens **not** task safe. Since this is the core cause of almost all recent problems, instead implement our own async iterator derivative of `trio.abc.ReceiveChannel` by wrapping a `trio._channel.MemoryReceiveChannel`. This fits more natively with the memory channel API in ``trio`` and adds potentially more flexibility for possible bidirectional inter-actor streaming in the future. Huge thanks to @oremanj and of course @njsmith for guidance on this one! 
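As a rough sketch of the approach (simplified for illustration and not the code added below; the ``WrappedReceiveChannel`` name and ``{'yield': value}`` packet shape are assumed here), wrapping a memory receive channel hands consumers a plain ``trio.abc.ReceiveChannel`` instead of a shared async generator:

    import trio

    class WrappedReceiveChannel(trio.abc.ReceiveChannel):
        """Hypothetical wrapper around a ``MemoryReceiveChannel`` that
        unpacks ``{'yield': value}`` packets, loosely analogous to the
        ``StreamReceiveChannel`` added in this patch.
        """
        def __init__(self, rx_chan):
            self._rx_chan = rx_chan

        async def receive(self):
            msg = await self._rx_chan.receive()
            return msg['yield']

        async def aclose(self):
            # the real thing also cancels the far end streaming task here
            await self._rx_chan.aclose()

        def clone(self):
            return self

    async def main():
        send_chan, recv_chan = trio.open_memory_channel(100)
        stream = WrappedReceiveChannel(recv_chan)

        async with send_chan:
            for i in range(3):
                await send_chan.send({'yield': i})

        # ``ReceiveChannel`` supplies ``__aiter__``/``__anext__`` so the
        # loop ends cleanly once the send side is closed and drained
        async for value in stream:
            print(value)

    trio.run(main)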
--- tractor/_actor.py | 16 +++-- tractor/_portal.py | 159 +++++++++++++++++++++++++++------------------ 2 files changed, 109 insertions(+), 66 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 2a8ec66..9842f12 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -312,8 +312,11 @@ class Actor: cid = msg['cid'] send_chan = self._cids2qs[(actorid, cid)] assert send_chan.cid == cid - log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + if 'stop' in msg: + log.debug(f"{send_chan} was terminated at remote end") + return await send_chan.aclose() try: + log.debug(f"Delivering {msg} from {actorid} to caller {cid}") # maintain backpressure await send_chan.send(msg) except trio.BrokenResourceError: @@ -665,9 +668,14 @@ class Actor: # streaming IPC but it should be called # to cancel any remotely spawned task chan = ctx.chan - # the ``dict.get()`` ensures the requested task to be cancelled - # was indeed spawned by a request from this channel - scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] + try: + # this ctx based lookup ensures the requested task to + # be cancelled was indeed spawned by a request from this channel + scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] + except KeyError: + log.warning(f"{cid} has already completed/terminated?") + return + log.debug( f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") diff --git a/tractor/_portal.py b/tractor/_portal.py index 8a1b562..9b2265e 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -9,7 +9,7 @@ from functools import partial from dataclasses import dataclass import trio -from async_generator import asynccontextmanager, aclosing +from async_generator import asynccontextmanager from ._state import current_actor from ._ipc import Channel @@ -48,6 +48,87 @@ async def _do_handshake( return uid +class StreamReceiveChannel(trio.abc.ReceiveChannel): + """A wrapper around a ``trio.abc.ReceiveChannel`` with + special behaviour for stream termination on both ends of + an inter-actor channel. + + Termination rules: + - if the local task signals stop iteration a cancel signal is + relayed to the remote task indicating to stop streaming + - if the remote task signals the end of a stream, raise a + ``StopAsyncIteration`` to terminate the local ``async for`` + + """ + def __init__( + self, + cid: str, + rx_chan: trio.abc.ReceiveChannel, + portal: 'Portal', + ): + self._cid = cid + self._rx_chan = rx_chan + self._portal = portal + + # delegate directly to underlying mem channel + def receive_nowait(self): + return self._rx_chan.receive_nowait() + + async def receive(self): + try: + msg = await self._rx_chan.receive() + return msg['yield'] + except trio.ClosedResourceError: + # when the send is closed we assume the stream has + # terminated and signal this local iterator to stop + await self.aclose() + raise StopAsyncIteration + except trio.Cancelled: + await self.aclose() + raise + except KeyError: + # if 'stop' in msg: + # break # far end async gen terminated + # else: + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + raise unpack_error(msg, self._portal.channel) + + async def aclose(self): + if self._rx_chan._closed: + log.warning(f"{self} is already closed") + return + cid = self._cid + # XXX: cancel remote task on close + log.warning( + f"Cancelling stream {cid} to " + f"{self._portal.channel.uid}") + with trio.move_on_after(0.5) as cs: + cs.shield = True + # TODO: yeah.. 
it'd be nice if this was just an + # async func on the far end. Gotta figure out a + # better way then implicitly feeding the ctx + # to declaring functions; likely a decorator + # system. + rchan = await self._portal.run( + 'self', 'cancel_task', cid=cid) + async for _ in rchan: + pass + + if cs.cancelled_caught: + if not self._portal.channel.connected(): + log.warning( + "May have failed to cancel remote task " + f"{cid} for {self._portal.channel.uid}") + + with trio.open_cancel_scope(shield=True): + await self._rx_chan.aclose() + + def clone(self): + return self + + class Portal: """A 'portal' to a(n) (remote) ``Actor``. @@ -67,13 +148,7 @@ class Portal: self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None - self._agens: Set[typing.AsyncGenerator] = set() - - async def aclose(self) -> None: - log.debug(f"Closing {self}") - # XXX: won't work until https://github.com/python-trio/trio/pull/460 - # gets in! - await self.channel.aclose() + self._streams: Set[StreamReceiveChannel] = set() async def _submit( self, @@ -135,50 +210,9 @@ class Portal: # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) if resptype == 'yield': # stream response - - async def yield_from_recvchan(): - async with recv_chan: - try: - async for msg in recv_chan: - try: - yield msg['yield'] - except KeyError: - if 'stop' in msg: - break # far end async gen terminated - else: - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?") - raise unpack_error(msg, self.channel) - - except (GeneratorExit, trio.Cancelled) as err: - log.warning( - f"Cancelling async gen call {cid} to " - f"{self.channel.uid}") - with trio.move_on_after(0.5) as cs: - cs.shield = True - # TODO: yeah.. it'd be nice if this was just an - # async func on the far end. Gotta figure out a - # better way then implicitly feeding the ctx - # to declaring functions; likely a decorator - # system. - agen = await self.run( - 'self', 'cancel_task', cid=cid) - async with aclosing(agen) as agen: - async for _ in agen: - pass - - if cs.cancelled_caught: - if not self.channel.connected(): - log.warning( - "May have failed to cancel remote task " - f"{cid} for {self.channel.uid}") - - # TODO: use AsyncExitStack to aclose() all agens - # on teardown - agen = yield_from_recvchan() - self._agens.add(agen) - return agen + rchan = StreamReceiveChannel(cid, recv_chan, self) + self._streams.add(rchan) + return rchan elif resptype == 'return': # single response msg = await recv_chan.receive() @@ -224,20 +258,21 @@ class Portal: return self._result - # async def _cancel_streams(self): - # # terminate all locally running async generator - # # IPC calls - # if self._agens: - # log.warning( - # f"Cancelling all streams with {self.channel.uid}") - # for agen in self._agens: - # await agen.aclose() + async def _cancel_streams(self): + # terminate all locally running async generator + # IPC calls + if self._streams: + log.warning( + f"Cancelling all streams with {self.channel.uid}") + for stream in self._streams.copy(): + await stream.aclose() async def aclose(self) -> None: + log.debug(f"Closing {self}") # TODO: once we move to implementing our own `ReceiveChannel` # (including remote task cancellation inside its `.aclose()`) # we'll need to .aclose all those channels here - pass + await self._cancel_streams() async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. 
@@ -246,7 +281,7 @@ class Portal: log.warning("This portal is already closed can't cancel") return False - # await self._cancel_streams() + await self._cancel_streams() log.warning( f"Sending actor cancel request to {self.channel.uid} on " From 85a07007167b1a8b77515b72cb61454ed1c700de Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Feb 2019 22:10:55 -0500 Subject: [PATCH 08/12] Add back line that breaks with async gens --- tests/test_pubsub.py | 32 +++++++++++++++++++------------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 745731f..745e5ec 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -57,35 +57,35 @@ async def subs( return isinstance(i, int) async with tractor.find_actor(pub_actor_name) as portal: - agen = await portal.run( + stream = await portal.run( __name__, 'pubber', topics=which, seed=seed, ) - task_status.started(agen) + task_status.started(stream) times = 10 count = 0 - await agen.__anext__() - async for pkt in agen: + await stream.__anext__() + async for pkt in stream: for topic, value in pkt.items(): assert pred(value) count += 1 if count >= times: break - await agen.aclose() + await stream.aclose() - agen = await portal.run( + stream = await portal.run( __name__, 'pubber', topics=['odd'], seed=seed, ) - await agen.__anext__() + await stream.__anext__() count = 0 - # async with aclosing(agen) as agen: + # async with aclosing(stream) as stream: try: - async for pkt in agen: + async for pkt in stream: for topic, value in pkt.items(): pass # assert pred(value) @@ -93,7 +93,7 @@ async def subs( if count >= times: break finally: - await agen.aclose() + await stream.aclose() @tractor.msg.pub(tasks=['one', 'two']) @@ -111,7 +111,7 @@ async def multilock_pubber(get_topics): (multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError), # should work (multilock_pubber, - {'ctx': None, 'topics': ['topic1'], 'task_name': 'one'}, + {'ctx': None, 'topics': ['doggy'], 'task_name': 'one'}, None), ], ) @@ -126,7 +126,12 @@ async def test_required_args(callwith_expecterror): async with tractor.open_nursery() as n: # await func(**kwargs) portal = await n.run_in_actor( - 'sub', multilock_pubber, **kwargs) + 'pubber', multilock_pubber, **kwargs) + + async with tractor.wait_for_actor('pubber'): + pass + + await trio.sleep(0.5) async for val in await portal.result(): assert val == {'doggy': 10} @@ -246,7 +251,8 @@ def test_single_subactor_pub_multitask_subs( # XXX this will trigger the python bug: # https://bugs.python.org/issue32526 - # await agen.aclose() + # if using async generators to wrap tractor channels + await agen.aclose() await trio.sleep(0.1) tn.start_soon(subs, ['even'], 'streamer') From fe1c4dbc4ccc07fb99b03f659ec4963cd5a910ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 16 Feb 2019 14:05:03 -0500 Subject: [PATCH 09/12] mpypy and docs fixups --- tractor/_actor.py | 7 ++++--- tractor/_portal.py | 26 +++++++++++++++----------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 9842f12..02cdbc0 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -303,11 +303,12 @@ class Actor: async def _push_result( self, - actorid: Tuple[str, str], + chan: Channel, msg: Dict[str, Any], ) -> None: """Push an RPC result to the local consumer's queue. 
""" + actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" cid = msg['cid'] send_chan = self._cids2qs[(actorid, cid)] @@ -390,10 +391,10 @@ class Actor: f" {chan} from {chan.uid}") break - log.trace(f"Received msg {msg} from {chan.uid}") + log.trace(f"Received msg {msg} from {chan.uid}") # type: ignore if msg.get('cid'): # deliver response to local caller/waiter - await self._push_result(chan.uid, msg) + await self._push_result(chan, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue diff --git a/tractor/_portal.py b/tractor/_portal.py index 9b2265e..11ccdbf 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -49,9 +49,10 @@ async def _do_handshake( class StreamReceiveChannel(trio.abc.ReceiveChannel): - """A wrapper around a ``trio.abc.ReceiveChannel`` with - special behaviour for stream termination on both ends of - an inter-actor channel. + """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with + special behaviour for signalling stream termination on across an + inter-actor ``Channel``. This is the type returned to a local task + which invoked a remote streaming function using `Portal.run()`. Termination rules: - if the local task signals stop iteration a cancel signal is @@ -65,7 +66,7 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): cid: str, rx_chan: trio.abc.ReceiveChannel, portal: 'Portal', - ): + ) -> None: self._cid = cid self._rx_chan = rx_chan self._portal = portal @@ -84,28 +85,28 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): await self.aclose() raise StopAsyncIteration except trio.Cancelled: + # relay cancels to the remote task await self.aclose() raise except KeyError: - # if 'stop' in msg: - # break # far end async gen terminated - # else: # internal error should never get here assert msg.get('cid'), ( "Received internal error at portal?") raise unpack_error(msg, self._portal.channel) async def aclose(self): + """Cancel associate remote actor task on close + as well as the local memory channel. + """ if self._rx_chan._closed: log.warning(f"{self} is already closed") return cid = self._cid - # XXX: cancel remote task on close - log.warning( - f"Cancelling stream {cid} to " - f"{self._portal.channel.uid}") with trio.move_on_after(0.5) as cs: cs.shield = True + log.warning( + f"Cancelling stream {cid} to " + f"{self._portal.channel.uid}") # TODO: yeah.. it'd be nice if this was just an # async func on the far end. Gotta figure out a # better way then implicitly feeding the ctx @@ -117,6 +118,9 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): pass if cs.cancelled_caught: + # XXX: there's no way to know if the remote task was indeed + # cancelled in the case where the connection is broken or + # some other network error occurred. 
if not self._portal.channel.connected(): log.warning( "May have failed to cancel remote task " From 02e0c0e1a468aaf29c57fa893e683e1a3cbedaeb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 16 Feb 2019 14:05:24 -0500 Subject: [PATCH 10/12] `trio.ClosedResourceError is deprecated --- tractor/msg.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tractor/msg.py b/tractor/msg.py index ebed198..73e39d5 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -43,6 +43,9 @@ async def fan_out_to_ctxs( for ctx in topics2ctxs.get(topic, set()): ctx_payloads.setdefault(ctx, {}).update(packet), + if not ctx_payloads: + log.debug(f"Unconsumed values:\n{published}") + # deliver to each subscriber (fan out) if ctx_payloads: for ctx, payload in ctx_payloads.items(): @@ -50,7 +53,7 @@ async def fan_out_to_ctxs( await ctx.send_yield(payload) except ( # That's right, anything you can think of... - trio.ClosedStreamError, ConnectionResetError, + trio.ClosedResourceError, ConnectionResetError, ConnectionRefusedError, ): log.warning(f"{ctx.chan} went down?") From 78ddd33e3a9b1c333cc166a5a21139b48c4c0a02 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 16 Feb 2019 14:25:06 -0500 Subject: [PATCH 11/12] Move to `trio.CancelScope` --- tests/test_multi_program.py | 4 ++-- tests/test_streaming.py | 2 +- tractor/_actor.py | 10 +++++----- tractor/_portal.py | 4 ++-- tractor/_state.py | 7 +++++++ tractor/_trionics.py | 10 ++-------- 6 files changed, 19 insertions(+), 18 deletions(-) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 53413b7..299a698 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -70,7 +70,7 @@ async def test_cancel_remote_arbiter(daemon, arb_addr): pass -async def test_register_duplicate_name(daemon, arb_addr): +def test_register_duplicate_name(daemon, arb_addr): async def main(): assert not tractor.current_actor().is_arbiter @@ -85,4 +85,4 @@ async def test_register_duplicate_name(daemon, arb_addr): # run it manually since we want to start **after** # the other "daemon" program - tractor.run(main, arb_addr=arbiter_addr) + tractor.run(main, arbiter_addr=arb_addr) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 55b4bee..2cddd41 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -14,7 +14,7 @@ async def stream_seq(sequence): await trio.sleep(0.1) # block indefinitely waiting to be cancelled by ``aclose()`` call - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: await trio.sleep(float('inf')) assert 0 assert cs.cancelled_caught diff --git a/tractor/_actor.py b/tractor/_actor.py index 02cdbc0..7a5e72e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -73,7 +73,7 @@ async def _invoke( not is_async_gen_partial ): await chan.send({'functype': 'function', 'cid': cid}) - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) await chan.send({'return': func(**kwargs), 'cid': cid}) else: @@ -88,7 +88,7 @@ async def _invoke( # have to properly handle the closing (aclosing) # of the async gen in order to be sure the cancel # is propagated! 
- with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) async with aclosing(coro) as agen: async for item in agen: @@ -113,7 +113,7 @@ async def _invoke( # back values like an async-generator would but must # manualy construct the response dict-packet-responses as # above - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) await coro if not cs.cancelled_caught: @@ -122,7 +122,7 @@ async def _invoke( await chan.send({'stop': True, 'cid': cid}) else: await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) except Exception as err: @@ -377,7 +377,7 @@ class Actor: # loop running despite the current task having been # cancelled (eg. `open_portal()` may call this method from # a locally spawned task) - with trio.open_cancel_scope(shield=shield) as cs: + with trio.CancelScope(shield=shield) as cs: task_status.started(cs) async for msg in chan: if msg is None: # loop terminate sentinel diff --git a/tractor/_portal.py b/tractor/_portal.py index 11ccdbf..160db19 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -50,7 +50,7 @@ async def _do_handshake( class StreamReceiveChannel(trio.abc.ReceiveChannel): """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with - special behaviour for signalling stream termination on across an + special behaviour for signalling stream termination across an inter-actor ``Channel``. This is the type returned to a local task which invoked a remote streaming function using `Portal.run()`. @@ -126,7 +126,7 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): "May have failed to cancel remote task " f"{cid} for {self._portal.channel.uid}") - with trio.open_cancel_scope(shield=True): + with trio.CancelScope(shield=True): await self._rx_chan.aclose() def clone(self): diff --git a/tractor/_state.py b/tractor/_state.py index 704fae7..2606ff3 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -1,6 +1,7 @@ """ Per process state """ +import multiprocessing as mp from typing import Optional @@ -13,3 +14,9 @@ def current_actor() -> 'Actor': # type: ignore if not _current_actor: raise RuntimeError("No actor instance has been defined yet?") return _current_actor + + +def is_main_process(): + """Bool determining if this actor is running in the top-most process. + """ + return mp.current_process().name == 'MainProcess' diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 894f3ec..dd18158 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -185,7 +185,7 @@ class ActorNursery: Should only be called for actors spawned with `run_in_actor()`. """ - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) # if this call errors we store the exception for later # in ``errors`` which will be reraised inside @@ -316,7 +316,7 @@ class ActorNursery: # a cancel signal shows up slightly after in which case # the `else:` block here might not complete? # For now, shield both. 
- with trio.open_cancel_scope(shield=True): + with trio.CancelScope(shield=True): if etype in (trio.Cancelled, KeyboardInterrupt): log.warning( f"Nursery for {current_actor().uid} was " @@ -355,9 +355,3 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # TODO: figure out supervisors from erlang async with ActorNursery(actor) as nursery: yield nursery - - -def is_main_process(): - """Bool determining if this actor is running in the top-most process. - """ - return mp.current_process().name == 'MainProcess' From fd4e126e1f0bb2702ac481cf6c11a1a7c0694c09 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 17 Feb 2019 10:04:27 -0500 Subject: [PATCH 12/12] Adjust streaming ex to use memory channel --- README.rst | 142 ++++++++++++++++++++++++++--------------------------- 1 file changed, 71 insertions(+), 71 deletions(-) diff --git a/README.rst b/README.rst index ad0f1e6..56fbc73 100644 --- a/README.rst +++ b/README.rst @@ -343,99 +343,99 @@ and print the results to your screen: .. code:: python - import time - import trio - import tractor + import time + import trio + import tractor - # this is the first 2 actors, streamer_1 and streamer_2 - async def stream_data(seed): - for i in range(seed): - yield i - await trio.sleep(0) # trigger scheduler + # this is the first 2 actors, streamer_1 and streamer_2 + async def stream_data(seed): + for i in range(seed): + yield i + await trio.sleep(0) # trigger scheduler - # this is the third actor; the aggregator - async def aggregate(seed): - """Ensure that the two streams we receive match but only stream - a single set of values to the parent. - """ - async with tractor.open_nursery() as nursery: - portals = [] - for i in range(1, 3): - # fork point - portal = await nursery.start_actor( - name=f'streamer_{i}', - rpc_module_paths=[__name__], - ) + # this is the third actor; the aggregator + async def aggregate(seed): + """Ensure that the two streams we receive match but only stream + a single set of values to the parent. 
+ """ + async with tractor.open_nursery() as nursery: + portals = [] + for i in range(1, 3): + # fork point + portal = await nursery.start_actor( + name=f'streamer_{i}', + rpc_module_paths=[__name__], + ) - portals.append(portal) + portals.append(portal) - q = trio.Queue(500) + send_chan, recv_chan = trio.open_memory_channel(500) - async def push_to_q(portal): - async for value in await portal.run( - __name__, 'stream_data', seed=seed - ): - # leverage trio's built-in backpressure - await q.put(value) + async def push_to_q(portal): + async for value in await portal.run( + __name__, 'stream_data', seed=seed + ): + # leverage trio's built-in backpressure + await send_chan.send(value) - await q.put(None) - print(f"FINISHED ITERATING {portal.channel.uid}") + await send_chan.send(None) + print(f"FINISHED ITERATING {portal.channel.uid}") - # spawn 2 trio tasks to collect streams and push to a local queue - async with trio.open_nursery() as n: - for portal in portals: - n.start_soon(push_to_q, portal) + # spawn 2 trio tasks to collect streams and push to a local queue + async with trio.open_nursery() as n: + for portal in portals: + n.start_soon(push_to_q, portal) - unique_vals = set() - async for value in q: - if value not in unique_vals: - unique_vals.add(value) - # yield upwards to the spawning parent actor - yield value + unique_vals = set() + async for value in recv_chan: + if value not in unique_vals: + unique_vals.add(value) + # yield upwards to the spawning parent actor + yield value - if value is None: - break + if value is None: + break - assert value in unique_vals + assert value in unique_vals - print("FINISHED ITERATING in aggregator") + print("FINISHED ITERATING in aggregator") - await nursery.cancel() - print("WAITING on `ActorNursery` to finish") - print("AGGREGATOR COMPLETE!") + await nursery.cancel() + print("WAITING on `ActorNursery` to finish") + print("AGGREGATOR COMPLETE!") - # this is the main actor and *arbiter* - async def main(): - # a nursery which spawns "actors" - async with tractor.open_nursery() as nursery: + # this is the main actor and *arbiter* + async def main(): + # a nursery which spawns "actors" + async with tractor.open_nursery() as nursery: - seed = int(1e3) - import time - pre_start = time.time() + seed = int(1e3) + import time + pre_start = time.time() - portal = await nursery.run_in_actor( - 'aggregator', - aggregate, - seed=seed, - ) + portal = await nursery.run_in_actor( + 'aggregator', + aggregate, + seed=seed, + ) - start = time.time() - # the portal call returns exactly what you'd expect - # as if the remote "aggregate" function was called locally - result_stream = [] - async for value in await portal.result(): - result_stream.append(value) + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "aggregate" function was called locally + result_stream = [] + async for value in await portal.result(): + result_stream.append(value) - print(f"STREAM TIME = {time.time() - start}") - print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") - assert result_stream == list(range(seed)) + [None] - return result_stream + print(f"STREAM TIME = {time.time() - start}") + print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") + assert result_stream == list(range(seed)) + [None] + return result_stream - final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) + final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) Here there's four actors running in separate processes (using all the