piker/tests/test_databases.py

125 lines
3.7 KiB
Python

from typing import AsyncContextManager
import logging
import trio
from elasticsearch import (
Elasticsearch,
ConnectionError,
)
from piker.service import marketstore
from piker.service import elastic
def test_marketstore_startup_and_version(
open_test_pikerd: AsyncContextManager,
loglevel: str,
):
'''
Verify marketstore tsdb starts up and we can
connect with a client to do basic API reqs.
'''
async def main():
async with (
open_test_pikerd(
loglevel=loglevel,
tsdb=True
) as (
_, # host
_, # port
pikerd_portal,
services,
),
):
# TODO: we should probably make this connection poll
# loop part of the `get_client()` implementation no?
# XXX NOTE: we use a retry-connect loop because it seems
# that if we connect *too fast* to a booting container
# instance (i.e. if mkts's IPC machinery isn't up early
# enough) the client will hang on req-resp submissions. So,
# instead we actually reconnect the client entirely in
# a loop until we get a response.
for _ in range(3):
# NOTE: default sockaddr is embedded within
async with marketstore.get_client() as client:
with trio.move_on_after(1) as cs:
syms = await client.list_symbols()
if cs.cancelled_caught:
continue
# should be an empty db (for now) since we spawn
# marketstore in a ephemeral test-harness dir.
assert not syms
print(f'RX syms resp: {syms}')
assert (
len(await client.server_version()) ==
len('3862e9973da36cfc6004b88172c08f09269aaf01')
)
print('VERSION CHECKED')
break # get out of retry-connect loop
trio.run(main)
def test_elasticsearch_startup_and_version(
open_test_pikerd: AsyncContextManager,
loglevel: str,
log: logging.Logger,
):
'''
Verify elasticsearch starts correctly (like at some point before
infinity time)..
'''
async def main():
port = 19200
async with (
open_test_pikerd(
loglevel=loglevel,
es=True
) as (
_, # host
_, # port
pikerd_portal,
services,
),
):
# TODO: much like the above connect loop for mkts, we should
# probably make this sync start part of the
# ``open_client()`` implementation?
for i in range(240):
with Elasticsearch(
hosts=[f'http://localhost:{port}']
) as es:
try:
resp = es.info()
assert (
resp['version']['number']
==
elastic._config['version']
)
print(
"OMG ELASTIX FINALLY CUKCING CONNECTED!>!>!\n"
f'resp: {resp}'
)
break
except ConnectionError:
log.exception(
f'RETRYING client connection for {i} time!'
)
await trio.sleep(1)
continue
trio.run(main)