From 97f709cc143ebdc2cc5da08325d4c143b2c2e8d7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 21 Jan 2019 00:45:54 -0500 Subject: [PATCH] Cancel remote streaming tasks on a local cancel Use the new `Actor.cancel_task()` api to remotely cancel streaming tasks spawned by a portal. This guarantees that if an actor is cancelled all its (remote) portal spawned tasks will be as well. On portal teardown only cancel all async generator calls (though we should cancel all RPC requests in general eventually) and don't close the channel since it may have been passed in from some other context that wishes to keep it connected. In `open_portal()` run the message loop shielded so that if the local task is cancelled, messaging will continue until the internal scope is cancelled at end of block. --- tractor/_portal.py | 86 ++++++++++++++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 30 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index fff39db..6a47330 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -5,9 +5,11 @@ import importlib import inspect import typing from typing import Tuple, Any, Dict, Optional, Set +from functools import partial +from dataclasses import dataclass import trio -from async_generator import asynccontextmanager +from async_generator import asynccontextmanager, aclosing from ._state import current_actor from ._ipc import Channel @@ -53,7 +55,7 @@ class Portal: underlying ``tractor.Channel`` as though the remote (async) function / generator was invoked locally. - Think of this like an native async IPC API. + Think of this like a native async IPC API. """ def __init__(self, channel: Channel) -> None: self.channel = channel @@ -124,7 +126,7 @@ class Portal: # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - if resptype == 'yield': + if resptype == 'yield': # stream response async def yield_from_q(): try: @@ -140,12 +142,21 @@ class Portal: "Received internal error at portal?") raise unpack_error(msg, self.channel) - except GeneratorExit: - # for now this msg cancels an ongoing remote task - await self.channel.send({'cancel': True, 'cid': cid}) - log.warn( + except (GeneratorExit, trio.Cancelled): + log.warning( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") + with trio.open_cancel_scope() as cleanup_scope: + cleanup_scope.shield = True + # TODO: yeah.. it'd be nice if this was just an + # async func on the far end. Gotta figure out a + # better way then implicitly feeding the ctx + # to declaring functions; likely a decorator + # sytsem. + agen = await self.run('self', 'cancel_task', cid=cid) + async with aclosing(agen) as agen: + async for _ in agen: + pass raise # TODO: use AsyncExitStack to aclose() all agens @@ -154,7 +165,7 @@ class Portal: self._agens.add(agen) return agen - elif resptype == 'return': + elif resptype == 'return': # single response msg = await q.get() try: return msg['return'] @@ -176,7 +187,7 @@ class Portal: # not expecting a "main" result if self._expect_result is None: - log.warn( + log.warning( f"Portal for {self.channel.uid} not expecting a final" " result?\nresult() should only be called if subactor" " was spawned with `ActorNursery.run_in_actor()`") @@ -198,22 +209,34 @@ class Portal: return self._result + async def _cancel_streams(self): + # terminate all locally running async generator + # IPC calls + if self._agens: + log.warning( + f"Cancelling all streams with {self.channel.uid}") + for agen in self._agens: + await agen.aclose() + async def close(self) -> None: - # trigger remote msg loop `break` - chan = self.channel - log.debug(f"Closing portal for {chan} to {chan.uid}") - await self.channel.send(None) + await self._cancel_streams() async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. """ + if not self.channel.connected(): + log.warning("This portal is already closed can't cancel") + return False + + await self._cancel_streams() + log.warning( - f"Sending cancel request to {self.channel.uid} on " + f"Sending actor cancel request to {self.channel.uid} on " f"{self.channel}") try: - with trio.move_on_after(0.1) as cancel_scope: + # send cancel cmd - might not get response + with trio.move_on_after(0.5) as cancel_scope: cancel_scope.shield = True - # send cancel cmd - might not get response await self.run('self', 'cancel') return True except trio.ClosedResourceError: @@ -225,17 +248,14 @@ class Portal: return False +@dataclass class LocalPortal: """A 'portal' to a local ``Actor``. A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - def __init__( - self, - actor: 'Actor' # type: ignore - ) -> None: - self.actor = actor + actor: 'Actor' # type: ignore async def run(self, ns: str, func_name: str, **kwargs) -> Any: """Run a requested function locally and return it's result. @@ -270,20 +290,26 @@ async def open_portal( if channel.uid is None: await _do_handshake(actor, channel) - nursery.start_soon(actor._process_messages, channel) + msg_loop_cs = await nursery.start( + partial( + actor._process_messages, + channel, + # if the local task is cancelled we want to keep + # the msg loop running until our block ends + shield=True, + ) + ) portal = Portal(channel) try: yield portal finally: - # tear down all async generators - for agen in portal._agens: - await agen.aclose() + await portal.close() - # cancel remote channel-msg loop - if channel.connected(): - await portal.close() + if was_connected: + # cancel remote channel-msg loop + await channel.send(None) + await channel.aclose() # cancel background msg loop task + msg_loop_cs.cancel() nursery.cancel_scope.cancel() - if was_connected: - await channel.aclose()