First attempt data feed side quote throttling

Adding binance's "hft" ws feeds has resulted in a lot of context
switching in our Qt charts, so much so it's chewin CPU and definitely
worth it to throttle to the detected display rate as per discussion in
issue #192.

This is a first very very naive attempt at throttling L1 tick feeds on
the `brokerd` end (producer side) using a constant and uniform delivery
rate by way of a `trio` task + mem chan.  The new func is
`data._sampling.uniform_rate_send()`. Basically if a client request
a feed and provides a throttle rate we just spawn a task and queue up
ticks until approximately the next display rate's worth period of time
has passed before forwarding. It's definitely nothing fancy but does
provide fodder and a start point for an up and coming queueing eng to
start digging into both #107 and #109 ;)
naive_feed_throttling
Tyler Goodlet 2021-06-14 21:43:19 -04:00
parent 57a35a3c6c
commit ccf81520cb
1 changed files with 73 additions and 16 deletions

View File

@ -17,6 +17,7 @@
"""
Data buffers for fast shared humpy.
"""
import time
from typing import Dict, List
import tractor
@ -152,10 +153,12 @@ async def iter_ohlc_periods(
async def sample_and_broadcast(
bus: '_FeedBus', # noqa
shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
sum_tick_vlm: bool = True,
) -> None:
log.info("Started shared mem bar writer")
@ -177,11 +180,10 @@ async def sample_and_broadcast(
# trade data
for tick in quote['ticks']:
# if tick['type'] in ('utrade',):
# print(tick)
ticktype = tick['type']
# write trade events to shm last OHLC sample
if tick['type'] in ('trade', 'utrade'):
if ticktype in ('trade', 'utrade'):
last = tick['price']
@ -229,16 +231,71 @@ async def sample_and_broadcast(
# thus other consumers still attached.
subs = bus._subscribers[sym.lower()]
for stream in subs:
# print(f'sub is {ctx.chan.uid}')
try:
await stream.send({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
# XXX: do we need to deregister here
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection')
for (stream, tick_throttle) in subs:
if tick_throttle:
await stream.send(quote)
else:
try:
await stream.send({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
# XXX: do we need to deregister here
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection')
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream,
) -> None:
sleep_period = 1/rate - 0.000616
last_send = time.time()
while True:
first_quote = await quote_stream.receive()
start = time.time()
# append quotes since last iteration into the last quote's
# tick array/buffer.
# TODO: once we decide to get fancy really we should have
# a shared mem tick buffer that is just continually filled and
# the UI just ready from it at it's display rate.
# we'll likely head toward this once we get this issue going:
#
while True:
try:
next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks')
if ticks:
first_quote['ticks'].extend(ticks)
except trio.WouldBlock:
now = time.time()
rate = 1 / (now - last_send)
last_send = now
# print(f'{rate} Hz sending quotes\n{first_quote}')
# TODO: now if only we could sync this to the display
# rate timing exactly lul
await stream.send({first_quote['symbol']: first_quote})
break
end = time.time()
diff = end - start
# throttle to provided transmit rate
period = max(sleep_period - diff, 0)
if period > 0:
await trio.sleep(period)