forked from goodboy/tractor
Attempt to gracefully handle channel breakage?
parent
bb8452dbdb
commit
593fd24a9e
|
@ -141,8 +141,11 @@ class MsgspecTCPStream(MsgpackTCPStream):
|
||||||
self.decode = msgspec.Decoder().decode # dict[str, Any])
|
self.decode = msgspec.Decoder().decode # dict[str, Any])
|
||||||
|
|
||||||
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
|
||||||
"""Yield packets from the underlying stream.
|
'''Yield packets from the underlying stream.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
|
import msgspec # noqa
|
||||||
|
last_decode_failed: bool = False
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
@ -172,7 +175,18 @@ class MsgspecTCPStream(MsgpackTCPStream):
|
||||||
msg_bytes = await self.recv_stream.receive_exactly(size)
|
msg_bytes = await self.recv_stream.receive_exactly(size)
|
||||||
|
|
||||||
log.transport(f"received {msg_bytes}") # type: ignore
|
log.transport(f"received {msg_bytes}") # type: ignore
|
||||||
|
try:
|
||||||
|
assert not last_decode_failed
|
||||||
yield self.decode(msg_bytes)
|
yield self.decode(msg_bytes)
|
||||||
|
except (
|
||||||
|
msgspec.DecodingError,
|
||||||
|
UnicodeDecodeError,
|
||||||
|
):
|
||||||
|
# ignore decoding errors for now and assume they have to
|
||||||
|
# do with a channel drop - hope that receiving from the
|
||||||
|
# channel will raise an expected error and bubble up.
|
||||||
|
log.error(f'`msgspec` failed to decode!?\n{msg_bytes}')
|
||||||
|
last_decode_failed = True
|
||||||
|
|
||||||
async def send(self, data: Any) -> None:
|
async def send(self, data: Any) -> None:
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
|
|
Loading…
Reference in New Issue