forked from goodboy/tractor
1
0
Fork 0

Add our own "transport closed" signal

This change some super old (and bad) code from the project's very early
days. For some redic reason i must have thought masking `trio`'s
internal stream / transport errors and a TCP EOF as `StopAsyncIteration`
somehow a good idea. The reality is you probably
want to know the difference between an unexpected transport error
and a simple EOF lol. This begins to resolve that by adding our own
special `TransportClosed` error to signal the "graceful" termination of
a channel's underlying transport. Oh, and this builds on the `msgspec`
integration which helped shed light on the core issues here B)
msgspec_infect_asyncio
Tyler Goodlet 2021-06-24 18:49:51 -04:00
parent bc6af2219e
commit fdd2da238a
1 changed files with 15 additions and 18 deletions

View File

@ -23,7 +23,7 @@ log = get_logger(__name__)
ms_decode = msgspec.Encoder().encode ms_decode = msgspec.Encoder().encode
class MsgpackStream: class MsgpackTCPStream:
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgpack-python``. using ``msgpack-python``.
@ -122,7 +122,7 @@ class MsgpackStream:
return self.stream.socket.fileno() != -1 return self.stream.socket.fileno() != -1
class MsgspecStream(MsgpackStream): class MsgspecTCPStream(MsgpackTCPStream):
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgspec``. using ``msgspec``.
@ -147,12 +147,16 @@ class MsgspecStream(MsgpackStream):
while True: while True:
try: try:
header = await self.recv_stream.receive_exactly(4) header = await self.recv_stream.receive_exactly(4)
if header is None:
continue except (ValueError):
raise TransportClosed(
f'transport {self} was already closed prior ro read'
)
if header == b'': if header == b'':
log.debug(f"Stream connection {self.raddr} was closed") raise TransportClosed(
return f'transport {self} was already closed prior ro read'
)
size, = struct.unpack("<I", header) size, = struct.unpack("<I", header)
@ -160,12 +164,6 @@ class MsgspecStream(MsgpackStream):
msg_bytes = await self.recv_stream.receive_exactly(size) msg_bytes = await self.recv_stream.receive_exactly(size)
# the value error here is to catch a connect with immediate
# disconnect that will cause an EOF error inside `tricycle`.
except (ValueError, trio.BrokenResourceError):
log.warning(f"Stream connection {self.raddr} broke")
return
log.trace(f"received {msg_bytes}") # type: ignore log.trace(f"received {msg_bytes}") # type: ignore
yield decoder.decode(msg_bytes) yield decoder.decode(msg_bytes)
@ -194,8 +192,8 @@ class Channel:
auto_reconnect: bool = False, auto_reconnect: bool = False,
stream: trio.SocketStream = None, # expected to be active stream: trio.SocketStream = None, # expected to be active
# stream_serializer: type = MsgpackStream, # stream_serializer_type: type = MsgspecTCPStream,
stream_serializer_type: type = MsgspecStream, stream_serializer_type: type = MsgpackTCPStream,
) -> None: ) -> None:
@ -236,7 +234,6 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None return self.msgstream.raddr if self.msgstream else None
async def connect( async def connect(
self, self,
destaddr: Tuple[Any, ...] = None, destaddr: Tuple[Any, ...] = None,
**kwargs **kwargs