From 715e693564773789fdb3760e6a71de9a51ae7e07 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Nov 2022 17:39:54 -0500 Subject: [PATCH] Improved clearing-tick-burst-oriented throttling Instead of uniformly distributing the msg send rate for a given aggregate subscription, choose to be more bursty around clearing ticks so as to avoid saturating the consumer with L1 book updates and vs. delivering real trade data as-fast-as-possible. Presuming the consumer is in the "UI land of slow" (eg. modern display frame rates) such an approach serves more useful for seeing "material changes" in the market as-bursty-as-possible (i.e. more short lived fast changes in last clearing price vs. many slower changes in the bid-ask spread queues). Such an approach also lends better to multi-feed overlays which in aggregate tend to scale linearly with the number of feeds/overlays; centralization of bursty arrival rates allows for a higher overall throttle rate if used cleverly with framing. --- piker/data/_sampling.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f8230bd7..a2017780 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -383,6 +383,7 @@ async def sample_and_broadcast( trio.ClosedResourceError, trio.EndOfChannel, ): + ctx = stream._ctx chan = ctx.chan if ctx: log.warning( @@ -427,6 +428,7 @@ async def uniform_rate_send( diff = 0 task_status.started() + types: set[str] = set() while True: @@ -473,6 +475,7 @@ async def uniform_rate_send( # at it's display rate. if ticks: first_quote['ticks'].extend(ticks) + types.update(item['type'] for item in ticks) # send cycle isn't due yet so continue waiting continue @@ -491,6 +494,20 @@ async def uniform_rate_send( # we have a quote already so send it now. + with trio.move_on_after(1/60) as cs: + while ( + not types.intersection({'trade', 'utrade', 'last'}) + ): + try: + sym, last_quote = await quote_stream.receive() + except trio.EndOfChannel: + log.exception(f"feed for {stream} ended?") + break + + ticks = last_quote.get('ticks') + first_quote['ticks'].extend(ticks) + types.update(item['type'] for item in ticks) + # measured_rate = 1 / (time.time() - last_send) # log.info( # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' @@ -520,3 +537,4 @@ async def uniform_rate_send( first_quote = last_quote = None diff = 0 last_send = time.time() + types.clear()