From a2c4f0c80bc088e4619da270c5dd709a9f0be532 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 22 Apr 2018 12:48:35 -0400 Subject: [PATCH] Don't recurse in Client.aiter_recv() --- piker/brokers/core.py | 46 +++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 6797ead4..6d62d14a 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -119,13 +119,13 @@ class Client: startup_seq: Coroutine, auto_reconnect: bool = True, ): - self._sockaddr = sockaddr + self.sockaddr = sockaddr self._startup_seq = startup_seq self._autorecon = auto_reconnect self.squeue = None async def connect(self, sockaddr: tuple = None, **kwargs): - sockaddr = sockaddr or self._sockaddr + sockaddr = sockaddr or self.sockaddr stream = await trio.open_tcp_stream(*sockaddr, **kwargs) self.squeue = StreamQueue(stream) await self._startup_seq(self) @@ -146,7 +146,7 @@ class Client: await self.squeue.stream.aclose() async def __aenter__(self): - await self.connect(self._sockaddr) + await self.connect(self.sockaddr) return self async def __aexit__(self, *args): @@ -163,7 +163,8 @@ class Client: await self.connect() cancelled = cancel_scope.cancelled_caught if cancelled: - log.warn("Reconnect timed out after 3 seconds, retrying...") + log.warn( + "Reconnect timed out after 3 seconds, retrying...") continue else: log.warn("Stream connection re-established!") @@ -172,23 +173,25 @@ class Client: if not down: down = True log.warn( - f"Connection to {self._sockaddr} went down, waiting" + f"Connection to {self.sockaddr} went down, waiting" " for re-establishment") await trio.sleep(1) async def aiter_recv(self): """Async iterate items from underlying stream. """ - try: - async for item in self.squeue: - yield item - except trio.BrokenStreamError as err: - if not self._autorecon: - raise - if self._autorecon: # attempt reconnect - await self._reconnect() - async for item in self.aiter_recv(): - yield item + while True: + try: + async for item in self.squeue: + yield item + except trio.BrokenStreamError as err: + if not self._autorecon: + raise + if self._autorecon: # attempt reconnect + await self._reconnect() + continue + else: + return async def stream_quotes( @@ -232,11 +235,6 @@ async def stream_quotes( postquote_start = time.time() q_payloads = {} for symbol, quote in quotes.items(): - # FIXME: None is returned if a symbol can't be found. - # Consider filtering out such symbols before starting poll loop - if quote is None: - continue - if diff_cached: # if cache is enabled then only deliver "new" changes last = _cache.setdefault(symbol, {}) @@ -301,7 +299,6 @@ async def start_quoter( if broker not in broker2tickersubs: brokermod = get_brokermod(broker) - log.info(f"Spawning quote streamer for broker {broker}") # TODO: move to AsyncExitStack in 3.7 client_cntxmng = brokermod.get_client() @@ -310,12 +307,12 @@ async def start_quoter( clients[broker] = ( brokermod, client, client_cntxmng, get_quotes) tickers2qs = broker2tickersubs.setdefault( - broker, {}.fromkeys(tickers, {queue,})) + broker, {}.fromkeys(tickers, {queue, })) else: log.info(f"Subscribing with existing `{broker}` daemon") brokermod, client, _, get_quotes = clients[broker] tickers2qs = broker2tickersubs[broker] - # update map from each symbol to requesting new client's queue + # update map from each symbol to requesting client's queue for ticker in tickers: tickers2qs.setdefault(ticker, set()).add(queue) @@ -331,7 +328,8 @@ async def start_quoter( tickers = set(tickers) - set(quotes) for ticker in tickers: log.warn( - f"Symbol `{ticker}` not found by broker `{brokermod.name}`") + f"Symbol `{ticker}` not found by broker `{brokermod.name}`" + ) tickers2qs.pop(ticker, None) # pop any tickers that return "empty" quotes