From 04c0d775953d5546199c4401de2fdb5cbb6ddca9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Nov 2022 13:36:33 -0500 Subject: [PATCH] Frame ticks in helper routine Wow, turns out tick framing was totally borked since we weren't framing on "greater then throttle period long waits" XD This moves all the framing logic into a common func and calls it in every case: - every (normal) "pre throttle period expires" quote receive - each "no new quote before throttle period expires" (slow case) - each "no clearing tick yet received" / only burst on clears case --- piker/data/_sampling.py | 115 +++++++++++++++++++++++----------------- 1 file changed, 65 insertions(+), 50 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f3c92d56..c4b182ae 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -431,12 +431,58 @@ async def sample_and_broadcast( # a working tick-type-classes template _tick_groups = { - 'clears': {'trade', 'utrade', 'last'}, + 'clears': {'trade', 'dark_trade', 'last'}, 'bids': {'bid', 'bsize'}, 'asks': {'ask', 'asize'}, } +def frame_ticks( + first_quote: dict, + last_quote: dict, + ticks_by_type: dict, +) -> None: + # append quotes since last iteration into the last quote's + # tick array/buffer. + ticks = last_quote.get('ticks') + + # TODO: once we decide to get fancy really we should + # have a shared mem tick buffer that is just + # continually filled and the UI just ready from it + # at it's display rate. + if ticks: + # TODO: do we need this any more or can we just + # expect the receiver to unwind the below + # `ticks_by_type: dict`? + # => undwinding would potentially require a + # `dict[str, set | list]` instead with an + # included `'types' field which is an (ordered) + # set of tick type fields in the order which + # types arrived? + first_quote['ticks'].extend(ticks) + + # XXX: build a tick-by-type table of lists + # of tick messages. This allows for less + # iteration on the receiver side by allowing for + # a single "latest tick event" look up by + # indexing the last entry in each sub-list. + # tbt = { + # 'types': ['bid', 'asize', 'last', .. ''], + + # 'bid': [tick0, tick1, tick2, .., tickn], + # 'asize': [tick0, tick1, tick2, .., tickn], + # 'last': [tick0, tick1, tick2, .., tickn], + # ... + # '': [tick0, tick1, tick2, .., tickn], + # } + + # append in reverse FIFO order for in-order iteration on + # receiver side. + for tick in ticks: + ttype = tick['type'] + ticks_by_type[ttype].append(tick) + + # TODO: a less naive throttler, here's some snippets: # token bucket by njs: # https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 @@ -483,51 +529,17 @@ async def uniform_rate_send( if not first_quote: first_quote = last_quote + # first_quote['tbt'] = ticks_by_type if (throttle_period - diff) > 0: # received a quote but the send cycle period hasn't yet # expired we aren't supposed to send yet so append # to the tick frame. - - # append quotes since last iteration into the last quote's - # tick array/buffer. - ticks = last_quote.get('ticks') - - # TODO: once we decide to get fancy really we should - # have a shared mem tick buffer that is just - # continually filled and the UI just ready from it - # at it's display rate. - if ticks: - # TODO: do we need this any more or can we just - # expect the receiver to unwind the below - # `ticks_by_type: dict`? - # => undwinding would potentially require a - # `dict[str, set | list]` instead with an - # included `'types' field which is an (ordered) - # set of tick type fields in the order which - # types arrived? - first_quote['ticks'].extend(ticks) - - # XXX: build a tick-by-type table of lists - # of tick messages. This allows for less - # iteration on the receiver side by allowing for - # a single "latest tick event" look up by - # indexing the last entry in each sub-list. - # tbt = { - # 'types': ['bid', 'asize', 'last', .. ''], - - # 'bid': [tick0, tick1, tick2, .., tickn], - # 'asize': [tick0, tick1, tick2, .., tickn], - # 'last': [tick0, tick1, tick2, .., tickn], - # ... - # '': [tick0, tick1, tick2, .., tickn], - # } - for tick in ticks: - # append in reverse FIFO order for in-order - # iteration on receiver side. - ticks_by_type[tick['type']].append(tick) - - first_quote['tbt'] = ticks_by_type + frame_ticks( + first_quote, + last_quote, + ticks_by_type, + ) # send cycle isn't due yet so continue waiting continue @@ -544,6 +556,12 @@ async def uniform_rate_send( # received quote ASAP. sym, first_quote = await quote_stream.receive() + frame_ticks( + first_quote, + first_quote, + ticks_by_type, + ) + # we have a quote already so send it now. with trio.move_on_after(throttle_period) as cs: @@ -556,20 +574,17 @@ async def uniform_rate_send( log.exception(f"feed for {stream} ended?") break - ticks = last_quote.get('ticks') - first_quote['ticks'].extend(ticks) - if ticks: - for tick in ticks: - # append in reverse FIFO order for in-order - # iteration on receiver side. - ticks_by_type[tick['type']].append(tick) - - first_quote['tbt'] = ticks_by_type + frame_ticks( + first_quote, + last_quote, + ticks_by_type, + ) # measured_rate = 1 / (time.time() - last_send) # log.info( # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' # ) + first_quote['tbt'] = ticks_by_type # TODO: now if only we could sync this to the display # rate timing exactly lul