diff --git a/tractor/_actor.py b/tractor/_actor.py index 1aa7efd..871fcc2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -278,7 +278,7 @@ class Actor: # worked out we'll likely want to use that! log.debug(f"Entering msg loop for {chan} from {chan.uid}") try: - async for msg in chan.aiter_recv(): + async for msg in chan: if msg is None: # terminate sentinel log.debug( f"Cancelling all tasks for {chan} from {chan.uid}") diff --git a/tractor/_ipc.py b/tractor/_ipc.py index c8a811b..2d86e56 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -87,6 +87,7 @@ class Channel: self._destaddr = destaddr or self.squeue.raddr # set after handshake - always uid of far end self.uid = None + self._agen = self._aiter_recv() def __repr__(self): if self.squeue: @@ -134,8 +135,8 @@ class Channel: async def __aexit__(self, *args): await self.aclose(*args) - async def __aiter__(self): - return self.aiter_recv() + def __aiter__(self): + return self._agen async def _reconnect(self): """Handle connection failures by polling until a reconnect can be @@ -166,7 +167,7 @@ class Channel: " for re-establishment") await trio.sleep(1) - async def aiter_recv(self): + async def _aiter_recv(self): """Async iterate items from underlying stream. """ while True: