diff --git a/piker/data/cli.py b/piker/data/cli.py
index 2206bd6a..21416a80 100644
--- a/piker/data/cli.py
+++ b/piker/data/cli.py
@@ -25,11 +25,13 @@ import trio
import tractor
import click
+from anyio_marketstore import open_marketstore_client
+
from .marketstore import (
get_client,
- stream_quotes,
+ # stream_quotes,
ingest_quote_stream,
- _url,
+ # _url,
_tick_tbk_ids,
mk_tbk,
)
@@ -54,46 +56,47 @@ def ms_stream(config: dict, names: List[str], url: str):
and print to console.
"""
async def main():
- async for quote in stream_quotes(symbols=names):
- log.info(f"Received quote:\n{quote}")
+ # async for quote in stream_quotes(symbols=names):
+ # log.info(f"Received quote:\n{quote}")
+ ...
trio.run(main)
-@cli.command()
-@click.option(
- '--url',
- default=_url,
- help='HTTP URL of marketstore instance'
-)
-@click.argument('names', nargs=-1)
-@click.pass_obj
-def ms_destroy(config: dict, names: List[str], url: str) -> None:
- """Destroy symbol entries in the local marketstore instance.
- """
- async def main():
- nonlocal names
- async with get_client(url) as client:
-
- if not names:
- names = await client.list_symbols()
-
- # default is to wipe db entirely.
- answer = input(
- "This will entirely wipe you local marketstore db @ "
- f"{url} of the following symbols:\n {pformat(names)}"
- "\n\nDelete [N/y]?\n")
-
- if answer == 'y':
- for sym in names:
- # tbk = _tick_tbk.format(sym)
- tbk = tuple(sym, *_tick_tbk_ids)
- print(f"Destroying {tbk}..")
- await client.destroy(mk_tbk(tbk))
- else:
- print("Nothing deleted.")
-
- tractor.run(main)
+# @cli.command()
+# @click.option(
+# '--url',
+# default=_url,
+# help='HTTP URL of marketstore instance'
+# )
+# @click.argument('names', nargs=-1)
+# @click.pass_obj
+# def ms_destroy(config: dict, names: List[str], url: str) -> None:
+# """Destroy symbol entries in the local marketstore instance.
+# """
+# async def main():
+# nonlocal names
+# async with get_client(url) as client:
+#
+# if not names:
+# names = await client.list_symbols()
+#
+# # default is to wipe db entirely.
+# answer = input(
+# "This will entirely wipe you local marketstore db @ "
+# f"{url} of the following symbols:\n {pformat(names)}"
+# "\n\nDelete [N/y]?\n")
+#
+# if answer == 'y':
+# for sym in names:
+# # tbk = _tick_tbk.format(sym)
+# tbk = tuple(sym, *_tick_tbk_ids)
+# print(f"Destroying {tbk}..")
+# await client.destroy(mk_tbk(tbk))
+# else:
+# print("Nothing deleted.")
+#
+# tractor.run(main)
@cli.command()
@@ -102,17 +105,19 @@ def ms_destroy(config: dict, names: List[str], url: str) -> None:
is_flag=True,
help='Enable tractor logging')
@click.option(
- '--url',
- default=_url,
- help='HTTP URL of marketstore instance'
+ '--host',
+ default='localhost'
+)
+@click.option(
+ '--port',
+ default=5995
)
-@click.argument('name', nargs=1, required=True)
@click.pass_obj
-def ms_shell(config, name, tl, url):
+def ms_shell(config, tl, host, port):
"""Start an IPython shell ready to query the local marketstore db.
"""
async def main():
- async with get_client(url) as client:
+ async with open_marketstore_client(host, port) as client:
query = client.query # noqa
# TODO: write magics to query marketstore
from IPython import embed
@@ -124,15 +129,9 @@ def ms_shell(config, name, tl, url):
@cli.command()
@click.option('--test-file', '-t', help='Test quote stream file')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
-@click.option('--tl', is_flag=True, help='Enable tractor logging')
-@click.option(
- '--url',
- default=_url,
- help='HTTP URL of marketstore instance'
-)
@click.argument('name', nargs=1, required=True)
@click.pass_obj
-def ingest(config, name, test_file, tl, url):
+def ingest(config, name, test_file, tl):
"""Ingest real-time broker quotes and ticks to a marketstore instance.
"""
# global opts
diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py
index 1d271452..3f75c994 100644
--- a/piker/data/marketstore.py
+++ b/piker/data/marketstore.py
@@ -14,7 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-"""
+'''
``marketstore`` integration.
- client management routines
@@ -22,7 +22,7 @@
- websocket client for subscribing to write triggers
- todo: tick sequence stream-cloning for testing
- todo: docker container management automation
-"""
+'''
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Callable, Tuple, Optional
import time
@@ -31,9 +31,9 @@ from math import isnan
import msgpack
import numpy as np
import pandas as pd
-import pymarketstore as pymkts
import tractor
from trio_websocket import open_websocket_url
+from anyio_marketstore import open_marketstore_client, MarketstoreClient
from ..log import get_logger, get_console_log
from ..data import open_feed
@@ -43,7 +43,7 @@ log = get_logger(__name__)
_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
-_url: str = 'http://localhost:5993/rpc'
+
_quote_dt = [
# these two are required for as a "primary key"
('Epoch', 'i8'),
@@ -51,34 +51,27 @@ _quote_dt = [
('IsTrade', 'i1'),
('IsBid', 'i1'),
- ('Price', 'f8'),
- ('Size', 'f8')
+ ('Price', 'f4'),
+ ('Size', 'f4')
]
-_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
-class MarketStoreError(Exception):
- "Generic marketstore client error"
-
-
-def err_on_resp(response: dict) -> None:
- """Raise any errors found in responses from client request.
+def mk_tbk(keys: Tuple[str, str, str]) -> str:
+ """Generate a marketstore table key from a tuple.
+ Converts,
+ ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
"""
- responses = response['responses']
- if responses is not None:
- for r in responses:
- err = r['error']
- if err:
- raise MarketStoreError(err)
+ return '{}/' + '/'.join(keys)
def quote_to_marketstore_structarray(
quote: Dict[str, Any],
- last_fill: Optional[float],
-
+ last_fill: Optional[float]
) -> np.array:
- """Return marketstore writeable structarray from quote ``dict``.
- """
+ '''
+ Return marketstore writeable structarray from quote ``dict``.
+ '''
+
if last_fill:
# new fill bby
now = timestamp(last_fill, unit='s')
@@ -112,82 +105,21 @@ def quote_to_marketstore_structarray(
def timestamp(date, **kwargs) -> int:
- """Return marketstore compatible 'Epoch' integer in nanoseconds
+ '''
+ Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str.
- """
+ '''
+
return int(pd.Timestamp(date, **kwargs).value)
-def mk_tbk(keys: Tuple[str, str, str]) -> str:
- """Generate a marketstore table key from a tuple.
-
- Converts,
- ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"```
- """
- return '{}/' + '/'.join(keys)
-
-
-class Client:
- """Async wrapper around the alpaca ``pymarketstore`` sync client.
-
- This will server as the shell for building out a proper async client
- that isn't horribly documented and un-tested..
- """
- def __init__(self, url: str):
- self._client = pymkts.Client(url)
-
- async def _invoke(
- self,
- meth: Callable,
- *args,
- **kwargs,
- ) -> Any:
- return err_on_resp(meth(*args, **kwargs))
-
- async def destroy(
- self,
- tbk: Tuple[str, str, str],
- ) -> None:
- return await self._invoke(self._client.destroy, mk_tbk(tbk))
-
- async def list_symbols(
- self,
- tbk: str,
- ) -> List[str]:
- return await self._invoke(self._client.list_symbols, mk_tbk(tbk))
-
- async def write(
- self,
- symbol: str,
- array: np.ndarray,
- ) -> None:
- start = time.time()
- await self._invoke(
- self._client.write,
- array,
- _tick_tbk.format(symbol),
- isvariablelength=True
- )
- log.debug(f"{symbol} write time (s): {time.time() - start}")
-
- def query(
- self,
- symbol,
- tbk: Tuple[str, str] = _tick_tbk_ids,
- ) -> pd.DataFrame:
- # XXX: causes crash
- # client.query(pymkts.Params(symbol, '*', 'OHCLV'
- result = self._client.query(
- pymkts.Params(symbol, *tbk),
- )
- return result.first().df()
-
-
@asynccontextmanager
async def get_client(
- url: str = _url,
-) -> Client:
- yield Client(url)
+ host: str = 'localhost',
+ port: int = 5995
+) -> MarketstoreClient:
+ async with open_marketstore_client(host, port) as client:
+ yield client
async def ingest_quote_stream(
@@ -196,8 +128,9 @@ async def ingest_quote_stream(
tries: int = 1,
actorloglevel: str = None,
) -> None:
- """Ingest a broker quote stream into marketstore in (sampled) tick format.
- """
+ '''
+ Ingest a broker quote stream into marketstore.
+ '''
async with (
open_feed(brokername, symbols, loglevel=actorloglevel) as feed,
get_client() as ms_client
@@ -212,107 +145,125 @@ async def ingest_quote_stream(
# okkk..
continue
- a = quote_to_marketstore_structarray({
+ array = quote_to_marketstore_structarray({
'IsTrade': 1 if ticktype == 'trade' else 0,
'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
'Price': tick.get('price'),
'Size': tick.get('size')
}, last_fill=quote.get('broker_ts', None))
- log.info(a)
- await ms_client.write(symbol, a)
-
+ await ms_client.write(
+ array, _tick_tbk)
+
async def stream_quotes(
symbols: List[str],
+ timeframe: str = '1Min',
+ attr_group: str = 'TICK',
host: str = 'localhost',
port: int = 5993,
- diff_cached: bool = True,
- loglevel: str = None,
+ loglevel: str = None
) -> None:
- """Open a symbol stream from a running instance of marketstore and
+ '''
+ Open a symbol stream from a running instance of marketstore and
log to console.
- """
- # XXX: required to propagate ``tractor`` loglevel to piker logging
- get_console_log(loglevel or tractor.current_actor().loglevel)
+ '''
- tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
+ tbks: Dict[str, str] = {
+ sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
- async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
- # send subs topics to server
- resp = await ws.send_message(
- msgpack.dumps({'streams': list(tbks.values())})
- )
- log.info(resp)
- async def recv() -> Dict[str, Any]:
- return msgpack.loads((await ws.get_message()), encoding='utf-8')
- streams = (await recv())['streams']
- log.info(f"Subscribed to {streams}")
-
- _cache = {}
-
- while True:
- msg = await recv()
-
- # unpack symbol and quote data
- # key is in format ``//``
- symbol = msg['key'].split('/')[0]
- data = msg['data']
-
- # calc time stamp(s)
- s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
- ts = s * 10**9 + ns
- data['broker_fill_time_ns'] = ts
-
- quote = {}
- for k, v in data.items():
- if isnan(v):
- continue
-
- quote[k.lower()] = v
-
- quote['symbol'] = symbol
-
- quotes = {}
-
- if diff_cached:
- last = _cache.setdefault(symbol, {})
- new = set(quote.items()) - set(last.items())
- if new:
- log.info(f"New quote {quote['symbol']}:\n{new}")
-
- # only ship diff updates and other required fields
- payload = {k: quote[k] for k, v in new}
- payload['symbol'] = symbol
-
- # if there was volume likely the last size of
- # shares traded is useful info and it's possible
- # that the set difference from above will disregard
- # a "size" value since the same # of shares were traded
- size = quote.get('size')
- volume = quote.get('volume')
- if size and volume:
- new_volume_since_last = max(
- volume - last.get('volume', 0), 0)
- log.warning(
- f"NEW VOLUME {symbol}:{new_volume_since_last}")
- payload['size'] = size
- payload['last'] = quote.get('last')
-
- # XXX: we append to a list for the options case where the
- # subscription topic (key) is the same for all
- # expiries even though this is uncessary for the
- # stock case (different topic [i.e. symbol] for each
- # quote).
- quotes.setdefault(symbol, []).append(payload)
-
- # update cache
- _cache[symbol].update(quote)
- else:
- quotes = {
- symbol: [{key.lower(): val for key, val in quote.items()}]}
-
- if quotes:
- yield quotes
+# async def stream_quotes(
+# symbols: List[str],
+# host: str = 'localhost',
+# port: int = 5993,
+# diff_cached: bool = True,
+# loglevel: str = None,
+# ) -> None:
+# """Open a symbol stream from a running instance of marketstore and
+# log to console.
+# """
+# # XXX: required to propagate ``tractor`` loglevel to piker logging
+# get_console_log(loglevel or tractor.current_actor().loglevel)
+#
+# tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
+#
+# async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
+# # send subs topics to server
+# resp = await ws.send_message(
+# msgpack.dumps({'streams': list(tbks.values())})
+# )
+# log.info(resp)
+#
+# async def recv() -> Dict[str, Any]:
+# return msgpack.loads((await ws.get_message()), encoding='utf-8')
+#
+# streams = (await recv())['streams']
+# log.info(f"Subscribed to {streams}")
+#
+# _cache = {}
+#
+# while True:
+# msg = await recv()
+#
+# # unpack symbol and quote data
+# # key is in format ``//``
+# symbol = msg['key'].split('/')[0]
+# data = msg['data']
+#
+# # calc time stamp(s)
+# s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
+# ts = s * 10**9 + ns
+# data['broker_fill_time_ns'] = ts
+#
+# quote = {}
+# for k, v in data.items():
+# if isnan(v):
+# continue
+#
+# quote[k.lower()] = v
+#
+# quote['symbol'] = symbol
+#
+# quotes = {}
+#
+# if diff_cached:
+# last = _cache.setdefault(symbol, {})
+# new = set(quote.items()) - set(last.items())
+# if new:
+# log.info(f"New quote {quote['symbol']}:\n{new}")
+#
+# # only ship diff updates and other required fields
+# payload = {k: quote[k] for k, v in new}
+# payload['symbol'] = symbol
+#
+# # if there was volume likely the last size of
+# # shares traded is useful info and it's possible
+# # that the set difference from above will disregard
+# # a "size" value since the same # of shares were traded
+# size = quote.get('size')
+# volume = quote.get('volume')
+# if size and volume:
+# new_volume_since_last = max(
+# volume - last.get('volume', 0), 0)
+# log.warning(
+# f"NEW VOLUME {symbol}:{new_volume_since_last}")
+# payload['size'] = size
+# payload['last'] = quote.get('last')
+#
+# # XXX: we append to a list for the options case where the
+# # subscription topic (key) is the same for all
+# # expiries even though this is uncessary for the
+# # stock case (different topic [i.e. symbol] for each
+# # quote).
+# quotes.setdefault(symbol, []).append(payload)
+#
+# # update cache
+# _cache[symbol].update(quote)
+# else:
+# quotes = {
+# symbol: [{key.lower(): val for key, val in quote.items()}]}
+#
+# if quotes:
+# yield quotes