diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 28bef97..97b63f1 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -93,6 +93,9 @@ class MsgpackTCPStream: self._agen = self._iter_packets() self._send_lock = trio.StrictFIFOLock() + # public i guess? + self.drained = [] + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ @@ -132,7 +135,7 @@ class MsgpackTCPStream: if data == b'': raise TransportClosed( - f'transport {self} was already closed prior ro read' + f'transport {self} was already closed prior to read' ) unpacker.feed(data) @@ -156,6 +159,14 @@ class MsgpackTCPStream: async def recv(self) -> Any: return await self._agen.asend(None) + async def drain(self): + try: + async for msg in self._iter_packets(): + self.drained.append(msg) + except TransportClosed: + for msg in self.drained: + yield msg + def __aiter__(self): return self._agen @@ -164,7 +175,8 @@ class MsgpackTCPStream: class MsgspecTCPStream(MsgpackTCPStream): - '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data + ''' + A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgspec``. ''' @@ -259,9 +271,12 @@ def get_msg_transport( class Channel: - '''An inter-process channel for communication between (remote) actors. + ''' + An inter-process channel for communication between (remote) actors. - Currently the only supported transport is a ``trio.SocketStream``. + Wraps a ``MsgStream``: transport + encoding IPC connection. + Currently we only support ``trio.SocketStream`` for transport + (aka TCP). ''' def __init__( @@ -299,10 +314,12 @@ class Channel: # set after handshake - always uid of far end self.uid: Optional[Tuple[str, str]] = None - # set if far end actor errors internally - self._exc: Optional[Exception] = None self._agen = self._aiter_recv() + self._exc: Optional[Exception] = None # set if far end actor errors self._closed: bool = False + # flag set on ``Portal.cancel_actor()`` indicating + # remote (peer) cancellation of the far end actor runtime. + self._cancel_called: bool = False # set on ``Portal.cancel_actor()`` @classmethod def from_stream(