Frame ticks in helper routine

Wow, turns out tick framing was totally borked since we weren't framing
on "greater then throttle period long waits" XD

This moves all the framing logic into a common func and calls it in
every case:
- every (normal) "pre throttle period expires" quote receive
- each "no new quote before throttle period expires" (slow case)
- each "no clearing tick yet received" / only burst on clears case
epoch_index_backup
Tyler Goodlet 2022-11-29 13:36:33 -05:00
parent ac0166f936
commit 16d5ea5b33
1 changed files with 65 additions and 50 deletions

View File

@ -431,12 +431,58 @@ async def sample_and_broadcast(
# a working tick-type-classes template
_tick_groups = {
'clears': {'trade', 'utrade', 'last'},
'clears': {'trade', 'dark_trade', 'last'},
'bids': {'bid', 'bsize'},
'asks': {'ask', 'asize'},
}
def frame_ticks(
first_quote: dict,
last_quote: dict,
ticks_by_type: dict,
) -> None:
# append quotes since last iteration into the last quote's
# tick array/buffer.
ticks = last_quote.get('ticks')
# TODO: once we decide to get fancy really we should
# have a shared mem tick buffer that is just
# continually filled and the UI just ready from it
# at it's display rate.
if ticks:
# TODO: do we need this any more or can we just
# expect the receiver to unwind the below
# `ticks_by_type: dict`?
# => undwinding would potentially require a
# `dict[str, set | list]` instead with an
# included `'types' field which is an (ordered)
# set of tick type fields in the order which
# types arrived?
first_quote['ticks'].extend(ticks)
# XXX: build a tick-by-type table of lists
# of tick messages. This allows for less
# iteration on the receiver side by allowing for
# a single "latest tick event" look up by
# indexing the last entry in each sub-list.
# tbt = {
# 'types': ['bid', 'asize', 'last', .. '<type_n>'],
# 'bid': [tick0, tick1, tick2, .., tickn],
# 'asize': [tick0, tick1, tick2, .., tickn],
# 'last': [tick0, tick1, tick2, .., tickn],
# ...
# '<type_n>': [tick0, tick1, tick2, .., tickn],
# }
# append in reverse FIFO order for in-order iteration on
# receiver side.
for tick in ticks:
ttype = tick['type']
ticks_by_type[ttype].append(tick)
# TODO: a less naive throttler, here's some snippets:
# token bucket by njs:
# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
@ -483,51 +529,17 @@ async def uniform_rate_send(
if not first_quote:
first_quote = last_quote
# first_quote['tbt'] = ticks_by_type
if (throttle_period - diff) > 0:
# received a quote but the send cycle period hasn't yet
# expired we aren't supposed to send yet so append
# to the tick frame.
# append quotes since last iteration into the last quote's
# tick array/buffer.
ticks = last_quote.get('ticks')
# TODO: once we decide to get fancy really we should
# have a shared mem tick buffer that is just
# continually filled and the UI just ready from it
# at it's display rate.
if ticks:
# TODO: do we need this any more or can we just
# expect the receiver to unwind the below
# `ticks_by_type: dict`?
# => undwinding would potentially require a
# `dict[str, set | list]` instead with an
# included `'types' field which is an (ordered)
# set of tick type fields in the order which
# types arrived?
first_quote['ticks'].extend(ticks)
# XXX: build a tick-by-type table of lists
# of tick messages. This allows for less
# iteration on the receiver side by allowing for
# a single "latest tick event" look up by
# indexing the last entry in each sub-list.
# tbt = {
# 'types': ['bid', 'asize', 'last', .. '<type_n>'],
# 'bid': [tick0, tick1, tick2, .., tickn],
# 'asize': [tick0, tick1, tick2, .., tickn],
# 'last': [tick0, tick1, tick2, .., tickn],
# ...
# '<type_n>': [tick0, tick1, tick2, .., tickn],
# }
for tick in ticks:
# append in reverse FIFO order for in-order
# iteration on receiver side.
ticks_by_type[tick['type']].append(tick)
first_quote['tbt'] = ticks_by_type
frame_ticks(
first_quote,
last_quote,
ticks_by_type,
)
# send cycle isn't due yet so continue waiting
continue
@ -544,6 +556,12 @@ async def uniform_rate_send(
# received quote ASAP.
sym, first_quote = await quote_stream.receive()
frame_ticks(
first_quote,
first_quote,
ticks_by_type,
)
# we have a quote already so send it now.
with trio.move_on_after(throttle_period) as cs:
@ -556,20 +574,17 @@ async def uniform_rate_send(
log.exception(f"feed for {stream} ended?")
break
ticks = last_quote.get('ticks')
first_quote['ticks'].extend(ticks)
if ticks:
for tick in ticks:
# append in reverse FIFO order for in-order
# iteration on receiver side.
ticks_by_type[tick['type']].append(tick)
first_quote['tbt'] = ticks_by_type
frame_ticks(
first_quote,
last_quote,
ticks_by_type,
)
# measured_rate = 1 / (time.time() - last_send)
# log.info(
# f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}'
# )
first_quote['tbt'] = ticks_by_type
# TODO: now if only we could sync this to the display
# rate timing exactly lul