Merge pull request #465 from pikers/loglevel_to_testpikerd

`loglevel` to `open_test_pikerd()` via `--ll <level>` flag
backend_spec
jaredgoldman 2023-02-21 12:34:55 -05:00 committed by GitHub
commit 82174d01c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 76 additions and 93 deletions

View File

@ -20,10 +20,6 @@ Broker clients, daemons and general back end machinery.
from importlib import import_module from importlib import import_module
from types import ModuleType from types import ModuleType
# TODO: move to urllib3/requests once supported
import asks
asks.init('trio')
__brokers__ = [ __brokers__ = [
'binance', 'binance',
'ib', 'ib',
@ -45,16 +41,20 @@ __brokers__ = [
def get_brokermod(brokername: str) -> ModuleType: def get_brokermod(brokername: str) -> ModuleType:
"""Return the imported broker module by name. '''
""" Return the imported broker module by name.
'''
module = import_module('.' + brokername, 'piker.brokers') module = import_module('.' + brokername, 'piker.brokers')
# we only allow monkeying because it's for internal keying # we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1] module.name = module.__name__.split('.')[-1]
return module return module
def iter_brokermods(): def iter_brokermods():
"""Iterate all built-in broker modules. '''
""" Iterate all built-in broker modules.
'''
for name in __brokers__: for name in __brokers__:
yield get_brokermod(name) yield get_brokermod(name)

View File

@ -227,26 +227,28 @@ async def get_cached_feed(
@tractor.stream @tractor.stream
async def start_quote_stream( async def start_quote_stream(
ctx: tractor.Context, # marks this as a streaming func stream: tractor.Context, # marks this as a streaming func
broker: str, broker: str,
symbols: List[Any], symbols: List[Any],
feed_type: str = 'stock', feed_type: str = 'stock',
rate: int = 3, rate: int = 3,
) -> None: ) -> None:
"""Handle per-broker quote stream subscriptions using a "lazy" pub-sub '''
Handle per-broker quote stream subscriptions using a "lazy" pub-sub
pattern. pattern.
Spawns new quoter tasks for each broker backend on-demand. Spawns new quoter tasks for each broker backend on-demand.
Since most brokers seems to support batch quote requests we Since most brokers seems to support batch quote requests we
limit to one task per process (for now). limit to one task per process (for now).
"""
'''
# XXX: why do we need this again? # XXX: why do we need this again?
get_console_log(tractor.current_actor().loglevel) get_console_log(tractor.current_actor().loglevel)
# pull global vars from local actor # pull global vars from local actor
symbols = list(symbols) symbols = list(symbols)
log.info( log.info(
f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") f"{stream.chan.uid} subscribed to {broker} for symbols {symbols}")
# another actor task may have already created it # another actor task may have already created it
async with get_cached_feed(broker) as feed: async with get_cached_feed(broker) as feed:
@ -290,13 +292,13 @@ async def start_quote_stream(
assert fquote['displayable'] assert fquote['displayable']
payload[sym] = fquote payload[sym] = fquote
await ctx.send_yield(payload) await stream.send_yield(payload)
await stream_poll_requests( await stream_poll_requests(
# ``trionics.msgpub`` required kwargs # ``trionics.msgpub`` required kwargs
task_name=feed_type, task_name=feed_type,
ctx=ctx, ctx=stream,
topics=symbols, topics=symbols,
packetizer=feed.mod.packetizer, packetizer=feed.mod.packetizer,
@ -319,9 +321,11 @@ async def call_client(
class DataFeed: class DataFeed:
"""Data feed client for streaming symbol data from and making API client calls '''
to a (remote) ``brokerd`` daemon. Data feed client for streaming symbol data from and making API
""" client calls to a (remote) ``brokerd`` daemon.
'''
_allowed = ('stock', 'option') _allowed = ('stock', 'option')
def __init__(self, portal, brokermod): def __init__(self, portal, brokermod):

View File

@ -175,6 +175,7 @@ async def relay_order_cmds_from_sync_code(
async def open_ems( async def open_ems(
fqsn: str, fqsn: str,
mode: str = 'live', mode: str = 'live',
loglevel: str = 'error',
) -> tuple[ ) -> tuple[
OrderBook, OrderBook,
@ -244,6 +245,7 @@ async def open_ems(
_emsd_main, _emsd_main,
fqsn=fqsn, fqsn=fqsn,
exec_mode=mode, exec_mode=mode,
loglevel=loglevel,
) as ( ) as (
ctx, ctx,

View File

@ -1498,10 +1498,10 @@ async def open_feed(
fqsns: list[str], fqsns: list[str],
loglevel: Optional[str] = None, loglevel: str | None = None,
backpressure: bool = True, backpressure: bool = True,
start_stream: bool = True, start_stream: bool = True,
tick_throttle: Optional[float] = None, # Hz tick_throttle: float | None = None, # Hz
) -> Feed: ) -> Feed:
''' '''

View File

@ -1,4 +1,5 @@
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial
import os import os
import pytest import pytest
@ -19,6 +20,11 @@ def pytest_addoption(parser):
help="Use a practice API account") help="Use a practice API account")
@pytest.fixture(scope='session')
def loglevel(request) -> str:
return request.config.option.loglevel
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def test_config(): def test_config():
dirname = os.path.dirname dirname = os.path.dirname
@ -32,7 +38,10 @@ def test_config():
@pytest.fixture(scope='session', autouse=True) @pytest.fixture(scope='session', autouse=True)
def confdir(request, test_config): def confdir(
request,
test_config: str,
):
''' '''
If the `--confdir` flag is not passed use the If the `--confdir` flag is not passed use the
broker config file found in that dir. broker config file found in that dir.
@ -45,51 +54,6 @@ def confdir(request, test_config):
return confdir return confdir
# @pytest.fixture(scope='session', autouse=True)
# def travis(confdir):
# is_travis = os.environ.get('TRAVIS', False)
# if is_travis:
# # this directory is cached, see .travis.yaml
# conf_file = config.get_broker_conf_path()
# refresh_token = os.environ['QT_REFRESH_TOKEN']
# def write_with_token(token):
# # XXX don't pass the dir path here since may be
# # written behind the scenes in the `confdir fixture`
# if not os.path.isfile(conf_file):
# open(conf_file, 'w').close()
# conf, path = config.load()
# conf.setdefault('questrade', {}).update(
# {'refresh_token': token,
# 'is_practice': 'True'}
# )
# config.write(conf, path)
# async def ensure_config():
# # try to refresh current token using cached brokers config
# # if it fails fail try using the refresh token provided by the
# # env var and if that fails stop the test run here.
# try:
# async with questrade.get_client(ask_user=False):
# pass
# except (
# FileNotFoundError, ValueError,
# questrade.BrokerError, questrade.QuestradeError,
# trio.MultiError,
# ):
# # 3 cases:
# # - config doesn't have a ``refresh_token`` k/v
# # - cache dir does not exist yet
# # - current token is expired; take it form env var
# write_with_token(refresh_token)
# async with questrade.get_client(ask_user=False):
# pass
# # XXX ``pytest_trio`` doesn't support scope or autouse
# trio.run(ensure_config)
_ci_env: bool = os.environ.get('CI', False) _ci_env: bool = os.environ.get('CI', False)
@ -102,24 +66,10 @@ def ci_env() -> bool:
return _ci_env return _ci_env
@pytest.fixture
def us_symbols():
return ['TSLA', 'AAPL', 'CGC', 'CRON']
@pytest.fixture
def tmx_symbols():
return ['APHA.TO', 'WEED.TO', 'ACB.TO']
@pytest.fixture
def cse_symbols():
return ['TRUL.CN', 'CWEB.CN', 'SNN.CN']
@acm @acm
async def _open_test_pikerd( async def _open_test_pikerd(
reg_addr: tuple[str, int] | None = None, reg_addr: tuple[str, int] | None = None,
loglevel: str = 'warning',
**kwargs, **kwargs,
) -> tuple[ ) -> tuple[
@ -140,10 +90,10 @@ async def _open_test_pikerd(
port = random.randint(6e3, 7e3) port = random.randint(6e3, 7e3)
reg_addr = ('127.0.0.1', port) reg_addr = ('127.0.0.1', port)
# try:
async with ( async with (
maybe_open_pikerd( maybe_open_pikerd(
registry_addr=reg_addr, registry_addr=reg_addr,
loglevel=loglevel,
**kwargs, **kwargs,
) as service_manager, ) as service_manager,
): ):
@ -165,9 +115,18 @@ async def _open_test_pikerd(
@pytest.fixture @pytest.fixture
def open_test_pikerd(): def open_test_pikerd(
request,
loglevel: str,
):
yield _open_test_pikerd yield partial(
_open_test_pikerd,
# bind in level from fixture, which is itself set by
# `--ll <value>` cli flag.
loglevel=loglevel,
)
# TODO: teardown checks such as, # TODO: teardown checks such as,
# - no leaked subprocs or shm buffers # - no leaked subprocs or shm buffers

View File

@ -35,6 +35,7 @@ from piker.data._source import (
def test_multi_fqsn_feed( def test_multi_fqsn_feed(
open_test_pikerd: AsyncContextManager, open_test_pikerd: AsyncContextManager,
fqsns: set[str], fqsns: set[str],
loglevel: str,
ci_env: bool ci_env: bool
): ):
''' '''
@ -60,7 +61,7 @@ def test_multi_fqsn_feed(
open_test_pikerd(), open_test_pikerd(),
open_feed( open_feed(
fqsns, fqsns,
loglevel='info', loglevel=loglevel,
# TODO: ensure throttle rate is applied # TODO: ensure throttle rate is applied
# limit to at least display's FPS # limit to at least display's FPS

View File

@ -111,6 +111,21 @@ def match_packet(symbols, quotes, feed_type='stock'):
assert not quotes assert not quotes
@pytest.fixture
def us_symbols():
return ['TSLA', 'AAPL', 'CGC', 'CRON']
@pytest.fixture
def tmx_symbols():
return ['APHA.TO', 'WEED.TO', 'ACB.TO']
@pytest.fixture
def cse_symbols():
return ['TRUL.CN', 'CWEB.CN', 'SNN.CN']
# @tractor_test # @tractor_test
async def test_concurrent_tokens_refresh(us_symbols, loglevel): async def test_concurrent_tokens_refresh(us_symbols, loglevel):
"""Verify that concurrent requests from mulitple tasks work alongside """Verify that concurrent requests from mulitple tasks work alongside
@ -258,8 +273,8 @@ async def stream_option_chain(feed, symbols):
# latency arithmetic # latency arithmetic
loops = 8 loops = 8
period = 1/3. # 3 rps # period = 1/3. # 3 rps
timeout = float('inf') #loops / period timeout = float('inf') # loops / period
try: try:
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...
@ -307,8 +322,8 @@ async def stream_stocks(feed, symbols):
symbols, 'stock', rate=3, diff_cached=False) symbols, 'stock', rate=3, diff_cached=False)
# latency arithmetic # latency arithmetic
loops = 8 loops = 8
period = 1/3. # 3 rps # period = 1/3. # 3 rps
timeout = loops / period # timeout = loops / period
try: try:
# it'd sure be nice to have an asyncitertools here... # it'd sure be nice to have an asyncitertools here...

View File

@ -11,7 +11,6 @@ import tractor
from piker._daemon import ( from piker._daemon import (
find_service, find_service,
check_for_service,
Services, Services,
) )
from piker.data import ( from piker.data import (
@ -75,7 +74,8 @@ async def ensure_service(
def test_ensure_datafeed_actors( def test_ensure_datafeed_actors(
open_test_pikerd: AsyncContextManager open_test_pikerd: AsyncContextManager,
loglevel: str,
) -> None: ) -> None:
''' '''
@ -93,7 +93,7 @@ def test_ensure_datafeed_actors(
open_test_pikerd(), open_test_pikerd(),
open_feed( open_feed(
['xbtusdt.kraken'], ['xbtusdt.kraken'],
loglevel='info', loglevel=loglevel,
) as feed ) as feed
): ):
# halt rt quote streams since we aren't testing them # halt rt quote streams since we aren't testing them
@ -109,7 +109,8 @@ def test_ensure_datafeed_actors(
def test_ensure_ems_in_paper_actors( def test_ensure_ems_in_paper_actors(
open_test_pikerd: AsyncContextManager open_test_pikerd: AsyncContextManager,
loglevel: str,
) -> None: ) -> None:
@ -136,6 +137,7 @@ def test_ensure_ems_in_paper_actors(
open_ems( open_ems(
'xbtusdt.kraken', 'xbtusdt.kraken',
mode='paper', mode='paper',
loglevel=loglevel,
) as ( ) as (
book, book,
trades_stream, trades_stream,