Add tbk tick streaming with trio-websocket
parent
c47df8811b
commit
9173f22f3b
|
@ -1,15 +1,23 @@
|
||||||
"""
|
"""
|
||||||
``marketstore`` integration.
|
``marketstore`` integration.
|
||||||
|
|
||||||
|
- TICK data ingest routines
|
||||||
|
- websocket client for subscribing to write triggers
|
||||||
|
- docker container management automation
|
||||||
"""
|
"""
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any, List
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
import time
|
||||||
|
|
||||||
|
import msgpack
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import pymarketstore as pymkts
|
import pymarketstore as pymkts
|
||||||
import click
|
import click
|
||||||
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
from trio_websocket import open_websocket_url
|
||||||
|
|
||||||
from ..cli import cli
|
from ..cli import cli
|
||||||
from .. import watchlists as wl
|
from .. import watchlists as wl
|
||||||
|
@ -38,12 +46,14 @@ _quote_dt = [
|
||||||
('Volume', 'i8'),
|
('Volume', 'i8'),
|
||||||
# ('VWAP', 'f4')
|
# ('VWAP', 'f4')
|
||||||
]
|
]
|
||||||
|
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
|
||||||
|
|
||||||
|
|
||||||
_tick_map = {
|
_tick_map = {
|
||||||
'Up': 1,
|
'Up': 1,
|
||||||
'Equal': 0,
|
'Equal': 0,
|
||||||
'Down': -1
|
'Down': -1,
|
||||||
|
None: np.nan,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -64,37 +74,36 @@ def err_on_resp(response: dict) -> None:
|
||||||
|
|
||||||
def quote_to_marketstore_structarray(
|
def quote_to_marketstore_structarray(
|
||||||
quote: Dict[str, Any],
|
quote: Dict[str, Any],
|
||||||
last_fills: Dict[str, str],
|
last_fill: str,
|
||||||
) -> np.array:
|
) -> np.array:
|
||||||
"""Return marketstore writeable recarray from quote ``dict``.
|
"""Return marketstore writeable structarray from quote ``dict``.
|
||||||
"""
|
"""
|
||||||
|
if last_fill:
|
||||||
|
# new fill bby
|
||||||
|
now = timestamp(last_fill)
|
||||||
|
else:
|
||||||
|
# this should get inserted upstream by the broker-client to
|
||||||
|
# subtract from IPC latency
|
||||||
|
now = timestamp(pd.Timestamp.now())
|
||||||
|
|
||||||
|
secs, ns = now / 10**9, now % 10**9
|
||||||
|
|
||||||
# pack into List[Tuple[str, Any]]
|
# pack into List[Tuple[str, Any]]
|
||||||
array_input = []
|
array_input = []
|
||||||
|
|
||||||
# this should get inserted by the broker-client to subtract from
|
|
||||||
# IPC latency
|
|
||||||
now = timestamp(pd.Timestamp.now())
|
|
||||||
|
|
||||||
sym = quote['symbol']
|
|
||||||
last_fill_time = quote['fill_time']
|
|
||||||
if last_fills.get(sym) != last_fill_time:
|
|
||||||
# new fill
|
|
||||||
now = timestamp(last_fill_time)
|
|
||||||
last_fills[sym] = last_fill_time
|
|
||||||
|
|
||||||
secs, ns = now / 10**9, now % 10**9
|
|
||||||
# insert 'Epoch' entry first
|
# insert 'Epoch' entry first
|
||||||
array_input.append(int(secs))
|
array_input.append(int(secs))
|
||||||
|
|
||||||
# insert 'Nanoseconds' field
|
# insert 'Nanoseconds' field
|
||||||
array_input.append(int(ns))
|
array_input.append(int(ns))
|
||||||
|
|
||||||
# tick mapping to int
|
|
||||||
array_input.append(_tick_map[quote['tick']])
|
|
||||||
|
|
||||||
# append remaining fields
|
# append remaining fields
|
||||||
for name, dt in _quote_dt[3:]:
|
for name, dt in _quote_dt[2:]:
|
||||||
array_input.append(quote[name.casefold()])
|
if 'f' in dt:
|
||||||
|
none = np.nan
|
||||||
|
else:
|
||||||
|
none = 0
|
||||||
|
val = quote.get(name.casefold(), none)
|
||||||
|
array_input.append(val)
|
||||||
|
|
||||||
return np.array([tuple(array_input)], dtype=_quote_dt)
|
return np.array([tuple(array_input)], dtype=_quote_dt)
|
||||||
|
|
||||||
|
@ -148,26 +157,43 @@ def ingest(config, name, test_file, tl, url):
|
||||||
log.error("Broker API is down temporarily")
|
log.error("Broker API is down temporarily")
|
||||||
return
|
return
|
||||||
|
|
||||||
client = pymkts.Client(endpoint=url)
|
quote_cache = {quote['symbol']: quote for quote in first_quotes}
|
||||||
|
|
||||||
# keep track of last executed fill for each symbol
|
client = pymkts.Client(endpoint=url)
|
||||||
last_fills = {}
|
|
||||||
|
|
||||||
# start ingest to marketstore
|
# start ingest to marketstore
|
||||||
async for quotes in qstream:
|
async for quotes in qstream:
|
||||||
for symbol, quote in quotes.items():
|
for symbol, quote in quotes.items():
|
||||||
breakpoint()
|
|
||||||
fmt_quote, _ = brokermod.format_stock_quote(
|
fmt_quote, _ = brokermod.format_stock_quote(
|
||||||
quote,
|
quote,
|
||||||
feed._symbol_data_cache
|
feed._symbol_data_cache
|
||||||
)
|
)
|
||||||
a = quote_to_marketstore_structarray(fmt_quote, last_fills)
|
|
||||||
# start = time.time()
|
# remap tick strs to ints
|
||||||
# err_on_resp(client.write(
|
fmt_quote['tick'] = _tick_map[
|
||||||
# a, _tick_tbk.format(symbol), isvariablelength=True)
|
fmt_quote.get('tick', 'Equal')
|
||||||
# )
|
]
|
||||||
# log.trace(
|
|
||||||
# f"{symbol} write time (s): {time.time() - start}")
|
# check for volume update (i.e. did trades happen
|
||||||
|
# since last quote)
|
||||||
|
new_vol = fmt_quote.get('volume', None)
|
||||||
|
if new_vol is None:
|
||||||
|
log.info(f"NO trades for {symbol}")
|
||||||
|
if new_vol == quote_cache.get('volume'):
|
||||||
|
log.error(
|
||||||
|
f"{symbol}: got same volume as last quote?")
|
||||||
|
|
||||||
|
a = quote_to_marketstore_structarray(
|
||||||
|
fmt_quote,
|
||||||
|
# TODO: check this closer to the broker query api
|
||||||
|
last_fill=fmt_quote.get('last_fill', '')
|
||||||
|
)
|
||||||
|
start = time.time()
|
||||||
|
err_on_resp(client.write(
|
||||||
|
a, _tick_tbk.format(symbol), isvariablelength=True)
|
||||||
|
)
|
||||||
|
log.trace(
|
||||||
|
f"{symbol} write time (s): {time.time() - start}")
|
||||||
|
|
||||||
tractor.run(
|
tractor.run(
|
||||||
partial(main, tries=1),
|
partial(main, tries=1),
|
||||||
|
@ -213,7 +239,7 @@ def ms_shell(config, name, tl, url):
|
||||||
)
|
)
|
||||||
@click.argument('names', nargs=-1)
|
@click.argument('names', nargs=-1)
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def marketstore_destroy(config, names, url):
|
def marketstore_destroy(config: dict, names: List[str], url: str) -> None:
|
||||||
"""Destroy symbol entries in the local marketstore instance.
|
"""Destroy symbol entries in the local marketstore instance.
|
||||||
"""
|
"""
|
||||||
client = pymkts.Client(url)
|
client = pymkts.Client(url)
|
||||||
|
@ -235,6 +261,29 @@ def marketstore_destroy(config, names, url):
|
||||||
print("Nothing deleted.")
|
print("Nothing deleted.")
|
||||||
|
|
||||||
|
|
||||||
|
async def open_quote_stream(
|
||||||
|
tbks: List[str],
|
||||||
|
host: str = 'localhost',
|
||||||
|
port: int = 5993
|
||||||
|
) -> None:
|
||||||
|
"""Open a symbol stream from a running instance of marketstore and
|
||||||
|
log to console.
|
||||||
|
"""
|
||||||
|
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
|
||||||
|
# send subs topics to server
|
||||||
|
await ws.send_message(msgpack.dumps({'streams': tbks}))
|
||||||
|
|
||||||
|
async def recv() -> Dict[str, Any]:
|
||||||
|
return msgpack.loads((await ws.get_message()), encoding='utf-8')
|
||||||
|
|
||||||
|
streams = (await recv())['streams']
|
||||||
|
log.info(f"Subscribed to {streams}")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
msg = await recv()
|
||||||
|
log.info(f"Received quote:\n{msg}")
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
@cli.command()
|
||||||
@click.option(
|
@click.option(
|
||||||
'--url',
|
'--url',
|
||||||
|
@ -243,14 +292,8 @@ def marketstore_destroy(config, names, url):
|
||||||
)
|
)
|
||||||
@click.argument('names', nargs=-1)
|
@click.argument('names', nargs=-1)
|
||||||
@click.pass_obj
|
@click.pass_obj
|
||||||
def marketstore_stream(config, names, url):
|
def marketstore_stream(config: dict, names: List[str], url: str):
|
||||||
"""Destroy symbol entries in the local marketstore instance.
|
"""Connect to a marketstore time bucket stream for (a set of) symbols(s)
|
||||||
|
and print to console.
|
||||||
"""
|
"""
|
||||||
symbol = 'APHA'
|
trio.run(open_quote_stream, names)
|
||||||
conn = pymkts.StreamConn('ws://localhost:5993/ws')
|
|
||||||
|
|
||||||
@conn.on(r'^{}/'.format(symbol))
|
|
||||||
def on_tsla(conn, msg):
|
|
||||||
print(f'received {symbol}', msg['data'])
|
|
||||||
|
|
||||||
conn.run(['APHA/*/*']) # runs until exception
|
|
||||||
|
|
Loading…
Reference in New Issue