Attempt to gracefully handle channel breakage?

optional_msgspec_support
Tyler Goodlet 2021-09-07 10:43:33 -04:00
parent 19d6885243
commit 076f37c589
1 changed files with 17 additions and 3 deletions
tractor

View File

@ -141,8 +141,11 @@ class MsgspecTCPStream(MsgpackTCPStream):
self.decode = msgspec.Decoder().decode # dict[str, Any])
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream.
"""
'''Yield packets from the underlying stream.
'''
import msgspec # noqa
last_decode_failed: bool = False
while True:
try:
@ -172,7 +175,18 @@ class MsgspecTCPStream(MsgpackTCPStream):
msg_bytes = await self.recv_stream.receive_exactly(size)
log.transport(f"received {msg_bytes}") # type: ignore
yield self.decode(msg_bytes)
try:
assert not last_decode_failed
yield self.decode(msg_bytes)
except (
msgspec.DecodingError,
UnicodeDecodeError,
):
# ignore decoding errors for now and assume they have to
# do with a channel drop - hope that receiving from the
# channel will raise an expected error and bubble up.
log.error(f'`msgspec` failed to decode!?\n{msg_bytes}')
last_decode_failed = True
async def send(self, data: Any) -> None:
async with self._send_lock: