Add back in OHLCV dtype template and client side ws streamer

marketstore
Tyler Goodlet 2022-02-18 07:35:56 -05:00
parent ec501da681
commit 216ad65933
1 changed files with 190 additions and 118 deletions

View File

@ -26,12 +26,12 @@
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Any, Optional from typing import Any, Optional
import time import time
# from math import isnan from math import isnan
# import msgpack import msgpack
import numpy as np import numpy as np
import pandas as pd import pandas as pd
# import tractor import tractor
from trio_websocket import open_websocket_url from trio_websocket import open_websocket_url
from anyio_marketstore import open_marketstore_client, MarketstoreClient from anyio_marketstore import open_marketstore_client, MarketstoreClient
@ -44,15 +44,29 @@ log = get_logger(__name__)
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
_tick_dt = [
# these two are required for as a "primary key"
('Epoch', 'i8'),
('Nanoseconds', 'i4'),
('IsTrade', 'i1'),
('IsBid', 'i1'),
('Price', 'f4'),
('Size', 'f4')
]
_quote_dt = [ _quote_dt = [
# these two are required for as a "primary key" # these two are required for as a "primary key"
('Epoch', 'i8'), ('Epoch', 'i8'),
('Nanoseconds', 'i4'), ('Nanoseconds', 'i4'),
('IsTrade', 'i1'), ('Tick', 'i4'), # do we need this?
('IsBid', 'i1'), ('Last', 'f4'),
('Price', 'f4'), ('Bid', 'f4'),
('Size', 'f4') ('Bsize', 'f4'),
('Asize', 'f4'),
('Ask', 'f4'),
('Size', 'i8'),
('Volume', 'f4'),
] ]
@ -69,11 +83,12 @@ def mk_tbk(keys: tuple[str, str, str]) -> str:
def quote_to_marketstore_structarray( def quote_to_marketstore_structarray(
quote: dict[str, Any], quote: dict[str, Any],
last_fill: Optional[float] last_fill: Optional[float]
) -> np.array: ) -> np.array:
''' '''
Return marketstore writeable structarray from quote ``dict``. Return marketstore writeable structarray from quote ``dict``.
'''
'''
if last_fill: if last_fill:
# new fill bby # new fill bby
now = timestamp(last_fill, unit='s') now = timestamp(last_fill, unit='s')
@ -100,7 +115,8 @@ def quote_to_marketstore_structarray(
# for ``np.int`` we use 0 as a null value # for ``np.int`` we use 0 as a null value
none = 0 none = 0
val = quote.get(name, none) # casefold? see https://github.com/alpacahq/marketstore/issues/324
val = quote.get(name.casefold(), none)
array_input.append(val) array_input.append(val)
return np.array([tuple(array_input)], dtype=_quote_dt) return np.array([tuple(array_input)], dtype=_quote_dt)
@ -119,6 +135,7 @@ def timestamp(date, **kwargs) -> int:
async def get_client( async def get_client(
host: str = 'localhost', host: str = 'localhost',
port: int = 5995 port: int = 5995
) -> MarketstoreClient: ) -> MarketstoreClient:
async with open_marketstore_client(host, port) as client: async with open_marketstore_client(host, port) as client:
yield client yield client
@ -145,129 +162,184 @@ async def ingest_quote_stream(
for tick in quote.get('ticks', ()): for tick in quote.get('ticks', ()):
ticktype = tick.get('type', 'n/a') ticktype = tick.get('type', 'n/a')
if ticktype == 'n/a': # _quote_dt = [
# okkk.. # # these two are required for as a "primary key"
continue # ('Epoch', 'i8'),
# ('Nanoseconds', 'i4'),
# ('Tick', 'i4'),
#
# ('Last', 'f4'),
# ('Bid', 'f4'),
# ('Bsize', 'f4'),
# ('Asize', 'f4'),
# ('Ask', 'f4'),
# ('Size', 'i8'),
# ('Volume', 'f4'),
# ]
array = quote_to_marketstore_structarray({ # techtonic tick write
'IsTrade': 1 if ticktype == 'trade' else 0, array = quote_to_marketstore_structarray({
'IsBid': 1 if ticktype in ('bid', 'bsize') else 0, 'IsTrade': 1 if ticktype == 'trade' else 0,
'Price': tick.get('price'), 'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
'Size': tick.get('size') 'Price': tick.get('price'),
}, last_fill=quote.get('broker_ts', None)) 'Size': tick.get('size')
}, last_fill=quote.get('broker_ts', None))
await ms_client.write(array, _tick_tbk) await ms_client.write(array, _tick_tbk)
quote_cache = {
'size': 0,
'tick': 0
}
# start ingest to marketstore
async for quotes in feed.stream:
log.info(quotes)
for symbol, quote in quotes.items():
for tick in quote.get('ticks', ()):
ticktype = tick.get('type')
price = tick.get('price')
size = tick.get('size')
if ticktype == 'n/a' or price == -1:
# okkk..
continue
# clearing price event
if ticktype == 'trade':
quote_cache['volume'] = quote['volume']
quote_cache['last'] = price
# quote_cache['broker_ts'] = quote['broker_ts']
# l1 book events
elif ticktype in ('ask', 'asize'):
quote_cache['ask'] = price
quote_cache['asize'] = size
elif ticktype in ('bid', 'bsize'):
quote_cache['bid'] = price
quote_cache['bsize'] = size
a = quote_to_marketstore_structarray(
quote_cache,
last_fill=quote.get('broker_ts', None)
)
log.info(a)
# breakpoint()
await ms_client.write(symbol, a)
# async def stream_quotes(
# symbols: list[str],
# timeframe: str = '1Min',
# attr_group: str = 'TICK',
# host: str = 'localhost',
# port: int = 5993,
# loglevel: str = None
# ) -> None:
# '''
# Open a symbol stream from a running instance of marketstore and
# log to console.
# '''
# tbks: dict[str, str] = {
# sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
async def stream_quotes( async def stream_quotes(
symbols: list[str], symbols: list[str],
timeframe: str = '1Min',
attr_group: str = 'TICK',
host: str = 'localhost', host: str = 'localhost',
port: int = 5993, port: int = 5993,
loglevel: str = None diff_cached: bool = True,
loglevel: str = None,
) -> None: ) -> None:
''' '''
Open a symbol stream from a running instance of marketstore and Open a symbol stream from a running instance of marketstore and
log to console. log to console.
''' '''
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
tbks: dict[str, str] = { tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server
resp = await ws.send_message(
msgpack.dumps({'streams': list(tbks.values())})
)
log.info(resp)
async def recv() -> dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8')
# async def stream_quotes( streams = (await recv())['streams']
# symbols: list[str], log.info(f"Subscribed to {streams}")
# host: str = 'localhost',
# port: int = 5993, _cache = {}
# diff_cached: bool = True,
# loglevel: str = None, while True:
# ) -> None: msg = await recv()
# """Open a symbol stream from a running instance of marketstore and
# log to console. # unpack symbol and quote data
# """ # key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
# # XXX: required to propagate ``tractor`` loglevel to piker logging symbol = msg['key'].split('/')[0]
# get_console_log(loglevel or tractor.current_actor().loglevel) data = msg['data']
#
# tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} # calc time stamp(s)
# s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
# async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: ts = s * 10**9 + ns
# # send subs topics to server data['broker_fill_time_ns'] = ts
# resp = await ws.send_message(
# msgpack.dumps({'streams': list(tbks.values())}) quote = {}
# ) for k, v in data.items():
# log.info(resp) if isnan(v):
# continue
# async def recv() -> dict[str, Any]:
# return msgpack.loads((await ws.get_message()), encoding='utf-8') quote[k.lower()] = v
#
# streams = (await recv())['streams'] quote['symbol'] = symbol
# log.info(f"Subscribed to {streams}")
# quotes = {}
# _cache = {}
# if diff_cached:
# while True: last = _cache.setdefault(symbol, {})
# msg = await recv() new = set(quote.items()) - set(last.items())
# if new:
# # unpack symbol and quote data log.info(f"New quote {quote['symbol']}:\n{new}")
# # key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
# symbol = msg['key'].split('/')[0] # only ship diff updates and other required fields
# data = msg['data'] payload = {k: quote[k] for k, v in new}
# payload['symbol'] = symbol
# # calc time stamp(s)
# s, ns = data.pop('Epoch'), data.pop('Nanoseconds') # if there was volume likely the last size of
# ts = s * 10**9 + ns # shares traded is useful info and it's possible
# data['broker_fill_time_ns'] = ts # that the set difference from above will disregard
# # a "size" value since the same # of shares were traded
# quote = {} size = quote.get('size')
# for k, v in data.items(): volume = quote.get('volume')
# if isnan(v): if size and volume:
# continue new_volume_since_last = max(
# volume - last.get('volume', 0), 0)
# quote[k.lower()] = v log.warning(
# f"NEW VOLUME {symbol}:{new_volume_since_last}")
# quote['symbol'] = symbol payload['size'] = size
# payload['last'] = quote.get('last')
# quotes = {}
# # XXX: we append to a list for the options case where the
# if diff_cached: # subscription topic (key) is the same for all
# last = _cache.setdefault(symbol, {}) # expiries even though this is uncessary for the
# new = set(quote.items()) - set(last.items()) # stock case (different topic [i.e. symbol] for each
# if new: # quote).
# log.info(f"New quote {quote['symbol']}:\n{new}") quotes.setdefault(symbol, []).append(payload)
#
# # only ship diff updates and other required fields # update cache
# payload = {k: quote[k] for k, v in new} _cache[symbol].update(quote)
# payload['symbol'] = symbol else:
# quotes = {
# # if there was volume likely the last size of symbol: [{key.lower(): val for key, val in quote.items()}]}
# # shares traded is useful info and it's possible
# # that the set difference from above will disregard if quotes:
# # a "size" value since the same # of shares were traded yield quotes
# size = quote.get('size')
# volume = quote.get('volume')
# if size and volume:
# new_volume_since_last = max(
# volume - last.get('volume', 0), 0)
# log.warning(
# f"NEW VOLUME {symbol}:{new_volume_since_last}")
# payload['size'] = size
# payload['last'] = quote.get('last')
#
# # XXX: we append to a list for the options case where the
# # subscription topic (key) is the same for all
# # expiries even though this is uncessary for the
# # stock case (different topic [i.e. symbol] for each
# # quote).
# quotes.setdefault(symbol, []).append(payload)
#
# # update cache
# _cache[symbol].update(quote)
# else:
# quotes = {
# symbol: [{key.lower(): val for key, val in quote.items()}]}
#
# if quotes:
# yield quotes