From 846e6a3c747a29dd478854e685e0c6a47203229e Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sat, 15 Jan 2022 23:33:23 -0300 Subject: [PATCH] Add multi ingestor support and update to new feed API --- piker/data/cli.py | 36 +++++++++----- piker/data/marketstore.py | 102 ++++++++++++++++++++++---------------- 2 files changed, 83 insertions(+), 55 deletions(-) diff --git a/piker/data/cli.py b/piker/data/cli.py index 7a774fb5..2206bd6a 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -136,7 +136,7 @@ def ingest(config, name, test_file, tl, url): """Ingest real-time broker quotes and ticks to a marketstore instance. """ # global opts - brokermod = config['brokermod'] + brokermods = config['brokermods'] loglevel = config['loglevel'] tractorloglevel = config['tractorloglevel'] # log = config['log'] @@ -145,15 +145,25 @@ def ingest(config, name, test_file, tl, url): watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) symbols = watchlists[name] - tractor.run( - partial( - ingest_quote_stream, - symbols, - brokermod.name, - tries=1, - loglevel=loglevel, - ), - name='ingest_marketstore', - loglevel=tractorloglevel, - debug_mode=True, - ) + grouped_syms = {} + for sym in symbols: + symbol, _, provider = sym.rpartition('.') + if provider not in grouped_syms: + grouped_syms[provider] = [] + + grouped_syms[provider].append(symbol) + + async def entry_point(): + async with tractor.open_nursery() as n: + for provider, symbols in grouped_syms.items(): + await n.run_in_actor( + ingest_quote_stream, + name='ingest_marketstore', + symbols=symbols, + brokername=provider, + tries=1, + actorloglevel=loglevel, + loglevel=tractorloglevel + ) + + tractor.run(entry_point) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 27bcda70..8fd737cf 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -24,7 +24,7 @@ - todo: docker container management automation """ from contextlib import asynccontextmanager -from typing import Dict, Any, List, Callable, Tuple +from typing import Dict, Any, List, Callable, Tuple, Optional import time from math import isnan @@ -49,25 +49,16 @@ _quote_dt = [ ('Epoch', 'i8'), ('Nanoseconds', 'i4'), - ('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask) - # ('fill_time', 'f4'), + ('Tick', 'i4'), ('Last', 'f4'), ('Bid', 'f4'), - ('Bsize', 'i8'), - ('Asize', 'i8'), + ('Bsize', 'f4'), + ('Asize', 'f4'), ('Ask', 'f4'), ('Size', 'i8'), - ('Volume', 'i8'), - # ('brokerd_ts', 'i64'), - # ('VWAP', 'f4') + ('Volume', 'f4'), ] _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) -_tick_map = { - 'Up': 1, - 'Equal': 0, - 'Down': -1, - None: np.nan, -} class MarketStoreError(Exception): @@ -87,18 +78,20 @@ def err_on_resp(response: dict) -> None: def quote_to_marketstore_structarray( quote: Dict[str, Any], - last_fill: str, + last_fill: Optional[float], + ) -> np.array: """Return marketstore writeable structarray from quote ``dict``. """ if last_fill: # new fill bby - now = timestamp(last_fill) + now = timestamp(last_fill, unit='s') + else: # this should get inserted upstream by the broker-client to # subtract from IPC latency now = time.time_ns() - + secs, ns = now / 10**9, now % 10**9 # pack into List[Tuple[str, Any]] @@ -123,11 +116,11 @@ def quote_to_marketstore_structarray( return np.array([tuple(array_input)], dtype=_quote_dt) -def timestamp(datestr: str) -> int: +def timestamp(date, **kwargs) -> int: """Return marketstore compatible 'Epoch' integer in nanoseconds from a date formatted str. """ - return int(pd.Timestamp(datestr).value) + return int(pd.Timestamp(date, **kwargs).value) def mk_tbk(keys: Tuple[str, str, str]) -> str: @@ -206,46 +199,71 @@ async def ingest_quote_stream( symbols: List[str], brokername: str, tries: int = 1, - loglevel: str = None, + actorloglevel: str = None, ) -> None: """Ingest a broker quote stream into marketstore in (sampled) tick format. """ async with open_feed( brokername, symbols, - loglevel=loglevel, - ) as (first_quotes, qstream): - - quote_cache = first_quotes.copy() - + loglevel=actorloglevel, + ) as feed: async with get_client() as ms_client: + # _quote_dt = [ + # # these two are required for as a "primary key" + # ('Epoch', 'i8'), + # ('Nanoseconds', 'i4'), + # ('Tick', 'i4'), + # + # ('Last', 'f4'), + # ('Bid', 'f4'), + # ('Bsize', 'f4'), + # ('Asize', 'f4'), + # ('Ask', 'f4'), + # ('Size', 'i8'), + # ('Volume', 'f4'), + # ] + + quote_cache = { + 'size': 0, + 'tick': 0 + } # start ingest to marketstore - async for quotes in qstream: + async for quotes in feed.stream: log.info(quotes) for symbol, quote in quotes.items(): - # remap tick strs to ints - quote['tick'] = _tick_map[quote.get('tick', 'Equal')] + for tick in quote.get('ticks', ()): + ticktype = tick.get('type') + price = tick.get('price') + size = tick.get('size') - # check for volume update (i.e. did trades happen - # since last quote) - new_vol = quote.get('volume', None) - if new_vol is None: - log.debug(f"No fills for {symbol}") - if new_vol == quote_cache.get('volume'): - # should never happen due to field diffing - # on sender side - log.error( - f"{symbol}: got same volume as last quote?") + if ticktype == 'n/a' or price == -1: + # okkk.. + continue - quote_cache.update(quote) + # clearing price event + if ticktype == 'trade': + quote_cache['volume'] = quote['volume'] + quote_cache['last'] = price + # quote_cache['broker_ts'] = quote['broker_ts'] + + # l1 book events + elif ticktype in ('ask', 'asize'): + quote_cache['ask'] = price + quote_cache['asize'] = size + + elif ticktype in ('bid', 'bsize'): + quote_cache['bid'] = price + quote_cache['bsize'] = size a = quote_to_marketstore_structarray( - quote, - # TODO: check this closer to the broker query api - last_fill=quote.get('fill_time', '') + quote_cache, + last_fill=quote.get('broker_ts', None) ) + log.info(a) + # breakpoint() await ms_client.write(symbol, a)