Add back in legacy write loop for reference
parent
797ba15923
commit
f0ae7edb00
|
@ -59,15 +59,25 @@ _quote_dt = [
|
||||||
('Epoch', 'i8'),
|
('Epoch', 'i8'),
|
||||||
('Nanoseconds', 'i4'),
|
('Nanoseconds', 'i4'),
|
||||||
|
|
||||||
('Tick', 'i4'), # do we need this?
|
('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask)
|
||||||
|
# ('fill_time', 'f4'),
|
||||||
('Last', 'f4'),
|
('Last', 'f4'),
|
||||||
('Bid', 'f4'),
|
('Bid', 'f4'),
|
||||||
('Bsize', 'f4'),
|
('Bsize', 'i8'),
|
||||||
('Asize', 'f4'),
|
('Asize', 'i8'),
|
||||||
('Ask', 'f4'),
|
('Ask', 'f4'),
|
||||||
('Size', 'i8'),
|
('Size', 'i8'),
|
||||||
('Volume', 'f4'),
|
('Volume', 'i8'),
|
||||||
|
# ('brokerd_ts', 'i64'),
|
||||||
|
# ('VWAP', 'f4')
|
||||||
]
|
]
|
||||||
|
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
|
||||||
|
_tick_map = {
|
||||||
|
'Up': 1,
|
||||||
|
'Equal': 0,
|
||||||
|
'Down': -1,
|
||||||
|
None: np.nan,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def mk_tbk(keys: tuple[str, str, str]) -> str:
|
def mk_tbk(keys: tuple[str, str, str]) -> str:
|
||||||
|
@ -91,8 +101,7 @@ def quote_to_marketstore_structarray(
|
||||||
'''
|
'''
|
||||||
if last_fill:
|
if last_fill:
|
||||||
# new fill bby
|
# new fill bby
|
||||||
now = timestamp(last_fill, unit='s')
|
now = timestamp(last_fill)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# this should get inserted upstream by the broker-client to
|
# this should get inserted upstream by the broker-client to
|
||||||
# subtract from IPC latency
|
# subtract from IPC latency
|
||||||
|
@ -126,8 +135,8 @@ def timestamp(date, **kwargs) -> int:
|
||||||
'''
|
'''
|
||||||
Return marketstore compatible 'Epoch' integer in nanoseconds
|
Return marketstore compatible 'Epoch' integer in nanoseconds
|
||||||
from a date formatted str.
|
from a date formatted str.
|
||||||
'''
|
|
||||||
|
|
||||||
|
'''
|
||||||
return int(pd.Timestamp(date, **kwargs).value)
|
return int(pd.Timestamp(date, **kwargs).value)
|
||||||
|
|
||||||
|
|
||||||
|
@ -162,21 +171,6 @@ async def ingest_quote_stream(
|
||||||
for tick in quote.get('ticks', ()):
|
for tick in quote.get('ticks', ()):
|
||||||
ticktype = tick.get('type', 'n/a')
|
ticktype = tick.get('type', 'n/a')
|
||||||
|
|
||||||
# _quote_dt = [
|
|
||||||
# # these two are required for as a "primary key"
|
|
||||||
# ('Epoch', 'i8'),
|
|
||||||
# ('Nanoseconds', 'i4'),
|
|
||||||
# ('Tick', 'i4'),
|
|
||||||
#
|
|
||||||
# ('Last', 'f4'),
|
|
||||||
# ('Bid', 'f4'),
|
|
||||||
# ('Bsize', 'f4'),
|
|
||||||
# ('Asize', 'f4'),
|
|
||||||
# ('Ask', 'f4'),
|
|
||||||
# ('Size', 'i8'),
|
|
||||||
# ('Volume', 'f4'),
|
|
||||||
# ]
|
|
||||||
|
|
||||||
# techtonic tick write
|
# techtonic tick write
|
||||||
array = quote_to_marketstore_structarray({
|
array = quote_to_marketstore_structarray({
|
||||||
'IsTrade': 1 if ticktype == 'trade' else 0,
|
'IsTrade': 1 if ticktype == 'trade' else 0,
|
||||||
|
@ -187,46 +181,38 @@ async def ingest_quote_stream(
|
||||||
|
|
||||||
await ms_client.write(array, _tick_tbk)
|
await ms_client.write(array, _tick_tbk)
|
||||||
|
|
||||||
quote_cache = {
|
# LEGACY WRITE LOOP (using old tick dt)
|
||||||
'size': 0,
|
# quote_cache = {
|
||||||
'tick': 0
|
# 'size': 0,
|
||||||
}
|
# 'tick': 0
|
||||||
# start ingest to marketstore
|
# }
|
||||||
async for quotes in feed.stream:
|
|
||||||
log.info(quotes)
|
|
||||||
for symbol, quote in quotes.items():
|
|
||||||
|
|
||||||
for tick in quote.get('ticks', ()):
|
# async for quotes in qstream:
|
||||||
ticktype = tick.get('type')
|
# log.info(quotes)
|
||||||
price = tick.get('price')
|
# for symbol, quote in quotes.items():
|
||||||
size = tick.get('size')
|
|
||||||
|
|
||||||
if ticktype == 'n/a' or price == -1:
|
# # remap tick strs to ints
|
||||||
# okkk..
|
# quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
|
||||||
continue
|
|
||||||
|
|
||||||
# clearing price event
|
# # check for volume update (i.e. did trades happen
|
||||||
if ticktype == 'trade':
|
# # since last quote)
|
||||||
quote_cache['volume'] = quote['volume']
|
# new_vol = quote.get('volume', None)
|
||||||
quote_cache['last'] = price
|
# if new_vol is None:
|
||||||
# quote_cache['broker_ts'] = quote['broker_ts']
|
# log.debug(f"No fills for {symbol}")
|
||||||
|
# if new_vol == quote_cache.get('volume'):
|
||||||
|
# # should never happen due to field diffing
|
||||||
|
# # on sender side
|
||||||
|
# log.error(
|
||||||
|
# f"{symbol}: got same volume as last quote?")
|
||||||
|
|
||||||
# l1 book events
|
# quote_cache.update(quote)
|
||||||
elif ticktype in ('ask', 'asize'):
|
|
||||||
quote_cache['ask'] = price
|
|
||||||
quote_cache['asize'] = size
|
|
||||||
|
|
||||||
elif ticktype in ('bid', 'bsize'):
|
# a = quote_to_marketstore_structarray(
|
||||||
quote_cache['bid'] = price
|
# quote,
|
||||||
quote_cache['bsize'] = size
|
# # TODO: check this closer to the broker query api
|
||||||
|
# last_fill=quote.get('fill_time', '')
|
||||||
a = quote_to_marketstore_structarray(
|
# )
|
||||||
quote_cache,
|
# await ms_client.write(symbol, a)
|
||||||
last_fill=quote.get('broker_ts', None)
|
|
||||||
)
|
|
||||||
log.info(a)
|
|
||||||
# breakpoint()
|
|
||||||
await ms_client.write(symbol, a)
|
|
||||||
|
|
||||||
|
|
||||||
# async def stream_quotes(
|
# async def stream_quotes(
|
||||||
|
|
Loading…
Reference in New Issue