piker/tests/test_watchlists.py

109 lines
3.4 KiB
Python
Raw Normal View History

"""
Watchlists testing.
"""
import pytest
import tempfile
import os.path
import logging
import piker.watchlists as wl
@pytest.fixture
def temp_dir():
"""Creates a path to a pretend config dir in a temporary directory for
testing.
"""
with tempfile.TemporaryDirectory() as tempdir:
config_dir = os.path.join(tempdir, 'piker')
yield config_dir
@pytest.fixture
def piker_dir(temp_dir):
wl.make_config_dir(temp_dir)
yield os.path.join(temp_dir, 'watchlists.json')
def test_watchlist_is_sorted_no_dups_and_saved_to_file(piker_dir):
wl_temp = {'test': ['TEST.CN', 'AAA'], 'AA': ['TEST.CN', 'TEST.CN'],
'AA': ['TEST.CN']}
wl_sort = {'AA': ['TEST.CN'], 'test': ['AAA', 'TEST.CN']}
2018-04-06 19:07:47 +00:00
wl.write_to_file(wl_temp, piker_dir)
temp_sorted = wl.ensure_watchlists(piker_dir)
assert temp_sorted == wl_sort
def test_watchlists_config_dir_created(caplog, temp_dir):
"""Ensure that a config directory is created.
"""
2019-02-10 22:29:08 +00:00
with caplog.at_level(logging.DEBUG, logger='piker'):
wl.make_config_dir(temp_dir)
assert len(caplog.records) == 1
record = caplog.records[0]
assert record.levelname == 'DEBUG'
assert record.message == f"Creating config dir {temp_dir}"
assert os.path.isdir(temp_dir)
# Test that there is no error and that a log message is not generatd
# when trying to create a directory that already exists
with caplog.at_level(logging.DEBUG):
wl.make_config_dir(temp_dir)
# There should be no additional log message.
assert len(caplog.records) == 1
def test_watchlist_is_read_from_file(piker_dir):
"""Ensure json info is read from file or an empty dict is generated
and that text respresentation of a watchlist is saved to file.
"""
wl_temp = wl.ensure_watchlists(piker_dir)
assert wl_temp == {}
wl_temp2 = {"AA": ["TEST.CN"]}
2018-04-06 19:07:47 +00:00
wl.write_to_file(wl_temp2, piker_dir)
assert wl_temp2 == wl.ensure_watchlists(piker_dir)
def test_new_ticker_added():
"""Ensure that a new ticker is added to a watchlist for both cases.
"""
wl_temp = wl.add_ticker('test', 'TEST.CN', {'test': ['TEST2.CN']})
assert len(wl_temp['test']) == 2
wl_temp = wl.add_ticker('test2', 'TEST.CN', wl_temp)
assert wl_temp['test2']
def test_ticker_is_removed():
"""Verify that passed in ticker is removed and that a group is removed
if no tickers left.
"""
wl_temp = {'test': ['TEST.CN', 'TEST2.CN'], 'test2': ['TEST.CN']}
wl_temp = wl.remove_ticker('test', 'TEST.CN', wl_temp)
wl_temp = wl.remove_ticker('test2', 'TEST.CN', wl_temp)
assert wl_temp == {'test': ['TEST2.CN']}
assert not wl_temp.get('test2')
2018-04-10 18:13:58 +00:00
# verify trying to remove from nonexistant list
with pytest.raises(KeyError):
wl.remove_ticker('doggy', 'TEST.CN', wl_temp)
# verify trying to remove non-existing ticker
with pytest.raises(ValueError):
wl.remove_ticker('test', 'TEST.CN', wl_temp)
def test_group_is_deleted():
"""Check that watchlist group is removed.
"""
wl_temp = {'test': ['TEST.CN']}
wl_temp = wl.delete_group('test', wl_temp)
assert not wl_temp.get('test')
def test_watchlist_is_merged():
"""Ensure that watchlist is merged.
"""
wl_temp = {'test': ['TEST.CN']}
wl_temp2 = {'test': ['TOAST'], "test2": ["TEST2.CN"]}
wl_temp3 = wl.merge_watchlist(wl_temp2, wl_temp)
assert wl_temp3 == {'test': ['TEST.CN', 'TOAST'], 'test2': ['TEST2.CN']}