diff --git a/README.rst b/README.rst index f379ea18..afaa5de3 100644 --- a/README.rst +++ b/README.rst @@ -5,22 +5,51 @@ Trading gear for hackers. |travis| ``piker`` is an attempt at a pro-grade, broker agnostic, next-gen FOSS toolset for real-time -trading and financial analysis. +trading and financial analysis targetted at hardcore Linux users. -It tries to use as much cutting edge tech as possible including (but not limited to): +It tries to use as much bleeding edge tech as possible including (but not limited to): -- Python 3.7+ -- trio_ -- tractor_ -- kivy_ +- Python 3.7+ for glue and business logic +- trio_ for async +- tractor_ as the underlying actor model +- marketstore_ for historical data persistence and sharing +- Qt_ for pristine high performance UIs .. |travis| image:: https://img.shields.io/travis/pikers/piker/master.svg :target: https://travis-ci.org/pikers/piker .. _trio: https://github.com/python-trio/trio .. _tractor: https://github.com/goodboy/tractor -.. _kivy: https://kivy.org +.. _marketstore: https://github.com/alpacahq/marketstore +.. _Qt: https://www.qt.io/ -Also, we're always open to new framework suggestions and ideas! + +Focus and Features: +******************* +- 100% decentralized: running your code on your hardware with your + broker's data feeds **is the point** (this is not a web-based *I + don't know how to run my own system* project). +- Built on a highly reliable "next-gen" [actor + model](https://github.com/goodboy/tractor) with built in async + streaming and scalability protocols allowing us to utilize + a distributed architecture from the ground up. +- Privacy: your orders, indicators, algos are all run client side and + are shared only with the (groups of) traders you specify. +- Production grade, highly attractive native UIs that feel and fit like + a proper pair of skinny jeans; only meant to be used with a proper + tiling window manager (no, we are not ignorant enough to roll our own). +- Sophisticated charting capable of processing large data sets in real-time + while sanely displaying complex models and strategy systems. +- Built-in support for *hipstery* indicators and studies that you + probably haven't heard of but that the authors **know** generate alpha + when paired with the right strategies. +- Emphasis on collaboration through sharing of data, ideas, and processing + power. +- Adoption is very low priority, especially if you're not an experienced + trader; the system is not built for sale it is built for *people*. +- No, we will never have a "corporation friendly license"; if you intend to use + this code base we must know about it. + +Fitting with these tenets, we're always open to new framework suggestions and ideas. Building the best looking, most reliable, keyboard friendly trading platform is the dream. Feel free to pipe in with your ideas and quiffs. diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index 73fb3e9f..b8b6fccc 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -12,6 +12,10 @@ class BrokerError(Exception): "Generic broker issue" +class SymbolNotFound(BrokerError): + "Symbol not found by broker search" + + def resproc( resp: asks.response_objects.Response, log: logging.Logger, diff --git a/piker/cli.py b/piker/brokers/cli.py similarity index 57% rename from piker/cli.py rename to piker/brokers/cli.py index dcdf3f06..370722b6 100644 --- a/piker/cli.py +++ b/piker/brokers/cli.py @@ -1,9 +1,8 @@ """ Console interface to broker client/daemons. """ -from functools import partial -import json import os +from functools import partial from operator import attrgetter from operator import itemgetter @@ -12,62 +11,17 @@ import pandas as pd import trio import tractor -from . import watchlists as wl -from .log import get_console_log, colorize_json, get_logger -from .brokers import core, get_brokermod, data, config -from .brokers.core import maybe_spawn_brokerd_as_subactor, _data_mods +from ..cli import cli +from .. import watchlists as wl +from ..log import get_console_log, colorize_json, get_logger +from ..brokers.core import maybe_spawn_brokerd_as_subactor +from ..brokers import core, get_brokermod, data log = get_logger('cli') DEFAULT_BROKER = 'questrade' _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') -_context_defaults = dict( - default_map={ - 'monitor': { - 'rate': 3, - }, - 'optschain': { - 'rate': 1, - }, - } -) - - -@click.command() -@click.option('--loglevel', '-l', default='warning', help='Logging level') -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') -def pikerd(loglevel, host, tl): - """Spawn the piker broker-daemon. - """ - get_console_log(loglevel) - tractor.run_daemon( - rpc_module_paths=_data_mods, - name='brokerd', - loglevel=loglevel if tl else None, - ) - - -@click.group(context_settings=_context_defaults) -@click.option('--broker', '-b', default=DEFAULT_BROKER, - help='Broker backend to use') -@click.option('--loglevel', '-l', default='warning', help='Logging level') -@click.option('--configdir', '-c', help='Configuration directory') -@click.pass_context -def cli(ctx, broker, loglevel, configdir): - if configdir is not None: - assert os.path.isdir(configdir), f"`{configdir}` is not a valid path" - config._override_config_dir(configdir) - - # ensure that ctx.obj exists even though we aren't using it (yet) - ctx.ensure_object(dict) - ctx.obj.update({ - 'broker': broker, - 'brokermod': get_brokermod(broker), - 'loglevel': loglevel, - 'log': get_console_log(loglevel), - }) @cli.command() @@ -77,7 +31,7 @@ def cli(ctx, broker, loglevel, configdir): @click.argument('kwargs', nargs=-1) @click.pass_obj def api(config, meth, kwargs, keys): - """client for testing broker API methods with pretty printing of output. + """Make a broker-client API method call """ # global opts broker = config['broker'] @@ -114,8 +68,7 @@ def api(config, meth, kwargs, keys): @click.argument('tickers', nargs=-1, required=True) @click.pass_obj def quote(config, tickers, df_output): - """Retreive symbol quotes on the console in either json or dataframe - format. + """Print symbol quotes to the console """ # global opts brokermod = config['brokermod'] @@ -132,11 +85,10 @@ def quote(config, tickers, df_output): brokermod.log.warn(f"Could not find symbol {ticker}?") if df_output: - cols = next(filter(bool, quotes.values())).copy() + cols = next(filter(bool, quotes)).copy() cols.pop('symbol') df = pd.DataFrame( - (quote or {} for quote in quotes.values()), - index=quotes.keys(), + (quote or {} for quote in quotes), columns=cols, ) click.echo(df) @@ -144,6 +96,40 @@ def quote(config, tickers, df_output): click.echo(colorize_json(quotes)) +@cli.command() +@click.option('--df-output', '-df', flag_value=True, + help='Output in `pandas.DataFrame` format') +@click.option('--count', '-c', default=100, + help='Number of bars to retrieve') +@click.argument('symbol', required=True) +@click.pass_obj +def bars(config, symbol, count, df_output): + """Retreive 1m bars for symbol and print on the console + """ + # global opts + brokermod = config['brokermod'] + + # broker backend should return at the least a + # list of candle dictionaries + bars = trio.run( + partial( + core.bars, + brokermod, + symbol, + count=count, + ) + ) + + if not bars: + log.error(f"No quotes could be found for {symbol}?") + return + + if df_output: + click.echo(pd.DataFrame(bars)) + else: + click.echo(colorize_json(bars)) + + @cli.command() @click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--rate', '-r', default=3, help='Quote rate limit') @@ -153,7 +139,7 @@ def quote(config, tickers, df_output): @click.argument('name', nargs=1, required=True) @click.pass_obj def monitor(config, rate, name, dhost, test, tl): - """Spawn a real-time watchlist. + """Start a real-time watchlist UI """ # global opts brokermod = config['brokermod'] @@ -167,7 +153,7 @@ def monitor(config, rate, name, dhost, test, tl): log.error(f"No symbols found for watchlist `{name}`?") return - from .ui.monitor import _async_main + from ..ui.monitor import _async_main async def main(tries): async with maybe_spawn_brokerd_as_subactor( @@ -184,6 +170,7 @@ def monitor(config, rate, name, dhost, test, tl): name='monitor', loglevel=loglevel if tl else None, rpc_module_paths=['piker.ui.monitor'], + start_method='forkserver', ) @@ -196,7 +183,7 @@ def monitor(config, rate, name, dhost, test, tl): @click.argument('name', nargs=1, required=True) @click.pass_obj def record(config, rate, name, dhost, filename): - """Record client side quotes to file + """Record client side quotes to a file on disk """ # global opts brokermod = config['brokermod'] @@ -225,96 +212,6 @@ def record(config, rate, name, dhost, filename): click.echo(f"Data feed recording saved to {filename}") -@cli.group() -@click.option('--config_dir', '-d', default=_watchlists_data_path, - help='Path to piker configuration directory') -@click.pass_context -def watchlists(ctx, config_dir): - """Watchlists commands and operations - """ - loglevel = ctx.parent.params['loglevel'] - get_console_log(loglevel) # activate console logging - - wl.make_config_dir(_config_dir) - ctx.ensure_object(dict) - ctx.obj = {'path': config_dir, - 'watchlist': wl.ensure_watchlists(config_dir)} - - -@watchlists.command(help='show watchlist') -@click.argument('name', nargs=1, required=False) -@click.pass_context -def show(ctx, name): - watchlist = wl.merge_watchlist(ctx.obj['watchlist'], wl._builtins) - click.echo(colorize_json( - watchlist if name is None else watchlist[name])) - - -@watchlists.command(help='load passed in watchlist') -@click.argument('data', nargs=1, required=True) -@click.pass_context -def load(ctx, data): - try: - wl.write_to_file(json.loads(data), ctx.obj['path']) - except (json.JSONDecodeError, IndexError): - click.echo('You have passed an invalid text respresentation of a ' - 'JSON object. Try again.') - - -@watchlists.command(help='add ticker to watchlist') -@click.argument('name', nargs=1, required=True) -@click.argument('ticker_names', nargs=-1, required=True) -@click.pass_context -def add(ctx, name, ticker_names): - for ticker in ticker_names: - watchlist = wl.add_ticker( - name, ticker, ctx.obj['watchlist']) - wl.write_to_file(watchlist, ctx.obj['path']) - - -@watchlists.command(help='remove ticker from watchlist') -@click.argument('name', nargs=1, required=True) -@click.argument('ticker_name', nargs=1, required=True) -@click.pass_context -def remove(ctx, name, ticker_name): - try: - watchlist = wl.remove_ticker(name, ticker_name, ctx.obj['watchlist']) - except KeyError: - log.error(f"No watchlist with name `{name}` could be found?") - except ValueError: - if name in wl._builtins and ticker_name in wl._builtins[name]: - log.error(f"Can not remove ticker `{ticker_name}` from built-in " - f"list `{name}`") - else: - log.error(f"Ticker `{ticker_name}` not found in list `{name}`") - else: - wl.write_to_file(watchlist, ctx.obj['path']) - - -@watchlists.command(help='delete watchlist group') -@click.argument('name', nargs=1, required=True) -@click.pass_context -def delete(ctx, name): - watchlist = wl.delete_group(name, ctx.obj['watchlist']) - wl.write_to_file(watchlist, ctx.obj['path']) - - -@watchlists.command(help='merge a watchlist from another user') -@click.argument('watchlist_to_merge', nargs=1, required=True) -@click.pass_context -def merge(ctx, watchlist_to_merge): - merged_watchlist = wl.merge_watchlist(json.loads(watchlist_to_merge), - ctx.obj['watchlist']) - wl.write_to_file(merged_watchlist, ctx.obj['path']) - - -@watchlists.command(help='dump text respresentation of a watchlist to console') -@click.argument('name', nargs=1, required=False) -@click.pass_context -def dump(ctx, name): - click.echo(json.dumps(ctx.obj['watchlist'])) - - # options utils @cli.command() @@ -325,7 +222,8 @@ def dump(ctx, name): @click.argument('symbol', required=True) @click.pass_context def contracts(ctx, loglevel, broker, symbol, ids): - + """Get list of all option contracts for symbol + """ brokermod = get_brokermod(broker) get_console_log(loglevel) @@ -348,8 +246,7 @@ def contracts(ctx, loglevel, broker, symbol, ids): @click.argument('symbol', required=True) @click.pass_obj def optsquote(config, symbol, df_output, date): - """Retreive symbol quotes on the console in either - json or dataframe format. + """Retreive symbol option quotes on the console """ # global opts brokermod = config['brokermod'] @@ -381,18 +278,18 @@ def optsquote(config, symbol, df_output, date): @click.argument('symbol', required=True) @click.pass_obj def optschain(config, symbol, date, tl, rate, test): - """Start the real-time option chain UI. + """Start an option chain UI """ # global opts loglevel = config['loglevel'] brokername = config['broker'] - from .ui.option_chain import _async_main + from ..ui.option_chain import _async_main async def main(tries): async with maybe_spawn_brokerd_as_subactor( tries=tries, loglevel=loglevel - ) as portal: + ): # run app "main" await _async_main( symbol, @@ -406,4 +303,5 @@ def optschain(config, symbol, date, tl, rate, test): partial(main, tries=1), name='kivy-options-chain', loglevel=loglevel if tl else None, + start_method='forkserver', ) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 2fe6b736..d2c958e6 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -110,3 +110,14 @@ async def contracts( async with brokermod.get_client() as client: # return await client.get_all_contracts([symbol]) return await client.get_all_contracts([symbol]) + + +async def bars( + brokermod: ModuleType, + symbol: str, + **kwargs, +) -> Dict[str, Dict[str, Dict[str, Any]]]: + """Return option contracts (all expiries) for ``symbol``. + """ + async with brokermod.get_client() as client: + return await client.bars(symbol, **kwargs) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 51444917..8d03d684 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -90,7 +90,7 @@ async def stream_requests( """Stream requests for quotes for a set of symbols at the given ``rate`` (per second). - A stock-broker client ``get_quotes()`` async context manager must be + A stock-broker client ``get_quotes()`` async function must be provided which returns an async quote retrieval function. """ broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) @@ -268,7 +268,7 @@ async def start_quote_stream( Spawns new quoter tasks for each broker backend on-demand. Since most brokers seems to support batch quote requests we - limit to one task per process for now. + limit to one task per process (for now). """ # XXX: why do we need this again? get_console_log(tractor.current_actor().loglevel) @@ -313,7 +313,7 @@ async def start_quote_stream( await stream_requests( - # pub required kwargs + # ``msg.pub`` required kwargs task_name=feed_type, ctx=ctx, topics=symbols, @@ -358,7 +358,7 @@ class DataFeed: feed_type: str, rate: int = 1, diff_cached: bool = True, - test: bool = None, + test: str = '', ) -> (AsyncGenerator, dict): if feed_type not in self._allowed: raise ValueError(f"Only feed types {self._allowed} are supported") @@ -416,6 +416,8 @@ class DataFeed: raise def format_quotes(self, quotes, symbol_data={}): + """Format ``quotes`` using broker defined formatter. + """ self._symbol_data_cache.update(symbol_data) formatter = getattr(self.brokermod, f'format_{self._quote_type}_quote') records, displayables = zip(*[ @@ -449,7 +451,7 @@ async def stream_to_file( # an async generator instance agen = await portal.run( "piker.brokers.data", 'start_quote_stream', - broker=brokermod.name, tickers=tickers) + broker=brokermod.name, symbols=tickers) fname = filename or f'{watchlist_name}.jsonstream' with open(fname, 'a') as f: diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 67e3988f..37526890 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -9,14 +9,17 @@ from functools import partial import configparser from typing import List, Tuple, Dict, Any, Iterator, NamedTuple +import arrow import trio from async_generator import asynccontextmanager +import pandas as pd +import numpy as np import wrapt import asks from ..calc import humanize, percent_change from . import config -from ._util import resproc, BrokerError +from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json from .._async_utils import async_lifo_cache @@ -30,6 +33,25 @@ _version = 'v1' # it seems 4 rps is best we can do total _rate_limit = 4 +_time_frames = { + '1m': 'OneMinute', + '2m': 'TwoMinutes', + '3m': 'ThreeMinutes', + '4m': 'FourMinutes', + '5m': 'FiveMinutes', + '10m': 'TenMinutes', + '15m': 'FifteenMinutes', + '20m': 'TwentyMinutes', + '30m': 'HalfHour', + '1h': 'OneHour', + '2h': 'TwoHours', + '4h': 'FourHours', + 'D': 'OneDay', + 'W': 'OneWeek', + 'M': 'OneMonth', + 'Y': 'OneYear', +} + class QuestradeError(Exception): "Non-200 OK response code" @@ -70,9 +92,9 @@ def refresh_token_on_err(tries=3): if "Access token is invalid" not in str(qterr.args[0]): raise # TODO: this will crash when run from a sub-actor since - # STDIN can't be acquired. The right way to handle this - # is to make a request to the parent actor (i.e. - # spawner of this) to call this + # STDIN can't be acquired (ONLY WITH MP). The right way + # to handle this is to make a request to the parent + # actor (i.e. spawner of this) to call this # `client.ensure_access()` locally thus blocking until # the user provides an API key on the "client side" log.warning(f"Tokens are invalid refreshing try {i}..") @@ -168,8 +190,18 @@ class _API: quote['key'] = quote['symbol'] return quotes - async def candles(self, id: str, start: str, end, interval) -> dict: - return await self._get(f'markets/candles/{id}', params={}) + async def candles( + self, symbol_id: + str, start: str, + end: str, + interval: str + ) -> List[Dict[str, float]]: + """Retrieve historical candles for provided date range. + """ + return (await self._get( + f'markets/candles/{symbol_id}', + params={'startTime': start, 'endTime': end, 'interval': interval}, + ))['candles'] async def option_contracts(self, symbol_id: str) -> dict: "Retrieve all option contract API ids with expiry -> strike prices." @@ -193,7 +225,7 @@ class _API: for (symbol, symbol_id, expiry), bystrike in contracts.items() ] resp = await self._sess.post( - path=f'/markets/quotes/options', + path='/markets/quotes/options', # XXX: b'{"code":1024,"message":"The size of the array requested # is not valid: optionIds"}' # ^ what I get when trying to use too many ids manually... @@ -349,7 +381,10 @@ class Client: return data - async def tickers2ids(self, tickers): + async def tickers2ids( + self, + tickers: Iterator[str] + ) -> Dict[str, int]: """Helper routine that take a sequence of ticker symbols and returns their corresponding QT numeric symbol ids. @@ -362,7 +397,7 @@ class Client: if id is not None: symbols2ids[symbol] = id - # still missing uncached values - hit the server + # still missing uncached values - hit the api server to_lookup = list(set(tickers) - set(symbols2ids)) if to_lookup: data = await self.api.symbols(names=','.join(to_lookup)) @@ -511,6 +546,92 @@ class Client: return quotes + async def bars( + self, + symbol: str, + # EST in ISO 8601 format is required... below is EPOCH + start_date: str = "1970-01-01T00:00:00.000000-05:00", + time_frame: str = '1m', + count: float = 20e3, + is_paid_feed: bool = False, + ) -> List[Dict[str, Any]]: + """Retreive OHLCV bars for a symbol over a range to the present. + + .. note:: + The candles endpoint only allows "2000" points per query + however tests here show that it is 20k candles per query. + """ + # fix case + if symbol.islower(): + symbol = symbol.swapcase() + + sids = await self.tickers2ids([symbol]) + if not sids: + raise SymbolNotFound(symbol) + + sid = sids[symbol] + + # get last market open end time + est_end = now = arrow.utcnow().to('US/Eastern').floor('minute') + # on non-paid feeds we can't retreive the first 15 mins + wd = now.isoweekday() + if wd > 5: + quotes = await self.quote([symbol]) + est_end = arrow.get(quotes[0]['lastTradeTime']) + if est_end.hour == 0: + # XXX don't bother figuring out extended hours for now + est_end = est_end.replace(hour=17) + + if not is_paid_feed: + est_end = est_end.shift(minutes=-15) + + est_start = est_end.shift(minutes=-count) + + start = time.time() + bars = await self.api.candles( + sid, + start=est_start.isoformat(), + end=est_end.isoformat(), + interval=_time_frames[time_frame], + ) + log.debug( + f"Took {time.time() - start} seconds to retreive {len(bars)} bars") + return bars + + +# marketstore TSD compatible numpy dtype for bar +_qt_bars_dt = [ + ('Epoch', 'i8'), + # ('start', 'S40'), + # ('end', 'S40'), + ('low', 'f4'), + ('high', 'f4'), + ('open', 'f4'), + ('close', 'f4'), + ('volume', 'i8'), + # ('VWAP', 'f4') +] + + +def get_OHLCV( + bar: Dict[str, Any] +) -> Tuple[str, Any]: + """Return a marketstore key-compatible OHCLV dictionary. + """ + del bar['end'] + del bar['VWAP'] + bar['start'] = pd.Timestamp(bar['start']).value/10**9 + return tuple(bar.values()) + + +def bars_to_marketstore_structarray( + bars: List[Dict[str, Any]] +) -> np.array: + """Return marketstore writeable recarray from sequence of bars + retrieved via the ``candles`` endpoint. + """ + return np.array(list(map(get_OHLCV, bars)), dtype=_qt_bars_dt) + async def token_refresher(client): """Coninually refresh the ``access_token`` near its expiry time. @@ -549,7 +670,7 @@ def get_config( has_token = section.get('refresh_token') if section else False if force_from_user or ask_user_on_failure and not (section or has_token): - log.warn(f"Forcing manual token auth from user") + log.warn("Forcing manual token auth from user") _token_from_user(conf) else: if not section: @@ -634,7 +755,7 @@ async def option_quoter(client: Client, tickers: List[str]): if isinstance(tickers[0], tuple): datetime.fromisoformat(tickers[0][1]) else: - raise ValueError(f'Option subscription format is (symbol, expiry)') + raise ValueError('Option subscription format is (symbol, expiry)') @async_lifo_cache(maxsize=128) async def get_contract_by_date( @@ -679,7 +800,7 @@ _qt_stock_keys = { 'VWAP': ('VWAP', partial(round, ndigits=3)), 'MC': ('MC', humanize), '$ vol': ('$ vol', humanize), - 'volume': ('vol', humanize), + 'volume': ('volume', humanize), # 'close': 'close', # 'openPrice': 'open', 'lowPrice': 'low', @@ -687,8 +808,8 @@ _qt_stock_keys = { # 'low52w': 'low52w', # put in info widget # 'high52w': 'high52w', # "lastTradePriceTrHrs": 7.99, - # 'lastTradeTime': ('time', datetime.fromisoformat), - # "lastTradeTick": "Equal", + 'lastTradeTime': ('fill_time', datetime.fromisoformat), + "lastTradeTick": 'tick', # ("Equal", "Up", "Down") # "symbolId": 3575753, # "tier": "", # 'isHalted': 'halted', # as subscript 'h' @@ -696,12 +817,12 @@ _qt_stock_keys = { } # BidAskLayout columns which will contain three cells the first stacked on top -# of the other 2 +# of the other 2 (this is a UI layout instruction) _stock_bidasks = { 'last': ['bid', 'ask'], 'size': ['bsize', 'asize'], 'VWAP': ['low', 'high'], - 'vol': ['MC', '$ vol'], + 'volume': ['MC', '$ vol'], } diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py new file mode 100644 index 00000000..cc528f8a --- /dev/null +++ b/piker/cli/__init__.py @@ -0,0 +1,70 @@ +""" +CLI commons. +""" +import os + +import click +import tractor + +from ..log import get_console_log, get_logger +from ..brokers import get_brokermod, config +from ..brokers.core import _data_mods + +log = get_logger('cli') +DEFAULT_BROKER = 'questrade' + +_config_dir = click.get_app_dir('piker') +_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') +_context_defaults = dict( + default_map={ + 'monitor': { + 'rate': 3, + }, + 'optschain': { + 'rate': 1, + }, + } +) + + +@click.command() +@click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') +def pikerd(loglevel, host, tl): + """Spawn the piker broker-daemon. + """ + get_console_log(loglevel) + tractor.run_daemon( + rpc_module_paths=_data_mods, + name='brokerd', + loglevel=loglevel if tl else None, + ) + + +@click.group(context_settings=_context_defaults) +@click.option('--broker', '-b', default=DEFAULT_BROKER, + help='Broker backend to use') +@click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--configdir', '-c', help='Configuration directory') +@click.pass_context +def cli(ctx, broker, loglevel, configdir): + if configdir is not None: + assert os.path.isdir(configdir), f"`{configdir}` is not a valid path" + config._override_config_dir(configdir) + + ctx.ensure_object(dict) + ctx.obj.update({ + 'broker': broker, + 'brokermod': get_brokermod(broker), + 'loglevel': loglevel, + 'log': get_console_log(loglevel), + 'confdir': _config_dir, + 'wl_path': _watchlists_data_path, + }) + + +# load downstream cli modules +from ..brokers import cli as _ +from ..watchlists import cli as _ +from ..data import marketstore as _ diff --git a/piker/ui/monitor.py b/piker/ui/monitor.py index 7f8502e0..a7778fbf 100644 --- a/piker/ui/monitor.py +++ b/piker/ui/monitor.py @@ -73,7 +73,7 @@ async def update_quotes( tick_color = None last = cells.get('last') if not last: - vol = cells.get('vol') + vol = cells.get('volume') if not vol: return # no trade exec took place @@ -163,20 +163,23 @@ async def stream_symbol_selection(): async def _async_main( name: str, portal: tractor._portal.Portal, - tickers: List[str], + symbols: List[str], brokermod: ModuleType, loglevel: str = 'info', rate: int = 3, - test: bool = False + test: str = '', ) -> None: '''Launch kivy app + all other related tasks. This is started with cli cmd `piker monitor`. ''' feed = DataFeed(portal, brokermod) - quote_gen, quotes = await feed.open_stream( - tickers, 'stock', rate=rate) + symbols, + 'stock', + rate=rate, + test=test, + ) first_quotes, _ = feed.format_quotes(quotes) diff --git a/piker/watchlists.py b/piker/watchlists/__init__.py similarity index 97% rename from piker/watchlists.py rename to piker/watchlists/__init__.py index 7fc532ba..707c602f 100644 --- a/piker/watchlists.py +++ b/piker/watchlists/__init__.py @@ -2,7 +2,7 @@ import os import json from collections import defaultdict -from .log import get_logger +from ..log import get_logger log = get_logger(__name__) diff --git a/piker/watchlists/cli.py b/piker/watchlists/cli.py new file mode 100644 index 00000000..7ed089d1 --- /dev/null +++ b/piker/watchlists/cli.py @@ -0,0 +1,107 @@ + +""" +Watchlist management commands. +""" +import os +import json + +import click + +from .. import watchlists as wl +from ..cli import cli +from ..log import get_console_log, colorize_json, get_logger + +log = get_logger('watchlist-cli') + +_config_dir = click.get_app_dir('piker') +_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') + + +@cli.group() +@click.option('--config_dir', '-d', default=_watchlists_data_path, + help='Path to piker configuration directory') +@click.pass_context +def watchlists(ctx, config_dir): + """Watchlists commands and operations + """ + loglevel = ctx.parent.params['loglevel'] + get_console_log(loglevel) # activate console logging + + wl.make_config_dir(_config_dir) + ctx.ensure_object(dict) + ctx.obj = {'path': config_dir, + 'watchlist': wl.ensure_watchlists(config_dir)} + + +@watchlists.command(help='show watchlist') +@click.argument('name', nargs=1, required=False) +@click.pass_context +def show(ctx, name): + watchlist = wl.merge_watchlist(ctx.obj['watchlist'], wl._builtins) + click.echo(colorize_json( + watchlist if name is None else watchlist[name])) + + +@watchlists.command(help='load passed in watchlist') +@click.argument('data', nargs=1, required=True) +@click.pass_context +def load(ctx, data): + try: + wl.write_to_file(json.loads(data), ctx.obj['path']) + except (json.JSONDecodeError, IndexError): + click.echo('You have passed an invalid text respresentation of a ' + 'JSON object. Try again.') + + +@watchlists.command(help='add ticker to watchlist') +@click.argument('name', nargs=1, required=True) +@click.argument('ticker_names', nargs=-1, required=True) +@click.pass_context +def add(ctx, name, ticker_names): + for ticker in ticker_names: + watchlist = wl.add_ticker( + name, ticker, ctx.obj['watchlist']) + wl.write_to_file(watchlist, ctx.obj['path']) + + +@watchlists.command(help='remove ticker from watchlist') +@click.argument('name', nargs=1, required=True) +@click.argument('ticker_name', nargs=1, required=True) +@click.pass_context +def remove(ctx, name, ticker_name): + try: + watchlist = wl.remove_ticker(name, ticker_name, ctx.obj['watchlist']) + except KeyError: + log.error(f"No watchlist with name `{name}` could be found?") + except ValueError: + if name in wl._builtins and ticker_name in wl._builtins[name]: + log.error(f"Can not remove ticker `{ticker_name}` from built-in " + f"list `{name}`") + else: + log.error(f"Ticker `{ticker_name}` not found in list `{name}`") + else: + wl.write_to_file(watchlist, ctx.obj['path']) + + +@watchlists.command(help='delete watchlist group') +@click.argument('name', nargs=1, required=True) +@click.pass_context +def delete(ctx, name): + watchlist = wl.delete_group(name, ctx.obj['watchlist']) + wl.write_to_file(watchlist, ctx.obj['path']) + + +@watchlists.command(help='merge a watchlist from another user') +@click.argument('watchlist_to_merge', nargs=1, required=True) +@click.pass_context +def merge(ctx, watchlist_to_merge): + merged_watchlist = wl.merge_watchlist(json.loads(watchlist_to_merge), + ctx.obj['watchlist']) + wl.write_to_file(merged_watchlist, ctx.obj['path']) + + +@watchlists.command(help='dump text respresentation of a watchlist to console') +@click.argument('name', nargs=1, required=False) +@click.pass_context +def dump(ctx, name): + click.echo(json.dumps(ctx.obj['watchlist']))