diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index a2281f71..c6b5cd4e 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -1,15 +1,23 @@ """ ``marketstore`` integration. + +- TICK data ingest routines +- websocket client for subscribing to write triggers +- docker container management automation """ from pprint import pformat -from typing import Dict, Any +from typing import Dict, Any, List from functools import partial +import time +import msgpack import numpy as np import pandas as pd import pymarketstore as pymkts import click +import trio import tractor +from trio_websocket import open_websocket_url from ..cli import cli from .. import watchlists as wl @@ -38,12 +46,14 @@ _quote_dt = [ ('Volume', 'i8'), # ('VWAP', 'f4') ] +_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) _tick_map = { 'Up': 1, 'Equal': 0, - 'Down': -1 + 'Down': -1, + None: np.nan, } @@ -64,37 +74,36 @@ def err_on_resp(response: dict) -> None: def quote_to_marketstore_structarray( quote: Dict[str, Any], - last_fills: Dict[str, str], + last_fill: str, ) -> np.array: - """Return marketstore writeable recarray from quote ``dict``. + """Return marketstore writeable structarray from quote ``dict``. """ + if last_fill: + # new fill bby + now = timestamp(last_fill) + else: + # this should get inserted upstream by the broker-client to + # subtract from IPC latency + now = timestamp(pd.Timestamp.now()) + + secs, ns = now / 10**9, now % 10**9 + # pack into List[Tuple[str, Any]] array_input = [] - # this should get inserted by the broker-client to subtract from - # IPC latency - now = timestamp(pd.Timestamp.now()) - - sym = quote['symbol'] - last_fill_time = quote['fill_time'] - if last_fills.get(sym) != last_fill_time: - # new fill - now = timestamp(last_fill_time) - last_fills[sym] = last_fill_time - - secs, ns = now / 10**9, now % 10**9 # insert 'Epoch' entry first array_input.append(int(secs)) - # insert 'Nanoseconds' field array_input.append(int(ns)) - # tick mapping to int - array_input.append(_tick_map[quote['tick']]) - # append remaining fields - for name, dt in _quote_dt[3:]: - array_input.append(quote[name.casefold()]) + for name, dt in _quote_dt[2:]: + if 'f' in dt: + none = np.nan + else: + none = 0 + val = quote.get(name.casefold(), none) + array_input.append(val) return np.array([tuple(array_input)], dtype=_quote_dt) @@ -148,26 +157,43 @@ def ingest(config, name, test_file, tl, url): log.error("Broker API is down temporarily") return - client = pymkts.Client(endpoint=url) + quote_cache = {quote['symbol']: quote for quote in first_quotes} - # keep track of last executed fill for each symbol - last_fills = {} + client = pymkts.Client(endpoint=url) # start ingest to marketstore async for quotes in qstream: for symbol, quote in quotes.items(): - breakpoint() fmt_quote, _ = brokermod.format_stock_quote( quote, feed._symbol_data_cache ) - a = quote_to_marketstore_structarray(fmt_quote, last_fills) - # start = time.time() - # err_on_resp(client.write( - # a, _tick_tbk.format(symbol), isvariablelength=True) - # ) - # log.trace( - # f"{symbol} write time (s): {time.time() - start}") + + # remap tick strs to ints + fmt_quote['tick'] = _tick_map[ + fmt_quote.get('tick', 'Equal') + ] + + # check for volume update (i.e. did trades happen + # since last quote) + new_vol = fmt_quote.get('volume', None) + if new_vol is None: + log.info(f"NO trades for {symbol}") + if new_vol == quote_cache.get('volume'): + log.error( + f"{symbol}: got same volume as last quote?") + + a = quote_to_marketstore_structarray( + fmt_quote, + # TODO: check this closer to the broker query api + last_fill=fmt_quote.get('last_fill', '') + ) + start = time.time() + err_on_resp(client.write( + a, _tick_tbk.format(symbol), isvariablelength=True) + ) + log.trace( + f"{symbol} write time (s): {time.time() - start}") tractor.run( partial(main, tries=1), @@ -213,7 +239,7 @@ def ms_shell(config, name, tl, url): ) @click.argument('names', nargs=-1) @click.pass_obj -def marketstore_destroy(config, names, url): +def marketstore_destroy(config: dict, names: List[str], url: str) -> None: """Destroy symbol entries in the local marketstore instance. """ client = pymkts.Client(url) @@ -235,6 +261,29 @@ def marketstore_destroy(config, names, url): print("Nothing deleted.") +async def open_quote_stream( + tbks: List[str], + host: str = 'localhost', + port: int = 5993 +) -> None: + """Open a symbol stream from a running instance of marketstore and + log to console. + """ + async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: + # send subs topics to server + await ws.send_message(msgpack.dumps({'streams': tbks})) + + async def recv() -> Dict[str, Any]: + return msgpack.loads((await ws.get_message()), encoding='utf-8') + + streams = (await recv())['streams'] + log.info(f"Subscribed to {streams}") + + while True: + msg = await recv() + log.info(f"Received quote:\n{msg}") + + @cli.command() @click.option( '--url', @@ -243,14 +292,8 @@ def marketstore_destroy(config, names, url): ) @click.argument('names', nargs=-1) @click.pass_obj -def marketstore_stream(config, names, url): - """Destroy symbol entries in the local marketstore instance. +def marketstore_stream(config: dict, names: List[str], url: str): + """Connect to a marketstore time bucket stream for (a set of) symbols(s) + and print to console. """ - symbol = 'APHA' - conn = pymkts.StreamConn('ws://localhost:5993/ws') - - @conn.on(r'^{}/'.format(symbol)) - def on_tsla(conn, msg): - print(f'received {symbol}', msg['data']) - - conn.run(['APHA/*/*']) # runs until exception + trio.run(open_quote_stream, names)