diff --git a/piker/data/__init__.py b/piker/data/__init__.py index dd2c6b7b..bb8fca8b 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -1,7 +1,7 @@ """ Data feed apis and infra. -We ship some tsdb integrations for retrieving +We provide tsdb integrations for retrieving and storing data from your brokers as well as sharing your feeds with other fellow pikers. """ @@ -91,6 +91,8 @@ async def open_feed( symbols: Sequence[str], loglevel: str = 'info', ) -> AsyncIterator[Dict[str, Any]]: + """Open a "data feed" which provides streamed real-time quotes. + """ try: mod = get_brokermod(name) except ImportError: diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 6f0d1bb9..1baafb1a 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -1,41 +1,38 @@ """ ``marketstore`` integration. -- TICK data ingest routines +- client management routines +- tick data ingest routines - websocket client for subscribing to write triggers -- docker container management automation +- todo: tick sequence stream-cloning for testing +- todo: docker container management automation """ -from pprint import pformat -from typing import Dict, Any, List -from functools import partial +from contextlib import asynccontextmanager +from typing import Dict, Any, List, Callable, Tuple import time +from math import isnan import msgpack import numpy as np import pandas as pd import pymarketstore as pymkts -import click -import trio -import tractor from trio_websocket import open_websocket_url -from . import maybe_spawn_brokerd -from ..cli import cli -from .. 
import watchlists as wl -from ..brokers.data import DataFeed -from ..log import get_logger +from ..log import get_logger, get_console_log +from ..data import open_feed log = get_logger(__name__) -_tick_tbk_ids = ('1Sec', 'TICK') -_tick_tbk = '{}/' + '/'.join(_tick_tbk_ids) - - +_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK') +_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) +_url: str = 'http://localhost:5993/rpc' _quote_dt = [ + # these two are required for as a "primary key" ('Epoch', 'i8'), ('Nanoseconds', 'i4'), - ('Tick', 'i4'), + + ('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask) # ('fill_time', 'f4'), ('Last', 'f4'), ('Bid', 'f4'), @@ -44,11 +41,10 @@ _quote_dt = [ ('Ask', 'f4'), ('Size', 'i8'), ('Volume', 'i8'), + # ('Broker_time_ns', 'i64'), # ('VWAP', 'f4') ] _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) - - _tick_map = { 'Up': 1, 'Equal': 0, @@ -84,7 +80,7 @@ def quote_to_marketstore_structarray( else: # this should get inserted upstream by the broker-client to # subtract from IPC latency - now = timestamp(pd.Timestamp.now()) + now = time.time_ns() secs, ns = now / 10**9, now % 10**9 @@ -109,172 +105,152 @@ def quote_to_marketstore_structarray( def timestamp(datestr: str) -> int: - """Return marketstore compatible 'Epoch' integer in nanoseconds. + """Return marketstore compatible 'Epoch' integer in nanoseconds + from a date formatted str. """ return int(pd.Timestamp(datestr).value) -@cli.command() -@click.option('--test-file', '-t', help='Test quote stream file') -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option( - '--url', - default='http://localhost:5993/rpc', - help='HTTP URL of marketstore instance' -) -@click.argument('name', nargs=1, required=True) -@click.pass_obj -def ingest(config, name, test_file, tl, url): - """Ingest real-time broker quotes and ticks to a marketstore instance. 
+def mk_tbk(keys: Tuple[str, str, str]) -> str: + """Generate a marketstore table key from a tuple. + + Converts, + ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"`` """ - # global opts - brokermod = config['brokermod'] - loglevel = config['loglevel'] - log = config['log'] + return '{}/' + '/'.join(keys) - watchlist_from_file = wl.ensure_watchlists(config['wl_path']) - watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - symbols = watchlists[name] - async def main(tries): - async with maybe_spawn_brokerd( - brokername=brokermod.name, - tries=tries, - loglevel=loglevel - ) as portal: - # connect to broker data feed - feed = DataFeed(portal, brokermod) - qstream, quotes = await feed.open_stream( - symbols, - 'stock', - rate=3, - test=test_file, - ) +class Client: + """Async wrapper around the alpaca ``pymarketstore`` sync client. - first_quotes, _ = feed.format_quotes(quotes) + This will serve as the shell for building out a proper async client + that isn't horribly documented and un-tested.. 
+ """ + def __init__(self, url: str): + self._client = pymkts.Client(url) - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - return + async def _invoke( + self, + meth: Callable, + *args, + **kwargs, + ) -> Any: + return err_on_resp(meth(*args, **kwargs)) - quote_cache = {quote['symbol']: quote for quote in first_quotes} + async def destroy( + self, + tbk: Tuple[str, str, str], + ) -> None: + return await self._invoke(self._client.destroy, mk_tbk(tbk)) - client = pymkts.Client(endpoint=url) + async def list_symbols( + self, + tbk: str, + ) -> List[str]: + return await self._invoke(self._client.list_symbols, mk_tbk(tbk)) + + async def write( + self, + symbol: str, + array: np.ndarray, + ) -> None: + start = time.time() + await self._invoke( + self._client.write, + array, + _tick_tbk.format(symbol), + isvariablelength=True + ) + log.debug(f"{symbol} write time (s): {time.time() - start}") + + def query( + self, + symbol, + tbk: Tuple[str, str] = _tick_tbk_ids, + ) -> pd.DataFrame: + # XXX: causes crash + # client.query(pymkts.Params(symbol, '*', 'OHCLV' + result = self._client.query( + pymkts.Params(symbol, *tbk), + ) + return result.first().df() + + +@asynccontextmanager +async def get_client( + url: str = _url, +) -> Client: + yield Client(url) + + +async def ingest_quote_stream( + symbols: List[str], + brokername: str, + tries: int = 1, + loglevel: str = None, +) -> None: + """Ingest a broker quote stream into marketstore in (sampled) tick format. 
+ """ + async with open_feed( + brokername, + symbols, + loglevel=loglevel, + ) as (first_quotes, qstream): + + quote_cache = first_quotes.copy() + + async with get_client() as ms_client: # start ingest to marketstore async for quotes in qstream: + log.info(quotes) for symbol, quote in quotes.items(): - fmt_quote, _ = brokermod.format_stock_quote( - quote, - feed._symbol_data_cache - ) # remap tick strs to ints - fmt_quote['tick'] = _tick_map[ - fmt_quote.get('tick', 'Equal') - ] + quote['tick'] = _tick_map[quote.get('tick', 'Equal')] # check for volume update (i.e. did trades happen # since last quote) - new_vol = fmt_quote.get('volume', None) + new_vol = quote.get('volume', None) if new_vol is None: log.debug(f"No fills for {symbol}") if new_vol == quote_cache.get('volume'): + # should never happen due to field diffing + # on sender side log.error( f"{symbol}: got same volume as last quote?") - quote_cache.update(fmt_quote) + quote_cache.update(quote) a = quote_to_marketstore_structarray( - fmt_quote, + quote, # TODO: check this closer to the broker query api - last_fill=fmt_quote.get('last_fill', '') + last_fill=quote.get('fill_time', '') ) - start = time.time() - err_on_resp(client.write( - a, _tick_tbk.format(symbol), isvariablelength=True) - ) - log.trace( - f"{symbol} write time (s): {time.time() - start}") - - tractor.run( - partial(main, tries=1), - name='ingest_marketstore', - loglevel=loglevel if tl else None, - # start_method='forkserver', - ) + await ms_client.write(symbol, a) -@cli.command() -@click.option( - '--tl', - is_flag=True, - help='Enable tractor logging') -@click.option( - '--url', - default='http://localhost:5993/rpc', - help='HTTP URL of marketstore instance' -) -@click.argument('name', nargs=1, required=True) -@click.pass_obj -def ms_shell(config, name, tl, url): - """Start an IPython shell ready to query the local marketstore db. 
- """ - client = pymkts.Client(url) - - def query(name, tbk=_tick_tbk_ids): - return client.query( - pymkts.Params(name, *tbk)).first().df() - - # causes crash - # client.query(pymkts.Params(symbol, '*', 'OHCLV' - - from IPython import embed - embed() - - -@cli.command() -@click.option( - '--url', - default='http://localhost:5993/rpc', - help='HTTP URL of marketstore instance' -) -@click.argument('names', nargs=-1) -@click.pass_obj -def marketstore_destroy(config: dict, names: List[str], url: str) -> None: - """Destroy symbol entries in the local marketstore instance. - """ - client = pymkts.Client(url) - if not names: - names = client.list_symbols() - - # default is to wipe db entirely. - answer = input( - "This will entirely wipe you local marketstore db @ " - f"{url} of the following symbols:\n {pformat(names)}" - "\n\nDelete [N/y]?\n") - - if answer == 'y': - for sym in names: - tbk = _tick_tbk.format(sym) - print(f"Destroying {tbk}..") - err_on_resp(client.destroy(_tick_tbk.format(sym))) - else: - print("Nothing deleted.") - - -async def open_quote_stream( - tbks: List[str], +async def stream_quotes( + symbols: List[str], host: str = 'localhost', - port: int = 5993 + port: int = 5993, + diff_cached: bool = True, + loglevel: str = None, ) -> None: """Open a symbol stream from a running instance of marketstore and log to console. 
+ # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} + async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: # send subs topics to server - await ws.send_message(msgpack.dumps({'streams': tbks})) + resp = await ws.send_message( + msgpack.dumps({'streams': list(tbks.values())}) + ) + log.info(resp) async def recv() -> Dict[str, Any]: return msgpack.loads((await ws.get_message()), encoding='utf-8') @@ -282,21 +258,67 @@ async def open_quote_stream( streams = (await recv())['streams'] log.info(f"Subscribed to {streams}") + _cache = {} + while True: msg = await recv() - log.info(f"Received quote:\n{msg}") + # unpack symbol and quote data + # key is in format ``SYMBOL/TIMEFRAME/ID`` + symbol = msg['key'].split('/')[0] + data = msg['data'] -@cli.command() -@click.option( - '--url', - default='ws://localhost:5993/ws', - help='HTTP URL of marketstore instance' -) -@click.argument('names', nargs=-1) -@click.pass_obj -def marketstore_stream(config: dict, names: List[str], url: str): - """Connect to a marketstore time bucket stream for (a set of) symbols(s) - and print to console. 
- """ - trio.run(open_quote_stream, names) + + # calc time stamp(s) + s, ns = data.pop('Epoch'), data.pop('Nanoseconds') + ts = s * 10**9 + ns + data['broker_fill_time_ns'] = ts + + quote = {} + for k, v in data.items(): + if isnan(v): + continue + + quote[k.lower()] = v + + quote['symbol'] = symbol + + quotes = {} + + if diff_cached: + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info(f"New quote {quote['symbol']}:\n{new}") + + # only ship diff updates and other required fields + payload = {k: quote[k] for k, v in new} + payload['symbol'] = symbol + + # if there was volume likely the last size of + # shares traded is useful info and it's possible + # that the set difference from above will disregard + # a "size" value since the same # of shares were traded + size = quote.get('size') + volume = quote.get('volume') + if size and volume: + new_volume_since_last = max( + volume - last.get('volume', 0), 0) + log.warning( + f"NEW VOLUME {symbol}:{new_volume_since_last}") + payload['size'] = size + payload['last'] = quote.get('last') + + # XXX: we append to a list for the options case where the + # subscription topic (key) is the same for all + # expiries even though this is unnecessary for the + # stock case (different topic [i.e. symbol] for each + # quote). + quotes.setdefault(symbol, []).append(payload) + + # update cache + _cache[symbol].update(quote) + else: + quotes = {symbol: [{key.lower(): val for key, val in quote.items()}]} + + if quotes: + yield quotes