diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f3c92d56..c4b182ae 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -431,12 +431,58 @@ async def sample_and_broadcast( # a working tick-type-classes template _tick_groups = { - 'clears': {'trade', 'utrade', 'last'}, + 'clears': {'trade', 'dark_trade', 'last'}, 'bids': {'bid', 'bsize'}, 'asks': {'ask', 'asize'}, } +def frame_ticks( + first_quote: dict, + last_quote: dict, + ticks_by_type: dict, +) -> None: + # append quotes since last iteration into the last quote's + # tick array/buffer. + ticks = last_quote.get('ticks') + + # TODO: once we decide to get fancy really we should + # have a shared mem tick buffer that is just + # continually filled and the UI just ready from it + # at it's display rate. + if ticks: + # TODO: do we need this any more or can we just + # expect the receiver to unwind the below + # `ticks_by_type: dict`? + # => undwinding would potentially require a + # `dict[str, set | list]` instead with an + # included `'types' field which is an (ordered) + # set of tick type fields in the order which + # types arrived? + first_quote['ticks'].extend(ticks) + + # XXX: build a tick-by-type table of lists + # of tick messages. This allows for less + # iteration on the receiver side by allowing for + # a single "latest tick event" look up by + # indexing the last entry in each sub-list. + # tbt = { + # 'types': ['bid', 'asize', 'last', .. ''], + + # 'bid': [tick0, tick1, tick2, .., tickn], + # 'asize': [tick0, tick1, tick2, .., tickn], + # 'last': [tick0, tick1, tick2, .., tickn], + # ... + # '': [tick0, tick1, tick2, .., tickn], + # } + + # append in reverse FIFO order for in-order iteration on + # receiver side. + for tick in ticks: + ttype = tick['type'] + ticks_by_type[ttype].append(tick) + + # TODO: a less naive throttler, here's some snippets: # token bucket by njs: # https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 @@ -483,51 +529,17 @@ async def uniform_rate_send( if not first_quote: first_quote = last_quote + # first_quote['tbt'] = ticks_by_type if (throttle_period - diff) > 0: # received a quote but the send cycle period hasn't yet # expired we aren't supposed to send yet so append # to the tick frame. - - # append quotes since last iteration into the last quote's - # tick array/buffer. - ticks = last_quote.get('ticks') - - # TODO: once we decide to get fancy really we should - # have a shared mem tick buffer that is just - # continually filled and the UI just ready from it - # at it's display rate. - if ticks: - # TODO: do we need this any more or can we just - # expect the receiver to unwind the below - # `ticks_by_type: dict`? - # => undwinding would potentially require a - # `dict[str, set | list]` instead with an - # included `'types' field which is an (ordered) - # set of tick type fields in the order which - # types arrived? - first_quote['ticks'].extend(ticks) - - # XXX: build a tick-by-type table of lists - # of tick messages. This allows for less - # iteration on the receiver side by allowing for - # a single "latest tick event" look up by - # indexing the last entry in each sub-list. - # tbt = { - # 'types': ['bid', 'asize', 'last', .. ''], - - # 'bid': [tick0, tick1, tick2, .., tickn], - # 'asize': [tick0, tick1, tick2, .., tickn], - # 'last': [tick0, tick1, tick2, .., tickn], - # ... - # '': [tick0, tick1, tick2, .., tickn], - # } - for tick in ticks: - # append in reverse FIFO order for in-order - # iteration on receiver side. - ticks_by_type[tick['type']].append(tick) - - first_quote['tbt'] = ticks_by_type + frame_ticks( + first_quote, + last_quote, + ticks_by_type, + ) # send cycle isn't due yet so continue waiting continue @@ -544,6 +556,12 @@ async def uniform_rate_send( # received quote ASAP. sym, first_quote = await quote_stream.receive() + frame_ticks( + first_quote, + first_quote, + ticks_by_type, + ) + # we have a quote already so send it now. with trio.move_on_after(throttle_period) as cs: @@ -556,20 +574,17 @@ async def uniform_rate_send( log.exception(f"feed for {stream} ended?") break - ticks = last_quote.get('ticks') - first_quote['ticks'].extend(ticks) - if ticks: - for tick in ticks: - # append in reverse FIFO order for in-order - # iteration on receiver side. - ticks_by_type[tick['type']].append(tick) - - first_quote['tbt'] = ticks_by_type + frame_ticks( + first_quote, + last_quote, + ticks_by_type, + ) # measured_rate = 1 / (time.time() - last_send) # log.info( # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' # ) + first_quote['tbt'] = ticks_by_type # TODO: now if only we could sync this to the display # rate timing exactly lul