data._sampling.frame_ticks(): slight rework to generalize

basic_buy_bot
Tyler Goodlet 2023-06-26 19:30:20 -04:00
parent 4a8eafabb8
commit 7b4472e37e
1 changed files with 42 additions and 31 deletions

View File

@ -27,6 +27,7 @@ from collections import (
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
import time import time
from typing import ( from typing import (
Any,
AsyncIterator, AsyncIterator,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -587,9 +588,9 @@ async def sample_and_broadcast(
# TODO: we should probably not write every single # TODO: we should probably not write every single
# value to an OHLC sample stream XD # value to an OHLC sample stream XD
# for a tick stream sure.. but this is excessive.. # for a tick stream sure.. but this is excessive..
ticks = quote['ticks'] ticks: list[dict] = quote['ticks']
for tick in ticks: for tick in ticks:
ticktype = tick['type'] ticktype: str = tick['type']
# write trade events to shm last OHLC sample # write trade events to shm last OHLC sample
if ticktype in ('trade', 'utrade'): if ticktype in ('trade', 'utrade'):
@ -599,13 +600,14 @@ async def sample_and_broadcast(
# more compact inline-way to do this assignment # more compact inline-way to do this assignment
# to both buffers? # to both buffers?
for shm in [rt_shm, hist_shm]: for shm in [rt_shm, hist_shm]:
# update last entry # update last entry
# benchmarked in the 4-5 us range # benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][ o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume'] ['open', 'high', 'low', 'volume']
] ]
new_v = tick.get('size', 0) new_v: float = tick.get('size', 0)
if v == 0 and new_v: if v == 0 and new_v:
# no trades for this bar yet so the open # no trades for this bar yet so the open
@ -654,7 +656,7 @@ async def sample_and_broadcast(
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct # should?) so we have to manually generate the correct
# key here. # key here.
fqme = f'{broker_symbol}.{brokername}' fqme: str = f'{broker_symbol}.{brokername}'
lags: int = 0 lags: int = 0
# TODO: speed up this loop in an AOT compiled lang (like # TODO: speed up this loop in an AOT compiled lang (like
@ -757,28 +759,21 @@ _auction_ticks: set[str] = set.union(*_tick_groups.values())
def frame_ticks( def frame_ticks(
first_quote: dict, quote: dict[str, Any],
last_quote: dict,
ticks_by_type: dict, ticks_by_type: dict[str, list[dict[str, Any]]] = {},
) -> None: ticks_in_order: list[dict[str, Any]] | None = None
) -> dict:
# append quotes since last iteration into the last quote's # append quotes since last iteration into the last quote's
# tick array/buffer. # tick array/buffer.
ticks = last_quote.get('ticks')
# TODO: once we decide to get fancy really we should # TODO: once we decide to get fancy really we should
# have a shared mem tick buffer that is just # have a shared mem tick buffer that is just
# continually filled and the UI just ready from it # continually filled and the UI just ready from it
# at it's display rate. # at it's display rate.
if ticks:
# TODO: do we need this any more or can we just if ticks := quote.get('ticks'):
# expect the receiver to unwind the below
# `ticks_by_type: dict`?
# => undwinding would potentially require a
# `dict[str, set | list]` instead with an
# included `'types' field which is an (ordered)
# set of tick type fields in the order which
# types arrived?
first_quote['ticks'].extend(ticks)
# XXX: build a tick-by-type table of lists # XXX: build a tick-by-type table of lists
# of tick messages. This allows for less # of tick messages. This allows for less
@ -797,9 +792,25 @@ def frame_ticks(
# append in reverse FIFO order for in-order iteration on # append in reverse FIFO order for in-order iteration on
# receiver side. # receiver side.
tick: dict[str, Any]
for tick in ticks: for tick in ticks:
ttype = tick['type'] ticks_by_type.setdefault(
ticks_by_type[ttype].append(tick) tick['type'],
[],
).append(tick)
# TODO: do we need this any more or can we just
# expect the receiver to unwind the below
# `ticks_by_type: dict`?
# => undwinding would potentially require a
# `dict[str, set | list]` instead with an
# included `'types' field which is an (ordered)
# set of tick type fields in the order which
# types arrived?
if ticks_in_order:
ticks_in_order.extend(ticks)
return ticks_by_type
async def uniform_rate_send( async def uniform_rate_send(
@ -835,10 +846,10 @@ async def uniform_rate_send(
diff = 0 diff = 0
task_status.started() task_status.started()
ticks_by_type: defaultdict[ ticks_by_type: dict[
str, str,
list[dict], list[dict[str, Any]],
] = defaultdict(list) ] = {}
clear_types = _tick_groups['clears'] clear_types = _tick_groups['clears']
@ -866,9 +877,9 @@ async def uniform_rate_send(
# expired we aren't supposed to send yet so append # expired we aren't supposed to send yet so append
# to the tick frame. # to the tick frame.
frame_ticks( frame_ticks(
first_quote,
last_quote, last_quote,
ticks_by_type, ticks_in_order=first_quote['ticks'],
ticks_by_type=ticks_by_type,
) )
# send cycle isn't due yet so continue waiting # send cycle isn't due yet so continue waiting
@ -888,8 +899,8 @@ async def uniform_rate_send(
frame_ticks( frame_ticks(
first_quote, first_quote,
first_quote, ticks_in_order=first_quote['ticks'],
ticks_by_type, ticks_by_type=ticks_by_type,
) )
# we have a quote already so send it now. # we have a quote already so send it now.
@ -905,9 +916,9 @@ async def uniform_rate_send(
break break
frame_ticks( frame_ticks(
first_quote,
last_quote, last_quote,
ticks_by_type, ticks_in_order=first_quote['ticks'],
ticks_by_type=ticks_by_type,
) )
# measured_rate = 1 / (time.time() - last_send) # measured_rate = 1 / (time.time() - last_send)