Add our own "transport closed" signal

This change some super old (and bad) code from the project's very early
days. For some redic reason i must have thought masking `trio`'s
internal stream / transport errors and a TCP EOF as `StopAsyncIteration`
somehow a good idea. The reality is you probably
want to know the difference between an unexpected transport error
and a simple EOF lol. This begins to resolve that by adding our own
special `TransportClosed` error to signal the "graceful" termination of
a channel's underlying transport. Oh, and this builds on the `msgspec`
integration which helped shed light on the core issues here B)
transport_cleaning
Tyler Goodlet 2021-06-24 18:49:51 -04:00
parent 44d7988204
commit 80e100f818
2 changed files with 46 additions and 16 deletions

View File

@ -38,6 +38,10 @@ class InternalActorError(RemoteActorError):
""" """
class TransportClosed(trio.ClosedResourceError):
"Underlying channel transport was closed prior to use"
class NoResult(RuntimeError): class NoResult(RuntimeError):
"No final result is expected for this actor" "No final result is expected for this actor"
@ -63,12 +67,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]:
def unpack_error( def unpack_error(
msg: Dict[str, Any], msg: Dict[str, Any],
chan=None, chan=None,
err_type=RemoteActorError err_type=RemoteActorError
) -> Exception: ) -> Exception:
"""Unpack an 'error' message from the wire """Unpack an 'error' message from the wire
into a local ``RemoteActorError``. into a local ``RemoteActorError``.
""" """
tb_str = msg['error'].get('tb_str', '') tb_str = msg['error'].get('tb_str', '')
return err_type( return err_type(

View File

@ -10,7 +10,8 @@ import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from .log import get_logger from .log import get_logger
log = get_logger('ipc') from ._exceptions import TransportClosed
log = get_logger(__name__)
# :eyeroll: # :eyeroll:
try: try:
@ -21,10 +22,17 @@ except ImportError:
Unpacker = partial(msgpack.Unpacker, strict_map_key=False) Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
class MsgpackStream: class MsgpackTCPStream:
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data. '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
""" using ``msgpack-python``.
def __init__(self, stream: trio.SocketStream) -> None:
'''
def __init__(
self,
stream: trio.SocketStream,
) -> None:
self.stream = stream self.stream = stream
assert self.stream.socket assert self.stream.socket
# should both be IP sockets # should both be IP sockets
@ -35,7 +43,10 @@ class MsgpackStream:
assert isinstance(rsockname, tuple) assert isinstance(rsockname, tuple)
self._raddr = rsockname[:2] self._raddr = rsockname[:2]
# start and seed first entry to read loop
self._agen = self._iter_packets() self._agen = self._iter_packets()
# self._agen.asend(None) is None
self._send_lock = trio.StrictFIFOLock() self._send_lock = trio.StrictFIFOLock()
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
@ -46,16 +57,13 @@ class MsgpackStream:
use_list=False, use_list=False,
) )
while True: while True:
try: data = await self.stream.receive_some(2**10)
data = await self.stream.receive_some(2**10) log.trace(f"received {data}") # type: ignore
log.trace(f"received {data}") # type: ignore
except trio.BrokenResourceError:
log.warning(f"Stream connection {self.raddr} broke")
return
if data == b'': if data == b'':
log.debug(f"Stream connection {self.raddr} was closed") raise TransportClosed(
return f'transport {self} was already closed prior ro read'
)
unpacker.feed(data) unpacker.feed(data)
for packet in unpacker: for packet in unpacker:
@ -96,10 +104,11 @@ class Channel:
on_reconnect: typing.Callable[..., typing.Awaitable] = None, on_reconnect: typing.Callable[..., typing.Awaitable] = None,
auto_reconnect: bool = False, auto_reconnect: bool = False,
stream: trio.SocketStream = None, # expected to be active stream: trio.SocketStream = None, # expected to be active
) -> None: ) -> None:
self._recon_seq = on_reconnect self._recon_seq = on_reconnect
self._autorecon = auto_reconnect self._autorecon = auto_reconnect
self.msgstream: Optional[MsgpackStream] = MsgpackStream( self.msgstream: Optional[MsgpackTCPStream] = MsgpackTCPStream(
stream) if stream else None stream) if stream else None
if self.msgstream and destaddr: if self.msgstream and destaddr:
raise ValueError( raise ValueError(
@ -112,6 +121,8 @@ class Channel:
self._exc: Optional[Exception] = None self._exc: Optional[Exception] = None
self._agen = self._aiter_recv() self._agen = self._aiter_recv()
self._closed: bool = False
def __repr__(self) -> str: def __repr__(self) -> str:
if self.msgstream: if self.msgstream:
return repr( return repr(
@ -128,35 +139,47 @@ class Channel:
return self.msgstream.raddr if self.msgstream else None return self.msgstream.raddr if self.msgstream else None
async def connect( async def connect(
self, destaddr: Tuple[Any, ...] = None, self,
destaddr: Tuple[Any, ...] = None,
**kwargs **kwargs
) -> trio.SocketStream: ) -> trio.SocketStream:
if self.connected(): if self.connected():
raise RuntimeError("channel is already connected?") raise RuntimeError("channel is already connected?")
destaddr = destaddr or self._destaddr destaddr = destaddr or self._destaddr
assert isinstance(destaddr, tuple) assert isinstance(destaddr, tuple)
stream = await trio.open_tcp_stream(*destaddr, **kwargs) stream = await trio.open_tcp_stream(*destaddr, **kwargs)
self.msgstream = MsgpackStream(stream) self.msgstream = MsgpackTCPStream(stream)
return stream return stream
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
log.trace(f"send `{item}`") # type: ignore log.trace(f"send `{item}`") # type: ignore
assert self.msgstream assert self.msgstream
await self.msgstream.send(item) await self.msgstream.send(item)
async def recv(self) -> Any: async def recv(self) -> Any:
assert self.msgstream assert self.msgstream
try: try:
return await self.msgstream.recv() return await self.msgstream.recv()
except trio.BrokenResourceError: except trio.BrokenResourceError:
if self._autorecon: if self._autorecon:
await self._reconnect() await self._reconnect()
return await self.recv() return await self.recv()
raise
async def aclose(self) -> None: async def aclose(self) -> None:
log.debug(f"Closing {self}") log.debug(f"Closing {self}")
assert self.msgstream assert self.msgstream
await self.msgstream.stream.aclose() await self.msgstream.stream.aclose()
self._closed = True
log.error(f'CLOSING CHAN {self}')
async def __aenter__(self): async def __aenter__(self):
await self.connect() await self.connect()