""" Watchlists testing. """ import json import pytest import tempfile import os.path import logging import piker.watchlists as wl @pytest.fixture def temp_dir(): """Creates a path to a pretend config dir in a temporary directory for testing. """ with tempfile.TemporaryDirectory() as tempdir: config_dir = os.path.join(tempdir, 'piker') yield config_dir @pytest.fixture def piker_dir(temp_dir): wl.make_config_dir(temp_dir) yield os.path.join(temp_dir, 'watchlists.json') def test_watchlist_is_sorted_no_dups_and_saved_to_file(piker_dir): wl_temp = {'test': ['TEST.CN', 'AAA'], 'AA': ['TEST.CN', 'TEST.CN'], 'AA': ['TEST.CN']} wl_sort = {'AA': ['TEST.CN'], 'test': ['AAA', 'TEST.CN']} wl.write_sorted_json(wl_temp, piker_dir) temp_sorted = wl.ensure_watchlists(piker_dir) assert temp_sorted == wl_sort def test_watchlists_config_dir_created(caplog, temp_dir): """Ensure that a config directory is created. """ with caplog.at_level(logging.DEBUG): wl.make_config_dir(temp_dir) assert len(caplog.records) == 1 record = caplog.records[0] assert record.levelname == 'DEBUG' assert record.message == f"Creating config dir {temp_dir}" assert os.path.isdir(temp_dir) # Test that there is no error and that a log message is not generatd # when trying to create a directory that already exists with caplog.at_level(logging.DEBUG): wl.make_config_dir(temp_dir) # There should be no additional log message. assert len(caplog.records) == 1 def test_watchlist_is_read_from_file(piker_dir): """Ensure json info is read from file or an empty dict is generated and that text respresentation of a watchlist is saved to file. """ wl_temp = wl.ensure_watchlists(piker_dir) assert wl_temp == {} wl_temp2 = '{"AA": ["TEST.CN"]}' wl.write_watchlists(wl_temp2, piker_dir) assert json.loads(wl_temp2) == wl.ensure_watchlists(piker_dir) def test_new_ticker_added(piker_dir): """Ensure that a new ticker is added to a watchlist for both cases. """ wl.add_ticker('test', 'TEST.CN', {'test': ['TEST2.CN']}, piker_dir) wl_temp = wl.ensure_watchlists(piker_dir) assert len(wl_temp['test']) == 2 wl.add_ticker('test2', 'TEST.CN', wl_temp, piker_dir) wl_temp = wl.ensure_watchlists(piker_dir) assert wl_temp['test2'] def test_ticker_is_removed(piker_dir): """Verify that passed in ticker is removed and that a group is removed if no tickers left. """ wl_temp = {'test': ['TEST.CN', 'TEST2.CN'], 'test2': ['TEST.CN']} wl.remove_ticker('test', 'TEST.CN', wl_temp, piker_dir) wl.remove_ticker('test2', 'TEST.CN', wl_temp, piker_dir) wl_temp = wl.ensure_watchlists(piker_dir) assert wl_temp == {'test': ['TEST2.CN']} assert not wl_temp.get('test2') def test_group_is_deleted(piker_dir): """Check that watchlist group is removed. """ wl_temp = {'test': ['TEST.CN']} wl.delete_group('test', wl_temp, piker_dir) wl_temp = wl.ensure_watchlists(piker_dir) assert not wl_temp.get('test') def test_watchlist_is_merged(piker_dir): """Ensure that watchlist is merged. """ wl_temp = {'test': ['TEST.CN']} wl_temp2 = '{"test2": ["TEST2.CN"]}' wl.merge_watchlist(wl_temp2, wl_temp, piker_dir) wl_temp3 = wl.ensure_watchlists(piker_dir) assert wl_temp3 == {'test': ['TEST.CN'], 'test2': ['TEST2.CN']}