Merge pull request #308 from pikers/storage_layer

Storage layer: initial `marketstore` tsdb support with async OHLCV history loading.
goodboy 2022-05-11 10:18:33 -04:00 committed by GitHub
commit 482fc1da10
21 changed files with 2354 additions and 473 deletions
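
For orientation: the core new contract threaded through the broker backends below is an async "history client". Each backend's ``open_history_client()`` is an ``@acm`` which yields a frame-fetching function plus a query-throttle config dict (``erlangs``/``rate``) consumed by the feed layer's ``trimeter``-based request loop. A minimal sketch of the shape (the ``fake_frame()`` helper is a hypothetical stand-in for a real API call):

from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Optional

import numpy as np
import pendulum

@acm
async def open_history_client(symbol: str):

    async def get_ohlc(
        end_dt: Optional[datetime] = None,
        start_dt: Optional[datetime] = None,
    ) -> tuple[
        np.ndarray,
        datetime,  # start
        datetime,  # end
    ]:
        # a real backend queries its API here; ``fake_frame()``
        # stands in for that call and must return a structured
        # array with a ``'time'`` epoch field.
        array = fake_frame(symbol, start_dt, end_dt)
        start_dt = pendulum.from_timestamp(array[0]['time'])
        end_dt = pendulum.from_timestamp(array[-1]['time'])
        return array, start_dt, end_dt

    # 2nd value: throttle config for the storage layer's
    # async request loop (see ``start_backfill()`` below).
    yield get_ohlc, {'erlangs': 3, 'rate': 3}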

View File

@ -19,7 +19,7 @@ Structured, daemon tree service management.
"""
from typing import Optional, Union, Callable, Any
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager as acm
from collections import defaultdict
from pydantic import BaseModel
@ -130,7 +130,7 @@ class Services(BaseModel):
_services: Optional[Services] = None
@asynccontextmanager
@acm
async def open_pikerd(
start_method: str = 'trio',
loglevel: Optional[str] = None,
@ -185,7 +185,7 @@ async def open_pikerd(
yield _services
@asynccontextmanager
@acm
async def open_piker_runtime(
name: str,
enable_modules: list[str] = [],
@ -226,7 +226,7 @@ async def open_piker_runtime(
yield tractor.current_actor()
@asynccontextmanager
@acm
async def maybe_open_runtime(
loglevel: Optional[str] = None,
**kwargs,
@ -249,7 +249,7 @@ async def maybe_open_runtime(
yield
@asynccontextmanager
@acm
async def maybe_open_pikerd(
loglevel: Optional[str] = None,
**kwargs,
@ -300,7 +300,36 @@ class Brokerd:
locks = defaultdict(trio.Lock)
@asynccontextmanager
@acm
async def find_service(
service_name: str,
) -> Optional[tractor.Portal]:
log.info(f'Scanning for service `{service_name}`')
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as maybe_portal:
yield maybe_portal
async def check_for_service(
service_name: str,
) -> bool:
'''
Service daemon "liveness" predicate.
'''
async with tractor.query_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as sockaddr:
return sockaddr
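Note the annotated return type is ``bool`` but the queried socket address (or ``None``) is what's actually returned; callers only use it as a truthy "liveness" predicate, e.g. as in ``manage_history()`` further below:

is_up = await check_for_service('marketstored')
if is_up:
    ...  # read from the tsdb instead of legacy backfilling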
@acm
async def maybe_spawn_daemon(
service_name: str,
@ -310,7 +339,7 @@ async def maybe_spawn_daemon(
**kwargs,
) -> tractor.Portal:
"""
'''
If no ``service_name`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
@ -321,7 +350,7 @@ async def maybe_spawn_daemon(
This can be seen as a service starting api for remote-actor
clients.
"""
'''
if loglevel:
get_console_log(loglevel)
@ -330,13 +359,7 @@ async def maybe_spawn_daemon(
lock = Brokerd.locks[service_name]
await lock.acquire()
log.info(f'Scanning for existing {service_name}')
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
arbiter_sockaddr=_registry_addr,
) as portal:
async with find_service(service_name) as portal:
if portal is not None:
lock.release()
yield portal
@ -423,7 +446,7 @@ async def spawn_brokerd(
return True
@asynccontextmanager
@acm
async def maybe_spawn_brokerd(
brokername: str,
@ -431,7 +454,9 @@ async def maybe_spawn_brokerd(
**kwargs,
) -> tractor.Portal:
'''Helper to spawn a brokerd service.
'''
Helper to spawn a brokerd service *from* a client
who wishes to use the sub-actor-daemon.
'''
async with maybe_spawn_daemon(
@ -483,7 +508,7 @@ async def spawn_emsd(
return True
@asynccontextmanager
@acm
async def maybe_open_emsd(
brokername: str,

View File

@ -33,7 +33,41 @@ class SymbolNotFound(BrokerError):
class NoData(BrokerError):
"Symbol data not permitted"
'''
Symbol data not permitted or no data
for time range found.
'''
def __init__(
self,
*args,
frame_size: int = 1000,
) -> None:
super().__init__(*args)
# when raised, machinery can check if the backend
# set a "frame size" for doing datetime calcs.
self.frame_size: int = frame_size
class DataUnavailable(BrokerError):
'''
Signal storage requests to terminate.
'''
# TODO: add in a reason that can be displayed in the
# UI (for eg. `kraken` is bs and you should complain
# to them that you can't pull more OHLC data..)
class DataThrottle(BrokerError):
'''
Broker throttled request rate for data.
'''
# TODO: add in throttle metrics/feedback
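Together the three new exception types give backends a shared vocabulary for history-request outcomes. A hedged sketch of the intended call-site pattern (mirroring the ``kraken`` changes below; ``client`` and ``log`` are hypothetical stand-ins for a backend's API client and logger):

import trio

async def fetch_frame(client, symbol, end_dt, log):
    try:
        return await client.bars(symbol, since=end_dt)
    except DataThrottle:
        # rate-limited: back off then retry
        await trio.sleep(1)
    except NoData as nd:
        # nothing for this range; ``.frame_size`` (set by the
        # backend) lets the storage layer do datetime frame calcs
        log.warning(f'no data, frame size: {nd.frame_size}')
    except DataUnavailable:
        # hard stop: the backend can't serve any more history
        raise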
def resproc(
@ -50,12 +84,12 @@ def resproc(
if not resp.status_code == 200:
raise BrokerError(resp.body)
try:
json = resp.json()
msg = resp.json()
except json.decoder.JSONDecodeError:
log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text)
if log_resp:
log.debug(f"Received json contents:\n{colorize_json(json)}")
log.debug(f"Received json contents:\n{colorize_json(msg)}")
return json if return_json else resp
return msg if return_json else resp

View File

@ -19,6 +19,7 @@ Binance backend
"""
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import (
Any, Union, Optional,
AsyncGenerator, Callable,
@ -221,20 +222,22 @@ class Client:
async def bars(
self,
symbol: str,
start_time: int = None,
end_time: int = None,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
limit: int = 1000, # <- max allowed per query
as_np: bool = True,
) -> dict:
if start_time is None:
start_time = binance_timestamp(
pendulum.now('UTC').start_of('minute').subtract(minutes=limit)
)
if end_dt is None:
end_dt = pendulum.now('UTC')
if end_time is None:
end_time = binance_timestamp(pendulum.now('UTC'))
if start_dt is None:
start_dt = end_dt.start_of(
'minute').subtract(minutes=limit)
start_time = binance_timestamp(start_dt)
end_time = binance_timestamp(end_dt)
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api(
@ -379,7 +382,27 @@ async def open_history_client(
# TODO implement history getter for the new storage layer.
async with open_cached_client('binance') as client:
yield client
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
array = await client.bars(
symbol,
start_dt=start_dt,
end_dt=end_dt,
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def backfill_bars(

View File

@ -57,6 +57,8 @@ from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client
from fuzzywuzzy import process as fuzzy
import numpy as np
import pendulum
from .. import config
from ..log import get_logger, get_console_log
@ -295,6 +297,10 @@ class Client:
global _enters
# log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
print(f'REQUESTING BARS {_enters} @ end={end_dt}')
if not end_dt:
end_dt = ''
_enters += 1
contract = await self.find_contract(fqsn)
@ -1438,8 +1444,6 @@ async def get_bars(
a ``MethodProxy``.
'''
import pendulum
fails = 0
bars: Optional[list] = None
first_dt: datetime = None
@ -1467,7 +1471,9 @@ async def get_bars(
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(f'bars retrieved for dts {first_dt}:{last_dt}')
log.info(
f'{len(bars)} bars retrieved for {first_dt} -> {last_dt}'
)
return (bars, bars_array, first_dt, last_dt), fails
@ -1478,21 +1484,30 @@ async def get_bars(
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(f'Symbol: {fqsn}')
break
raise NoData(
f'Symbol: {fqsn}',
)
elif (
err.code == 162
and 'HMDS query returned no data' in err.message
):
# try to decrement start point and look further back
end_dt = last_dt = last_dt.subtract(seconds=2000)
# XXX: this is now done in the storage mgmt layer
# and we shouldn't implicitly decrement the frame dt
# index since the upper layer may be doing so
# concurrently and we don't want to be delivering frames
# that weren't asked for.
log.warning(
f'No data found ending @ {end_dt}\n'
f'Starting another request for {end_dt}'
f'NO DATA found ending @ {end_dt}\n'
)
continue
# try to decrement start point and look further back
# end_dt = last_dt = last_dt.subtract(seconds=2000)
raise NoData(
f'Symbol: {fqsn}',
frame_size=2000,
)
elif _pacing in msg:
@ -1546,8 +1561,8 @@ async def open_history_client(
async with open_client_proxy() as proxy:
async def get_hist(
end_dt: str,
start_dt: str = '',
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[np.ndarray, str]:
@ -1558,7 +1573,10 @@ async def open_history_client(
if out == (None, None):
# could be trying to retrieve bars over weekend
log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(f'{end_dt}')
raise NoData(
f'{end_dt}',
frame_size=2000,
)
bars, bars_array, first_dt, last_dt = out
@ -1569,7 +1587,12 @@ async def open_history_client(
return bars_array, first_dt, last_dt
yield get_hist
# TODO: it seems like we can do async queries for ohlc
# but getting the order right still isn't working and I'm not
# quite sure why.. needs some tinkering and probably
# a lookthrough of the ``ib_insync`` machinery, for eg. maybe
# we have to do the batch queries on the `asyncio` side?
yield get_hist, {'erlangs': 1, 'rate': 6}
async def backfill_bars(
@ -1831,6 +1854,7 @@ async def stream_quotes(
symbol=sym,
)
first_quote = normalize(first_ticker)
# print(f'first quote: {first_quote}')
def mk_init_msgs() -> dict[str, dict]:
# pass back some symbol info like min_tick, trading_hours, etc.

View File

@ -20,7 +20,8 @@ Kraken backend.
'''
from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field
from typing import Any, Optional, AsyncIterator, Callable
from datetime import datetime
from typing import Any, Optional, AsyncIterator, Callable, Union
import time
from trio_typing import TaskStatus
@ -40,7 +41,13 @@ import base64
from .. import config
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ._util import (
resproc,
SymbolNotFound,
BrokerError,
DataThrottle,
DataUnavailable,
)
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws, NoBsWs
@ -391,17 +398,26 @@ class Client:
async def bars(
self,
symbol: str = 'XBTUSD',
# UTC 2017-07-02 12:53:20
since: int = None,
since: Optional[Union[int, datetime]] = None,
count: int = 720, # <- max allowed per query
as_np: bool = True,
) -> dict:
if since is None:
since = pendulum.now('UTC').start_of('minute').subtract(
minutes=count).timestamp()
elif isinstance(since, int):
since = pendulum.from_timestamp(since).timestamp()
else: # presumably a pendulum datetime
since = since.timestamp()
# UTC 2017-07-02 12:53:20 is oldest seconds value
since = str(max(1499000000, since))
since = str(max(1499000000, int(since)))
json = await self._public(
'OHLC',
data={
@ -445,7 +461,16 @@ class Client:
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
except KeyError:
raise SymbolNotFound(json['error'][0] + f': {symbol}')
errmsg = json['error'][0]
if 'not found' in errmsg:
raise SymbolNotFound(errmsg + f': {symbol}')
elif 'Too many requests' in errmsg:
raise DataThrottle(f'{symbol}')
else:
raise BrokerError(errmsg)
@acm
@ -668,8 +693,8 @@ async def handle_order_requests(
oid=msg.oid,
reqid=msg.reqid,
symbol=msg.symbol,
# TODO: maybe figure out if pending cancels will
# eventually get cancelled
# TODO: maybe figure out if pending
# cancels will eventually get cancelled
reason="Order cancel is still pending?",
broker_details=resp
).dict()
@ -1003,7 +1028,45 @@ async def open_history_client(
# TODO implement history getter for the new storage layer.
async with open_cached_client('kraken') as client:
yield client
# lol, kraken won't send any more than the "last"
# 720 1m bars.. so we have to just ignore further
# requests of this type..
queries: int = 0
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
nonlocal queries
if queries > 0:
raise DataUnavailable
count = 0
while count <= 3:
try:
array = await client.bars(
symbol,
since=end_dt,
)
count += 1
queries += 1
break
except DataThrottle:
log.warning(f'kraken OHLC throttle for {symbol}')
await trio.sleep(1)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 1, 'rate': 1}
async def backfill_bars(

View File

@ -1,7 +1,25 @@
"""
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
CLI commons.
"""
'''
import os
from pprint import pformat
import click
import trio
@ -16,29 +34,22 @@ from .. import config
log = get_logger('cli')
DEFAULT_BROKER = 'questrade'
_config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
_context_defaults = dict(
default_map={
# Questrade specific quote poll rates
'monitor': {
'rate': 3,
},
'optschain': {
'rate': 1,
},
}
)
@click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--pdb', is_flag=True, help='Enable tractor debug mode')
@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
def pikerd(loglevel, host, tl, pdb):
"""Spawn the piker broker-daemon.
"""
@click.option(
'--tsdb',
is_flag=True,
help='Enable local ``marketstore`` instance'
)
def pikerd(loglevel, host, tl, pdb, tsdb):
'''
Spawn the piker broker-daemon.
'''
from .._daemon import open_pikerd
log = get_console_log(loglevel)
@ -52,13 +63,38 @@ def pikerd(loglevel, host, tl, pdb):
))
async def main():
async with open_pikerd(loglevel=loglevel, debug_mode=pdb):
async with (
open_pikerd(
loglevel=loglevel,
debug_mode=pdb,
), # normally delivers a ``Services`` handle
trio.open_nursery() as n,
):
if tsdb:
from piker.data._ahab import start_ahab
from piker.data.marketstore import start_marketstore
log.info('Spawning `marketstore` supervisor')
ctn_ready, config, (cid, pid) = await n.start(
start_ahab,
'marketstored',
start_marketstore,
)
log.info(
f'`marketstore` up!\n'
f'`marketstored` pid: {pid}\n'
f'docker container id: {cid}\n'
f'config: {pformat(config)}'
)
await trio.sleep_forever()
trio.run(main)
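With the new flag the daemon is started as e.g. ``pikerd --tsdb``, which (per ``start_ahab()`` below) currently requires root perms to spawn the docker supervisor before de-escalating to the invoking user.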
@click.group(context_settings=_context_defaults)
@click.group(context_settings=config._context_defaults)
@click.option(
'--brokers', '-b',
default=[DEFAULT_BROKER],
@ -87,8 +123,8 @@ def cli(ctx, brokers, loglevel, tl, configdir):
'loglevel': loglevel,
'tractorloglevel': None,
'log': get_console_log(loglevel),
'confdir': _config_dir,
'wl_path': _watchlists_data_path,
'confdir': config._config_dir,
'wl_path': config._watchlists_data_path,
})
# allow enabling same loglevel in ``tractor`` machinery

View File

@ -17,6 +17,8 @@
"""
Broker configuration mgmt.
"""
import platform
import sys
import os
from os.path import dirname
import shutil
@ -24,14 +26,100 @@ from typing import Optional
from bidict import bidict
import toml
import click
from .log import get_logger
log = get_logger('broker-config')
_config_dir = click.get_app_dir('piker')
# taken from ``click`` since apparently they have some
# super weirdness with sigint and sudo..no clue
def get_app_dir(app_name, roaming=True, force_posix=False):
r"""Returns the config folder for the application. The default behavior
is to return whatever is most appropriate for the operating system.
To give you an idea, for an app called ``"Foo Bar"``, something like
the following folders could be returned:
Mac OS X:
``~/Library/Application Support/Foo Bar``
Mac OS X (POSIX):
``~/.foo-bar``
Unix:
``~/.config/foo-bar``
Unix (POSIX):
``~/.foo-bar``
Win XP (roaming):
``C:\Documents and Settings\<user>\Local Settings\Application Data\Foo Bar``
Win XP (not roaming):
``C:\Documents and Settings\<user>\Application Data\Foo Bar``
Win 7 (roaming):
``C:\Users\<user>\AppData\Roaming\Foo Bar``
Win 7 (not roaming):
``C:\Users\<user>\AppData\Local\Foo Bar``
.. versionadded:: 2.0
:param app_name: the application name. This should be properly capitalized
and can contain whitespace.
:param roaming: controls if the folder should be roaming or not on Windows.
Has no effect otherwise.
:param force_posix: if this is set to `True` then on any POSIX system the
folder will be stored in the home folder with a leading
dot instead of the XDG config home or darwin's
application support folder.
"""
def _posixify(name):
return "-".join(name.split()).lower()
# if WIN:
if platform.system() == 'Windows':
key = "APPDATA" if roaming else "LOCALAPPDATA"
folder = os.environ.get(key)
if folder is None:
folder = os.path.expanduser("~")
return os.path.join(folder, app_name)
if force_posix:
return os.path.join(os.path.expanduser("~/.{}".format(_posixify(app_name))))
if sys.platform == "darwin":
return os.path.join(
os.path.expanduser("~/Library/Application Support"), app_name
)
return os.path.join(
os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")),
_posixify(app_name),
)
_config_dir = _click_config_dir = get_app_dir('piker')
_parent_user = os.environ.get('SUDO_USER')
if _parent_user:
non_root_user_dir = os.path.expanduser(
f'~{_parent_user}'
)
root = 'root'
_config_dir = (
non_root_user_dir +
_click_config_dir[
_click_config_dir.rfind(root) + len(root):
]
)
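That is, when invoked via ``sudo`` the path segment following click's resolved ``'root'`` component is grafted onto the parent user's home dir. A worked example, assuming a linux user ``alice`` (values hypothetical):

_click_config_dir = '/root/.config/piker'
non_root_user_dir = '/home/alice'   # expanded from SUDO_USER
root = 'root'
# rfind('root') == 1, so the slice keeps '/.config/piker'
assert (
    non_root_user_dir
    + _click_config_dir[_click_config_dir.rfind(root) + len(root):]
) == '/home/alice/.config/piker'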
_file_name = 'brokers.toml'
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
_context_defaults = dict(
default_map={
# Questrade specific quote poll rates
'monitor': {
'rate': 3,
},
'optschain': {
'rate': 1,
},
}
)
def _override_config_dir(

piker/data/_ahab.py (new file, mode 100644, 370 additions)
View File

@ -0,0 +1,370 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Supervisor for docker with included specific-image service helpers.
'''
import os
from typing import (
Optional,
Callable,
Any,
)
from contextlib import asynccontextmanager as acm
import trio
from trio_typing import TaskStatus
import tractor
from tractor.msg import NamespacePath
import docker
import json
from docker.models.containers import Container as DockerContainer
from docker.errors import (
DockerException,
APIError,
)
from requests.exceptions import ConnectionError, ReadTimeout
from ..log import get_logger, get_console_log
from .. import config
log = get_logger(__name__)
class DockerNotStarted(Exception):
'Prolly you dint start da daemon bruh'
class ContainerError(RuntimeError):
'Error reported via app-container logging level'
@acm
async def open_docker(
url: Optional[str] = None,
**kwargs,
) -> docker.DockerClient:
client: Optional[docker.DockerClient] = None
try:
client = docker.DockerClient(
base_url=url,
**kwargs
) if url else docker.from_env(**kwargs)
yield client
except (
DockerException,
APIError,
) as err:
def unpack_msg(err: Exception) -> str:
args = getattr(err, 'args', None)
if args:
return args
else:
return str(err)
# could be more specific so let's check if it's just perms.
if err.args:
errs = err.args
for err in errs:
msg = unpack_msg(err)
if 'PermissionError' in msg:
raise DockerException('You dint run as root yo!')
elif 'FileNotFoundError' in msg:
raise DockerNotStarted('Did you start da service sister?')
# not perms?
raise
finally:
if client:
client.close()
for c in client.containers.list():
c.kill()
class Container:
'''
Wrapper around a ``docker.models.containers.Container`` to include
log capture and relay through our native logging system and helper
method(s) for cancellation/teardown.
'''
def __init__(
self,
cntr: DockerContainer,
) -> None:
self.cntr = cntr
# log msg de-duplication
self.seen_so_far = set()
async def process_logs_until(
self,
patt: str,
bp_on_msg: bool = False,
) -> bool:
'''
Attempt to capture container log messages and relay through our
native logging system.
'''
seen_so_far = self.seen_so_far
while True:
logs = self.cntr.logs()
entries = logs.decode().split('\n')
for entry in entries:
# ignore null lines
if not entry:
continue
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
raise
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')
# print(f'level: {level}')
if level in ('error', 'fatal'):
raise ContainerError(msg)
if patt in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01)
return False
def try_signal(
self,
signal: str = 'SIGINT',
) -> bool:
try:
# XXX: market store doesn't seem to shutdown nicely all the
# time with this (maybe because there are still open grpc
# connections?) noticeably after client connections have been
# made or are in use/teardown. It works just fine if you
# just start and stop the container tho?..
log.cancel(f'SENDING {signal} to {self.cntr.id}')
self.cntr.kill(signal)
return True
except docker.errors.APIError as err:
if 'is not running' in err.explanation:
return False
async def cancel(
self,
) -> None:
cid = self.cntr.id
self.try_signal('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
await self.process_logs_until('initiating graceful shutdown')
await self.process_logs_until('exiting...',)
for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
await self.process_logs_until('exiting...',)
break
if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
try:
log.info(f'Waiting on container shutdown: {cid}')
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break
except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
else:
raise RuntimeError(f'Failed to cancel container {cid}')
log.cancel(f'Container stopped: {cid}')
@tractor.context
async def open_ahabd(
ctx: tractor.Context,
endpoint: str, # ns-pointer str-msg-type
**kwargs,
) -> None:
get_console_log('info', name=__name__)
async with open_docker() as client:
# TODO: eventually offer a config-oriented API to do the mounts,
# params, etc. passing to ``Containter.run()``?
# call into endpoint for container config/init
ep_func = NamespacePath(endpoint).load_ref()
dcntr, cntr_config = ep_func(client)
cntr = Container(dcntr)
with trio.move_on_after(1):
found = await cntr.process_logs_until(
"launching tcp listener for all services...",
)
if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore`, check logs for deats'
)
await ctx.started((
cntr.cntr.id,
os.getpid(),
cntr_config,
))
try:
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
await trio.sleep_forever()
except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):
with trio.CancelScope(shield=True):
await cntr.cancel()
raise
async def start_ahab(
service_name: str,
endpoint: Callable[[docker.DockerClient], DockerContainer],
task_status: TaskStatus[
tuple[
trio.Event,
dict[str, Any],
],
] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Start a ``docker`` container supervisor with given service name.
Currently the actor calling this task should normally be started
with root permissions (until we decide to use something that doesn't
require this, like docker's rootless mode or some wrapper project) but
the root perms are de-escalated after the docker supervisor sub-actor
is started.
'''
cn_ready = trio.Event()
try:
async with tractor.open_nursery(
loglevel='runtime',
) as tn:
portal = await tn.start_actor(
service_name,
enable_modules=[__name__]
)
# TODO: we have issues with this on teardown
# where ``tractor`` tries to issue ``os.kill()``
# and hits perms errors since the root process
# no longer has root perms..
# de-escalate root perms to the original user
# after the docker supervisor actor is spawned.
if config._parent_user:
import pwd
os.setuid(
pwd.getpwnam(
config._parent_user
)[2] # named user's uid
)
async with portal.open_context(
open_ahabd,
endpoint=str(NamespacePath.from_ref(endpoint)),
) as (ctx, first):
cid, pid, cntr_config = first
task_status.started((
cn_ready,
cntr_config,
(cid, pid),
))
await trio.sleep_forever()
# since we demoted root perms in this parent
# we'll get a perms error on proc cleanup in
# ``tractor`` nursery exit. just make sure
# the child is terminated and don't raise the
# error if so.
# TODO: we could also consider adding
# a ``tractor.ZombieDetected`` or something that we could raise
# if we find the child didn't terminate.
except PermissionError:
log.warning('Failed to cancel root permsed container')
except (
trio.MultiError,
) as err:
for subexc in err.exceptions:
if isinstance(subexc, PermissionError):
log.warning('Failed to cancel root perms-ed container')
return
else:
raise

View File

@ -22,14 +22,16 @@ financial data flows.
from __future__ import annotations
from collections import Counter
import time
from typing import TYPE_CHECKING, Optional
import tractor
import trio
from trio_typing import TaskStatus
from ._sharedmem import ShmArray
from ..log import get_logger
if TYPE_CHECKING:
from ._sharedmem import ShmArray
log = get_logger(__name__)
@ -88,6 +90,7 @@ async def increment_ohlc_buffer(
total_s = 0 # total seconds counted
lowest = min(sampler.ohlcv_shms.keys())
lowest_shm = sampler.ohlcv_shms[lowest][0]
ad = lowest - 0.001
with trio.CancelScope() as cs:
@ -131,13 +134,33 @@ async def increment_ohlc_buffer(
# write to the buffer
shm.push(last)
await broadcast(delay_s, shm=lowest_shm)
async def broadcast(
delay_s: int,
shm: Optional[ShmArray] = None,
) -> None:
# broadcast the buffer index step to any subscribers for
# a given sample period.
subs = sampler.subscribers.get(delay_s, ())
last = -1
if shm is None:
periods = sampler.ohlcv_shms.keys()
# if this is an update triggered by a history update there
# might not actually be any sampling bus setup since there's
# no "live feed" active yet.
if periods:
lowest = min(periods)
shm = sampler.ohlcv_shms[lowest][0]
last = shm._last.value
for stream in subs:
try:
await stream.send({'index': shm._last.value})
await stream.send({'index': last})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
@ -365,7 +388,12 @@ async def uniform_rate_send(
if left_to_sleep > 0:
with trio.move_on_after(left_to_sleep) as cs:
try:
sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel:
log.exception(f"feed for {stream} ended?")
break
diff = time.time() - last_send
if not first_quote:

View File

@ -22,7 +22,6 @@ from __future__ import annotations
from sys import byteorder
from typing import Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker
if _USE_POSIX:
from _posixshmem import shm_unlink
@ -30,6 +29,7 @@ if _USE_POSIX:
import tractor
import numpy as np
from pydantic import BaseModel
from numpy.lib import recfunctions as rfn
from ..log import get_logger
from ._source import base_iohlc_dtype
@ -40,14 +40,19 @@ log = get_logger(__name__)
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 24)
# we try for 3 times but only on a run-every-other-day kinda week.
_default_size = 10 * _secs_in_day
# we try for a buncha times, but only on a run-every-other-day kinda week.
_days_worth = 16
_default_size = _days_worth * _secs_in_day
# where to start the new data append index
_rt_buffer_start = int(9*_secs_in_day)
_rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
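For 1s bars that works out to ``_default_size = 16 * 86_400 = 1_382_400`` rows total, with the real-time append index starting a day's worth from the end at ``15 * 86_400 = 1_296_000``.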
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def cuckoff_mantracker():
from multiprocessing import resource_tracker as mantracker
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
@ -57,15 +62,17 @@ class ManTracker(mantracker.ResourceTracker):
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
# ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
cuckoff_mantracker()
class SharedInt:
@ -191,7 +198,11 @@ class ShmArray:
self._post_init: bool = False
# pushing data does not write the index (aka primary key)
dtype = shmarr.dtype
if dtype.fields:
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
else:
self._write_fields = None
# TODO: ringbuf api?
@ -237,6 +248,48 @@ class ShmArray:
return a
def ustruct(
self,
fields: Optional[list[str]] = None,
# type that all field values will be cast to
# in the returned view.
common_dtype: np.dtype = float,  # NB: the ``np.float`` alias is deprecated
) -> np.ndarray:
array = self._array
if fields:
selection = array[fields]
# fcount = len(fields)
else:
selection = array
# fcount = len(array.dtype.fields)
# XXX: manual ``.view()`` attempt that also doesn't work.
# uview = selection.view(
# dtype='<f16',
# ).reshape(-1, 4, order='A')
# assert len(selection) == len(uview)
u = rfn.structured_to_unstructured(
selection,
# dtype=float,
copy=True,
)
# unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf)
# array[:] = a[:]
return u
# return ShmArray(
# shmarr=u,
# first=self._first,
# last=self._last,
# shm=self._shm
# )
def last(
self,
length: int = 1,
@ -255,6 +308,7 @@ class ShmArray:
field_map: Optional[dict[str, str]] = None,
prepend: bool = False,
update_first: bool = True,
start: Optional[int] = None,
) -> int:
@ -267,10 +321,9 @@ class ShmArray:
'''
length = len(data)
index = start if start is not None else self._last.value
if prepend:
index = self._first.value - length
index = (start or self._first.value) - length
if index < 0:
raise ValueError(
@ -278,6 +331,9 @@ class ShmArray:
f'You have passed {abs(index)} too many datums.'
)
else:
index = start if start is not None else self._last.value
end = index + length
if field_map:
@ -295,12 +351,17 @@ class ShmArray:
# tries to access ``.array`` (which due to the index
# overlap will be empty). Pretty sure we've fixed it now
# but leaving this here as a reminder.
if prepend:
if prepend and update_first and length:
assert index < self._first.value
if index < self._first.value:
if (
index < self._first.value
and update_first
):
assert prepend, 'prepend=True not passed but index decreased?'
self._first.value = index
else:
elif not prepend:
self._last.value = end
self._post_init = True
@ -336,6 +397,7 @@ class ShmArray:
f"Input array has unknown field(s): {only_in_theirs}"
)
# TODO: support "silent" prepends that don't update ._first.value?
def prepend(
self,
data: np.ndarray,
@ -386,7 +448,11 @@ def open_shm_array(
create=True,
size=a.nbytes
)
array = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
array = np.ndarray(
a.shape,
dtype=a.dtype,
buffer=shm.buf
)
array[:] = a[:]
array.setflags(write=int(not readonly))

View File

@ -21,9 +21,9 @@ from __future__ import annotations
from typing import Any
import decimal
from bidict import bidict
import numpy as np
import pandas as pd
from pydantic import BaseModel, validate_arguments
from pydantic import BaseModel
# from numba import from_dtype
@ -48,16 +48,16 @@ base_ohlc_dtype = np.dtype(ohlc_fields)
# https://github.com/numba/numba/issues/4511
# numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
# map time frame "keys" to minutes values
tf_in_1m = {
'1m': 1,
'5m': 5,
'15m': 15,
'30m': 30,
'1h': 60,
'4h': 240,
'1d': 1440,
}
# map time frame "keys" to seconds values
tf_in_1s = bidict({
1: '1s',
60: '1m',
60*5: '5m',
60*15: '15m',
60*30: '30m',
60*60: '1h',
60*60*24: '1d',
})
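Being a ``bidict`` the new table maps both directions via the standard ``.inverse`` attribute, e.g.:

from bidict import bidict

tf_in_1s = bidict({1: '1s', 60: '1m'})
assert tf_in_1s[60] == '1m'
assert tf_in_1s.inverse['1m'] == 60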
def mk_fqsn(
@ -127,11 +127,11 @@ def unpack_fqsn(fqsn: str) -> tuple[str, str, str]:
class Symbol(BaseModel):
"""I guess this is some kinda container thing for dealing with
'''
I guess this is some kinda container thing for dealing with
all the different meta-data formats from brokers?
Yah, i guess dats what it izz.
"""
'''
key: str
tick_size: float = 0.01
lot_tick_size: float = 0.0 # "volume" precision as min step value
@ -254,61 +254,6 @@ class Symbol(BaseModel):
return keys
def from_df(
df: pd.DataFrame,
source=None,
default_tf=None
) -> np.recarray:
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.
"""
df.reset_index(inplace=True)
# hackery to convert field names
date = 'Date'
if 'date' in df.columns:
date = 'date'
# convert to POSIX time
df[date] = [d.timestamp() for d in df[date]]
# try to rename from some camel case
columns = {
'Date': 'time',
'date': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
# most feeds are providing this over session anchored
'vwap': 'bar_wap',
# XXX: ib_insync calls this the "wap of the bar"
# but no clue what it actually is...
# https://github.com/pikers/piker/issues/119#issuecomment-729120988
'average': 'bar_wap',
}
df = df.rename(columns=columns)
for name in df.columns:
# if name not in base_ohlc_dtype.names[1:]:
if name not in base_ohlc_dtype.names:
del df[name]
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
array = df.to_records(index=False)
_nan_to_closest_num(array)
return array
def _nan_to_closest_num(array: np.ndarray):
"""Return interpolated values instead of NaN.

View File

@ -16,26 +16,34 @@
"""
marketstore cli.
"""
from typing import List
from functools import partial
from pprint import pformat
from anyio_marketstore import open_marketstore_client
import trio
import tractor
import click
import numpy as np
from .marketstore import (
get_client,
stream_quotes,
# stream_quotes,
ingest_quote_stream,
_url,
# _url,
_tick_tbk_ids,
mk_tbk,
)
from ..cli import cli
from .. import watchlists as wl
from ..log import get_logger
from ._sharedmem import (
maybe_open_shm_array,
)
from ._source import (
base_iohlc_dtype,
)
log = get_logger(__name__)
@ -49,51 +57,58 @@ log = get_logger(__name__)
)
@click.argument('names', nargs=-1)
@click.pass_obj
def ms_stream(config: dict, names: List[str], url: str):
"""Connect to a marketstore time bucket stream for (a set of) symbols(s)
def ms_stream(
config: dict,
names: list[str],
url: str,
) -> None:
'''
Connect to a marketstore time bucket stream for (a set of) symbol(s)
and print to console.
"""
'''
async def main():
async for quote in stream_quotes(symbols=names):
log.info(f"Received quote:\n{quote}")
# async for quote in stream_quotes(symbols=names):
# log.info(f"Received quote:\n{quote}")
...
trio.run(main)
@cli.command()
@click.option(
'--url',
default=_url,
help='HTTP URL of marketstore instance'
)
@click.argument('names', nargs=-1)
@click.pass_obj
def ms_destroy(config: dict, names: List[str], url: str) -> None:
"""Destroy symbol entries in the local marketstore instance.
"""
async def main():
nonlocal names
async with get_client(url) as client:
if not names:
names = await client.list_symbols()
# default is to wipe db entirely.
answer = input(
"This will entirely wipe you local marketstore db @ "
f"{url} of the following symbols:\n {pformat(names)}"
"\n\nDelete [N/y]?\n")
if answer == 'y':
for sym in names:
# tbk = _tick_tbk.format(sym)
tbk = tuple(sym, *_tick_tbk_ids)
print(f"Destroying {tbk}..")
await client.destroy(mk_tbk(tbk))
else:
print("Nothing deleted.")
tractor.run(main)
# @cli.command()
# @click.option(
# '--url',
# default=_url,
# help='HTTP URL of marketstore instance'
# )
# @click.argument('names', nargs=-1)
# @click.pass_obj
# def ms_destroy(config: dict, names: list[str], url: str) -> None:
# """Destroy symbol entries in the local marketstore instance.
# """
# async def main():
# nonlocal names
# async with get_client(url) as client:
#
# if not names:
# names = await client.list_symbols()
#
# # default is to wipe db entirely.
# answer = input(
# "This will entirely wipe you local marketstore db @ "
# f"{url} of the following symbols:\n {pformat(names)}"
# "\n\nDelete [N/y]?\n")
#
# if answer == 'y':
# for sym in names:
# # tbk = _tick_tbk.format(sym)
# tbk = tuple(sym, *_tick_tbk_ids)
# print(f"Destroying {tbk}..")
# await client.destroy(mk_tbk(tbk))
# else:
# print("Nothing deleted.")
#
# tractor.run(main)
@cli.command()
@ -102,41 +117,53 @@ def ms_destroy(config: dict, names: List[str], url: str) -> None:
is_flag=True,
help='Enable tractor logging')
@click.option(
'--url',
default=_url,
help='HTTP URL of marketstore instance'
'--host',
default='localhost'
)
@click.argument('name', nargs=1, required=True)
@click.option(
'--port',
default=5993
)
@click.argument('symbols', nargs=-1)
@click.pass_obj
def ms_shell(config, name, tl, url):
"""Start an IPython shell ready to query the local marketstore db.
"""
async def main():
async with get_client(url) as client:
query = client.query # noqa
# TODO: write magics to query marketstore
from IPython import embed
embed()
def storesh(
config,
tl,
host,
port,
symbols: list[str],
):
'''
Start an IPython shell ready to query the local marketstore db.
tractor.run(main)
'''
from piker.data.marketstore import tsdb_history_update
from piker._daemon import open_piker_runtime
async def main():
nonlocal symbols
async with open_piker_runtime(
'storesh',
enable_modules=['piker.data._ahab'],
):
symbol = symbols[0]
await tsdb_history_update(symbol)
trio.run(main)
@cli.command()
@click.option('--test-file', '-t', help='Test quote stream file')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option(
'--url',
default=_url,
help='HTTP URL of marketstore instance'
)
@click.argument('name', nargs=1, required=True)
@click.pass_obj
def ingest(config, name, test_file, tl, url):
"""Ingest real-time broker quotes and ticks to a marketstore instance.
"""
def ingest(config, name, test_file, tl):
'''
Ingest real-time broker quotes and ticks to a marketstore instance.
'''
# global opts
brokermod = config['brokermod']
loglevel = config['loglevel']
tractorloglevel = config['tractorloglevel']
# log = config['log']
@ -145,15 +172,25 @@ def ingest(config, name, test_file, tl, url):
watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins)
symbols = watchlists[name]
tractor.run(
partial(
grouped_syms = {}
for sym in symbols:
symbol, _, provider = sym.rpartition('.')
if provider not in grouped_syms:
grouped_syms[provider] = []
grouped_syms[provider].append(symbol)
async def entry_point():
async with tractor.open_nursery() as n:
for provider, symbols in grouped_syms.items():
await n.run_in_actor(
ingest_quote_stream,
symbols,
brokermod.name,
tries=1,
loglevel=loglevel,
),
name='ingest_marketstore',
loglevel=tractorloglevel,
debug_mode=True,
symbols=symbols,
brokername=provider,
tries=1,
actorloglevel=loglevel,
loglevel=tractorloglevel
)
tractor.run(entry_point)

View File

@ -20,27 +20,36 @@ Data feed apis and infra.
This module is enabled for ``brokerd`` daemons.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from contextlib import asynccontextmanager
from functools import partial
from pprint import pformat
from types import ModuleType
from typing import (
Any,
AsyncIterator, Optional,
Generator,
Awaitable,
TYPE_CHECKING,
)
import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import trimeter
import tractor
from pydantic import BaseModel
import pendulum
import numpy as np
from ..brokers import get_brokermod
from .._cacheables import maybe_open_context
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
check_for_service,
)
from ._sharedmem import (
maybe_open_shm_array,
@ -56,12 +65,19 @@ from ._source import (
from ..ui import _search
from ._sampling import (
sampler,
broadcast,
increment_ohlc_buffer,
iter_ohlc_periods,
sample_and_broadcast,
uniform_rate_send,
)
from ..brokers._util import (
NoData,
DataUnavailable,
)
if TYPE_CHECKING:
from .marketstore import Storage
log = get_logger(__name__)
@ -124,7 +140,7 @@ class _FeedsBus(BaseModel):
# def cancel_task(
# self,
# task: trio.lowlevel.Task
# task: trio.lowlevel.Task,
# ) -> bool:
# ...
@ -188,6 +204,442 @@ async def _setup_persistent_brokerd(
await trio.sleep_forever()
def diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt: Optional[datetime] = None
) -> np.ndarray:
to_push = array
if last_tsdb_dt:
s_diff = (start_dt - last_tsdb_dt).seconds
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if (
s_diff < 0
):
if abs(s_diff) < len(array):
# the + 1 is because ``last_tsdb_dt`` is pulled from
# the last row entry for the ``'time'`` field retrieved
# from the tsdb.
to_push = array[abs(s_diff)+1:]
else:
# pass back only the portion of the array that is
# greater then the last time stamp in the tsdb.
time = array['time']
to_push = array[time >= last_tsdb_dt.timestamp()]
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
return to_push
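A quick worked example of the overlap slicing: if a queried frame of ten 1s bars starts 4 seconds before the tsdb's last datum, then ``s_diff == -4``, ``abs(s_diff) < len(array)``, and only ``array[5:]`` (the ``abs(s_diff) + 1`` offset) is pushed, dropping the rows the tsdb already holds.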
async def start_backfill(
mod: ModuleType,
bfqsn: str,
shm: ShmArray,
last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
write_tsdb: bool = True,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> int:
async with mod.open_history_client(bfqsn) as (hist, config):
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt=None)
times = array['time']
# sample period step size in seconds
step_size_s = (
pendulum.from_timestamp(times[-1]) -
pendulum.from_timestamp(times[-2])
).seconds
# "frame"'s worth of sample period steps in seconds
frame_size_s = len(array) * step_size_s
to_push = diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt=last_tsdb_dt,
)
log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push)
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# signal that backfilling to tsdb's end datum is complete
bf_done = trio.Event()
# let caller unblock and deliver latest history frame
task_status.started((shm, start_dt, end_dt, bf_done))
if last_tsdb_dt is None:
# maybe a better default (they don't seem to define epoch?!)
# based on the sample step size load a certain amount
# history
if step_size_s == 1:
last_tsdb_dt = pendulum.now().subtract(days=2)
elif step_size_s == 60:
last_tsdb_dt = pendulum.now().subtract(years=2)
else:
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {step_size_s} ' 'seconds.. so ye, dun '
'do dat bruh.'
)
# configure async query throttling
erlangs = config.get('erlangs', 1)
rate = config.get('rate', 1)
frames = {}
def iter_dts(start: datetime):
while True:
hist_period = pendulum.period(
start,
last_tsdb_dt,
)
dtrange = list(hist_period.range('seconds', frame_size_s))
log.debug(f'New datetime index:\n{pformat(dtrange)}')
for end_dt in dtrange:
log.warning(f'Yielding next frame start {end_dt}')
start = yield end_dt
# if caller sends a new start date, reset to that
if start is not None:
log.warning(f'Resetting date range: {start}')
break
else:
# from while
return
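The reset-via-``.send()`` dance is subtle: the generator yields frame-end datetimes until the caller sends a new start point, at which time the whole range is rebuilt. A minimal sketch of the same pattern using plain ints (hypothetical values):

def counter(start: int):
    while True:
        for i in range(start, start + 3):
            new = yield i
            if new is not None:
                start = new
                break  # rebuild the range at the new start
        else:
            # range exhausted without a reset
            return

gen = counter(0)
assert next(gen) == 0
assert gen.send(10) == 10  # like ``iter_dts_gen.send(start_dt)`` below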
# pull new history frames until we hit latest
# already in the tsdb or a max count.
count = 0
# NOTE: when gaps are detected in the retrieved history (by
# comparison of the end - start versus the expected "frame size"
# in seconds) we need a way to alert the async request code not
# to continue to query for data "within the gap". This var is
# set in such cases such that further requests in that period
# are discarded and further we reset the "datetime query frame
# index" in such cases to avoid needless noop requests.
earliest_end_dt: Optional[datetime] = start_dt
async def get_ohlc_frame(
input_end_dt: datetime,
iter_dts_gen: Generator[datetime],
) -> np.ndarray:
nonlocal count, frames, earliest_end_dt, frame_size_s
count += 1
if input_end_dt > earliest_end_dt:
# if a request comes in for an inter-gap frame we
# discard it since likely this request is still
# lingering from before the reset of ``iter_dts()`` via
# ``.send()`` below.
log.info(f'Discarding request history ending @ {input_end_dt}')
# signals to ``trimeter`` loop to discard and
# ``continue`` in its schedule loop.
return None
try:
log.info(
f'Requesting {step_size_s}s frame ending in {input_end_dt}'
)
array, start_dt, end_dt = await hist(end_dt=input_end_dt)
assert array['time'][0] == start_dt.timestamp()
except NoData:
log.warning(
f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?'
)
return None # discard signal
except DataUnavailable as duerr:
# broker is being a bish and we can't pull
# any more..
log.warning('backend halted on data delivery !?!?')
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
return duerr
diff = end_dt - start_dt
frame_time_diff_s = diff.seconds
expected_frame_size_s = frame_size_s + step_size_s
if frame_time_diff_s > expected_frame_size_s:
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so indicate to the request loop that this gap is
# expected by both,
# - resetting the ``iter_dts()`` generator to start at
# the new start point delivered in this result
# - setting the non-locally scoped ``earliest_end_dt``
# to this new value so that the request loop doesn't
# get tripped up thinking there's an out of order
# request-result condition.
log.warning(
f'History frame ending @ {end_dt} appears to have a gap:\n'
f'{diff} ~= {frame_time_diff_s} seconds'
)
# reset dtrange gen to new start point
try:
next_end = iter_dts_gen.send(start_dt)
log.info(
f'Reset frame index to start at {start_dt}\n'
f'Was at {next_end}'
)
# NOTE: manually set "earliest end datetime" index-value
# to avoid the request loop getting confused about
# new frames that are earlier in history - i.e. this
# **is not** the case of out-of-order frames from
# an async batch request.
earliest_end_dt = start_dt
except StopIteration:
# gen already terminated meaning we probably already
# exhausted it via frame requests.
log.info(
"Datetime index already exhausted, can't reset.."
)
to_push = diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt=last_tsdb_dt,
)
ln = len(to_push)
if ln:
log.info(f'{ln} bars for {start_dt} -> {end_dt}')
frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt)
return to_push, start_dt, end_dt
else:
log.warning(
f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}'
)
return None
# initial dt index starts at the start of the first query result
idts = iter_dts(start_dt)
async with trimeter.amap(
partial(
get_ohlc_frame,
# we close in the ``iter_dts()`` gen so we can send
# reset signals as needed for gap detection in the
# history.
iter_dts_gen=idts,
),
idts,
capture_outcome=True,
include_value=True,
# better technical names bruv...
max_at_once=erlangs,
max_per_second=rate,
) as outcomes:
# Then iterate over the return values, as they become available
# (i.e., not necessarily in the original order)
async for input_end_dt, outcome in outcomes:
try:
out = outcome.unwrap()
if out is None:
# skip signal
continue
elif isinstance(out, DataUnavailable):
# no data available case signal.. so just kill
# further requests and basically just stop
# trying...
break
except Exception:
log.exception('uhh trimeter bail')
raise
else:
to_push, start_dt, end_dt = out
if not len(to_push):
# diff returned no new data (i.e. we probably hit
# the ``last_tsdb_dt`` point).
# TODO: raise instead?
log.warning(f'No history for range {start_dt} -> {end_dt}')
continue
# pipeline-style pull frames until we need to wait for
# the next in order to arrive.
# i = end_dts.index(input_end_dt)
# print(f'latest end_dt {end_dt} found at index {i}')
epochs = list(reversed(sorted(frames)))
for epoch in epochs:
start = shm.array['time'][0]
last_shm_prepend_dt = pendulum.from_timestamp(start)
earliest_frame_queue_dt = pendulum.from_timestamp(epoch)
diff = start - epoch
if diff < 0:
log.warning(
'Discarding out of order frame:\n'
f'{earliest_frame_queue_dt}'
)
frames.pop(epoch)
continue
# await tractor.breakpoint()
if diff > step_size_s:
if earliest_end_dt < earliest_frame_queue_dt:
# XXX: an expected gap was encountered (see
# logic in ``get_ohlc_frame()``, so allow
# this frame through to the storage layer.
log.warning(
f'Expected history gap of {diff}s:\n'
f'{earliest_frame_queue_dt} <- '
f'{earliest_end_dt}'
)
elif (
erlangs > 1
):
# we don't yet have the next frame to push
# so break back to the async request loop
# while we wait for more async frame-results
# to arrive.
if len(frames) >= erlangs:
log.warning(
'Frame count in async-queue is greater '
'than erlangs?\n'
'There seems to be a gap between:\n'
f'{earliest_frame_queue_dt} <- '
f'{last_shm_prepend_dt}\n'
'Conducting manual call for frame ending: '
f'{last_shm_prepend_dt}'
)
(
to_push,
start_dt,
end_dt,
) = await get_ohlc_frame(
input_end_dt=last_shm_prepend_dt,
iter_dts_gen=idts,
)
last_epoch = to_push['time'][-1]
diff = start - last_epoch
if diff > step_size_s:
await tractor.breakpoint()
raise DataUnavailable(
'An awkward frame was found:\n'
f'{start_dt} -> {end_dt}:\n{to_push}'
)
else:
frames[last_epoch] = (
to_push, start_dt, end_dt)
break
expect_end = pendulum.from_timestamp(start)
expect_start = expect_end.subtract(
seconds=frame_size_s)
log.warning(
'waiting on out-of-order history frame:\n'
f'{expect_end - expect_start}'
)
break
to_push, start_dt, end_dt = frames.pop(epoch)
ln = len(to_push)
# bail gracefully on shm allocation overrun/full condition
try:
shm.push(to_push, prepend=True)
except ValueError:
log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
)
break
log.info(
f'Shm pushed {ln} frame:\n'
f'{start_dt} -> {end_dt}'
)
# keep track of most recent "prepended" ``start_dt``
# both for detecting gaps and ensuring async
# frame-result order.
earliest_end_dt = start_dt
if (
storage is not None
and write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{start_dt} -> {end_dt}'
)
await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul..
to_push,
)
# TODO: can we only trigger this if the respective
# history in "in view"?!?
# XXX: extremely important, there can be no checkpoints
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
for delay_s in sampler.subscribers:
await broadcast(delay_s)
bf_done.set()
async def manage_history(
mod: ModuleType,
bus: _FeedsBus,
@ -216,14 +668,157 @@ async def manage_history(
# we expect the sub-actor to write
readonly=False,
)
# TODO: history validation
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
if opened:
log.info('Scanning for existing `marketstored`')
is_up = await check_for_service('marketstored')
# for now only do backfilling if no tsdb can be found
do_legacy_backfill = not is_up and opened
bfqsn = fqsn.replace('.' + mod.name, '')
open_history_client = getattr(mod, 'open_history_client', None)
if is_up and opened and open_history_client:
log.info('Found existing `marketstored`')
from . import marketstore
async with marketstore.open_storage_client(
fqsn,
) as storage:
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
# start history anal and load missing new data via backend.
series, _, last_tsdb_dt = await storage.load(fqsn)
broker, symbol, expiry = unpack_fqsn(fqsn)
(
shm,
latest_start_dt,
latest_end_dt,
bf_done,
) = await bus.nursery.start(
partial(
start_backfill,
mod,
bfqsn,
shm,
last_tsdb_dt=last_tsdb_dt,
storage=storage,
)
)
# if len(shm.array) < 2:
# TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and
# there was only "1 new" row pushed from the first backfill
# query-iteration, then the sample step sizing calcs will
# break upstream from here since you can't diff on at least
# 2 steps... probably should also add logic to compute from
# the tsdb series and stash that somewhere as meta data on
# the shm buffer?.. no se.
task_status.started(shm)
some_data_ready.set()
await bf_done.wait()
# do diff against last start frame of history and only fill
# in from the tsdb an allotment that allows for most recent
# to be loaded into mem *before* tsdb data.
if last_tsdb_dt:
dt_diff_s = (
latest_start_dt - last_tsdb_dt
).seconds
else:
dt_diff_s = 0
# await trio.sleep_forever()
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
prepend_start = shm._first.value
# sanity check on most-recent-data loading
assert prepend_start > dt_diff_s
history = list(series.values())
if history:
fastest = history[0]
to_push = fastest[:prepend_start]
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
# start=prepend_start,
field_map=marketstore.ohlc_key_map,
)
# load as much from storage into shm as space will
# allow according to user's shm size settings.
count = 0
end = fastest['Epoch'][0]
while shm._first.value > 0:
count += 1
series = await storage.read_ohlcv(
fqsn,
end=end,
)
history = list(series.values())
fastest = history[0]
end = fastest['Epoch'][0]
prepend_start -= len(to_push)
to_push = fastest[:prepend_start]
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
# start=prepend_start,
field_map=marketstore.ohlc_key_map,
)
# manually trigger step update to update charts/fsps
# which need an incremental update.
for delay_s in sampler.subscribers:
await broadcast(delay_s)
if count > 6:
break
log.info(f'Loaded {to_push.shape} datums from storage')
# TODO: write new data to tsdb to be ready for next read.
if do_legacy_backfill:
# do a legacy incremental backfill from the provider.
log.info('No existing `marketstored` found..')
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
_ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
await bus.nursery.start(
partial(
start_backfill,
mod,
bfqsn,
shm,
)
)
# yield back after client connect with filled shm
task_status.started(shm)
@ -233,33 +828,20 @@ async def manage_history(
# data that can be used.
some_data_ready.set()
# detect sample step size for sampled historical data
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# begin real-time updates of shm and tsb once the feed
# goes live.
await feed_is_live.wait()
if opened:
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne.
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
# history retrieval loop depending on user interaction and thus
# a small RPC-prot for remotely controlling what data is loaded
# for viewing.
await trio.sleep_forever()
async def allocate_persistent_feed(
bus: _FeedsBus,
brokername: str,
symbol: str,
loglevel: str,
start_stream: bool = True,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -277,6 +859,7 @@ async def allocate_persistent_feed(
- a real-time streaming task which connects
'''
# load backend module
try:
mod = get_brokermod(brokername)
except ImportError:
@ -319,7 +902,7 @@ async def allocate_persistent_feed(
manage_history,
mod,
bus,
bfqsn,
'.'.join((bfqsn, brokername)),
some_data_ready,
feed_is_live,
)
@ -333,7 +916,10 @@ async def allocate_persistent_feed(
# true fqsn
fqsn = '.'.join((bfqsn, brokername))
# add a fqsn entry that includes the ``.<broker>`` suffix
# and an entry that includes the broker-specific fqsn (including
# any new suffixes or elements as injected by the backend).
init_msg[fqsn] = msg
init_msg[bfqsn] = msg
# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
@ -347,13 +933,14 @@ async def allocate_persistent_feed(
await some_data_ready.wait()
# append ``.<broker>`` suffix to each quote symbol
bsym = symbol + f'.{brokername}'
acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
generic_first_quotes = {
bsym: first_quote,
acceptable_not_fqsn_with_broker_suffix: first_quote,
fqsn: first_quote,
}
bus.feeds[symbol] = bus.feeds[fqsn] = (
bus.feeds[symbol] = bus.feeds[bfqsn] = (
init_msg,
generic_first_quotes,
)
@ -363,9 +950,25 @@ async def allocate_persistent_feed(
# task_status.started((init_msg, generic_first_quotes))
task_status.started()
# backend will indicate when real-time quotes have begun.
if not start_stream:
await trio.sleep_forever()
# begin real-time updates of shm and tsdb once the feed goes
# live; the backend will indicate when real-time quotes have begun.
await feed_is_live.wait()
# start shm incrementer task for OHLC style sampling
# at the current detected step period.
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
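# (same "last distinct timestamp" diff trick used in
# ``manage_history()`` above to detect the sample period)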
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
@ -388,7 +991,7 @@ async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str,
symbol: str, # normally expected to be the broker-specific fqsn
loglevel: str,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
@ -410,7 +1013,9 @@ async def open_feed_bus(
# TODO: check for any stale shm entries for this symbol
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert brokername in servicename
bus = get_feed_bus(brokername)
@ -420,7 +1025,7 @@ async def open_feed_bus(
entry = bus.feeds.get(symbol)
if entry is None:
# allocate a new actor-local stream bus which
# will persist for this `brokerd`.
# will persist for this `brokerd`'s service lifetime.
async with bus.task_lock:
await bus.nursery.start(
partial(
@ -428,13 +1033,12 @@ async def open_feed_bus(
bus=bus,
brokername=brokername,
# here we pass through the selected symbol in native
# "format" (i.e. upper vs. lowercase depending on
# provider).
symbol=symbol,
loglevel=loglevel,
start_stream=start_stream,
)
)
# TODO: we can remove this?
@ -450,7 +1054,7 @@ async def open_feed_bus(
# true fqsn
fqsn = '.'.join([bfqsn, brokername])
assert fqsn in first_quotes
assert bus.feeds[fqsn]
assert bus.feeds[bfqsn]
# broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
bsym = symbol + f'.{brokername}'

View File

@ -14,36 +14,189 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
``marketstore`` integration.
- client management routines
- tick data ingest routines
- websocket client for subscribing to write triggers
- todo: tick sequence stream-cloning for testing
- todo: docker container management automation
"""
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Callable, Tuple
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
from pprint import pformat
from typing import (
Any,
Optional,
Union,
TYPE_CHECKING,
)
import time
from math import isnan
from bidict import bidict
import msgpack
import pyqtgraph as pg
import numpy as np
import pandas as pd
import pymarketstore as pymkts
import tractor
from trio_websocket import open_websocket_url
from anyio_marketstore import (
open_marketstore_client,
MarketstoreClient,
Params,
)
import pendulum
import purerpc
if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
from .feed import maybe_open_feed
from ..log import get_logger, get_console_log
from ..data import open_feed
log = get_logger(__name__)
_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')
# container level config
_config = {
'grpc_listen_port': 5995,
'ws_listen_port': 5993,
'log_level': 'debug',
}
_yaml_config = '''
# piker's ``marketstore`` config.
# mount this config using:
# sudo docker run --mount \
# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
# 5993:5993 alpacamarkets/marketstore:latest
root_directory: data
listen_port: {ws_listen_port}
grpc_listen_port: {grpc_listen_port}
log_level: {log_level}
queryable: true
stop_grace_period: 0
wal_rotate_interval: 5
stale_threshold: 5
enable_add: true
enable_remove: false
triggers:
- module: ondiskagg.so
on: "*/1Sec/OHLCV"
config:
# filter: "nasdaq"
destinations:
- 1Min
- 5Min
- 15Min
- 1H
- 1D
- module: stream.so
on: '*/*/*'
# config:
# filter: "nasdaq"
'''.format(**_config)
def start_marketstore(
client: docker.DockerClient,
**kwargs,
) -> tuple[DockerContainer, dict[str, Any]]:
'''
Start and supervise a marketstore instance with its config bind-mounted
in from the piker config directory on the system.
The equivalent cli cmd to this code is:
sudo docker run --mount \
type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
5993:5993 alpacamarkets/marketstore:latest
'''
import os
import docker
from .. import config
get_console_log('info', name=__name__)
yml_file = os.path.join(config._config_dir, 'mkts.yml')
if not os.path.isfile(yml_file):
log.warning(
f'No `marketstore` config exists?: {yml_file}\n'
'Generating new file from template:\n'
f'{_yaml_config}\n'
)
with open(yml_file, 'w') as yf:
yf.write(_yaml_config)
# create a mount from user's local piker config dir into container
config_dir_mnt = docker.types.Mount(
target='/etc',
source=config._config_dir,
type='bind',
)
# create a user config subdir where the marketstore
# backing filesystem database can be persisted.
persistent_data_dir = os.path.join(
config._config_dir, 'data',
)
if not os.path.isdir(persistent_data_dir):
os.mkdir(persistent_data_dir)
data_dir_mnt = docker.types.Mount(
target='/data',
source=persistent_data_dir,
type='bind',
)
dcntr: DockerContainer = client.containers.run(
'alpacamarkets/marketstore:latest',
# do we need this for cmds?
# '-i',
# '-p 5993:5993',
ports={
'5993/tcp': 5993, # jsonrpc / ws?
'5995/tcp': 5995, # grpc
},
mounts=[
config_dir_mnt,
data_dir_mnt,
],
detach=True,
# stop_signal='SIGINT',
init=True,
# remove=True,
)
return dcntr, _config
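# a minimal usage sketch (assumes a local docker daemon; in piker
# this is normally driven by the ``_ahab`` supervisor instead):
#
# import docker
# client = docker.from_env()
# cntr, cfg = start_marketstore(client)
# try:
#     print(cntr.logs().decode())
# finally:
#     cntr.stop()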
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
_url: str = 'http://localhost:5993/rpc'
_tick_dt = [
# these two are required as a "primary key"
('Epoch', 'i8'),
('Nanoseconds', 'i4'),
('IsTrade', 'i1'),
('IsBid', 'i1'),
('Price', 'f4'),
('Size', 'f4')
]
_quote_dt = [
# these two are required as a "primary key"
('Epoch', 'i8'),
@ -61,6 +214,7 @@ _quote_dt = [
# ('brokerd_ts', 'i64'),
# ('VWAP', 'f4')
]
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
_tick_map = {
'Up': 1,
@ -69,28 +223,49 @@ _tick_map = {
None: np.nan,
}
class MarketStoreError(Exception):
"Generic marketstore client error"
_ohlcv_dt = [
# these two are required as a "primary key"
('Epoch', 'i8'),
# ('Nanoseconds', 'i4'),
# ohlcv sampling (prices as f4 to match the quote schema)
('Open', 'f4'),
('High', 'f4'),
('Low', 'f4'),
('Close', 'f4'),
('Volume', 'f4'),
]
def err_on_resp(response: dict) -> None:
"""Raise any errors found in responses from client request.
"""
responses = response['responses']
if responses is not None:
for r in responses:
err = r['error']
if err:
raise MarketStoreError(err)
ohlc_key_map = bidict({
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
})
def mk_tbk(keys: tuple[str, str, str]) -> str:
'''
Generate a marketstore table key from a tuple.
Converts,
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``
'''
return '/'.join(keys)
def quote_to_marketstore_structarray(
quote: Dict[str, Any],
last_fill: str,
quote: dict[str, Any],
last_fill: Optional[float]
) -> np.array:
"""Return marketstore writeable structarray from quote ``dict``.
"""
'''
Return marketstore writeable structarray from quote ``dict``.
'''
if last_fill:
# new fill bby
now = timestamp(last_fill)
@ -101,7 +276,7 @@ def quote_to_marketstore_structarray(
secs, ns = now / 10**9, now % 10**9
# pack into List[Tuple[str, Any]]
# pack into list[tuple[str, Any]]
array_input = []
# insert 'Epoch' entry first and then 'Nanoseconds'.
@ -123,146 +298,458 @@ def quote_to_marketstore_structarray(
return np.array([tuple(array_input)], dtype=_quote_dt)
def timestamp(datestr: str) -> int:
"""Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str.
"""
return int(pd.Timestamp(datestr).value)
def timestamp(date, **kwargs) -> int:
'''
Return marketstore compatible 'Epoch' integer in nanoseconds
from a date formatted str.
'''
return int(pd.Timestamp(date, **kwargs).value)
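# e.g. both of these yield the same ns 'Epoch' value:
# timestamp('2022-04-01') -> 1648771200000000000
# timestamp(1648771200, unit='s') -> 1648771200000000000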
def mk_tbk(keys: Tuple[str, str, str]) -> str:
"""Generate a marketstore table key from a tuple.
Converts,
``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``
"""
return '{}/' + '/'.join(keys)
class Client:
"""Async wrapper around the alpaca ``pymarketstore`` sync client.
This will serve as the shell for building out a proper async client
that isn't horribly documented and un-tested..
"""
def __init__(self, url: str):
self._client = pymkts.Client(url)
async def _invoke(
self,
meth: Callable,
*args,
**kwargs,
) -> Any:
return err_on_resp(meth(*args, **kwargs))
async def destroy(
self,
tbk: Tuple[str, str, str],
) -> None:
return await self._invoke(self._client.destroy, mk_tbk(tbk))
async def list_symbols(
self,
tbk: str,
) -> List[str]:
return await self._invoke(self._client.list_symbols, mk_tbk(tbk))
async def write(
self,
symbol: str,
array: np.ndarray,
) -> None:
start = time.time()
await self._invoke(
self._client.write,
array,
_tick_tbk.format(symbol),
isvariablelength=True
)
log.debug(f"{symbol} write time (s): {time.time() - start}")
def query(
self,
symbol,
tbk: Tuple[str, str] = _tick_tbk_ids,
) -> pd.DataFrame:
# XXX: causes crash
# client.query(pymkts.Params(symbol, '*', 'OHCLV'
result = self._client.query(
pymkts.Params(symbol, *tbk),
)
return result.first().df()
@asynccontextmanager
@acm
async def get_client(
url: str = _url,
) -> Client:
yield Client(url)
host: str = 'localhost',
port: int = 5995
) -> MarketstoreClient:
'''
Load a ``anyio_marketstore`` grpc client connected
to an existing ``marketstore`` server.
'''
async with open_marketstore_client(
host,
port
) as client:
yield client
class MarketStoreError(Exception):
"Generic marketstore client error"
# def err_on_resp(response: dict) -> None:
# """Raise any errors found in responses from client request.
# """
# responses = response['responses']
# if responses is not None:
# for r in responses:
# err = r['error']
# if err:
# raise MarketStoreError(err)
# map of seconds ints to "time frame" accepted keys
tf_in_1s = bidict({
1: '1Sec',
60: '1Min',
60*5: '5Min',
60*15: '15Min',
60*30: '30Min',
60*60: '1H',
60*60*24: '1D',
})
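# the lookup is bidirectional, e.g.:
# tf_in_1s[60] -> '1Min'
# tf_in_1s.inverse['1Min'] -> 60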
class Storage:
'''
High level storage api for both real-time and historical ingest.
'''
def __init__(
self,
client: MarketstoreClient,
) -> None:
# TODO: eventually this should be an api/interface type that
# ensures we can support multiple tsdb backends.
self.client = client
# series' cache from tsdb reads
self._arrays: dict[str, np.ndarray] = {}
async def list_keys(self) -> list[str]:
return await self.client.list_symbols()
async def search_keys(self, pattern: str) -> list[str]:
'''
Search for time series key in the storage backend.
'''
...
async def write_ticks(self, ticks: list) -> None:
...
async def load(
self,
fqsn: str,
) -> tuple[
dict[int, np.ndarray], # timeframe (in secs) to series
Optional[datetime], # first dt
Optional[datetime], # last dt
]:
first_tsdb_dt, last_tsdb_dt = None, None
tsdb_arrays = await self.read_ohlcv(fqsn)
log.info(f'Loaded tsdb history {tsdb_arrays}')
if tsdb_arrays:
fastest = list(tsdb_arrays.values())[0]
times = fastest['Epoch']
first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last]
)
return tsdb_arrays, first_tsdb_dt, last_tsdb_dt
async def read_ohlcv(
self,
fqsn: str,
timeframe: Optional[Union[int, str]] = None,
end: Optional[int] = None,
) -> tuple[
MarketstoreClient,
Union[dict, np.ndarray]
]:
client = self.client
syms = await client.list_symbols()
if fqsn not in syms:
return {}
tfstr = tf_in_1s[1]
params = Params(
symbols=fqsn,
timeframe=tfstr,
attrgroup='OHLCV',
end=end,
# limit_from_start=True,
# TODO: figure out the max limit here given
# ``purerpc``'s msg size limit: 33554432
limit=int(800e3),
)
if timeframe is None:
log.info(f'starting {fqsn} tsdb granularity scan..')
# loop through and try to find highest granularity
for tfstr in tf_in_1s.values():
try:
log.info(f'querying for {tfstr}@{fqsn}')
params.set('timeframe', tfstr)
result = await client.query(params)
break
except purerpc.grpclib.exceptions.UnknownError:
# XXX: this is already logged by the container and
# thus shows up through `marketstored` logs relay.
# log.warning(f'{tfstr}@{fqsn} not found')
continue
else:
return {}
else:
result = await client.query(params)
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
# Fill out a `numpy` array-results map
arrays = {}
for fqsn, data_set in result.by_symbols().items():
arrays.setdefault(fqsn, {})[
tf_in_1s.inverse[data_set.timeframe]
] = data_set.array
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
async def delete_ts(
self,
key: str,
timeframe: Optional[Union[int, str]] = None,
) -> bool:
client = self.client
syms = await client.list_symbols()
print(syms)
# if key not in syms:
# raise KeyError(f'`{fqsn}` table key not found?')
return await client.destroy(tbk=key)
async def write_ohlcv(
self,
fqsn: str,
ohlcv: np.ndarray,
append_and_duplicate: bool = True,
limit: int = int(800e3),
) -> None:
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(ohlcv),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = ohlcv[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
m, r = divmod(len(mkts_array), limit)
# write in `limit`-sized chunks: m full chunks here, then the
# remainder of r rows below.
for i in range(1, m + 1):
to_push = mkts_array[(i-1)*limit:i*limit]
# write to db
resp = await self.client.write(
to_push,
tbk=f'{fqsn}/1Sec/OHLCV',
# NOTE: this will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=append_and_duplicate,
)
log.info(
f'Wrote {mkts_array.size} datums to tsdb\n'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
if r:
to_push = mkts_array[m*limit:]
# write to db
resp = await self.client.write(
to_push,
tbk=f'{fqsn}/1Sec/OHLCV',
# NOTE: this will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=append_and_duplicate,
)
log.info(
f'Wrote {mkts_array.size} datums to tsdb\n'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
@acm
async def open_storage_client(
fqsn: str,
period: Optional[Union[int, str]] = None, # in seconds
) -> tuple[Storage, dict[str, np.ndarray]]:
'''
Load a tsdb storage client and deliver it wrapped in our high
level ``Storage`` api for reading/writing series as ``numpy``
struct arrays.
'''
async with (
# eventually a storage backend endpoint
get_client() as client,
):
# slap on our wrapper api
yield Storage(client)
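# a usage sketch (assumes a running ``marketstored`` reachable on
# the default grpc port; the fqsn is illustrative):
#
# async with open_storage_client('mnq.globex.ib') as storage:
#     series, first_dt, last_dt = await storage.load('mnq.globex.ib')
#     ohlcv_1s = await storage.read_ohlcv('mnq.globex.ib', timeframe=1)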
async def tsdb_history_update(
fqsn: Optional[str] = None,
) -> list[str]:
# TODO: real-time dedicated task for ensuring
# history consistency between the tsdb, shm and real-time feed..
# update sequence design notes:
# - load existing highest frequency data from mkts
# * how do we want to offer this to the UI?
# - lazy loading?
# - try to load it all and expect graphics caching/diffing
# to hide extra bits that aren't in view?
# - compute the diff between latest data from broker and shm
# * use sql api in mkts to determine where the backend should
# start querying for data?
# * append any diff with new shm length
# * determine missing (gapped) history by scanning
# * how far back do we look?
# - begin rt update ingest and aggregation
# * could start by always writing ticks to mkts instead of
# worrying about a shm queue for now.
# * we have a short list of shm queues worth grokking:
# - https://github.com/pikers/piker/issues/107
# * the original data feed arch blurb:
# - https://github.com/pikers/piker/issues/98
#
profiler = pg.debug.Profiler(
disabled=False, # not pg_profile_enabled(),
delayed=False,
)
async with (
open_storage_client(fqsn) as storage,
maybe_open_feed(
[fqsn],
start_stream=False,
) as (feed, stream),
):
profiler(f'opened feed for {fqsn}')
to_append = feed.shm.array
to_prepend = None
if fqsn:
symbol = feed.symbols.get(fqsn)
if symbol:
fqsn = symbol.front_fqsn()
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)
# hist diffing
if tsdb_arrays:
onesec = tsdb_arrays[1]
# these aren't currently used but can be referenced from
# within the embedded ipython shell below.
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
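# i.e. given tsdb epochs spanning [t0 .. tN] and shm times
# [tK .. tM], `to_append` is the strictly-newer tail (> tN) and
# `to_prepend` the strictly-older head (< t0); overlapping rows
# are dropped from both.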
profiler('Finished db arrays diffs')
syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')
# TODO: ask if user wants to write history for detected
# available shm buffers?
from tractor.trionics import ipython_embed
await ipython_embed()
# for array in [to_append, to_prepend]:
# if array is None:
# continue
# log.info(
# f'Writing datums {array.size} -> to tsdb from shm\n'
# )
# await storage.write_ohlcv(fqsn, array)
# profiler('Finished db writes')
async def ingest_quote_stream(
symbols: List[str],
symbols: list[str],
brokername: str,
tries: int = 1,
loglevel: str = None,
) -> None:
"""Ingest a broker quote stream into marketstore in (sampled) tick format.
"""
async with open_feed(
brokername,
symbols,
loglevel=loglevel,
) as (first_quotes, qstream):
'''
Ingest a broker quote stream into a ``marketstore`` tsdb.
quote_cache = first_quotes.copy()
async with get_client() as ms_client:
# start ingest to marketstore
async for quotes in qstream:
'''
async with (
maybe_open_feed(brokername, symbols, loglevel=loglevel) as feed,
get_client() as ms_client,
):
async for quotes in feed.stream:
log.info(quotes)
for symbol, quote in quotes.items():
for tick in quote.get('ticks', ()):
ticktype = tick.get('type', 'n/a')
# remap tick strs to ints
quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
# techtonic tick write
array = quote_to_marketstore_structarray({
'IsTrade': 1 if ticktype == 'trade' else 0,
'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
'Price': tick.get('price'),
'Size': tick.get('size')
}, last_fill=quote.get('broker_ts', None))
# check for volume update (i.e. did trades happen
# since last quote)
new_vol = quote.get('volume', None)
if new_vol is None:
log.debug(f"No fills for {symbol}")
if new_vol == quote_cache.get('volume'):
# should never happen due to field diffing
# on sender side
log.error(
f"{symbol}: got same volume as last quote?")
await ms_client.write(array, _tick_tbk)
quote_cache.update(quote)
# LEGACY WRITE LOOP (using old tick dt)
# quote_cache = {
# 'size': 0,
# 'tick': 0
# }
a = quote_to_marketstore_structarray(
quote,
# TODO: check this closer to the broker query api
last_fill=quote.get('fill_time', '')
)
await ms_client.write(symbol, a)
# async for quotes in qstream:
# log.info(quotes)
# for symbol, quote in quotes.items():
# # remap tick strs to ints
# quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
# # check for volume update (i.e. did trades happen
# # since last quote)
# new_vol = quote.get('volume', None)
# if new_vol is None:
# log.debug(f"No fills for {symbol}")
# if new_vol == quote_cache.get('volume'):
# # should never happen due to field diffing
# # on sender side
# log.error(
# f"{symbol}: got same volume as last quote?")
# quote_cache.update(quote)
# a = quote_to_marketstore_structarray(
# quote,
# # TODO: check this closer to the broker query api
# last_fill=quote.get('fill_time', '')
# )
# await ms_client.write(symbol, a)
async def stream_quotes(
symbols: List[str],
symbols: list[str],
host: str = 'localhost',
port: int = 5993,
diff_cached: bool = True,
loglevel: str = None,
) -> None:
"""Open a symbol stream from a running instance of marketstore and
'''
Open a symbol stream from a running instance of marketstore and
log to console.
"""
'''
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server
@ -271,7 +758,7 @@ async def stream_quotes(
)
log.info(resp)
async def recv() -> Dict[str, Any]:
async def recv() -> dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8')
streams = (await recv())['streams']

View File

@ -167,6 +167,7 @@ def _wma(
assert length == len(weights)
# lol, for long sequences this is nutso slow and expensive..
return np.convolve(signal, weights, 'valid')

View File

@ -309,7 +309,7 @@ async def flow_rates(
if period > 1:
trade_rate_wma = _wma(
dvlm_shm.array['trade_count'],
dvlm_shm.array['trade_count'][-period:],
period,
weights=weights,
)
@ -332,7 +332,7 @@ async def flow_rates(
if period > 1:
dark_trade_rate_wma = _wma(
dvlm_shm.array['dark_trade_count'],
dvlm_shm.array['dark_trade_count'][-period:],
period,
weights=weights,
)

View File

@ -191,6 +191,9 @@ class ContentsLabel(pg.LabelItem):
self.setText(
"<b>i</b>:{index}<br/>"
# NB: these fields must be indexed in the correct order via
# the slice syntax below.
"<b>epoch</b>:{}<br/>"
"<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>"
"<b>L</b>:{}<br/>"
@ -198,7 +201,15 @@ class ContentsLabel(pg.LabelItem):
"<b>V</b>:{}<br/>"
"<b>wap</b>:{}".format(
*array[index - first][
['open', 'high', 'low', 'close', 'volume', 'bar_wap']
[
'time',
'open',
'high',
'low',
'close',
'volume',
'bar_wap',
]
],
name=name,
index=index,
@ -295,7 +306,8 @@ class ContentsLabels:
class Cursor(pg.GraphicsObject):
'''Multi-plot cursor for use on a ``LinkedSplits`` chart (set).
'''
Multi-plot cursor for use on a ``LinkedSplits`` chart (set).
'''
def __init__(
@ -310,7 +322,7 @@ class Cursor(pg.GraphicsObject):
self.linked = linkedsplits
self.graphics: dict[str, pg.GraphicsObject] = {}
self.plots: List['PlotChartWidget'] = [] # type: ignore # noqa
self.plots: list['PlotChartWidget'] = [] # type: ignore # noqa
self.active_plot = None
self.digits: int = digits
self._datum_xy: tuple[int, float] = (0, 0)
@ -439,7 +451,10 @@ class Cursor(pg.GraphicsObject):
if plot.linked.xaxis_chart is plot:
xlabel = self.xaxis_label = XAxisLabel(
parent=self.plots[plot_index].getAxis('bottom'),
# parent=self.plots[plot_index].pi_overlay.get_axis(plot.plotItem, 'bottom'),
# parent=self.plots[plot_index].pi_overlay.get_axis(
# plot.plotItem, 'bottom'
# ),
opacity=_ch_label_opac,
bg_color=self.label_color,
)

View File

@ -29,6 +29,7 @@ from typing import Optional, Any, Callable
import numpy as np
import tractor
import trio
import pendulum
import pyqtgraph as pg
from .. import brokers
@ -47,6 +48,7 @@ from ._fsp import (
open_vlm_displays,
)
from ..data._sharedmem import ShmArray
from ..data._source import tf_in_1s
from ._forms import (
FieldsForm,
mk_order_pane_layout,
@ -398,9 +400,11 @@ def graphics_update_cycle(
)
if (
(xpx < update_uppx or i_diff > 0)
or trigger_all
(
xpx < update_uppx or i_diff > 0
and liv
)
or trigger_all
):
# TODO: make it so this doesn't have to be called
# once the $vlm is up?
@ -494,6 +498,7 @@ def graphics_update_cycle(
if (
xpx < update_uppx
or i_diff > 0
or trigger_all
):
chart.update_graphics_from_array(
chart.name,
@ -661,11 +666,17 @@ async def display_symbol_data(
symbol = feed.symbols[sym]
fqsn = symbol.front_fqsn()
times = bars['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
step_size_s = (end - start).seconds
tf_key = tf_in_1s[step_size_s]
# load in symbol's ohlc data
godwidget.window.setWindowTitle(
f'{fqsn} '
f'tick:{symbol.tick_size} '
f'step:1s '
f'step:{tf_key} '
)
linked = godwidget.linkedsplits

View File

@ -7,3 +7,7 @@
# pin this to a dev branch that we have more control over especially
# as more graphics stuff gets hashed out.
-e git+https://github.com/pikers/pyqtgraph.git@piker_pin#egg=pyqtgraph
# our async client for ``marketstore`` (the tsdb)
-e git+https://github.com/pikers/anyio-marketstore.git@master#egg=anyio-marketstore

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -30,11 +30,13 @@ orig_win_id = t.find_focused().window
# for tws
win_names: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway.', # gw running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
]
for name in win_names:
results = t.find_named(name)
results = t.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
print(f'Resetting data feed for {name}')
@ -47,6 +49,15 @@ for name in win_names:
# https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool
# TODO: only run the reconnect (2nd) key combo on a detected
# disconnect?
for key_combo, timeout in [
# only required if we need a connection reset.
# ('ctrl+alt+r', 12),
# data feed reset.
('ctrl+alt+f', 6)
]:
subprocess.call([
'xdotool',
'windowactivate', '--sync', win_id,
@ -56,12 +67,13 @@ for name in win_names:
'mousemove_relative', '--sync', str(w-4), str(h-4),
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id, '--repeat', '3', '1',
'click', '--window', win_id,
'--repeat', '3', '1',
# hackzorzes
'key', 'ctrl+alt+f',
'key', key_combo,
],
timeout=1,
timeout=timeout,
)
# re-activate and focus original window

View File

@ -77,6 +77,14 @@ setup(
# tsdbs
'pymarketstore',
],
extras_require={
# tsdb integration deps
'tsdb': [
'docker',
],
},
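# NOTE: enable the tsdb extra with e.g.:
# pip install -e '.[tsdb]'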
tests_require=['pytest'],
python_requires=">=3.9", # literally for ``datetime.datetime.fromisoformat``...
keywords=["async", "trading", "finance", "quant", "charting"],