diff --git a/tractor/_actor.py b/tractor/_actor.py index 1eafa43..95e8592 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -263,7 +263,7 @@ class Actor: self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ Tuple[Any, Any, Any, Any, Any]] = None - self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore + self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa async def wait_for_peer( self, uid: Tuple[str, str] @@ -341,8 +341,8 @@ class Actor: uid = await self._do_handshake(chan) except ( - trio.BrokenResourceError, - trio.ClosedResourceError, + # trio.BrokenResourceError, + # trio.ClosedResourceError, TransportClosed, ): # XXX: This may propagate up from ``Channel._aiter_recv()`` @@ -592,20 +592,16 @@ class Actor: except ( TransportClosed, - trio.BrokenResourceError, - trio.ClosedResourceError ): - # channels "breaking" is ok since we don't have a teardown - # handshake for them (yet) and instead we simply bail out - # of the message loop and expect the teardown sequence - # to clean up. - log.error(f"{chan} form {chan.uid} closed abruptly") - # raise - - except trio.ClosedResourceError: - log.error(f"{chan} form {chan.uid} broke") + # channels "breaking" (for TCP streams by EOF or 104 + # connection-reset) is ok since we don't have a teardown + # handshake for them (yet) and instead we simply bail out of + # the message loop and expect the teardown sequence to clean + # up. + log.debug(f'channel from {chan.uid} closed abruptly:\n{chan}') except (Exception, trio.MultiError) as err: + # ship any "internal" exception (i.e. 
one from internal machinery # not from an rpc task) to parent log.exception("Actor errored:") diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5dc1a2a..ec8981f 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -57,7 +57,26 @@ class MsgpackTCPStream: use_list=False, ) while True: - data = await self.stream.receive_some(2**10) + + try: + data = await self.stream.receive_some(2**10) + + except trio.BrokenResourceError as err: + msg = err.args[0] + + # XXX: handle connection-reset-by-peer the same as an EOF. + # we're currently remapping this since we allow + # a quick connect then drop for root actors when + # checking to see if there exists an "arbiter" + # on the chosen sockaddr (``_root.py:108`` or thereabouts) + if '[Errno 104]' in msg: + raise TransportClosed( + f'{self} was broken with {msg}' + ) from err + + else: + raise + log.trace(f"received {data}") # type: ignore if data == b'': @@ -175,11 +194,13 @@ class Channel: raise async def aclose(self) -> None: - log.debug(f"Closing {self}") + log.debug( + f'Closing channel to {self.uid} ' + f'{self.laddr} -> {self.raddr}' + ) assert self.msgstream await self.msgstream.stream.aclose() self._closed = True - log.error(f'CLOSING CHAN {self}') async def __aenter__(self): await self.connect() diff --git a/tractor/_root.py b/tractor/_root.py index 8f4eb9a..63152b0 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -105,6 +105,11 @@ async def open_root_actor( arbiter_found = False try: + # TODO: this connect-and-bail forces us to have to carefully + # rewrap TCP 104-connection-reset errors as EOF so as to avoid + # propagating cancel-causing errors to the channel-msg loop + # machinery. Likely it would be better to eventually have + # a "discovery" protocol with basic handshake instead. 
async with _connect_chan(host, port): arbiter_found = True @@ -174,8 +179,11 @@ async def open_root_actor( finally: logger.info("Shutting down root actor") - with trio.CancelScope(shield=True): - await actor.cancel() + try: + with trio.CancelScope(shield=True): + await actor.cancel() + except Exception: + logger.warning('Root was already cancelled') finally: _state._current_actor = None logger.info("Root actor terminated")