Still WIP, switch to using new marketstore client, missing streaming from marketstore

marketstore
Guillermo Rodriguez 2022-01-17 17:47:20 -03:00 committed by Tyler Goodlet
parent b0401b91c1
commit d367d68fc2
2 changed files with 183 additions and 233 deletions

View File

@ -25,11 +25,13 @@ import trio
import tractor
import click
from anyio_marketstore import open_marketstore_client
from .marketstore import (
get_client,
stream_quotes,
# stream_quotes,
ingest_quote_stream,
_url,
# _url,
_tick_tbk_ids,
mk_tbk,
)
@ -54,46 +56,47 @@ def ms_stream(config: dict, names: List[str], url: str):
and print to console.
"""
async def main():
async for quote in stream_quotes(symbols=names):
log.info(f"Received quote:\n{quote}")
# async for quote in stream_quotes(symbols=names):
# log.info(f"Received quote:\n{quote}")
...
trio.run(main)
@cli.command()
@click.option(
'--url',
default=_url,
help='HTTP URL of marketstore instance'
)
@click.argument('names', nargs=-1)
@click.pass_obj
def ms_destroy(config: dict, names: List[str], url: str) -> None:
"""Destroy symbol entries in the local marketstore instance.
"""
async def main():
nonlocal names
async with get_client(url) as client:
if not names:
names = await client.list_symbols()
# default is to wipe db entirely.
answer = input(
"This will entirely wipe you local marketstore db @ "
f"{url} of the following symbols:\n {pformat(names)}"
"\n\nDelete [N/y]?\n")
if answer == 'y':
for sym in names:
# tbk = _tick_tbk.format(sym)
tbk = tuple(sym, *_tick_tbk_ids)
print(f"Destroying {tbk}..")
await client.destroy(mk_tbk(tbk))
else:
print("Nothing deleted.")
tractor.run(main)
# @cli.command()
# @click.option(
# '--url',
# default=_url,
# help='HTTP URL of marketstore instance'
# )
# @click.argument('names', nargs=-1)
# @click.pass_obj
# def ms_destroy(config: dict, names: List[str], url: str) -> None:
# """Destroy symbol entries in the local marketstore instance.
# """
# async def main():
# nonlocal names
# async with get_client(url) as client:
#
# if not names:
# names = await client.list_symbols()
#
# # default is to wipe db entirely.
# answer = input(
# "This will entirely wipe you local marketstore db @ "
# f"{url} of the following symbols:\n {pformat(names)}"
# "\n\nDelete [N/y]?\n")
#
# if answer == 'y':
# for sym in names:
# # tbk = _tick_tbk.format(sym)
# tbk = tuple(sym, *_tick_tbk_ids)
# print(f"Destroying {tbk}..")
# await client.destroy(mk_tbk(tbk))
# else:
# print("Nothing deleted.")
#
# tractor.run(main)
@cli.command()
@ -102,17 +105,19 @@ def ms_destroy(config: dict, names: List[str], url: str) -> None:
is_flag=True,
help='Enable tractor logging')
@click.option(
'--url',
default=_url,
help='HTTP URL of marketstore instance'
'--host',
default='localhost'
)
@click.option(
'--port',
default=5995
)
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ms_shell(config, name, tl, url):
def ms_shell(config, tl, host, port):
"""Start an IPython shell ready to query the local marketstore db.
"""
async def main():
async with get_client(url) as client:
async with open_marketstore_client(host, port) as client:
query = client.query # noqa
# TODO: write magics to query marketstore
from IPython import embed
@ -124,15 +129,9 @@ def ms_shell(config, name, tl, url):
@cli.command()
@click.option('--test-file', '-t', help='Test quote stream file')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option(
'--url',
default=_url,
help='HTTP URL of marketstore instance'
)
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl, url):
def ingest(config, name, test_file, tl):
"""Ingest real-time broker quotes and ticks to a marketstore instance.
"""
# global opts

View File

@ -14,7 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
``marketstore`` integration.
- client management routines
@ -22,7 +22,7 @@
- websocket client for subscribing to write triggers
- todo: tick sequence stream-cloning for testing
- todo: docker container management automation
"""
'''
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Callable, Tuple, Optional
import time
@ -31,9 +31,9 @@ from math import isnan
import msgpack
import numpy as np
import pandas as pd
import pymarketstore as pymkts
import tractor
from trio_websocket import open_websocket_url
from anyio_marketstore import open_marketstore_client, MarketstoreClient
from ..log import get_logger, get_console_log
from ..data import open_feed
@ -43,7 +43,7 @@ log = get_logger(__name__)
_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
_url: str = 'http://localhost:5993/rpc'
_quote_dt = [
# these two are required for as a "primary key"
('Epoch', 'i8'),
@ -51,34 +51,27 @@ _quote_dt = [
('IsTrade', 'i1'),
('IsBid', 'i1'),
('Price', 'f8'),
('Size', 'f8')
('Price', 'f4'),
('Size', 'f4')
]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
class MarketStoreError(Exception):
"Generic marketstore client error"
def err_on_resp(response: dict) -> None:
"""Raise any errors found in responses from client request.
def mk_tbk(keys: Tuple[str, str, str]) -> str:
"""Generate a marketstore table key from a tuple.
Converts,
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
"""
responses = response['responses']
if responses is not None:
for r in responses:
err = r['error']
if err:
raise MarketStoreError(err)
return '{}/' + '/'.join(keys)
def quote_to_marketstore_structarray(
quote: Dict[str, Any],
last_fill: Optional[float],
last_fill: Optional[float]
) -> np.array:
"""Return marketstore writeable structarray from quote ``dict``.
"""
'''
Return marketstore writeable structarray from quote ``dict``.
'''
if last_fill:
# new fill bby
now = timestamp(last_fill, unit='s')
@ -112,82 +105,21 @@ def quote_to_marketstore_structarray(
def timestamp(date, **kwargs) -> int:
"""Return marketstore compatible 'Epoch' integer in nanoseconds
'''
Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str.
"""
'''
return int(pd.Timestamp(date, **kwargs).value)
def mk_tbk(keys: Tuple[str, str, str]) -> str:
"""Generate a marketstore table key from a tuple.
Converts,
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
"""
return '{}/' + '/'.join(keys)
class Client:
"""Async wrapper around the alpaca ``pymarketstore`` sync client.
This will server as the shell for building out a proper async client
that isn't horribly documented and un-tested..
"""
def __init__(self, url: str):
self._client = pymkts.Client(url)
async def _invoke(
self,
meth: Callable,
*args,
**kwargs,
) -> Any:
return err_on_resp(meth(*args, **kwargs))
async def destroy(
self,
tbk: Tuple[str, str, str],
) -> None:
return await self._invoke(self._client.destroy, mk_tbk(tbk))
async def list_symbols(
self,
tbk: str,
) -> List[str]:
return await self._invoke(self._client.list_symbols, mk_tbk(tbk))
async def write(
self,
symbol: str,
array: np.ndarray,
) -> None:
start = time.time()
await self._invoke(
self._client.write,
array,
_tick_tbk.format(symbol),
isvariablelength=True
)
log.debug(f"{symbol} write time (s): {time.time() - start}")
def query(
self,
symbol,
tbk: Tuple[str, str] = _tick_tbk_ids,
) -> pd.DataFrame:
# XXX: causes crash
# client.query(pymkts.Params(symbol, '*', 'OHCLV'
result = self._client.query(
pymkts.Params(symbol, *tbk),
)
return result.first().df()
@asynccontextmanager
async def get_client(
url: str = _url,
) -> Client:
yield Client(url)
host: str = 'localhost',
port: int = 5995
) -> MarketstoreClient:
async with open_marketstore_client(host, port) as client:
yield client
async def ingest_quote_stream(
@ -196,8 +128,9 @@ async def ingest_quote_stream(
tries: int = 1,
actorloglevel: str = None,
) -> None:
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
"""
'''
Ingest a broker quote stream into marketstore.
'''
async with (
open_feed(brokername, symbols, loglevel=actorloglevel) as feed,
get_client() as ms_client
@ -212,107 +145,125 @@ async def ingest_quote_stream(
# okkk..
continue
a = quote_to_marketstore_structarray({
array = quote_to_marketstore_structarray({
'IsTrade': 1 if ticktype == 'trade' else 0,
'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
'Price': tick.get('price'),
'Size': tick.get('size')
}, last_fill=quote.get('broker_ts', None))
log.info(a)
await ms_client.write(symbol, a)
await ms_client.write(
array, _tick_tbk)
async def stream_quotes(
symbols: List[str],
timeframe: str = '1Min',
attr_group: str = 'TICK',
host: str = 'localhost',
port: int = 5993,
diff_cached: bool = True,
loglevel: str = None,
loglevel: str = None
) -> None:
"""Open a symbol stream from a running instance of marketstore and
'''
Open a symbol stream from a running instance of marketstore and
log to console.
"""
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
'''
tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
tbks: Dict[str, str] = {
sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server
resp = await ws.send_message(
msgpack.dumps({'streams': list(tbks.values())})
)
log.info(resp)
async def recv() -> Dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8')
streams = (await recv())['streams']
log.info(f"Subscribed to {streams}")
_cache = {}
while True:
msg = await recv()
# unpack symbol and quote data
# key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
symbol = msg['key'].split('/')[0]
data = msg['data']
# calc time stamp(s)
s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
ts = s * 10**9 + ns
data['broker_fill_time_ns'] = ts
quote = {}
for k, v in data.items():
if isnan(v):
continue
quote[k.lower()] = v
quote['symbol'] = symbol
quotes = {}
if diff_cached:
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(f"New quote {quote['symbol']}:\n{new}")
# only ship diff updates and other required fields
payload = {k: quote[k] for k, v in new}
payload['symbol'] = symbol
# if there was volume likely the last size of
# shares traded is useful info and it's possible
# that the set difference from above will disregard
# a "size" value since the same # of shares were traded
size = quote.get('size')
volume = quote.get('volume')
if size and volume:
new_volume_since_last = max(
volume - last.get('volume', 0), 0)
log.warning(
f"NEW VOLUME {symbol}:{new_volume_since_last}")
payload['size'] = size
payload['last'] = quote.get('last')
# XXX: we append to a list for the options case where the
# subscription topic (key) is the same for all
# expiries even though this is uncessary for the
# stock case (different topic [i.e. symbol] for each
# quote).
quotes.setdefault(symbol, []).append(payload)
# update cache
_cache[symbol].update(quote)
else:
quotes = {
symbol: [{key.lower(): val for key, val in quote.items()}]}
if quotes:
yield quotes
# async def stream_quotes(
# symbols: List[str],
# host: str = 'localhost',
# port: int = 5993,
# diff_cached: bool = True,
# loglevel: str = None,
# ) -> None:
# """Open a symbol stream from a running instance of marketstore and
# log to console.
# """
# # XXX: required to propagate ``tractor`` loglevel to piker logging
# get_console_log(loglevel or tractor.current_actor().loglevel)
#
# tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
#
# async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# # send subs topics to server
# resp = await ws.send_message(
# msgpack.dumps({'streams': list(tbks.values())})
# )
# log.info(resp)
#
# async def recv() -> Dict[str, Any]:
# return msgpack.loads((await ws.get_message()), encoding='utf-8')
#
# streams = (await recv())['streams']
# log.info(f"Subscribed to {streams}")
#
# _cache = {}
#
# while True:
# msg = await recv()
#
# # unpack symbol and quote data
# # key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
# symbol = msg['key'].split('/')[0]
# data = msg['data']
#
# # calc time stamp(s)
# s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
# ts = s * 10**9 + ns
# data['broker_fill_time_ns'] = ts
#
# quote = {}
# for k, v in data.items():
# if isnan(v):
# continue
#
# quote[k.lower()] = v
#
# quote['symbol'] = symbol
#
# quotes = {}
#
# if diff_cached:
# last = _cache.setdefault(symbol, {})
# new = set(quote.items()) - set(last.items())
# if new:
# log.info(f"New quote {quote['symbol']}:\n{new}")
#
# # only ship diff updates and other required fields
# payload = {k: quote[k] for k, v in new}
# payload['symbol'] = symbol
#
# # if there was volume likely the last size of
# # shares traded is useful info and it's possible
# # that the set difference from above will disregard
# # a "size" value since the same # of shares were traded
# size = quote.get('size')
# volume = quote.get('volume')
# if size and volume:
# new_volume_since_last = max(
# volume - last.get('volume', 0), 0)
# log.warning(
# f"NEW VOLUME {symbol}:{new_volume_since_last}")
# payload['size'] = size
# payload['last'] = quote.get('last')
#
# # XXX: we append to a list for the options case where the
# # subscription topic (key) is the same for all
# # expiries even though this is uncessary for the
# # stock case (different topic [i.e. symbol] for each
# # quote).
# quotes.setdefault(symbol, []).append(payload)
#
# # update cache
# _cache[symbol].update(quote)
# else:
# quotes = {
# symbol: [{key.lower(): val for key, val in quote.items()}]}
#
# if quotes:
# yield quotes