From fc90e1f1715516c2f3949d972d5a883f10f3eb0a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Feb 2022 09:25:35 -0500 Subject: [PATCH] Make `Actor._process_messages()` report disconnects The method now returns a `bool` which flags whether the transport died to the caller and allows for reporting a disconnect in the channel-transport handler task. This is something a user will normally want to know about on the caller side especially after seeing a traceback from the peer (if in tree) on console. --- tractor/_actor.py | 65 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 18 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 0991ed2..63ad616 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -200,7 +200,7 @@ async def _invoke( ctx = actor._contexts.pop((chan.uid, cid)) if ctx: log.runtime( - f'Context entrypoint for {func} was terminated:\n{ctx}' + f'Context entrypoint {func} was terminated:\n{ctx}' ) assert cs @@ -316,7 +316,9 @@ async def try_ship_error_to_parent( trio.ClosedResourceError, trio.BrokenResourceError, ): - log.error( + # in SC terms this is one of the worst things that can + # happen and creates the 2-general's dilemma. + log.critical( f"Failed to ship error to parent " f"{channel.uid}, channel was closed" ) @@ -560,33 +562,49 @@ class Actor: # append new channel self._peers[uid].append(chan) + local_nursery: Optional['ActorNursery'] = None # noqa + # Begin channel management - respond to remote requests and # process received reponses. try: - await self._process_messages(chan) + disconnected = await self._process_messages(chan) except trio.Cancelled: log.cancel(f"Msg loop was cancelled for {chan}") raise finally: + local_nursery = self._actoruid2nursery.get(uid, local_nursery) + # This is set in ``Portal.cancel_actor()``. So if # the peer was cancelled we try to wait for them # to tear down their side of the connection before # moving on with closing our own side. - local_nursery = self._actoruid2nursery.get(chan.uid) if ( local_nursery ): + if disconnected: + # if the transport died and this actor is still + # registered within a local nursery, we report that the + # IPC layer may have failed unexpectedly since it may be + # the cause of other downstream errors. + entry = local_nursery._children.get(uid) + if entry: + _, proc, _ = entry + if proc.poll() is not None: + log.error('Actor {uid} proc died and IPC broke?') + else: + log.error(f'Actor {uid} IPC connection broke!?') + log.cancel(f"Waiting on cancel request to peer {chan.uid}") # XXX: this is a soft wait on the channel (and its - # underlying transport protocol) to close from the remote - # peer side since we presume that any channel which - # is mapped to a sub-actor (i.e. it's managed by - # one of our local nurseries) - # message is sent to the peer likely by this actor which is - # now in a cancelled condition) when the local runtime here - # is now cancelled while (presumably) in the middle of msg + # underlying transport protocol) to close from the + # remote peer side since we presume that any channel + # which is mapped to a sub-actor (i.e. it's managed by + # one of our local nurseries) has a message is sent to + # the peer likely by this actor (which is now in + # a cancelled condition) when the local runtime here is + # now cancelled while (presumably) in the middle of msg # loop processing. with trio.move_on_after(0.5) as cs: cs.shield = True @@ -609,16 +627,19 @@ class Actor: await local_nursery.exited.wait() + # if local_nursery._children + # ``Channel`` teardown and closure sequence # Drop ref to channel so it can be gc-ed and disconnected log.runtime(f"Releasing channel {chan} from {chan.uid}") chans = self._peers.get(chan.uid) chans.remove(chan) + uid = chan.uid if not chans: log.runtime(f"No more channels for {chan.uid}") - self._peers.pop(chan.uid, None) + self._peers.pop(uid, None) # for (uid, cid) in self._contexts.copy(): # if chan.uid == uid: @@ -626,11 +647,13 @@ class Actor: log.runtime(f"Peers is {self._peers}") - if not self._peers: # no more channels connected + # No more channels to other actors (at all) registered + # as connected. + if not self._peers: log.runtime("Signalling no more peer channels") self._no_more_peers.set() - # # XXX: is this necessary (GC should do it?) + # XXX: is this necessary (GC should do it)? if chan.connected(): # if the channel is still connected it may mean the far # end has not closed and we may have gotten here due to @@ -665,8 +688,8 @@ class Actor: ctx = self._contexts[(uid, cid)] except KeyError: log.warning( - f'Ignoring msg from [no-longer/un]known context with {uid}:' - f'\n{msg}') + f'Ignoring msg from [no-longer/un]known context {uid}:' + f'\n{msg}') return send_chan = ctx._send_chan @@ -813,7 +836,7 @@ class Actor: shield: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - ) -> None: + ) -> bool: ''' Process messages for the channel async-RPC style. @@ -839,7 +862,7 @@ class Actor: if msg is None: # loop terminate sentinel log.cancel( - f"Channerl to {chan.uid} terminated?\n" + f"Channel to {chan.uid} terminated?\n" "Cancelling all associated tasks..") for (channel, cid) in self._rpc_tasks.copy(): @@ -986,6 +1009,9 @@ class Actor: # up. log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}') + # transport **was** disconnected + return True + except (Exception, trio.MultiError) as err: if nursery_cancelled_before_task: sn = self._service_n @@ -1010,6 +1036,9 @@ class Actor: f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") + # transport **was not** disconnected + return False + async def _from_parent( self, parent_addr: Optional[tuple[str, int]],