Add tbk tick streaming with trio-websocket

marketstore_integration
Tyler Goodlet 2020-06-01 14:13:30 -04:00
parent acd32341e2
commit 436e4d2df4
1 changed files with 87 additions and 44 deletions

View File

@ -1,15 +1,23 @@
"""
``marketstore`` integration.
- TICK data ingest routines
- websocket client for subscribing to write triggers
- docker container management automation
"""
from pprint import pformat
from typing import Dict, Any
from typing import Dict, Any, List
from functools import partial
import time
import msgpack
import numpy as np
import pandas as pd
import pymarketstore as pymkts
import click
import trio
import tractor
from trio_websocket import open_websocket_url
from ..cli import cli
from .. import watchlists as wl
@ -38,12 +46,14 @@ _quote_dt = [
('Volume', 'i8'),
# ('VWAP', 'f4')
]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
_tick_map = {
'Up': 1,
'Equal': 0,
'Down': -1
'Down': -1,
None: np.nan,
}
@ -64,37 +74,36 @@ def err_on_resp(response: dict) -> None:
def quote_to_marketstore_structarray(
quote: Dict[str, Any],
last_fills: Dict[str, str],
last_fill: str,
) -> np.array:
"""Return marketstore writeable recarray from quote ``dict``.
"""Return marketstore writeable structarray from quote ``dict``.
"""
if last_fill:
# new fill bby
now = timestamp(last_fill)
else:
# this should get inserted upstream by the broker-client to
# subtract from IPC latency
now = timestamp(pd.Timestamp.now())
secs, ns = now / 10**9, now % 10**9
# pack into List[Tuple[str, Any]]
array_input = []
# this should get inserted by the broker-client to subtract from
# IPC latency
now = timestamp(pd.Timestamp.now())
sym = quote['symbol']
last_fill_time = quote['fill_time']
if last_fills.get(sym) != last_fill_time:
# new fill
now = timestamp(last_fill_time)
last_fills[sym] = last_fill_time
secs, ns = now / 10**9, now % 10**9
# insert 'Epoch' entry first
array_input.append(int(secs))
# insert 'Nanoseconds' field
array_input.append(int(ns))
# tick mapping to int
array_input.append(_tick_map[quote['tick']])
# append remaining fields
for name, dt in _quote_dt[3:]:
array_input.append(quote[name.casefold()])
for name, dt in _quote_dt[2:]:
if 'f' in dt:
none = np.nan
else:
none = 0
val = quote.get(name.casefold(), none)
array_input.append(val)
return np.array([tuple(array_input)], dtype=_quote_dt)
@ -148,26 +157,43 @@ def ingest(config, name, test_file, tl, url):
log.error("Broker API is down temporarily")
return
client = pymkts.Client(endpoint=url)
quote_cache = {quote['symbol']: quote for quote in first_quotes}
# keep track of last executed fill for each symbol
last_fills = {}
client = pymkts.Client(endpoint=url)
# start ingest to marketstore
async for quotes in qstream:
for symbol, quote in quotes.items():
breakpoint()
fmt_quote, _ = brokermod.format_stock_quote(
quote,
feed._symbol_data_cache
)
a = quote_to_marketstore_structarray(fmt_quote, last_fills)
# start = time.time()
# err_on_resp(client.write(
# a, _tick_tbk.format(symbol), isvariablelength=True)
# )
# log.trace(
# f"{symbol} write time (s): {time.time() - start}")
# remap tick strs to ints
fmt_quote['tick'] = _tick_map[
fmt_quote.get('tick', 'Equal')
]
# check for volume update (i.e. did trades happen
# since last quote)
new_vol = fmt_quote.get('volume', None)
if new_vol is None:
log.info(f"NO trades for {symbol}")
if new_vol == quote_cache.get('volume'):
log.error(
f"{symbol}: got same volume as last quote?")
a = quote_to_marketstore_structarray(
fmt_quote,
# TODO: check this closer to the broker query api
last_fill=fmt_quote.get('last_fill', '')
)
start = time.time()
err_on_resp(client.write(
a, _tick_tbk.format(symbol), isvariablelength=True)
)
log.trace(
f"{symbol} write time (s): {time.time() - start}")
tractor.run(
partial(main, tries=1),
@ -213,7 +239,7 @@ def ms_shell(config, name, tl, url):
)
@click.argument('names', nargs=-1)
@click.pass_obj
def marketstore_destroy(config, names, url):
def marketstore_destroy(config: dict, names: List[str], url: str) -> None:
"""Destroy symbol entries in the local marketstore instance.
"""
client = pymkts.Client(url)
@ -235,6 +261,29 @@ def marketstore_destroy(config, names, url):
print("Nothing deleted.")
async def open_quote_stream(
tbks: List[str],
host: str = 'localhost',
port: int = 5993
) -> None:
"""Open a symbol stream from a running instance of marketstore and
log to console.
"""
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server
await ws.send_message(msgpack.dumps({'streams': tbks}))
async def recv() -> Dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8')
streams = (await recv())['streams']
log.info(f"Subscribed to {streams}")
while True:
msg = await recv()
log.info(f"Received quote:\n{msg}")
@cli.command()
@click.option(
'--url',
@ -243,14 +292,8 @@ def marketstore_destroy(config, names, url):
)
@click.argument('names', nargs=-1)
@click.pass_obj
def marketstore_stream(config, names, url):
"""Destroy symbol entries in the local marketstore instance.
def marketstore_stream(config: dict, names: List[str], url: str):
"""Connect to a marketstore time bucket stream for (a set of) symbols(s)
and print to console.
"""
symbol = 'APHA'
conn = pymkts.StreamConn('ws://localhost:5993/ws')
@conn.on(r'^{}/'.format(symbol))
def on_tsla(conn, msg):
print(f'received {symbol}', msg['data'])
conn.run(['APHA/*/*']) # runs until exception
trio.run(open_quote_stream, names)