Add StreamQueue.connected()
parent
ddf27e5e7f
commit
1d3fde4a4d
10
piker/ipc.py
10
piker/ipc.py
|
@ -59,6 +59,9 @@ class StreamQueue:
|
||||||
async def __aiter__(self):
|
async def __aiter__(self):
|
||||||
return self._agen
|
return self._agen
|
||||||
|
|
||||||
|
def connected(self):
|
||||||
|
return self.stream.socket.fileno() != -1
|
||||||
|
|
||||||
|
|
||||||
class Channel:
|
class Channel:
|
||||||
"""A channel to actors in other processes.
|
"""A channel to actors in other processes.
|
||||||
|
@ -101,7 +104,7 @@ class Channel:
|
||||||
return self.squeue.raddr if self.squeue else (None, None)
|
return self.squeue.raddr if self.squeue else (None, None)
|
||||||
|
|
||||||
async def connect(self, destaddr: Tuple[str, int] = None, **kwargs):
|
async def connect(self, destaddr: Tuple[str, int] = None, **kwargs):
|
||||||
if self.squeue is not None:
|
if self.connected():
|
||||||
raise RuntimeError("channel is already connected?")
|
raise RuntimeError("channel is already connected?")
|
||||||
destaddr = destaddr or self._destaddr
|
destaddr = destaddr or self._destaddr
|
||||||
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
|
stream = await trio.open_tcp_stream(*destaddr, **kwargs)
|
||||||
|
@ -119,8 +122,10 @@ class Channel:
|
||||||
if self._autorecon:
|
if self._autorecon:
|
||||||
await self._reconnect()
|
await self._reconnect()
|
||||||
return await self.recv()
|
return await self.recv()
|
||||||
|
self.squeue = None
|
||||||
|
|
||||||
async def aclose(self, *args):
|
async def aclose(self, *args):
|
||||||
|
log.debug(f"Closing {self}")
|
||||||
await self.squeue.stream.aclose()
|
await self.squeue.stream.aclose()
|
||||||
self.squeue = None
|
self.squeue = None
|
||||||
|
|
||||||
|
@ -178,6 +183,7 @@ class Channel:
|
||||||
except trio.BrokenStreamError:
|
except trio.BrokenStreamError:
|
||||||
if not self._autorecon:
|
if not self._autorecon:
|
||||||
raise
|
raise
|
||||||
|
await self.aclose()
|
||||||
self.squeue = None
|
self.squeue = None
|
||||||
if self._autorecon: # attempt reconnect
|
if self._autorecon: # attempt reconnect
|
||||||
await self._reconnect()
|
await self._reconnect()
|
||||||
|
@ -186,4 +192,4 @@ class Channel:
|
||||||
return
|
return
|
||||||
|
|
||||||
def connected(self):
|
def connected(self):
|
||||||
return self.squeue is not None
|
return self.squeue.connected() if self.squeue else False
|
||||||
|
|
Loading…
Reference in New Issue