Merge pull request #483 from pikers/service_subpkg

`.service` subpkg
storage_middleware_layer
goodboy 2023-03-10 10:37:46 -05:00 committed by GitHub
commit eb51033b18
29 changed files with 1530 additions and 1000 deletions


@@ -36,6 +36,7 @@ jobs:
   testing:
     name: 'install + test-suite'
+    timeout-minutes: 10
     runs-on: ubuntu-latest
     steps:


@@ -1,5 +1,5 @@
 # piker: trading gear for hackers.
-# Copyright 2020-eternity Tyler Goodlet (in stewardship for piker0)
+# Copyright 2020-eternity Tyler Goodlet (in stewardship for pikers)
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -14,11 +14,11 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
-"""
+'''
 piker: trading gear for hackers.
-"""
+'''
-from ._daemon import open_piker_runtime
+from .service import open_piker_runtime
 from .data.feed import open_feed
 __all__ = [
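
The only functional change in this file is the import root: `open_piker_runtime` now comes from the new `.service` subpkg. A minimal, hypothetical usage sketch of the (unchanged) top-level API, assuming a standard piker install:

import trio
from piker import open_piker_runtime

async def main():
    # boot (or sync with) the piker actor runtime in this process
    async with open_piker_runtime('my_script') as (actor, reg_addr):
        print(f'runtime up as {actor.uid} @ {reg_addr}')

trio.run(main)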


@@ -1,758 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Structured, daemon tree service management.
"""
from __future__ import annotations
import os
from typing import (
Optional,
Callable,
Any,
ClassVar,
)
from contextlib import (
asynccontextmanager as acm,
)
from collections import defaultdict
import tractor
import trio
from trio_typing import TaskStatus
from .log import (
get_logger,
get_console_log,
)
from .brokers import get_brokermod
from pprint import pformat
from functools import partial
log = get_logger(__name__)
_root_dname = 'pikerd'
_default_registry_host: str = '127.0.0.1'
_default_registry_port: int = 6116
_default_reg_addr: tuple[str, int] = (
_default_registry_host,
_default_registry_port,
)
# NOTE: this value is set as an actor-global once the first endpoint
# who is capable, spawns a `pikerd` service tree.
_registry: Registry | None = None
class Registry:
addr: None | tuple[str, int] = None
# TODO: table of uids to sockaddrs
peers: dict[
tuple[str, str],
tuple[str, int],
] = {}
_tractor_kwargs: dict[str, Any] = {}
@acm
async def open_registry(
addr: None | tuple[str, int] = None,
ensure_exists: bool = True,
) -> tuple[str, int]:
global _tractor_kwargs
actor = tractor.current_actor()
uid = actor.uid
if (
Registry.addr is not None
and addr
):
raise RuntimeError(
f'`{uid}` registry addr already bound @ {Registry.addr}'
)
was_set: bool = False
if (
not tractor.is_root_process()
and Registry.addr is None
):
Registry.addr = actor._arb_addr
if (
ensure_exists
and Registry.addr is None
):
raise RuntimeError(
f"`{uid}` registry should already exist bug doesn't?"
)
if (
Registry.addr is None
):
was_set = True
Registry.addr = addr or _default_reg_addr
_tractor_kwargs['arbiter_addr'] = Registry.addr
try:
yield Registry.addr
finally:
# XXX: always clear the global addr if we set it so that the
# next (set of) calls will apply whatever new one is passed
# in.
if was_set:
Registry.addr = None
def get_tractor_runtime_kwargs() -> dict[str, Any]:
'''
Deliver ``tractor`` related runtime variables in a `dict`.
'''
return _tractor_kwargs
_root_modules = [
__name__,
'piker.clearing._ems',
'piker.clearing._client',
'piker.data._sampling',
]
# TODO: factor this into a ``tractor.highlevel`` extension
# pack for the library.
class Services:
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
tractor.Portal,
trio.Event,
]
] = {}
locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task(
self,
name: str,
portal: tractor.Portal,
target: Callable,
**kwargs,
) -> (trio.CancelScope, tractor.Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
This allows for allocating long-running sub-services in our main
daemon and explicitly controlling their lifetimes.
'''
async def open_context_in_task(
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
Any,
]
] = trio.TASK_STATUS_IGNORED,
) -> Any:
with trio.CancelScope() as cs:
async with portal.open_context(
target,
**kwargs,
) as (ctx, first):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((cs, complete, first))
log.info(
f'`pikerd` service {name} started with value {first}'
)
try:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res = await ctx.result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (await portal.result(), ctx_res)
finally:
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, complete, first = await self.service_n.start(open_context_in_task)
# store the cancel scope and portal for later cancellation or
# restart if needed.
self.service_tasks[name] = (cs, portal, complete)
return cs, first
@classmethod
async def cancel_service(
self,
name: str,
) -> Any:
'''
Cancel the service task and actor for the given ``name``.
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, portal, complete = self.service_tasks[name]
cs.cancel()
await complete.wait()
assert name not in self.service_tasks, \
f'Service task for {name} not terminated?'
@acm
async def open_piker_runtime(
name: str,
enable_modules: list[str] = [],
loglevel: Optional[str] = None,
# XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
# TODO: once we have `rsyscall` support we will read a config
# and spawn the service tree distributed per that.
start_method: str = 'trio',
**tractor_kwargs,
) -> tuple[
tractor.Actor,
tuple[str, int],
]:
'''
Start a piker actor whose runtime will automatically sync with
existing piker actors on the local link based on configuration.
Can be called from a subactor or any program that needs to start
a root actor.
'''
try:
# check for existing runtime
actor = tractor.current_actor().uid
except tractor._exceptions.NoRuntime:
registry_addr = registry_addr or _default_reg_addr
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more than permissions to
# spawn other specialized daemons I think?
enable_modules=enable_modules,
**tractor_kwargs,
) as _,
open_registry(registry_addr, ensure_exists=False) as addr,
):
yield (
tractor.current_actor(),
addr,
)
else:
async with open_registry(registry_addr) as addr:
yield (
actor,
addr,
)
@acm
async def open_pikerd(
loglevel: str | None = None,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
# db init flags
tsdb: bool = False,
es: bool = False,
) -> Services:
'''
Start a root piker daemon whose lifetime extends indefinitely until
cancelled.
A root actor nursery is created which can be used to create and keep
alive underlying services (see below).
'''
async with (
open_piker_runtime(
name=_root_dname,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
loglevel=loglevel,
debug_mode=debug_mode,
registry_addr=registry_addr,
) as (root_actor, reg_addr),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
):
assert root_actor.accept_addr == reg_addr
if tsdb:
from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
start_ahab,
'marketstored',
start_marketstore,
)
log.info(
f'`marketstored` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
if es:
from piker.data._ahab import start_ahab
from piker.data.elastic import start_elasticsearch
log.info('Spawning `elasticsearch` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
partial(
start_ahab,
'elasticsearch',
start_elasticsearch,
start_timeout=240.0 # high cause ci
)
)
log.info(
f'`elasticsearch` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
@acm
async def maybe_open_runtime(
loglevel: Optional[str] = None,
**kwargs,
) -> None:
'''
Start the ``tractor`` runtime (a root actor) if none exists.
'''
name = kwargs.pop('name')
if not tractor.current_actor(err_on_no_runtime=False):
async with open_piker_runtime(
name,
loglevel=loglevel,
**kwargs,
) as (_, addr):
yield addr,
else:
async with open_registry() as addr:
yield addr
@acm
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
tsdb: bool = False,
es: bool = False,
**kwargs,
) -> tractor._portal.Portal | ClassVar[Services]:
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
though).
'''
if loglevel:
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
query_name = kwargs.pop('name', f'piker_query_{os.getpid()}')
# TODO: if we need to make the query part faster we could not init
# an actor runtime and instead just hit the socket?
# from tractor._ipc import _connect_chan, Channel
# async with _connect_chan(host, port) as chan:
# async with open_portal(chan) as arb_portal:
# yield arb_portal
async with (
open_piker_runtime(
name=query_name,
registry_addr=registry_addr,
loglevel=loglevel,
**kwargs,
) as _,
tractor.find_actor(
_root_dname,
arbiter_sockaddr=registry_addr,
) as portal
):
# connect to any existing daemon presuming
# its registry socket was selected.
if (
portal is not None
):
yield portal
return
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
tsdb=tsdb,
es=es,
) as service_manager:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
assert service_manager
yield service_manager
# `brokerd` enabled modules
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]
@acm
async def find_service(
service_name: str,
) -> tractor.Portal | None:
async with open_registry() as reg_addr:
log.info(f'Scanning for service `{service_name}`')
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
arbiter_sockaddr=reg_addr,
) as maybe_portal:
yield maybe_portal
async def check_for_service(
service_name: str,
) -> None | tuple[str, int]:
'''
Service daemon "liveness" predicate.
'''
async with open_registry(ensure_exists=False) as reg_addr:
async with tractor.query_actor(
service_name,
arbiter_sockaddr=reg_addr,
) as sockaddr:
return sockaddr
@acm
async def maybe_spawn_daemon(
service_name: str,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
singleton: bool = False,
**kwargs,
) -> tractor.Portal:
'''
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
If this function is called from a non-pikerd actor, the
spawned service will persist as long as pikerd does or
it is requested to be cancelled.
This can be seen as a service starting api for remote-actor
clients.
'''
if loglevel:
get_console_log(loglevel)
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Services.locks[service_name]
await lock.acquire()
async with find_service(service_name) as portal:
if portal is not None:
lock.release()
yield portal
return
log.warning(f"Couldn't find any existing {service_name}")
# TODO: really shouldn't the actor spawning be part of the service
# starting method `Services.start_service()` ?
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**kwargs,
) as pikerd_portal:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
started: bool
if pikerd_portal is None:
started = await service_task_target(**spawn_args)
else:
# tell the remote `pikerd` to start the target,
# the target can't return a non-serializable value
# since it is expected that service starting is
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting its start
# disconnects.
started = await pikerd_portal.run(
service_task_target,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
async def spawn_brokerd(
brokername: str,
loglevel: Optional[str] = None,
**tractor_kwargs,
) -> bool:
log.info(f'Spawning {brokername} broker daemon')
brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'
extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
tractor_kwargs.update(extra_tractor_kwargs)
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
modpath = brokermod.__name__
broker_enable = [modpath]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
):
subpath = f'{modpath}.{submodname}'
broker_enable.append(subpath)
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + broker_enable,
loglevel=loglevel,
debug_mode=Services.debug_mode,
**tractor_kwargs
)
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
await Services.start_service_task(
dname,
portal,
_setup_persistent_brokerd,
brokername=brokername,
)
return True
@acm
async def maybe_spawn_brokerd(
brokername: str,
loglevel: Optional[str] = None,
**kwargs,
) -> tractor.Portal:
'''
Helper to spawn a brokerd service *from* a client
who wishes to use the sub-actor-daemon.
'''
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as portal:
yield portal
async def spawn_emsd(
loglevel: Optional[str] = None,
**extra_tractor_kwargs
) -> bool:
"""
Start the clearing engine under ``pikerd``.
"""
log.info('Spawning emsd')
portal = await Services.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
await Services.start_service_task(
'emsd',
portal,
_setup_persistent_emsd,
)
return True
@acm
async def maybe_open_emsd(
brokername: str,
loglevel: Optional[str] = None,
**kwargs,
) -> tractor._portal.Portal: # noqa
async with maybe_spawn_daemon(
'emsd',
service_task_target=spawn_emsd,
spawn_args={'loglevel': loglevel},
loglevel=loglevel,
**kwargs,
) as portal:
yield portal
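
All of the above machinery is relocated (not rewritten) into the new `piker/service/` subpkg added below; the public names survive the move. A sketch of probing for a live `pikerd` via the relocated discovery helper, assuming the new import path:

import trio
from piker.service import (
    open_piker_runtime,
    check_for_service,
)

async def main():
    # a runtime must be up for registry queries to work
    async with open_piker_runtime('svc_probe'):
        sockaddr = await check_for_service('pikerd')
        print(f'pikerd: {sockaddr or "not found"}')

trio.run(main)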


@@ -29,8 +29,15 @@ import tractor
 from ..cli import cli
 from .. import watchlists as wl
 from ..log import get_console_log, colorize_json, get_logger
-from .._daemon import maybe_spawn_brokerd, maybe_open_pikerd
-from ..brokers import core, get_brokermod, data
+from ..service import (
+    maybe_spawn_brokerd,
+    maybe_open_pikerd,
+)
+from ..brokers import (
+    core,
+    get_brokermod,
+    data,
+)
 log = get_logger('cli')
 DEFAULT_BROKER = 'questrade'

@@ -60,6 +67,7 @@ def get_method(client, meth_name: str):
     print_ok('found!.')
     return method
+
 async def run_method(client, meth_name: str, **kwargs):
     method = get_method(client, meth_name)
     print('running...', end='', flush=True)

@@ -67,19 +75,20 @@ async def run_method(client, meth_name: str, **kwargs):
     print_ok(f'done! result: {type(result)}')
     return result
+
 async def run_test(broker_name: str):
     brokermod = get_brokermod(broker_name)
     total = 0
     passed = 0
     failed = 0
-    print(f'getting client...', end='', flush=True)
+    print('getting client...', end='', flush=True)
     if not hasattr(brokermod, 'get_client'):
         print_error('fail! no \'get_client\' context manager found.')
         return
     async with brokermod.get_client(is_brokercheck=True) as client:
-        print_ok(f'done! inside client context.')
+        print_ok('done! inside client context.')
         # check for methods present on brokermod
         method_list = [

@@ -130,7 +139,6 @@ async def run_test(broker_name: str):
     total += 1
-
     # check for methods present on brokermod.Client and their
     # results

@@ -180,7 +188,6 @@ def brokercheck(config, broker):
     trio.run(run_test, broker)
-
 @cli.command()
 @click.option('--keys', '-k', multiple=True,
               help='Return results only for these keys')

@@ -335,8 +342,6 @@ def contracts(ctx, loglevel, broker, symbol, ids):
     brokermod = get_brokermod(broker)
     get_console_log(loglevel)
-
     contracts = trio.run(partial(core.contracts, brokermod, symbol))
-
     if not ids:
         # just print out expiry dates which can be used with


@@ -28,7 +28,7 @@ import trio
 from ..log import get_logger
 from . import get_brokermod
-from .._daemon import maybe_spawn_brokerd
+from ..service import maybe_spawn_brokerd
 from .._cacheables import open_cached_client


@@ -177,8 +177,11 @@ def i3ipc_xdotool_manual_click_hack() -> None:
     )
     # re-activate and focus original window
+    try:
         subprocess.call([
             'xdotool',
             'windowactivate', '--sync', str(orig_win_id),
             'click', '--window', str(orig_win_id), '1',
         ])
+    except subprocess.TimeoutExpired:
+        log.exception('xdotool timed out?')


@@ -29,8 +29,11 @@ from tractor.trionics import broadcast_receiver
 from ..log import get_logger
 from ..data.types import Struct
-from .._daemon import maybe_open_emsd
-from ._messages import Order, Cancel
+from ..service import maybe_open_emsd
+from ._messages import (
+    Order,
+    Cancel,
+)
 from ..brokers import get_brokermod

 if TYPE_CHECKING:


@@ -19,16 +19,18 @@ CLI commons.
 '''
 import os
-from pprint import pformat
-from functools import partial

 import click
 import trio
 import tractor

-from ..log import get_console_log, get_logger, colorize_json
+from ..log import (
+    get_console_log,
+    get_logger,
+    colorize_json,
+)
 from ..brokers import get_brokermod
-from .._daemon import (
+from ..service import (
     _default_registry_host,
     _default_registry_port,
 )

@@ -68,7 +70,7 @@ def pikerd(
     '''
-    from .._daemon import open_pikerd
+    from ..service import open_pikerd
     log = get_console_log(loglevel)

     if pdb:

@@ -171,7 +173,7 @@ def cli(
 @click.pass_obj
 def services(config, tl, ports):
-    from .._daemon import (
+    from ..service import (
         open_piker_runtime,
         _default_registry_port,
         _default_registry_host,

@@ -204,8 +206,8 @@ def services(config, tl, ports):
 def _load_clis() -> None:
-    from ..data import marketstore  # noqa
-    from ..data import elastic
+    from ..service import marketstore  # noqa
+    from ..service import elastic
     from ..data import cli  # noqa
     from ..brokers import cli  # noqa
     from ..ui import cli  # noqa


@@ -15,7 +15,7 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 """
-Broker configuration mgmt.
+Platform configuration (files) mgmt.
 """
 import platform

@@ -26,17 +26,25 @@ from os.path import dirname
 import shutil
 from typing import Optional
+from pathlib import Path

 from bidict import bidict
 import toml

-from piker.testing import TEST_CONFIG_DIR_PATH
 from .log import get_logger

 log = get_logger('broker-config')

-# taken from ``click`` since apparently they have some
+# XXX NOTE: taken from ``click`` since apparently they have some
 # super weirdness with sigint and sudo..no clue
-def get_app_dir(app_name, roaming=True, force_posix=False):
+# we're probably going to slowly just modify it to our own version over
+# time..
+def get_app_dir(
+    app_name: str,
+    roaming: bool = True,
+    force_posix: bool = False,
+) -> str:
     r"""Returns the config folder for the application. The default behavior
     is to return whatever is most appropriate for the operating system.

@@ -75,14 +83,30 @@ def get_app_dir(app_name, roaming=True, force_posix=False):
     def _posixify(name):
         return "-".join(name.split()).lower()

-    # TODO: This is a hacky way to a) determine we're testing
-    # and b) creating a test dir. We should aim to set a variable
-    # within the tractor runtimes and store testing config data
-    # outside of the users filesystem
+    # NOTE: for testing with `pytest` we leverage the `tmp_dir`
+    # fixture to generate (and clean up) a test-request-specific
+    # directory for isolated configuration files such that,
+    # - multiple tests can run (possibly in parallel) without data races
+    #   on the config state,
+    # - we don't need to ever worry about leaking configs into the
+    #   system thus avoiding needing to manage config cleanup fixtures or
+    #   other bothers (since obviously `tmp_dir` cleans up after itself).
+    #
+    # In order to "pass down" the test dir path to all (sub-)actors in
+    # the actor tree we preload the root actor's runtime vars state (an
+    # internal mechanism for inheriting state down an actor tree in
+    # `tractor`) with the testing dir and check for it whenever we
+    # detect `pytest` is being used (which it isn't under normal
+    # operation).
     if "pytest" in sys.modules:
-        app_name = os.path.join(app_name, TEST_CONFIG_DIR_PATH)
+        import tractor
+        actor = tractor.current_actor(err_on_no_runtime=False)
+        if actor:  # runtime is up
+            rvs = tractor._state._runtime_vars
+            testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
+            assert testdirpath.exists(), 'piker test harness might be borked!?'
+            app_name = str(testdirpath)

+    # if WIN:
     if platform.system() == 'Windows':
         key = "APPDATA" if roaming else "LOCALAPPDATA"
         folder = os.environ.get(key)
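
For reference, a sketch of the seeding side implied by the comment block above: `open_piker_runtime()` (see `_actor_runtime.py` further down) takes a `tractor_runtime_overrides: dict` which it stores as `_runtime_vars['piker_vars']`, so a test harness might preload the dir roughly like this (the wiring here is hypothetical, only the `piker_test_dir` key is per this diff):

import trio
from piker.service import open_piker_runtime

async def main(tmp_dir: str):
    async with open_piker_runtime(
        'test_root',
        # lands in tractor._state._runtime_vars['piker_vars'] and is
        # inherited by every sub-actor spawned in the tree
        tractor_runtime_overrides={'piker_test_dir': tmp_dir},
    ):
        from piker import config
        # resolves under `tmp_dir` only when run via pytest
        print(config.get_app_dir('piker'))

trio.run(main, '/tmp/piker-test')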


@@ -42,7 +42,7 @@ from ..log import (
     get_logger,
     get_console_log,
 )
-from .._daemon import maybe_spawn_daemon
+from ..service import maybe_spawn_daemon

 if TYPE_CHECKING:
     from ._sharedmem import (

@@ -68,8 +68,8 @@ class Sampler:
     This non-instantiated type is meant to be a singleton within
     a `samplerd` actor-service spawned once by the user wishing to
-    time-step sample real-time quote feeds, see
-    ``._daemon.maybe_open_samplerd()`` and the below
+    time-step-sample (real-time) quote feeds, see
+    ``.service.maybe_open_samplerd()`` and the below
     ``register_with_sampler()``.

     '''

@@ -379,7 +379,7 @@ async def spawn_samplerd(
     update and increment count write and stream broadcasting.

     '''
-    from piker._daemon import Services
+    from piker.service import Services

     dname = 'samplerd'
     log.info(f'Spawning `{dname}`')


@@ -18,31 +18,22 @@
 marketstore cli.

 """
-from functools import partial
-from pprint import pformat

-from anyio_marketstore import open_marketstore_client
 import trio
 import tractor
 import click
-import numpy as np

-from .marketstore import (
-    get_client,
+from ..service.marketstore import (
+    # get_client,
     # stream_quotes,
     ingest_quote_stream,
     # _url,
-    _tick_tbk_ids,
-    mk_tbk,
+    # _tick_tbk_ids,
+    # mk_tbk,
 )
 from ..cli import cli
 from .. import watchlists as wl
-from ..log import get_logger
-from ._sharedmem import (
-    maybe_open_shm_array,
-)
-from ._source import (
-    base_iohlc_dtype,
+from ..log import (
+    get_logger,
 )
@@ -113,15 +104,11 @@ def ms_stream(
 @cli.command()
 @click.option(
-    '--tl',
-    is_flag=True,
-    help='Enable tractor logging')
-@click.option(
-    '--host',
+    '--tsdb_host',
     default='localhost'
 )
 @click.option(
-    '--port',
+    '--tsdb_port',
     default=5993
 )
 @click.argument('symbols', nargs=-1)
@@ -137,18 +124,93 @@ def storesh(
     Start an IPython shell ready to query the local marketstore db.

     '''
-    from piker.data.marketstore import tsdb_history_update
-    from piker._daemon import open_piker_runtime
+    from piker.data.marketstore import open_tsdb_client
+    from piker.service import open_piker_runtime

     async def main():
         nonlocal symbols

         async with open_piker_runtime(
             'storesh',
-            enable_modules=['piker.data._ahab'],
+            enable_modules=['piker.service._ahab'],
         ):
             symbol = symbols[0]
-            await tsdb_history_update(symbol)
+
+            async with open_tsdb_client(symbol):
+                # TODO: ask if user wants to write history for detected
+                # available shm buffers?
+                from tractor.trionics import ipython_embed
+                await ipython_embed()
+
+    trio.run(main)
+
+
+@cli.command()
+@click.option(
+    '--host',
+    default='localhost'
+)
+@click.option(
+    '--port',
+    default=5993
+)
+@click.option(
+    '--delete',
+    '-d',
+    is_flag=True,
+    help='Delete history (1 Min) for symbol(s)',
+)
+@click.argument('symbols', nargs=-1)
+@click.pass_obj
+def storage(
+    config,
+    host,
+    port,
+    symbols: list[str],
+    delete: bool,
+):
+    '''
+    Query and manage the local ``marketstore`` tsdb (e.g. delete
+    time series history for symbols).
+
+    '''
+    from piker.service.marketstore import open_tsdb_client
+    from piker.service import open_piker_runtime
+
+    async def main():
+        nonlocal symbols
+
+        async with open_piker_runtime(
+            'tsdb_storage',
+            enable_modules=['piker.service._ahab'],
+        ):
+            symbol = symbols[0]
+            async with open_tsdb_client(symbol) as storage:
+                if delete:
+                    for fqsn in symbols:
+                        syms = await storage.client.list_symbols()
+
+                        resp60s = await storage.delete_ts(fqsn, 60)
+                        msgish = resp60s.ListFields()[0][1]
+                        if 'error' in str(msgish):
+                            # TODO: MEGA LOL, apparently the symbols don't
+                            # flush out until you refresh something or other
+                            # (maybe the WALFILE)... #lelandorlulzone, classic
+                            # alpaca(Rtm) design here ..
+                            # well, if we ever can make this work we
+                            # probably want to dogsplain the real reason
+                            # for the delete errurz..llululu
+                            if fqsn not in syms:
+                                log.error(f'Pair {fqsn} dne in DB')
+
+                            log.error(f'Deletion error: {fqsn}\n{msgish}')
+
+                        resp1s = await storage.delete_ts(fqsn, 1)
+                        msgish = resp1s.ListFields()[0][1]
+                        if 'error' in str(msgish):
+                            log.error(f'Deletion error: {fqsn}\n{msgish}')

     trio.run(main)


@@ -58,7 +58,7 @@ from ..log import (
     get_logger,
     get_console_log,
 )
-from .._daemon import (
+from ..service import (
     maybe_spawn_brokerd,
     check_for_service,
 )

@@ -86,7 +86,7 @@ from ..brokers._util import (
 )

 if TYPE_CHECKING:
-    from .marketstore import Storage
+    from ..service.marketstore import Storage

 log = get_logger(__name__)

@@ -865,7 +865,7 @@ async def manage_history(
 ):
     log.info('Found existing `marketstored`')

-    from . import marketstore
+    from ..service import marketstore
     async with (
         marketstore.open_storage_client(fqsn)as storage,
     ):


@@ -21,7 +21,11 @@ import logging
 import json

 import tractor
-from pygments import highlight, lexers, formatters
+from pygments import (
+    highlight,
+    lexers,
+    formatters,
+)

 # Makes it so we only see the full module name when using ``__name__``
 # without the extra "piker." prefix.

@@ -32,26 +36,48 @@ def get_logger(
     name: str = None,
 ) -> logging.Logger:
-    '''Return the package log or a sub-log for `name` if provided.
+    '''
+    Return the package log or a sub-log for `name` if provided.

     '''
     return tractor.log.get_logger(name=name, _root_name=_proj_name)


-def get_console_log(level: str = None, name: str = None) -> logging.Logger:
-    '''Get the package logger and enable a handler which writes to stderr.
+def get_console_log(
+    level: str | None = None,
+    name: str | None = None,
+) -> logging.Logger:
+    '''
+    Get the package logger and enable a handler which writes to stderr.

     Yeah yeah, i know we can use ``DictConfig``. You do it...

     '''
     return tractor.log.get_console_log(
-        level, name=name, _root_name=_proj_name)  # our root logger
+        level,
+        name=name,
+        _root_name=_proj_name,
+    )  # our root logger


-def colorize_json(data, style='algol_nu'):
-    """Colorize json output using ``pygments``.
-    """
-    formatted_json = json.dumps(data, sort_keys=True, indent=4)
+def colorize_json(
+    data: dict,
+    style='algol_nu',
+):
+    '''
+    Colorize json output using ``pygments``.

+    '''
+    formatted_json = json.dumps(
+        data,
+        sort_keys=True,
+        indent=4,
+    )
     return highlight(
-        formatted_json, lexers.JsonLexer(),
+        formatted_json,
+        lexers.JsonLexer(),

         # likeable styles: algol_nu, tango, monokai
         formatters.TerminalTrueColorFormatter(style=style)
     )
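
Call signatures are unchanged by the reformat; a tiny usage sketch of both helpers:

from piker.log import (
    get_console_log,
    colorize_json,
)

# enable stderr log output at the given level for this actor
log = get_console_log('info', name=__name__)
log.info('console logging up')

# pretty-print a dict as pygments-colorized json
print(colorize_json({'sym': 'btcusdt', 'last': 27000.5}))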


@@ -0,0 +1,60 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Actor-runtime service orchestration machinery.
"""
from __future__ import annotations
from ._mngr import Services
from ._registry import ( # noqa
_tractor_kwargs,
_default_reg_addr,
_default_registry_host,
_default_registry_port,
open_registry,
find_service,
check_for_service,
)
from ._daemon import ( # noqa
maybe_spawn_daemon,
spawn_brokerd,
maybe_spawn_brokerd,
spawn_emsd,
maybe_open_emsd,
)
from ._actor_runtime import (
open_piker_runtime,
maybe_open_pikerd,
open_pikerd,
get_tractor_runtime_kwargs,
)
__all__ = [
'check_for_service',
'Services',
'maybe_spawn_daemon',
'spawn_brokerd',
'maybe_spawn_brokerd',
'spawn_emsd',
'maybe_open_emsd',
'open_piker_runtime',
'maybe_open_pikerd',
'open_pikerd',
'get_tractor_runtime_kwargs',
]
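
This flat namespace is what every other diff in this PR imports against. A sketch of the typical client bootstrap, assuming the API behaves as it did in `_daemon.py`:

import trio
from piker.service import maybe_open_pikerd

async def main():
    # yields a portal to an existing `pikerd`, or, if none is
    # found, becomes the root daemon and yields the `Services` mngr
    async with maybe_open_pikerd(loglevel='info') as portal_or_services:
        print(type(portal_or_services))

trio.run(main)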


@@ -0,0 +1,347 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
``tractor`` wrapping + default config to bootstrap the `pikerd`.
"""
from __future__ import annotations
from pprint import pformat
from functools import partial
import os
from typing import (
Optional,
Any,
ClassVar,
)
from contextlib import (
asynccontextmanager as acm,
)
import tractor
import trio
from ..log import (
get_logger,
get_console_log,
)
from ._mngr import (
Services,
)
from ._registry import ( # noqa
_tractor_kwargs,
_default_reg_addr,
open_registry,
)
log = get_logger(__name__)
def get_tractor_runtime_kwargs() -> dict[str, Any]:
'''
Deliver ``tractor`` related runtime variables in a `dict`.
'''
return _tractor_kwargs
@acm
async def open_piker_runtime(
name: str,
enable_modules: list[str] = [],
loglevel: Optional[str] = None,
# XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
# TODO: once we have `rsyscall` support we will read a config
# and spawn the service tree distributed per that.
start_method: str = 'trio',
tractor_runtime_overrides: dict | None = None,
**tractor_kwargs,
) -> tuple[
tractor.Actor,
tuple[str, int],
]:
'''
Start a piker actor whose runtime will automatically sync with
existing piker actors on the local link based on configuration.
Can be called from a subactor or any program that needs to start
a root actor.
'''
try:
# check for existing runtime
actor = tractor.current_actor().uid
except tractor._exceptions.NoRuntime:
tractor._state._runtime_vars[
'piker_vars'] = tractor_runtime_overrides
registry_addr = registry_addr or _default_reg_addr
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
arbiter_addr=registry_addr,
name=name,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more than permissions to
# spawn other specialized daemons I think?
enable_modules=enable_modules,
**tractor_kwargs,
) as _,
open_registry(registry_addr, ensure_exists=False) as addr,
):
yield (
tractor.current_actor(),
addr,
)
else:
async with open_registry(registry_addr) as addr:
yield (
actor,
addr,
)
_root_dname = 'pikerd'
_root_modules = [
__name__,
'piker.service._daemon',
'piker.clearing._ems',
'piker.clearing._client',
'piker.data._sampling',
]
@acm
async def open_pikerd(
loglevel: str | None = None,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,
# db init flags
tsdb: bool = False,
es: bool = False,
drop_root_perms_for_ahab: bool = True,
**kwargs,
) -> Services:
'''
Start a root piker daemon with an indefinite lifetime.
A root actor nursery is created which can be used to create and keep
alive underlying services (see below).
'''
async with (
open_piker_runtime(
name=_root_dname,
# TODO: eventually we should be able to avoid
# having the root have more than permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
loglevel=loglevel,
debug_mode=debug_mode,
registry_addr=registry_addr,
**kwargs,
) as (root_actor, reg_addr),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
):
if root_actor.accept_addr != reg_addr:
raise RuntimeError(f'Daemon failed to bind on {reg_addr}!?')
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
if tsdb:
from ._ahab import start_ahab
from .marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
partial(
start_ahab,
'marketstored',
start_marketstore,
loglevel=loglevel,
drop_root_perms=drop_root_perms_for_ahab,
)
)
log.info(
f'`marketstored` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
if es:
from ._ahab import start_ahab
from .elastic import start_elasticsearch
log.info('Spawning `elasticsearch` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
partial(
start_ahab,
'elasticsearch',
start_elasticsearch,
loglevel=loglevel,
drop_root_perms=drop_root_perms_for_ahab,
)
)
log.info(
f'`elasticsearch` up!\n'
f'pid: {pid}\n'
f'container id: {cid[:12]}\n'
f'config: {pformat(config)}'
)
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
# TODO: do we even need this?
# @acm
# async def maybe_open_runtime(
# loglevel: Optional[str] = None,
# **kwargs,
# ) -> None:
# '''
# Start the ``tractor`` runtime (a root actor) if none exists.
# '''
# name = kwargs.pop('name')
# if not tractor.current_actor(err_on_no_runtime=False):
# async with open_piker_runtime(
# name,
# loglevel=loglevel,
# **kwargs,
# ) as (_, addr):
# yield addr,
# else:
# async with open_registry() as addr:
# yield addr
@acm
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
tsdb: bool = False,
es: bool = False,
drop_root_perms_for_ahab: bool = True,
**kwargs,
) -> tractor._portal.Portal | ClassVar[Services]:
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
though).
'''
if loglevel:
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
query_name = kwargs.pop(
'name',
f'piker_query_{os.getpid()}',
)
# TODO: if we need to make the query part faster we could not init
# an actor runtime and instead just hit the socket?
# from tractor._ipc import _connect_chan, Channel
# async with _connect_chan(host, port) as chan:
# async with open_portal(chan) as arb_portal:
# yield arb_portal
async with (
open_piker_runtime(
name=query_name,
registry_addr=registry_addr,
loglevel=loglevel,
**kwargs,
) as _,
tractor.find_actor(
_root_dname,
arbiter_sockaddr=registry_addr,
) as portal
):
# connect to any existing daemon presuming
# its registry socket was selected.
if (
portal is not None
):
yield portal
return
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
loglevel=loglevel,
registry_addr=registry_addr,
# ahabd (docker super) specific controls
tsdb=tsdb,
es=es,
drop_root_perms_for_ahab=drop_root_perms_for_ahab,
# passthrough to ``tractor`` init
**kwargs,
) as service_manager:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
# we return no portal to self.
assert service_manager
yield service_manager
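
A sketch of running the root daemon directly with the new db-init and ahab flags (values shown are defaults or illustrative):

import trio
from piker.service import open_pikerd

async def main():
    async with open_pikerd(
        loglevel='info',
        tsdb=True,   # also spawn the `marketstored` docker supervisor
        es=False,    # skip `elasticsearch`
        drop_root_perms_for_ahab=True,
    ) as services:
        # serve until cancelled; service tasks are reaped on exit
        await trio.sleep_forever()

trio.run(main)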


@@ -15,9 +15,12 @@
 # along with this program. If not, see <https://www.gnu.org/licenses/>.

 '''
-Supervisor for docker with included specific-image service helpers.
+Supervisor for ``docker`` with included async and SC wrapping
+to ensure a cancellable container lifetime system.

 '''
+from collections import ChainMap
+from functools import partial
 import os
 import time
 from typing import (

@@ -45,7 +48,10 @@ from requests.exceptions import (
     ReadTimeout,
 )

-from ..log import get_logger, get_console_log
+from ..log import (
+    get_logger,
+    get_console_log,
+)
 from .. import config

 log = get_logger(__name__)

@@ -124,10 +130,19 @@ class Container:
     async def process_logs_until(
         self,
+        log_msg_key: str,

         # this is a predicate func for matching log msgs emitted by the
         # underlying containerized app
         patt_matcher: Callable[[str], bool],
-        bp_on_msg: bool = False,
+
+        # XXX WARNING XXX: do not touch this sleep value unless
+        # you know what you are doing! the value is critical to
+        # making sure the caller code inside the startup context
+        # does not timeout BEFORE we receive a match on the
+        # ``patt_matcher()`` predicate above.
+        checkpoint_period: float = 0.001,

     ) -> bool:
         '''
         Attempt to capture container log messages and relay through our
@@ -137,12 +152,14 @@ class Container:
         seen_so_far = self.seen_so_far

         while True:
+            logs = self.cntr.logs()
             try:
                 logs = self.cntr.logs()
             except (
                 docker.errors.NotFound,
                 docker.errors.APIError
             ):
+                log.exception('Failed to parse logs?')
                 return False

             entries = logs.decode().split('\n')

@@ -155,25 +172,23 @@ class Container:
                 entry = entry.strip()
                 try:
                     record = json.loads(entry)
-
-                    if 'msg' in record:
-                        msg = record['msg']
-                    elif 'message' in record:
-                        msg = record['message']
-                    else:
-                        raise KeyError(f'Unexpected log format\n{record}')
-
+                    msg = record[log_msg_key]
                     level = record['level']

                 except json.JSONDecodeError:
                     msg = entry
                     level = 'error'

-                if msg and entry not in seen_so_far:
-                    seen_so_far.add(entry)
-                    if bp_on_msg:
-                        await tractor.breakpoint()
+                # TODO: do we need a more general mechanism
+                # for these kinds of "log record entries"?
+                # if 'Error' in entry:
+                #     raise RuntimeError(entry)

+                if (
+                    msg
+                    and entry not in seen_so_far
+                ):
+                    seen_so_far.add(entry)
                     getattr(log, level.lower(), log.error)(f'{msg}')

                     if level == 'fatal':
@@ -183,10 +198,15 @@ class Container:
                         return True

             # do a checkpoint so we don't block if cancelled B)
-            await trio.sleep(0.1)
+            await trio.sleep(checkpoint_period)

         return False

+    @property
+    def cuid(self) -> str:
+        fqcn: str = self.cntr.attrs['Config']['Image']
+        return f'{fqcn}[{self.cntr.short_id}]'
+
     def try_signal(
         self,
         signal: str = 'SIGINT',

@@ -222,17 +242,23 @@ class Container:
     async def cancel(
         self,
-        stop_msg: str,
+        log_msg_key: str,
+        stop_predicate: Callable[[str], bool],
         hard_kill: bool = False,

     ) -> None:
+        '''
+        Attempt to cancel this container gracefully, fail over to
+        a hard kill on timeout.
+
+        '''
         cid = self.cntr.id

         # first try a graceful cancel
         log.cancel(
-            f'SIGINT cancelling container: {cid}\n'
-            f'waiting on stop msg: "{stop_msg}"'
+            f'SIGINT cancelling container: {self.cuid}\n'
+            'waiting on stop predicate...'
         )
         self.try_signal('SIGINT')

@@ -243,7 +269,10 @@ class Container:
             log.cancel('polling for CNTR logs...')

             try:
-                await self.process_logs_until(stop_msg)
+                await self.process_logs_until(
+                    log_msg_key,
+                    stop_predicate,
+                )
             except ApplicationLogError:
                 hard_kill = True
             else:
@@ -301,12 +330,16 @@ class Container:
 async def open_ahabd(
     ctx: tractor.Context,
     endpoint: str,  # ns-pointer str-msg-type
-    start_timeout: float = 1.0,
+    loglevel: str | None = 'cancel',

     **kwargs,

 ) -> None:
-    get_console_log('info', name=__name__)
+
+    log = get_console_log(
+        loglevel,
+        name=__name__,
+    )

     async with open_docker() as client:

@@ -317,42 +350,110 @@ async def open_ahabd(
         (
             dcntr,
             cntr_config,
-            start_lambda,
-            stop_lambda,
+            start_pred,
+            stop_pred,
         ) = ep_func(client)
         cntr = Container(dcntr)

-        with trio.move_on_after(start_timeout):
-            found = await cntr.process_logs_until(start_lambda)
-
-            if not found and dcntr not in client.containers.list():
-                for entry in cntr.seen_so_far:
-                    log.info(entry)
-
-                raise RuntimeError(
-                    f'Failed to start {dcntr.id} check logs deats'
-                )
+        conf: ChainMap[str, Any] = ChainMap(
+
+            # container specific
+            cntr_config,
+
+            # defaults
+            {
+                # startup time limit which is the max the supervisor
+                # will wait for the container to be registered in
+                # ``client.containers.list()``
+                'startup_timeout': 1.0,
+
+                # how fast to poll for the startup predicate by sleeping
+                # this amount incrementally thus yielding to the
+                # ``trio`` scheduler during sync polling execution.
+                'startup_query_period': 0.001,
+
+                # str-key value expected to contain log message body-contents
+                # when read using:
+                # ``json.loads(entry for entry in DockerContainer.logs())``
+                'log_msg_key': 'msg',
+
+                # startup sync func, like `Nursery.started()`
+                'started_afunc': None,
+            },
         )
+        try:
+            with trio.move_on_after(conf['startup_timeout']) as cs:
+                async with trio.open_nursery() as tn:
+                    tn.start_soon(
+                        partial(
+                            cntr.process_logs_until,
+                            log_msg_key=conf['log_msg_key'],
+                            patt_matcher=start_pred,
+                            checkpoint_period=conf['startup_query_period'],
+                        )
+                    )
+
+                    # optional blocking routine
+                    started = conf['started_afunc']
+                    if started:
+                        await started()
+
+                    # poll for container startup or timeout
+                    while not cs.cancel_called:
+                        if dcntr in client.containers.list():
+                            break
+
+                        await trio.sleep(conf['startup_query_period'])

+            # sync with remote caller actor-task but allow log
+            # processing to continue running in bg.
             await ctx.started((
                 cntr.cntr.id,
                 os.getpid(),
                 cntr_config,
             ))

-        try:
+            # XXX: if we timeout on finding the "startup msg" we
+            # expect then we want to FOR SURE raise an error
+            # upwards!
+            if cs.cancelled_caught:
+                # if dcntr not in client.containers.list():
+                for entry in cntr.seen_so_far:
+                    log.info(entry)
+
+                raise DockerNotStarted(
+                    f'Failed to start container: {cntr.cuid}\n'
+                    f'due to timeout={conf["startup_timeout"]}s\n\n'
+                    "check ur container's logs!"
+                )
+
             # TODO: we might eventually want a proxy-style msg-prot here
             # to allow remote control of containers without needing
             # callers to have root perms?
             await trio.sleep_forever()

         finally:
-            await cntr.cancel(stop_lambda)
+            # TODO: ensure loglevel can be set and teardown logs are
+            # reported if possible on error or cancel..
+            # XXX WARNING: currently shielding here can result in hangs
+            # on ctl-c from user.. ideally we can avoid a cancel getting
+            # consumed and not propagating whilst still doing teardown
+            # logging..
+            with trio.CancelScope(shield=True):
+                await cntr.cancel(
+                    log_msg_key=conf['log_msg_key'],
+                    stop_predicate=stop_pred,
+                )


 async def start_ahab(
     service_name: str,
     endpoint: Callable[docker.DockerClient, DockerContainer],
-    start_timeout: float = 1.0,
+    loglevel: str | None = 'cancel',
+    drop_root_perms: bool = True,

     task_status: TaskStatus[
         tuple[
             trio.Event,
@@ -373,13 +474,12 @@ async def start_ahab(
     '''
     cn_ready = trio.Event()
     try:
-        async with tractor.open_nursery(
-            loglevel='runtime',
-        ) as tn:
-
-            portal = await tn.start_actor(
+        async with tractor.open_nursery() as an:
+            portal = await an.start_actor(
                 service_name,
-                enable_modules=[__name__]
+                enable_modules=[__name__],
+                loglevel=loglevel,
             )

             # TODO: we have issues with this on teardown

@@ -389,7 +489,10 @@ async def start_ahab(
             # de-escalate root perms to the original user
             # after the docker supervisor actor is spawned.
-            if config._parent_user:
+            if (
+                drop_root_perms
+                and config._parent_user
+            ):
                 import pwd
                 os.setuid(
                     pwd.getpwnam(

@@ -400,7 +503,7 @@ async def start_ahab(
             async with portal.open_context(
                 open_ahabd,
                 endpoint=str(NamespacePath.from_ref(endpoint)),
-                start_timeout=start_timeout
+                loglevel='cancel',
             ) as (ctx, first):

                 cid, pid, cntr_config = first
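
To make the `endpoint` contract concrete: `start_ahab()` expects a sync function which, given a `docker.DockerClient`, returns the spawned container, a config `dict` (merged over the `ChainMap` defaults above), and the start/stop log predicates. A hypothetical endpoint sketch (image name and log strings invented):

from typing import Callable

import docker
from docker.models.containers import Container as DockerContainer

def start_echo_server(
    client: docker.DockerClient,
) -> tuple[DockerContainer, dict, Callable, Callable]:

    # hypothetical image which emits json-record logs
    cntr: DockerContainer = client.containers.run(
        'my-echo-img:latest',
        detach=True,
    )

    def start_matcher(msg: str) -> bool:
        return 'server is listening' in msg

    def stop_matcher(msg: str) -> bool:
        return 'exiting' in msg

    return (
        cntr,
        {'startup_timeout': 10.0},  # overrides the 1.0s default
        start_matcher,
        stop_matcher,
    )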


@@ -0,0 +1,271 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Daemon-actor spawning "endpoint-hooks".
"""
from __future__ import annotations
from typing import (
Optional,
Callable,
Any,
)
from contextlib import (
asynccontextmanager as acm,
)
import tractor
from ..log import (
get_logger,
get_console_log,
)
from ..brokers import get_brokermod
from ._mngr import (
Services,
)
from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service
log = get_logger(__name__)
# `brokerd` enabled modules
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]
@acm
async def maybe_spawn_daemon(
service_name: str,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: Optional[str] = None,
singleton: bool = False,
**kwargs,
) -> tractor.Portal:
'''
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
If this function is called from a non-pikerd actor, the
spawned service will persist as long as pikerd does or
it is requested to be cancelled.
This can be seen as a service starting api for remote-actor
clients.
'''
if loglevel:
get_console_log(loglevel)
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Services.locks[service_name]
await lock.acquire()
async with find_service(service_name) as portal:
if portal is not None:
lock.release()
yield portal
return
log.warning(f"Couldn't find any existing {service_name}")
# TODO: really shouldn't the actor spawning be part of the service
# starting method `Services.start_service()` ?
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
**kwargs,
) as pikerd_portal:
# we are the root and thus are `pikerd`
# so spawn the target service directly by calling
# the provided target routine.
# XXX: this assumes that the target is well formed and will
# do the right things to setup both a sub-actor **and** call
# the ``_Services`` api from above to start the top level
# service task for that actor.
started: bool
if pikerd_portal is None:
started = await service_task_target(**spawn_args)
else:
# tell the remote `pikerd` to start the target,
# the target can't return a non-serializable value
# since it is expected that service starting is
# non-blocking and the target task will persist running
# on `pikerd` after the client requesting its start
# disconnects.
started = await pikerd_portal.run(
service_task_target,
**spawn_args,
)
if started:
log.info(f'Service {service_name} started!')
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
async def spawn_brokerd(

    brokername: str,
    loglevel: Optional[str] = None,

    **tractor_kwargs,

) -> bool:

    log.info(f'Spawning {brokername} broker daemon')

    brokermod = get_brokermod(brokername)
    dname = f'brokerd.{brokername}'

    extra_tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
    tractor_kwargs.update(extra_tractor_kwargs)

    # ask `pikerd` to spawn a new sub-actor and manage it under its
    # actor nursery
    modpath = brokermod.__name__
    broker_enable = [modpath]
    for submodname in getattr(
        brokermod,
        '__enable_modules__',
        [],
    ):
        subpath = f'{modpath}.{submodname}'
        broker_enable.append(subpath)

    portal = await Services.actor_n.start_actor(
        dname,
        enable_modules=_data_mods + broker_enable,
        loglevel=loglevel,
        debug_mode=Services.debug_mode,
        **tractor_kwargs
    )

    # non-blocking setup of brokerd service nursery
    from ..data import _setup_persistent_brokerd

    await Services.start_service_task(
        dname,
        portal,
        _setup_persistent_brokerd,
        brokername=brokername,
    )
    return True
@acm
async def maybe_spawn_brokerd(

    brokername: str,
    loglevel: Optional[str] = None,

    **kwargs,

) -> tractor.Portal:
    '''
    Helper to spawn a brokerd service *from* a client
    who wishes to use the sub-actor-daemon.

    '''
    async with maybe_spawn_daemon(

        f'brokerd.{brokername}',
        service_task_target=spawn_brokerd,
        spawn_args={
            'brokername': brokername,
            'loglevel': loglevel,
        },
        loglevel=loglevel,
        **kwargs,

    ) as portal:
        yield portal
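
For example (a sketch assuming a `binance` backend module is installed), a feed client can guarantee its broker daemon is live before use:

# sketch: ensure a `brokerd.binance` daemon-actor is up; it is
# spawned under `pikerd` on first request and reused afterwards.
async with maybe_spawn_brokerd(
    'binance',
    loglevel='info',
) as portal:
    log.info(f'brokerd up: {portal.channel.uid}')
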
async def spawn_emsd(

    loglevel: Optional[str] = None,
    **extra_tractor_kwargs

) -> bool:
    """
    Start the clearing engine under ``pikerd``.

    """
    log.info('Spawning emsd')

    portal = await Services.actor_n.start_actor(
        'emsd',
        enable_modules=[
            'piker.clearing._ems',
            'piker.clearing._client',
        ],
        loglevel=loglevel,
        debug_mode=Services.debug_mode,  # set by pikerd flag
        **extra_tractor_kwargs
    )

    # non-blocking setup of clearing service
    from ..clearing._ems import _setup_persistent_emsd

    await Services.start_service_task(
        'emsd',
        portal,
        _setup_persistent_emsd,
    )
    return True
@acm
async def maybe_open_emsd(

    brokername: str,
    loglevel: Optional[str] = None,
    **kwargs,

) -> tractor._portal.Portal:  # noqa

    async with maybe_spawn_daemon(

        'emsd',
        service_task_target=spawn_emsd,
        spawn_args={'loglevel': loglevel},
        loglevel=loglevel,
        **kwargs,

    ) as portal:
        yield portal

View File

@ -0,0 +1,136 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
daemon-service management API.
"""
from collections import defaultdict
from typing import (
Callable,
Any,
)
import trio
from trio_typing import TaskStatus
import tractor
from ..log import (
get_logger,
)
log = get_logger(__name__)
# TODO: factor this into a ``tractor.highlevel`` extension
# pack for the library.
class Services:

    actor_n: tractor._supervise.ActorNursery
    service_n: trio.Nursery
    debug_mode: bool  # tractor sub-actor debug mode flag
    service_tasks: dict[
        str,
        tuple[
            trio.CancelScope,
            tractor.Portal,
            trio.Event,
        ]
    ] = {}
    locks = defaultdict(trio.Lock)

    @classmethod
    async def start_service_task(
        self,
        name: str,
        portal: tractor.Portal,
        target: Callable,
        **kwargs,

    ) -> tuple[trio.CancelScope, Any]:
        '''
        Open a context in a service sub-actor, add to a stack
        that gets unwound at ``pikerd`` teardown.

        This allows for allocating long-running sub-services in our main
        daemon and explicitly controlling their lifetimes.

        '''
        async def open_context_in_task(
            task_status: TaskStatus[
                tuple[
                    trio.CancelScope,
                    trio.Event,
                    Any,
                ]
            ] = trio.TASK_STATUS_IGNORED,

        ) -> Any:

            with trio.CancelScope() as cs:
                async with portal.open_context(
                    target,
                    **kwargs,

                ) as (ctx, first):

                    # unblock once the remote context has started
                    complete = trio.Event()
                    task_status.started((cs, complete, first))
                    log.info(
                        f'`pikerd` service {name} started with value {first}'
                    )
                    try:
                        # wait on any context's return value
                        # and any final portal result from the
                        # sub-actor.
                        ctx_res = await ctx.result()

                        # NOTE: blocks indefinitely until cancelled
                        # either by error from the target context
                        # function or by being cancelled here by the
                        # surrounding cancel scope.
                        return (await portal.result(), ctx_res)

                    finally:
                        await portal.cancel_actor()
                        complete.set()
                        self.service_tasks.pop(name)

        cs, complete, first = await self.service_n.start(open_context_in_task)

        # store the cancel scope and portal for later cancellation or
        # restart if needed.
        self.service_tasks[name] = (cs, portal, complete)

        return cs, first

    @classmethod
    async def cancel_service(
        self,
        name: str,

    ) -> Any:
        '''
        Cancel the service task and actor for the given ``name``.

        '''
        log.info(f'Cancelling `pikerd` service {name}')
        cs, portal, complete = self.service_tasks[name]
        cs.cancel()
        await complete.wait()
        assert name not in self.service_tasks, \
            f'Service task for {name} not terminated?'
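
To make the lifetime contract concrete, here is a minimal sketch (editor's illustration, assuming it runs inside the root `pikerd` actor where `actor_n` and `service_n` have already been bound) of starting and later cancelling a service task; the daemon name and context endpoint are hypothetical:

# sketch: spawn a sub-actor, then track its main service task.
portal = await Services.actor_n.start_actor(
    'mydaemond',  # hypothetical service name
    enable_modules=['piker.mydaemon'],
    debug_mode=Services.debug_mode,
)
cs, first = await Services.start_service_task(
    'mydaemond',
    portal,
    _setup_persistent_mydaemond,  # hypothetical ctx endpoint
)
# ... later, tear down both the task and its actor:
await Services.cancel_service('mydaemond')
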

View File

@ -0,0 +1,144 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Inter-actor "discovery" (protocol) layer.
"""
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from typing import (
Any,
)
import tractor
from ..log import (
get_logger,
)
log = get_logger(__name__)
_default_registry_host: str = '127.0.0.1'
_default_registry_port: int = 6116
_default_reg_addr: tuple[str, int] = (
    _default_registry_host,
    _default_registry_port,
)

# NOTE: this value is set as an actor-global once the first capable
# endpoint spawns a `pikerd` service tree.
_registry: Registry | None = None


class Registry:
    addr: None | tuple[str, int] = None

    # TODO: table of uids to sockaddrs
    peers: dict[
        tuple[str, str],
        tuple[str, int],
    ] = {}


_tractor_kwargs: dict[str, Any] = {}
@acm
async def open_registry(
    addr: None | tuple[str, int] = None,
    ensure_exists: bool = True,

) -> tuple[str, int]:

    global _tractor_kwargs
    actor = tractor.current_actor()
    uid = actor.uid
    if (
        Registry.addr is not None
        and addr
    ):
        raise RuntimeError(
            f'`{uid}` registry addr already bound @ {Registry.addr}'
        )

    was_set: bool = False

    if (
        not tractor.is_root_process()
        and Registry.addr is None
    ):
        Registry.addr = actor._arb_addr

    if (
        ensure_exists
        and Registry.addr is None
    ):
        raise RuntimeError(
            f"`{uid}` registry should already exist but doesn't?"
        )

    if (
        Registry.addr is None
    ):
        was_set = True
        Registry.addr = addr or _default_reg_addr

    _tractor_kwargs['arbiter_addr'] = Registry.addr

    try:
        yield Registry.addr
    finally:
        # XXX: always clear the global addr if we set it so that the
        # next (set of) calls will apply whatever new one is passed
        # in.
        if was_set:
            Registry.addr = None
@acm
async def find_service(
    service_name: str,
) -> tractor.Portal | None:

    async with open_registry() as reg_addr:
        log.info(f'Scanning for service `{service_name}`')
        # attach to existing daemon by name if possible
        async with tractor.find_actor(
            service_name,
            arbiter_sockaddr=reg_addr,
        ) as maybe_portal:
            yield maybe_portal
async def check_for_service(
    service_name: str,

) -> None | tuple[str, int]:
    '''
    Service daemon "liveness" predicate.

    '''
    async with open_registry(ensure_exists=False) as reg_addr:
        async with tractor.query_actor(
            service_name,
            arbiter_sockaddr=reg_addr,
        ) as sockaddr:
            return sockaddr
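
Taken together these give clients a two-step discovery pattern, sketched below: cheaply test liveness first, then attach only when a portal is actually needed (`emsd` used as the example name):

# sketch: non-blocking liveness check followed by attach.
sockaddr = await check_for_service('emsd')
if sockaddr:
    log.info(f'emsd already up @ {sockaddr}')
    async with find_service('emsd') as portal:
        assert portal is not None
        # ... interact with the service via the portal
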

View File

@ -15,17 +15,11 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 from __future__ import annotations
-from contextlib import asynccontextmanager as acm
-from pprint import pformat
 from typing import (
     Any,
     TYPE_CHECKING,
 )
-import pyqtgraph as pg
-import numpy as np
-import tractor
 if TYPE_CHECKING:
     import docker
@ -46,6 +40,9 @@ log = get_logger(__name__)
 _config = {
     'port': 19200,
     'log_level': 'debug',
+
+    # hardcoded to our image version
+    'version': '7.17.4',
 }
@ -65,14 +62,14 @@ def start_elasticsearch(
     -itd \
     --rm \
     --network=host \
-    --mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \
+    --mount type=bind,source="$(pwd)"/elastic,\
+        target=/usr/share/elasticsearch/data \
     --env "elastic_username=elastic" \
     --env "elastic_password=password" \
     --env "xpack.security.enabled=false" \
     elastic
     '''
-    import docker
     get_console_log('info', name=__name__)
     dcntr: DockerContainer = client.containers.run(
@ -83,27 +80,49 @@ def start_elasticsearch(
         remove=True
     )
-    async def start_matcher(msg: str):
+    async def health_query(msg: str | None = None):
+        if (
+            msg
+            and _config['version'] in msg
+        ):
+            return True
+
         try:
             health = (await asks.get(
-                f'http://localhost:19200/_cat/health',
+                'http://localhost:19200/_cat/health',
                 params={'format': 'json'}
             )).json()
+            log.info(
+                'ElasticSearch cntr health:\n'
+                f'{health}'
+            )
         except OSError:
-            log.error('couldnt reach elastic container')
+            log.exception('couldnt reach elastic container')
             return False
         log.info(health)
         return health[0]['status'] == 'green'
-    async def stop_matcher(msg: str):
+    async def chk_for_closed_msg(msg: str):
         return msg == 'closed'
     return (
         dcntr,
-        {},
+        {
+            # apparently we're REALLY tolerant of startup latency
+            # for CI XD
+            'startup_timeout': 240.0,
+
+            # XXX: decrease http poll period bc docker
+            # is shite at handling fast poll rates..
+            'startup_query_period': 0.1,
+
+            'log_msg_key': 'message',
+
+            # 'started_afunc': health_query,
+        },
         # expected startup and stop msgs
-        start_matcher,
-        stop_matcher,
+        health_query,
+        chk_for_closed_msg,
     )
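
The returned `(container, config, start_lambda, stop_lambda)` tuple is consumed by the ahab supervisor, whose internals are not shown in this hunk; as a rough model of what the new config knobs mean, a poll loop equivalent to what `startup_timeout`/`startup_query_period` drive might look like this (editor's sketch, not the actual `_ahab` code):

# approximate semantics of the supervisor's startup polling;
# `health_query` is the predicate defined above.
import trio

async def wait_til_green() -> None:
    with trio.fail_after(240.0):  # 'startup_timeout'
        while not await health_query():
            await trio.sleep(0.1)  # 'startup_query_period'
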

View File

@ -26,7 +26,6 @@
 from __future__ import annotations
 from contextlib import asynccontextmanager as acm
 from datetime import datetime
-from pprint import pformat
 from typing import (
     Any,
     Optional,
@ -55,7 +54,7 @@ if TYPE_CHECKING:
     import docker
     from ._ahab import DockerContainer
-from .feed import maybe_open_feed
+from ..data.feed import maybe_open_feed
 from ..log import get_logger, get_console_log
 from .._profile import Profiler
@ -63,11 +62,12 @@ from .._profile import Profiler
 log = get_logger(__name__)
-# container level config
+# ahabd-supervisor and container level config
 _config = {
     'grpc_listen_port': 5995,
     'ws_listen_port': 5993,
     'log_level': 'debug',
+    'startup_timeout': 2,
 }
 _yaml_config = '''
@ -185,7 +185,11 @@ def start_marketstore(
             config_dir_mnt,
             data_dir_mnt,
         ],
+
+        # XXX: this must be set to allow backgrounding/non-blocking
+        # usage interaction with the container's process.
         detach=True,
+
         # stop_signal='SIGINT',
         init=True,
         # remove=True,
@ -324,7 +328,7 @@ def quote_to_marketstore_structarray(
 @acm
 async def get_client(
     host: str = 'localhost',
-    port: int = 5995
+    port: int = _config['grpc_listen_port'],
 ) -> MarketstoreClient:
     '''
@ -510,7 +514,6 @@ class Storage:
         client = self.client
         syms = await client.list_symbols()
-        print(syms)
         if key not in syms:
             raise KeyError(f'`{key}` table key not found in\n{syms}?')
@ -627,10 +630,10 @@ async def open_storage_client(
         yield Storage(client)
-async def tsdb_history_update(
-    fqsn: Optional[str] = None,
+@acm
+async def open_tsdb_client(
+    fqsn: str,
-) -> list[str]:
+) -> Storage:
     # TODO: real-time dedicated task for ensuring
     # history consistency between the tsdb, shm and real-time feed..
@ -659,7 +662,7 @@ async def tsdb_history_update(
     # - https://github.com/pikers/piker/issues/98
     #
     profiler = Profiler(
-        disabled=False,  # not pg_profile_enabled(),
+        disabled=True,  # not pg_profile_enabled(),
         delayed=False,
     )
@ -701,13 +704,9 @@ async def tsdb_history_update(
     # profiler('Finished db arrays diffs')
     syms = await storage.client.list_symbols()
-    log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
-    profiler(f'listed symbols {syms}')
+    # log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
+    # profiler(f'listed symbols {syms}')
+    yield storage
-    # TODO: ask if user wants to write history for detected
-    # available shm buffers?
-    from tractor.trionics import ipython_embed
-    await ipython_embed()
     # for array in [to_append, to_prepend]:
     #     if array is None:
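
Note the api rename above: what was a one-shot `tsdb_history_update()` coroutine is now an `open_tsdb_client()` async context manager which yields the `Storage` wrapper. A usage sketch (the fqsn is illustrative):

# sketch: attach to the tsdb and inspect the stored symbol set.
async with open_tsdb_client('btcusdt.binance') as storage:
    syms = await storage.client.list_symbols()
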

View File

@ -1 +0,0 @@
-TEST_CONFIG_DIR_PATH = '_testing'

View File

@ -24,7 +24,7 @@ from types import ModuleType
 from PyQt5.QtCore import QEvent
 import trio
-from .._daemon import maybe_spawn_brokerd
+from ..service import maybe_spawn_brokerd
 from . import _event
 from ._exec import run_qtractor
 from ..data.feed import install_brokerd_search

View File

@ -49,7 +49,7 @@ from qdarkstyle import DarkPalette
 import trio
 from outcome import Error
-from .._daemon import (
+from ..service import (
     maybe_open_pikerd,
     get_tractor_runtime_kwargs,
 )

View File

@ -24,7 +24,7 @@ import tractor
 from ..cli import cli
 from .. import watchlists as wl
-from .._daemon import maybe_spawn_brokerd
+from ..service import maybe_spawn_brokerd
 _config_dir = click.get_app_dir('piker')
@ -181,9 +181,6 @@ def chart(
         'debug_mode': pdb,
         'loglevel': tractorloglevel,
         'name': 'chart',
-        'enable_modules': [
-            'piker.clearing._client'
-        ],
         'registry_addr': config.get('registry_addr'),
     },
 )

View File

@ -1,19 +1,18 @@
 from contextlib import asynccontextmanager as acm
 from functools import partial
+import logging
 import os
-from typing import AsyncContextManager
 from pathlib import Path
-from shutil import rmtree
 import pytest
 import tractor
 from piker import (
-    # log,
     config,
 )
-from piker._daemon import (
+from piker.service import (
     Services,
 )
+from piker.log import get_console_log
 from piker.clearing._client import open_ems
@ -70,8 +69,24 @@ def ci_env() -> bool:
     return _ci_env
+@pytest.fixture()
+def log(
+    request: pytest.FixtureRequest,
+    loglevel: str,
+
+) -> logging.Logger:
+    '''
+    Deliver a per-test-named ``piker.log`` instance.
+
+    '''
+    return get_console_log(
+        level=loglevel,
+        name=request.node.name,
+    )
+
 @acm
 async def _open_test_pikerd(
+    tmpconfdir: str,
     reg_addr: tuple[str, int] | None = None,
     loglevel: str = 'warning',
     **kwargs,
@ -88,7 +103,7 @@ async def _open_test_pikerd(
     '''
     import random
-    from piker._daemon import maybe_open_pikerd
+    from piker.service import maybe_open_pikerd
     if reg_addr is None:
         port = random.randint(6e3, 7e3)
@ -98,7 +113,17 @@ async def _open_test_pikerd(
         maybe_open_pikerd(
             registry_addr=reg_addr,
             loglevel=loglevel,
+            tractor_runtime_overrides={
+                'piker_test_dir': tmpconfdir,
+            },
+
+            # tests may need to spawn containers dynamically
+            # or just in sequence per test, so we keep root perms.
+            drop_root_perms_for_ahab=False,
+
             **kwargs,
         ) as service_manager,
     ):
         # this proc/actor is the pikerd
@ -120,18 +145,40 @@ async def _open_test_pikerd(
 @pytest.fixture
 def open_test_pikerd(
-    request,
+    request: pytest.FixtureRequest,
+    tmp_path: Path,
     loglevel: str,
 ):
+    tmpconfdir: Path = tmp_path / '_testing'
+    tmpconfdir.mkdir()
+    tmpconfdir_str: str = str(tmpconfdir)
+
+    # NOTE: on linux the tmp config dir is generally located at:
+    # /tmp/pytest-of-<username>/pytest-<run#>/test_<current_test_name>/
+    # the default `pytest` config ensures that only the last 4 test
+    # suite run's dirs will be persisted, otherwise they are removed:
+    # https://docs.pytest.org/en/6.2.x/tmpdir.html#the-default-base-temporary-directory
+    print(f'CURRENT TEST CONF DIR: {tmpconfdir}')
+
     yield partial(
         _open_test_pikerd,
+
+        # pass in a unique temp dir for this test request
+        # so that we can have multiple tests running (maybe in parallel)
+        # without clobbering each other's config state.
+        tmpconfdir=tmpconfdir_str,
+
         # bind in level from fixture, which is itself set by
         # `--ll <value>` cli flag.
         loglevel=loglevel,
     )
+
+    # NOTE: the `tmp_path` fixture will wipe any files older than 3 test
+    # sessions by default:
+    # https://docs.pytest.org/en/6.2.x/tmpdir.html#the-default-base-temporary-directory
+    # BUT, if we wanted to always wipe conf dir and all contained files,
+    # rmtree(str(tmp_path))
 # TODO: teardown checks such as,
 # - no leaked subprocs or shm buffers
 # - all requested container service are torn down
@ -151,7 +198,8 @@ async def _open_test_pikerd_and_ems(
         fqsn,
         mode=mode,
         loglevel=loglevel,
-    ) as ems_services):
+    ) as ems_services,
+    ):
         yield (services, ems_services)
@ -169,19 +217,3 @@ def open_test_pikerd_and_ems(
         loglevel,
         open_test_pikerd
     )
-
-@pytest.fixture(scope='module')
-def delete_testing_dir():
-    '''
-    This fixture removes the temp directory
-    used for storing all config/ledger/pp data
-    created during testing sessions. During test runs
-    this file can be found in .config/piker/_testing
-    '''
-    yield
-    app_dir = Path(config.get_app_dir('piker')).resolve()
-    if app_dir.is_dir():
-        rmtree(str(app_dir))
-        assert not app_dir.is_dir()
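
The net effect is that every test now runs against a throwaway config dir instead of the real `~/.config/piker`, which is why the module-scoped `delete_testing_dir` cleanup fixture can be dropped entirely. A sketch of a consuming test (editor's illustration, names illustrative):

import trio

def test_pikerd_boots(
    open_test_pikerd,  # yields the partial built above
    log,  # per-test-named logger fixture
):
    async def main():
        async with open_test_pikerd() as (
            _, _, pikerd_portal, services,
        ):
            log.info('pikerd is up with an isolated conf dir!')
            assert services

    trio.run(main)
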

View File

@ -1,66 +1,124 @@
-import pytest
-import trio
 from typing import AsyncContextManager
+import logging
+
+import trio
+from elasticsearch import (
+    Elasticsearch,
+    ConnectionError,
+)
-from piker._daemon import Services
-from piker.log import get_logger
-from elasticsearch import Elasticsearch
-from piker.data import marketstore
+from piker.service import marketstore
+from piker.service import elastic
 def test_marketstore_startup_and_version(
     open_test_pikerd: AsyncContextManager,
-    loglevel,
+    loglevel: str,
 ):
     '''
-    Verify marketstore starts correctly
+    Verify marketstore tsdb starts up and we can
+    connect with a client to do basic API reqs.
     '''
-    log = get_logger(__name__)
     async def main():
-        # port = 5995
         async with (
             open_test_pikerd(
                 loglevel=loglevel,
                 tsdb=True
-            ) as (s, i, pikerd_portal, services),
-            marketstore.get_client() as client
+            ) as (
+                _,  # host
+                _,  # port
+                pikerd_portal,
+                services,
+            ),
         ):
+            # TODO: we should probably make this connection poll
+            # loop part of the `get_client()` implementation no?
+
+            # XXX NOTE: we use a retry-connect loop because it seems
+            # that if we connect *too fast* to a booting container
+            # instance (i.e. if mkts's IPC machinery isn't up early
+            # enough) the client will hang on req-resp submissions. So,
+            # instead we actually reconnect the client entirely in
+            # a loop until we get a response.
+            for _ in range(3):
+
+                # NOTE: default sockaddr is embedded within
+                async with marketstore.get_client() as client:
+
+                    with trio.move_on_after(1) as cs:
+                        syms = await client.list_symbols()
+
+                    if cs.cancelled_caught:
+                        continue
+
+                    # should be an empty db (for now) since we spawn
+                    # marketstore in an ephemeral test-harness dir.
+                    assert not syms
+                    print(f'RX syms resp: {syms}')
-            assert (
-                len(await client.server_version()) ==
-                len('3862e9973da36cfc6004b88172c08f09269aaf01')
-            )
+                    assert (
+                        len(await client.server_version()) ==
+                        len('3862e9973da36cfc6004b88172c08f09269aaf01')
+                    )
+                    print('VERSION CHECKED')
+
+                break  # get out of retry-connect loop
     trio.run(main)
 def test_elasticsearch_startup_and_version(
     open_test_pikerd: AsyncContextManager,
-    loglevel,
+    loglevel: str,
+    log: logging.Logger,
 ):
     '''
-    Verify elasticsearch starts correctly
+    Verify elasticsearch starts correctly (like at some point before
+    infinity time)..
     '''
-    log = get_logger(__name__)
     async def main():
         port = 19200
-        async with open_test_pikerd(
-            loglevel=loglevel,
-            es=True
-        ) as (s, i, pikerd_portal, services):
+        async with (
+            open_test_pikerd(
+                loglevel=loglevel,
+                es=True
+            ) as (
+                _,  # host
+                _,  # port
+                pikerd_portal,
+                services,
+            ),
+        ):
+            # TODO: much like the above connect loop for mkts, we should
+            # probably make this sync start part of the
+            # ``open_client()`` implementation?
+            for i in range(240):
+                with Elasticsearch(
+                    hosts=[f'http://localhost:{port}']
+                ) as es:
+                    try:
-            es = Elasticsearch(hosts=[f'http://localhost:{port}'])
-            assert es.info()['version']['number'] == '7.17.4'
+                        resp = es.info()
+                        assert (
+                            resp['version']['number']
+                            ==
+                            elastic._config['version']
+                        )
+                        print(
+                            'elasticsearch finally connected!\n'
+                            f'resp: {resp}'
+                        )
+                        break
+
+                    except ConnectionError:
+                        log.exception(
+                            f'RETRYING client connection (attempt {i})'
+                        )
+                        await trio.sleep(1)
+                        continue
     trio.run(main)
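
Following the TODOs in both tests, the retry-connect dance could plausibly be folded into a client-side helper so callers don't re-implement it; a sketch under that assumption (using only the `marketstore.get_client()` api shown in this PR):

from contextlib import asynccontextmanager as acm

import trio
from piker.service import marketstore

@acm
async def get_client_with_retries(
    tries: int = 3,
    timeout: float = 1.0,
):
    # reconnect-from-scratch loop: a client opened *too early*
    # against a booting container can hang on req-resp.
    for _ in range(tries):
        async with marketstore.get_client() as client:
            with trio.move_on_after(timeout) as cs:
                await client.list_symbols()  # liveness ping
            if cs.cancelled_caught:
                continue  # not ready yet; drop and reconnect
            yield client
            return
    raise TimeoutError('marketstore never came up?')
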

View File

@ -17,7 +17,6 @@ from functools import partial
 from piker.log import get_logger
 from piker.clearing._messages import Order
 from piker.pp import (
-    open_trade_ledger,
     open_pps,
 )
@ -42,18 +41,19 @@ async def _async_main(
     price: int = 30000,
     executions: int = 1,
     size: float = 0.01,
 
     # Assert options
     assert_entries: bool = False,
     assert_pps: bool = False,
     assert_zeroed_pps: bool = False,
     assert_msg: bool = False,
 
 ) -> None:
     '''
     Start piker, place a trade and assert data in
     pps stream, ledger and position table.
 
     '''
     oid: str = ''
     last_msg = {}
@ -151,7 +151,6 @@ def _run_test_and_check(fn):
 def test_buy(
     open_test_pikerd_and_ems: AsyncContextManager,
-    delete_testing_dir
 ):
     '''
     Enter a trade and assert entries are made in pps and ledger files.
@ -178,7 +177,6 @@ def test_buy(
 def test_sell(
     open_test_pikerd_and_ems: AsyncContextManager,
-    delete_testing_dir
 ):
     '''
     Sell position and ensure pps are zeroed.
@ -201,9 +199,9 @@ def test_sell(
     ),
 )
 def test_multi_sell(
     open_test_pikerd_and_ems: AsyncContextManager,
-    delete_testing_dir
 ):
     '''
     Make 5 market limit buy orders and

View File

@ -9,8 +9,7 @@ import pytest
 import trio
 import tractor
-from piker.log import get_logger
-from piker._daemon import (
+from piker.service import (
     find_service,
     Services,
 )