Add multi ingestor support and update to new feed API
parent
c849bb9c4c
commit
3c09bfba57
|
@ -136,7 +136,7 @@ def ingest(config, name, test_file, tl, url):
|
||||||
"""Ingest real-time broker quotes and ticks to a marketstore instance.
|
"""Ingest real-time broker quotes and ticks to a marketstore instance.
|
||||||
"""
|
"""
|
||||||
# global opts
|
# global opts
|
||||||
brokermod = config['brokermod']
|
brokermods = config['brokermods']
|
||||||
loglevel = config['loglevel']
|
loglevel = config['loglevel']
|
||||||
tractorloglevel = config['tractorloglevel']
|
tractorloglevel = config['tractorloglevel']
|
||||||
# log = config['log']
|
# log = config['log']
|
||||||
|
@ -145,15 +145,25 @@ def ingest(config, name, test_file, tl, url):
|
||||||
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
|
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
|
||||||
symbols = watchlists[name]
|
symbols = watchlists[name]
|
||||||
|
|
||||||
tractor.run(
|
grouped_syms = {}
|
||||||
partial(
|
for sym in symbols:
|
||||||
ingest_quote_stream,
|
symbol, _, provider = sym.rpartition('.')
|
||||||
symbols,
|
if provider not in grouped_syms:
|
||||||
brokermod.name,
|
grouped_syms[provider] = []
|
||||||
tries=1,
|
|
||||||
loglevel=loglevel,
|
grouped_syms[provider].append(symbol)
|
||||||
),
|
|
||||||
name='ingest_marketstore',
|
async def entry_point():
|
||||||
loglevel=tractorloglevel,
|
async with tractor.open_nursery() as n:
|
||||||
debug_mode=True,
|
for provider, symbols in grouped_syms.items():
|
||||||
)
|
await n.run_in_actor(
|
||||||
|
ingest_quote_stream,
|
||||||
|
name='ingest_marketstore',
|
||||||
|
symbols=symbols,
|
||||||
|
brokername=provider,
|
||||||
|
tries=1,
|
||||||
|
actorloglevel=loglevel,
|
||||||
|
loglevel=tractorloglevel
|
||||||
|
)
|
||||||
|
|
||||||
|
tractor.run(entry_point)
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
- todo: docker container management automation
|
- todo: docker container management automation
|
||||||
"""
|
"""
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Dict, Any, List, Callable, Tuple
|
from typing import Dict, Any, List, Callable, Tuple, Optional
|
||||||
import time
|
import time
|
||||||
from math import isnan
|
from math import isnan
|
||||||
|
|
||||||
|
@ -49,25 +49,16 @@ _quote_dt = [
|
||||||
('Epoch', 'i8'),
|
('Epoch', 'i8'),
|
||||||
('Nanoseconds', 'i4'),
|
('Nanoseconds', 'i4'),
|
||||||
|
|
||||||
('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask)
|
('Tick', 'i4'),
|
||||||
# ('fill_time', 'f4'),
|
|
||||||
('Last', 'f4'),
|
('Last', 'f4'),
|
||||||
('Bid', 'f4'),
|
('Bid', 'f4'),
|
||||||
('Bsize', 'i8'),
|
('Bsize', 'f4'),
|
||||||
('Asize', 'i8'),
|
('Asize', 'f4'),
|
||||||
('Ask', 'f4'),
|
('Ask', 'f4'),
|
||||||
('Size', 'i8'),
|
('Size', 'i8'),
|
||||||
('Volume', 'i8'),
|
('Volume', 'f4'),
|
||||||
# ('brokerd_ts', 'i64'),
|
|
||||||
# ('VWAP', 'f4')
|
|
||||||
]
|
]
|
||||||
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
|
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
|
||||||
_tick_map = {
|
|
||||||
'Up': 1,
|
|
||||||
'Equal': 0,
|
|
||||||
'Down': -1,
|
|
||||||
None: np.nan,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class MarketStoreError(Exception):
|
class MarketStoreError(Exception):
|
||||||
|
@ -87,18 +78,20 @@ def err_on_resp(response: dict) -> None:
|
||||||
|
|
||||||
def quote_to_marketstore_structarray(
|
def quote_to_marketstore_structarray(
|
||||||
quote: Dict[str, Any],
|
quote: Dict[str, Any],
|
||||||
last_fill: str,
|
last_fill: Optional[float],
|
||||||
|
|
||||||
) -> np.array:
|
) -> np.array:
|
||||||
"""Return marketstore writeable structarray from quote ``dict``.
|
"""Return marketstore writeable structarray from quote ``dict``.
|
||||||
"""
|
"""
|
||||||
if last_fill:
|
if last_fill:
|
||||||
# new fill bby
|
# new fill bby
|
||||||
now = timestamp(last_fill)
|
now = timestamp(last_fill, unit='s')
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# this should get inserted upstream by the broker-client to
|
# this should get inserted upstream by the broker-client to
|
||||||
# subtract from IPC latency
|
# subtract from IPC latency
|
||||||
now = time.time_ns()
|
now = time.time_ns()
|
||||||
|
|
||||||
secs, ns = now / 10**9, now % 10**9
|
secs, ns = now / 10**9, now % 10**9
|
||||||
|
|
||||||
# pack into List[Tuple[str, Any]]
|
# pack into List[Tuple[str, Any]]
|
||||||
|
@ -123,11 +116,11 @@ def quote_to_marketstore_structarray(
|
||||||
return np.array([tuple(array_input)], dtype=_quote_dt)
|
return np.array([tuple(array_input)], dtype=_quote_dt)
|
||||||
|
|
||||||
|
|
||||||
def timestamp(datestr: str) -> int:
|
def timestamp(date, **kwargs) -> int:
|
||||||
"""Return marketstore compatible 'Epoch' integer in nanoseconds
|
"""Return marketstore compatible 'Epoch' integer in nanoseconds
|
||||||
from a date formatted str.
|
from a date formatted str.
|
||||||
"""
|
"""
|
||||||
return int(pd.Timestamp(datestr).value)
|
return int(pd.Timestamp(date, **kwargs).value)
|
||||||
|
|
||||||
|
|
||||||
def mk_tbk(keys: Tuple[str, str, str]) -> str:
|
def mk_tbk(keys: Tuple[str, str, str]) -> str:
|
||||||
|
@ -206,46 +199,71 @@ async def ingest_quote_stream(
|
||||||
symbols: List[str],
|
symbols: List[str],
|
||||||
brokername: str,
|
brokername: str,
|
||||||
tries: int = 1,
|
tries: int = 1,
|
||||||
loglevel: str = None,
|
actorloglevel: str = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
|
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
|
||||||
"""
|
"""
|
||||||
async with open_feed(
|
async with open_feed(
|
||||||
brokername,
|
brokername,
|
||||||
symbols,
|
symbols,
|
||||||
loglevel=loglevel,
|
loglevel=actorloglevel,
|
||||||
) as (first_quotes, qstream):
|
) as feed:
|
||||||
|
|
||||||
quote_cache = first_quotes.copy()
|
|
||||||
|
|
||||||
async with get_client() as ms_client:
|
async with get_client() as ms_client:
|
||||||
|
|
||||||
|
# _quote_dt = [
|
||||||
|
# # these two are required for as a "primary key"
|
||||||
|
# ('Epoch', 'i8'),
|
||||||
|
# ('Nanoseconds', 'i4'),
|
||||||
|
# ('Tick', 'i4'),
|
||||||
|
#
|
||||||
|
# ('Last', 'f4'),
|
||||||
|
# ('Bid', 'f4'),
|
||||||
|
# ('Bsize', 'f4'),
|
||||||
|
# ('Asize', 'f4'),
|
||||||
|
# ('Ask', 'f4'),
|
||||||
|
# ('Size', 'i8'),
|
||||||
|
# ('Volume', 'f4'),
|
||||||
|
# ]
|
||||||
|
|
||||||
|
quote_cache = {
|
||||||
|
'size': 0,
|
||||||
|
'tick': 0
|
||||||
|
}
|
||||||
# start ingest to marketstore
|
# start ingest to marketstore
|
||||||
async for quotes in qstream:
|
async for quotes in feed.stream:
|
||||||
log.info(quotes)
|
log.info(quotes)
|
||||||
for symbol, quote in quotes.items():
|
for symbol, quote in quotes.items():
|
||||||
|
|
||||||
# remap tick strs to ints
|
for tick in quote.get('ticks', ()):
|
||||||
quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
|
ticktype = tick.get('type')
|
||||||
|
price = tick.get('price')
|
||||||
|
size = tick.get('size')
|
||||||
|
|
||||||
# check for volume update (i.e. did trades happen
|
if ticktype == 'n/a' or price == -1:
|
||||||
# since last quote)
|
# okkk..
|
||||||
new_vol = quote.get('volume', None)
|
continue
|
||||||
if new_vol is None:
|
|
||||||
log.debug(f"No fills for {symbol}")
|
|
||||||
if new_vol == quote_cache.get('volume'):
|
|
||||||
# should never happen due to field diffing
|
|
||||||
# on sender side
|
|
||||||
log.error(
|
|
||||||
f"{symbol}: got same volume as last quote?")
|
|
||||||
|
|
||||||
quote_cache.update(quote)
|
# clearing price event
|
||||||
|
if ticktype == 'trade':
|
||||||
|
quote_cache['volume'] = quote['volume']
|
||||||
|
quote_cache['last'] = price
|
||||||
|
# quote_cache['broker_ts'] = quote['broker_ts']
|
||||||
|
|
||||||
|
# l1 book events
|
||||||
|
elif ticktype in ('ask', 'asize'):
|
||||||
|
quote_cache['ask'] = price
|
||||||
|
quote_cache['asize'] = size
|
||||||
|
|
||||||
|
elif ticktype in ('bid', 'bsize'):
|
||||||
|
quote_cache['bid'] = price
|
||||||
|
quote_cache['bsize'] = size
|
||||||
|
|
||||||
a = quote_to_marketstore_structarray(
|
a = quote_to_marketstore_structarray(
|
||||||
quote,
|
quote_cache,
|
||||||
# TODO: check this closer to the broker query api
|
last_fill=quote.get('broker_ts', None)
|
||||||
last_fill=quote.get('fill_time', '')
|
|
||||||
)
|
)
|
||||||
|
log.info(a)
|
||||||
|
# breakpoint()
|
||||||
await ms_client.write(symbol, a)
|
await ms_client.write(symbol, a)
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue