Use an async generator inside `StreamQueue`

Async generators are faster and less code. Handle segmented packets
which can happen during periods of high quote volume. Move per-broker
rate limit logic into daemon task.
kivy_mainline_and_py3.8
Tyler Goodlet 2018-04-16 13:34:26 -04:00
parent 4898459bcd
commit f80735121c
2 changed files with 36 additions and 22 deletions

View File

@ -80,36 +80,49 @@ class StreamQueue:
self.stream = stream self.stream = stream
self._delim = delim self._delim = delim
self.peer = stream.socket.getpeername() self.peer = stream.socket.getpeername()
self._agen = self._iter_packets()
async def get(self): async def _iter_packets(self):
"""Get a packet from the underlying stream. """Get a packet from the underlying stream.
""" """
delim = self._delim delim = self._delim
buff = b'' buff = b''
while True: while True:
data = await self.stream.receive_some(2**10) packets = []
try:
data = await self.stream.receive_some(2**10)
except trio.BrokenStreamError as err:
log.debug("Stream connection was broken")
return
log.trace(f"Data is {data}") log.trace(f"Data is {data}")
if data == b'': if data == b'':
raise Disconnect("Stream connection was closed") log.debug("Stream connection was closed")
buff += data return
if delim in buff:
if buff: # last received packet was segmented
data = buff + data
# if last packet has not fully arrived it will
# be a truncated byte-stream
packets = data.split(delim)
buff = packets.pop()
for packet in packets:
try: try:
return json.loads(buff) yield json.loads(packet)
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
log.exception("Failed to process JSON packet:") log.exception(f"Failed to process JSON packet: {buff}")
continue continue
async def put(self, data): async def put(self, data):
return await self.stream.send_all(json.dumps(data).encode() + b'\n') return await self.stream.send_all(json.dumps(data).encode() + b'\n')
async def __aiter__(self): async def get(self):
return self return await self._agen.asend(None)
async def __anext__(self): async def __aiter__(self):
try: return self._agen
return await self.get()
except Disconnect:
raise StopAsyncIteration
async def poll_tickers( async def poll_tickers(
@ -205,6 +218,12 @@ async def _handle_subs(
async def _daemon_main(brokermod): async def _daemon_main(brokermod):
"""Main entry point for the piker daemon. """Main entry point for the piker daemon.
""" """
rate = 5
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
stream2tickers = {} stream2tickers = {}
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
@ -217,8 +236,9 @@ async def _daemon_main(brokermod):
await nursery.start( await nursery.start(
_handle_subs, queue, stream2tickers, nursery) _handle_subs, queue, stream2tickers, nursery)
nursery.start_soon( nursery.start_soon(
poll_tickers, client, brokermod.quoter, partial(
stream2tickers[queue.peer], queue poll_tickers, client, brokermod.quoter,
stream2tickers[queue.peer], queue, rate=rate)
) )
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:

View File

@ -134,12 +134,6 @@ def watch(loglevel, broker, rate, name):
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path)
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
broker_limit = getattr(brokermod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec")
trio.run(_async_main, name, watchlists[name], brokermod, rate) trio.run(_async_main, name, watchlists[name], brokermod, rate)