diff --git a/tractor/_ipc.py b/tractor/_ipc.py index c516189..6296d94 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -141,8 +141,11 @@ class MsgspecTCPStream(MsgpackTCPStream): self.decode = msgspec.Decoder().decode # dict[str, Any]) async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: - """Yield packets from the underlying stream. - """ + '''Yield packets from the underlying stream. + + ''' + import msgspec # noqa + last_decode_failed: bool = False while True: try: @@ -172,7 +175,18 @@ class MsgspecTCPStream(MsgpackTCPStream): msg_bytes = await self.recv_stream.receive_exactly(size) log.transport(f"received {msg_bytes}") # type: ignore - yield self.decode(msg_bytes) + try: + assert not last_decode_failed + yield self.decode(msg_bytes) + except ( + msgspec.DecodingError, + UnicodeDecodeError, + ): + # ignore decoding errors for now and assume they have to + # do with a channel drop - hope that receiving from the + # channel will raise an expected error and bubble up. + log.error(f'`msgspec` failed to decode!?\n{msg_bytes}') + last_decode_failed = True async def send(self, data: Any) -> None: async with self._send_lock: