diff --git a/tractor/_portal.py b/tractor/_portal.py index fff39db..6a47330 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -5,9 +5,11 @@ import importlib import inspect import typing from typing import Tuple, Any, Dict, Optional, Set +from functools import partial +from dataclasses import dataclass import trio -from async_generator import asynccontextmanager +from async_generator import asynccontextmanager, aclosing from ._state import current_actor from ._ipc import Channel @@ -53,7 +55,7 @@ class Portal: underlying ``tractor.Channel`` as though the remote (async) function / generator was invoked locally. - Think of this like an native async IPC API. + Think of this like a native async IPC API. """ def __init__(self, channel: Channel) -> None: self.channel = channel @@ -124,7 +126,7 @@ class Portal: # to make async-generators the fundamental IPC API over channels! # (think `yield from`, `gen.send()`, and functional reactive stuff) - if resptype == 'yield': + if resptype == 'yield': # stream response async def yield_from_q(): try: @@ -140,12 +142,21 @@ class Portal: "Received internal error at portal?") raise unpack_error(msg, self.channel) - except GeneratorExit: - # for now this msg cancels an ongoing remote task - await self.channel.send({'cancel': True, 'cid': cid}) - log.warn( + except (GeneratorExit, trio.Cancelled): + log.warning( f"Cancelling async gen call {cid} to " f"{self.channel.uid}") + with trio.open_cancel_scope() as cleanup_scope: + cleanup_scope.shield = True + # TODO: yeah.. it'd be nice if this was just an + # async func on the far end. Gotta figure out a + # better way then implicitly feeding the ctx + # to declaring functions; likely a decorator + # sytsem. + agen = await self.run('self', 'cancel_task', cid=cid) + async with aclosing(agen) as agen: + async for _ in agen: + pass raise # TODO: use AsyncExitStack to aclose() all agens @@ -154,7 +165,7 @@ class Portal: self._agens.add(agen) return agen - elif resptype == 'return': + elif resptype == 'return': # single response msg = await q.get() try: return msg['return'] @@ -176,7 +187,7 @@ class Portal: # not expecting a "main" result if self._expect_result is None: - log.warn( + log.warning( f"Portal for {self.channel.uid} not expecting a final" " result?\nresult() should only be called if subactor" " was spawned with `ActorNursery.run_in_actor()`") @@ -198,22 +209,34 @@ class Portal: return self._result + async def _cancel_streams(self): + # terminate all locally running async generator + # IPC calls + if self._agens: + log.warning( + f"Cancelling all streams with {self.channel.uid}") + for agen in self._agens: + await agen.aclose() + async def close(self) -> None: - # trigger remote msg loop `break` - chan = self.channel - log.debug(f"Closing portal for {chan} to {chan.uid}") - await self.channel.send(None) + await self._cancel_streams() async def cancel_actor(self) -> bool: """Cancel the actor on the other end of this portal. """ + if not self.channel.connected(): + log.warning("This portal is already closed can't cancel") + return False + + await self._cancel_streams() + log.warning( - f"Sending cancel request to {self.channel.uid} on " + f"Sending actor cancel request to {self.channel.uid} on " f"{self.channel}") try: - with trio.move_on_after(0.1) as cancel_scope: + # send cancel cmd - might not get response + with trio.move_on_after(0.5) as cancel_scope: cancel_scope.shield = True - # send cancel cmd - might not get response await self.run('self', 'cancel') return True except trio.ClosedResourceError: @@ -225,17 +248,14 @@ class Portal: return False +@dataclass class LocalPortal: """A 'portal' to a local ``Actor``. A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - def __init__( - self, - actor: 'Actor' # type: ignore - ) -> None: - self.actor = actor + actor: 'Actor' # type: ignore async def run(self, ns: str, func_name: str, **kwargs) -> Any: """Run a requested function locally and return it's result. @@ -270,20 +290,26 @@ async def open_portal( if channel.uid is None: await _do_handshake(actor, channel) - nursery.start_soon(actor._process_messages, channel) + msg_loop_cs = await nursery.start( + partial( + actor._process_messages, + channel, + # if the local task is cancelled we want to keep + # the msg loop running until our block ends + shield=True, + ) + ) portal = Portal(channel) try: yield portal finally: - # tear down all async generators - for agen in portal._agens: - await agen.aclose() + await portal.close() - # cancel remote channel-msg loop - if channel.connected(): - await portal.close() + if was_connected: + # cancel remote channel-msg loop + await channel.send(None) + await channel.aclose() # cancel background msg loop task + msg_loop_cs.cancel() nursery.cancel_scope.cancel() - if was_connected: - await channel.aclose()