Add back in OHLCV dtype template and client side ws streamer
parent
8732b2bd5e
commit
2427c96336
|
@ -26,12 +26,12 @@
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
import time
|
import time
|
||||||
# from math import isnan
|
from math import isnan
|
||||||
|
|
||||||
# import msgpack
|
import msgpack
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
# import tractor
|
import tractor
|
||||||
from trio_websocket import open_websocket_url
|
from trio_websocket import open_websocket_url
|
||||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
|
from anyio_marketstore import open_marketstore_client, MarketstoreClient
|
||||||
|
|
||||||
|
@ -44,15 +44,29 @@ log = get_logger(__name__)
|
||||||
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
|
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
|
||||||
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
|
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
|
||||||
|
|
||||||
|
_tick_dt = [
|
||||||
|
# these two are required for as a "primary key"
|
||||||
|
('Epoch', 'i8'),
|
||||||
|
('Nanoseconds', 'i4'),
|
||||||
|
('IsTrade', 'i1'),
|
||||||
|
('IsBid', 'i1'),
|
||||||
|
('Price', 'f4'),
|
||||||
|
('Size', 'f4')
|
||||||
|
]
|
||||||
|
|
||||||
_quote_dt = [
|
_quote_dt = [
|
||||||
# these two are required for as a "primary key"
|
# these two are required for as a "primary key"
|
||||||
('Epoch', 'i8'),
|
('Epoch', 'i8'),
|
||||||
('Nanoseconds', 'i4'),
|
('Nanoseconds', 'i4'),
|
||||||
|
|
||||||
('IsTrade', 'i1'),
|
('Tick', 'i4'), # do we need this?
|
||||||
('IsBid', 'i1'),
|
('Last', 'f4'),
|
||||||
('Price', 'f4'),
|
('Bid', 'f4'),
|
||||||
('Size', 'f4')
|
('Bsize', 'f4'),
|
||||||
|
('Asize', 'f4'),
|
||||||
|
('Ask', 'f4'),
|
||||||
|
('Size', 'i8'),
|
||||||
|
('Volume', 'f4'),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -69,11 +83,12 @@ def mk_tbk(keys: tuple[str, str, str]) -> str:
|
||||||
def quote_to_marketstore_structarray(
|
def quote_to_marketstore_structarray(
|
||||||
quote: dict[str, Any],
|
quote: dict[str, Any],
|
||||||
last_fill: Optional[float]
|
last_fill: Optional[float]
|
||||||
|
|
||||||
) -> np.array:
|
) -> np.array:
|
||||||
'''
|
'''
|
||||||
Return marketstore writeable structarray from quote ``dict``.
|
Return marketstore writeable structarray from quote ``dict``.
|
||||||
'''
|
|
||||||
|
|
||||||
|
'''
|
||||||
if last_fill:
|
if last_fill:
|
||||||
# new fill bby
|
# new fill bby
|
||||||
now = timestamp(last_fill, unit='s')
|
now = timestamp(last_fill, unit='s')
|
||||||
|
@ -100,7 +115,8 @@ def quote_to_marketstore_structarray(
|
||||||
# for ``np.int`` we use 0 as a null value
|
# for ``np.int`` we use 0 as a null value
|
||||||
none = 0
|
none = 0
|
||||||
|
|
||||||
val = quote.get(name, none)
|
# casefold? see https://github.com/alpacahq/marketstore/issues/324
|
||||||
|
val = quote.get(name.casefold(), none)
|
||||||
array_input.append(val)
|
array_input.append(val)
|
||||||
|
|
||||||
return np.array([tuple(array_input)], dtype=_quote_dt)
|
return np.array([tuple(array_input)], dtype=_quote_dt)
|
||||||
|
@ -119,6 +135,7 @@ def timestamp(date, **kwargs) -> int:
|
||||||
async def get_client(
|
async def get_client(
|
||||||
host: str = 'localhost',
|
host: str = 'localhost',
|
||||||
port: int = 5995
|
port: int = 5995
|
||||||
|
|
||||||
) -> MarketstoreClient:
|
) -> MarketstoreClient:
|
||||||
async with open_marketstore_client(host, port) as client:
|
async with open_marketstore_client(host, port) as client:
|
||||||
yield client
|
yield client
|
||||||
|
@ -145,129 +162,184 @@ async def ingest_quote_stream(
|
||||||
for tick in quote.get('ticks', ()):
|
for tick in quote.get('ticks', ()):
|
||||||
ticktype = tick.get('type', 'n/a')
|
ticktype = tick.get('type', 'n/a')
|
||||||
|
|
||||||
if ticktype == 'n/a':
|
# _quote_dt = [
|
||||||
# okkk..
|
# # these two are required for as a "primary key"
|
||||||
continue
|
# ('Epoch', 'i8'),
|
||||||
|
# ('Nanoseconds', 'i4'),
|
||||||
|
# ('Tick', 'i4'),
|
||||||
|
#
|
||||||
|
# ('Last', 'f4'),
|
||||||
|
# ('Bid', 'f4'),
|
||||||
|
# ('Bsize', 'f4'),
|
||||||
|
# ('Asize', 'f4'),
|
||||||
|
# ('Ask', 'f4'),
|
||||||
|
# ('Size', 'i8'),
|
||||||
|
# ('Volume', 'f4'),
|
||||||
|
# ]
|
||||||
|
|
||||||
array = quote_to_marketstore_structarray({
|
# techtonic tick write
|
||||||
'IsTrade': 1 if ticktype == 'trade' else 0,
|
array = quote_to_marketstore_structarray({
|
||||||
'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
|
'IsTrade': 1 if ticktype == 'trade' else 0,
|
||||||
'Price': tick.get('price'),
|
'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
|
||||||
'Size': tick.get('size')
|
'Price': tick.get('price'),
|
||||||
}, last_fill=quote.get('broker_ts', None))
|
'Size': tick.get('size')
|
||||||
|
}, last_fill=quote.get('broker_ts', None))
|
||||||
|
|
||||||
await ms_client.write(array, _tick_tbk)
|
await ms_client.write(array, _tick_tbk)
|
||||||
|
|
||||||
|
quote_cache = {
|
||||||
|
'size': 0,
|
||||||
|
'tick': 0
|
||||||
|
}
|
||||||
|
# start ingest to marketstore
|
||||||
|
async for quotes in feed.stream:
|
||||||
|
log.info(quotes)
|
||||||
|
for symbol, quote in quotes.items():
|
||||||
|
|
||||||
|
for tick in quote.get('ticks', ()):
|
||||||
|
ticktype = tick.get('type')
|
||||||
|
price = tick.get('price')
|
||||||
|
size = tick.get('size')
|
||||||
|
|
||||||
|
if ticktype == 'n/a' or price == -1:
|
||||||
|
# okkk..
|
||||||
|
continue
|
||||||
|
|
||||||
|
# clearing price event
|
||||||
|
if ticktype == 'trade':
|
||||||
|
quote_cache['volume'] = quote['volume']
|
||||||
|
quote_cache['last'] = price
|
||||||
|
# quote_cache['broker_ts'] = quote['broker_ts']
|
||||||
|
|
||||||
|
# l1 book events
|
||||||
|
elif ticktype in ('ask', 'asize'):
|
||||||
|
quote_cache['ask'] = price
|
||||||
|
quote_cache['asize'] = size
|
||||||
|
|
||||||
|
elif ticktype in ('bid', 'bsize'):
|
||||||
|
quote_cache['bid'] = price
|
||||||
|
quote_cache['bsize'] = size
|
||||||
|
|
||||||
|
a = quote_to_marketstore_structarray(
|
||||||
|
quote_cache,
|
||||||
|
last_fill=quote.get('broker_ts', None)
|
||||||
|
)
|
||||||
|
log.info(a)
|
||||||
|
# breakpoint()
|
||||||
|
await ms_client.write(symbol, a)
|
||||||
|
|
||||||
|
|
||||||
|
# async def stream_quotes(
|
||||||
|
# symbols: list[str],
|
||||||
|
# timeframe: str = '1Min',
|
||||||
|
# attr_group: str = 'TICK',
|
||||||
|
# host: str = 'localhost',
|
||||||
|
# port: int = 5993,
|
||||||
|
# loglevel: str = None
|
||||||
|
|
||||||
|
# ) -> None:
|
||||||
|
# '''
|
||||||
|
# Open a symbol stream from a running instance of marketstore and
|
||||||
|
# log to console.
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# tbks: dict[str, str] = {
|
||||||
|
# sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
timeframe: str = '1Min',
|
|
||||||
attr_group: str = 'TICK',
|
|
||||||
host: str = 'localhost',
|
host: str = 'localhost',
|
||||||
port: int = 5993,
|
port: int = 5993,
|
||||||
loglevel: str = None
|
diff_cached: bool = True,
|
||||||
|
loglevel: str = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Open a symbol stream from a running instance of marketstore and
|
Open a symbol stream from a running instance of marketstore and
|
||||||
log to console.
|
log to console.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||||
|
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||||
|
|
||||||
tbks: dict[str, str] = {
|
tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
|
||||||
sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
|
|
||||||
|
|
||||||
|
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
|
||||||
|
# send subs topics to server
|
||||||
|
resp = await ws.send_message(
|
||||||
|
msgpack.dumps({'streams': list(tbks.values())})
|
||||||
|
)
|
||||||
|
log.info(resp)
|
||||||
|
|
||||||
|
async def recv() -> dict[str, Any]:
|
||||||
|
return msgpack.loads((await ws.get_message()), encoding='utf-8')
|
||||||
|
|
||||||
# async def stream_quotes(
|
streams = (await recv())['streams']
|
||||||
# symbols: list[str],
|
log.info(f"Subscribed to {streams}")
|
||||||
# host: str = 'localhost',
|
|
||||||
# port: int = 5993,
|
_cache = {}
|
||||||
# diff_cached: bool = True,
|
|
||||||
# loglevel: str = None,
|
while True:
|
||||||
# ) -> None:
|
msg = await recv()
|
||||||
# """Open a symbol stream from a running instance of marketstore and
|
|
||||||
# log to console.
|
# unpack symbol and quote data
|
||||||
# """
|
# key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
|
||||||
# # XXX: required to propagate ``tractor`` loglevel to piker logging
|
symbol = msg['key'].split('/')[0]
|
||||||
# get_console_log(loglevel or tractor.current_actor().loglevel)
|
data = msg['data']
|
||||||
#
|
|
||||||
# tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
|
# calc time stamp(s)
|
||||||
#
|
s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
|
||||||
# async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
|
ts = s * 10**9 + ns
|
||||||
# # send subs topics to server
|
data['broker_fill_time_ns'] = ts
|
||||||
# resp = await ws.send_message(
|
|
||||||
# msgpack.dumps({'streams': list(tbks.values())})
|
quote = {}
|
||||||
# )
|
for k, v in data.items():
|
||||||
# log.info(resp)
|
if isnan(v):
|
||||||
#
|
continue
|
||||||
# async def recv() -> dict[str, Any]:
|
|
||||||
# return msgpack.loads((await ws.get_message()), encoding='utf-8')
|
quote[k.lower()] = v
|
||||||
#
|
|
||||||
# streams = (await recv())['streams']
|
quote['symbol'] = symbol
|
||||||
# log.info(f"Subscribed to {streams}")
|
|
||||||
#
|
quotes = {}
|
||||||
# _cache = {}
|
|
||||||
#
|
if diff_cached:
|
||||||
# while True:
|
last = _cache.setdefault(symbol, {})
|
||||||
# msg = await recv()
|
new = set(quote.items()) - set(last.items())
|
||||||
#
|
if new:
|
||||||
# # unpack symbol and quote data
|
log.info(f"New quote {quote['symbol']}:\n{new}")
|
||||||
# # key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
|
|
||||||
# symbol = msg['key'].split('/')[0]
|
# only ship diff updates and other required fields
|
||||||
# data = msg['data']
|
payload = {k: quote[k] for k, v in new}
|
||||||
#
|
payload['symbol'] = symbol
|
||||||
# # calc time stamp(s)
|
|
||||||
# s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
|
# if there was volume likely the last size of
|
||||||
# ts = s * 10**9 + ns
|
# shares traded is useful info and it's possible
|
||||||
# data['broker_fill_time_ns'] = ts
|
# that the set difference from above will disregard
|
||||||
#
|
# a "size" value since the same # of shares were traded
|
||||||
# quote = {}
|
size = quote.get('size')
|
||||||
# for k, v in data.items():
|
volume = quote.get('volume')
|
||||||
# if isnan(v):
|
if size and volume:
|
||||||
# continue
|
new_volume_since_last = max(
|
||||||
#
|
volume - last.get('volume', 0), 0)
|
||||||
# quote[k.lower()] = v
|
log.warning(
|
||||||
#
|
f"NEW VOLUME {symbol}:{new_volume_since_last}")
|
||||||
# quote['symbol'] = symbol
|
payload['size'] = size
|
||||||
#
|
payload['last'] = quote.get('last')
|
||||||
# quotes = {}
|
|
||||||
#
|
# XXX: we append to a list for the options case where the
|
||||||
# if diff_cached:
|
# subscription topic (key) is the same for all
|
||||||
# last = _cache.setdefault(symbol, {})
|
# expiries even though this is uncessary for the
|
||||||
# new = set(quote.items()) - set(last.items())
|
# stock case (different topic [i.e. symbol] for each
|
||||||
# if new:
|
# quote).
|
||||||
# log.info(f"New quote {quote['symbol']}:\n{new}")
|
quotes.setdefault(symbol, []).append(payload)
|
||||||
#
|
|
||||||
# # only ship diff updates and other required fields
|
# update cache
|
||||||
# payload = {k: quote[k] for k, v in new}
|
_cache[symbol].update(quote)
|
||||||
# payload['symbol'] = symbol
|
else:
|
||||||
#
|
quotes = {
|
||||||
# # if there was volume likely the last size of
|
symbol: [{key.lower(): val for key, val in quote.items()}]}
|
||||||
# # shares traded is useful info and it's possible
|
|
||||||
# # that the set difference from above will disregard
|
if quotes:
|
||||||
# # a "size" value since the same # of shares were traded
|
yield quotes
|
||||||
# size = quote.get('size')
|
|
||||||
# volume = quote.get('volume')
|
|
||||||
# if size and volume:
|
|
||||||
# new_volume_since_last = max(
|
|
||||||
# volume - last.get('volume', 0), 0)
|
|
||||||
# log.warning(
|
|
||||||
# f"NEW VOLUME {symbol}:{new_volume_since_last}")
|
|
||||||
# payload['size'] = size
|
|
||||||
# payload['last'] = quote.get('last')
|
|
||||||
#
|
|
||||||
# # XXX: we append to a list for the options case where the
|
|
||||||
# # subscription topic (key) is the same for all
|
|
||||||
# # expiries even though this is uncessary for the
|
|
||||||
# # stock case (different topic [i.e. symbol] for each
|
|
||||||
# # quote).
|
|
||||||
# quotes.setdefault(symbol, []).append(payload)
|
|
||||||
#
|
|
||||||
# # update cache
|
|
||||||
# _cache[symbol].update(quote)
|
|
||||||
# else:
|
|
||||||
# quotes = {
|
|
||||||
# symbol: [{key.lower(): val for key, val in quote.items()}]}
|
|
||||||
#
|
|
||||||
# if quotes:
|
|
||||||
# yield quotes
|
|
||||||
|
|
Loading…
Reference in New Issue