forked from goodboy/tractor
				
			Make `Actor._process_messages()` report disconnects
The method now returns a `bool` which flags whether the transport died to the caller and allows for reporting a disconnect in the channel-transport handler task. This is something a user will normally want to know about on the caller side especially after seeing a traceback from the peer (if in tree) on console.sigintsaviour_citesthackin
							parent
							
								
									bf0ac3116c
								
							
						
					
					
						commit
						206c7c0720
					
				| 
						 | 
					@ -200,7 +200,7 @@ async def _invoke(
 | 
				
			||||||
                ctx = actor._contexts.pop((chan.uid, cid))
 | 
					                ctx = actor._contexts.pop((chan.uid, cid))
 | 
				
			||||||
                if ctx:
 | 
					                if ctx:
 | 
				
			||||||
                    log.runtime(
 | 
					                    log.runtime(
 | 
				
			||||||
                        f'Context entrypoint for {func} was terminated:\n{ctx}'
 | 
					                        f'Context entrypoint {func} was terminated:\n{ctx}'
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            assert cs
 | 
					            assert cs
 | 
				
			||||||
| 
						 | 
					@ -316,7 +316,9 @@ async def try_ship_error_to_parent(
 | 
				
			||||||
            trio.ClosedResourceError,
 | 
					            trio.ClosedResourceError,
 | 
				
			||||||
            trio.BrokenResourceError,
 | 
					            trio.BrokenResourceError,
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            log.error(
 | 
					            # in SC terms this is one of the worst things that can
 | 
				
			||||||
 | 
					            # happen and creates the 2-general's dilemma.
 | 
				
			||||||
 | 
					            log.critical(
 | 
				
			||||||
                f"Failed to ship error to parent "
 | 
					                f"Failed to ship error to parent "
 | 
				
			||||||
                f"{channel.uid}, channel was closed"
 | 
					                f"{channel.uid}, channel was closed"
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
| 
						 | 
					@ -560,33 +562,49 @@ class Actor:
 | 
				
			||||||
        # append new channel
 | 
					        # append new channel
 | 
				
			||||||
        self._peers[uid].append(chan)
 | 
					        self._peers[uid].append(chan)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        local_nursery: Optional['ActorNursery'] = None  # noqa
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Begin channel management - respond to remote requests and
 | 
					        # Begin channel management - respond to remote requests and
 | 
				
			||||||
        # process received reponses.
 | 
					        # process received reponses.
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            await self._process_messages(chan)
 | 
					            disconnected = await self._process_messages(chan)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        except trio.Cancelled:
 | 
					        except trio.Cancelled:
 | 
				
			||||||
            log.cancel(f"Msg loop was cancelled for {chan}")
 | 
					            log.cancel(f"Msg loop was cancelled for {chan}")
 | 
				
			||||||
            raise
 | 
					            raise
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        finally:
 | 
					        finally:
 | 
				
			||||||
 | 
					            local_nursery = self._actoruid2nursery.get(uid, local_nursery)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # This is set in ``Portal.cancel_actor()``. So if
 | 
					            # This is set in ``Portal.cancel_actor()``. So if
 | 
				
			||||||
            # the peer was cancelled we try to wait for them
 | 
					            # the peer was cancelled we try to wait for them
 | 
				
			||||||
            # to tear down their side of the connection before
 | 
					            # to tear down their side of the connection before
 | 
				
			||||||
            # moving on with closing our own side.
 | 
					            # moving on with closing our own side.
 | 
				
			||||||
            local_nursery = self._actoruid2nursery.get(chan.uid)
 | 
					 | 
				
			||||||
            if (
 | 
					            if (
 | 
				
			||||||
                local_nursery
 | 
					                local_nursery
 | 
				
			||||||
            ):
 | 
					            ):
 | 
				
			||||||
 | 
					                if disconnected:
 | 
				
			||||||
 | 
					                    # if the transport died and this actor is still
 | 
				
			||||||
 | 
					                    # registered within a local nursery, we report that the
 | 
				
			||||||
 | 
					                    # IPC layer may have failed unexpectedly since it may be
 | 
				
			||||||
 | 
					                    # the cause of other downstream errors.
 | 
				
			||||||
 | 
					                    entry = local_nursery._children.get(uid)
 | 
				
			||||||
 | 
					                    if entry:
 | 
				
			||||||
 | 
					                        _, proc, _ = entry
 | 
				
			||||||
 | 
					                        if proc.poll() is not None:
 | 
				
			||||||
 | 
					                            log.error('Actor {uid} proc died and IPC broke?')
 | 
				
			||||||
 | 
					                        else:
 | 
				
			||||||
 | 
					                            log.error(f'Actor {uid} IPC connection broke!?')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                log.cancel(f"Waiting on cancel request to peer {chan.uid}")
 | 
					                log.cancel(f"Waiting on cancel request to peer {chan.uid}")
 | 
				
			||||||
                # XXX: this is a soft wait on the channel (and its
 | 
					                # XXX: this is a soft wait on the channel (and its
 | 
				
			||||||
                # underlying transport protocol) to close from the remote
 | 
					                # underlying transport protocol) to close from the
 | 
				
			||||||
                # peer side since we presume that any channel which
 | 
					                # remote peer side since we presume that any channel
 | 
				
			||||||
                # is mapped to a sub-actor (i.e. it's managed by
 | 
					                # which is mapped to a sub-actor (i.e. it's managed by
 | 
				
			||||||
                # one of our local nurseries)
 | 
					                # one of our local nurseries) has a message is sent to
 | 
				
			||||||
                # message is sent to the peer likely by this actor which is
 | 
					                # the peer likely by this actor (which is now in
 | 
				
			||||||
                # now in a cancelled condition) when the local runtime here
 | 
					                # a cancelled condition) when the local runtime here is
 | 
				
			||||||
                # is now cancelled while (presumably) in the middle of msg
 | 
					                # now cancelled while (presumably) in the middle of msg
 | 
				
			||||||
                # loop processing.
 | 
					                # loop processing.
 | 
				
			||||||
                with trio.move_on_after(0.5) as cs:
 | 
					                with trio.move_on_after(0.5) as cs:
 | 
				
			||||||
                    cs.shield = True
 | 
					                    cs.shield = True
 | 
				
			||||||
| 
						 | 
					@ -609,16 +627,19 @@ class Actor:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    await local_nursery.exited.wait()
 | 
					                    await local_nursery.exited.wait()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # if local_nursery._children
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # ``Channel`` teardown and closure sequence
 | 
					            # ``Channel`` teardown and closure sequence
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # Drop ref to channel so it can be gc-ed and disconnected
 | 
					            # Drop ref to channel so it can be gc-ed and disconnected
 | 
				
			||||||
            log.runtime(f"Releasing channel {chan} from {chan.uid}")
 | 
					            log.runtime(f"Releasing channel {chan} from {chan.uid}")
 | 
				
			||||||
            chans = self._peers.get(chan.uid)
 | 
					            chans = self._peers.get(chan.uid)
 | 
				
			||||||
            chans.remove(chan)
 | 
					            chans.remove(chan)
 | 
				
			||||||
 | 
					            uid = chan.uid
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if not chans:
 | 
					            if not chans:
 | 
				
			||||||
                log.runtime(f"No more channels for {chan.uid}")
 | 
					                log.runtime(f"No more channels for {chan.uid}")
 | 
				
			||||||
                self._peers.pop(chan.uid, None)
 | 
					                self._peers.pop(uid, None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # for (uid, cid) in self._contexts.copy():
 | 
					                # for (uid, cid) in self._contexts.copy():
 | 
				
			||||||
                #     if chan.uid == uid:
 | 
					                #     if chan.uid == uid:
 | 
				
			||||||
| 
						 | 
					@ -626,11 +647,13 @@ class Actor:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            log.runtime(f"Peers is {self._peers}")
 | 
					            log.runtime(f"Peers is {self._peers}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            if not self._peers:  # no more channels connected
 | 
					            # No more channels to other actors (at all) registered
 | 
				
			||||||
 | 
					            # as connected.
 | 
				
			||||||
 | 
					            if not self._peers:
 | 
				
			||||||
                log.runtime("Signalling no more peer channels")
 | 
					                log.runtime("Signalling no more peer channels")
 | 
				
			||||||
                self._no_more_peers.set()
 | 
					                self._no_more_peers.set()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # # XXX: is this necessary (GC should do it?)
 | 
					            # XXX: is this necessary (GC should do it)?
 | 
				
			||||||
            if chan.connected():
 | 
					            if chan.connected():
 | 
				
			||||||
                # if the channel is still connected it may mean the far
 | 
					                # if the channel is still connected it may mean the far
 | 
				
			||||||
                # end has not closed and we may have gotten here due to
 | 
					                # end has not closed and we may have gotten here due to
 | 
				
			||||||
| 
						 | 
					@ -665,7 +688,7 @@ class Actor:
 | 
				
			||||||
            ctx = self._contexts[(uid, cid)]
 | 
					            ctx = self._contexts[(uid, cid)]
 | 
				
			||||||
        except KeyError:
 | 
					        except KeyError:
 | 
				
			||||||
            log.warning(
 | 
					            log.warning(
 | 
				
			||||||
                    f'Ignoring msg from [no-longer/un]known context with {uid}:'
 | 
					                f'Ignoring msg from [no-longer/un]known context {uid}:'
 | 
				
			||||||
                f'\n{msg}')
 | 
					                f'\n{msg}')
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -813,7 +836,7 @@ class Actor:
 | 
				
			||||||
        shield: bool = False,
 | 
					        shield: bool = False,
 | 
				
			||||||
        task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 | 
					        task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> bool:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        Process messages for the channel async-RPC style.
 | 
					        Process messages for the channel async-RPC style.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -839,7 +862,7 @@ class Actor:
 | 
				
			||||||
                    if msg is None:  # loop terminate sentinel
 | 
					                    if msg is None:  # loop terminate sentinel
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        log.cancel(
 | 
					                        log.cancel(
 | 
				
			||||||
                            f"Channerl to {chan.uid} terminated?\n"
 | 
					                            f"Channel to {chan.uid} terminated?\n"
 | 
				
			||||||
                            "Cancelling all associated tasks..")
 | 
					                            "Cancelling all associated tasks..")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        for (channel, cid) in self._rpc_tasks.copy():
 | 
					                        for (channel, cid) in self._rpc_tasks.copy():
 | 
				
			||||||
| 
						 | 
					@ -986,6 +1009,9 @@ class Actor:
 | 
				
			||||||
            # up.
 | 
					            # up.
 | 
				
			||||||
            log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
 | 
					            log.runtime(f'channel from {chan.uid} closed abruptly:\n{chan}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # transport **was** disconnected
 | 
				
			||||||
 | 
					            return True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        except (Exception, trio.MultiError) as err:
 | 
					        except (Exception, trio.MultiError) as err:
 | 
				
			||||||
            if nursery_cancelled_before_task:
 | 
					            if nursery_cancelled_before_task:
 | 
				
			||||||
                sn = self._service_n
 | 
					                sn = self._service_n
 | 
				
			||||||
| 
						 | 
					@ -1010,6 +1036,9 @@ class Actor:
 | 
				
			||||||
                f"Exiting msg loop for {chan} from {chan.uid} "
 | 
					                f"Exiting msg loop for {chan} from {chan.uid} "
 | 
				
			||||||
                f"with last msg:\n{msg}")
 | 
					                f"with last msg:\n{msg}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # transport **was not** disconnected
 | 
				
			||||||
 | 
					        return False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def _from_parent(
 | 
					    async def _from_parent(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        parent_addr: Optional[tuple[str, int]],
 | 
					        parent_addr: Optional[tuple[str, int]],
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue