From fe0695fb7b773a01b29051237d20db81c557f8e2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jan 2023 15:17:39 -0500 Subject: [PATCH 01/38] First draft storage layer cli Adds a `piker storage` subcmd with a `-d` flag to wipe a particular fqsn's time series (both 1s and 60s). Obviously this needs to be extended much more but provides a start point. --- piker/data/cli.py | 73 ++++++++++++++++++++++++++++++++++----- piker/data/marketstore.py | 23 +++++------- 2 files changed, 73 insertions(+), 23 deletions(-) diff --git a/piker/data/cli.py b/piker/data/cli.py index 554048a4..994b9da4 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -19,7 +19,10 @@ marketstore cli. """ from functools import partial -from pprint import pformat +from pprint import ( + pformat, + pprint, +) from anyio_marketstore import open_marketstore_client import trio @@ -113,15 +116,11 @@ def ms_stream( @cli.command() @click.option( - '--tl', - is_flag=True, - help='Enable tractor logging') -@click.option( - '--host', + '--tsdb_host', default='localhost' ) @click.option( - '--port', + '--tsdb_port', default=5993 ) @click.argument('symbols', nargs=-1) @@ -137,7 +136,7 @@ def storesh( Start an IPython shell ready to query the local marketstore db. ''' - from piker.data.marketstore import tsdb_history_update + from piker.data.marketstore import open_tsdb_client from piker._daemon import open_piker_runtime async def main(): @@ -148,7 +147,63 @@ def storesh( enable_modules=['piker.data._ahab'], ): symbol = symbols[0] - await tsdb_history_update(symbol) + + async with open_tsdb_client(symbol) as storage: + # TODO: ask if user wants to write history for detected + # available shm buffers? + from tractor.trionics import ipython_embed + await ipython_embed() + + trio.run(main) + + +@cli.command() +@click.option( + '--host', + default='localhost' +) +@click.option( + '--port', + default=5993 +) +@click.option( + '--delete', + '-d', + is_flag=True, + help='Delete history (1 Min) for symbol(s)', +) +@click.argument('symbols', nargs=-1) +@click.pass_obj +def storage( + config, + host, + port, + symbols: list[str], + delete: bool, + +): + ''' + Start an IPython shell ready to query the local marketstore db. + + ''' + from piker.data.marketstore import open_tsdb_client + from piker._daemon import open_piker_runtime + + async def main(): + nonlocal symbols + + async with open_piker_runtime( + 'tsdb_storage', + enable_modules=['piker.data._ahab'], + ): + symbol = symbols[0] + async with open_tsdb_client(symbol) as storage: + if delete: + for fqsn in symbols: + syms = await storage.client.list_symbols() + breakpoint() + await storage.delete_ts(fqsn, 60) + await storage.delete_ts(fqsn, 1) trio.run(main) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 190667d6..792396e3 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -510,7 +510,6 @@ class Storage: client = self.client syms = await client.list_symbols() - print(syms) if key not in syms: raise KeyError(f'`{key}` table key not found in\n{syms}?') @@ -627,10 +626,10 @@ async def open_storage_client( yield Storage(client) -async def tsdb_history_update( - fqsn: Optional[str] = None, - -) -> list[str]: +@acm +async def open_tsdb_client( + fqsn: str, +) -> Storage: # TODO: real-time dedicated task for ensuring # history consistency between the tsdb, shm and real-time feed.. @@ -659,7 +658,7 @@ async def tsdb_history_update( # - https://github.com/pikers/piker/issues/98 # profiler = Profiler( - disabled=False, # not pg_profile_enabled(), + disabled=True, # not pg_profile_enabled(), delayed=False, ) @@ -700,14 +699,10 @@ async def tsdb_history_update( # profiler('Finished db arrays diffs') - syms = await storage.client.list_symbols() - log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') - profiler(f'listed symbols {syms}') - - # TODO: ask if user wants to write history for detected - # available shm buffers? - from tractor.trionics import ipython_embed - await ipython_embed() + syms = await storage.client.list_symbols() + # log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') + # profiler(f'listed symbols {syms}') + yield storage # for array in [to_append, to_prepend]: # if array is None: From 7b196b1b97578789795a50e95c70fc33b47e63a0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 12:56:56 -0500 Subject: [PATCH 02/38] Support startup-config overrides to `ahabd` super With the addition of a new `elastixsearch` docker support in https://github.com/pikers/piker/pull/464, adjustments were made to container startup sync logic (particularly the `trio` checkpoint sleep period - which itself is a hack around a sync client api) which caused a regression in upstream startup logic wherein container error logs were not being bubbled up correctly causing a silent failure mode: - `marketstore` container started with corrupt input config - `ahabd` super code timed out on startup phase due to a larger log polling period, skipped processing startup logs from the container, and continued on as though the container was started - history client fails on grpc connection with no clear error on why the connection failed. Here we revert to the old poll period (1ms) to avoid any more silent failures and further extend supervisor control through a configuration override mechanism. To address the underlying design issue, this patch adds support for container-endpoint-callbacks to override supervisor startup configuration parameters via the 2nd value in their returned tuple: the already delivered configuration `dict` value. The current exposed values include: { 'startup_timeout': 1.0, 'startup_query_period': 0.001, 'log_msg_key': 'msg', }, This allows for container specific control over the startup-sync query period (the hack mentioned above) as well as the expected log msg key and of course the startup timeout. --- piker/data/_ahab.py | 85 ++++++++++++++++++++++++++++++++------------- 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 39a5b46a..66b41f38 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -18,6 +18,7 @@ Supervisor for docker with included specific-image service helpers. ''' +from collections import ChainMap import os import time from typing import ( @@ -124,10 +125,19 @@ class Container: async def process_logs_until( self, + log_msg_key: str, + # this is a predicate func for matching log msgs emitted by the # underlying containerized app patt_matcher: Callable[[str], bool], - bp_on_msg: bool = False, + + # XXX WARNING XXX: do not touch this sleep value unless + # you know what you are doing! the value is critical to + # making sure the caller code inside the startup context + # does not timeout BEFORE we receive a match on the + # ``patt_matcher()`` predicate above. + checkpoint_period: float = 0.001, + ) -> bool: ''' Attempt to capture container log messages and relay through our @@ -137,12 +147,14 @@ class Container: seen_so_far = self.seen_so_far while True: + logs = self.cntr.logs() try: logs = self.cntr.logs() except ( docker.errors.NotFound, docker.errors.APIError ): + log.exception('Failed to parse logs?') return False entries = logs.decode().split('\n') @@ -155,25 +167,23 @@ class Container: entry = entry.strip() try: record = json.loads(entry) - - if 'msg' in record: - msg = record['msg'] - elif 'message' in record: - msg = record['message'] - else: - raise KeyError(f'Unexpected log format\n{record}') - + msg = record[log_msg_key] level = record['level'] except json.JSONDecodeError: msg = entry level = 'error' - if msg and entry not in seen_so_far: - seen_so_far.add(entry) - if bp_on_msg: - await tractor.breakpoint() + # TODO: do we need a more general mechanism + # for these kinda of "log record entries"? + # if 'Error' in entry: + # raise RuntimeError(entry) + if ( + msg + and entry not in seen_so_far + ): + seen_so_far.add(entry) getattr(log, level.lower(), log.error)(f'{msg}') if level == 'fatal': @@ -183,7 +193,7 @@ class Container: return True # do a checkpoint so we don't block if cancelled B) - await trio.sleep(0.1) + await trio.sleep(checkpoint_period) return False @@ -301,7 +311,6 @@ class Container: async def open_ahabd( ctx: tractor.Context, endpoint: str, # ns-pointer str-msg-type - start_timeout: float = 1.0, **kwargs, @@ -322,16 +331,39 @@ async def open_ahabd( ) = ep_func(client) cntr = Container(dcntr) - with trio.move_on_after(start_timeout): - found = await cntr.process_logs_until(start_lambda) + conf: ChainMap[str, Any] = ChainMap( - if not found and dcntr not in client.containers.list(): - for entry in cntr.seen_so_far: - log.info(entry) + # container specific + cntr_config, - raise RuntimeError( - f'Failed to start {dcntr.id} check logs deats' - ) + # defaults + { + 'startup_timeout': 1.0, + 'startup_query_period': 0.001, + 'log_msg_key': 'msg', + }, + ) + + found = False + with trio.move_on_after(conf['startup_timeout']): + found = await cntr.process_logs_until( + conf['log_msg_key'], + start_lambda, + checkpoint_period=conf['startup_query_period'], + ) + + # XXX: if we timeout on finding the "startup msg" we expect then + # we want to FOR SURE raise an error upwards! + if ( + not found + and dcntr not in client.containers.list() + ): + for entry in cntr.seen_so_far: + log.info(entry) + + raise RuntimeError( + f'Failed to start {dcntr.id} check logs deats' + ) await ctx.started(( cntr.cntr.id, @@ -346,13 +378,17 @@ async def open_ahabd( await trio.sleep_forever() finally: + # TODO: ensure loglevel can be set and teardown logs are + # reported if possible on error or cancel.. + # with trio.CancelScope(shield=True): await cntr.cancel(stop_lambda) async def start_ahab( service_name: str, endpoint: Callable[docker.DockerClient, DockerContainer], - start_timeout: float = 1.0, + loglevel: str | None = None, + task_status: TaskStatus[ tuple[ trio.Event, @@ -400,7 +436,6 @@ async def start_ahab( async with portal.open_context( open_ahabd, endpoint=str(NamespacePath.from_ref(endpoint)), - start_timeout=start_timeout ) as (ctx, first): cid, pid, cntr_config = first From 959e423849bb96192e5542e385158fe174a373a5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 12:59:20 -0500 Subject: [PATCH 03/38] Add warning around detach flag to docker client --- piker/data/marketstore.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 792396e3..4cfc1826 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -185,7 +185,11 @@ def start_marketstore( config_dir_mnt, data_dir_mnt, ], + + # XXX: this must be set to allow backgrounding/non-blocking + # usage interaction with the container's process. detach=True, + # stop_signal='SIGINT', init=True, # remove=True, From 8c66f066bdab8d9f52c2d5bd35a6632f7afbc050 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 13:00:11 -0500 Subject: [PATCH 04/38] Deliver es specific ahab-super in endpoint startup config --- piker/data/elastic.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/piker/data/elastic.py b/piker/data/elastic.py index 43c6afd0..fadcaa5e 100644 --- a/piker/data/elastic.py +++ b/piker/data/elastic.py @@ -15,17 +15,11 @@ # along with this program. If not, see . from __future__ import annotations -from contextlib import asynccontextmanager as acm -from pprint import pformat from typing import ( Any, TYPE_CHECKING, ) -import pyqtgraph as pg -import numpy as np -import tractor - if TYPE_CHECKING: import docker @@ -65,14 +59,14 @@ def start_elasticsearch( -itd \ --rm \ --network=host \ - --mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \ + --mount type=bind,source="$(pwd)"/elastic,\ + target=/usr/share/elasticsearch/data \ --env "elastic_username=elastic" \ --env "elastic_password=password" \ --env "xpack.security.enabled=false" \ elastic ''' - import docker get_console_log('info', name=__name__) dcntr: DockerContainer = client.containers.run( @@ -86,7 +80,7 @@ def start_elasticsearch( async def start_matcher(msg: str): try: health = (await asks.get( - f'http://localhost:19200/_cat/health', + 'http://localhost:19200/_cat/health', params={'format': 'json'} )).json() @@ -102,7 +96,17 @@ def start_elasticsearch( return ( dcntr, - {}, + { + # apparently we're REALLY tolerant of startup latency + # for CI XD + 'startup_timeout': 240.0, + + # XXX: decrease http poll period bc docker + # is shite at handling fast poll rates.. + 'startup_query_period': 0.1, + + 'log_msg_key': 'message', + }, # expected startup and stop msgs start_matcher, stop_matcher, From 05b67c27d081c4e4ad3b04a8d7b73737658b43de Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 13:01:42 -0500 Subject: [PATCH 05/38] Apply `Services` runtime state **immediately** inside starup block --- piker/_daemon.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 8983eccc..c0c1df79 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -337,7 +337,6 @@ async def open_pikerd( alive underling services (see below). ''' - async with ( open_piker_runtime( @@ -355,7 +354,13 @@ async def open_pikerd( tractor.open_nursery() as actor_nursery, trio.open_nursery() as service_nursery, ): - assert root_actor.accept_addr == reg_addr + if root_actor.accept_addr != reg_addr: + raise RuntimeError(f'Daemon failed to bind on {reg_addr}!?') + + # assign globally for future daemon/task creation + Services.actor_n = actor_nursery + Services.service_n = service_nursery + Services.debug_mode = debug_mode if tsdb: from piker.data._ahab import start_ahab @@ -366,6 +371,7 @@ async def open_pikerd( start_ahab, 'marketstored', start_marketstore, + loglevel, ) log.info( @@ -385,7 +391,6 @@ async def open_pikerd( start_ahab, 'elasticsearch', start_elasticsearch, - start_timeout=240.0 # high cause ci ) ) @@ -396,12 +401,6 @@ async def open_pikerd( f'config: {pformat(config)}' ) - # assign globally for future daemon/task creation - Services.actor_n = actor_nursery - Services.service_n = service_nursery - Services.debug_mode = debug_mode - - try: yield Services @@ -695,7 +694,10 @@ async def maybe_spawn_brokerd( f'brokerd.{brokername}', service_task_target=spawn_brokerd, - spawn_args={'brokername': brokername, 'loglevel': loglevel}, + spawn_args={ + 'brokername': brokername, + 'loglevel': loglevel, + }, loglevel=loglevel, **kwargs, From b078a066212a24c2ea16465fcc6b1d861288fee8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 14:22:23 -0500 Subject: [PATCH 06/38] Doc string and types bump in loggin mod --- piker/log.py | 46 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/piker/log.py b/piker/log.py index 804e09dc..a36beec0 100644 --- a/piker/log.py +++ b/piker/log.py @@ -21,7 +21,11 @@ import logging import json import tractor -from pygments import highlight, lexers, formatters +from pygments import ( + highlight, + lexers, + formatters, +) # Makes it so we only see the full module name when using ``__name__`` # without the extra "piker." prefix. @@ -32,26 +36,48 @@ def get_logger( name: str = None, ) -> logging.Logger: - '''Return the package log or a sub-log for `name` if provided. + ''' + Return the package log or a sub-log for `name` if provided. + ''' return tractor.log.get_logger(name=name, _root_name=_proj_name) -def get_console_log(level: str = None, name: str = None) -> logging.Logger: - '''Get the package logger and enable a handler which writes to stderr. +def get_console_log( + level: str | None = None, + name: str | None = None, + +) -> logging.Logger: + ''' + Get the package logger and enable a handler which writes to stderr. Yeah yeah, i know we can use ``DictConfig``. You do it... + ''' return tractor.log.get_console_log( - level, name=name, _root_name=_proj_name) # our root logger + level, + name=name, + _root_name=_proj_name, + ) # our root logger -def colorize_json(data, style='algol_nu'): - """Colorize json output using ``pygments``. - """ - formatted_json = json.dumps(data, sort_keys=True, indent=4) +def colorize_json( + data: dict, + style='algol_nu', +): + ''' + Colorize json output using ``pygments``. + + ''' + formatted_json = json.dumps( + data, + sort_keys=True, + indent=4, + ) return highlight( - formatted_json, lexers.JsonLexer(), + formatted_json, + lexers.JsonLexer(), + # likeable styles: algol_nu, tango, monokai formatters.TerminalTrueColorFormatter(style=style) ) From 7694419e712bc7c11f756a87f1aba1cce5653c80 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 14:28:48 -0500 Subject: [PATCH 07/38] Background docker-container logs processing Previously we would make the `ahabd` supervisor-actor sync to docker container startup using pseudo-blocking log message processing. This has issues, - we're forced to do a hacky "yield back to `trio`" in order to be "fake async" when reading the log stream and further, - blocking on a message is fragile and often slow. Instead, run the log processor in a background task and in the parent task poll for the container to be in the client list using a similar pseudo-async poll pattern. This allows the super to `Context.started()` sooner (when the container is actually registered as "up") and thus unblock its (remote) caller faster whilst still doing full log msg proxying! Deatz: - adds `Container.cuid: str` a unique container id for logging. - correctly proxy through the `loglevel: str` from `pikerd` caller task. - shield around `Container.cancel()` in the teardown block and use cancel level logging in that method. --- piker/data/_ahab.py | 123 +++++++++++++++++++++++++++++++------------- 1 file changed, 86 insertions(+), 37 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 66b41f38..d2e042e3 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers. ''' from collections import ChainMap +from functools import partial import os import time from typing import ( @@ -46,7 +47,10 @@ from requests.exceptions import ( ReadTimeout, ) -from ..log import get_logger, get_console_log +from ..log import ( + get_logger, + get_console_log, +) from .. import config log = get_logger(__name__) @@ -197,6 +201,11 @@ class Container: return False + @property + def cuid(self) -> str: + fqcn: str = self.cntr.attrs['Config']['Image'] + return f'{fqcn}[{self.cntr.short_id}]' + def try_signal( self, signal: str = 'SIGINT', @@ -232,17 +241,23 @@ class Container: async def cancel( self, - stop_msg: str, + log_msg_key: str, + stop_predicate: Callable[[str], bool], + hard_kill: bool = False, ) -> None: + ''' + Attempt to cancel this container gracefully, fail over to + a hard kill on timeout. + ''' cid = self.cntr.id # first try a graceful cancel log.cancel( - f'SIGINT cancelling container: {cid}\n' - f'waiting on stop msg: "{stop_msg}"' + f'SIGINT cancelling container: {self.cuid}\n' + 'waiting on stop predicate...' ) self.try_signal('SIGINT') @@ -253,7 +268,10 @@ class Container: log.cancel('polling for CNTR logs...') try: - await self.process_logs_until(stop_msg) + await self.process_logs_until( + log_msg_key, + stop_predicate, + ) except ApplicationLogError: hard_kill = True else: @@ -311,11 +329,16 @@ class Container: async def open_ahabd( ctx: tractor.Context, endpoint: str, # ns-pointer str-msg-type + loglevel: str | None = 'cancel', **kwargs, ) -> None: - get_console_log('info', name=__name__) + + log = get_console_log( + loglevel, + name=__name__, + ) async with open_docker() as client: @@ -338,40 +361,63 @@ async def open_ahabd( # defaults { + # startup time limit which is the max the supervisor + # will wait for the container to be registered in + # ``client.containers.list()`` 'startup_timeout': 1.0, + + # how fast to poll for the starup predicate by sleeping + # this amount incrementally thus yielding to the + # ``trio`` scheduler on during sync polling execution. 'startup_query_period': 0.001, + + # str-key value expected to contain log message body-contents + # when read using: + # ``json.loads(entry for entry in DockerContainer.logs())`` 'log_msg_key': 'msg', }, ) - found = False - with trio.move_on_after(conf['startup_timeout']): - found = await cntr.process_logs_until( - conf['log_msg_key'], - start_lambda, - checkpoint_period=conf['startup_query_period'], - ) + with trio.move_on_after(conf['startup_timeout']) as cs: + async with trio.open_nursery() as tn: + tn.start_soon( + partial( + cntr.process_logs_until, + log_msg_key=conf['log_msg_key'], + patt_matcher=start_lambda, + checkpoint_period=conf['startup_query_period'], + ) + ) - # XXX: if we timeout on finding the "startup msg" we expect then - # we want to FOR SURE raise an error upwards! - if ( - not found - and dcntr not in client.containers.list() - ): - for entry in cntr.seen_so_far: - log.info(entry) + # poll for container startup or timeout + while not cs.cancel_called: + if dcntr in client.containers.list(): + break - raise RuntimeError( - f'Failed to start {dcntr.id} check logs deats' - ) + await trio.sleep(conf['startup_query_period']) - await ctx.started(( - cntr.cntr.id, - os.getpid(), - cntr_config, - )) + # sync with remote caller actor-task but allow log + # processing to continue running in bg. + await ctx.started(( + cntr.cntr.id, + os.getpid(), + cntr_config, + )) try: + # XXX: if we timeout on finding the "startup msg" we expect then + # we want to FOR SURE raise an error upwards! + if cs.cancelled_caught: + # if dcntr not in client.containers.list(): + for entry in cntr.seen_so_far: + log.info(entry) + + raise DockerNotStarted( + f'Failed to start container: {dcntr.cuid}\n' + f'due to startup_timeout={conf["startup_timeout"]}s\n\n' + "prolly you should check your container's logs for deats.." + ) + # TODO: we might eventually want a proxy-style msg-prot here # to allow remote control of containers without needing # callers to have root perms? @@ -380,14 +426,17 @@ async def open_ahabd( finally: # TODO: ensure loglevel can be set and teardown logs are # reported if possible on error or cancel.. - # with trio.CancelScope(shield=True): - await cntr.cancel(stop_lambda) + with trio.CancelScope(shield=True): + await cntr.cancel( + log_msg_key=conf['log_msg_key'], + stop_predicate=stop_lambda, + ) async def start_ahab( service_name: str, endpoint: Callable[docker.DockerClient, DockerContainer], - loglevel: str | None = None, + loglevel: str | None = 'cancel', task_status: TaskStatus[ tuple[ @@ -409,13 +458,12 @@ async def start_ahab( ''' cn_ready = trio.Event() try: - async with tractor.open_nursery( - loglevel='runtime', - ) as tn: + async with tractor.open_nursery() as an: - portal = await tn.start_actor( + portal = await an.start_actor( service_name, - enable_modules=[__name__] + enable_modules=[__name__], + loglevel=loglevel, ) # TODO: we have issues with this on teardown @@ -436,6 +484,7 @@ async def start_ahab( async with portal.open_context( open_ahabd, endpoint=str(NamespacePath.from_ref(endpoint)), + loglevel=loglevel, ) as (ctx, first): cid, pid, cntr_config = first From bb723abc9d774fa2ce61269c1c97155d1698a20b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 14:56:21 -0500 Subject: [PATCH 08/38] Always passthrough loglevel to `ahabd` supervisor --- piker/_daemon.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index c0c1df79..1a8576a1 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -368,10 +368,12 @@ async def open_pikerd( log.info('Spawning `marketstore` supervisor') ctn_ready, config, (cid, pid) = await service_nursery.start( - start_ahab, - 'marketstored', - start_marketstore, - loglevel, + partial( + start_ahab, + 'marketstored', + start_marketstore, + loglevel=loglevel, + ) ) log.info( @@ -391,6 +393,7 @@ async def open_pikerd( start_ahab, 'elasticsearch', start_elasticsearch, + loglevel=loglevel, ) ) From 56629b6b2e584e58fa4dd7167ce2e7c4db43e6f5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 15:00:24 -0500 Subject: [PATCH 09/38] Hardcode `cancel` log level for `ahabd` for now --- piker/data/_ahab.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index d2e042e3..2c0230f1 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -413,7 +413,7 @@ async def open_ahabd( log.info(entry) raise DockerNotStarted( - f'Failed to start container: {dcntr.cuid}\n' + f'Failed to start container: {cntr.cuid}\n' f'due to startup_timeout={conf["startup_timeout"]}s\n\n' "prolly you should check your container's logs for deats.." ) @@ -484,7 +484,7 @@ async def start_ahab( async with portal.open_context( open_ahabd, endpoint=str(NamespacePath.from_ref(endpoint)), - loglevel=loglevel, + loglevel='cancel', ) as (ctx, first): cid, pid, cntr_config = first From bfe3ea1f59bef5986b2ebb069d1633abfdf90e6b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 15:01:06 -0500 Subject: [PATCH 10/38] Set explicit `marketstore` container startup timeout --- piker/data/marketstore.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 4cfc1826..6e3ed78f 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -63,11 +63,12 @@ from .._profile import Profiler log = get_logger(__name__) -# container level config +# ahabd-supervisor and container level config _config = { 'grpc_listen_port': 5995, 'ws_listen_port': 5993, 'log_level': 'debug', + 'startup_timeout': 1, } _yaml_config = ''' From 93c81fa4d1c4dc877dde0c81a5664e78cb4db635 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 15:14:39 -0500 Subject: [PATCH 11/38] Start `piker.service` sub-package For now just moves everything that was in `piker._daemon` to a subpkg module but a reorg is coming pronto! --- piker/__init__.py | 8 ++++---- piker/brokers/cli.py | 21 +++++++++++++-------- piker/brokers/core.py | 2 +- piker/clearing/_client.py | 7 +++++-- piker/cli/__init__.py | 14 ++++++++------ piker/data/_ahab.py | 14 +++++++++----- piker/data/_sampling.py | 8 ++++---- piker/data/cli.py | 4 ++-- piker/data/feed.py | 2 +- piker/{_daemon.py => service/__init__.py} | 13 ++++++------- piker/ui/_app.py | 2 +- piker/ui/_exec.py | 2 +- piker/ui/cli.py | 2 +- tests/conftest.py | 12 ++++++------ tests/test_databases.py | 2 +- tests/test_services.py | 3 +-- 16 files changed, 64 insertions(+), 52 deletions(-) rename piker/{_daemon.py => service/__init__.py} (99%) diff --git a/piker/__init__.py b/piker/__init__.py index d08c2dbc..6ebeec3d 100644 --- a/piker/__init__.py +++ b/piker/__init__.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers. -# Copyright 2020-eternity Tyler Goodlet (in stewardship for piker0) +# Copyright 2020-eternity Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -14,11 +14,11 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' piker: trading gear for hackers. -""" -from ._daemon import open_piker_runtime +''' +from .service import open_piker_runtime from .data.feed import open_feed __all__ = [ diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 0d84384d..f86c679e 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -29,8 +29,15 @@ import tractor from ..cli import cli from .. import watchlists as wl from ..log import get_console_log, colorize_json, get_logger -from .._daemon import maybe_spawn_brokerd, maybe_open_pikerd -from ..brokers import core, get_brokermod, data +from ..service import ( + maybe_spawn_brokerd, + maybe_open_pikerd, +) +from ..brokers import ( + core, + get_brokermod, + data, +) log = get_logger('cli') DEFAULT_BROKER = 'questrade' @@ -60,6 +67,7 @@ def get_method(client, meth_name: str): print_ok('found!.') return method + async def run_method(client, meth_name: str, **kwargs): method = get_method(client, meth_name) print('running...', end='', flush=True) @@ -67,19 +75,20 @@ async def run_method(client, meth_name: str, **kwargs): print_ok(f'done! result: {type(result)}') return result + async def run_test(broker_name: str): brokermod = get_brokermod(broker_name) total = 0 passed = 0 failed = 0 - print(f'getting client...', end='', flush=True) + print('getting client...', end='', flush=True) if not hasattr(brokermod, 'get_client'): print_error('fail! no \'get_client\' context manager found.') return async with brokermod.get_client(is_brokercheck=True) as client: - print_ok(f'done! inside client context.') + print_ok('done! inside client context.') # check for methods present on brokermod method_list = [ @@ -130,7 +139,6 @@ async def run_test(broker_name: str): total += 1 - # check for methods present con brokermod.Client and their # results @@ -180,7 +188,6 @@ def brokercheck(config, broker): trio.run(run_test, broker) - @cli.command() @click.option('--keys', '-k', multiple=True, help='Return results only for these keys') @@ -335,8 +342,6 @@ def contracts(ctx, loglevel, broker, symbol, ids): brokermod = get_brokermod(broker) get_console_log(loglevel) - - contracts = trio.run(partial(core.contracts, brokermod, symbol)) if not ids: # just print out expiry dates which can be used with diff --git a/piker/brokers/core.py b/piker/brokers/core.py index af5da3a1..3e9e1614 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -28,7 +28,7 @@ import trio from ..log import get_logger from . import get_brokermod -from .._daemon import maybe_spawn_brokerd +from ..service import maybe_spawn_brokerd from .._cacheables import open_cached_client diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 0a40b548..7d03406a 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -29,8 +29,11 @@ from tractor.trionics import broadcast_receiver from ..log import get_logger from ..data.types import Struct -from .._daemon import maybe_open_emsd -from ._messages import Order, Cancel +from ..service import maybe_open_emsd +from ._messages import ( + Order, + Cancel, +) from ..brokers import get_brokermod if TYPE_CHECKING: diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 9b6f225c..b4d13505 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -19,16 +19,18 @@ CLI commons. ''' import os -from pprint import pformat -from functools import partial import click import trio import tractor -from ..log import get_console_log, get_logger, colorize_json +from ..log import ( + get_console_log, + get_logger, + colorize_json, +) from ..brokers import get_brokermod -from .._daemon import ( +from ..service import ( _default_registry_host, _default_registry_port, ) @@ -68,7 +70,7 @@ def pikerd( ''' - from .._daemon import open_pikerd + from ..service import open_pikerd log = get_console_log(loglevel) if pdb: @@ -171,7 +173,7 @@ def cli( @click.pass_obj def services(config, tl, ports): - from .._daemon import ( + from ..service import ( open_piker_runtime, _default_registry_port, _default_registry_host, diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 2c0230f1..38d4a9e7 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -426,11 +426,15 @@ async def open_ahabd( finally: # TODO: ensure loglevel can be set and teardown logs are # reported if possible on error or cancel.. - with trio.CancelScope(shield=True): - await cntr.cancel( - log_msg_key=conf['log_msg_key'], - stop_predicate=stop_lambda, - ) + # XXX WARNING: currently shielding here can result in hangs + # on ctl-c from user.. ideally we can avoid a cancel getting + # consumed and not propagating whilst still doing teardown + # logging.. + # with trio.CancelScope(shield=True): + await cntr.cancel( + log_msg_key=conf['log_msg_key'], + stop_predicate=stop_lambda, + ) async def start_ahab( diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index a5df96cc..f44304bf 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -42,7 +42,7 @@ from ..log import ( get_logger, get_console_log, ) -from .._daemon import maybe_spawn_daemon +from ..service import maybe_spawn_daemon if TYPE_CHECKING: from ._sharedmem import ( @@ -68,8 +68,8 @@ class Sampler: This non-instantiated type is meant to be a singleton within a `samplerd` actor-service spawned once by the user wishing to - time-step sample real-time quote feeds, see - ``._daemon.maybe_open_samplerd()`` and the below + time-step-sample (real-time) quote feeds, see + ``.service.maybe_open_samplerd()`` and the below ``register_with_sampler()``. ''' @@ -379,7 +379,7 @@ async def spawn_samplerd( update and increment count write and stream broadcasting. ''' - from piker._daemon import Services + from piker.service import Services dname = 'samplerd' log.info(f'Spawning `{dname}`') diff --git a/piker/data/cli.py b/piker/data/cli.py index 994b9da4..7c8b9a68 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -137,7 +137,7 @@ def storesh( ''' from piker.data.marketstore import open_tsdb_client - from piker._daemon import open_piker_runtime + from piker.service import open_piker_runtime async def main(): nonlocal symbols @@ -187,7 +187,7 @@ def storage( ''' from piker.data.marketstore import open_tsdb_client - from piker._daemon import open_piker_runtime + from piker.service import open_piker_runtime async def main(): nonlocal symbols diff --git a/piker/data/feed.py b/piker/data/feed.py index 906f4bb4..a31e955a 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -58,7 +58,7 @@ from ..log import ( get_logger, get_console_log, ) -from .._daemon import ( +from ..service import ( maybe_spawn_brokerd, check_for_service, ) diff --git a/piker/_daemon.py b/piker/service/__init__.py similarity index 99% rename from piker/_daemon.py rename to piker/service/__init__.py index 1a8576a1..6788b764 100644 --- a/piker/_daemon.py +++ b/piker/service/__init__.py @@ -19,6 +19,8 @@ Structured, daemon tree service management. """ from __future__ import annotations +from pprint import pformat +from functools import partial import os from typing import ( Optional, @@ -35,14 +37,11 @@ import tractor import trio from trio_typing import TaskStatus -from .log import ( +from ..log import ( get_logger, get_console_log, ) -from .brokers import get_brokermod - -from pprint import pformat -from functools import partial +from ..brokers import get_brokermod log = get_logger(__name__) @@ -669,7 +668,7 @@ async def spawn_brokerd( ) # non-blocking setup of brokerd service nursery - from .data import _setup_persistent_brokerd + from ..data import _setup_persistent_brokerd await Services.start_service_task( dname, @@ -732,7 +731,7 @@ async def spawn_emsd( ) # non-blocking setup of clearing service - from .clearing._ems import _setup_persistent_emsd + from ..clearing._ems import _setup_persistent_emsd await Services.start_service_task( 'emsd', diff --git a/piker/ui/_app.py b/piker/ui/_app.py index 3be073e7..9978dbe3 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -24,7 +24,7 @@ from types import ModuleType from PyQt5.QtCore import QEvent import trio -from .._daemon import maybe_spawn_brokerd +from ..service import maybe_spawn_brokerd from . import _event from ._exec import run_qtractor from ..data.feed import install_brokerd_search diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index d8eabb70..19663cac 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -49,7 +49,7 @@ from qdarkstyle import DarkPalette import trio from outcome import Error -from .._daemon import ( +from ..service import ( maybe_open_pikerd, get_tractor_runtime_kwargs, ) diff --git a/piker/ui/cli.py b/piker/ui/cli.py index a72c2f5c..9b8385f2 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -24,7 +24,7 @@ import tractor from ..cli import cli from .. import watchlists as wl -from .._daemon import maybe_spawn_brokerd +from ..service import maybe_spawn_brokerd _config_dir = click.get_app_dir('piker') diff --git a/tests/conftest.py b/tests/conftest.py index 8218ec16..68d392aa 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,7 +1,6 @@ from contextlib import asynccontextmanager as acm from functools import partial import os -from typing import AsyncContextManager from pathlib import Path from shutil import rmtree @@ -11,7 +10,7 @@ from piker import ( # log, config, ) -from piker._daemon import ( +from piker.service import ( Services, ) from piker.clearing._client import open_ems @@ -88,7 +87,7 @@ async def _open_test_pikerd( ''' import random - from piker._daemon import maybe_open_pikerd + from piker.service import maybe_open_pikerd if reg_addr is None: port = random.randint(6e3, 7e3) @@ -151,8 +150,9 @@ async def _open_test_pikerd_and_ems( fqsn, mode=mode, loglevel=loglevel, - ) as ems_services): - yield (services, ems_services) + ) as ems_services, + ): + yield (services, ems_services) @pytest.fixture @@ -168,7 +168,7 @@ def open_test_pikerd_and_ems( mode, loglevel, open_test_pikerd - ) + ) @pytest.fixture(scope='module') diff --git a/tests/test_databases.py b/tests/test_databases.py index 4eb444f3..7fcee34a 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -3,7 +3,7 @@ import trio from typing import AsyncContextManager -from piker._daemon import Services +from piker.service import Services from piker.log import get_logger from elasticsearch import Elasticsearch diff --git a/tests/test_services.py b/tests/test_services.py index 763b438e..29e613e3 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -9,8 +9,7 @@ import pytest import trio import tractor -from piker.log import get_logger -from piker._daemon import ( +from piker.service import ( find_service, Services, ) From afac553ea2711e6563cb03f1d852cbfd48aaf391 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 15:25:20 -0500 Subject: [PATCH 12/38] Move all docker and external db code to `piker.service` --- piker/cli/__init__.py | 4 ++-- piker/data/cli.py | 4 ++-- piker/service/__init__.py | 4 ++-- piker/{data => service}/_ahab.py | 0 piker/{data => service}/elastic.py | 0 piker/{data => service}/marketstore.py | 5 ++--- 6 files changed, 8 insertions(+), 9 deletions(-) rename piker/{data => service}/_ahab.py (100%) rename piker/{data => service}/elastic.py (100%) rename piker/{data => service}/marketstore.py (99%) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index b4d13505..63b8321a 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -206,8 +206,8 @@ def services(config, tl, ports): def _load_clis() -> None: - from ..data import marketstore # noqa - from ..data import elastic + from ..service import marketstore # noqa + from ..service import elastic from ..data import cli # noqa from ..brokers import cli # noqa from ..ui import cli # noqa diff --git a/piker/data/cli.py b/piker/data/cli.py index 7c8b9a68..cb081c6e 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -144,7 +144,7 @@ def storesh( async with open_piker_runtime( 'storesh', - enable_modules=['piker.data._ahab'], + enable_modules=['piker.service._ahab'], ): symbol = symbols[0] @@ -194,7 +194,7 @@ def storage( async with open_piker_runtime( 'tsdb_storage', - enable_modules=['piker.data._ahab'], + enable_modules=['piker.service._ahab'], ): symbol = symbols[0] async with open_tsdb_client(symbol) as storage: diff --git a/piker/service/__init__.py b/piker/service/__init__.py index 6788b764..04ec4c28 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -362,8 +362,8 @@ async def open_pikerd( Services.debug_mode = debug_mode if tsdb: - from piker.data._ahab import start_ahab - from piker.data.marketstore import start_marketstore + from ._ahab import start_ahab + from .marketstore import start_marketstore log.info('Spawning `marketstore` supervisor') ctn_ready, config, (cid, pid) = await service_nursery.start( diff --git a/piker/data/_ahab.py b/piker/service/_ahab.py similarity index 100% rename from piker/data/_ahab.py rename to piker/service/_ahab.py diff --git a/piker/data/elastic.py b/piker/service/elastic.py similarity index 100% rename from piker/data/elastic.py rename to piker/service/elastic.py diff --git a/piker/data/marketstore.py b/piker/service/marketstore.py similarity index 99% rename from piker/data/marketstore.py rename to piker/service/marketstore.py index 6e3ed78f..5613bd8d 100644 --- a/piker/data/marketstore.py +++ b/piker/service/marketstore.py @@ -26,7 +26,6 @@ from __future__ import annotations from contextlib import asynccontextmanager as acm from datetime import datetime -from pprint import pformat from typing import ( Any, Optional, @@ -55,7 +54,7 @@ if TYPE_CHECKING: import docker from ._ahab import DockerContainer -from .feed import maybe_open_feed +from ..data.feed import maybe_open_feed from ..log import get_logger, get_console_log from .._profile import Profiler @@ -136,7 +135,7 @@ def start_marketstore( # create dirs when dne if not os.path.isdir(config._config_dir): - Path(config._config_dir).mkdir(parents=True, exist_ok=True) + Path(config._config_dir).mkdir(parents=True, exist_ok=True) if not os.path.isdir(mktsdir): os.mkdir(mktsdir) From dd87d1142e19e1de6fd3e0719a4cc970bc584576 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 15:25:38 -0500 Subject: [PATCH 13/38] Bump mkts timeout to 2s --- piker/service/marketstore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index 5613bd8d..e9de9558 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -67,7 +67,7 @@ _config = { 'grpc_listen_port': 5995, 'ws_listen_port': 5993, 'log_level': 'debug', - 'startup_timeout': 1, + 'startup_timeout': 2, } _yaml_config = ''' From b226b678e995eb63f726c45d2ccd36e9fc65d87b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 15:48:17 -0500 Subject: [PATCH 14/38] Fix missed `marketstore` mod imports --- piker/data/cli.py | 2 +- piker/data/feed.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/data/cli.py b/piker/data/cli.py index cb081c6e..6f4e169d 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -30,7 +30,7 @@ import tractor import click import numpy as np -from .marketstore import ( +from ..service.marketstore import ( get_client, # stream_quotes, ingest_quote_stream, diff --git a/piker/data/feed.py b/piker/data/feed.py index a31e955a..69d5be7d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -86,7 +86,7 @@ from ..brokers._util import ( ) if TYPE_CHECKING: - from .marketstore import Storage + from ..service.marketstore import Storage log = get_logger(__name__) @@ -865,7 +865,7 @@ async def manage_history( ): log.info('Found existing `marketstored`') - from . import marketstore + from ..service import marketstore async with ( marketstore.open_storage_client(fqsn)as storage, ): From 31f2b01c3ecb4e2411d116159bc5bc6f5bbd9c5a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 15:59:19 -0500 Subject: [PATCH 15/38] Move `Services` api to `.service._mngr` mod --- piker/service/__init__.py | 110 ++---------------------------- piker/service/_mngr.py | 136 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 105 deletions(-) create mode 100644 piker/service/_mngr.py diff --git a/piker/service/__init__.py b/piker/service/__init__.py index 04ec4c28..e5e2c1fa 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -15,7 +15,7 @@ # along with this program. If not, see . """ -Structured, daemon tree service management. +Actor-runtime service orchestration machinery. """ from __future__ import annotations @@ -31,17 +31,18 @@ from typing import ( from contextlib import ( asynccontextmanager as acm, ) -from collections import defaultdict import tractor import trio -from trio_typing import TaskStatus from ..log import ( get_logger, get_console_log, ) from ..brokers import get_brokermod +from ._mngr import ( + Services, +) log = get_logger(__name__) @@ -142,107 +143,6 @@ _root_modules = [ ] -# TODO: factor this into a ``tractor.highlevel`` extension -# pack for the library. -class Services: - - actor_n: tractor._supervise.ActorNursery - service_n: trio.Nursery - debug_mode: bool # tractor sub-actor debug mode flag - service_tasks: dict[ - str, - tuple[ - trio.CancelScope, - tractor.Portal, - trio.Event, - ] - ] = {} - locks = defaultdict(trio.Lock) - - @classmethod - async def start_service_task( - self, - name: str, - portal: tractor.Portal, - target: Callable, - **kwargs, - - ) -> (trio.CancelScope, tractor.Context): - ''' - Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` teardown. - - This allows for allocating long-running sub-services in our main - daemon and explicitly controlling their lifetimes. - - ''' - async def open_context_in_task( - task_status: TaskStatus[ - tuple[ - trio.CancelScope, - trio.Event, - Any, - ] - ] = trio.TASK_STATUS_IGNORED, - - ) -> Any: - - with trio.CancelScope() as cs: - async with portal.open_context( - target, - **kwargs, - - ) as (ctx, first): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started((cs, complete, first)) - log.info( - f'`pikerd` service {name} started with value {first}' - ) - try: - # wait on any context's return value - # and any final portal result from the - # sub-actor. - ctx_res = await ctx.result() - - # NOTE: blocks indefinitely until cancelled - # either by error from the target context - # function or by being cancelled here by the - # surrounding cancel scope. - return (await portal.result(), ctx_res) - - finally: - await portal.cancel_actor() - complete.set() - self.service_tasks.pop(name) - - cs, complete, first = await self.service_n.start(open_context_in_task) - - # store the cancel scope and portal for later cancellation or - # retstart if needed. - self.service_tasks[name] = (cs, portal, complete) - - return cs, first - - @classmethod - async def cancel_service( - self, - name: str, - - ) -> Any: - ''' - Cancel the service task and actor for the given ``name``. - - ''' - log.info(f'Cancelling `pikerd` service {name}') - cs, portal, complete = self.service_tasks[name] - cs.cancel() - await complete.wait() - assert name not in self.service_tasks, \ - f'Serice task for {name} not terminated?' - - @acm async def open_piker_runtime( name: str, diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py new file mode 100644 index 00000000..04f396af --- /dev/null +++ b/piker/service/_mngr.py @@ -0,0 +1,136 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +daemon-service management API. + +""" +from collections import defaultdict +from typing import ( + Callable, + Any, +) + +import trio +from trio_typing import TaskStatus +import tractor + +from ..log import ( + get_logger, +) + +log = get_logger(__name__) + + +# TODO: factor this into a ``tractor.highlevel`` extension +# pack for the library. +class Services: + + actor_n: tractor._supervise.ActorNursery + service_n: trio.Nursery + debug_mode: bool # tractor sub-actor debug mode flag + service_tasks: dict[ + str, + tuple[ + trio.CancelScope, + tractor.Portal, + trio.Event, + ] + ] = {} + locks = defaultdict(trio.Lock) + + @classmethod + async def start_service_task( + self, + name: str, + portal: tractor.Portal, + target: Callable, + **kwargs, + + ) -> (trio.CancelScope, tractor.Context): + ''' + Open a context in a service sub-actor, add to a stack + that gets unwound at ``pikerd`` teardown. + + This allows for allocating long-running sub-services in our main + daemon and explicitly controlling their lifetimes. + + ''' + async def open_context_in_task( + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + trio.Event, + Any, + ] + ] = trio.TASK_STATUS_IGNORED, + + ) -> Any: + + with trio.CancelScope() as cs: + async with portal.open_context( + target, + **kwargs, + + ) as (ctx, first): + + # unblock once the remote context has started + complete = trio.Event() + task_status.started((cs, complete, first)) + log.info( + f'`pikerd` service {name} started with value {first}' + ) + try: + # wait on any context's return value + # and any final portal result from the + # sub-actor. + ctx_res = await ctx.result() + + # NOTE: blocks indefinitely until cancelled + # either by error from the target context + # function or by being cancelled here by the + # surrounding cancel scope. + return (await portal.result(), ctx_res) + + finally: + await portal.cancel_actor() + complete.set() + self.service_tasks.pop(name) + + cs, complete, first = await self.service_n.start(open_context_in_task) + + # store the cancel scope and portal for later cancellation or + # retstart if needed. + self.service_tasks[name] = (cs, portal, complete) + + return cs, first + + @classmethod + async def cancel_service( + self, + name: str, + + ) -> Any: + ''' + Cancel the service task and actor for the given ``name``. + + ''' + log.info(f'Cancelling `pikerd` service {name}') + cs, portal, complete = self.service_tasks[name] + cs.cancel() + await complete.wait() + assert name not in self.service_tasks, \ + f'Serice task for {name} not terminated?' From a2d40937a3b030fd6e24a4abec0f32ab85e23bca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 16:05:49 -0500 Subject: [PATCH 16/38] Move actor-discovery utils to `.service._registry --- piker/service/__init__.py | 123 ++++--------------------------- piker/service/_registry.py | 144 +++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 109 deletions(-) create mode 100644 piker/service/_registry.py diff --git a/piker/service/__init__.py b/piker/service/__init__.py index e5e2c1fa..e365e7fd 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -43,89 +43,25 @@ from ..brokers import get_brokermod from ._mngr import ( Services, ) - +from ._registry import ( # noqa + _tractor_kwargs, + _default_reg_addr, + _default_registry_host, + _default_registry_port, + open_registry, + find_service, + check_for_service, +) log = get_logger(__name__) + +__all__ = [ + 'check_for_service', +] + _root_dname = 'pikerd' -_default_registry_host: str = '127.0.0.1' -_default_registry_port: int = 6116 -_default_reg_addr: tuple[str, int] = ( - _default_registry_host, - _default_registry_port, -) - - -# NOTE: this value is set as an actor-global once the first endpoint -# who is capable, spawns a `pikerd` service tree. -_registry: Registry | None = None - - -class Registry: - addr: None | tuple[str, int] = None - - # TODO: table of uids to sockaddrs - peers: dict[ - tuple[str, str], - tuple[str, int], - ] = {} - - -_tractor_kwargs: dict[str, Any] = {} - - -@acm -async def open_registry( - addr: None | tuple[str, int] = None, - ensure_exists: bool = True, - -) -> tuple[str, int]: - - global _tractor_kwargs - actor = tractor.current_actor() - uid = actor.uid - if ( - Registry.addr is not None - and addr - ): - raise RuntimeError( - f'`{uid}` registry addr already bound @ {_registry.sockaddr}' - ) - - was_set: bool = False - - if ( - not tractor.is_root_process() - and Registry.addr is None - ): - Registry.addr = actor._arb_addr - - if ( - ensure_exists - and Registry.addr is None - ): - raise RuntimeError( - f"`{uid}` registry should already exist bug doesn't?" - ) - - if ( - Registry.addr is None - ): - was_set = True - Registry.addr = addr or _default_reg_addr - - _tractor_kwargs['arbiter_addr'] = Registry.addr - - try: - yield Registry.addr - finally: - # XXX: always clear the global addr if we set it so that the - # next (set of) calls will apply whatever new one is passed - # in. - if was_set: - Registry.addr = None - def get_tractor_runtime_kwargs() -> dict[str, Any]: ''' @@ -415,37 +351,6 @@ _data_mods = [ ] -@acm -async def find_service( - service_name: str, -) -> tractor.Portal | None: - - async with open_registry() as reg_addr: - log.info(f'Scanning for service `{service_name}`') - # attach to existing daemon by name if possible - async with tractor.find_actor( - service_name, - arbiter_sockaddr=reg_addr, - ) as maybe_portal: - yield maybe_portal - - -async def check_for_service( - service_name: str, - -) -> None | tuple[str, int]: - ''' - Service daemon "liveness" predicate. - - ''' - async with open_registry(ensure_exists=False) as reg_addr: - async with tractor.query_actor( - service_name, - arbiter_sockaddr=reg_addr, - ) as sockaddr: - return sockaddr - - @acm async def maybe_spawn_daemon( diff --git a/piker/service/_registry.py b/piker/service/_registry.py new file mode 100644 index 00000000..f487e2a4 --- /dev/null +++ b/piker/service/_registry.py @@ -0,0 +1,144 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Inter-actor "discovery" (protocol) layer. + +""" +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, +) +from typing import ( + Any, +) + +import tractor + + +from ..log import ( + get_logger, +) + +log = get_logger(__name__) + +_default_registry_host: str = '127.0.0.1' +_default_registry_port: int = 6116 +_default_reg_addr: tuple[str, int] = ( + _default_registry_host, + _default_registry_port, +) + + +# NOTE: this value is set as an actor-global once the first endpoint +# who is capable, spawns a `pikerd` service tree. +_registry: Registry | None = None + + +class Registry: + addr: None | tuple[str, int] = None + + # TODO: table of uids to sockaddrs + peers: dict[ + tuple[str, str], + tuple[str, int], + ] = {} + + +_tractor_kwargs: dict[str, Any] = {} + + +@acm +async def open_registry( + addr: None | tuple[str, int] = None, + ensure_exists: bool = True, + +) -> tuple[str, int]: + + global _tractor_kwargs + actor = tractor.current_actor() + uid = actor.uid + if ( + Registry.addr is not None + and addr + ): + raise RuntimeError( + f'`{uid}` registry addr already bound @ {_registry.sockaddr}' + ) + + was_set: bool = False + + if ( + not tractor.is_root_process() + and Registry.addr is None + ): + Registry.addr = actor._arb_addr + + if ( + ensure_exists + and Registry.addr is None + ): + raise RuntimeError( + f"`{uid}` registry should already exist bug doesn't?" + ) + + if ( + Registry.addr is None + ): + was_set = True + Registry.addr = addr or _default_reg_addr + + _tractor_kwargs['arbiter_addr'] = Registry.addr + + try: + yield Registry.addr + finally: + # XXX: always clear the global addr if we set it so that the + # next (set of) calls will apply whatever new one is passed + # in. + if was_set: + Registry.addr = None + + +@acm +async def find_service( + service_name: str, +) -> tractor.Portal | None: + + async with open_registry() as reg_addr: + log.info(f'Scanning for service `{service_name}`') + # attach to existing daemon by name if possible + async with tractor.find_actor( + service_name, + arbiter_sockaddr=reg_addr, + ) as maybe_portal: + yield maybe_portal + + +async def check_for_service( + service_name: str, + +) -> None | tuple[str, int]: + ''' + Service daemon "liveness" predicate. + + ''' + async with open_registry(ensure_exists=False) as reg_addr: + async with tractor.query_actor( + service_name, + arbiter_sockaddr=reg_addr, + ) as sockaddr: + return sockaddr From eca048c0c582019acc2fb23e7c0b463bc4d26fc9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 16:20:45 -0500 Subject: [PATCH 17/38] Move daemon spawning endpoints to `service._daemon` module --- piker/service/__init__.py | 238 ++------------------------------- piker/service/_daemon.py | 271 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 279 insertions(+), 230 deletions(-) create mode 100644 piker/service/_daemon.py diff --git a/piker/service/__init__.py b/piker/service/__init__.py index e365e7fd..32a05ae5 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -24,7 +24,6 @@ from functools import partial import os from typing import ( Optional, - Callable, Any, ClassVar, ) @@ -39,7 +38,6 @@ from ..log import ( get_logger, get_console_log, ) -from ..brokers import get_brokermod from ._mngr import ( Services, ) @@ -52,6 +50,13 @@ from ._registry import ( # noqa find_service, check_for_service, ) +from ._daemon import ( # noqa + maybe_spawn_daemon, + spawn_brokerd, + maybe_spawn_brokerd, + spawn_emsd, + maybe_open_emsd, +) log = get_logger(__name__) @@ -73,6 +78,7 @@ def get_tractor_runtime_kwargs() -> dict[str, Any]: _root_modules = [ __name__, + 'piker.service._daemon', 'piker.clearing._ems', 'piker.clearing._client', 'piker.data._sampling', @@ -337,231 +343,3 @@ async def maybe_open_pikerd( # we return no portal to self. assert service_manager yield service_manager - - -# `brokerd` enabled modules -# NOTE: keeping this list as small as possible is part of our caps-sec -# model and should be treated with utmost care! -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', - 'piker.data', - 'piker.data.feed', - 'piker.data._sampling' -] - - -@acm -async def maybe_spawn_daemon( - - service_name: str, - service_task_target: Callable, - spawn_args: dict[str, Any], - loglevel: Optional[str] = None, - - singleton: bool = False, - **kwargs, - -) -> tractor.Portal: - ''' - If no ``service_name`` daemon-actor can be found, - spawn one in a local subactor and return a portal to it. - - If this function is called from a non-pikerd actor, the - spawned service will persist as long as pikerd does or - it is requested to be cancelled. - - This can be seen as a service starting api for remote-actor - clients. - - ''' - if loglevel: - get_console_log(loglevel) - - # serialize access to this section to avoid - # 2 or more tasks racing to create a daemon - lock = Services.locks[service_name] - await lock.acquire() - - async with find_service(service_name) as portal: - if portal is not None: - lock.release() - yield portal - return - - log.warning(f"Couldn't find any existing {service_name}") - - # TODO: really shouldn't the actor spawning be part of the service - # starting method `Services.start_service()` ? - - # ask root ``pikerd`` daemon to spawn the daemon we need if - # pikerd is not live we now become the root of the - # process tree - async with maybe_open_pikerd( - - loglevel=loglevel, - **kwargs, - - ) as pikerd_portal: - - # we are the root and thus are `pikerd` - # so spawn the target service directly by calling - # the provided target routine. - # XXX: this assumes that the target is well formed and will - # do the right things to setup both a sub-actor **and** call - # the ``_Services`` api from above to start the top level - # service task for that actor. - started: bool - if pikerd_portal is None: - started = await service_task_target(**spawn_args) - - else: - # tell the remote `pikerd` to start the target, - # the target can't return a non-serializable value - # since it is expected that service startingn is - # non-blocking and the target task will persist running - # on `pikerd` after the client requesting it's start - # disconnects. - started = await pikerd_portal.run( - service_task_target, - **spawn_args, - ) - - if started: - log.info(f'Service {service_name} started!') - - async with tractor.wait_for_actor(service_name) as portal: - lock.release() - yield portal - await portal.cancel_actor() - - -async def spawn_brokerd( - - brokername: str, - loglevel: Optional[str] = None, - **tractor_kwargs, - -) -> bool: - - log.info(f'Spawning {brokername} broker daemon') - - brokermod = get_brokermod(brokername) - dname = f'brokerd.{brokername}' - - extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) - tractor_kwargs.update(extra_tractor_kwargs) - - # ask `pikerd` to spawn a new sub-actor and manage it under its - # actor nursery - modpath = brokermod.__name__ - broker_enable = [modpath] - for submodname in getattr( - brokermod, - '__enable_modules__', - [], - ): - subpath = f'{modpath}.{submodname}' - broker_enable.append(subpath) - - portal = await Services.actor_n.start_actor( - dname, - enable_modules=_data_mods + broker_enable, - loglevel=loglevel, - debug_mode=Services.debug_mode, - **tractor_kwargs - ) - - # non-blocking setup of brokerd service nursery - from ..data import _setup_persistent_brokerd - - await Services.start_service_task( - dname, - portal, - _setup_persistent_brokerd, - brokername=brokername, - ) - return True - - -@acm -async def maybe_spawn_brokerd( - - brokername: str, - loglevel: Optional[str] = None, - **kwargs, - -) -> tractor.Portal: - ''' - Helper to spawn a brokerd service *from* a client - who wishes to use the sub-actor-daemon. - - ''' - async with maybe_spawn_daemon( - - f'brokerd.{brokername}', - service_task_target=spawn_brokerd, - spawn_args={ - 'brokername': brokername, - 'loglevel': loglevel, - }, - loglevel=loglevel, - **kwargs, - - ) as portal: - yield portal - - -async def spawn_emsd( - - loglevel: Optional[str] = None, - **extra_tractor_kwargs - -) -> bool: - """ - Start the clearing engine under ``pikerd``. - - """ - log.info('Spawning emsd') - - portal = await Services.actor_n.start_actor( - 'emsd', - enable_modules=[ - 'piker.clearing._ems', - 'piker.clearing._client', - ], - loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag - **extra_tractor_kwargs - ) - - # non-blocking setup of clearing service - from ..clearing._ems import _setup_persistent_emsd - - await Services.start_service_task( - 'emsd', - portal, - _setup_persistent_emsd, - ) - return True - - -@acm -async def maybe_open_emsd( - - brokername: str, - loglevel: Optional[str] = None, - **kwargs, - -) -> tractor._portal.Portal: # noqa - - async with maybe_spawn_daemon( - - 'emsd', - service_task_target=spawn_emsd, - spawn_args={'loglevel': loglevel}, - loglevel=loglevel, - **kwargs, - - ) as portal: - yield portal diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py new file mode 100644 index 00000000..8cb9054f --- /dev/null +++ b/piker/service/_daemon.py @@ -0,0 +1,271 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Daemon-actor spawning "endpoint-hooks". + +""" +from __future__ import annotations +from typing import ( + Optional, + Callable, + Any, +) +from contextlib import ( + asynccontextmanager as acm, +) + +import tractor + +from ..log import ( + get_logger, + get_console_log, +) +from ..brokers import get_brokermod +from ._mngr import ( + Services, +) +from ._registry import find_service + +log = get_logger(__name__) + +# `brokerd` enabled modules +# NOTE: keeping this list as small as possible is part of our caps-sec +# model and should be treated with utmost care! +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', + 'piker.data', + 'piker.data.feed', + 'piker.data._sampling' +] + + +@acm +async def maybe_spawn_daemon( + + service_name: str, + service_task_target: Callable, + spawn_args: dict[str, Any], + loglevel: Optional[str] = None, + + singleton: bool = False, + **kwargs, + +) -> tractor.Portal: + ''' + If no ``service_name`` daemon-actor can be found, + spawn one in a local subactor and return a portal to it. + + If this function is called from a non-pikerd actor, the + spawned service will persist as long as pikerd does or + it is requested to be cancelled. + + This can be seen as a service starting api for remote-actor + clients. + + ''' + if loglevel: + get_console_log(loglevel) + + # serialize access to this section to avoid + # 2 or more tasks racing to create a daemon + lock = Services.locks[service_name] + await lock.acquire() + + async with find_service(service_name) as portal: + if portal is not None: + lock.release() + yield portal + return + + log.warning(f"Couldn't find any existing {service_name}") + + # TODO: really shouldn't the actor spawning be part of the service + # starting method `Services.start_service()` ? + + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + from . import maybe_open_pikerd + async with maybe_open_pikerd( + + loglevel=loglevel, + **kwargs, + + ) as pikerd_portal: + + # we are the root and thus are `pikerd` + # so spawn the target service directly by calling + # the provided target routine. + # XXX: this assumes that the target is well formed and will + # do the right things to setup both a sub-actor **and** call + # the ``_Services`` api from above to start the top level + # service task for that actor. + started: bool + if pikerd_portal is None: + started = await service_task_target(**spawn_args) + + else: + # tell the remote `pikerd` to start the target, + # the target can't return a non-serializable value + # since it is expected that service startingn is + # non-blocking and the target task will persist running + # on `pikerd` after the client requesting it's start + # disconnects. + started = await pikerd_portal.run( + service_task_target, + **spawn_args, + ) + + if started: + log.info(f'Service {service_name} started!') + + async with tractor.wait_for_actor(service_name) as portal: + lock.release() + yield portal + await portal.cancel_actor() + + +async def spawn_brokerd( + + brokername: str, + loglevel: Optional[str] = None, + **tractor_kwargs, + +) -> bool: + + log.info(f'Spawning {brokername} broker daemon') + + brokermod = get_brokermod(brokername) + dname = f'brokerd.{brokername}' + + extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) + tractor_kwargs.update(extra_tractor_kwargs) + + # ask `pikerd` to spawn a new sub-actor and manage it under its + # actor nursery + modpath = brokermod.__name__ + broker_enable = [modpath] + for submodname in getattr( + brokermod, + '__enable_modules__', + [], + ): + subpath = f'{modpath}.{submodname}' + broker_enable.append(subpath) + + portal = await Services.actor_n.start_actor( + dname, + enable_modules=_data_mods + broker_enable, + loglevel=loglevel, + debug_mode=Services.debug_mode, + **tractor_kwargs + ) + + # non-blocking setup of brokerd service nursery + from ..data import _setup_persistent_brokerd + + await Services.start_service_task( + dname, + portal, + _setup_persistent_brokerd, + brokername=brokername, + ) + return True + + +@acm +async def maybe_spawn_brokerd( + + brokername: str, + loglevel: Optional[str] = None, + **kwargs, + +) -> tractor.Portal: + ''' + Helper to spawn a brokerd service *from* a client + who wishes to use the sub-actor-daemon. + + ''' + async with maybe_spawn_daemon( + + f'brokerd.{brokername}', + service_task_target=spawn_brokerd, + spawn_args={ + 'brokername': brokername, + 'loglevel': loglevel, + }, + loglevel=loglevel, + **kwargs, + + ) as portal: + yield portal + + +async def spawn_emsd( + + loglevel: Optional[str] = None, + **extra_tractor_kwargs + +) -> bool: + """ + Start the clearing engine under ``pikerd``. + + """ + log.info('Spawning emsd') + + portal = await Services.actor_n.start_actor( + 'emsd', + enable_modules=[ + 'piker.clearing._ems', + 'piker.clearing._client', + ], + loglevel=loglevel, + debug_mode=Services.debug_mode, # set by pikerd flag + **extra_tractor_kwargs + ) + + # non-blocking setup of clearing service + from ..clearing._ems import _setup_persistent_emsd + + await Services.start_service_task( + 'emsd', + portal, + _setup_persistent_emsd, + ) + return True + + +@acm +async def maybe_open_emsd( + + brokername: str, + loglevel: Optional[str] = None, + **kwargs, + +) -> tractor._portal.Portal: # noqa + + async with maybe_spawn_daemon( + + 'emsd', + service_task_target=spawn_emsd, + spawn_args={'loglevel': loglevel}, + loglevel=loglevel, + **kwargs, + + ) as portal: + yield portal From f95ea19b21a792bdf36a2c3b10dfa1454f820606 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 16:28:38 -0500 Subject: [PATCH 18/38] Move `pikerd` runtime boostrap to `.service._actor_runtime` --- piker/service/__init__.py | 319 ++----------------------------- piker/service/_actor_runtime.py | 329 ++++++++++++++++++++++++++++++++ 2 files changed, 346 insertions(+), 302 deletions(-) create mode 100644 piker/service/_actor_runtime.py diff --git a/piker/service/__init__.py b/piker/service/__init__.py index 32a05ae5..3b9767cd 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -19,28 +19,8 @@ Actor-runtime service orchestration machinery. """ from __future__ import annotations -from pprint import pformat -from functools import partial -import os -from typing import ( - Optional, - Any, - ClassVar, -) -from contextlib import ( - asynccontextmanager as acm, -) -import tractor -import trio - -from ..log import ( - get_logger, - get_console_log, -) -from ._mngr import ( - Services, -) +from ._mngr import Services from ._registry import ( # noqa _tractor_kwargs, _default_reg_addr, @@ -57,289 +37,24 @@ from ._daemon import ( # noqa spawn_emsd, maybe_open_emsd, ) - -log = get_logger(__name__) +from ._actor_runtime import ( + open_piker_runtime, + maybe_open_pikerd, + open_pikerd, + get_tractor_runtime_kwargs, +) __all__ = [ 'check_for_service', + 'Services', + 'maybe_spawn_daemon', + 'spawn_brokerd', + 'maybe_spawn_brokerd', + 'spawn_emsd', + 'maybe_open_emsd', + 'open_piker_runtime', + 'maybe_open_pikerd', + 'open_pikerd', + 'get_tractor_runtime_kwargs', ] - -_root_dname = 'pikerd' - - -def get_tractor_runtime_kwargs() -> dict[str, Any]: - ''' - Deliver ``tractor`` related runtime variables in a `dict`. - - ''' - return _tractor_kwargs - - -_root_modules = [ - __name__, - 'piker.service._daemon', - 'piker.clearing._ems', - 'piker.clearing._client', - 'piker.data._sampling', -] - - -@acm -async def open_piker_runtime( - name: str, - enable_modules: list[str] = [], - loglevel: Optional[str] = None, - - # XXX NOTE XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = False, - - registry_addr: None | tuple[str, int] = None, - - # TODO: once we have `rsyscall` support we will read a config - # and spawn the service tree distributed per that. - start_method: str = 'trio', - - **tractor_kwargs, - -) -> tuple[ - tractor.Actor, - tuple[str, int], -]: - ''' - Start a piker actor who's runtime will automatically sync with - existing piker actors on the local link based on configuration. - - Can be called from a subactor or any program that needs to start - a root actor. - - ''' - try: - # check for existing runtime - actor = tractor.current_actor().uid - - except tractor._exceptions.NoRuntime: - - registry_addr = registry_addr or _default_reg_addr - - async with ( - tractor.open_root_actor( - - # passed through to ``open_root_actor`` - arbiter_addr=registry_addr, - name=name, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, - - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=enable_modules, - - **tractor_kwargs, - ) as _, - - open_registry(registry_addr, ensure_exists=False) as addr, - ): - yield ( - tractor.current_actor(), - addr, - ) - else: - async with open_registry(registry_addr) as addr: - yield ( - actor, - addr, - ) - - -@acm -async def open_pikerd( - - loglevel: str | None = None, - - # XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = False, - registry_addr: None | tuple[str, int] = None, - - # db init flags - tsdb: bool = False, - es: bool = False, - -) -> Services: - ''' - Start a root piker daemon who's lifetime extends indefinitely until - cancelled. - - A root actor nursery is created which can be used to create and keep - alive underling services (see below). - - ''' - async with ( - open_piker_runtime( - - name=_root_dname, - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=_root_modules, - - loglevel=loglevel, - debug_mode=debug_mode, - registry_addr=registry_addr, - - ) as (root_actor, reg_addr), - tractor.open_nursery() as actor_nursery, - trio.open_nursery() as service_nursery, - ): - if root_actor.accept_addr != reg_addr: - raise RuntimeError(f'Daemon failed to bind on {reg_addr}!?') - - # assign globally for future daemon/task creation - Services.actor_n = actor_nursery - Services.service_n = service_nursery - Services.debug_mode = debug_mode - - if tsdb: - from ._ahab import start_ahab - from .marketstore import start_marketstore - - log.info('Spawning `marketstore` supervisor') - ctn_ready, config, (cid, pid) = await service_nursery.start( - partial( - start_ahab, - 'marketstored', - start_marketstore, - loglevel=loglevel, - ) - - ) - log.info( - f'`marketstored` up!\n' - f'pid: {pid}\n' - f'container id: {cid[:12]}\n' - f'config: {pformat(config)}' - ) - - if es: - from piker.data._ahab import start_ahab - from piker.data.elastic import start_elasticsearch - - log.info('Spawning `elasticsearch` supervisor') - ctn_ready, config, (cid, pid) = await service_nursery.start( - partial( - start_ahab, - 'elasticsearch', - start_elasticsearch, - loglevel=loglevel, - ) - ) - - log.info( - f'`elasticsearch` up!\n' - f'pid: {pid}\n' - f'container id: {cid[:12]}\n' - f'config: {pformat(config)}' - ) - - try: - yield Services - - finally: - # TODO: is this more clever/efficient? - # if 'samplerd' in Services.service_tasks: - # await Services.cancel_service('samplerd') - service_nursery.cancel_scope.cancel() - - -@acm -async def maybe_open_runtime( - loglevel: Optional[str] = None, - **kwargs, - -) -> None: - ''' - Start the ``tractor`` runtime (a root actor) if none exists. - - ''' - name = kwargs.pop('name') - - if not tractor.current_actor(err_on_no_runtime=False): - async with open_piker_runtime( - name, - loglevel=loglevel, - **kwargs, - ) as (_, addr): - yield addr, - else: - async with open_registry() as addr: - yield addr - - -@acm -async def maybe_open_pikerd( - loglevel: Optional[str] = None, - registry_addr: None | tuple = None, - tsdb: bool = False, - es: bool = False, - - **kwargs, - -) -> tractor._portal.Portal | ClassVar[Services]: - ''' - If no ``pikerd`` daemon-root-actor can be found start it and - yield up (we should probably figure out returning a portal to self - though). - - ''' - if loglevel: - get_console_log(loglevel) - - # subtle, we must have the runtime up here or portal lookup will fail - query_name = kwargs.pop('name', f'piker_query_{os.getpid()}') - - # TODO: if we need to make the query part faster we could not init - # an actor runtime and instead just hit the socket? - # from tractor._ipc import _connect_chan, Channel - # async with _connect_chan(host, port) as chan: - # async with open_portal(chan) as arb_portal: - # yield arb_portal - - async with ( - open_piker_runtime( - name=query_name, - registry_addr=registry_addr, - loglevel=loglevel, - **kwargs, - ) as _, - tractor.find_actor( - _root_dname, - arbiter_sockaddr=registry_addr, - ) as portal - ): - # connect to any existing daemon presuming - # its registry socket was selected. - if ( - portal is not None - ): - yield portal - return - - # presume pikerd role since no daemon could be found at - # configured address - async with open_pikerd( - loglevel=loglevel, - debug_mode=kwargs.get('debug_mode', False), - registry_addr=registry_addr, - tsdb=tsdb, - es=es, - - ) as service_manager: - # in the case where we're starting up the - # tractor-piker runtime stack in **this** process - # we return no portal to self. - assert service_manager - yield service_manager diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py new file mode 100644 index 00000000..257babdf --- /dev/null +++ b/piker/service/_actor_runtime.py @@ -0,0 +1,329 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +``tractor`` wrapping + default config to bootstrap the `pikerd`. + +""" +from __future__ import annotations +from pprint import pformat +from functools import partial +import os +from typing import ( + Optional, + Any, + ClassVar, +) +from contextlib import ( + asynccontextmanager as acm, +) + +import tractor +import trio + +from ..log import ( + get_logger, + get_console_log, +) +from ._mngr import ( + Services, +) +from ._registry import ( # noqa + _tractor_kwargs, + _default_reg_addr, + open_registry, +) + +log = get_logger(__name__) + +_root_dname = 'pikerd' + + +def get_tractor_runtime_kwargs() -> dict[str, Any]: + ''' + Deliver ``tractor`` related runtime variables in a `dict`. + + ''' + return _tractor_kwargs + + +_root_modules = [ + __name__, + 'piker.service._daemon', + 'piker.clearing._ems', + 'piker.clearing._client', + 'piker.data._sampling', +] + + +@acm +async def open_piker_runtime( + name: str, + enable_modules: list[str] = [], + loglevel: Optional[str] = None, + + # XXX NOTE XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = False, + + registry_addr: None | tuple[str, int] = None, + + # TODO: once we have `rsyscall` support we will read a config + # and spawn the service tree distributed per that. + start_method: str = 'trio', + + **tractor_kwargs, + +) -> tuple[ + tractor.Actor, + tuple[str, int], +]: + ''' + Start a piker actor who's runtime will automatically sync with + existing piker actors on the local link based on configuration. + + Can be called from a subactor or any program that needs to start + a root actor. + + ''' + try: + # check for existing runtime + actor = tractor.current_actor().uid + + except tractor._exceptions.NoRuntime: + + registry_addr = registry_addr or _default_reg_addr + + async with ( + tractor.open_root_actor( + + # passed through to ``open_root_actor`` + arbiter_addr=registry_addr, + name=name, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, + + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=enable_modules, + + **tractor_kwargs, + ) as _, + + open_registry(registry_addr, ensure_exists=False) as addr, + ): + yield ( + tractor.current_actor(), + addr, + ) + else: + async with open_registry(registry_addr) as addr: + yield ( + actor, + addr, + ) + + +@acm +async def open_pikerd( + + loglevel: str | None = None, + + # XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = False, + registry_addr: None | tuple[str, int] = None, + + # db init flags + tsdb: bool = False, + es: bool = False, + +) -> Services: + ''' + Start a root piker daemon who's lifetime extends indefinitely until + cancelled. + + A root actor nursery is created which can be used to create and keep + alive underling services (see below). + + ''' + async with ( + open_piker_runtime( + + name=_root_dname, + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=_root_modules, + + loglevel=loglevel, + debug_mode=debug_mode, + registry_addr=registry_addr, + + ) as (root_actor, reg_addr), + tractor.open_nursery() as actor_nursery, + trio.open_nursery() as service_nursery, + ): + if root_actor.accept_addr != reg_addr: + raise RuntimeError(f'Daemon failed to bind on {reg_addr}!?') + + # assign globally for future daemon/task creation + Services.actor_n = actor_nursery + Services.service_n = service_nursery + Services.debug_mode = debug_mode + + if tsdb: + from ._ahab import start_ahab + from .marketstore import start_marketstore + + log.info('Spawning `marketstore` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + partial( + start_ahab, + 'marketstored', + start_marketstore, + loglevel=loglevel, + ) + + ) + log.info( + f'`marketstored` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + + if es: + from piker.data._ahab import start_ahab + from piker.data.elastic import start_elasticsearch + + log.info('Spawning `elasticsearch` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + partial( + start_ahab, + 'elasticsearch', + start_elasticsearch, + loglevel=loglevel, + ) + ) + + log.info( + f'`elasticsearch` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + + try: + yield Services + + finally: + # TODO: is this more clever/efficient? + # if 'samplerd' in Services.service_tasks: + # await Services.cancel_service('samplerd') + service_nursery.cancel_scope.cancel() + + +@acm +async def maybe_open_runtime( + loglevel: Optional[str] = None, + **kwargs, + +) -> None: + ''' + Start the ``tractor`` runtime (a root actor) if none exists. + + ''' + name = kwargs.pop('name') + + if not tractor.current_actor(err_on_no_runtime=False): + async with open_piker_runtime( + name, + loglevel=loglevel, + **kwargs, + ) as (_, addr): + yield addr, + else: + async with open_registry() as addr: + yield addr + + +@acm +async def maybe_open_pikerd( + loglevel: Optional[str] = None, + registry_addr: None | tuple = None, + tsdb: bool = False, + es: bool = False, + + **kwargs, + +) -> tractor._portal.Portal | ClassVar[Services]: + ''' + If no ``pikerd`` daemon-root-actor can be found start it and + yield up (we should probably figure out returning a portal to self + though). + + ''' + if loglevel: + get_console_log(loglevel) + + # subtle, we must have the runtime up here or portal lookup will fail + query_name = kwargs.pop('name', f'piker_query_{os.getpid()}') + + # TODO: if we need to make the query part faster we could not init + # an actor runtime and instead just hit the socket? + # from tractor._ipc import _connect_chan, Channel + # async with _connect_chan(host, port) as chan: + # async with open_portal(chan) as arb_portal: + # yield arb_portal + + async with ( + open_piker_runtime( + name=query_name, + registry_addr=registry_addr, + loglevel=loglevel, + **kwargs, + ) as _, + tractor.find_actor( + _root_dname, + arbiter_sockaddr=registry_addr, + ) as portal + ): + # connect to any existing daemon presuming + # its registry socket was selected. + if ( + portal is not None + ): + yield portal + return + + # presume pikerd role since no daemon could be found at + # configured address + async with open_pikerd( + loglevel=loglevel, + debug_mode=kwargs.get('debug_mode', False), + registry_addr=registry_addr, + tsdb=tsdb, + es=es, + + ) as service_manager: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we return no portal to self. + assert service_manager + yield service_manager From cec29670714b7f0b477244dbf190a1ce9d142167 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 16:47:28 -0500 Subject: [PATCH 19/38] Import `maybe_open_pikerd` at module level --- piker/service/_daemon.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 8cb9054f..45d6cb81 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -38,6 +38,7 @@ from ..brokers import get_brokermod from ._mngr import ( Services, ) +from ._actor_runtime import maybe_open_pikerd from ._registry import find_service log = get_logger(__name__) @@ -100,7 +101,6 @@ async def maybe_spawn_daemon( # ask root ``pikerd`` daemon to spawn the daemon we need if # pikerd is not live we now become the root of the # process tree - from . import maybe_open_pikerd async with maybe_open_pikerd( loglevel=loglevel, From 441243f83bccd2938cb3b705ae0484cf6dd2dbda Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 20:06:27 -0500 Subject: [PATCH 20/38] Attempt to report `piker storage -d ` errors Not really sure there's much we can do besides dump Grpc stuff when we detect an "error" `str` for the moment.. Either way leave a buncha complaints (como siempre) and do linting fixups.. --- piker/data/cli.py | 61 ++++++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/piker/data/cli.py b/piker/data/cli.py index 6f4e169d..6984d9ff 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -18,34 +18,22 @@ marketstore cli. """ -from functools import partial -from pprint import ( - pformat, - pprint, -) - -from anyio_marketstore import open_marketstore_client import trio import tractor import click -import numpy as np from ..service.marketstore import ( - get_client, + # get_client, # stream_quotes, ingest_quote_stream, # _url, - _tick_tbk_ids, - mk_tbk, + # _tick_tbk_ids, + # mk_tbk, ) from ..cli import cli from .. import watchlists as wl -from ..log import get_logger -from ._sharedmem import ( - maybe_open_shm_array, -) -from ._source import ( - base_iohlc_dtype, +from ..log import ( + get_logger, ) @@ -92,16 +80,16 @@ def ms_stream( # async def main(): # nonlocal names # async with get_client(url) as client: -# +# # if not names: # names = await client.list_symbols() -# +# # # default is to wipe db entirely. # answer = input( # "This will entirely wipe you local marketstore db @ " # f"{url} of the following symbols:\n {pformat(names)}" # "\n\nDelete [N/y]?\n") -# +# # if answer == 'y': # for sym in names: # # tbk = _tick_tbk.format(sym) @@ -110,7 +98,7 @@ def ms_stream( # await client.destroy(mk_tbk(tbk)) # else: # print("Nothing deleted.") -# +# # tractor.run(main) @@ -148,7 +136,7 @@ def storesh( ): symbol = symbols[0] - async with open_tsdb_client(symbol) as storage: + async with open_tsdb_client(symbol): # TODO: ask if user wants to write history for detected # available shm buffers? from tractor.trionics import ipython_embed @@ -186,7 +174,7 @@ def storage( Start an IPython shell ready to query the local marketstore db. ''' - from piker.data.marketstore import open_tsdb_client + from piker.service.marketstore import open_tsdb_client from piker.service import open_piker_runtime async def main(): @@ -201,9 +189,28 @@ def storage( if delete: for fqsn in symbols: syms = await storage.client.list_symbols() - breakpoint() - await storage.delete_ts(fqsn, 60) - await storage.delete_ts(fqsn, 1) + + resp60s = await storage.delete_ts(fqsn, 60) + + msgish = resp60s.ListFields()[0][1] + if 'error' in str(msgish): + + # TODO: MEGA LOL, apparently the symbols don't + # flush out until you refresh something or other + # (maybe the WALFILE)... #lelandorlulzone, classic + # alpaca(Rtm) design here .. + # well, if we ever can make this work we + # probably want to dogsplain the real reason + # for the delete errurz..llululu + if fqsn not in syms: + log.error(f'Pair {fqsn} dne in DB') + + log.error(f'Deletion error: {fqsn}\n{msgish}') + + resp1s = await storage.delete_ts(fqsn, 1) + msgish = resp1s.ListFields()[0][1] + if 'error' in str(msgish): + log.error(f'Deletion error: {fqsn}\n{msgish}') trio.run(main) @@ -237,7 +244,7 @@ def ingest(config, name, test_file, tl): async def entry_point(): async with tractor.open_nursery() as n: - for provider, symbols in grouped_syms.items(): + for provider, symbols in grouped_syms.items(): await n.run_in_actor( ingest_quote_stream, name='ingest_marketstore', From 6f92c6b52d1931e0c5c02ae09716c9edc61b2481 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 20:08:27 -0500 Subject: [PATCH 21/38] Don't crash on a `xdotool` timeout.. --- piker/brokers/ib/_util.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index c7a49909..d6491ee7 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -177,8 +177,11 @@ def i3ipc_xdotool_manual_click_hack() -> None: ) # re-activate and focus original window - subprocess.call([ - 'xdotool', - 'windowactivate', '--sync', str(orig_win_id), - 'click', '--window', str(orig_win_id), '1', - ]) + try: + subprocess.call([ + 'xdotool', + 'windowactivate', '--sync', str(orig_win_id), + 'click', '--window', str(orig_win_id), '1', + ]) + except subprocess.TimeoutExpired: + log.exception(f'xdotool timed out?') From cda7a54718329771524662a9b64d3a3bdd6829f4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 20:20:11 -0500 Subject: [PATCH 22/38] Fix final missed `marketstore` mod import Thanks @esme! XD Also, do a linter pass and remove a buncha unused references. --- tests/test_databases.py | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/tests/test_databases.py b/tests/test_databases.py index 7fcee34a..a469abd5 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -1,25 +1,19 @@ -import pytest import trio from typing import AsyncContextManager -from piker.service import Services -from piker.log import get_logger - from elasticsearch import Elasticsearch -from piker.data import marketstore +from piker.service import marketstore + def test_marketstore_startup_and_version( open_test_pikerd: AsyncContextManager, loglevel, ): - ''' Verify marketstore starts correctly ''' - log = get_logger(__name__) - async def main(): # port = 5995 @@ -36,7 +30,6 @@ def test_marketstore_startup_and_version( len('3862e9973da36cfc6004b88172c08f09269aaf01') ) - trio.run(main) @@ -49,8 +42,6 @@ def test_elasticsearch_startup_and_version( ''' - log = get_logger(__name__) - async def main(): port = 19200 @@ -62,5 +53,4 @@ def test_elasticsearch_startup_and_version( es = Elasticsearch(hosts=[f'http://localhost:{port}']) assert es.info()['version']['number'] == '7.17.4' - trio.run(main) From fbc12b1b077d0e83685fd5249a929687d871ed94 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 21:20:41 -0500 Subject: [PATCH 23/38] Add 10min timeout on CI job.. --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f799bc22..65b020f1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,7 @@ jobs: testing: name: 'install + test-suite' + timeout-minutes: 10 runs-on: ubuntu-latest steps: From 6540c415c1b84619a43904ce1e93c2983f1bb6eb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Mar 2023 21:40:28 -0500 Subject: [PATCH 24/38] Lul, fix imports in elasticsearch block.. --- piker/service/_actor_runtime.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 257babdf..e07f342e 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -209,8 +209,8 @@ async def open_pikerd( ) if es: - from piker.data._ahab import start_ahab - from piker.data.elastic import start_elasticsearch + from ._ahab import start_ahab + from .elastic import start_elasticsearch log.info('Spawning `elasticsearch` supervisor') ctn_ready, config, (cid, pid) = await service_nursery.start( From 31392af427204949437846b8289230cce5183afa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 12:22:33 -0500 Subject: [PATCH 25/38] Move enabled module defs to just above where used --- piker/service/_actor_runtime.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index e07f342e..134b085c 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -49,8 +49,6 @@ from ._registry import ( # noqa log = get_logger(__name__) -_root_dname = 'pikerd' - def get_tractor_runtime_kwargs() -> dict[str, Any]: ''' @@ -60,15 +58,6 @@ def get_tractor_runtime_kwargs() -> dict[str, Any]: return _tractor_kwargs -_root_modules = [ - __name__, - 'piker.service._daemon', - 'piker.clearing._ems', - 'piker.clearing._client', - 'piker.data._sampling', -] - - @acm async def open_piker_runtime( name: str, @@ -139,6 +128,16 @@ async def open_piker_runtime( ) +_root_dname = 'pikerd' +_root_modules = [ + __name__, + 'piker.service._daemon', + 'piker.clearing._ems', + 'piker.clearing._client', + 'piker.data._sampling', +] + + @acm async def open_pikerd( @@ -155,8 +154,7 @@ async def open_pikerd( ) -> Services: ''' - Start a root piker daemon who's lifetime extends indefinitely until - cancelled. + Start a root piker daemon with an indefinite lifetime. A root actor nursery is created which can be used to create and keep alive underling services (see below). From 75b7a8b56ef15a66a6a9ca499c4e27fbbe09f4a1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 12:23:02 -0500 Subject: [PATCH 26/38] `marketstore`: Pull default socket from server config --- piker/service/marketstore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index e9de9558..5c4f90db 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -328,7 +328,7 @@ def quote_to_marketstore_structarray( @acm async def get_client( host: str = 'localhost', - port: int = 5995 + port: int = _config['grpc_listen_port'], ) -> MarketstoreClient: ''' From 2014019b0601398ba3410250f921d611d81c2c05 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 12:23:46 -0500 Subject: [PATCH 27/38] Add reconnect loop to `marketstore` startup test Due to making ahabd supervisor init more async we need to be more tolerant to mkts server startup: the grpc machinery needs to be up otherwise a client which connects to early may just hang on requests.. Add a reconnect loop (which might end up getting factored into client code too) so that we only block on requests once we know the client connection is actually responsive. --- tests/test_databases.py | 49 ++++++++++++++++++++++++++++++++--------- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/tests/test_databases.py b/tests/test_databases.py index a469abd5..74f0f240 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -8,40 +8,67 @@ from piker.service import marketstore def test_marketstore_startup_and_version( open_test_pikerd: AsyncContextManager, - loglevel, + loglevel: str, ): ''' - Verify marketstore starts correctly + Verify marketstore tsdb starts up and we can + connect with a client to do basic API reqs. ''' async def main(): - # port = 5995 async with ( open_test_pikerd( loglevel=loglevel, tsdb=True - ) as (s, i, pikerd_portal, services), - marketstore.get_client() as client + ) as ( + _, # host + _, # port + pikerd_portal, + services, + ), ): + # XXX NOTE: we use a retry-connect loop because it seems + # that if we connect *too fast* to a booting container + # instance (i.e. if mkts's IPC machinery isn't up early + # enough) the client will hang on req-resp submissions. So, + # instead we actually reconnect the client entirely in + # a loop until we get a response. + for _ in range(3): - assert ( - len(await client.server_version()) == - len('3862e9973da36cfc6004b88172c08f09269aaf01') - ) + # NOTE: default sockaddr is embedded within + async with marketstore.get_client() as client: + + with trio.move_on_after(1) as cs: + syms = await client.list_symbols() + + if cs.cancelled_caught: + continue + + + # should be an empty db? + assert not syms + print(f'RX syms resp: {syms}') + + assert ( + len(await client.server_version()) == + len('3862e9973da36cfc6004b88172c08f09269aaf01') + ) + print('VERSION CHECKED') + + break # get out of retry-connect loop trio.run(main) def test_elasticsearch_startup_and_version( open_test_pikerd: AsyncContextManager, - loglevel, + loglevel: str, ): ''' Verify elasticsearch starts correctly ''' - async def main(): port = 19200 From aa36abf36e5e13228a90304a03a30379e3e74651 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 14:09:12 -0500 Subject: [PATCH 28/38] Support passing `tractor` "actor runtime vars" down the runtime --- piker/service/_actor_runtime.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 134b085c..3e35864d 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -74,6 +74,7 @@ async def open_piker_runtime( # and spawn the service tree distributed per that. start_method: str = 'trio', + tractor_runtime_overrides: dict | None = None, **tractor_kwargs, ) -> tuple[ @@ -93,6 +94,8 @@ async def open_piker_runtime( actor = tractor.current_actor().uid except tractor._exceptions.NoRuntime: + tractor._state._runtime_vars[ + 'piker_vars'] = tractor_runtime_overrides registry_addr = registry_addr or _default_reg_addr @@ -152,6 +155,8 @@ async def open_pikerd( tsdb: bool = False, es: bool = False, + **kwargs, + ) -> Services: ''' Start a root piker daemon with an indefinite lifetime. @@ -173,6 +178,8 @@ async def open_pikerd( debug_mode=debug_mode, registry_addr=registry_addr, + **kwargs, + ) as (root_actor, reg_addr), tractor.open_nursery() as actor_nursery, trio.open_nursery() as service_nursery, @@ -297,6 +304,7 @@ async def maybe_open_pikerd( loglevel=loglevel, **kwargs, ) as _, + tractor.find_actor( _root_dname, arbiter_sockaddr=registry_addr, @@ -319,6 +327,8 @@ async def maybe_open_pikerd( tsdb=tsdb, es=es, + **kwargs, + ) as service_manager: # in the case where we're starting up the # tractor-piker runtime stack in **this** process From 5aaa7f47dc894282c99008dd9e94588483cacdfc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 14:09:35 -0500 Subject: [PATCH 29/38] Pull testing config dir from `tractor` runtime vars Provides a more correct solution (particularly for distributed testing) to override the `piker` configuration directory by reading the path from a specific `tractor._state._runtime_vars` entry that can be provided by the test harness. Also fix some typing and comments. --- piker/config.py | 44 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/piker/config.py b/piker/config.py index 3ae6a665..397342e3 100644 --- a/piker/config.py +++ b/piker/config.py @@ -15,7 +15,7 @@ # along with this program. If not, see . """ -Broker configuration mgmt. +Platform configuration (files) mgmt. """ import platform @@ -26,17 +26,25 @@ from os.path import dirname import shutil from typing import Optional from pathlib import Path + from bidict import bidict import toml -from piker.testing import TEST_CONFIG_DIR_PATH + from .log import get_logger log = get_logger('broker-config') -# taken from ``click`` since apparently they have some +# XXX NOTE: taken from ``click`` since apparently they have some # super weirdness with sigint and sudo..no clue -def get_app_dir(app_name, roaming=True, force_posix=False): +# we're probably going to slowly just modify it to our own version over +# time.. +def get_app_dir( + app_name: str, + roaming: bool = True, + force_posix: bool = False, + +) -> str: r"""Returns the config folder for the application. The default behavior is to return whatever is most appropriate for the operating system. @@ -75,14 +83,30 @@ def get_app_dir(app_name, roaming=True, force_posix=False): def _posixify(name): return "-".join(name.split()).lower() - # TODO: This is a hacky way to a) determine we're testing - # and b) creating a test dir. We should aim to set a variable - # within the tractor runtimes and store testing config data - # outside of the users filesystem + # NOTE: for testing with `pytest` we leverage the `tmp_dir` + # fixture to generate (and clean up) a test-request-specific + # directory for isolated configuration files such that, + # - multiple tests can run (possibly in parallel) without data races + # on the config state, + # - we don't need to ever worry about leaking configs into the + # system thus avoiding needing to manage config cleaup fixtures or + # other bothers (since obviously `tmp_dir` cleans up after itself). + # + # In order to "pass down" the test dir path to all (sub-)actors in + # the actor tree we preload the root actor's runtime vars state (an + # internal mechanism for inheriting state down an actor tree in + # `tractor`) with the testing dir and check for it whenever we + # detect `pytest` is being used (which it isn't under normal + # operation). if "pytest" in sys.modules: - app_name = os.path.join(app_name, TEST_CONFIG_DIR_PATH) + import tractor + actor = tractor.current_actor(err_on_no_runtime=False) + if actor: # runtime is up + rvs = tractor._state._runtime_vars + testdirpath = Path(rvs['piker_vars']['piker_test_dir']) + assert testdirpath.exists(), 'piker test harness might be borked!?' + app_name = str(testdirpath) - # if WIN: if platform.system() == 'Windows': key = "APPDATA" if roaming else "LOCALAPPDATA" folder = os.environ.get(key) From 79b0db44496dfc441a1c59b491b66ed67ec2ed9a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 14:33:12 -0500 Subject: [PATCH 30/38] Pass a config `tmp_dir: Path` to the runtime when testing --- piker/testing/__init__.py | 1 - tests/conftest.py | 47 +++++++++++++++++++++++---------------- tests/test_paper.py | 20 ++++++++--------- 3 files changed, 37 insertions(+), 31 deletions(-) delete mode 100644 piker/testing/__init__.py diff --git a/piker/testing/__init__.py b/piker/testing/__init__.py deleted file mode 100644 index 5e3ac93a..00000000 --- a/piker/testing/__init__.py +++ /dev/null @@ -1 +0,0 @@ -TEST_CONFIG_DIR_PATH = '_testing' diff --git a/tests/conftest.py b/tests/conftest.py index 68d392aa..a7244d82 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,12 +2,10 @@ from contextlib import asynccontextmanager as acm from functools import partial import os from pathlib import Path -from shutil import rmtree import pytest import tractor from piker import ( - # log, config, ) from piker.service import ( @@ -71,6 +69,7 @@ def ci_env() -> bool: @acm async def _open_test_pikerd( + tmpconfdir: str, reg_addr: tuple[str, int] | None = None, loglevel: str = 'warning', **kwargs, @@ -97,6 +96,10 @@ async def _open_test_pikerd( maybe_open_pikerd( registry_addr=reg_addr, loglevel=loglevel, + + tractor_runtime_overrides={ + 'piker_test_dir': tmpconfdir, + }, **kwargs, ) as service_manager, ): @@ -119,18 +122,40 @@ async def _open_test_pikerd( @pytest.fixture def open_test_pikerd( - request, + request: pytest.FixtureRequest, + tmp_path: Path, loglevel: str, ): + tmpconfdir: Path = tmp_path / '_testing' + tmpconfdir.mkdir() + tmpconfdir_str: str = str(tmpconfdir) + + # NOTE: on linux the tmp config dir is generally located at: + # /tmp/pytest-of-/pytest-/test_/ + # the default `pytest` config ensures that only the last 4 test + # suite run's dirs will be persisted, otherwise they are removed: + # https://docs.pytest.org/en/6.2.x/tmpdir.html#the-default-base-temporary-directory + print(f'CURRENT TEST CONF DIR: {tmpconfdir}') yield partial( _open_test_pikerd, + # pass in a unique temp dir for this test request + # so that we can have multiple tests running (maybe in parallel) + # bwitout clobbering each other's config state. + tmpconfdir=tmpconfdir_str, + # bind in level from fixture, which is itself set by # `--ll ` cli flag. loglevel=loglevel, ) + # NOTE: the `tmp_dir` fixture will wipe any files older then 3 test + # sessions by default: + # https://docs.pytest.org/en/6.2.x/tmpdir.html#the-default-base-temporary-directory + # BUT, if we wanted to always wipe conf dir and all contained files, + # rmtree(str(tmp_path)) + # TODO: teardown checks such as, # - no leaked subprocs or shm buffers # - all requested container service are torn down @@ -169,19 +194,3 @@ def open_test_pikerd_and_ems( loglevel, open_test_pikerd ) - - -@pytest.fixture(scope='module') -def delete_testing_dir(): - ''' - This fixture removes the temp directory - used for storing all config/ledger/pp data - created during testing sessions. During test runs - this file can be found in .config/piker/_testing - - ''' - yield - app_dir = Path(config.get_app_dir('piker')).resolve() - if app_dir.is_dir(): - rmtree(str(app_dir)) - assert not app_dir.is_dir() diff --git a/tests/test_paper.py b/tests/test_paper.py index 8da1cf12..53e03f47 100644 --- a/tests/test_paper.py +++ b/tests/test_paper.py @@ -17,7 +17,6 @@ from functools import partial from piker.log import get_logger from piker.clearing._messages import Order from piker.pp import ( - open_trade_ledger, open_pps, ) @@ -42,18 +41,19 @@ async def _async_main( price: int = 30000, executions: int = 1, size: float = 0.01, + # Assert options assert_entries: bool = False, assert_pps: bool = False, assert_zeroed_pps: bool = False, assert_msg: bool = False, + ) -> None: ''' Start piker, place a trade and assert data in pps stream, ledger and position table. ''' - oid: str = '' last_msg = {} @@ -136,7 +136,7 @@ def _assert( def _run_test_and_check(fn): - ''' + ''' Close position and assert empty position in pps ''' @@ -150,8 +150,7 @@ def _run_test_and_check(fn): def test_buy( - open_test_pikerd_and_ems: AsyncContextManager, - delete_testing_dir + open_test_pikerd_and_ems: AsyncContextManager, ): ''' Enter a trade and assert entries are made in pps and ledger files. @@ -177,8 +176,7 @@ def test_buy( def test_sell( - open_test_pikerd_and_ems: AsyncContextManager, - delete_testing_dir + open_test_pikerd_and_ems: AsyncContextManager, ): ''' Sell position and ensure pps are zeroed. @@ -201,13 +199,13 @@ def test_sell( ), ) + def test_multi_sell( - open_test_pikerd_and_ems: AsyncContextManager, - delete_testing_dir + open_test_pikerd_and_ems: AsyncContextManager, ): ''' - Make 5 market limit buy orders and - then sell 5 slots at the same price. + Make 5 market limit buy orders and + then sell 5 slots at the same price. Finally, assert cleared positions. ''' From 7cc99115655de4bb6ae711ae55e455d9660b9917 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 14:54:46 -0500 Subject: [PATCH 31/38] Add connection poll loop to es test as well --- tests/test_databases.py | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/tests/test_databases.py b/tests/test_databases.py index 74f0f240..a29d14f3 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -2,7 +2,10 @@ import trio from typing import AsyncContextManager -from elasticsearch import Elasticsearch +from elasticsearch import ( + Elasticsearch, + ConnectionError, +) from piker.service import marketstore @@ -45,8 +48,8 @@ def test_marketstore_startup_and_version( if cs.cancelled_caught: continue - - # should be an empty db? + # should be an empty db (for now) since we spawn + # marketstore in a ephemeral test-harness dir. assert not syms print(f'RX syms resp: {syms}') @@ -72,12 +75,25 @@ def test_elasticsearch_startup_and_version( async def main(): port = 19200 - async with open_test_pikerd( - loglevel=loglevel, - es=True - ) as (s, i, pikerd_portal, services): + async with ( + open_test_pikerd( + loglevel=loglevel, + es=True + ) as ( + _, # host + _, # port + pikerd_portal, + services, + ), + ): - es = Elasticsearch(hosts=[f'http://localhost:{port}']) - assert es.info()['version']['number'] == '7.17.4' + for _ in range(240): + try: + es = Elasticsearch(hosts=[f'http://localhost:{port}']) + except ConnectionError: + await trio.sleep(1) + continue + + assert es.info()['version']['number'] == '7.17.4' trio.run(main) From 9a00c459233b2366413b16301d08ef1ae7383732 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 17:57:42 -0500 Subject: [PATCH 32/38] Add `log` fixture for easy test plugin --- tests/conftest.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index a7244d82..327b71a9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,6 @@ from contextlib import asynccontextmanager as acm from functools import partial +import logging import os from pathlib import Path @@ -11,6 +12,7 @@ from piker import ( from piker.service import ( Services, ) +from piker.log import get_console_log from piker.clearing._client import open_ems @@ -67,6 +69,21 @@ def ci_env() -> bool: return _ci_env +@pytest.fixture() +def log( + request: pytest.FixtureRequest, + loglevel: str, +) -> logging.Logger: + ''' + Deliver a per-test-named ``piker.log`` instance. + + ''' + return get_console_log( + level=loglevel, + name=request.node.name, + ) + + @acm async def _open_test_pikerd( tmpconfdir: str, From 15064d94cb82eef4488e63db736afde3c674070d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 17:58:13 -0500 Subject: [PATCH 33/38] `ahabd`: Harden cancellation teardown (again XD) Needed to move the startup sequence inside the `try:` block to guarantee we always do the (now shielded) `.cancel()` call if we get a cancel during startup. Also, support an optional `started_afunc` field in the config if backends want to just provide a one-off blocking async func to sync container startup. Add a `drop_root_perms: bool` to allow persisting sudo perms for testing or dyanmic container spawning purposes. --- piker/service/_ahab.py | 107 +++++++++++++++++++++++------------------ 1 file changed, 61 insertions(+), 46 deletions(-) diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 38d4a9e7..7c3133e1 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -15,7 +15,8 @@ # along with this program. If not, see . ''' -Supervisor for docker with included specific-image service helpers. +Supervisor for ``docker`` with included async and SC wrapping +to ensure a cancellable container lifetime system. ''' from collections import ChainMap @@ -349,8 +350,8 @@ async def open_ahabd( ( dcntr, cntr_config, - start_lambda, - stop_lambda, + start_pred, + stop_pred, ) = ep_func(client) cntr = Container(dcntr) @@ -375,48 +376,58 @@ async def open_ahabd( # when read using: # ``json.loads(entry for entry in DockerContainer.logs())`` 'log_msg_key': 'msg', + + + # startup sync func, like `Nursery.started()` + 'started_afunc': None, }, ) - with trio.move_on_after(conf['startup_timeout']) as cs: - async with trio.open_nursery() as tn: - tn.start_soon( - partial( - cntr.process_logs_until, - log_msg_key=conf['log_msg_key'], - patt_matcher=start_lambda, - checkpoint_period=conf['startup_query_period'], - ) - ) - - # poll for container startup or timeout - while not cs.cancel_called: - if dcntr in client.containers.list(): - break - - await trio.sleep(conf['startup_query_period']) - - # sync with remote caller actor-task but allow log - # processing to continue running in bg. - await ctx.started(( - cntr.cntr.id, - os.getpid(), - cntr_config, - )) - try: - # XXX: if we timeout on finding the "startup msg" we expect then - # we want to FOR SURE raise an error upwards! - if cs.cancelled_caught: - # if dcntr not in client.containers.list(): - for entry in cntr.seen_so_far: - log.info(entry) + with trio.move_on_after(conf['startup_timeout']) as cs: + async with trio.open_nursery() as tn: + tn.start_soon( + partial( + cntr.process_logs_until, + log_msg_key=conf['log_msg_key'], + patt_matcher=start_pred, + checkpoint_period=conf['startup_query_period'], + ) + ) - raise DockerNotStarted( - f'Failed to start container: {cntr.cuid}\n' - f'due to startup_timeout={conf["startup_timeout"]}s\n\n' - "prolly you should check your container's logs for deats.." - ) + # optional blocking routine + started = conf['started_afunc'] + if started: + await started() + + # poll for container startup or timeout + while not cs.cancel_called: + if dcntr in client.containers.list(): + break + + await trio.sleep(conf['startup_query_period']) + + # sync with remote caller actor-task but allow log + # processing to continue running in bg. + await ctx.started(( + cntr.cntr.id, + os.getpid(), + cntr_config, + )) + + # XXX: if we timeout on finding the "startup msg" we + # expect then we want to FOR SURE raise an error + # upwards! + if cs.cancelled_caught: + # if dcntr not in client.containers.list(): + for entry in cntr.seen_so_far: + log.info(entry) + + raise DockerNotStarted( + f'Failed to start container: {cntr.cuid}\n' + f'due to timeout={conf["startup_timeout"]}s\n\n' + "check ur container's logs!" + ) # TODO: we might eventually want a proxy-style msg-prot here # to allow remote control of containers without needing @@ -430,17 +441,18 @@ async def open_ahabd( # on ctl-c from user.. ideally we can avoid a cancel getting # consumed and not propagating whilst still doing teardown # logging.. - # with trio.CancelScope(shield=True): - await cntr.cancel( - log_msg_key=conf['log_msg_key'], - stop_predicate=stop_lambda, - ) + with trio.CancelScope(shield=True): + await cntr.cancel( + log_msg_key=conf['log_msg_key'], + stop_predicate=stop_pred, + ) async def start_ahab( service_name: str, endpoint: Callable[docker.DockerClient, DockerContainer], loglevel: str | None = 'cancel', + drop_root_perms: bool = True, task_status: TaskStatus[ tuple[ @@ -477,7 +489,10 @@ async def start_ahab( # de-escalate root perms to the original user # after the docker supervisor actor is spawned. - if config._parent_user: + if ( + drop_root_perms + and config._parent_user + ): import pwd os.setuid( pwd.getpwnam( From 0772b4a0faac7aef77eebd4259e76b8f43136357 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 18:33:13 -0500 Subject: [PATCH 34/38] Hard code version from our container, predicate renaming --- piker/service/elastic.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/piker/service/elastic.py b/piker/service/elastic.py index fadcaa5e..31221d57 100644 --- a/piker/service/elastic.py +++ b/piker/service/elastic.py @@ -40,6 +40,9 @@ log = get_logger(__name__) _config = { 'port': 19200, 'log_level': 'debug', + + # hardcoded to our image version + 'version': '7.17.4', } @@ -77,21 +80,31 @@ def start_elasticsearch( remove=True ) - async def start_matcher(msg: str): + async def health_query(msg: str | None = None): + if ( + msg + and _config['version'] in msg + ): + return True + try: health = (await asks.get( 'http://localhost:19200/_cat/health', params={'format': 'json'} )).json() + kog.info( + 'ElasticSearch cntr health:\n' + f'{health}' + ) except OSError: - log.error('couldnt reach elastic container') + log.exception('couldnt reach elastic container') return False log.info(health) return health[0]['status'] == 'green' - async def stop_matcher(msg: str): + async def chk_for_closed_msg(msg: str): return msg == 'closed' return ( @@ -106,8 +119,10 @@ def start_elasticsearch( 'startup_query_period': 0.1, 'log_msg_key': 'message', + + # 'started_afunc': health_query, }, # expected startup and stop msgs - start_matcher, - stop_matcher, + health_query, + chk_for_closed_msg, ) From 44a31155393ed8bd9be4e89b4555aeb22c161526 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 18:34:21 -0500 Subject: [PATCH 35/38] Expose `drop_root_perms_for_ahab` from `pikerd` factories to `ahabd` --- piker/service/_actor_runtime.py | 52 ++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 3e35864d..42829990 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -154,6 +154,7 @@ async def open_pikerd( # db init flags tsdb: bool = False, es: bool = False, + drop_root_perms_for_ahab: bool = True, **kwargs, @@ -203,6 +204,7 @@ async def open_pikerd( 'marketstored', start_marketstore, loglevel=loglevel, + drop_root_perms=drop_root_perms_for_ahab, ) ) @@ -224,6 +226,7 @@ async def open_pikerd( 'elasticsearch', start_elasticsearch, loglevel=loglevel, + drop_root_perms=drop_root_perms_for_ahab, ) ) @@ -244,28 +247,29 @@ async def open_pikerd( service_nursery.cancel_scope.cancel() -@acm -async def maybe_open_runtime( - loglevel: Optional[str] = None, - **kwargs, +# TODO: do we even need this? +# @acm +# async def maybe_open_runtime( +# loglevel: Optional[str] = None, +# **kwargs, -) -> None: - ''' - Start the ``tractor`` runtime (a root actor) if none exists. +# ) -> None: +# ''' +# Start the ``tractor`` runtime (a root actor) if none exists. - ''' - name = kwargs.pop('name') +# ''' +# name = kwargs.pop('name') - if not tractor.current_actor(err_on_no_runtime=False): - async with open_piker_runtime( - name, - loglevel=loglevel, - **kwargs, - ) as (_, addr): - yield addr, - else: - async with open_registry() as addr: - yield addr +# if not tractor.current_actor(err_on_no_runtime=False): +# async with open_piker_runtime( +# name, +# loglevel=loglevel, +# **kwargs, +# ) as (_, addr): +# yield addr, +# else: +# async with open_registry() as addr: +# yield addr @acm @@ -274,6 +278,7 @@ async def maybe_open_pikerd( registry_addr: None | tuple = None, tsdb: bool = False, es: bool = False, + drop_root_perms_for_ahab: bool = True, **kwargs, @@ -288,7 +293,10 @@ async def maybe_open_pikerd( get_console_log(loglevel) # subtle, we must have the runtime up here or portal lookup will fail - query_name = kwargs.pop('name', f'piker_query_{os.getpid()}') + query_name = kwargs.pop( + 'name', + f'piker_query_{os.getpid()}', + ) # TODO: if we need to make the query part faster we could not init # an actor runtime and instead just hit the socket? @@ -324,9 +332,13 @@ async def maybe_open_pikerd( loglevel=loglevel, debug_mode=kwargs.get('debug_mode', False), registry_addr=registry_addr, + + # ahabd (docker super) specific controls tsdb=tsdb, es=es, + drop_root_perms_for_ahab=drop_root_perms_for_ahab, + # passthrough to ``tractor`` init **kwargs, ) as service_manager: From 97290fcb05e84f5c8bbcdf2e43692ed6c10f518f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 18:34:47 -0500 Subject: [PATCH 36/38] Never drop root perms in test harness --- tests/conftest.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 327b71a9..3a0afba2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -117,7 +117,13 @@ async def _open_test_pikerd( tractor_runtime_overrides={ 'piker_test_dir': tmpconfdir, }, + + # tests may need to spawn containers dynamically + # or just in sequence per test, so we keep root. + drop_root_perms_for_ahab=False, + **kwargs, + ) as service_manager, ): # this proc/actor is the pikerd From 8ceaa278725bd2bee258d3dff3784f121d96a265 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Mar 2023 18:36:45 -0500 Subject: [PATCH 37/38] Add ES client polling to ensure eventual connectivity.. --- tests/test_databases.py | 45 ++++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/tests/test_databases.py b/tests/test_databases.py index a29d14f3..554b0990 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -1,12 +1,14 @@ -import trio - from typing import AsyncContextManager +import logging +import trio from elasticsearch import ( Elasticsearch, ConnectionError, ) + from piker.service import marketstore +from piker.service import elastic def test_marketstore_startup_and_version( @@ -31,6 +33,9 @@ def test_marketstore_startup_and_version( services, ), ): + # TODO: we should probably make this connection poll + # loop part of the `get_client()` implementation no? + # XXX NOTE: we use a retry-connect loop because it seems # that if we connect *too fast* to a booting container # instance (i.e. if mkts's IPC machinery isn't up early @@ -67,9 +72,11 @@ def test_marketstore_startup_and_version( def test_elasticsearch_startup_and_version( open_test_pikerd: AsyncContextManager, loglevel: str, + log: logging.Logger, ): ''' - Verify elasticsearch starts correctly + Verify elasticsearch starts correctly (like at some point before + infinity time).. ''' async def main(): @@ -86,14 +93,32 @@ def test_elasticsearch_startup_and_version( services, ), ): + # TODO: much like the above connect loop for mkts, we should + # probably make this sync start part of the + # ``open_client()`` implementation? + for i in range(240): + with Elasticsearch( + hosts=[f'http://localhost:{port}'] + ) as es: + try: - for _ in range(240): - try: - es = Elasticsearch(hosts=[f'http://localhost:{port}']) - except ConnectionError: - await trio.sleep(1) - continue + resp = es.info() + assert ( + resp['version']['number'] + == + elastic._config['version'] + ) + print( + "OMG ELASTIX FINALLY CUKCING CONNECTED!>!>!\n" + f'resp: {resp}' + ) + break - assert es.info()['version']['number'] == '7.17.4' + except ConnectionError: + log.exception( + f'RETRYING client connection for {i} time!' + ) + await trio.sleep(1) + continue trio.run(main) From 12883c3c9065e8d7c0a32136e74f7d1ea94e81b3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Mar 2023 09:56:55 -0500 Subject: [PATCH 38/38] Don't double send `enable_modules` and `debug_mode` in kwargs.. This broke non-disti-mode actor tree spawn / runtime, seemingly because the cli entrypoint for a `piker chart` also sends these values down through the call stack independently? Pretty sure we don't need to send the `enable_modules` from the chart actor anyway. --- piker/service/_actor_runtime.py | 2 -- piker/ui/cli.py | 3 --- 2 files changed, 5 deletions(-) diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 42829990..b92ad221 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -174,7 +174,6 @@ async def open_pikerd( # having the root have more then permissions to # spawn other specialized daemons I think? enable_modules=_root_modules, - loglevel=loglevel, debug_mode=debug_mode, registry_addr=registry_addr, @@ -330,7 +329,6 @@ async def maybe_open_pikerd( # configured address async with open_pikerd( loglevel=loglevel, - debug_mode=kwargs.get('debug_mode', False), registry_addr=registry_addr, # ahabd (docker super) specific controls diff --git a/piker/ui/cli.py b/piker/ui/cli.py index 9b8385f2..15b3e9f6 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -181,9 +181,6 @@ def chart( 'debug_mode': pdb, 'loglevel': tractorloglevel, 'name': 'chart', - 'enable_modules': [ - 'piker.clearing._client' - ], 'registry_addr': config.get('registry_addr'), }, )