Improved clearing-tick-burst-oriented throttling

Instead of uniformly distributing the msg send rate for a given
aggregate subscription, choose to be more bursty around clearing ticks
so as to avoid saturating the consumer with L1 book updates and vs.
delivering real trade data as-fast-as-possible.

Presuming the consumer is in the "UI land of slow" (eg. modern display
frame rates) such an approach serves more useful for seeing "material
changes" in the market as-bursty-as-possible (i.e. more short lived fast
changes in last clearing price vs. many slower changes in the bid-ask
spread queues). Such an approach also lends better to multi-feed
overlays which in aggregate tend to scale linearly with the number of
feeds/overlays; centralization of bursty arrival rates allows for
a higher overall throttle rate if used cleverly with framing.
samplerd_service
Tyler Goodlet 2022-11-16 17:39:54 -05:00
parent 43717c92d9
commit 715e693564
1 changed files with 18 additions and 0 deletions

View File

@ -383,6 +383,7 @@ async def sample_and_broadcast(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
): ):
ctx = stream._ctx
chan = ctx.chan chan = ctx.chan
if ctx: if ctx:
log.warning( log.warning(
@ -427,6 +428,7 @@ async def uniform_rate_send(
diff = 0 diff = 0
task_status.started() task_status.started()
types: set[str] = set()
while True: while True:
@ -473,6 +475,7 @@ async def uniform_rate_send(
# at it's display rate. # at it's display rate.
if ticks: if ticks:
first_quote['ticks'].extend(ticks) first_quote['ticks'].extend(ticks)
types.update(item['type'] for item in ticks)
# send cycle isn't due yet so continue waiting # send cycle isn't due yet so continue waiting
continue continue
@ -491,6 +494,20 @@ async def uniform_rate_send(
# we have a quote already so send it now. # we have a quote already so send it now.
with trio.move_on_after(1/60) as cs:
while (
not types.intersection({'trade', 'utrade', 'last'})
):
try:
sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel:
log.exception(f"feed for {stream} ended?")
break
ticks = last_quote.get('ticks')
first_quote['ticks'].extend(ticks)
types.update(item['type'] for item in ticks)
# measured_rate = 1 / (time.time() - last_send) # measured_rate = 1 / (time.time() - last_send)
# log.info( # log.info(
# f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}'
@ -520,3 +537,4 @@ async def uniform_rate_send(
first_quote = last_quote = None first_quote = last_quote = None
diff = 0 diff = 0
last_send = time.time() last_send = time.time()
types.clear()