diff --git a/piker/ipc.py b/piker/ipc.py index 0470a650..c643ccc8 100644 --- a/piker/ipc.py +++ b/piker/ipc.py @@ -59,6 +59,9 @@ class StreamQueue: async def __aiter__(self): return self._agen + def connected(self): + return self.stream.socket.fileno() != -1 + class Channel: """A channel to actors in other processes. @@ -101,7 +104,7 @@ class Channel: return self.squeue.raddr if self.squeue else (None, None) async def connect(self, destaddr: Tuple[str, int] = None, **kwargs): - if self.squeue is not None: + if self.connected(): raise RuntimeError("channel is already connected?") destaddr = destaddr or self._destaddr stream = await trio.open_tcp_stream(*destaddr, **kwargs) @@ -119,8 +122,10 @@ class Channel: if self._autorecon: await self._reconnect() return await self.recv() + self.squeue = None async def aclose(self, *args): + log.debug(f"Closing {self}") await self.squeue.stream.aclose() self.squeue = None @@ -178,6 +183,7 @@ class Channel: except trio.BrokenStreamError: if not self._autorecon: raise + await self.aclose() self.squeue = None if self._autorecon: # attempt reconnect await self._reconnect() @@ -186,4 +192,4 @@ class Channel: return def connected(self): - return self.squeue is not None + return self.squeue.connected() if self.squeue else False