Add reconnect loop to `marketstore` startup test

Due to making ahabd supervisor init more async we need to be more
tolerant to mkts server startup: the grpc machinery needs to be up
otherwise a client which connects to early may just hang on requests..

Add a reconnect loop (which might end up getting factored into client
code too) so that we only block on requests once we know the client
connection is actually responsive.
service_subpkg
Tyler Goodlet 2023-03-09 12:23:46 -05:00
parent 75b7a8b56e
commit 2014019b06
1 changed files with 38 additions and 11 deletions

View File

@ -8,40 +8,67 @@ from piker.service import marketstore
def test_marketstore_startup_and_version(
open_test_pikerd: AsyncContextManager,
loglevel,
loglevel: str,
):
'''
Verify marketstore starts correctly
Verify marketstore tsdb starts up and we can
connect with a client to do basic API reqs.
'''
async def main():
# port = 5995
async with (
open_test_pikerd(
loglevel=loglevel,
tsdb=True
) as (s, i, pikerd_portal, services),
marketstore.get_client() as client
) as (
_, # host
_, # port
pikerd_portal,
services,
),
):
# XXX NOTE: we use a retry-connect loop because it seems
# that if we connect *too fast* to a booting container
# instance (i.e. if mkts's IPC machinery isn't up early
# enough) the client will hang on req-resp submissions. So,
# instead we actually reconnect the client entirely in
# a loop until we get a response.
for _ in range(3):
assert (
len(await client.server_version()) ==
len('3862e9973da36cfc6004b88172c08f09269aaf01')
)
# NOTE: default sockaddr is embedded within
async with marketstore.get_client() as client:
with trio.move_on_after(1) as cs:
syms = await client.list_symbols()
if cs.cancelled_caught:
continue
# should be an empty db?
assert not syms
print(f'RX syms resp: {syms}')
assert (
len(await client.server_version()) ==
len('3862e9973da36cfc6004b88172c08f09269aaf01')
)
print('VERSION CHECKED')
break # get out of retry-connect loop
trio.run(main)
def test_elasticsearch_startup_and_version(
open_test_pikerd: AsyncContextManager,
loglevel,
loglevel: str,
):
'''
Verify elasticsearch starts correctly
'''
async def main():
port = 19200