From 4dccb44c6783a6f12f4a133ef465a97e1d7aa521 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Dec 2018 23:12:46 -0500 Subject: [PATCH 1/7] Add support for cancelling remote tasks via a msg --- tractor/_actor.py | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index f1beb45..2604fa4 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -103,7 +103,7 @@ async def _invoke( # TODO: we should really support a proper # `StopAsyncIteration` system here for returning a final # value if desired - await chan.send({'stop': None, 'cid': cid}) + await chan.send({'stop': True, 'cid': cid}) else: if treat_as_gen: await chan.send({'functype': 'asyncgen', 'cid': cid}) @@ -117,7 +117,7 @@ async def _invoke( if not cs.cancelled_caught: # task was not cancelled so we can instruct the # far end async gen to tear down - await chan.send({'stop': None, 'cid': cid}) + await chan.send({'stop': True, 'cid': cid}) else: await chan.send({'functype': 'asyncfunction', 'cid': cid}) with trio.open_cancel_scope() as cs: @@ -141,7 +141,7 @@ async def _invoke( tasks = actor._rpc_tasks.get(chan, None) if tasks: try: - tasks.remove((cs, func)) + scope, func = tasks.pop(cid) except ValueError: # If we're cancelled before the task returns then the # cancel scope will not have been inserted yet @@ -197,7 +197,7 @@ class Actor: self._no_more_rpc_tasks.set() self._rpc_tasks: Dict[ Channel, - List[Tuple[trio._core._run.CancelScope, typing.Callable]] + Dict[str, Tuple[trio._core._run.CancelScope, typing.Callable]] ] = {} # map {uids -> {callids -> waiter queues}} self._actors2calls: Dict[Tuple[str, str], Dict[str, trio.Queue]] = {} @@ -344,9 +344,10 @@ class Actor: if msg is None: # terminate sentinel log.debug( f"Cancelling all tasks for {chan} from {chan.uid}") - for scope, func in self._rpc_tasks.pop(chan, ()): + for cid, (scope, func) in self._rpc_tasks.pop( + chan, {} + ).items(): scope.cancel() - log.debug( f"Msg loop signalled to terminate for" f" {chan} from {chan.uid}") @@ -354,10 +355,20 @@ class Actor: log.debug(f"Received msg {msg} from {chan.uid}") cid = msg.get('cid') if cid: - # deliver response to local caller/waiter - await self._push_result(chan.uid, cid, msg) - log.debug( - f"Waiting on next msg for {chan} from {chan.uid}") + cancel = msg.get('cancel') + if cancel: + # right now this is only implicitly used by + # async generator IPC + scope, func = self._rpc_tasks[chan][cid] + log.debug( + f"Received cancel request for task {cid}" + f" from {chan.uid}") + scope.cancel() + else: + # deliver response to local caller/waiter + await self._push_result(chan.uid, cid, msg) + log.debug( + f"Waiting on next msg for {chan} from {chan.uid}") continue # process command request @@ -403,7 +414,7 @@ class Actor: log.info(f"RPC func is {func}") # store cancel scope such that the rpc task can be # cancelled gracefully if requested - self._rpc_tasks.setdefault(chan, []).append((cs, func)) + self._rpc_tasks.setdefault(chan, {})[cid] = (cs, func) log.debug( f"Waiting on next msg for {chan} from {chan.uid}") else: @@ -611,9 +622,9 @@ class Actor: """ tasks = self._rpc_tasks log.info(f"Cancelling all {len(tasks)} rpc tasks:\n{tasks}") - for chan, scopes in tasks.items(): + for chan, cids2scopes in tasks.items(): log.debug(f"Cancelling all tasks for {chan.uid}") - for scope, func in scopes: + for cid, (scope, func) in cids2scopes.items(): log.debug(f"Cancelling task for {func}") scope.cancel() if tasks: From 32c7a06e6a197cfc5dc6406b9a0dd29e68094b79 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Dec 2018 23:13:25 -0500 Subject: [PATCH 2/7] Cancel remote async gens when `aclose()` is called --- tractor/_portal.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 28e28f4..136683f 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -139,7 +139,9 @@ class Portal: "Received internal error at portal?") raise unpack_error(msg, self.channel) - except StopAsyncIteration: + except GeneratorExit: + # for now this msg cancels an ongoing remote task + await self.channel.send({'cancel': True, 'cid': cid}) log.debug( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") From 47b531a43a80b9355bae7eeacadca02b92afed6c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Dec 2018 23:13:58 -0500 Subject: [PATCH 3/7] Add test to verify remote task cancellation --- tests/test_streaming.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 5691fbd..e25a000 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -13,6 +13,12 @@ async def stream_seq(sequence): yield i await trio.sleep(0.1) + # block indefinitely waiting to be cancelled by ``aclose()`` call + with trio.open_cancel_scope() as cs: + await trio.sleep(float('inf')) + assert 0 + assert cs.cancelled_caught + async def stream_from_single_subactor(): """Verify we can spawn a daemon actor and retrieve streamed data. @@ -37,17 +43,22 @@ async def stream_from_single_subactor(): ) # it'd sure be nice to have an asyncitertools here... iseq = iter(seq) + ival = next(iseq) async for val in agen: - assert val == next(iseq) - # TODO: test breaking the loop (should it kill the - # far end?) - # break - # terminate far-end async-gen - # await gen.asend(None) - # break + assert val == ival + try: + ival = next(iseq) + except StopIteration: + # should cancel far end task which will be + # caught and no error is raised + await agen.aclose() - # stop all spawned subactors - await portal.cancel_actor() + await trio.sleep(0.3) + try: + await agen.__anext__() + except StopAsyncIteration: + # stop all spawned subactors + await portal.cancel_actor() # await nursery.cancel() From d492236f3a92ae9e29877310fae9d0b9663e1e88 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 15 Dec 2018 02:19:47 -0500 Subject: [PATCH 4/7] Handle broken channels more resiliently on teardown --- tractor/_actor.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 2604fa4..750487a 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -294,8 +294,11 @@ class Actor: # # XXX: is this necessary (GC should do it?) if chan.connected(): log.debug(f"Disconnecting channel {chan}") - await chan.send(None) - await chan.aclose() + try: + await chan.send(None) + await chan.aclose() + except trio.BrokenResourceError: + log.exception(f"Channel for {chan.uid} was already zonked..") async def _push_result(self, actorid, cid: str, msg: dict) -> None: """Push an RPC result to the local consumer's queue. From db85e13657cb5bdf2439bae9327150e70385d337 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 15 Dec 2018 02:20:19 -0500 Subject: [PATCH 5/7] Use a fifo lock for IPC --- tractor/_ipc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 52b1ca5..f7bebbe 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -20,7 +20,7 @@ class StreamQueue: self._agen = self._iter_packets() self._laddr = self.stream.socket.getsockname()[:2] self._raddr = self.stream.socket.getpeername()[:2] - self._send_lock = trio.Lock() + self._send_lock = trio.StrictFIFOLock() async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. From eb6e82f577eb3fa01c797fa7b54dc6a8e0acae9d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 15 Dec 2018 02:20:55 -0500 Subject: [PATCH 6/7] Close all portal created async gens on shutdown --- tractor/_portal.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 136683f..eb89252 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -65,6 +65,7 @@ class Portal: self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None + self._agens: Set(AsyncGenerator) = set() async def aclose(self) -> None: log.debug(f"Closing {self}") @@ -142,14 +143,16 @@ class Portal: except GeneratorExit: # for now this msg cancels an ongoing remote task await self.channel.send({'cancel': True, 'cid': cid}) - log.debug( + log.warn( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") raise # TODO: use AsyncExitStack to aclose() all agens # on teardown - return yield_from_q() + agen = yield_from_q() + self._agens.add(agen) + return agen elif resptype == 'return': msg = await q.get() @@ -269,13 +272,18 @@ async def open_portal( nursery.start_soon(actor._process_messages, channel) portal = Portal(channel) - yield portal + try: + yield portal + finally: + # tear down all async generators + for agen in portal._agens: + await agen.aclose() - # cancel remote channel-msg loop - if channel.connected(): - await portal.close() + # cancel remote channel-msg loop + if channel.connected(): + await portal.close() - # cancel background msg loop task - nursery.cancel_scope.cancel() - if was_connected: - await channel.aclose() + # cancel background msg loop task + nursery.cancel_scope.cancel() + if was_connected: + await channel.aclose() From ef23055d12228ba8decd7ce4736518c074d497f5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Jan 2019 12:14:57 -0500 Subject: [PATCH 7/7] Use proper typing syntax --- tractor/_portal.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index eb89252..fff39db 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -4,7 +4,7 @@ Portal api import importlib import inspect import typing -from typing import Tuple, Any, Dict, Optional +from typing import Tuple, Any, Dict, Optional, Set import trio from async_generator import asynccontextmanager @@ -65,7 +65,7 @@ class Portal: self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None - self._agens: Set(AsyncGenerator) = set() + self._agens: Set[typing.AsyncGenerator] = set() async def aclose(self) -> None: log.debug(f"Closing {self}")