Improved clearing-tick-burst-oriented throttling
Instead of uniformly distributing the msg send rate for a given aggregate subscription, choose to be more bursty around clearing ticks so as to avoid saturating the consumer with L1 book updates and vs. delivering real trade data as-fast-as-possible. Presuming the consumer is in the "UI land of slow" (eg. modern display frame rates) such an approach serves more useful for seeing "material changes" in the market as-bursty-as-possible (i.e. more short lived fast changes in last clearing price vs. many slower changes in the bid-ask spread queues). Such an approach also lends better to multi-feed overlays which in aggregate tend to scale linearly with the number of feeds/overlays; centralization of bursty arrival rates allows for a higher overall throttle rate if used cleverly with framing.epoch_index_backup
parent
363c7a2df2
commit
947f29aefb
|
@ -383,6 +383,7 @@ async def sample_and_broadcast(
|
|||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
):
|
||||
ctx = stream._ctx
|
||||
chan = ctx.chan
|
||||
if ctx:
|
||||
log.warning(
|
||||
|
@ -427,6 +428,7 @@ async def uniform_rate_send(
|
|||
diff = 0
|
||||
|
||||
task_status.started()
|
||||
types: set[str] = set()
|
||||
|
||||
while True:
|
||||
|
||||
|
@ -473,6 +475,7 @@ async def uniform_rate_send(
|
|||
# at it's display rate.
|
||||
if ticks:
|
||||
first_quote['ticks'].extend(ticks)
|
||||
types.update(item['type'] for item in ticks)
|
||||
|
||||
# send cycle isn't due yet so continue waiting
|
||||
continue
|
||||
|
@ -491,6 +494,20 @@ async def uniform_rate_send(
|
|||
|
||||
# we have a quote already so send it now.
|
||||
|
||||
with trio.move_on_after(1/60) as cs:
|
||||
while (
|
||||
not types.intersection({'trade', 'utrade', 'last'})
|
||||
):
|
||||
try:
|
||||
sym, last_quote = await quote_stream.receive()
|
||||
except trio.EndOfChannel:
|
||||
log.exception(f"feed for {stream} ended?")
|
||||
break
|
||||
|
||||
ticks = last_quote.get('ticks')
|
||||
first_quote['ticks'].extend(ticks)
|
||||
types.update(item['type'] for item in ticks)
|
||||
|
||||
# measured_rate = 1 / (time.time() - last_send)
|
||||
# log.info(
|
||||
# f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}'
|
||||
|
@ -520,3 +537,4 @@ async def uniform_rate_send(
|
|||
first_quote = last_quote = None
|
||||
diff = 0
|
||||
last_send = time.time()
|
||||
types.clear()
|
||||
|
|
Loading…
Reference in New Issue