Add multi ingestor support and update to new feed API

incr_update_backup
Guillermo Rodriguez 2022-01-15 23:33:23 -03:00 committed by Tyler Goodlet
parent e2ce341f93
commit 5d539b7c49
2 changed files with 83 additions and 55 deletions

View File

@ -136,7 +136,7 @@ def ingest(config, name, test_file, tl, url):
"""Ingest real-time broker quotes and ticks to a marketstore instance. """Ingest real-time broker quotes and ticks to a marketstore instance.
""" """
# global opts # global opts
brokermod = config['brokermod'] brokermods = config['brokermods']
loglevel = config['loglevel'] loglevel = config['loglevel']
tractorloglevel = config['tractorloglevel'] tractorloglevel = config['tractorloglevel']
# log = config['log'] # log = config['log']
@ -145,15 +145,25 @@ def ingest(config, name, test_file, tl, url):
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
symbols = watchlists[name] symbols = watchlists[name]
tractor.run( grouped_syms = {}
partial( for sym in symbols:
ingest_quote_stream, symbol, _, provider = sym.rpartition('.')
symbols, if provider not in grouped_syms:
brokermod.name, grouped_syms[provider] = []
tries=1,
loglevel=loglevel, grouped_syms[provider].append(symbol)
),
name='ingest_marketstore', async def entry_point():
loglevel=tractorloglevel, async with tractor.open_nursery() as n:
debug_mode=True, for provider, symbols in grouped_syms.items():
) await n.run_in_actor(
ingest_quote_stream,
name='ingest_marketstore',
symbols=symbols,
brokername=provider,
tries=1,
actorloglevel=loglevel,
loglevel=tractorloglevel
)
tractor.run(entry_point)

View File

@ -24,7 +24,7 @@
- todo: docker container management automation - todo: docker container management automation
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Dict, Any, List, Callable, Tuple from typing import Dict, Any, List, Callable, Tuple, Optional
import time import time
from math import isnan from math import isnan
@ -49,25 +49,16 @@ _quote_dt = [
('Epoch', 'i8'), ('Epoch', 'i8'),
('Nanoseconds', 'i4'), ('Nanoseconds', 'i4'),
('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask) ('Tick', 'i4'),
# ('fill_time', 'f4'),
('Last', 'f4'), ('Last', 'f4'),
('Bid', 'f4'), ('Bid', 'f4'),
('Bsize', 'i8'), ('Bsize', 'f4'),
('Asize', 'i8'), ('Asize', 'f4'),
('Ask', 'f4'), ('Ask', 'f4'),
('Size', 'i8'), ('Size', 'i8'),
('Volume', 'i8'), ('Volume', 'f4'),
# ('brokerd_ts', 'i64'),
# ('VWAP', 'f4')
] ]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
_tick_map = {
'Up': 1,
'Equal': 0,
'Down': -1,
None: np.nan,
}
class MarketStoreError(Exception): class MarketStoreError(Exception):
@ -87,13 +78,15 @@ def err_on_resp(response: dict) -> None:
def quote_to_marketstore_structarray( def quote_to_marketstore_structarray(
quote: Dict[str, Any], quote: Dict[str, Any],
last_fill: str, last_fill: Optional[float],
) -> np.array: ) -> np.array:
"""Return marketstore writeable structarray from quote ``dict``. """Return marketstore writeable structarray from quote ``dict``.
""" """
if last_fill: if last_fill:
# new fill bby # new fill bby
now = timestamp(last_fill) now = timestamp(last_fill, unit='s')
else: else:
# this should get inserted upstream by the broker-client to # this should get inserted upstream by the broker-client to
# subtract from IPC latency # subtract from IPC latency
@ -123,11 +116,11 @@ def quote_to_marketstore_structarray(
return np.array([tuple(array_input)], dtype=_quote_dt) return np.array([tuple(array_input)], dtype=_quote_dt)
def timestamp(datestr: str) -> int: def timestamp(date, **kwargs) -> int:
"""Return marketstore compatible 'Epoch' integer in nanoseconds """Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str. from a date formatted str.
""" """
return int(pd.Timestamp(datestr).value) return int(pd.Timestamp(date, **kwargs).value)
def mk_tbk(keys: Tuple[str, str, str]) -> str: def mk_tbk(keys: Tuple[str, str, str]) -> str:
@ -206,46 +199,71 @@ async def ingest_quote_stream(
symbols: List[str], symbols: List[str],
brokername: str, brokername: str,
tries: int = 1, tries: int = 1,
loglevel: str = None, actorloglevel: str = None,
) -> None: ) -> None:
"""Ingest a broker quote stream into marketstore in (sampled) tick format. """Ingest a broker quote stream into marketstore in (sampled) tick format.
""" """
async with open_feed( async with open_feed(
brokername, brokername,
symbols, symbols,
loglevel=loglevel, loglevel=actorloglevel,
) as (first_quotes, qstream): ) as feed:
quote_cache = first_quotes.copy()
async with get_client() as ms_client: async with get_client() as ms_client:
# _quote_dt = [
# # these two are required for as a "primary key"
# ('Epoch', 'i8'),
# ('Nanoseconds', 'i4'),
# ('Tick', 'i4'),
#
# ('Last', 'f4'),
# ('Bid', 'f4'),
# ('Bsize', 'f4'),
# ('Asize', 'f4'),
# ('Ask', 'f4'),
# ('Size', 'i8'),
# ('Volume', 'f4'),
# ]
quote_cache = {
'size': 0,
'tick': 0
}
# start ingest to marketstore # start ingest to marketstore
async for quotes in qstream: async for quotes in feed.stream:
log.info(quotes) log.info(quotes)
for symbol, quote in quotes.items(): for symbol, quote in quotes.items():
# remap tick strs to ints for tick in quote.get('ticks', ()):
quote['tick'] = _tick_map[quote.get('tick', 'Equal')] ticktype = tick.get('type')
price = tick.get('price')
size = tick.get('size')
# check for volume update (i.e. did trades happen if ticktype == 'n/a' or price == -1:
# since last quote) # okkk..
new_vol = quote.get('volume', None) continue
if new_vol is None:
log.debug(f"No fills for {symbol}")
if new_vol == quote_cache.get('volume'):
# should never happen due to field diffing
# on sender side
log.error(
f"{symbol}: got same volume as last quote?")
quote_cache.update(quote) # clearing price event
if ticktype == 'trade':
quote_cache['volume'] = quote['volume']
quote_cache['last'] = price
# quote_cache['broker_ts'] = quote['broker_ts']
# l1 book events
elif ticktype in ('ask', 'asize'):
quote_cache['ask'] = price
quote_cache['asize'] = size
elif ticktype in ('bid', 'bsize'):
quote_cache['bid'] = price
quote_cache['bsize'] = size
a = quote_to_marketstore_structarray( a = quote_to_marketstore_structarray(
quote, quote_cache,
# TODO: check this closer to the broker query api last_fill=quote.get('broker_ts', None)
last_fill=quote.get('fill_time', '')
) )
log.info(a)
# breakpoint()
await ms_client.write(symbol, a) await ms_client.write(symbol, a)