Add back in legacy write loop for reference

marketstore
Tyler Goodlet 2022-02-18 08:21:17 -05:00
parent 216ad65933
commit ecc3613654
1 changed files with 43 additions and 57 deletions

View File

@ -59,15 +59,25 @@ _quote_dt = [
('Epoch', 'i8'),
('Nanoseconds', 'i4'),
('Tick', 'i4'), # do we need this?
('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask)
# ('fill_time', 'f4'),
('Last', 'f4'),
('Bid', 'f4'),
('Bsize', 'f4'),
('Asize', 'f4'),
('Bsize', 'i8'),
('Asize', 'i8'),
('Ask', 'f4'),
('Size', 'i8'),
('Volume', 'f4'),
('Volume', 'i8'),
# ('brokerd_ts', 'i64'),
# ('VWAP', 'f4')
]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
_tick_map = {
'Up': 1,
'Equal': 0,
'Down': -1,
None: np.nan,
}
def mk_tbk(keys: tuple[str, str, str]) -> str:
@ -91,8 +101,7 @@ def quote_to_marketstore_structarray(
'''
if last_fill:
# new fill bby
now = timestamp(last_fill, unit='s')
now = timestamp(last_fill)
else:
# this should get inserted upstream by the broker-client to
# subtract from IPC latency
@ -126,8 +135,8 @@ def timestamp(date, **kwargs) -> int:
'''
Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str.
'''
'''
return int(pd.Timestamp(date, **kwargs).value)
@ -162,21 +171,6 @@ async def ingest_quote_stream(
for tick in quote.get('ticks', ()):
ticktype = tick.get('type', 'n/a')
# _quote_dt = [
# # these two are required for as a "primary key"
# ('Epoch', 'i8'),
# ('Nanoseconds', 'i4'),
# ('Tick', 'i4'),
#
# ('Last', 'f4'),
# ('Bid', 'f4'),
# ('Bsize', 'f4'),
# ('Asize', 'f4'),
# ('Ask', 'f4'),
# ('Size', 'i8'),
# ('Volume', 'f4'),
# ]
# techtonic tick write
array = quote_to_marketstore_structarray({
'IsTrade': 1 if ticktype == 'trade' else 0,
@ -187,46 +181,38 @@ async def ingest_quote_stream(
await ms_client.write(array, _tick_tbk)
quote_cache = {
'size': 0,
'tick': 0
}
# start ingest to marketstore
async for quotes in feed.stream:
log.info(quotes)
for symbol, quote in quotes.items():
# LEGACY WRITE LOOP (using old tick dt)
# quote_cache = {
# 'size': 0,
# 'tick': 0
# }
for tick in quote.get('ticks', ()):
ticktype = tick.get('type')
price = tick.get('price')
size = tick.get('size')
# async for quotes in qstream:
# log.info(quotes)
# for symbol, quote in quotes.items():
if ticktype == 'n/a' or price == -1:
# okkk..
continue
# # remap tick strs to ints
# quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
# clearing price event
if ticktype == 'trade':
quote_cache['volume'] = quote['volume']
quote_cache['last'] = price
# quote_cache['broker_ts'] = quote['broker_ts']
# # check for volume update (i.e. did trades happen
# # since last quote)
# new_vol = quote.get('volume', None)
# if new_vol is None:
# log.debug(f"No fills for {symbol}")
# if new_vol == quote_cache.get('volume'):
# # should never happen due to field diffing
# # on sender side
# log.error(
# f"{symbol}: got same volume as last quote?")
# l1 book events
elif ticktype in ('ask', 'asize'):
quote_cache['ask'] = price
quote_cache['asize'] = size
# quote_cache.update(quote)
elif ticktype in ('bid', 'bsize'):
quote_cache['bid'] = price
quote_cache['bsize'] = size
a = quote_to_marketstore_structarray(
quote_cache,
last_fill=quote.get('broker_ts', None)
)
log.info(a)
# breakpoint()
await ms_client.write(symbol, a)
# a = quote_to_marketstore_structarray(
# quote,
# # TODO: check this closer to the broker query api
# last_fill=quote.get('fill_time', '')
# )
# await ms_client.write(symbol, a)
# async def stream_quotes(