From bf6958cdbe2a901bf115f79336aad66e036398f3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 10:43:14 -0400 Subject: [PATCH 01/13] Handle cancelled-before-proc-created spawn case It's definitely possible to have a nursery spawn task be cancelled before a `trio.Process` handle is ever returned; we now handle this case as a cancelled-during-spawn scenario. Zombie collection logic also is bypassed in this case. --- tractor/_spawn.py | 45 +++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 6ede566..6671c27 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -230,17 +230,19 @@ async def new_proc( ] cancelled_during_spawn: bool = False + proc: Optional[trio.Process] = None try: - proc = await trio.open_process(spawn_cmd) - - log.runtime(f"Started {proc}") - - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it try: + proc = await trio.open_process(spawn_cmd) + + log.runtime(f"Started {proc}") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it event, chan = await actor_nursery._actor.wait_for_peer( subactor.uid) + except trio.Cancelled: cancelled_during_spawn = True # we may cancel before the child connects back in which @@ -320,23 +322,26 @@ async def new_proc( # killing the process too early. log.cancel(f'Hard reap sequence starting for {uid}') - with trio.CancelScope(shield=True): + if proc: + with trio.CancelScope(shield=True): - # don't clobber an ongoing pdb - if cancelled_during_spawn: - # Try again to avoid TTY clobbering. - async with acquire_debug_lock(uid): - with trio.move_on_after(0.5): - await proc.wait() + # don't clobber an ongoing pdb + if cancelled_during_spawn: + # Try again to avoid TTY clobbering. + async with acquire_debug_lock(uid): + with trio.move_on_after(0.5): + await proc.wait() - if is_root_process(): - await maybe_wait_for_debugger() + if is_root_process(): + await maybe_wait_for_debugger() - if proc.poll() is None: - log.cancel(f"Attempting to hard kill {proc}") - await do_hard_kill(proc) + if proc.poll() is None: + log.cancel(f"Attempting to hard kill {proc}") + await do_hard_kill(proc) - log.debug(f"Joined {proc}") + log.debug(f"Joined {proc}") + else: + log.warning(f'Nursery cancelled before sub-proc started') if not cancelled_during_spawn: # pop child entry to indicate we no longer managing this From 62b2867e07240bbe6f614c27a98514b2881e05fa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Nov 2021 13:01:22 -0500 Subject: [PATCH 02/13] Tweak doc strings --- tractor/_spawn.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 6671c27..da4b2c8 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -97,14 +97,17 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: async def exhaust_portal( + portal: Portal, actor: Actor + ) -> Any: - """Pull final result from portal (assuming it has one). + ''' + Pull final result from portal (assuming it has one). If the main task is an async generator do our best to consume what's left of it. 
- """ + ''' try: log.debug(f"Waiting on final result from {actor.uid}") @@ -126,18 +129,19 @@ async def exhaust_portal( async def cancel_on_completion( + portal: Portal, actor: Actor, errors: Dict[Tuple[str, str], Exception], ) -> None: - """ + ''' Cancel actor gracefully once it's "main" portal's result arrives. Should only be called for actors spawned with `run_in_actor()`. - """ + ''' # if this call errors we store the exception for later # in ``errors`` which will be reraised inside # a MultiError and we still send out a cancel request From 0ac3397dbbd28c25ed739cda7b53bd474b955222 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 28 Nov 2021 12:48:00 -0500 Subject: [PATCH 03/13] Only soft-acquire debug lock if a proc was spawned --- tractor/_spawn.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index da4b2c8..3857f36 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -256,7 +256,8 @@ async def new_proc( # don't clobber an ongoing pdb if is_root_process(): await maybe_wait_for_debugger() - else: + + elif proc is not None: async with acquire_debug_lock(uid): # soft wait on the proc to terminate with trio.move_on_after(0.5): From 1976e61d1acb8a23f365220246c32bdfbb29cb1d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Dec 2021 18:46:15 -0500 Subject: [PATCH 04/13] Add `.drain()` support to msg streams Enables "draining" the last set of messages after a channel/stream has been terminated mostly for the purposes of receiving a final ACK to a remote cancel command. Also, add an internal `Channel._cancel_called` flag which can be set by `Portal.cancel_actor()`. --- tractor/_ipc.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 28bef97..97b63f1 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -93,6 +93,9 @@ class MsgpackTCPStream: self._agen = self._iter_packets() self._send_lock = trio.StrictFIFOLock() + # public i guess? + self.drained = [] + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ @@ -132,7 +135,7 @@ class MsgpackTCPStream: if data == b'': raise TransportClosed( - f'transport {self} was already closed prior ro read' + f'transport {self} was already closed prior to read' ) unpacker.feed(data) @@ -156,6 +159,14 @@ class MsgpackTCPStream: async def recv(self) -> Any: return await self._agen.asend(None) + async def drain(self): + try: + async for msg in self._iter_packets(): + self.drained.append(msg) + except TransportClosed: + for msg in self.drained: + yield msg + def __aiter__(self): return self._agen @@ -164,7 +175,8 @@ class MsgpackTCPStream: class MsgspecTCPStream(MsgpackTCPStream): - '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data + ''' + A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgspec``. ''' @@ -259,9 +271,12 @@ def get_msg_transport( class Channel: - '''An inter-process channel for communication between (remote) actors. + ''' + An inter-process channel for communication between (remote) actors. - Currently the only supported transport is a ``trio.SocketStream``. + Wraps a ``MsgStream``: transport + encoding IPC connection. + Currently we only support ``trio.SocketStream`` for transport + (aka TCP). 
''' def __init__( @@ -299,10 +314,12 @@ class Channel: # set after handshake - always uid of far end self.uid: Optional[Tuple[str, str]] = None - # set if far end actor errors internally - self._exc: Optional[Exception] = None self._agen = self._aiter_recv() + self._exc: Optional[Exception] = None # set if far end actor errors self._closed: bool = False + # flag set on ``Portal.cancel_actor()`` indicating + # remote (peer) cancellation of the far end actor runtime. + self._cancel_called: bool = False # set on ``Portal.cancel_actor()`` @classmethod def from_stream( From a23afb0bb8086691448f4feac48d7417e6c7aba2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Dec 2021 18:48:29 -0500 Subject: [PATCH 05/13] Set channel cancel called flag on cancel requests --- tractor/_portal.py | 77 ++++++++++++++++++++++++++++++---------------- 1 file changed, 51 insertions(+), 26 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 80fc902..d32f26b 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -1,5 +1,6 @@ """ -Portal api +Memory boundary "Portals": an API for structured +concurrency linked tasks running in disparate memory domains. """ import importlib @@ -21,7 +22,6 @@ from .log import get_logger from ._exceptions import ( unpack_error, NoResult, - # RemoteActorError, ContextCancelled, ) from ._streaming import Context, ReceiveMsgStream @@ -35,10 +35,12 @@ async def maybe_open_nursery( nursery: trio.Nursery = None, shield: bool = False, ) -> AsyncGenerator[trio.Nursery, Any]: - """Create a new nursery if None provided. + ''' + Create a new nursery if None provided. Blocks on exit as expected if no input nursery is provided. - """ + + ''' if nursery is not None: yield nursery else: @@ -87,14 +89,18 @@ class Portal: like having a "portal" between the seperate actor memory spaces. ''' + # the timeout for a remote cancel request sent to + # a(n) (peer) actor. + cancel_timeout = 0.5 + def __init__(self, channel: Channel) -> None: self.channel = channel - # when this is set to a tuple returned from ``_submit()`` then - # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime self._result_msg: Optional[dict] = None - # set when _submit_for_result is called + # When this is set to a tuple returned from ``_submit()`` then + # it is expected that ``result()`` will be awaited at some + # point. Set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None @@ -199,9 +205,15 @@ class Portal: # we'll need to .aclose all those channels here await self._cancel_streams() - async def cancel_actor(self): - """Cancel the actor on the other end of this portal. - """ + async def cancel_actor( + self, + timeout: float = None, + + ) -> bool: + ''' + Cancel the actor on the other end of this portal. 
+ + ''' if not self.channel.connected(): log.cancel("This portal is already closed can't cancel") return False @@ -211,16 +223,19 @@ class Portal: log.cancel( f"Sending actor cancel request to {self.channel.uid} on " f"{self.channel}") + + self.channel._cancel_called = True + try: # send cancel cmd - might not get response # XXX: sure would be nice to make this work with a proper shield - with trio.move_on_after(0.5) as cancel_scope: - cancel_scope.shield = True + with trio.move_on_after(timeout or self.cancel_timeout) as cs: + cs.shield = True await self.run_from_ns('self', 'cancel') return True - if cancel_scope.cancelled_caught: + if cs.cancelled_caught: log.cancel(f"May have failed to cancel {self.channel.uid}") # if we get here some weird cancellation case happened @@ -237,7 +252,9 @@ class Portal: function_name: str, **kwargs, ) -> Any: - """Run a function from a (remote) namespace in a new task on the far-end actor. + ''' + Run a function from a (remote) namespace in a new task on the + far-end actor. This is a more explitcit way to run tasks in a remote-process actor using explicit object-path syntax. Hint: this is how @@ -246,9 +263,11 @@ class Portal: Note:: A special namespace `self` can be used to invoke `Actor` - instance methods in the remote runtime. Currently this should only - be used for `tractor` internals. - """ + instance methods in the remote runtime. Currently this + should only be used solely for ``tractor`` runtime + internals. + + ''' msg = await self._return_once( *(await self._submit(namespace_path, function_name, kwargs)) ) @@ -447,7 +466,8 @@ class Portal: except ( BaseException, - # more specifically, we need to handle: + # more specifically, we need to handle these but not + # sure it's worth being pedantic: # Exception, # trio.Cancelled, # trio.MultiError, @@ -495,19 +515,22 @@ class Portal: @dataclass class LocalPortal: - """A 'portal' to a local ``Actor``. + ''' + A 'portal' to a local ``Actor``. A compatibility shim for normal portals but for invoking functions using an in process actor instance. - """ + + ''' actor: 'Actor' # type: ignore # noqa channel: Channel async def run_from_ns(self, ns: str, func_name: str, **kwargs) -> Any: - """Run a requested local function from a namespace path and + ''' + Run a requested local function from a namespace path and return it's result. - """ + ''' obj = self.actor if ns == 'self' else importlib.import_module(ns) func = getattr(obj, func_name) return await func(**kwargs) @@ -522,10 +545,13 @@ async def open_portal( shield: bool = False, ) -> AsyncGenerator[Portal, None]: - """Open a ``Portal`` through the provided ``channel``. + ''' + Open a ``Portal`` through the provided ``channel``. - Spawns a background task to handle message processing. - """ + Spawns a background task to handle message processing (normally + done by the actor-runtime implicitly). + + ''' actor = current_actor() assert actor was_connected = False @@ -553,7 +579,6 @@ async def open_portal( portal = Portal(channel) try: yield portal - finally: await portal.aclose() From d817f1a65826eabb69bb87c6e2b7aa63fb69b876 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Dec 2021 22:05:23 -0500 Subject: [PATCH 06/13] Add a nursery "exited" signal Use a `trio.Event` to enable nursery closure detection such that core runtime tasks can be notified when a local nursery exits and allow shutdown protocols to operate without close-before-terminate issues (such as IPC channel closure during remote peer cancellation). 
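In rough outline the pattern is just an event set on nursery exit which
other runtime tasks can wait on. A minimal sketch with hypothetical names
(the real wiring is in the diff below):

    import trio
    from contextlib import asynccontextmanager

    class ActorNursery:
        def __init__(self) -> None:
            # set only once the nursery block has fully exited
            self.exited = trio.Event()

    @asynccontextmanager
    async def open_nursery():
        anursery = ActorNursery()
        try:
            yield anursery
        finally:
            # signal any waiting runtime task that teardown completed
            anursery.exited.set()

    async def on_channel_teardown(anursery: ActorNursery) -> None:
        # eg. the IPC msg loop blocks here before closing its channel
        await anursery.exited.wait()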
--- tractor/_supervise.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 1259be6..3f381ef 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -52,6 +52,7 @@ class ActorNursery: self.cancelled: bool = False self._join_procs = trio.Event() self.errors = errors + self.exited = trio.Event() async def start_actor( self, @@ -207,7 +208,8 @@ class ActorNursery: # spawn cancel tasks for each sub-actor assert portal - nursery.start_soon(portal.cancel_actor) + if portal.channel.connected(): + nursery.start_soon(portal.cancel_actor) # if we cancelled the cancel (we hung cancelling remote actors) # then hard kill all sub-processes @@ -401,18 +403,23 @@ async def open_nursery( async with open_root_actor(**kwargs) as actor: assert actor is current_actor() - # try: + try: + async with _open_and_supervise_one_cancels_all_nursery( + actor + ) as anursery: + yield anursery + finally: + anursery.exited.set() + + else: # sub-nursery case + + try: async with _open_and_supervise_one_cancels_all_nursery( actor ) as anursery: yield anursery - - else: # sub-nursery case - - async with _open_and_supervise_one_cancels_all_nursery( - actor - ) as anursery: - yield anursery + finally: + anursery.exited.set() finally: log.debug("Nursery teardown complete") From d81eb1a51ec9732db3e3def201f6e8cc07aa61b9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Dec 2021 22:17:09 -0500 Subject: [PATCH 07/13] Finally, deterministic remote cancellation support On msg loop termination we now check and see if a channel is associated with a child-actor registered in some local task's nursery. If so, we attempt to wait on channel closure initiated from the child side (by draining the underlying msg stream) so as to avoid closing it too early resulting in the child not relaying its termination status response. This means we now support the ideal case in 2-general's where we get back the ack to the closure request instead of just ignoring it and timing out XD The main implementation detail is that when `Portal.cancel_actor()` remotely calls `Actor.cancel()` we actually wait for the RPC response from that request before allowing the channel shutdown sequence to engage. The new msg stream draining support enables this. Also, factor child-to-parent error propagation logic into a helper func and improve some docs (yeah yeah y'all don't like the ''', i don't care - it makes my eyes not hurt). --- tractor/_actor.py | 133 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 105 insertions(+), 28 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index a9ce438..dbc34b0 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -2,6 +2,7 @@ Actor primitives and helpers """ +from __future__ import annotations from collections import defaultdict from functools import partial from itertools import chain @@ -57,6 +58,8 @@ async def _invoke( ''' Invoke local func and deliver result(s) over provided channel. + This is the core "RPC task" starting machinery. + ''' __tracebackhide__ = True treat_as_gen = False @@ -263,14 +266,51 @@ def _get_mod_abspath(module): _lifetime_stack: ExitStack = ExitStack() -class Actor: - """The fundamental concurrency primitive. 
+async def try_ship_error_to_parent( + actor: Actor, + err: Exception, - An *actor* is the combination of a regular Python process - executing a ``trio`` task tree, communicating - with other actors through "portals" which provide a native async API - around various IPC transport "channels". - """ +) -> None: + with trio.CancelScope(shield=True): + try: + # internal error so ship to parent without cid + await actor._parent_chan.send(pack_error(err)) + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + log.error( + f"Failed to ship error to parent " + f"{actor._parent_chan.uid}, channel was closed" + ) + + +class Actor: + ''' + The fundamental "runtime" concurrency primitive. + + An *actor* is the combination of a regular Python process executing + a ``trio`` task tree, communicating with other actors through + "memory boundary portals" - which provide a native async API around + IPC transport "channels" which themselves encapsulate various + (swappable) network protocols. + + + Each "actor" is ``trio.run()`` scheduled "runtime" composed of many + concurrent tasks in a single thread. The "runtime" tasks conduct + a slew of low(er) level functions to make it possible for message + passing between actors as well as the ability to create new actors + (aka new "runtimes" in new processes which are supervised via + a nursery construct). Each task which sends messages to a task in + a "peer" (not necessarily a parent-child, depth hierarchy)) is able + to do so via an "address", which maps IPC connections across memory + boundaries, and task request id which allows for per-actor + tasks to send and receive messages to specific peer-actor tasks with + which there is an ongoing RPC/IPC dialog. + + ''' + # ugh, we need to get rid of this and replace with a "registry" sys + # https://github.com/goodboy/tractor/issues/216 is_arbiter: bool = False # nursery placeholders filled in by `_async_main()` after fork @@ -441,8 +481,8 @@ class Actor: # we need this for ``msgspec`` for some reason? # for now, it's been put in the stream backend. # trio.BrokenResourceError, - # trio.ClosedResourceError, + TransportClosed, ): # XXX: This may propagate up from ``Channel._aiter_recv()`` @@ -482,7 +522,49 @@ class Actor: # process received reponses. try: await self._process_messages(chan) + + except trio.Cancelled: + log.cancel(f"Msg loop was cancelled for {chan}") + raise + finally: + # This is set in ``Portal.cancel_actor()``. So if + # the peer was cancelled we try to wait for them + # to tear down their side of the connection before + # moving on with closing our own side. + local_nursery = self._actoruid2nursery.get(chan.uid) + if ( + local_nursery + ): + log.cancel(f"Waiting on cancel request to peer {chan.uid}") + # XXX: this is a soft wait on the channel (and its + # underlying transport protocol) to close from the remote + # peer side since we presume that any channel which + # is mapped to a sub-actor (i.e. it's managed by + # one of our local nurseries) + # message is sent to the peer likely by this actor which is + # now in a cancelled condition) when the local runtime here + # is now cancelled while (presumably) in the middle of msg + # loop processing. + with trio.move_on_after(0.1) as cs: + cs.shield = True + # Attempt to wait for the far end to close the channel + # and bail after timeout (2-generals on closure). + async for msg in chan.msgstream.drain(): + # try to deliver any lingering msgs + # before we destroy the channel. 
+ # This accomplishes deterministic + # ``Portal.cancel_actor()`` cancellation by + # making sure any RPC response to that call is + # delivered the local calling task. + # TODO: factor this into a helper? + log.runtime(f'drained {msg} for {chan.uid}') + cid = msg.get('cid') + if cid: + # deliver response to local caller/waiter + await self._push_result(chan, cid, msg) + + await local_nursery.exited.wait() # channel cleanup sequence @@ -593,10 +675,12 @@ class Actor: func: str, kwargs: dict ) -> Tuple[str, trio.abc.ReceiveChannel]: - """Send a ``'cmd'`` message to a remote actor and return a + ''' + Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. - """ + + ''' cid = str(uuid.uuid4()) assert chan.uid send_chan, recv_chan = self.get_memchans(chan.uid, cid) @@ -609,11 +693,14 @@ class Actor: chan: Channel, shield: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: - """Process messages for the channel async-RPC style. + ''' + Process messages for the channel async-RPC style. Receive multiplexed RPC requests and deliver responses over ``chan``. - """ + + ''' # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! msg = None @@ -692,8 +779,9 @@ class Actor: # msg loop and break out into # ``_async_main()`` log.cancel( - f"Actor {self.uid} was remotely cancelled;" - " waiting on cancellation completion..") + f"Actor {self.uid} was remotely cancelled " + f"by {chan.uid}" + ) await _invoke( self, cid, chan, func, kwargs, is_rpc=False ) @@ -789,17 +877,12 @@ class Actor: # machinery not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: - await self._parent_chan.send(pack_error(err)) + await try_ship_error_to_parent(self, err) # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" raise - except trio.Cancelled: - # debugging only - log.runtime(f"Msg loop was cancelled for {chan}") - raise - finally: # msg debugging for when he machinery is brokey log.runtime( @@ -891,6 +974,7 @@ class Actor: # establish primary connection with immediate parent self._parent_chan = None if parent_addr is not None: + self._parent_chan, accept_addr_rent = await self._from_parent( parent_addr) @@ -994,14 +1078,7 @@ class Actor: ) if self._parent_chan: - with trio.CancelScope(shield=True): - try: - # internal error so ship to parent without cid - await self._parent_chan.send(pack_error(err)) - except trio.ClosedResourceError: - log.error( - f"Failed to ship error to parent " - f"{self._parent_chan.uid}, channel was closed") + await try_ship_error_to_parent(self, err) # always! log.exception("Actor errored:") From 46070f99ded3d756c2e63b3fedbe04e755949d1d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Dec 2021 22:41:10 -0500 Subject: [PATCH 08/13] Factor soft-wait logic into a helper, use with mp --- tractor/_spawn.py | 78 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 23 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 3857f36..d42a63a 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -5,7 +5,7 @@ Machinery for actor process spawning using multiple backends. 
import sys import multiprocessing as mp import platform -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union, Callable import trio from trio_typing import TaskStatus @@ -179,10 +179,38 @@ async def do_hard_kill( # XXX: should pretty much never get here unless we have # to move the bits from ``proc.__aexit__()`` out and # into here. - log.critical(f"HARD KILLING {proc}") + log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}") proc.kill() +async def soft_wait( + + proc: Union[mp.Process, trio.Process], + wait_func: Callable[ + Union[mp.Process, trio.Process], + None, + ], + portal: Portal, + +) -> None: + # Wait for proc termination but **dont' yet** call + # ``trio.Process.__aexit__()`` (it tears down stdio + # which will kill any waiting remote pdb trace). + # This is a "soft" (cancellable) join/reap. + try: + # await proc.wait() + await wait_func(proc) + except trio.Cancelled: + # if cancelled during a soft wait, cancel the child + # actor before entering the hard reap sequence + # below. This means we try to do a graceful teardown + # via sending a cancel message before getting out + # zombie killing tools. + with trio.CancelScope(shield=True): + await portal.cancel_actor() + raise + + async def new_proc( name: str, @@ -199,11 +227,14 @@ async def new_proc( task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: - """ - Create a new ``multiprocessing.Process`` using the - spawn method as configured using ``try_set_start_method()``. + ''' + Create a new ``Process`` using a "spawn method" as (configured using + ``try_set_start_method()``). - """ + This routine should be started in a actor runtime task and the logic + here is to be considered the core supervision strategy. + + ''' # mark the new actor with the global spawn method subactor._spawn_method = _spawn_method uid = subactor.uid @@ -298,21 +329,14 @@ async def new_proc( errors ) - # Wait for proc termination but **dont' yet** call - # ``trio.Process.__aexit__()`` (it tears down stdio - # which will kill any waiting remote pdb trace). - # This is a "soft" (cancellable) join/reap. - try: - await proc.wait() - except trio.Cancelled: - # if cancelled during a soft wait, cancel the child - # actor before entering the hard reap sequence - # below. This means we try to do a graceful teardown - # via sending a cancel message before getting out - # zombie killing tools. - with trio.CancelScope(shield=True): - await portal.cancel_actor() - raise + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. + await soft_wait( + proc, + trio.Process.wait, + portal + ) # cancel result waiter that may have been spawned in # tandem if not done already @@ -346,7 +370,7 @@ async def new_proc( log.debug(f"Joined {proc}") else: - log.warning(f'Nursery cancelled before sub-proc started') + log.warning('Nursery cancelled before sub-proc started') if not cancelled_during_spawn: # pop child entry to indicate we no longer managing this @@ -361,6 +385,7 @@ async def new_proc( actor_nursery=actor_nursery, subactor=subactor, errors=errors, + # passed through to actor main bind_addr=bind_addr, parent_addr=parent_addr, @@ -479,7 +504,14 @@ async def mp_new_proc( errors ) - await proc_waiter(proc) + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. 
+ await soft_wait( + proc, + proc_waiter, + portal + ) # cancel result waiter that may have been spawned in # tandem if not done already From a29924f330813d02d2e414f1d96cc26709475990 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Dec 2021 08:45:58 -0500 Subject: [PATCH 09/13] Don't assume exception order from nursery --- tests/test_cancellation.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 9f8ae0d..c346806 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -128,7 +128,11 @@ def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay): if len(exceptions) == 2: # sometimes oddly now there's an embedded BrokenResourceError ? - exceptions = exceptions[1].exceptions + for exc in exceptions: + excs = getattr(exc, 'exceptions', None) + if excs: + exceptions = excs + break assert len(exceptions) == num_subactors From e561a4908fd4ffb1e5d8ecc73bca408012878d85 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Dec 2021 12:34:27 -0500 Subject: [PATCH 10/13] Appease mypy --- tractor/_actor.py | 33 +++++++++++++++++---------------- tractor/_ipc.py | 37 +++++++++++++++++++++++++++---------- tractor/_spawn.py | 13 +++++++++---- 3 files changed, 53 insertions(+), 30 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index dbc34b0..de86df1 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -11,7 +11,7 @@ import importlib.util import inspect import uuid import typing -from typing import Dict, List, Tuple, Any, Optional, Union +from typing import List, Tuple, Any, Optional, Union from types import ModuleType import sys import os @@ -49,7 +49,7 @@ async def _invoke( cid: str, chan: Channel, func: typing.Callable, - kwargs: Dict[str, Any], + kwargs: dict[str, Any], is_rpc: bool = True, task_status: TaskStatus[ Union[trio.CancelScope, BaseException] @@ -267,21 +267,21 @@ _lifetime_stack: ExitStack = ExitStack() async def try_ship_error_to_parent( - actor: Actor, - err: Exception, + channel: Channel, + err: Union[Exception, trio.MultiError], ) -> None: with trio.CancelScope(shield=True): try: # internal error so ship to parent without cid - await actor._parent_chan.send(pack_error(err)) + await channel.send(pack_error(err)) except ( trio.ClosedResourceError, trio.BrokenResourceError, ): log.error( f"Failed to ship error to parent " - f"{actor._parent_chan.uid}, channel was closed" + f"{channel.uid}, channel was closed" ) @@ -319,7 +319,7 @@ class Actor: _server_n: Optional[trio.Nursery] = None # Information about `__main__` from parent - _parent_main_data: Dict[str, str] + _parent_main_data: dict[str, str] _parent_chan_cs: Optional[trio.CancelScope] = None # syncs for setup/teardown sequences @@ -357,7 +357,7 @@ class Actor: mods[name] = _get_mod_abspath(mod) self.enable_modules = mods - self._mods: Dict[str, ModuleType] = {} + self._mods: dict[str, ModuleType] = {} # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 @@ -380,12 +380,12 @@ class Actor: self._ongoing_rpc_tasks = trio.Event() self._ongoing_rpc_tasks.set() # (chan, cid) -> (cancel_scope, func) - self._rpc_tasks: Dict[ + self._rpc_tasks: dict[ Tuple[Channel, str], Tuple[trio.CancelScope, typing.Callable, trio.Event] ] = {} # map {uids -> {callids -> waiter queues}} - self._cids2qs: Dict[ + self._cids2qs: dict[ Tuple[Tuple[str, str], str], Tuple[ trio.abc.SendChannel[Any], @@ -396,7 +396,7 @@ class Actor: self._parent_chan: Optional[Channel] = None 
self._forkserver_info: Optional[ Tuple[Any, Any, Any, Any, Any]] = None - self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa + self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa async def wait_for_peer( self, uid: Tuple[str, str] @@ -550,6 +550,7 @@ class Actor: cs.shield = True # Attempt to wait for the far end to close the channel # and bail after timeout (2-generals on closure). + assert chan.msgstream async for msg in chan.msgstream.drain(): # try to deliver any lingering msgs # before we destroy the channel. @@ -616,7 +617,7 @@ class Actor: self, chan: Channel, cid: str, - msg: Dict[str, Any], + msg: dict[str, Any], ) -> None: """Push an RPC result to the local consumer's queue. """ @@ -877,7 +878,7 @@ class Actor: # machinery not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: - await try_ship_error_to_parent(self, err) + await try_ship_error_to_parent(self._parent_chan, err) # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" @@ -1078,7 +1079,7 @@ class Actor: ) if self._parent_chan: - await try_ship_error_to_parent(self, err) + await try_ship_error_to_parent(self._parent_chan, err) # always! log.exception("Actor errored:") @@ -1360,7 +1361,7 @@ class Arbiter(Actor): def __init__(self, *args, **kwargs): - self._registry: Dict[ + self._registry: dict[ Tuple[str, str], Tuple[str, int], ] = {} @@ -1377,7 +1378,7 @@ class Arbiter(Actor): async def get_registry( self - ) -> Dict[Tuple[str, str], Tuple[str, int]]: + ) -> dict[Tuple[str, str], Tuple[str, int]]: '''Return current name registry. This method is async to allow for cross-actor invocation. diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 97b63f1..47f8cd9 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -6,9 +6,10 @@ from __future__ import annotations import platform import struct import typing +from collections.abc import AsyncGenerator, AsyncIterator from typing import ( Any, Tuple, Optional, - Type, Protocol, TypeVar + Type, Protocol, TypeVar, ) from tricycle import BufferedReceiveStream @@ -46,6 +47,7 @@ MsgType = TypeVar("MsgType") class MsgTransport(Protocol[MsgType]): stream: trio.SocketStream + drained: list[MsgType] def __init__(self, stream: trio.SocketStream) -> None: ... @@ -63,6 +65,11 @@ class MsgTransport(Protocol[MsgType]): def connected(self) -> bool: ... + # defining this sync otherwise it causes a mypy error because it + # can't figure out it's a generator i guess?..? + def drain(self) -> AsyncIterator[dict]: + ... + @property def laddr(self) -> Tuple[str, int]: ... @@ -94,9 +101,9 @@ class MsgpackTCPStream: self._send_lock = trio.StrictFIFOLock() # public i guess? - self.drained = [] + self.drained: list[dict] = [] - async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: + async def _iter_packets(self) -> AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ unpacker = msgpack.Unpacker( @@ -159,7 +166,13 @@ class MsgpackTCPStream: async def recv(self) -> Any: return await self._agen.asend(None) - async def drain(self): + async def drain(self) -> AsyncIterator[dict]: + ''' + Drain the stream's remaining messages sent from + the far end until the connection is closed by + the peer. 
+ + ''' try: async for msg in self._iter_packets(): self.drained.append(msg) @@ -196,7 +209,7 @@ class MsgspecTCPStream(MsgpackTCPStream): self.encode = msgspec.Encoder().encode self.decode = msgspec.Decoder().decode # dict[str, Any]) - async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: + async def _iter_packets(self) -> AsyncGenerator[dict, None]: '''Yield packets from the underlying stream. ''' @@ -458,9 +471,11 @@ class Channel: async def _aiter_recv( self - ) -> typing.AsyncGenerator[Any, None]: - """Async iterate items from underlying stream. - """ + ) -> AsyncGenerator[Any, None]: + ''' + Async iterate items from underlying stream. + + ''' assert self.msgstream while True: try: @@ -490,9 +505,11 @@ class Channel: async def _connect_chan( host: str, port: int ) -> typing.AsyncGenerator[Channel, None]: - """Create and connect a channel with disconnect on context manager + ''' + Create and connect a channel with disconnect on context manager teardown. - """ + + ''' chan = Channel((host, port)) await chan.connect() yield chan diff --git a/tractor/_spawn.py b/tractor/_spawn.py index d42a63a..04c1008 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -5,7 +5,11 @@ Machinery for actor process spawning using multiple backends. import sys import multiprocessing as mp import platform -from typing import Any, Dict, Optional, Union, Callable +from typing import ( + Any, Dict, Optional, Union, Callable, + TypeVar, +) +from collections.abc import Awaitable, Coroutine import trio from trio_typing import TaskStatus @@ -41,6 +45,7 @@ from ._exceptions import ActorFailure log = get_logger('tractor') +ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) # placeholder for an mp start context if so using that backend _ctx: Optional[mp.context.BaseContext] = None @@ -185,10 +190,10 @@ async def do_hard_kill( async def soft_wait( - proc: Union[mp.Process, trio.Process], + proc: ProcessType, wait_func: Callable[ - Union[mp.Process, trio.Process], - None, + [ProcessType], + Awaitable, ], portal: Portal, From 14f84571fbf6c7f78ed76518dd72935c20ae3892 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Dec 2021 12:42:29 -0500 Subject: [PATCH 11/13] Don't cancel receive streams inside `.cancel_actor()` We don't need to any more presuming you get ideal remote cancellation conditions where the remote actor should teardown and kill the streams from its end. --- tractor/_portal.py | 2 -- tractor/_spawn.py | 1 - 2 files changed, 3 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index d32f26b..c3a4340 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -218,8 +218,6 @@ class Portal: log.cancel("This portal is already closed can't cancel") return False - await self._cancel_streams() - log.cancel( f"Sending actor cancel request to {self.channel.uid} on " f"{self.channel}") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 04c1008..c7eb2d2 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -203,7 +203,6 @@ async def soft_wait( # which will kill any waiting remote pdb trace). # This is a "soft" (cancellable) join/reap. 
try: - # await proc.wait() await wait_func(proc) except trio.Cancelled: # if cancelled during a soft wait, cancel the child From 08e959330694cc2a0e417e3e8d2010355e5980cb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Dec 2021 14:14:49 -0500 Subject: [PATCH 12/13] Suppress broken resources errors in `Portal.cancel_actor()` --- tractor/_portal.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index c3a4340..70339fa 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -215,7 +215,7 @@ class Portal: ''' if not self.channel.connected(): - log.cancel("This portal is already closed can't cancel") + log.cancel("This channel is already closed can't cancel") return False log.cancel( @@ -239,9 +239,12 @@ class Portal: # if we get here some weird cancellation case happened return False - except trio.ClosedResourceError: + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): log.cancel( - f"{self.channel} for {self.channel.uid} was already closed?") + f"{self.channel} for {self.channel.uid} was already closed or broken?") return False async def run_from_ns( From 94a3cc532c7f0ff7b5bfca2b05981d8aa50c8f92 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Dec 2021 18:09:07 -0500 Subject: [PATCH 13/13] Add nooz --- newsfragments/267.misc.rst | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 newsfragments/267.misc.rst diff --git a/newsfragments/267.misc.rst b/newsfragments/267.misc.rst new file mode 100644 index 0000000..727a879 --- /dev/null +++ b/newsfragments/267.misc.rst @@ -0,0 +1,16 @@ +This (finally) adds fully acknowledged remote cancellation messaging +support for both explicit ``Portal.cancel_actor()`` calls as well as +when there is a "runtime-wide" cancellations (eg. during KBI or general +actor nursery exception handling which causes a full actor +"crash"/termination). + +You can think of this as the most ideal case in 2-generals where the +actor requesting the cancel of its child is able to always receive back +the ACK to that request. This leads to a more deterministic shutdown of +the child where the parent is able to wait for the child to fully +respond to the request. On a localhost setup, where the parent can +monitor the state of the child through process or other OS APIs instead +of solely through IPC messaging, the parent can know whether or not the +child decided to cancel with more certainty. In the case of separate +hosts, we still rely on a simple timeout approach until such a time +where we prefer to get "fancier".
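From a user's perspective the acknowledged cancel shows up as the boolean
result of ``Portal.cancel_actor()``. A rough usage sketch, assuming the
pre-existing ``tractor.open_nursery()`` / ``start_actor()`` API which this
series does not change::

    import trio
    import tractor

    async def main():
        async with tractor.open_nursery() as n:
            portal = await n.start_actor('sleeper')
            # ``True`` means the child replied to the cancel request
            # before the timeout; ``False`` means we gave up waiting.
            acked = await portal.cancel_actor()
            print(f'cancel acknowledged: {acked}')

    if __name__ == '__main__':
        trio.run(main)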