Implement by-type tick-framing in throttler loop

This has been an outstanding idea for a while and changes the framing
format of tick events into a `dict[str, list[dict]]` wherein for each
tick "type" (eg. 'bid', 'ask', 'trade', 'asize'..etc) we create an FIFO
ordered `list` of events (data) and then pack this table into each
(throttled) send. This gives an additional implied downsample reduction
(in terms of iteration on the consumer side) from `N` tick-events to
a (max) `T` tick-types presuming the rx side only needs the latest tick
event.

Drop the `types: set` and adjust clearing event test to use the new
`ticks_by_type` map's keys.
epoch_index_backup
Tyler Goodlet 2022-11-17 17:28:26 -05:00
parent 6f0b1ea283
commit 5fcc34a9e6
1 changed files with 56 additions and 19 deletions

View File

@ -20,7 +20,10 @@ financial data flows.
""" """
from __future__ import annotations from __future__ import annotations
from collections import Counter from collections import (
Counter,
defaultdict,
)
import time import time
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -405,10 +408,17 @@ async def sample_and_broadcast(
) )
# a working tick-type-classes template
_tick_groups = {
'clears': {'trade', 'utrade', 'last'},
'bids': {'bid', 'bsize'},
'asks': {'ask', 'asize'},
}
# TODO: a less naive throttler, here's some snippets: # TODO: a less naive throttler, here's some snippets:
# token bucket by njs: # token bucket by njs:
# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 # https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
async def uniform_rate_send( async def uniform_rate_send(
rate: float, rate: float,
@ -428,7 +438,12 @@ async def uniform_rate_send(
diff = 0 diff = 0
task_status.started() task_status.started()
types: set[str] = set() ticks_by_type: defaultdict[
str,
list[dict],
] = defaultdict(list)
clear_types = _tick_groups['clears']
while True: while True:
@ -457,25 +472,41 @@ async def uniform_rate_send(
# tick array/buffer. # tick array/buffer.
ticks = last_quote.get('ticks') ticks = last_quote.get('ticks')
# XXX: idea for frame type data structure we could
# use on the wire instead of a simple list?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }
# TODO: once we decide to get fancy really we should # TODO: once we decide to get fancy really we should
# have a shared mem tick buffer that is just # have a shared mem tick buffer that is just
# continually filled and the UI just ready from it # continually filled and the UI just ready from it
# at it's display rate. # at it's display rate.
if ticks: if ticks:
# TODO: do we need this any more or can we just
# expect the receiver to unwind the below
# `ticks_by_type: dict`?
# => undwinding would potentially require a
# `dict[str, set | list]` instead with an
# included `'types' field which is an (ordered)
# set of tick type fields in the order which
# types arrived?
first_quote['ticks'].extend(ticks) first_quote['ticks'].extend(ticks)
types.update(item['type'] for item in ticks)
# XXX: build a tick-by-type table of lists
# of tick messages. This allows for less
# iteration on the receiver side by allowing for
# a single "latest tick event" look up by
# indexing the last entry in each sub-list.
# tbt = {
# 'types': ['bid', 'asize', 'last', .. '<type_n>'],
# 'bid': [tick0, tick1, tick2, .., tickn],
# 'asize': [tick0, tick1, tick2, .., tickn],
# 'last': [tick0, tick1, tick2, .., tickn],
# ...
# '<type_n>': [tick0, tick1, tick2, .., tickn],
# }
for tick in ticks:
# append in reverse FIFO order for in-order
# iteration on receiver side.
ticks_by_type[tick['type']].append(tick)
first_quote['tbt'] = ticks_by_type
# send cycle isn't due yet so continue waiting # send cycle isn't due yet so continue waiting
continue continue
@ -496,7 +527,7 @@ async def uniform_rate_send(
with trio.move_on_after(1/60) as cs: with trio.move_on_after(1/60) as cs:
while ( while (
not types.intersection({'trade', 'utrade', 'last'}) not set(ticks_by_type).intersection(clear_types)
): ):
try: try:
sym, last_quote = await quote_stream.receive() sym, last_quote = await quote_stream.receive()
@ -506,7 +537,13 @@ async def uniform_rate_send(
ticks = last_quote.get('ticks') ticks = last_quote.get('ticks')
first_quote['ticks'].extend(ticks) first_quote['ticks'].extend(ticks)
types.update(item['type'] for item in ticks) if ticks:
for tick in ticks:
# append in reverse FIFO order for in-order
# iteration on receiver side.
ticks_by_type[tick['type']].append(tick)
first_quote['tbt'] = ticks_by_type
# measured_rate = 1 / (time.time() - last_send) # measured_rate = 1 / (time.time() - last_send)
# log.info( # log.info(
@ -537,4 +574,4 @@ async def uniform_rate_send(
first_quote = last_quote = None first_quote = last_quote = None
diff = 0 diff = 0
last_send = time.time() last_send = time.time()
types.clear() ticks_by_type.clear()