From fdd2da238a88d7c0dd4a597eedfa0e3df8055dd5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Jun 2021 18:49:51 -0400 Subject: [PATCH] Add our own "transport closed" signal This change some super old (and bad) code from the project's very early days. For some redic reason i must have thought masking `trio`'s internal stream / transport errors and a TCP EOF as `StopAsyncIteration` somehow a good idea. The reality is you probably want to know the difference between an unexpected transport error and a simple EOF lol. This begins to resolve that by adding our own special `TransportClosed` error to signal the "graceful" termination of a channel's underlying transport. Oh, and this builds on the `msgspec` integration which helped shed light on the core issues here B) --- tractor/_ipc.py | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5989a2e..6c47205 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -23,7 +23,7 @@ log = get_logger(__name__) ms_decode = msgspec.Encoder().encode -class MsgpackStream: +class MsgpackTCPStream: '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgpack-python``. @@ -122,7 +122,7 @@ class MsgpackStream: return self.stream.socket.fileno() != -1 -class MsgspecStream(MsgpackStream): +class MsgspecTCPStream(MsgpackTCPStream): '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgspec``. @@ -147,24 +147,22 @@ class MsgspecStream(MsgpackStream): while True: try: header = await self.recv_stream.receive_exactly(4) - if header is None: - continue - if header == b'': - log.debug(f"Stream connection {self.raddr} was closed") - return + except (ValueError): + raise TransportClosed( + f'transport {self} was already closed prior ro read' + ) - size, = struct.unpack(" None: @@ -236,7 +234,6 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( - self, destaddr: Tuple[Any, ...] = None, **kwargs