diff --git a/piker/_daemon.py b/piker/_daemon.py index 053d4864..999b8fce 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -19,7 +19,7 @@ Structured, daemon tree service management. """ from typing import Optional, Union, Callable, Any -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from collections import defaultdict from pydantic import BaseModel @@ -130,7 +130,7 @@ class Services(BaseModel): _services: Optional[Services] = None -@asynccontextmanager +@acm async def open_pikerd( start_method: str = 'trio', loglevel: Optional[str] = None, @@ -185,7 +185,7 @@ async def open_pikerd( yield _services -@asynccontextmanager +@acm async def open_piker_runtime( name: str, enable_modules: list[str] = [], @@ -226,7 +226,7 @@ async def open_piker_runtime( yield tractor.current_actor() -@asynccontextmanager +@acm async def maybe_open_runtime( loglevel: Optional[str] = None, **kwargs, @@ -249,7 +249,7 @@ async def maybe_open_runtime( yield -@asynccontextmanager +@acm async def maybe_open_pikerd( loglevel: Optional[str] = None, **kwargs, @@ -300,7 +300,36 @@ class Brokerd: locks = defaultdict(trio.Lock) -@asynccontextmanager +@acm +async def find_service( + service_name: str, +) -> Optional[tractor.Portal]: + + log.info(f'Scanning for service `{service_name}`') + # attach to existing daemon by name if possible + async with tractor.find_actor( + service_name, + arbiter_sockaddr=_registry_addr, + ) as maybe_portal: + yield maybe_portal + + +async def check_for_service( + service_name: str, + +) -> bool: + ''' + Service daemon "liveness" predicate. + + ''' + async with tractor.query_actor( + service_name, + arbiter_sockaddr=_registry_addr, + ) as sockaddr: + return sockaddr + + +@acm async def maybe_spawn_daemon( service_name: str, @@ -310,7 +339,7 @@ async def maybe_spawn_daemon( **kwargs, ) -> tractor.Portal: - """ + ''' If no ``service_name`` daemon-actor can be found, spawn one in a local subactor and return a portal to it. @@ -321,7 +350,7 @@ async def maybe_spawn_daemon( This can be seen as a service starting api for remote-actor clients. - """ + ''' if loglevel: get_console_log(loglevel) @@ -330,19 +359,13 @@ async def maybe_spawn_daemon( lock = Brokerd.locks[service_name] await lock.acquire() - log.info(f'Scanning for existing {service_name}') - # attach to existing daemon by name if possible - async with tractor.find_actor( - service_name, - arbiter_sockaddr=_registry_addr, - - ) as portal: + async with find_service(service_name) as portal: if portal is not None: lock.release() yield portal return - log.warning(f"Couldn't find any existing {service_name}") + log.warning(f"Couldn't find any existing {service_name}") # ask root ``pikerd`` daemon to spawn the daemon we need if # pikerd is not live we now become the root of the @@ -423,7 +446,7 @@ async def spawn_brokerd( return True -@asynccontextmanager +@acm async def maybe_spawn_brokerd( brokername: str, @@ -431,7 +454,9 @@ async def maybe_spawn_brokerd( **kwargs, ) -> tractor.Portal: - '''Helper to spawn a brokerd service. + ''' + Helper to spawn a brokerd service *from* a client + who wishes to use the sub-actor-daemon. 
''' async with maybe_spawn_daemon( @@ -483,7 +508,7 @@ async def spawn_emsd( return True -@asynccontextmanager +@acm async def maybe_open_emsd( brokername: str, diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index 6fcf11f7..d1b2aac5 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -33,7 +33,41 @@ class SymbolNotFound(BrokerError): class NoData(BrokerError): - "Symbol data not permitted" + ''' + Symbol data not permitted or no data + for time range found. + + ''' + def __init__( + self, + *args, + frame_size: int = 1000, + + ) -> None: + super().__init__(*args) + + # when raised, machinery can check if the backend + # set a "frame size" for doing datetime calcs. + self.frame_size: int = 1000 + + +class DataUnavailable(BrokerError): + ''' + Signal storage requests to terminate. + + ''' + # TODO: add in a reason that can be displayed in the + # UI (for eg. `kraken` is bs and you should complain + # to them that you can't pull more OHLC data..) + + +class DataThrottle(BrokerError): + ''' + Broker throttled request rate for data. + + ''' + # TODO: add in throttle metrics/feedback + def resproc( @@ -50,12 +84,12 @@ def resproc( if not resp.status_code == 200: raise BrokerError(resp.body) try: - json = resp.json() + msg = resp.json() except json.decoder.JSONDecodeError: log.exception(f"Failed to process {resp}:\n{resp.text}") raise BrokerError(resp.text) if log_resp: - log.debug(f"Received json contents:\n{colorize_json(json)}") + log.debug(f"Received json contents:\n{colorize_json(msg)}") - return json if return_json else resp + return msg if return_json else resp diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index d0a00b68..bac81bb0 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -19,6 +19,7 @@ Binance backend """ from contextlib import asynccontextmanager as acm +from datetime import datetime from typing import ( Any, Union, Optional, AsyncGenerator, Callable, @@ -221,20 +222,22 @@ class Client: async def bars( self, symbol: str, - start_time: int = None, - end_time: int = None, + start_dt: Optional[datetime] = None, + end_dt: Optional[datetime] = None, limit: int = 1000, # <- max allowed per query as_np: bool = True, ) -> dict: - if start_time is None: - start_time = binance_timestamp( - pendulum.now('UTC').start_of('minute').subtract(minutes=limit) - ) + if end_dt is None: + end_dt = pendulum.now('UTC') - if end_time is None: - end_time = binance_timestamp(pendulum.now('UTC')) + if start_dt is None: + start_dt = end_dt.start_of( + 'minute').subtract(minutes=limit) + + start_time = binance_timestamp(start_dt) + end_time = binance_timestamp(end_dt) # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data bars = await self._api( @@ -379,7 +382,27 @@ async def open_history_client( # TODO implement history getter for the new storage layer. 
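# NOTE (editor): this hunk changes the backend history API: instead of
# yielding the raw client, ``open_history_client()`` now yields an
# ``(async_frame_getter, config)`` pair where the config's ``erlangs``
# and ``rate`` keys are query-throttle hints consumed by the concurrent
# request loop added in ``piker/data/feed.py`` further down this patch.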
async with open_cached_client('binance') as client: - yield client + + async def get_ohlc( + end_dt: Optional[datetime] = None, + start_dt: Optional[datetime] = None, + + ) -> tuple[ + np.ndarray, + datetime, # start + datetime, # end + ]: + + array = await client.bars( + symbol, + start_dt=start_dt, + end_dt=end_dt, + ) + start_dt = pendulum.from_timestamp(array[0]['time']) + end_dt = pendulum.from_timestamp(array[-1]['time']) + return array, start_dt, end_dt + + yield get_ohlc, {'erlangs': 3, 'rate': 3} async def backfill_bars( diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index f66f81c5..20865e30 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -57,6 +57,8 @@ from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client from fuzzywuzzy import process as fuzzy import numpy as np +import pendulum + from .. import config from ..log import get_logger, get_console_log @@ -295,6 +297,10 @@ class Client: global _enters # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}') print(f'REQUESTING BARS {_enters} @ end={end_dt}') + + if not end_dt: + end_dt = '' + _enters += 1 contract = await self.find_contract(fqsn) @@ -1438,8 +1444,6 @@ async def get_bars( a ``MethoProxy``. ''' - import pendulum - fails = 0 bars: Optional[list] = None first_dt: datetime = None @@ -1467,7 +1471,9 @@ async def get_bars( time = bars_array['time'] assert time[-1] == last_dt.timestamp() assert time[0] == first_dt.timestamp() - log.info(f'bars retreived for dts {first_dt}:{last_dt}') + log.info( + f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' + ) return (bars, bars_array, first_dt, last_dt), fails @@ -1478,21 +1484,30 @@ async def get_bars( if 'No market data permissions for' in msg: # TODO: signalling for no permissions searches - raise NoData(f'Symbol: {fqsn}') - break + raise NoData( + f'Symbol: {fqsn}', + ) elif ( err.code == 162 and 'HMDS query returned no data' in err.message ): - # try to decrement start point and look further back - end_dt = last_dt = last_dt.subtract(seconds=2000) + # XXX: this is now done in the storage mgmt layer + # and we shouldn't implicitly decrement the frame dt + # index since the upper layer may be doing so + # concurrently and we don't want to be delivering frames + # that weren't asked for. log.warning( - f'No data found ending @ {end_dt}\n' - f'Starting another request for {end_dt}' + f'NO DATA found ending @ {end_dt}\n' ) - continue + # try to decrement start point and look further back + # end_dt = last_dt = last_dt.subtract(seconds=2000) + + raise NoData( + f'Symbol: {fqsn}', + frame_size=2000, + ) elif _pacing in msg: @@ -1546,8 +1561,8 @@ async def open_history_client( async with open_client_proxy() as proxy: async def get_hist( - end_dt: str, - start_dt: str = '', + end_dt: Optional[datetime] = None, + start_dt: Optional[datetime] = None, ) -> tuple[np.ndarray, str]: @@ -1558,7 +1573,10 @@ async def open_history_client( if out == (None, None): # could be trying to retreive bars over weekend log.error(f"Can't grab bars starting at {end_dt}!?!?") - raise NoData(f'{end_dt}') + raise NoData( + f'{end_dt}', + frame_size=2000, + ) bars, bars_array, first_dt, last_dt = out @@ -1569,7 +1587,12 @@ async def open_history_client( return bars_array, first_dt, last_dt - yield get_hist + # TODO: it seems like we can do async queries for ohlc + # but getting the order right still isn't working and I'm not + # quite sure why.. needs some tinkering and probably + # a lookthrough of the ``ib_insync`` machinery, for eg. 
maybe + # we have to do the batch queries on the `asyncio` side? + yield get_hist, {'erlangs': 1, 'rate': 6} async def backfill_bars( @@ -1831,6 +1854,7 @@ async def stream_quotes( symbol=sym, ) first_quote = normalize(first_ticker) + # print(f'first quote: {first_quote}') def mk_init_msgs() -> dict[str, dict]: # pass back some symbol info like min_tick, trading_hours, etc. diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 44e4e6b0..30e57b9e 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -20,7 +20,8 @@ Kraken backend. ''' from contextlib import asynccontextmanager as acm from dataclasses import asdict, field -from typing import Any, Optional, AsyncIterator, Callable +from datetime import datetime +from typing import Any, Optional, AsyncIterator, Callable, Union import time from trio_typing import TaskStatus @@ -40,7 +41,13 @@ import base64 from .. import config from .._cacheables import open_cached_client -from ._util import resproc, SymbolNotFound, BrokerError +from ._util import ( + resproc, + SymbolNotFound, + BrokerError, + DataThrottle, + DataUnavailable, +) from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws, NoBsWs @@ -305,7 +312,7 @@ class Client: action: str, size: float, reqid: str = None, - validate: bool = False # set True test call without a real submission + validate: bool = False # set True test call without a real submission ) -> dict: ''' Place an order and return integer request id provided by client. @@ -391,17 +398,26 @@ class Client: async def bars( self, symbol: str = 'XBTUSD', + # UTC 2017-07-02 12:53:20 - since: int = None, + since: Optional[Union[int, datetime]] = None, count: int = 720, # <- max allowed per query as_np: bool = True, + ) -> dict: + if since is None: since = pendulum.now('UTC').start_of('minute').subtract( minutes=count).timestamp() + elif isinstance(since, int): + since = pendulum.from_timestamp(since).timestamp() + + else: # presumably a pendulum datetime + since = since.timestamp() + # UTC 2017-07-02 12:53:20 is oldest seconds value - since = str(max(1499000000, since)) + since = str(max(1499000000, int(since))) json = await self._public( 'OHLC', data={ @@ -445,7 +461,16 @@ class Client: array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars return array except KeyError: - raise SymbolNotFound(json['error'][0] + f': {symbol}') + errmsg = json['error'][0] + + if 'not found' in errmsg: + raise SymbolNotFound(errmsg + f': {symbol}') + + elif 'Too many requests' in errmsg: + raise DataThrottle(f'{symbol}') + + else: + raise BrokerError(errmsg) @acm @@ -668,8 +693,8 @@ async def handle_order_requests( oid=msg.oid, reqid=msg.reqid, symbol=msg.symbol, - # TODO: maybe figure out if pending cancels will - # eventually get cancelled + # TODO: maybe figure out if pending + # cancels will eventually get cancelled reason="Order cancel is still pending?", broker_details=resp ).dict() @@ -1003,7 +1028,45 @@ async def open_history_client( # TODO implement history getter for the new storage layer. async with open_cached_client('kraken') as client: - yield client + + # lol, kraken won't send any more then the "last" + # 720 1m bars.. so we have to just ignore further + # requests of this type.. 
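# NOTE (editor): 720 one-minute bars is only 12 hours of history
# (720 * 60s), so a second query can never reach further back; hence
# the one-shot guard below where any request after the first raises
# ``DataUnavailable``, which the feed-layer request loop treats as
# a "stop backfilling" signal.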
+ queries: int = 0 + + async def get_ohlc( + end_dt: Optional[datetime] = None, + start_dt: Optional[datetime] = None, + + ) -> tuple[ + np.ndarray, + datetime, # start + datetime, # end + ]: + + nonlocal queries + if queries > 0: + raise DataUnavailable + + count = 0 + while count <= 3: + try: + array = await client.bars( + symbol, + since=end_dt, + ) + count += 1 + queries += 1 + break + except DataThrottle: + log.warning(f'kraken OHLC throttle for {symbol}') + await trio.sleep(1) + + start_dt = pendulum.from_timestamp(array[0]['time']) + end_dt = pendulum.from_timestamp(array[-1]['time']) + return array, start_dt, end_dt + + yield get_ohlc, {'erlangs': 1, 'rate': 1} async def backfill_bars( diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 7eb7b5d1..e9512322 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -1,7 +1,25 @@ -""" +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' CLI commons. -""" + +''' import os +from pprint import pformat import click import trio @@ -16,29 +34,22 @@ from .. import config log = get_logger('cli') DEFAULT_BROKER = 'questrade' -_config_dir = click.get_app_dir('piker') -_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') -_context_defaults = dict( - default_map={ - # Questrade specific quote poll rates - 'monitor': { - 'rate': 3, - }, - 'optschain': { - 'rate': 1, - }, - } -) - @click.command() @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--pdb', is_flag=True, help='Enable tractor debug mode') @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') -def pikerd(loglevel, host, tl, pdb): - """Spawn the piker broker-daemon. - """ +@click.option( + '--tsdb', + is_flag=True, + help='Enable local ``marketstore`` instance' +) +def pikerd(loglevel, host, tl, pdb, tsdb): + ''' + Spawn the piker broker-daemon. 
+ + ''' from .._daemon import open_pikerd log = get_console_log(loglevel) @@ -52,13 +63,38 @@ def pikerd(loglevel, host, tl, pdb): )) async def main(): - async with open_pikerd(loglevel=loglevel, debug_mode=pdb): + + async with ( + open_pikerd( + loglevel=loglevel, + debug_mode=pdb, + ), # normally delivers a ``Services`` handle + trio.open_nursery() as n, + ): + if tsdb: + from piker.data._ahab import start_ahab + from piker.data.marketstore import start_marketstore + + log.info('Spawning `marketstore` supervisor') + ctn_ready, config, (cid, pid) = await n.start( + start_ahab, + 'marketstored', + start_marketstore, + + ) + log.info( + f'`marketstore` up!\n' + f'`marketstored` pid: {pid}\n' + f'docker container id: {cid}\n' + f'config: {pformat(config)}' + ) + await trio.sleep_forever() trio.run(main) -@click.group(context_settings=_context_defaults) +@click.group(context_settings=config._context_defaults) @click.option( '--brokers', '-b', default=[DEFAULT_BROKER], @@ -87,8 +123,8 @@ def cli(ctx, brokers, loglevel, tl, configdir): 'loglevel': loglevel, 'tractorloglevel': None, 'log': get_console_log(loglevel), - 'confdir': _config_dir, - 'wl_path': _watchlists_data_path, + 'confdir': config._config_dir, + 'wl_path': config._watchlists_data_path, }) # allow enabling same loglevel in ``tractor`` machinery diff --git a/piker/config.py b/piker/config.py index 93a47378..cf946405 100644 --- a/piker/config.py +++ b/piker/config.py @@ -17,6 +17,8 @@ """ Broker configuration mgmt. """ +import platform +import sys import os from os.path import dirname import shutil @@ -24,14 +26,100 @@ from typing import Optional from bidict import bidict import toml -import click from .log import get_logger log = get_logger('broker-config') -_config_dir = click.get_app_dir('piker') + +# taken from ``click`` since apparently they have some +# super weirdness with sigint and sudo..no clue +def get_app_dir(app_name, roaming=True, force_posix=False): + r"""Returns the config folder for the application. The default behavior + is to return whatever is most appropriate for the operating system. + + To give you an idea, for an app called ``"Foo Bar"``, something like + the following folders could be returned: + + Mac OS X: + ``~/Library/Application Support/Foo Bar`` + Mac OS X (POSIX): + ``~/.foo-bar`` + Unix: + ``~/.config/foo-bar`` + Unix (POSIX): + ``~/.foo-bar`` + Win XP (roaming): + ``C:\Documents and Settings\\Local Settings\Application Data\Foo Bar`` + Win XP (not roaming): + ``C:\Documents and Settings\\Application Data\Foo Bar`` + Win 7 (roaming): + ``C:\Users\\AppData\Roaming\Foo Bar`` + Win 7 (not roaming): + ``C:\Users\\AppData\Local\Foo Bar`` + + .. versionadded:: 2.0 + + :param app_name: the application name. This should be properly capitalized + and can contain whitespace. + :param roaming: controls if the folder should be roaming or not on Windows. + Has no affect otherwise. + :param force_posix: if this is set to `True` then on any POSIX system the + folder will be stored in the home folder with a leading + dot instead of the XDG config home or darwin's + application support folder. 
+ """ + + def _posixify(name): + return "-".join(name.split()).lower() + + # if WIN: + if platform.system() == 'Windows': + key = "APPDATA" if roaming else "LOCALAPPDATA" + folder = os.environ.get(key) + if folder is None: + folder = os.path.expanduser("~") + return os.path.join(folder, app_name) + if force_posix: + return os.path.join(os.path.expanduser("~/.{}".format(_posixify(app_name)))) + if sys.platform == "darwin": + return os.path.join( + os.path.expanduser("~/Library/Application Support"), app_name + ) + return os.path.join( + os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), + _posixify(app_name), + ) + + +_config_dir = _click_config_dir = get_app_dir('piker') +_parent_user = os.environ.get('SUDO_USER') + +if _parent_user: + non_root_user_dir = os.path.expanduser( + f'~{_parent_user}' + ) + root = 'root' + _config_dir = ( + non_root_user_dir + + _click_config_dir[ + _click_config_dir.rfind(root) + len(root): + ] + ) + _file_name = 'brokers.toml' +_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') +_context_defaults = dict( + default_map={ + # Questrade specific quote poll rates + 'monitor': { + 'rate': 3, + }, + 'optschain': { + 'rate': 1, + }, + } +) def _override_config_dir( diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py new file mode 100644 index 00000000..0f96ecaa --- /dev/null +++ b/piker/data/_ahab.py @@ -0,0 +1,370 @@ +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Supervisor for docker with included specific-image service helpers. + +''' +import os +from typing import ( + Optional, + Callable, + Any, +) +from contextlib import asynccontextmanager as acm + +import trio +from trio_typing import TaskStatus +import tractor +from tractor.msg import NamespacePath +import docker +import json +from docker.models.containers import Container as DockerContainer +from docker.errors import ( + DockerException, + APIError, +) +from requests.exceptions import ConnectionError, ReadTimeout + +from ..log import get_logger, get_console_log +from .. import config + +log = get_logger(__name__) + + +class DockerNotStarted(Exception): + 'Prolly you dint start da daemon bruh' + + +class ContainerError(RuntimeError): + 'Error reported via app-container logging level' + + +@acm +async def open_docker( + url: Optional[str] = None, + **kwargs, + +) -> docker.DockerClient: + + client: Optional[docker.DockerClient] = None + try: + client = docker.DockerClient( + base_url=url, + **kwargs + ) if url else docker.from_env(**kwargs) + + yield client + + except ( + DockerException, + APIError, + ) as err: + + def unpack_msg(err: Exception) -> str: + args = getattr(err, 'args', None) + if args: + return args + else: + return str(err) + + # could be more specific so let's check if it's just perms. 
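# NOTE (editor): ``DockerException.args`` can contain nested exception
# objects (eg. a wrapped ``PermissionError`` from the unix socket
# connect), so each arg is run through ``unpack_msg()`` and matched
# for the two common failure modes: missing root perms vs. a docker
# daemon that simply isn't running.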
+ if err.args: + errs = err.args + for err in errs: + msg = unpack_msg(err) + if 'PermissionError' in msg: + raise DockerException('You dint run as root yo!') + + elif 'FileNotFoundError' in msg: + raise DockerNotStarted('Did you start da service sister?') + + # not perms? + raise + + finally: + if client: + client.close() + for c in client.containers.list(): + c.kill() + + +class Container: + ''' + Wrapper around a ``docker.models.containers.Container`` to include + log capture and relay through our native logging system and helper + method(s) for cancellation/teardown. + + ''' + def __init__( + self, + cntr: DockerContainer, + ) -> None: + + self.cntr = cntr + # log msg de-duplication + self.seen_so_far = set() + + async def process_logs_until( + self, + patt: str, + bp_on_msg: bool = False, + ) -> bool: + ''' + Attempt to capture container log messages and relay through our + native logging system. + + ''' + seen_so_far = self.seen_so_far + + while True: + logs = self.cntr.logs() + entries = logs.decode().split('\n') + for entry in entries: + + # ignore null lines + if not entry: + continue + + try: + record = json.loads(entry.strip()) + except json.JSONDecodeError: + if 'Error' in entry: + raise RuntimeError(entry) + raise + + msg = record['msg'] + level = record['level'] + if msg and entry not in seen_so_far: + seen_so_far.add(entry) + if bp_on_msg: + await tractor.breakpoint() + + getattr(log, level, log.error)(f'{msg}') + + # print(f'level: {level}') + if level in ('error', 'fatal'): + raise ContainerError(msg) + + if patt in msg: + return True + + # do a checkpoint so we don't block if cancelled B) + await trio.sleep(0.01) + + return False + + def try_signal( + self, + signal: str = 'SIGINT', + + ) -> bool: + try: + # XXX: market store doesn't seem to shutdown nicely all the + # time with this (maybe because there are still open grpc + # connections?) noticably after client connections have been + # made or are in use/teardown. It works just fine if you + # just start and stop the container tho?.. + log.cancel(f'SENDING {signal} to {self.cntr.id}') + self.cntr.kill(signal) + return True + + except docker.errors.APIError as err: + if 'is not running' in err.explanation: + return False + + async def cancel( + self, + ) -> None: + + cid = self.cntr.id + self.try_signal('SIGINT') + + with trio.move_on_after(0.5) as cs: + cs.shield = True + await self.process_logs_until('initiating graceful shutdown') + await self.process_logs_until('exiting...',) + + for _ in range(10): + with trio.move_on_after(0.5) as cs: + cs.shield = True + await self.process_logs_until('exiting...',) + break + + if cs.cancelled_caught: + # get out the big guns, bc apparently marketstore + # doesn't actually know how to terminate gracefully + # :eyeroll:... + self.try_signal('SIGKILL') + + try: + log.info('Waiting on container shutdown: {cid}') + self.cntr.wait( + timeout=0.1, + condition='not-running', + ) + break + + except ( + ReadTimeout, + ConnectionError, + ): + log.error(f'failed to wait on container {cid}') + raise + + else: + raise RuntimeError('Failed to cancel container {cid}') + + log.cancel(f'Container stopped: {cid}') + + +@tractor.context +async def open_ahabd( + ctx: tractor.Context, + endpoint: str, # ns-pointer str-msg-type + + **kwargs, + +) -> None: + get_console_log('info', name=__name__) + + async with open_docker() as client: + + # TODO: eventually offer a config-oriented API to do the mounts, + # params, etc. passing to ``Containter.run()``? 
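# NOTE (editor): inferred from the unpacking just below, the
# ``endpoint`` ns-path must resolve to a *sync* callable which takes
# the docker client and returns the started container plus its config:
#
#     def start_marketstore(
#         client: docker.DockerClient,
#     ) -> tuple[DockerContainer, dict]:
#         ...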
+ # call into endpoint for container config/init + ep_func = NamespacePath(endpoint).load_ref() + dcntr, cntr_config = ep_func(client) + cntr = Container(dcntr) + + with trio.move_on_after(1): + found = await cntr.process_logs_until( + "launching tcp listener for all services...", + ) + + if not found and cntr not in client.containers.list(): + raise RuntimeError( + 'Failed to start `marketstore` check logs deats' + ) + + await ctx.started(( + cntr.cntr.id, + os.getpid(), + cntr_config, + )) + + try: + + # TODO: we might eventually want a proxy-style msg-prot here + # to allow remote control of containers without needing + # callers to have root perms? + await trio.sleep_forever() + + except ( + BaseException, + # trio.Cancelled, + # KeyboardInterrupt, + ): + + with trio.CancelScope(shield=True): + await cntr.cancel() + + raise + + +async def start_ahab( + service_name: str, + endpoint: Callable[docker.DockerClient, DockerContainer], + task_status: TaskStatus[ + tuple[ + trio.Event, + dict[str, Any], + ], + ] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Start a ``docker`` container supervisor with given service name. + + Currently the actor calling this task should normally be started + with root permissions (until we decide to use something that doesn't + require this, like docker's rootless mode or some wrapper project) but + te root perms are de-escalated after the docker supervisor sub-actor + is started. + + ''' + cn_ready = trio.Event() + try: + async with tractor.open_nursery( + loglevel='runtime', + ) as tn: + + portal = await tn.start_actor( + service_name, + enable_modules=[__name__] + ) + + # TODO: we have issues with this on teardown + # where ``tractor`` tries to issue ``os.kill()`` + # and hits perms errors since the root process + # doesn't any longer have root perms.. + + # de-escalate root perms to the original user + # after the docker supervisor actor is spawned. + if config._parent_user: + import pwd + os.setuid( + pwd.getpwnam( + config._parent_user + )[2] # named user's uid + ) + + async with portal.open_context( + open_ahabd, + endpoint=str(NamespacePath.from_ref(endpoint)), + ) as (ctx, first): + + cid, pid, cntr_config = first + + task_status.started(( + cn_ready, + cntr_config, + (cid, pid), + )) + + await trio.sleep_forever() + + # since we demoted root perms in this parent + # we'll get a perms error on proc cleanup in + # ``tractor`` nursery exit. just make sure + # the child is terminated and don't raise the + # error if so. + + # TODO: we could also consider adding + # a ``tractor.ZombieDetected`` or something that we could raise + # if we find the child didn't terminate. + except PermissionError: + log.warning('Failed to cancel root permsed container') + + except ( + trio.MultiError, + ) as err: + for subexc in err.exceptions: + if isinstance(subexc, PermissionError): + log.warning('Failed to cancel root perms-ed container') + return + else: + raise diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index d31bf7b1..8bc677cf 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -22,14 +22,16 @@ financial data flows. 
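# NOTE (editor): the changes to this module factor the subscriber
# fan-out out of ``increment_ohlc_buffer()`` into a standalone
# ``broadcast()`` helper so that the history-backfill machinery can
# manually push index-step updates to chart/fsp subscribers:
#
#     for delay_s in sampler.subscribers:
#         await broadcast(delay_s)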
from __future__ import annotations from collections import Counter import time +from typing import TYPE_CHECKING, Optional import tractor import trio from trio_typing import TaskStatus -from ._sharedmem import ShmArray from ..log import get_logger +if TYPE_CHECKING: + from ._sharedmem import ShmArray log = get_logger(__name__) @@ -88,6 +90,7 @@ async def increment_ohlc_buffer( total_s = 0 # total seconds counted lowest = min(sampler.ohlcv_shms.keys()) + lowest_shm = sampler.ohlcv_shms[lowest][0] ad = lowest - 0.001 with trio.CancelScope() as cs: @@ -131,21 +134,41 @@ async def increment_ohlc_buffer( # write to the buffer shm.push(last) - # broadcast the buffer index step to any subscribers for - # a given sample period. - subs = sampler.subscribers.get(delay_s, ()) + await broadcast(delay_s, shm=lowest_shm) - for stream in subs: - try: - await stream.send({'index': shm._last.value}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error( - f'{stream._ctx.chan.uid} dropped connection' - ) - subs.remove(stream) + +async def broadcast( + delay_s: int, + shm: Optional[ShmArray] = None, + +) -> None: + # broadcast the buffer index step to any subscribers for + # a given sample period. + subs = sampler.subscribers.get(delay_s, ()) + + last = -1 + + if shm is None: + periods = sampler.ohlcv_shms.keys() + # if this is an update triggered by a history update there + # might not actually be any sampling bus setup since there's + # no "live feed" active yet. + if periods: + lowest = min(periods) + shm = sampler.ohlcv_shms[lowest][0] + last = shm._last.value + + for stream in subs: + try: + await stream.send({'index': last}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error( + f'{stream._ctx.chan.uid} dropped connection' + ) + subs.remove(stream) @tractor.context @@ -365,7 +388,12 @@ async def uniform_rate_send( if left_to_sleep > 0: with trio.move_on_after(left_to_sleep) as cs: - sym, last_quote = await quote_stream.receive() + try: + sym, last_quote = await quote_stream.receive() + except trio.EndOfChannel: + log.exception(f"feed for {stream} ended?") + break + diff = time.time() - last_send if not first_quote: diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 6bc69eb4..8848ec1c 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -22,7 +22,6 @@ from __future__ import annotations from sys import byteorder from typing import Optional from multiprocessing.shared_memory import SharedMemory, _USE_POSIX -from multiprocessing import resource_tracker as mantracker if _USE_POSIX: from _posixshmem import shm_unlink @@ -30,6 +29,7 @@ if _USE_POSIX: import tractor import numpy as np from pydantic import BaseModel +from numpy.lib import recfunctions as rfn from ..log import get_logger from ._source import base_iohlc_dtype @@ -40,32 +40,39 @@ log = get_logger(__name__) # how much is probably dependent on lifestyle _secs_in_day = int(60 * 60 * 24) -# we try for 3 times but only on a run-every-other-day kinda week. -_default_size = 10 * _secs_in_day +# we try for a buncha times, but only on a run-every-other-day kinda week. +_days_worth = 16 +_default_size = _days_worth * _secs_in_day # where to start the new data append index -_rt_buffer_start = int(9*_secs_in_day) +_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) -# Tell the "resource tracker" thing to fuck off. 
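# NOTE (editor): the module-level monkey-patch being removed here is
# wrapped into a ``cuckoff_mantracker()`` function (still invoked at
# import time); behavior is unchanged, the patching is just scoped so
# the ``resource_tracker`` import happens inside the function body.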
-class ManTracker(mantracker.ResourceTracker): - def register(self, name, rtype): - pass +def cuckoff_mantracker(): - def unregister(self, name, rtype): - pass + from multiprocessing import resource_tracker as mantracker - def ensure_running(self): - pass + # Tell the "resource tracker" thing to fuck off. + class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass + + # "know your land and know your prey" + # https://www.dailymotion.com/video/x6ozzco + mantracker._resource_tracker = ManTracker() + mantracker.register = mantracker._resource_tracker.register + mantracker.ensure_running = mantracker._resource_tracker.ensure_running + # ensure_running = mantracker._resource_tracker.ensure_running + mantracker.unregister = mantracker._resource_tracker.unregister + mantracker.getfd = mantracker._resource_tracker.getfd -# "know your land and know your prey" -# https://www.dailymotion.com/video/x6ozzco -mantracker._resource_tracker = ManTracker() -mantracker.register = mantracker._resource_tracker.register -mantracker.ensure_running = mantracker._resource_tracker.ensure_running -ensure_running = mantracker._resource_tracker.ensure_running -mantracker.unregister = mantracker._resource_tracker.unregister -mantracker.getfd = mantracker._resource_tracker.getfd +cuckoff_mantracker() class SharedInt: @@ -191,7 +198,11 @@ class ShmArray: self._post_init: bool = False # pushing data does not write the index (aka primary key) - self._write_fields = list(shmarr.dtype.fields.keys())[1:] + dtype = shmarr.dtype + if dtype.fields: + self._write_fields = list(shmarr.dtype.fields.keys())[1:] + else: + self._write_fields = None # TODO: ringbuf api? @@ -237,6 +248,48 @@ class ShmArray: return a + def ustruct( + self, + fields: Optional[list[str]] = None, + + # type that all field values will be cast to + # in the returned view. + common_dtype: np.dtype = np.float, + + ) -> np.ndarray: + + array = self._array + + if fields: + selection = array[fields] + # fcount = len(fields) + else: + selection = array + # fcount = len(array.dtype.fields) + + # XXX: manual ``.view()`` attempt that also doesn't work. + # uview = selection.view( + # dtype=' int: @@ -267,10 +321,9 @@ class ShmArray: ''' length = len(data) - index = start if start is not None else self._last.value if prepend: - index = self._first.value - length + index = (start or self._first.value) - length if index < 0: raise ValueError( @@ -278,6 +331,9 @@ class ShmArray: f'You have passed {abs(index)} too many datums.' ) + else: + index = start if start is not None else self._last.value + end = index + length if field_map: @@ -295,12 +351,17 @@ class ShmArray: # tries to access ``.array`` (which due to the index # overlap will be empty). Pretty sure we've fixed it now # but leaving this here as a reminder. - if prepend: + if prepend and update_first and length: assert index < self._first.value - if index < self._first.value: + if ( + index < self._first.value + and update_first + ): + assert prepend, 'prepend=True not passed but index decreased?' self._first.value = index - else: + + elif not prepend: self._last.value = end self._post_init = True @@ -336,6 +397,7 @@ class ShmArray: f"Input array has unknown field(s): {only_in_theirs}" ) + # TODO: support "silent" prepends that don't update ._first.value? 
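# NOTE (editor): a sketch (illustration only) of how the push indexing
# above behaves, assuming an already-opened ``ShmArray`` ``shm``:
#
#     shm.push(rt_frame)                  # append: advances ._last
#     shm.push(hist_frame, prepend=True)  # write @ ._first - len(frame)
#
# the append index starts ``_days_worth - 1`` days up the buffer
# (``_rt_buffer_start``), leaving a day of real-time headroom above it
# and the remaining space below for prepended history frames.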
def prepend( self, data: np.ndarray, @@ -386,7 +448,11 @@ def open_shm_array( create=True, size=a.nbytes ) - array = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) + array = np.ndarray( + a.shape, + dtype=a.dtype, + buffer=shm.buf + ) array[:] = a[:] array.setflags(write=int(not readonly)) diff --git a/piker/data/_source.py b/piker/data/_source.py index 3fa6db7b..2f5f61ed 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -21,9 +21,9 @@ from __future__ import annotations from typing import Any import decimal +from bidict import bidict import numpy as np -import pandas as pd -from pydantic import BaseModel, validate_arguments +from pydantic import BaseModel # from numba import from_dtype @@ -48,16 +48,16 @@ base_ohlc_dtype = np.dtype(ohlc_fields) # https://github.com/numba/numba/issues/4511 # numba_ohlc_dtype = from_dtype(base_ohlc_dtype) -# map time frame "keys" to minutes values -tf_in_1m = { - '1m': 1, - '5m': 5, - '15m': 15, - '30m': 30, - '1h': 60, - '4h': 240, - '1d': 1440, -} +# map time frame "keys" to seconds values +tf_in_1s = bidict({ + 1: '1s', + 60: '1m', + 60*5: '5m', + 60*15: '15m', + 60*30: '30m', + 60*60: '1h', + 60*60*24: '1d', +}) def mk_fqsn( @@ -127,11 +127,11 @@ def unpack_fqsn(fqsn: str) -> tuple[str, str, str]: class Symbol(BaseModel): - """I guess this is some kinda container thing for dealing with + ''' + I guess this is some kinda container thing for dealing with all the different meta-data formats from brokers? - Yah, i guess dats what it izz. - """ + ''' key: str tick_size: float = 0.01 lot_tick_size: float = 0.0 # "volume" precision as min step value @@ -254,61 +254,6 @@ class Symbol(BaseModel): return keys -def from_df( - - df: pd.DataFrame, - source=None, - default_tf=None - -) -> np.recarray: - """Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``. - - """ - df.reset_index(inplace=True) - - # hackery to convert field names - date = 'Date' - if 'date' in df.columns: - date = 'date' - - # convert to POSIX time - df[date] = [d.timestamp() for d in df[date]] - - # try to rename from some camel case - columns = { - 'Date': 'time', - 'date': 'time', - 'Open': 'open', - 'High': 'high', - 'Low': 'low', - 'Close': 'close', - 'Volume': 'volume', - - # most feeds are providing this over sesssion anchored - 'vwap': 'bar_wap', - - # XXX: ib_insync calls this the "wap of the bar" - # but no clue what is actually is... - # https://github.com/pikers/piker/issues/119#issuecomment-729120988 - 'average': 'bar_wap', - } - - df = df.rename(columns=columns) - - for name in df.columns: - # if name not in base_ohlc_dtype.names[1:]: - if name not in base_ohlc_dtype.names: - del df[name] - - # TODO: it turns out column access on recarrays is actually slower: - # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist - # it might make sense to make these structured arrays? - array = df.to_records(index=False) - _nan_to_closest_num(array) - - return array - - def _nan_to_closest_num(array: np.ndarray): """Return interpolated values instead of NaN. diff --git a/piker/data/cli.py b/piker/data/cli.py index 7a774fb5..554048a4 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -16,26 +16,34 @@ """ marketstore cli. 
+ """ -from typing import List from functools import partial from pprint import pformat +from anyio_marketstore import open_marketstore_client import trio import tractor import click +import numpy as np from .marketstore import ( get_client, - stream_quotes, + # stream_quotes, ingest_quote_stream, - _url, + # _url, _tick_tbk_ids, mk_tbk, ) from ..cli import cli from .. import watchlists as wl from ..log import get_logger +from ._sharedmem import ( + maybe_open_shm_array, +) +from ._source import ( + base_iohlc_dtype, +) log = get_logger(__name__) @@ -49,51 +57,58 @@ log = get_logger(__name__) ) @click.argument('names', nargs=-1) @click.pass_obj -def ms_stream(config: dict, names: List[str], url: str): - """Connect to a marketstore time bucket stream for (a set of) symbols(s) +def ms_stream( + config: dict, + names: list[str], + url: str, +) -> None: + ''' + Connect to a marketstore time bucket stream for (a set of) symbols(s) and print to console. - """ + + ''' async def main(): - async for quote in stream_quotes(symbols=names): - log.info(f"Received quote:\n{quote}") + # async for quote in stream_quotes(symbols=names): + # log.info(f"Received quote:\n{quote}") + ... trio.run(main) -@cli.command() -@click.option( - '--url', - default=_url, - help='HTTP URL of marketstore instance' -) -@click.argument('names', nargs=-1) -@click.pass_obj -def ms_destroy(config: dict, names: List[str], url: str) -> None: - """Destroy symbol entries in the local marketstore instance. - """ - async def main(): - nonlocal names - async with get_client(url) as client: - - if not names: - names = await client.list_symbols() - - # default is to wipe db entirely. - answer = input( - "This will entirely wipe you local marketstore db @ " - f"{url} of the following symbols:\n {pformat(names)}" - "\n\nDelete [N/y]?\n") - - if answer == 'y': - for sym in names: - # tbk = _tick_tbk.format(sym) - tbk = tuple(sym, *_tick_tbk_ids) - print(f"Destroying {tbk}..") - await client.destroy(mk_tbk(tbk)) - else: - print("Nothing deleted.") - - tractor.run(main) +# @cli.command() +# @click.option( +# '--url', +# default=_url, +# help='HTTP URL of marketstore instance' +# ) +# @click.argument('names', nargs=-1) +# @click.pass_obj +# def ms_destroy(config: dict, names: list[str], url: str) -> None: +# """Destroy symbol entries in the local marketstore instance. +# """ +# async def main(): +# nonlocal names +# async with get_client(url) as client: +# +# if not names: +# names = await client.list_symbols() +# +# # default is to wipe db entirely. +# answer = input( +# "This will entirely wipe you local marketstore db @ " +# f"{url} of the following symbols:\n {pformat(names)}" +# "\n\nDelete [N/y]?\n") +# +# if answer == 'y': +# for sym in names: +# # tbk = _tick_tbk.format(sym) +# tbk = tuple(sym, *_tick_tbk_ids) +# print(f"Destroying {tbk}..") +# await client.destroy(mk_tbk(tbk)) +# else: +# print("Nothing deleted.") +# +# tractor.run(main) @cli.command() @@ -102,41 +117,53 @@ def ms_destroy(config: dict, names: List[str], url: str) -> None: is_flag=True, help='Enable tractor logging') @click.option( - '--url', - default=_url, - help='HTTP URL of marketstore instance' + '--host', + default='localhost' ) -@click.argument('name', nargs=1, required=True) +@click.option( + '--port', + default=5993 +) +@click.argument('symbols', nargs=-1) @click.pass_obj -def ms_shell(config, name, tl, url): - """Start an IPython shell ready to query the local marketstore db. 
- """ - async def main(): - async with get_client(url) as client: - query = client.query # noqa - # TODO: write magics to query marketstore - from IPython import embed - embed() +def storesh( + config, + tl, + host, + port, + symbols: list[str], +): + ''' + Start an IPython shell ready to query the local marketstore db. - tractor.run(main) + ''' + from piker.data.marketstore import tsdb_history_update + from piker._daemon import open_piker_runtime + + async def main(): + nonlocal symbols + + async with open_piker_runtime( + 'storesh', + enable_modules=['piker.data._ahab'], + ): + symbol = symbols[0] + await tsdb_history_update(symbol) + + trio.run(main) @cli.command() @click.option('--test-file', '-t', help='Test quote stream file') @click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option( - '--url', - default=_url, - help='HTTP URL of marketstore instance' -) @click.argument('name', nargs=1, required=True) @click.pass_obj -def ingest(config, name, test_file, tl, url): - """Ingest real-time broker quotes and ticks to a marketstore instance. - """ +def ingest(config, name, test_file, tl): + ''' + Ingest real-time broker quotes and ticks to a marketstore instance. + + ''' # global opts - brokermod = config['brokermod'] loglevel = config['loglevel'] tractorloglevel = config['tractorloglevel'] # log = config['log'] @@ -145,15 +172,25 @@ def ingest(config, name, test_file, tl, url): watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) symbols = watchlists[name] - tractor.run( - partial( - ingest_quote_stream, - symbols, - brokermod.name, - tries=1, - loglevel=loglevel, - ), - name='ingest_marketstore', - loglevel=tractorloglevel, - debug_mode=True, - ) + grouped_syms = {} + for sym in symbols: + symbol, _, provider = sym.rpartition('.') + if provider not in grouped_syms: + grouped_syms[provider] = [] + + grouped_syms[provider].append(symbol) + + async def entry_point(): + async with tractor.open_nursery() as n: + for provider, symbols in grouped_syms.items(): + await n.run_in_actor( + ingest_quote_stream, + name='ingest_marketstore', + symbols=symbols, + brokername=provider, + tries=1, + actorloglevel=loglevel, + loglevel=tractorloglevel + ) + + tractor.run(entry_point) diff --git a/piker/data/feed.py b/piker/data/feed.py index 260cab9b..e77052bf 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -20,27 +20,36 @@ Data feed apis and infra. This module is enabled for ``brokerd`` daemons. 
""" +from __future__ import annotations from dataclasses import dataclass, field +from datetime import datetime from contextlib import asynccontextmanager from functools import partial +from pprint import pformat from types import ModuleType from typing import ( Any, AsyncIterator, Optional, + Generator, Awaitable, + TYPE_CHECKING, ) import trio from trio.abc import ReceiveChannel from trio_typing import TaskStatus +import trimeter import tractor from pydantic import BaseModel +import pendulum +import numpy as np from ..brokers import get_brokermod from .._cacheables import maybe_open_context from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, + check_for_service, ) from ._sharedmem import ( maybe_open_shm_array, @@ -56,12 +65,19 @@ from ._source import ( from ..ui import _search from ._sampling import ( sampler, + broadcast, increment_ohlc_buffer, iter_ohlc_periods, sample_and_broadcast, uniform_rate_send, ) +from ..brokers._util import ( + NoData, + DataUnavailable, +) +if TYPE_CHECKING: + from .marketstore import Storage log = get_logger(__name__) @@ -124,7 +140,7 @@ class _FeedsBus(BaseModel): # def cancel_task( # self, - # task: trio.lowlevel.Task + # task: trio.lowlevel.Task, # ) -> bool: # ... @@ -188,6 +204,442 @@ async def _setup_persistent_brokerd( await trio.sleep_forever() +def diff_history( + array, + start_dt, + end_dt, + last_tsdb_dt: Optional[datetime] = None + +) -> np.ndarray: + + to_push = array + + if last_tsdb_dt: + s_diff = (start_dt - last_tsdb_dt).seconds + + # if we detect a partial frame's worth of data + # that is new, slice out only that history and + # write to shm. + if ( + s_diff < 0 + ): + if abs(s_diff) < len(array): + # the + 1 is because ``last_tsdb_dt`` is pulled from + # the last row entry for the ``'time'`` field retreived + # from the tsdb. + to_push = array[abs(s_diff)+1:] + + else: + # pass back only the portion of the array that is + # greater then the last time stamp in the tsdb. + time = array['time'] + to_push = array[time >= last_tsdb_dt.timestamp()] + + log.info( + f'Pushing partial frame {to_push.size} to shm' + ) + + return to_push + + +async def start_backfill( + mod: ModuleType, + bfqsn: str, + shm: ShmArray, + + last_tsdb_dt: Optional[datetime] = None, + storage: Optional[Storage] = None, + write_tsdb: bool = True, + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> int: + + async with mod.open_history_client(bfqsn) as (hist, config): + + # get latest query's worth of history all the way + # back to what is recorded in the tsdb + array, start_dt, end_dt = await hist(end_dt=None) + + times = array['time'] + + # sample period step size in seconds + step_size_s = ( + pendulum.from_timestamp(times[-1]) - + pendulum.from_timestamp(times[-2]) + ).seconds + + # "frame"'s worth of sample period steps in seconds + frame_size_s = len(array) * step_size_s + + to_push = diff_history( + array, + start_dt, + end_dt, + last_tsdb_dt=last_tsdb_dt, + ) + + log.info(f'Pushing {to_push.size} to shm!') + shm.push(to_push) + + for delay_s in sampler.subscribers: + await broadcast(delay_s) + + # signal that backfilling to tsdb's end datum is complete + bf_done = trio.Event() + + # let caller unblock and deliver latest history frame + task_status.started((shm, start_dt, end_dt, bf_done)) + + if last_tsdb_dt is None: + # maybe a better default (they don't seem to define epoch?!) 
+ + # based on the sample step size load a certain amount + # history + if step_size_s == 1: + last_tsdb_dt = pendulum.now().subtract(days=2) + + elif step_size_s == 60: + last_tsdb_dt = pendulum.now().subtract(years=2) + + else: + raise ValueError( + '`piker` only needs to support 1m and 1s sampling ' + 'but ur api is trying to deliver a longer ' + f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' + 'do dat bruh.' + ) + + # configure async query throttling + erlangs = config.get('erlangs', 1) + rate = config.get('rate', 1) + frames = {} + + def iter_dts(start: datetime): + + while True: + + hist_period = pendulum.period( + start, + last_tsdb_dt, + ) + dtrange = list(hist_period.range('seconds', frame_size_s)) + log.debug(f'New datetime index:\n{pformat(dtrange)}') + + for end_dt in dtrange: + log.warning(f'Yielding next frame start {end_dt}') + start = yield end_dt + + # if caller sends a new start date, reset to that + if start is not None: + log.warning(f'Resetting date range: {start}') + break + else: + # from while + return + + # pull new history frames until we hit latest + # already in the tsdb or a max count. + count = 0 + + # NOTE: when gaps are detected in the retreived history (by + # comparisor of the end - start versus the expected "frame size" + # in seconds) we need a way to alert the async request code not + # to continue to query for data "within the gap". This var is + # set in such cases such that further requests in that period + # are discarded and further we reset the "datetimem query frame + # index" in such cases to avoid needless noop requests. + earliest_end_dt: Optional[datetime] = start_dt + + async def get_ohlc_frame( + input_end_dt: datetime, + iter_dts_gen: Generator[datetime], + + ) -> np.ndarray: + + nonlocal count, frames, earliest_end_dt, frame_size_s + count += 1 + + if input_end_dt > earliest_end_dt: + # if a request comes in for an inter-gap frame we + # discard it since likely this request is still + # lingering from before the reset of ``iter_dts()`` via + # ``.send()`` below. + log.info(f'Discarding request history ending @ {input_end_dt}') + + # signals to ``trimeter`` loop to discard and + # ``continue`` in it's schedule loop. + return None + + try: + log.info( + f'Requesting {step_size_s}s frame ending in {input_end_dt}' + ) + array, start_dt, end_dt = await hist(end_dt=input_end_dt) + assert array['time'][0] == start_dt.timestamp() + + except NoData: + log.warning( + f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?' + ) + return None # discard signal + + except DataUnavailable as duerr: + # broker is being a bish and we can't pull + # any more.. + log.warning('backend halted on data deliver !?!?') + + # ugh, what's a better way? + # TODO: fwiw, we probably want a way to signal a throttle + # condition (eg. with ib) so that we can halt the + # request loop until the condition is resolved? + return duerr + + diff = end_dt - start_dt + frame_time_diff_s = diff.seconds + expected_frame_size_s = frame_size_s + step_size_s + + if frame_time_diff_s > expected_frame_size_s: + + # XXX: query result includes a start point prior to our + # expected "frame size" and thus is likely some kind of + # history gap (eg. market closed period, outage, etc.) 
+ # so indicate to the request loop that this gap is + # expected by both, + # - resetting the ``iter_dts()`` generator to start at + # the new start point delivered in this result + # - setting the non-locally scoped ``earliest_end_dt`` + # to this new value so that the request loop doesn't + # get tripped up thinking there's an out of order + # request-result condition. + + log.warning( + f'History frame ending @ {end_dt} appears to have a gap:\n' + f'{diff} ~= {frame_time_diff_s} seconds' + ) + + # reset dtrange gen to new start point + try: + next_end = iter_dts_gen.send(start_dt) + log.info( + f'Reset frame index to start at {start_dt}\n' + f'Was at {next_end}' + ) + + # NOTE: manually set "earliest end datetime" index-value + # to avoid the request loop getting confused about + # new frames that are earlier in history - i.e. this + # **is not** the case of out-of-order frames from + # an async batch request. + earliest_end_dt = start_dt + + except StopIteration: + # gen already terminated meaning we probably already + # exhausted it via frame requests. + log.info( + "Datetime index already exhausted, can't reset.." + ) + + to_push = diff_history( + array, + start_dt, + end_dt, + last_tsdb_dt=last_tsdb_dt, + ) + ln = len(to_push) + if ln: + log.info(f'{ln} bars for {start_dt} -> {end_dt}') + frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt) + return to_push, start_dt, end_dt + + else: + log.warning( + f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' + ) + return None + + # initial dt index starts at the start of the first query result + idts = iter_dts(start_dt) + + async with trimeter.amap( + partial( + get_ohlc_frame, + # we close in the ``iter_dt()`` gen in so we can send + # reset signals as needed for gap dection in the + # history. + iter_dts_gen=idts, + ), + idts, + + capture_outcome=True, + include_value=True, + + # better technical names bruv... + max_at_once=erlangs, + max_per_second=rate, + + ) as outcomes: + + # Then iterate over the return values, as they become available + # (i.e., not necessarily in the original order) + async for input_end_dt, outcome in outcomes: + + try: + out = outcome.unwrap() + + if out is None: + # skip signal + continue + + elif isinstance(out, DataUnavailable): + # no data available case signal.. so just kill + # further requests and basically just stop + # trying... + break + + except Exception: + log.exception('uhh trimeter bail') + raise + else: + to_push, start_dt, end_dt = out + + if not len(to_push): + # diff returned no new data (i.e. we probablyl hit + # the ``last_tsdb_dt`` point). + # TODO: raise instead? + log.warning(f'No history for range {start_dt} -> {end_dt}') + continue + + # pipeline-style pull frames until we need to wait for + # the next in order to arrive. + # i = end_dts.index(input_end_dt) + # print(f'latest end_dt {end_dt} found at index {i}') + + epochs = list(reversed(sorted(frames))) + for epoch in epochs: + + start = shm.array['time'][0] + last_shm_prepend_dt = pendulum.from_timestamp(start) + earliest_frame_queue_dt = pendulum.from_timestamp(epoch) + + diff = start - epoch + + if diff < 0: + log.warning( + 'Discarding out of order frame:\n' + f'{earliest_frame_queue_dt}' + ) + frames.pop(epoch) + continue + # await tractor.breakpoint() + + if diff > step_size_s: + + if earliest_end_dt < earliest_frame_queue_dt: + # XXX: an expected gap was encountered (see + # logic in ``get_ohlc_frame()``, so allow + # this frame through to the storage layer. 
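# NOTE (editor): the surrounding loop walks ``frames`` newest-first
# (``reversed(sorted(frames))``) and only pops a frame for prepending
# once it abuts the current shm ``._first`` row (within one sample
# step); this is what re-serializes frames that arrive out of order
# from the concurrent ``trimeter``-scheduled requests.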
+ log.warning( + f'Expected history gap of {diff}s:\n' + f'{earliest_frame_queue_dt} <- ' + f'{earliest_end_dt}' + ) + + elif ( + erlangs > 1 + ): + # we don't yet have the next frame to push + # so break back to the async request loop + # while we wait for more async frame-results + # to arrive. + if len(frames) >= erlangs: + log.warning( + 'Frame count in async-queue is greater ' + 'then erlangs?\n' + 'There seems to be a gap between:\n' + f'{earliest_frame_queue_dt} <- ' + f'{last_shm_prepend_dt}\n' + 'Conducting manual call for frame ending: ' + f'{last_shm_prepend_dt}' + ) + ( + to_push, + start_dt, + end_dt, + ) = await get_ohlc_frame( + input_end_dt=last_shm_prepend_dt, + iter_dts_gen=idts, + ) + last_epoch = to_push['time'][-1] + diff = start - last_epoch + + if diff > step_size_s: + await tractor.breakpoint() + raise DataUnavailable( + 'An awkward frame was found:\n' + f'{start_dt} -> {end_dt}:\n{to_push}' + ) + + else: + frames[last_epoch] = ( + to_push, start_dt, end_dt) + break + + expect_end = pendulum.from_timestamp(start) + expect_start = expect_end.subtract( + seconds=frame_size_s) + log.warning( + 'waiting on out-of-order history frame:\n' + f'{expect_end - expect_start}' + ) + break + + to_push, start_dt, end_dt = frames.pop(epoch) + ln = len(to_push) + + # bail gracefully on shm allocation overrun/full condition + try: + shm.push(to_push, prepend=True) + except ValueError: + log.info( + f'Shm buffer overrun on: {start_dt} -> {end_dt}?' + ) + break + + log.info( + f'Shm pushed {ln} frame:\n' + f'{start_dt} -> {end_dt}' + ) + # keep track of most recent "prepended" ``start_dt`` + # both for detecting gaps and ensuring async + # frame-result order. + earliest_end_dt = start_dt + + if ( + storage is not None + and write_tsdb + ): + log.info( + f'Writing {ln} frame to storage:\n' + f'{start_dt} -> {end_dt}' + ) + await storage.write_ohlcv( + f'{bfqsn}.{mod.name}', # lul.. + to_push, + ) + + # TODO: can we only trigger this if the respective + # history in "in view"?!? + # XXX: extremely important, there can be no checkpoints + # in the block above to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... + for delay_s in sampler.subscribers: + await broadcast(delay_s) + + bf_done.set() + + async def manage_history( mod: ModuleType, bus: _FeedsBus, @@ -216,50 +668,180 @@ async def manage_history( # we expect the sub-actor to write readonly=False, ) + # TODO: history validation + if not opened: + raise RuntimeError( + "Persistent shm for sym was already open?!" + ) - if opened: + log.info('Scanning for existing `marketstored`') + + is_up = await check_for_service('marketstored') + + # for now only do backfilling if no tsdb can be found + do_legacy_backfill = not is_up and opened + + bfqsn = fqsn.replace('.' + mod.name, '') + open_history_client = getattr(mod, 'open_history_client', None) + + if is_up and opened and open_history_client: + + log.info('Found existing `marketstored`') + from . import marketstore + async with marketstore.open_storage_client( + fqsn, + ) as storage: + + # TODO: this should be used verbatim for the pure + # shm backfiller approach below. + + # start history anal and load missing new data via backend. 
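# NOTE (editor): inferred from the unpacking below (not a verified API
# doc), ``Storage.load()`` is expected to hand back roughly:
#
#     series: dict  # timeframe -> np.ndarray of marketstore records
#                   # (fields include an 'Epoch' epoch-time column)
#     _: Any        # unused here
#     last_tsdb_dt: Optional[datetime]  # newest datum in the tsdb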


 async def manage_history(
     mod: ModuleType,
     bus: _FeedsBus,
@@ -216,50 +668,180 @@ async def manage_history(
         # we expect the sub-actor to write
         readonly=False,
     )
+    # TODO: history validation
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )
-
-    if opened:
+    log.info('Scanning for existing `marketstored`')
+
+    is_up = await check_for_service('marketstored')
+
+    # for now, only do backfilling if no tsdb can be found
+    do_legacy_backfill = not is_up and opened
+
+    bfqsn = fqsn.replace('.' + mod.name, '')
+    open_history_client = getattr(mod, 'open_history_client', None)
+
+    if is_up and opened and open_history_client:
+
+        log.info('Found existing `marketstored`')
+        from . import marketstore
+        async with marketstore.open_storage_client(
+            fqsn,
+        ) as storage:
+
+            # TODO: this should be used verbatim for the pure
+            # shm backfiller approach below.
+
+            # start history analysis and load missing new data via the
+            # backend.
+            series, _, last_tsdb_dt = await storage.load(fqsn)
+
+            broker, symbol, expiry = unpack_fqsn(fqsn)
+            (
+                shm,
+                latest_start_dt,
+                latest_end_dt,
+                bf_done,
+            ) = await bus.nursery.start(
+                partial(
+                    start_backfill,
+                    mod,
+                    bfqsn,
+                    shm,
+                    last_tsdb_dt=last_tsdb_dt,
+                    storage=storage,
+                )
+            )
+
+            # if len(shm.array) < 2:
+            # TODO: there's an edge case here to solve where if the last
+            # frame before market close (at least on ib) was pushed and
+            # there was only "1 new" row pushed from the first backfill
+            # query-iteration, then the sample step sizing calcs will
+            # break upstream from here since you can't diff on at least
+            # 2 steps... probably should also add logic to compute from
+            # the tsdb series and stash that somewhere as meta data on
+            # the shm buffer?.. not sure.
+
+            task_status.started(shm)
+            some_data_ready.set()
+
+            await bf_done.wait()
+            # do a diff against the last start frame of history and only
+            # fill in from the tsdb an allotment that allows for the most
+            # recent data to be loaded into mem *before* the tsdb data.
+            if last_tsdb_dt:
+                dt_diff_s = (
+                    latest_start_dt - last_tsdb_dt
+                ).seconds
+            else:
+                dt_diff_s = 0
+
+            # await trio.sleep_forever()
+            # TODO: see if there's faster multi-field reads:
+            # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+            # re-index with a `time` and index field
+            prepend_start = shm._first.value
+
+            # sanity check on most-recent-data loading
+            assert prepend_start > dt_diff_s
+
+            history = list(series.values())
+            if history:
+                fastest = history[0]
+                to_push = fastest[:prepend_start]
+
+                shm.push(
+                    to_push,
+
+                    # insert the history in front, leaving a "day's
+                    # worth" of samples of real-time buffer space at
+                    # the end.
+                    prepend=True,
+                    # update_first=False,
+                    # start=prepend_start,
+                    field_map=marketstore.ohlc_key_map,
+                )
+
+                # load as much from storage into shm as space will
+                # allow according to the user's shm size settings.
+                count = 0
+                end = fastest['Epoch'][0]
+
+                while shm._first.value > 0:
+                    count += 1
+                    series = await storage.read_ohlcv(
+                        fqsn,
+                        end=end,
+                    )
+                    history = list(series.values())
+                    fastest = history[0]
+                    end = fastest['Epoch'][0]
+                    prepend_start -= len(to_push)
+                    to_push = fastest[:prepend_start]
+
+                    shm.push(
+                        to_push,
+
+                        # insert the history in front, leaving a "day's
+                        # worth" of samples of real-time buffer space at
+                        # the end.
+                        prepend=True,
+                        # update_first=False,
+                        # start=prepend_start,
+                        field_map=marketstore.ohlc_key_map,
+                    )
+
+                    # manually trigger step updates for the charts/fsps
+                    # which need an incremental update.
+                    for delay_s in sampler.subscribers:
+                        await broadcast(delay_s)
+
+                    if count > 6:
+                        break
+
+                log.info(f'Loaded {to_push.shape} datums from storage')
+
+                # TODO: write new data to the tsdb to be ready for the
+                # next read.
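
The clipping rule in the loop above (never push more rows than the shm buffer
has free in front) is just a slice; a toy sketch:

    import numpy as np

    prepend_start = 1000              # free rows left (shm._first.value)
    fastest = np.arange(1500)         # one tsdb read batch (made-up data)

    to_push = fastest[:prepend_start]
    prepend_start -= len(to_push)
    assert prepend_start == 0         # the buffer front is now full
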
+
+    if do_legacy_backfill:
+        # do a legacy incremental backfill from the provider.
         log.info('No existing `marketstored` found..')

         # start history backfill task ``backfill_bars()`` is
         # a required backend func this must block until shm is
         # filled with first set of ohlc bars
-        _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
-
-        # yield back after client connect with filled shm
-        task_status.started(shm)
-
-        # indicate to caller that feed can be delivered to
-        # remote requesting client since we've loaded history
-        # data that can be used.
-        some_data_ready.set()
-
-        # detect sample step size for sampled historical data
-        times = shm.array['time']
-        delay_s = times[-1] - times[times != times[-1]][-1]
-
-        # begin real-time updates of shm and tsb once the feed
-        # goes live.
-        await feed_is_live.wait()
-
-        if opened:
-            sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
-
-            # start shm incrementing for OHLC sampling at the current
-            # detected sampling period if one dne.
-            if sampler.incrementers.get(delay_s) is None:
-                await bus.start_task(
-                    increment_ohlc_buffer,
-                    delay_s,
+        await bus.nursery.start(
+            partial(
+                start_backfill,
+                mod,
+                bfqsn,
+                shm,
             )
+        )

+    # yield back after client connect with filled shm
+    task_status.started(shm)
+
+    # indicate to the caller that the feed can be delivered to
+    # remote requesting clients since we've loaded history
+    # data that can be used.
+    some_data_ready.set()
+
+    # history retrieval loop depending on user interaction and thus
+    # a small RPC-protocol for remotely controlling what data is
+    # loaded for viewing.
     await trio.sleep_forever()
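
The step-period detection that (per the hunks below) moves to after
``feed_is_live.wait()`` diffs against the most recent *distinct* timestamp, so
a still-filling last bar can't produce a zero step; in miniature:

    import numpy as np

    times = np.array([60., 120., 180., 180.])   # last bar still filling
    delay_s = times[-1] - times[times != times[-1]][-1]
    assert delay_s == 60.0
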

 async def allocate_persistent_feed(
     bus: _FeedsBus,
+
     brokername: str,
     symbol: str,
+
     loglevel: str,
+
     start_stream: bool = True,

     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

@@ -277,6 +859,7 @@ async def allocate_persistent_feed(
     - a real-time streaming task which connects

     '''
+    # load the backend module
     try:
         mod = get_brokermod(brokername)
     except ImportError:
@@ -319,7 +902,7 @@ async def allocate_persistent_feed(
         manage_history,
         mod,
         bus,
-        bfqsn,
+        '.'.join((bfqsn, brokername)),
         some_data_ready,
         feed_is_live,
     )
@@ -333,7 +916,10 @@ async def allocate_persistent_feed(
     # true fqsn
     fqsn = '.'.join((bfqsn, brokername))
     # add a fqsn entry that includes the ``.<broker>`` suffix
+    # and an entry that includes the broker-specific fqsn (including
+    # any new suffixes or elements as injected by the backend).
     init_msg[fqsn] = msg
+    init_msg[bfqsn] = msg

     # TODO: pretty sure we don't need this? why not just leave 1s as
     # the fastest "sample period" since we'll probably always want that
@@ -347,13 +933,14 @@ async def allocate_persistent_feed(
     await some_data_ready.wait()

     # append the ``.<broker>`` suffix to each quote symbol
-    bsym = symbol + f'.{brokername}'
+    acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
+
     generic_first_quotes = {
-        bsym: first_quote,
+        acceptable_not_fqsn_with_broker_suffix: first_quote,
         fqsn: first_quote,
     }

-    bus.feeds[symbol] = bus.feeds[fqsn] = (
+    bus.feeds[symbol] = bus.feeds[bfqsn] = (
         init_msg,
         generic_first_quotes,
     )
@@ -363,9 +950,25 @@ async def allocate_persistent_feed(
     # task_status.started((init_msg, generic_first_quotes))
     task_status.started()

-    # backend will indicate when real-time quotes have begun.
+    if not start_stream:
+        await trio.sleep_forever()
+
+    # begin real-time updates of shm and tsdb once the feed goes live
+    # and the backend indicates that real-time quotes have begun.
     await feed_is_live.wait()

+    # start the shm incrementer task for OHLC style sampling
+    # at the current detected step period.
+    times = shm.array['time']
+    delay_s = times[-1] - times[times != times[-1]][-1]
+
+    sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
+    if sampler.incrementers.get(delay_s) is None:
+        await bus.start_task(
+            increment_ohlc_buffer,
+            delay_s,
+        )
+
     sum_tick_vlm: bool = init_msg.get(
         'shm_write_opts', {}
     ).get('sum_tick_vlm', True)
@@ -388,7 +991,7 @@ async def open_feed_bus(

     ctx: tractor.Context,
     brokername: str,
-    symbol: str,
+    symbol: str,  # normally expected to be the broker-specific fqsn
     loglevel: str,
     tick_throttle: Optional[float] = None,
     start_stream: bool = True,
@@ -410,7 +1013,9 @@ async def open_feed_bus(
     # TODO: check for any stale shm entries for this symbol
     # (after we also group them in a nice `/dev/shm/piker/` subdir).
     # ensure we are who we think we are
-    assert 'brokerd' in tractor.current_actor().name
+    servicename = tractor.current_actor().name
+    assert 'brokerd' in servicename
+    assert brokername in servicename

     bus = get_feed_bus(brokername)

@@ -420,7 +1025,7 @@ async def open_feed_bus(
     entry = bus.feeds.get(symbol)
     if entry is None:
         # allocate a new actor-local stream bus which
-        # will persist for this `brokerd`.
+        # will persist for this `brokerd`'s service lifetime.
         async with bus.task_lock:
             await bus.nursery.start(
                 partial(
@@ -428,13 +1033,12 @@ async def open_feed_bus(
                     bus=bus,
                     brokername=brokername,
-
                     # here we pass through the selected symbol in native
                     # "format" (i.e. upper vs. lowercase depending on
                     # provider).
                     symbol=symbol,
-
                     loglevel=loglevel,
+                    start_stream=start_stream,
                 )
             )
         # TODO: we can remove this?
@@ -450,7 +1054,7 @@ async def open_feed_bus(
     # true fqsn
     fqsn = '.'.join([bfqsn, brokername])
     assert fqsn in first_quotes
-    assert bus.feeds[fqsn]
+    assert bus.feeds[bfqsn]

     # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
     bsym = symbol + f'.{brokername}'
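
Before the ``marketstore`` integration below, a quick orientation: the grpc
client from ``anyio-marketstore`` (wrapped by ``get_client()`` further down)
connects like so, where 5995 is the grpc port from the container config; a
minimal sketch assuming a locally running server:

    import trio
    from anyio_marketstore import open_marketstore_client

    async def main():
        async with open_marketstore_client('localhost', 5995) as client:
            print(await client.list_symbols())

    trio.run(main)
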
diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py
index 27bcda70..39fe1b70 100644
--- a/piker/data/marketstore.py
+++ b/piker/data/marketstore.py
@@ -14,36 +14,189 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.

-"""
+'''
 ``marketstore`` integration.

 - client management routines
 - tick data ingest routines
 - websocket client for subscribing to write triggers
 - todo: tick sequence stream-cloning for testing
-- todo: docker container management automation
-"""
-from contextlib import asynccontextmanager
-from typing import Dict, Any, List, Callable, Tuple
+
+'''
+from __future__ import annotations
+from contextlib import asynccontextmanager as acm
+from datetime import datetime
+from pprint import pformat
+from typing import (
+    Any,
+    Optional,
+    Union,
+    TYPE_CHECKING,
+)
 import time
 from math import isnan

+from bidict import bidict
 import msgpack
+import pyqtgraph as pg
 import numpy as np
 import pandas as pd
-import pymarketstore as pymkts
 import tractor
 from trio_websocket import open_websocket_url
+from anyio_marketstore import (
+    open_marketstore_client,
+    MarketstoreClient,
+    Params,
+)
+import pendulum
+import purerpc

+if TYPE_CHECKING:
+    import docker
+    from ._ahab import DockerContainer
+
+from .feed import maybe_open_feed
 from ..log import get_logger, get_console_log
-from ..data import open_feed

 log = get_logger(__name__)

-_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK')

+# container level config
+_config = {
+    'grpc_listen_port': 5995,
+    'ws_listen_port': 5993,
+    'log_level': 'debug',
+}
+
+_yaml_config = '''
+# piker's ``marketstore`` config.
+
+# mount this config using:
+# sudo docker run --mount \
+# type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
+# 5993:5993 alpacamarkets/marketstore:latest
+
+root_directory: data
+listen_port: {ws_listen_port}
+grpc_listen_port: {grpc_listen_port}
+log_level: {log_level}
+queryable: true
+stop_grace_period: 0
+wal_rotate_interval: 5
+stale_threshold: 5
+enable_add: true
+enable_remove: false
+
+triggers:
+  - module: ondiskagg.so
+    on: "*/1Sec/OHLCV"
+    config:
+        # filter: "nasdaq"
+        destinations:
+            - 1Min
+            - 5Min
+            - 15Min
+            - 1H
+            - 1D
+
+  - module: stream.so
+    on: '*/*/*'
+    # config:
+    #     filter: "nasdaq"
+
+'''.format(**_config)
+
+
+def start_marketstore(
+    client: docker.DockerClient,
+
+    **kwargs,
+
+) -> tuple[DockerContainer, dict[str, Any]]:
+    '''
+    Start and supervise a marketstore instance with its config
+    bind-mounted in from the piker config directory on the host system.
+
+    The equivalent cli cmd to this code is:
+
+        sudo docker run --mount \
+        type=bind,source="$HOME/.config/piker/",target="/etc" -i -p \
+        5993:5993 alpacamarkets/marketstore:latest
+
+    '''
+    import os
+    import docker
+    from .. import config
+
+    get_console_log('info', name=__name__)
+
+    yml_file = os.path.join(config._config_dir, 'mkts.yml')
+    if not os.path.isfile(yml_file):
+        log.warning(
+            f'No `marketstore` config exists?: {yml_file}\n'
+            'Generating new file from template:\n'
+            f'{_yaml_config}\n'
+        )
+        with open(yml_file, 'w') as yf:
+            yf.write(_yaml_config)
+
+    # create a mount from the user's local piker config dir into the
+    # container
+    config_dir_mnt = docker.types.Mount(
+        target='/etc',
+        source=config._config_dir,
+        type='bind',
+    )
+
+    # create a user config subdir where the marketstore
+    # backing filesystem database can be persisted.
+    persistent_data_dir = os.path.join(
+        config._config_dir, 'data',
+    )
+    if not os.path.isdir(persistent_data_dir):
+        os.mkdir(persistent_data_dir)
+
+    data_dir_mnt = docker.types.Mount(
+        target='/data',
+        source=persistent_data_dir,
+        type='bind',
+    )
+
+    dcntr: DockerContainer = client.containers.run(
+        'alpacamarkets/marketstore:latest',
+        # do we need this for cmds?
+        # '-i',
+
+        # '-p 5993:5993',
+        ports={
+            '5993/tcp': 5993,  # jsonrpc / ws?
+            '5995/tcp': 5995,  # grpc
+        },
+        mounts=[
+            config_dir_mnt,
+            data_dir_mnt,
+        ],
+        detach=True,
+        # stop_signal='SIGINT',
+        init=True,
+        # remove=True,
+    )
+    return dcntr, _config
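
A hypothetical driver for ``start_marketstore()`` (normally the ``_ahab``
docker supervisor actor does this); the explicit teardown call here is
illustrative only:

    import docker
    from piker.data.marketstore import start_marketstore

    client = docker.from_env()
    cntr, cfg = start_marketstore(client)
    print(f"marketstore up, grpc on port {cfg['grpc_listen_port']}")
    cntr.stop()
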


+_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
 _tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
-_url: str = 'http://localhost:5993/rpc'
+
+_tick_dt = [
+    # these two are required as the tsdb's "primary key"
+    ('Epoch', 'i8'),
+    ('Nanoseconds', 'i4'),
+    ('IsTrade', 'i1'),
+    ('IsBid', 'i1'),
+    ('Price', 'f4'),
+    ('Size', 'f4')
+]
+
 _quote_dt = [
     # these two are required as the tsdb's "primary key"
     ('Epoch', 'i8'),
@@ -61,6 +214,7 @@ _quote_dt = [
     # ('brokerd_ts', 'i64'),
     # ('VWAP', 'f4')
 ]
+
 _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
 _tick_map = {
     'Up': 1,
@@ -69,28 +223,49 @@ _tick_map = {
     None: np.nan,
 }

+_ohlcv_dt = [
+    # these two are required as the tsdb's "primary key"
+    ('Epoch', 'i8'),
+    # ('Nanoseconds', 'i4'),

-class MarketStoreError(Exception):
-    "Generic marketstore client error"
+    # ohlcv sampling
+    ('Open', 'f4'),
+    ('High', 'f4'),
+    ('Low', 'f4'),
+    ('Close', 'f4'),
+    ('Volume', 'f4'),
+]

-def err_on_resp(response: dict) -> None:
-    """Raise any errors found in responses from client request.
-    """
-    responses = response['responses']
-    if responses is not None:
-        for r in responses:
-            err = r['error']
-            if err:
-                raise MarketStoreError(err)

+ohlc_key_map = bidict({
+    'Epoch': 'time',
+    'Open': 'open',
+    'High': 'high',
+    'Low': 'low',
+    'Close': 'close',
+    'Volume': 'volume',
+})
+
+
+def mk_tbk(keys: tuple[str, str, str]) -> str:
+    '''
+    Generate a marketstore table key from a tuple.
+
+    Converts,
+    ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``
+
+    '''
+    return '/'.join(keys)
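
Since ``ohlc_key_map`` is a ``bidict``, the same table drives renames in both
directions: the shm ``push(..., field_map=...)`` path maps tsdb field names to
shm ones, while the write path can use ``.inverse``; a minimal sketch:

    from bidict import bidict

    ohlc_key_map = bidict({'Epoch': 'time', 'Open': 'open'})
    assert ohlc_key_map['Epoch'] == 'time'          # tsdb -> shm field
    assert ohlc_key_map.inverse['time'] == 'Epoch'  # shm -> tsdb field
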

 def quote_to_marketstore_structarray(
-    quote: Dict[str, Any],
-    last_fill: str,
+    quote: dict[str, Any],
+    last_fill: Optional[float]
+
 ) -> np.array:
-    """Return marketstore writeable structarray from quote ``dict``.
-    """
+    '''
+    Return a marketstore-writeable structarray from a quote ``dict``.
+
+    '''
     if last_fill:
         # new fill bby
         now = timestamp(last_fill)
@@ -101,7 +276,7 @@ def quote_to_marketstore_structarray(

     secs, ns = now / 10**9, now % 10**9

-    # pack into List[Tuple[str, Any]]
+    # pack into list[tuple[str, Any]]
     array_input = []

     # insert 'Epoch' entry first and then 'Nanoseconds'.
@@ -123,146 +298,458 @@ def quote_to_marketstore_structarray(
     return np.array([tuple(array_input)], dtype=_quote_dt)


-def timestamp(datestr: str) -> int:
-    """Return marketstore compatible 'Epoch' integer in nanoseconds
+def timestamp(date, **kwargs) -> int:
+    '''
+    Return a marketstore-compatible 'Epoch' integer in nanoseconds
     from a date formatted str.
-    """
-    return int(pd.Timestamp(datestr).value)
+
+    '''
+    return int(pd.Timestamp(date, **kwargs).value)


-def mk_tbk(keys: Tuple[str, str, str]) -> str:
-    """Generate a marketstore table key from a tuple.
-
-    Converts,
-        ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``
-    """
-    return '{}/' + '/'.join(keys)
-
-
-class Client:
-    """Async wrapper around the alpaca ``pymarketstore`` sync client.
-
-    This will server as the shell for building out a proper async client
-    that isn't horribly documented and un-tested..
-    """
-    def __init__(self, url: str):
-        self._client = pymkts.Client(url)
-
-    async def _invoke(
-        self,
-        meth: Callable,
-        *args,
-        **kwargs,
-    ) -> Any:
-        return err_on_resp(meth(*args, **kwargs))
-
-    async def destroy(
-        self,
-        tbk: Tuple[str, str, str],
-    ) -> None:
-        return await self._invoke(self._client.destroy, mk_tbk(tbk))
-
-    async def list_symbols(
-        self,
-        tbk: str,
-    ) -> List[str]:
-        return await self._invoke(self._client.list_symbols, mk_tbk(tbk))
-
-    async def write(
-        self,
-        symbol: str,
-        array: np.ndarray,
-    ) -> None:
-        start = time.time()
-        await self._invoke(
-            self._client.write,
-            array,
-            _tick_tbk.format(symbol),
-            isvariablelength=True
-        )
-        log.debug(f"{symbol} write time (s): {time.time() - start}")
-
-    def query(
-        self,
-        symbol,
-        tbk: Tuple[str, str] = _tick_tbk_ids,
-    ) -> pd.DataFrame:
-        # XXX: causes crash
-        # client.query(pymkts.Params(symbol, '*', 'OHCLV'
-        result = self._client.query(
-            pymkts.Params(symbol, *tbk),
-        )
-        return result.first().df()
-
-
-@asynccontextmanager
+@acm
 async def get_client(
-    url: str = _url,
-) -> Client:
-    yield Client(url)
+    host: str = 'localhost',
+    port: int = 5995
+
+) -> MarketstoreClient:
+    '''
+    Load an ``anyio_marketstore`` grpc client connected
+    to an existing ``marketstore`` server.
+
+    '''
+    async with open_marketstore_client(
+        host,
+        port
+    ) as client:
+        yield client
+
+
+class MarketStoreError(Exception):
+    "Generic marketstore client error"
+
+
+# def err_on_resp(response: dict) -> None:
+#     """Raise any errors found in responses from client request.
+#     """
+#     responses = response['responses']
+#     if responses is not None:
+#         for r in responses:
+#             err = r['error']
+#             if err:
+#                 raise MarketStoreError(err)
+
+
+# map of seconds ints to "time frame" accepted keys
+tf_in_1s = bidict({
+    1: '1Sec',
+    60: '1Min',
+    60*5: '5Min',
+    60*15: '15Min',
+    60*30: '30Min',
+    60*60: '1H',
+    60*60*24: '1D',
+})
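
The Epoch/Nanoseconds split that ``quote_to_marketstore_structarray()``
produces as marketstore's "primary key" looks like this for a tick record
(made-up values, ``_tick_dt`` as defined above):

    import numpy as np

    _tick_dt = [
        ('Epoch', 'i8'), ('Nanoseconds', 'i4'),
        ('IsTrade', 'i1'), ('IsBid', 'i1'),
        ('Price', 'f4'), ('Size', 'f4'),
    ]
    now_ns = 1_650_000_000_123_456_789
    secs, ns = divmod(now_ns, 10**9)
    tick = np.array(
        [(secs, ns, 1, 0, 4100.25, 3.0)],
        dtype=_tick_dt,
    )
    assert tick['Epoch'][0] * 10**9 + tick['Nanoseconds'][0] == now_ns
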
+
+
+class Storage:
+    '''
+    High level storage api for both real-time and historical ingest.
+
+    '''
+    def __init__(
+        self,
+        client: MarketstoreClient,
+
+    ) -> None:
+        # TODO: eventually this should be an api/interface type that
+        # ensures we can support multiple tsdb backends.
+        self.client = client
+
+        # series' cache from tsdb reads
+        self._arrays: dict[str, np.ndarray] = {}
+
+    async def list_keys(self) -> list[str]:
+        return await self.client.list_symbols()
+
+    async def search_keys(self, pattern: str) -> list[str]:
+        '''
+        Search for a time series key in the storage backend.
+
+        '''
+        ...
+
+    async def write_ticks(self, ticks: list) -> None:
+        ...
+
+    async def load(
+        self,
+        fqsn: str,
+
+    ) -> tuple[
+        dict[int, np.ndarray],  # timeframe (in secs) to series
+        Optional[datetime],     # first dt
+        Optional[datetime],     # last dt
+    ]:
+
+        first_tsdb_dt, last_tsdb_dt = None, None
+        tsdb_arrays = await self.read_ohlcv(fqsn)
+        log.info(f'Loaded tsdb history {tsdb_arrays}')
+
+        if tsdb_arrays:
+            fastest = list(tsdb_arrays.values())[0]
+            times = fastest['Epoch']
+            first, last = times[0], times[-1]
+            first_tsdb_dt, last_tsdb_dt = map(
+                pendulum.from_timestamp, [first, last]
+            )
+
+        return tsdb_arrays, first_tsdb_dt, last_tsdb_dt
+
+    async def read_ohlcv(
+        self,
+        fqsn: str,
+        timeframe: Optional[Union[int, str]] = None,
+        end: Optional[int] = None,
+
+    ) -> tuple[
+        MarketstoreClient,
+        Union[dict, np.ndarray]
+    ]:
+        client = self.client
+        syms = await client.list_symbols()
+
+        if fqsn not in syms:
+            return {}
+
+        tfstr = tf_in_1s[1]
+
+        params = Params(
+            symbols=fqsn,
+            timeframe=tfstr,
+            attrgroup='OHLCV',
+            end=end,
+            # limit_from_start=True,
+
+            # TODO: figure out the max limit here given
+            # ``purerpc``'s msg size limit: 33554432
+            limit=int(800e3),
+        )
+
+        if timeframe is None:
+            log.info(f'starting {fqsn} tsdb granularity scan..')
+            # loop through and try to find the highest granularity
+            for tfstr in tf_in_1s.values():
+                try:
+                    log.info(f'querying for {tfstr}@{fqsn}')
+                    params.set('timeframe', tfstr)
+                    result = await client.query(params)
+                    break
+
+                except purerpc.grpclib.exceptions.UnknownError:
+                    # XXX: this is already logged by the container and
+                    # thus shows up through the `marketstored` logs relay.
+                    # log.warning(f'{tfstr}@{fqsn} not found')
+                    continue
+            else:
+                return {}
+
+        else:
+            result = await client.query(params)
+
+        # TODO: it turns out column access on recarrays is actually slower:
+        # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
+        # it might make sense to make these structured arrays?
+        # fill out a `numpy` array-results map
+        arrays = {}
+        for fqsn, data_set in result.by_symbols().items():
+            arrays.setdefault(fqsn, {})[
+                tf_in_1s.inverse[data_set.timeframe]
+            ] = data_set.array
+
+        return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
+
+    async def delete_ts(
+        self,
+        key: str,
+        timeframe: Optional[Union[int, str]] = None,
+
+    ) -> bool:
+
+        client = self.client
+        syms = await client.list_symbols()
+        print(syms)
+        # if key not in syms:
+        #     raise KeyError(f'`{fqsn}` table key not found?')
+
+        return await client.destroy(tbk=key)
+
+    async def write_ohlcv(
+        self,
+        fqsn: str,
+        ohlcv: np.ndarray,
+        append_and_duplicate: bool = True,
+        limit: int = int(800e3),
+
+    ) -> None:
+        # build a mkts-schema compatible array for writing
+        mkts_dt = np.dtype(_ohlcv_dt)
+        mkts_array = np.zeros(
+            len(ohlcv),
+            dtype=mkts_dt,
+        )
+        # copy from the shm array (yes it's this easy):
+        # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
+        mkts_array[:] = ohlcv[[
+            'time',
+            'open',
+            'high',
+            'low',
+            'close',
+            'volume',
+        ]]
+
+        m, r = divmod(len(mkts_array), limit)
+
+        for i in range(1, m + 1):
+            to_push = mkts_array[(i - 1) * limit:i * limit]
+
+            # write to db
+            resp = await self.client.write(
+                to_push,
+                tbk=f'{fqsn}/1Sec/OHLCV',
+
+                # NOTE: this will append duplicates
+                # for the same timestamp-index.
+                # TODO: pre-deduplicate?
+                isvariablelength=append_and_duplicate,
+            )
+
+            log.info(
+                f'Wrote {to_push.size} datums to tsdb\n'
+            )
+
+            for resp in resp.responses:
+                err = resp.error
+                if err:
+                    raise MarketStoreError(err)
+
+        if r:
+            to_push = mkts_array[m * limit:]
+
+            # write to db
+            resp = await self.client.write(
+                to_push,
+                tbk=f'{fqsn}/1Sec/OHLCV',
+
+                # NOTE: this will append duplicates
+                # for the same timestamp-index.
+                # TODO: pre-deduplicate?
+                isvariablelength=append_and_duplicate,
+            )
+
+            log.info(
+                f'Wrote {to_push.size} datums to tsdb\n'
+            )
+
+            for resp in resp.responses:
+                err = resp.error
+                if err:
+                    raise MarketStoreError(err)
+
+
+@acm
+async def open_storage_client(
+    fqsn: str,
+    period: Optional[Union[int, str]] = None,  # in seconds
+
+) -> tuple[Storage, dict[str, np.ndarray]]:
+    '''
+    Load a series by key and deliver it in ``numpy`` struct array
+    format.
+
+    '''
+    async with (
+        # eventually a storage backend endpoint
+        get_client() as client,
+    ):
+        # slap on our wrapper api
+        yield Storage(client)
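
The ``divmod`` chunking in ``write_ohlcv()`` (as fixed above: ``limit``-sized
writes plus one remainder write) reduces to the following; toy sizes only:

    import numpy as np

    limit = 3
    arr = np.arange(8)
    m, r = divmod(len(arr), limit)
    chunks = [arr[(i - 1) * limit:i * limit] for i in range(1, m + 1)]
    if r:
        chunks.append(arr[m * limit:])
    assert sum(len(c) for c in chunks) == len(arr)
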
+
+
+async def tsdb_history_update(
+    fqsn: Optional[str] = None,
+
+) -> list[str]:
+
+    # TODO: a real-time dedicated task for ensuring
+    # history consistency between the tsdb, shm and real-time feed..
+
+    # update sequence design notes:
+
+    # - load existing highest frequency data from mkts
+    #   * how do we want to offer this to the UI?
+    #     - lazy loading?
+    #     - try to load it all and expect graphics caching/diffing
+    #       to hide extra bits that aren't in view?
+
+    # - compute the diff between latest data from broker and shm
+    #   * use the sql api in mkts to determine where the backend should
+    #     start querying for data?
+    #   * append any diff with new shm length
+    #   * determine missing (gapped) history by scanning
+    #   * how far back do we look?
+
+    # - begin rt update ingest and aggregation
+    #   * could start by always writing ticks to mkts instead of
+    #     worrying about a shm queue for now.
+    #   * we have a short list of shm queues worth grokking:
+    #     - https://github.com/pikers/piker/issues/107
+    #   * the original data feed arch blurb:
+    #     - https://github.com/pikers/piker/issues/98
+    #
+    profiler = pg.debug.Profiler(
+        disabled=False,  # not pg_profile_enabled(),
+        delayed=False,
+    )
+
+    async with (
+        open_storage_client(fqsn) as storage,
+
+        maybe_open_feed(
+            [fqsn],
+            start_stream=False,
+
+        ) as (feed, stream),
+    ):
+        profiler(f'opened feed for {fqsn}')
+
+        to_append = feed.shm.array
+        to_prepend = None
+
+        if fqsn:
+            symbol = feed.symbols.get(fqsn)
+            if symbol:
+                fqsn = symbol.front_fqsn()
+
+        # diff the db history with shm and only write the missing
+        # portions
+        ohlcv = feed.shm.array
+
+        # TODO: use the pg profiler
+        tsdb_arrays = await storage.read_ohlcv(fqsn)
+        # hist diffing
+        if tsdb_arrays:
+            onesec = tsdb_arrays[1]
+
+            # these aren't currently used but can be referenced from
+            # within the embedded ipython shell below.
+            to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
+            to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
+
+        profiler('Finished db arrays diffs')
+
+        syms = await storage.client.list_symbols()
+        log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
+        profiler(f'listed symbols {syms}')
+
+        # TODO: ask if the user wants to write history for detected
+        # available shm buffers?
+        from tractor.trionics import ipython_embed
+        await ipython_embed()
+
+        # for array in [to_append, to_prepend]:
+        #     if array is None:
+        #         continue
+
+        #     log.info(
+        #         f'Writing datums {array.size} -> to tsdb from shm\n'
+        #     )
+        #     await storage.write_ohlcv(fqsn, array)
+
+        # profiler('Finished db writes')


 async def ingest_quote_stream(
-    symbols: List[str],
+    symbols: list[str],
     brokername: str,
     tries: int = 1,
     loglevel: str = None,
+
 ) -> None:
-    """Ingest a broker quote stream into marketstore in (sampled) tick format.
-    """
-    async with open_feed(
-        brokername,
-        symbols,
-        loglevel=loglevel,
-    ) as (first_quotes, qstream):
+    '''
+    Ingest a broker quote stream into a ``marketstore`` tsdb.

-        quote_cache = first_quotes.copy()
+    '''
+    async with (
+        maybe_open_feed(brokername, symbols, loglevel=loglevel) as feed,
+        get_client() as ms_client,
+    ):
+        async for quotes in feed.stream:
+            log.info(quotes)
+            for symbol, quote in quotes.items():
+                for tick in quote.get('ticks', ()):
+                    ticktype = tick.get('type', 'n/a')

-        async with get_client() as ms_client:
+                    # techtonic tick write
+                    array = quote_to_marketstore_structarray({
+                        'IsTrade': 1 if ticktype == 'trade' else 0,
+                        'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
+                        'Price': tick.get('price'),
+                        'Size': tick.get('size')
+                    }, last_fill=quote.get('broker_ts', None))

-            # start ingest to marketstore
-            async for quotes in qstream:
-                log.info(quotes)
-                for symbol, quote in quotes.items():
+                    await ms_client.write(array, _tick_tbk)

-                    # remap tick strs to ints
-                    quote['tick'] = _tick_map[quote.get('tick', 'Equal')]
+        # LEGACY WRITE LOOP (using the old tick dt)
+        # quote_cache = {
+        #     'size': 0,
+        #     'tick': 0
+        # }

-                    # check for volume update (i.e. did trades happen
-                    # since last quote)
-                    new_vol = quote.get('volume', None)
-                    if new_vol is None:
-                        log.debug(f"No fills for {symbol}")
-                        if new_vol == quote_cache.get('volume'):
-                            # should never happen due to field diffing
-                            # on sender side
-                            log.error(
-                                f"{symbol}: got same volume as last quote?")
+        # async for quotes in qstream:
+        #     log.info(quotes)
+        #     for symbol, quote in quotes.items():

-                    quote_cache.update(quote)
+        #         # remap tick strs to ints
+        #         quote['tick'] = _tick_map[quote.get('tick', 'Equal')]

-                    a = quote_to_marketstore_structarray(
-                        quote,
-                        # TODO: check this closer to the broker query api
-                        last_fill=quote.get('fill_time', '')
-                    )
-                    await ms_client.write(symbol, a)
+        #         # check for volume update (i.e. did trades happen
+        #         # since last quote)
+        #         new_vol = quote.get('volume', None)
+        #         if new_vol is None:
+        #             log.debug(f"No fills for {symbol}")
+        #             if new_vol == quote_cache.get('volume'):
+        #                 # should never happen due to field diffing
+        #                 # on sender side
+        #                 log.error(
+        #                     f"{symbol}: got same volume as last quote?")

+        #         quote_cache.update(quote)

+        #         a = quote_to_marketstore_structarray(
+        #             quote,
+        #             # TODO: check this closer to the broker query api
+        #             last_fill=quote.get('fill_time', '')
+        #         )
+        #         await ms_client.write(symbol, a)
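
The shm/tsdb diffing in ``tsdb_history_update()`` above reduces to two epoch
comparisons; with toy arrays:

    import numpy as np

    ohlcv = np.array(
        [(40,), (100,), (160,), (220,)],
        dtype=[('time', 'i8')],
    )
    onesec_epochs = np.array([100, 160])    # the tsdb's existing 1s series

    to_append = ohlcv[ohlcv['time'] > onesec_epochs[-1]]
    to_prepend = ohlcv[ohlcv['time'] < onesec_epochs[0]]
    assert to_append['time'].tolist() == [220]
    assert to_prepend['time'].tolist() == [40]
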


 async def stream_quotes(
-    symbols: List[str],
+    symbols: list[str],
     host: str = 'localhost',
     port: int = 5993,
     diff_cached: bool = True,
     loglevel: str = None,
+
 ) -> None:
-    """Open a symbol stream from a running instance of marketstore and
+    '''
+    Open a symbol stream from a running instance of marketstore and
     log to console.
-    """
+
+    '''
     # XXX: required to propagate ``tractor`` loglevel to piker logging
     get_console_log(loglevel or tractor.current_actor().loglevel)

-    tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
+    tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}

     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
         # send subs topics to server
@@ -271,7 +758,7 @@ async def stream_quotes(
         )
         log.info(resp)

-        async def recv() -> Dict[str, Any]:
+        async def recv() -> dict[str, Any]:
             return msgpack.loads((await ws.get_message()), encoding='utf-8')

         streams = (await recv())['streams']
diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py
index 01e41c04..488ae22c 100644
--- a/piker/fsp/_momo.py
+++ b/piker/fsp/_momo.py
@@ -167,6 +167,7 @@ def _wma(

     assert length == len(weights)

+    # lol, for long sequences this is nutso slow and expensive..
     return np.convolve(signal, weights, 'valid')


diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py
index 47211234..b5456fac 100644
--- a/piker/fsp/_volume.py
+++ b/piker/fsp/_volume.py
@@ -309,7 +309,7 @@ async def flow_rates(

             if period > 1:
                 trade_rate_wma = _wma(
-                    dvlm_shm.array['trade_count'],
+                    dvlm_shm.array['trade_count'][-period:],
                     period,
                     weights=weights,
                 )
@@ -332,7 +332,7 @@ async def flow_rates(

             if period > 1:
                 dark_trade_rate_wma = _wma(
-                    dvlm_shm.array['dark_trade_count'],
+                    dvlm_shm.array['dark_trade_count'][-period:],
                     period,
                     weights=weights,
                 )
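
Why the ``[-period:]`` slices above matter: a 'valid'-mode convolution's last
output only depends on the trailing window, so convolving the full (growing)
history each frame is wasted work. A sketch demonstrating the equivalence:

    import numpy as np

    signal = np.arange(100, dtype=float)
    period = 4
    weights = np.ones(period) / period
    full = np.convolve(signal, weights, 'valid')[-1]
    windowed = np.convolve(signal[-period:], weights, 'valid')[-1]
    assert np.isclose(full, windowed)
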
" + # NB: these fields must be indexed in the correct order via + # the slice syntax below. + "epoch:{}
" "O:{}
" "H:{}
" "L:{}
" @@ -198,7 +201,15 @@ class ContentsLabel(pg.LabelItem): "V:{}
" "wap:{}".format( *array[index - first][ - ['open', 'high', 'low', 'close', 'volume', 'bar_wap'] + [ + 'time', + 'open', + 'high', + 'low', + 'close', + 'volume', + 'bar_wap', + ] ], name=name, index=index, @@ -295,7 +306,8 @@ class ContentsLabels: class Cursor(pg.GraphicsObject): - '''Multi-plot cursor for use on a ``LinkedSplits`` chart (set). + ''' + Multi-plot cursor for use on a ``LinkedSplits`` chart (set). ''' def __init__( @@ -310,7 +322,7 @@ class Cursor(pg.GraphicsObject): self.linked = linkedsplits self.graphics: dict[str, pg.GraphicsObject] = {} - self.plots: List['PlotChartWidget'] = [] # type: ignore # noqa + self.plots: list['PlotChartWidget'] = [] # type: ignore # noqa self.active_plot = None self.digits: int = digits self._datum_xy: tuple[int, float] = (0, 0) @@ -439,7 +451,10 @@ class Cursor(pg.GraphicsObject): if plot.linked.xaxis_chart is plot: xlabel = self.xaxis_label = XAxisLabel( parent=self.plots[plot_index].getAxis('bottom'), - # parent=self.plots[plot_index].pi_overlay.get_axis(plot.plotItem, 'bottom'), + # parent=self.plots[plot_index].pi_overlay.get_axis( + # plot.plotItem, 'bottom' + # ), + opacity=_ch_label_opac, bg_color=self.label_color, ) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 82f12196..927ce5df 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -29,6 +29,7 @@ from typing import Optional, Any, Callable import numpy as np import tractor import trio +import pendulum import pyqtgraph as pg from .. import brokers @@ -47,6 +48,7 @@ from ._fsp import ( open_vlm_displays, ) from ..data._sharedmem import ShmArray +from ..data._source import tf_in_1s from ._forms import ( FieldsForm, mk_order_pane_layout, @@ -398,9 +400,11 @@ def graphics_update_cycle( ) if ( - (xpx < update_uppx or i_diff > 0) + ( + xpx < update_uppx or i_diff > 0 + and liv + ) or trigger_all - and liv ): # TODO: make it so this doesn't have to be called # once the $vlm is up? @@ -494,6 +498,7 @@ def graphics_update_cycle( if ( xpx < update_uppx or i_diff > 0 + or trigger_all ): chart.update_graphics_from_array( chart.name, @@ -599,7 +604,7 @@ def graphics_update_cycle( yrange=(mn, mx), ) - vars['last_mx'], vars['last_mn'] = mx, mn + vars['last_mx'], vars['last_mn'] = mx, mn # run synchronous update on all linked flows for curve_name, flow in chart._flows.items(): @@ -661,11 +666,17 @@ async def display_symbol_data( symbol = feed.symbols[sym] fqsn = symbol.front_fqsn() + times = bars['time'] + end = pendulum.from_timestamp(times[-1]) + start = pendulum.from_timestamp(times[times != times[-1]][-1]) + step_size_s = (end - start).seconds + tf_key = tf_in_1s[step_size_s] + # load in symbol's ohlc data godwidget.window.setWindowTitle( f'{fqsn} ' f'tick:{symbol.tick_size} ' - f'step:1s ' + f'step:{tf_key} ' ) linked = godwidget.linkedsplits diff --git a/requirements.txt b/requirements.txt index e64267b9..077fe24f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,7 @@ # pin this to a dev branch that we have more control over especially # as more graphics stuff gets hashed out. 
diff --git a/requirements.txt b/requirements.txt
index e64267b9..077fe24f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -7,3 +7,7 @@
 # pin this to a dev branch that we have more control over especially
 # as more graphics stuff gets hashed out.
 -e git+https://github.com/pikers/pyqtgraph.git@piker_pin#egg=pyqtgraph
+
+
+# our async client for ``marketstore`` (the tsdb)
+-e git+https://github.com/pikers/anyio-marketstore.git@master#egg=anyio-marketstore
diff --git a/snippets/ib_data_reset.py b/scripts/ib_data_reset.py
similarity index 60%
rename from snippets/ib_data_reset.py
rename to scripts/ib_data_reset.py
index a65321dc..7e328190 100644
--- a/snippets/ib_data_reset.py
+++ b/scripts/ib_data_reset.py
@@ -1,5 +1,5 @@
 # piker: trading gear for hackers
-# Copyright (C) Tyler Goodlet (in stewardship for piker0)
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)

 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by
@@ -30,11 +30,13 @@ orig_win_id = t.find_focused().window
 # for tws
 win_names: list[str] = [
     'Interactive Brokers',  # tws running in i3
-    'IB Gateway.',  # gw running in i3
+    'IB Gateway',  # gw running in i3
+    # 'IB',  # gw running in i3 (newer version?)
 ]

 for name in win_names:
-    results = t.find_named(name)
+    results = t.find_titled(name)
+    print(f'results for {name}: {results}')
     if results:
         con = results[0]
         print(f'Resetting data feed for {name}')
@@ -47,22 +49,32 @@ for name in win_names:
         # https://github.com/rr-/pyxdotool
         # https://github.com/ShaneHutter/pyxdotool
        # https://github.com/cphyc/pyxdotool
-        subprocess.call([
-            'xdotool',
-            'windowactivate', '--sync', win_id,

-            # move mouse to bottom left of window (where there should
-            # be nothing to click).
-            'mousemove_relative', '--sync', str(w-4), str(h-4),
+        # TODO: only run the reconnect (2nd) key-combo on a detected
+        # disconnect?
+        for key_combo, timeout in [
+            # only required if we need a connection reset.
+            # ('ctrl+alt+r', 12),
+            # data feed reset.
+            ('ctrl+alt+f', 6)
+        ]:
+            subprocess.call([
+                'xdotool',
+                'windowactivate', '--sync', win_id,

-            # NOTE: we may need to stick a `--retry 3` in here..
-            'click', '--window', win_id, '--repeat', '3', '1',
+                # move mouse to bottom left of window (where there should
+                # be nothing to click).
+                'mousemove_relative', '--sync', str(w-4), str(h-4),

-            # hackzorzes
-            'key', 'ctrl+alt+f',
-        ],
-            timeout=1,
-        )
+                # NOTE: we may need to stick a `--retry 3` in here..
+                'click', '--window', win_id,
+                '--repeat', '3', '1',
+
+                # hackzorzes
+                'key', key_combo,
+            ],
+                timeout=timeout,
+            )

 # re-activate and focus original window
 subprocess.call([
diff --git a/setup.py b/setup.py
index fc3ce78a..71b27d48 100755
--- a/setup.py
+++ b/setup.py
@@ -77,6 +77,14 @@ setup(
         # tsdbs
         'pymarketstore',
     ],
+    extras_require={
+
+        # tsdb integration via a supervised docker container
+        'tsdb': [
+            'docker',
+        ],
+
+    },
     tests_require=['pytest'],
     python_requires=">=3.9",  # literally for ``datetime.datetime.fromisoformat``...
     keywords=["async", "trading", "finance", "quant", "charting"],
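
With the new extra defined above, the optional tsdb dependencies install via
the standard setuptools extras syntax: `pip install -e '.[tsdb]'`.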