diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index a2017780..32b753a2 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -20,7 +20,10 @@ financial data flows. """ from __future__ import annotations -from collections import Counter +from collections import ( + Counter, + defaultdict, +) import time from typing import ( TYPE_CHECKING, @@ -405,10 +408,17 @@ async def sample_and_broadcast( ) +# a working tick-type-classes template +_tick_groups = { + 'clears': {'trade', 'utrade', 'last'}, + 'bids': {'bid', 'bsize'}, + 'asks': {'ask', 'asize'}, +} + + # TODO: a less naive throttler, here's some snippets: # token bucket by njs: # https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 - async def uniform_rate_send( rate: float, @@ -428,7 +438,12 @@ async def uniform_rate_send( diff = 0 task_status.started() - types: set[str] = set() + ticks_by_type: defaultdict[ + str, + list[dict], + ] = defaultdict(list) + + clear_types = _tick_groups['clears'] while True: @@ -457,25 +472,41 @@ async def uniform_rate_send( # tick array/buffer. ticks = last_quote.get('ticks') - # XXX: idea for frame type data structure we could - # use on the wire instead of a simple list? - # frames = { - # 'index': ['type_a', 'type_c', 'type_n', 'type_n'], - - # 'type_a': [tick0, tick1, tick2, .., tickn], - # 'type_b': [tick0, tick1, tick2, .., tickn], - # 'type_c': [tick0, tick1, tick2, .., tickn], - # ... - # 'type_n': [tick0, tick1, tick2, .., tickn], - # } - # TODO: once we decide to get fancy really we should # have a shared mem tick buffer that is just # continually filled and the UI just ready from it # at it's display rate. if ticks: + # TODO: do we need this any more or can we just + # expect the receiver to unwind the below + # `ticks_by_type: dict`? + # => undwinding would potentially require a + # `dict[str, set | list]` instead with an + # included `'types' field which is an (ordered) + # set of tick type fields in the order which + # types arrived? first_quote['ticks'].extend(ticks) - types.update(item['type'] for item in ticks) + + # XXX: build a tick-by-type table of lists + # of tick messages. This allows for less + # iteration on the receiver side by allowing for + # a single "latest tick event" look up by + # indexing the last entry in each sub-list. + # tbt = { + # 'types': ['bid', 'asize', 'last', .. ''], + + # 'bid': [tick0, tick1, tick2, .., tickn], + # 'asize': [tick0, tick1, tick2, .., tickn], + # 'last': [tick0, tick1, tick2, .., tickn], + # ... + # '': [tick0, tick1, tick2, .., tickn], + # } + for tick in ticks: + # append in reverse FIFO order for in-order + # iteration on receiver side. + ticks_by_type[tick['type']].append(tick) + + first_quote['tbt'] = ticks_by_type # send cycle isn't due yet so continue waiting continue @@ -496,7 +527,7 @@ async def uniform_rate_send( with trio.move_on_after(1/60) as cs: while ( - not types.intersection({'trade', 'utrade', 'last'}) + not set(ticks_by_type).intersection(clear_types) ): try: sym, last_quote = await quote_stream.receive() @@ -506,7 +537,13 @@ async def uniform_rate_send( ticks = last_quote.get('ticks') first_quote['ticks'].extend(ticks) - types.update(item['type'] for item in ticks) + if ticks: + for tick in ticks: + # append in reverse FIFO order for in-order + # iteration on receiver side. + ticks_by_type[tick['type']].append(tick) + + first_quote['tbt'] = ticks_by_type # measured_rate = 1 / (time.time() - last_send) # log.info( @@ -537,4 +574,4 @@ async def uniform_rate_send( first_quote = last_quote = None diff = 0 last_send = time.time() - types.clear() + ticks_by_type.clear()