From 758fbc6790bf3bb95d4255ebe88da102d3bcff65 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Aug 2018 13:40:37 -0400 Subject: [PATCH] Drop `Channel.aiter_recv()` Internalize the implementation of this and expect client code to iterate the `Channel` directly. Resolves #16 --- tractor/_actor.py | 2 +- tractor/_ipc.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 1aa7efd..871fcc2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -278,7 +278,7 @@ class Actor: # worked out we'll likely want to use that! log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - async for msg in chan.aiter_recv(): + async for msg in chan: if msg is None: # terminate sentinel log.debug( f"Cancelling all tasks for {chan} from {chan.uid}") diff --git a/tractor/_ipc.py b/tractor/_ipc.py index c8a811b..2d86e56 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -87,6 +87,7 @@ class Channel: self._destaddr = destaddr or self.squeue.raddr # set after handshake - always uid of far end self.uid = None + self._agen = self._aiter_recv() def __repr__(self): if self.squeue: @@ -134,8 +135,8 @@ class Channel: async def __aexit__(self, *args): await self.aclose(*args) - async def __aiter__(self): - return self.aiter_recv() + def __aiter__(self): + return self._agen async def _reconnect(self): """Handle connection failures by polling until a reconnect can be @@ -166,7 +167,7 @@ class Channel: " for re-establishment") await trio.sleep(1) - async def aiter_recv(self): + async def _aiter_recv(self): """Async iterate items from underlying stream. """ while True: