Compare commits
53 Commits
310_plus...marketstor
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 381fa11458 | |
Tyler Goodlet | 7bb54f61b7 | |
Tyler Goodlet | c2c3516b34 | |
Tyler Goodlet | 1f58f5daba | |
Tyler Goodlet | eb4b63a710 | |
Tyler Goodlet | 6971eca46c | |
Tyler Goodlet | 033a32cff1 | |
Tyler Goodlet | df8054b7a4 | |
Tyler Goodlet | 288eda195f | |
Tyler Goodlet | 17f469d619 | |
Tyler Goodlet | 94cba54beb | |
Tyler Goodlet | 1c6a46da5d | |
Tyler Goodlet | 54afa30759 | |
Tyler Goodlet | 62ef081694 | |
Tyler Goodlet | 279516959f | |
Tyler Goodlet | 1c3457d829 | |
Tyler Goodlet | 97211a0a7e | |
Tyler Goodlet | 1115d06e33 | |
Tyler Goodlet | c874b6aca0 | |
Tyler Goodlet | 531989a9e5 | |
Tyler Goodlet | 7f5730d222 | |
Tyler Goodlet | 38dc9cc99b | |
Tyler Goodlet | c63eb91f6f | |
Tyler Goodlet | 981735669c | |
Tyler Goodlet | f1b73244f2 | |
Tyler Goodlet | d1599c4f6c | |
Tyler Goodlet | 5a69ab800f | |
Tyler Goodlet | 1623d4ed83 | |
Tyler Goodlet | 96b7d55018 | |
Tyler Goodlet | b6282bc485 | |
Tyler Goodlet | e81e83821e | |
Tyler Goodlet | 4af42ee1b5 | |
Tyler Goodlet | 3eeb1caf32 | |
Tyler Goodlet | a5c1febd12 | |
Tyler Goodlet | ab2d721be4 | |
Tyler Goodlet | 4c38d0246e | |
Tyler Goodlet | ecc3613654 | |
Tyler Goodlet | 216ad65933 | |
Tyler Goodlet | ec501da681 | |
Tyler Goodlet | 218449a9ef | |
Tyler Goodlet | 973f0e6180 | |
Tyler Goodlet | 5a592d5f42 | |
Tyler Goodlet | d6ffe1ba7a | |
Tyler Goodlet | 8f22ccd5ef | |
Tyler Goodlet | 685ec489ef | |
Tyler Goodlet | 48777d72e8 | |
Tyler Goodlet | ac524dc78b | |
Tyler Goodlet | 1693cc42c1 | |
Tyler Goodlet | 91d63f5957 | |
Tyler Goodlet | c291c26a3e | |
Guillermo Rodriguez | d367d68fc2 | |
Guillermo Rodriguez | b0401b91c1 | |
Guillermo Rodriguez | 846e6a3c74 | |
```diff
@@ -19,7 +19,7 @@ Structured, daemon tree service management.

 """
 from typing import Optional, Union, Callable, Any
-from contextlib import asynccontextmanager
+from contextlib import asynccontextmanager as acm
 from collections import defaultdict

 from pydantic import BaseModel

@@ -130,7 +130,7 @@ class Services(BaseModel):
 _services: Optional[Services] = None


-@asynccontextmanager
+@acm
 async def open_pikerd(
     start_method: str = 'trio',
     loglevel: Optional[str] = None,

@@ -185,7 +185,7 @@ async def open_pikerd(
         yield _services


-@asynccontextmanager
+@acm
 async def open_piker_runtime(
     name: str,
     enable_modules: list[str] = [],

@@ -226,7 +226,7 @@ async def open_piker_runtime(
         yield tractor.current_actor()


-@asynccontextmanager
+@acm
 async def maybe_open_runtime(
     loglevel: Optional[str] = None,
     **kwargs,

@@ -249,7 +249,7 @@ async def maybe_open_runtime(
         yield


-@asynccontextmanager
+@acm
 async def maybe_open_pikerd(
     loglevel: Optional[str] = None,
     **kwargs,

@@ -300,7 +300,36 @@ class Brokerd:
     locks = defaultdict(trio.Lock)


-@asynccontextmanager
+@acm
+async def find_service(
+    service_name: str,
+) -> Optional[tractor.Portal]:
+
+    log.info(f'Scanning for service `{service_name}`')
+    # attach to existing daemon by name if possible
+    async with tractor.find_actor(
+        service_name,
+        arbiter_sockaddr=_registry_addr,
+    ) as maybe_portal:
+        yield maybe_portal
+
+
+async def check_for_service(
+    service_name: str,
+
+) -> bool:
+    '''
+    Service daemon "liveness" predicate.
+
+    '''
+    async with tractor.query_actor(
+        service_name,
+        arbiter_sockaddr=_registry_addr,
+    ) as sockaddr:
+        return sockaddr
+
+
+@acm
 async def maybe_spawn_daemon(

     service_name: str,
```
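
The hunk above factors actor discovery into two reusable helpers. A minimal client-side sketch of how they compose (assuming a running `pikerd` registry tree; the service names here are purely illustrative):

```python
import trio
from piker._daemon import find_service, check_for_service


async def main() -> None:
    # predicate-style liveness check: resolves to the registered
    # socket address (truthy) when the daemon is up, ``None`` if not.
    if await check_for_service('marketstored'):
        print('tsdb daemon is alive')

    # portal-style lookup for issuing remote calls to the daemon.
    async with find_service('brokerd.ib') as portal:
        if portal is None:
            print('no such daemon registered')


trio.run(main)
```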
```diff
@@ -310,7 +339,7 @@ async def maybe_spawn_daemon(
     **kwargs,

 ) -> tractor.Portal:
-    """
+    '''
     If no ``service_name`` daemon-actor can be found,
     spawn one in a local subactor and return a portal to it.

@@ -321,7 +350,7 @@ async def maybe_spawn_daemon(
     This can be seen as a service starting api for remote-actor
     clients.

-    """
+    '''
     if loglevel:
         get_console_log(loglevel)

@@ -330,19 +359,13 @@ async def maybe_spawn_daemon(
     lock = Brokerd.locks[service_name]
     await lock.acquire()

-    log.info(f'Scanning for existing {service_name}')
-    # attach to existing daemon by name if possible
-    async with tractor.find_actor(
-        service_name,
-        arbiter_sockaddr=_registry_addr,
-
-    ) as portal:
+    async with find_service(service_name) as portal:
         if portal is not None:
             lock.release()
             yield portal
             return

     log.warning(f"Couldn't find any existing {service_name}")

     # ask root ``pikerd`` daemon to spawn the daemon we need if
     # pikerd is not live we now become the root of the

@@ -423,7 +446,7 @@ async def spawn_brokerd(
     return True


-@asynccontextmanager
+@acm
 async def maybe_spawn_brokerd(

     brokername: str,

@@ -431,7 +454,9 @@ async def maybe_spawn_brokerd(
     **kwargs,

 ) -> tractor.Portal:
-    '''Helper to spawn a brokerd service.
+    '''
+    Helper to spawn a brokerd service *from* a client
+    who wishes to use the sub-actor-daemon.

     '''
     async with maybe_spawn_daemon(

@@ -483,7 +508,7 @@ async def spawn_emsd(
     return True


-@asynccontextmanager
+@acm
 async def maybe_open_emsd(

     brokername: str,
```
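
Most of the remaining churn in this file is the swap of `@asynccontextmanager` for the terser `@acm` alias, which is purely cosmetic:

```python
from contextlib import asynccontextmanager as acm


@acm  # behaviorally identical to decorating with ``@asynccontextmanager``
async def open_resource():
    yield 'ready'
```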
```diff
@@ -16,29 +16,22 @@ from .. import config

 log = get_logger('cli')
 DEFAULT_BROKER = 'questrade'

-_config_dir = click.get_app_dir('piker')
-_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
-_context_defaults = dict(
-    default_map={
-        # Questrade specific quote poll rates
-        'monitor': {
-            'rate': 3,
-        },
-        'optschain': {
-            'rate': 1,
-        },
-    }
-)


 @click.command()
 @click.option('--loglevel', '-l', default='warning', help='Logging level')
 @click.option('--tl', is_flag=True, help='Enable tractor logging')
 @click.option('--pdb', is_flag=True, help='Enable tractor debug mode')
 @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
-def pikerd(loglevel, host, tl, pdb):
-    """Spawn the piker broker-daemon.
-    """
+@click.option(
+    '--tsdb',
+    is_flag=True,
+    help='Enable local ``marketstore`` instance'
+)
+def pikerd(loglevel, host, tl, pdb, tsdb):
+    '''
+    Spawn the piker broker-daemon.
+
+    '''
     from .._daemon import open_pikerd
     log = get_console_log(loglevel)
```
```diff
@@ -52,13 +45,33 @@ def pikerd(loglevel, host, tl, pdb):
     ))

     async def main():
-        async with open_pikerd(loglevel=loglevel, debug_mode=pdb):
+        async with (
+            open_pikerd(
+                loglevel=loglevel,
+                debug_mode=pdb,
+            ),  # normally delivers a ``Services`` handle
+            trio.open_nursery() as n,
+        ):
+            if tsdb:
+                # TODO:
+                # async with maybe_open_marketstored():
+
+                from piker.data._ahab import start_ahab
+                log.info('Spawning `marketstore` supervisor')
+                ctn_ready = await n.start(
+                    start_ahab,
+                    'marketstored',
+                )
+                await ctn_ready.wait()
+                log.info('`marketstore` container:{uid} up')
+
             await trio.sleep_forever()

     trio.run(main)


-@click.group(context_settings=_context_defaults)
+@click.group(context_settings=config._context_defaults)
 @click.option(
     '--brokers', '-b',
     default=[DEFAULT_BROKER],

@@ -87,8 +100,8 @@ def cli(ctx, brokers, loglevel, tl, configdir):
         'loglevel': loglevel,
         'tractorloglevel': None,
         'log': get_console_log(loglevel),
-        'confdir': _config_dir,
-        'wl_path': _watchlists_data_path,
+        'confdir': config._config_dir,
+        'wl_path': config._watchlists_data_path,
     })

     # allow enabling same loglevel in ``tractor`` machinery
```
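
The `await n.start(start_ahab, 'marketstored')` call above leans on trio's two-phase task startup: `nursery.start()` blocks until the spawned task calls `task_status.started(...)`, and whatever value is passed (here a `trio.Event`) is handed back to the caller. A self-contained sketch of that handshake (names are illustrative, not from the source):

```python
import trio
from trio_typing import TaskStatus


async def supervisor(
    task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
) -> None:
    ready = trio.Event()
    # ... spin up the supervised service here ...
    task_status.started(ready)  # unblocks the ``nursery.start()`` caller
    ready.set()                 # signal the service is fully up
    await trio.sleep_forever()


async def main() -> None:
    async with trio.open_nursery() as n:
        ready = await n.start(supervisor)
        await ready.wait()  # the same wait ``pikerd --tsdb`` does above
        n.cancel_scope.cancel()


trio.run(main)
```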
```diff
@@ -17,6 +17,8 @@
 """
 Broker configuration mgmt.
 """
+import platform
+import sys
 import os
 from os.path import dirname
 import shutil

@@ -24,14 +26,100 @@ from typing import Optional

 from bidict import bidict
 import toml
-import click

 from .log import get_logger

 log = get_logger('broker-config')

-_config_dir = click.get_app_dir('piker')
+
+# taken from ``click`` since apparently they have some
+# super weirdness with sigint and sudo..no clue
+def get_app_dir(app_name, roaming=True, force_posix=False):
+    r"""Returns the config folder for the application.  The default behavior
+    is to return whatever is most appropriate for the operating system.
+
+    To give you an idea, for an app called ``"Foo Bar"``, something like
+    the following folders could be returned:
+
+    Mac OS X:
+      ``~/Library/Application Support/Foo Bar``
+    Mac OS X (POSIX):
+      ``~/.foo-bar``
+    Unix:
+      ``~/.config/foo-bar``
+    Unix (POSIX):
+      ``~/.foo-bar``
+    Win XP (roaming):
+      ``C:\Documents and Settings\<user>\Local Settings\Application Data\Foo Bar``
+    Win XP (not roaming):
+      ``C:\Documents and Settings\<user>\Application Data\Foo Bar``
+    Win 7 (roaming):
+      ``C:\Users\<user>\AppData\Roaming\Foo Bar``
+    Win 7 (not roaming):
+      ``C:\Users\<user>\AppData\Local\Foo Bar``
+
+    .. versionadded:: 2.0
+
+    :param app_name: the application name.  This should be properly capitalized
+                     and can contain whitespace.
+    :param roaming: controls if the folder should be roaming or not on Windows.
+                    Has no affect otherwise.
+    :param force_posix: if this is set to `True` then on any POSIX system the
+                        folder will be stored in the home folder with a leading
+                        dot instead of the XDG config home or darwin's
+                        application support folder.
+    """
+
+    def _posixify(name):
+        return "-".join(name.split()).lower()
+
+    # if WIN:
+    if platform.system() == 'Windows':
+        key = "APPDATA" if roaming else "LOCALAPPDATA"
+        folder = os.environ.get(key)
+        if folder is None:
+            folder = os.path.expanduser("~")
+        return os.path.join(folder, app_name)
+    if force_posix:
+        return os.path.join(os.path.expanduser("~/.{}".format(_posixify(app_name))))
+    if sys.platform == "darwin":
+        return os.path.join(
+            os.path.expanduser("~/Library/Application Support"), app_name
+        )
+    return os.path.join(
+        os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")),
+        _posixify(app_name),
+    )
+
+
+_config_dir = _click_config_dir = get_app_dir('piker')
+_parent_user = os.environ.get('SUDO_USER')
+
+if _parent_user:
+    non_root_user_dir = os.path.expanduser(
+        f'~{_parent_user}'
+    )
+    root = 'root'
+    _config_dir = (
+        non_root_user_dir +
+        _click_config_dir[
+            _click_config_dir.rfind(root) + len(root):
+        ]
+    )
+
 _file_name = 'brokers.toml'
+_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
+_context_defaults = dict(
+    default_map={
+        # Questrade specific quote poll rates
+        'monitor': {
+            'rate': 3,
+        },
+        'optschain': {
+            'rate': 1,
+        },
+    }
+)


 def _override_config_dir(
```
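
The `SUDO_USER` branch above re-roots the config path from root's home to the invoking user's. A worked example of what it computes, with assumed values:

```python
# values assumed for illustration; not from the source
_click_config_dir = '/root/.config/piker'   # what get_app_dir() returns under sudo
non_root_user_dir = '/home/alice'           # os.path.expanduser('~alice')
root = 'root'

# slice off everything up to and including the 'root' segment ..
tail = _click_config_dir[_click_config_dir.rfind(root) + len(root):]
assert tail == '/.config/piker'

# .. and graft the remainder onto the real user's home dir.
assert non_root_user_dir + tail == '/home/alice/.config/piker'
```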
@ -0,0 +1,469 @@
|
||||||
|
# piker: trading gear for hackers
|
||||||
|
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
'''
|
||||||
|
Supervisor for docker with included specific-image service helpers.
|
||||||
|
|
||||||
|
'''
|
||||||
|
import os
|
||||||
|
from typing import (
|
||||||
|
Optional,
|
||||||
|
# Any,
|
||||||
|
)
|
||||||
|
from contextlib import asynccontextmanager as acm
|
||||||
|
|
||||||
|
import trio
|
||||||
|
from trio_typing import TaskStatus
|
||||||
|
import tractor
|
||||||
|
import docker
|
||||||
|
import json
|
||||||
|
from docker.models.containers import Container as DockerContainer
|
||||||
|
from docker.errors import DockerException, APIError
|
||||||
|
from requests.exceptions import ConnectionError, ReadTimeout
|
||||||
|
|
||||||
|
from ..log import get_logger, get_console_log
|
||||||
|
from .. import config
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
_config = '''
|
||||||
|
# piker's ``marketstore`` config.
|
||||||
|
|
||||||
|
# mount this config using:
|
||||||
|
# sudo docker run --mount \
|
||||||
|
# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
|
||||||
|
# 5993:5993 alpacamarkets/marketstore:latest
|
||||||
|
|
||||||
|
root_directory: data
|
||||||
|
listen_port: 5993
|
||||||
|
grpc_listen_port: 5995
|
||||||
|
log_level: debug
|
||||||
|
queryable: true
|
||||||
|
stop_grace_period: 0
|
||||||
|
wal_rotate_interval: 5
|
||||||
|
stale_threshold: 5
|
||||||
|
enable_add: true
|
||||||
|
enable_remove: false
|
||||||
|
|
||||||
|
triggers:
|
||||||
|
- module: ondiskagg.so
|
||||||
|
on: "*/1Sec/OHLCV"
|
||||||
|
config:
|
||||||
|
# filter: "nasdaq"
|
||||||
|
destinations:
|
||||||
|
- 1Min
|
||||||
|
- 5Min
|
||||||
|
- 15Min
|
||||||
|
- 1H
|
||||||
|
- 1D
|
||||||
|
|
||||||
|
- module: stream.so
|
||||||
|
on: '*/*/*'
|
||||||
|
# config:
|
||||||
|
# filter: "nasdaq"
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
|
class DockerNotStarted(Exception):
|
||||||
|
'Prolly you dint start da daemon bruh'
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def open_docker(
|
||||||
|
url: Optional[str] = None,
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> docker.DockerClient:
|
||||||
|
|
||||||
|
client: Optional[docker.DockerClient] = None
|
||||||
|
try:
|
||||||
|
client = docker.DockerClient(
|
||||||
|
base_url=url,
|
||||||
|
**kwargs
|
||||||
|
) if url else docker.from_env(**kwargs)
|
||||||
|
|
||||||
|
yield client
|
||||||
|
|
||||||
|
except (
|
||||||
|
DockerException,
|
||||||
|
APIError,
|
||||||
|
) as err:
|
||||||
|
|
||||||
|
def unpack_msg(err: Exception) -> str:
|
||||||
|
args = getattr(err, 'args', None)
|
||||||
|
if args:
|
||||||
|
return args
|
||||||
|
else:
|
||||||
|
return str(err)
|
||||||
|
|
||||||
|
# could be more specific so let's check if it's just perms.
|
||||||
|
if err.args:
|
||||||
|
errs = err.args
|
||||||
|
for err in errs:
|
||||||
|
msg = unpack_msg(err)
|
||||||
|
if 'PermissionError' in msg:
|
||||||
|
raise DockerException('You dint run as root yo!')
|
||||||
|
|
||||||
|
elif 'FileNotFoundError' in msg:
|
||||||
|
raise DockerNotStarted('Did you start da service sister?')
|
||||||
|
|
||||||
|
# not perms?
|
||||||
|
raise
|
||||||
|
|
||||||
|
finally:
|
||||||
|
if client:
|
||||||
|
client.close()
|
||||||
|
# client.api._custom_adapter.close()
|
||||||
|
for c in client.containers.list():
|
||||||
|
c.kill()
|
||||||
|
|
||||||
|
|
||||||
|
class Container:
|
||||||
|
'''
|
||||||
|
Wrapper around a ``docker.models.containers.Container`` to include
|
||||||
|
log capture and relay through our native logging system and helper
|
||||||
|
method(s) for cancellation/teardown.
|
||||||
|
|
||||||
|
'''
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
cntr: DockerContainer,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
self.cntr = cntr
|
||||||
|
# log msg de-duplication
|
||||||
|
self.seen_so_far = set()
|
||||||
|
|
||||||
|
async def process_logs_until(
|
||||||
|
self,
|
||||||
|
patt: str,
|
||||||
|
bp_on_msg: bool = False,
|
||||||
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Attempt to capture container log messages and relay through our
|
||||||
|
native logging system.
|
||||||
|
|
||||||
|
'''
|
||||||
|
seen_so_far = self.seen_so_far
|
||||||
|
|
||||||
|
while True:
|
||||||
|
logs = self.cntr.logs()
|
||||||
|
entries = logs.decode().split('\n')
|
||||||
|
for entry in entries:
|
||||||
|
|
||||||
|
# ignore null lines
|
||||||
|
if not entry:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
record = json.loads(entry.strip())
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
if 'Error' in entry:
|
||||||
|
raise RuntimeError(entry)
|
||||||
|
raise
|
||||||
|
|
||||||
|
msg = record['msg']
|
||||||
|
level = record['level']
|
||||||
|
if msg and entry not in seen_so_far:
|
||||||
|
seen_so_far.add(entry)
|
||||||
|
if bp_on_msg:
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
|
getattr(log, level, log.error)(f'{msg}')
|
||||||
|
|
||||||
|
if patt in msg:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# do a checkpoint so we don't block if cancelled B)
|
||||||
|
await trio.sleep(0.01)
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def try_signal(
|
||||||
|
self,
|
||||||
|
signal: str = 'SIGINT',
|
||||||
|
|
||||||
|
) -> bool:
|
||||||
|
try:
|
||||||
|
# XXX: market store doesn't seem to shutdown nicely all the
|
||||||
|
# time with this (maybe because there are still open grpc
|
||||||
|
# connections?) noticably after client connections have been
|
||||||
|
# made or are in use/teardown. It works just fine if you
|
||||||
|
# just start and stop the container tho?..
|
||||||
|
log.cancel(f'SENDING {signal} to {self.cntr.id}')
|
||||||
|
self.cntr.kill(signal)
|
||||||
|
return True
|
||||||
|
|
||||||
|
except docker.errors.APIError as err:
|
||||||
|
# _err = err
|
||||||
|
if 'is not running' in err.explanation:
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def cancel(
|
||||||
|
self,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
cid = self.cntr.id
|
||||||
|
self.try_signal('SIGINT')
|
||||||
|
|
||||||
|
with trio.move_on_after(0.5) as cs:
|
||||||
|
cs.shield = True
|
||||||
|
# print('PROCESSINGN LOGS')
|
||||||
|
await self.process_logs_until('initiating graceful shutdown')
|
||||||
|
# print('SHUTDOWN REPORTED BY CONTAINER')
|
||||||
|
await self.process_logs_until('exiting...',)
|
||||||
|
|
||||||
|
for _ in range(10):
|
||||||
|
with trio.move_on_after(0.5) as cs:
|
||||||
|
cs.shield = True
|
||||||
|
# print('waiting on EXITING')
|
||||||
|
await self.process_logs_until('exiting...',)
|
||||||
|
# print('got EXITING')
|
||||||
|
break
|
||||||
|
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
# get out the big guns, bc apparently marketstore
|
||||||
|
# doesn't actually know how to terminate gracefully
|
||||||
|
# :eyeroll:...
|
||||||
|
self.try_signal('SIGKILL')
|
||||||
|
|
||||||
|
try:
|
||||||
|
log.info('Waiting on container shutdown: {cid}')
|
||||||
|
self.cntr.wait(
|
||||||
|
timeout=0.1,
|
||||||
|
condition='not-running',
|
||||||
|
)
|
||||||
|
break
|
||||||
|
|
||||||
|
except (
|
||||||
|
ReadTimeout,
|
||||||
|
ConnectionError,
|
||||||
|
):
|
||||||
|
log.error(f'failed to wait on container {cid}')
|
||||||
|
raise
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise RuntimeError('Failed to cancel container {cid}')
|
||||||
|
|
||||||
|
log.cancel(f'Container stopped: {cid}')
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def open_marketstored(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
**kwargs,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Start and supervise a marketstore instance with its config bind-mounted
|
||||||
|
in from the piker config directory on the system.
|
||||||
|
|
||||||
|
The equivalent cli cmd to this code is:
|
||||||
|
|
||||||
|
sudo docker run --mount \
|
||||||
|
type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
|
||||||
|
5993:5993 alpacamarkets/marketstore:latest
|
||||||
|
|
||||||
|
'''
|
||||||
|
log = get_console_log('info', name=__name__)
|
||||||
|
|
||||||
|
async with open_docker() as client:
|
||||||
|
|
||||||
|
# create a mount from user's local piker config dir into container
|
||||||
|
config_dir_mnt = docker.types.Mount(
|
||||||
|
target='/etc',
|
||||||
|
source=config._config_dir,
|
||||||
|
type='bind',
|
||||||
|
)
|
||||||
|
|
||||||
|
# create a user config subdir where the marketstore
|
||||||
|
# backing filesystem database can be persisted.
|
||||||
|
persistent_data_dir = os.path.join(
|
||||||
|
config._config_dir, 'data',
|
||||||
|
)
|
||||||
|
if not os.path.isdir(persistent_data_dir):
|
||||||
|
os.mkdir(persistent_data_dir)
|
||||||
|
|
||||||
|
data_dir_mnt = docker.types.Mount(
|
||||||
|
target='/data',
|
||||||
|
source=persistent_data_dir,
|
||||||
|
type='bind',
|
||||||
|
)
|
||||||
|
|
||||||
|
dcntr: DockerContainer = client.containers.run(
|
||||||
|
'alpacamarkets/marketstore:latest',
|
||||||
|
# do we need this for cmds?
|
||||||
|
# '-i',
|
||||||
|
|
||||||
|
# '-p 5993:5993',
|
||||||
|
ports={
|
||||||
|
'5993/tcp': 5993, # jsonrpc
|
||||||
|
'5995/tcp': 5995, # grpc
|
||||||
|
},
|
||||||
|
mounts=[config_dir_mnt, data_dir_mnt],
|
||||||
|
detach=True,
|
||||||
|
# stop_signal='SIGINT',
|
||||||
|
init=True,
|
||||||
|
# remove=True,
|
||||||
|
)
|
||||||
|
cntr = Container(dcntr)
|
||||||
|
|
||||||
|
with trio.move_on_after(1):
|
||||||
|
found = await cntr.process_logs_until(
|
||||||
|
"launching tcp listener for all services...",
|
||||||
|
)
|
||||||
|
|
||||||
|
if not found and cntr not in client.containers.list():
|
||||||
|
raise RuntimeError(
|
||||||
|
'Failed to start `marketstore` check logs deats'
|
||||||
|
)
|
||||||
|
|
||||||
|
await ctx.started((cntr.cntr.id, os.getpid()))
|
||||||
|
|
||||||
|
# async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
try:
|
||||||
|
|
||||||
|
# TODO: we might eventually want a proxy-style msg-prot here
|
||||||
|
# to allow remote control of containers without needing
|
||||||
|
# callers to have root perms?
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
# await cntr.cancel()
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# # block for the expected "teardown log msg"..
|
||||||
|
# # await cntr.process_logs_until('exiting...',)
|
||||||
|
|
||||||
|
# # only msg should be to signal killing the
|
||||||
|
# # container and this super daemon.
|
||||||
|
# msg = await stream.receive()
|
||||||
|
# # print("GOT CANCEL MSG")
|
||||||
|
|
||||||
|
# cid = msg['cancel']
|
||||||
|
# log.cancel(f'Cancelling container {cid}')
|
||||||
|
|
||||||
|
# # print("CANCELLING CONTAINER")
|
||||||
|
# await cntr.cancel()
|
||||||
|
|
||||||
|
# # print("SENDING ACK")
|
||||||
|
# await stream.send('ack')
|
||||||
|
|
||||||
|
except (
|
||||||
|
BaseException,
|
||||||
|
# trio.Cancelled,
|
||||||
|
# KeyboardInterrupt,
|
||||||
|
):
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await cntr.cancel()
|
||||||
|
# await stream.send('ack')
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
async def start_ahab(
|
||||||
|
service_name: str,
|
||||||
|
task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Start a ``docker`` container supervisor with given service name.
|
||||||
|
|
||||||
|
Currently the actor calling this task should normally be started
|
||||||
|
with root permissions (until we decide to use something that doesn't
|
||||||
|
require this, like docker's rootless mode or some wrapper project) but
|
||||||
|
te root perms are de-escalated after the docker supervisor sub-actor
|
||||||
|
is started.
|
||||||
|
|
||||||
|
'''
|
||||||
|
cn_ready = trio.Event()
|
||||||
|
try:
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
loglevel='runtime',
|
||||||
|
) as tn:
|
||||||
|
|
||||||
|
portal = await tn.start_actor(
|
||||||
|
service_name,
|
||||||
|
enable_modules=[__name__]
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: we have issues with this on teardown
|
||||||
|
# where ``tractor`` tries to issue ``os.kill()``
|
||||||
|
# and hits perms errors since the root process
|
||||||
|
# doesn't any longer have root perms..
|
||||||
|
|
||||||
|
# de-escalate root perms to the original user
|
||||||
|
# after the docker supervisor actor is spawned.
|
||||||
|
if config._parent_user:
|
||||||
|
import pwd
|
||||||
|
os.setuid(
|
||||||
|
pwd.getpwnam(
|
||||||
|
config._parent_user
|
||||||
|
)[2] # named user's uid
|
||||||
|
)
|
||||||
|
|
||||||
|
task_status.started(cn_ready)
|
||||||
|
|
||||||
|
async with portal.open_context(
|
||||||
|
open_marketstored,
|
||||||
|
) as (ctx, first):
|
||||||
|
|
||||||
|
cid, pid = first
|
||||||
|
|
||||||
|
await trio.sleep_forever()
|
||||||
|
# async with ctx.open_stream() as stream:
|
||||||
|
# try:
|
||||||
|
# # run till cancelled
|
||||||
|
# await trio.sleep_forever()
|
||||||
|
# finally:
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# # print('SENDING CANCEL TO MARKETSTORED')
|
||||||
|
# await stream.send({'cancel': (cid, pid)})
|
||||||
|
# assert await stream.receive() == 'ack'
|
||||||
|
|
||||||
|
# since we demoted root perms in this parent
|
||||||
|
# we'll get a perms error on proc cleanup in
|
||||||
|
# ``tractor`` nursery exit. just make sure
|
||||||
|
# the child is terminated and don't raise the
|
||||||
|
# error if so.
|
||||||
|
|
||||||
|
# TODO: we could also consider adding
|
||||||
|
# a ``tractor.ZombieDetected`` or something that we could raise
|
||||||
|
# if we find the child didn't terminate.
|
||||||
|
# await tractor.breakpoint()
|
||||||
|
except PermissionError:
|
||||||
|
log.warning('Failed to cancel root permsed container')
|
||||||
|
|
||||||
|
except (
|
||||||
|
trio.MultiError,
|
||||||
|
) as err:
|
||||||
|
for subexc in err.exceptions:
|
||||||
|
if isinstance(subexc, PermissionError):
|
||||||
|
log.warning('Failed to cancel root perms-ed container')
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
await start_ahab()
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
|
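
`process_logs_until()` above assumes `marketstored` writes one JSON object per log line carrying `msg` and `level` fields. A tiny sketch of that parse step on a made-up record:

```python
import json

# a hypothetical single log line in the format the parser above expects
entry = '{"level": "info", "msg": "launching tcp listener for all services..."}'

record = json.loads(entry.strip())
assert record['level'] == 'info'
assert 'launching tcp listener' in record['msg']
```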
```diff
@@ -22,14 +22,16 @@ financial data flows.

 from __future__ import annotations
 from collections import Counter
 import time
+from typing import TYPE_CHECKING

 import tractor
 import trio
 from trio_typing import TaskStatus

-from ._sharedmem import ShmArray
 from ..log import get_logger

+if TYPE_CHECKING:
+    from ._sharedmem import ShmArray

 log = get_logger(__name__)
```
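
Moving the `ShmArray` import under `TYPE_CHECKING` (together with the existing `from __future__ import annotations`) keeps the name available to static type checkers while avoiding a runtime, potentially circular, import. The general pattern, sketched standalone:

```python
from __future__ import annotations  # annotations become strings at runtime
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # seen only by type checkers; never imported when the module runs
    from piker.data._sharedmem import ShmArray


def consume(shm: ShmArray) -> None:  # annotation needs no real import
    ...
```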
```diff
@@ -22,8 +22,7 @@ from typing import Any
 import decimal

 import numpy as np
-import pandas as pd
-from pydantic import BaseModel, validate_arguments
+from pydantic import BaseModel

 # from numba import from_dtype

@@ -127,11 +126,11 @@ def unpack_fqsn(fqsn: str) -> tuple[str, str, str]:


 class Symbol(BaseModel):
-    """I guess this is some kinda container thing for dealing with
+    '''
+    I guess this is some kinda container thing for dealing with
     all the different meta-data formats from brokers?

-    Yah, i guess dats what it izz.
-    """
+    '''
     key: str
     tick_size: float = 0.01
     lot_tick_size: float = 0.0  # "volume" precision as min step value

@@ -254,61 +253,6 @@ class Symbol(BaseModel):
         return keys


-def from_df(
-
-    df: pd.DataFrame,
-    source=None,
-    default_tf=None
-
-) -> np.recarray:
-    """Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.
-
-    """
-    df.reset_index(inplace=True)
-
-    # hackery to convert field names
-    date = 'Date'
-    if 'date' in df.columns:
-        date = 'date'
-
-    # convert to POSIX time
-    df[date] = [d.timestamp() for d in df[date]]
-
-    # try to rename from some camel case
-    columns = {
-        'Date': 'time',
-        'date': 'time',
-        'Open': 'open',
-        'High': 'high',
-        'Low': 'low',
-        'Close': 'close',
-        'Volume': 'volume',
-
-        # most feeds are providing this over sesssion anchored
-        'vwap': 'bar_wap',
-
-        # XXX: ib_insync calls this the "wap of the bar"
-        # but no clue what is actually is...
-        # https://github.com/pikers/piker/issues/119#issuecomment-729120988
-        'average': 'bar_wap',
-    }
-
-    df = df.rename(columns=columns)
-
-    for name in df.columns:
-        # if name not in base_ohlc_dtype.names[1:]:
-        if name not in base_ohlc_dtype.names:
-            del df[name]
-
-    # TODO: it turns out column access on recarrays is actually slower:
-    # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
-    # it might make sense to make these structured arrays?
-    array = df.to_records(index=False)
-    _nan_to_closest_num(array)
-
-    return array
-
-
 def _nan_to_closest_num(array: np.ndarray):
     """Return interpolated values instead of NaN.
```
```diff
@@ -16,26 +16,34 @@
 """
 marketstore cli.

 """
-from typing import List
 from functools import partial
 from pprint import pformat

+from anyio_marketstore import open_marketstore_client
 import trio
 import tractor
 import click
+import numpy as np

 from .marketstore import (
     get_client,
-    stream_quotes,
+    # stream_quotes,
     ingest_quote_stream,
-    _url,
+    # _url,
     _tick_tbk_ids,
     mk_tbk,
 )
 from ..cli import cli
 from .. import watchlists as wl
 from ..log import get_logger
+from ._sharedmem import (
+    maybe_open_shm_array,
+)
+from ._source import (
+    base_iohlc_dtype,
+)


 log = get_logger(__name__)

@@ -49,51 +57,58 @@ log = get_logger(__name__)
 )
 @click.argument('names', nargs=-1)
 @click.pass_obj
-def ms_stream(config: dict, names: List[str], url: str):
-    """Connect to a marketstore time bucket stream for (a set of) symbols(s)
+def ms_stream(
+    config: dict,
+    names: list[str],
+    url: str,
+) -> None:
+    '''
+    Connect to a marketstore time bucket stream for (a set of) symbols(s)
     and print to console.
-    """
+
+    '''
     async def main():
-        async for quote in stream_quotes(symbols=names):
-            log.info(f"Received quote:\n{quote}")
+        # async for quote in stream_quotes(symbols=names):
+        #     log.info(f"Received quote:\n{quote}")
+        ...

     trio.run(main)


-@cli.command()
-@click.option(
-    '--url',
-    default=_url,
-    help='HTTP URL of marketstore instance'
-)
-@click.argument('names', nargs=-1)
-@click.pass_obj
-def ms_destroy(config: dict, names: List[str], url: str) -> None:
-    """Destroy symbol entries in the local marketstore instance.
-    """
-    async def main():
-        nonlocal names
-        async with get_client(url) as client:
-
-            if not names:
-                names = await client.list_symbols()
-
-            # default is to wipe db entirely.
-            answer = input(
-                "This will entirely wipe you local marketstore db @ "
-                f"{url} of the following symbols:\n {pformat(names)}"
-                "\n\nDelete [N/y]?\n")
-
-            if answer == 'y':
-                for sym in names:
-                    # tbk = _tick_tbk.format(sym)
-                    tbk = tuple(sym, *_tick_tbk_ids)
-                    print(f"Destroying {tbk}..")
-                    await client.destroy(mk_tbk(tbk))
-            else:
-                print("Nothing deleted.")
-
-    tractor.run(main)
+# @cli.command()
+# @click.option(
+#     '--url',
+#     default=_url,
+#     help='HTTP URL of marketstore instance'
+# )
+# @click.argument('names', nargs=-1)
+# @click.pass_obj
+# def ms_destroy(config: dict, names: list[str], url: str) -> None:
+#     """Destroy symbol entries in the local marketstore instance.
+#     """
+#     async def main():
+#         nonlocal names
+#         async with get_client(url) as client:
+#
+#             if not names:
+#                 names = await client.list_symbols()
+#
+#             # default is to wipe db entirely.
+#             answer = input(
+#                 "This will entirely wipe you local marketstore db @ "
+#                 f"{url} of the following symbols:\n {pformat(names)}"
+#                 "\n\nDelete [N/y]?\n")
+#
+#             if answer == 'y':
+#                 for sym in names:
+#                     # tbk = _tick_tbk.format(sym)
+#                     tbk = tuple(sym, *_tick_tbk_ids)
+#                     print(f"Destroying {tbk}..")
+#                     await client.destroy(mk_tbk(tbk))
+#             else:
+#                 print("Nothing deleted.")
+#
+#     tractor.run(main)


 @cli.command()
```
```diff
@@ -102,41 +117,53 @@ def ms_destroy(config: dict, names: List[str], url: str) -> None:
     is_flag=True,
     help='Enable tractor logging')
 @click.option(
-    '--url',
-    default=_url,
-    help='HTTP URL of marketstore instance'
+    '--host',
+    default='localhost'
 )
-@click.argument('name', nargs=1, required=True)
+@click.option(
+    '--port',
+    default=5993
+)
+@click.argument('symbols', nargs=-1)
 @click.pass_obj
-def ms_shell(config, name, tl, url):
-    """Start an IPython shell ready to query the local marketstore db.
-    """
-    async def main():
-        async with get_client(url) as client:
-            query = client.query  # noqa
-            # TODO: write magics to query marketstore
-            from IPython import embed
-            embed()
-
-    tractor.run(main)
+def storesh(
+    config,
+    tl,
+    host,
+    port,
+    symbols: list[str],
+):
+    '''
+    Start an IPython shell ready to query the local marketstore db.
+
+    '''
+    from piker.data.marketstore import tsdb_history_update
+    from piker._daemon import open_piker_runtime
+
+    async def main():
+        nonlocal symbols
+
+        async with open_piker_runtime(
+            'storesh',
+            enable_modules=['piker.data._ahab'],
+        ):
+            symbol = symbols[0]
+            await tsdb_history_update(symbol)
+
+    trio.run(main)


 @cli.command()
 @click.option('--test-file', '-t', help='Test quote stream file')
 @click.option('--tl', is_flag=True, help='Enable tractor logging')
-@click.option('--tl', is_flag=True, help='Enable tractor logging')
-@click.option(
-    '--url',
-    default=_url,
-    help='HTTP URL of marketstore instance'
-)
 @click.argument('name', nargs=1, required=True)
 @click.pass_obj
-def ingest(config, name, test_file, tl, url):
-    """Ingest real-time broker quotes and ticks to a marketstore instance.
-    """
+def ingest(config, name, test_file, tl):
+    '''
+    Ingest real-time broker quotes and ticks to a marketstore instance.
+
+    '''
     # global opts
-    brokermod = config['brokermod']
     loglevel = config['loglevel']
     tractorloglevel = config['tractorloglevel']
     # log = config['log']
```
```diff
@@ -145,15 +172,25 @@ def ingest(config, name, test_file, tl, url):
     watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
     symbols = watchlists[name]

-    tractor.run(
-        partial(
-            ingest_quote_stream,
-            symbols,
-            brokermod.name,
-            tries=1,
-            loglevel=loglevel,
-        ),
-        name='ingest_marketstore',
-        loglevel=tractorloglevel,
-        debug_mode=True,
-    )
+    grouped_syms = {}
+    for sym in symbols:
+        symbol, _, provider = sym.rpartition('.')
+        if provider not in grouped_syms:
+            grouped_syms[provider] = []
+
+        grouped_syms[provider].append(symbol)
+
+    async def entry_point():
+        async with tractor.open_nursery() as n:
+            for provider, symbols in grouped_syms.items():
+                await n.run_in_actor(
+                    ingest_quote_stream,
+                    name='ingest_marketstore',
+                    symbols=symbols,
+                    brokername=provider,
+                    tries=1,
+                    actorloglevel=loglevel,
+                    loglevel=tractorloglevel
+                )
+
+    tractor.run(entry_point)
```
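
The rewritten `ingest` groups fully-qualified symbols by their trailing provider suffix using `str.rpartition('.')`. A worked example with made-up symbols:

```python
# symbol names invented for illustration
symbols = ['btcusdt.binance', 'xbtusd.kraken', 'ethusdt.binance']

grouped_syms: dict[str, list[str]] = {}
for sym in symbols:
    # rpartition splits on the *last* '.', so dotted symbol bodies survive
    symbol, _, provider = sym.rpartition('.')
    grouped_syms.setdefault(provider, []).append(symbol)

assert grouped_syms == {
    'binance': ['btcusdt', 'ethusdt'],
    'kraken': ['xbtusd'],
}
```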
```diff
@@ -20,6 +20,7 @@ Data feed apis and infra.

 This module is enabled for ``brokerd`` daemons.

 """
+from __future__ import annotations
 from dataclasses import dataclass, field
 from contextlib import asynccontextmanager
 from functools import partial

@@ -35,12 +36,14 @@ from trio.abc import ReceiveChannel
 from trio_typing import TaskStatus
 import tractor
 from pydantic import BaseModel
+import numpy as np

 from ..brokers import get_brokermod
 from .._cacheables import maybe_open_context
 from ..log import get_logger, get_console_log
 from .._daemon import (
     maybe_spawn_brokerd,
+    check_for_service,
 )
 from ._sharedmem import (
     maybe_open_shm_array,

@@ -124,7 +127,7 @@ class _FeedsBus(BaseModel):

     # def cancel_task(
     #     self,
-    #     task: trio.lowlevel.Task
+    #     task: trio.lowlevel.Task,
     # ) -> bool:
     #     ...

@@ -188,6 +191,22 @@ async def _setup_persistent_brokerd(
         await trio.sleep_forever()


+async def start_backfill(
+    mod: ModuleType,
+    fqsn: str,
+    shm: ShmArray,
+
+    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
+) -> int:
+
+    return await mod.backfill_bars(
+        fqsn,
+        shm,
+        task_status=task_status,
+    )
+
+
 async def manage_history(
     mod: ModuleType,
     bus: _FeedsBus,
```
```diff
@@ -216,50 +235,152 @@ async def manage_history(
         # we expect the sub-actor to write
         readonly=False,
     )
+    # TODO: history validation
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )

-    if opened:
+    log.info('Scanning for existing `marketstored`')
+
+    is_up = await check_for_service('marketstored')
+
+    # for now only do backfilling if no tsdb can be found
+    do_legacy_backfill = not is_up and opened
+
+    open_history_client = getattr(mod, 'open_history_client', None)
+
+    if is_up and opened and open_history_client:
+
+        log.info('Found existing `marketstored`')
+        from . import marketstore
+
+        async with marketstore.open_storage_client(
+            fqsn,
+        ) as storage:
+
+            tsdb_arrays = await storage.read_ohlcv(fqsn)
+
+            if not tsdb_arrays:
+                do_legacy_backfill = True
+
+            else:
+                log.info(f'Loaded tsdb history {tsdb_arrays}')
+
+                fastest = list(tsdb_arrays.values())[0]
+                times = fastest['Epoch']
+                first, last = times[0], times[-1]
+                first_tsdb_dt, last_tsdb_dt = map(
+                    pendulum.from_timestamp, [first, last]
+                )
+
+                # TODO: this should be used verbatim for the pure
+                # shm backfiller approach below.
+
+                def diff_history(
+                    array,
+                    start_dt,
+                    end_dt,
+
+                ) -> np.ndarray:
+
+                    s_diff = (last_tsdb_dt - start_dt).seconds
+
+                    # if we detect a partial frame's worth of data
+                    # that is new, slice out only that history and
+                    # write to shm.
+                    if s_diff > 0:
+                        assert last_tsdb_dt > start_dt
+                        selected = array['time'] > last_tsdb_dt.timestamp()
+                        to_push = array[selected]
+                        log.info(
+                            f'Pushing partial frame {to_push.size} to shm'
+                        )
+                        return to_push
+
+                    else:
+                        return array
+
+                # start history anal and load missing new data via backend.
+                async with open_history_client(fqsn) as hist:
+
+                    # get latest query's worth of history all the way
+                    # back to what is recorded in the tsdb
+                    array, start_dt, end_dt = await hist(end_dt='')
+                    to_push = diff_history(array, start_dt, end_dt)
+                    shm.push(to_push)
+
+                    # let caller unblock and deliver latest history frame
+                    task_status.started(shm)
+                    some_data_ready.set()
+
+                    # pull new history frames until we hit latest
+                    # already in the tsdb
+                    while start_dt > last_tsdb_dt:
+                        array, start_dt, end_dt = await hist(end_dt=start_dt)
+                        to_push = diff_history(array, start_dt, end_dt)
+                        shm.push(to_push, prepend=True)
+
+                # TODO: see if there's faster multi-field reads:
+                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+                # re-index with a `time` and index field
+                shm.push(
+                    fastest[-shm._first.value:],
+
+                    # insert the history pre a "days worth" of samples
+                    # to leave some real-time buffer space at the end.
+                    prepend=True,
+                    # start=shm._len - _secs_in_day,
+                    field_map={
+                        'Epoch': 'time',
+                        'Open': 'open',
+                        'High': 'high',
+                        'Low': 'low',
+                        'Close': 'close',
+                        'Volume': 'volume',
+                    },
+                )
+
+                # TODO: write new data to tsdb to be ready to for next
+                # read.
+
+    if do_legacy_backfill:
+        # do a legacy incremental backfill from the provider.
         log.info('No existing `marketstored` found..')

+        bfqsn = fqsn.replace('.' + mod.name, '')
         # start history backfill task ``backfill_bars()`` is
         # a required backend func this must block until shm is
         # filled with first set of ohlc bars
-        _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
+        await bus.nursery.start(
+            start_backfill,
+            mod,
+            bfqsn,
+            shm,
+        )

         # yield back after client connect with filled shm
         task_status.started(shm)

         # indicate to caller that feed can be delivered to
         # remote requesting client since we've loaded history
         # data that can be used.
         some_data_ready.set()

-        # detect sample step size for sampled historical data
-        times = shm.array['time']
-        delay_s = times[-1] - times[times != times[-1]][-1]
-
-        # begin real-time updates of shm and tsb once the feed
-        # goes live.
-        await feed_is_live.wait()
-
-        if opened:
-            sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
-
-            # start shm incrementing for OHLC sampling at the current
-            # detected sampling period if one dne.
-            if sampler.incrementers.get(delay_s) is None:
-                await bus.start_task(
-                    increment_ohlc_buffer,
-                    delay_s,
-                )
-
+    # history retreival loop depending on user interaction and thus
+    # a small RPC-prot for remotely controllinlg what data is loaded
+    # for viewing.
     await trio.sleep_forever()


 async def allocate_persistent_feed(
     bus: _FeedsBus,

     brokername: str,
     symbol: str,

     loglevel: str,
+    start_stream: bool = True,

     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
```
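
The `diff_history()` closure above slices out only the rows newer than the last timestamp already recorded in the tsdb, via a numpy boolean mask. The core idea on a toy structured array (epoch values invented):

```python
import numpy as np

# three bars with made-up epoch stamps; pretend the tsdb already
# holds everything up to t=200.
array = np.array(
    [(100, 1.0), (200, 2.0), (300, 3.0)],
    dtype=[('time', 'i8'), ('close', 'f8')],
)
last_tsdb_ts = 200

selected = array['time'] > last_tsdb_ts  # boolean mask, same length
to_push = array[selected]                # only the genuinely new rows

assert to_push['time'].tolist() == [300]
```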
@ -277,6 +398,7 @@ async def allocate_persistent_feed(
|
||||||
- a real-time streaming task which connec
|
- a real-time streaming task which connec
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# load backend module
|
||||||
try:
|
try:
|
||||||
mod = get_brokermod(brokername)
|
mod = get_brokermod(brokername)
|
||||||
except ImportError:
|
except ImportError:
|
||||||
|
@ -319,7 +441,7 @@ async def allocate_persistent_feed(
|
||||||
manage_history,
|
manage_history,
|
||||||
mod,
|
mod,
|
||||||
bus,
|
bus,
|
||||||
bfqsn,
|
'.'.join((bfqsn, brokername)),
|
||||||
some_data_ready,
|
some_data_ready,
|
||||||
feed_is_live,
|
feed_is_live,
|
||||||
)
|
)
|
||||||
|
@ -333,7 +455,10 @@ async def allocate_persistent_feed(
|
||||||
# true fqsn
|
# true fqsn
|
||||||
fqsn = '.'.join((bfqsn, brokername))
|
fqsn = '.'.join((bfqsn, brokername))
|
||||||
# add a fqsn entry that includes the ``.<broker>`` suffix
|
# add a fqsn entry that includes the ``.<broker>`` suffix
|
||||||
|
# and an entry that includes the broker-specific fqsn (including
|
||||||
|
# any new suffixes or elements as injected by the backend).
|
||||||
init_msg[fqsn] = msg
|
init_msg[fqsn] = msg
|
||||||
|
init_msg[bfqsn] = msg
|
||||||
|
|
||||||
# TODO: pretty sure we don't need this? why not just leave 1s as
|
# TODO: pretty sure we don't need this? why not just leave 1s as
|
||||||
# the fastest "sample period" since we'll probably always want that
|
# the fastest "sample period" since we'll probably always want that
|
||||||
|
@ -347,13 +472,14 @@ async def allocate_persistent_feed(
|
||||||
await some_data_ready.wait()
|
await some_data_ready.wait()
|
||||||
|
|
||||||
# append ``.<broker>`` suffix to each quote symbol
|
# append ``.<broker>`` suffix to each quote symbol
|
||||||
bsym = symbol + f'.{brokername}'
|
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
|
||||||
|
|
||||||
generic_first_quotes = {
|
generic_first_quotes = {
|
||||||
bsym: first_quote,
|
acceptable_not_fqsn_with_broker_suffix: first_quote,
|
||||||
fqsn: first_quote,
|
fqsn: first_quote,
|
||||||
}
|
}
|
||||||
|
|
||||||
bus.feeds[symbol] = bus.feeds[fqsn] = (
|
bus.feeds[symbol] = bus.feeds[bfqsn] = (
|
||||||
init_msg,
|
init_msg,
|
||||||
generic_first_quotes,
|
generic_first_quotes,
|
||||||
)
|
)
|
||||||
|
@ -363,9 +489,25 @@ async def allocate_persistent_feed(
|
||||||
# task_status.started((init_msg, generic_first_quotes))
|
# task_status.started((init_msg, generic_first_quotes))
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
|
||||||
# backend will indicate when real-time quotes have begun.
|
if not start_stream:
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
# begin real-time updates of shm and tsb once the feed goes live and
|
||||||
|
# the backend will indicate when real-time quotes have begun.
|
||||||
await feed_is_live.wait()
|
await feed_is_live.wait()
|
||||||
|
|
||||||
|
# start shm incrementer task for OHLC style sampling
|
||||||
|
# at the current detected step period.
|
||||||
|
times = shm.array['time']
|
||||||
|
delay_s = times[-1] - times[times != times[-1]][-1]
|
||||||
|
|
||||||
|
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
|
||||||
|
if sampler.incrementers.get(delay_s) is None:
|
||||||
|
await bus.start_task(
|
||||||
|
increment_ohlc_buffer,
|
||||||
|
delay_s,
|
||||||
|
)
|
||||||
|
|
||||||
sum_tick_vlm: bool = init_msg.get(
|
sum_tick_vlm: bool = init_msg.get(
|
||||||
'shm_write_opts', {}
|
'shm_write_opts', {}
|
||||||
).get('sum_tick_vlm', True)
|
).get('sum_tick_vlm', True)
|
||||||
|
@ -388,7 +530,7 @@ async def open_feed_bus(
|
||||||
|
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
brokername: str,
|
brokername: str,
|
||||||
symbol: str,
|
symbol: str, # normally expected to the broker-specific fqsn
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
tick_throttle: Optional[float] = None,
|
tick_throttle: Optional[float] = None,
|
||||||
start_stream: bool = True,
|
start_stream: bool = True,
|
||||||
|
@@ -410,7 +552,9 @@ async def open_feed_bus(
     # TODO: check for any stale shm entries for this symbol
     # (after we also group them in a nice `/dev/shm/piker/` subdir).
     # ensure we are who we think we are
-    assert 'brokerd' in tractor.current_actor().name
+    servicename = tractor.current_actor().name
+    assert 'brokerd' in servicename
+    assert brokername in servicename

     bus = get_feed_bus(brokername)

@@ -420,7 +564,7 @@ async def open_feed_bus(
     entry = bus.feeds.get(symbol)
     if entry is None:
         # allocate a new actor-local stream bus which
-        # will persist for this `brokerd`.
+        # will persist for this `brokerd`'s service lifetime.
         async with bus.task_lock:
             await bus.nursery.start(
                 partial(
@@ -428,13 +572,12 @@ async def open_feed_bus(

                     bus=bus,
                     brokername=brokername,

                     # here we pass through the selected symbol in native
                     # "format" (i.e. upper vs. lowercase depending on
                     # provider).
                     symbol=symbol,

                     loglevel=loglevel,
+                    start_stream=start_stream,
                 )
             )
             # TODO: we can remove this?
@@ -450,7 +593,7 @@ async def open_feed_bus(
     # true fqsn
     fqsn = '.'.join([bfqsn, brokername])
     assert fqsn in first_quotes
-    assert bus.feeds[fqsn]
+    assert bus.feeds[bfqsn]

     # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
     bsym = symbol + f'.{brokername}'

@@ -14,36 +14,58 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.

-"""
+'''
 ``marketstore`` integration.

 - client management routines
 - tick data ingest routines
 - websocket client for subscribing to write triggers
 - todo: tick sequence stream-cloning for testing
-- todo: docker container management automation
-"""
-from contextlib import asynccontextmanager
-from typing import Dict, Any, List, Callable, Tuple
+'''
+from contextlib import asynccontextmanager as acm
+from pprint import pformat
+from typing import (
+    Any,
+    Optional,
+    Union,
+)
 import time
 from math import isnan

+from bidict import bidict
 import msgpack
+import pyqtgraph as pg
 import numpy as np
 import pandas as pd
-import pymarketstore as pymkts
 import tractor
 from trio_websocket import open_websocket_url
+from anyio_marketstore import (
+    open_marketstore_client,
+    MarketstoreClient,
+    Params,
+)
+import purerpc

+from .feed import maybe_open_feed
 from ..log import get_logger, get_console_log
-from ..data import open_feed


 log = get_logger(__name__)

-_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
+_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
 _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
-_url: str = 'http://localhost:5993/rpc'
+
+_tick_dt = [
+    # these two are required as a "primary key"
+    ('Epoch', 'i8'),
+    ('Nanoseconds', 'i4'),
+    ('IsTrade', 'i1'),
+    ('IsBid', 'i1'),
+    ('Price', 'f4'),
+    ('Size', 'f4')
+]

 _quote_dt = [
     # these two are required as a "primary key"
     ('Epoch', 'i8'),
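For reference, `_tick_dt` (like the `_ohlcv_dt` table added further below) is a plain ``numpy`` structured-dtype spec; a hedged sanity check with fabricated values:

```python
import numpy as np

_tick_dt = [
    ('Epoch', 'i8'),
    ('Nanoseconds', 'i4'),
    ('IsTrade', 'i1'),
    ('IsBid', 'i1'),
    ('Price', 'f4'),
    ('Size', 'f4'),
]

# one tick row matching the marketstore tick schema above
arr = np.array(
    [(1_638_400_000, 0, 1, 0, 4500.25, 3.0)],
    dtype=_tick_dt,
)
assert arr['Price'][0] == np.float32(4500.25)
```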
@@ -61,6 +83,7 @@ _quote_dt = [
     # ('brokerd_ts', 'i64'),
     # ('VWAP', 'f4')
 ]

 _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
 _tick_map = {
     'Up': 1,
@@ -69,28 +92,39 @@ _tick_map = {
     None: np.nan,
 }

+_ohlcv_dt = [
+    # these two are required as a "primary key"
+    ('Epoch', 'i8'),
+    # ('Nanoseconds', 'i4'),
+
+    # ohlcv sampling
+    ('Open', 'f4'),
+    ('High', 'f4'),
+    ('Low', 'i8'),
+    ('Close', 'i8'),
+    ('Volume', 'f4'),
+]

-class MarketStoreError(Exception):
-    "Generic marketstore client error"
-
-
-def err_on_resp(response: dict) -> None:
-    """Raise any errors found in responses from client request.
-    """
-    responses = response['responses']
-    if responses is not None:
-        for r in responses:
-            err = r['error']
-            if err:
-                raise MarketStoreError(err)
+
+def mk_tbk(keys: tuple[str, str, str]) -> str:
+    '''
+    Generate a marketstore table key from a tuple.
+    Converts,
+        ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``
+
+    '''
+    return '/'.join(keys)


 def quote_to_marketstore_structarray(
-    quote: Dict[str, Any],
-    last_fill: str,
+    quote: dict[str, Any],
+    last_fill: Optional[float]

 ) -> np.array:
-    """Return marketstore writeable structarray from quote ``dict``.
-    """
+    '''
+    Return marketstore writeable structarray from quote ``dict``.
+
+    '''
     if last_fill:
         # new fill bby
         now = timestamp(last_fill)
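The rewritten `mk_tbk()` is now a plain path join (the old version, removed further below, glued on a stray `'{}/'` prefix for later `.format()` calls); per its docstring:

```python
assert mk_tbk(('SPY', '1Sec', 'TICK')) == 'SPY/1Sec/TICK'
```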
@@ -101,7 +135,7 @@ def quote_to_marketstore_structarray(

     secs, ns = now / 10**9, now % 10**9

-    # pack into List[Tuple[str, Any]]
+    # pack into list[tuple[str, Any]]
     array_input = []

     # insert 'Epoch' entry first and then 'Nanoseconds'.
@@ -123,146 +157,391 @@ def quote_to_marketstore_structarray(
     return np.array([tuple(array_input)], dtype=_quote_dt)


-def timestamp(datestr: str) -> int:
-    """Return marketstore compatible 'Epoch' integer in nanoseconds
+def timestamp(date, **kwargs) -> int:
+    '''
+    Return marketstore compatible 'Epoch' integer in nanoseconds
     from a date formatted str.
-    """
-    return int(pd.Timestamp(datestr).value)
+
+    '''
+    return int(pd.Timestamp(date, **kwargs).value)

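`timestamp()` now forwards straight to `pandas.Timestamp`, so anything that constructor accepts (ISO strings, epoch values plus a `unit=` kwarg, etc.) resolves to an integer nanosecond epoch; a quick sketch:

```python
import pandas as pd

def timestamp(date, **kwargs) -> int:
    return int(pd.Timestamp(date, **kwargs).value)

assert timestamp('1970-01-01T00:00:01') == 1_000_000_000  # 1s -> 1e9 ns
assert timestamp(1, unit='s') == 1_000_000_000
```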
-def mk_tbk(keys: Tuple[str, str, str]) -> str:
-    """Generate a marketstore table key from a tuple.
-    Converts,
-        ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``
-    """
-    return '{}/' + '/'.join(keys)
-
-
-class Client:
-    """Async wrapper around the alpaca ``pymarketstore`` sync client.
-
-    This will serve as the shell for building out a proper async client
-    that isn't horribly documented and un-tested..
-    """
-    def __init__(self, url: str):
-        self._client = pymkts.Client(url)
-
-    async def _invoke(
-        self,
-        meth: Callable,
-        *args,
-        **kwargs,
-    ) -> Any:
-        return err_on_resp(meth(*args, **kwargs))
-
-    async def destroy(
-        self,
-        tbk: Tuple[str, str, str],
-    ) -> None:
-        return await self._invoke(self._client.destroy, mk_tbk(tbk))
-
-    async def list_symbols(
-        self,
-        tbk: str,
-    ) -> List[str]:
-        return await self._invoke(self._client.list_symbols, mk_tbk(tbk))
-
-    async def write(
-        self,
-        symbol: str,
-        array: np.ndarray,
-    ) -> None:
-        start = time.time()
-        await self._invoke(
-            self._client.write,
-            array,
-            _tick_tbk.format(symbol),
-            isvariablelength=True
-        )
-        log.debug(f"{symbol} write time (s): {time.time() - start}")
-
-    def query(
-        self,
-        symbol,
-        tbk: Tuple[str, str] = _tick_tbk_ids,
-    ) -> pd.DataFrame:
-        # XXX: causes crash
-        # client.query(pymkts.Params(symbol, '*', 'OHCLV'
-        result = self._client.query(
-            pymkts.Params(symbol, *tbk),
-        )
-        return result.first().df()
-
-
-@asynccontextmanager
+@acm
 async def get_client(
-    url: str = _url,
-) -> Client:
-    yield Client(url)
+    host: str = 'localhost',
+    port: int = 5995
+
+) -> MarketstoreClient:
+    '''
+    Load an ``anyio_marketstore`` grpc client connected
+    to an existing ``marketstore`` server.
+
+    '''
+    async with open_marketstore_client(
+        host,
+        port
+    ) as client:
+        yield client
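A hedged usage sketch of the new grpc client factory, assuming a running `marketstored` instance and this module on the import path; note the grpc default port here is `5995` while the websocket code further below still uses `5993`:

```python
import trio

async def show_symbols() -> None:
    # `get_client()` wraps `anyio_marketstore.open_marketstore_client()`
    async with get_client() as client:
        print(await client.list_symbols())

trio.run(show_symbols)
```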
+class MarketStoreError(Exception):
+    "Generic marketstore client error"
+
+
+# def err_on_resp(response: dict) -> None:
+#     """Raise any errors found in responses from client request.
+#     """
+#     responses = response['responses']
+#     if responses is not None:
+#         for r in responses:
+#             err = r['error']
+#             if err:
+#                 raise MarketStoreError(err)
+
+
+tf_in_1s = bidict({
+    1: '1Sec',
+    60: '1Min',
+    60*5: '5Min',
+    60*15: '15Min',
+    60*30: '30Min',
+    60*60: '1H',
+    60*60*24: '1D',
+})
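The `tf_in_1s` table is a `bidict` so lookups work both ways: sample-period-to-timeframe-string when building queries, and the inverse when keying result sets (as `read_ohlcv()` below does):

```python
from bidict import bidict

tf_in_1s = bidict({1: '1Sec', 60: '1Min'})

assert tf_in_1s[60] == '1Min'          # sample period -> mkts timeframe
assert tf_in_1s.inverse['1Min'] == 60  # timeframe -> sample period
```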
+class Storage:
+    '''
+    High level storage api for both real-time and historical ingest.
+
+    '''
+    def __init__(
+        self,
+        client: MarketstoreClient,
+
+    ) -> None:
+        # TODO: eventually this should be an api/interface type that
+        # ensures we can support multiple tsdb backends.
+        self.client = client
+
+        # series' cache from tsdb reads
+        self._arrays: dict[str, np.ndarray] = {}
+
+    async def list_keys(self) -> list[str]:
+        return await self.client.list_symbols()
+
+    async def search_keys(self, pattern: str) -> list[str]:
+        '''
+        Search for a time series key in the storage backend.
+
+        '''
+        ...
+
+    async def write_ticks(self, ticks: list) -> None:
+        ...
+
+    async def write_ohlcv(self, ohlcv: np.ndarray) -> None:
+        ...
+
+    async def read_ohlcv(
+        self,
+        fqsn: str,
+        timeframe: Optional[Union[int, str]] = None,
+
+    ) -> tuple[
+        MarketstoreClient,
+        Union[dict, np.ndarray]
+    ]:
+        client = self.client
+        syms = await client.list_symbols()
+
+        if fqsn not in syms:
+            return {}
+
+        tfstr = tf_in_1s[1]
+
+        params = Params(
+            symbols=fqsn,
+            timeframe=tfstr,
+            attrgroup='OHLCV',
+            # limit_from_start=True,
+
+            # TODO: figure the max limit here given the
+            # msg size limit of purerpc: 33554432
+            limit=int(800e3),
+        )
+
+        if timeframe is None:
+            log.info(f'starting {fqsn} tsdb granularity scan..')
+            # loop through and try to find highest granularity
+            for tfstr in tf_in_1s.values():
+                try:
+                    log.info(f'querying for {tfstr}@{fqsn}')
+                    params.set('timeframe', tfstr)
+                    result = await client.query(params)
+                    break
+
+                except purerpc.grpclib.exceptions.UnknownError:
+                    # XXX: this is already logged by the container and
+                    # thus shows up through `marketstored` logs relay.
+                    # log.warning(f'{tfstr}@{fqsn} not found')
+                    continue
+            else:
+                return {}
+
+        else:
+            result = await client.query(params)
+
+        # TODO: it turns out column access on recarrays is actually slower:
+        # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
+        # it might make sense to make these structured arrays?
+        # Fill out a `numpy` array-results map
+        arrays = {}
+        for fqsn, data_set in result.by_symbols().items():
+            arrays.setdefault(fqsn, {})[
+                tf_in_1s.inverse[data_set.timeframe]
+            ] = data_set.array
+
+        return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
+
+    async def delete_ts(
+        self,
+        key: str,
+        timeframe: Optional[Union[int, str]] = None,
+
+    ) -> bool:
+
+        client = self.client
+        syms = await client.list_symbols()
+        print(syms)
+        # if key not in syms:
+        #     raise KeyError(f'`{fqsn}` table key not found?')
+
+        return await client.destroy(tbk=key)
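Note the granularity scan in `read_ohlcv()` leans on Python's `for`/`else`: the `else` arm only runs when the loop completes without a `break`, i.e. when no timeframe query succeeded. A tiny self-contained refresher:

```python
available = {'1Min'}

for tfstr in ('1Sec', '1Min', '5Min'):
    if tfstr in available:  # stand-in for a successful db query
        break
else:
    # only runs if the loop never ``break``s, mirroring `return {}`
    raise LookupError('symbol not found at any timeframe')

print(f'highest granularity found: {tfstr}')  # -> '1Min'
```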
+@acm
+async def open_storage_client(
+    fqsn: str,
+    period: Optional[Union[int, str]] = None,  # in seconds
+
+) -> tuple[Storage, dict[str, np.ndarray]]:
+    '''
+    Load a series by key and deliver in ``numpy`` struct array format.
+
+    '''
+    async with (
+        # eventually a storage backend endpoint
+        get_client() as client,
+    ):
+        # slap on our wrapper api
+        yield Storage(client)
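A hedged sketch of driving the wrapper directly, reusing the `mnq.globex.ib` example fqsn from the feed code above and assuming a populated db:

```python
import trio

async def main() -> None:
    async with open_storage_client('mnq.globex.ib') as storage:
        # scan all granularities for this symbol..
        arrays = await storage.read_ohlcv('mnq.globex.ib')
        # ..or pin one: 1 maps to '1Sec' via ``tf_in_1s``
        onesec = await storage.read_ohlcv('mnq.globex.ib', timeframe=1)
        print(arrays.keys(), onesec)

trio.run(main)
```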
+async def tsdb_history_update(
+    fqsn: str,
+
+) -> list[str]:
+
+    # TODO: real-time dedicated task for ensuring
+    # history consistency between the tsdb, shm and real-time feed..
+
+    # update sequence design notes:
+
+    # - load existing highest frequency data from mkts
+    #   * how do we want to offer this to the UI?
+    #     - lazy loading?
+    #     - try to load it all and expect graphics caching/diffing
+    #       to hide extra bits that aren't in view?
+
+    # - compute the diff between latest data from broker and shm
+    #   * use sql api in mkts to determine where the backend should
+    #     start querying for data?
+    #   * append any diff with new shm length
+    #   * determine missing (gapped) history by scanning
+    #   * how far back do we look?
+
+    # - begin rt update ingest and aggregation
+    #   * could start by always writing ticks to mkts instead of
+    #     worrying about a shm queue for now.
+    #   * we have a short list of shm queues worth groking:
+    #     - https://github.com/pikers/piker/issues/107
+    #   * the original data feed arch blurb:
+    #     - https://github.com/pikers/piker/issues/98
+    #
+    profiler = pg.debug.Profiler(
+        disabled=False,  # not pg_profile_enabled(),
+        delayed=False,
+    )
+
+    async with (
+        open_storage_client(fqsn) as storage,
+
+        maybe_open_feed(
+            [fqsn],
+            start_stream=False,
+
+        ) as (feed, stream),
+    ):
+        profiler(f'opened feed for {fqsn}')
+
+        symbol = feed.symbols.get(fqsn)
+        if symbol:
+            fqsn = symbol.front_fqsn()
+
+        syms = await storage.client.list_symbols()
+        log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
+        profiler(f'listed symbols {syms}')
+
+        # diff db history with shm and only write the missing portions
+        ohlcv = feed.shm.array
+
+        # TODO: use pg profiler
+        tsdb_arrays = await storage.read_ohlcv(fqsn)
+
+        to_append = feed.shm.array
+        to_prepend = None
+
+        # hist diffing
+        if tsdb_arrays:
+            onesec = tsdb_arrays[1]
+            to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
+            to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
+
+        profiler('Finished db arrays diffs')
+
+        for array in [to_append, to_prepend]:
+            if array is None:
+                continue
+
+            log.info(
+                f'Writing datums {array.size} -> to tsdb from shm\n'
+            )
+
+            # build mkts schema compat array for writing
+            mkts_dt = np.dtype(_ohlcv_dt)
+            mkts_array = np.zeros(
+                len(array),
+                dtype=mkts_dt,
+            )
+            # copy from shm array (yes it's this easy):
+            # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
+            mkts_array[:] = array[[
+                'time',
+                'open',
+                'high',
+                'low',
+                'close',
+                'volume',
+            ]]
+
+            # write to db
+            resp = await storage.client.write(
+                mkts_array,
+                tbk=f'{fqsn}/1Sec/OHLCV',
+
+                # NOTE: will append duplicates
+                # for the same timestamp-index.
+                # TODO: pre deduplicate?
+                isvariablelength=True,
+            )
+
+            log.info(
+                f'Wrote {to_append.size} datums to tsdb\n'
+            )
+            profiler('Finished db writes')
+
+            for resp in resp.responses:
+                err = resp.error
+                if err:
+                    raise MarketStoreError(err)
+
+        from tractor.trionics import ipython_embed
+        await ipython_embed()
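The shm/tsdb diffing above is plain boolean-mask slicing on epoch columns; a toy version (field names match the patch, data is fabricated):

```python
import numpy as np

ohlcv = np.array(
    [(t, 1.0) for t in (7., 8., 9., 10.)],
    dtype=[('time', 'f8'), ('close', 'f8')],
)
onesec = np.array([(8,), (9,)], dtype=[('Epoch', 'i8')])

# rows newer than the last tsdb epoch get appended..
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
# ..rows older than the first get prepended.
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]

assert to_append['time'].tolist() == [10.0]
assert to_prepend['time'].tolist() == [7.0]
```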
 async def ingest_quote_stream(
-    symbols: List[str],
+    symbols: list[str],
     brokername: str,
     tries: int = 1,
     loglevel: str = None,

 ) -> None:
-    """Ingest a broker quote stream into marketstore in (sampled) tick format.
-    """
-    async with open_feed(
-        brokername,
-        symbols,
-        loglevel=loglevel,
-    ) as (first_quotes, qstream):
-
-        quote_cache = first_quotes.copy()
-
-        async with get_client() as ms_client:
-
-            # start ingest to marketstore
-            async for quotes in qstream:
-                log.info(quotes)
-                for symbol, quote in quotes.items():
-
-                    # remap tick strs to ints
-                    quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
-
-                    # check for volume update (i.e. did trades happen
-                    # since last quote)
-                    new_vol = quote.get('volume', None)
-                    if new_vol is None:
-                        log.debug(f"No fills for {symbol}")
-                        if new_vol == quote_cache.get('volume'):
-                            # should never happen due to field diffing
-                            # on sender side
-                            log.error(
-                                f"{symbol}: got same volume as last quote?")
-
-                    quote_cache.update(quote)
-
-                    a = quote_to_marketstore_structarray(
-                        quote,
-                        # TODO: check this closer to the broker query api
-                        last_fill=quote.get('fill_time', '')
-                    )
-                    await ms_client.write(symbol, a)
+    '''
+    Ingest a broker quote stream into a ``marketstore`` tsdb.
+
+    '''
+    async with (
+        maybe_open_feed(brokername, symbols, loglevel=loglevel) as feed,
+        get_client() as ms_client,
+    ):
+        async for quotes in feed.stream:
+            log.info(quotes)
+            for symbol, quote in quotes.items():
+                for tick in quote.get('ticks', ()):
+                    ticktype = tick.get('type', 'n/a')
+
+                    # techtonic tick write
+                    array = quote_to_marketstore_structarray({
+                        'IsTrade': 1 if ticktype == 'trade' else 0,
+                        'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
+                        'Price': tick.get('price'),
+                        'Size': tick.get('size')
+                    }, last_fill=quote.get('broker_ts', None))
+
+                    await ms_client.write(array, _tick_tbk)
+
+            # LEGACY WRITE LOOP (using old tick dt)
+            # quote_cache = {
+            #     'size': 0,
+            #     'tick': 0
+            # }
+
+            # async for quotes in qstream:
+            #     log.info(quotes)
+            #     for symbol, quote in quotes.items():
+
+            #         # remap tick strs to ints
+            #         quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
+
+            #         # check for volume update (i.e. did trades happen
+            #         # since last quote)
+            #         new_vol = quote.get('volume', None)
+            #         if new_vol is None:
+            #             log.debug(f"No fills for {symbol}")
+            #             if new_vol == quote_cache.get('volume'):
+            #                 # should never happen due to field diffing
+            #                 # on sender side
+            #                 log.error(
+            #                     f"{symbol}: got same volume as last quote?")
+
+            #         quote_cache.update(quote)
+
+            #         a = quote_to_marketstore_structarray(
+            #             quote,
+            #             # TODO: check this closer to the broker query api
+            #             last_fill=quote.get('fill_time', '')
+            #         )
+            #         await ms_client.write(symbol, a)

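The new per-tick write path above just folds a piker tick msg into the flag fields of the tick schema; a small illustrative mapping (values fabricated):

```python
tick = {'type': 'bid', 'price': 100.1, 'size': 5}
ticktype = tick.get('type', 'n/a')

row = {
    'IsTrade': 1 if ticktype == 'trade' else 0,
    'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
    'Price': tick.get('price'),
    'Size': tick.get('size'),
}
assert row['IsTrade'] == 0 and row['IsBid'] == 1
```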
 async def stream_quotes(
-    symbols: List[str],
+    symbols: list[str],
     host: str = 'localhost',
     port: int = 5993,
     diff_cached: bool = True,
     loglevel: str = None,

 ) -> None:
-    """Open a symbol stream from a running instance of marketstore and
+    '''
+    Open a symbol stream from a running instance of marketstore and
     log to console.
-    """
+
+    '''
     # XXX: required to propagate ``tractor`` loglevel to piker logging
     get_console_log(loglevel or tractor.current_actor().loglevel)

-    tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
+    tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}

     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
         # send subs topics to server
@@ -271,7 +550,7 @@ async def stream_quotes(
         )
         log.info(resp)

-        async def recv() -> Dict[str, Any]:
+        async def recv() -> dict[str, Any]:
             return msgpack.loads((await ws.get_message()), encoding='utf-8')

         streams = (await recv())['streams']

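One caveat on the otherwise unchanged `recv()` helper: the `encoding='utf-8'` kwarg was deprecated and then dropped in `msgpack` 1.0, where strings already decode to `str` by default; a version-agnostic sketch of the equivalent round trip:

```python
import msgpack

payload = msgpack.packb({'streams': ['SPY/1Sec/TICK']})
# msgpack >= 1.0: no `encoding` kwarg needed, str decoding is default
assert msgpack.unpackb(payload) == {'streams': ['SPY/1Sec/TICK']}
```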
@@ -7,3 +7,7 @@
 # pin this to a dev branch that we have more control over especially
 # as more graphics stuff gets hashed out.
 -e git+https://github.com/pikers/pyqtgraph.git@piker_pin#egg=pyqtgraph
+
+
+# our async client for ``marketstore`` (the tsdb)
+-e git+https://github.com/pikers/anyio-marketstore.git@master#egg=anyio-marketstore

@@ -1,5 +1,5 @@
 # piker: trading gear for hackers
-# Copyright (C) Tyler Goodlet (in stewardship for piker0)
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)

 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by

@@ -30,11 +30,13 @@ orig_win_id = t.find_focused().window
 # for tws
 win_names: list[str] = [
     'Interactive Brokers',  # tws running in i3
-    'IB Gateway.',  # gw running in i3
+    'IB Gateway',  # gw running in i3
+    # 'IB',  # gw running in i3 (newer version?)
 ]

 for name in win_names:
-    results = t.find_named(name)
+    results = t.find_titled(name)
+    print(f'results for {name}: {results}')
     if results:
         con = results[0]
         print(f'Resetting data feed for {name}')
|
||||||
# https://github.com/rr-/pyxdotool
|
# https://github.com/rr-/pyxdotool
|
||||||
# https://github.com/ShaneHutter/pyxdotool
|
# https://github.com/ShaneHutter/pyxdotool
|
||||||
# https://github.com/cphyc/pyxdotool
|
# https://github.com/cphyc/pyxdotool
|
||||||
subprocess.call([
|
|
||||||
'xdotool',
|
|
||||||
'windowactivate', '--sync', win_id,
|
|
||||||
|
|
||||||
# move mouse to bottom left of window (where there should
|
# TODO: only run the reconnect (2nd) kc on a detected
|
||||||
# be nothing to click).
|
# disconnect?
|
||||||
'mousemove_relative', '--sync', str(w-4), str(h-4),
|
for key_combo, timeout in [
|
||||||
|
# only required if we need a connection reset.
|
||||||
|
# ('ctrl+alt+r', 12),
|
||||||
|
# data feed reset.
|
||||||
|
('ctrl+alt+f', 6)
|
||||||
|
]:
|
||||||
|
subprocess.call([
|
||||||
|
'xdotool',
|
||||||
|
'windowactivate', '--sync', win_id,
|
||||||
|
|
||||||
# NOTE: we may need to stick a `--retry 3` in here..
|
# move mouse to bottom left of window (where there should
|
||||||
'click', '--window', win_id, '--repeat', '3', '1',
|
# be nothing to click).
|
||||||
|
'mousemove_relative', '--sync', str(w-4), str(h-4),
|
||||||
|
|
||||||
# hackzorzes
|
# NOTE: we may need to stick a `--retry 3` in here..
|
||||||
'key', 'ctrl+alt+f',
|
'click', '--window', win_id,
|
||||||
],
|
'--repeat', '3', '1',
|
||||||
timeout=1,
|
|
||||||
)
|
# hackzorzes
|
||||||
|
'key', key_combo,
|
||||||
|
],
|
||||||
|
timeout=timeout,
|
||||||
|
)
|
||||||
|
|
||||||
# re-activate and focus original window
|
# re-activate and focus original window
|
||||||
subprocess.call([
|
subprocess.call([
|
setup.py (8 changes)
@@ -77,6 +77,14 @@ setup(
         # tsdbs
         'pymarketstore',
     ],
+    extras_require={
+
+        # serialization
+        'tsdb': [
+            'docker',
+        ],
+
+    },
     tests_require=['pytest'],
     python_requires=">=3.9",  # literally for ``datetime.datetime.fromisoformat``...
     keywords=["async", "trading", "finance", "quant", "charting"],
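With the new `extras_require` table, the optional tsdb bits (currently just `docker` for container automation) install via the standard pip extras syntax, e.g. `pip install -e '.[tsdb]'` from a source checkout.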