From a23afb0bb8086691448f4feac48d7417e6c7aba2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Dec 2021 18:48:29 -0500 Subject: [PATCH] Set channel cancel called flag on cancel requests --- tractor/_portal.py | 77 ++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 80fc902..d32f26b 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -1,5 +1,6 @@ """ -Portal api +Memory boundary "Portals": an API for structured +concurrency linked tasks running in disparate memory domains. """ import importlib @@ -21,7 +22,6 @@ from .log import get_logger from ._exceptions import ( unpack_error, NoResult, - # RemoteActorError, ContextCancelled, ) from ._streaming import Context, ReceiveMsgStream @@ -35,10 +35,12 @@ async def maybe_open_nursery( nursery: trio.Nursery = None, shield: bool = False, ) -> AsyncGenerator[trio.Nursery, Any]: - """Create a new nursery if None provided. + ''' + Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. - """ + + ''' if nursery is not None: yield nursery else: @@ -87,14 +89,18 @@ class Portal: like having a "portal" between the seperate actor memory spaces. ''' + # the timeout for a remote cancel request sent to + # a(n) (peer) actor. + cancel_timeout = 0.5 + def __init__(self, channel: Channel) -> None: self.channel = channel - # when this is set to a tuple returned from ``_submit()`` then - # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime self._result_msg: Optional[dict] = None - # set when _submit_for_result is called + # When this is set to a tuple returned from ``_submit()`` then + # it is expected that ``result()`` will be awaited at some + # point. Set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None @@ -199,9 +205,15 @@ class Portal: # we'll need to .aclose all those channels here await self._cancel_streams() - async def cancel_actor(self): - """Cancel the actor on the other end of this portal. - """ + async def cancel_actor( + self, + timeout: float = None, + + ) -> bool: + ''' + Cancel the actor on the other end of this portal. + + ''' if not self.channel.connected(): log.cancel("This portal is already closed can't cancel") return False @@ -211,16 +223,19 @@ class Portal: log.cancel( f"Sending actor cancel request to {self.channel.uid} on " f"{self.channel}") + + self.channel._cancel_called = True + try: # send cancel cmd - might not get response # XXX: sure would be nice to make this work with a proper shield - with trio.move_on_after(0.5) as cancel_scope: - cancel_scope.shield = True + with trio.move_on_after(timeout or self.cancel_timeout) as cs: + cs.shield = True await self.run_from_ns('self', 'cancel') return True - if cancel_scope.cancelled_caught: + if cs.cancelled_caught: log.cancel(f"May have failed to cancel {self.channel.uid}") # if we get here some weird cancellation case happened @@ -237,7 +252,9 @@ class Portal: function_name: str, **kwargs, ) -> Any: - """Run a function from a (remote) namespace in a new task on the far-end actor. + ''' + Run a function from a (remote) namespace in a new task on the + far-end actor. This is a more explitcit way to run tasks in a remote-process actor using explicit object-path syntax. Hint: this is how @@ -246,9 +263,11 @@ class Portal: Note:: A special namespace `self` can be used to invoke `Actor` - instance methods in the remote runtime. Currently this should only - be used for `tractor` internals. - """ + instance methods in the remote runtime. Currently this + should only be used solely for ``tractor`` runtime + internals. + + ''' msg = await self._return_once( *(await self._submit(namespace_path, function_name, kwargs)) ) @@ -447,7 +466,8 @@ class Portal: except ( BaseException, - # more specifically, we need to handle: + # more specifically, we need to handle these but not + # sure it's worth being pedantic: # Exception, # trio.Cancelled, # trio.MultiError, @@ -495,19 +515,22 @@ class Portal: @dataclass class LocalPortal: - """A 'portal' to a local ``Actor``. + ''' + A 'portal' to a local ``Actor``. A compatibility shim for normal portals but for invoking functions using an in process actor instance. - """ + + ''' actor: 'Actor' # type: ignore # noqa channel: Channel async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: - """Run a requested local function from a namespace path and + ''' + Run a requested local function from a namespace path and return it's result. - """ + ''' obj = self.actor if ns == 'self' else importlib.import_module(ns) func = getattr(obj, func_name) return await func(**kwargs) @@ -522,10 +545,13 @@ async def open_portal( shield: bool = False, ) -> AsyncGenerator[Portal, None]: - """Open a ``Portal`` through the provided ``channel``. + ''' + Open a ``Portal`` through the provided ``channel``. - Spawns a background task to handle message processing. - """ + Spawns a background task to handle message processing (normally + done by the actor-runtime implicitly). + + ''' actor = current_actor() assert actor was_connected = False @@ -553,7 +579,6 @@ async def open_portal( portal = Portal(channel) try: yield portal - finally: await portal.aclose()