From 1edf5c2f06f77432bc0d4198e177b4f420cd4be4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 3 Jul 2021 18:57:54 -0400 Subject: [PATCH] Specially remap TCP 104-connection-reset to `TransportClosed` Since we currently have no real "discovery protocol" between process trees, the current naive approach is to check via a connect and drop to see if a TCP server is bound to a particular address during root actor startup. This was a historical decision and had no real grounding beyond taking a simple approach to get something working when the project was first started. This is obviously problematic from an error handling perspective since we need to be able to avoid such quick connect-and-drops from cancelling an "arbiter"'s (registry actor's) channel-msg loop machinery (which would propagate and cancel the actor). For now we map this particular TCP error, which gets remapped by `trio` as a `trio.BrokenResourceError` to our own internal `TransportClosed` which is swallowed by channel message loop processing and indicates a graceful teardown of the far end actor. --- tractor/_actor.py | 24 ++++++++++-------------- tractor/_ipc.py | 27 ++++++++++++++++++++++++--- tractor/_root.py | 12 ++++++++++-- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 1eafa43..95e8592 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -263,7 +263,7 @@ class Actor: self._parent_chan: Optional[Channel] = None self._forkserver_info: Optional[ Tuple[Any, Any, Any, Any, Any]] = None - self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore + self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa async def wait_for_peer( self, uid: Tuple[str, str] @@ -341,8 +341,8 @@ class Actor: uid = await self._do_handshake(chan) except ( - trio.BrokenResourceError, - trio.ClosedResourceError, + # trio.BrokenResourceError, + # trio.ClosedResourceError, TransportClosed, ): # XXX: This may propagate up from ``Channel._aiter_recv()`` @@ -592,20 +592,16 @@ class Actor: except ( TransportClosed, - trio.BrokenResourceError, - trio.ClosedResourceError ): - # channels "breaking" is ok since we don't have a teardown - # handshake for them (yet) and instead we simply bail out - # of the message loop and expect the teardown sequence - # to clean up. - log.error(f"{chan} form {chan.uid} closed abruptly") - # raise - - except trio.ClosedResourceError: - log.error(f"{chan} form {chan.uid} broke") + # channels "breaking" (for TCP streams by EOF or 104 + # connection-reset) is ok since we don't have a teardown + # handshake for them (yet) and instead we simply bail out of + # the message loop and expect the teardown sequence to clean + # up. + log.debug(f'channel from {chan.uid} closed abruptly:\n{chan}') except (Exception, trio.MultiError) as err: + # ship any "internal" exception (i.e. one from internal machinery # not from an rpc task) to parent log.exception("Actor errored:") diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5dc1a2a..ec8981f 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -57,7 +57,26 @@ class MsgpackTCPStream: use_list=False, ) while True: - data = await self.stream.receive_some(2**10) + + try: + data = await self.stream.receive_some(2**10) + + except trio.BrokenResourceError as err: + msg = err.args[0] + + # XXX: handle connection-reset-by-peer the same as a EOF. + # we're currently remapping this since we allow + # a quick connect then drop for root actors when + # checking to see if there exists an "arbiter" + # on the chosen sockaddr (``_root.py:108`` or thereabouts) + if '[Errno 104]' in msg: + raise TransportClosed( + f'{self} was broken with {msg}' + ) + + else: + raise + log.trace(f"received {data}") # type: ignore if data == b'': @@ -175,11 +194,13 @@ class Channel: raise async def aclose(self) -> None: - log.debug(f"Closing {self}") + log.debug( + f'Closing channel to {self.uid} ' + f'{self.laddr} -> {self.raddr}' + ) assert self.msgstream await self.msgstream.stream.aclose() self._closed = True - log.error(f'CLOSING CHAN {self}') async def __aenter__(self): await self.connect() diff --git a/tractor/_root.py b/tractor/_root.py index 8f4eb9a..63152b0 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -105,6 +105,11 @@ async def open_root_actor( arbiter_found = False try: + # TODO: this connect-and-bail forces us to have to carefully + # rewrap TCP 104-connection-reset errors as EOF so as to avoid + # propagating cancel-causing errors to the channel-msg loop + # machinery. Likely it would be better to eventually have + # a "discovery" protocol with basic handshake instead. async with _connect_chan(host, port): arbiter_found = True @@ -174,8 +179,11 @@ async def open_root_actor( finally: logger.info("Shutting down root actor") - with trio.CancelScope(shield=True): - await actor.cancel() + try: + with trio.CancelScope(shield=True): + await actor.cancel() + except Exception as err: + log.warning('Root was already cancelled') finally: _state._current_actor = None logger.info("Root actor terminated")