From 92d7ffd332c2bb624e371116d6bbc776e98ab166 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Sep 2021 10:47:41 -0400 Subject: [PATCH] WIP fsp output throttling - not working yet --- piker/data/_sampling.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 4c5aaded..160f3fc9 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -246,7 +246,7 @@ async def sample_and_broadcast( if tick_throttle: # this is a send mem chan that likely # pushes to the ``uniform_rate_send()`` below. - await stream.send(quote) + await stream.send((sym, quote)) else: await stream.send({sym: quote}) @@ -285,10 +285,14 @@ async def uniform_rate_send( sleep_period = 1/rate - 0.000616 last_send = time.time() + aname = stream._ctx.chan.uid[0] + fsp = False + if 'fsp' in aname: + fsp = True while True: - first_quote = await quote_stream.receive() + sym, first_quote = await quote_stream.receive() start = time.time() # append quotes since last iteration into the last quote's @@ -301,7 +305,7 @@ async def uniform_rate_send( # while True: try: - next_quote = quote_stream.receive_nowait() + sym, next_quote = quote_stream.receive_nowait() ticks = next_quote.get('ticks') if ticks: @@ -312,12 +316,12 @@ async def uniform_rate_send( rate = 1 / (now - last_send) last_send = now - # print(f'{rate} Hz sending quotes') # \n{first_quote}') + # log.info(f'{rate} Hz sending quotes') # \n{first_quote}') # TODO: now if only we could sync this to the display # rate timing exactly lul try: - await stream.send({first_quote['symbol']: first_quote}) + await stream.send({sym: first_quote}) break except trio.ClosedResourceError: # if the feed consumer goes down then drop