diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ca18f2c4..f799bc22 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,13 +42,16 @@ jobs: - name: Checkout uses: actions/checkout@v3 + - name: Build DB container + run: docker build -t piker:elastic dockering/elastic + - name: Setup python uses: actions/setup-python@v3 with: python-version: '3.10' - name: Install dependencies - run: pip install -U . -r requirements-test.txt -r requirements.txt --upgrade-strategy eager + run: pip install -U .[es] -r requirements-test.txt -r requirements.txt --upgrade-strategy eager - name: Test suite run: pytest tests -rs diff --git a/piker/_daemon.py b/piker/_daemon.py index f1ced6e9..8983eccc 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -316,8 +316,6 @@ async def open_piker_runtime( @acm async def open_pikerd( - tsdb: bool, - es: bool, loglevel: str | None = None, @@ -326,6 +324,10 @@ async def open_pikerd( debug_mode: bool = False, registry_addr: None | tuple[str, int] = None, + # db init flags + tsdb: bool = False, + es: bool = False, + ) -> Services: ''' Start a root piker daemon who's lifetime extends indefinitely until @@ -383,7 +385,7 @@ async def open_pikerd( start_ahab, 'elasticsearch', start_elasticsearch, - start_timeout=30.0 + start_timeout=240.0 # high cause ci ) ) @@ -436,10 +438,10 @@ async def maybe_open_runtime( @acm async def maybe_open_pikerd( - tsdb: bool = False, - es: bool = False, loglevel: Optional[str] = None, registry_addr: None | tuple = None, + tsdb: bool = False, + es: bool = False, **kwargs, @@ -486,11 +488,11 @@ async def maybe_open_pikerd( # presume pikerd role since no daemon could be found at # configured address async with open_pikerd( - tsdb=tsdb, - es=es, loglevel=loglevel, debug_mode=kwargs.get('debug_mode', False), registry_addr=registry_addr, + tsdb=tsdb, + es=es, ) as service_manager: # in the case where we're starting up the diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 1964eb1b..2ab8680e 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -137,7 +137,13 @@ class Container: seen_so_far = self.seen_so_far while True: - logs = self.cntr.logs() + try: + logs = self.cntr.logs() + except docker.errors.NotFound: + return False + except docker.errors.APIError: + return False + entries = logs.decode().split('\n') for entry in entries: @@ -159,9 +165,6 @@ class Container: level = record['level'] except json.JSONDecodeError: - # if 'Error' in entry: - # raise RuntimeError(entry) - # raise msg = entry level = 'error' @@ -175,11 +178,11 @@ class Container: if level == 'fatal': raise ApplicationLogError(msg) - if patt_matcher(msg): + if await patt_matcher(msg): return True # do a checkpoint so we don't block if cancelled B) - await trio.sleep(0.01) + await trio.sleep(0.1) return False @@ -321,10 +324,13 @@ async def open_ahabd( with trio.move_on_after(start_timeout): found = await cntr.process_logs_until(start_lambda) - if not found and cntr not in client.containers.list(): - raise RuntimeError( - 'Failed to start `marketstore` check logs deats' - ) + if not found and dcntr not in client.containers.list(): + for entry in cntr.seen_so_far: + log.info(entry) + + raise RuntimeError( + f'Failed to start {dcntr.id} check logs deats' + ) await ctx.started(( cntr.cntr.id, diff --git a/piker/data/elastic.py b/piker/data/elastic.py index f9ed7b16..a501de4f 100644 --- a/piker/data/elastic.py +++ b/piker/data/elastic.py @@ -47,7 +47,7 @@ from piker.log import ( get_console_log ) -from elasticsearch import Elasticsearch +import asks log = get_logger(__name__) @@ -88,15 +88,33 @@ def start_elasticsearch( dcntr: DockerContainer = client.containers.run( 'piker:elastic', + name='piker-elastic', network='host', detach=True, - remove=True, + remove=True ) + async def start_matcher(msg: str): + try: + health = (await asks.get( + f'http://localhost:19200/_cat/health', + params={'format': 'json'} + )).json() + + except OSError: + log.error('couldnt reach elastic container') + return False + + log.info(health) + return health[0]['status'] == 'green' + + async def stop_matcher(msg: str): + return msg == 'closed' + return ( dcntr, {}, # expected startup and stop msgs - lambda start_msg: start_msg == "started", - lambda stop_msg: stop_msg == "closed", + start_matcher, + stop_matcher, ) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 4a6131b2..236bcfaf 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -189,13 +189,20 @@ def start_marketstore( init=True, # remove=True, ) + + async def start_matcher(msg: str): + return "launching tcp listener for all services..." in msg + + async def stop_matcher(msg: str): + return "exiting..." in msg + return ( dcntr, _config, # expected startup and stop msgs - lambda start_msg: "launching tcp listener for all services..." in start_msg, - lambda stop_msg: "exiting..." in stop_msg, + start_matcher, + stop_matcher, ) diff --git a/tests/conftest.py b/tests/conftest.py index 9ccc11ab..2cfaad7a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -119,8 +119,6 @@ def cse_symbols(): @acm async def _open_test_pikerd( - tsdb: bool = False, - es: bool = False, reg_addr: tuple[str, int] | None = None, **kwargs, @@ -145,8 +143,6 @@ async def _open_test_pikerd( # try: async with ( maybe_open_pikerd( - tsdb=tsdb, - es=es, registry_addr=reg_addr, **kwargs, ) as service_manager, diff --git a/tests/test_databases.py b/tests/test_databases.py index 32e9bbc1..907b716a 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -6,6 +6,8 @@ from typing import AsyncContextManager from piker._daemon import Services from piker.log import get_logger +from elasticsearch import Elasticsearch + # def test_marketstore( open_test_pikerd: AsyncContextManager): @@ -16,30 +18,27 @@ Verify marketstore starts and closes correctly def test_elasticsearch( - open_test_pikerd: AsyncContextManager, + open_test_pikerd: AsyncContextManager, ): - ''' + ''' Verify elasticsearch starts and closes correctly - ''' + ''' - # log = get_logger(__name__) + log = get_logger(__name__) - # log.info('#################### Starting test ####################') + # log.info('#################### Starting test ####################') - async def main(): - port = 19200 - daemon_addr = ('127.0.0.1', port) + async def main(): + port = 19200 - async with ( - open_test_pikerd( - tsdb=False, - es=True, - reg_addr=daemon_addr, - ) as (s, i, pikerd_portal, services), - # pikerd(), - ): - assert pikerd_portal.channel.raddr == daemon_addr + async with open_test_pikerd( + loglevel='info', + es=True + ) as (s, i, pikerd_portal, services): + + es = Elasticsearch(hosts=[f'http://localhost:{port}']) + assert es.info()['version']['number'] == '7.17.4' - trio.run(main) \ No newline at end of file + trio.run(main)