CLI tests for watchlist commands
parent
11f25958ba
commit
cd69c30143
30
piker/cli.py
30
piker/cli.py
|
@ -22,6 +22,7 @@ DEFAULT_BROKER = 'robinhood'
|
|||
_config_dir = click.get_app_dir('piker')
|
||||
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
|
||||
|
||||
|
||||
def run(main, loglevel='info'):
|
||||
log = get_console_log(loglevel)
|
||||
|
||||
|
@ -152,20 +153,23 @@ def watch(loglevel, broker, rate, name):
|
|||
|
||||
@cli.group()
|
||||
@click.option('--loglevel', '-l', default='warning', help='Logging level')
|
||||
@click.option('--config_dir', '-d', default=_watchlists_data_path,
|
||||
help='Path to piker configuration directory')
|
||||
@click.pass_context
|
||||
def watchlists(ctx, loglevel):
|
||||
def watchlists(ctx, loglevel, config_dir):
|
||||
"""Watchlists commands and operations
|
||||
"""
|
||||
get_console_log(loglevel) # activate console logging
|
||||
wl.make_config_dir(_config_dir)
|
||||
ctx.obj = wl.ensure_watchlists(_watchlists_data_path)
|
||||
ctx.obj = {'path': config_dir,
|
||||
'watchlist': wl.ensure_watchlists(config_dir)}
|
||||
|
||||
|
||||
@watchlists.command(help='show watchlist')
|
||||
@click.argument('name', nargs=1, required=False)
|
||||
@click.pass_context
|
||||
def show(ctx, name):
|
||||
watchlist = ctx.obj
|
||||
watchlist = ctx.obj['watchlist']
|
||||
click.echo(colorize_json(
|
||||
watchlist if name is None else watchlist[name]))
|
||||
|
||||
|
@ -175,7 +179,7 @@ def show(ctx, name):
|
|||
@click.pass_context
|
||||
def load(ctx, data):
|
||||
try:
|
||||
wl.write_watchlists(data, _watchlists_data_path)
|
||||
wl.write_watchlists(data, ctx.obj['path'])
|
||||
except (json.JSONDecodeError, IndexError):
|
||||
click.echo('You have passed an invalid text respresentation of a '
|
||||
'JSON object. Try again.')
|
||||
|
@ -186,8 +190,8 @@ def load(ctx, data):
|
|||
@click.argument('ticker_name', nargs=1, required=True)
|
||||
@click.pass_context
|
||||
def add(ctx, name, ticker_name):
|
||||
watchlist = ctx.obj
|
||||
wl.add_ticker(name, ticker_name, watchlist, _watchlists_data_path)
|
||||
watchlist = ctx.obj['watchlist']
|
||||
wl.add_ticker(name, ticker_name, watchlist, ctx.obj['path'])
|
||||
|
||||
|
||||
@watchlists.command(help='remove ticker from watchlist')
|
||||
|
@ -195,29 +199,29 @@ def add(ctx, name, ticker_name):
|
|||
@click.argument('ticker_name', nargs=1, required=True)
|
||||
@click.pass_context
|
||||
def remove(ctx, name, ticker_name):
|
||||
watchlist = ctx.obj
|
||||
wl.remove_ticker(name, ticker_name, watchlist, _watchlists_data_path)
|
||||
watchlist = ctx.obj['watchlist']
|
||||
wl.remove_ticker(name, ticker_name, watchlist, ctx.obj['path'])
|
||||
|
||||
|
||||
@watchlists.command(help='delete watchlist group')
|
||||
@click.argument('name', nargs=1, required=True)
|
||||
@click.pass_context
|
||||
def delete(ctx, name):
|
||||
watchlist = ctx.obj
|
||||
wl.delete_group(name, watchlist, _watchlists_data_path)
|
||||
watchlist = ctx.obj['watchlist']
|
||||
wl.delete_group(name, watchlist, ctx.obj['path'])
|
||||
|
||||
|
||||
@watchlists.command(help='merge a watchlist from another user')
|
||||
@click.argument('watchlist_to_merge', nargs=1, required=True)
|
||||
@click.pass_context
|
||||
def merge(ctx, watchlist_to_merge):
|
||||
watchlist = ctx.obj
|
||||
wl.merge_watchlist(watchlist_to_merge, watchlist, _watchlists_data_path)
|
||||
watchlist = ctx.obj['watchlist']
|
||||
wl.merge_watchlist(watchlist_to_merge, watchlist, ctx.obj['path'])
|
||||
|
||||
|
||||
@watchlists.command(help='dump text respresentation of a watchlist to console')
|
||||
@click.argument('name', nargs=1, required=False)
|
||||
@click.pass_context
|
||||
def dump(ctx, name):
|
||||
watchlist = ctx.obj
|
||||
watchlist = ctx.obj['watchlist']
|
||||
click.echo(json.dumps(watchlist))
|
||||
|
|
|
@ -4,14 +4,19 @@ CLI testing, dawg.
|
|||
import json
|
||||
import subprocess
|
||||
import pytest
|
||||
import tempfile
|
||||
import os.path
|
||||
import logging
|
||||
|
||||
import piker.watchlists as wl
|
||||
import piker.cli as cli
|
||||
from piker.log import colorize_json
|
||||
|
||||
def run(cmd):
|
||||
|
||||
def run(cmd, *args):
|
||||
"""Run cmd and check for zero return code.
|
||||
"""
|
||||
cp = subprocess.run(cmd.split())
|
||||
cp = subprocess.run(cmd.split() + list(args))
|
||||
cp.check_returncode()
|
||||
return cp
|
||||
|
||||
|
@ -87,3 +92,119 @@ def test_api_method_not_found(nyse_tickers, capfd):
|
|||
out, err = capfd.readouterr()
|
||||
assert 'null' in out
|
||||
assert f'No api method `{bad_meth}` could be found?' in err
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def temp_dir():
|
||||
"""Creates a path to a pretend config dir in a temporary directory for
|
||||
testing.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
config_dir = os.path.join(tempdir, 'piker')
|
||||
yield config_dir
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def piker_dir(temp_dir):
|
||||
wl.make_config_dir(temp_dir)
|
||||
json_file_path = os.path.join(temp_dir, 'watchlists.json')
|
||||
watchlist = {
|
||||
'dad': ['GM', 'TSLA', 'DOL.TO', 'CIM', 'SPY', 'SHOP.TO'],
|
||||
'pharma': ['ATE.VN'],
|
||||
'indexes': ['SPY', 'DAX', 'QQQ', 'DIA'],
|
||||
}
|
||||
wl.write_sorted_json(watchlist, json_file_path)
|
||||
yield json_file_path
|
||||
|
||||
|
||||
def test_show_watchlists(capfd, piker_dir):
|
||||
"""Ensure a watchlist is printed.
|
||||
"""
|
||||
expected_out = json.dumps({
|
||||
'dad': ['CIM', 'DOL.TO', 'GM', 'SHOP.TO', 'SPY', 'TSLA'],
|
||||
'indexes': ['DAX', 'DIA', 'QQQ', 'SPY'],
|
||||
'pharma': ['ATE.VN'],
|
||||
}, indent=4)
|
||||
run(f"piker watchlists -d {piker_dir} show")
|
||||
out, err = capfd.readouterr()
|
||||
assert out.strip() == expected_out
|
||||
|
||||
|
||||
def test_dump_watchlists(capfd, piker_dir):
|
||||
"""Ensure watchlist is dumped.
|
||||
"""
|
||||
expected_out = json.dumps({
|
||||
'dad': ['CIM', 'DOL.TO', 'GM', 'SHOP.TO', 'SPY', 'TSLA'],
|
||||
'indexes': ['DAX', 'DIA', 'QQQ', 'SPY'],
|
||||
'pharma': ['ATE.VN'],
|
||||
})
|
||||
run(f"piker watchlists -d {piker_dir} dump")
|
||||
out, err = capfd.readouterr()
|
||||
assert out.strip() == expected_out
|
||||
|
||||
|
||||
def test_ticker_added_to_watchlists(capfd, piker_dir):
|
||||
expected_out = {
|
||||
'dad': ['CIM', 'DOL.TO', 'GM', 'SHOP.TO', 'SPY', 'TSLA'],
|
||||
'indexes': ['DAX', 'DIA', 'QQQ', 'SPY'],
|
||||
'pharma': ['ATE.VN', 'CRACK'],
|
||||
}
|
||||
run(f"piker watchlists -d {piker_dir} add pharma CRACK")
|
||||
out = wl.ensure_watchlists(piker_dir)
|
||||
assert out == expected_out
|
||||
|
||||
|
||||
def test_ticker_removed_from_watchlists(capfd, piker_dir):
|
||||
expected_out = {
|
||||
'dad': ['CIM', 'DOL.TO', 'GM', 'SHOP.TO', 'SPY', 'TSLA'],
|
||||
'indexes': ['DAX', 'DIA', 'SPY'],
|
||||
'pharma': ['ATE.VN'],
|
||||
}
|
||||
run(f"piker watchlists -d {piker_dir} remove indexes QQQ")
|
||||
out = wl.ensure_watchlists(piker_dir)
|
||||
assert out == expected_out
|
||||
|
||||
|
||||
def test_group_deleted_from_watchlists(capfd, piker_dir):
|
||||
expected_out = {
|
||||
'dad': ['CIM', 'DOL.TO', 'GM', 'SHOP.TO', 'SPY', 'TSLA'],
|
||||
'indexes': ['DAX', 'DIA', 'QQQ', 'SPY'],
|
||||
}
|
||||
run(f"piker watchlists -d {piker_dir} delete pharma")
|
||||
out = wl.ensure_watchlists(piker_dir)
|
||||
assert out == expected_out
|
||||
|
||||
|
||||
def test_watchlists_loaded(capfd, piker_dir):
|
||||
expected_out_text = json.dumps({
|
||||
'dad': ['CIM', 'DOL.TO', 'GM', 'SHOP.TO', 'SPY', 'TSLA'],
|
||||
'pharma': ['ATE.VN'],
|
||||
})
|
||||
expected_out = {
|
||||
'dad': ['CIM', 'DOL.TO', 'GM', 'SHOP.TO', 'SPY', 'TSLA'],
|
||||
'pharma': ['ATE.VN'],
|
||||
}
|
||||
run(f"piker watchlists -d {piker_dir} load", expected_out_text)
|
||||
out = wl.ensure_watchlists(piker_dir)
|
||||
assert out == expected_out
|
||||
|
||||
|
||||
def test_watchlists_is_merge(capfd, piker_dir):
|
||||
orig_watchlist = {
|
||||
'dad': ['CIM', 'DOL.TO', 'GM', 'SHOP.TO', 'SPY', 'TSLA'],
|
||||
'indexes': ['DAX', 'DIA', 'QQQ', 'SPY'],
|
||||
'pharma': ['ATE.VN'],
|
||||
}
|
||||
list_to_merge = json.dumps({
|
||||
'drugs': ['CRACK']
|
||||
})
|
||||
expected_out = {
|
||||
'dad': ['CIM', 'DOL.TO', 'GM', 'SHOP.TO', 'SPY', 'TSLA'],
|
||||
'indexes': ['DAX', 'DIA', 'QQQ', 'SPY'],
|
||||
'pharma': ['ATE.VN'],
|
||||
'drugs': ['CRACK']
|
||||
}
|
||||
wl.write_sorted_json(orig_watchlist, piker_dir)
|
||||
run(f"piker watchlists -d {piker_dir} merge", list_to_merge)
|
||||
out = wl.ensure_watchlists(piker_dir)
|
||||
assert out == expected_out
|
||||
|
|
|
@ -13,7 +13,6 @@ import piker.watchlists as wl
|
|||
@pytest.fixture
|
||||
def temp_dir():
|
||||
"""Creates a path to a pretend config dir in a temporary directory for
|
||||
|
||||
testing.
|
||||
"""
|
||||
with tempfile.TemporaryDirectory() as tempdir:
|
||||
|
@ -56,7 +55,6 @@ def test_watchlists_config_dir_created(caplog, temp_dir):
|
|||
|
||||
def test_watchlist_is_read_from_file(piker_dir):
|
||||
"""Ensure json info is read from file or an empty dict is generated
|
||||
|
||||
and that text respresentation of a watchlist is saved to file.
|
||||
"""
|
||||
wl_temp = wl.ensure_watchlists(piker_dir)
|
||||
|
@ -79,7 +77,6 @@ def test_new_ticker_added(piker_dir):
|
|||
|
||||
def test_ticker_is_removed(piker_dir):
|
||||
"""Verify that passed in ticker is removed and that a group is removed
|
||||
|
||||
if no tickers left.
|
||||
"""
|
||||
wl_temp = {'test': ['TEST.CN', 'TEST2.CN'], 'test2': ['TEST.CN']}
|
||||
|
|
Loading…
Reference in New Issue