diff --git a/piker/_daemon.py b/piker/_daemon.py index 44986a22..1dfebae3 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -195,9 +195,8 @@ async def open_piker_runtime( ) -> Optional[tractor._portal.Portal]: ''' - Start a piker actor who's runtime will automatically - sync with existing piker actors in local network - based on configuration. + Start a piker actor who's runtime will automatically sync with + existing piker actors on the local link based on configuration. ''' global _services diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 68c7238e..aa189e81 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -36,7 +36,11 @@ import tractor import wsproto from .._cacheables import open_cached_client -from ._util import resproc, SymbolNotFound +from ._util import ( + resproc, + SymbolNotFound, + DataUnavailable, +) from ..log import get_logger, get_console_log from ..data import ShmArray from ..data.types import Struct @@ -388,6 +392,7 @@ async def open_history_client( async with open_cached_client('binance') as client: async def get_ohlc( + timeframe: float, end_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None, @@ -396,6 +401,8 @@ async def open_history_client( datetime, # start datetime, # end ]: + if timeframe != 60: + raise DataUnavailable('Only 1m bars are supported') array = await client.bars( symbol, diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 2e699a0b..9448d1f6 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -43,6 +43,7 @@ from bidict import bidict import trio import tractor from tractor import to_asyncio +import pendulum import ib_insync as ibis from ib_insync.contract import ( Contract, @@ -52,6 +53,7 @@ from ib_insync.contract import ( from ib_insync.order import Order from ib_insync.ticker import Ticker from ib_insync.objects import ( + BarDataList, Position, Fill, Execution, @@ -78,26 +80,11 @@ _time_units = { 'h': ' hours', } -_time_frames = { - '1s': '1 Sec', - '5s': '5 Sec', - '30s': '30 Sec', - '1m': 'OneMinute', - '2m': 'TwoMinutes', - '3m': 'ThreeMinutes', - '4m': 'FourMinutes', - '5m': 'FiveMinutes', - '10m': 'TenMinutes', - '15m': 'FifteenMinutes', - '20m': 'TwentyMinutes', - '30m': 'HalfHour', - '1h': 'OneHour', - '2h': 'TwoHours', - '4h': 'FourHours', - 'D': 'OneDay', - 'W': 'OneWeek', - 'M': 'OneMonth', - 'Y': 'OneYear', +_bar_sizes = { + 1: '1 Sec', + 60: '1 min', + 60*60: '1 hour', + 24*60*60: '1 day', } _show_wap_in_history: bool = False @@ -199,7 +186,8 @@ _adhoc_futes_set = { 'lb.nymex', # random len lumber # metals - 'xauusd.cmdty', # gold spot + # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 + 'xauusd.cmdty', # london gold spot ^ 'gc.nymex', 'mgc.nymex', # micro @@ -257,14 +245,12 @@ _exch_skip_list = { 'PSE', } -# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 - _enters = 0 def bars_to_np(bars: list) -> np.ndarray: ''' - Convert a "bars list thing" (``BarsList`` type from ibis) + Convert a "bars list thing" (``BarDataList`` type from ibis) into a numpy struct array. 
''' @@ -284,6 +270,27 @@ def bars_to_np(bars: list) -> np.ndarray: return nparr +# NOTE: pacing violations exist for higher sample rates: +# https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations +# Also see note on duration limits being lifted on 1m+ periods, +# but they say "use with discretion": +# https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd +_samplings: dict[int, tuple[str, str]] = { + 1: ( + '1 secs', + f'{int(2e3)} S', + pendulum.duration(seconds=2e3), + ), + # TODO: benchmark >1 D duration on query to see if + # throughput can be made faster during backfilling. + 60: ( + '1 min', + '1 D', + pendulum.duration(days=1), + ), +} + + class Client: ''' IB wrapped for our broker backend API. @@ -338,19 +345,32 @@ class Client: start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", end_dt: Union[datetime, str] = "", - sample_period_s: str = 1, # ohlc sample period - period_count: int = int(2e3), # <- max per 1s sample query + # ohlc sample period in seconds + sample_period_s: int = 1, - ) -> list[dict[str, Any]]: + # optional "duration of time" equal to the + # length of the returned history frame. + duration: Optional[str] = None, + + **kwargs, + + ) -> tuple[BarDataList, np.ndarray, pendulum.Duration]: ''' Retreive OHLCV bars for a fqsn over a range to the present. ''' + # See API docs here: + # https://interactivebrokers.github.io/tws-api/historical_data.html bars_kwargs = {'whatToShow': 'TRADES'} + bars_kwargs.update(kwargs) + bar_size, duration, dt_duration = _samplings[sample_period_s] global _enters # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}') - print(f'REQUESTING BARS {_enters} @ end={end_dt}') + print( + f"REQUESTING {duration}'s worth {bar_size} BARS\n" + f'{_enters} @ end={end_dt}"' + ) if not end_dt: end_dt = '' @@ -360,30 +380,20 @@ class Client: contract = (await self.find_contracts(fqsn))[0] bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) - # _min = min(2000*100, count) bars = await self.ib.reqHistoricalDataAsync( contract, endDateTime=end_dt, formatDate=2, - # time history length values format: - # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` - # OHLC sampling values: # 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins, # 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins, # 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M - # barSizeSetting='1 secs', + barSizeSetting=bar_size, - # durationStr='{count} S'.format(count=15000 * 5), - # durationStr='{count} D'.format(count=1), - # barSizeSetting='5 secs', - - durationStr='{count} S'.format(count=period_count), - # barSizeSetting='5 secs', - barSizeSetting='1 secs', - - # barSizeSetting='1 min', + # time history length values format: + # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` + durationStr=duration, # always use extended hours useRTH=False, @@ -394,11 +404,21 @@ class Client: # whatToShow='TRADES', ) if not bars: - # TODO: raise underlying error here - raise ValueError(f"No bars retreived for {fqsn}?") + # NOTE: there's 2 cases here to handle (and this should be + # read alongside the implementation of + # ``.reqHistoricalDataAsync()``): + # - no data is returned for the period likely due to + # a weekend, holiday or other non-trading period prior to + # ``end_dt`` which exceeds the ``duration``, + # - a timeout occurred in which case insync internals return + # an empty list thing with bars.clear()... 
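# NOTE: the empty-frame return below keeps the same 3-tuple shape as
# the happy path, ``(bars, nparr, dt_duration)``, so the ``trio``-side
# caller (``get_bars()`` in ``ib/feed.py``) can detect the blank frame
# via ``if not bars:`` and step ``end_dt`` back by ``dt_duration``
# before retrying.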
+ return [], np.empty(0), dt_duration + # TODO: we could maybe raise ``NoData`` instead if we + # rewrite the method in the first case? right now there's no + # way to detect a timeout. nparr = bars_to_np(bars) - return bars, nparr + return bars, nparr, dt_duration async def con_deats( self, @@ -463,7 +483,7 @@ class Client: self, pattern: str, # how many contracts to search "up to" - upto: int = 6, + upto: int = 16, asdicts: bool = True, ) -> dict[str, ContractDetails]: @@ -498,6 +518,16 @@ class Client: exch = tract.exchange if exch not in _exch_skip_list: + + # try to lookup any contracts from our adhoc set + # since often the exchange/venue is named slightly + # different (eg. BRR.CMECRYPTO` instead of just + # `.CME`). + info = _adhoc_symbol_map.get(sym) + if info: + con_kwargs, bars_kwargs = info + exch = con_kwargs['exchange'] + # try get all possible contracts for symbol as per, # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut con = ibis.Future( @@ -748,11 +778,14 @@ class Client: async def get_head_time( self, - contract: Contract, - ) -> datetime: - """Return the first datetime stamp for ``contract``. + fqsn: str, - """ + ) -> datetime: + ''' + Return the first datetime stamp for ``contract``. + + ''' + contract = (await self.find_contracts(fqsn))[0] return await self.ib.reqHeadTimeStampAsync( contract, whatToShow='TRADES', @@ -822,9 +855,7 @@ class Client: # async to be consistent for the client proxy, and cuz why not. def submit_limit( self, - # ignored since ib doesn't support defining your - # own order id - oid: str, + oid: str, # ignored since doesn't support defining your own symbol: str, price: float, action: str, @@ -840,6 +871,9 @@ class Client: ''' Place an order and return integer request id provided by client. + Relevant docs: + - https://interactivebrokers.github.io/tws-api/order_limitations.html + ''' try: contract = self._contracts[symbol] @@ -865,6 +899,9 @@ class Client: optOutSmartRouting=True, routeMarketableToBbo=True, designatedLocation='SMART', + # TODO: make all orders GTC? + # https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba + # goodTillDate=f"yyyyMMdd-HH:mm:ss", ), ) except AssertionError: # errrg insync.. @@ -1066,6 +1103,7 @@ async def load_aio_clients( # retry a few times to get the client going.. connect_retries: int = 3, connect_timeout: float = 0.5, + disconnect_on_exit: bool = True, ) -> dict[str, Client]: ''' @@ -1207,10 +1245,11 @@ async def load_aio_clients( finally: # TODO: for re-scans we'll want to not teardown clients which # are up and stable right? 
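# NOTE: teardown is now gated on the ``disconnect_on_exit`` flag added
# to ``load_aio_clients()`` above; long-lived feed tasks (eg.
# ``_setup_quote_stream()`` in ``ib/feed.py``, which passes
# ``disconnect_on_exit=False``) can thus keep their ib connection
# alive across task exit.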
- for acct, client in _accounts2clients.items(): - log.info(f'Disconnecting {acct}@{client}') - client.ib.disconnect() - _client_cache.pop((host, port), None) + if disconnect_on_exit: + for acct, client in _accounts2clients.items(): + log.info(f'Disconnecting {acct}@{client}') + client.ib.disconnect() + _client_cache.pop((host, port), None) async def load_clients_for_trio( diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index f9dd91ea..daf9a703 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -305,7 +305,7 @@ async def update_ledger_from_api_trades( entry['listingExchange'] = pexch conf = get_config() - entries = trades_to_ledger_entries( + entries = api_trades_to_ledger_entries( conf['accounts'].inverse, trade_entries, ) @@ -362,7 +362,7 @@ async def update_and_audit_msgs( # if ib reports a lesser pp it's not as bad since we can # presume we're at least not more in the shit then we # thought. - if diff: + if diff and pikersize: reverse_split_ratio = pikersize / ibsize split_ratio = 1/reverse_split_ratio @@ -372,6 +372,7 @@ async def update_and_audit_msgs( entry = f'split_ratio = 1/{int(reverse_split_ratio)}' raise ValueError( + # log.error( f'POSITION MISMATCH ib <-> piker ledger:\n' f'ib: {ibppmsg}\n' f'piker: {msg}\n' @@ -1122,18 +1123,16 @@ def norm_trade_records( continue # timestamping is way different in API records + dtstr = record.get('datetime') date = record.get('date') - if not date: - # probably a flex record with a wonky non-std timestamp.. - date, ts = record['dateTime'].split(';') - dt = pendulum.parse(date) - ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' - tsdt = pendulum.parse(ts) - dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) + flex_dtstr = record.get('dateTime') - else: - # epoch_dt = pendulum.from_timestamp(record.get('time')) - dt = pendulum.parse(date) + if dtstr or date: + dt = pendulum.parse(dtstr or date) + + elif flex_dtstr: + # probably a flex record with a wonky non-std timestamp.. + dt = parse_flex_dt(record['dateTime']) # special handling of symbol extraction from # flex records using some ad-hoc schema parsing. @@ -1182,69 +1181,58 @@ def norm_trade_records( return {r.tid: r for r in records} -def trades_to_ledger_entries( +def parse_flex_dt( + record: str, +) -> pendulum.datetime: + date, ts = record.split(';') + dt = pendulum.parse(date) + ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' + tsdt = pendulum.parse(ts) + return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) + + +def api_trades_to_ledger_entries( accounts: bidict, trade_entries: list[object], - source_type: str = 'api', ) -> dict: ''' - Convert either of API execution objects or flex report - entry objects into ``dict`` form, pretty much straight up - without modification. + Convert API execution objects entry objects into ``dict`` form, + pretty much straight up without modification except add + a `pydatetime` field from the parsed timestamp. ''' trades_by_account = {} - for t in trade_entries: - if source_type == 'flex': - entry = t.__dict__ + # NOTE: example of schema we pull from the API client. + # { + # 'commissionReport': CommissionReport(... + # 'contract': {... + # 'execution': Execution(... + # 'time': 1654801166.0 + # } - # XXX: LOL apparently ``toml`` has a bug - # where a section key error will show up in the write - # if you leave a table key as an `int`? So i guess - # cast to strs for all keys.. + # flatten all sub-dicts and values into one top level entry. 
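# Roughly, the resulting flat entry carries (at least) the fields
# accessed below -- ``execId``, ``time``, ``acctNumber`` -- plus the
# ``datetime``/``pydatetime`` values added right after parsing; the
# ``pydatetime`` field is later popped when sorting entries by time.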
+ entry = {} + for section, val in t.items(): + match section: + case 'contract' | 'execution' | 'commissionReport': + # sub-dict cases + entry.update(val) - # oddly for some so-called "BookTrade" entries - # this field seems to be blank, no cuckin clue. - # trade['ibExecID'] - tid = str(entry.get('ibExecID') or entry['tradeID']) - # date = str(entry['tradeDate']) + case 'time': + # ib has wack ns timestamps, or is that us? + continue - # XXX: is it going to cause problems if a account name - # get's lost? The user should be able to find it based - # on the actual exec history right? - acctid = accounts[str(entry['accountId'])] + case _: + entry[section] = val - elif source_type == 'api': - # NOTE: example of schema we pull from the API client. - # { - # 'commissionReport': CommissionReport(... - # 'contract': {... - # 'execution': Execution(... - # 'time': 1654801166.0 - # } - - # flatten all sub-dicts and values into one top level entry. - entry = {} - for section, val in t.items(): - match section: - case 'contract' | 'execution' | 'commissionReport': - # sub-dict cases - entry.update(val) - - case 'time': - # ib has wack ns timestamps, or is that us? - continue - - case _: - entry[section] = val - - tid = str(entry['execId']) - dt = pendulum.from_timestamp(entry['time']) - # TODO: why isn't this showing seconds in the str? - entry['date'] = str(dt) - acctid = accounts[entry['acctNumber']] + tid = str(entry['execId']) + dt = pendulum.from_timestamp(entry['time']) + # TODO: why isn't this showing seconds in the str? + entry['pydatetime'] = dt + entry['datetime'] = str(dt) + acctid = accounts[entry['acctNumber']] if not tid: # this is likely some kind of internal adjustment @@ -1262,6 +1250,73 @@ def trades_to_ledger_entries( acctid, {} )[tid] = entry + # sort entries in output by python based datetime + for acctid in trades_by_account: + trades_by_account[acctid] = dict(sorted( + trades_by_account[acctid].items(), + key=lambda entry: entry[1].pop('pydatetime'), + )) + + return trades_by_account + + +def flex_records_to_ledger_entries( + accounts: bidict, + trade_entries: list[object], + +) -> dict: + ''' + Convert flex report entry objects into ``dict`` form, pretty much + straight up without modification except add a `pydatetime` field + from the parsed timestamp. + + ''' + trades_by_account = {} + for t in trade_entries: + entry = t.__dict__ + + # XXX: LOL apparently ``toml`` has a bug + # where a section key error will show up in the write + # if you leave a table key as an `int`? So i guess + # cast to strs for all keys.. + + # oddly for some so-called "BookTrade" entries + # this field seems to be blank, no cuckin clue. + # trade['ibExecID'] + tid = str(entry.get('ibExecID') or entry['tradeID']) + # date = str(entry['tradeDate']) + + # XXX: is it going to cause problems if a account name + # get's lost? The user should be able to find it based + # on the actual exec history right? + acctid = accounts[str(entry['accountId'])] + + # probably a flex record with a wonky non-std timestamp.. + dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime']) + entry['datetime'] = str(dt) + + if not tid: + # this is likely some kind of internal adjustment + # transaction, likely one of the following: + # - an expiry event that will show a "book trade" indicating + # some adjustment to cash balances: zeroing or itm settle. 
+ # - a manual cash balance position adjustment likely done by + # the user from the accounts window in TWS where they can + # manually set the avg price and size: + # https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST + log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') + continue + + trades_by_account.setdefault( + acctid, {} + )[tid] = entry + + for acctid in trades_by_account: + trades_by_account[acctid] = dict(sorted( + trades_by_account[acctid].items(), + key=lambda entry: entry[1]['pydatetime'], + )) + return trades_by_account @@ -1308,15 +1363,16 @@ def load_flex_trades( ln = len(trade_entries) log.info(f'Loaded {ln} trades from flex query') - trades_by_account = trades_to_ledger_entries( - # get reverse map to user account names - conf['accounts'].inverse, + trades_by_account = flex_records_to_ledger_entries( + conf['accounts'].inverse, # reverse map to user account names trade_entries, - source_type='flex', ) + ledger_dict: Optional[dict] = None + for acctid in trades_by_account: trades_by_id = trades_by_account[acctid] + with open_trade_ledger('ib', acctid) as ledger_dict: tid_delta = set(trades_by_id) - set(ledger_dict) log.info( @@ -1324,9 +1380,11 @@ def load_flex_trades( f'{pformat(tid_delta)}' ) if tid_delta: - ledger_dict.update( - {tid: trades_by_id[tid] for tid in tid_delta} - ) + sorted_delta = dict(sorted( + {tid: trades_by_id[tid] for tid in tid_delta}.items(), + key=lambda entry: entry[1].pop('pydatetime'), + )) + ledger_dict.update(sorted_delta) return ledger_dict diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 18981f60..53910f38 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -22,6 +22,7 @@ import asyncio from contextlib import asynccontextmanager as acm from dataclasses import asdict from datetime import datetime +from functools import partial from math import isnan import time from typing import ( @@ -38,8 +39,11 @@ import tractor import trio from trio_typing import TaskStatus -from piker.data._sharedmem import ShmArray -from .._util import SymbolNotFound, NoData +from .._util import ( + NoData, + DataUnavailable, + SymbolNotFound, +) from .api import ( # _adhoc_futes_set, con2fqsn, @@ -103,7 +107,7 @@ async def open_data_client() -> MethodProxy: @acm async def open_history_client( - symbol: str, + fqsn: str, ) -> tuple[Callable, int]: ''' @@ -111,26 +115,65 @@ async def open_history_client( that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. ''' + # TODO: + # - add logic to handle tradable hours and only grab + # valid bars in the range? + # - we want to avoid overrunning the underlying shm array buffer and + # we should probably calc the number of calls to make depending on + # that until we have the `marketstore` daemon in place in which case + # the shm size will be driven by user config and available sys + # memory. + async with open_data_client() as proxy: + max_timeout: float = 2. 
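# per-request latency stats: ``mean``/``count`` accumulate per-frame
# query latency which is printed after each successful ``get_bars()``
# call below; ``max_timeout`` is declared but the latency guard that
# would use it is currently commented out.
#
# As a rough sketch of the sizing TODO above (hypothetical helper, not
# defined anywhere in this patch), the number of frame queries needed
# to fill the shm buffer could be estimated from the per-frame
# duration in ``_samplings`` (eg. 2000s worth of 1s bars per query):
#
#   def est_frame_queries(shm_len: int, frame_size_s: int = 2000) -> int:
#       # each history query returns ~``frame_size_s`` rows of 1s bars
#       return max(1, shm_len // frame_size_s)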
+ mean: float = 0 + count: int = 0 + + head_dt = await proxy.get_head_time(fqsn=fqsn) + async def get_hist( + timeframe: float, end_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None, ) -> tuple[np.ndarray, str]: + nonlocal max_timeout, mean, count - out, fails = await get_bars(proxy, symbol, end_dt=end_dt) + query_start = time.time() + out, timedout = await get_bars( + proxy, + fqsn, + timeframe, + end_dt=end_dt, + ) + latency = time.time() - query_start + if ( + not timedout + # and latency <= max_timeout + ): + count += 1 + mean += latency / count + print( + f'HISTORY FRAME QUERY LATENCY: {latency}\n' + f'mean: {mean}' + ) - # TODO: add logic here to handle tradable hours and only grab - # valid bars in the range - if out is None: + if ( + out is None + ): # could be trying to retreive bars over weekend log.error(f"Can't grab bars starting at {end_dt}!?!?") raise NoData( f'{end_dt}', - frame_size=2000, + # frame_size=2000, ) + if ( + end_dt and end_dt <= head_dt + ): + raise DataUnavailable(f'First timestamp is {head_dt}') + bars, bars_array, first_dt, last_dt = out # volume cleaning since there's -ve entries, @@ -145,7 +188,7 @@ async def open_history_client( # quite sure why.. needs some tinkering and probably # a lookthrough of the ``ib_insync`` machinery, for eg. maybe # we have to do the batch queries on the `asyncio` side? - yield get_hist, {'erlangs': 1, 'rate': 6} + yield get_hist, {'erlangs': 1, 'rate': 3} _pacing: str = ( @@ -154,261 +197,287 @@ _pacing: str = ( ) +async def wait_on_data_reset( + proxy: MethodProxy, + reset_type: str = 'data', + timeout: float = 16, + + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + trio.Event, + ] + ] = trio.TASK_STATUS_IGNORED, +) -> bool: + + # TODO: we might have to put a task lock around this + # method.. + hist_ev = proxy.status_event( + 'HMDS data farm connection is OK:ushmds' + ) + + # XXX: other event messages we might want to try and + # wait for but i wasn't able to get any of this + # reliable.. + # reconnect_start = proxy.status_event( + # 'Market data farm is connecting:usfuture' + # ) + # live_ev = proxy.status_event( + # 'Market data farm connection is OK:usfuture' + # ) + # try to wait on the reset event(s) to arrive, a timeout + # will trigger a retry up to 6 times (for now). + + done = trio.Event() + with trio.move_on_after(timeout) as cs: + + task_status.started((cs, done)) + + log.warning('Sending DATA RESET request') + res = await data_reset_hack(reset_type=reset_type) + + if not res: + log.warning( + 'NO VNC DETECTED!\n' + 'Manually press ctrl-alt-f on your IB java app' + ) + done.set() + return False + + # TODO: not sure if waiting on other events + # is all that useful here or not. + # - in theory you could wait on one of the ones above first + # to verify the reset request was sent? + # - we need the same for real-time quote feeds which can + # sometimes flake out and stop delivering.. + for name, ev in [ + ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + done.set() + return True + + if cs.cancel_called: + log.warning( + 'Data reset task canceled?' + ) + + done.set() + return False + + +_data_resetter_task: trio.Task | None = None + + async def get_bars( proxy: MethodProxy, fqsn: str, + timeframe: int, # blank to start which tells ib to look up the latest datum end_dt: str = '', + # TODO: make this more dynamic based on measured frame rx latency? 
+ # how long before we trigger a feed reset (seconds) + feed_reset_timeout: float = 3, + + # how many days to subtract before giving up on further + # history queries for instrument, presuming that most don't + # not trade for a week XD + max_nodatas: int = 6, + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> (dict, np.ndarray): ''' Retrieve historical data from a ``trio``-side task using a ``MethoProxy``. ''' - fails = 0 - bars: Optional[list] = None - first_dt: datetime = None - last_dt: datetime = None + global _data_resetter_task + nodatas_count: int = 0 - if end_dt: - last_dt = pendulum.from_timestamp(end_dt.timestamp()) + data_cs: trio.CancelScope | None = None + result: tuple[ + ibis.objects.BarDataList, + np.ndarray, + datetime, + datetime, + ] | None = None + result_ready = trio.Event() - for _ in range(10): - try: - out = await proxy.bars( - fqsn=fqsn, - end_dt=end_dt, - ) - if out: - bars, bars_array = out + async def query(): + nonlocal result, data_cs, end_dt, nodatas_count + while True: + try: + out = await proxy.bars( + fqsn=fqsn, + end_dt=end_dt, + sample_period_s=timeframe, - else: - await tractor.breakpoint() - - if bars_array is None: - raise SymbolNotFound(fqsn) - - first_dt = pendulum.from_timestamp( - bars[0].date.timestamp()) - - last_dt = pendulum.from_timestamp( - bars[-1].date.timestamp()) - - time = bars_array['time'] - assert time[-1] == last_dt.timestamp() - assert time[0] == first_dt.timestamp() - log.info( - f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' - ) - - return (bars, bars_array, first_dt, last_dt), fails - - except RequestError as err: - msg = err.message - - if 'No market data permissions for' in msg: - # TODO: signalling for no permissions searches - raise NoData( - f'Symbol: {fqsn}', + # ideally we cancel the request just before we + # cancel on the ``trio``-side and trigger a data + # reset hack.. the problem is there's no way (with + # current impl) to detect a cancel case. + # timeout=timeout, ) - - elif ( - err.code == 162 and - 'HMDS query returned no data' in err.message - ): - # XXX: this is now done in the storage mgmt layer - # and we shouldn't implicitly decrement the frame dt - # index since the upper layer may be doing so - # concurrently and we don't want to be delivering frames - # that weren't asked for. - log.warning( - f'NO DATA found ending @ {end_dt}\n' - ) - - # try to decrement start point and look further back - # end_dt = last_dt = last_dt.subtract(seconds=2000) - - raise NoData( - f'Symbol: {fqsn}', - frame_size=2000, - ) - - # elif ( - # err.code == 162 and - # 'Trading TWS session is connected from a different IP - # address' in err.message - # ): - # log.warning("ignoring ip address warning") - # continue - - elif _pacing in msg: - - log.warning( - 'History throttle rate reached!\n' - 'Resetting farms with `ctrl-alt-f` hack\n' - ) - # TODO: we might have to put a task lock around this - # method.. - hist_ev = proxy.status_event( - 'HMDS data farm connection is OK:ushmds' - ) - - # XXX: other event messages we might want to try and - # wait for but i wasn't able to get any of this - # reliable.. - # reconnect_start = proxy.status_event( - # 'Market data farm is connecting:usfuture' - # ) - # live_ev = proxy.status_event( - # 'Market data farm connection is OK:usfuture' - # ) - - # try to wait on the reset event(s) to arrive, a timeout - # will trigger a retry up to 6 times (for now). 
- tries: int = 2 - timeout: float = 10 - - # try 3 time with a data reset then fail over to - # a connection reset. - for i in range(1, tries): - - log.warning('Sending DATA RESET request') - await data_reset_hack(reset_type='data') - - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? - ('history', hist_ev), - ]: - await ev.wait() - log.info(f"{name} DATA RESET") - break - - if cs.cancelled_caught: - fails += 1 - log.warning( - f'Data reset {name} timeout, retrying {i}.' - ) - - continue - else: - - log.warning('Sending CONNECTION RESET') - res = await data_reset_hack(reset_type='connection') - if not res: - log.warning( - 'NO VNC DETECTED!\n' - 'Manually press ctrl-alt-f on your IB java app' - ) - # break - - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? - ('history', hist_ev), - ]: - await ev.wait() - log.info(f"{name} DATA RESET") - - if cs.cancelled_caught: - fails += 1 - log.warning('Data CONNECTION RESET timeout!?') - - else: - raise - - return None, None - # else: # throttle wasn't fixed so error out immediately - # raise _err - - -async def backfill_bars( - - fqsn: str, - shm: ShmArray, # type: ignore # noqa - - # TODO: we want to avoid overrunning the underlying shm array buffer - # and we should probably calc the number of calls to make depending - # on that until we have the `marketstore` daemon in place in which - # case the shm size will be driven by user config and available sys - # memory. - count: int = 16, - - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - - TODO: avoid pacing constraints: - https://github.com/pikers/piker/issues/128 - - ''' - # last_dt1 = None - last_dt = None - - with trio.CancelScope() as cs: - - async with open_data_client() as proxy: - - out, fails = await get_bars(proxy, fqsn) - - if out is None: - raise RuntimeError("Could not pull currrent history?!") - - (first_bars, bars_array, first_dt, last_dt) = out - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - last_dt = first_dt - - # write historical data to buffer - shm.push(bars_array) - - task_status.started(cs) - - i = 0 - while i < count: - - out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) - if out is None: - # could be trying to retreive bars over weekend - # TODO: add logic here to handle tradable hours and - # only grab valid bars in the range - log.error(f"Can't grab bars starting at {first_dt}!?!?") + raise NoData(f'{end_dt}') - # XXX: get_bars() should internally decrement dt by - # 2k seconds and try again. + bars, bars_array, dt_duration = out + + if not bars: + log.warning( + f'History is blank for {dt_duration} from {end_dt}' + ) + end_dt -= dt_duration continue - (first_bars, bars_array, first_dt, last_dt) = out - # last_dt1 = last_dt - # last_dt = first_dt + if bars_array is None: + raise SymbolNotFound(fqsn) - # volume cleaning since there's -ve entries, - # wood luv to know what crookery that is.. 
- vlm = bars_array['volume'] - vlm[vlm < 0] = 0 + first_dt = pendulum.from_timestamp( + bars[0].date.timestamp()) - # TODO we should probably dig into forums to see what peeps - # think this data "means" and then use it as an indicator of - # sorts? dinkus has mentioned that $vlms for the day dont' - # match other platforms nor the summary stat tws shows in - # the monitor - it's probably worth investigating. + last_dt = pendulum.from_timestamp( + bars[-1].date.timestamp()) - shm.push(bars_array, prepend=True) - i += 1 + time = bars_array['time'] + assert time[-1] == last_dt.timestamp() + assert time[0] == first_dt.timestamp() + log.info( + f'{len(bars)} bars retreived {first_dt} -> {last_dt}' + ) + + if data_cs: + data_cs.cancel() + + result = (bars, bars_array, first_dt, last_dt) + + # signal data reset loop parent task + result_ready.set() + + return result + + except RequestError as err: + msg = err.message + + if 'No market data permissions for' in msg: + # TODO: signalling for no permissions searches + raise NoData( + f'Symbol: {fqsn}', + ) + + elif err.code == 162: + if ( + 'HMDS query returned no data' in msg + ): + # XXX: this is now done in the storage mgmt + # layer and we shouldn't implicitly decrement + # the frame dt index since the upper layer may + # be doing so concurrently and we don't want to + # be delivering frames that weren't asked for. + # try to decrement start point and look further back + # end_dt = end_dt.subtract(seconds=2000) + logmsg = "SUBTRACTING DAY from DT index" + if end_dt is not None: + end_dt = end_dt.subtract(days=1) + elif end_dt is None: + end_dt = pendulum.now().subtract(days=1) + + log.warning( + f'NO DATA found ending @ {end_dt}\n' + + logmsg + ) + + if nodatas_count >= max_nodatas: + raise DataUnavailable( + f'Presuming {fqsn} has no further history ' + f'after {max_nodatas} tries..' + ) + + nodatas_count += 1 + continue + + elif 'API historical data query cancelled' in err.message: + log.warning( + 'Query cancelled by IB (:eyeroll:):\n' + f'{err.message}' + ) + continue + elif ( + 'Trading TWS session is connected from a different IP' + in err.message + ): + log.warning("ignoring ip address warning") + continue + + # XXX: more or less same as above timeout case + elif _pacing in msg: + log.warning( + 'History throttle rate reached!\n' + 'Resetting farms with `ctrl-alt-f` hack\n' + ) + + # cancel any existing reset task + if data_cs: + data_cs.cancel() + + # spawn new data reset task + data_cs, reset_done = await nurse.start( + partial( + wait_on_data_reset, + proxy, + timeout=float('inf'), + reset_type='connection' + ) + ) + continue + + else: + raise + + # TODO: make this global across all history task/requests + # such that simultaneous symbol queries don't try data resettingn + # too fast.. + unset_resetter: bool = False + async with trio.open_nursery() as nurse: + + # start history request that we allow + # to run indefinitely until a result is acquired + nurse.start_soon(query) + + # start history reset loop which waits up to the timeout + # for a result before triggering a data feed reset. + while not result_ready.is_set(): + + with trio.move_on_after(feed_reset_timeout): + await result_ready.wait() + break + + if _data_resetter_task: + # don't double invoke the reset hack if another + # requester task already has it covered. 
+ continue + else: + _data_resetter_task = trio.lowlevel.current_task() + unset_resetter = True + + # spawn new data reset task + data_cs, reset_done = await nurse.start( + partial( + wait_on_data_reset, + proxy, + timeout=float('inf'), + ) + ) + # sync wait on reset to complete + await reset_done.wait() + + _data_resetter_task = None if unset_resetter else _data_resetter_task + return result, data_cs is not None asset_type_map = { @@ -466,7 +535,9 @@ async def _setup_quote_stream( to_trio.send_nowait(None) - async with load_aio_clients() as accts2clients: + async with load_aio_clients( + disconnect_on_exit=False, + ) as accts2clients: caccount_name, client = get_preferred_data_client(accts2clients) contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) @@ -512,10 +583,11 @@ async def _setup_quote_stream( # Manually do the dereg ourselves. teardown() except trio.WouldBlock: - log.warning( - f'channel is blocking symbol feed for {symbol}?' - f'\n{to_trio.statistics}' - ) + # log.warning( + # f'channel is blocking symbol feed for {symbol}?' + # f'\n{to_trio.statistics}' + # ) + pass # except trio.WouldBlock: # # for slow debugging purposes to avoid clobbering prompt @@ -545,7 +617,8 @@ async def open_aio_quote_stream( from_aio = _quote_streams.get(symbol) if from_aio: - # if we already have a cached feed deliver a rx side clone to consumer + # if we already have a cached feed deliver a rx side clone + # to consumer async with broadcast_receiver( from_aio, 2**6, @@ -736,67 +809,97 @@ async def stream_quotes( await trio.sleep_forever() return # we never expect feed to come up? - async with open_aio_quote_stream( - symbol=sym, - contract=con, - ) as stream: - - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] - - task_status.started((init_msgs, first_quote)) - - async with aclosing(stream): - if syminfo.get('no_vlm', False): - - # generally speaking these feeds don't - # include vlm data. - atype = syminfo['asset_type'] - log.info( - f'Non-vlm asset {sym}@{atype}, skipping quote poll...' - ) - - else: - # wait for real volume on feed (trading might be closed) - while True: - ticker = await stream.receive() - - # for a real volume contract we rait for the first - # "real" trade to take place - if ( - # not calc_price - # and not ticker.rtTime - not ticker.rtTime - ): - # spin consuming tickers until we get a real - # market datum - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] - - # XXX: this works because we don't use - # ``aclosing()`` above? 
- break - - quote = normalize(ticker) - log.debug(f"First ticker received {quote}") - - # tell caller quotes are now coming in live - feed_is_live.set() - - # last = time.time() - async for ticker in stream: - quote = normalize(ticker) - await send_chan.send({quote['fqsn']: quote}) - + cs: Optional[trio.CancelScope] = None + startup: bool = True + while ( + startup + or cs.cancel_called + ): + with trio.CancelScope() as cs: + async with ( + trio.open_nursery() as nurse, + open_aio_quote_stream( + symbol=sym, + contract=con, + ) as stream, + ): # ugh, clear ticks since we've consumed them - ticker.ticks = [] - # last = time.time() + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] + + # only on first entry at feed boot up + if startup: + startup = False + task_status.started((init_msgs, first_quote)) + + # start a stream restarter task which monitors the + # data feed event. + async def reset_on_feed(): + + # TODO: this seems to be surpressed from the + # traceback in ``tractor``? + # assert 0 + + rt_ev = proxy.status_event( + 'Market data farm connection is OK:usfarm' + ) + await rt_ev.wait() + cs.cancel() # cancel called should now be set + + nurse.start_soon(reset_on_feed) + + async with aclosing(stream): + if syminfo.get('no_vlm', False): + + # generally speaking these feeds don't + # include vlm data. + atype = syminfo['asset_type'] + log.info( + f'No-vlm {sym}@{atype}, skipping quote poll' + ) + + else: + # wait for real volume on feed (trading might be + # closed) + while True: + ticker = await stream.receive() + + # for a real volume contract we rait for + # the first "real" trade to take place + if ( + # not calc_price + # and not ticker.rtTime + not ticker.rtTime + ): + # spin consuming tickers until we + # get a real market datum + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first volume tick") + # ugh, clear ticks since we've + # consumed them (ahem, ib_insync is + # truly stateful trash) + ticker.ticks = [] + + # XXX: this works because we don't use + # ``aclosing()`` above? + break + + quote = normalize(ticker) + log.debug(f"First ticker received {quote}") + + # tell caller quotes are now coming in live + feed_is_live.set() + + # last = time.time() + async for ticker in stream: + quote = normalize(ticker) + await send_chan.send({quote['fqsn']: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] + # last = time.time() async def data_reset_hack( @@ -904,7 +1007,14 @@ async def open_symbol_search( except trio.WouldBlock: pass - if not pattern or pattern.isspace(): + if ( + not pattern + or pattern.isspace() + + # XXX: not sure if this is a bad assumption but it + # seems to make search snappier? 
+ or len(pattern) < 1 + ): log.warning('empty pattern received, skipping..') # TODO: *BUG* if nothing is returned here the client diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 49961b52..b0bf9821 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -259,6 +259,7 @@ async def open_history_client( queries: int = 0 async def get_ohlc( + timeframe: float, end_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None, diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 853860aa..c87f9751 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -138,25 +138,26 @@ def cli(ctx, brokers, loglevel, tl, configdir): @click.pass_obj def services(config, tl, names): - async def list_services(): + from .._daemon import open_piker_runtime - async with tractor.get_arbiter( - *_tractor_kwargs['arbiter_addr'] - ) as portal: + async def list_services(): + async with ( + open_piker_runtime( + name='service_query', + loglevel=config['loglevel'] if tl else None, + ), + tractor.get_arbiter( + *_tractor_kwargs['arbiter_addr'] + ) as portal + ): registry = await portal.run_from_ns('self', 'get_registry') json_d = {} for key, socket in registry.items(): - # name, uuid = uid host, port = socket json_d[key] = f'{host}:{port}' click.echo(f"{colorize_json(json_d)}") - tractor.run( - list_services, - name='service_query', - loglevel=config['loglevel'] if tl else None, - arbiter_addr=_tractor_kwargs['arbiter_addr'], - ) + trio.run(list_services) def _load_clis() -> None: diff --git a/piker/data/feed.py b/piker/data/feed.py index 66b540ee..e4bacfc8 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -21,16 +21,19 @@ This module is enabled for ``brokerd`` daemons. """ from __future__ import annotations -from dataclasses import dataclass, field +from contextlib import asynccontextmanager as acm +from dataclasses import ( + dataclass, + field, +) from datetime import datetime -from contextlib import asynccontextmanager from functools import partial -from pprint import pformat from types import ModuleType from typing import ( Any, - AsyncIterator, Optional, - Generator, + AsyncIterator, + Callable, + Optional, Awaitable, TYPE_CHECKING, Union, @@ -39,7 +42,6 @@ from typing import ( import trio from trio.abc import ReceiveChannel from trio_typing import TaskStatus -import trimeter import tractor from tractor.trionics import maybe_open_context import pendulum @@ -76,7 +78,6 @@ from ._sampling import ( _default_delay_s, ) from ..brokers._util import ( - NoData, DataUnavailable, ) @@ -252,6 +253,7 @@ async def start_backfill( mod: ModuleType, bfqsn: str, shm: ShmArray, + timeframe: float, last_tsdb_dt: Optional[datetime] = None, storage: Optional[Storage] = None, @@ -262,12 +264,19 @@ async def start_backfill( ) -> int: + hist: Callable[ + [int, datetime, datetime], + tuple[np.ndarray, str] + ] + config: dict[str, int] async with mod.open_history_client(bfqsn) as (hist, config): # get latest query's worth of history all the way # back to what is recorded in the tsdb - array, start_dt, end_dt = await hist(end_dt=None) - + array, start_dt, end_dt = await hist( + timeframe, + end_dt=None, + ) times = array['time'] # sample period step size in seconds @@ -276,7 +285,7 @@ async def start_backfill( - pendulum.from_timestamp(times[-2]) ).seconds - # "frame"'s worth of sample period steps in seconds + # frame's worth of sample-period-steps, in seconds frame_size_s = len(array) * step_size_s to_push = diff_history( @@ -287,8 +296,11 @@ async 
def start_backfill( ) log.info(f'Pushing {to_push.size} to shm!') - shm.push(to_push) + shm.push(to_push, prepend=True) + # TODO: *** THIS IS A BUG *** + # we need to only broadcast to subscribers for this fqsn.. + # otherwise all fsps get reset on every chart.. for delay_s in sampler.subscribers: await broadcast(delay_s) @@ -296,7 +308,11 @@ async def start_backfill( bf_done = trio.Event() # let caller unblock and deliver latest history frame - task_status.started((start_dt, end_dt, bf_done)) + task_status.started(( + start_dt, + end_dt, + bf_done, + )) # based on the sample step size, maybe load a certain amount history if last_tsdb_dt is None: @@ -304,14 +320,14 @@ async def start_backfill( raise ValueError( '`piker` only needs to support 1m and 1s sampling ' 'but ur api is trying to deliver a longer ' - f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' - 'do dat brudder.' + f'timeframe of {step_size_s} seconds..\n' + 'So yuh.. dun do dat brudder.' ) # when no tsdb "last datum" is provided, we just load # some near-term history. periods = { - 1: {'seconds': 4000}, + 1: {'days': 1}, 60: {'days': 14}, } @@ -319,96 +335,61 @@ async def start_backfill( # do a decently sized backfill and load it into storage. periods = { 1: {'days': 6}, - 60: {'years': 2}, + 60: {'years': 6}, } kwargs = periods[step_size_s] + + # NOTE: manually set the "latest" datetime which we intend to + # backfill history "until" so as to adhere to the history + # settings above when the tsdb is detected as being empty. last_tsdb_dt = start_dt.subtract(**kwargs) # configure async query throttling - erlangs = config.get('erlangs', 1) - rate = config.get('rate', 1) - frames = {} + # rate = config.get('rate', 1) + # XXX: legacy from ``trimeter`` code but unsupported now. + # erlangs = config.get('erlangs', 1) - def iter_dts(start: datetime): + # avoid duplicate history frames with a set of datetime frame + # starts. + starts: set[datetime] = set() - while True: - - hist_period = pendulum.period( - start, - last_tsdb_dt, - ) - dtrange = list(hist_period.range('seconds', frame_size_s)) - log.debug(f'New datetime index:\n{pformat(dtrange)}') - - for end_dt in dtrange: - log.info(f'Yielding next frame start {end_dt}') - start = yield end_dt - - # if caller sends a new start date, reset to that - if start is not None: - log.warning(f'Resetting date range: {start}') - break - else: - # from while - return - - # pull new history frames until we hit latest - # already in the tsdb or a max count. - count = 0 - - # NOTE: when gaps are detected in the retreived history (by - # comparisor of the end - start versus the expected "frame size" - # in seconds) we need a way to alert the async request code not - # to continue to query for data "within the gap". This var is - # set in such cases such that further requests in that period - # are discarded and further we reset the "datetimem query frame - # index" in such cases to avoid needless noop requests. - earliest_end_dt: Optional[datetime] = start_dt - - async def get_ohlc_frame( - input_end_dt: datetime, - iter_dts_gen: Generator[datetime], - - ) -> np.ndarray: - - nonlocal count, frames, earliest_end_dt, frame_size_s - count += 1 - - if input_end_dt > earliest_end_dt: - # if a request comes in for an inter-gap frame we - # discard it since likely this request is still - # lingering from before the reset of ``iter_dts()`` via - # ``.send()`` below. 
- log.info(f'Discarding request history ending @ {input_end_dt}') - - # signals to ``trimeter`` loop to discard and - # ``continue`` in it's schedule loop. - return None + # inline sequential loop where we simply pass the + # last retrieved start dt to the next request as + # it's end dt. + while start_dt > last_tsdb_dt: + log.info( + f'Requesting {step_size_s}s frame ending in {start_dt}' + ) try: - log.info( - f'Requesting {step_size_s}s frame ending in {input_end_dt}' + array, next_start_dt, end_dt = await hist( + timeframe, + end_dt=start_dt, ) - array, start_dt, end_dt = await hist(end_dt=input_end_dt) - assert array['time'][0] == start_dt.timestamp() - except NoData: + # broker says there never was or is no more history to pull + except DataUnavailable: log.warning( - f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?' + f'NO-MORE-DATA: backend {mod.name} halted history!?' ) - return None # discard signal - - except DataUnavailable as duerr: - # broker is being a bish and we can't pull - # any more.. - log.warning('backend halted on data deliver !?!?') # ugh, what's a better way? # TODO: fwiw, we probably want a way to signal a throttle # condition (eg. with ib) so that we can halt the # request loop until the condition is resolved? - return duerr + return + + if next_start_dt in starts: + start_dt = min(starts) + print("SKIPPING DUPLICATE FRAME @ {next_start_dt}") + continue + + # only update new start point if not-yet-seen + start_dt = next_start_dt + starts.add(start_dt) + + assert array['time'][0] == start_dt.timestamp() diff = end_dt - start_dt frame_time_diff_s = diff.seconds @@ -419,42 +400,12 @@ async def start_backfill( # XXX: query result includes a start point prior to our # expected "frame size" and thus is likely some kind of # history gap (eg. market closed period, outage, etc.) - # so indicate to the request loop that this gap is - # expected by both, - # - resetting the ``iter_dts()`` generator to start at - # the new start point delivered in this result - # - setting the non-locally scoped ``earliest_end_dt`` - # to this new value so that the request loop doesn't - # get tripped up thinking there's an out of order - # request-result condition. - + # so just report it to console for now. log.warning( f'History frame ending @ {end_dt} appears to have a gap:\n' f'{diff} ~= {frame_time_diff_s} seconds' ) - # reset dtrange gen to new start point - try: - next_end = iter_dts_gen.send(start_dt) - log.info( - f'Reset frame index to start at {start_dt}\n' - f'Was at {next_end}' - ) - - # NOTE: manually set "earliest end datetime" index-value - # to avoid the request loop getting confused about - # new frames that are earlier in history - i.e. this - # **is not** the case of out-of-order frames from - # an async batch request. - earliest_end_dt = start_dt - - except StopIteration: - # gen already terminated meaning we probably already - # exhausted it via frame requests. - log.info( - "Datetime index already exhausted, can't reset.." 
- ) - to_push = diff_history( array, start_dt, @@ -464,194 +415,288 @@ async def start_backfill( ln = len(to_push) if ln: log.info(f'{ln} bars for {start_dt} -> {end_dt}') - frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt) - return to_push, start_dt, end_dt else: log.warning( f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' ) - return None - # initial dt index starts at the start of the first query result - idts = iter_dts(start_dt) + # bail gracefully on shm allocation overrun/full condition + try: + shm.push(to_push, prepend=True) + except ValueError: + log.info( + f'Shm buffer overrun on: {start_dt} -> {end_dt}?' + ) + break - async with trimeter.amap( - partial( - get_ohlc_frame, - # we close in the ``iter_dt()`` gen in so we can send - # reset signals as needed for gap dection in the - # history. - iter_dts_gen=idts, - ), - idts, + log.info( + f'Shm pushed {ln} frame:\n' + f'{start_dt} -> {end_dt}' + ) - capture_outcome=True, - include_value=True, + if ( + storage is not None + and write_tsdb + ): + log.info( + f'Writing {ln} frame to storage:\n' + f'{start_dt} -> {end_dt}' + ) + await storage.write_ohlcv( + f'{bfqsn}.{mod.name}', # lul.. + to_push, + timeframe, + ) - # better technical names bruv... - max_at_once=erlangs, - max_per_second=rate, + # TODO: can we only trigger this if the respective + # history in "in view"?!? - ) as outcomes: + # XXX: extremely important, there can be no checkpoints + # in the block above to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... + for delay_s in sampler.subscribers: + await broadcast(delay_s) - # Then iterate over the return values, as they become available - # (i.e., not necessarily in the original order) - async for input_end_dt, outcome in outcomes: + # short-circuit (for now) + bf_done.set() - try: - out = outcome.unwrap() - if out is None: - # skip signal - continue +async def basic_backfill( + bus: _FeedsBus, + mod: ModuleType, + bfqsn: str, + shms: dict[int, ShmArray], - elif isinstance(out, DataUnavailable): - # no data available case signal.. so just kill - # further requests and basically just stop - # trying... - break +) -> None: - except Exception: - log.exception('uhh trimeter bail') - raise - else: - to_push, start_dt, end_dt = out + # do a legacy incremental backfill from the provider. + log.info('No TSDB (marketstored) found, doing basic backfill..') - if not len(to_push): - # diff returned no new data (i.e. we probablyl hit - # the ``last_tsdb_dt`` point). - # TODO: raise instead? - log.warning(f'No history for range {start_dt} -> {end_dt}') - continue + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + for timeframe, shm in shms.items(): + try: + await bus.nursery.start( + partial( + start_backfill, + mod, + bfqsn, + shm, + timeframe=timeframe, + ) + ) + except DataUnavailable: + # XXX: timeframe not supported for backend + continue - # pipeline-style pull frames until we need to wait for - # the next in order to arrive. 
- # i = end_dts.index(input_end_dt) - # print(f'latest end_dt {end_dt} found at index {i}') - epochs = list(reversed(sorted(frames))) - for epoch in epochs: +async def tsdb_backfill( + mod: ModuleType, + marketstore: ModuleType, + bus: _FeedsBus, + storage: Storage, + fqsn: str, + bfqsn: str, + shms: dict[int, ShmArray], - start = shm.array['time'][0] - last_shm_prepend_dt = pendulum.from_timestamp(start) - earliest_frame_queue_dt = pendulum.from_timestamp(epoch) + task_status: TaskStatus[ + tuple[ShmArray, ShmArray] + ] = trio.TASK_STATUS_IGNORED, - diff = start - epoch +) -> None: - if diff < 0: - log.warning( - 'Discarding out of order frame:\n' - f'{earliest_frame_queue_dt}' - ) - frames.pop(epoch) - continue + # TODO: this should be used verbatim for the pure + # shm backfiller approach below. + dts_per_tf: dict[int, datetime] = {} - if diff > step_size_s: + # start history anal and load missing new data via backend. + for timeframe, shm in shms.items(): + tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load( + fqsn, + timeframe=timeframe, + ) - if earliest_end_dt < earliest_frame_queue_dt: - # XXX: an expected gap was encountered (see - # logic in ``get_ohlc_frame()``, so allow - # this frame through to the storage layer. - log.warning( - f'Expected history gap of {diff}s:\n' - f'{earliest_frame_queue_dt} <- ' - f'{earliest_end_dt}' - ) + broker, symbol, expiry = unpack_fqsn(fqsn) + try: + ( + latest_start_dt, + latest_end_dt, + bf_done, + ) = await bus.nursery.start( + partial( + start_backfill, + mod, + bfqsn, + shm, + timeframe=timeframe, + last_tsdb_dt=last_tsdb_dt, + tsdb_is_up=True, + storage=storage, + ) + ) + except DataUnavailable: + # XXX: timeframe not supported for backend + dts_per_tf[timeframe] = ( + tsdb_history, + last_tsdb_dt, + None, + None, + None, + ) + continue - elif ( - erlangs > 1 - ): - # we don't yet have the next frame to push - # so break back to the async request loop - # while we wait for more async frame-results - # to arrive. - if len(frames) >= erlangs: - log.warning( - 'Frame count in async-queue is greater ' - 'then erlangs?\n' - 'There seems to be a gap between:\n' - f'{earliest_frame_queue_dt} <- ' - f'{last_shm_prepend_dt}\n' - 'Conducting manual call for frame ending: ' - f'{last_shm_prepend_dt}' - ) - ( - to_push, - start_dt, - end_dt, - ) = await get_ohlc_frame( - input_end_dt=last_shm_prepend_dt, - iter_dts_gen=idts, - ) - last_epoch = to_push['time'][-1] - diff = start - last_epoch + # tsdb_history = series.get(timeframe) + dts_per_tf[timeframe] = ( + tsdb_history, + last_tsdb_dt, + latest_start_dt, + latest_end_dt, + bf_done, + ) - if diff > step_size_s: - await tractor.breakpoint() - raise DataUnavailable( - 'An awkward frame was found:\n' - f'{start_dt} -> {end_dt}:\n{to_push}' - ) + # if len(hist_shm.array) < 2: + # TODO: there's an edge case here to solve where if the last + # frame before market close (at least on ib) was pushed and + # there was only "1 new" row pushed from the first backfill + # query-iteration, then the sample step sizing calcs will + # break upstream from here since you can't diff on at least + # 2 steps... probably should also add logic to compute from + # the tsdb series and stash that somewhere as meta data on + # the shm buffer?.. no se. 
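# NOTE: at this point ``dts_per_tf`` maps each sample period to a
# ``(tsdb_history, last_tsdb_dt, latest_start_dt, latest_end_dt,
# bf_done)`` tuple which is consumed per-timeframe by
# ``back_load_from_tsdb()`` further down, after the hist/rt shm
# handles are delivered via ``task_status.started()``.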
- else: - frames[last_epoch] = ( - to_push, start_dt, end_dt) - break + # unblock the feed bus management task + # assert len(shms[1].array) + task_status.started(( + shms[60], + shms[1], + )) - expect_end = pendulum.from_timestamp(start) - expect_start = expect_end.subtract( - seconds=frame_size_s) - log.warning( - 'waiting on out-of-order history frame:\n' - f'{expect_end - expect_start}' - ) - break + async def back_load_from_tsdb( + timeframe: int, + shm: ShmArray, + ): + ( + tsdb_history, + last_tsdb_dt, + latest_start_dt, + latest_end_dt, + bf_done, + ) = dts_per_tf[timeframe] - to_push, start_dt, end_dt = frames.pop(epoch) - ln = len(to_push) + # sync to backend history task's query/load completion + if bf_done: + await bf_done.wait() - # bail gracefully on shm allocation overrun/full condition - try: - shm.push(to_push, prepend=True) - except ValueError: - log.info( - f'Shm buffer overrun on: {start_dt} -> {end_dt}?' - ) - break + # Load tsdb history into shm buffer (for display). - log.info( - f'Shm pushed {ln} frame:\n' - f'{start_dt} -> {end_dt}' - ) - # keep track of most recent "prepended" ``start_dt`` - # both for detecting gaps and ensuring async - # frame-result order. - earliest_end_dt = start_dt + # TODO: eventually it'd be nice to not require a shm array/buffer + # to accomplish this.. maybe we can do some kind of tsdb direct to + # graphics format eventually in a child-actor? - if ( - storage is not None - and write_tsdb - ): - log.info( - f'Writing {ln} frame to storage:\n' - f'{start_dt} -> {end_dt}' - ) - await storage.write_ohlcv( - f'{bfqsn}.{mod.name}', # lul.. - to_push, - ) + # do diff against last start frame of history and only fill + # in from the tsdb an allotment that allows for most recent + # to be loaded into mem *before* tsdb data. + if last_tsdb_dt and latest_start_dt: + dt_diff_s = ( + latest_start_dt - last_tsdb_dt + ).seconds + else: + dt_diff_s = 0 - # TODO: can we only trigger this if the respective - # history in "in view"?!? - # XXX: extremely important, there can be no checkpoints - # in the block above to avoid entering new ``frames`` - # values while we're pipelining the current ones to - # memory... + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + prepend_start = shm._first.value + + # sanity check on most-recent-data loading + assert prepend_start > dt_diff_s + + if ( + len(tsdb_history) + ): + to_push = tsdb_history[:prepend_start] + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + # start=prepend_start, + field_map=marketstore.ohlc_key_map, + ) + prepend_start = shm._first.value + + # load as much from storage into shm as space will + # allow according to user's shm size settings. + last_frame_start = tsdb_history['Epoch'][0] + + while ( + shm._first.value > 0 + ): + tsdb_history = await storage.read_ohlcv( + fqsn, + end=last_frame_start, + timeframe=timeframe, + ) + if ( + not len(tsdb_history) + ): + # on empty db history + break + + time = tsdb_history['Epoch'] + frame_start = time[0] + frame_end = time[0] + print(f"LOADING MKTS HISTORY: {frame_start} - {frame_end}") + + if frame_start >= last_frame_start: + # no new data loaded was from tsdb, so we can exit. 
+ break + + prepend_start = shm._first.value + to_push = tsdb_history[:prepend_start] + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + shm.push( + to_push, + prepend=True, + field_map=marketstore.ohlc_key_map, + ) + last_frame_start = frame_start + + log.info(f'Loaded {to_push.shape} datums from storage') + + # manually trigger step update to update charts/fsps + # which need an incremental update. + # NOTE: the way this works is super duper + # un-intuitive right now: + # - the broadcaster fires a msg to the fsp subsystem. + # - fsp subsys then checks for a sample step diff and + # possibly recomputes prepended history. + # - the fsp then sends back to the parent actor + # (usually a chart showing graphics for said fsp) + # which tells the chart to conduct a manual full + # graphics loop cycle. for delay_s in sampler.subscribers: await broadcast(delay_s) - bf_done.set() + # TODO: write new data to tsdb to be ready to for next read. + + # backload from db (concurrently per timeframe) once backfilling of + # recent dat a loaded from the backend provider (see + # ``bf_done.wait()`` call). + async with trio.open_nursery() as nurse: + for timeframe, shm in shms.items(): + nurse.start_soon( + back_load_from_tsdb, + timeframe, + shm, + ) async def manage_history( @@ -660,8 +705,11 @@ async def manage_history( fqsn: str, some_data_ready: trio.Event, feed_is_live: trio.Event, + timeframe: float = 60, # in seconds - task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[ + tuple[ShmArray, ShmArray] + ] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -682,6 +730,8 @@ async def manage_history( # we expect the sub-actor to write readonly=False, ) + hist_zero_index = hist_shm.index - 1 + # TODO: history validation if not opened: raise RuntimeError( @@ -696,181 +746,94 @@ async def manage_history( # we expect the sub-actor to write readonly=False, - size=3*_secs_in_day, + size=4*_secs_in_day, ) + + # (for now) set the rt (hft) shm array with space to prepend + # only a few days worth of 1s history. + days = 3 + start_index = days*_secs_in_day + rt_shm._first.value = start_index + rt_shm._last.value = start_index + rt_zero_index = rt_shm.index - 1 + if not opened: raise RuntimeError( "Persistent shm for sym was already open?!" ) log.info('Scanning for existing `marketstored`') - - is_up = await check_for_service('marketstored') - - # for now only do backfilling if no tsdb can be found - do_legacy_backfill = not is_up and opened + tsdb_is_up = await check_for_service('marketstored') bfqsn = fqsn.replace('.' + mod.name, '') open_history_client = getattr(mod, 'open_history_client', None) assert open_history_client - if is_up and opened and open_history_client: - + if ( + tsdb_is_up + and opened + and open_history_client + ): log.info('Found existing `marketstored`') + from . import marketstore - async with marketstore.open_storage_client( - fqsn, - ) as storage: - - # TODO: this should be used verbatim for the pure - # shm backfiller approach below. - - # start history anal and load missing new data via backend. 
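[editor note] The rt buffer sizing above (4 days of 1s slots with both cursors parked 3 days in) is just reserving prepend headroom: live appends grow to the right while tsdb back-loads grow to the left. A toy stand-in for that cursor arithmetic (not the real ``ShmArray``):

```python
import numpy as np

_secs_in_day = 24 * 60 * 60


class FlatBuf:
    '''
    Toy stand-in for the shm cursor logic: a fixed-size backing
    array with ``first``/``last`` cursors that leave headroom on
    the left for prepends.
    '''
    def __init__(self, size: int, start_index: int):
        self._buf = np.zeros(size)
        self.first = self.last = start_index

    def push(self, data: np.ndarray, prepend: bool = False):
        if prepend:
            new_first = self.first - len(data)
            assert new_first >= 0, 'prepend overrun'
            self._buf[new_first:self.first] = data
            self.first = new_first
        else:
            new_last = self.last + len(data)
            assert new_last <= len(self._buf), 'append overrun'
            self._buf[self.last:new_last] = data
            self.last = new_last


# 4 days of 1s slots, cursors parked 3 days in: ~1 day of append
# space on the right, 3 days of prepend space on the left.
buf = FlatBuf(4 * _secs_in_day, 3 * _secs_in_day)
buf.push(np.ones(10))                # live appends
buf.push(np.ones(60), prepend=True)  # tsdb back-load
print(buf.first, buf.last)
```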
- series, _, last_tsdb_dt = await storage.load(fqsn) - - broker, symbol, expiry = unpack_fqsn(fqsn) - ( - latest_start_dt, - latest_end_dt, - bf_done, - ) = await bus.nursery.start( - partial( - start_backfill, - mod, - bfqsn, - hist_shm, - last_tsdb_dt=last_tsdb_dt, - tsdb_is_up=True, - storage=storage, - ) + async with ( + marketstore.open_storage_client(fqsn)as storage, + ): + hist_shm, rt_shm = await bus.nursery.start( + tsdb_backfill, + mod, + marketstore, + bus, + storage, + fqsn, + bfqsn, + { + 1: rt_shm, + 60: hist_shm, + }, ) - # if len(hist_shm.array) < 2: - # TODO: there's an edge case here to solve where if the last - # frame before market close (at least on ib) was pushed and - # there was only "1 new" row pushed from the first backfill - # query-iteration, then the sample step sizing calcs will - # break upstream from here since you can't diff on at least - # 2 steps... probably should also add logic to compute from - # the tsdb series and stash that somewhere as meta data on - # the shm buffer?.. no se. + # yield back after client connect with filled shm + task_status.started(( + hist_zero_index, + hist_shm, + rt_zero_index, + rt_shm, + )) - task_status.started((hist_shm, rt_shm)) + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. some_data_ready.set() - await bf_done.wait() - # do diff against last start frame of history and only fill - # in from the tsdb an allotment that allows for most recent - # to be loaded into mem *before* tsdb data. - if last_tsdb_dt: - dt_diff_s = ( - latest_start_dt - last_tsdb_dt - ).seconds - else: - dt_diff_s = 0 + # history retreival loop depending on user interaction and thus + # a small RPC-prot for remotely controllinlg what data is loaded + # for viewing. + await trio.sleep_forever() - # await trio.sleep_forever() - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - prepend_start = hist_shm._first.value - - # sanity check on most-recent-data loading - assert prepend_start > dt_diff_s - - history = list(series.values()) - if history: - fastest = history[0] - to_push = fastest[:prepend_start] - - hist_shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - # start=prepend_start, - field_map=marketstore.ohlc_key_map, - ) - - # load as much from storage into shm as space will - # allow according to user's shm size settings. - count = 0 - end = fastest['Epoch'][0] - - while hist_shm._first.value > 0: - count += 1 - series = await storage.read_ohlcv( - fqsn, - end=end, - ) - history = list(series.values()) - fastest = history[0] - end = fastest['Epoch'][0] - prepend_start -= len(to_push) - to_push = fastest[:prepend_start] - - hist_shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - # start=prepend_start, - field_map=marketstore.ohlc_key_map, - ) - - # manually trigger step update to update charts/fsps - # which need an incremental update. - # NOTE: the way this works is super duper - # un-intuitive right now: - # - the broadcaster fires a msg to the fsp subsystem. - # - fsp subsys then checks for a sample step diff and - # possibly recomputes prepended history. 
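[editor note] ``some_data_ready`` above is the usual ``trio.Event`` gate between the history loader and feed delivery: the loader flips it once a first usable frame is in shm and then stays alive (``sleep_forever``) for follow-up loads. A minimal sketch of that handshake with dummy task names, nothing piker-specific:

```python
import trio


async def load_history(some_data_ready: trio.Event) -> None:
    await trio.sleep(0.2)       # stand-in for the first backfill frames
    some_data_ready.set()       # history is now usable downstream
    await trio.sleep_forever()  # stay up for incremental/back loads


async def main():
    some_data_ready = trio.Event()
    async with trio.open_nursery() as n:
        n.start_soon(load_history, some_data_ready)
        await some_data_ready.wait()
        print('feed can now be delivered to clients')
        n.cancel_scope.cancel()  # tear down the long-lived loader


trio.run(main)
```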
- # - the fsp then sends back to the parent actor - # (usually a chart showing graphics for said fsp) - # which tells the chart to conduct a manual full - # graphics loop cycle. - for delay_s in sampler.subscribers: - await broadcast(delay_s) - - if count > 6: - break - - log.info(f'Loaded {to_push.shape} datums from storage') - - # TODO: write new data to tsdb to be ready to for next read. - - if do_legacy_backfill: - # do a legacy incremental backfill from the provider. - log.info('No existing `marketstored` found..') - - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - await bus.nursery.start( - partial( - start_backfill, - mod, - bfqsn, - hist_shm, - ) + # load less history if no tsdb can be found + elif ( + not tsdb_is_up + and opened + ): + await basic_backfill( + bus, + mod, + bfqsn, + shms={ + 1: rt_shm, + 60: hist_shm, + }, ) - - # yield back after client connect with filled shm - task_status.started((hist_shm, rt_shm)) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. + task_status.started(( + hist_zero_index, + hist_shm, + rt_zero_index, + rt_shm, + )) some_data_ready.set() - - # history retreival loop depending on user interaction and thus - # a small RPC-prot for remotely controllinlg what data is loaded - # for viewing. - await trio.sleep_forever() + await trio.sleep_forever() async def allocate_persistent_feed( @@ -937,7 +900,12 @@ async def allocate_persistent_feed( # https://github.com/python-trio/trio/issues/2258 # bus.nursery.start_soon( # await bus.start_task( - hist_shm, rt_shm = await bus.nursery.start( + ( + izero_hist, + hist_shm, + izero_rt, + rt_shm, + ) = await bus.nursery.start( manage_history, mod, bus, @@ -951,7 +919,8 @@ async def allocate_persistent_feed( # this task. msg = init_msg[symbol] msg['hist_shm_token'] = hist_shm.token - msg['startup_hist_index'] = hist_shm.index - 1 + msg['izero_hist'] = izero_hist + msg['izero_rt'] = izero_rt msg['rt_shm_token'] = rt_shm.token # true fqsn @@ -981,31 +950,19 @@ async def allocate_persistent_feed( fqsn: first_quote, } + # for ambiguous names we simply apply the retreived + # feed to that name (for now). bus.feeds[symbol] = bus.feeds[bfqsn] = ( init_msg, generic_first_quotes, ) - # for ambiguous names we simply apply the retreived - # feed to that name (for now). + # insert 1s ohlc into the increment buffer set + # to update and shift every second sampler.ohlcv_shms.setdefault( 1, [] ).append(rt_shm) - ohlckeys = ['open', 'high', 'low', 'close'] - - # set the rt (hft) shm array as append only - # (for now). - rt_shm._first.value = 0 - rt_shm._last.value = 0 - - # push last sample from history to rt buffer just as a filler datum - # but we don't want a history sized datum outlier so set vlm to zero - # and ohlc to the close value. - rt_shm.push(hist_shm.array[-2:-1]) - - rt_shm.array[ohlckeys] = hist_shm.array['close'][-1] - rt_shm._array['volume'] = 0 task_status.started() @@ -1016,16 +973,12 @@ async def allocate_persistent_feed( # the backend will indicate when real-time quotes have begun. await feed_is_live.wait() - # start shm incrementer task for OHLC style sampling - # at the current detected step period. - times = hist_shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - sampler.ohlcv_shms.setdefault(delay_s, []).append(hist_shm) + # insert 1m ohlc into the increment buffer set + # to shift every 60s. 
+ sampler.ohlcv_shms.setdefault(60, []).append(hist_shm) # create buffer a single incrementer task broker backend # (aka `brokerd`) using the lowest sampler period. - # await tractor.breakpoint() - # for delay_s in sampler.ohlcv_shms: if sampler.incrementers.get(_default_delay_s) is None: await bus.start_task( increment_ohlc_buffer, @@ -1036,7 +989,18 @@ async def allocate_persistent_feed( 'shm_write_opts', {} ).get('sum_tick_vlm', True) - # start sample loop + # NOTE: if no high-freq sampled data has (yet) been loaded, + # seed the buffer with a history datum - this is most handy + # for many backends which don't sample @ 1s OHLC but do have + # slower data such as 1m OHLC. + if not len(rt_shm.array): + rt_shm.push(hist_shm.array[-3:-1]) + ohlckeys = ['open', 'high', 'low', 'close'] + rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1] + rt_shm.array['volume'][-2] = 0 + + # start sample loop and shm incrementer task for OHLC style sampling + # at the above registered step periods. try: await sample_and_broadcast( bus, @@ -1224,7 +1188,8 @@ class Feed: stream: trio.abc.ReceiveChannel[dict[str, Any]] status: dict[str, Any] - startup_hist_index: int = 0 + izero_hist: int = 0 + izero_rt: int = 0 throttle_rate: Optional[int] = None @@ -1242,7 +1207,7 @@ class Feed: async def receive(self) -> dict: return await self.stream.receive() - @asynccontextmanager + @acm async def index_stream( self, delay_s: int = 1, @@ -1303,7 +1268,7 @@ class Feed: ) -@asynccontextmanager +@acm async def install_brokerd_search( portal: tractor.Portal, @@ -1321,7 +1286,10 @@ async def install_brokerd_search( async def search(text: str) -> dict[str, Any]: await stream.send(text) - return await stream.receive() + try: + return await stream.receive() + except trio.EndOfChannel: + return {} async with _search.register_symbol_search( @@ -1337,7 +1305,7 @@ async def install_brokerd_search( yield -@asynccontextmanager +@acm async def open_feed( fqsns: list[str], @@ -1413,7 +1381,8 @@ async def open_feed( stream=stream, _portal=portal, status={}, - startup_hist_index=init['startup_hist_index'], + izero_hist=init['izero_hist'], + izero_rt=init['izero_rt'], throttle_rate=tick_throttle, ) @@ -1465,7 +1434,7 @@ async def open_feed( await ctx.cancel() -@asynccontextmanager +@acm async def maybe_open_feed( fqsns: list[str], diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 7f39ad88..3edc1718 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -387,50 +387,57 @@ class Storage: async def load( self, fqsn: str, + timeframe: int, ) -> tuple[ - dict[int, np.ndarray], # timeframe (in secs) to series + np.ndarray, # timeframe sampled array-series Optional[datetime], # first dt Optional[datetime], # last dt ]: first_tsdb_dt, last_tsdb_dt = None, None - tsdb_arrays = await self.read_ohlcv( + hist = await self.read_ohlcv( fqsn, # on first load we don't need to pull the max # history per request size worth. 
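[editor note] The seeding block added above gives slower backends (no native 1s bars) a flat filler row derived from the last 1m close, so the downstream sample-step math has at least two rows to diff. A standalone numpy sketch of that step with an illustrative OHLCV dtype:

```python
import numpy as np

ohlc_dtype = np.dtype([
    ('time', 'f8'),
    ('open', 'f8'), ('high', 'f8'), ('low', 'f8'), ('close', 'f8'),
    ('volume', 'f8'),
])

# pretend 1m history whose last close is 101.5
hist = np.zeros(3, dtype=ohlc_dtype)
hist['time'] = [0, 60, 120]
hist['close'] = [99.0, 100.0, 101.5]

# seed the (empty) 1s buffer from the last two history rows,
# flattening ohlc to the last close and zeroing the filler volume
rt = hist[-3:-1].copy()
last_close = hist['close'][-1]
for key in ('open', 'high', 'low', 'close'):
    rt[key][-2:] = last_close
rt['volume'][-2] = 0
print(rt)
```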
limit=3000, + timeframe=timeframe, ) - log.info(f'Loaded tsdb history {tsdb_arrays}') + log.info(f'Loaded tsdb history {hist}') - if tsdb_arrays: - fastest = list(tsdb_arrays.values())[0] - times = fastest['Epoch'] + if len(hist): + times = hist['Epoch'] first, last = times[0], times[-1] first_tsdb_dt, last_tsdb_dt = map( pendulum.from_timestamp, [first, last] ) - return tsdb_arrays, first_tsdb_dt, last_tsdb_dt + return ( + hist, # array-data + first_tsdb_dt, # start of query-frame + last_tsdb_dt, # most recent + ) async def read_ohlcv( self, fqsn: str, - timeframe: Optional[Union[int, str]] = None, + timeframe: int | str, end: Optional[int] = None, limit: int = int(800e3), - ) -> tuple[ - MarketstoreClient, - Union[dict, np.ndarray] + ) -> dict[ + int, + Union[dict, np.ndarray], ]: + client = self.client syms = await client.list_symbols() if fqsn not in syms: return {} - tfstr = tf_in_1s[1] + # use the provided timeframe or 1s by default + tfstr = tf_in_1s.get(timeframe, tf_in_1s[1]) params = Params( symbols=fqsn, @@ -444,58 +451,68 @@ class Storage: limit=limit, ) - if timeframe is None: - log.info(f'starting {fqsn} tsdb granularity scan..') - # loop through and try to find highest granularity - for tfstr in tf_in_1s.values(): - try: - log.info(f'querying for {tfstr}@{fqsn}') - params.set('timeframe', tfstr) - result = await client.query(params) - break - - except purerpc.grpclib.exceptions.UnknownError: - # XXX: this is already logged by the container and - # thus shows up through `marketstored` logs relay. - # log.warning(f'{tfstr}@{fqsn} not found') - continue - else: - return {} - - else: + try: result = await client.query(params) + except purerpc.grpclib.exceptions.UnknownError: + # indicate there is no history for this timeframe + return {} # TODO: it turns out column access on recarrays is actually slower: # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist # it might make sense to make these structured arrays? - # Fill out a `numpy` array-results map - arrays = {} - for fqsn, data_set in result.by_symbols().items(): - arrays.setdefault(fqsn, {})[ - tf_in_1s.inverse[data_set.timeframe] - ] = data_set.array + data_set = result.by_symbols()[fqsn] + array = data_set.array - return arrays[fqsn][timeframe] if timeframe else arrays[fqsn] + # XXX: ensure sample rate is as expected + time = data_set.array['Epoch'] + if len(time) > 1: + time_step = time[-1] - time[-2] + ts = tf_in_1s.inverse[data_set.timeframe] + + if time_step != ts: + log.warning( + f'MKTS BUG: wrong timeframe loaded: {time_step}' + 'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG' + f'WIPING HISTORY FOR {ts}s' + ) + await self.delete_ts(fqsn, timeframe) + + # try reading again.. 
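[editor note] The sanity check above only diffs the last two timestamps of the loaded frame; a slightly more defensive standalone variant compares the modal step of the whole ``Epoch`` column against the expected period before deciding the on-disk series is polluted and needs a wipe (the numbers below are made up):

```python
import numpy as np


def detect_sample_step(epochs: np.ndarray) -> float | None:
    '''
    Return the most common step between consecutive timestamps,
    or ``None`` when there aren't at least 2 samples to diff.
    '''
    if len(epochs) < 2:
        return None
    steps, counts = np.unique(np.diff(epochs), return_counts=True)
    return float(steps[counts.argmax()])


# 1s rows accidentally written under a 1m (60s) bucket:
epochs = np.array([0, 1, 2, 3, 4, 5], dtype='f8')
expected = 60
step = detect_sample_step(epochs)
if step is not None and step != expected:
    print(f'wrong timeframe on disk: {step}s != {expected}s -> wipe & reload')
```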
+ return await self.read_ohlcv( + fqsn, + timeframe, + end, + limit, + ) + + return array async def delete_ts( self, key: str, timeframe: Optional[Union[int, str]] = None, + fmt: str = 'OHLCV', ) -> bool: client = self.client syms = await client.list_symbols() print(syms) - # if key not in syms: - # raise KeyError(f'`{fqsn}` table key not found?') + if key not in syms: + raise KeyError(f'`{key}` table key not found in\n{syms}?') - return await client.destroy(tbk=key) + tbk = mk_tbk(( + key, + tf_in_1s.get(timeframe, tf_in_1s[60]), + fmt, + )) + return await client.destroy(tbk=tbk) async def write_ohlcv( self, fqsn: str, ohlcv: np.ndarray, + timeframe: int, append_and_duplicate: bool = True, limit: int = int(800e3), @@ -519,17 +536,18 @@ class Storage: m, r = divmod(len(mkts_array), limit) + tfkey = tf_in_1s[timeframe] for i in range(m, 1): to_push = mkts_array[i-1:i*limit] # write to db resp = await self.client.write( to_push, - tbk=f'{fqsn}/1Sec/OHLCV', + tbk=f'{fqsn}/{tfkey}/OHLCV', # NOTE: will will append duplicates # for the same timestamp-index. - # TODO: pre deduplicate? + # TODO: pre-deduplicate? isvariablelength=append_and_duplicate, ) @@ -548,7 +566,7 @@ class Storage: # write to db resp = await self.client.write( to_push, - tbk=f'{fqsn}/1Sec/OHLCV', + tbk=f'{fqsn}/{tfkey}/OHLCV', # NOTE: will will append duplicates # for the same timestamp-index. @@ -577,6 +595,7 @@ class Storage: # def delete_range(self, start_dt, end_dt) -> None: # ... + @acm async def open_storage_client( fqsn: str, @@ -642,8 +661,8 @@ async def tsdb_history_update( ): profiler(f'opened feed for {fqsn}') - to_append = feed.shm.array - to_prepend = None + # to_append = feed.hist_shm.array + # to_prepend = None if fqsn: symbol = feed.symbols.get(fqsn) @@ -651,21 +670,21 @@ async def tsdb_history_update( fqsn = symbol.front_fqsn() # diff db history with shm and only write the missing portions - ohlcv = feed.shm.array + # ohlcv = feed.hist_shm.array # TODO: use pg profiler - tsdb_arrays = await storage.read_ohlcv(fqsn) - # hist diffing - if tsdb_arrays: - for secs in (1, 60): - ts = tsdb_arrays.get(secs) - if ts is not None and len(ts): - # these aren't currently used but can be referenced from - # within the embedded ipython shell below. - to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]] - to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]] + # for secs in (1, 60): + # tsdb_array = await storage.read_ohlcv( + # fqsn, + # timeframe=timeframe, + # ) + # # hist diffing: + # # these aren't currently used but can be referenced from + # # within the embedded ipython shell below. 
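[editor note] The table bucket key (``tbk``) wiring above follows marketstore's ``SYMBOL/TIMEFRAME/ATTRIBUTE_GROUP`` convention, visible in the ``f'{fqsn}/{tfkey}/OHLCV'`` writes. Neither ``mk_tbk`` nor the full ``tf_in_1s`` mapping appears in this hunk, so the sketch below assumes plausible definitions for both:

```python
# seconds-per-sample -> marketstore timeframe string
# (a bidict in the real code; a plain dict is enough to illustrate)
tf_in_1s = {
    1: '1Sec',
    60: '1Min',
    60 * 60: '1H',
    24 * 60 * 60: '1D',
}


def mk_tbk(parts: tuple[str, str, str]) -> str:
    # a "time bucket key": SYMBOL/TIMEFRAME/ATTRIBUTE_GROUP
    return '/'.join(parts)


fqsn = 'xbtusd.kraken'
print(mk_tbk((fqsn, tf_in_1s[60], 'OHLCV')))  # -> xbtusd.kraken/1Min/OHLCV
```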
+ # to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]] + # to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]] - profiler('Finished db arrays diffs') + # profiler('Finished db arrays diffs') syms = await storage.client.list_symbols() log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 4d24f5ca..b20b99c0 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -49,7 +49,9 @@ from ._fsp import ( has_vlm, open_vlm_displays, ) -from ..data._sharedmem import ShmArray +from ..data._sharedmem import ( + ShmArray, +) from ..data._source import tf_in_1s from ._forms import ( FieldsForm, @@ -249,14 +251,14 @@ async def graphics_update_loop( linked: LinkedSplits = godwidget.rt_linked display_rate = godwidget.window.current_screen().refreshRate() - chart = linked.chart + fast_chart = linked.chart hist_chart = godwidget.hist_linked.chart ohlcv = feed.rt_shm hist_ohlcv = feed.hist_shm # update last price sticky - last_price_sticky = chart._ysticks[chart.name] + last_price_sticky = fast_chart._ysticks[fast_chart.name] last_price_sticky.update_from_data( *ohlcv.array[-1][['index', 'close']] ) @@ -268,7 +270,7 @@ async def graphics_update_loop( maxmin = partial( chart_maxmin, - chart, + fast_chart, ohlcv, vlm_chart, ) @@ -282,15 +284,15 @@ async def graphics_update_loop( last, volume = ohlcv.array[-1][['close', 'volume']] - symbol = chart.linked.symbol + symbol = fast_chart.linked.symbol l1 = L1Labels( - chart, + fast_chart, # determine precision/decimal lengths digits=symbol.tick_size_digits, size_digits=symbol.lot_size_digits, ) - chart._l1_labels = l1 + fast_chart._l1_labels = l1 # TODO: # - in theory we should be able to read buffer data faster @@ -300,10 +302,10 @@ async def graphics_update_loop( # levels this might be dark volume we need to # present differently -> likely dark vlm - tick_size = chart.linked.symbol.tick_size + tick_size = fast_chart.linked.symbol.tick_size tick_margin = 3 * tick_size - chart.show() + fast_chart.show() last_quote = time.time() i_last = ohlcv.index @@ -313,7 +315,7 @@ async def graphics_update_loop( 'maxmin': maxmin, 'ohlcv': ohlcv, 'hist_ohlcv': hist_ohlcv, - 'chart': chart, + 'chart': fast_chart, 'last_price_sticky': last_price_sticky, 'hist_last_price_sticky': hist_last_price_sticky, 'l1': l1, @@ -333,7 +335,7 @@ async def graphics_update_loop( ds.vlm_chart = vlm_chart ds.vlm_sticky = vlm_sticky - chart.default_view() + fast_chart.default_view() # TODO: probably factor this into some kinda `DisplayState` # API that can be reused at least in terms of pulling view @@ -410,16 +412,16 @@ async def graphics_update_loop( last_quote = time.time() # chart isn't active/shown so skip render cycle and pause feed(s) - if chart.linked.isHidden(): + if fast_chart.linked.isHidden(): # print('skipping update') - chart.pause_all_feeds() + fast_chart.pause_all_feeds() continue - ic = chart.view._ic - if ic: - chart.pause_all_feeds() - await ic.wait() - chart.resume_all_feeds() + # ic = fast_chart.view._ic + # if ic: + # fast_chart.pause_all_feeds() + # await ic.wait() + # fast_chart.resume_all_feeds() # sync call to update all graphics/UX components. 
graphics_update_cycle(ds) @@ -502,6 +504,7 @@ def graphics_update_cycle( or trigger_all ): chart.increment_view(steps=i_diff) + chart.view._set_yrange(yrange=(mn, mx)) if vlm_chart: vlm_chart.increment_view(steps=i_diff) @@ -806,6 +809,140 @@ def graphics_update_cycle( flow.draw_last(array_key=curve_name) +async def link_views_with_region( + rt_chart: ChartPlotWidget, + hist_chart: ChartPlotWidget, + feed: Feed, + +) -> None: + + # these value are be only pulled once during shm init/startup + izero_hist = feed.izero_hist + izero_rt = feed.izero_rt + + # Add the LinearRegionItem to the ViewBox, but tell the ViewBox + # to exclude this item when doing auto-range calculations. + rt_pi = rt_chart.plotItem + hist_pi = hist_chart.plotItem + + region = pg.LinearRegionItem( + movable=False, + # color scheme that matches sidepane styling + pen=pg.mkPen(hcolor('gunmetal')), + brush=pg.mkBrush(hcolor('default_darkest')), + ) + region.setZValue(10) # put linear region "in front" in layer terms + + hist_pi.addItem(region, ignoreBounds=True) + + flow = rt_chart._flows[hist_chart.name] + assert flow + + # XXX: no idea why this doesn't work but it's causing + # a weird placement of the region on the way-far-left.. + # region.setClipItem(flow.graphics) + + # poll for datums load and timestep detection + for _ in range(100): + try: + _, _, ratio = feed.get_ds_info() + break + except IndexError: + await trio.sleep(0.01) + continue + else: + raise RuntimeError( + 'Failed to detect sampling periods from shm!?') + + # sampling rate transform math: + # ----------------------------- + # define the fast chart to slow chart as a linear mapping + # over the fast index domain `i` to the slow index domain + # `j` as: + # + # j = i - i_offset + # ------------ + j_offset + # j/i + # + # conversely the inverse function is: + # + # i = j/i * (j - j_offset) + i_offset + # + # Where `j_offset` is our ``izero_hist`` and `i_offset` is our + # `izero_rt`, the ``ShmArray`` offsets which correspond to the + # indexes in each array where the "current" time is indexed at init. + # AKA the index where new data is "appended to" and historical data + # if "prepended from". + # + # more practically (and by default) `i` is normally an index + # into 1s samples and `j` is an index into 60s samples (aka 1m). + # in the below handlers ``ratio`` is the `j/i` and ``mn``/``mx`` + # are the low and high index input from the source index domain. + + def update_region_from_pi( + window, + viewRange: tuple[tuple, tuple], + is_manual: bool = True, + + ) -> None: + # put linear region "in front" in layer terms + region.setZValue(10) + + # set the region on the history chart + # to the range currently viewed in the + # HFT/real-time chart. + mn, mx = viewRange[0] + ds_mn = (mn - izero_rt)/ratio + ds_mx = (mx - izero_rt)/ratio + lhmn = ds_mn + izero_hist + lhmx = ds_mx + izero_hist + # print( + # f'rt_view_range: {(mn, mx)}\n' + # f'ds_mn, ds_mx: {(ds_mn, ds_mx)}\n' + # f'lhmn, lhmx: {(lhmn, lhmx)}\n' + # ) + region.setRegion(( + lhmn, + lhmx, + )) + + # TODO: if we want to have the slow chart adjust range to + # match the fast chart's selection -> results in the + # linear region expansion never can go "outside of view". + # hmn, hmx = hvr = hist_chart.view.state['viewRange'][0] + # print((hmn, hmx)) + # if ( + # hvr + # and (lhmn < hmn or lhmx > hmx) + # ): + # hist_pi.setXRange( + # lhmn, + # lhmx, + # padding=0, + # ) + # hist_linked.graphics_cycle() + + # connect region to be updated on plotitem interaction. 
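[editor note] The mapping comments above reduce to a pair of tiny index converters between the 1s (``i``) and 1m (``j``) domains, each anchored at its buffer's zero index, with ``ratio`` being the number of fast samples per slow sample. Writing them out as standalone helpers (the numbers are illustrative) makes the round trip easy to check:

```python
def rt_to_hist(i: float, izero_rt: int, izero_hist: int, ratio: float) -> float:
    # slow-domain index from a fast-domain index
    return (i - izero_rt) / ratio + izero_hist


def hist_to_rt(j: float, izero_rt: int, izero_hist: int, ratio: float) -> float:
    # fast-domain index from a slow-domain index (the inverse)
    return (j - izero_hist) * ratio + izero_rt


ratio = 60                             # 60 x 1s samples per 1m sample
izero_rt, izero_hist = 259_200, 1_440  # made-up startup indexes

i = izero_rt + 600                     # 10 minutes of 1s samples past zero
j = rt_to_hist(i, izero_rt, izero_hist, ratio)
assert j == izero_hist + 10
assert hist_to_rt(j, izero_rt, izero_hist, ratio) == i
```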
+ rt_pi.sigRangeChanged.connect(update_region_from_pi) + + def update_pi_from_region(): + region.setZValue(10) + mn, mx = region.getRegion() + # print(f'region_x: {(mn, mx)}') + rt_pi.setXRange( + ((mn - izero_hist) * ratio) + izero_rt, + ((mx - izero_hist) * ratio) + izero_rt, + padding=0, + ) + + # TODO BUG XXX: seems to cause a real perf hit and a recursion error + # (but used to work before generalizing for 1s ohlc offset?).. + # something to do with the label callback handlers? + + # region.sigRegionChanged.connect(update_pi_from_region) + # region.sigRegionChangeFinished.connect(update_pi_from_region) + + async def display_symbol_data( godwidget: GodWidget, provider: str, @@ -850,10 +987,6 @@ async def display_symbol_data( ohlcv: ShmArray = feed.rt_shm hist_ohlcv: ShmArray = feed.hist_shm - # this value needs to be pulled once and only once during - # startup - end_index = feed.startup_hist_index - symbol = feed.symbols[sym] fqsn = symbol.front_fqsn() @@ -917,91 +1050,6 @@ async def display_symbol_data( # add_label=False, # ) - # Add the LinearRegionItem to the ViewBox, but tell the ViewBox - # to exclude this item when doing auto-range calculations. - rt_pi = chart.plotItem - hist_pi = hist_chart.plotItem - region = pg.LinearRegionItem( - # color scheme that matches sidepane styling - pen=pg.mkPen(hcolor('gunmetal')), - brush=pg.mkBrush(hcolor('default_darkest')), - ) - region.setZValue(10) # put linear region "in front" in layer terms - hist_pi.addItem(region, ignoreBounds=True) - flow = chart._flows[hist_chart.name] - assert flow - # XXX: no idea why this doesn't work but it's causing - # a weird placement of the region on the way-far-left.. - # region.setClipItem(flow.graphics) - - # poll for datums load and timestep detection - for _ in range(100): - try: - _, _, ratio = feed.get_ds_info() - break - except IndexError: - await trio.sleep(0.01) - continue - else: - raise RuntimeError( - 'Failed to detect sampling periods from shm!?') - - def update_pi_from_region(): - region.setZValue(10) - mn, mx = region.getRegion() - # print(f'region_x: {(mn, mx)}') - - # XXX: seems to cause a real perf hit? - rt_pi.setXRange( - (mn - end_index) * ratio, - (mx - end_index) * ratio, - padding=0, - ) - - region.sigRegionChanged.connect(update_pi_from_region) - - def update_region_from_pi( - window, - viewRange: tuple[tuple, tuple], - is_manual: bool = True, - - ) -> None: - # set the region on the history chart - # to the range currently viewed in the - # HFT/real-time chart. - mn, mx = viewRange[0] - ds_mn = mn/ratio - ds_mx = mx/ratio - # print( - # f'rt_view_range: {(mn, mx)}\n' - # f'ds_mn, ds_mx: {(ds_mn, ds_mx)}\n' - # ) - lhmn = ds_mn + end_index - lhmx = ds_mx + end_index - region.setRegion(( - lhmn, - lhmx, - )) - - # TODO: if we want to have the slow chart adjust range to - # match the fast chart's selection -> results in the - # linear region expansion never can go "outside of view". - # hmn, hmx = hvr = hist_chart.view.state['viewRange'][0] - # print((hmn, hmx)) - # if ( - # hvr - # and (lhmn < hmn or lhmx > hmx) - # ): - # hist_pi.setXRange( - # lhmn, - # lhmx, - # padding=0, - # ) - # hist_linked.graphics_cycle() - - # connect region to be updated on plotitem interaction. - rt_pi.sigRangeChanged.connect(update_region_from_pi) - # NOTE: we must immediately tell Qt to show the OHLC chart # to avoid a race where the subplots get added/shown to # the linked set *before* the main price chart! 
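[editor note] The ``TODO BUG XXX`` note above (perf hit / recursion once both directions are connected) is the classic two-way signal feedback problem: the chart range updates the region, which updates the chart range, and so on. One common way out, sketched here as plain Python rather than wired to the actual pyqtgraph signals, is a re-entrancy guard so a programmatic update doesn't bounce back through the opposite handler:

```python
class TwoWaySync:
    '''
    Guarded two-way sync: each handler is a no-op while the other
    side is mid-update, which breaks the feedback cycle. In the
    real Qt wiring the setters below would emit the signals that
    call the opposite handler.
    '''
    def __init__(self):
        self._updating = False
        self.chart_range = (0.0, 100.0)
        self.region = (0.0, 100.0)

    def on_chart_range_changed(self, new_range: tuple[float, float]):
        if self._updating:
            return
        self._updating = True
        try:
            self.region = new_range        # region.setRegion(...) stand-in
        finally:
            self._updating = False

    def on_region_changed(self, new_region: tuple[float, float]):
        if self._updating:
            return
        self._updating = True
        try:
            self.chart_range = new_region  # rt_pi.setXRange(...) stand-in
        finally:
            self._updating = False


sync = TwoWaySync()
sync.on_chart_range_changed((50.0, 150.0))
assert sync.region == (50.0, 150.0)
```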
@@ -1069,6 +1117,12 @@ async def display_symbol_data( godwidget.resize_all() + await link_views_with_region( + chart, + hist_chart, + feed, + ) + mode: OrderMode async with ( open_order_mode( diff --git a/piker/ui/_notify.py b/piker/ui/_notify.py index c3da8d4b..c4804663 100644 --- a/piker/ui/_notify.py +++ b/piker/ui/_notify.py @@ -58,8 +58,11 @@ async def notify_from_ems_status_msg( if is_subproc: global _dbus_uid - if not _dbus_uid: - su = os.environ['SUDO_USER'] + su = os.environ.get('SUDO_USER') + if ( + not _dbus_uid + and su + ): # TODO: use `trio` but we need to use nursery.start() # to use pipes? diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 0b9558d7..fa8ecbce 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -517,7 +517,9 @@ class OrderMode: _, _, ratio = self.feed.get_ds_info() for i, chart in [ (arrow_index, self.chart), - (self.feed.startup_hist_index + round(arrow_index/ratio), + (self.feed.izero_hist + + + round((arrow_index - self.feed.izero_rt)/ratio), self.hist_chart) ]: self.arrows.add(
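[editor note] The ``order_mode`` hunk above applies the same zero-index mapping to arrow placement, now rounding to the nearest 1m bar. A compact worked check with made-up startup indexes:

```python
# 60 x 1s samples per 1m sample; izero values are illustrative only
ratio, izero_rt, izero_hist = 60, 259_200, 1_440

arrow_index = izero_rt + 90  # a fill 90 seconds after startup
hist_index = izero_hist + round((arrow_index - izero_rt) / ratio)
assert hist_index == 1_442   # 1.5 bars later, rounded to the 2nd 1m bar
```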