Begin to wrap marketstore as a data feed

Wrap the sync client in an async interface in anticipation of an actual
async client. This starts support for the `open_feed()`/`stream_quotes()`
api, though the tick normalization isn't correct yet.
its_happening
Tyler Goodlet 2020-08-01 20:08:05 -04:00
parent fa899c3979
commit 06f03c690c
2 changed files with 187 additions and 163 deletions
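The wrapping pattern in the marketstore module below is simply to hold the sync
`pymarketstore` client inside a thin `Client` type whose `async` methods invoke
the blocking calls, so call sites can already be written against the eventual
async api. A rough usage sketch, assuming it runs inside that module (the quote
fields shown are illustrative, not the full `_quote_dt` schema):

import trio

async def write_one_tick() -> None:
    # hand-rolled quote dict; a real feed fills the full ``_quote_dt`` layout
    quote = {
        'symbol': 'SPY',
        'last': 300.0,
        'bid': 299.99,
        'ask': 300.01,
        'size': 100,
        'volume': 1000,
        'tick': 1,  # already remapped to an int via ``_tick_map``
    }
    array = quote_to_marketstore_structarray(quote, last_fill='')
    async with get_client() as client:
        # lands in the ``SPY/1Sec/TICK`` bucket per ``_tick_tbk``
        await client.write('SPY', array)

trio.run(write_one_tick)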

View File

@@ -1,7 +1,7 @@
"""
Data feed apis and infra.
We ship some tsdb integrations for retrieving
We provide tsdb integrations for retrieving
and storing data from your brokers as well as
sharing your feeds with other fellow pikers.
"""
@@ -91,6 +91,8 @@ async def open_feed(
symbols: Sequence[str],
loglevel: str = 'info',
) -> AsyncIterator[Dict[str, Any]]:
"""Open a "data feed" which provides streamed real-time quotes.
"""
try:
mod = get_brokermod(name)
except ImportError:
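
For reference, `open_feed()` (living in `piker.data`, per the import in the
marketstore module) is consumed as an async context manager yielding the first
quotes plus the quote stream, exactly as the ingest routine in the second file
does. A minimal consumer sketch with placeholder broker and symbol names, to be
run under `trio`/`tractor` like the cli commands:

from piker.data import open_feed

async def consume_feed() -> None:
    async with open_feed(
        'questrade',   # placeholder broker backend name
        ['SPY'],
        loglevel='info',
    ) as (first_quotes, qstream):
        print(first_quotes)
        async for quotes in qstream:
            # {symbol: quote} updates as shipped from the brokerd feed
            print(quotes)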

View File

@@ -1,41 +1,38 @@
"""
``marketstore`` integration.
- TICK data ingest routines
- client management routines
- tick data ingest routines
- websocket client for subscribing to write triggers
- docker container management automation
- todo: tick sequence stream-cloning for testing
- todo: docker container management automation
"""
from pprint import pformat
from typing import Dict, Any, List
from functools import partial
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Callable, Tuple
import time
from math import isnan
import msgpack
import numpy as np
import pandas as pd
import pymarketstore as pymkts
import click
import trio
import tractor
from trio_websocket import open_websocket_url
from . import maybe_spawn_brokerd
from ..cli import cli
from .. import watchlists as wl
from ..brokers.data import DataFeed
from ..log import get_logger
from ..log import get_logger, get_console_log
from ..data import open_feed
log = get_logger(__name__)
_tick_tbk_ids = ('1Sec', 'TICK')
_tick_tbk = '{}/' + '/'.join(_tick_tbk_ids)
_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
_url: str = 'http://localhost:5993/rpc'
_quote_dt = [
# these two are required as a "primary key"
('Epoch', 'i8'),
('Nanoseconds', 'i4'),
('Tick', 'i4'),
('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask)
# ('fill_time', 'f4'),
('Last', 'f4'),
('Bid', 'f4'),
@@ -44,11 +41,10 @@ _quote_dt = [
('Ask', 'f4'),
('Size', 'i8'),
('Volume', 'i8'),
# ('Broker_time_ns', 'i64'),
# ('VWAP', 'f4')
]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
_tick_map = {
'Up': 1,
'Equal': 0,
@@ -84,7 +80,7 @@ def quote_to_marketstore_structarray(
else:
# this should get inserted upstream by the broker-client to
# subtract from IPC latency
now = timestamp(pd.Timestamp.now())
now = time.time_ns()
secs, ns = now // 10**9, now % 10**9  # floor-div keeps 'Epoch' integral
@@ -109,172 +105,152 @@ def quote_to_marketstore_structarray(
def timestamp(datestr: str) -> int:
"""Return marketstore compatible 'Epoch' integer in nanoseconds.
"""Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str.
"""
return int(pd.Timestamp(datestr).value)
@cli.command()
@click.option('--test-file', '-t', help='Test quote stream file')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option(
'--url',
default='http://localhost:5993/rpc',
help='HTTP URL of marketstore instance'
)
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl, url):
"""Ingest real-time broker quotes and ticks to a marketstore instance.
def mk_tbk(keys: Tuple[str, str, str]) -> str:
"""Generate a marketstore table key from a tuple.
Converts,
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``
"""
# global opts
brokermod = config['brokermod']
loglevel = config['loglevel']
log = config['log']
return '/'.join(keys)
watchlist_from_file = wl.ensure_watchlists(config['wl_path'])
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
symbols = watchlists[name]
async def main(tries):
async with maybe_spawn_brokerd(
brokername=brokermod.name,
tries=tries,
loglevel=loglevel
) as portal:
# connect to broker data feed
feed = DataFeed(portal, brokermod)
qstream, quotes = await feed.open_stream(
symbols,
'stock',
rate=3,
test=test_file,
)
class Client:
"""Async wrapper around the alpaca ``pymarketstore`` sync client.
first_quotes, _ = feed.format_quotes(quotes)
This will serve as the shell for building out a proper async client
that isn't horribly documented and un-tested..
"""
def __init__(self, url: str):
self._client = pymkts.Client(url)
if first_quotes[0].get('last') is None:
log.error("Broker API is down temporarily")
return
async def _invoke(
self,
meth: Callable,
*args,
**kwargs,
) -> Any:
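# NOTE: the wrapped method is the blocking sync client call run
# in-place; a stopgap until an actual async client exists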
return err_on_resp(meth(*args, **kwargs))
quote_cache = {quote['symbol']: quote for quote in first_quotes}
async def destroy(
self,
tbk: Tuple[str, str, str],
) -> None:
return await self._invoke(self._client.destroy, mk_tbk(tbk))
client = pymkts.Client(endpoint=url)
async def list_symbols(
self,
tbk: str,
) -> List[str]:
return await self._invoke(self._client.list_symbols, mk_tbk(tbk))
async def write(
self,
symbol: str,
array: np.ndarray,
) -> None:
start = time.time()
await self._invoke(
self._client.write,
array,
_tick_tbk.format(symbol),
isvariablelength=True
)
log.debug(f"{symbol} write time (s): {time.time() - start}")
def query(
self,
symbol,
tbk: Tuple[str, str] = _tick_tbk_ids,
) -> pd.DataFrame:
# XXX: causes crash
# client.query(pymkts.Params(symbol, '*', 'OHCLV'
result = self._client.query(
pymkts.Params(symbol, *tbk),
)
return result.first().df()
@asynccontextmanager
async def get_client(
url: str = _url,
) -> Client:
yield Client(url)
async def ingest_quote_stream(
symbols: List[str],
brokername: str,
tries: int = 1,
loglevel: str = None,
) -> None:
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
"""
async with open_feed(
brokername,
symbols,
loglevel=loglevel,
) as (first_quotes, qstream):
quote_cache = first_quotes.copy()
async with get_client() as ms_client:
# start ingest to marketstore
async for quotes in qstream:
log.info(quotes)
for symbol, quote in quotes.items():
fmt_quote, _ = brokermod.format_stock_quote(
quote,
feed._symbol_data_cache
)
# remap tick strs to ints
fmt_quote['tick'] = _tick_map[
fmt_quote.get('tick', 'Equal')
]
quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
# check for volume update (i.e. did trades happen
# since last quote)
new_vol = fmt_quote.get('volume', None)
new_vol = quote.get('volume', None)
if new_vol is None:
log.debug(f"No fills for {symbol}")
if new_vol == quote_cache.get('volume'):
# should never happen due to field diffing
# on sender side
log.error(
f"{symbol}: got same volume as last quote?")
quote_cache.update(fmt_quote)
quote_cache.update(quote)
a = quote_to_marketstore_structarray(
fmt_quote,
quote,
# TODO: check this closer to the broker query api
last_fill=fmt_quote.get('last_fill', '')
last_fill=quote.get('fill_time', '')
)
start = time.time()
err_on_resp(client.write(
a, _tick_tbk.format(symbol), isvariablelength=True)
)
log.trace(
f"{symbol} write time (s): {time.time() - start}")
tractor.run(
partial(main, tries=1),
name='ingest_marketstore',
loglevel=loglevel if tl else None,
# start_method='forkserver',
)
await ms_client.write(symbol, a)
@cli.command()
@click.option(
'--tl',
is_flag=True,
help='Enable tractor logging')
@click.option(
'--url',
default='http://localhost:5993/rpc',
help='HTTP URL of marketstore instance'
)
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ms_shell(config, name, tl, url):
"""Start an IPython shell ready to query the local marketstore db.
"""
client = pymkts.Client(url)
def query(name, tbk=_tick_tbk_ids):
return client.query(
pymkts.Params(name, *tbk)).first().df()
# causes crash
# client.query(pymkts.Params(symbol, '*', 'OHCLV'
from IPython import embed
embed()
@cli.command()
@click.option(
'--url',
default='http://localhost:5993/rpc',
help='HTTP URL of marketstore instance'
)
@click.argument('names', nargs=-1)
@click.pass_obj
def marketstore_destroy(config: dict, names: List[str], url: str) -> None:
"""Destroy symbol entries in the local marketstore instance.
"""
client = pymkts.Client(url)
if not names:
names = client.list_symbols()
# default is to wipe db entirely.
answer = input(
"This will entirely wipe you local marketstore db @ "
f"{url} of the following symbols:\n {pformat(names)}"
"\n\nDelete [N/y]?\n")
if answer == 'y':
for sym in names:
tbk = _tick_tbk.format(sym)
print(f"Destroying {tbk}..")
err_on_resp(client.destroy(_tick_tbk.format(sym)))
else:
print("Nothing deleted.")
async def open_quote_stream(
tbks: List[str],
async def stream_quotes(
symbols: List[str],
host: str = 'localhost',
port: int = 5993
port: int = 5993,
diff_cached: bool = True,
loglevel: str = None,
) -> None:
"""Open a symbol stream from a running instance of marketstore and
log to console.
"""
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server
await ws.send_message(msgpack.dumps({'streams': tbks}))
resp = await ws.send_message(
msgpack.dumps({'streams': list(tbks.values())})
)
log.info(resp)
async def recv() -> Dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8')
@@ -282,21 +258,67 @@ async def open_quote_stream(
streams = (await recv())['streams']
log.info(f"Subscribed to {streams}")
_cache = {}
while True:
msg = await recv()
log.info(f"Received quote:\n{msg}")
# unpack symbol and quote data
# key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
symbol = msg['key'].split('/')[0]
data = msg['data']
@cli.command()
@click.option(
'--url',
default='ws://localhost:5993/ws',
help='HTTP URL of marketstore instance'
)
@click.argument('names', nargs=-1)
@click.pass_obj
def marketstore_stream(config: dict, names: List[str], url: str):
"""Connect to a marketstore time bucket stream for (a set of) symbols(s)
and print to console.
"""
trio.run(open_quote_stream, names)
# calc time stamp(s)
s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
ts = s * 10**9 + ns
data['broker_fill_time_ns'] = ts
quote = {}
for k, v in data.items():
if isnan(v):
continue
quote[k.lower()] = v
quote['symbol'] = symbol
quotes = {}
if diff_cached:
last = _cache.setdefault(symbol, {})
new = set(quote.items()) - set(last.items())
if new:
log.info(f"New quote {quote['symbol']}:\n{new}")
# only ship diff updates and other required fields
payload = {k: quote[k] for k, v in new}
payload['symbol'] = symbol
# if there was volume, the last size of shares traded is
# likely useful info, and it's possible the set difference
# above dropped the "size" value since the same # of shares
# were traded as in the prior fill
size = quote.get('size')
volume = quote.get('volume')
if size and volume:
new_volume_since_last = max(
volume - last.get('volume', 0), 0)
log.warning(
f"NEW VOLUME {symbol}:{new_volume_since_last}")
payload['size'] = size
payload['last'] = quote.get('last')
# XXX: we append to a list for the options case where the
# subscription topic (key) is the same for all
# expiries even though this is unnecessary for the
# stock case (different topic [i.e. symbol] for each
# quote).
quotes.setdefault(symbol, []).append(payload)
# update cache
_cache[symbol].update(quote)
else:
quotes = {symbol: [{key.lower(): val for key, val in quote.items()}]}
if quotes:
yield quotes
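
For eyeballing the (still imperfect) tick normalization, the stream can be
tailed directly; a rough sketch, assuming it runs inside this module against a
local marketstore instance with streaming enabled (``loglevel`` is passed
explicitly so no ``tractor`` actor context is needed):

import trio

async def tail_quotes(symbols=('SPY',)) -> None:
    # ``stream_quotes()`` is an async generator yielding {symbol: [payload, ..]}
    # dicts, diffed per-symbol against the last quote when ``diff_cached=True``
    async for quotes in stream_quotes(list(symbols), loglevel='info'):
        for sym, payloads in quotes.items():
            for payload in payloads:
                print(sym, payload)

trio.run(tail_quotes)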