From 804771410103143b21731a0fd5c7d85323afbfe7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Feb 2022 07:35:56 -0500 Subject: [PATCH] Add back in OHLCV dtype template and client side ws streamer --- piker/data/marketstore.py | 308 +++++++++++++++++++++++--------------- 1 file changed, 190 insertions(+), 118 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index c9fc5d16..4298eb96 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -26,12 +26,12 @@ from contextlib import asynccontextmanager from typing import Any, Optional import time -# from math import isnan +from math import isnan -# import msgpack +import msgpack import numpy as np import pandas as pd -# import tractor +import tractor from trio_websocket import open_websocket_url from anyio_marketstore import open_marketstore_client, MarketstoreClient @@ -44,15 +44,29 @@ log = get_logger(__name__) _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) +_tick_dt = [ + # these two are required for as a "primary key" + ('Epoch', 'i8'), + ('Nanoseconds', 'i4'), + ('IsTrade', 'i1'), + ('IsBid', 'i1'), + ('Price', 'f4'), + ('Size', 'f4') +] + _quote_dt = [ # these two are required for as a "primary key" ('Epoch', 'i8'), ('Nanoseconds', 'i4'), - ('IsTrade', 'i1'), - ('IsBid', 'i1'), - ('Price', 'f4'), - ('Size', 'f4') + ('Tick', 'i4'), # do we need this? + ('Last', 'f4'), + ('Bid', 'f4'), + ('Bsize', 'f4'), + ('Asize', 'f4'), + ('Ask', 'f4'), + ('Size', 'i8'), + ('Volume', 'f4'), ] @@ -69,11 +83,12 @@ def mk_tbk(keys: tuple[str, str, str]) -> str: def quote_to_marketstore_structarray( quote: dict[str, Any], last_fill: Optional[float] + ) -> np.array: ''' Return marketstore writeable structarray from quote ``dict``. - ''' + ''' if last_fill: # new fill bby now = timestamp(last_fill, unit='s') @@ -82,7 +97,7 @@ def quote_to_marketstore_structarray( # this should get inserted upstream by the broker-client to # subtract from IPC latency now = time.time_ns() - + secs, ns = now / 10**9, now % 10**9 # pack into list[tuple[str, Any]] @@ -100,7 +115,8 @@ def quote_to_marketstore_structarray( # for ``np.int`` we use 0 as a null value none = 0 - val = quote.get(name, none) + # casefold? see https://github.com/alpacahq/marketstore/issues/324 + val = quote.get(name.casefold(), none) array_input.append(val) return np.array([tuple(array_input)], dtype=_quote_dt) @@ -119,6 +135,7 @@ def timestamp(date, **kwargs) -> int: async def get_client( host: str = 'localhost', port: int = 5995 + ) -> MarketstoreClient: async with open_marketstore_client(host, port) as client: yield client @@ -145,129 +162,184 @@ async def ingest_quote_stream( for tick in quote.get('ticks', ()): ticktype = tick.get('type', 'n/a') - if ticktype == 'n/a': - # okkk.. - continue + # _quote_dt = [ + # # these two are required for as a "primary key" + # ('Epoch', 'i8'), + # ('Nanoseconds', 'i4'), + # ('Tick', 'i4'), + # + # ('Last', 'f4'), + # ('Bid', 'f4'), + # ('Bsize', 'f4'), + # ('Asize', 'f4'), + # ('Ask', 'f4'), + # ('Size', 'i8'), + # ('Volume', 'f4'), + # ] - array = quote_to_marketstore_structarray({ - 'IsTrade': 1 if ticktype == 'trade' else 0, - 'IsBid': 1 if ticktype in ('bid', 'bsize') else 0, - 'Price': tick.get('price'), - 'Size': tick.get('size') - }, last_fill=quote.get('broker_ts', None)) + # techtonic tick write + array = quote_to_marketstore_structarray({ + 'IsTrade': 1 if ticktype == 'trade' else 0, + 'IsBid': 1 if ticktype in ('bid', 'bsize') else 0, + 'Price': tick.get('price'), + 'Size': tick.get('size') + }, last_fill=quote.get('broker_ts', None)) - await ms_client.write(array, _tick_tbk) + await ms_client.write(array, _tick_tbk) + + quote_cache = { + 'size': 0, + 'tick': 0 + } + # start ingest to marketstore + async for quotes in feed.stream: + log.info(quotes) + for symbol, quote in quotes.items(): + + for tick in quote.get('ticks', ()): + ticktype = tick.get('type') + price = tick.get('price') + size = tick.get('size') + + if ticktype == 'n/a' or price == -1: + # okkk.. + continue + + # clearing price event + if ticktype == 'trade': + quote_cache['volume'] = quote['volume'] + quote_cache['last'] = price + # quote_cache['broker_ts'] = quote['broker_ts'] + + # l1 book events + elif ticktype in ('ask', 'asize'): + quote_cache['ask'] = price + quote_cache['asize'] = size + + elif ticktype in ('bid', 'bsize'): + quote_cache['bid'] = price + quote_cache['bsize'] = size + + a = quote_to_marketstore_structarray( + quote_cache, + last_fill=quote.get('broker_ts', None) + ) + log.info(a) + # breakpoint() + await ms_client.write(symbol, a) + + +# async def stream_quotes( +# symbols: list[str], +# timeframe: str = '1Min', +# attr_group: str = 'TICK', +# host: str = 'localhost', +# port: int = 5993, +# loglevel: str = None + +# ) -> None: +# ''' +# Open a symbol stream from a running instance of marketstore and +# log to console. + +# ''' +# tbks: dict[str, str] = { +# sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols} async def stream_quotes( symbols: list[str], - timeframe: str = '1Min', - attr_group: str = 'TICK', host: str = 'localhost', port: int = 5993, - loglevel: str = None + diff_cached: bool = True, + loglevel: str = None, ) -> None: ''' Open a symbol stream from a running instance of marketstore and log to console. + ''' + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) - tbks: dict[str, str] = { - sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols} + tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} + async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: + # send subs topics to server + resp = await ws.send_message( + msgpack.dumps({'streams': list(tbks.values())}) + ) + log.info(resp) + async def recv() -> dict[str, Any]: + return msgpack.loads((await ws.get_message()), encoding='utf-8') -# async def stream_quotes( -# symbols: list[str], -# host: str = 'localhost', -# port: int = 5993, -# diff_cached: bool = True, -# loglevel: str = None, -# ) -> None: -# """Open a symbol stream from a running instance of marketstore and -# log to console. -# """ -# # XXX: required to propagate ``tractor`` loglevel to piker logging -# get_console_log(loglevel or tractor.current_actor().loglevel) -# -# tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} -# -# async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: -# # send subs topics to server -# resp = await ws.send_message( -# msgpack.dumps({'streams': list(tbks.values())}) -# ) -# log.info(resp) -# -# async def recv() -> dict[str, Any]: -# return msgpack.loads((await ws.get_message()), encoding='utf-8') -# -# streams = (await recv())['streams'] -# log.info(f"Subscribed to {streams}") -# -# _cache = {} -# -# while True: -# msg = await recv() -# -# # unpack symbol and quote data -# # key is in format ``//`` -# symbol = msg['key'].split('/')[0] -# data = msg['data'] -# -# # calc time stamp(s) -# s, ns = data.pop('Epoch'), data.pop('Nanoseconds') -# ts = s * 10**9 + ns -# data['broker_fill_time_ns'] = ts -# -# quote = {} -# for k, v in data.items(): -# if isnan(v): -# continue -# -# quote[k.lower()] = v -# -# quote['symbol'] = symbol -# -# quotes = {} -# -# if diff_cached: -# last = _cache.setdefault(symbol, {}) -# new = set(quote.items()) - set(last.items()) -# if new: -# log.info(f"New quote {quote['symbol']}:\n{new}") -# -# # only ship diff updates and other required fields -# payload = {k: quote[k] for k, v in new} -# payload['symbol'] = symbol -# -# # if there was volume likely the last size of -# # shares traded is useful info and it's possible -# # that the set difference from above will disregard -# # a "size" value since the same # of shares were traded -# size = quote.get('size') -# volume = quote.get('volume') -# if size and volume: -# new_volume_since_last = max( -# volume - last.get('volume', 0), 0) -# log.warning( -# f"NEW VOLUME {symbol}:{new_volume_since_last}") -# payload['size'] = size -# payload['last'] = quote.get('last') -# -# # XXX: we append to a list for the options case where the -# # subscription topic (key) is the same for all -# # expiries even though this is uncessary for the -# # stock case (different topic [i.e. symbol] for each -# # quote). -# quotes.setdefault(symbol, []).append(payload) -# -# # update cache -# _cache[symbol].update(quote) -# else: -# quotes = { -# symbol: [{key.lower(): val for key, val in quote.items()}]} -# -# if quotes: -# yield quotes + streams = (await recv())['streams'] + log.info(f"Subscribed to {streams}") + + _cache = {} + + while True: + msg = await recv() + + # unpack symbol and quote data + # key is in format ``//`` + symbol = msg['key'].split('/')[0] + data = msg['data'] + + # calc time stamp(s) + s, ns = data.pop('Epoch'), data.pop('Nanoseconds') + ts = s * 10**9 + ns + data['broker_fill_time_ns'] = ts + + quote = {} + for k, v in data.items(): + if isnan(v): + continue + + quote[k.lower()] = v + + quote['symbol'] = symbol + + quotes = {} + + if diff_cached: + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info(f"New quote {quote['symbol']}:\n{new}") + + # only ship diff updates and other required fields + payload = {k: quote[k] for k, v in new} + payload['symbol'] = symbol + + # if there was volume likely the last size of + # shares traded is useful info and it's possible + # that the set difference from above will disregard + # a "size" value since the same # of shares were traded + size = quote.get('size') + volume = quote.get('volume') + if size and volume: + new_volume_since_last = max( + volume - last.get('volume', 0), 0) + log.warning( + f"NEW VOLUME {symbol}:{new_volume_since_last}") + payload['size'] = size + payload['last'] = quote.get('last') + + # XXX: we append to a list for the options case where the + # subscription topic (key) is the same for all + # expiries even though this is uncessary for the + # stock case (different topic [i.e. symbol] for each + # quote). + quotes.setdefault(symbol, []).append(payload) + + # update cache + _cache[symbol].update(quote) + else: + quotes = { + symbol: [{key.lower(): val for key, val in quote.items()}]} + + if quotes: + yield quotes