diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 160f3fc9..f8bda8a4 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -172,7 +172,6 @@ async def sample_and_broadcast( # iterate stream delivered by broker async for quotes in quote_stream: - # TODO: ``numba`` this! for sym, quote in quotes.items(): @@ -185,8 +184,12 @@ async def sample_and_broadcast( # start writing the shm buffer with appropriate # trade data - for tick in quote['ticks']: + # TODO: we should probably not write every single + # value to an OHLC sample stream XD + # for a tick stream sure.. but this is excessive.. + ticks = quote['ticks'] + for tick in ticks: ticktype = tick['type'] # write trade events to shm last OHLC sample @@ -258,7 +261,8 @@ async def sample_and_broadcast( except ( trio.BrokenResourceError, - trio.ClosedResourceError + trio.ClosedResourceError, + trio.EndOfChannel, ): # XXX: do we need to deregister here # if it's done in the fee bus code? @@ -268,6 +272,10 @@ async def sample_and_broadcast( f'{stream._ctx.chan.uid} dropped ' '`brokerd`-quotes-feed connection' ) + if tick_throttle: + assert stream.closed() + # await stream.aclose() + subs.remove((stream, tick_throttle)) @@ -283,12 +291,8 @@ async def uniform_rate_send( ) -> None: - sleep_period = 1/rate - 0.000616 + sleep_period = 1/rate - 0.0001 # 100us last_send = time.time() - aname = stream._ctx.chan.uid[0] - fsp = False - if 'fsp' in aname: - fsp = True while True: @@ -308,20 +312,33 @@ async def uniform_rate_send( sym, next_quote = quote_stream.receive_nowait() ticks = next_quote.get('ticks') + # XXX: idea for frame type data structure we could use on the + # wire instead of a simple list? + # frames = { + # 'index': ['type_a', 'type_c', 'type_n', 'type_n'], + + # 'type_a': [tick0, tick1, tick2, .., tickn], + # 'type_b': [tick0, tick1, tick2, .., tickn], + # 'type_c': [tick0, tick1, tick2, .., tickn], + # ... + # 'type_n': [tick0, tick1, tick2, .., tickn], + # } if ticks: first_quote['ticks'].extend(ticks) except trio.WouldBlock: now = time.time() rate = 1 / (now - last_send) - last_send = now - # log.info(f'{rate} Hz sending quotes') # \n{first_quote}') + log.debug( + f'`{sym}` throttled send hz: {round(rate, ndigits=1)}' + ) # TODO: now if only we could sync this to the display # rate timing exactly lul try: await stream.send({sym: first_quote}) + last_send = now break except trio.ClosedResourceError: # if the feed consumer goes down then drop