Add ES client polling to ensure eventual connectivity..

service_subpkg
Tyler Goodlet 2023-03-09 18:36:45 -05:00
parent 97290fcb05
commit 8ceaa27872
1 changed files with 35 additions and 10 deletions

View File

@ -1,12 +1,14 @@
import trio
from typing import AsyncContextManager from typing import AsyncContextManager
import logging
import trio
from elasticsearch import ( from elasticsearch import (
Elasticsearch, Elasticsearch,
ConnectionError, ConnectionError,
) )
from piker.service import marketstore from piker.service import marketstore
from piker.service import elastic
def test_marketstore_startup_and_version( def test_marketstore_startup_and_version(
@ -31,6 +33,9 @@ def test_marketstore_startup_and_version(
services, services,
), ),
): ):
# TODO: we should probably make this connection poll
# loop part of the `get_client()` implementation no?
# XXX NOTE: we use a retry-connect loop because it seems # XXX NOTE: we use a retry-connect loop because it seems
# that if we connect *too fast* to a booting container # that if we connect *too fast* to a booting container
# instance (i.e. if mkts's IPC machinery isn't up early # instance (i.e. if mkts's IPC machinery isn't up early
@ -67,9 +72,11 @@ def test_marketstore_startup_and_version(
def test_elasticsearch_startup_and_version( def test_elasticsearch_startup_and_version(
open_test_pikerd: AsyncContextManager, open_test_pikerd: AsyncContextManager,
loglevel: str, loglevel: str,
log: logging.Logger,
): ):
''' '''
Verify elasticsearch starts correctly Verify elasticsearch starts correctly (like at some point before
infinity time)..
''' '''
async def main(): async def main():
@ -86,14 +93,32 @@ def test_elasticsearch_startup_and_version(
services, services,
), ),
): ):
# TODO: much like the above connect loop for mkts, we should
for _ in range(240): # probably make this sync start part of the
# ``open_client()`` implementation?
for i in range(240):
with Elasticsearch(
hosts=[f'http://localhost:{port}']
) as es:
try: try:
es = Elasticsearch(hosts=[f'http://localhost:{port}'])
resp = es.info()
assert (
resp['version']['number']
==
elastic._config['version']
)
print(
"OMG ELASTIX FINALLY CUKCING CONNECTED!>!>!\n"
f'resp: {resp}'
)
break
except ConnectionError: except ConnectionError:
log.exception(
f'RETRYING client connection for {i} time!'
)
await trio.sleep(1) await trio.sleep(1)
continue continue
assert es.info()['version']['number'] == '7.17.4'
trio.run(main) trio.run(main)