Simplify and optimize tick format, similar to techtonicdb's
parent
5d539b7c49
commit
afe6f0b42b
|
@ -49,14 +49,10 @@ _quote_dt = [
|
||||||
('Epoch', 'i8'),
|
('Epoch', 'i8'),
|
||||||
('Nanoseconds', 'i4'),
|
('Nanoseconds', 'i4'),
|
||||||
|
|
||||||
('Tick', 'i4'),
|
('IsTrade', 'i1'),
|
||||||
('Last', 'f4'),
|
('IsBid', 'i1'),
|
||||||
('Bid', 'f4'),
|
('Price', 'f8'),
|
||||||
('Bsize', 'f4'),
|
('Size', 'f8')
|
||||||
('Asize', 'f4'),
|
|
||||||
('Ask', 'f4'),
|
|
||||||
('Size', 'i8'),
|
|
||||||
('Volume', 'f4'),
|
|
||||||
]
|
]
|
||||||
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
|
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
|
||||||
|
|
||||||
|
@ -109,8 +105,7 @@ def quote_to_marketstore_structarray(
|
||||||
# for ``np.int`` we use 0 as a null value
|
# for ``np.int`` we use 0 as a null value
|
||||||
none = 0
|
none = 0
|
||||||
|
|
||||||
# casefold? see https://github.com/alpacahq/marketstore/issues/324
|
val = quote.get(name, none)
|
||||||
val = quote.get(name.casefold(), none)
|
|
||||||
array_input.append(val)
|
array_input.append(val)
|
||||||
|
|
||||||
return np.array([tuple(array_input)], dtype=_quote_dt)
|
return np.array([tuple(array_input)], dtype=_quote_dt)
|
||||||
|
@ -203,67 +198,28 @@ async def ingest_quote_stream(
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
|
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
|
||||||
"""
|
"""
|
||||||
async with open_feed(
|
async with (
|
||||||
brokername,
|
open_feed(brokername, symbols, loglevel=actorloglevel) as feed,
|
||||||
symbols,
|
get_client() as ms_client
|
||||||
loglevel=actorloglevel,
|
):
|
||||||
) as feed:
|
|
||||||
async with get_client() as ms_client:
|
|
||||||
|
|
||||||
# _quote_dt = [
|
|
||||||
# # these two are required for as a "primary key"
|
|
||||||
# ('Epoch', 'i8'),
|
|
||||||
# ('Nanoseconds', 'i4'),
|
|
||||||
# ('Tick', 'i4'),
|
|
||||||
#
|
|
||||||
# ('Last', 'f4'),
|
|
||||||
# ('Bid', 'f4'),
|
|
||||||
# ('Bsize', 'f4'),
|
|
||||||
# ('Asize', 'f4'),
|
|
||||||
# ('Ask', 'f4'),
|
|
||||||
# ('Size', 'i8'),
|
|
||||||
# ('Volume', 'f4'),
|
|
||||||
# ]
|
|
||||||
|
|
||||||
quote_cache = {
|
|
||||||
'size': 0,
|
|
||||||
'tick': 0
|
|
||||||
}
|
|
||||||
# start ingest to marketstore
|
|
||||||
async for quotes in feed.stream:
|
async for quotes in feed.stream:
|
||||||
log.info(quotes)
|
log.info(quotes)
|
||||||
for symbol, quote in quotes.items():
|
for symbol, quote in quotes.items():
|
||||||
|
|
||||||
for tick in quote.get('ticks', ()):
|
for tick in quote.get('ticks', ()):
|
||||||
ticktype = tick.get('type')
|
ticktype = tick.get('type', 'n/a')
|
||||||
price = tick.get('price')
|
|
||||||
size = tick.get('size')
|
|
||||||
|
|
||||||
if ticktype == 'n/a' or price == -1:
|
if ticktype == 'n/a':
|
||||||
# okkk..
|
# okkk..
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# clearing price event
|
a = quote_to_marketstore_structarray({
|
||||||
if ticktype == 'trade':
|
'IsTrade': 1 if ticktype == 'trade' else 0,
|
||||||
quote_cache['volume'] = quote['volume']
|
'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
|
||||||
quote_cache['last'] = price
|
'Price': tick.get('price'),
|
||||||
# quote_cache['broker_ts'] = quote['broker_ts']
|
'Size': tick.get('size')
|
||||||
|
}, last_fill=quote.get('broker_ts', None))
|
||||||
|
|
||||||
# l1 book events
|
|
||||||
elif ticktype in ('ask', 'asize'):
|
|
||||||
quote_cache['ask'] = price
|
|
||||||
quote_cache['asize'] = size
|
|
||||||
|
|
||||||
elif ticktype in ('bid', 'bsize'):
|
|
||||||
quote_cache['bid'] = price
|
|
||||||
quote_cache['bsize'] = size
|
|
||||||
|
|
||||||
a = quote_to_marketstore_structarray(
|
|
||||||
quote_cache,
|
|
||||||
last_fill=quote.get('broker_ts', None)
|
|
||||||
)
|
|
||||||
log.info(a)
|
log.info(a)
|
||||||
# breakpoint()
|
|
||||||
await ms_client.write(symbol, a)
|
await ms_client.write(symbol, a)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue