From d367d68fc2514d8dcd3e995d5dc835807c8b095b Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Mon, 17 Jan 2022 17:47:20 -0300 Subject: [PATCH] Still WIP, switch to using new marketstore client, missing streaming from marketstore --- piker/data/cli.py | 101 ++++++------ piker/data/marketstore.py | 315 ++++++++++++++++---------------------- 2 files changed, 183 insertions(+), 233 deletions(-) diff --git a/piker/data/cli.py b/piker/data/cli.py index 2206bd6a..21416a80 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -25,11 +25,13 @@ import trio import tractor import click +from anyio_marketstore import open_marketstore_client + from .marketstore import ( get_client, - stream_quotes, + # stream_quotes, ingest_quote_stream, - _url, + # _url, _tick_tbk_ids, mk_tbk, ) @@ -54,46 +56,47 @@ def ms_stream(config: dict, names: List[str], url: str): and print to console. """ async def main(): - async for quote in stream_quotes(symbols=names): - log.info(f"Received quote:\n{quote}") + # async for quote in stream_quotes(symbols=names): + # log.info(f"Received quote:\n{quote}") + ... trio.run(main) -@cli.command() -@click.option( - '--url', - default=_url, - help='HTTP URL of marketstore instance' -) -@click.argument('names', nargs=-1) -@click.pass_obj -def ms_destroy(config: dict, names: List[str], url: str) -> None: - """Destroy symbol entries in the local marketstore instance. - """ - async def main(): - nonlocal names - async with get_client(url) as client: - - if not names: - names = await client.list_symbols() - - # default is to wipe db entirely. - answer = input( - "This will entirely wipe you local marketstore db @ " - f"{url} of the following symbols:\n {pformat(names)}" - "\n\nDelete [N/y]?\n") - - if answer == 'y': - for sym in names: - # tbk = _tick_tbk.format(sym) - tbk = tuple(sym, *_tick_tbk_ids) - print(f"Destroying {tbk}..") - await client.destroy(mk_tbk(tbk)) - else: - print("Nothing deleted.") - - tractor.run(main) +# @cli.command() +# @click.option( +# '--url', +# default=_url, +# help='HTTP URL of marketstore instance' +# ) +# @click.argument('names', nargs=-1) +# @click.pass_obj +# def ms_destroy(config: dict, names: List[str], url: str) -> None: +# """Destroy symbol entries in the local marketstore instance. +# """ +# async def main(): +# nonlocal names +# async with get_client(url) as client: +# +# if not names: +# names = await client.list_symbols() +# +# # default is to wipe db entirely. +# answer = input( +# "This will entirely wipe you local marketstore db @ " +# f"{url} of the following symbols:\n {pformat(names)}" +# "\n\nDelete [N/y]?\n") +# +# if answer == 'y': +# for sym in names: +# # tbk = _tick_tbk.format(sym) +# tbk = tuple(sym, *_tick_tbk_ids) +# print(f"Destroying {tbk}..") +# await client.destroy(mk_tbk(tbk)) +# else: +# print("Nothing deleted.") +# +# tractor.run(main) @cli.command() @@ -102,17 +105,19 @@ def ms_destroy(config: dict, names: List[str], url: str) -> None: is_flag=True, help='Enable tractor logging') @click.option( - '--url', - default=_url, - help='HTTP URL of marketstore instance' + '--host', + default='localhost' +) +@click.option( + '--port', + default=5995 ) -@click.argument('name', nargs=1, required=True) @click.pass_obj -def ms_shell(config, name, tl, url): +def ms_shell(config, tl, host, port): """Start an IPython shell ready to query the local marketstore db. """ async def main(): - async with get_client(url) as client: + async with open_marketstore_client(host, port) as client: query = client.query # noqa # TODO: write magics to query marketstore from IPython import embed @@ -124,15 +129,9 @@ def ms_shell(config, name, tl, url): @cli.command() @click.option('--test-file', '-t', help='Test quote stream file') @click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option( - '--url', - default=_url, - help='HTTP URL of marketstore instance' -) @click.argument('name', nargs=1, required=True) @click.pass_obj -def ingest(config, name, test_file, tl, url): +def ingest(config, name, test_file, tl): """Ingest real-time broker quotes and ticks to a marketstore instance. """ # global opts diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 1d271452..3f75c994 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -14,7 +14,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' ``marketstore`` integration. - client management routines @@ -22,7 +22,7 @@ - websocket client for subscribing to write triggers - todo: tick sequence stream-cloning for testing - todo: docker container management automation -""" +''' from contextlib import asynccontextmanager from typing import Dict, Any, List, Callable, Tuple, Optional import time @@ -31,9 +31,9 @@ from math import isnan import msgpack import numpy as np import pandas as pd -import pymarketstore as pymkts import tractor from trio_websocket import open_websocket_url +from anyio_marketstore import open_marketstore_client, MarketstoreClient from ..log import get_logger, get_console_log from ..data import open_feed @@ -43,7 +43,7 @@ log = get_logger(__name__) _tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK') _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) -_url: str = 'http://localhost:5993/rpc' + _quote_dt = [ # these two are required for as a "primary key" ('Epoch', 'i8'), @@ -51,34 +51,27 @@ _quote_dt = [ ('IsTrade', 'i1'), ('IsBid', 'i1'), - ('Price', 'f8'), - ('Size', 'f8') + ('Price', 'f4'), + ('Size', 'f4') ] -_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) -class MarketStoreError(Exception): - "Generic marketstore client error" - - -def err_on_resp(response: dict) -> None: - """Raise any errors found in responses from client request. +def mk_tbk(keys: Tuple[str, str, str]) -> str: + """Generate a marketstore table key from a tuple. + Converts, + ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``` """ - responses = response['responses'] - if responses is not None: - for r in responses: - err = r['error'] - if err: - raise MarketStoreError(err) + return '{}/' + '/'.join(keys) def quote_to_marketstore_structarray( quote: Dict[str, Any], - last_fill: Optional[float], - + last_fill: Optional[float] ) -> np.array: - """Return marketstore writeable structarray from quote ``dict``. - """ + ''' + Return marketstore writeable structarray from quote ``dict``. + ''' + if last_fill: # new fill bby now = timestamp(last_fill, unit='s') @@ -112,82 +105,21 @@ def quote_to_marketstore_structarray( def timestamp(date, **kwargs) -> int: - """Return marketstore compatible 'Epoch' integer in nanoseconds + ''' + Return marketstore compatible 'Epoch' integer in nanoseconds from a date formatted str. - """ + ''' + return int(pd.Timestamp(date, **kwargs).value) -def mk_tbk(keys: Tuple[str, str, str]) -> str: - """Generate a marketstore table key from a tuple. - - Converts, - ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``` - """ - return '{}/' + '/'.join(keys) - - -class Client: - """Async wrapper around the alpaca ``pymarketstore`` sync client. - - This will server as the shell for building out a proper async client - that isn't horribly documented and un-tested.. - """ - def __init__(self, url: str): - self._client = pymkts.Client(url) - - async def _invoke( - self, - meth: Callable, - *args, - **kwargs, - ) -> Any: - return err_on_resp(meth(*args, **kwargs)) - - async def destroy( - self, - tbk: Tuple[str, str, str], - ) -> None: - return await self._invoke(self._client.destroy, mk_tbk(tbk)) - - async def list_symbols( - self, - tbk: str, - ) -> List[str]: - return await self._invoke(self._client.list_symbols, mk_tbk(tbk)) - - async def write( - self, - symbol: str, - array: np.ndarray, - ) -> None: - start = time.time() - await self._invoke( - self._client.write, - array, - _tick_tbk.format(symbol), - isvariablelength=True - ) - log.debug(f"{symbol} write time (s): {time.time() - start}") - - def query( - self, - symbol, - tbk: Tuple[str, str] = _tick_tbk_ids, - ) -> pd.DataFrame: - # XXX: causes crash - # client.query(pymkts.Params(symbol, '*', 'OHCLV' - result = self._client.query( - pymkts.Params(symbol, *tbk), - ) - return result.first().df() - - @asynccontextmanager async def get_client( - url: str = _url, -) -> Client: - yield Client(url) + host: str = 'localhost', + port: int = 5995 +) -> MarketstoreClient: + async with open_marketstore_client(host, port) as client: + yield client async def ingest_quote_stream( @@ -196,8 +128,9 @@ async def ingest_quote_stream( tries: int = 1, actorloglevel: str = None, ) -> None: - """Ingest a broker quote stream into marketstore in (sampled) tick format. - """ + ''' + Ingest a broker quote stream into marketstore. + ''' async with ( open_feed(brokername, symbols, loglevel=actorloglevel) as feed, get_client() as ms_client @@ -212,107 +145,125 @@ async def ingest_quote_stream( # okkk.. continue - a = quote_to_marketstore_structarray({ + array = quote_to_marketstore_structarray({ 'IsTrade': 1 if ticktype == 'trade' else 0, 'IsBid': 1 if ticktype in ('bid', 'bsize') else 0, 'Price': tick.get('price'), 'Size': tick.get('size') }, last_fill=quote.get('broker_ts', None)) - log.info(a) - await ms_client.write(symbol, a) - + await ms_client.write( + array, _tick_tbk) + async def stream_quotes( symbols: List[str], + timeframe: str = '1Min', + attr_group: str = 'TICK', host: str = 'localhost', port: int = 5993, - diff_cached: bool = True, - loglevel: str = None, + loglevel: str = None ) -> None: - """Open a symbol stream from a running instance of marketstore and + ''' + Open a symbol stream from a running instance of marketstore and log to console. - """ - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) + ''' - tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} + tbks: Dict[str, str] = { + sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols} - async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: - # send subs topics to server - resp = await ws.send_message( - msgpack.dumps({'streams': list(tbks.values())}) - ) - log.info(resp) - async def recv() -> Dict[str, Any]: - return msgpack.loads((await ws.get_message()), encoding='utf-8') - streams = (await recv())['streams'] - log.info(f"Subscribed to {streams}") - - _cache = {} - - while True: - msg = await recv() - - # unpack symbol and quote data - # key is in format ``//`` - symbol = msg['key'].split('/')[0] - data = msg['data'] - - # calc time stamp(s) - s, ns = data.pop('Epoch'), data.pop('Nanoseconds') - ts = s * 10**9 + ns - data['broker_fill_time_ns'] = ts - - quote = {} - for k, v in data.items(): - if isnan(v): - continue - - quote[k.lower()] = v - - quote['symbol'] = symbol - - quotes = {} - - if diff_cached: - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info(f"New quote {quote['symbol']}:\n{new}") - - # only ship diff updates and other required fields - payload = {k: quote[k] for k, v in new} - payload['symbol'] = symbol - - # if there was volume likely the last size of - # shares traded is useful info and it's possible - # that the set difference from above will disregard - # a "size" value since the same # of shares were traded - size = quote.get('size') - volume = quote.get('volume') - if size and volume: - new_volume_since_last = max( - volume - last.get('volume', 0), 0) - log.warning( - f"NEW VOLUME {symbol}:{new_volume_since_last}") - payload['size'] = size - payload['last'] = quote.get('last') - - # XXX: we append to a list for the options case where the - # subscription topic (key) is the same for all - # expiries even though this is uncessary for the - # stock case (different topic [i.e. symbol] for each - # quote). - quotes.setdefault(symbol, []).append(payload) - - # update cache - _cache[symbol].update(quote) - else: - quotes = { - symbol: [{key.lower(): val for key, val in quote.items()}]} - - if quotes: - yield quotes +# async def stream_quotes( +# symbols: List[str], +# host: str = 'localhost', +# port: int = 5993, +# diff_cached: bool = True, +# loglevel: str = None, +# ) -> None: +# """Open a symbol stream from a running instance of marketstore and +# log to console. +# """ +# # XXX: required to propagate ``tractor`` loglevel to piker logging +# get_console_log(loglevel or tractor.current_actor().loglevel) +# +# tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} +# +# async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: +# # send subs topics to server +# resp = await ws.send_message( +# msgpack.dumps({'streams': list(tbks.values())}) +# ) +# log.info(resp) +# +# async def recv() -> Dict[str, Any]: +# return msgpack.loads((await ws.get_message()), encoding='utf-8') +# +# streams = (await recv())['streams'] +# log.info(f"Subscribed to {streams}") +# +# _cache = {} +# +# while True: +# msg = await recv() +# +# # unpack symbol and quote data +# # key is in format ``//`` +# symbol = msg['key'].split('/')[0] +# data = msg['data'] +# +# # calc time stamp(s) +# s, ns = data.pop('Epoch'), data.pop('Nanoseconds') +# ts = s * 10**9 + ns +# data['broker_fill_time_ns'] = ts +# +# quote = {} +# for k, v in data.items(): +# if isnan(v): +# continue +# +# quote[k.lower()] = v +# +# quote['symbol'] = symbol +# +# quotes = {} +# +# if diff_cached: +# last = _cache.setdefault(symbol, {}) +# new = set(quote.items()) - set(last.items()) +# if new: +# log.info(f"New quote {quote['symbol']}:\n{new}") +# +# # only ship diff updates and other required fields +# payload = {k: quote[k] for k, v in new} +# payload['symbol'] = symbol +# +# # if there was volume likely the last size of +# # shares traded is useful info and it's possible +# # that the set difference from above will disregard +# # a "size" value since the same # of shares were traded +# size = quote.get('size') +# volume = quote.get('volume') +# if size and volume: +# new_volume_since_last = max( +# volume - last.get('volume', 0), 0) +# log.warning( +# f"NEW VOLUME {symbol}:{new_volume_since_last}") +# payload['size'] = size +# payload['last'] = quote.get('last') +# +# # XXX: we append to a list for the options case where the +# # subscription topic (key) is the same for all +# # expiries even though this is uncessary for the +# # stock case (different topic [i.e. symbol] for each +# # quote). +# quotes.setdefault(symbol, []).append(payload) +# +# # update cache +# _cache[symbol].update(quote) +# else: +# quotes = { +# symbol: [{key.lower(): val for key, val in quote.items()}]} +# +# if quotes: +# yield quotes