Add a `open_test_pikerd()` acm fixture for easy booting of the service stack

agg_feedz
Tyler Goodlet 2023-01-09 18:16:09 -05:00
parent 008ae47e14
commit 06622105cd
1 changed files with 105 additions and 46 deletions

View File

@ -1,10 +1,12 @@
from contextlib import asynccontextmanager as acm
import os
import pytest
import tractor
import trio
from piker import log, config
from piker.brokers import questrade
from piker import (
# log,
config,
)
def pytest_addoption(parser):
@ -28,9 +30,11 @@ def test_config():
@pytest.fixture(scope='session', autouse=True)
def confdir(request, test_config):
"""If the `--confdir` flag is not passed use the
'''
If the `--confdir` flag is not passed use the
broker config file found in that dir.
"""
'''
confdir = request.config.option.confdir
if confdir is not None:
config._override_config_dir(confdir)
@ -38,49 +42,49 @@ def confdir(request, test_config):
return confdir
@pytest.fixture(scope='session', autouse=True)
def travis(confdir):
is_travis = os.environ.get('TRAVIS', False)
if is_travis:
# this directory is cached, see .travis.yaml
conf_file = config.get_broker_conf_path()
refresh_token = os.environ['QT_REFRESH_TOKEN']
# @pytest.fixture(scope='session', autouse=True)
# def travis(confdir):
# is_travis = os.environ.get('TRAVIS', False)
# if is_travis:
# # this directory is cached, see .travis.yaml
# conf_file = config.get_broker_conf_path()
# refresh_token = os.environ['QT_REFRESH_TOKEN']
def write_with_token(token):
# XXX don't pass the dir path here since may be
# written behind the scenes in the `confdir fixture`
if not os.path.isfile(conf_file):
open(conf_file, 'w').close()
conf, path = config.load()
conf.setdefault('questrade', {}).update(
{'refresh_token': token,
'is_practice': 'True'}
)
config.write(conf, path)
# def write_with_token(token):
# # XXX don't pass the dir path here since may be
# # written behind the scenes in the `confdir fixture`
# if not os.path.isfile(conf_file):
# open(conf_file, 'w').close()
# conf, path = config.load()
# conf.setdefault('questrade', {}).update(
# {'refresh_token': token,
# 'is_practice': 'True'}
# )
# config.write(conf, path)
async def ensure_config():
# try to refresh current token using cached brokers config
# if it fails fail try using the refresh token provided by the
# env var and if that fails stop the test run here.
try:
async with questrade.get_client(ask_user=False):
pass
except (
FileNotFoundError, ValueError,
questrade.BrokerError, questrade.QuestradeError,
trio.MultiError,
):
# 3 cases:
# - config doesn't have a ``refresh_token`` k/v
# - cache dir does not exist yet
# - current token is expired; take it form env var
write_with_token(refresh_token)
# async def ensure_config():
# # try to refresh current token using cached brokers config
# # if it fails fail try using the refresh token provided by the
# # env var and if that fails stop the test run here.
# try:
# async with questrade.get_client(ask_user=False):
# pass
# except (
# FileNotFoundError, ValueError,
# questrade.BrokerError, questrade.QuestradeError,
# trio.MultiError,
# ):
# # 3 cases:
# # - config doesn't have a ``refresh_token`` k/v
# # - cache dir does not exist yet
# # - current token is expired; take it form env var
# write_with_token(refresh_token)
async with questrade.get_client(ask_user=False):
pass
# async with questrade.get_client(ask_user=False):
# pass
# XXX ``pytest_trio`` doesn't support scope or autouse
trio.run(ensure_config)
# # XXX ``pytest_trio`` doesn't support scope or autouse
# trio.run(ensure_config)
_ci_env: bool = os.environ.get('CI', False)
@ -88,10 +92,13 @@ _ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session')
def ci_env() -> bool:
"""Detect CI envoirment.
"""
'''
Detect CI envoirment.
'''
return _ci_env
@pytest.fixture
def us_symbols():
return ['TSLA', 'AAPL', 'CGC', 'CRON']
@ -106,3 +113,55 @@ def tmx_symbols():
def cse_symbols():
return ['TRUL.CN', 'CWEB.CN', 'SNN.CN']
@acm
async def _open_test_pikerd(
reg_addr: tuple[str, int] | None = None,
**kwargs,
) -> tuple[
str,
int,
tractor.Portal
]:
'''
Testing helper to startup the service tree and runtime on
a different port then the default to allow testing alongside
a running stack.
'''
import random
from piker._daemon import maybe_open_pikerd
if reg_addr is None:
port = random.randint(6e3, 7e3)
reg_addr = ('127.0.0.1', port)
async with (
maybe_open_pikerd(
registry_addr=reg_addr,
**kwargs,
),
):
async with tractor.wait_for_actor(
'pikerd',
arbiter_sockaddr=reg_addr,
) as portal:
raddr = portal.channel.raddr
assert raddr == reg_addr
yield (
raddr[0],
raddr[1],
portal,
)
@pytest.fixture
def open_test_pikerd():
yield _open_test_pikerd
# TODO: teardown checks such as,
# - no leaked subprocs or shm buffers
# - all requested container service are torn down
# - certain ``tractor`` runtime state?