From ad623119fa5cd04539c16236db0fa2f36204a5d9 Mon Sep 17 00:00:00 2001 From: wattygetlood <61716739+wattygetlood@users.noreply.github.com> Date: Thu, 16 Sep 2021 16:35:24 -0400 Subject: [PATCH] Fix divide-by-zero when quote read is too fast in throttle task --- piker/data/_sampling.py | 40 ++++++++++++---------------------------- 1 file changed, 12 insertions(+), 28 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f8bda8a4..a751396a 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -172,6 +172,7 @@ async def sample_and_broadcast( # iterate stream delivered by broker async for quotes in quote_stream: + # TODO: ``numba`` this! for sym, quote in quotes.items(): @@ -184,12 +185,8 @@ async def sample_and_broadcast( # start writing the shm buffer with appropriate # trade data + for tick in quote['ticks']: - # TODO: we should probably not write every single - # value to an OHLC sample stream XD - # for a tick stream sure.. but this is excessive.. - ticks = quote['ticks'] - for tick in ticks: ticktype = tick['type'] # write trade events to shm last OHLC sample @@ -261,8 +258,7 @@ async def sample_and_broadcast( except ( trio.BrokenResourceError, - trio.ClosedResourceError, - trio.EndOfChannel, + trio.ClosedResourceError ): # XXX: do we need to deregister here # if it's done in the fee bus code? @@ -272,10 +268,6 @@ async def sample_and_broadcast( f'{stream._ctx.chan.uid} dropped ' '`brokerd`-quotes-feed connection' ) - if tick_throttle: - assert stream.closed() - # await stream.aclose() - subs.remove((stream, tick_throttle)) @@ -291,8 +283,12 @@ async def uniform_rate_send( ) -> None: - sleep_period = 1/rate - 0.0001 # 100us + sleep_period = 1/rate - 0.000616 last_send = time.time() + aname = stream._ctx.chan.uid[0] + fsp = False + if 'fsp' in aname: + fsp = True while True: @@ -312,33 +308,21 @@ async def uniform_rate_send( sym, next_quote = quote_stream.receive_nowait() ticks = next_quote.get('ticks') - # XXX: idea for frame type data structure we could use on the - # wire instead of a simple list? - # frames = { - # 'index': ['type_a', 'type_c', 'type_n', 'type_n'], - - # 'type_a': [tick0, tick1, tick2, .., tickn], - # 'type_b': [tick0, tick1, tick2, .., tickn], - # 'type_c': [tick0, tick1, tick2, .., tickn], - # ... - # 'type_n': [tick0, tick1, tick2, .., tickn], - # } if ticks: first_quote['ticks'].extend(ticks) except trio.WouldBlock: now = time.time() - rate = 1 / (now - last_send) + diff = now - last_send + rate = 1 / diff if diff else float('inf') + last_send = now - log.debug( - f'`{sym}` throttled send hz: {round(rate, ndigits=1)}' - ) + # log.info(f'{rate} Hz sending quotes') # \n{first_quote}') # TODO: now if only we could sync this to the display # rate timing exactly lul try: await stream.send({sym: first_quote}) - last_send = now break except trio.ClosedResourceError: # if the feed consumer goes down then drop