Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 40e49333be Bump mkts timeout to 2s 2023-03-08 15:25:38 -05:00
Tyler Goodlet f627fedf74 Move all docker and external db code to `piker.service` 2023-03-08 15:25:20 -05:00
Tyler Goodlet bd248381ea Start `piker.service` sub-package
For now just moves everything that was in `piker._daemon` to a subpkg
module but a reorg is coming pronto!
2023-03-08 15:14:39 -05:00
Tyler Goodlet a70d76e3e6 Set explicit `marketstore` container startup timeout 2023-03-08 15:01:06 -05:00
Tyler Goodlet a5caaef467 Hardcode `cancel` log level for `ahabd` for now 2023-03-08 15:00:24 -05:00
Tyler Goodlet 7e35696dbb Always passthrough loglevel to `ahabd` supervisor 2023-03-08 14:56:21 -05:00
Tyler Goodlet 93702320a3 Background docker-container logs processing
Previously we would make the `ahabd` supervisor-actor sync to docker
container startup using pseudo-blocking log message processing.

This has issues,
- we're forced to do a hacky "yield back to `trio`" in order to be
  "fake async" when reading the log stream and further,
- blocking on a message is fragile and often slow.

Instead, run the log processor in a background task and in the parent
task poll for the container to be in the client list using a similar
pseudo-async poll pattern. This allows the super to `Context.started()`
sooner (when the container is actually registered as "up") and thus
unblock its (remote) caller faster whilst still doing full log msg
proxying!

Details:
- adds `Container.cuid: str` a unique container id for logging.
- correctly proxy through the `loglevel: str` from `pikerd` caller task.
- shield around `Container.cancel()` in the teardown block and use
  cancel level logging in that method.
2023-03-08 14:28:48 -05:00
Tyler Goodlet 5683eb8ef0 Doc string and types bump in logging mod 2023-03-08 14:22:23 -05:00
Tyler Goodlet ad6b655d7d Apply `Services` runtime state **immediately** inside startup block 2023-03-08 13:01:42 -05:00
Tyler Goodlet 6d1ecdde40 Deliver es specific ahab-super in endpoint startup config 2023-03-08 13:00:11 -05:00
19 changed files with 221 additions and 125 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers.
# Copyright 2020-eternity Tyler Goodlet (in stewardship for piker0)
# Copyright 2020-eternity Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -14,11 +14,11 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
piker: trading gear for hackers.
"""
from ._daemon import open_piker_runtime
'''
from .service import open_piker_runtime
from .data.feed import open_feed
__all__ = [

View File

@ -29,8 +29,15 @@ import tractor
from ..cli import cli
from .. import watchlists as wl
from ..log import get_console_log, colorize_json, get_logger
from .._daemon import maybe_spawn_brokerd, maybe_open_pikerd
from ..brokers import core, get_brokermod, data
from ..service import (
maybe_spawn_brokerd,
maybe_open_pikerd,
)
from ..brokers import (
core,
get_brokermod,
data,
)
log = get_logger('cli')
DEFAULT_BROKER = 'questrade'
@ -60,6 +67,7 @@ def get_method(client, meth_name: str):
print_ok('found!.')
return method
async def run_method(client, meth_name: str, **kwargs):
method = get_method(client, meth_name)
print('running...', end='', flush=True)
@ -67,19 +75,20 @@ async def run_method(client, meth_name: str, **kwargs):
print_ok(f'done! result: {type(result)}')
return result
async def run_test(broker_name: str):
brokermod = get_brokermod(broker_name)
total = 0
passed = 0
failed = 0
print(f'getting client...', end='', flush=True)
print('getting client...', end='', flush=True)
if not hasattr(brokermod, 'get_client'):
print_error('fail! no \'get_client\' context manager found.')
return
async with brokermod.get_client(is_brokercheck=True) as client:
print_ok(f'done! inside client context.')
print_ok('done! inside client context.')
# check for methods present on brokermod
method_list = [
@ -130,7 +139,6 @@ async def run_test(broker_name: str):
total += 1
# check for methods present on brokermod.Client and their
# results
@ -180,7 +188,6 @@ def brokercheck(config, broker):
trio.run(run_test, broker)
@cli.command()
@click.option('--keys', '-k', multiple=True,
help='Return results only for these keys')
@ -335,8 +342,6 @@ def contracts(ctx, loglevel, broker, symbol, ids):
brokermod = get_brokermod(broker)
get_console_log(loglevel)
contracts = trio.run(partial(core.contracts, brokermod, symbol))
if not ids:
# just print out expiry dates which can be used with

View File

@ -28,7 +28,7 @@ import trio
from ..log import get_logger
from . import get_brokermod
from .._daemon import maybe_spawn_brokerd
from ..service import maybe_spawn_brokerd
from .._cacheables import open_cached_client

View File

@ -29,8 +29,11 @@ from tractor.trionics import broadcast_receiver
from ..log import get_logger
from ..data.types import Struct
from .._daemon import maybe_open_emsd
from ._messages import Order, Cancel
from ..service import maybe_open_emsd
from ._messages import (
Order,
Cancel,
)
from ..brokers import get_brokermod
if TYPE_CHECKING:

View File

@ -19,16 +19,18 @@ CLI commons.
'''
import os
from pprint import pformat
from functools import partial
import click
import trio
import tractor
from ..log import get_console_log, get_logger, colorize_json
from ..log import (
get_console_log,
get_logger,
colorize_json,
)
from ..brokers import get_brokermod
from .._daemon import (
from ..service import (
_default_registry_host,
_default_registry_port,
)
@ -68,7 +70,7 @@ def pikerd(
'''
from .._daemon import open_pikerd
from ..service import open_pikerd
log = get_console_log(loglevel)
if pdb:
@ -171,7 +173,7 @@ def cli(
@click.pass_obj
def services(config, tl, ports):
from .._daemon import (
from ..service import (
open_piker_runtime,
_default_registry_port,
_default_registry_host,
@ -204,8 +206,8 @@ def services(config, tl, ports):
def _load_clis() -> None:
from ..data import marketstore # noqa
from ..data import elastic
from ..service import marketstore # noqa
from ..service import elastic
from ..data import cli # noqa
from ..brokers import cli # noqa
from ..ui import cli # noqa

View File

@ -42,7 +42,7 @@ from ..log import (
get_logger,
get_console_log,
)
from .._daemon import maybe_spawn_daemon
from ..service import maybe_spawn_daemon
if TYPE_CHECKING:
from ._sharedmem import (
@ -69,7 +69,7 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see
``._daemon.maybe_open_samplerd()`` and the below
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
'''
@ -391,7 +391,7 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting.
'''
from piker._daemon import Services
from piker.service import Services
dname = 'samplerd'
log.info(f'Spawning `{dname}`')

View File

@ -137,14 +137,14 @@ def storesh(
'''
from piker.data.marketstore import open_tsdb_client
from piker._daemon import open_piker_runtime
from piker.service import open_piker_runtime
async def main():
nonlocal symbols
async with open_piker_runtime(
'storesh',
enable_modules=['piker.data._ahab'],
enable_modules=['piker.service._ahab'],
):
symbol = symbols[0]
@ -187,14 +187,14 @@ def storage(
'''
from piker.data.marketstore import open_tsdb_client
from piker._daemon import open_piker_runtime
from piker.service import open_piker_runtime
async def main():
nonlocal symbols
async with open_piker_runtime(
'tsdb_storage',
enable_modules=['piker.data._ahab'],
enable_modules=['piker.service._ahab'],
):
symbol = symbols[0]
async with open_tsdb_client(symbol) as storage:

View File

@ -58,7 +58,7 @@ from ..log import (
get_logger,
get_console_log,
)
from .._daemon import (
from ..service import (
maybe_spawn_brokerd,
check_for_service,
)

View File

@ -21,7 +21,11 @@ import logging
import json
import tractor
from pygments import highlight, lexers, formatters
from pygments import (
highlight,
lexers,
formatters,
)
# Makes it so we only see the full module name when using ``__name__``
# without the extra "piker." prefix.
@ -32,26 +36,48 @@ def get_logger(
name: str = None,
) -> logging.Logger:
'''Return the package log or a sub-log for `name` if provided.
'''
Return the package log or a sub-log for `name` if provided.
'''
return tractor.log.get_logger(name=name, _root_name=_proj_name)
def get_console_log(level: str = None, name: str = None) -> logging.Logger:
'''Get the package logger and enable a handler which writes to stderr.
def get_console_log(
level: str | None = None,
name: str | None = None,
) -> logging.Logger:
'''
Get the package logger and enable a handler which writes to stderr.
Yeah yeah, i know we can use ``DictConfig``. You do it...
'''
return tractor.log.get_console_log(
level, name=name, _root_name=_proj_name) # our root logger
level,
name=name,
_root_name=_proj_name,
) # our root logger
def colorize_json(data, style='algol_nu'):
"""Colorize json output using ``pygments``.
"""
formatted_json = json.dumps(data, sort_keys=True, indent=4)
def colorize_json(
data: dict,
style='algol_nu',
):
'''
Colorize json output using ``pygments``.
'''
formatted_json = json.dumps(
data,
sort_keys=True,
indent=4,
)
return highlight(
formatted_json, lexers.JsonLexer(),
formatted_json,
lexers.JsonLexer(),
# likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style)
)

View File

@ -19,6 +19,8 @@ Structured, daemon tree service management.
"""
from __future__ import annotations
from pprint import pformat
from functools import partial
import os
from typing import (
Optional,
@ -35,14 +37,11 @@ import tractor
import trio
from trio_typing import TaskStatus
from .log import (
from ..log import (
get_logger,
get_console_log,
)
from .brokers import get_brokermod
from pprint import pformat
from functools import partial
from ..brokers import get_brokermod
log = get_logger(__name__)
@ -337,7 +336,6 @@ async def open_pikerd(
alive underling services (see below).
'''
async with (
open_piker_runtime(
@ -355,17 +353,26 @@ async def open_pikerd(
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
):
assert root_actor.accept_addr == reg_addr
if root_actor.accept_addr != reg_addr:
raise RuntimeError(f'Daemon failed to bind on {reg_addr}!?')
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
if tsdb:
from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore
from ._ahab import start_ahab
from .marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await service_nursery.start(
partial(
start_ahab,
'marketstored',
start_marketstore,
loglevel=loglevel,
)
)
log.info(
@ -385,7 +392,7 @@ async def open_pikerd(
start_ahab,
'elasticsearch',
start_elasticsearch,
start_timeout=240.0 # high cause ci
loglevel=loglevel,
)
)
@ -396,12 +403,6 @@ async def open_pikerd(
f'config: {pformat(config)}'
)
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try:
yield Services
@ -667,7 +668,7 @@ async def spawn_brokerd(
)
# non-blocking setup of brokerd service nursery
from .data import _setup_persistent_brokerd
from ..data import _setup_persistent_brokerd
await Services.start_service_task(
dname,
@ -695,7 +696,10 @@ async def maybe_spawn_brokerd(
f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={'brokername': brokername, 'loglevel': loglevel},
spawn_args={
'brokername': brokername,
'loglevel': loglevel,
},
loglevel=loglevel,
**kwargs,
@ -727,7 +731,7 @@ async def spawn_emsd(
)
# non-blocking setup of clearing service
from .clearing._ems import _setup_persistent_emsd
from ..clearing._ems import _setup_persistent_emsd
await Services.start_service_task(
'emsd',

View File

@ -19,6 +19,7 @@ Supervisor for docker with included specific-image service helpers.
'''
from collections import ChainMap
from functools import partial
import os
import time
from typing import (
@ -46,7 +47,10 @@ from requests.exceptions import (
ReadTimeout,
)
from ..log import get_logger, get_console_log
from ..log import (
get_logger,
get_console_log,
)
from .. import config
log = get_logger(__name__)
@ -197,6 +201,11 @@ class Container:
return False
@property
def cuid(self) -> str:
fqcn: str = self.cntr.attrs['Config']['Image']
return f'{fqcn}[{self.cntr.short_id}]'
def try_signal(
self,
signal: str = 'SIGINT',
@ -232,17 +241,23 @@ class Container:
async def cancel(
self,
stop_msg: str,
log_msg_key: str,
stop_predicate: Callable[[str], bool],
hard_kill: bool = False,
) -> None:
'''
Attempt to cancel this container gracefully, fail over to
a hard kill on timeout.
'''
cid = self.cntr.id
# first try a graceful cancel
log.cancel(
f'SIGINT cancelling container: {cid}\n'
f'waiting on stop msg: "{stop_msg}"'
f'SIGINT cancelling container: {self.cuid}\n'
'waiting on stop predicate...'
)
self.try_signal('SIGINT')
@ -253,7 +268,10 @@ class Container:
log.cancel('polling for CNTR logs...')
try:
await self.process_logs_until(stop_msg)
await self.process_logs_until(
log_msg_key,
stop_predicate,
)
except ApplicationLogError:
hard_kill = True
else:
@ -311,11 +329,16 @@ class Container:
async def open_ahabd(
ctx: tractor.Context,
endpoint: str, # ns-pointer str-msg-type
loglevel: str | None = 'cancel',
**kwargs,
) -> None:
get_console_log('info', name=__name__)
log = get_console_log(
loglevel,
name=__name__,
)
async with open_docker() as client:
@ -338,33 +361,43 @@ async def open_ahabd(
# defaults
{
# startup time limit which is the max the supervisor
# will wait for the container to be registered in
# ``client.containers.list()``
'startup_timeout': 1.0,
# how fast to poll for the startup predicate by sleeping
# this amount incrementally thus yielding to the
# ``trio`` scheduler during sync polling execution.
'startup_query_period': 0.001,
# str-key value expected to contain log message body-contents
# when read using:
# ``json.loads(entry for entry in DockerContainer.logs())``
'log_msg_key': 'msg',
},
)
found = False
with trio.move_on_after(conf['startup_timeout']):
found = await cntr.process_logs_until(
conf['log_msg_key'],
start_lambda,
with trio.move_on_after(conf['startup_timeout']) as cs:
async with trio.open_nursery() as tn:
tn.start_soon(
partial(
cntr.process_logs_until,
log_msg_key=conf['log_msg_key'],
patt_matcher=start_lambda,
checkpoint_period=conf['startup_query_period'],
)
# XXX: if we timeout on finding the "startup msg" we expect then
# we want to FOR SURE raise an error upwards!
if (
not found
and dcntr not in client.containers.list()
):
for entry in cntr.seen_so_far:
log.info(entry)
raise RuntimeError(
f'Failed to start {dcntr.id} check logs deats'
)
# poll for container startup or timeout
while not cs.cancel_called:
if dcntr in client.containers.list():
break
await trio.sleep(conf['startup_query_period'])
# sync with remote caller actor-task but allow log
# processing to continue running in bg.
await ctx.started((
cntr.cntr.id,
os.getpid(),
@ -372,6 +405,19 @@ async def open_ahabd(
))
try:
# XXX: if we timeout on finding the "startup msg" we expect then
# we want to FOR SURE raise an error upwards!
if cs.cancelled_caught:
# if dcntr not in client.containers.list():
for entry in cntr.seen_so_far:
log.info(entry)
raise DockerNotStarted(
f'Failed to start container: {cntr.cuid}\n'
f'due to startup_timeout={conf["startup_timeout"]}s\n\n'
"prolly you should check your container's logs for deats.."
)
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
@ -380,14 +426,21 @@ async def open_ahabd(
finally:
# TODO: ensure loglevel can be set and teardown logs are
# reported if possible on error or cancel..
# XXX WARNING: currently shielding here can result in hangs
# on ctl-c from user.. ideally we can avoid a cancel getting
# consumed and not propagating whilst still doing teardown
# logging..
# with trio.CancelScope(shield=True):
await cntr.cancel(stop_lambda)
await cntr.cancel(
log_msg_key=conf['log_msg_key'],
stop_predicate=stop_lambda,
)
async def start_ahab(
service_name: str,
endpoint: Callable[docker.DockerClient, DockerContainer],
loglevel: str | None = None,
loglevel: str | None = 'cancel',
task_status: TaskStatus[
tuple[
@ -409,13 +462,12 @@ async def start_ahab(
'''
cn_ready = trio.Event()
try:
async with tractor.open_nursery(
loglevel='runtime',
) as tn:
async with tractor.open_nursery() as an:
portal = await tn.start_actor(
portal = await an.start_actor(
service_name,
enable_modules=[__name__]
enable_modules=[__name__],
loglevel=loglevel,
)
# TODO: we have issues with this on teardown
@ -436,6 +488,7 @@ async def start_ahab(
async with portal.open_context(
open_ahabd,
endpoint=str(NamespacePath.from_ref(endpoint)),
loglevel='cancel',
) as (ctx, first):
cid, pid, cntr_config = first

View File

@ -15,17 +15,11 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
TYPE_CHECKING,
)
import pyqtgraph as pg
import numpy as np
import tractor
if TYPE_CHECKING:
import docker
@ -65,14 +59,14 @@ def start_elasticsearch(
-itd \
--rm \
--network=host \
--mount type=bind,source="$(pwd)"/elastic,target=/usr/share/elasticsearch/data \
--mount type=bind,source="$(pwd)"/elastic,\
target=/usr/share/elasticsearch/data \
--env "elastic_username=elastic" \
--env "elastic_password=password" \
--env "xpack.security.enabled=false" \
elastic
'''
import docker
get_console_log('info', name=__name__)
dcntr: DockerContainer = client.containers.run(
@ -86,7 +80,7 @@ def start_elasticsearch(
async def start_matcher(msg: str):
try:
health = (await asks.get(
f'http://localhost:19200/_cat/health',
'http://localhost:19200/_cat/health',
params={'format': 'json'}
)).json()
@ -102,7 +96,17 @@ def start_elasticsearch(
return (
dcntr,
{},
{
# apparently we're REALLY tolerant of startup latency
# for CI XD
'startup_timeout': 240.0,
# XXX: decrease http poll period bc docker
# is shite at handling fast poll rates..
'startup_query_period': 0.1,
'log_msg_key': 'message',
},
# expected startup and stop msgs
start_matcher,
stop_matcher,

View File

@ -26,7 +26,6 @@
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
from pprint import pformat
from typing import (
Any,
Optional,
@ -55,7 +54,7 @@ if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
from .feed import maybe_open_feed
from ..data.feed import maybe_open_feed
from ..log import get_logger, get_console_log
from .._profile import Profiler
@ -63,11 +62,12 @@ from .._profile import Profiler
log = get_logger(__name__)
# container level config
# ahabd-supervisor and container level config
_config = {
'grpc_listen_port': 5995,
'ws_listen_port': 5993,
'log_level': 'debug',
'startup_timeout': 2,
}
_yaml_config = '''

View File

@ -24,7 +24,7 @@ from types import ModuleType
from PyQt5.QtCore import QEvent
import trio
from .._daemon import maybe_spawn_brokerd
from ..service import maybe_spawn_brokerd
from . import _event
from ._exec import run_qtractor
from ..data.feed import install_brokerd_search

View File

@ -49,7 +49,7 @@ from qdarkstyle import DarkPalette
import trio
from outcome import Error
from .._daemon import (
from ..service import (
maybe_open_pikerd,
get_tractor_runtime_kwargs,
)

View File

@ -24,7 +24,7 @@ import tractor
from ..cli import cli
from .. import watchlists as wl
from .._daemon import maybe_spawn_brokerd
from ..service import maybe_spawn_brokerd
_config_dir = click.get_app_dir('piker')

View File

@ -1,7 +1,6 @@
from contextlib import asynccontextmanager as acm
from functools import partial
import os
from typing import AsyncContextManager
from pathlib import Path
from shutil import rmtree
@ -11,7 +10,7 @@ from piker import (
# log,
config,
)
from piker._daemon import (
from piker.service import (
Services,
)
from piker.clearing._client import open_ems
@ -88,7 +87,7 @@ async def _open_test_pikerd(
'''
import random
from piker._daemon import maybe_open_pikerd
from piker.service import maybe_open_pikerd
if reg_addr is None:
port = random.randint(6e3, 7e3)
@ -151,7 +150,8 @@ async def _open_test_pikerd_and_ems(
fqsn,
mode=mode,
loglevel=loglevel,
) as ems_services):
) as ems_services,
):
yield (services, ems_services)

View File

@ -3,7 +3,7 @@ import trio
from typing import AsyncContextManager
from piker._daemon import Services
from piker.service import Services
from piker.log import get_logger
from elasticsearch import Elasticsearch

View File

@ -9,8 +9,7 @@ import pytest
import trio
import tractor
from piker.log import get_logger
from piker._daemon import (
from piker.service import (
find_service,
Services,
)