Factor `marketstore` container specifics into `piker.data.marketstore`

l1_precision_fix
Tyler Goodlet 2022-05-10 14:38:38 -04:00
parent 9ddfae44d2
commit e196e9d1a0
3 changed files with 143 additions and 100 deletions

View File

@ -72,14 +72,15 @@ def pikerd(loglevel, host, tl, pdb, tsdb):
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
if tsdb: if tsdb:
# TODO:
# async with maybe_open_marketstored():
from piker.data._ahab import start_ahab from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor') log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await n.start( ctn_ready, config, (cid, pid) = await n.start(
start_ahab, start_ahab,
'marketstored', 'marketstored',
start_marketstore,
) )
log.info( log.info(
f'`marketstored` up pid:{pid}\n' f'`marketstored` up pid:{pid}\n'

View File

@ -21,6 +21,7 @@ Supervisor for docker with included specific-image service helpers.
import os import os
from typing import ( from typing import (
Optional, Optional,
Callable,
Any, Any,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
@ -28,6 +29,7 @@ from contextlib import asynccontextmanager as acm
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor.msg import NamespacePath
import docker import docker
import json import json
from docker.models.containers import Container as DockerContainer from docker.models.containers import Container as DockerContainer
@ -39,50 +41,6 @@ from .. import config
log = get_logger(__name__) log = get_logger(__name__)
_config = {
'grpc_listen_port': 5995,
'ws_listen_port': 5993,
'log_level': 'debug',
}
_yaml_config = '''
# piker's ``marketstore`` config.
# mount this config using:
# sudo docker run --mount \
# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
# 5993:5993 alpacamarkets/marketstore:latest
root_directory: data
listen_port: {ws_listen_port}
grpc_listen_port: {grpc_listen_port}
log_level: {log_level}
queryable: true
stop_grace_period: 0
wal_rotate_interval: 5
stale_threshold: 5
enable_add: true
enable_remove: false
triggers:
- module: ondiskagg.so
on: "*/1Sec/OHLCV"
config:
# filter: "nasdaq"
destinations:
- 1Min
- 5Min
- 15Min
- 1H
- 1D
- module: stream.so
on: '*/*/*'
# config:
# filter: "nasdaq"
'''.format(**_config)
class DockerNotStarted(Exception): class DockerNotStarted(Exception):
'Prolly you dint start da daemon bruh' 'Prolly you dint start da daemon bruh'
@ -263,63 +221,22 @@ class Container:
@tractor.context @tractor.context
async def open_marketstored( async def open_ahabd(
ctx: tractor.Context, ctx: tractor.Context,
endpoint: str, # ns-pointer str-msg-type
**kwargs, **kwargs,
) -> None: ) -> None:
'''
Start and supervise a marketstore instance with its config bind-mounted
in from the piker config directory on the system.
The equivalent cli cmd to this code is:
sudo docker run --mount \
type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
5993:5993 alpacamarkets/marketstore:latest
'''
get_console_log('info', name=__name__) get_console_log('info', name=__name__)
async with open_docker() as client: async with open_docker() as client:
# create a mount from user's local piker config dir into container # TODO: eventually offer a config-oriented API to do the mounts,
config_dir_mnt = docker.types.Mount( # params, etc. passing to ``Containter.run()``?
target='/etc', # call into endpoint for container config/init
source=config._config_dir, ep_func = NamespacePath(endpoint).load_ref()
type='bind', dcntr, cntr_config = ep_func(client)
)
# create a user config subdir where the marketstore
# backing filesystem database can be persisted.
persistent_data_dir = os.path.join(
config._config_dir, 'data',
)
if not os.path.isdir(persistent_data_dir):
os.mkdir(persistent_data_dir)
data_dir_mnt = docker.types.Mount(
target='/data',
source=persistent_data_dir,
type='bind',
)
dcntr: DockerContainer = client.containers.run(
'alpacamarkets/marketstore:latest',
# do we need this for cmds?
# '-i',
# '-p 5993:5993',
ports={
'5993/tcp': 5993, # jsonrpc
'5995/tcp': 5995, # grpc
},
mounts=[config_dir_mnt, data_dir_mnt],
detach=True,
# stop_signal='SIGINT',
init=True,
# remove=True,
)
cntr = Container(dcntr) cntr = Container(dcntr)
with trio.move_on_after(1): with trio.move_on_after(1):
@ -332,7 +249,11 @@ async def open_marketstored(
'Failed to start `marketstore` check logs deats' 'Failed to start `marketstore` check logs deats'
) )
await ctx.started((cntr.cntr.id, os.getpid())) await ctx.started((
cntr.cntr.id,
os.getpid(),
cntr_config,
))
try: try:
@ -355,6 +276,7 @@ async def open_marketstored(
async def start_ahab( async def start_ahab(
service_name: str, service_name: str,
endpoint: Callable[docker.DockerClient, DockerContainer],
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ tuple[
trio.Event, trio.Event,
@ -400,14 +322,15 @@ async def start_ahab(
) )
async with portal.open_context( async with portal.open_context(
open_marketstored, open_ahabd,
endpoint=str(NamespacePath.from_ref(endpoint)),
) as (ctx, first): ) as (ctx, first):
cid, pid = first cid, pid, cntr_config = first
task_status.started(( task_status.started((
cn_ready, cn_ready,
_config, cntr_config,
(cid, pid), (cid, pid),
)) ))

View File

@ -23,6 +23,7 @@
- todo: tick sequence stream-cloning for testing - todo: tick sequence stream-cloning for testing
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from pprint import pformat from pprint import pformat
@ -30,6 +31,7 @@ from typing import (
Any, Any,
Optional, Optional,
Union, Union,
TYPE_CHECKING,
) )
import time import time
from math import isnan from math import isnan
@ -49,12 +51,129 @@ from anyio_marketstore import (
import pendulum import pendulum
import purerpc import purerpc
if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
from .feed import maybe_open_feed from .feed import maybe_open_feed
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
log = get_logger(__name__) log = get_logger(__name__)
# container level config
_config = {
'grpc_listen_port': 5995,
'ws_listen_port': 5993,
'log_level': 'debug',
}
_yaml_config = '''
# piker's ``marketstore`` config.
# mount this config using:
# sudo docker run --mount \
# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
# 5993:5993 alpacamarkets/marketstore:latest
root_directory: data
listen_port: {ws_listen_port}
grpc_listen_port: {grpc_listen_port}
log_level: {log_level}
queryable: true
stop_grace_period: 0
wal_rotate_interval: 5
stale_threshold: 5
enable_add: true
enable_remove: false
triggers:
- module: ondiskagg.so
on: "*/1Sec/OHLCV"
config:
# filter: "nasdaq"
destinations:
- 1Min
- 5Min
- 15Min
- 1H
- 1D
- module: stream.so
on: '*/*/*'
# config:
# filter: "nasdaq"
'''.format(**_config)
def start_marketstore(
client: docker.DockerClient,
**kwargs,
) -> tuple[DockerContainer, dict[str, Any]]:
'''
Start and supervise a marketstore instance with its config bind-mounted
in from the piker config directory on the system.
The equivalent cli cmd to this code is:
sudo docker run --mount \
type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
5993:5993 alpacamarkets/marketstore:latest
'''
import os
import docker
from .. import config
get_console_log('info', name=__name__)
# create a mount from user's local piker config dir into container
config_dir_mnt = docker.types.Mount(
target='/etc',
source=config._config_dir,
type='bind',
)
# create a user config subdir where the marketstore
# backing filesystem database can be persisted.
persistent_data_dir = os.path.join(
config._config_dir, 'data',
)
if not os.path.isdir(persistent_data_dir):
os.mkdir(persistent_data_dir)
data_dir_mnt = docker.types.Mount(
target='/data',
source=persistent_data_dir,
type='bind',
)
dcntr: DockerContainer = client.containers.run(
'alpacamarkets/marketstore:latest',
# do we need this for cmds?
# '-i',
# '-p 5993:5993',
ports={
'5993/tcp': 5993, # jsonrpc / ws?
'5995/tcp': 5995, # grpc
},
mounts=[
config_dir_mnt,
data_dir_mnt,
],
detach=True,
# stop_signal='SIGINT',
init=True,
# remove=True,
)
return dcntr, _config
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK') _tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)