diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f799bc22..65b020f1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,6 +36,7 @@ jobs: testing: name: 'install + test-suite' + timeout-minutes: 10 runs-on: ubuntu-latest steps: diff --git a/piker/__init__.py b/piker/__init__.py index d08c2dbc..6ebeec3d 100644 --- a/piker/__init__.py +++ b/piker/__init__.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers. -# Copyright 2020-eternity Tyler Goodlet (in stewardship for piker0) +# Copyright 2020-eternity Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -14,11 +14,11 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' piker: trading gear for hackers. -""" -from ._daemon import open_piker_runtime +''' +from .service import open_piker_runtime from .data.feed import open_feed __all__ = [ diff --git a/piker/_daemon.py b/piker/_daemon.py deleted file mode 100644 index 8983eccc..00000000 --- a/piker/_daemon.py +++ /dev/null @@ -1,758 +0,0 @@ -# piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -""" -Structured, daemon tree service management. - -""" -from __future__ import annotations -import os -from typing import ( - Optional, - Callable, - Any, - ClassVar, -) -from contextlib import ( - asynccontextmanager as acm, -) -from collections import defaultdict - -import tractor -import trio -from trio_typing import TaskStatus - -from .log import ( - get_logger, - get_console_log, -) -from .brokers import get_brokermod - -from pprint import pformat -from functools import partial - - -log = get_logger(__name__) - -_root_dname = 'pikerd' - -_default_registry_host: str = '127.0.0.1' -_default_registry_port: int = 6116 -_default_reg_addr: tuple[str, int] = ( - _default_registry_host, - _default_registry_port, -) - - -# NOTE: this value is set as an actor-global once the first endpoint -# who is capable, spawns a `pikerd` service tree. 
-_registry: Registry | None = None - - -class Registry: - addr: None | tuple[str, int] = None - - # TODO: table of uids to sockaddrs - peers: dict[ - tuple[str, str], - tuple[str, int], - ] = {} - - -_tractor_kwargs: dict[str, Any] = {} - - -@acm -async def open_registry( - addr: None | tuple[str, int] = None, - ensure_exists: bool = True, - -) -> tuple[str, int]: - - global _tractor_kwargs - actor = tractor.current_actor() - uid = actor.uid - if ( - Registry.addr is not None - and addr - ): - raise RuntimeError( - f'`{uid}` registry addr already bound @ {_registry.sockaddr}' - ) - - was_set: bool = False - - if ( - not tractor.is_root_process() - and Registry.addr is None - ): - Registry.addr = actor._arb_addr - - if ( - ensure_exists - and Registry.addr is None - ): - raise RuntimeError( - f"`{uid}` registry should already exist bug doesn't?" - ) - - if ( - Registry.addr is None - ): - was_set = True - Registry.addr = addr or _default_reg_addr - - _tractor_kwargs['arbiter_addr'] = Registry.addr - - try: - yield Registry.addr - finally: - # XXX: always clear the global addr if we set it so that the - # next (set of) calls will apply whatever new one is passed - # in. - if was_set: - Registry.addr = None - - -def get_tractor_runtime_kwargs() -> dict[str, Any]: - ''' - Deliver ``tractor`` related runtime variables in a `dict`. - - ''' - return _tractor_kwargs - - -_root_modules = [ - __name__, - 'piker.clearing._ems', - 'piker.clearing._client', - 'piker.data._sampling', -] - - -# TODO: factor this into a ``tractor.highlevel`` extension -# pack for the library. -class Services: - - actor_n: tractor._supervise.ActorNursery - service_n: trio.Nursery - debug_mode: bool # tractor sub-actor debug mode flag - service_tasks: dict[ - str, - tuple[ - trio.CancelScope, - tractor.Portal, - trio.Event, - ] - ] = {} - locks = defaultdict(trio.Lock) - - @classmethod - async def start_service_task( - self, - name: str, - portal: tractor.Portal, - target: Callable, - **kwargs, - - ) -> (trio.CancelScope, tractor.Context): - ''' - Open a context in a service sub-actor, add to a stack - that gets unwound at ``pikerd`` teardown. - - This allows for allocating long-running sub-services in our main - daemon and explicitly controlling their lifetimes. - - ''' - async def open_context_in_task( - task_status: TaskStatus[ - tuple[ - trio.CancelScope, - trio.Event, - Any, - ] - ] = trio.TASK_STATUS_IGNORED, - - ) -> Any: - - with trio.CancelScope() as cs: - async with portal.open_context( - target, - **kwargs, - - ) as (ctx, first): - - # unblock once the remote context has started - complete = trio.Event() - task_status.started((cs, complete, first)) - log.info( - f'`pikerd` service {name} started with value {first}' - ) - try: - # wait on any context's return value - # and any final portal result from the - # sub-actor. - ctx_res = await ctx.result() - - # NOTE: blocks indefinitely until cancelled - # either by error from the target context - # function or by being cancelled here by the - # surrounding cancel scope. - return (await portal.result(), ctx_res) - - finally: - await portal.cancel_actor() - complete.set() - self.service_tasks.pop(name) - - cs, complete, first = await self.service_n.start(open_context_in_task) - - # store the cancel scope and portal for later cancellation or - # retstart if needed. 
- self.service_tasks[name] = (cs, portal, complete) - - return cs, first - - @classmethod - async def cancel_service( - self, - name: str, - - ) -> Any: - ''' - Cancel the service task and actor for the given ``name``. - - ''' - log.info(f'Cancelling `pikerd` service {name}') - cs, portal, complete = self.service_tasks[name] - cs.cancel() - await complete.wait() - assert name not in self.service_tasks, \ - f'Serice task for {name} not terminated?' - - -@acm -async def open_piker_runtime( - name: str, - enable_modules: list[str] = [], - loglevel: Optional[str] = None, - - # XXX NOTE XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = False, - - registry_addr: None | tuple[str, int] = None, - - # TODO: once we have `rsyscall` support we will read a config - # and spawn the service tree distributed per that. - start_method: str = 'trio', - - **tractor_kwargs, - -) -> tuple[ - tractor.Actor, - tuple[str, int], -]: - ''' - Start a piker actor who's runtime will automatically sync with - existing piker actors on the local link based on configuration. - - Can be called from a subactor or any program that needs to start - a root actor. - - ''' - try: - # check for existing runtime - actor = tractor.current_actor().uid - - except tractor._exceptions.NoRuntime: - - registry_addr = registry_addr or _default_reg_addr - - async with ( - tractor.open_root_actor( - - # passed through to ``open_root_actor`` - arbiter_addr=registry_addr, - name=name, - loglevel=loglevel, - debug_mode=debug_mode, - start_method=start_method, - - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? - enable_modules=enable_modules, - - **tractor_kwargs, - ) as _, - - open_registry(registry_addr, ensure_exists=False) as addr, - ): - yield ( - tractor.current_actor(), - addr, - ) - else: - async with open_registry(registry_addr) as addr: - yield ( - actor, - addr, - ) - - -@acm -async def open_pikerd( - - loglevel: str | None = None, - - # XXX: you should pretty much never want debug mode - # for data daemons when running in production. - debug_mode: bool = False, - registry_addr: None | tuple[str, int] = None, - - # db init flags - tsdb: bool = False, - es: bool = False, - -) -> Services: - ''' - Start a root piker daemon who's lifetime extends indefinitely until - cancelled. - - A root actor nursery is created which can be used to create and keep - alive underling services (see below). - - ''' - - async with ( - open_piker_runtime( - - name=_root_dname, - # TODO: eventually we should be able to avoid - # having the root have more then permissions to - # spawn other specialized daemons I think? 
- enable_modules=_root_modules, - - loglevel=loglevel, - debug_mode=debug_mode, - registry_addr=registry_addr, - - ) as (root_actor, reg_addr), - tractor.open_nursery() as actor_nursery, - trio.open_nursery() as service_nursery, - ): - assert root_actor.accept_addr == reg_addr - - if tsdb: - from piker.data._ahab import start_ahab - from piker.data.marketstore import start_marketstore - - log.info('Spawning `marketstore` supervisor') - ctn_ready, config, (cid, pid) = await service_nursery.start( - start_ahab, - 'marketstored', - start_marketstore, - - ) - log.info( - f'`marketstored` up!\n' - f'pid: {pid}\n' - f'container id: {cid[:12]}\n' - f'config: {pformat(config)}' - ) - - if es: - from piker.data._ahab import start_ahab - from piker.data.elastic import start_elasticsearch - - log.info('Spawning `elasticsearch` supervisor') - ctn_ready, config, (cid, pid) = await service_nursery.start( - partial( - start_ahab, - 'elasticsearch', - start_elasticsearch, - start_timeout=240.0 # high cause ci - ) - ) - - log.info( - f'`elasticsearch` up!\n' - f'pid: {pid}\n' - f'container id: {cid[:12]}\n' - f'config: {pformat(config)}' - ) - - # assign globally for future daemon/task creation - Services.actor_n = actor_nursery - Services.service_n = service_nursery - Services.debug_mode = debug_mode - - - try: - yield Services - - finally: - # TODO: is this more clever/efficient? - # if 'samplerd' in Services.service_tasks: - # await Services.cancel_service('samplerd') - service_nursery.cancel_scope.cancel() - - -@acm -async def maybe_open_runtime( - loglevel: Optional[str] = None, - **kwargs, - -) -> None: - ''' - Start the ``tractor`` runtime (a root actor) if none exists. - - ''' - name = kwargs.pop('name') - - if not tractor.current_actor(err_on_no_runtime=False): - async with open_piker_runtime( - name, - loglevel=loglevel, - **kwargs, - ) as (_, addr): - yield addr, - else: - async with open_registry() as addr: - yield addr - - -@acm -async def maybe_open_pikerd( - loglevel: Optional[str] = None, - registry_addr: None | tuple = None, - tsdb: bool = False, - es: bool = False, - - **kwargs, - -) -> tractor._portal.Portal | ClassVar[Services]: - ''' - If no ``pikerd`` daemon-root-actor can be found start it and - yield up (we should probably figure out returning a portal to self - though). - - ''' - if loglevel: - get_console_log(loglevel) - - # subtle, we must have the runtime up here or portal lookup will fail - query_name = kwargs.pop('name', f'piker_query_{os.getpid()}') - - # TODO: if we need to make the query part faster we could not init - # an actor runtime and instead just hit the socket? - # from tractor._ipc import _connect_chan, Channel - # async with _connect_chan(host, port) as chan: - # async with open_portal(chan) as arb_portal: - # yield arb_portal - - async with ( - open_piker_runtime( - name=query_name, - registry_addr=registry_addr, - loglevel=loglevel, - **kwargs, - ) as _, - tractor.find_actor( - _root_dname, - arbiter_sockaddr=registry_addr, - ) as portal - ): - # connect to any existing daemon presuming - # its registry socket was selected. - if ( - portal is not None - ): - yield portal - return - - # presume pikerd role since no daemon could be found at - # configured address - async with open_pikerd( - loglevel=loglevel, - debug_mode=kwargs.get('debug_mode', False), - registry_addr=registry_addr, - tsdb=tsdb, - es=es, - - ) as service_manager: - # in the case where we're starting up the - # tractor-piker runtime stack in **this** process - # we return no portal to self. 
- assert service_manager - yield service_manager - - -# `brokerd` enabled modules -# NOTE: keeping this list as small as possible is part of our caps-sec -# model and should be treated with utmost care! -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', - 'piker.data', - 'piker.data.feed', - 'piker.data._sampling' -] - - -@acm -async def find_service( - service_name: str, -) -> tractor.Portal | None: - - async with open_registry() as reg_addr: - log.info(f'Scanning for service `{service_name}`') - # attach to existing daemon by name if possible - async with tractor.find_actor( - service_name, - arbiter_sockaddr=reg_addr, - ) as maybe_portal: - yield maybe_portal - - -async def check_for_service( - service_name: str, - -) -> None | tuple[str, int]: - ''' - Service daemon "liveness" predicate. - - ''' - async with open_registry(ensure_exists=False) as reg_addr: - async with tractor.query_actor( - service_name, - arbiter_sockaddr=reg_addr, - ) as sockaddr: - return sockaddr - - -@acm -async def maybe_spawn_daemon( - - service_name: str, - service_task_target: Callable, - spawn_args: dict[str, Any], - loglevel: Optional[str] = None, - - singleton: bool = False, - **kwargs, - -) -> tractor.Portal: - ''' - If no ``service_name`` daemon-actor can be found, - spawn one in a local subactor and return a portal to it. - - If this function is called from a non-pikerd actor, the - spawned service will persist as long as pikerd does or - it is requested to be cancelled. - - This can be seen as a service starting api for remote-actor - clients. - - ''' - if loglevel: - get_console_log(loglevel) - - # serialize access to this section to avoid - # 2 or more tasks racing to create a daemon - lock = Services.locks[service_name] - await lock.acquire() - - async with find_service(service_name) as portal: - if portal is not None: - lock.release() - yield portal - return - - log.warning(f"Couldn't find any existing {service_name}") - - # TODO: really shouldn't the actor spawning be part of the service - # starting method `Services.start_service()` ? - - # ask root ``pikerd`` daemon to spawn the daemon we need if - # pikerd is not live we now become the root of the - # process tree - async with maybe_open_pikerd( - - loglevel=loglevel, - **kwargs, - - ) as pikerd_portal: - - # we are the root and thus are `pikerd` - # so spawn the target service directly by calling - # the provided target routine. - # XXX: this assumes that the target is well formed and will - # do the right things to setup both a sub-actor **and** call - # the ``_Services`` api from above to start the top level - # service task for that actor. - started: bool - if pikerd_portal is None: - started = await service_task_target(**spawn_args) - - else: - # tell the remote `pikerd` to start the target, - # the target can't return a non-serializable value - # since it is expected that service startingn is - # non-blocking and the target task will persist running - # on `pikerd` after the client requesting it's start - # disconnects. 
- started = await pikerd_portal.run( - service_task_target, - **spawn_args, - ) - - if started: - log.info(f'Service {service_name} started!') - - async with tractor.wait_for_actor(service_name) as portal: - lock.release() - yield portal - await portal.cancel_actor() - - -async def spawn_brokerd( - - brokername: str, - loglevel: Optional[str] = None, - **tractor_kwargs, - -) -> bool: - - log.info(f'Spawning {brokername} broker daemon') - - brokermod = get_brokermod(brokername) - dname = f'brokerd.{brokername}' - - extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) - tractor_kwargs.update(extra_tractor_kwargs) - - # ask `pikerd` to spawn a new sub-actor and manage it under its - # actor nursery - modpath = brokermod.__name__ - broker_enable = [modpath] - for submodname in getattr( - brokermod, - '__enable_modules__', - [], - ): - subpath = f'{modpath}.{submodname}' - broker_enable.append(subpath) - - portal = await Services.actor_n.start_actor( - dname, - enable_modules=_data_mods + broker_enable, - loglevel=loglevel, - debug_mode=Services.debug_mode, - **tractor_kwargs - ) - - # non-blocking setup of brokerd service nursery - from .data import _setup_persistent_brokerd - - await Services.start_service_task( - dname, - portal, - _setup_persistent_brokerd, - brokername=brokername, - ) - return True - - -@acm -async def maybe_spawn_brokerd( - - brokername: str, - loglevel: Optional[str] = None, - **kwargs, - -) -> tractor.Portal: - ''' - Helper to spawn a brokerd service *from* a client - who wishes to use the sub-actor-daemon. - - ''' - async with maybe_spawn_daemon( - - f'brokerd.{brokername}', - service_task_target=spawn_brokerd, - spawn_args={'brokername': brokername, 'loglevel': loglevel}, - loglevel=loglevel, - **kwargs, - - ) as portal: - yield portal - - -async def spawn_emsd( - - loglevel: Optional[str] = None, - **extra_tractor_kwargs - -) -> bool: - """ - Start the clearing engine under ``pikerd``. - - """ - log.info('Spawning emsd') - - portal = await Services.actor_n.start_actor( - 'emsd', - enable_modules=[ - 'piker.clearing._ems', - 'piker.clearing._client', - ], - loglevel=loglevel, - debug_mode=Services.debug_mode, # set by pikerd flag - **extra_tractor_kwargs - ) - - # non-blocking setup of clearing service - from .clearing._ems import _setup_persistent_emsd - - await Services.start_service_task( - 'emsd', - portal, - _setup_persistent_emsd, - ) - return True - - -@acm -async def maybe_open_emsd( - - brokername: str, - loglevel: Optional[str] = None, - **kwargs, - -) -> tractor._portal.Portal: # noqa - - async with maybe_spawn_daemon( - - 'emsd', - service_task_target=spawn_emsd, - spawn_args={'loglevel': loglevel}, - loglevel=loglevel, - **kwargs, - - ) as portal: - yield portal diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 0d84384d..f86c679e 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -29,8 +29,15 @@ import tractor from ..cli import cli from .. 
import watchlists as wl from ..log import get_console_log, colorize_json, get_logger -from .._daemon import maybe_spawn_brokerd, maybe_open_pikerd -from ..brokers import core, get_brokermod, data +from ..service import ( + maybe_spawn_brokerd, + maybe_open_pikerd, +) +from ..brokers import ( + core, + get_brokermod, + data, +) log = get_logger('cli') DEFAULT_BROKER = 'questrade' @@ -60,6 +67,7 @@ def get_method(client, meth_name: str): print_ok('found!.') return method + async def run_method(client, meth_name: str, **kwargs): method = get_method(client, meth_name) print('running...', end='', flush=True) @@ -67,19 +75,20 @@ async def run_method(client, meth_name: str, **kwargs): print_ok(f'done! result: {type(result)}') return result + async def run_test(broker_name: str): brokermod = get_brokermod(broker_name) total = 0 passed = 0 failed = 0 - print(f'getting client...', end='', flush=True) + print('getting client...', end='', flush=True) if not hasattr(brokermod, 'get_client'): print_error('fail! no \'get_client\' context manager found.') return async with brokermod.get_client(is_brokercheck=True) as client: - print_ok(f'done! inside client context.') + print_ok('done! inside client context.') # check for methods present on brokermod method_list = [ @@ -130,7 +139,6 @@ async def run_test(broker_name: str): total += 1 - # check for methods present con brokermod.Client and their # results @@ -180,7 +188,6 @@ def brokercheck(config, broker): trio.run(run_test, broker) - @cli.command() @click.option('--keys', '-k', multiple=True, help='Return results only for these keys') @@ -335,8 +342,6 @@ def contracts(ctx, loglevel, broker, symbol, ids): brokermod = get_brokermod(broker) get_console_log(loglevel) - - contracts = trio.run(partial(core.contracts, brokermod, symbol)) if not ids: # just print out expiry dates which can be used with diff --git a/piker/brokers/core.py b/piker/brokers/core.py index af5da3a1..3e9e1614 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -28,7 +28,7 @@ import trio from ..log import get_logger from . import get_brokermod -from .._daemon import maybe_spawn_brokerd +from ..service import maybe_spawn_brokerd from .._cacheables import open_cached_client diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index c7a49909..d6491ee7 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -177,8 +177,11 @@ def i3ipc_xdotool_manual_click_hack() -> None: ) # re-activate and focus original window - subprocess.call([ - 'xdotool', - 'windowactivate', '--sync', str(orig_win_id), - 'click', '--window', str(orig_win_id), '1', - ]) + try: + subprocess.call([ + 'xdotool', + 'windowactivate', '--sync', str(orig_win_id), + 'click', '--window', str(orig_win_id), '1', + ]) + except subprocess.TimeoutExpired: + log.exception(f'xdotool timed out?') diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 0a40b548..7d03406a 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -29,8 +29,11 @@ from tractor.trionics import broadcast_receiver from ..log import get_logger from ..data.types import Struct -from .._daemon import maybe_open_emsd -from ._messages import Order, Cancel +from ..service import maybe_open_emsd +from ._messages import ( + Order, + Cancel, +) from ..brokers import get_brokermod if TYPE_CHECKING: diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 9b6f225c..63b8321a 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -19,16 +19,18 @@ CLI commons. 
''' import os -from pprint import pformat -from functools import partial import click import trio import tractor -from ..log import get_console_log, get_logger, colorize_json +from ..log import ( + get_console_log, + get_logger, + colorize_json, +) from ..brokers import get_brokermod -from .._daemon import ( +from ..service import ( _default_registry_host, _default_registry_port, ) @@ -68,7 +70,7 @@ def pikerd( ''' - from .._daemon import open_pikerd + from ..service import open_pikerd log = get_console_log(loglevel) if pdb: @@ -171,7 +173,7 @@ def cli( @click.pass_obj def services(config, tl, ports): - from .._daemon import ( + from ..service import ( open_piker_runtime, _default_registry_port, _default_registry_host, @@ -204,8 +206,8 @@ def services(config, tl, ports): def _load_clis() -> None: - from ..data import marketstore # noqa - from ..data import elastic + from ..service import marketstore # noqa + from ..service import elastic from ..data import cli # noqa from ..brokers import cli # noqa from ..ui import cli # noqa diff --git a/piker/config.py b/piker/config.py index 3ae6a665..397342e3 100644 --- a/piker/config.py +++ b/piker/config.py @@ -15,7 +15,7 @@ # along with this program. If not, see . """ -Broker configuration mgmt. +Platform configuration (files) mgmt. """ import platform @@ -26,17 +26,25 @@ from os.path import dirname import shutil from typing import Optional from pathlib import Path + from bidict import bidict import toml -from piker.testing import TEST_CONFIG_DIR_PATH + from .log import get_logger log = get_logger('broker-config') -# taken from ``click`` since apparently they have some +# XXX NOTE: taken from ``click`` since apparently they have some # super weirdness with sigint and sudo..no clue -def get_app_dir(app_name, roaming=True, force_posix=False): +# we're probably going to slowly just modify it to our own version over +# time.. +def get_app_dir( + app_name: str, + roaming: bool = True, + force_posix: bool = False, + +) -> str: r"""Returns the config folder for the application. The default behavior is to return whatever is most appropriate for the operating system. @@ -75,14 +83,30 @@ def get_app_dir(app_name, roaming=True, force_posix=False): def _posixify(name): return "-".join(name.split()).lower() - # TODO: This is a hacky way to a) determine we're testing - # and b) creating a test dir. We should aim to set a variable - # within the tractor runtimes and store testing config data - # outside of the users filesystem + # NOTE: for testing with `pytest` we leverage the `tmp_dir` + # fixture to generate (and clean up) a test-request-specific + # directory for isolated configuration files such that, + # - multiple tests can run (possibly in parallel) without data races + # on the config state, + # - we don't need to ever worry about leaking configs into the + # system thus avoiding needing to manage config cleaup fixtures or + # other bothers (since obviously `tmp_dir` cleans up after itself). + # + # In order to "pass down" the test dir path to all (sub-)actors in + # the actor tree we preload the root actor's runtime vars state (an + # internal mechanism for inheriting state down an actor tree in + # `tractor`) with the testing dir and check for it whenever we + # detect `pytest` is being used (which it isn't under normal + # operation). 
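# Illustrative sketch only (an assumption, not code from this patch):
# one way a pytest ``conftest.py`` could preload the runtime-vars state
# read back just below, using the stock ``tmp_path`` fixture together
# with the ``tractor_runtime_overrides`` parameter this change set adds
# to ``open_piker_runtime()``:

import pytest


@pytest.fixture
def tractor_runtime_overrides(tmp_path) -> dict:
    # key matches the lookup done below:
    # tractor._state._runtime_vars['piker_vars']['piker_test_dir']
    return {'piker_test_dir': str(tmp_path)}

# a test would then boot its actor tree via something like
# ``open_piker_runtime('test_actor', tractor_runtime_overrides=<fixture value>)``
# so that ``get_app_dir()`` resolves under the per-test tmp dir.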
if "pytest" in sys.modules: - app_name = os.path.join(app_name, TEST_CONFIG_DIR_PATH) + import tractor + actor = tractor.current_actor(err_on_no_runtime=False) + if actor: # runtime is up + rvs = tractor._state._runtime_vars + testdirpath = Path(rvs['piker_vars']['piker_test_dir']) + assert testdirpath.exists(), 'piker test harness might be borked!?' + app_name = str(testdirpath) - # if WIN: if platform.system() == 'Windows': key = "APPDATA" if roaming else "LOCALAPPDATA" folder = os.environ.get(key) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index a5df96cc..f44304bf 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -42,7 +42,7 @@ from ..log import ( get_logger, get_console_log, ) -from .._daemon import maybe_spawn_daemon +from ..service import maybe_spawn_daemon if TYPE_CHECKING: from ._sharedmem import ( @@ -68,8 +68,8 @@ class Sampler: This non-instantiated type is meant to be a singleton within a `samplerd` actor-service spawned once by the user wishing to - time-step sample real-time quote feeds, see - ``._daemon.maybe_open_samplerd()`` and the below + time-step-sample (real-time) quote feeds, see + ``.service.maybe_open_samplerd()`` and the below ``register_with_sampler()``. ''' @@ -379,7 +379,7 @@ async def spawn_samplerd( update and increment count write and stream broadcasting. ''' - from piker._daemon import Services + from piker.service import Services dname = 'samplerd' log.info(f'Spawning `{dname}`') diff --git a/piker/data/cli.py b/piker/data/cli.py index 554048a4..6984d9ff 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -18,31 +18,22 @@ marketstore cli. """ -from functools import partial -from pprint import pformat - -from anyio_marketstore import open_marketstore_client import trio import tractor import click -import numpy as np -from .marketstore import ( - get_client, +from ..service.marketstore import ( + # get_client, # stream_quotes, ingest_quote_stream, # _url, - _tick_tbk_ids, - mk_tbk, + # _tick_tbk_ids, + # mk_tbk, ) from ..cli import cli from .. import watchlists as wl -from ..log import get_logger -from ._sharedmem import ( - maybe_open_shm_array, -) -from ._source import ( - base_iohlc_dtype, +from ..log import ( + get_logger, ) @@ -89,16 +80,16 @@ def ms_stream( # async def main(): # nonlocal names # async with get_client(url) as client: -# +# # if not names: # names = await client.list_symbols() -# +# # # default is to wipe db entirely. # answer = input( # "This will entirely wipe you local marketstore db @ " # f"{url} of the following symbols:\n {pformat(names)}" # "\n\nDelete [N/y]?\n") -# +# # if answer == 'y': # for sym in names: # # tbk = _tick_tbk.format(sym) @@ -107,21 +98,17 @@ def ms_stream( # await client.destroy(mk_tbk(tbk)) # else: # print("Nothing deleted.") -# +# # tractor.run(main) @cli.command() @click.option( - '--tl', - is_flag=True, - help='Enable tractor logging') -@click.option( - '--host', + '--tsdb_host', default='localhost' ) @click.option( - '--port', + '--tsdb_port', default=5993 ) @click.argument('symbols', nargs=-1) @@ -137,18 +124,93 @@ def storesh( Start an IPython shell ready to query the local marketstore db. 
''' - from piker.data.marketstore import tsdb_history_update - from piker._daemon import open_piker_runtime + from piker.data.marketstore import open_tsdb_client + from piker.service import open_piker_runtime async def main(): nonlocal symbols async with open_piker_runtime( 'storesh', - enable_modules=['piker.data._ahab'], + enable_modules=['piker.service._ahab'], ): symbol = symbols[0] - await tsdb_history_update(symbol) + + async with open_tsdb_client(symbol): + # TODO: ask if user wants to write history for detected + # available shm buffers? + from tractor.trionics import ipython_embed + await ipython_embed() + + trio.run(main) + + +@cli.command() +@click.option( + '--host', + default='localhost' +) +@click.option( + '--port', + default=5993 +) +@click.option( + '--delete', + '-d', + is_flag=True, + help='Delete history (1 Min) for symbol(s)', +) +@click.argument('symbols', nargs=-1) +@click.pass_obj +def storage( + config, + host, + port, + symbols: list[str], + delete: bool, + +): + ''' + Start an IPython shell ready to query the local marketstore db. + + ''' + from piker.service.marketstore import open_tsdb_client + from piker.service import open_piker_runtime + + async def main(): + nonlocal symbols + + async with open_piker_runtime( + 'tsdb_storage', + enable_modules=['piker.service._ahab'], + ): + symbol = symbols[0] + async with open_tsdb_client(symbol) as storage: + if delete: + for fqsn in symbols: + syms = await storage.client.list_symbols() + + resp60s = await storage.delete_ts(fqsn, 60) + + msgish = resp60s.ListFields()[0][1] + if 'error' in str(msgish): + + # TODO: MEGA LOL, apparently the symbols don't + # flush out until you refresh something or other + # (maybe the WALFILE)... #lelandorlulzone, classic + # alpaca(Rtm) design here .. + # well, if we ever can make this work we + # probably want to dogsplain the real reason + # for the delete errurz..llululu + if fqsn not in syms: + log.error(f'Pair {fqsn} dne in DB') + + log.error(f'Deletion error: {fqsn}\n{msgish}') + + resp1s = await storage.delete_ts(fqsn, 1) + msgish = resp1s.ListFields()[0][1] + if 'error' in str(msgish): + log.error(f'Deletion error: {fqsn}\n{msgish}') trio.run(main) @@ -182,7 +244,7 @@ def ingest(config, name, test_file, tl): async def entry_point(): async with tractor.open_nursery() as n: - for provider, symbols in grouped_syms.items(): + for provider, symbols in grouped_syms.items(): await n.run_in_actor( ingest_quote_stream, name='ingest_marketstore', diff --git a/piker/data/feed.py b/piker/data/feed.py index 906f4bb4..69d5be7d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -58,7 +58,7 @@ from ..log import ( get_logger, get_console_log, ) -from .._daemon import ( +from ..service import ( maybe_spawn_brokerd, check_for_service, ) @@ -86,7 +86,7 @@ from ..brokers._util import ( ) if TYPE_CHECKING: - from .marketstore import Storage + from ..service.marketstore import Storage log = get_logger(__name__) @@ -865,7 +865,7 @@ async def manage_history( ): log.info('Found existing `marketstored`') - from . 
import marketstore + from ..service import marketstore async with ( marketstore.open_storage_client(fqsn)as storage, ): diff --git a/piker/log.py b/piker/log.py index 804e09dc..a36beec0 100644 --- a/piker/log.py +++ b/piker/log.py @@ -21,7 +21,11 @@ import logging import json import tractor -from pygments import highlight, lexers, formatters +from pygments import ( + highlight, + lexers, + formatters, +) # Makes it so we only see the full module name when using ``__name__`` # without the extra "piker." prefix. @@ -32,26 +36,48 @@ def get_logger( name: str = None, ) -> logging.Logger: - '''Return the package log or a sub-log for `name` if provided. + ''' + Return the package log or a sub-log for `name` if provided. + ''' return tractor.log.get_logger(name=name, _root_name=_proj_name) -def get_console_log(level: str = None, name: str = None) -> logging.Logger: - '''Get the package logger and enable a handler which writes to stderr. +def get_console_log( + level: str | None = None, + name: str | None = None, + +) -> logging.Logger: + ''' + Get the package logger and enable a handler which writes to stderr. Yeah yeah, i know we can use ``DictConfig``. You do it... + ''' return tractor.log.get_console_log( - level, name=name, _root_name=_proj_name) # our root logger + level, + name=name, + _root_name=_proj_name, + ) # our root logger -def colorize_json(data, style='algol_nu'): - """Colorize json output using ``pygments``. - """ - formatted_json = json.dumps(data, sort_keys=True, indent=4) +def colorize_json( + data: dict, + style='algol_nu', +): + ''' + Colorize json output using ``pygments``. + + ''' + formatted_json = json.dumps( + data, + sort_keys=True, + indent=4, + ) return highlight( - formatted_json, lexers.JsonLexer(), + formatted_json, + lexers.JsonLexer(), + # likeable styles: algol_nu, tango, monokai formatters.TerminalTrueColorFormatter(style=style) ) diff --git a/piker/service/__init__.py b/piker/service/__init__.py new file mode 100644 index 00000000..3b9767cd --- /dev/null +++ b/piker/service/__init__.py @@ -0,0 +1,60 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Actor-runtime service orchestration machinery. 
+ +""" +from __future__ import annotations + +from ._mngr import Services +from ._registry import ( # noqa + _tractor_kwargs, + _default_reg_addr, + _default_registry_host, + _default_registry_port, + open_registry, + find_service, + check_for_service, +) +from ._daemon import ( # noqa + maybe_spawn_daemon, + spawn_brokerd, + maybe_spawn_brokerd, + spawn_emsd, + maybe_open_emsd, +) +from ._actor_runtime import ( + open_piker_runtime, + maybe_open_pikerd, + open_pikerd, + get_tractor_runtime_kwargs, +) + + +__all__ = [ + 'check_for_service', + 'Services', + 'maybe_spawn_daemon', + 'spawn_brokerd', + 'maybe_spawn_brokerd', + 'spawn_emsd', + 'maybe_open_emsd', + 'open_piker_runtime', + 'maybe_open_pikerd', + 'open_pikerd', + 'get_tractor_runtime_kwargs', +] diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py new file mode 100644 index 00000000..b92ad221 --- /dev/null +++ b/piker/service/_actor_runtime.py @@ -0,0 +1,347 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +``tractor`` wrapping + default config to bootstrap the `pikerd`. + +""" +from __future__ import annotations +from pprint import pformat +from functools import partial +import os +from typing import ( + Optional, + Any, + ClassVar, +) +from contextlib import ( + asynccontextmanager as acm, +) + +import tractor +import trio + +from ..log import ( + get_logger, + get_console_log, +) +from ._mngr import ( + Services, +) +from ._registry import ( # noqa + _tractor_kwargs, + _default_reg_addr, + open_registry, +) + +log = get_logger(__name__) + + +def get_tractor_runtime_kwargs() -> dict[str, Any]: + ''' + Deliver ``tractor`` related runtime variables in a `dict`. + + ''' + return _tractor_kwargs + + +@acm +async def open_piker_runtime( + name: str, + enable_modules: list[str] = [], + loglevel: Optional[str] = None, + + # XXX NOTE XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = False, + + registry_addr: None | tuple[str, int] = None, + + # TODO: once we have `rsyscall` support we will read a config + # and spawn the service tree distributed per that. + start_method: str = 'trio', + + tractor_runtime_overrides: dict | None = None, + **tractor_kwargs, + +) -> tuple[ + tractor.Actor, + tuple[str, int], +]: + ''' + Start a piker actor who's runtime will automatically sync with + existing piker actors on the local link based on configuration. + + Can be called from a subactor or any program that needs to start + a root actor. 
+ + ''' + try: + # check for existing runtime + actor = tractor.current_actor().uid + + except tractor._exceptions.NoRuntime: + tractor._state._runtime_vars[ + 'piker_vars'] = tractor_runtime_overrides + + registry_addr = registry_addr or _default_reg_addr + + async with ( + tractor.open_root_actor( + + # passed through to ``open_root_actor`` + arbiter_addr=registry_addr, + name=name, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, + + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=enable_modules, + + **tractor_kwargs, + ) as _, + + open_registry(registry_addr, ensure_exists=False) as addr, + ): + yield ( + tractor.current_actor(), + addr, + ) + else: + async with open_registry(registry_addr) as addr: + yield ( + actor, + addr, + ) + + +_root_dname = 'pikerd' +_root_modules = [ + __name__, + 'piker.service._daemon', + 'piker.clearing._ems', + 'piker.clearing._client', + 'piker.data._sampling', +] + + +@acm +async def open_pikerd( + + loglevel: str | None = None, + + # XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = False, + registry_addr: None | tuple[str, int] = None, + + # db init flags + tsdb: bool = False, + es: bool = False, + drop_root_perms_for_ahab: bool = True, + + **kwargs, + +) -> Services: + ''' + Start a root piker daemon with an indefinite lifetime. + + A root actor nursery is created which can be used to create and keep + alive underling services (see below). + + ''' + async with ( + open_piker_runtime( + + name=_root_dname, + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=_root_modules, + loglevel=loglevel, + debug_mode=debug_mode, + registry_addr=registry_addr, + + **kwargs, + + ) as (root_actor, reg_addr), + tractor.open_nursery() as actor_nursery, + trio.open_nursery() as service_nursery, + ): + if root_actor.accept_addr != reg_addr: + raise RuntimeError(f'Daemon failed to bind on {reg_addr}!?') + + # assign globally for future daemon/task creation + Services.actor_n = actor_nursery + Services.service_n = service_nursery + Services.debug_mode = debug_mode + + if tsdb: + from ._ahab import start_ahab + from .marketstore import start_marketstore + + log.info('Spawning `marketstore` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + partial( + start_ahab, + 'marketstored', + start_marketstore, + loglevel=loglevel, + drop_root_perms=drop_root_perms_for_ahab, + ) + + ) + log.info( + f'`marketstored` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + + if es: + from ._ahab import start_ahab + from .elastic import start_elasticsearch + + log.info('Spawning `elasticsearch` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + partial( + start_ahab, + 'elasticsearch', + start_elasticsearch, + loglevel=loglevel, + drop_root_perms=drop_root_perms_for_ahab, + ) + ) + + log.info( + f'`elasticsearch` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + + try: + yield Services + + finally: + # TODO: is this more clever/efficient? + # if 'samplerd' in Services.service_tasks: + # await Services.cancel_service('samplerd') + service_nursery.cancel_scope.cancel() + + +# TODO: do we even need this? 
+# @acm +# async def maybe_open_runtime( +# loglevel: Optional[str] = None, +# **kwargs, + +# ) -> None: +# ''' +# Start the ``tractor`` runtime (a root actor) if none exists. + +# ''' +# name = kwargs.pop('name') + +# if not tractor.current_actor(err_on_no_runtime=False): +# async with open_piker_runtime( +# name, +# loglevel=loglevel, +# **kwargs, +# ) as (_, addr): +# yield addr, +# else: +# async with open_registry() as addr: +# yield addr + + +@acm +async def maybe_open_pikerd( + loglevel: Optional[str] = None, + registry_addr: None | tuple = None, + tsdb: bool = False, + es: bool = False, + drop_root_perms_for_ahab: bool = True, + + **kwargs, + +) -> tractor._portal.Portal | ClassVar[Services]: + ''' + If no ``pikerd`` daemon-root-actor can be found start it and + yield up (we should probably figure out returning a portal to self + though). + + ''' + if loglevel: + get_console_log(loglevel) + + # subtle, we must have the runtime up here or portal lookup will fail + query_name = kwargs.pop( + 'name', + f'piker_query_{os.getpid()}', + ) + + # TODO: if we need to make the query part faster we could not init + # an actor runtime and instead just hit the socket? + # from tractor._ipc import _connect_chan, Channel + # async with _connect_chan(host, port) as chan: + # async with open_portal(chan) as arb_portal: + # yield arb_portal + + async with ( + open_piker_runtime( + name=query_name, + registry_addr=registry_addr, + loglevel=loglevel, + **kwargs, + ) as _, + + tractor.find_actor( + _root_dname, + arbiter_sockaddr=registry_addr, + ) as portal + ): + # connect to any existing daemon presuming + # its registry socket was selected. + if ( + portal is not None + ): + yield portal + return + + # presume pikerd role since no daemon could be found at + # configured address + async with open_pikerd( + loglevel=loglevel, + registry_addr=registry_addr, + + # ahabd (docker super) specific controls + tsdb=tsdb, + es=es, + drop_root_perms_for_ahab=drop_root_perms_for_ahab, + + # passthrough to ``tractor`` init + **kwargs, + + ) as service_manager: + # in the case where we're starting up the + # tractor-piker runtime stack in **this** process + # we return no portal to self. + assert service_manager + yield service_manager diff --git a/piker/data/_ahab.py b/piker/service/_ahab.py similarity index 65% rename from piker/data/_ahab.py rename to piker/service/_ahab.py index 39a5b46a..7c3133e1 100644 --- a/piker/data/_ahab.py +++ b/piker/service/_ahab.py @@ -15,9 +15,12 @@ # along with this program. If not, see . ''' -Supervisor for docker with included specific-image service helpers. +Supervisor for ``docker`` with included async and SC wrapping +to ensure a cancellable container lifetime system. ''' +from collections import ChainMap +from functools import partial import os import time from typing import ( @@ -45,7 +48,10 @@ from requests.exceptions import ( ReadTimeout, ) -from ..log import get_logger, get_console_log +from ..log import ( + get_logger, + get_console_log, +) from .. import config log = get_logger(__name__) @@ -124,10 +130,19 @@ class Container: async def process_logs_until( self, + log_msg_key: str, + # this is a predicate func for matching log msgs emitted by the # underlying containerized app patt_matcher: Callable[[str], bool], - bp_on_msg: bool = False, + + # XXX WARNING XXX: do not touch this sleep value unless + # you know what you are doing! 
the value is critical to + # making sure the caller code inside the startup context + # does not timeout BEFORE we receive a match on the + # ``patt_matcher()`` predicate above. + checkpoint_period: float = 0.001, + ) -> bool: ''' Attempt to capture container log messages and relay through our @@ -137,12 +152,14 @@ class Container: seen_so_far = self.seen_so_far while True: + logs = self.cntr.logs() try: logs = self.cntr.logs() except ( docker.errors.NotFound, docker.errors.APIError ): + log.exception('Failed to parse logs?') return False entries = logs.decode().split('\n') @@ -155,25 +172,23 @@ class Container: entry = entry.strip() try: record = json.loads(entry) - - if 'msg' in record: - msg = record['msg'] - elif 'message' in record: - msg = record['message'] - else: - raise KeyError(f'Unexpected log format\n{record}') - + msg = record[log_msg_key] level = record['level'] except json.JSONDecodeError: msg = entry level = 'error' - if msg and entry not in seen_so_far: - seen_so_far.add(entry) - if bp_on_msg: - await tractor.breakpoint() + # TODO: do we need a more general mechanism + # for these kinda of "log record entries"? + # if 'Error' in entry: + # raise RuntimeError(entry) + if ( + msg + and entry not in seen_so_far + ): + seen_so_far.add(entry) getattr(log, level.lower(), log.error)(f'{msg}') if level == 'fatal': @@ -183,10 +198,15 @@ class Container: return True # do a checkpoint so we don't block if cancelled B) - await trio.sleep(0.1) + await trio.sleep(checkpoint_period) return False + @property + def cuid(self) -> str: + fqcn: str = self.cntr.attrs['Config']['Image'] + return f'{fqcn}[{self.cntr.short_id}]' + def try_signal( self, signal: str = 'SIGINT', @@ -222,17 +242,23 @@ class Container: async def cancel( self, - stop_msg: str, + log_msg_key: str, + stop_predicate: Callable[[str], bool], + hard_kill: bool = False, ) -> None: + ''' + Attempt to cancel this container gracefully, fail over to + a hard kill on timeout. + ''' cid = self.cntr.id # first try a graceful cancel log.cancel( - f'SIGINT cancelling container: {cid}\n' - f'waiting on stop msg: "{stop_msg}"' + f'SIGINT cancelling container: {self.cuid}\n' + 'waiting on stop predicate...' 
) self.try_signal('SIGINT') @@ -243,7 +269,10 @@ class Container: log.cancel('polling for CNTR logs...') try: - await self.process_logs_until(stop_msg) + await self.process_logs_until( + log_msg_key, + stop_predicate, + ) except ApplicationLogError: hard_kill = True else: @@ -301,12 +330,16 @@ class Container: async def open_ahabd( ctx: tractor.Context, endpoint: str, # ns-pointer str-msg-type - start_timeout: float = 1.0, + loglevel: str | None = 'cancel', **kwargs, ) -> None: - get_console_log('info', name=__name__) + + log = get_console_log( + loglevel, + name=__name__, + ) async with open_docker() as client: @@ -317,42 +350,110 @@ async def open_ahabd( ( dcntr, cntr_config, - start_lambda, - stop_lambda, + start_pred, + stop_pred, ) = ep_func(client) cntr = Container(dcntr) - with trio.move_on_after(start_timeout): - found = await cntr.process_logs_until(start_lambda) + conf: ChainMap[str, Any] = ChainMap( - if not found and dcntr not in client.containers.list(): - for entry in cntr.seen_so_far: - log.info(entry) - - raise RuntimeError( - f'Failed to start {dcntr.id} check logs deats' - ) - - await ctx.started(( - cntr.cntr.id, - os.getpid(), + # container specific cntr_config, - )) + + # defaults + { + # startup time limit which is the max the supervisor + # will wait for the container to be registered in + # ``client.containers.list()`` + 'startup_timeout': 1.0, + + # how fast to poll for the starup predicate by sleeping + # this amount incrementally thus yielding to the + # ``trio`` scheduler on during sync polling execution. + 'startup_query_period': 0.001, + + # str-key value expected to contain log message body-contents + # when read using: + # ``json.loads(entry for entry in DockerContainer.logs())`` + 'log_msg_key': 'msg', + + + # startup sync func, like `Nursery.started()` + 'started_afunc': None, + }, + ) try: + with trio.move_on_after(conf['startup_timeout']) as cs: + async with trio.open_nursery() as tn: + tn.start_soon( + partial( + cntr.process_logs_until, + log_msg_key=conf['log_msg_key'], + patt_matcher=start_pred, + checkpoint_period=conf['startup_query_period'], + ) + ) + + # optional blocking routine + started = conf['started_afunc'] + if started: + await started() + + # poll for container startup or timeout + while not cs.cancel_called: + if dcntr in client.containers.list(): + break + + await trio.sleep(conf['startup_query_period']) + + # sync with remote caller actor-task but allow log + # processing to continue running in bg. + await ctx.started(( + cntr.cntr.id, + os.getpid(), + cntr_config, + )) + + # XXX: if we timeout on finding the "startup msg" we + # expect then we want to FOR SURE raise an error + # upwards! + if cs.cancelled_caught: + # if dcntr not in client.containers.list(): + for entry in cntr.seen_so_far: + log.info(entry) + + raise DockerNotStarted( + f'Failed to start container: {cntr.cuid}\n' + f'due to timeout={conf["startup_timeout"]}s\n\n' + "check ur container's logs!" + ) + # TODO: we might eventually want a proxy-style msg-prot here # to allow remote control of containers without needing # callers to have root perms? await trio.sleep_forever() finally: - await cntr.cancel(stop_lambda) + # TODO: ensure loglevel can be set and teardown logs are + # reported if possible on error or cancel.. + # XXX WARNING: currently shielding here can result in hangs + # on ctl-c from user.. ideally we can avoid a cancel getting + # consumed and not propagating whilst still doing teardown + # logging.. 
+ with trio.CancelScope(shield=True): + await cntr.cancel( + log_msg_key=conf['log_msg_key'], + stop_predicate=stop_pred, + ) async def start_ahab( service_name: str, endpoint: Callable[docker.DockerClient, DockerContainer], - start_timeout: float = 1.0, + loglevel: str | None = 'cancel', + drop_root_perms: bool = True, + task_status: TaskStatus[ tuple[ trio.Event, @@ -373,13 +474,12 @@ async def start_ahab( ''' cn_ready = trio.Event() try: - async with tractor.open_nursery( - loglevel='runtime', - ) as tn: + async with tractor.open_nursery() as an: - portal = await tn.start_actor( + portal = await an.start_actor( service_name, - enable_modules=[__name__] + enable_modules=[__name__], + loglevel=loglevel, ) # TODO: we have issues with this on teardown @@ -389,7 +489,10 @@ async def start_ahab( # de-escalate root perms to the original user # after the docker supervisor actor is spawned. - if config._parent_user: + if ( + drop_root_perms + and config._parent_user + ): import pwd os.setuid( pwd.getpwnam( @@ -400,7 +503,7 @@ async def start_ahab( async with portal.open_context( open_ahabd, endpoint=str(NamespacePath.from_ref(endpoint)), - start_timeout=start_timeout + loglevel='cancel', ) as (ctx, first): cid, pid, cntr_config = first diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py new file mode 100644 index 00000000..45d6cb81 --- /dev/null +++ b/piker/service/_daemon.py @@ -0,0 +1,271 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Daemon-actor spawning "endpoint-hooks". + +""" +from __future__ import annotations +from typing import ( + Optional, + Callable, + Any, +) +from contextlib import ( + asynccontextmanager as acm, +) + +import tractor + +from ..log import ( + get_logger, + get_console_log, +) +from ..brokers import get_brokermod +from ._mngr import ( + Services, +) +from ._actor_runtime import maybe_open_pikerd +from ._registry import find_service + +log = get_logger(__name__) + +# `brokerd` enabled modules +# NOTE: keeping this list as small as possible is part of our caps-sec +# model and should be treated with utmost care! +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', + 'piker.data', + 'piker.data.feed', + 'piker.data._sampling' +] + + +@acm +async def maybe_spawn_daemon( + + service_name: str, + service_task_target: Callable, + spawn_args: dict[str, Any], + loglevel: Optional[str] = None, + + singleton: bool = False, + **kwargs, + +) -> tractor.Portal: + ''' + If no ``service_name`` daemon-actor can be found, + spawn one in a local subactor and return a portal to it. + + If this function is called from a non-pikerd actor, the + spawned service will persist as long as pikerd does or + it is requested to be cancelled. + + This can be seen as a service starting api for remote-actor + clients. 
+ + ''' + if loglevel: + get_console_log(loglevel) + + # serialize access to this section to avoid + # 2 or more tasks racing to create a daemon + lock = Services.locks[service_name] + await lock.acquire() + + async with find_service(service_name) as portal: + if portal is not None: + lock.release() + yield portal + return + + log.warning(f"Couldn't find any existing {service_name}") + + # TODO: really shouldn't the actor spawning be part of the service + # starting method `Services.start_service()` ? + + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + + loglevel=loglevel, + **kwargs, + + ) as pikerd_portal: + + # we are the root and thus are `pikerd` + # so spawn the target service directly by calling + # the provided target routine. + # XXX: this assumes that the target is well formed and will + # do the right things to setup both a sub-actor **and** call + # the ``_Services`` api from above to start the top level + # service task for that actor. + started: bool + if pikerd_portal is None: + started = await service_task_target(**spawn_args) + + else: + # tell the remote `pikerd` to start the target, + # the target can't return a non-serializable value + # since it is expected that service startingn is + # non-blocking and the target task will persist running + # on `pikerd` after the client requesting it's start + # disconnects. + started = await pikerd_portal.run( + service_task_target, + **spawn_args, + ) + + if started: + log.info(f'Service {service_name} started!') + + async with tractor.wait_for_actor(service_name) as portal: + lock.release() + yield portal + await portal.cancel_actor() + + +async def spawn_brokerd( + + brokername: str, + loglevel: Optional[str] = None, + **tractor_kwargs, + +) -> bool: + + log.info(f'Spawning {brokername} broker daemon') + + brokermod = get_brokermod(brokername) + dname = f'brokerd.{brokername}' + + extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) + tractor_kwargs.update(extra_tractor_kwargs) + + # ask `pikerd` to spawn a new sub-actor and manage it under its + # actor nursery + modpath = brokermod.__name__ + broker_enable = [modpath] + for submodname in getattr( + brokermod, + '__enable_modules__', + [], + ): + subpath = f'{modpath}.{submodname}' + broker_enable.append(subpath) + + portal = await Services.actor_n.start_actor( + dname, + enable_modules=_data_mods + broker_enable, + loglevel=loglevel, + debug_mode=Services.debug_mode, + **tractor_kwargs + ) + + # non-blocking setup of brokerd service nursery + from ..data import _setup_persistent_brokerd + + await Services.start_service_task( + dname, + portal, + _setup_persistent_brokerd, + brokername=brokername, + ) + return True + + +@acm +async def maybe_spawn_brokerd( + + brokername: str, + loglevel: Optional[str] = None, + **kwargs, + +) -> tractor.Portal: + ''' + Helper to spawn a brokerd service *from* a client + who wishes to use the sub-actor-daemon. + + ''' + async with maybe_spawn_daemon( + + f'brokerd.{brokername}', + service_task_target=spawn_brokerd, + spawn_args={ + 'brokername': brokername, + 'loglevel': loglevel, + }, + loglevel=loglevel, + **kwargs, + + ) as portal: + yield portal + + +async def spawn_emsd( + + loglevel: Optional[str] = None, + **extra_tractor_kwargs + +) -> bool: + """ + Start the clearing engine under ``pikerd``. 
+ + """ + log.info('Spawning emsd') + + portal = await Services.actor_n.start_actor( + 'emsd', + enable_modules=[ + 'piker.clearing._ems', + 'piker.clearing._client', + ], + loglevel=loglevel, + debug_mode=Services.debug_mode, # set by pikerd flag + **extra_tractor_kwargs + ) + + # non-blocking setup of clearing service + from ..clearing._ems import _setup_persistent_emsd + + await Services.start_service_task( + 'emsd', + portal, + _setup_persistent_emsd, + ) + return True + + +@acm +async def maybe_open_emsd( + + brokername: str, + loglevel: Optional[str] = None, + **kwargs, + +) -> tractor._portal.Portal: # noqa + + async with maybe_spawn_daemon( + + 'emsd', + service_task_target=spawn_emsd, + spawn_args={'loglevel': loglevel}, + loglevel=loglevel, + **kwargs, + + ) as portal: + yield portal diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py new file mode 100644 index 00000000..04f396af --- /dev/null +++ b/piker/service/_mngr.py @@ -0,0 +1,136 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +daemon-service management API. + +""" +from collections import defaultdict +from typing import ( + Callable, + Any, +) + +import trio +from trio_typing import TaskStatus +import tractor + +from ..log import ( + get_logger, +) + +log = get_logger(__name__) + + +# TODO: factor this into a ``tractor.highlevel`` extension +# pack for the library. +class Services: + + actor_n: tractor._supervise.ActorNursery + service_n: trio.Nursery + debug_mode: bool # tractor sub-actor debug mode flag + service_tasks: dict[ + str, + tuple[ + trio.CancelScope, + tractor.Portal, + trio.Event, + ] + ] = {} + locks = defaultdict(trio.Lock) + + @classmethod + async def start_service_task( + self, + name: str, + portal: tractor.Portal, + target: Callable, + **kwargs, + + ) -> (trio.CancelScope, tractor.Context): + ''' + Open a context in a service sub-actor, add to a stack + that gets unwound at ``pikerd`` teardown. + + This allows for allocating long-running sub-services in our main + daemon and explicitly controlling their lifetimes. + + ''' + async def open_context_in_task( + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + trio.Event, + Any, + ] + ] = trio.TASK_STATUS_IGNORED, + + ) -> Any: + + with trio.CancelScope() as cs: + async with portal.open_context( + target, + **kwargs, + + ) as (ctx, first): + + # unblock once the remote context has started + complete = trio.Event() + task_status.started((cs, complete, first)) + log.info( + f'`pikerd` service {name} started with value {first}' + ) + try: + # wait on any context's return value + # and any final portal result from the + # sub-actor. + ctx_res = await ctx.result() + + # NOTE: blocks indefinitely until cancelled + # either by error from the target context + # function or by being cancelled here by the + # surrounding cancel scope. 
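# The `nursery.start()` handshake `Services.start_service_task` builds on,
# reduced to plain `trio`: the child hands its cancel scope and a "done"
# event back via `task_status.started()` so the starter can later cancel
# it and wait for teardown, mirroring `cancel_service()` below.
import trio

async def long_running(task_status=trio.TASK_STATUS_IGNORED):
    done = trio.Event()
    with trio.CancelScope() as cs:
        # unblock the caller only once we're actually "up"
        task_status.started((cs, done))
        try:
            await trio.sleep_forever()
        finally:
            done.set()

async def main():
    async with trio.open_nursery() as n:
        cs, done = await n.start(long_running)
        cs.cancel()        # request the service task stop
        await done.wait()  # wait for its teardown to complete

trio.run(main)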
+ return (await portal.result(), ctx_res) + + finally: + await portal.cancel_actor() + complete.set() + self.service_tasks.pop(name) + + cs, complete, first = await self.service_n.start(open_context_in_task) + + # store the cancel scope and portal for later cancellation or + # retstart if needed. + self.service_tasks[name] = (cs, portal, complete) + + return cs, first + + @classmethod + async def cancel_service( + self, + name: str, + + ) -> Any: + ''' + Cancel the service task and actor for the given ``name``. + + ''' + log.info(f'Cancelling `pikerd` service {name}') + cs, portal, complete = self.service_tasks[name] + cs.cancel() + await complete.wait() + assert name not in self.service_tasks, \ + f'Serice task for {name} not terminated?' diff --git a/piker/service/_registry.py b/piker/service/_registry.py new file mode 100644 index 00000000..f487e2a4 --- /dev/null +++ b/piker/service/_registry.py @@ -0,0 +1,144 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Inter-actor "discovery" (protocol) layer. + +""" +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, +) +from typing import ( + Any, +) + +import tractor + + +from ..log import ( + get_logger, +) + +log = get_logger(__name__) + +_default_registry_host: str = '127.0.0.1' +_default_registry_port: int = 6116 +_default_reg_addr: tuple[str, int] = ( + _default_registry_host, + _default_registry_port, +) + + +# NOTE: this value is set as an actor-global once the first endpoint +# who is capable, spawns a `pikerd` service tree. +_registry: Registry | None = None + + +class Registry: + addr: None | tuple[str, int] = None + + # TODO: table of uids to sockaddrs + peers: dict[ + tuple[str, str], + tuple[str, int], + ] = {} + + +_tractor_kwargs: dict[str, Any] = {} + + +@acm +async def open_registry( + addr: None | tuple[str, int] = None, + ensure_exists: bool = True, + +) -> tuple[str, int]: + + global _tractor_kwargs + actor = tractor.current_actor() + uid = actor.uid + if ( + Registry.addr is not None + and addr + ): + raise RuntimeError( + f'`{uid}` registry addr already bound @ {_registry.sockaddr}' + ) + + was_set: bool = False + + if ( + not tractor.is_root_process() + and Registry.addr is None + ): + Registry.addr = actor._arb_addr + + if ( + ensure_exists + and Registry.addr is None + ): + raise RuntimeError( + f"`{uid}` registry should already exist bug doesn't?" + ) + + if ( + Registry.addr is None + ): + was_set = True + Registry.addr = addr or _default_reg_addr + + _tractor_kwargs['arbiter_addr'] = Registry.addr + + try: + yield Registry.addr + finally: + # XXX: always clear the global addr if we set it so that the + # next (set of) calls will apply whatever new one is passed + # in. 
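# The "bind once, clear on exit" discipline `open_registry` applies to the
# class-level `Registry.addr`, as a standalone sketch (no tractor actor
# state involved); the default address mirrors the module constants above.
from contextlib import asynccontextmanager as acm
import trio

_default_reg_addr: tuple[str, int] = ('127.0.0.1', 6116)

class Registry:
    addr: tuple[str, int] | None = None

@acm
async def open_registry(addr: tuple[str, int] | None = None):
    was_set = False
    if Registry.addr is None:
        Registry.addr = addr or _default_reg_addr
        was_set = True
    try:
        yield Registry.addr
    finally:
        # only the caller that bound the addr clears it, so nested users
        # all observe the same value for the duration of the outer block
        if was_set:
            Registry.addr = None

async def main():
    async with open_registry() as reg_addr:
        async with open_registry(('10.0.0.1', 6116)) as inner:
            assert inner == reg_addr  # nested call sees the bound addr
    assert Registry.addr is None  # cleared by the outermost user

trio.run(main)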
+ if was_set: + Registry.addr = None + + +@acm +async def find_service( + service_name: str, +) -> tractor.Portal | None: + + async with open_registry() as reg_addr: + log.info(f'Scanning for service `{service_name}`') + # attach to existing daemon by name if possible + async with tractor.find_actor( + service_name, + arbiter_sockaddr=reg_addr, + ) as maybe_portal: + yield maybe_portal + + +async def check_for_service( + service_name: str, + +) -> None | tuple[str, int]: + ''' + Service daemon "liveness" predicate. + + ''' + async with open_registry(ensure_exists=False) as reg_addr: + async with tractor.query_actor( + service_name, + arbiter_sockaddr=reg_addr, + ) as sockaddr: + return sockaddr diff --git a/piker/data/elastic.py b/piker/service/elastic.py similarity index 68% rename from piker/data/elastic.py rename to piker/service/elastic.py index 43c6afd0..31221d57 100644 --- a/piker/data/elastic.py +++ b/piker/service/elastic.py @@ -15,17 +15,11 @@ # along with this program. If not, see . from __future__ import annotations -from contextlib import asynccontextmanager as acm -from pprint import pformat from typing import ( Any, TYPE_CHECKING, ) -import pyqtgraph as pg -import numpy as np -import tractor - if TYPE_CHECKING: import docker @@ -46,6 +40,9 @@ log = get_logger(__name__) _config = { 'port': 19200, 'log_level': 'debug', + + # hardcoded to our image version + 'version': '7.17.4', } @@ -65,14 +62,14 @@ def start_elasticsearch( -itd \ --rm \ --network=host \ - --mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \ + --mount type=bind,source="$(pwd)"/elastic,\ + target=/usr/share/elasticsearch/data \ --env "elastic_username=elastic" \ --env "elastic_password=password" \ --env "xpack.security.enabled=false" \ elastic ''' - import docker get_console_log('info', name=__name__) dcntr: DockerContainer = client.containers.run( @@ -83,27 +80,49 @@ def start_elasticsearch( remove=True ) - async def start_matcher(msg: str): + async def health_query(msg: str | None = None): + if ( + msg + and _config['version'] in msg + ): + return True + try: health = (await asks.get( - f'http://localhost:19200/_cat/health', + 'http://localhost:19200/_cat/health', params={'format': 'json'} )).json() + kog.info( + 'ElasticSearch cntr health:\n' + f'{health}' + ) except OSError: - log.error('couldnt reach elastic container') + log.exception('couldnt reach elastic container') return False log.info(health) return health[0]['status'] == 'green' - async def stop_matcher(msg: str): + async def chk_for_closed_msg(msg: str): return msg == 'closed' return ( dcntr, - {}, + { + # apparently we're REALLY tolerant of startup latency + # for CI XD + 'startup_timeout': 240.0, + + # XXX: decrease http poll period bc docker + # is shite at handling fast poll rates.. 
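# The supervisor-side startup check sketched in plain `trio`: poll a
# "health predicate" (like `health_query` above) every
# `startup_query_period` seconds until it passes or `startup_timeout`
# expires. The endpoint below is the elasticsearch one from this config
# and is only reachable once the container is actually running.
import trio
import asks

async def wait_until_healthy(
    url: str = 'http://localhost:19200/_cat/health',
    startup_timeout: float = 240.0,
    startup_query_period: float = 0.1,
) -> bool:
    with trio.move_on_after(startup_timeout):
        while True:
            try:
                health = (await asks.get(
                    url,
                    params={'format': 'json'},
                )).json()
                if health and health[0]['status'] == 'green':
                    return True
            except OSError:
                pass  # http server inside the container not up yet
            await trio.sleep(startup_query_period)
    return False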
+ 'startup_query_period': 0.1, + + 'log_msg_key': 'message', + + # 'started_afunc': health_query, + }, # expected startup and stop msgs - start_matcher, - stop_matcher, + health_query, + chk_for_closed_msg, ) diff --git a/piker/data/marketstore.py b/piker/service/marketstore.py similarity index 97% rename from piker/data/marketstore.py rename to piker/service/marketstore.py index 190667d6..5c4f90db 100644 --- a/piker/data/marketstore.py +++ b/piker/service/marketstore.py @@ -26,7 +26,6 @@ from __future__ import annotations from contextlib import asynccontextmanager as acm from datetime import datetime -from pprint import pformat from typing import ( Any, Optional, @@ -55,7 +54,7 @@ if TYPE_CHECKING: import docker from ._ahab import DockerContainer -from .feed import maybe_open_feed +from ..data.feed import maybe_open_feed from ..log import get_logger, get_console_log from .._profile import Profiler @@ -63,11 +62,12 @@ from .._profile import Profiler log = get_logger(__name__) -# container level config +# ahabd-supervisor and container level config _config = { 'grpc_listen_port': 5995, 'ws_listen_port': 5993, 'log_level': 'debug', + 'startup_timeout': 2, } _yaml_config = ''' @@ -135,7 +135,7 @@ def start_marketstore( # create dirs when dne if not os.path.isdir(config._config_dir): - Path(config._config_dir).mkdir(parents=True, exist_ok=True) + Path(config._config_dir).mkdir(parents=True, exist_ok=True) if not os.path.isdir(mktsdir): os.mkdir(mktsdir) @@ -185,7 +185,11 @@ def start_marketstore( config_dir_mnt, data_dir_mnt, ], + + # XXX: this must be set to allow backgrounding/non-blocking + # usage interaction with the container's process. detach=True, + # stop_signal='SIGINT', init=True, # remove=True, @@ -324,7 +328,7 @@ def quote_to_marketstore_structarray( @acm async def get_client( host: str = 'localhost', - port: int = 5995 + port: int = _config['grpc_listen_port'], ) -> MarketstoreClient: ''' @@ -510,7 +514,6 @@ class Storage: client = self.client syms = await client.list_symbols() - print(syms) if key not in syms: raise KeyError(f'`{key}` table key not found in\n{syms}?') @@ -627,10 +630,10 @@ async def open_storage_client( yield Storage(client) -async def tsdb_history_update( - fqsn: Optional[str] = None, - -) -> list[str]: +@acm +async def open_tsdb_client( + fqsn: str, +) -> Storage: # TODO: real-time dedicated task for ensuring # history consistency between the tsdb, shm and real-time feed.. @@ -659,7 +662,7 @@ async def tsdb_history_update( # - https://github.com/pikers/piker/issues/98 # profiler = Profiler( - disabled=False, # not pg_profile_enabled(), + disabled=True, # not pg_profile_enabled(), delayed=False, ) @@ -700,14 +703,10 @@ async def tsdb_history_update( # profiler('Finished db arrays diffs') - syms = await storage.client.list_symbols() - log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') - profiler(f'listed symbols {syms}') - - # TODO: ask if user wants to write history for detected - # available shm buffers? 
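# Client-side usage of the marketstore helpers touched above: connect on
# the grpc port from `_config` and issue a couple of cheap requests.
# Assumes a locally running marketstore instance (e.g. the ahab-supervised
# container this module starts).
import trio
from piker.service import marketstore

async def main():
    async with marketstore.get_client() as client:
        syms = await client.list_symbols()
        version = await client.server_version()
        print(f'marketstore {version}: {len(syms)} symbols')

trio.run(main)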
- from tractor.trionics import ipython_embed - await ipython_embed() + syms = await storage.client.list_symbols() + # log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') + # profiler(f'listed symbols {syms}') + yield storage # for array in [to_append, to_prepend]: # if array is None: diff --git a/piker/testing/__init__.py b/piker/testing/__init__.py deleted file mode 100644 index 5e3ac93a..00000000 --- a/piker/testing/__init__.py +++ /dev/null @@ -1 +0,0 @@ -TEST_CONFIG_DIR_PATH = '_testing' diff --git a/piker/ui/_app.py b/piker/ui/_app.py index 3be073e7..9978dbe3 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -24,7 +24,7 @@ from types import ModuleType from PyQt5.QtCore import QEvent import trio -from .._daemon import maybe_spawn_brokerd +from ..service import maybe_spawn_brokerd from . import _event from ._exec import run_qtractor from ..data.feed import install_brokerd_search diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index d8eabb70..19663cac 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -49,7 +49,7 @@ from qdarkstyle import DarkPalette import trio from outcome import Error -from .._daemon import ( +from ..service import ( maybe_open_pikerd, get_tractor_runtime_kwargs, ) diff --git a/piker/ui/cli.py b/piker/ui/cli.py index a72c2f5c..15b3e9f6 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -24,7 +24,7 @@ import tractor from ..cli import cli from .. import watchlists as wl -from .._daemon import maybe_spawn_brokerd +from ..service import maybe_spawn_brokerd _config_dir = click.get_app_dir('piker') @@ -181,9 +181,6 @@ def chart( 'debug_mode': pdb, 'loglevel': tractorloglevel, 'name': 'chart', - 'enable_modules': [ - 'piker.clearing._client' - ], 'registry_addr': config.get('registry_addr'), }, ) diff --git a/tests/conftest.py b/tests/conftest.py index 8218ec16..3a0afba2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,19 +1,18 @@ from contextlib import asynccontextmanager as acm from functools import partial +import logging import os -from typing import AsyncContextManager from pathlib import Path -from shutil import rmtree import pytest import tractor from piker import ( - # log, config, ) -from piker._daemon import ( +from piker.service import ( Services, ) +from piker.log import get_console_log from piker.clearing._client import open_ems @@ -70,8 +69,24 @@ def ci_env() -> bool: return _ci_env +@pytest.fixture() +def log( + request: pytest.FixtureRequest, + loglevel: str, +) -> logging.Logger: + ''' + Deliver a per-test-named ``piker.log`` instance. + + ''' + return get_console_log( + level=loglevel, + name=request.node.name, + ) + + @acm async def _open_test_pikerd( + tmpconfdir: str, reg_addr: tuple[str, int] | None = None, loglevel: str = 'warning', **kwargs, @@ -88,7 +103,7 @@ async def _open_test_pikerd( ''' import random - from piker._daemon import maybe_open_pikerd + from piker.service import maybe_open_pikerd if reg_addr is None: port = random.randint(6e3, 7e3) @@ -98,7 +113,17 @@ async def _open_test_pikerd( maybe_open_pikerd( registry_addr=reg_addr, loglevel=loglevel, + + tractor_runtime_overrides={ + 'piker_test_dir': tmpconfdir, + }, + + # tests may need to spawn containers dynamically + # or just in sequence per test, so we keep root. 
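# How a test consumes the fixture defined below: `open_test_pikerd` yields
# a partial which each test enters as an async context manager to get a
# throwaway `pikerd` tree bound to a per-test temp config dir. The test
# body here is an illustrative sketch only.
from typing import AsyncContextManager
import trio

def test_pikerd_boots(open_test_pikerd: AsyncContextManager):
    async def main():
        async with open_test_pikerd() as (
            _,  # host
            _,  # port
            pikerd_portal,
            services,
        ):
            assert services is not None

    trio.run(main)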
+ drop_root_perms_for_ahab=False, + **kwargs, + ) as service_manager, ): # this proc/actor is the pikerd @@ -120,18 +145,40 @@ async def _open_test_pikerd( @pytest.fixture def open_test_pikerd( - request, + request: pytest.FixtureRequest, + tmp_path: Path, loglevel: str, ): + tmpconfdir: Path = tmp_path / '_testing' + tmpconfdir.mkdir() + tmpconfdir_str: str = str(tmpconfdir) + + # NOTE: on linux the tmp config dir is generally located at: + # /tmp/pytest-of-/pytest-/test_/ + # the default `pytest` config ensures that only the last 4 test + # suite run's dirs will be persisted, otherwise they are removed: + # https://docs.pytest.org/en/6.2.x/tmpdir.html#the-default-base-temporary-directory + print(f'CURRENT TEST CONF DIR: {tmpconfdir}') yield partial( _open_test_pikerd, + # pass in a unique temp dir for this test request + # so that we can have multiple tests running (maybe in parallel) + # bwitout clobbering each other's config state. + tmpconfdir=tmpconfdir_str, + # bind in level from fixture, which is itself set by # `--ll ` cli flag. loglevel=loglevel, ) + # NOTE: the `tmp_dir` fixture will wipe any files older then 3 test + # sessions by default: + # https://docs.pytest.org/en/6.2.x/tmpdir.html#the-default-base-temporary-directory + # BUT, if we wanted to always wipe conf dir and all contained files, + # rmtree(str(tmp_path)) + # TODO: teardown checks such as, # - no leaked subprocs or shm buffers # - all requested container service are torn down @@ -151,8 +198,9 @@ async def _open_test_pikerd_and_ems( fqsn, mode=mode, loglevel=loglevel, - ) as ems_services): - yield (services, ems_services) + ) as ems_services, + ): + yield (services, ems_services) @pytest.fixture @@ -168,20 +216,4 @@ def open_test_pikerd_and_ems( mode, loglevel, open_test_pikerd - ) - - -@pytest.fixture(scope='module') -def delete_testing_dir(): - ''' - This fixture removes the temp directory - used for storing all config/ledger/pp data - created during testing sessions. During test runs - this file can be found in .config/piker/_testing - - ''' - yield - app_dir = Path(config.get_app_dir('piker')).resolve() - if app_dir.is_dir(): - rmtree(str(app_dir)) - assert not app_dir.is_dir() + ) diff --git a/tests/test_databases.py b/tests/test_databases.py index 4eb444f3..554b0990 100644 --- a/tests/test_databases.py +++ b/tests/test_databases.py @@ -1,66 +1,124 @@ -import pytest -import trio - from typing import AsyncContextManager +import logging -from piker._daemon import Services -from piker.log import get_logger +import trio +from elasticsearch import ( + Elasticsearch, + ConnectionError, +) + +from piker.service import marketstore +from piker.service import elastic -from elasticsearch import Elasticsearch -from piker.data import marketstore def test_marketstore_startup_and_version( open_test_pikerd: AsyncContextManager, - loglevel, + loglevel: str, ): + ''' + Verify marketstore tsdb starts up and we can + connect with a client to do basic API reqs. ''' - Verify marketstore starts correctly - - ''' - log = get_logger(__name__) - async def main(): - # port = 5995 async with ( open_test_pikerd( loglevel=loglevel, tsdb=True - ) as (s, i, pikerd_portal, services), - marketstore.get_client() as client + ) as ( + _, # host + _, # port + pikerd_portal, + services, + ), ): + # TODO: we should probably make this connection poll + # loop part of the `get_client()` implementation no? 
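# One way to factor the "reconnect until responsive" loop used in the
# marketstore test below into a helper, per the TODO above about folding
# it into `get_client()`. Purely a sketch: `open_client` is any acm
# factory and `list_symbols()` is just a cheap req-resp ping the
# marketstore client happens to expose.
from contextlib import asynccontextmanager as acm
import trio

@acm
async def connect_with_retries(
    open_client,
    attempts: int = 3,
    timeout: float = 1.0,
):
    for _ in range(attempts):
        async with open_client() as client:
            with trio.move_on_after(timeout) as cs:
                await client.list_symbols()
            if cs.cancelled_caught:
                continue  # connected too early; reconnect and retry
            yield client
            return
    raise TimeoutError(f'no response after {attempts} connect attempts')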
- assert ( - len(await client.server_version()) == - len('3862e9973da36cfc6004b88172c08f09269aaf01') - ) + # XXX NOTE: we use a retry-connect loop because it seems + # that if we connect *too fast* to a booting container + # instance (i.e. if mkts's IPC machinery isn't up early + # enough) the client will hang on req-resp submissions. So, + # instead we actually reconnect the client entirely in + # a loop until we get a response. + for _ in range(3): + # NOTE: default sockaddr is embedded within + async with marketstore.get_client() as client: + + with trio.move_on_after(1) as cs: + syms = await client.list_symbols() + + if cs.cancelled_caught: + continue + + # should be an empty db (for now) since we spawn + # marketstore in a ephemeral test-harness dir. + assert not syms + print(f'RX syms resp: {syms}') + + assert ( + len(await client.server_version()) == + len('3862e9973da36cfc6004b88172c08f09269aaf01') + ) + print('VERSION CHECKED') + + break # get out of retry-connect loop trio.run(main) def test_elasticsearch_startup_and_version( open_test_pikerd: AsyncContextManager, - loglevel, + loglevel: str, + log: logging.Logger, ): ''' - Verify elasticsearch starts correctly + Verify elasticsearch starts correctly (like at some point before + infinity time).. ''' - - log = get_logger(__name__) - async def main(): port = 19200 - async with open_test_pikerd( - loglevel=loglevel, - es=True - ) as (s, i, pikerd_portal, services): + async with ( + open_test_pikerd( + loglevel=loglevel, + es=True + ) as ( + _, # host + _, # port + pikerd_portal, + services, + ), + ): + # TODO: much like the above connect loop for mkts, we should + # probably make this sync start part of the + # ``open_client()`` implementation? + for i in range(240): + with Elasticsearch( + hosts=[f'http://localhost:{port}'] + ) as es: + try: - es = Elasticsearch(hosts=[f'http://localhost:{port}']) - assert es.info()['version']['number'] == '7.17.4' + resp = es.info() + assert ( + resp['version']['number'] + == + elastic._config['version'] + ) + print( + "OMG ELASTIX FINALLY CUKCING CONNECTED!>!>!\n" + f'resp: {resp}' + ) + break + except ConnectionError: + log.exception( + f'RETRYING client connection for {i} time!' + ) + await trio.sleep(1) + continue trio.run(main) diff --git a/tests/test_paper.py b/tests/test_paper.py index 8da1cf12..53e03f47 100644 --- a/tests/test_paper.py +++ b/tests/test_paper.py @@ -17,7 +17,6 @@ from functools import partial from piker.log import get_logger from piker.clearing._messages import Order from piker.pp import ( - open_trade_ledger, open_pps, ) @@ -42,18 +41,19 @@ async def _async_main( price: int = 30000, executions: int = 1, size: float = 0.01, + # Assert options assert_entries: bool = False, assert_pps: bool = False, assert_zeroed_pps: bool = False, assert_msg: bool = False, + ) -> None: ''' Start piker, place a trade and assert data in pps stream, ledger and position table. ''' - oid: str = '' last_msg = {} @@ -136,7 +136,7 @@ def _assert( def _run_test_and_check(fn): - ''' + ''' Close position and assert empty position in pps ''' @@ -150,8 +150,7 @@ def _run_test_and_check(fn): def test_buy( - open_test_pikerd_and_ems: AsyncContextManager, - delete_testing_dir + open_test_pikerd_and_ems: AsyncContextManager, ): ''' Enter a trade and assert entries are made in pps and ledger files. 
@@ -177,8 +176,7 @@ def test_buy( def test_sell( - open_test_pikerd_and_ems: AsyncContextManager, - delete_testing_dir + open_test_pikerd_and_ems: AsyncContextManager, ): ''' Sell position and ensure pps are zeroed. @@ -201,13 +199,13 @@ def test_sell( ), ) + def test_multi_sell( - open_test_pikerd_and_ems: AsyncContextManager, - delete_testing_dir + open_test_pikerd_and_ems: AsyncContextManager, ): ''' - Make 5 market limit buy orders and - then sell 5 slots at the same price. + Make 5 market limit buy orders and + then sell 5 slots at the same price. Finally, assert cleared positions. ''' diff --git a/tests/test_services.py b/tests/test_services.py index 763b438e..29e613e3 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -9,8 +9,7 @@ import pytest import trio import tractor -from piker.log import get_logger -from piker._daemon import ( +from piker.service import ( find_service, Services, )
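# A sketch of the kind of service-level test the trimmed-down
# `piker.service` imports above enable: boot a throwaway `pikerd` via the
# conftest fixture, then probe the registry for a daemon by name. The
# service name and assertion are illustrative only.
from typing import AsyncContextManager
import trio
from piker.service import find_service

def test_service_discovery(open_test_pikerd: AsyncContextManager):
    async def main():
        async with open_test_pikerd() as (_, _, _, services):
            async with find_service('brokerd.kraken') as portal:
                assert portal is None  # nothing has spawned it yet

    trio.run(main)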