From afe6f0b42b0b0679587951cc7ce9fdaccf03c97a Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sun, 16 Jan 2022 00:20:57 -0300 Subject: [PATCH] Simplify and optimize tick format, similar to techtonicdb's --- piker/data/marketstore.py | 90 ++++++++++----------------------------- 1 file changed, 23 insertions(+), 67 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 8fd737cf..1d271452 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -49,14 +49,10 @@ _quote_dt = [ ('Epoch', 'i8'), ('Nanoseconds', 'i4'), - ('Tick', 'i4'), - ('Last', 'f4'), - ('Bid', 'f4'), - ('Bsize', 'f4'), - ('Asize', 'f4'), - ('Ask', 'f4'), - ('Size', 'i8'), - ('Volume', 'f4'), + ('IsTrade', 'i1'), + ('IsBid', 'i1'), + ('Price', 'f8'), + ('Size', 'f8') ] _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) @@ -109,8 +105,7 @@ def quote_to_marketstore_structarray( # for ``np.int`` we use 0 as a null value none = 0 - # casefold? see https://github.com/alpacahq/marketstore/issues/324 - val = quote.get(name.casefold(), none) + val = quote.get(name, none) array_input.append(val) return np.array([tuple(array_input)], dtype=_quote_dt) @@ -203,67 +198,28 @@ async def ingest_quote_stream( ) -> None: """Ingest a broker quote stream into marketstore in (sampled) tick format. """ - async with open_feed( - brokername, - symbols, - loglevel=actorloglevel, - ) as feed: - async with get_client() as ms_client: + async with ( + open_feed(brokername, symbols, loglevel=actorloglevel) as feed, + get_client() as ms_client + ): + async for quotes in feed.stream: + log.info(quotes) + for symbol, quote in quotes.items(): + for tick in quote.get('ticks', ()): + ticktype = tick.get('type', 'n/a') - # _quote_dt = [ - # # these two are required for as a "primary key" - # ('Epoch', 'i8'), - # ('Nanoseconds', 'i4'), - # ('Tick', 'i4'), - # - # ('Last', 'f4'), - # ('Bid', 'f4'), - # ('Bsize', 'f4'), - # ('Asize', 'f4'), - # ('Ask', 'f4'), - # ('Size', 'i8'), - # ('Volume', 'f4'), - # ] + if ticktype == 'n/a': + # okkk.. + continue - quote_cache = { - 'size': 0, - 'tick': 0 - } - # start ingest to marketstore - async for quotes in feed.stream: - log.info(quotes) - for symbol, quote in quotes.items(): + a = quote_to_marketstore_structarray({ + 'IsTrade': 1 if ticktype == 'trade' else 0, + 'IsBid': 1 if ticktype in ('bid', 'bsize') else 0, + 'Price': tick.get('price'), + 'Size': tick.get('size') + }, last_fill=quote.get('broker_ts', None)) - for tick in quote.get('ticks', ()): - ticktype = tick.get('type') - price = tick.get('price') - size = tick.get('size') - - if ticktype == 'n/a' or price == -1: - # okkk.. - continue - - # clearing price event - if ticktype == 'trade': - quote_cache['volume'] = quote['volume'] - quote_cache['last'] = price - # quote_cache['broker_ts'] = quote['broker_ts'] - - # l1 book events - elif ticktype in ('ask', 'asize'): - quote_cache['ask'] = price - quote_cache['asize'] = size - - elif ticktype in ('bid', 'bsize'): - quote_cache['bid'] = price - quote_cache['bsize'] = size - - a = quote_to_marketstore_structarray( - quote_cache, - last_fill=quote.get('broker_ts', None) - ) log.info(a) - # breakpoint() await ms_client.write(symbol, a)