piker/tests/test_databases.py

171 lines
4.9 KiB
Python

from typing import AsyncContextManager
import logging
import pytest
from elasticsearch import (
Elasticsearch,
ConnectionError,
)
import trio
from piker.service import marketstore
from piker.service import elastic
def test_marketstore_startup_and_version(
open_test_pikerd: AsyncContextManager,
loglevel: str,
root_conf: dict,
):
'''
Verify marketstore tsdb starts up and we can
connect with a client to do basic API reqs.
'''
async def main():
user_conf: dict = {
'grpc_listen_port': 5995 + 6,
'ws_listen_port': 5993 + 6,
}
dname: str # service name
config: dict # service name
async with (
open_test_pikerd(
loglevel=loglevel,
# tsdb=True
) as (
_, # host
_, # port
pikerd_portal,
services,
),
marketstore.start_ahab_daemon(
services,
user_conf,
loglevel=loglevel,
) as (dname, config)
):
# ensure user config was applied
for k, v in user_conf.items():
assert config[k] == v
# netconf: dict = root_conf['network']
# tsdbconf = netconf['tsdb']
# TODO: we should probably make this connection poll
# loop part of the `get_client()` implementation no?
# XXX NOTE: we use a retry-connect loop because it seems
# that if we connect *too fast* to a booting container
# instance (i.e. if mkts's IPC machinery isn't up early
# enough) the client will hang on req-resp submissions. So,
# instead we actually reconnect the client entirely in
# a loop until we get a response.
for _ in range(3):
# NOTE: default sockaddr is embedded within
async with marketstore.get_client(
host='localhost',
port=user_conf['grpc_listen_port'],
) as client:
print(f'Client is up @ {user_conf}!')
with trio.move_on_after(1) as cs:
syms = await client.list_symbols()
if cs.cancelled_caught:
continue
# should be an empty db (for now) since we spawn
# marketstore in a ephemeral test-harness dir.
assert not syms
print(f'RX syms resp: {syms}')
assert (
len(await client.server_version()) ==
len('3862e9973da36cfc6004b88172c08f09269aaf01')
)
print('VERSION CHECKED')
break # get out of retry-connect loop
else:
raise RuntimeError('Failed to connect to {conf}!')
# gracefully teardown docker-daemon-service
print(f'Cancelling docker service {dname}')
trio.run(main)
@pytest.mark.skip
def test_elasticsearch_startup_and_version(
open_test_pikerd: AsyncContextManager,
loglevel: str,
log: logging.Logger,
):
'''
Verify elasticsearch starts correctly (like at some point before
infinity time)..
'''
async def main():
port: int = 19200
user_conf: dict = {
'port': port,
}
dname: str # service name
config: dict # service name
async with (
open_test_pikerd(
loglevel=loglevel,
) as (
_, # host
_, # port
pikerd_portal,
services,
),
elastic.start_ahab_daemon(
services,
user_conf,
loglevel=loglevel,
) as (dname, config)
):
# TODO: much like the above connect loop for mkts, we should
# probably make this sync start part of the
# ``open_client()`` implementation?
for i in range(240):
with Elasticsearch(
hosts=[f'http://localhost:{port}']
) as es:
try:
resp = es.info()
assert (
resp['version']['number']
==
elastic._config['version']
)
print(
"OMG ELASTIX FINALLY CUKCING CONNECTED!>!>!\n"
f'resp: {resp}'
)
break
except ConnectionError:
log.exception(
f'RETRYING client connection for {i} time!'
)
await trio.sleep(1)
continue
trio.run(main)