forked from goodboy/tractor
1
0
Fork 0

Set channel cancel called flag on cancel requests

acked_backup
Tyler Goodlet 2021-12-01 18:48:29 -05:00
parent 1976e61d1a
commit a23afb0bb8
1 changed files with 51 additions and 26 deletions

View File

@ -1,5 +1,6 @@
""" """
Portal api Memory boundary "Portals": an API for structured
concurrency linked tasks running in disparate memory domains.
""" """
import importlib import importlib
@ -21,7 +22,6 @@ from .log import get_logger
from ._exceptions import ( from ._exceptions import (
unpack_error, unpack_error,
NoResult, NoResult,
# RemoteActorError,
ContextCancelled, ContextCancelled,
) )
from ._streaming import Context, ReceiveMsgStream from ._streaming import Context, ReceiveMsgStream
@ -35,10 +35,12 @@ async def maybe_open_nursery(
nursery: trio.Nursery = None, nursery: trio.Nursery = None,
shield: bool = False, shield: bool = False,
) -> AsyncGenerator[trio.Nursery, Any]: ) -> AsyncGenerator[trio.Nursery, Any]:
"""Create a new nursery if None provided. '''
Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided. Blocks on exit as expected if no input nursery is provided.
"""
'''
if nursery is not None: if nursery is not None:
yield nursery yield nursery
else: else:
@ -87,14 +89,18 @@ class Portal:
like having a "portal" between the seperate actor memory spaces. like having a "portal" between the seperate actor memory spaces.
''' '''
# the timeout for a remote cancel request sent to
# a(n) (peer) actor.
cancel_timeout = 0.5
def __init__(self, channel: Channel) -> None: def __init__(self, channel: Channel) -> None:
self.channel = channel self.channel = channel
# when this is set to a tuple returned from ``_submit()`` then
# it is expected that ``result()`` will be awaited at some point
# during the portal's lifetime # during the portal's lifetime
self._result_msg: Optional[dict] = None self._result_msg: Optional[dict] = None
# set when _submit_for_result is called # When this is set to a tuple returned from ``_submit()`` then
# it is expected that ``result()`` will be awaited at some
# point. Set when _submit_for_result is called
self._expect_result: Optional[ self._expect_result: Optional[
Tuple[str, Any, str, Dict[str, Any]] Tuple[str, Any, str, Dict[str, Any]]
] = None ] = None
@ -199,9 +205,15 @@ class Portal:
# we'll need to .aclose all those channels here # we'll need to .aclose all those channels here
await self._cancel_streams() await self._cancel_streams()
async def cancel_actor(self): async def cancel_actor(
"""Cancel the actor on the other end of this portal. self,
""" timeout: float = None,
) -> bool:
'''
Cancel the actor on the other end of this portal.
'''
if not self.channel.connected(): if not self.channel.connected():
log.cancel("This portal is already closed can't cancel") log.cancel("This portal is already closed can't cancel")
return False return False
@ -211,16 +223,19 @@ class Portal:
log.cancel( log.cancel(
f"Sending actor cancel request to {self.channel.uid} on " f"Sending actor cancel request to {self.channel.uid} on "
f"{self.channel}") f"{self.channel}")
self.channel._cancel_called = True
try: try:
# send cancel cmd - might not get response # send cancel cmd - might not get response
# XXX: sure would be nice to make this work with a proper shield # XXX: sure would be nice to make this work with a proper shield
with trio.move_on_after(0.5) as cancel_scope: with trio.move_on_after(timeout or self.cancel_timeout) as cs:
cancel_scope.shield = True cs.shield = True
await self.run_from_ns('self', 'cancel') await self.run_from_ns('self', 'cancel')
return True return True
if cancel_scope.cancelled_caught: if cs.cancelled_caught:
log.cancel(f"May have failed to cancel {self.channel.uid}") log.cancel(f"May have failed to cancel {self.channel.uid}")
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
@ -237,7 +252,9 @@ class Portal:
function_name: str, function_name: str,
**kwargs, **kwargs,
) -> Any: ) -> Any:
"""Run a function from a (remote) namespace in a new task on the far-end actor. '''
Run a function from a (remote) namespace in a new task on the
far-end actor.
This is a more explitcit way to run tasks in a remote-process This is a more explitcit way to run tasks in a remote-process
actor using explicit object-path syntax. Hint: this is how actor using explicit object-path syntax. Hint: this is how
@ -246,9 +263,11 @@ class Portal:
Note:: Note::
A special namespace `self` can be used to invoke `Actor` A special namespace `self` can be used to invoke `Actor`
instance methods in the remote runtime. Currently this should only instance methods in the remote runtime. Currently this
be used for `tractor` internals. should only be used solely for ``tractor`` runtime
""" internals.
'''
msg = await self._return_once( msg = await self._return_once(
*(await self._submit(namespace_path, function_name, kwargs)) *(await self._submit(namespace_path, function_name, kwargs))
) )
@ -447,7 +466,8 @@ class Portal:
except ( except (
BaseException, BaseException,
# more specifically, we need to handle: # more specifically, we need to handle these but not
# sure it's worth being pedantic:
# Exception, # Exception,
# trio.Cancelled, # trio.Cancelled,
# trio.MultiError, # trio.MultiError,
@ -495,19 +515,22 @@ class Portal:
@dataclass @dataclass
class LocalPortal: class LocalPortal:
"""A 'portal' to a local ``Actor``. '''
A 'portal' to a local ``Actor``.
A compatibility shim for normal portals but for invoking functions A compatibility shim for normal portals but for invoking functions
using an in process actor instance. using an in process actor instance.
"""
'''
actor: 'Actor' # type: ignore # noqa actor: 'Actor' # type: ignore # noqa
channel: Channel channel: Channel
async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any:
"""Run a requested local function from a namespace path and '''
Run a requested local function from a namespace path and
return it's result. return it's result.
""" '''
obj = self.actor if ns == 'self' else importlib.import_module(ns) obj = self.actor if ns == 'self' else importlib.import_module(ns)
func = getattr(obj, func_name) func = getattr(obj, func_name)
return await func(**kwargs) return await func(**kwargs)
@ -522,10 +545,13 @@ async def open_portal(
shield: bool = False, shield: bool = False,
) -> AsyncGenerator[Portal, None]: ) -> AsyncGenerator[Portal, None]:
"""Open a ``Portal`` through the provided ``channel``. '''
Open a ``Portal`` through the provided ``channel``.
Spawns a background task to handle message processing. Spawns a background task to handle message processing (normally
""" done by the actor-runtime implicitly).
'''
actor = current_actor() actor = current_actor()
assert actor assert actor
was_connected = False was_connected = False
@ -553,7 +579,6 @@ async def open_portal(
portal = Portal(channel) portal = Portal(channel)
try: try:
yield portal yield portal
finally: finally:
await portal.aclose() await portal.aclose()