forked from goodboy/tractor
Add our own "transport closed" signal
This change some super old (and bad) code from the project's very early days. For some redic reason i must have thought masking `trio`'s internal stream / transport errors and a TCP EOF as `StopAsyncIteration` somehow a good idea. The reality is you probably want to know the difference between an unexpected transport error and a simple EOF lol. This begins to resolve that by adding our own special `TransportClosed` error to signal the "graceful" termination of a channel's underlying transport. Oh, and this builds on the `msgspec` integration which helped shed light on the core issues here B)optional_msgspec_support
parent
95e35f3d60
commit
112117c1fc
|
@ -23,7 +23,7 @@ log = get_logger(__name__)
|
||||||
ms_decode = msgspec.Encoder().encode
|
ms_decode = msgspec.Encoder().encode
|
||||||
|
|
||||||
|
|
||||||
class MsgpackStream:
|
class MsgpackTCPStream:
|
||||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||||
using ``msgpack-python``.
|
using ``msgpack-python``.
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ class MsgpackStream:
|
||||||
return self.stream.socket.fileno() != -1
|
return self.stream.socket.fileno() != -1
|
||||||
|
|
||||||
|
|
||||||
class MsgspecStream(MsgpackStream):
|
class MsgspecTCPStream(MsgpackTCPStream):
|
||||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||||
using ``msgspec``.
|
using ``msgspec``.
|
||||||
|
|
||||||
|
@ -147,24 +147,22 @@ class MsgspecStream(MsgpackStream):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
header = await self.recv_stream.receive_exactly(4)
|
header = await self.recv_stream.receive_exactly(4)
|
||||||
if header is None:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if header == b'':
|
except (ValueError):
|
||||||
log.debug(f"Stream connection {self.raddr} was closed")
|
raise TransportClosed(
|
||||||
return
|
f'transport {self} was already closed prior ro read'
|
||||||
|
)
|
||||||
|
|
||||||
size, = struct.unpack("<I", header)
|
if header == b'':
|
||||||
|
raise TransportClosed(
|
||||||
|
f'transport {self} was already closed prior ro read'
|
||||||
|
)
|
||||||
|
|
||||||
log.trace(f'received header {size}')
|
size, = struct.unpack("<I", header)
|
||||||
|
|
||||||
msg_bytes = await self.recv_stream.receive_exactly(size)
|
log.trace(f'received header {size}')
|
||||||
|
|
||||||
# the value error here is to catch a connect with immediate
|
msg_bytes = await self.recv_stream.receive_exactly(size)
|
||||||
# disconnect that will cause an EOF error inside `tricycle`.
|
|
||||||
except (ValueError, trio.BrokenResourceError):
|
|
||||||
log.warning(f"Stream connection {self.raddr} broke")
|
|
||||||
return
|
|
||||||
|
|
||||||
log.trace(f"received {msg_bytes}") # type: ignore
|
log.trace(f"received {msg_bytes}") # type: ignore
|
||||||
yield decoder.decode(msg_bytes)
|
yield decoder.decode(msg_bytes)
|
||||||
|
@ -194,8 +192,8 @@ class Channel:
|
||||||
auto_reconnect: bool = False,
|
auto_reconnect: bool = False,
|
||||||
stream: trio.SocketStream = None, # expected to be active
|
stream: trio.SocketStream = None, # expected to be active
|
||||||
|
|
||||||
# stream_serializer: type = MsgpackStream,
|
# stream_serializer_type: type = MsgspecTCPStream,
|
||||||
stream_serializer_type: type = MsgspecStream,
|
stream_serializer_type: type = MsgpackTCPStream,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
@ -236,7 +234,6 @@ class Channel:
|
||||||
return self.msgstream.raddr if self.msgstream else None
|
return self.msgstream.raddr if self.msgstream else None
|
||||||
|
|
||||||
async def connect(
|
async def connect(
|
||||||
|
|
||||||
self,
|
self,
|
||||||
destaddr: Tuple[Any, ...] = None,
|
destaddr: Tuple[Any, ...] = None,
|
||||||
**kwargs
|
**kwargs
|
||||||
|
|
Loading…
Reference in New Issue