Add multi ingestor support and update to new feed API

m4_corrections
Guillermo Rodriguez 2022-01-15 23:33:23 -03:00 committed by Tyler Goodlet
parent 8800ee0661
commit 9e4dec98da
2 changed files with 83 additions and 55 deletions

View File

@ -136,7 +136,7 @@ def ingest(config, name, test_file, tl, url):
"""Ingest real-time broker quotes and ticks to a marketstore instance.
"""
# global opts
brokermod = config['brokermod']
brokermods = config['brokermods']
loglevel = config['loglevel']
tractorloglevel = config['tractorloglevel']
# log = config['log']
@ -145,15 +145,25 @@ def ingest(config, name, test_file, tl, url):
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
symbols = watchlists[name]
tractor.run(
partial(
ingest_quote_stream,
symbols,
brokermod.name,
tries=1,
loglevel=loglevel,
),
name='ingest_marketstore',
loglevel=tractorloglevel,
debug_mode=True,
)
grouped_syms = {}
for sym in symbols:
symbol, _, provider = sym.rpartition('.')
if provider not in grouped_syms:
grouped_syms[provider] = []
grouped_syms[provider].append(symbol)
async def entry_point():
async with tractor.open_nursery() as n:
for provider, symbols in grouped_syms.items():
await n.run_in_actor(
ingest_quote_stream,
name='ingest_marketstore',
symbols=symbols,
brokername=provider,
tries=1,
actorloglevel=loglevel,
loglevel=tractorloglevel
)
tractor.run(entry_point)

View File

@ -24,7 +24,7 @@
- todo: docker container management automation
"""
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Callable, Tuple
from typing import Dict, Any, List, Callable, Tuple, Optional
import time
from math import isnan
@ -49,25 +49,16 @@ _quote_dt = [
('Epoch', 'i8'),
('Nanoseconds', 'i4'),
('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask)
# ('fill_time', 'f4'),
('Tick', 'i4'),
('Last', 'f4'),
('Bid', 'f4'),
('Bsize', 'i8'),
('Asize', 'i8'),
('Bsize', 'f4'),
('Asize', 'f4'),
('Ask', 'f4'),
('Size', 'i8'),
('Volume', 'i8'),
# ('brokerd_ts', 'i64'),
# ('VWAP', 'f4')
('Volume', 'f4'),
]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
_tick_map = {
'Up': 1,
'Equal': 0,
'Down': -1,
None: np.nan,
}
class MarketStoreError(Exception):
@ -87,18 +78,20 @@ def err_on_resp(response: dict) -> None:
def quote_to_marketstore_structarray(
quote: Dict[str, Any],
last_fill: str,
last_fill: Optional[float],
) -> np.array:
"""Return marketstore writeable structarray from quote ``dict``.
"""
if last_fill:
# new fill bby
now = timestamp(last_fill)
now = timestamp(last_fill, unit='s')
else:
# this should get inserted upstream by the broker-client to
# subtract from IPC latency
now = time.time_ns()
secs, ns = now / 10**9, now % 10**9
# pack into List[Tuple[str, Any]]
@ -123,11 +116,11 @@ def quote_to_marketstore_structarray(
return np.array([tuple(array_input)], dtype=_quote_dt)
def timestamp(datestr: str) -> int:
def timestamp(date, **kwargs) -> int:
"""Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str.
"""
return int(pd.Timestamp(datestr).value)
return int(pd.Timestamp(date, **kwargs).value)
def mk_tbk(keys: Tuple[str, str, str]) -> str:
@ -206,46 +199,71 @@ async def ingest_quote_stream(
symbols: List[str],
brokername: str,
tries: int = 1,
loglevel: str = None,
actorloglevel: str = None,
) -> None:
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
"""
async with open_feed(
brokername,
symbols,
loglevel=loglevel,
) as (first_quotes, qstream):
quote_cache = first_quotes.copy()
loglevel=actorloglevel,
) as feed:
async with get_client() as ms_client:
# _quote_dt = [
# # these two are required for as a "primary key"
# ('Epoch', 'i8'),
# ('Nanoseconds', 'i4'),
# ('Tick', 'i4'),
#
# ('Last', 'f4'),
# ('Bid', 'f4'),
# ('Bsize', 'f4'),
# ('Asize', 'f4'),
# ('Ask', 'f4'),
# ('Size', 'i8'),
# ('Volume', 'f4'),
# ]
quote_cache = {
'size': 0,
'tick': 0
}
# start ingest to marketstore
async for quotes in qstream:
async for quotes in feed.stream:
log.info(quotes)
for symbol, quote in quotes.items():
# remap tick strs to ints
quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
for tick in quote.get('ticks', ()):
ticktype = tick.get('type')
price = tick.get('price')
size = tick.get('size')
# check for volume update (i.e. did trades happen
# since last quote)
new_vol = quote.get('volume', None)
if new_vol is None:
log.debug(f"No fills for {symbol}")
if new_vol == quote_cache.get('volume'):
# should never happen due to field diffing
# on sender side
log.error(
f"{symbol}: got same volume as last quote?")
if ticktype == 'n/a' or price == -1:
# okkk..
continue
quote_cache.update(quote)
# clearing price event
if ticktype == 'trade':
quote_cache['volume'] = quote['volume']
quote_cache['last'] = price
# quote_cache['broker_ts'] = quote['broker_ts']
# l1 book events
elif ticktype in ('ask', 'asize'):
quote_cache['ask'] = price
quote_cache['asize'] = size
elif ticktype in ('bid', 'bsize'):
quote_cache['bid'] = price
quote_cache['bsize'] = size
a = quote_to_marketstore_structarray(
quote,
# TODO: check this closer to the broker query api
last_fill=quote.get('fill_time', '')
quote_cache,
last_fill=quote.get('broker_ts', None)
)
log.info(a)
# breakpoint()
await ms_client.write(symbol, a)