From 1976e61d1acb8a23f365220246c32bdfbb29cb1d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Dec 2021 18:46:15 -0500 Subject: [PATCH] Add `.drain()` support to msg streams Enables "draining" the last set of messages after a channel/stream has been terminated mostly for the purposes of receiving a final ACK to a remote cancel command. Also, add an internal `Channel._cancel_called` flag which can be set by `Portal.cancel_actor()`. --- tractor/_ipc.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 28bef97..97b63f1 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -93,6 +93,9 @@ class MsgpackTCPStream: self._agen = self._iter_packets() self._send_lock = trio.StrictFIFOLock() + # public i guess? + self.drained = [] + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ @@ -132,7 +135,7 @@ class MsgpackTCPStream: if data == b'': raise TransportClosed( - f'transport {self} was already closed prior ro read' + f'transport {self} was already closed prior to read' ) unpacker.feed(data) @@ -156,6 +159,14 @@ class MsgpackTCPStream: async def recv(self) -> Any: return await self._agen.asend(None) + async def drain(self): + try: + async for msg in self._iter_packets(): + self.drained.append(msg) + except TransportClosed: + for msg in self.drained: + yield msg + def __aiter__(self): return self._agen @@ -164,7 +175,8 @@ class MsgpackTCPStream: class MsgspecTCPStream(MsgpackTCPStream): - '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data + ''' + A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgspec``. ''' @@ -259,9 +271,12 @@ def get_msg_transport( class Channel: - '''An inter-process channel for communication between (remote) actors. + ''' + An inter-process channel for communication between (remote) actors. - Currently the only supported transport is a ``trio.SocketStream``. + Wraps a ``MsgStream``: transport + encoding IPC connection. + Currently we only support ``trio.SocketStream`` for transport + (aka TCP). ''' def __init__( @@ -299,10 +314,12 @@ class Channel: # set after handshake - always uid of far end self.uid: Optional[Tuple[str, str]] = None - # set if far end actor errors internally - self._exc: Optional[Exception] = None self._agen = self._aiter_recv() + self._exc: Optional[Exception] = None # set if far end actor errors self._closed: bool = False + # flag set on ``Portal.cancel_actor()`` indicating + # remote (peer) cancellation of the far end actor runtime. + self._cancel_called: bool = False # set on ``Portal.cancel_actor()`` @classmethod def from_stream(