diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 4ddb95c..e0879a3 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -274,9 +274,9 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): @asynccontextmanager async def subscribe( self, - ) -> AsyncIterator[BroadcastReceiver]: - '''Allocate and return a ``BroadcastReceiver`` which delegates + ''' + Allocate and return a ``BroadcastReceiver`` which delegates to this message stream. This allows multiple local tasks to receive each their own copy @@ -692,6 +692,7 @@ class Context: result = consume(msg) if result: self._result = result + break if not self._result: while True: