Merge pull request #92 from pikers/questrade_candles

Questrade candles
relicense_as_agplv3
goodboy 2020-06-01 13:44:54 -05:00 committed by GitHub
commit 9eddfa7b3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 437 additions and 192 deletions

View File

@ -5,22 +5,51 @@ Trading gear for hackers.
|travis|
``piker`` is an attempt at a pro-grade, broker agnostic, next-gen FOSS toolset for real-time
trading and financial analysis.
trading and financial analysis targetted at hardcore Linux users.
It tries to use as much cutting edge tech as possible including (but not limited to):
It tries to use as much bleeding edge tech as possible including (but not limited to):
- Python 3.7+
- trio_
- tractor_
- kivy_
- Python 3.7+ for glue and business logic
- trio_ for async
- tractor_ as the underlying actor model
- marketstore_ for historical data persistence and sharing
- Qt_ for pristine high performance UIs
.. |travis| image:: https://img.shields.io/travis/pikers/piker/master.svg
:target: https://travis-ci.org/pikers/piker
.. _trio: https://github.com/python-trio/trio
.. _tractor: https://github.com/goodboy/tractor
.. _kivy: https://kivy.org
.. _marketstore: https://github.com/alpacahq/marketstore
.. _Qt: https://www.qt.io/
Also, we're always open to new framework suggestions and ideas!
Focus and Features:
*******************
- 100% decentralized: running your code on your hardware with your
broker's data feeds **is the point** (this is not a web-based *I
don't know how to run my own system* project).
- Built on a highly reliable "next-gen" [actor
model](https://github.com/goodboy/tractor) with built in async
streaming and scalability protocols allowing us to utilize
a distributed architecture from the ground up.
- Privacy: your orders, indicators, algos are all run client side and
are shared only with the (groups of) traders you specify.
- Production grade, highly attractive native UIs that feel and fit like
a proper pair of skinny jeans; only meant to be used with a proper
tiling window manager (no, we are not ignorant enough to roll our own).
- Sophisticated charting capable of processing large data sets in real-time
while sanely displaying complex models and strategy systems.
- Built-in support for *hipstery* indicators and studies that you
probably haven't heard of but that the authors **know** generate alpha
when paired with the right strategies.
- Emphasis on collaboration through sharing of data, ideas, and processing
power.
- Adoption is very low priority, especially if you're not an experienced
trader; the system is not built for sale it is built for *people*.
- No, we will never have a "corporation friendly license"; if you intend to use
this code base we must know about it.
Fitting with these tenets, we're always open to new framework suggestions and ideas.
Building the best looking, most reliable, keyboard friendly trading platform is the dream.
Feel free to pipe in with your ideas and quiffs.

View File

@ -12,6 +12,10 @@ class BrokerError(Exception):
"Generic broker issue"
class SymbolNotFound(BrokerError):
"Symbol not found by broker search"
def resproc(
resp: asks.response_objects.Response,
log: logging.Logger,

View File

@ -1,9 +1,8 @@
"""
Console interface to broker client/daemons.
"""
from functools import partial
import json
import os
from functools import partial
from operator import attrgetter
from operator import itemgetter
@ -12,62 +11,17 @@ import pandas as pd
import trio
import tractor
from . import watchlists as wl
from .log import get_console_log, colorize_json, get_logger
from .brokers import core, get_brokermod, data, config
from .brokers.core import maybe_spawn_brokerd_as_subactor, _data_mods
from ..cli import cli
from .. import watchlists as wl
from ..log import get_console_log, colorize_json, get_logger
from ..brokers.core import maybe_spawn_brokerd_as_subactor
from ..brokers import core, get_brokermod, data
log = get_logger('cli')
DEFAULT_BROKER = 'questrade'
_config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
_context_defaults = dict(
default_map={
'monitor': {
'rate': 3,
},
'optschain': {
'rate': 1,
},
}
)
@click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
def pikerd(loglevel, host, tl):
"""Spawn the piker broker-daemon.
"""
get_console_log(loglevel)
tractor.run_daemon(
rpc_module_paths=_data_mods,
name='brokerd',
loglevel=loglevel if tl else None,
)
@click.group(context_settings=_context_defaults)
@click.option('--broker', '-b', default=DEFAULT_BROKER,
help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--configdir', '-c', help='Configuration directory')
@click.pass_context
def cli(ctx, broker, loglevel, configdir):
if configdir is not None:
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
config._override_config_dir(configdir)
# ensure that ctx.obj exists even though we aren't using it (yet)
ctx.ensure_object(dict)
ctx.obj.update({
'broker': broker,
'brokermod': get_brokermod(broker),
'loglevel': loglevel,
'log': get_console_log(loglevel),
})
@cli.command()
@ -77,7 +31,7 @@ def cli(ctx, broker, loglevel, configdir):
@click.argument('kwargs', nargs=-1)
@click.pass_obj
def api(config, meth, kwargs, keys):
"""client for testing broker API methods with pretty printing of output.
"""Make a broker-client API method call
"""
# global opts
broker = config['broker']
@ -114,8 +68,7 @@ def api(config, meth, kwargs, keys):
@click.argument('tickers', nargs=-1, required=True)
@click.pass_obj
def quote(config, tickers, df_output):
"""Retreive symbol quotes on the console in either json or dataframe
format.
"""Print symbol quotes to the console
"""
# global opts
brokermod = config['brokermod']
@ -132,11 +85,10 @@ def quote(config, tickers, df_output):
brokermod.log.warn(f"Could not find symbol {ticker}?")
if df_output:
cols = next(filter(bool, quotes.values())).copy()
cols = next(filter(bool, quotes)).copy()
cols.pop('symbol')
df = pd.DataFrame(
(quote or {} for quote in quotes.values()),
index=quotes.keys(),
(quote or {} for quote in quotes),
columns=cols,
)
click.echo(df)
@ -144,6 +96,40 @@ def quote(config, tickers, df_output):
click.echo(colorize_json(quotes))
@cli.command()
@click.option('--df-output', '-df', flag_value=True,
help='Output in `pandas.DataFrame` format')
@click.option('--count', '-c', default=100,
help='Number of bars to retrieve')
@click.argument('symbol', required=True)
@click.pass_obj
def bars(config, symbol, count, df_output):
"""Retreive 1m bars for symbol and print on the console
"""
# global opts
brokermod = config['brokermod']
# broker backend should return at the least a
# list of candle dictionaries
bars = trio.run(
partial(
core.bars,
brokermod,
symbol,
count=count,
)
)
if not bars:
log.error(f"No quotes could be found for {symbol}?")
return
if df_output:
click.echo(pd.DataFrame(bars))
else:
click.echo(colorize_json(bars))
@cli.command()
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--rate', '-r', default=3, help='Quote rate limit')
@ -153,7 +139,7 @@ def quote(config, tickers, df_output):
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def monitor(config, rate, name, dhost, test, tl):
"""Spawn a real-time watchlist.
"""Start a real-time watchlist UI
"""
# global opts
brokermod = config['brokermod']
@ -167,7 +153,7 @@ def monitor(config, rate, name, dhost, test, tl):
log.error(f"No symbols found for watchlist `{name}`?")
return
from .ui.monitor import _async_main
from ..ui.monitor import _async_main
async def main(tries):
async with maybe_spawn_brokerd_as_subactor(
@ -184,6 +170,7 @@ def monitor(config, rate, name, dhost, test, tl):
name='monitor',
loglevel=loglevel if tl else None,
rpc_module_paths=['piker.ui.monitor'],
start_method='forkserver',
)
@ -196,7 +183,7 @@ def monitor(config, rate, name, dhost, test, tl):
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def record(config, rate, name, dhost, filename):
"""Record client side quotes to file
"""Record client side quotes to a file on disk
"""
# global opts
brokermod = config['brokermod']
@ -225,96 +212,6 @@ def record(config, rate, name, dhost, filename):
click.echo(f"Data feed recording saved to {filename}")
@cli.group()
@click.option('--config_dir', '-d', default=_watchlists_data_path,
help='Path to piker configuration directory')
@click.pass_context
def watchlists(ctx, config_dir):
"""Watchlists commands and operations
"""
loglevel = ctx.parent.params['loglevel']
get_console_log(loglevel) # activate console logging
wl.make_config_dir(_config_dir)
ctx.ensure_object(dict)
ctx.obj = {'path': config_dir,
'watchlist': wl.ensure_watchlists(config_dir)}
@watchlists.command(help='show watchlist')
@click.argument('name', nargs=1, required=False)
@click.pass_context
def show(ctx, name):
watchlist = wl.merge_watchlist(ctx.obj['watchlist'], wl._builtins)
click.echo(colorize_json(
watchlist if name is None else watchlist[name]))
@watchlists.command(help='load passed in watchlist')
@click.argument('data', nargs=1, required=True)
@click.pass_context
def load(ctx, data):
try:
wl.write_to_file(json.loads(data), ctx.obj['path'])
except (json.JSONDecodeError, IndexError):
click.echo('You have passed an invalid text respresentation of a '
'JSON object. Try again.')
@watchlists.command(help='add ticker to watchlist')
@click.argument('name', nargs=1, required=True)
@click.argument('ticker_names', nargs=-1, required=True)
@click.pass_context
def add(ctx, name, ticker_names):
for ticker in ticker_names:
watchlist = wl.add_ticker(
name, ticker, ctx.obj['watchlist'])
wl.write_to_file(watchlist, ctx.obj['path'])
@watchlists.command(help='remove ticker from watchlist')
@click.argument('name', nargs=1, required=True)
@click.argument('ticker_name', nargs=1, required=True)
@click.pass_context
def remove(ctx, name, ticker_name):
try:
watchlist = wl.remove_ticker(name, ticker_name, ctx.obj['watchlist'])
except KeyError:
log.error(f"No watchlist with name `{name}` could be found?")
except ValueError:
if name in wl._builtins and ticker_name in wl._builtins[name]:
log.error(f"Can not remove ticker `{ticker_name}` from built-in "
f"list `{name}`")
else:
log.error(f"Ticker `{ticker_name}` not found in list `{name}`")
else:
wl.write_to_file(watchlist, ctx.obj['path'])
@watchlists.command(help='delete watchlist group')
@click.argument('name', nargs=1, required=True)
@click.pass_context
def delete(ctx, name):
watchlist = wl.delete_group(name, ctx.obj['watchlist'])
wl.write_to_file(watchlist, ctx.obj['path'])
@watchlists.command(help='merge a watchlist from another user')
@click.argument('watchlist_to_merge', nargs=1, required=True)
@click.pass_context
def merge(ctx, watchlist_to_merge):
merged_watchlist = wl.merge_watchlist(json.loads(watchlist_to_merge),
ctx.obj['watchlist'])
wl.write_to_file(merged_watchlist, ctx.obj['path'])
@watchlists.command(help='dump text respresentation of a watchlist to console')
@click.argument('name', nargs=1, required=False)
@click.pass_context
def dump(ctx, name):
click.echo(json.dumps(ctx.obj['watchlist']))
# options utils
@cli.command()
@ -325,7 +222,8 @@ def dump(ctx, name):
@click.argument('symbol', required=True)
@click.pass_context
def contracts(ctx, loglevel, broker, symbol, ids):
"""Get list of all option contracts for symbol
"""
brokermod = get_brokermod(broker)
get_console_log(loglevel)
@ -348,8 +246,7 @@ def contracts(ctx, loglevel, broker, symbol, ids):
@click.argument('symbol', required=True)
@click.pass_obj
def optsquote(config, symbol, df_output, date):
"""Retreive symbol quotes on the console in either
json or dataframe format.
"""Retreive symbol option quotes on the console
"""
# global opts
brokermod = config['brokermod']
@ -381,18 +278,18 @@ def optsquote(config, symbol, df_output, date):
@click.argument('symbol', required=True)
@click.pass_obj
def optschain(config, symbol, date, tl, rate, test):
"""Start the real-time option chain UI.
"""Start an option chain UI
"""
# global opts
loglevel = config['loglevel']
brokername = config['broker']
from .ui.option_chain import _async_main
from ..ui.option_chain import _async_main
async def main(tries):
async with maybe_spawn_brokerd_as_subactor(
tries=tries, loglevel=loglevel
) as portal:
):
# run app "main"
await _async_main(
symbol,
@ -406,4 +303,5 @@ def optschain(config, symbol, date, tl, rate, test):
partial(main, tries=1),
name='kivy-options-chain',
loglevel=loglevel if tl else None,
start_method='forkserver',
)

View File

@ -110,3 +110,14 @@ async def contracts(
async with brokermod.get_client() as client:
# return await client.get_all_contracts([symbol])
return await client.get_all_contracts([symbol])
async def bars(
brokermod: ModuleType,
symbol: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return option contracts (all expiries) for ``symbol``.
"""
async with brokermod.get_client() as client:
return await client.bars(symbol, **kwargs)

View File

@ -90,7 +90,7 @@ async def stream_requests(
"""Stream requests for quotes for a set of symbols at the given
``rate`` (per second).
A stock-broker client ``get_quotes()`` async context manager must be
A stock-broker client ``get_quotes()`` async function must be
provided which returns an async quote retrieval function.
"""
broker_limit = getattr(feed.mod, '_rate_limit', float('inf'))
@ -268,7 +268,7 @@ async def start_quote_stream(
Spawns new quoter tasks for each broker backend on-demand.
Since most brokers seems to support batch quote requests we
limit to one task per process for now.
limit to one task per process (for now).
"""
# XXX: why do we need this again?
get_console_log(tractor.current_actor().loglevel)
@ -313,7 +313,7 @@ async def start_quote_stream(
await stream_requests(
# pub required kwargs
# ``msg.pub`` required kwargs
task_name=feed_type,
ctx=ctx,
topics=symbols,
@ -358,7 +358,7 @@ class DataFeed:
feed_type: str,
rate: int = 1,
diff_cached: bool = True,
test: bool = None,
test: str = '',
) -> (AsyncGenerator, dict):
if feed_type not in self._allowed:
raise ValueError(f"Only feed types {self._allowed} are supported")
@ -416,6 +416,8 @@ class DataFeed:
raise
def format_quotes(self, quotes, symbol_data={}):
"""Format ``quotes`` using broker defined formatter.
"""
self._symbol_data_cache.update(symbol_data)
formatter = getattr(self.brokermod, f'format_{self._quote_type}_quote')
records, displayables = zip(*[
@ -449,7 +451,7 @@ async def stream_to_file(
# an async generator instance
agen = await portal.run(
"piker.brokers.data", 'start_quote_stream',
broker=brokermod.name, tickers=tickers)
broker=brokermod.name, symbols=tickers)
fname = filename or f'{watchlist_name}.jsonstream'
with open(fname, 'a') as f:

View File

@ -9,14 +9,17 @@ from functools import partial
import configparser
from typing import List, Tuple, Dict, Any, Iterator, NamedTuple
import arrow
import trio
from async_generator import asynccontextmanager
import pandas as pd
import numpy as np
import wrapt
import asks
from ..calc import humanize, percent_change
from . import config
from ._util import resproc, BrokerError
from ._util import resproc, BrokerError, SymbolNotFound
from ..log import get_logger, colorize_json
from .._async_utils import async_lifo_cache
@ -30,6 +33,25 @@ _version = 'v1'
# it seems 4 rps is best we can do total
_rate_limit = 4
_time_frames = {
'1m': 'OneMinute',
'2m': 'TwoMinutes',
'3m': 'ThreeMinutes',
'4m': 'FourMinutes',
'5m': 'FiveMinutes',
'10m': 'TenMinutes',
'15m': 'FifteenMinutes',
'20m': 'TwentyMinutes',
'30m': 'HalfHour',
'1h': 'OneHour',
'2h': 'TwoHours',
'4h': 'FourHours',
'D': 'OneDay',
'W': 'OneWeek',
'M': 'OneMonth',
'Y': 'OneYear',
}
class QuestradeError(Exception):
"Non-200 OK response code"
@ -70,9 +92,9 @@ def refresh_token_on_err(tries=3):
if "Access token is invalid" not in str(qterr.args[0]):
raise
# TODO: this will crash when run from a sub-actor since
# STDIN can't be acquired. The right way to handle this
# is to make a request to the parent actor (i.e.
# spawner of this) to call this
# STDIN can't be acquired (ONLY WITH MP). The right way
# to handle this is to make a request to the parent
# actor (i.e. spawner of this) to call this
# `client.ensure_access()` locally thus blocking until
# the user provides an API key on the "client side"
log.warning(f"Tokens are invalid refreshing try {i}..")
@ -168,8 +190,18 @@ class _API:
quote['key'] = quote['symbol']
return quotes
async def candles(self, id: str, start: str, end, interval) -> dict:
return await self._get(f'markets/candles/{id}', params={})
async def candles(
self, symbol_id:
str, start: str,
end: str,
interval: str
) -> List[Dict[str, float]]:
"""Retrieve historical candles for provided date range.
"""
return (await self._get(
f'markets/candles/{symbol_id}',
params={'startTime': start, 'endTime': end, 'interval': interval},
))['candles']
async def option_contracts(self, symbol_id: str) -> dict:
"Retrieve all option contract API ids with expiry -> strike prices."
@ -193,7 +225,7 @@ class _API:
for (symbol, symbol_id, expiry), bystrike in contracts.items()
]
resp = await self._sess.post(
path=f'/markets/quotes/options',
path='/markets/quotes/options',
# XXX: b'{"code":1024,"message":"The size of the array requested
# is not valid: optionIds"}'
# ^ what I get when trying to use too many ids manually...
@ -349,7 +381,10 @@ class Client:
return data
async def tickers2ids(self, tickers):
async def tickers2ids(
self,
tickers: Iterator[str]
) -> Dict[str, int]:
"""Helper routine that take a sequence of ticker symbols and returns
their corresponding QT numeric symbol ids.
@ -362,7 +397,7 @@ class Client:
if id is not None:
symbols2ids[symbol] = id
# still missing uncached values - hit the server
# still missing uncached values - hit the api server
to_lookup = list(set(tickers) - set(symbols2ids))
if to_lookup:
data = await self.api.symbols(names=','.join(to_lookup))
@ -511,6 +546,92 @@ class Client:
return quotes
async def bars(
self,
symbol: str,
# EST in ISO 8601 format is required... below is EPOCH
start_date: str = "1970-01-01T00:00:00.000000-05:00",
time_frame: str = '1m',
count: float = 20e3,
is_paid_feed: bool = False,
) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present.
.. note::
The candles endpoint only allows "2000" points per query
however tests here show that it is 20k candles per query.
"""
# fix case
if symbol.islower():
symbol = symbol.swapcase()
sids = await self.tickers2ids([symbol])
if not sids:
raise SymbolNotFound(symbol)
sid = sids[symbol]
# get last market open end time
est_end = now = arrow.utcnow().to('US/Eastern').floor('minute')
# on non-paid feeds we can't retreive the first 15 mins
wd = now.isoweekday()
if wd > 5:
quotes = await self.quote([symbol])
est_end = arrow.get(quotes[0]['lastTradeTime'])
if est_end.hour == 0:
# XXX don't bother figuring out extended hours for now
est_end = est_end.replace(hour=17)
if not is_paid_feed:
est_end = est_end.shift(minutes=-15)
est_start = est_end.shift(minutes=-count)
start = time.time()
bars = await self.api.candles(
sid,
start=est_start.isoformat(),
end=est_end.isoformat(),
interval=_time_frames[time_frame],
)
log.debug(
f"Took {time.time() - start} seconds to retreive {len(bars)} bars")
return bars
# marketstore TSD compatible numpy dtype for bar
_qt_bars_dt = [
('Epoch', 'i8'),
# ('start', 'S40'),
# ('end', 'S40'),
('low', 'f4'),
('high', 'f4'),
('open', 'f4'),
('close', 'f4'),
('volume', 'i8'),
# ('VWAP', 'f4')
]
def get_OHLCV(
bar: Dict[str, Any]
) -> Tuple[str, Any]:
"""Return a marketstore key-compatible OHCLV dictionary.
"""
del bar['end']
del bar['VWAP']
bar['start'] = pd.Timestamp(bar['start']).value/10**9
return tuple(bar.values())
def bars_to_marketstore_structarray(
bars: List[Dict[str, Any]]
) -> np.array:
"""Return marketstore writeable recarray from sequence of bars
retrieved via the ``candles`` endpoint.
"""
return np.array(list(map(get_OHLCV, bars)), dtype=_qt_bars_dt)
async def token_refresher(client):
"""Coninually refresh the ``access_token`` near its expiry time.
@ -549,7 +670,7 @@ def get_config(
has_token = section.get('refresh_token') if section else False
if force_from_user or ask_user_on_failure and not (section or has_token):
log.warn(f"Forcing manual token auth from user")
log.warn("Forcing manual token auth from user")
_token_from_user(conf)
else:
if not section:
@ -634,7 +755,7 @@ async def option_quoter(client: Client, tickers: List[str]):
if isinstance(tickers[0], tuple):
datetime.fromisoformat(tickers[0][1])
else:
raise ValueError(f'Option subscription format is (symbol, expiry)')
raise ValueError('Option subscription format is (symbol, expiry)')
@async_lifo_cache(maxsize=128)
async def get_contract_by_date(
@ -679,7 +800,7 @@ _qt_stock_keys = {
'VWAP': ('VWAP', partial(round, ndigits=3)),
'MC': ('MC', humanize),
'$ vol': ('$ vol', humanize),
'volume': ('vol', humanize),
'volume': ('volume', humanize),
# 'close': 'close',
# 'openPrice': 'open',
'lowPrice': 'low',
@ -687,8 +808,8 @@ _qt_stock_keys = {
# 'low52w': 'low52w', # put in info widget
# 'high52w': 'high52w',
# "lastTradePriceTrHrs": 7.99,
# 'lastTradeTime': ('time', datetime.fromisoformat),
# "lastTradeTick": "Equal",
'lastTradeTime': ('fill_time', datetime.fromisoformat),
"lastTradeTick": 'tick', # ("Equal", "Up", "Down")
# "symbolId": 3575753,
# "tier": "",
# 'isHalted': 'halted', # as subscript 'h'
@ -696,12 +817,12 @@ _qt_stock_keys = {
}
# BidAskLayout columns which will contain three cells the first stacked on top
# of the other 2
# of the other 2 (this is a UI layout instruction)
_stock_bidasks = {
'last': ['bid', 'ask'],
'size': ['bsize', 'asize'],
'VWAP': ['low', 'high'],
'vol': ['MC', '$ vol'],
'volume': ['MC', '$ vol'],
}

View File

@ -0,0 +1,70 @@
"""
CLI commons.
"""
import os
import click
import tractor
from ..log import get_console_log, get_logger
from ..brokers import get_brokermod, config
from ..brokers.core import _data_mods
log = get_logger('cli')
DEFAULT_BROKER = 'questrade'
_config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
_context_defaults = dict(
default_map={
'monitor': {
'rate': 3,
},
'optschain': {
'rate': 1,
},
}
)
@click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
def pikerd(loglevel, host, tl):
"""Spawn the piker broker-daemon.
"""
get_console_log(loglevel)
tractor.run_daemon(
rpc_module_paths=_data_mods,
name='brokerd',
loglevel=loglevel if tl else None,
)
@click.group(context_settings=_context_defaults)
@click.option('--broker', '-b', default=DEFAULT_BROKER,
help='Broker backend to use')
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--configdir', '-c', help='Configuration directory')
@click.pass_context
def cli(ctx, broker, loglevel, configdir):
if configdir is not None:
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
config._override_config_dir(configdir)
ctx.ensure_object(dict)
ctx.obj.update({
'broker': broker,
'brokermod': get_brokermod(broker),
'loglevel': loglevel,
'log': get_console_log(loglevel),
'confdir': _config_dir,
'wl_path': _watchlists_data_path,
})
# load downstream cli modules
from ..brokers import cli as _
from ..watchlists import cli as _
from ..data import marketstore as _

View File

@ -73,7 +73,7 @@ async def update_quotes(
tick_color = None
last = cells.get('last')
if not last:
vol = cells.get('vol')
vol = cells.get('volume')
if not vol:
return # no trade exec took place
@ -163,20 +163,23 @@ async def stream_symbol_selection():
async def _async_main(
name: str,
portal: tractor._portal.Portal,
tickers: List[str],
symbols: List[str],
brokermod: ModuleType,
loglevel: str = 'info',
rate: int = 3,
test: bool = False
test: str = '',
) -> None:
'''Launch kivy app + all other related tasks.
This is started with cli cmd `piker monitor`.
'''
feed = DataFeed(portal, brokermod)
quote_gen, quotes = await feed.open_stream(
tickers, 'stock', rate=rate)
symbols,
'stock',
rate=rate,
test=test,
)
first_quotes, _ = feed.format_quotes(quotes)

View File

@ -2,7 +2,7 @@ import os
import json
from collections import defaultdict
from .log import get_logger
from ..log import get_logger
log = get_logger(__name__)

View File

@ -0,0 +1,107 @@
"""
Watchlist management commands.
"""
import os
import json
import click
from .. import watchlists as wl
from ..cli import cli
from ..log import get_console_log, colorize_json, get_logger
log = get_logger('watchlist-cli')
_config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
@cli.group()
@click.option('--config_dir', '-d', default=_watchlists_data_path,
help='Path to piker configuration directory')
@click.pass_context
def watchlists(ctx, config_dir):
"""Watchlists commands and operations
"""
loglevel = ctx.parent.params['loglevel']
get_console_log(loglevel) # activate console logging
wl.make_config_dir(_config_dir)
ctx.ensure_object(dict)
ctx.obj = {'path': config_dir,
'watchlist': wl.ensure_watchlists(config_dir)}
@watchlists.command(help='show watchlist')
@click.argument('name', nargs=1, required=False)
@click.pass_context
def show(ctx, name):
watchlist = wl.merge_watchlist(ctx.obj['watchlist'], wl._builtins)
click.echo(colorize_json(
watchlist if name is None else watchlist[name]))
@watchlists.command(help='load passed in watchlist')
@click.argument('data', nargs=1, required=True)
@click.pass_context
def load(ctx, data):
try:
wl.write_to_file(json.loads(data), ctx.obj['path'])
except (json.JSONDecodeError, IndexError):
click.echo('You have passed an invalid text respresentation of a '
'JSON object. Try again.')
@watchlists.command(help='add ticker to watchlist')
@click.argument('name', nargs=1, required=True)
@click.argument('ticker_names', nargs=-1, required=True)
@click.pass_context
def add(ctx, name, ticker_names):
for ticker in ticker_names:
watchlist = wl.add_ticker(
name, ticker, ctx.obj['watchlist'])
wl.write_to_file(watchlist, ctx.obj['path'])
@watchlists.command(help='remove ticker from watchlist')
@click.argument('name', nargs=1, required=True)
@click.argument('ticker_name', nargs=1, required=True)
@click.pass_context
def remove(ctx, name, ticker_name):
try:
watchlist = wl.remove_ticker(name, ticker_name, ctx.obj['watchlist'])
except KeyError:
log.error(f"No watchlist with name `{name}` could be found?")
except ValueError:
if name in wl._builtins and ticker_name in wl._builtins[name]:
log.error(f"Can not remove ticker `{ticker_name}` from built-in "
f"list `{name}`")
else:
log.error(f"Ticker `{ticker_name}` not found in list `{name}`")
else:
wl.write_to_file(watchlist, ctx.obj['path'])
@watchlists.command(help='delete watchlist group')
@click.argument('name', nargs=1, required=True)
@click.pass_context
def delete(ctx, name):
watchlist = wl.delete_group(name, ctx.obj['watchlist'])
wl.write_to_file(watchlist, ctx.obj['path'])
@watchlists.command(help='merge a watchlist from another user')
@click.argument('watchlist_to_merge', nargs=1, required=True)
@click.pass_context
def merge(ctx, watchlist_to_merge):
merged_watchlist = wl.merge_watchlist(json.loads(watchlist_to_merge),
ctx.obj['watchlist'])
wl.write_to_file(merged_watchlist, ctx.obj['path'])
@watchlists.command(help='dump text respresentation of a watchlist to console')
@click.argument('name', nargs=1, required=False)
@click.pass_context
def dump(ctx, name):
click.echo(json.dumps(ctx.obj['watchlist']))