From a6c692fb8b4fa7acfecfd6f6e0cf8bd1d692dd70 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 26 May 2020 14:34:22 -0400 Subject: [PATCH 01/24] Add support for TICK ingest to marketstore --- piker/data/__init__.py | 8 ++ piker/data/marketstore.py | 256 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 264 insertions(+) create mode 100644 piker/data/__init__.py create mode 100644 piker/data/marketstore.py diff --git a/piker/data/__init__.py b/piker/data/__init__.py new file mode 100644 index 00000000..a4633908 --- /dev/null +++ b/piker/data/__init__.py @@ -0,0 +1,8 @@ +""" +Data infra tooling and automation. + + +``piker`` ships with timeseries database integrations +for retrieving and storing data from your brokers as well +as sharing your feeds with other fellow pikers. +""" diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py new file mode 100644 index 00000000..a2281f71 --- /dev/null +++ b/piker/data/marketstore.py @@ -0,0 +1,256 @@ +""" +``marketstore`` integration. +""" +from pprint import pformat +from typing import Dict, Any +from functools import partial + +import numpy as np +import pandas as pd +import pymarketstore as pymkts +import click +import tractor + +from ..cli import cli +from .. import watchlists as wl +from ..brokers.data import DataFeed +from ..log import get_logger +from ..brokers.core import maybe_spawn_brokerd_as_subactor + + +log = get_logger(__name__) + +_tick_tbk_ids = ('1Sec', 'TICK') +_tick_tbk = '{}/' + '/'.join(_tick_tbk_ids) + + +_quote_dt = [ + ('Epoch', 'i8'), + ('Nanoseconds', 'i4'), + ('Tick', 'i4'), + # ('fill_time', 'f4'), + ('Last', 'f4'), + ('Bid', 'f4'), + ('Bsize', 'i8'), + ('Asize', 'i8'), + ('Ask', 'f4'), + ('Size', 'i8'), + ('Volume', 'i8'), + # ('VWAP', 'f4') +] + + +_tick_map = { + 'Up': 1, + 'Equal': 0, + 'Down': -1 +} + + +class MarketStoreError(Exception): + "Generic marketstore client error" + + +def err_on_resp(response: dict) -> None: + """Raise any errors found in responses from client request. + """ + responses = response['responses'] + if responses is not None: + for r in responses: + err = r['error'] + if err: + raise MarketStoreError(err) + + +def quote_to_marketstore_structarray( + quote: Dict[str, Any], + last_fills: Dict[str, str], +) -> np.array: + """Return marketstore writeable recarray from quote ``dict``. + """ + # pack into List[Tuple[str, Any]] + array_input = [] + + # this should get inserted by the broker-client to subtract from + # IPC latency + now = timestamp(pd.Timestamp.now()) + + sym = quote['symbol'] + last_fill_time = quote['fill_time'] + if last_fills.get(sym) != last_fill_time: + # new fill + now = timestamp(last_fill_time) + last_fills[sym] = last_fill_time + + secs, ns = now / 10**9, now % 10**9 + # insert 'Epoch' entry first + array_input.append(int(secs)) + + # insert 'Nanoseconds' field + array_input.append(int(ns)) + + # tick mapping to int + array_input.append(_tick_map[quote['tick']]) + + # append remaining fields + for name, dt in _quote_dt[3:]: + array_input.append(quote[name.casefold()]) + + return np.array([tuple(array_input)], dtype=_quote_dt) + + +def timestamp(datestr: str) -> int: + """Return marketstore compatible 'Epoch' integer in nanoseconds. + """ + return int(pd.Timestamp(datestr).value) + + +@cli.command() +@click.option('--test-file', '-t', help='Test quote stream file') +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option( + '--url', + default='http://localhost:5993/rpc', + help='HTTP URL of marketstore instance' +) +@click.argument('name', nargs=1, required=True) +@click.pass_obj +def ingest(config, name, test_file, tl, url): + """Ingest real-time broker quotes and ticks to a marketstore instance. + """ + # global opts + brokermod = config['brokermod'] + loglevel = config['loglevel'] + log = config['log'] + + watchlist_from_file = wl.ensure_watchlists(config['wl_path']) + watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) + symbols = watchlists[name] + + async def main(tries): + async with maybe_spawn_brokerd_as_subactor( + tries=tries, + loglevel=loglevel + ) as portal: + # connect to broker data feed + feed = DataFeed(portal, brokermod) + qstream, quotes = await feed.open_stream( + symbols, + 'stock', + rate=3, + test=test_file, + ) + + first_quotes, _ = feed.format_quotes(quotes) + + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") + return + + client = pymkts.Client(endpoint=url) + + # keep track of last executed fill for each symbol + last_fills = {} + + # start ingest to marketstore + async for quotes in qstream: + for symbol, quote in quotes.items(): + breakpoint() + fmt_quote, _ = brokermod.format_stock_quote( + quote, + feed._symbol_data_cache + ) + a = quote_to_marketstore_structarray(fmt_quote, last_fills) + # start = time.time() + # err_on_resp(client.write( + # a, _tick_tbk.format(symbol), isvariablelength=True) + # ) + # log.trace( + # f"{symbol} write time (s): {time.time() - start}") + + tractor.run( + partial(main, tries=1), + name='ingest_marketstore', + loglevel=loglevel if tl else None, + # start_method='forkserver', + ) + + +@cli.command() +@click.option( + '--tl', + is_flag=True, + help='Enable tractor logging') +@click.option( + '--url', + default='http://localhost:5993/rpc', + help='HTTP URL of marketstore instance' +) +@click.argument('name', nargs=1, required=True) +@click.pass_obj +def ms_shell(config, name, tl, url): + """Start an IPython shell ready to query the local marketstore db. + """ + client = pymkts.Client(url) + + def query(name): + return client.query( + pymkts.Params(name, *_tick_tbk_ids)).first().df() + + # causes crash + # client.query(pymkts.Params(symbol, '*', 'OHCLV' + + from IPython import embed + embed() + + +@cli.command() +@click.option( + '--url', + default='http://localhost:5993/rpc', + help='HTTP URL of marketstore instance' +) +@click.argument('names', nargs=-1) +@click.pass_obj +def marketstore_destroy(config, names, url): + """Destroy symbol entries in the local marketstore instance. + """ + client = pymkts.Client(url) + if not names: + names = client.list_symbols() + + # default is to wipe db entirely. + answer = input( + "This will entirely wipe you local marketstore db @ " + f"{url} of the following symbols:\n {pformat(names)}" + "\n\nDelete [N/y]?\n") + + if answer == 'y': + for sym in names: + tbk = _tick_tbk.format(sym) + print(f"Destroying {tbk}..") + err_on_resp(client.destroy(_tick_tbk.format(sym))) + else: + print("Nothing deleted.") + + +@cli.command() +@click.option( + '--url', + default='ws://localhost:5993/ws', + help='HTTP URL of marketstore instance' +) +@click.argument('names', nargs=-1) +@click.pass_obj +def marketstore_stream(config, names, url): + """Destroy symbol entries in the local marketstore instance. + """ + symbol = 'APHA' + conn = pymkts.StreamConn('ws://localhost:5993/ws') + + @conn.on(r'^{}/'.format(symbol)) + def on_tsla(conn, msg): + print(f'received {symbol}', msg['data']) + + conn.run(['APHA/*/*']) # runs until exception From 213a19b1913bf6f25a132a708e823a52ed2d2b91 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 26 May 2020 14:43:18 -0400 Subject: [PATCH 02/24] Update version and deps --- setup.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/setup.py b/setup.py index 1bb57932..81e3d56a 100755 --- a/setup.py +++ b/setup.py @@ -30,7 +30,6 @@ setup( license='AGPLv3', author='Tyler Goodlet', maintainer='Tyler Goodlet', - maintainer_email='tgoodlet@gmail.com', url='https://github.com/pikers/piker', platforms=['linux'], packages=[ @@ -46,13 +45,27 @@ setup( ] }, install_requires=[ - 'click', 'colorlog', 'trio', 'attrs', 'async_generator', - 'pygments', 'cython', 'asks', 'pandas', 'msgpack', + 'click', + 'colorlog', + 'trio', + 'attrs', + 'async_generator', + 'pygments', + + # brokers + 'asks', + 'ib_insync', + + # numerics + 'arrow', # better datetimes + 'cython', + 'numpy', + 'pandas', + + # tsdbs + 'pymarketstore', #'kivy', see requirement.txt; using a custom branch atm ], - extras_require={ - 'questrade': ['asks'], - }, tests_require=['pytest'], python_requires=">=3.7", # literally for ``datetime.datetime.fromisoformat``... keywords=["async", "trading", "finance", "quant", "charting"], @@ -61,7 +74,7 @@ setup( 'License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)', 'Operating System :: POSIX :: Linux', "Programming Language :: Python :: Implementation :: CPython", - "Programming Language :: Python :: Implementation :: PyPy", + # "Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", From 335cee63b28e2af4cfe6329a2c86ccbad650655e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 31 May 2020 20:05:06 -0400 Subject: [PATCH 03/24] Make stock quote formatter work with diff streams --- piker/brokers/questrade.py | 46 +++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 9d2d3892..d12d3e64 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -6,6 +6,7 @@ import inspect import time from datetime import datetime from functools import partial +from itertools import chain import configparser from typing import List, Tuple, Dict, Any, Iterator, NamedTuple @@ -839,29 +840,43 @@ def format_stock_quote( and the second is the same but with all values converted to a "display-friendly" string format. """ - last = quote['lastTradePrice'] symbol = quote['symbol'] previous = symbol_data[symbol]['prevDayClosePrice'] - change = percent_change(previous, last) - share_count = symbol_data[symbol].get('outstandingShares', None) - mktcap = share_count * last if (last and share_count) else 0 - computed = { - 'symbol': quote['symbol'], - '%': round(change, 3), - 'MC': mktcap, - # why QT do you have to be an asshole shipping null values!!! - '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3), - 'close': previous, - } + + computed = {'symbol': symbol} + last = quote.get('lastTradePrice') + if last: + change = percent_change(previous, last) + share_count = symbol_data[symbol].get('outstandingShares', None) + mktcap = share_count * last if (last and share_count) else 0 + computed.update({ + # 'symbol': quote['symbol'], + '%': round(change, 3), + 'MC': mktcap, + # why questrade do you have to be an asshole shipping null values!!! + # '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3), + 'close': previous, + }) + + vwap = quote.get('VWAP') + volume = quote.get('volume') + if volume is not None: # could be 0 + # why questrade do you have to be an asshole shipping null values!!! + computed['$ vol'] = round((vwap or 0) * (volume or 0), 3) + new = {} displayable = {} - for key, new_key in keymap.items(): - display_value = value = computed.get(key) or quote.get(key) + for key, value in chain(quote.items(), computed.items()): + new_key = keymap.get(key) + if not new_key: + continue # API servers can return `None` vals when markets are closed (weekend) value = 0 if value is None else value + display_value = value + # convert values to a displayble format using available formatting func if isinstance(new_key, tuple): new_key, func = new_key @@ -870,6 +885,7 @@ def format_stock_quote( new[new_key] = value displayable[new_key] = display_value + return new, displayable @@ -939,7 +955,7 @@ def format_option_quote( "display-friendly" string format. """ # TODO: need historical data.. - # (cause why would QT keep their quote structure consistent across + # (cause why would questrade keep their quote structure consistent across # assets..) # previous = symbol_data[symbol]['prevDayClosePrice'] # change = percent_change(previous, last) From d66cfb8fa04871709f777067f41e84da643df808 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 31 May 2020 22:36:47 -0400 Subject: [PATCH 04/24] Push only new key value pairs over quote streams This is something I've been meaning to try for a while and will likely make writing tick data to a db more straight forward (filling in NaN values is more matter of fact) plus it should minimize bandwidth usage. Note, it'll require stream consumers to be considerate of non-full quotes arriving and thus using the first "full" quote message to fill out dynamically formatted systems or displays. --- piker/brokers/data.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 8d03d684..4b7dfd41 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -130,17 +130,33 @@ async def stream_requests( for quote in quotes: symbol = quote['symbol'] last = _cache.setdefault(symbol, {}) + + # find all keys that have match to a new value compared + # to the last quote received new = set(quote.items()) - set(last.items()) if new: log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote + + # only ship diff updates and other required fields + payload['symbol'] = symbol + payload = {k: quote[k] for k, v in new} + + # if there was volume likely the last size of + # shares traded is useful info and it's possible + # that the set difference from above will disregard + # a "size" value since the same # of shares were traded + size = quote.get('size') + if size and 'volume' in payload: + payload['size'] = size + # XXX: we append to a list for the options case where the # subscription topic (key) is the same for all # expiries even though this is uncessary for the # stock case (different topic [i.e. symbol] for each # quote). - new_quotes.setdefault(quote['key'], []).append(quote) + new_quotes.setdefault(quote['key'], []).append(payload) else: # log.debug(f"Delivering quotes:\n{quotes}") for quote in quotes: From 3cfb15ed6e060069425c21adde9036468bf123a3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 31 May 2020 22:59:37 -0400 Subject: [PATCH 05/24] Make monitor handle non-full quote messages --- piker/ui/monitor.py | 21 +++++++++++++-------- piker/ui/tabular.py | 3 ++- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/piker/ui/monitor.py b/piker/ui/monitor.py index a7778fbf..cd73c3fa 100644 --- a/piker/ui/monitor.py +++ b/piker/ui/monitor.py @@ -51,23 +51,25 @@ async def update_quotes( chngcell = row.get_cell('%') # determine daily change color - color = colorcode('gray') percent_change = record.get('%') - if percent_change: - daychange = float(record['%']) + if percent_change is not None and percent_change != chngcell: + daychange = float(percent_change) if daychange < 0.: color = colorcode('red2') elif daychange > 0.: color = colorcode('forestgreen') + else: + color = colorcode('gray') - # update row header and '%' cell text color - if chngcell: - chngcell.color = color - hdrcell.color = color # if the cell has been "highlighted" make sure to change its color if hdrcell.background_color != [0]*4: hdrcell.background_color = color + # update row header and '%' cell text color + chngcell.color = color + hdrcell.color = color + + # briefly highlight bg of certain cells on each trade execution unflash = set() tick_color = None @@ -123,10 +125,13 @@ async def update_quotes( record, displayable = formatter( quote, symbol_data=symbol_data) + # don't red/green the header cell in ``row.update()`` + record.pop('symbol') + # determine if sorting should happen sort_key = table.sort_key - new = record[sort_key] last = row.get_field(sort_key) + new = record.get(sort_key, last) if new != last: to_sort.add(row.widget) diff --git a/piker/ui/tabular.py b/piker/ui/tabular.py index d995848a..27c1e091 100644 --- a/piker/ui/tabular.py +++ b/piker/ui/tabular.py @@ -340,6 +340,7 @@ class Row(HoverBehavior, GridLayout): gray = colorcode('gray') fgreen = colorcode('forestgreen') red = colorcode('red2') + for key, val in record.items(): last = self.get_field(key) color = gray @@ -361,7 +362,7 @@ class Row(HoverBehavior, GridLayout): if color != gray: cells[key] = cell - self._last_record = record + self._last_record.update(record) return cells # mouse over handlers From acd32341e25a41cc84928a8b48e5dd356669fc9c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 1 Jun 2020 14:01:55 -0400 Subject: [PATCH 06/24] Fix assignment out of order --- piker/brokers/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 4b7dfd41..48856739 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -140,8 +140,8 @@ async def stream_requests( _cache[symbol] = quote # only ship diff updates and other required fields - payload['symbol'] = symbol payload = {k: quote[k] for k, v in new} + payload['symbol'] = symbol # if there was volume likely the last size of # shares traded is useful info and it's possible From 436e4d2df48e61937ed7462f3964add7b8b440b7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 1 Jun 2020 14:13:30 -0400 Subject: [PATCH 07/24] Add tbk tick streaming with trio-websocket --- piker/data/marketstore.py | 131 +++++++++++++++++++++++++------------- 1 file changed, 87 insertions(+), 44 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index a2281f71..c6b5cd4e 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -1,15 +1,23 @@ """ ``marketstore`` integration. + +- TICK data ingest routines +- websocket client for subscribing to write triggers +- docker container management automation """ from pprint import pformat -from typing import Dict, Any +from typing import Dict, Any, List from functools import partial +import time +import msgpack import numpy as np import pandas as pd import pymarketstore as pymkts import click +import trio import tractor +from trio_websocket import open_websocket_url from ..cli import cli from .. import watchlists as wl @@ -38,12 +46,14 @@ _quote_dt = [ ('Volume', 'i8'), # ('VWAP', 'f4') ] +_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) _tick_map = { 'Up': 1, 'Equal': 0, - 'Down': -1 + 'Down': -1, + None: np.nan, } @@ -64,37 +74,36 @@ def err_on_resp(response: dict) -> None: def quote_to_marketstore_structarray( quote: Dict[str, Any], - last_fills: Dict[str, str], + last_fill: str, ) -> np.array: - """Return marketstore writeable recarray from quote ``dict``. + """Return marketstore writeable structarray from quote ``dict``. """ + if last_fill: + # new fill bby + now = timestamp(last_fill) + else: + # this should get inserted upstream by the broker-client to + # subtract from IPC latency + now = timestamp(pd.Timestamp.now()) + + secs, ns = now / 10**9, now % 10**9 + # pack into List[Tuple[str, Any]] array_input = [] - # this should get inserted by the broker-client to subtract from - # IPC latency - now = timestamp(pd.Timestamp.now()) - - sym = quote['symbol'] - last_fill_time = quote['fill_time'] - if last_fills.get(sym) != last_fill_time: - # new fill - now = timestamp(last_fill_time) - last_fills[sym] = last_fill_time - - secs, ns = now / 10**9, now % 10**9 # insert 'Epoch' entry first array_input.append(int(secs)) - # insert 'Nanoseconds' field array_input.append(int(ns)) - # tick mapping to int - array_input.append(_tick_map[quote['tick']]) - # append remaining fields - for name, dt in _quote_dt[3:]: - array_input.append(quote[name.casefold()]) + for name, dt in _quote_dt[2:]: + if 'f' in dt: + none = np.nan + else: + none = 0 + val = quote.get(name.casefold(), none) + array_input.append(val) return np.array([tuple(array_input)], dtype=_quote_dt) @@ -148,26 +157,43 @@ def ingest(config, name, test_file, tl, url): log.error("Broker API is down temporarily") return - client = pymkts.Client(endpoint=url) + quote_cache = {quote['symbol']: quote for quote in first_quotes} - # keep track of last executed fill for each symbol - last_fills = {} + client = pymkts.Client(endpoint=url) # start ingest to marketstore async for quotes in qstream: for symbol, quote in quotes.items(): - breakpoint() fmt_quote, _ = brokermod.format_stock_quote( quote, feed._symbol_data_cache ) - a = quote_to_marketstore_structarray(fmt_quote, last_fills) - # start = time.time() - # err_on_resp(client.write( - # a, _tick_tbk.format(symbol), isvariablelength=True) - # ) - # log.trace( - # f"{symbol} write time (s): {time.time() - start}") + + # remap tick strs to ints + fmt_quote['tick'] = _tick_map[ + fmt_quote.get('tick', 'Equal') + ] + + # check for volume update (i.e. did trades happen + # since last quote) + new_vol = fmt_quote.get('volume', None) + if new_vol is None: + log.info(f"NO trades for {symbol}") + if new_vol == quote_cache.get('volume'): + log.error( + f"{symbol}: got same volume as last quote?") + + a = quote_to_marketstore_structarray( + fmt_quote, + # TODO: check this closer to the broker query api + last_fill=fmt_quote.get('last_fill', '') + ) + start = time.time() + err_on_resp(client.write( + a, _tick_tbk.format(symbol), isvariablelength=True) + ) + log.trace( + f"{symbol} write time (s): {time.time() - start}") tractor.run( partial(main, tries=1), @@ -213,7 +239,7 @@ def ms_shell(config, name, tl, url): ) @click.argument('names', nargs=-1) @click.pass_obj -def marketstore_destroy(config, names, url): +def marketstore_destroy(config: dict, names: List[str], url: str) -> None: """Destroy symbol entries in the local marketstore instance. """ client = pymkts.Client(url) @@ -235,6 +261,29 @@ def marketstore_destroy(config, names, url): print("Nothing deleted.") +async def open_quote_stream( + tbks: List[str], + host: str = 'localhost', + port: int = 5993 +) -> None: + """Open a symbol stream from a running instance of marketstore and + log to console. + """ + async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: + # send subs topics to server + await ws.send_message(msgpack.dumps({'streams': tbks})) + + async def recv() -> Dict[str, Any]: + return msgpack.loads((await ws.get_message()), encoding='utf-8') + + streams = (await recv())['streams'] + log.info(f"Subscribed to {streams}") + + while True: + msg = await recv() + log.info(f"Received quote:\n{msg}") + + @cli.command() @click.option( '--url', @@ -243,14 +292,8 @@ def marketstore_destroy(config, names, url): ) @click.argument('names', nargs=-1) @click.pass_obj -def marketstore_stream(config, names, url): - """Destroy symbol entries in the local marketstore instance. +def marketstore_stream(config: dict, names: List[str], url: str): + """Connect to a marketstore time bucket stream for (a set of) symbols(s) + and print to console. """ - symbol = 'APHA' - conn = pymkts.StreamConn('ws://localhost:5993/ws') - - @conn.on(r'^{}/'.format(symbol)) - def on_tsla(conn, msg): - print(f'received {symbol}', msg['data']) - - conn.run(['APHA/*/*']) # runs until exception + trio.run(open_quote_stream, names) From bc9af977a4f9145f28a8283c7634302945187097 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 2 Jun 2020 10:18:57 -0400 Subject: [PATCH 08/24] Update quote cache on each loop --- piker/data/marketstore.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index c6b5cd4e..b6a8b864 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -183,6 +183,8 @@ def ingest(config, name, test_file, tl, url): log.error( f"{symbol}: got same volume as last quote?") + quote_cache.update(fmt_quote) + a = quote_to_marketstore_structarray( fmt_quote, # TODO: check this closer to the broker query api From 31b4d5c6a774b6ed99a7f88729c6290d056ef4c7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 3 Jun 2020 12:09:55 -0400 Subject: [PATCH 09/24] Add glue link in readme --- README.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index ce7352f9..6d42a3d6 100644 --- a/README.rst +++ b/README.rst @@ -9,8 +9,8 @@ trading and financial analysis targetted at hardcore Linux users. It tries to use as much bleeding edge tech as possible including (but not limited to): -- Python 3.7+ for glue and business logic -- trio_ and asyncio_ for async +- Python 3.7+ for glue_ and business logic +- trio_ and `asyncio` for async - tractor_ as the underlying actor model - marketstore_ for historical and real-time tick data persistence and sharing - techtonicdb_ for L2 book storage @@ -23,6 +23,7 @@ It tries to use as much bleeding edge tech as possible including (but not limite .. _marketstore: https://github.com/alpacahq/marketstore .. _techtonicdb: https://github.com/0b01/tectonicdb .. _Qt: https://www.qt.io/ +.. _glue: https://numpy.org/doc/stable/user/c-info.python-as-glue.html#using-python-as-glue Focus and Features: From 167c9089f90d9b23bf0d2537745a2472dd963223 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jun 2020 13:56:18 -0400 Subject: [PATCH 10/24] Who needs it ;P --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 6d42a3d6..b5b71d2f 100644 --- a/README.rst +++ b/README.rst @@ -10,7 +10,7 @@ trading and financial analysis targetted at hardcore Linux users. It tries to use as much bleeding edge tech as possible including (but not limited to): - Python 3.7+ for glue_ and business logic -- trio_ and `asyncio` for async +- trio_ for async - tractor_ as the underlying actor model - marketstore_ for historical and real-time tick data persistence and sharing - techtonicdb_ for L2 book storage From f6f6d98a956e5ca3d691aa8b7f8812bf91b91429 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Jun 2020 14:23:37 -0400 Subject: [PATCH 11/24] Allow passing in tbk keys to query --- piker/data/marketstore.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index b6a8b864..b1c6463d 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -178,7 +178,7 @@ def ingest(config, name, test_file, tl, url): # since last quote) new_vol = fmt_quote.get('volume', None) if new_vol is None: - log.info(f"NO trades for {symbol}") + log.debug(f"No fills for {symbol}") if new_vol == quote_cache.get('volume'): log.error( f"{symbol}: got same volume as last quote?") @@ -222,9 +222,9 @@ def ms_shell(config, name, tl, url): """ client = pymkts.Client(url) - def query(name): + def query(name, tbk=_tick_tbk_ids): return client.query( - pymkts.Params(name, *_tick_tbk_ids)).first().df() + pymkts.Params(name, *tbk)).first().df() # causes crash # client.query(pymkts.Params(symbol, '*', 'OHCLV' From 57a8db8cba0701360830e9a9ada245c5e9bd120f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jul 2020 08:22:09 -0400 Subject: [PATCH 12/24] Start enforcing a common stream setup api Add routines for brokerd spawning and quote stream creation. --- piker/data/__init__.py | 91 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 86 insertions(+), 5 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index a4633908..af05da36 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -1,8 +1,89 @@ """ -Data infra tooling and automation. +Data feed apis and infra. + +We ship some tsdb integrations for retrieving +and storing data from your brokers as well as +sharing your feeds with other fellow pikers. +""" +from contextlib import asynccontextmanager +from typing import ( + Dict, List, Any, + Sequence, AsyncIterator, Optional +) + +import tractor + +from ..brokers import get_brokermod +from ..log import get_logger -``piker`` ships with timeseries database integrations -for retrieving and storing data from your brokers as well -as sharing your feeds with other fellow pikers. -""" +log = get_logger(__name__) + + +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', +] + + +@asynccontextmanager +async def maybe_spawn_brokerd( + brokername: str, + sleep: float = 0.5, + tries: int = 10, + loglevel: Optional[str] = None, + expose_mods: List = [], + **tractor_kwargs, +) -> tractor._portal.Portal: + """If no ``brokerd.{brokername}`` daemon-actor can be found, + spawn one in a local subactor and return a portal to it. + """ + brokermod = get_brokermod(brokername) + dname = f'brokerd.{brokername}' + async with tractor.find_actor(dname) as portal: + # WTF: why doesn't this work? + log.info(f"YOYOYO {__name__}") + if portal is not None: + yield portal + else: + log.info(f"Spawning {brokername} broker daemon") + tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) + async with tractor.open_nursery() as nursery: + try: + # spawn new daemon + portal = await nursery.start_actor( + dname, + rpc_module_paths=_data_mods + [brokermod.__name__], + loglevel=loglevel, + **tractor_kwargs + ) + async with tractor.wait_for_actor(dname) as portal: + yield portal + finally: + # client code may block indefinitely so cancel when + # teardown is invoked + await nursery.cancel() + + +@asynccontextmanager +async def open_feed( + name: str, + symbols: Sequence[str], +) -> AsyncIterator[Dict[str, Any]]: + try: + mod = get_brokermod(name) + except ImportError: + # TODO: try to pull up ingest feeds + # - market store + # - influx + raise + + async with maybe_spawn_brokerd( + mod.name, + ) as portal: + stream = await portal.run( + mod.__name__, + 'stream_quotes', + symbols=symbols, + ) + yield stream From 78784a4bf384ed6864f236eaff1f91ac2033ae24 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jul 2020 08:28:13 -0400 Subject: [PATCH 13/24] Port to new data apis --- piker/brokers/cli.py | 9 +-- piker/brokers/core.py | 61 +++++++++---------- piker/brokers/data.py | 24 +++++--- piker/cli/__init__.py | 13 ++-- piker/data/marketstore.py | 4 +- piker/ui/cli.py | 124 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 187 insertions(+), 48 deletions(-) create mode 100644 piker/ui/cli.py diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 370722b6..f47a7b25 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -14,7 +14,7 @@ import tractor from ..cli import cli from .. import watchlists as wl from ..log import get_console_log, colorize_json, get_logger -from ..brokers.core import maybe_spawn_brokerd_as_subactor +from ..data import maybe_spawn_brokerd from ..brokers import core, get_brokermod, data log = get_logger('cli') @@ -99,7 +99,7 @@ def quote(config, tickers, df_output): @cli.command() @click.option('--df-output', '-df', flag_value=True, help='Output in `pandas.DataFrame` format') -@click.option('--count', '-c', default=100, +@click.option('--count', '-c', default=1000, help='Number of bars to retrieve') @click.argument('symbol', required=True) @click.pass_obj @@ -117,10 +117,11 @@ def bars(config, symbol, count, df_output): brokermod, symbol, count=count, + as_np=df_output ) ) - if not bars: + if not len(bars): log.error(f"No quotes could be found for {symbol}?") return @@ -198,7 +199,7 @@ def record(config, rate, name, dhost, filename): return async def main(tries): - async with maybe_spawn_brokerd_as_subactor( + async with maybe_spawn_brokerd( tries=tries, loglevel=loglevel ) as portal: # run app "main" diff --git a/piker/brokers/core.py b/piker/brokers/core.py index d2c958e6..6162b4e5 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -1,23 +1,18 @@ """ -Broker high level API layer. +Broker high level cross-process API layer. + +This API should be kept "remote service compatible" meaning inputs to +routines should be primitive data types where possible. """ import inspect from types import ModuleType from typing import List, Dict, Any, Optional -from async_generator import asynccontextmanager -import tractor - from ..log import get_logger -from .data import DataFeed from . import get_brokermod -log = get_logger('broker.core') -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', -] +log = get_logger(__name__) async def api(brokername: str, methname: str, **kwargs) -> dict: @@ -25,12 +20,11 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: """ brokermod = get_brokermod(brokername) async with brokermod.get_client() as client: - - meth = getattr(client.api, methname, None) + meth = getattr(client, methname, None) if meth is None: log.warning( f"Couldn't find API method {methname} looking up on client") - meth = getattr(client, methname, None) + meth = getattr(client.api, methname, None) if meth is None: log.error(f"No api method `{methname}` could be found?") @@ -48,24 +42,6 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: return await meth(**kwargs) -@asynccontextmanager -async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): - """If no ``brokerd`` daemon-actor can be found spawn one in a - local subactor. - """ - async with tractor.open_nursery() as nursery: - async with tractor.find_actor('brokerd') as portal: - if not portal: - log.info( - "No broker daemon could be found, spawning brokerd..") - portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=_data_mods, - loglevel=loglevel, - ) - yield portal - - async def stocks_quote( brokermod: ModuleType, tickers: List[str] @@ -121,3 +97,26 @@ async def bars( """ async with brokermod.get_client() as client: return await client.bars(symbol, **kwargs) + + +async def symbol_info( + brokermod: ModuleType, + symbol: str, + **kwargs, +) -> Dict[str, Dict[str, Dict[str, Any]]]: + """Return symbol info from broker. + """ + async with brokermod.get_client() as client: + return await client.symbol_info(symbol, **kwargs) + + +async def symbol_search( + brokermod: ModuleType, + pattern: str, + **kwargs, +) -> Dict[str, Dict[str, Dict[str, Any]]]: + """Return symbol info from broker. + """ + async with brokermod.get_client() as client: + # TODO: support multiple asset type concurrent searches. + return await client.search_stocks(pattern=pattern, **kwargs) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 48856739..15e90238 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -25,7 +25,7 @@ from ..log import get_logger, get_console_log from . import get_brokermod -log = get_logger('broker.data') +log = get_logger(__name__) async def wait_for_network( @@ -80,7 +80,7 @@ class BrokerFeed: @tractor.msg.pub(tasks=['stock', 'option']) -async def stream_requests( +async def stream_poll_requests( get_topics: typing.Callable, get_quotes: Coroutine, feed: BrokerFeed, @@ -90,6 +90,12 @@ async def stream_requests( """Stream requests for quotes for a set of symbols at the given ``rate`` (per second). + This routine is built for brokers who support quote polling for multiple + symbols per request. The ``get_topics()`` func is called to retreive the + set of symbols each iteration and ``get_quotes()`` is to retreive + the quotes. + + A stock-broker client ``get_quotes()`` async function must be provided which returns an async quote retrieval function. """ @@ -327,7 +333,7 @@ async def start_quote_stream( # push initial smoke quote response for client initialization await ctx.send_yield(payload) - await stream_requests( + await stream_poll_requests( # ``msg.pub`` required kwargs task_name=feed_type, @@ -394,15 +400,19 @@ class DataFeed: # subscribe for tickers (this performs a possible filtering # where invalid symbols are discarded) sd = await self.portal.run( - "piker.brokers.data", 'symbol_data', - broker=self.brokermod.name, tickers=symbols) + "piker.brokers.data", + 'symbol_data', + broker=self.brokermod.name, + tickers=symbols + ) self._symbol_data_cache.update(sd) if test: # stream from a local test file quote_gen = await self.portal.run( - "piker.brokers.data", 'stream_from_file', - filename=test + "piker.brokers.data", + 'stream_from_file', + filename=test, ) else: log.info(f"Starting new stream for {symbols}") diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index cc528f8a..1e7ceef5 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -8,7 +8,6 @@ import tractor from ..log import get_console_log, get_logger from ..brokers import get_brokermod, config -from ..brokers.core import _data_mods log = get_logger('cli') DEFAULT_BROKER = 'questrade' @@ -34,6 +33,7 @@ _context_defaults = dict( def pikerd(loglevel, host, tl): """Spawn the piker broker-daemon. """ + from ..data import _data_mods get_console_log(loglevel) tractor.run_daemon( rpc_module_paths=_data_mods, @@ -64,7 +64,12 @@ def cli(ctx, broker, loglevel, configdir): }) +def _load_clis() -> None: + from ..data import marketstore as _ + from ..brokers import cli as _ # noqa + from ..ui import cli as _ # noqa + from ..watchlists import cli as _ # noqa + + # load downstream cli modules -from ..brokers import cli as _ -from ..watchlists import cli as _ -from ..data import marketstore as _ +_load_clis() diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index b1c6463d..54dd0d7a 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -19,11 +19,11 @@ import trio import tractor from trio_websocket import open_websocket_url +from . import maybe_spawn_brokerd from ..cli import cli from .. import watchlists as wl from ..brokers.data import DataFeed from ..log import get_logger -from ..brokers.core import maybe_spawn_brokerd_as_subactor log = get_logger(__name__) @@ -138,7 +138,7 @@ def ingest(config, name, test_file, tl, url): symbols = watchlists[name] async def main(tries): - async with maybe_spawn_brokerd_as_subactor( + async with maybe_spawn_brokerd( tries=tries, loglevel=loglevel ) as portal: diff --git a/piker/ui/cli.py b/piker/ui/cli.py new file mode 100644 index 00000000..2be6d436 --- /dev/null +++ b/piker/ui/cli.py @@ -0,0 +1,124 @@ +""" +Console interface to UI components. +""" +from functools import partial +import os +import click +import tractor + +from ..cli import cli +from .. import watchlists as wl +from ..data import maybe_spawn_brokerd + + +_config_dir = click.get_app_dir('piker') +_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') + + +def _kivy_import_hack(): + # Command line hacks to make it work. + # See the pkg mod. + from .kivy import kivy # noqa + + +@cli.command() +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--rate', '-r', default=3, help='Quote rate limit') +@click.option('--test', '-t', help='Test quote stream file') +@click.option('--dhost', '-dh', default='127.0.0.1', + help='Daemon host address to connect to') +@click.argument('name', nargs=1, required=True) +@click.pass_obj +def monitor(config, rate, name, dhost, test, tl): + """Start a real-time watchlist UI + """ + # global opts + brokermod = config['brokermod'] + loglevel = config['loglevel'] + log = config['log'] + + watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) + watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) + tickers = watchlists[name] + if not tickers: + log.error(f"No symbols found for watchlist `{name}`?") + return + + _kivy_import_hack() + from .kivy.monitor import _async_main + + async def main(tries): + async with maybe_spawn_brokerd( + brokername=brokermod.name, + tries=tries, loglevel=loglevel + ) as portal: + # run app "main" + await _async_main( + name, portal, tickers, + brokermod, rate, test=test, + ) + + tractor.run( + partial(main, tries=1), + name='monitor', + loglevel=loglevel if tl else None, + rpc_module_paths=['piker.ui.kivy.monitor'], + start_method='forkserver', + ) + + +@cli.command() +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--date', '-d', help='Contracts expiry date') +@click.option('--test', '-t', help='Test quote stream file') +@click.option('--rate', '-r', default=1, help='Logging level') +@click.argument('symbol', required=True) +@click.pass_obj +def optschain(config, symbol, date, tl, rate, test): + """Start an option chain UI + """ + # global opts + loglevel = config['loglevel'] + brokername = config['broker'] + + _kivy_import_hack() + from .kivy.option_chain import _async_main + + async def main(tries): + async with maybe_spawn_brokerd( + tries=tries, loglevel=loglevel + ): + # run app "main" + await _async_main( + symbol, + brokername, + rate=rate, + loglevel=loglevel, + test=test, + ) + + tractor.run( + partial(main, tries=1), + name='kivy-options-chain', + loglevel=loglevel if tl else None, + start_method='forkserver', + ) + + +@cli.command() +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--date', '-d', help='Contracts expiry date') +@click.option('--test', '-t', help='Test quote stream file') +@click.option('--rate', '-r', default=1, help='Logging level') +@click.argument('symbol', required=True) +@click.pass_obj +def chart(config, symbol, date, tl, rate, test): + """Start an option chain UI + """ + from ._chart import main + + # global opts + loglevel = config['loglevel'] + brokername = config['broker'] + + main(sym=symbol, brokername=brokername, loglevel=loglevel) From 3c4699abefee02fe5ebe2b412d20fe8e4f46c4cc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jul 2020 09:54:47 -0400 Subject: [PATCH 14/24] Pass broker name --- piker/data/marketstore.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 54dd0d7a..6f0d1bb9 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -139,6 +139,7 @@ def ingest(config, name, test_file, tl, url): async def main(tries): async with maybe_spawn_brokerd( + brokername=brokermod.name, tries=tries, loglevel=loglevel ) as portal: From dcb0a30ad66fccb8fff21e5457d25f04bd340415 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Jun 2020 13:48:21 -0400 Subject: [PATCH 15/24] Move UI spawning cmds to new module --- piker/brokers/cli.py | 80 --------------------------------------- piker/cli/__init__.py | 1 + piker/ui/__init__.py | 15 -------- piker/ui/cli.py | 19 ---------- piker/ui/kivy/__init__.py | 15 ++++++++ 5 files changed, 16 insertions(+), 114 deletions(-) delete mode 100644 piker/ui/__init__.py diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index f47a7b25..cbdd87b9 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -131,49 +131,6 @@ def bars(config, symbol, count, df_output): click.echo(colorize_json(bars)) -@cli.command() -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--rate', '-r', default=3, help='Quote rate limit') -@click.option('--test', '-t', help='Test quote stream file') -@click.option('--dhost', '-dh', default='127.0.0.1', - help='Daemon host address to connect to') -@click.argument('name', nargs=1, required=True) -@click.pass_obj -def monitor(config, rate, name, dhost, test, tl): - """Start a real-time watchlist UI - """ - # global opts - brokermod = config['brokermod'] - loglevel = config['loglevel'] - log = config['log'] - - watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) - watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - tickers = watchlists[name] - if not tickers: - log.error(f"No symbols found for watchlist `{name}`?") - return - - from ..ui.monitor import _async_main - - async def main(tries): - async with maybe_spawn_brokerd_as_subactor( - tries=tries, loglevel=loglevel - ) as portal: - # run app "main" - await _async_main( - name, portal, tickers, - brokermod, rate, test=test, - ) - - tractor.run( - partial(main, tries=1), - name='monitor', - loglevel=loglevel if tl else None, - rpc_module_paths=['piker.ui.monitor'], - start_method='forkserver', - ) - @cli.command() @click.option('--rate', '-r', default=5, help='Logging level') @@ -269,40 +226,3 @@ def optsquote(config, symbol, df_output, date): click.echo(df) else: click.echo(colorize_json(quotes)) - - -@cli.command() -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--date', '-d', help='Contracts expiry date') -@click.option('--test', '-t', help='Test quote stream file') -@click.option('--rate', '-r', default=1, help='Logging level') -@click.argument('symbol', required=True) -@click.pass_obj -def optschain(config, symbol, date, tl, rate, test): - """Start an option chain UI - """ - # global opts - loglevel = config['loglevel'] - brokername = config['broker'] - - from ..ui.option_chain import _async_main - - async def main(tries): - async with maybe_spawn_brokerd_as_subactor( - tries=tries, loglevel=loglevel - ): - # run app "main" - await _async_main( - symbol, - brokername, - rate=rate, - loglevel=loglevel, - test=test, - ) - - tractor.run( - partial(main, tries=1), - name='kivy-options-chain', - loglevel=loglevel if tl else None, - start_method='forkserver', - ) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 1e7ceef5..ea72b6b6 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -16,6 +16,7 @@ _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') _context_defaults = dict( default_map={ + # Questrade specific quote poll rates 'monitor': { 'rate': 3, }, diff --git a/piker/ui/__init__.py b/piker/ui/__init__.py deleted file mode 100644 index bc6e6ac9..00000000 --- a/piker/ui/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -Stuff for your eyes. -""" -import os -import sys - -# XXX clear all flags at import to avoid upsetting -# ol' kivy see: https://github.com/kivy/kivy/issues/4225 -# though this is likely a ``click`` problem -sys.argv[1:] = [] - -# use the trio async loop -os.environ['KIVY_EVENTLOOP'] = 'trio' -import kivy -kivy.require('1.10.0') diff --git a/piker/ui/cli.py b/piker/ui/cli.py index 2be6d436..a869e307 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -103,22 +103,3 @@ def optschain(config, symbol, date, tl, rate, test): loglevel=loglevel if tl else None, start_method='forkserver', ) - - -@cli.command() -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--date', '-d', help='Contracts expiry date') -@click.option('--test', '-t', help='Test quote stream file') -@click.option('--rate', '-r', default=1, help='Logging level') -@click.argument('symbol', required=True) -@click.pass_obj -def chart(config, symbol, date, tl, rate, test): - """Start an option chain UI - """ - from ._chart import main - - # global opts - loglevel = config['loglevel'] - brokername = config['broker'] - - main(sym=symbol, brokername=brokername, loglevel=loglevel) diff --git a/piker/ui/kivy/__init__.py b/piker/ui/kivy/__init__.py index e69de29b..f0aa8b68 100644 --- a/piker/ui/kivy/__init__.py +++ b/piker/ui/kivy/__init__.py @@ -0,0 +1,15 @@ +""" +Legacy kivy components. +""" +import os +import sys + +# XXX clear all flags at import to avoid upsetting +# ol' kivy see: https://github.com/kivy/kivy/issues/4225 +# though this is likely a ``click`` problem +sys.argv[1:] = [] + +# use the trio async loop +os.environ['KIVY_EVENTLOOP'] = 'trio' +import kivy +kivy.require('1.10.0') From 934108a02477c5de431d997d6cf245665d54d76b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 16 Jun 2020 11:55:37 -0400 Subject: [PATCH 16/24] Add symbol-info command --- piker/brokers/cli.py | 23 +++++++++++++++++++++++ piker/brokers/core.py | 14 +------------- piker/brokers/questrade.py | 5 ++++- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index cbdd87b9..f904e221 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -226,3 +226,26 @@ def optsquote(config, symbol, df_output, date): click.echo(df) else: click.echo(colorize_json(quotes)) + + +@cli.command() +@click.argument('tickers', nargs=-1, required=True) +@click.pass_obj +def symbol_info(config, tickers): + """Print symbol quotes to the console + """ + # global opts + brokermod = config['brokermod'] + + quotes = trio.run(partial(core.symbol_info, brokermod, tickers)) + if not quotes: + log.error(f"No quotes could be found for {tickers}?") + return + + if len(quotes) < len(tickers): + syms = tuple(map(itemgetter('symbol'), quotes)) + for ticker in tickers: + if ticker not in syms: + brokermod.log.warn(f"Could not find symbol {ticker}?") + + click.echo(colorize_json(quotes)) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 6162b4e5..e65fcb40 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -22,7 +22,7 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: async with brokermod.get_client() as client: meth = getattr(client, methname, None) if meth is None: - log.warning( + log.debug( f"Couldn't find API method {methname} looking up on client") meth = getattr(client.api, methname, None) @@ -108,15 +108,3 @@ async def symbol_info( """ async with brokermod.get_client() as client: return await client.symbol_info(symbol, **kwargs) - - -async def symbol_search( - brokermod: ModuleType, - pattern: str, - **kwargs, -) -> Dict[str, Dict[str, Dict[str, Any]]]: - """Return symbol info from broker. - """ - async with brokermod.get_client() as client: - # TODO: support multiple asset type concurrent searches. - return await client.search_stocks(pattern=pattern, **kwargs) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index d12d3e64..73ad2984 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -408,7 +408,7 @@ class Client: return symbols2ids - async def symbol_data(self, tickers: List[str]): + async def symbol_info(self, tickers: List[str]): """Return symbol data for ``tickers``. """ t2ids = await self.tickers2ids(tickers) @@ -419,6 +419,9 @@ class Client: return symbols + # TODO: deprecate + symbol_data = symbol_info + async def quote(self, tickers: [str]): """Return stock quotes for each ticker in ``tickers``. """ From 519712e128c1ce56ef60dced3b7945a7790bb9a3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Jun 2020 14:12:38 -0400 Subject: [PATCH 17/24] Add stocks search to qt client --- piker/brokers/questrade.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 73ad2984..1fc3cbc8 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -6,7 +6,7 @@ import inspect import time from datetime import datetime from functools import partial -from itertools import chain +import itertools import configparser from typing import List, Tuple, Dict, Any, Iterator, NamedTuple @@ -602,6 +602,24 @@ class Client: f"Took {time.time() - start} seconds to retreive {len(bars)} bars") return bars + async def search_stocks( + self, + pattern: str, + # how many contracts to return + upto: int = 10, + ) -> Dict[str, str]: + details = {} + results = await self.api.search(prefix=pattern) + for result in results['symbols']: + sym = result['symbol'] + if '.' not in sym: + sym = f"{sym}.{result['listingExchange']}" + + details[sym] = result + + if len(details) == upto: + return details + # marketstore TSD compatible numpy dtype for bar _qt_bars_dt = [ @@ -856,7 +874,7 @@ def format_stock_quote( # 'symbol': quote['symbol'], '%': round(change, 3), 'MC': mktcap, - # why questrade do you have to be an asshole shipping null values!!! + # why questrade do you have to be shipping null values!!! # '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3), 'close': previous, }) @@ -870,7 +888,7 @@ def format_stock_quote( new = {} displayable = {} - for key, value in chain(quote.items(), computed.items()): + for key, value in itertools.chain(quote.items(), computed.items()): new_key = keymap.get(key) if not new_key: continue @@ -888,7 +906,6 @@ def format_stock_quote( new[new_key] = value displayable[new_key] = display_value - return new, displayable From 75f98276cc66dab7e7af7a260c3d59727c6f5de5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Jun 2020 14:13:00 -0400 Subject: [PATCH 18/24] Add symbol search to broker api --- piker/brokers/core.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index e65fcb40..2e672c61 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -108,3 +108,15 @@ async def symbol_info( """ async with brokermod.get_client() as client: return await client.symbol_info(symbol, **kwargs) + + +async def symbol_search( + brokermod: ModuleType, + symbol: str, + **kwargs, +) -> Dict[str, Dict[str, Dict[str, Any]]]: + """Return symbol info from broker. + """ + async with brokermod.get_client() as client: + # TODO: support multiple asset type concurrent searches. + return await client.search_stocks(symbol, **kwargs) From a6de6231475ef1469df0ce04a72d070c058264b3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Jun 2020 14:13:36 -0400 Subject: [PATCH 19/24] Add search command to cli --- piker/brokers/cli.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index f904e221..b18219bf 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -249,3 +249,20 @@ def symbol_info(config, tickers): brokermod.log.warn(f"Could not find symbol {ticker}?") click.echo(colorize_json(quotes)) + + +@cli.command() +@click.argument('pattern', required=True) +@click.pass_obj +def search(config, pattern): + """Search for symbols from broker backend(s). + """ + # global opts + brokermod = config['brokermod'] + + quotes = trio.run(partial(core.symbol_search, brokermod, pattern)) + if not quotes: + log.error(f"No matches could be found for {pattern}?") + return + + click.echo(colorize_json(quotes)) From 60b74ad7d1112ebaf287684060c58dc694a99679 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 Jun 2020 14:13:56 -0400 Subject: [PATCH 20/24] Use new method name --- piker/brokers/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 15e90238..96bfa1e1 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -190,7 +190,7 @@ async def symbol_data(broker: str, tickers: List[str]): """Retrieve baseline symbol info from broker. """ async with get_cached_feed(broker) as feed: - return await feed.client.symbol_data(tickers) + return await feed.client.symbol_info(tickers) async def smoke_quote(get_quotes, tickers, broker): From 312169e79063d544cf232ee75b76baa507d2d41d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Jul 2020 16:58:40 -0400 Subject: [PATCH 21/24] Support the `stream_quotes()` api in questrade backend --- piker/brokers/questrade.py | 162 ++++++++++++++++++++++++++++++++++++- 1 file changed, 158 insertions(+), 4 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 1fc3cbc8..961190c1 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -3,15 +3,20 @@ Questrade API backend. """ from __future__ import annotations import inspect +import contextlib import time from datetime import datetime from functools import partial import itertools import configparser -from typing import List, Tuple, Dict, Any, Iterator, NamedTuple +from typing import ( + List, Tuple, Dict, Any, Iterator, NamedTuple, + AsyncGenerator, +) import arrow import trio +import tractor from async_generator import asynccontextmanager import pandas as pd import numpy as np @@ -23,6 +28,7 @@ from . import config from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json from .._async_utils import async_lifo_cache +from . import get_brokermod log = get_logger(__name__) @@ -408,10 +414,10 @@ class Client: return symbols2ids - async def symbol_info(self, tickers: List[str]): - """Return symbol data for ``tickers``. + async def symbol_info(self, symbols: List[str]): + """Return symbol data for ``symbols``. """ - t2ids = await self.tickers2ids(tickers) + t2ids = await self.tickers2ids(symbols) ids = ','.join(t2ids.values()) symbols = {} for pkt in (await self.api.symbols(ids=ids))['symbols']: @@ -1004,3 +1010,151 @@ def format_option_quote( displayable[new_key] = display_value return new, displayable + + +@asynccontextmanager +async def get_cached_client( + brokername: str, + *args, + **kwargs, +) -> 'Client': + """Get a cached broker client from the current actor's local vars. + + If one has not been setup do it and cache it. + """ + # check if a cached client is in the local actor's statespace + ss = tractor.current_actor().statespace + clients = ss.setdefault('clients', {'_lock': trio.Lock()}) + lock = clients['_lock'] + client = None + try: + log.info(f"Loading existing `{brokername}` daemon") + async with lock: + client = clients[brokername] + except KeyError: + log.info(f"Creating new client for broker {brokername}") + async with lock: + brokermod = get_brokermod(brokername) + exit_stack = contextlib.AsyncExitStack() + client = await exit_stack.enter_async_context( + brokermod.get_client()) + client._exit_stack = exit_stack + clients[brokername] = client + else: + client._consumers += 1 + yield client + finally: + client._consumers -= 1 + if client._consumers <= 0: + # teardown the client + await client._exit_stack.aclose() + + +async def smoke_quote(get_quotes, tickers): # , broker): + """Do an initial "smoke" request for symbols in ``tickers`` filtering + out any symbols not supported by the broker queried in the call to + ``get_quotes()``. + """ + from operator import itemgetter + # TODO: trim out with #37 + ################################################# + # get a single quote filtering out any bad tickers + # NOTE: this code is always run for every new client + # subscription even when a broker quoter task is already running + # since the new client needs to know what symbols are accepted + log.warn(f"Retrieving smoke quote for symbols {tickers}") + quotes = await get_quotes(tickers) + + # report any tickers that aren't returned in the first quote + invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes)) + for symbol in invalid_tickers: + tickers.remove(symbol) + log.warn( + f"Symbol `{symbol}` not found") # by broker `{broker}`" + # ) + + # pop any tickers that return "empty" quotes + payload = {} + for quote in quotes: + symbol = quote['symbol'] + if quote is None: + log.warn( + f"Symbol `{symbol}` not found") + # XXX: not this mutates the input list (for now) + tickers.remove(symbol) + continue + + # report any unknown/invalid symbols (QT specific) + if quote.get('low52w', False) is None: + log.error( + f"{symbol} seems to be defunct") + + payload[symbol] = quote + + return payload + + # end of section to be trimmed out with #37 + ########################################### + + +@tractor.stream +async def stream_quotes( + ctx: tractor.Context, # marks this as a streaming func + symbols: List[str], + feed_type: str = 'stock', + diff_cached: bool = True, + rate: int = 3, + # feed_type: str = 'stock', +) -> AsyncGenerator[str, Dict[str, Any]]: + + async with get_cached_client('questrade') as client: + if feed_type == 'stock': + formatter = format_stock_quote + get_quotes = await stock_quoter(client, symbols) + + # do a smoke quote (note this mutates the input list and filters + # out bad symbols for now) + payload = await smoke_quote(get_quotes, list(symbols)) + else: + formatter = format_option_quote + get_quotes = await option_quoter(client, symbols) + # packetize + payload = { + quote['symbol']: quote + for quote in await get_quotes(symbols) + } + + symbol_data = await client.symbol_info(symbols) + + # function to format packets delivered to subscribers + def packetizer( + topic: str, + quotes: Dict[str, Any] + ) -> Dict[str, Any]: + """Normalize quotes by name into dicts. + """ + new = {} + for quote in quotes: + new[quote['symbol']], _ = formatter(quote, symbol_data) + + return new + + # push initial smoke quote response for client initialization + await ctx.send_yield(payload) + + from .data import stream_poll_requests + + await stream_poll_requests( + + # ``msg.pub`` required kwargs + task_name=feed_type, + ctx=ctx, + topics=symbols, + packetizer=packetizer, + + # actual func args + get_quotes=get_quotes, + diff_cached=diff_cached, + rate=rate, + ) + log.info("Terminating stream quoter task") From 702c63f6072a51140dc774dc508810d72ce40fe1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Jul 2020 13:47:18 -0400 Subject: [PATCH 22/24] Define "packetizer" in specific broker mod Allows for formatting published quotes using a broker specific formatting callback. --- piker/brokers/data.py | 53 ++++++++++++++++++++++++----------- piker/brokers/questrade.py | 57 ++++++++++++++++++++++++-------------- piker/data/__init__.py | 41 ++++++++++++++++++++++----- 3 files changed, 107 insertions(+), 44 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 96bfa1e1..9580add8 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -12,7 +12,7 @@ import typing from typing import ( Coroutine, Callable, Dict, List, Any, Tuple, AsyncGenerator, - Sequence, + Sequence ) import contextlib from operator import itemgetter @@ -47,7 +47,7 @@ async def wait_for_network( continue except socket.gaierror: if not down: # only report/log network down once - log.warn(f"Network is down waiting for re-establishment...") + log.warn("Network is down waiting for re-establishment...") down = True await trio.sleep(sleep) @@ -83,7 +83,6 @@ class BrokerFeed: async def stream_poll_requests( get_topics: typing.Callable, get_quotes: Coroutine, - feed: BrokerFeed, rate: int = 3, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -93,17 +92,18 @@ async def stream_poll_requests( This routine is built for brokers who support quote polling for multiple symbols per request. The ``get_topics()`` func is called to retreive the set of symbols each iteration and ``get_quotes()`` is to retreive - the quotes. - + the quotes. A stock-broker client ``get_quotes()`` async function must be provided which returns an async quote retrieval function. - """ - broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) - if broker_limit < rate: - rate = broker_limit - log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") + .. note:: + This code is mostly tailored (for now) to the questrade backend. + It is currently the only broker that doesn't support streaming without + paying for data. See the note in the diffing section regarding volume + differentials which needs to be addressed in order to get cross-broker + support. + """ sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching @@ -136,6 +136,7 @@ async def stream_poll_requests( for quote in quotes: symbol = quote['symbol'] last = _cache.setdefault(symbol, {}) + last_volume = last.get('volume', 0) # find all keys that have match to a new value compared # to the last quote received @@ -153,9 +154,22 @@ async def stream_poll_requests( # shares traded is useful info and it's possible # that the set difference from above will disregard # a "size" value since the same # of shares were traded - size = quote.get('size') - if size and 'volume' in payload: - payload['size'] = size + volume = payload.get('volume') + if volume: + volume_since_last_quote = volume - last_volume + assert volume_since_last_quote > 0 + payload['volume_delta'] = volume_since_last_quote + + # TODO: We can emit 2 ticks here: + # - one for the volume differential + # - one for the last known trade size + # The first in theory can be unwound and + # interpolated assuming the broker passes an + # accurate daily VWAP value. + # To make this work we need a universal ``size`` + # field that is normalized before hitting this logic. + # XXX: very questrade specific + payload['size'] = quote['lastTradeSize'] # XXX: we append to a list for the options case where the # subscription topic (key) is the same for all @@ -312,6 +326,7 @@ async def start_quote_stream( # do a smoke quote (note this mutates the input list and filters # out bad symbols for now) payload = await smoke_quote(get_quotes, symbols, broker) + formatter = feed.mod.format_stock_quote elif feed_type == 'option': # FIXME: yeah we need maybe a more general way to specify @@ -326,9 +341,16 @@ async def start_quote_stream( quote['symbol']: quote for quote in await get_quotes(symbols) } + formatter = feed.mod.format_option_quote - def packetizer(topic, quotes): - return {quote['symbol']: quote for quote in quotes} + sd = await feed.client.symbol_info(symbols) + # formatter = partial(formatter, symbol_data=sd) + + packetizer = partial( + feed.mod.packetizer, + formatter=formatter, + symbol_data=sd, + ) # push initial smoke quote response for client initialization await ctx.send_yield(payload) @@ -342,7 +364,6 @@ async def start_quote_stream( packetizer=packetizer, # actual func args - feed=feed, get_quotes=get_quotes, diff_cached=diff_cached, rate=rate, diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 961190c1..de0d1e7a 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -12,6 +12,7 @@ import configparser from typing import ( List, Tuple, Dict, Any, Iterator, NamedTuple, AsyncGenerator, + Callable, ) import arrow @@ -26,7 +27,7 @@ import asks from ..calc import humanize, percent_change from . import config from ._util import resproc, BrokerError, SymbolNotFound -from ..log import get_logger, colorize_json +from ..log import get_logger, colorize_json, get_console_log from .._async_utils import async_lifo_cache from . import get_brokermod @@ -933,7 +934,8 @@ _qt_option_keys = { # "theta": ('theta', partial(round, ndigits=3)), # "vega": ('vega', partial(round, ndigits=3)), '$ vol': ('$ vol', humanize), - 'volume': ('vol', humanize), + # XXX: required key to trigger trade execution datum msg + 'volume': ('volume', humanize), # "2021-01-15T00:00:00.000000-05:00", # "isHalted": false, # "key": [ @@ -1031,18 +1033,20 @@ async def get_cached_client( log.info(f"Loading existing `{brokername}` daemon") async with lock: client = clients[brokername] + client._consumers += 1 + yield client except KeyError: log.info(f"Creating new client for broker {brokername}") async with lock: brokermod = get_brokermod(brokername) exit_stack = contextlib.AsyncExitStack() client = await exit_stack.enter_async_context( - brokermod.get_client()) + brokermod.get_client() + ) + client._consumers = 0 client._exit_stack = exit_stack clients[brokername] = client - else: - client._consumers += 1 - yield client + yield client finally: client._consumers -= 1 if client._consumers <= 0: @@ -1097,6 +1101,23 @@ async def smoke_quote(get_quotes, tickers): # , broker): ########################################### +# function to format packets delivered to subscribers +def packetizer( + topic: str, + quotes: Dict[str, Any], + formatter: Callable, + symbol_data: Dict[str, Any], +) -> Dict[str, Any]: + """Normalize quotes by name into dicts using broker-specific + processing. + """ + new = {} + for quote in quotes: + new[quote['symbol']], _ = formatter(quote, symbol_data) + + return new + + @tractor.stream async def stream_quotes( ctx: tractor.Context, # marks this as a streaming func @@ -1104,8 +1125,11 @@ async def stream_quotes( feed_type: str = 'stock', diff_cached: bool = True, rate: int = 3, + loglevel: str = None, # feed_type: str = 'stock', ) -> AsyncGenerator[str, Dict[str, Any]]: + # XXX: why do we need this again? + get_console_log(tractor.current_actor().loglevel) async with get_cached_client('questrade') as client: if feed_type == 'stock': @@ -1124,20 +1148,7 @@ async def stream_quotes( for quote in await get_quotes(symbols) } - symbol_data = await client.symbol_info(symbols) - - # function to format packets delivered to subscribers - def packetizer( - topic: str, - quotes: Dict[str, Any] - ) -> Dict[str, Any]: - """Normalize quotes by name into dicts. - """ - new = {} - for quote in quotes: - new[quote['symbol']], _ = formatter(quote, symbol_data) - - return new + sd = await client.symbol_info(symbols) # push initial smoke quote response for client initialization await ctx.send_yield(payload) @@ -1150,7 +1161,11 @@ async def stream_quotes( task_name=feed_type, ctx=ctx, topics=symbols, - packetizer=packetizer, + packetizer=partial( + packetizer, + formatter=formatter, + symboal_data=sd, + ), # actual func args get_quotes=get_quotes, diff --git a/piker/data/__init__.py b/piker/data/__init__.py index af05da36..dd2c6b7b 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -6,19 +6,35 @@ and storing data from your brokers as well as sharing your feeds with other fellow pikers. """ from contextlib import asynccontextmanager +from importlib import import_module +from types import ModuleType from typing import ( Dict, List, Any, Sequence, AsyncIterator, Optional ) +import trio import tractor from ..brokers import get_brokermod -from ..log import get_logger +from ..log import get_logger, get_console_log log = get_logger(__name__) +__ingestors__ = [ + 'marketstore', +] + + +def get_ingestor(name: str) -> ModuleType: + """Return the imported ingestor module by name. + """ + module = import_module('.' + name, 'piker.data') + # we only allow monkeying because it's for internal keying + module.name = module.__name__.split('.')[-1] + return module + _data_mods = [ 'piker.brokers.core', @@ -30,7 +46,6 @@ _data_mods = [ async def maybe_spawn_brokerd( brokername: str, sleep: float = 0.5, - tries: int = 10, loglevel: Optional[str] = None, expose_mods: List = [], **tractor_kwargs, @@ -38,6 +53,11 @@ async def maybe_spawn_brokerd( """If no ``brokerd.{brokername}`` daemon-actor can be found, spawn one in a local subactor and return a portal to it. """ + if loglevel: + get_console_log(loglevel) + + tractor_kwargs['loglevel'] = loglevel + brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: @@ -69,21 +89,28 @@ async def maybe_spawn_brokerd( async def open_feed( name: str, symbols: Sequence[str], + loglevel: str = 'info', ) -> AsyncIterator[Dict[str, Any]]: try: mod = get_brokermod(name) except ImportError: - # TODO: try to pull up ingest feeds - # - market store - # - influx - raise + mod = get_ingestormod(name) async with maybe_spawn_brokerd( mod.name, + loglevel=loglevel, ) as portal: stream = await portal.run( mod.__name__, 'stream_quotes', symbols=symbols, ) - yield stream + # Feed is required to deliver an initial quote asap. + # TODO: should we timeout and raise a more explicit error? + # with trio.fail_after(5): + with trio.fail_after(float('inf')): + # Retreive initial quote for each symbol + # such that consumer code can know the data layout + first_quote = await stream.__anext__() + log.info(f"Received first quote {first_quote}") + yield (first_quote, stream) From 316137fdf2e5f6523a03e9ed43ea0f382e7d631f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Aug 2020 20:08:05 -0400 Subject: [PATCH 23/24] Begin to wrap marketstore as a data feed Wrap the sync client in an async interface in anticipation of an actual async client. This starts support for the `open_fee()`/`stream_quotes()` api though the tick normalization isn't correct yet. --- piker/data/__init__.py | 4 +- piker/data/marketstore.py | 346 ++++++++++++++++++++------------------ 2 files changed, 187 insertions(+), 163 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index dd2c6b7b..bb8fca8b 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -1,7 +1,7 @@ """ Data feed apis and infra. -We ship some tsdb integrations for retrieving +We provide tsdb integrations for retrieving and storing data from your brokers as well as sharing your feeds with other fellow pikers. """ @@ -91,6 +91,8 @@ async def open_feed( symbols: Sequence[str], loglevel: str = 'info', ) -> AsyncIterator[Dict[str, Any]]: + """Open a "data feed" which provides streamed real-time quotes. + """ try: mod = get_brokermod(name) except ImportError: diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 6f0d1bb9..1baafb1a 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -1,41 +1,38 @@ """ ``marketstore`` integration. -- TICK data ingest routines +- client management routines +- ticK data ingest routines - websocket client for subscribing to write triggers -- docker container management automation +- todo: tick sequence stream-cloning for testing +- todo: docker container management automation """ -from pprint import pformat -from typing import Dict, Any, List -from functools import partial +from contextlib import asynccontextmanager +from typing import Dict, Any, List, Callable, Tuple import time +from math import isnan import msgpack import numpy as np import pandas as pd import pymarketstore as pymkts -import click -import trio -import tractor from trio_websocket import open_websocket_url -from . import maybe_spawn_brokerd -from ..cli import cli -from .. import watchlists as wl -from ..brokers.data import DataFeed -from ..log import get_logger +from ..log import get_logger, get_console_log +from ..data import open_feed log = get_logger(__name__) -_tick_tbk_ids = ('1Sec', 'TICK') -_tick_tbk = '{}/' + '/'.join(_tick_tbk_ids) - - +_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK') +_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) +_url: str = 'http://localhost:5993/rpc' _quote_dt = [ + # these two are required for as a "primary key" ('Epoch', 'i8'), ('Nanoseconds', 'i4'), - ('Tick', 'i4'), + + ('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask) # ('fill_time', 'f4'), ('Last', 'f4'), ('Bid', 'f4'), @@ -44,11 +41,10 @@ _quote_dt = [ ('Ask', 'f4'), ('Size', 'i8'), ('Volume', 'i8'), + # ('Broker_time_ns', 'i64'), # ('VWAP', 'f4') ] _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) - - _tick_map = { 'Up': 1, 'Equal': 0, @@ -84,7 +80,7 @@ def quote_to_marketstore_structarray( else: # this should get inserted upstream by the broker-client to # subtract from IPC latency - now = timestamp(pd.Timestamp.now()) + now = time.time_ns() secs, ns = now / 10**9, now % 10**9 @@ -109,172 +105,152 @@ def quote_to_marketstore_structarray( def timestamp(datestr: str) -> int: - """Return marketstore compatible 'Epoch' integer in nanoseconds. + """Return marketstore compatible 'Epoch' integer in nanoseconds + from a date formatted str. """ return int(pd.Timestamp(datestr).value) -@cli.command() -@click.option('--test-file', '-t', help='Test quote stream file') -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option( - '--url', - default='http://localhost:5993/rpc', - help='HTTP URL of marketstore instance' -) -@click.argument('name', nargs=1, required=True) -@click.pass_obj -def ingest(config, name, test_file, tl, url): - """Ingest real-time broker quotes and ticks to a marketstore instance. +def mk_tbk(keys: Tuple[str, str, str]) -> str: + """Generate a marketstore table key from a tuple. + + Converts, + ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``` """ - # global opts - brokermod = config['brokermod'] - loglevel = config['loglevel'] - log = config['log'] + return '{}/' + '/'.join(keys) - watchlist_from_file = wl.ensure_watchlists(config['wl_path']) - watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - symbols = watchlists[name] - async def main(tries): - async with maybe_spawn_brokerd( - brokername=brokermod.name, - tries=tries, - loglevel=loglevel - ) as portal: - # connect to broker data feed - feed = DataFeed(portal, brokermod) - qstream, quotes = await feed.open_stream( - symbols, - 'stock', - rate=3, - test=test_file, - ) +class Client: + """Async wrapper around the alpaca ``pymarketstore`` sync client. - first_quotes, _ = feed.format_quotes(quotes) + This will server as the shell for building out a proper async client + that isn't horribly documented and un-tested.. + """ + def __init__(self, url: str): + self._client = pymkts.Client(url) - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - return + async def _invoke( + self, + meth: Callable, + *args, + **kwargs, + ) -> Any: + return err_on_resp(meth(*args, **kwargs)) - quote_cache = {quote['symbol']: quote for quote in first_quotes} + async def destroy( + self, + tbk: Tuple[str, str, str], + ) -> None: + return await self._invoke(self._client.destroy, mk_tbk(tbk)) - client = pymkts.Client(endpoint=url) + async def list_symbols( + self, + tbk: str, + ) -> List[str]: + return await self._invoke(self._client.list_symbols, mk_tbk(tbk)) + + async def write( + self, + symbol: str, + array: np.ndarray, + ) -> None: + start = time.time() + await self._invoke( + self._client.write, + array, + _tick_tbk.format(symbol), + isvariablelength=True + ) + log.debug(f"{symbol} write time (s): {time.time() - start}") + + def query( + self, + symbol, + tbk: Tuple[str, str] = _tick_tbk_ids, + ) -> pd.DataFrame: + # XXX: causes crash + # client.query(pymkts.Params(symbol, '*', 'OHCLV' + result = self._client.query( + pymkts.Params(symbol, *tbk), + ) + return result.first().df() + + +@asynccontextmanager +async def get_client( + url: str = _url, +) -> Client: + yield Client(url) + + +async def ingest_quote_stream( + symbols: List[str], + brokername: str, + tries: int = 1, + loglevel: str = None, +) -> None: + """Ingest a broker quote stream into marketstore in (sampled) tick format. + """ + async with open_feed( + brokername, + symbols, + loglevel=loglevel, + ) as (first_quotes, qstream): + + quote_cache = first_quotes.copy() + + async with get_client() as ms_client: # start ingest to marketstore async for quotes in qstream: + log.info(quotes) for symbol, quote in quotes.items(): - fmt_quote, _ = brokermod.format_stock_quote( - quote, - feed._symbol_data_cache - ) # remap tick strs to ints - fmt_quote['tick'] = _tick_map[ - fmt_quote.get('tick', 'Equal') - ] + quote['tick'] = _tick_map[quote.get('tick', 'Equal')] # check for volume update (i.e. did trades happen # since last quote) - new_vol = fmt_quote.get('volume', None) + new_vol = quote.get('volume', None) if new_vol is None: log.debug(f"No fills for {symbol}") if new_vol == quote_cache.get('volume'): + # should never happen due to field diffing + # on sender side log.error( f"{symbol}: got same volume as last quote?") - quote_cache.update(fmt_quote) + quote_cache.update(quote) a = quote_to_marketstore_structarray( - fmt_quote, + quote, # TODO: check this closer to the broker query api - last_fill=fmt_quote.get('last_fill', '') + last_fill=quote.get('fill_time', '') ) - start = time.time() - err_on_resp(client.write( - a, _tick_tbk.format(symbol), isvariablelength=True) - ) - log.trace( - f"{symbol} write time (s): {time.time() - start}") - - tractor.run( - partial(main, tries=1), - name='ingest_marketstore', - loglevel=loglevel if tl else None, - # start_method='forkserver', - ) + await ms_client.write(symbol, a) -@cli.command() -@click.option( - '--tl', - is_flag=True, - help='Enable tractor logging') -@click.option( - '--url', - default='http://localhost:5993/rpc', - help='HTTP URL of marketstore instance' -) -@click.argument('name', nargs=1, required=True) -@click.pass_obj -def ms_shell(config, name, tl, url): - """Start an IPython shell ready to query the local marketstore db. - """ - client = pymkts.Client(url) - - def query(name, tbk=_tick_tbk_ids): - return client.query( - pymkts.Params(name, *tbk)).first().df() - - # causes crash - # client.query(pymkts.Params(symbol, '*', 'OHCLV' - - from IPython import embed - embed() - - -@cli.command() -@click.option( - '--url', - default='http://localhost:5993/rpc', - help='HTTP URL of marketstore instance' -) -@click.argument('names', nargs=-1) -@click.pass_obj -def marketstore_destroy(config: dict, names: List[str], url: str) -> None: - """Destroy symbol entries in the local marketstore instance. - """ - client = pymkts.Client(url) - if not names: - names = client.list_symbols() - - # default is to wipe db entirely. - answer = input( - "This will entirely wipe you local marketstore db @ " - f"{url} of the following symbols:\n {pformat(names)}" - "\n\nDelete [N/y]?\n") - - if answer == 'y': - for sym in names: - tbk = _tick_tbk.format(sym) - print(f"Destroying {tbk}..") - err_on_resp(client.destroy(_tick_tbk.format(sym))) - else: - print("Nothing deleted.") - - -async def open_quote_stream( - tbks: List[str], +async def stream_quotes( + symbols: List[str], host: str = 'localhost', - port: int = 5993 + port: int = 5993, + diff_cached: bool = True, + loglevel: str = None, ) -> None: """Open a symbol stream from a running instance of marketstore and log to console. """ + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} + async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: # send subs topics to server - await ws.send_message(msgpack.dumps({'streams': tbks})) + resp = await ws.send_message( + msgpack.dumps({'streams': list(tbks.values())}) + ) + log.info(resp) async def recv() -> Dict[str, Any]: return msgpack.loads((await ws.get_message()), encoding='utf-8') @@ -282,21 +258,67 @@ async def open_quote_stream( streams = (await recv())['streams'] log.info(f"Subscribed to {streams}") + _cache = {} + while True: msg = await recv() - log.info(f"Received quote:\n{msg}") + # unpack symbol and quote data + # key is in format ``//`` + symbol = msg['key'].split('/')[0] + data = msg['data'] -@cli.command() -@click.option( - '--url', - default='ws://localhost:5993/ws', - help='HTTP URL of marketstore instance' -) -@click.argument('names', nargs=-1) -@click.pass_obj -def marketstore_stream(config: dict, names: List[str], url: str): - """Connect to a marketstore time bucket stream for (a set of) symbols(s) - and print to console. - """ - trio.run(open_quote_stream, names) + # calc time stamp(s) + s, ns = data.pop('Epoch'), data.pop('Nanoseconds') + ts = s * 10**9 + ns + data['broker_fill_time_ns'] = ts + + quote = {} + for k, v in data.items(): + if isnan(v): + continue + + quote[k.lower()] = v + + quote['symbol'] = symbol + + quotes = {} + + if diff_cached: + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info(f"New quote {quote['symbol']}:\n{new}") + + # only ship diff updates and other required fields + payload = {k: quote[k] for k, v in new} + payload['symbol'] = symbol + + # if there was volume likely the last size of + # shares traded is useful info and it's possible + # that the set difference from above will disregard + # a "size" value since the same # of shares were traded + size = quote.get('size') + volume = quote.get('volume') + if size and volume: + new_volume_since_last = max( + volume - last.get('volume', 0), 0) + log.warning( + f"NEW VOLUME {symbol}:{new_volume_since_last}") + payload['size'] = size + payload['last'] = quote.get('last') + + # XXX: we append to a list for the options case where the + # subscription topic (key) is the same for all + # expiries even though this is uncessary for the + # stock case (different topic [i.e. symbol] for each + # quote). + quotes.setdefault(symbol, []).append(payload) + + # update cache + _cache[symbol].update(quote) + else: + quotes = {symbol: [{key.lower(): val for key, val in quote.items()}]} + + if quotes: + yield quotes From 05d2985f5ffc8f4207b3489c7a5e7c06551b3e15 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 2 Sep 2020 11:32:54 -0400 Subject: [PATCH 24/24] Clarify some odd spots --- piker/data/marketstore.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 1baafb1a..84e62ecb 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -41,7 +41,7 @@ _quote_dt = [ ('Ask', 'f4'), ('Size', 'i8'), ('Volume', 'i8'), - # ('Broker_time_ns', 'i64'), + # ('brokerd_ts', 'i64'), # ('VWAP', 'f4') ] _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) @@ -87,9 +87,8 @@ def quote_to_marketstore_structarray( # pack into List[Tuple[str, Any]] array_input = [] - # insert 'Epoch' entry first + # insert 'Epoch' entry first and then 'Nanoseconds'. array_input.append(int(secs)) - # insert 'Nanoseconds' field array_input.append(int(ns)) # append remaining fields @@ -97,7 +96,10 @@ def quote_to_marketstore_structarray( if 'f' in dt: none = np.nan else: + # for ``np.int`` we use 0 as a null value none = 0 + + # casefold? see https://github.com/alpacahq/marketstore/issues/324 val = quote.get(name.casefold(), none) array_input.append(val)