From f80735121ca28e026dc9d97bad63adae34556e55 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Apr 2018 13:34:26 -0400 Subject: [PATCH] Use an async generator inside `StreamQueue` Async generators are faster and less code. Handle segmented packets which can happen during periods of high quote volume. Move per-broker rate limit logic into daemon task. --- piker/brokers/core.py | 52 ++++++++++++++++++++++++++++++------------- piker/cli.py | 6 ----- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index e199ecd2..db3ec785 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -80,36 +80,49 @@ class StreamQueue: self.stream = stream self._delim = delim self.peer = stream.socket.getpeername() + self._agen = self._iter_packets() - async def get(self): + async def _iter_packets(self): """Get a packet from the underlying stream. """ delim = self._delim buff = b'' while True: - data = await self.stream.receive_some(2**10) + packets = [] + try: + data = await self.stream.receive_some(2**10) + except trio.BrokenStreamError as err: + log.debug("Stream connection was broken") + return + log.trace(f"Data is {data}") if data == b'': - raise Disconnect("Stream connection was closed") - buff += data - if delim in buff: + log.debug("Stream connection was closed") + return + + if buff: # last received packet was segmented + data = buff + data + + # if last packet has not fully arrived it will + # be a truncated byte-stream + packets = data.split(delim) + buff = packets.pop() + + for packet in packets: try: - return json.loads(buff) + yield json.loads(packet) except json.decoder.JSONDecodeError: - log.exception("Failed to process JSON packet:") + log.exception(f"Failed to process JSON packet: {buff}") continue async def put(self, data): return await self.stream.send_all(json.dumps(data).encode() + b'\n') - async def __aiter__(self): - return self + async def get(self): + return await self._agen.asend(None) - async def __anext__(self): - try: - return await self.get() - except Disconnect: - raise StopAsyncIteration + async def __aiter__(self): + return self._agen async def poll_tickers( @@ -205,6 +218,12 @@ async def _handle_subs( async def _daemon_main(brokermod): """Main entry point for the piker daemon. """ + rate = 5 + broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + if broker_limit < rate: + rate = broker_limit + log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + stream2tickers = {} async with brokermod.get_client() as client: @@ -217,8 +236,9 @@ async def _daemon_main(brokermod): await nursery.start( _handle_subs, queue, stream2tickers, nursery) nursery.start_soon( - poll_tickers, client, brokermod.quoter, - stream2tickers[queue.peer], queue + partial( + poll_tickers, client, brokermod.quoter, + stream2tickers[queue.peer], queue, rate=rate) ) async with trio.open_nursery() as nursery: diff --git a/piker/cli.py b/piker/cli.py index c6342eec..a8f9b924 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -134,12 +134,6 @@ def watch(loglevel, broker, rate, name): brokermod = get_brokermod(broker) watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - broker_limit = getattr(brokermod, '_rate_limit', float('inf')) - - if broker_limit < rate: - rate = broker_limit - log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") - trio.run(_async_main, name, watchlists[name], brokermod, rate)