diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f8230bd7..a2017780 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -383,6 +383,7 @@ async def sample_and_broadcast( trio.ClosedResourceError, trio.EndOfChannel, ): + ctx = stream._ctx chan = ctx.chan if ctx: log.warning( @@ -427,6 +428,7 @@ async def uniform_rate_send( diff = 0 task_status.started() + types: set[str] = set() while True: @@ -473,6 +475,7 @@ async def uniform_rate_send( # at it's display rate. if ticks: first_quote['ticks'].extend(ticks) + types.update(item['type'] for item in ticks) # send cycle isn't due yet so continue waiting continue @@ -491,6 +494,20 @@ async def uniform_rate_send( # we have a quote already so send it now. + with trio.move_on_after(1/60) as cs: + while ( + not types.intersection({'trade', 'utrade', 'last'}) + ): + try: + sym, last_quote = await quote_stream.receive() + except trio.EndOfChannel: + log.exception(f"feed for {stream} ended?") + break + + ticks = last_quote.get('ticks') + first_quote['ticks'].extend(ticks) + types.update(item['type'] for item in ticks) + # measured_rate = 1 / (time.time() - last_send) # log.info( # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' @@ -520,3 +537,4 @@ async def uniform_rate_send( first_quote = last_quote = None diff = 0 last_send = time.time() + types.clear()