Add `.drain()` support to msg streams

Enables "draining" the last set of messages after a channel/stream has
been terminated mostly for the purposes of receiving a final ACK to
a remote cancel command. Also, add an internal `Channel._cancel_called`
flag which can be set by `Portal.cancel_actor()`.
acked_backup
Tyler Goodlet 2021-12-01 18:46:15 -05:00
parent 0ac3397dbb
commit 1976e61d1a
1 changed files with 23 additions and 6 deletions

View File

@ -93,6 +93,9 @@ class MsgpackTCPStream:
self._agen = self._iter_packets()
self._send_lock = trio.StrictFIFOLock()
# public i guess?
self.drained = []
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream.
"""
@ -132,7 +135,7 @@ class MsgpackTCPStream:
if data == b'':
raise TransportClosed(
f'transport {self} was already closed prior ro read'
f'transport {self} was already closed prior to read'
)
unpacker.feed(data)
@ -156,6 +159,14 @@ class MsgpackTCPStream:
async def recv(self) -> Any:
return await self._agen.asend(None)
async def drain(self):
try:
async for msg in self._iter_packets():
self.drained.append(msg)
except TransportClosed:
for msg in self.drained:
yield msg
def __aiter__(self):
return self._agen
@ -164,7 +175,8 @@ class MsgpackTCPStream:
class MsgspecTCPStream(MsgpackTCPStream):
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
'''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgspec``.
'''
@ -259,9 +271,12 @@ def get_msg_transport(
class Channel:
'''An inter-process channel for communication between (remote) actors.
'''
An inter-process channel for communication between (remote) actors.
Currently the only supported transport is a ``trio.SocketStream``.
Wraps a ``MsgStream``: transport + encoding IPC connection.
Currently we only support ``trio.SocketStream`` for transport
(aka TCP).
'''
def __init__(
@ -299,10 +314,12 @@ class Channel:
# set after handshake - always uid of far end
self.uid: Optional[Tuple[str, str]] = None
# set if far end actor errors internally
self._exc: Optional[Exception] = None
self._agen = self._aiter_recv()
self._exc: Optional[Exception] = None # set if far end actor errors
self._closed: bool = False
# flag set on ``Portal.cancel_actor()`` indicating
# remote (peer) cancellation of the far end actor runtime.
self._cancel_called: bool = False # set on ``Portal.cancel_actor()``
@classmethod
def from_stream(