Add StreamQueue.connected()

asyncgen_closing_fix
Tyler Goodlet 2018-07-04 03:16:00 -04:00
parent 82f22b76e5
commit bf08310224
1 changed files with 8 additions and 2 deletions

View File

@ -59,6 +59,9 @@ class StreamQueue:
async def __aiter__(self): async def __aiter__(self):
return self._agen return self._agen
def connected(self):
return self.stream.socket.fileno() != -1
class Channel: class Channel:
"""A channel to actors in other processes. """A channel to actors in other processes.
@ -101,7 +104,7 @@ class Channel:
return self.squeue.raddr if self.squeue else (None, None) return self.squeue.raddr if self.squeue else (None, None)
async def connect(self, destaddr: Tuple[str, int] = None, **kwargs): async def connect(self, destaddr: Tuple[str, int] = None, **kwargs):
if self.squeue is not None: if self.connected():
raise RuntimeError("channel is already connected?") raise RuntimeError("channel is already connected?")
destaddr = destaddr or self._destaddr destaddr = destaddr or self._destaddr
stream = await trio.open_tcp_stream(*destaddr, **kwargs) stream = await trio.open_tcp_stream(*destaddr, **kwargs)
@ -119,8 +122,10 @@ class Channel:
if self._autorecon: if self._autorecon:
await self._reconnect() await self._reconnect()
return await self.recv() return await self.recv()
self.squeue = None
async def aclose(self, *args): async def aclose(self, *args):
log.debug(f"Closing {self}")
await self.squeue.stream.aclose() await self.squeue.stream.aclose()
self.squeue = None self.squeue = None
@ -178,6 +183,7 @@ class Channel:
except trio.BrokenStreamError: except trio.BrokenStreamError:
if not self._autorecon: if not self._autorecon:
raise raise
await self.aclose()
self.squeue = None self.squeue = None
if self._autorecon: # attempt reconnect if self._autorecon: # attempt reconnect
await self._reconnect() await self._reconnect()
@ -186,4 +192,4 @@ class Channel:
return return
def connected(self): def connected(self):
return self.squeue is not None return self.squeue.connected() if self.squeue else False