diff --git a/README.rst b/README.rst index ad0f1e6..56fbc73 100644 --- a/README.rst +++ b/README.rst @@ -343,99 +343,99 @@ and print the results to your screen: .. code:: python - import time - import trio - import tractor + import time + import trio + import tractor - # this is the first 2 actors, streamer_1 and streamer_2 - async def stream_data(seed): - for i in range(seed): - yield i - await trio.sleep(0) # trigger scheduler + # this is the first 2 actors, streamer_1 and streamer_2 + async def stream_data(seed): + for i in range(seed): + yield i + await trio.sleep(0) # trigger scheduler - # this is the third actor; the aggregator - async def aggregate(seed): - """Ensure that the two streams we receive match but only stream - a single set of values to the parent. - """ - async with tractor.open_nursery() as nursery: - portals = [] - for i in range(1, 3): - # fork point - portal = await nursery.start_actor( - name=f'streamer_{i}', - rpc_module_paths=[__name__], - ) + # this is the third actor; the aggregator + async def aggregate(seed): + """Ensure that the two streams we receive match but only stream + a single set of values to the parent. + """ + async with tractor.open_nursery() as nursery: + portals = [] + for i in range(1, 3): + # fork point + portal = await nursery.start_actor( + name=f'streamer_{i}', + rpc_module_paths=[__name__], + ) - portals.append(portal) + portals.append(portal) - q = trio.Queue(500) + send_chan, recv_chan = trio.open_memory_channel(500) - async def push_to_q(portal): - async for value in await portal.run( - __name__, 'stream_data', seed=seed - ): - # leverage trio's built-in backpressure - await q.put(value) + async def push_to_q(portal): + async for value in await portal.run( + __name__, 'stream_data', seed=seed + ): + # leverage trio's built-in backpressure + await send_chan.send(value) - await q.put(None) - print(f"FINISHED ITERATING {portal.channel.uid}") + await send_chan.send(None) + print(f"FINISHED ITERATING {portal.channel.uid}") - # spawn 2 trio tasks to collect streams and push to a local queue - async with trio.open_nursery() as n: - for portal in portals: - n.start_soon(push_to_q, portal) + # spawn 2 trio tasks to collect streams and push to a local queue + async with trio.open_nursery() as n: + for portal in portals: + n.start_soon(push_to_q, portal) - unique_vals = set() - async for value in q: - if value not in unique_vals: - unique_vals.add(value) - # yield upwards to the spawning parent actor - yield value + unique_vals = set() + async for value in recv_chan: + if value not in unique_vals: + unique_vals.add(value) + # yield upwards to the spawning parent actor + yield value - if value is None: - break + if value is None: + break - assert value in unique_vals + assert value in unique_vals - print("FINISHED ITERATING in aggregator") + print("FINISHED ITERATING in aggregator") - await nursery.cancel() - print("WAITING on `ActorNursery` to finish") - print("AGGREGATOR COMPLETE!") + await nursery.cancel() + print("WAITING on `ActorNursery` to finish") + print("AGGREGATOR COMPLETE!") - # this is the main actor and *arbiter* - async def main(): - # a nursery which spawns "actors" - async with tractor.open_nursery() as nursery: + # this is the main actor and *arbiter* + async def main(): + # a nursery which spawns "actors" + async with tractor.open_nursery() as nursery: - seed = int(1e3) - import time - pre_start = time.time() + seed = int(1e3) + import time + pre_start = time.time() - portal = await nursery.run_in_actor( - 'aggregator', - aggregate, - seed=seed, - ) + portal = await nursery.run_in_actor( + 'aggregator', + aggregate, + seed=seed, + ) - start = time.time() - # the portal call returns exactly what you'd expect - # as if the remote "aggregate" function was called locally - result_stream = [] - async for value in await portal.result(): - result_stream.append(value) + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "aggregate" function was called locally + result_stream = [] + async for value in await portal.result(): + result_stream.append(value) - print(f"STREAM TIME = {time.time() - start}") - print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") - assert result_stream == list(range(seed)) + [None] - return result_stream + print(f"STREAM TIME = {time.time() - start}") + print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") + assert result_stream == list(range(seed)) + [None] + return result_stream - final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) + final_stream = tractor.run(main, arbiter_addr=('127.0.0.1', 1616)) Here there's four actors running in separate processes (using all the diff --git a/setup.py b/setup.py index 885d992..70725a7 100755 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ setup( install_requires=[ 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt'], tests_require=['pytest'], - python_requires=">=3.6", + python_requires=">=3.7", keywords=[ "async", "concurrency", "actor model", "distributed", 'trio', 'multiprocessing' diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 53413b7..299a698 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -70,7 +70,7 @@ async def test_cancel_remote_arbiter(daemon, arb_addr): pass -async def test_register_duplicate_name(daemon, arb_addr): +def test_register_duplicate_name(daemon, arb_addr): async def main(): assert not tractor.current_actor().is_arbiter @@ -85,4 +85,4 @@ async def test_register_duplicate_name(daemon, arb_addr): # run it manually since we want to start **after** # the other "daemon" program - tractor.run(main, arb_addr=arbiter_addr) + tractor.run(main, arbiter_addr=arb_addr) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index f0c4beb..745e5ec 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -4,7 +4,6 @@ from itertools import cycle import pytest import trio import tractor -from async_generator import aclosing from tractor.testing import tractor_test @@ -30,10 +29,10 @@ def is_even(i): @tractor.msg.pub -async def pubber(get_topics): +async def pubber(get_topics, seed=10): ss = tractor.current_actor().statespace - for i in cycle(range(10)): + for i in cycle(range(seed)): # ensure topic subscriptions are as expected ss['get_topics'] = get_topics @@ -42,7 +41,11 @@ async def pubber(get_topics): await trio.sleep(0.1) -async def subs(which, pub_actor_name): +async def subs( + which, pub_actor_name, seed=10, + portal=None, + task_status=trio.TASK_STATUS_IGNORED, +): if len(which) == 1: if which[0] == 'even': pred = is_even @@ -54,11 +57,43 @@ async def subs(which, pub_actor_name): return isinstance(i, int) async with tractor.find_actor(pub_actor_name) as portal: - agen = await portal.run(__name__, 'pubber', topics=which) - async with aclosing(agen) as agen: - async for pkt in agen: + stream = await portal.run( + __name__, 'pubber', + topics=which, + seed=seed, + ) + task_status.started(stream) + times = 10 + count = 0 + await stream.__anext__() + async for pkt in stream: + for topic, value in pkt.items(): + assert pred(value) + count += 1 + if count >= times: + break + + await stream.aclose() + + stream = await portal.run( + __name__, 'pubber', + topics=['odd'], + seed=seed, + ) + + await stream.__anext__() + count = 0 + # async with aclosing(stream) as stream: + try: + async for pkt in stream: for topic, value in pkt.items(): - assert pred(value) + pass + # assert pred(value) + count += 1 + if count >= times: + break + finally: + await stream.aclose() @tractor.msg.pub(tasks=['one', 'two']) @@ -76,7 +111,7 @@ async def multilock_pubber(get_topics): (multilock_pubber, {'ctx': None, 'topics': ['topic1']}, TypeError), # should work (multilock_pubber, - {'ctx': None, 'topics': ['topic1'], 'task_name': 'one'}, + {'ctx': None, 'topics': ['doggy'], 'task_name': 'one'}, None), ], ) @@ -91,7 +126,12 @@ async def test_required_args(callwith_expecterror): async with tractor.open_nursery() as n: # await func(**kwargs) portal = await n.run_in_actor( - 'sub', multilock_pubber, **kwargs) + 'pubber', multilock_pubber, **kwargs) + + async with tractor.wait_for_actor('pubber'): + pass + + await trio.sleep(0.5) async for val in await portal.result(): assert val == {'doggy': 10} @@ -101,7 +141,7 @@ async def test_required_args(callwith_expecterror): 'pub_actor', ['streamer', 'arbiter'] ) -def test_pubsub_multi_actor_subs( +def test_multi_actor_subs_arbiter_pub( loglevel, arb_addr, pub_actor, @@ -115,7 +155,7 @@ def test_pubsub_multi_actor_subs( name = 'arbiter' - if pub_actor is 'streamer': + if pub_actor == 'streamer': # start the publisher as a daemon master_portal = await n.start_actor( 'streamer', @@ -131,7 +171,7 @@ def test_pubsub_multi_actor_subs( # block until 2nd actor is initialized pass - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': # wait for publisher task to be spawned in a local RPC task while not ss.get('get_topics'): await trio.sleep(0.1) @@ -144,7 +184,7 @@ def test_pubsub_multi_actor_subs( # block until 2nd actor is initialized pass - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': start = time.time() while 'odd' not in get_topics(): await trio.sleep(0.1) @@ -166,13 +206,13 @@ def test_pubsub_multi_actor_subs( await even_portal.cancel_actor() await trio.sleep(0.5) - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': assert 'even' not in get_topics() await odd_portal.cancel_actor() await trio.sleep(1) - if pub_actor is 'arbiter': + if pub_actor == 'arbiter': while get_topics(): await trio.sleep(0.1) if time.time() - start > 1: @@ -185,3 +225,44 @@ def test_pubsub_multi_actor_subs( arbiter_addr=arb_addr, rpc_module_paths=[__name__], ) + + +def test_single_subactor_pub_multitask_subs( + loglevel, + arb_addr, +): + async def main(): + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'streamer', + rpc_module_paths=[__name__], + ) + async with tractor.wait_for_actor('streamer'): + # block until 2nd actor is initialized + pass + + async with trio.open_nursery() as tn: + agen = await tn.start(subs, ['even'], 'streamer') + + await trio.sleep(0.1) + tn.start_soon(subs, ['even'], 'streamer') + + # XXX this will trigger the python bug: + # https://bugs.python.org/issue32526 + # if using async generators to wrap tractor channels + await agen.aclose() + + await trio.sleep(0.1) + tn.start_soon(subs, ['even'], 'streamer') + await trio.sleep(0.1) + tn.start_soon(subs, ['even'], 'streamer') + + await portal.cancel_actor() + + tractor.run( + main, + arbiter_addr=arb_addr, + rpc_module_paths=[__name__], + ) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index e25a000..2cddd41 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -14,7 +14,7 @@ async def stream_seq(sequence): await trio.sleep(0.1) # block indefinitely waiting to be cancelled by ``aclose()`` call - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: await trio.sleep(float('inf')) assert 0 assert cs.cancelled_caught @@ -91,25 +91,25 @@ async def aggregate(seed): portals.append(portal) - q = trio.Queue(500) + send_chan, recv_chan = trio.open_memory_channel(500) - async def push_to_q(portal): + async def push_to_chan(portal): async for value in await portal.run( __name__, 'stream_data', seed=seed ): # leverage trio's built-in backpressure - await q.put(value) + await send_chan.send(value) - await q.put(None) + await send_chan.send(None) print(f"FINISHED ITERATING {portal.channel.uid}") # spawn 2 trio tasks to collect streams and push to a local queue async with trio.open_nursery() as n: for portal in portals: - n.start_soon(push_to_q, portal) + n.start_soon(push_to_chan, portal) unique_vals = set() - async for value in q: + async for value in recv_chan: if value not in unique_vals: unique_vals.add(value) # yield upwards to the spawning parent actor diff --git a/tractor/_actor.py b/tractor/_actor.py index ceee16b..7a5e72e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -73,7 +73,7 @@ async def _invoke( not is_async_gen_partial ): await chan.send({'functype': 'function', 'cid': cid}) - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) await chan.send({'return': func(**kwargs), 'cid': cid}) else: @@ -88,7 +88,7 @@ async def _invoke( # have to properly handle the closing (aclosing) # of the async gen in order to be sure the cancel # is propagated! - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) async with aclosing(coro) as agen: async for item in agen: @@ -113,7 +113,7 @@ async def _invoke( # back values like an async-generator would but must # manualy construct the response dict-packet-responses as # above - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) await coro if not cs.cancelled_caught: @@ -122,7 +122,7 @@ async def _invoke( await chan.send({'stop': True, 'cid': cid}) else: await chan.send({'functype': 'asyncfunction', 'cid': cid}) - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) await chan.send({'return': await coro, 'cid': cid}) except Exception as err: @@ -199,7 +199,9 @@ class Actor: Tuple[trio._core._run.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} - self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} + self._cids2qs: Dict[ + Tuple[Tuple[str, str], str], + trio.abc.SendChannel[Any]] = {} self._listeners: List[trio.abc.Listener] = [] self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[Tuple[Any, Any, Any, Any, Any]] = None @@ -299,37 +301,62 @@ class Actor: log.exception( f"Channel for {chan.uid} was already zonked..") - async def _push_result(self, actorid, cid: str, msg: dict) -> None: + async def _push_result( + self, + chan: Channel, + msg: Dict[str, Any], + ) -> None: """Push an RPC result to the local consumer's queue. """ + actorid = chan.uid assert actorid, f"`actorid` can't be {actorid}" - q = self.get_waitq(actorid, cid) - log.debug(f"Delivering {msg} from {actorid} to caller {cid}") - # maintain backpressure - await q.put(msg) + cid = msg['cid'] + send_chan = self._cids2qs[(actorid, cid)] + assert send_chan.cid == cid + if 'stop' in msg: + log.debug(f"{send_chan} was terminated at remote end") + return await send_chan.aclose() + try: + log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + # maintain backpressure + await send_chan.send(msg) + except trio.BrokenResourceError: + # XXX: local consumer has closed their side + # so cancel the far end streaming task + log.warning(f"{send_chan} consumer is already closed") - def get_waitq( + def get_memchans( self, actorid: Tuple[str, str], cid: str - ) -> trio.Queue: + ) -> trio.abc.ReceiveChannel: log.debug(f"Getting result queue for {actorid} cid {cid}") - cids2qs = self._actors2calls.setdefault(actorid, {}) - return cids2qs.setdefault(cid, trio.Queue(1000)) + try: + recv_chan = self._cids2qs[(actorid, cid)] + except KeyError: + send_chan, recv_chan = trio.open_memory_channel(1000) + send_chan.cid = cid + self._cids2qs[(actorid, cid)] = send_chan + + return recv_chan async def send_cmd( - self, chan: Channel, ns: str, func: str, kwargs: dict - ) -> Tuple[str, trio.Queue]: + self, + chan: Channel, + ns: str, + func: str, + kwargs: dict + ) -> Tuple[str, trio.abc.ReceiveChannel]: """Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. """ cid = str(uuid.uuid1()) assert chan.uid - q = self.get_waitq(chan.uid, cid) + recv_chan = self.get_memchans(chan.uid, cid) log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return cid, q + return cid, recv_chan async def _process_messages( self, chan: Channel, @@ -350,7 +377,7 @@ class Actor: # loop running despite the current task having been # cancelled (eg. `open_portal()` may call this method from # a locally spawned task) - with trio.open_cancel_scope(shield=shield) as cs: + with trio.CancelScope(shield=shield) as cs: task_status.started(cs) async for msg in chan: if msg is None: # loop terminate sentinel @@ -364,11 +391,10 @@ class Actor: f" {chan} from {chan.uid}") break - log.debug(f"Received msg {msg} from {chan.uid}") - cid = msg.get('cid') - if cid: + log.trace(f"Received msg {msg} from {chan.uid}") # type: ignore + if msg.get('cid'): # deliver response to local caller/waiter - await self._push_result(chan.uid, cid, msg) + await self._push_result(chan, msg) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") continue @@ -643,9 +669,14 @@ class Actor: # streaming IPC but it should be called # to cancel any remotely spawned task chan = ctx.chan - # the ``dict.get()`` ensures the requested task to be cancelled - # was indeed spawned by a request from this channel - scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] + try: + # this ctx based lookup ensures the requested task to + # be cancelled was indeed spawned by a request from this channel + scope, func, is_complete = self._rpc_tasks[(ctx.chan, cid)] + except KeyError: + log.warning(f"{cid} has already completed/terminated?") + return + log.debug( f"Cancelling task:\ncid: {cid}\nfunc: {func}\n" f"peer: {chan.uid}\n") diff --git a/tractor/_portal.py b/tractor/_portal.py index 2683d88..160db19 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -9,7 +9,7 @@ from functools import partial from dataclasses import dataclass import trio -from async_generator import asynccontextmanager, aclosing +from async_generator import asynccontextmanager from ._state import current_actor from ._ipc import Channel @@ -36,7 +36,7 @@ async def maybe_open_nursery(nursery: trio._core._run.Nursery = None): async def _do_handshake( actor: 'Actor', # type: ignore chan: Channel -)-> Any: +) -> Any: await chan.send(actor.uid) uid: Tuple[str, str] = await chan.recv() @@ -48,6 +48,91 @@ async def _do_handshake( return uid +class StreamReceiveChannel(trio.abc.ReceiveChannel): + """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with + special behaviour for signalling stream termination across an + inter-actor ``Channel``. This is the type returned to a local task + which invoked a remote streaming function using `Portal.run()`. + + Termination rules: + - if the local task signals stop iteration a cancel signal is + relayed to the remote task indicating to stop streaming + - if the remote task signals the end of a stream, raise a + ``StopAsyncIteration`` to terminate the local ``async for`` + + """ + def __init__( + self, + cid: str, + rx_chan: trio.abc.ReceiveChannel, + portal: 'Portal', + ) -> None: + self._cid = cid + self._rx_chan = rx_chan + self._portal = portal + + # delegate directly to underlying mem channel + def receive_nowait(self): + return self._rx_chan.receive_nowait() + + async def receive(self): + try: + msg = await self._rx_chan.receive() + return msg['yield'] + except trio.ClosedResourceError: + # when the send is closed we assume the stream has + # terminated and signal this local iterator to stop + await self.aclose() + raise StopAsyncIteration + except trio.Cancelled: + # relay cancels to the remote task + await self.aclose() + raise + except KeyError: + # internal error should never get here + assert msg.get('cid'), ( + "Received internal error at portal?") + raise unpack_error(msg, self._portal.channel) + + async def aclose(self): + """Cancel associate remote actor task on close + as well as the local memory channel. + """ + if self._rx_chan._closed: + log.warning(f"{self} is already closed") + return + cid = self._cid + with trio.move_on_after(0.5) as cs: + cs.shield = True + log.warning( + f"Cancelling stream {cid} to " + f"{self._portal.channel.uid}") + # TODO: yeah.. it'd be nice if this was just an + # async func on the far end. Gotta figure out a + # better way then implicitly feeding the ctx + # to declaring functions; likely a decorator + # system. + rchan = await self._portal.run( + 'self', 'cancel_task', cid=cid) + async for _ in rchan: + pass + + if cs.cancelled_caught: + # XXX: there's no way to know if the remote task was indeed + # cancelled in the case where the connection is broken or + # some other network error occurred. + if not self._portal.channel.connected(): + log.warning( + "May have failed to cancel remote task " + f"{cid} for {self._portal.channel.uid}") + + with trio.CancelScope(shield=True): + await self._rx_chan.aclose() + + def clone(self): + return self + + class Portal: """A 'portal' to a(n) (remote) ``Actor``. @@ -67,17 +152,14 @@ class Portal: self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None - self._agens: Set[typing.AsyncGenerator] = set() - - async def aclose(self) -> None: - log.debug(f"Closing {self}") - # XXX: won't work until https://github.com/python-trio/trio/pull/460 - # gets in! - await self.channel.aclose() + self._streams: Set[StreamReceiveChannel] = set() async def _submit( - self, ns: str, func: str, **kwargs - ) -> Tuple[str, trio.Queue, str, Dict[str, Any]]: + self, + ns: str, + func: str, + kwargs, + ) -> Tuple[str, trio.abc.ReceiveChannel, str, Dict[str, Any]]: """Submit a function to be scheduled and run by actor, return the associated caller id, response queue, response type str, first message packet as a tuple. @@ -85,11 +167,13 @@ class Portal: This is an async call. """ # ship a function call request to the remote actor - cid, q = await current_actor().send_cmd(self.channel, ns, func, kwargs) + cid, recv_chan = await current_actor().send_cmd( + self.channel, ns, func, kwargs) # wait on first response msg and handle (this should be # in an immediate response) - first_msg = await q.get() + + first_msg = await recv_chan.receive() functype = first_msg.get('functype') if functype == 'function' or functype == 'asyncfunction': @@ -101,12 +185,12 @@ class Portal: else: raise ValueError(f"{first_msg} is an invalid response packet?") - return cid, q, resp_type, first_msg + return cid, recv_chan, resp_type, first_msg async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" - self._expect_result = await self._submit(ns, func, **kwargs) + self._expect_result = await self._submit(ns, func, kwargs) async def run(self, ns: str, func: str, **kwargs) -> Any: """Submit a remote function to be scheduled and run by actor, @@ -116,62 +200,26 @@ class Portal: remote rpc task or a local async generator instance. """ return await self._return_from_resptype( - *(await self._submit(ns, func, **kwargs)) + *(await self._submit(ns, func, kwargs)) ) async def _return_from_resptype( - self, cid: str, q: trio.Queue, resptype: str, first_msg: dict + self, + cid: str, + recv_chan: trio.abc.ReceiveChannel, + resptype: str, + first_msg: dict ) -> Any: # TODO: not this needs some serious work and thinking about how # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - if resptype == 'yield': # stream response - - async def yield_from_q(): - try: - async for msg in q: - try: - yield msg['yield'] - except KeyError: - if 'stop' in msg: - break # far end async gen terminated - else: - # internal error should never get here - assert msg.get('cid'), ( - "Received internal error at portal?") - raise unpack_error(msg, self.channel) - - except (GeneratorExit, trio.Cancelled): - log.warning( - f"Cancelling async gen call {cid} to " - f"{self.channel.uid}") - with trio.move_on_after(0.5) as cs: - cs.shield = True - # TODO: yeah.. it'd be nice if this was just an - # async func on the far end. Gotta figure out a - # better way then implicitly feeding the ctx - # to declaring functions; likely a decorator - # sytsem. - agen = await self.run('self', 'cancel_task', cid=cid) - async with aclosing(agen) as agen: - async for _ in agen: - pass - if cs.cancelled_caught: - if not self.channel.connected(): - log.warning( - "May have failed to cancel remote task " - f"{cid} for {self.channel.uid}") - raise - - # TODO: use AsyncExitStack to aclose() all agens - # on teardown - agen = yield_from_q() - self._agens.add(agen) - return agen + rchan = StreamReceiveChannel(cid, recv_chan, self) + self._streams.add(rchan) + return rchan elif resptype == 'return': # single response - msg = await q.get() + msg = await recv_chan.receive() try: return msg['return'] except KeyError: @@ -217,13 +265,17 @@ class Portal: async def _cancel_streams(self): # terminate all locally running async generator # IPC calls - if self._agens: + if self._streams: log.warning( f"Cancelling all streams with {self.channel.uid}") - for agen in self._agens: - await agen.aclose() + for stream in self._streams.copy(): + await stream.aclose() - async def close(self) -> None: + async def aclose(self) -> None: + log.debug(f"Closing {self}") + # TODO: once we move to implementing our own `ReceiveChannel` + # (including remote task cancellation inside its `.aclose()`) + # we'll need to .aclose all those channels here await self._cancel_streams() async def cancel_actor(self) -> bool: @@ -308,7 +360,7 @@ async def open_portal( try: yield portal finally: - await portal.close() + await portal.aclose() if was_connected: # cancel remote channel-msg loop diff --git a/tractor/_state.py b/tractor/_state.py index 704fae7..2606ff3 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -1,6 +1,7 @@ """ Per process state """ +import multiprocessing as mp from typing import Optional @@ -13,3 +14,9 @@ def current_actor() -> 'Actor': # type: ignore if not _current_actor: raise RuntimeError("No actor instance has been defined yet?") return _current_actor + + +def is_main_process(): + """Bool determining if this actor is running in the top-most process. + """ + return mp.current_process().name == 'MainProcess' diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 44f8dcd..dd18158 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -185,7 +185,7 @@ class ActorNursery: Should only be called for actors spawned with `run_in_actor()`. """ - with trio.open_cancel_scope() as cs: + with trio.CancelScope() as cs: task_status.started(cs) # if this call errors we store the exception for later # in ``errors`` which will be reraised inside @@ -316,7 +316,7 @@ class ActorNursery: # a cancel signal shows up slightly after in which case # the `else:` block here might not complete? # For now, shield both. - with trio.open_cancel_scope(shield=True): + with trio.CancelScope(shield=True): if etype in (trio.Cancelled, KeyboardInterrupt): log.warning( f"Nursery for {current_actor().uid} was " @@ -353,11 +353,5 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: raise RuntimeError("No actor instance has been defined yet?") # TODO: figure out supervisors from erlang - async with ActorNursery(current_actor()) as nursery: + async with ActorNursery(actor) as nursery: yield nursery - - -def is_main_process(): - """Bool determining if this actor is running in the top-most process. - """ - return mp.current_process().name == 'MainProcess' diff --git a/tractor/msg.py b/tractor/msg.py index ebed198..73e39d5 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -43,6 +43,9 @@ async def fan_out_to_ctxs( for ctx in topics2ctxs.get(topic, set()): ctx_payloads.setdefault(ctx, {}).update(packet), + if not ctx_payloads: + log.debug(f"Unconsumed values:\n{published}") + # deliver to each subscriber (fan out) if ctx_payloads: for ctx, payload in ctx_payloads.items(): @@ -50,7 +53,7 @@ async def fan_out_to_ctxs( await ctx.send_yield(payload) except ( # That's right, anything you can think of... - trio.ClosedStreamError, ConnectionResetError, + trio.ClosedResourceError, ConnectionResetError, ConnectionRefusedError, ): log.warning(f"{ctx.chan} went down?")