Implement by-type tick-framing in throttler loop
This has been an outstanding idea for a while and changes the framing format of tick events into a `dict[str, list[dict]]` wherein for each tick "type" (eg. 'bid', 'ask', 'trade', 'asize'..etc) we create an FIFO ordered `list` of events (data) and then pack this table into each (throttled) send. This gives an additional implied downsample reduction (in terms of iteration on the consumer side) from `N` tick-events to a (max) `T` tick-types presuming the rx side only needs the latest tick event. Drop the `types: set` and adjust clearing event test to use the new `ticks_by_type` map's keys.samplerd_service
parent
715e693564
commit
125e31dbf3
|
@ -20,7 +20,10 @@ financial data flows.
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from collections import Counter
|
||||
from collections import (
|
||||
Counter,
|
||||
defaultdict,
|
||||
)
|
||||
import time
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
|
@ -405,10 +408,17 @@ async def sample_and_broadcast(
|
|||
)
|
||||
|
||||
|
||||
# a working tick-type-classes template
|
||||
_tick_groups = {
|
||||
'clears': {'trade', 'utrade', 'last'},
|
||||
'bids': {'bid', 'bsize'},
|
||||
'asks': {'ask', 'asize'},
|
||||
}
|
||||
|
||||
|
||||
# TODO: a less naive throttler, here's some snippets:
|
||||
# token bucket by njs:
|
||||
# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
|
||||
|
||||
async def uniform_rate_send(
|
||||
|
||||
rate: float,
|
||||
|
@ -428,7 +438,12 @@ async def uniform_rate_send(
|
|||
diff = 0
|
||||
|
||||
task_status.started()
|
||||
types: set[str] = set()
|
||||
ticks_by_type: defaultdict[
|
||||
str,
|
||||
list[dict],
|
||||
] = defaultdict(list)
|
||||
|
||||
clear_types = _tick_groups['clears']
|
||||
|
||||
while True:
|
||||
|
||||
|
@ -457,25 +472,41 @@ async def uniform_rate_send(
|
|||
# tick array/buffer.
|
||||
ticks = last_quote.get('ticks')
|
||||
|
||||
# XXX: idea for frame type data structure we could
|
||||
# use on the wire instead of a simple list?
|
||||
# frames = {
|
||||
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
|
||||
|
||||
# 'type_a': [tick0, tick1, tick2, .., tickn],
|
||||
# 'type_b': [tick0, tick1, tick2, .., tickn],
|
||||
# 'type_c': [tick0, tick1, tick2, .., tickn],
|
||||
# ...
|
||||
# 'type_n': [tick0, tick1, tick2, .., tickn],
|
||||
# }
|
||||
|
||||
# TODO: once we decide to get fancy really we should
|
||||
# have a shared mem tick buffer that is just
|
||||
# continually filled and the UI just ready from it
|
||||
# at it's display rate.
|
||||
if ticks:
|
||||
# TODO: do we need this any more or can we just
|
||||
# expect the receiver to unwind the below
|
||||
# `ticks_by_type: dict`?
|
||||
# => undwinding would potentially require a
|
||||
# `dict[str, set | list]` instead with an
|
||||
# included `'types' field which is an (ordered)
|
||||
# set of tick type fields in the order which
|
||||
# types arrived?
|
||||
first_quote['ticks'].extend(ticks)
|
||||
types.update(item['type'] for item in ticks)
|
||||
|
||||
# XXX: build a tick-by-type table of lists
|
||||
# of tick messages. This allows for less
|
||||
# iteration on the receiver side by allowing for
|
||||
# a single "latest tick event" look up by
|
||||
# indexing the last entry in each sub-list.
|
||||
# tbt = {
|
||||
# 'types': ['bid', 'asize', 'last', .. '<type_n>'],
|
||||
|
||||
# 'bid': [tick0, tick1, tick2, .., tickn],
|
||||
# 'asize': [tick0, tick1, tick2, .., tickn],
|
||||
# 'last': [tick0, tick1, tick2, .., tickn],
|
||||
# ...
|
||||
# '<type_n>': [tick0, tick1, tick2, .., tickn],
|
||||
# }
|
||||
for tick in ticks:
|
||||
# append in reverse FIFO order for in-order
|
||||
# iteration on receiver side.
|
||||
ticks_by_type[tick['type']].append(tick)
|
||||
|
||||
first_quote['tbt'] = ticks_by_type
|
||||
|
||||
# send cycle isn't due yet so continue waiting
|
||||
continue
|
||||
|
@ -496,7 +527,7 @@ async def uniform_rate_send(
|
|||
|
||||
with trio.move_on_after(1/60) as cs:
|
||||
while (
|
||||
not types.intersection({'trade', 'utrade', 'last'})
|
||||
not set(ticks_by_type).intersection(clear_types)
|
||||
):
|
||||
try:
|
||||
sym, last_quote = await quote_stream.receive()
|
||||
|
@ -506,7 +537,13 @@ async def uniform_rate_send(
|
|||
|
||||
ticks = last_quote.get('ticks')
|
||||
first_quote['ticks'].extend(ticks)
|
||||
types.update(item['type'] for item in ticks)
|
||||
if ticks:
|
||||
for tick in ticks:
|
||||
# append in reverse FIFO order for in-order
|
||||
# iteration on receiver side.
|
||||
ticks_by_type[tick['type']].append(tick)
|
||||
|
||||
first_quote['tbt'] = ticks_by_type
|
||||
|
||||
# measured_rate = 1 / (time.time() - last_send)
|
||||
# log.info(
|
||||
|
@ -537,4 +574,4 @@ async def uniform_rate_send(
|
|||
first_quote = last_quote = None
|
||||
diff = 0
|
||||
last_send = time.time()
|
||||
types.clear()
|
||||
ticks_by_type.clear()
|
||||
|
|
Loading…
Reference in New Issue