Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 40e49333be Bump mkts timeout to 2s 2023-03-08 15:25:38 -05:00
Tyler Goodlet f627fedf74 Move all docker and external db code to `piker.service` 2023-03-08 15:25:20 -05:00
Tyler Goodlet bd248381ea Start `piker.service` sub-package
For now just moves everything that was in `piker._daemon` to a subpkg
module but a reorg is coming pronto!
2023-03-08 15:14:39 -05:00
Tyler Goodlet a70d76e3e6 Set explicit `marketstore` container startup timeout 2023-03-08 15:01:06 -05:00
Tyler Goodlet a5caaef467 Hardcode `cancel` log level for `ahabd` for now 2023-03-08 15:00:24 -05:00
Tyler Goodlet 7e35696dbb Always passthrough loglevel to `ahabd` supervisor 2023-03-08 14:56:21 -05:00
Tyler Goodlet 93702320a3 Background docker-container logs processing
Previously we would make the `ahabd` supervisor-actor sync to docker
container startup using pseudo-blocking log message processing.

This has issues,
- we're forced to do a hacky "yield back to `trio`" in order to be
  "fake async" when reading the log stream and further,
- blocking on a message is fragile and often slow.

Instead, run the log processor in a background task and in the parent
task poll for the container to be in the client list using a similar
pseudo-async poll pattern. This allows the super to `Context.started()`
sooner (when the container is actually registered as "up") and thus
unblock its (remote) caller faster whilst still doing full log msg
proxying!

Deatz:
- adds `Container.cuid: str` a unique container id for logging.
- correctly proxy through the `loglevel: str` from `pikerd` caller task.
- shield around `Container.cancel()` in the teardown block and use
  cancel level logging in that method.
2023-03-08 14:28:48 -05:00
Tyler Goodlet 5683eb8ef0 Doc string and types bump in loggin mod 2023-03-08 14:22:23 -05:00
Tyler Goodlet ad6b655d7d Apply `Services` runtime state **immediately** inside starup block 2023-03-08 13:01:42 -05:00
Tyler Goodlet 6d1ecdde40 Deliver es specific ahab-super in endpoint startup config 2023-03-08 13:00:11 -05:00
19 changed files with 221 additions and 125 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers. # piker: trading gear for hackers.
# Copyright 2020-eternity Tyler Goodlet (in stewardship for piker0) # Copyright 2020-eternity Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -14,11 +14,11 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" '''
piker: trading gear for hackers. piker: trading gear for hackers.
""" '''
from ._daemon import open_piker_runtime from .service import open_piker_runtime
from .data.feed import open_feed from .data.feed import open_feed
__all__ = [ __all__ = [

View File

@ -29,8 +29,15 @@ import tractor
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from ..log import get_console_log, colorize_json, get_logger from ..log import get_console_log, colorize_json, get_logger
from .._daemon import maybe_spawn_brokerd, maybe_open_pikerd from ..service import (
from ..brokers import core, get_brokermod, data maybe_spawn_brokerd,
maybe_open_pikerd,
)
from ..brokers import (
core,
get_brokermod,
data,
)
log = get_logger('cli') log = get_logger('cli')
DEFAULT_BROKER = 'questrade' DEFAULT_BROKER = 'questrade'
@ -60,6 +67,7 @@ def get_method(client, meth_name: str):
print_ok('found!.') print_ok('found!.')
return method return method
async def run_method(client, meth_name: str, **kwargs): async def run_method(client, meth_name: str, **kwargs):
method = get_method(client, meth_name) method = get_method(client, meth_name)
print('running...', end='', flush=True) print('running...', end='', flush=True)
@ -67,19 +75,20 @@ async def run_method(client, meth_name: str, **kwargs):
print_ok(f'done! result: {type(result)}') print_ok(f'done! result: {type(result)}')
return result return result
async def run_test(broker_name: str): async def run_test(broker_name: str):
brokermod = get_brokermod(broker_name) brokermod = get_brokermod(broker_name)
total = 0 total = 0
passed = 0 passed = 0
failed = 0 failed = 0
print(f'getting client...', end='', flush=True) print('getting client...', end='', flush=True)
if not hasattr(brokermod, 'get_client'): if not hasattr(brokermod, 'get_client'):
print_error('fail! no \'get_client\' context manager found.') print_error('fail! no \'get_client\' context manager found.')
return return
async with brokermod.get_client(is_brokercheck=True) as client: async with brokermod.get_client(is_brokercheck=True) as client:
print_ok(f'done! inside client context.') print_ok('done! inside client context.')
# check for methods present on brokermod # check for methods present on brokermod
method_list = [ method_list = [
@ -130,7 +139,6 @@ async def run_test(broker_name: str):
total += 1 total += 1
# check for methods present con brokermod.Client and their # check for methods present con brokermod.Client and their
# results # results
@ -180,7 +188,6 @@ def brokercheck(config, broker):
trio.run(run_test, broker) trio.run(run_test, broker)
@cli.command() @cli.command()
@click.option('--keys', '-k', multiple=True, @click.option('--keys', '-k', multiple=True,
help='Return results only for these keys') help='Return results only for these keys')
@ -335,8 +342,6 @@ def contracts(ctx, loglevel, broker, symbol, ids):
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
get_console_log(loglevel) get_console_log(loglevel)
contracts = trio.run(partial(core.contracts, brokermod, symbol)) contracts = trio.run(partial(core.contracts, brokermod, symbol))
if not ids: if not ids:
# just print out expiry dates which can be used with # just print out expiry dates which can be used with

View File

@ -28,7 +28,7 @@ import trio
from ..log import get_logger from ..log import get_logger
from . import get_brokermod from . import get_brokermod
from .._daemon import maybe_spawn_brokerd from ..service import maybe_spawn_brokerd
from .._cacheables import open_cached_client from .._cacheables import open_cached_client

View File

@ -29,8 +29,11 @@ from tractor.trionics import broadcast_receiver
from ..log import get_logger from ..log import get_logger
from ..data.types import Struct from ..data.types import Struct
from .._daemon import maybe_open_emsd from ..service import maybe_open_emsd
from ._messages import Order, Cancel from ._messages import (
Order,
Cancel,
)
from ..brokers import get_brokermod from ..brokers import get_brokermod
if TYPE_CHECKING: if TYPE_CHECKING:

View File

@ -19,16 +19,18 @@ CLI commons.
''' '''
import os import os
from pprint import pformat
from functools import partial
import click import click
import trio import trio
import tractor import tractor
from ..log import get_console_log, get_logger, colorize_json from ..log import (
get_console_log,
get_logger,
colorize_json,
)
from ..brokers import get_brokermod from ..brokers import get_brokermod
from .._daemon import ( from ..service import (
_default_registry_host, _default_registry_host,
_default_registry_port, _default_registry_port,
) )
@ -68,7 +70,7 @@ def pikerd(
''' '''
from .._daemon import open_pikerd from ..service import open_pikerd
log = get_console_log(loglevel) log = get_console_log(loglevel)
if pdb: if pdb:
@ -171,7 +173,7 @@ def cli(
@click.pass_obj @click.pass_obj
def services(config, tl, ports): def services(config, tl, ports):
from .._daemon import ( from ..service import (
open_piker_runtime, open_piker_runtime,
_default_registry_port, _default_registry_port,
_default_registry_host, _default_registry_host,
@ -204,8 +206,8 @@ def services(config, tl, ports):
def _load_clis() -> None: def _load_clis() -> None:
from ..data import marketstore # noqa from ..service import marketstore # noqa
from ..data import elastic from ..service import elastic
from ..data import cli # noqa from ..data import cli # noqa
from ..brokers import cli # noqa from ..brokers import cli # noqa
from ..ui import cli # noqa from ..ui import cli # noqa

View File

@ -42,7 +42,7 @@ from ..log import (
get_logger, get_logger,
get_console_log, get_console_log,
) )
from .._daemon import maybe_spawn_daemon from ..service import maybe_spawn_daemon
if TYPE_CHECKING: if TYPE_CHECKING:
from ._sharedmem import ( from ._sharedmem import (
@ -69,7 +69,7 @@ class Sampler:
This non-instantiated type is meant to be a singleton within This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see time-step-sample (real-time) quote feeds, see
``._daemon.maybe_open_samplerd()`` and the below ``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``. ``register_with_sampler()``.
''' '''
@ -391,7 +391,7 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting. update and increment count write and stream broadcasting.
''' '''
from piker._daemon import Services from piker.service import Services
dname = 'samplerd' dname = 'samplerd'
log.info(f'Spawning `{dname}`') log.info(f'Spawning `{dname}`')

View File

@ -137,14 +137,14 @@ def storesh(
''' '''
from piker.data.marketstore import open_tsdb_client from piker.data.marketstore import open_tsdb_client
from piker._daemon import open_piker_runtime from piker.service import open_piker_runtime
async def main(): async def main():
nonlocal symbols nonlocal symbols
async with open_piker_runtime( async with open_piker_runtime(
'storesh', 'storesh',
enable_modules=['piker.data._ahab'], enable_modules=['piker.service._ahab'],
): ):
symbol = symbols[0] symbol = symbols[0]
@ -187,14 +187,14 @@ def storage(
''' '''
from piker.data.marketstore import open_tsdb_client from piker.data.marketstore import open_tsdb_client
from piker._daemon import open_piker_runtime from piker.service import open_piker_runtime
async def main(): async def main():
nonlocal symbols nonlocal symbols
async with open_piker_runtime( async with open_piker_runtime(
'tsdb_storage', 'tsdb_storage',
enable_modules=['piker.data._ahab'], enable_modules=['piker.service._ahab'],
): ):
symbol = symbols[0] symbol = symbols[0]
async with open_tsdb_client(symbol) as storage: async with open_tsdb_client(symbol) as storage:

View File

@ -58,7 +58,7 @@ from ..log import (
get_logger, get_logger,
get_console_log, get_console_log,
) )
from .._daemon import ( from ..service import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
check_for_service, check_for_service,
) )

View File

@ -21,7 +21,11 @@ import logging
import json import json
import tractor import tractor
from pygments import highlight, lexers, formatters from pygments import (
highlight,
lexers,
formatters,
)
# Makes it so we only see the full module name when using ``__name__`` # Makes it so we only see the full module name when using ``__name__``
# without the extra "piker." prefix. # without the extra "piker." prefix.
@ -32,26 +36,48 @@ def get_logger(
name: str = None, name: str = None,
) -> logging.Logger: ) -> logging.Logger:
'''Return the package log or a sub-log for `name` if provided. '''
Return the package log or a sub-log for `name` if provided.
''' '''
return tractor.log.get_logger(name=name, _root_name=_proj_name) return tractor.log.get_logger(name=name, _root_name=_proj_name)
def get_console_log(level: str = None, name: str = None) -> logging.Logger: def get_console_log(
'''Get the package logger and enable a handler which writes to stderr. level: str | None = None,
name: str | None = None,
) -> logging.Logger:
'''
Get the package logger and enable a handler which writes to stderr.
Yeah yeah, i know we can use ``DictConfig``. You do it... Yeah yeah, i know we can use ``DictConfig``. You do it...
''' '''
return tractor.log.get_console_log( return tractor.log.get_console_log(
level, name=name, _root_name=_proj_name) # our root logger level,
name=name,
_root_name=_proj_name,
) # our root logger
def colorize_json(data, style='algol_nu'): def colorize_json(
"""Colorize json output using ``pygments``. data: dict,
""" style='algol_nu',
formatted_json = json.dumps(data, sort_keys=True, indent=4) ):
'''
Colorize json output using ``pygments``.
'''
formatted_json = json.dumps(
data,
sort_keys=True,
indent=4,
)
return highlight( return highlight(
formatted_json, lexers.JsonLexer(), formatted_json,
lexers.JsonLexer(),
# likeable styles: algol_nu, tango, monokai # likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style) formatters.TerminalTrueColorFormatter(style=style)
) )

View File

@ -19,6 +19,8 @@ Structured, daemon tree service management.
""" """
from __future__ import annotations from __future__ import annotations
from pprint import pformat
from functools import partial
import os import os
from typing import ( from typing import (
Optional, Optional,
@ -35,14 +37,11 @@ import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from .log import ( from ..log import (
get_logger, get_logger,
get_console_log, get_console_log,
) )
from .brokers import get_brokermod from ..brokers import get_brokermod
from pprint import pformat
from functools import partial
log = get_logger(__name__) log = get_logger(__name__)
@ -337,7 +336,6 @@ async def open_pikerd(
alive underling services (see below). alive underling services (see below).
''' '''
async with ( async with (
open_piker_runtime( open_piker_runtime(
@ -355,17 +353,26 @@ async def open_pikerd(
tractor.open_nursery() as actor_nursery, tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery, trio.open_nursery() as service_nursery,
): ):
assert root_actor.accept_addr == reg_addr if root_actor.accept_addr != reg_addr:
raise RuntimeError(f'Daemon failed to bind on {reg_addr}!?')
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
if tsdb: if tsdb:
from piker.data._ahab import start_ahab from ._ahab import start_ahab
from piker.data.marketstore import start_marketstore from .marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor') log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start( ctn_ready, config, (cid, pid) = await service_nursery.start(
start_ahab, partial(
'marketstored', start_ahab,
start_marketstore, 'marketstored',
start_marketstore,
loglevel=loglevel,
)
) )
log.info( log.info(
@ -385,7 +392,7 @@ async def open_pikerd(
start_ahab, start_ahab,
'elasticsearch', 'elasticsearch',
start_elasticsearch, start_elasticsearch,
start_timeout=240.0 # high cause ci loglevel=loglevel,
) )
) )
@ -396,12 +403,6 @@ async def open_pikerd(
f'config: {pformat(config)}' f'config: {pformat(config)}'
) )
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try: try:
yield Services yield Services
@ -667,7 +668,7 @@ async def spawn_brokerd(
) )
# non-blocking setup of brokerd service nursery # non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd from ..data import _setup_persistent_brokerd
await Services.start_service_task( await Services.start_service_task(
dname, dname,
@ -695,7 +696,10 @@ async def maybe_spawn_brokerd(
f'brokerd.{brokername}', f'brokerd.{brokername}',
service_task_target=spawn_brokerd, service_task_target=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel}, spawn_args={
'brokername': brokername,
'loglevel': loglevel,
},
loglevel=loglevel, loglevel=loglevel,
**kwargs, **kwargs,
@ -727,7 +731,7 @@ async def spawn_emsd(
) )
# non-blocking setup of clearing service # non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd from ..clearing._ems import _setup_persistent_emsd
await Services.start_service_task( await Services.start_service_task(
'emsd', 'emsd',

View File

@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers.
''' '''
from collections import ChainMap from collections import ChainMap
from functools import partial
import os import os
import time import time
from typing import ( from typing import (
@ -46,7 +47,10 @@ from requests.exceptions import (
ReadTimeout, ReadTimeout,
) )
from ..log import get_logger, get_console_log from ..log import (
get_logger,
get_console_log,
)
from .. import config from .. import config
log = get_logger(__name__) log = get_logger(__name__)
@ -197,6 +201,11 @@ class Container:
return False return False
@property
def cuid(self) -> str:
fqcn: str = self.cntr.attrs['Config']['Image']
return f'{fqcn}[{self.cntr.short_id}]'
def try_signal( def try_signal(
self, self,
signal: str = 'SIGINT', signal: str = 'SIGINT',
@ -232,17 +241,23 @@ class Container:
async def cancel( async def cancel(
self, self,
stop_msg: str, log_msg_key: str,
stop_predicate: Callable[[str], bool],
hard_kill: bool = False, hard_kill: bool = False,
) -> None: ) -> None:
'''
Attempt to cancel this container gracefully, fail over to
a hard kill on timeout.
'''
cid = self.cntr.id cid = self.cntr.id
# first try a graceful cancel # first try a graceful cancel
log.cancel( log.cancel(
f'SIGINT cancelling container: {cid}\n' f'SIGINT cancelling container: {self.cuid}\n'
f'waiting on stop msg: "{stop_msg}"' 'waiting on stop predicate...'
) )
self.try_signal('SIGINT') self.try_signal('SIGINT')
@ -253,7 +268,10 @@ class Container:
log.cancel('polling for CNTR logs...') log.cancel('polling for CNTR logs...')
try: try:
await self.process_logs_until(stop_msg) await self.process_logs_until(
log_msg_key,
stop_predicate,
)
except ApplicationLogError: except ApplicationLogError:
hard_kill = True hard_kill = True
else: else:
@ -311,11 +329,16 @@ class Container:
async def open_ahabd( async def open_ahabd(
ctx: tractor.Context, ctx: tractor.Context,
endpoint: str, # ns-pointer str-msg-type endpoint: str, # ns-pointer str-msg-type
loglevel: str | None = 'cancel',
**kwargs, **kwargs,
) -> None: ) -> None:
get_console_log('info', name=__name__)
log = get_console_log(
loglevel,
name=__name__,
)
async with open_docker() as client: async with open_docker() as client:
@ -338,40 +361,63 @@ async def open_ahabd(
# defaults # defaults
{ {
# startup time limit which is the max the supervisor
# will wait for the container to be registered in
# ``client.containers.list()``
'startup_timeout': 1.0, 'startup_timeout': 1.0,
# how fast to poll for the starup predicate by sleeping
# this amount incrementally thus yielding to the
# ``trio`` scheduler on during sync polling execution.
'startup_query_period': 0.001, 'startup_query_period': 0.001,
# str-key value expected to contain log message body-contents
# when read using:
# ``json.loads(entry for entry in DockerContainer.logs())``
'log_msg_key': 'msg', 'log_msg_key': 'msg',
}, },
) )
found = False with trio.move_on_after(conf['startup_timeout']) as cs:
with trio.move_on_after(conf['startup_timeout']): async with trio.open_nursery() as tn:
found = await cntr.process_logs_until( tn.start_soon(
conf['log_msg_key'], partial(
start_lambda, cntr.process_logs_until,
checkpoint_period=conf['startup_query_period'], log_msg_key=conf['log_msg_key'],
) patt_matcher=start_lambda,
checkpoint_period=conf['startup_query_period'],
)
)
# XXX: if we timeout on finding the "startup msg" we expect then # poll for container startup or timeout
# we want to FOR SURE raise an error upwards! while not cs.cancel_called:
if ( if dcntr in client.containers.list():
not found break
and dcntr not in client.containers.list()
):
for entry in cntr.seen_so_far:
log.info(entry)
raise RuntimeError( await trio.sleep(conf['startup_query_period'])
f'Failed to start {dcntr.id} check logs deats'
)
await ctx.started(( # sync with remote caller actor-task but allow log
cntr.cntr.id, # processing to continue running in bg.
os.getpid(), await ctx.started((
cntr_config, cntr.cntr.id,
)) os.getpid(),
cntr_config,
))
try: try:
# XXX: if we timeout on finding the "startup msg" we expect then
# we want to FOR SURE raise an error upwards!
if cs.cancelled_caught:
# if dcntr not in client.containers.list():
for entry in cntr.seen_so_far:
log.info(entry)
raise DockerNotStarted(
f'Failed to start container: {cntr.cuid}\n'
f'due to startup_timeout={conf["startup_timeout"]}s\n\n'
"prolly you should check your container's logs for deats.."
)
# TODO: we might eventually want a proxy-style msg-prot here # TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing # to allow remote control of containers without needing
# callers to have root perms? # callers to have root perms?
@ -380,14 +426,21 @@ async def open_ahabd(
finally: finally:
# TODO: ensure loglevel can be set and teardown logs are # TODO: ensure loglevel can be set and teardown logs are
# reported if possible on error or cancel.. # reported if possible on error or cancel..
# XXX WARNING: currently shielding here can result in hangs
# on ctl-c from user.. ideally we can avoid a cancel getting
# consumed and not propagating whilst still doing teardown
# logging..
# with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
await cntr.cancel(stop_lambda) await cntr.cancel(
log_msg_key=conf['log_msg_key'],
stop_predicate=stop_lambda,
)
async def start_ahab( async def start_ahab(
service_name: str, service_name: str,
endpoint: Callable[docker.DockerClient, DockerContainer], endpoint: Callable[docker.DockerClient, DockerContainer],
loglevel: str | None = None, loglevel: str | None = 'cancel',
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ tuple[
@ -409,13 +462,12 @@ async def start_ahab(
''' '''
cn_ready = trio.Event() cn_ready = trio.Event()
try: try:
async with tractor.open_nursery( async with tractor.open_nursery() as an:
loglevel='runtime',
) as tn:
portal = await tn.start_actor( portal = await an.start_actor(
service_name, service_name,
enable_modules=[__name__] enable_modules=[__name__],
loglevel=loglevel,
) )
# TODO: we have issues with this on teardown # TODO: we have issues with this on teardown
@ -436,6 +488,7 @@ async def start_ahab(
async with portal.open_context( async with portal.open_context(
open_ahabd, open_ahabd,
endpoint=str(NamespacePath.from_ref(endpoint)), endpoint=str(NamespacePath.from_ref(endpoint)),
loglevel='cancel',
) as (ctx, first): ) as (ctx, first):
cid, pid, cntr_config = first cid, pid, cntr_config = first

View File

@ -15,17 +15,11 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import ( from typing import (
Any, Any,
TYPE_CHECKING, TYPE_CHECKING,
) )
import pyqtgraph as pg
import numpy as np
import tractor
if TYPE_CHECKING: if TYPE_CHECKING:
import docker import docker
@ -65,14 +59,14 @@ def start_elasticsearch(
-itd \ -itd \
--rm \ --rm \
--network=host \ --network=host \
--mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \ --mount type=bind,source="$(pwd)"/elastic,\
target=/usr/share/elasticsearch/data \
--env "elastic_username=elastic" \ --env "elastic_username=elastic" \
--env "elastic_password=password" \ --env "elastic_password=password" \
--env "xpack.security.enabled=false" \ --env "xpack.security.enabled=false" \
elastic elastic
''' '''
import docker
get_console_log('info', name=__name__) get_console_log('info', name=__name__)
dcntr: DockerContainer = client.containers.run( dcntr: DockerContainer = client.containers.run(
@ -86,7 +80,7 @@ def start_elasticsearch(
async def start_matcher(msg: str): async def start_matcher(msg: str):
try: try:
health = (await asks.get( health = (await asks.get(
f'http://localhost:19200/_cat/health', 'http://localhost:19200/_cat/health',
params={'format': 'json'} params={'format': 'json'}
)).json() )).json()
@ -102,7 +96,17 @@ def start_elasticsearch(
return ( return (
dcntr, dcntr,
{}, {
# apparently we're REALLY tolerant of startup latency
# for CI XD
'startup_timeout': 240.0,
# XXX: decrease http poll period bc docker
# is shite at handling fast poll rates..
'startup_query_period': 0.1,
'log_msg_key': 'message',
},
# expected startup and stop msgs # expected startup and stop msgs
start_matcher, start_matcher,
stop_matcher, stop_matcher,

View File

@ -26,7 +26,6 @@
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from pprint import pformat
from typing import ( from typing import (
Any, Any,
Optional, Optional,
@ -55,7 +54,7 @@ if TYPE_CHECKING:
import docker import docker
from ._ahab import DockerContainer from ._ahab import DockerContainer
from .feed import maybe_open_feed from ..data.feed import maybe_open_feed
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .._profile import Profiler from .._profile import Profiler
@ -63,11 +62,12 @@ from .._profile import Profiler
log = get_logger(__name__) log = get_logger(__name__)
# container level config # ahabd-supervisor and container level config
_config = { _config = {
'grpc_listen_port': 5995, 'grpc_listen_port': 5995,
'ws_listen_port': 5993, 'ws_listen_port': 5993,
'log_level': 'debug', 'log_level': 'debug',
'startup_timeout': 2,
} }
_yaml_config = ''' _yaml_config = '''
@ -135,7 +135,7 @@ def start_marketstore(
# create dirs when dne # create dirs when dne
if not os.path.isdir(config._config_dir): if not os.path.isdir(config._config_dir):
Path(config._config_dir).mkdir(parents=True, exist_ok=True) Path(config._config_dir).mkdir(parents=True, exist_ok=True)
if not os.path.isdir(mktsdir): if not os.path.isdir(mktsdir):
os.mkdir(mktsdir) os.mkdir(mktsdir)

View File

@ -24,7 +24,7 @@ from types import ModuleType
from PyQt5.QtCore import QEvent from PyQt5.QtCore import QEvent
import trio import trio
from .._daemon import maybe_spawn_brokerd from ..service import maybe_spawn_brokerd
from . import _event from . import _event
from ._exec import run_qtractor from ._exec import run_qtractor
from ..data.feed import install_brokerd_search from ..data.feed import install_brokerd_search

View File

@ -49,7 +49,7 @@ from qdarkstyle import DarkPalette
import trio import trio
from outcome import Error from outcome import Error
from .._daemon import ( from ..service import (
maybe_open_pikerd, maybe_open_pikerd,
get_tractor_runtime_kwargs, get_tractor_runtime_kwargs,
) )

View File

@ -24,7 +24,7 @@ import tractor
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from .._daemon import maybe_spawn_brokerd from ..service import maybe_spawn_brokerd
_config_dir = click.get_app_dir('piker') _config_dir = click.get_app_dir('piker')

View File

@ -1,7 +1,6 @@
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import os import os
from typing import AsyncContextManager
from pathlib import Path from pathlib import Path
from shutil import rmtree from shutil import rmtree
@ -11,7 +10,7 @@ from piker import (
# log, # log,
config, config,
) )
from piker._daemon import ( from piker.service import (
Services, Services,
) )
from piker.clearing._client import open_ems from piker.clearing._client import open_ems
@ -88,7 +87,7 @@ async def _open_test_pikerd(
''' '''
import random import random
from piker._daemon import maybe_open_pikerd from piker.service import maybe_open_pikerd
if reg_addr is None: if reg_addr is None:
port = random.randint(6e3, 7e3) port = random.randint(6e3, 7e3)
@ -151,8 +150,9 @@ async def _open_test_pikerd_and_ems(
fqsn, fqsn,
mode=mode, mode=mode,
loglevel=loglevel, loglevel=loglevel,
) as ems_services): ) as ems_services,
yield (services, ems_services) ):
yield (services, ems_services)
@pytest.fixture @pytest.fixture
@ -168,7 +168,7 @@ def open_test_pikerd_and_ems(
mode, mode,
loglevel, loglevel,
open_test_pikerd open_test_pikerd
) )
@pytest.fixture(scope='module') @pytest.fixture(scope='module')

View File

@ -3,7 +3,7 @@ import trio
from typing import AsyncContextManager from typing import AsyncContextManager
from piker._daemon import Services from piker.service import Services
from piker.log import get_logger from piker.log import get_logger
from elasticsearch import Elasticsearch from elasticsearch import Elasticsearch

View File

@ -9,8 +9,7 @@ import pytest
import trio import trio
import tractor import tractor
from piker.log import get_logger from piker.service import (
from piker._daemon import (
find_service, find_service,
Services, Services,
) )