From d81eb1a51ec9732db3e3def201f6e8cc07aa61b9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Dec 2021 22:17:09 -0500 Subject: [PATCH] Finally, deterministic remote cancellation support On msg loop termination we now check and see if a channel is associated with a child-actor registered in some local task's nursery. If so, we attempt to wait on channel closure initiated from the child side (by draining the underlying msg stream) so as to avoid closing it too early resulting in the child not relaying its termination status response. This means we now support the ideal case in 2-general's where we get back the ack to the closure request instead of just ignoring it and timing out XD The main implementation detail is that when `Portal.cancel_actor()` remotely calls `Actor.cancel()` we actually wait for the RPC response from that request before allowing the channel shutdown sequence to engage. The new msg stream draining support enables this. Also, factor child-to-parent error propagation logic into a helper func and improve some docs (yeah yeah y'all don't like the ''', i don't care - it makes my eyes not hurt). --- tractor/_actor.py | 133 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 105 insertions(+), 28 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index a9ce438..dbc34b0 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -2,6 +2,7 @@ Actor primitives and helpers """ +from __future__ import annotations from collections import defaultdict from functools import partial from itertools import chain @@ -57,6 +58,8 @@ async def _invoke( ''' Invoke local func and deliver result(s) over provided channel. + This is the core "RPC task" starting machinery. + ''' __tracebackhide__ = True treat_as_gen = False @@ -263,14 +266,51 @@ def _get_mod_abspath(module): _lifetime_stack: ExitStack = ExitStack() -class Actor: - """The fundamental concurrency primitive. +async def try_ship_error_to_parent( + actor: Actor, + err: Exception, - An *actor* is the combination of a regular Python process - executing a ``trio`` task tree, communicating - with other actors through "portals" which provide a native async API - around various IPC transport "channels". - """ +) -> None: + with trio.CancelScope(shield=True): + try: + # internal error so ship to parent without cid + await actor._parent_chan.send(pack_error(err)) + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + log.error( + f"Failed to ship error to parent " + f"{actor._parent_chan.uid}, channel was closed" + ) + + +class Actor: + ''' + The fundamental "runtime" concurrency primitive. + + An *actor* is the combination of a regular Python process executing + a ``trio`` task tree, communicating with other actors through + "memory boundary portals" - which provide a native async API around + IPC transport "channels" which themselves encapsulate various + (swappable) network protocols. + + + Each "actor" is ``trio.run()`` scheduled "runtime" composed of many + concurrent tasks in a single thread. The "runtime" tasks conduct + a slew of low(er) level functions to make it possible for message + passing between actors as well as the ability to create new actors + (aka new "runtimes" in new processes which are supervised via + a nursery construct). Each task which sends messages to a task in + a "peer" (not necessarily a parent-child, depth hierarchy)) is able + to do so via an "address", which maps IPC connections across memory + boundaries, and task request id which allows for per-actor + tasks to send and receive messages to specific peer-actor tasks with + which there is an ongoing RPC/IPC dialog. + + ''' + # ugh, we need to get rid of this and replace with a "registry" sys + # https://github.com/goodboy/tractor/issues/216 is_arbiter: bool = False # nursery placeholders filled in by `_async_main()` after fork @@ -441,8 +481,8 @@ class Actor: # we need this for ``msgspec`` for some reason? # for now, it's been put in the stream backend. # trio.BrokenResourceError, - # trio.ClosedResourceError, + TransportClosed, ): # XXX: This may propagate up from ``Channel._aiter_recv()`` @@ -482,7 +522,49 @@ class Actor: # process received reponses. try: await self._process_messages(chan) + + except trio.Cancelled: + log.cancel(f"Msg loop was cancelled for {chan}") + raise + finally: + # This is set in ``Portal.cancel_actor()``. So if + # the peer was cancelled we try to wait for them + # to tear down their side of the connection before + # moving on with closing our own side. + local_nursery = self._actoruid2nursery.get(chan.uid) + if ( + local_nursery + ): + log.cancel(f"Waiting on cancel request to peer {chan.uid}") + # XXX: this is a soft wait on the channel (and its + # underlying transport protocol) to close from the remote + # peer side since we presume that any channel which + # is mapped to a sub-actor (i.e. it's managed by + # one of our local nurseries) + # message is sent to the peer likely by this actor which is + # now in a cancelled condition) when the local runtime here + # is now cancelled while (presumably) in the middle of msg + # loop processing. + with trio.move_on_after(0.1) as cs: + cs.shield = True + # Attempt to wait for the far end to close the channel + # and bail after timeout (2-generals on closure). + async for msg in chan.msgstream.drain(): + # try to deliver any lingering msgs + # before we destroy the channel. + # This accomplishes deterministic + # ``Portal.cancel_actor()`` cancellation by + # making sure any RPC response to that call is + # delivered the local calling task. + # TODO: factor this into a helper? + log.runtime(f'drained {msg} for {chan.uid}') + cid = msg.get('cid') + if cid: + # deliver response to local caller/waiter + await self._push_result(chan, cid, msg) + + await local_nursery.exited.wait() # channel cleanup sequence @@ -593,10 +675,12 @@ class Actor: func: str, kwargs: dict ) -> Tuple[str, trio.abc.ReceiveChannel]: - """Send a ``'cmd'`` message to a remote actor and return a + ''' + Send a ``'cmd'`` message to a remote actor and return a caller id and a ``trio.Queue`` that can be used to wait for responses delivered by the local message processing loop. - """ + + ''' cid = str(uuid.uuid4()) assert chan.uid send_chan, recv_chan = self.get_memchans(chan.uid, cid) @@ -609,11 +693,14 @@ class Actor: chan: Channel, shield: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: - """Process messages for the channel async-RPC style. + ''' + Process messages for the channel async-RPC style. Receive multiplexed RPC requests and deliver responses over ``chan``. - """ + + ''' # TODO: once https://github.com/python-trio/trio/issues/467 gets # worked out we'll likely want to use that! msg = None @@ -692,8 +779,9 @@ class Actor: # msg loop and break out into # ``_async_main()`` log.cancel( - f"Actor {self.uid} was remotely cancelled;" - " waiting on cancellation completion..") + f"Actor {self.uid} was remotely cancelled " + f"by {chan.uid}" + ) await _invoke( self, cid, chan, func, kwargs, is_rpc=False ) @@ -789,17 +877,12 @@ class Actor: # machinery not from an rpc task) to parent log.exception("Actor errored:") if self._parent_chan: - await self._parent_chan.send(pack_error(err)) + await try_ship_error_to_parent(self, err) # if this is the `MainProcess` we expect the error broadcasting # above to trigger an error at consuming portal "checkpoints" raise - except trio.Cancelled: - # debugging only - log.runtime(f"Msg loop was cancelled for {chan}") - raise - finally: # msg debugging for when he machinery is brokey log.runtime( @@ -891,6 +974,7 @@ class Actor: # establish primary connection with immediate parent self._parent_chan = None if parent_addr is not None: + self._parent_chan, accept_addr_rent = await self._from_parent( parent_addr) @@ -994,14 +1078,7 @@ class Actor: ) if self._parent_chan: - with trio.CancelScope(shield=True): - try: - # internal error so ship to parent without cid - await self._parent_chan.send(pack_error(err)) - except trio.ClosedResourceError: - log.error( - f"Failed to ship error to parent " - f"{self._parent_chan.uid}, channel was closed") + await try_ship_error_to_parent(self, err) # always! log.exception("Actor errored:")