diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5989a2e..6c47205 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -23,7 +23,7 @@ log = get_logger(__name__) ms_decode = msgspec.Encoder().encode -class MsgpackStream: +class MsgpackTCPStream: '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgpack-python``. @@ -122,7 +122,7 @@ class MsgpackStream: return self.stream.socket.fileno() != -1 -class MsgspecStream(MsgpackStream): +class MsgspecTCPStream(MsgpackTCPStream): '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgspec``. @@ -147,24 +147,22 @@ class MsgspecStream(MsgpackStream): while True: try: header = await self.recv_stream.receive_exactly(4) - if header is None: - continue - if header == b'': - log.debug(f"Stream connection {self.raddr} was closed") - return + except (ValueError): + raise TransportClosed( + f'transport {self} was already closed prior ro read' + ) - size, = struct.unpack(" None: @@ -236,7 +234,6 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( - self, destaddr: Tuple[Any, ...] = None, **kwargs