diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py index 6137590..f6d9f47 100644 --- a/tractor/_exceptions.py +++ b/tractor/_exceptions.py @@ -38,6 +38,10 @@ class InternalActorError(RemoteActorError): """ +class TransportClosed(trio.ClosedResourceError): + "Underlying channel transport was closed prior to use" + + class NoResult(RuntimeError): "No final result is expected for this actor" @@ -63,12 +67,15 @@ def pack_error(exc: BaseException) -> Dict[str, Any]: def unpack_error( + msg: Dict[str, Any], chan=None, err_type=RemoteActorError + ) -> Exception: """Unpack an 'error' message from the wire into a local ``RemoteActorError``. + """ tb_str = msg['error'].get('tb_str', '') return err_type( diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 48023ea..f3a7b44 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -2,6 +2,7 @@ Inter-process comms abstractions """ from functools import partial +import math import struct import typing from typing import Any, Tuple, Optional @@ -13,6 +14,7 @@ import trio from async_generator import asynccontextmanager from .log import get_logger +from ._exceptions import TransportClosed log = get_logger(__name__) # :eyeroll: @@ -24,7 +26,7 @@ except ImportError: Unpacker = partial(msgpack.Unpacker, strict_map_key=False) -class MsgpackStream: +class MsgpackTCPStream: '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgpack-python``. @@ -47,7 +49,10 @@ class MsgpackStream: assert isinstance(rsockname, tuple) self._raddr = rsockname[:2] + # start and seed first entry to read loop self._agen = self._iter_packets() + # self._agen.asend(None) is None + self._send_lock = trio.StrictFIFOLock() async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: @@ -58,16 +63,13 @@ class MsgpackStream: use_list=False, ) while True: - try: - data = await self.stream.receive_some(2**10) - log.trace(f"received {data}") # type: ignore - except trio.BrokenResourceError: - log.warning(f"Stream connection {self.raddr} broke") - return + data = await self.stream.receive_some(2**10) + log.trace(f"received {data}") # type: ignore if data == b'': - log.debug(f"Stream connection {self.raddr} was closed") - return + raise TransportClosed( + f'transport {self} was already closed prior ro read' + ) unpacker.feed(data) for packet in unpacker: @@ -98,7 +100,7 @@ class MsgpackStream: return self.stream.socket.fileno() != -1 -class MsgspecStream(MsgpackStream): +class MsgspecTCPStream(MsgpackTCPStream): '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgspec``. @@ -123,24 +125,22 @@ class MsgspecStream(MsgpackStream): while True: try: header = await self.recv_stream.receive_exactly(4) - if header is None: - continue - if header == b'': - log.debug(f"Stream connection {self.raddr} was closed") - return + except (ValueError): + raise TransportClosed( + f'transport {self} was already closed prior ro read' + ) - size, = struct.unpack(" None: @@ -192,6 +193,8 @@ class Channel: self._exc: Optional[Exception] = None self._agen = self._aiter_recv() + self._closed: bool = False + def __repr__(self) -> str: if self.msgstream: return repr( @@ -208,35 +211,52 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( - self, destaddr: Tuple[Any, ...] = None, + self, + destaddr: Tuple[Any, ...] = None, **kwargs + ) -> trio.SocketStream: + if self.connected(): raise RuntimeError("channel is already connected?") + destaddr = destaddr or self._destaddr assert isinstance(destaddr, tuple) - stream = await trio.open_tcp_stream(*destaddr, **kwargs) + + stream = await trio.open_tcp_stream( + *destaddr, + happy_eyeballs_delay=math.inf, + **kwargs + ) self.msgstream = self.stream_serializer_type(stream) return stream async def send(self, item: Any) -> None: + log.trace(f"send `{item}`") # type: ignore assert self.msgstream + await self.msgstream.send(item) async def recv(self) -> Any: assert self.msgstream + try: return await self.msgstream.recv() + except trio.BrokenResourceError: if self._autorecon: await self._reconnect() return await self.recv() + raise + async def aclose(self) -> None: log.debug(f"Closing {self}") assert self.msgstream await self.msgstream.stream.aclose() + self._closed = True + log.error(f'CLOSING CHAN {self}') async def __aenter__(self): await self.connect()