diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 4298eb96..c1484706 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -59,15 +59,25 @@ _quote_dt = [ ('Epoch', 'i8'), ('Nanoseconds', 'i4'), - ('Tick', 'i4'), # do we need this? + ('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask) + # ('fill_time', 'f4'), ('Last', 'f4'), ('Bid', 'f4'), - ('Bsize', 'f4'), - ('Asize', 'f4'), + ('Bsize', 'i8'), + ('Asize', 'i8'), ('Ask', 'f4'), ('Size', 'i8'), - ('Volume', 'f4'), + ('Volume', 'i8'), + # ('brokerd_ts', 'i64'), + # ('VWAP', 'f4') ] +_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) +_tick_map = { + 'Up': 1, + 'Equal': 0, + 'Down': -1, + None: np.nan, +} def mk_tbk(keys: tuple[str, str, str]) -> str: @@ -91,8 +101,7 @@ def quote_to_marketstore_structarray( ''' if last_fill: # new fill bby - now = timestamp(last_fill, unit='s') - + now = timestamp(last_fill) else: # this should get inserted upstream by the broker-client to # subtract from IPC latency @@ -126,8 +135,8 @@ def timestamp(date, **kwargs) -> int: ''' Return marketstore compatible 'Epoch' integer in nanoseconds from a date formatted str. - ''' + ''' return int(pd.Timestamp(date, **kwargs).value) @@ -162,21 +171,6 @@ async def ingest_quote_stream( for tick in quote.get('ticks', ()): ticktype = tick.get('type', 'n/a') - # _quote_dt = [ - # # these two are required for as a "primary key" - # ('Epoch', 'i8'), - # ('Nanoseconds', 'i4'), - # ('Tick', 'i4'), - # - # ('Last', 'f4'), - # ('Bid', 'f4'), - # ('Bsize', 'f4'), - # ('Asize', 'f4'), - # ('Ask', 'f4'), - # ('Size', 'i8'), - # ('Volume', 'f4'), - # ] - # techtonic tick write array = quote_to_marketstore_structarray({ 'IsTrade': 1 if ticktype == 'trade' else 0, @@ -187,46 +181,38 @@ async def ingest_quote_stream( await ms_client.write(array, _tick_tbk) - quote_cache = { - 'size': 0, - 'tick': 0 - } - # start ingest to marketstore - async for quotes in feed.stream: - log.info(quotes) - for symbol, quote in quotes.items(): + # LEGACY WRITE LOOP (using old tick dt) + # quote_cache = { + # 'size': 0, + # 'tick': 0 + # } - for tick in quote.get('ticks', ()): - ticktype = tick.get('type') - price = tick.get('price') - size = tick.get('size') + # async for quotes in qstream: + # log.info(quotes) + # for symbol, quote in quotes.items(): - if ticktype == 'n/a' or price == -1: - # okkk.. - continue + # # remap tick strs to ints + # quote['tick'] = _tick_map[quote.get('tick', 'Equal')] - # clearing price event - if ticktype == 'trade': - quote_cache['volume'] = quote['volume'] - quote_cache['last'] = price - # quote_cache['broker_ts'] = quote['broker_ts'] + # # check for volume update (i.e. did trades happen + # # since last quote) + # new_vol = quote.get('volume', None) + # if new_vol is None: + # log.debug(f"No fills for {symbol}") + # if new_vol == quote_cache.get('volume'): + # # should never happen due to field diffing + # # on sender side + # log.error( + # f"{symbol}: got same volume as last quote?") - # l1 book events - elif ticktype in ('ask', 'asize'): - quote_cache['ask'] = price - quote_cache['asize'] = size + # quote_cache.update(quote) - elif ticktype in ('bid', 'bsize'): - quote_cache['bid'] = price - quote_cache['bsize'] = size - - a = quote_to_marketstore_structarray( - quote_cache, - last_fill=quote.get('broker_ts', None) - ) - log.info(a) - # breakpoint() - await ms_client.write(symbol, a) + # a = quote_to_marketstore_structarray( + # quote, + # # TODO: check this closer to the broker query api + # last_fill=quote.get('fill_time', '') + # ) + # await ms_client.write(symbol, a) # async def stream_quotes(