From 63d067792c9cd2029a4dbc66be2e206537b7ed30 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Mar 2019 22:46:49 -0400 Subject: [PATCH] Rename `StreamQueue` to `MsgpackStream` Prepares for other possible interchange formats plus it wasn't really a queue, just a TCP stream wrapper + `msgpack` interchange. --- tractor/_ipc.py | 49 ++++++++++++++++++++++++------------------------- 1 file changed, 24 insertions(+), 25 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 0477740..1cddeca 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -13,8 +13,8 @@ from .log import get_logger log = get_logger('ipc') -class StreamQueue: - """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. +class MsgpackStream: + """A ``trio.SocketStream`` delivering ``msgpack`` formatted data. """ def __init__(self, stream: trio.SocketStream) -> None: self.stream = stream @@ -51,12 +51,12 @@ class StreamQueue: def raddr(self) -> Tuple[str, int]: return self._raddr - async def put(self, data: Any) -> int: + async def send(self, data: Any) -> int: async with self._send_lock: return await self.stream.send_all( msgpack.dumps(data, use_bin_type=True)) - async def get(self) -> Any: + async def recv(self) -> Any: return await self._agen.asend(None) def __aiter__(self): @@ -67,10 +67,9 @@ class StreamQueue: class Channel: - """A channel to actors in other processes. + """An inter-process channel for communication between (remote) actors. - Use this to talk to any micro-service daemon or other client(s) over a - a transport managed by ``trio``. + Currently the only supported transport is a ``trio.SocketStream``. """ def __init__( self, @@ -81,13 +80,13 @@ class Channel: ) -> None: self._recon_seq = on_reconnect self._autorecon = auto_reconnect - self.squeue: Optional[StreamQueue] = StreamQueue( + self.msgstream: Optional[MsgpackStream] = MsgpackStream( stream) if stream else None - if self.squeue and destaddr: + if self.msgstream and destaddr: raise ValueError( f"A stream was provided with local addr {self.laddr}" ) - self._destaddr = self.squeue.raddr if self.squeue else destaddr + self._destaddr = self.msgstream.raddr if self.msgstream else destaddr # set after handshake - always uid of far end self.uid: Optional[Tuple[str, str]] = None # set if far end actor errors internally @@ -95,19 +94,19 @@ class Channel: self._agen = self._aiter_recv() def __repr__(self) -> str: - if self.squeue: + if self.msgstream: return repr( - self.squeue.stream.socket._sock).replace( + self.msgstream.stream.socket._sock).replace( "socket.socket", "Channel") return object.__repr__(self) @property def laddr(self) -> Optional[Tuple[str, int]]: - return self.squeue.laddr if self.squeue else None + return self.msgstream.laddr if self.msgstream else None @property def raddr(self) -> Optional[Tuple[str, int]]: - return self.squeue.raddr if self.squeue else None + return self.msgstream.raddr if self.msgstream else None async def connect( self, destaddr: Tuple[str, int] = None, **kwargs @@ -116,18 +115,18 @@ class Channel: raise RuntimeError("channel is already connected?") destaddr = destaddr or self._destaddr stream = await trio.open_tcp_stream(*destaddr, **kwargs) - self.squeue = StreamQueue(stream) + self.msgstream = MsgpackStream(stream) return stream async def send(self, item: Any) -> None: log.trace(f"send `{item}`") # type: ignore - assert self.squeue - await self.squeue.put(item) + assert self.msgstream + await self.msgstream.send(item) async def recv(self) -> Any: - assert self.squeue + assert self.msgstream try: - return await self.squeue.get() + return await self.msgstream.recv() except trio.BrokenResourceError: if self._autorecon: await self._reconnect() @@ -135,8 +134,8 @@ class Channel: async def aclose(self) -> None: log.debug(f"Closing {self}") - assert self.squeue - await self.squeue.stream.aclose() + assert self.msgstream + await self.msgstream.stream.aclose() async def __aenter__(self): await self.connect() @@ -182,16 +181,16 @@ class Channel: ) -> typing.AsyncGenerator[Any, None]: """Async iterate items from underlying stream. """ - assert self.squeue + assert self.msgstream while True: try: - async for item in self.squeue: + async for item in self.msgstream: yield item # sent = yield item # if sent is not None: # # optimization, passing None through all the # # time is pointless - # await self.squeue.put(sent) + # await self.msgstream.send(sent) except trio.BrokenResourceError: if not self._autorecon: raise @@ -203,7 +202,7 @@ class Channel: return def connected(self) -> bool: - return self.squeue.connected() if self.squeue else False + return self.msgstream.connected() if self.msgstream else False @dataclass(frozen=True)