Simplify and optimize tick format, similar to techtonicdb's

m4_corrections
Guillermo Rodriguez 2022-01-16 00:20:57 -03:00 committed by Tyler Goodlet
parent 9e4dec98da
commit c91599472d
1 changed files with 23 additions and 67 deletions

View File

@ -49,14 +49,10 @@ _quote_dt = [
('Epoch', 'i8'), ('Epoch', 'i8'),
('Nanoseconds', 'i4'), ('Nanoseconds', 'i4'),
('Tick', 'i4'), ('IsTrade', 'i1'),
('Last', 'f4'), ('IsBid', 'i1'),
('Bid', 'f4'), ('Price', 'f8'),
('Bsize', 'f4'), ('Size', 'f8')
('Asize', 'f4'),
('Ask', 'f4'),
('Size', 'i8'),
('Volume', 'f4'),
] ]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
@ -109,8 +105,7 @@ def quote_to_marketstore_structarray(
# for ``np.int`` we use 0 as a null value # for ``np.int`` we use 0 as a null value
none = 0 none = 0
# casefold? see https://github.com/alpacahq/marketstore/issues/324 val = quote.get(name, none)
val = quote.get(name.casefold(), none)
array_input.append(val) array_input.append(val)
return np.array([tuple(array_input)], dtype=_quote_dt) return np.array([tuple(array_input)], dtype=_quote_dt)
@ -203,67 +198,28 @@ async def ingest_quote_stream(
) -> None: ) -> None:
"""Ingest a broker quote stream into marketstore in (sampled) tick format. """Ingest a broker quote stream into marketstore in (sampled) tick format.
""" """
async with open_feed( async with (
brokername, open_feed(brokername, symbols, loglevel=actorloglevel) as feed,
symbols, get_client() as ms_client
loglevel=actorloglevel, ):
) as feed:
async with get_client() as ms_client:
# _quote_dt = [
# # these two are required for as a "primary key"
# ('Epoch', 'i8'),
# ('Nanoseconds', 'i4'),
# ('Tick', 'i4'),
#
# ('Last', 'f4'),
# ('Bid', 'f4'),
# ('Bsize', 'f4'),
# ('Asize', 'f4'),
# ('Ask', 'f4'),
# ('Size', 'i8'),
# ('Volume', 'f4'),
# ]
quote_cache = {
'size': 0,
'tick': 0
}
# start ingest to marketstore
async for quotes in feed.stream: async for quotes in feed.stream:
log.info(quotes) log.info(quotes)
for symbol, quote in quotes.items(): for symbol, quote in quotes.items():
for tick in quote.get('ticks', ()): for tick in quote.get('ticks', ()):
ticktype = tick.get('type') ticktype = tick.get('type', 'n/a')
price = tick.get('price')
size = tick.get('size')
if ticktype == 'n/a' or price == -1: if ticktype == 'n/a':
# okkk.. # okkk..
continue continue
# clearing price event a = quote_to_marketstore_structarray({
if ticktype == 'trade': 'IsTrade': 1 if ticktype == 'trade' else 0,
quote_cache['volume'] = quote['volume'] 'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
quote_cache['last'] = price 'Price': tick.get('price'),
# quote_cache['broker_ts'] = quote['broker_ts'] 'Size': tick.get('size')
}, last_fill=quote.get('broker_ts', None))
# l1 book events
elif ticktype in ('ask', 'asize'):
quote_cache['ask'] = price
quote_cache['asize'] = size
elif ticktype in ('bid', 'bsize'):
quote_cache['bid'] = price
quote_cache['bsize'] = size
a = quote_to_marketstore_structarray(
quote_cache,
last_fill=quote.get('broker_ts', None)
)
log.info(a) log.info(a)
# breakpoint()
await ms_client.write(symbol, a) await ms_client.write(symbol, a)