Begin to wrap marketstore as a data feed

Wrap the sync client in an async interface in anticipation of an actual
async client. This starts support for the `open_fee()`/`stream_quotes()`
api though the tick normalization isn't correct yet.
marketstore_integration
Tyler Goodlet 2020-08-01 20:08:05 -04:00
parent 702c63f607
commit 316137fdf2
2 changed files with 187 additions and 163 deletions

View File

@ -1,7 +1,7 @@
""" """
Data feed apis and infra. Data feed apis and infra.
We ship some tsdb integrations for retrieving We provide tsdb integrations for retrieving
and storing data from your brokers as well as and storing data from your brokers as well as
sharing your feeds with other fellow pikers. sharing your feeds with other fellow pikers.
""" """
@ -91,6 +91,8 @@ async def open_feed(
symbols: Sequence[str], symbols: Sequence[str],
loglevel: str = 'info', loglevel: str = 'info',
) -> AsyncIterator[Dict[str, Any]]: ) -> AsyncIterator[Dict[str, Any]]:
"""Open a "data feed" which provides streamed real-time quotes.
"""
try: try:
mod = get_brokermod(name) mod = get_brokermod(name)
except ImportError: except ImportError:

View File

@ -1,41 +1,38 @@
""" """
``marketstore`` integration. ``marketstore`` integration.
- TICK data ingest routines - client management routines
- ticK data ingest routines
- websocket client for subscribing to write triggers - websocket client for subscribing to write triggers
- docker container management automation - todo: tick sequence stream-cloning for testing
- todo: docker container management automation
""" """
from pprint import pformat from contextlib import asynccontextmanager
from typing import Dict, Any, List from typing import Dict, Any, List, Callable, Tuple
from functools import partial
import time import time
from math import isnan
import msgpack import msgpack
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import pymarketstore as pymkts import pymarketstore as pymkts
import click
import trio
import tractor
from trio_websocket import open_websocket_url from trio_websocket import open_websocket_url
from . import maybe_spawn_brokerd from ..log import get_logger, get_console_log
from ..cli import cli from ..data import open_feed
from .. import watchlists as wl
from ..brokers.data import DataFeed
from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
_tick_tbk_ids = ('1Sec', 'TICK') _tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk = '{}/' + '/'.join(_tick_tbk_ids) _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
_url: str = 'http://localhost:5993/rpc'
_quote_dt = [ _quote_dt = [
# these two are required for as a "primary key"
('Epoch', 'i8'), ('Epoch', 'i8'),
('Nanoseconds', 'i4'), ('Nanoseconds', 'i4'),
('Tick', 'i4'),
('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask)
# ('fill_time', 'f4'), # ('fill_time', 'f4'),
('Last', 'f4'), ('Last', 'f4'),
('Bid', 'f4'), ('Bid', 'f4'),
@ -44,11 +41,10 @@ _quote_dt = [
('Ask', 'f4'), ('Ask', 'f4'),
('Size', 'i8'), ('Size', 'i8'),
('Volume', 'i8'), ('Volume', 'i8'),
# ('Broker_time_ns', 'i64'),
# ('VWAP', 'f4') # ('VWAP', 'f4')
] ]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
_tick_map = { _tick_map = {
'Up': 1, 'Up': 1,
'Equal': 0, 'Equal': 0,
@ -84,7 +80,7 @@ def quote_to_marketstore_structarray(
else: else:
# this should get inserted upstream by the broker-client to # this should get inserted upstream by the broker-client to
# subtract from IPC latency # subtract from IPC latency
now = timestamp(pd.Timestamp.now()) now = time.time_ns()
secs, ns = now / 10**9, now % 10**9 secs, ns = now / 10**9, now % 10**9
@ -109,172 +105,152 @@ def quote_to_marketstore_structarray(
def timestamp(datestr: str) -> int: def timestamp(datestr: str) -> int:
"""Return marketstore compatible 'Epoch' integer in nanoseconds. """Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str.
""" """
return int(pd.Timestamp(datestr).value) return int(pd.Timestamp(datestr).value)
@cli.command() def mk_tbk(keys: Tuple[str, str, str]) -> str:
@click.option('--test-file', '-t', help='Test quote stream file') """Generate a marketstore table key from a tuple.
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--tl', is_flag=True, help='Enable tractor logging') Converts,
@click.option( ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
'--url',
default='http://localhost:5993/rpc',
help='HTTP URL of marketstore instance'
)
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl, url):
"""Ingest real-time broker quotes and ticks to a marketstore instance.
""" """
# global opts return '{}/' + '/'.join(keys)
brokermod = config['brokermod']
loglevel = config['loglevel']
log = config['log']
watchlist_from_file = wl.ensure_watchlists(config['wl_path'])
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
symbols = watchlists[name]
async def main(tries): class Client:
async with maybe_spawn_brokerd( """Async wrapper around the alpaca ``pymarketstore`` sync client.
brokername=brokermod.name,
tries=tries,
loglevel=loglevel
) as portal:
# connect to broker data feed
feed = DataFeed(portal, brokermod)
qstream, quotes = await feed.open_stream(
symbols,
'stock',
rate=3,
test=test_file,
)
first_quotes, _ = feed.format_quotes(quotes) This will server as the shell for building out a proper async client
that isn't horribly documented and un-tested..
"""
def __init__(self, url: str):
self._client = pymkts.Client(url)
if first_quotes[0].get('last') is None: async def _invoke(
log.error("Broker API is down temporarily") self,
return meth: Callable,
*args,
**kwargs,
) -> Any:
return err_on_resp(meth(*args, **kwargs))
quote_cache = {quote['symbol']: quote for quote in first_quotes} async def destroy(
self,
tbk: Tuple[str, str, str],
) -> None:
return await self._invoke(self._client.destroy, mk_tbk(tbk))
client = pymkts.Client(endpoint=url) async def list_symbols(
self,
tbk: str,
) -> List[str]:
return await self._invoke(self._client.list_symbols, mk_tbk(tbk))
async def write(
self,
symbol: str,
array: np.ndarray,
) -> None:
start = time.time()
await self._invoke(
self._client.write,
array,
_tick_tbk.format(symbol),
isvariablelength=True
)
log.debug(f"{symbol} write time (s): {time.time() - start}")
def query(
self,
symbol,
tbk: Tuple[str, str] = _tick_tbk_ids,
) -> pd.DataFrame:
# XXX: causes crash
# client.query(pymkts.Params(symbol, '*', 'OHCLV'
result = self._client.query(
pymkts.Params(symbol, *tbk),
)
return result.first().df()
@asynccontextmanager
async def get_client(
url: str = _url,
) -> Client:
yield Client(url)
async def ingest_quote_stream(
symbols: List[str],
brokername: str,
tries: int = 1,
loglevel: str = None,
) -> None:
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
"""
async with open_feed(
brokername,
symbols,
loglevel=loglevel,
) as (first_quotes, qstream):
quote_cache = first_quotes.copy()
async with get_client() as ms_client:
# start ingest to marketstore # start ingest to marketstore
async for quotes in qstream: async for quotes in qstream:
log.info(quotes)
for symbol, quote in quotes.items(): for symbol, quote in quotes.items():
fmt_quote, _ = brokermod.format_stock_quote(
quote,
feed._symbol_data_cache
)
# remap tick strs to ints # remap tick strs to ints
fmt_quote['tick'] = _tick_map[ quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
fmt_quote.get('tick', 'Equal')
]
# check for volume update (i.e. did trades happen # check for volume update (i.e. did trades happen
# since last quote) # since last quote)
new_vol = fmt_quote.get('volume', None) new_vol = quote.get('volume', None)
if new_vol is None: if new_vol is None:
log.debug(f"No fills for {symbol}") log.debug(f"No fills for {symbol}")
if new_vol == quote_cache.get('volume'): if new_vol == quote_cache.get('volume'):
# should never happen due to field diffing
# on sender side
log.error( log.error(
f"{symbol}: got same volume as last quote?") f"{symbol}: got same volume as last quote?")
quote_cache.update(fmt_quote) quote_cache.update(quote)
a = quote_to_marketstore_structarray( a = quote_to_marketstore_structarray(
fmt_quote, quote,
# TODO: check this closer to the broker query api # TODO: check this closer to the broker query api
last_fill=fmt_quote.get('last_fill', '') last_fill=quote.get('fill_time', '')
) )
start = time.time() await ms_client.write(symbol, a)
err_on_resp(client.write(
a, _tick_tbk.format(symbol), isvariablelength=True)
)
log.trace(
f"{symbol} write time (s): {time.time() - start}")
tractor.run(
partial(main, tries=1),
name='ingest_marketstore',
loglevel=loglevel if tl else None,
# start_method='forkserver',
)
@cli.command() async def stream_quotes(
@click.option( symbols: List[str],
'--tl',
is_flag=True,
help='Enable tractor logging')
@click.option(
'--url',
default='http://localhost:5993/rpc',
help='HTTP URL of marketstore instance'
)
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ms_shell(config, name, tl, url):
"""Start an IPython shell ready to query the local marketstore db.
"""
client = pymkts.Client(url)
def query(name, tbk=_tick_tbk_ids):
return client.query(
pymkts.Params(name, *tbk)).first().df()
# causes crash
# client.query(pymkts.Params(symbol, '*', 'OHCLV'
from IPython import embed
embed()
@cli.command()
@click.option(
'--url',
default='http://localhost:5993/rpc',
help='HTTP URL of marketstore instance'
)
@click.argument('names', nargs=-1)
@click.pass_obj
def marketstore_destroy(config: dict, names: List[str], url: str) -> None:
"""Destroy symbol entries in the local marketstore instance.
"""
client = pymkts.Client(url)
if not names:
names = client.list_symbols()
# default is to wipe db entirely.
answer = input(
"This will entirely wipe you local marketstore db @ "
f"{url} of the following symbols:\n {pformat(names)}"
"\n\nDelete [N/y]?\n")
if answer == 'y':
for sym in names:
tbk = _tick_tbk.format(sym)
print(f"Destroying {tbk}..")
err_on_resp(client.destroy(_tick_tbk.format(sym)))
else:
print("Nothing deleted.")
async def open_quote_stream(
tbks: List[str],
host: str = 'localhost', host: str = 'localhost',
port: int = 5993 port: int = 5993,
diff_cached: bool = True,
loglevel: str = None,
) -> None: ) -> None:
"""Open a symbol stream from a running instance of marketstore and """Open a symbol stream from a running instance of marketstore and
log to console. log to console.
""" """
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server # send subs topics to server
await ws.send_message(msgpack.dumps({'streams': tbks})) resp = await ws.send_message(
msgpack.dumps({'streams': list(tbks.values())})
)
log.info(resp)
async def recv() -> Dict[str, Any]: async def recv() -> Dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8') return msgpack.loads((await ws.get_message()), encoding='utf-8')
@ -282,21 +258,67 @@ async def open_quote_stream(
streams = (await recv())['streams'] streams = (await recv())['streams']
log.info(f"Subscribed to {streams}") log.info(f"Subscribed to {streams}")
_cache = {}
while True: while True:
msg = await recv() msg = await recv()
log.info(f"Received quote:\n{msg}")
# unpack symbol and quote data
# key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
symbol = msg['key'].split('/')[0]
data = msg['data']
@cli.command() # calc time stamp(s)
@click.option( s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
'--url', ts = s * 10**9 + ns
default='ws://localhost:5993/ws', data['broker_fill_time_ns'] = ts
help='HTTP URL of marketstore instance'
) quote = {}
@click.argument('names', nargs=-1) for k, v in data.items():
@click.pass_obj if isnan(v):
def marketstore_stream(config: dict, names: List[str], url: str): continue
"""Connect to a marketstore time bucket stream for (a set of) symbols(s)
and print to console. quote[k.lower()] = v
"""
trio.run(open_quote_stream, names) quote['symbol'] = symbol
quotes = {}
if diff_cached:
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(f"New quote {quote['symbol']}:\n{new}")
# only ship diff updates and other required fields
payload = {k: quote[k] for k, v in new}
payload['symbol'] = symbol
# if there was volume likely the last size of
# shares traded is useful info and it's possible
# that the set difference from above will disregard
# a "size" value since the same # of shares were traded
size = quote.get('size')
volume = quote.get('volume')
if size and volume:
new_volume_since_last = max(
volume - last.get('volume', 0), 0)
log.warning(
f"NEW VOLUME {symbol}:{new_volume_since_last}")
payload['size'] = size
payload['last'] = quote.get('last')
# XXX: we append to a list for the options case where the
# subscription topic (key) is the same for all
# expiries even though this is uncessary for the
# stock case (different topic [i.e. symbol] for each
# quote).
quotes.setdefault(symbol, []).append(payload)
# update cache
_cache[symbol].update(quote)
else:
quotes = {symbol: [{key.lower(): val for key, val in quote.items()}]}
if quotes:
yield quotes