diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 95eb6f08..a35e4aea 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -20,10 +20,6 @@ Broker clients, daemons and general back end machinery. from importlib import import_module from types import ModuleType -# TODO: move to urllib3/requests once supported -import asks -asks.init('trio') - __brokers__ = [ 'binance', 'ib', @@ -45,16 +41,20 @@ __brokers__ = [ def get_brokermod(brokername: str) -> ModuleType: - """Return the imported broker module by name. - """ + ''' + Return the imported broker module by name. + + ''' module = import_module('.' + brokername, 'piker.brokers') # we only allow monkeying because it's for internal keying - module.name = module.__name__.split('.')[-1] + module.name = module.__name__.split('.')[-1] return module def iter_brokermods(): - """Iterate all built-in broker modules. - """ + ''' + Iterate all built-in broker modules. + + ''' for name in __brokers__: yield get_brokermod(name) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index f0a8d367..5183d2c4 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -227,26 +227,28 @@ async def get_cached_feed( @tractor.stream async def start_quote_stream( - ctx: tractor.Context, # marks this as a streaming func + stream: tractor.Context, # marks this as a streaming func broker: str, symbols: List[Any], feed_type: str = 'stock', rate: int = 3, ) -> None: - """Handle per-broker quote stream subscriptions using a "lazy" pub-sub + ''' + Handle per-broker quote stream subscriptions using a "lazy" pub-sub pattern. Spawns new quoter tasks for each broker backend on-demand. Since most brokers seems to support batch quote requests we limit to one task per process (for now). - """ + + ''' # XXX: why do we need this again? get_console_log(tractor.current_actor().loglevel) # pull global vars from local actor symbols = list(symbols) log.info( - f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") + f"{stream.chan.uid} subscribed to {broker} for symbols {symbols}") # another actor task may have already created it async with get_cached_feed(broker) as feed: @@ -290,13 +292,13 @@ async def start_quote_stream( assert fquote['displayable'] payload[sym] = fquote - await ctx.send_yield(payload) + await stream.send_yield(payload) await stream_poll_requests( # ``trionics.msgpub`` required kwargs task_name=feed_type, - ctx=ctx, + ctx=stream, topics=symbols, packetizer=feed.mod.packetizer, @@ -319,9 +321,11 @@ async def call_client( class DataFeed: - """Data feed client for streaming symbol data from and making API client calls - to a (remote) ``brokerd`` daemon. - """ + ''' + Data feed client for streaming symbol data from and making API + client calls to a (remote) ``brokerd`` daemon. + + ''' _allowed = ('stock', 'option') def __init__(self, portal, brokermod): diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index f3b26cbe..0a40b548 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -175,6 +175,7 @@ async def relay_order_cmds_from_sync_code( async def open_ems( fqsn: str, mode: str = 'live', + loglevel: str = 'error', ) -> tuple[ OrderBook, @@ -244,6 +245,7 @@ async def open_ems( _emsd_main, fqsn=fqsn, exec_mode=mode, + loglevel=loglevel, ) as ( ctx, diff --git a/piker/data/feed.py b/piker/data/feed.py index 7f628c49..906f4bb4 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1498,10 +1498,10 @@ async def open_feed( fqsns: list[str], - loglevel: Optional[str] = None, + loglevel: str | None = None, backpressure: bool = True, start_stream: bool = True, - tick_throttle: Optional[float] = None, # Hz + tick_throttle: float | None = None, # Hz ) -> Feed: ''' diff --git a/tests/conftest.py b/tests/conftest.py index 2cfaad7a..75c8a92d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ from contextlib import asynccontextmanager as acm +from functools import partial import os import pytest @@ -19,6 +20,11 @@ def pytest_addoption(parser): help="Use a practice API account") +@pytest.fixture(scope='session') +def loglevel(request) -> str: + return request.config.option.loglevel + + @pytest.fixture(scope='session') def test_config(): dirname = os.path.dirname @@ -32,7 +38,10 @@ def test_config(): @pytest.fixture(scope='session', autouse=True) -def confdir(request, test_config): +def confdir( + request, + test_config: str, +): ''' If the `--confdir` flag is not passed use the broker config file found in that dir. @@ -45,51 +54,6 @@ def confdir(request, test_config): return confdir -# @pytest.fixture(scope='session', autouse=True) -# def travis(confdir): -# is_travis = os.environ.get('TRAVIS', False) -# if is_travis: -# # this directory is cached, see .travis.yaml -# conf_file = config.get_broker_conf_path() -# refresh_token = os.environ['QT_REFRESH_TOKEN'] - -# def write_with_token(token): -# # XXX don't pass the dir path here since may be -# # written behind the scenes in the `confdir fixture` -# if not os.path.isfile(conf_file): -# open(conf_file, 'w').close() -# conf, path = config.load() -# conf.setdefault('questrade', {}).update( -# {'refresh_token': token, -# 'is_practice': 'True'} -# ) -# config.write(conf, path) - -# async def ensure_config(): -# # try to refresh current token using cached brokers config -# # if it fails fail try using the refresh token provided by the -# # env var and if that fails stop the test run here. -# try: -# async with questrade.get_client(ask_user=False): -# pass -# except ( -# FileNotFoundError, ValueError, -# questrade.BrokerError, questrade.QuestradeError, -# trio.MultiError, -# ): -# # 3 cases: -# # - config doesn't have a ``refresh_token`` k/v -# # - cache dir does not exist yet -# # - current token is expired; take it form env var -# write_with_token(refresh_token) - -# async with questrade.get_client(ask_user=False): -# pass - -# # XXX ``pytest_trio`` doesn't support scope or autouse -# trio.run(ensure_config) - - _ci_env: bool = os.environ.get('CI', False) @@ -102,24 +66,10 @@ def ci_env() -> bool: return _ci_env -@pytest.fixture -def us_symbols(): - return ['TSLA', 'AAPL', 'CGC', 'CRON'] - - -@pytest.fixture -def tmx_symbols(): - return ['APHA.TO', 'WEED.TO', 'ACB.TO'] - - -@pytest.fixture -def cse_symbols(): - return ['TRUL.CN', 'CWEB.CN', 'SNN.CN'] - - @acm async def _open_test_pikerd( reg_addr: tuple[str, int] | None = None, + loglevel: str = 'warning', **kwargs, ) -> tuple[ @@ -140,10 +90,10 @@ async def _open_test_pikerd( port = random.randint(6e3, 7e3) reg_addr = ('127.0.0.1', port) - # try: async with ( maybe_open_pikerd( registry_addr=reg_addr, + loglevel=loglevel, **kwargs, ) as service_manager, ): @@ -165,9 +115,18 @@ async def _open_test_pikerd( @pytest.fixture -def open_test_pikerd(): +def open_test_pikerd( + request, + loglevel: str, +): - yield _open_test_pikerd + yield partial( + _open_test_pikerd, + + # bind in level from fixture, which is itself set by + # `--ll ` cli flag. + loglevel=loglevel, + ) # TODO: teardown checks such as, # - no leaked subprocs or shm buffers diff --git a/tests/test_feeds.py b/tests/test_feeds.py index 2b85301f..a79ca861 100644 --- a/tests/test_feeds.py +++ b/tests/test_feeds.py @@ -35,6 +35,7 @@ from piker.data._source import ( def test_multi_fqsn_feed( open_test_pikerd: AsyncContextManager, fqsns: set[str], + loglevel: str, ci_env: bool ): ''' @@ -60,7 +61,7 @@ def test_multi_fqsn_feed( open_test_pikerd(), open_feed( fqsns, - loglevel='info', + loglevel=loglevel, # TODO: ensure throttle rate is applied # limit to at least display's FPS diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 733b2ba7..4614b4f9 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -111,6 +111,21 @@ def match_packet(symbols, quotes, feed_type='stock'): assert not quotes +@pytest.fixture +def us_symbols(): + return ['TSLA', 'AAPL', 'CGC', 'CRON'] + + +@pytest.fixture +def tmx_symbols(): + return ['APHA.TO', 'WEED.TO', 'ACB.TO'] + + +@pytest.fixture +def cse_symbols(): + return ['TRUL.CN', 'CWEB.CN', 'SNN.CN'] + + # @tractor_test async def test_concurrent_tokens_refresh(us_symbols, loglevel): """Verify that concurrent requests from mulitple tasks work alongside @@ -258,8 +273,8 @@ async def stream_option_chain(feed, symbols): # latency arithmetic loops = 8 - period = 1/3. # 3 rps - timeout = float('inf') #loops / period + # period = 1/3. # 3 rps + timeout = float('inf') # loops / period try: # it'd sure be nice to have an asyncitertools here... @@ -307,8 +322,8 @@ async def stream_stocks(feed, symbols): symbols, 'stock', rate=3, diff_cached=False) # latency arithmetic loops = 8 - period = 1/3. # 3 rps - timeout = loops / period + # period = 1/3. # 3 rps + # timeout = loops / period try: # it'd sure be nice to have an asyncitertools here... diff --git a/tests/test_services.py b/tests/test_services.py index 936a426e..bdce6aa2 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -11,7 +11,6 @@ import tractor from piker._daemon import ( find_service, - check_for_service, Services, ) from piker.data import ( @@ -75,7 +74,8 @@ async def ensure_service( def test_ensure_datafeed_actors( - open_test_pikerd: AsyncContextManager + open_test_pikerd: AsyncContextManager, + loglevel: str, ) -> None: ''' @@ -93,7 +93,7 @@ def test_ensure_datafeed_actors( open_test_pikerd(), open_feed( ['xbtusdt.kraken'], - loglevel='info', + loglevel=loglevel, ) as feed ): # halt rt quote streams since we aren't testing them @@ -109,7 +109,8 @@ def test_ensure_datafeed_actors( def test_ensure_ems_in_paper_actors( - open_test_pikerd: AsyncContextManager + open_test_pikerd: AsyncContextManager, + loglevel: str, ) -> None: @@ -136,6 +137,7 @@ def test_ensure_ems_in_paper_actors( open_ems( 'xbtusdt.kraken', mode='paper', + loglevel=loglevel, ) as ( book, trades_stream,