From 593fd24a9e3de9c52c7444eb3ce966dc3e1784d4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 10:43:33 -0400 Subject: [PATCH] Attempt to gracefully handle channel breakage? --- tractor/_ipc.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index c516189..6296d94 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -141,8 +141,11 @@ class MsgspecTCPStream(MsgpackTCPStream): self.decode = msgspec.Decoder().decode # dict[str, Any]) async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: - """Yield packets from the underlying stream. - """ + '''Yield packets from the underlying stream. + + ''' + import msgspec # noqa + last_decode_failed: bool = False while True: try: @@ -172,7 +175,18 @@ class MsgspecTCPStream(MsgpackTCPStream): msg_bytes = await self.recv_stream.receive_exactly(size) log.transport(f"received {msg_bytes}") # type: ignore - yield self.decode(msg_bytes) + try: + assert not last_decode_failed + yield self.decode(msg_bytes) + except ( + msgspec.DecodingError, + UnicodeDecodeError, + ): + # ignore decoding errors for now and assume they have to + # do with a channel drop - hope that receiving from the + # channel will raise an expected error and bubble up. + log.error(f'`msgspec` failed to decode!?\n{msg_bytes}') + last_decode_failed = True async def send(self, data: Any) -> None: async with self._send_lock: