Compare commits
	
		
			23 Commits 
		
	
	
		
			gitea_feat
			...
			clears_tab
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | c53071e43a | |
|  | 41ffccc59e | |
|  | 55856f5e8b | |
|  | e8ab28e456 | |
|  | d2b6216994 | |
|  | eb743759a4 | |
|  | 74910ba56c | |
|  | 28535fa977 | |
|  | 1d7e642dbd | |
|  | 69be65237f | |
|  | 96f5a8abb8 | |
|  | 13e886c967 | |
|  | 2c6b832e50 | |
|  | d5c3124722 | |
|  | c939e75ef9 | |
|  | 844f8beaa7 | |
|  | ac7ba500be | |
|  | 3301619647 | |
|  | 7f498766af | |
|  | 9270391e01 | |
|  | 0c061d8957 | |
|  | 87f7a03dbe | |
|  | 1adf5fb9c0 | 
|  | @ -195,9 +195,8 @@ async def open_piker_runtime( | |||
| 
 | ||||
| ) -> Optional[tractor._portal.Portal]: | ||||
|     ''' | ||||
|     Start a piker actor who's runtime will automatically | ||||
|     sync with existing piker actors in local network | ||||
|     based on configuration. | ||||
|     Start a piker actor who's runtime will automatically sync with | ||||
|     existing piker actors on the local link based on configuration. | ||||
| 
 | ||||
|     ''' | ||||
|     global _services | ||||
|  |  | |||
|  | @ -388,6 +388,7 @@ async def open_history_client( | |||
|     async with open_cached_client('binance') as client: | ||||
| 
 | ||||
|         async def get_ohlc( | ||||
|             timeframe: float, | ||||
|             end_dt: Optional[datetime] = None, | ||||
|             start_dt: Optional[datetime] = None, | ||||
| 
 | ||||
|  |  | |||
|  | @ -43,6 +43,7 @@ from bidict import bidict | |||
| import trio | ||||
| import tractor | ||||
| from tractor import to_asyncio | ||||
| import pendulum | ||||
| import ib_insync as ibis | ||||
| from ib_insync.contract import ( | ||||
|     Contract, | ||||
|  | @ -52,6 +53,7 @@ from ib_insync.contract import ( | |||
| from ib_insync.order import Order | ||||
| from ib_insync.ticker import Ticker | ||||
| from ib_insync.objects import ( | ||||
|     BarDataList, | ||||
|     Position, | ||||
|     Fill, | ||||
|     Execution, | ||||
|  | @ -78,26 +80,11 @@ _time_units = { | |||
|     'h': ' hours', | ||||
| } | ||||
| 
 | ||||
| _time_frames = { | ||||
|     '1s': '1 Sec', | ||||
|     '5s': '5 Sec', | ||||
|     '30s': '30 Sec', | ||||
|     '1m': 'OneMinute', | ||||
|     '2m': 'TwoMinutes', | ||||
|     '3m': 'ThreeMinutes', | ||||
|     '4m': 'FourMinutes', | ||||
|     '5m': 'FiveMinutes', | ||||
|     '10m': 'TenMinutes', | ||||
|     '15m': 'FifteenMinutes', | ||||
|     '20m': 'TwentyMinutes', | ||||
|     '30m': 'HalfHour', | ||||
|     '1h': 'OneHour', | ||||
|     '2h': 'TwoHours', | ||||
|     '4h': 'FourHours', | ||||
|     'D': 'OneDay', | ||||
|     'W': 'OneWeek', | ||||
|     'M': 'OneMonth', | ||||
|     'Y': 'OneYear', | ||||
| _bar_sizes = { | ||||
|     1: '1 Sec', | ||||
|     60: '1 min', | ||||
|     60*60: '1 hour', | ||||
|     24*60*60: '1 day', | ||||
| } | ||||
| 
 | ||||
| _show_wap_in_history: bool = False | ||||
|  | @ -199,7 +186,8 @@ _adhoc_futes_set = { | |||
|     'lb.nymex',  # random len lumber | ||||
| 
 | ||||
|     # metals | ||||
|     'xauusd.cmdty',  # gold spot | ||||
|     # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 | ||||
|     'xauusd.cmdty',  # london gold spot ^ | ||||
|     'gc.nymex', | ||||
|     'mgc.nymex',  # micro | ||||
| 
 | ||||
|  | @ -257,14 +245,12 @@ _exch_skip_list = { | |||
|     'PSE', | ||||
| } | ||||
| 
 | ||||
| # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 | ||||
| 
 | ||||
| _enters = 0 | ||||
| 
 | ||||
| 
 | ||||
| def bars_to_np(bars: list) -> np.ndarray: | ||||
|     ''' | ||||
|     Convert a "bars list thing" (``BarsList`` type from ibis) | ||||
|     Convert a "bars list thing" (``BarDataList`` type from ibis) | ||||
|     into a numpy struct array. | ||||
| 
 | ||||
|     ''' | ||||
|  | @ -284,6 +270,27 @@ def bars_to_np(bars: list) -> np.ndarray: | |||
|     return nparr | ||||
| 
 | ||||
| 
 | ||||
| # NOTE: pacing violations exist for higher sample rates: | ||||
| # https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations | ||||
| # Also see note on duration limits being lifted on 1m+ periods, | ||||
| # but they say "use with discretion": | ||||
| # https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd | ||||
| _samplings: dict[int, tuple[str, str]] = { | ||||
|     1: ( | ||||
|         '1 secs', | ||||
|         f'{int(2e3)} S', | ||||
|         pendulum.duration(seconds=2e3), | ||||
|     ), | ||||
|     # TODO: benchmark >1 D duration on query to see if | ||||
|     # throughput can be made faster during backfilling. | ||||
|     60: ( | ||||
|         '1 min', | ||||
|         '1 D', | ||||
|         pendulum.duration(days=1), | ||||
|     ), | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| class Client: | ||||
|     ''' | ||||
|     IB wrapped for our broker backend API. | ||||
|  | @ -338,19 +345,32 @@ class Client: | |||
|         start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", | ||||
|         end_dt: Union[datetime, str] = "", | ||||
| 
 | ||||
|         sample_period_s: str = 1,  # ohlc sample period | ||||
|         period_count: int = int(2e3),  # <- max per 1s sample query | ||||
|         # ohlc sample period in seconds | ||||
|         sample_period_s: int = 1, | ||||
| 
 | ||||
|     ) -> list[dict[str, Any]]: | ||||
|         # optional "duration of time" equal to the | ||||
|         # length of the returned history frame. | ||||
|         duration: Optional[str] = None, | ||||
| 
 | ||||
|         **kwargs, | ||||
| 
 | ||||
|     ) -> tuple[BarDataList, np.ndarray, pendulum.Duration]: | ||||
|         ''' | ||||
|         Retreive OHLCV bars for a fqsn over a range to the present. | ||||
| 
 | ||||
|         ''' | ||||
|         # See API docs here: | ||||
|         # https://interactivebrokers.github.io/tws-api/historical_data.html | ||||
|         bars_kwargs = {'whatToShow': 'TRADES'} | ||||
|         bars_kwargs.update(kwargs) | ||||
|         bar_size, duration, dt_duration = _samplings[sample_period_s] | ||||
| 
 | ||||
|         global _enters | ||||
|         # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}') | ||||
|         print(f'REQUESTING BARS {_enters} @ end={end_dt}') | ||||
|         print( | ||||
|             f"REQUESTING {duration}'s worth {bar_size} BARS\n" | ||||
|             f'{_enters} @ end={end_dt}"' | ||||
|         ) | ||||
| 
 | ||||
|         if not end_dt: | ||||
|             end_dt = '' | ||||
|  | @ -360,30 +380,20 @@ class Client: | |||
|         contract = (await self.find_contracts(fqsn))[0] | ||||
|         bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) | ||||
| 
 | ||||
|         # _min = min(2000*100, count) | ||||
|         bars = await self.ib.reqHistoricalDataAsync( | ||||
|             contract, | ||||
|             endDateTime=end_dt, | ||||
|             formatDate=2, | ||||
| 
 | ||||
|             # time history length values format: | ||||
|             # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` | ||||
| 
 | ||||
|             # OHLC sampling values: | ||||
|             # 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins, | ||||
|             # 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins, | ||||
|             # 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M | ||||
|             # barSizeSetting='1 secs', | ||||
|             barSizeSetting=bar_size, | ||||
| 
 | ||||
|             # durationStr='{count} S'.format(count=15000 * 5), | ||||
|             # durationStr='{count} D'.format(count=1), | ||||
|             # barSizeSetting='5 secs', | ||||
| 
 | ||||
|             durationStr='{count} S'.format(count=period_count), | ||||
|             # barSizeSetting='5 secs', | ||||
|             barSizeSetting='1 secs', | ||||
| 
 | ||||
|             # barSizeSetting='1 min', | ||||
|             # time history length values format: | ||||
|             # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` | ||||
|             durationStr=duration, | ||||
| 
 | ||||
|             # always use extended hours | ||||
|             useRTH=False, | ||||
|  | @ -394,11 +404,21 @@ class Client: | |||
|             # whatToShow='TRADES', | ||||
|         ) | ||||
|         if not bars: | ||||
|             # TODO: raise underlying error here | ||||
|             raise ValueError(f"No bars retreived for {fqsn}?") | ||||
|             # NOTE: there's 2 cases here to handle (and this should be | ||||
|             # read alongside the implementation of | ||||
|             # ``.reqHistoricalDataAsync()``): | ||||
|             # - no data is returned for the period likely due to | ||||
|             # a weekend, holiday or other non-trading period prior to | ||||
|             # ``end_dt`` which exceeds the ``duration``, | ||||
|             # - a timeout occurred in which case insync internals return | ||||
|             # an empty list thing with bars.clear()... | ||||
|             return [], np.empty(0), dt_duration | ||||
|             # TODO: we could maybe raise ``NoData`` instead if we | ||||
|             # rewrite the method in the first case? right now there's no | ||||
|             # way to detect a timeout. | ||||
| 
 | ||||
|         nparr = bars_to_np(bars) | ||||
|         return bars, nparr | ||||
|         return bars, nparr, dt_duration | ||||
| 
 | ||||
|     async def con_deats( | ||||
|         self, | ||||
|  | @ -463,7 +483,7 @@ class Client: | |||
|         self, | ||||
|         pattern: str, | ||||
|         # how many contracts to search "up to" | ||||
|         upto: int = 6, | ||||
|         upto: int = 16, | ||||
|         asdicts: bool = True, | ||||
| 
 | ||||
|     ) -> dict[str, ContractDetails]: | ||||
|  | @ -498,6 +518,16 @@ class Client: | |||
| 
 | ||||
|                 exch = tract.exchange | ||||
|                 if exch not in _exch_skip_list: | ||||
| 
 | ||||
|                     # try to lookup any contracts from our adhoc set | ||||
|                     # since often the exchange/venue is named slightly | ||||
|                     # different (eg. BRR.CMECRYPTO` instead of just | ||||
|                     # `.CME`). | ||||
|                     info = _adhoc_symbol_map.get(sym) | ||||
|                     if info: | ||||
|                         con_kwargs, bars_kwargs = info | ||||
|                         exch = con_kwargs['exchange'] | ||||
| 
 | ||||
|                     # try get all possible contracts for symbol as per, | ||||
|                     # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut | ||||
|                     con = ibis.Future( | ||||
|  | @ -1066,6 +1096,7 @@ async def load_aio_clients( | |||
|     # retry a few times to get the client going.. | ||||
|     connect_retries: int = 3, | ||||
|     connect_timeout: float = 0.5, | ||||
|     disconnect_on_exit: bool = True, | ||||
| 
 | ||||
| ) -> dict[str, Client]: | ||||
|     ''' | ||||
|  | @ -1207,6 +1238,7 @@ async def load_aio_clients( | |||
|     finally: | ||||
|         # TODO: for re-scans we'll want to not teardown clients which | ||||
|         # are up and stable right? | ||||
|         if disconnect_on_exit: | ||||
|             for acct, client in _accounts2clients.items(): | ||||
|                 log.info(f'Disconnecting {acct}@{client}') | ||||
|                 client.ib.disconnect() | ||||
|  |  | |||
|  | @ -305,7 +305,7 @@ async def update_ledger_from_api_trades( | |||
|         entry['listingExchange'] = pexch | ||||
| 
 | ||||
|     conf = get_config() | ||||
|     entries = trades_to_ledger_entries( | ||||
|     entries = api_trades_to_ledger_entries( | ||||
|         conf['accounts'].inverse, | ||||
|         trade_entries, | ||||
|     ) | ||||
|  | @ -362,7 +362,7 @@ async def update_and_audit_msgs( | |||
|                 # if ib reports a lesser pp it's not as bad since we can | ||||
|                 # presume we're at least not more in the shit then we | ||||
|                 # thought. | ||||
|                 if diff: | ||||
|                 if diff and pikersize: | ||||
|                     reverse_split_ratio = pikersize / ibsize | ||||
|                     split_ratio = 1/reverse_split_ratio | ||||
| 
 | ||||
|  | @ -372,6 +372,7 @@ async def update_and_audit_msgs( | |||
|                         entry = f'split_ratio = 1/{int(reverse_split_ratio)}' | ||||
| 
 | ||||
|                     raise ValueError( | ||||
|                     # log.error( | ||||
|                         f'POSITION MISMATCH ib <-> piker ledger:\n' | ||||
|                         f'ib: {ibppmsg}\n' | ||||
|                         f'piker: {msg}\n' | ||||
|  | @ -1122,18 +1123,16 @@ def norm_trade_records( | |||
|             continue | ||||
| 
 | ||||
|         # timestamping is way different in API records | ||||
|         dtstr = record.get('datetime') | ||||
|         date = record.get('date') | ||||
|         if not date: | ||||
|             # probably a flex record with a wonky non-std timestamp.. | ||||
|             date, ts = record['dateTime'].split(';') | ||||
|             dt = pendulum.parse(date) | ||||
|             ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' | ||||
|             tsdt = pendulum.parse(ts) | ||||
|             dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) | ||||
|         flex_dtstr = record.get('dateTime') | ||||
| 
 | ||||
|         else: | ||||
|             # epoch_dt = pendulum.from_timestamp(record.get('time')) | ||||
|             dt = pendulum.parse(date) | ||||
|         if dtstr or date: | ||||
|             dt = pendulum.parse(dtstr or date) | ||||
| 
 | ||||
|         elif flex_dtstr: | ||||
|             # probably a flex record with a wonky non-std timestamp.. | ||||
|             dt = parse_flex_dt(record['dateTime']) | ||||
| 
 | ||||
|         # special handling of symbol extraction from | ||||
|         # flex records using some ad-hoc schema parsing. | ||||
|  | @ -1182,41 +1181,29 @@ def norm_trade_records( | |||
|     return {r.tid: r for r in records} | ||||
| 
 | ||||
| 
 | ||||
| def trades_to_ledger_entries( | ||||
| def parse_flex_dt( | ||||
|     record: str, | ||||
| ) -> pendulum.datetime: | ||||
|     date, ts = record.split(';') | ||||
|     dt = pendulum.parse(date) | ||||
|     ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' | ||||
|     tsdt = pendulum.parse(ts) | ||||
|     return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) | ||||
| 
 | ||||
| 
 | ||||
| def api_trades_to_ledger_entries( | ||||
|     accounts: bidict, | ||||
|     trade_entries: list[object], | ||||
|     source_type: str = 'api', | ||||
| 
 | ||||
| ) -> dict: | ||||
|     ''' | ||||
|     Convert either of API execution objects or flex report | ||||
|     entry objects into ``dict`` form, pretty much straight up | ||||
|     without modification. | ||||
|     Convert API execution objects entry objects into ``dict`` form, | ||||
|     pretty much straight up without modification except add | ||||
|     a `pydatetime` field from the parsed timestamp. | ||||
| 
 | ||||
|     ''' | ||||
|     trades_by_account = {} | ||||
| 
 | ||||
|     for t in trade_entries: | ||||
|         if source_type == 'flex': | ||||
|             entry = t.__dict__ | ||||
| 
 | ||||
|             # XXX: LOL apparently ``toml`` has a bug | ||||
|             # where a section key error will show up in the write | ||||
|             # if you leave a table key as an `int`? So i guess | ||||
|             # cast to strs for all keys.. | ||||
| 
 | ||||
|             # oddly for some so-called "BookTrade" entries | ||||
|             # this field seems to be blank, no cuckin clue. | ||||
|             # trade['ibExecID'] | ||||
|             tid = str(entry.get('ibExecID') or entry['tradeID']) | ||||
|             # date = str(entry['tradeDate']) | ||||
| 
 | ||||
|             # XXX: is it going to cause problems if a account name | ||||
|             # get's lost? The user should be able to find it based | ||||
|             # on the actual exec history right? | ||||
|             acctid = accounts[str(entry['accountId'])] | ||||
| 
 | ||||
|         elif source_type == 'api': | ||||
|         # NOTE: example of schema we pull from the API client. | ||||
|         # { | ||||
|         #     'commissionReport': CommissionReport(... | ||||
|  | @ -1243,7 +1230,8 @@ def trades_to_ledger_entries( | |||
|         tid = str(entry['execId']) | ||||
|         dt = pendulum.from_timestamp(entry['time']) | ||||
|         # TODO: why isn't this showing seconds in the str? | ||||
|             entry['date'] = str(dt) | ||||
|         entry['pydatetime'] = dt | ||||
|         entry['datetime'] = str(dt) | ||||
|         acctid = accounts[entry['acctNumber']] | ||||
| 
 | ||||
|         if not tid: | ||||
|  | @ -1262,6 +1250,73 @@ def trades_to_ledger_entries( | |||
|             acctid, {} | ||||
|         )[tid] = entry | ||||
| 
 | ||||
|     # sort entries in output by python based datetime | ||||
|     for acctid in trades_by_account: | ||||
|         trades_by_account[acctid] = dict(sorted( | ||||
|             trades_by_account[acctid].items(), | ||||
|             key=lambda entry: entry[1].pop('pydatetime'), | ||||
|         )) | ||||
| 
 | ||||
|     return trades_by_account | ||||
| 
 | ||||
| 
 | ||||
| def flex_records_to_ledger_entries( | ||||
|     accounts: bidict, | ||||
|     trade_entries: list[object], | ||||
| 
 | ||||
| ) -> dict: | ||||
|     ''' | ||||
|     Convert flex report entry objects into ``dict`` form, pretty much | ||||
|     straight up without modification except add a `pydatetime` field | ||||
|     from the parsed timestamp. | ||||
| 
 | ||||
|     ''' | ||||
|     trades_by_account = {} | ||||
|     for t in trade_entries: | ||||
|         entry = t.__dict__ | ||||
| 
 | ||||
|         # XXX: LOL apparently ``toml`` has a bug | ||||
|         # where a section key error will show up in the write | ||||
|         # if you leave a table key as an `int`? So i guess | ||||
|         # cast to strs for all keys.. | ||||
| 
 | ||||
|         # oddly for some so-called "BookTrade" entries | ||||
|         # this field seems to be blank, no cuckin clue. | ||||
|         # trade['ibExecID'] | ||||
|         tid = str(entry.get('ibExecID') or entry['tradeID']) | ||||
|         # date = str(entry['tradeDate']) | ||||
| 
 | ||||
|         # XXX: is it going to cause problems if a account name | ||||
|         # get's lost? The user should be able to find it based | ||||
|         # on the actual exec history right? | ||||
|         acctid = accounts[str(entry['accountId'])] | ||||
| 
 | ||||
|         # probably a flex record with a wonky non-std timestamp.. | ||||
|         dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime']) | ||||
|         entry['datetime'] = str(dt) | ||||
| 
 | ||||
|         if not tid: | ||||
|             # this is likely some kind of internal adjustment | ||||
|             # transaction, likely one of the following: | ||||
|             # - an expiry event that will show a "book trade" indicating | ||||
|             #   some adjustment to cash balances: zeroing or itm settle. | ||||
|             # - a manual cash balance position adjustment likely done by | ||||
|             #   the user from the accounts window in TWS where they can | ||||
|             #   manually set the avg price and size: | ||||
|             #   https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST | ||||
|             log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') | ||||
|             continue | ||||
| 
 | ||||
|         trades_by_account.setdefault( | ||||
|             acctid, {} | ||||
|         )[tid] = entry | ||||
| 
 | ||||
|     for acctid in trades_by_account: | ||||
|         trades_by_account[acctid] = dict(sorted( | ||||
|             trades_by_account[acctid].items(), | ||||
|             key=lambda entry: entry[1]['pydatetime'], | ||||
|         )) | ||||
| 
 | ||||
|     return trades_by_account | ||||
| 
 | ||||
| 
 | ||||
|  | @ -1308,15 +1363,16 @@ def load_flex_trades( | |||
|     ln = len(trade_entries) | ||||
|     log.info(f'Loaded {ln} trades from flex query') | ||||
| 
 | ||||
|     trades_by_account = trades_to_ledger_entries( | ||||
|         # get reverse map to user account names | ||||
|         conf['accounts'].inverse, | ||||
|     trades_by_account = flex_records_to_ledger_entries( | ||||
|         conf['accounts'].inverse,  # reverse map to user account names | ||||
|         trade_entries, | ||||
|         source_type='flex', | ||||
|     ) | ||||
| 
 | ||||
|     ledger_dict: Optional[dict] = None | ||||
| 
 | ||||
|     for acctid in trades_by_account: | ||||
|         trades_by_id = trades_by_account[acctid] | ||||
| 
 | ||||
|         with open_trade_ledger('ib', acctid) as ledger_dict: | ||||
|             tid_delta = set(trades_by_id) - set(ledger_dict) | ||||
|             log.info( | ||||
|  | @ -1324,9 +1380,11 @@ def load_flex_trades( | |||
|                 f'{pformat(tid_delta)}' | ||||
|             ) | ||||
|             if tid_delta: | ||||
|                 ledger_dict.update( | ||||
|                     {tid: trades_by_id[tid] for tid in tid_delta} | ||||
|                 ) | ||||
|                 sorted_delta = dict(sorted( | ||||
|                     {tid: trades_by_id[tid] for tid in tid_delta}.items(), | ||||
|                     key=lambda entry: entry[1].pop('pydatetime'), | ||||
|                 )) | ||||
|                 ledger_dict.update(sorted_delta) | ||||
| 
 | ||||
|     return ledger_dict | ||||
| 
 | ||||
|  |  | |||
|  | @ -22,6 +22,7 @@ import asyncio | |||
| from contextlib import asynccontextmanager as acm | ||||
| from dataclasses import asdict | ||||
| from datetime import datetime | ||||
| from functools import partial | ||||
| from math import isnan | ||||
| import time | ||||
| from typing import ( | ||||
|  | @ -38,7 +39,6 @@ import tractor | |||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| 
 | ||||
| from piker.data._sharedmem import ShmArray | ||||
| from .._util import SymbolNotFound, NoData | ||||
| from .api import ( | ||||
|     # _adhoc_futes_set, | ||||
|  | @ -111,24 +111,54 @@ async def open_history_client( | |||
|     that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. | ||||
| 
 | ||||
|     ''' | ||||
|     # TODO: | ||||
|     # - add logic to handle tradable hours and only grab | ||||
|     #   valid bars in the range? | ||||
|     # - we want to avoid overrunning the underlying shm array buffer and | ||||
|     #   we should probably calc the number of calls to make depending on | ||||
|     #   that until we have the `marketstore` daemon in place in which case | ||||
|     #   the shm size will be driven by user config and available sys | ||||
|     #   memory. | ||||
| 
 | ||||
|     async with open_data_client() as proxy: | ||||
| 
 | ||||
|         max_timeout: float = 2. | ||||
|         mean: float = 0 | ||||
|         count: int = 0 | ||||
| 
 | ||||
|         async def get_hist( | ||||
|             timeframe: float, | ||||
|             end_dt: Optional[datetime] = None, | ||||
|             start_dt: Optional[datetime] = None, | ||||
| 
 | ||||
|         ) -> tuple[np.ndarray, str]: | ||||
|             nonlocal max_timeout, mean, count | ||||
| 
 | ||||
|             out, fails = await get_bars(proxy, symbol, end_dt=end_dt) | ||||
|             query_start = time.time() | ||||
|             out, timedout = await get_bars( | ||||
|                 proxy, | ||||
|                 symbol, | ||||
|                 timeframe, | ||||
|                 end_dt=end_dt, | ||||
|             ) | ||||
|             latency = time.time() - query_start | ||||
|             if ( | ||||
|                 not timedout | ||||
|                 # and latency <= max_timeout | ||||
|             ): | ||||
|                 count += 1 | ||||
|                 mean += latency / count | ||||
|                 print( | ||||
|                     f'HISTORY FRAME QUERY LATENCY: {latency}\n' | ||||
|                     f'mean: {mean}' | ||||
|                 ) | ||||
| 
 | ||||
|             # TODO: add logic here to handle tradable hours and only grab | ||||
|             # valid bars in the range | ||||
|             if out is None: | ||||
|                 # could be trying to retreive bars over weekend | ||||
|                 log.error(f"Can't grab bars starting at {end_dt}!?!?") | ||||
|                 raise NoData( | ||||
|                     f'{end_dt}', | ||||
|                     frame_size=2000, | ||||
|                     # frame_size=2000, | ||||
|                 ) | ||||
| 
 | ||||
|             bars, bars_array, first_dt, last_dt = out | ||||
|  | @ -145,7 +175,7 @@ async def open_history_client( | |||
|         # quite sure why.. needs some tinkering and probably | ||||
|         # a lookthrough of the ``ib_insync`` machinery, for eg. maybe | ||||
|         # we have to do the batch queries on the `asyncio` side? | ||||
|         yield get_hist, {'erlangs': 1, 'rate': 6} | ||||
|         yield get_hist, {'erlangs': 1, 'rate': 3} | ||||
| 
 | ||||
| 
 | ||||
| _pacing: str = ( | ||||
|  | @ -154,102 +184,19 @@ _pacing: str = ( | |||
| ) | ||||
| 
 | ||||
| 
 | ||||
| async def get_bars( | ||||
| 
 | ||||
| async def wait_on_data_reset( | ||||
|     proxy: MethodProxy, | ||||
|     fqsn: str, | ||||
|     reset_type: str = 'data', | ||||
|     timeout: float = 16, | ||||
| 
 | ||||
|     # blank to start which tells ib to look up the latest datum | ||||
|     end_dt: str = '', | ||||
|     task_status: TaskStatus[ | ||||
|         tuple[ | ||||
|             trio.CancelScope, | ||||
|             trio.Event, | ||||
|         ] | ||||
|     ] = trio.TASK_STATUS_IGNORED, | ||||
| ) -> bool: | ||||
| 
 | ||||
| ) -> (dict, np.ndarray): | ||||
|     ''' | ||||
|     Retrieve historical data from a ``trio``-side task using | ||||
|     a ``MethoProxy``. | ||||
| 
 | ||||
|     ''' | ||||
|     fails = 0 | ||||
|     bars: Optional[list] = None | ||||
|     first_dt: datetime = None | ||||
|     last_dt: datetime = None | ||||
| 
 | ||||
|     if end_dt: | ||||
|         last_dt = pendulum.from_timestamp(end_dt.timestamp()) | ||||
| 
 | ||||
|     for _ in range(10): | ||||
|         try: | ||||
|             out = await proxy.bars( | ||||
|                 fqsn=fqsn, | ||||
|                 end_dt=end_dt, | ||||
|             ) | ||||
|             if out: | ||||
|                 bars, bars_array = out | ||||
| 
 | ||||
|             else: | ||||
|                 await tractor.breakpoint() | ||||
| 
 | ||||
|             if bars_array is None: | ||||
|                 raise SymbolNotFound(fqsn) | ||||
| 
 | ||||
|             first_dt = pendulum.from_timestamp( | ||||
|                 bars[0].date.timestamp()) | ||||
| 
 | ||||
|             last_dt = pendulum.from_timestamp( | ||||
|                 bars[-1].date.timestamp()) | ||||
| 
 | ||||
|             time = bars_array['time'] | ||||
|             assert time[-1] == last_dt.timestamp() | ||||
|             assert time[0] == first_dt.timestamp() | ||||
|             log.info( | ||||
|                 f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' | ||||
|             ) | ||||
| 
 | ||||
|             return (bars, bars_array, first_dt, last_dt), fails | ||||
| 
 | ||||
|         except RequestError as err: | ||||
|             msg = err.message | ||||
| 
 | ||||
|             if 'No market data permissions for' in msg: | ||||
|                 # TODO: signalling for no permissions searches | ||||
|                 raise NoData( | ||||
|                     f'Symbol: {fqsn}', | ||||
|                 ) | ||||
| 
 | ||||
|             elif ( | ||||
|                 err.code == 162 and | ||||
|                 'HMDS query returned no data' in err.message | ||||
|             ): | ||||
|                 # XXX: this is now done in the storage mgmt layer | ||||
|                 # and we shouldn't implicitly decrement the frame dt | ||||
|                 # index since the upper layer may be doing so | ||||
|                 # concurrently and we don't want to be delivering frames | ||||
|                 # that weren't asked for. | ||||
|                 log.warning( | ||||
|                     f'NO DATA found ending @ {end_dt}\n' | ||||
|                 ) | ||||
| 
 | ||||
|                 # try to decrement start point and look further back | ||||
|                 # end_dt = last_dt = last_dt.subtract(seconds=2000) | ||||
| 
 | ||||
|                 raise NoData( | ||||
|                     f'Symbol: {fqsn}', | ||||
|                     frame_size=2000, | ||||
|                 ) | ||||
| 
 | ||||
|             # elif ( | ||||
|             #     err.code == 162 and | ||||
|             #     'Trading TWS session is connected from a different IP | ||||
|             #     address' in err.message | ||||
|             # ): | ||||
|             #     log.warning("ignoring ip address warning") | ||||
|             #     continue | ||||
| 
 | ||||
|             elif _pacing in msg: | ||||
| 
 | ||||
|                 log.warning( | ||||
|                     'History throttle rate reached!\n' | ||||
|                     'Resetting farms with `ctrl-alt-f` hack\n' | ||||
|                 ) | ||||
|     # TODO: we might have to put a task lock around this | ||||
|     # method.. | ||||
|     hist_ev = proxy.status_event( | ||||
|  | @ -265,150 +212,238 @@ async def get_bars( | |||
|     # live_ev = proxy.status_event( | ||||
|     #     'Market data farm connection is OK:usfuture' | ||||
|     # ) | ||||
| 
 | ||||
|     # try to wait on the reset event(s) to arrive, a timeout | ||||
|     # will trigger a retry up to 6 times (for now). | ||||
|                 tries: int = 2 | ||||
|                 timeout: float = 10 | ||||
| 
 | ||||
|                 # try 3 time with a data reset then fail over to | ||||
|                 # a connection reset. | ||||
|                 for i in range(1, tries): | ||||
|     done = trio.Event() | ||||
|     with trio.move_on_after(timeout) as cs: | ||||
| 
 | ||||
|         task_status.started((cs, done)) | ||||
| 
 | ||||
|         log.warning('Sending DATA RESET request') | ||||
|                     await data_reset_hack(reset_type='data') | ||||
|         res = await data_reset_hack(reset_type=reset_type) | ||||
| 
 | ||||
|                     with trio.move_on_after(timeout) as cs: | ||||
|                         for name, ev in [ | ||||
|                             # TODO: not sure if waiting on other events | ||||
|                             # is all that useful here or not. in theory | ||||
|                             # you could wait on one of the ones above | ||||
|                             # first to verify the reset request was | ||||
|                             # sent? | ||||
|                             ('history', hist_ev), | ||||
|                         ]: | ||||
|                             await ev.wait() | ||||
|                             log.info(f"{name} DATA RESET") | ||||
|                             break | ||||
| 
 | ||||
|                     if cs.cancelled_caught: | ||||
|                         fails += 1 | ||||
|                         log.warning( | ||||
|                             f'Data reset {name} timeout, retrying {i}.' | ||||
|                         ) | ||||
| 
 | ||||
|                         continue | ||||
|                 else: | ||||
| 
 | ||||
|                     log.warning('Sending CONNECTION RESET') | ||||
|                     res = await data_reset_hack(reset_type='connection') | ||||
|         if not res: | ||||
|             log.warning( | ||||
|                 'NO VNC DETECTED!\n' | ||||
|                 'Manually press ctrl-alt-f on your IB java app' | ||||
|             ) | ||||
|                         # break | ||||
|             done.set() | ||||
|             return False | ||||
| 
 | ||||
|                     with trio.move_on_after(timeout) as cs: | ||||
|                         for name, ev in [ | ||||
|         # TODO: not sure if waiting on other events | ||||
|                             # is all that useful here or not. in theory | ||||
|                             # you could wait on one of the ones above | ||||
|                             # first to verify the reset request was | ||||
|                             # sent? | ||||
|         # is all that useful here or not. | ||||
|         # - in theory you could wait on one of the ones above first | ||||
|         # to verify the reset request was sent? | ||||
|         # - we need the same for real-time quote feeds which can | ||||
|         # sometimes flake out and stop delivering.. | ||||
|         for name, ev in [ | ||||
|             ('history', hist_ev), | ||||
|         ]: | ||||
|             await ev.wait() | ||||
|             log.info(f"{name} DATA RESET") | ||||
|             done.set() | ||||
|             return True | ||||
| 
 | ||||
|                     if cs.cancelled_caught: | ||||
|                         fails += 1 | ||||
|                         log.warning('Data CONNECTION RESET timeout!?') | ||||
|     if cs.cancel_called: | ||||
|         log.warning( | ||||
|             'Data reset task canceled?' | ||||
|         ) | ||||
| 
 | ||||
|     done.set() | ||||
|     return False | ||||
| 
 | ||||
| 
 | ||||
| _data_resetter_task: trio.Task | None = None | ||||
| 
 | ||||
| 
 | ||||
| async def get_bars( | ||||
| 
 | ||||
|     proxy: MethodProxy, | ||||
|     fqsn: str, | ||||
|     timeframe: int, | ||||
| 
 | ||||
|     # blank to start which tells ib to look up the latest datum | ||||
|     end_dt: str = '', | ||||
| 
 | ||||
|     # TODO: make this more dynamic based on measured frame rx latency.. | ||||
|     timeout: float = 3,  # how long before we trigger a feed reset | ||||
| 
 | ||||
|     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
| ) -> (dict, np.ndarray): | ||||
|     ''' | ||||
|     Retrieve historical data from a ``trio``-side task using | ||||
|     a ``MethoProxy``. | ||||
| 
 | ||||
|     ''' | ||||
|     global _data_resetter_task | ||||
| 
 | ||||
|     data_cs: trio.CancelScope | None = None | ||||
|     result: tuple[ | ||||
|         ibis.objects.BarDataList, | ||||
|         np.ndarray, | ||||
|         datetime, | ||||
|         datetime, | ||||
|     ] | None = None | ||||
|     result_ready = trio.Event() | ||||
| 
 | ||||
|     async def query(): | ||||
|         nonlocal result, data_cs, end_dt | ||||
|         while True: | ||||
|             try: | ||||
|                 out = await proxy.bars( | ||||
|                     fqsn=fqsn, | ||||
|                     end_dt=end_dt, | ||||
|                     sample_period_s=timeframe, | ||||
| 
 | ||||
|                     # ideally we cancel the request just before we | ||||
|                     # cancel on the ``trio``-side and trigger a data | ||||
|                     # reset hack.. the problem is there's no way (with | ||||
|                     # current impl) to detect a cancel case. | ||||
|                     # timeout=timeout, | ||||
|                 ) | ||||
|                 if out is None: | ||||
|                     raise NoData(f'{end_dt}') | ||||
| 
 | ||||
|                 bars, bars_array, dt_duration = out | ||||
| 
 | ||||
|                 if not bars: | ||||
|                     log.warning( | ||||
|                         f'History is blank for {dt_duration} from {end_dt}' | ||||
|                     ) | ||||
|                     end_dt -= dt_duration | ||||
|                     continue | ||||
| 
 | ||||
|                 if bars_array is None: | ||||
|                     raise SymbolNotFound(fqsn) | ||||
| 
 | ||||
|                 first_dt = pendulum.from_timestamp( | ||||
|                     bars[0].date.timestamp()) | ||||
| 
 | ||||
|                 last_dt = pendulum.from_timestamp( | ||||
|                     bars[-1].date.timestamp()) | ||||
| 
 | ||||
|                 time = bars_array['time'] | ||||
|                 assert time[-1] == last_dt.timestamp() | ||||
|                 assert time[0] == first_dt.timestamp() | ||||
|                 log.info( | ||||
|                     f'{len(bars)} bars retreived {first_dt} -> {last_dt}' | ||||
|                 ) | ||||
| 
 | ||||
|                 if data_cs: | ||||
|                     data_cs.cancel() | ||||
| 
 | ||||
|                 result = (bars, bars_array, first_dt, last_dt) | ||||
| 
 | ||||
|                 # signal data reset loop parent task | ||||
|                 result_ready.set() | ||||
| 
 | ||||
|                 return result | ||||
| 
 | ||||
|             except RequestError as err: | ||||
|                 msg = err.message | ||||
| 
 | ||||
|                 if 'No market data permissions for' in msg: | ||||
|                     # TODO: signalling for no permissions searches | ||||
|                     raise NoData( | ||||
|                         f'Symbol: {fqsn}', | ||||
|                     ) | ||||
| 
 | ||||
|                 elif err.code == 162: | ||||
|                     if 'HMDS query returned no data' in err.message: | ||||
|                         # XXX: this is now done in the storage mgmt | ||||
|                         # layer and we shouldn't implicitly decrement | ||||
|                         # the frame dt index since the upper layer may | ||||
|                         # be doing so concurrently and we don't want to | ||||
|                         # be delivering frames that weren't asked for. | ||||
|                         log.warning( | ||||
|                             f'NO DATA found ending @ {end_dt}\n' | ||||
|                         ) | ||||
| 
 | ||||
|                         # try to decrement start point and look further back | ||||
|                         # end_dt = end_dt.subtract(seconds=2000) | ||||
|                         end_dt = end_dt.subtract(days=1) | ||||
|                         print("SUBTRACTING DAY") | ||||
|                         continue | ||||
| 
 | ||||
|                     elif 'API historical data query cancelled' in err.message: | ||||
|                         log.warning( | ||||
|                             'Query cancelled by IB (:eyeroll:):\n' | ||||
|                             f'{err.message}' | ||||
|                         ) | ||||
|                         continue | ||||
|                     elif ( | ||||
|                         'Trading TWS session is connected from a different IP' | ||||
|                             in err.message | ||||
|                     ): | ||||
|                         log.warning("ignoring ip address warning") | ||||
|                         continue | ||||
| 
 | ||||
|                 # XXX: more or less same as above timeout case | ||||
|                 elif _pacing in msg: | ||||
|                     log.warning( | ||||
|                         'History throttle rate reached!\n' | ||||
|                         'Resetting farms with `ctrl-alt-f` hack\n' | ||||
|                     ) | ||||
| 
 | ||||
|                     # cancel any existing reset task | ||||
|                     if data_cs: | ||||
|                         data_cs.cancel() | ||||
| 
 | ||||
|                     # spawn new data reset task | ||||
|                     data_cs, reset_done = await nurse.start( | ||||
|                         partial( | ||||
|                             wait_on_data_reset, | ||||
|                             proxy, | ||||
|                             timeout=float('inf'), | ||||
|                             reset_type='connection' | ||||
|                         ) | ||||
|                     ) | ||||
|                     continue | ||||
| 
 | ||||
|                 else: | ||||
|                     raise | ||||
| 
 | ||||
|     return None, None | ||||
|     # else:  # throttle wasn't fixed so error out immediately | ||||
|     #     raise _err | ||||
|     # TODO: make this global across all history task/requests | ||||
|     # such that simultaneous symbol queries don't try data resettingn | ||||
|     # too fast.. | ||||
|     unset_resetter: bool = False | ||||
|     async with trio.open_nursery() as nurse: | ||||
| 
 | ||||
|         # start history request that we allow | ||||
|         # to run indefinitely until a result is acquired | ||||
|         nurse.start_soon(query) | ||||
| 
 | ||||
| async def backfill_bars( | ||||
|         # start history reset loop which waits up to the timeout | ||||
|         # for a result before triggering a data feed reset. | ||||
|         while not result_ready.is_set(): | ||||
| 
 | ||||
|     fqsn: str, | ||||
|     shm: ShmArray,  # type: ignore # noqa | ||||
|             with trio.move_on_after(timeout): | ||||
|                 await result_ready.wait() | ||||
|                 break | ||||
| 
 | ||||
|     # TODO: we want to avoid overrunning the underlying shm array buffer | ||||
|     # and we should probably calc the number of calls to make depending | ||||
|     # on that until we have the `marketstore` daemon in place in which | ||||
|     # case the shm size will be driven by user config and available sys | ||||
|     # memory. | ||||
|     count: int = 16, | ||||
| 
 | ||||
|     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Fill historical bars into shared mem / storage afap. | ||||
| 
 | ||||
|     TODO: avoid pacing constraints: | ||||
|     https://github.com/pikers/piker/issues/128 | ||||
| 
 | ||||
|     ''' | ||||
|     # last_dt1 = None | ||||
|     last_dt = None | ||||
| 
 | ||||
|     with trio.CancelScope() as cs: | ||||
| 
 | ||||
|         async with open_data_client() as proxy: | ||||
| 
 | ||||
|             out, fails = await get_bars(proxy, fqsn) | ||||
| 
 | ||||
|             if out is None: | ||||
|                 raise RuntimeError("Could not pull currrent history?!") | ||||
| 
 | ||||
|             (first_bars, bars_array, first_dt, last_dt) = out | ||||
|             vlm = bars_array['volume'] | ||||
|             vlm[vlm < 0] = 0 | ||||
|             last_dt = first_dt | ||||
| 
 | ||||
|             # write historical data to buffer | ||||
|             shm.push(bars_array) | ||||
| 
 | ||||
|             task_status.started(cs) | ||||
| 
 | ||||
|             i = 0 | ||||
|             while i < count: | ||||
| 
 | ||||
|                 out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) | ||||
| 
 | ||||
|                 if out is None: | ||||
|                     # could be trying to retreive bars over weekend | ||||
|                     # TODO: add logic here to handle tradable hours and | ||||
|                     # only grab valid bars in the range | ||||
|                     log.error(f"Can't grab bars starting at {first_dt}!?!?") | ||||
| 
 | ||||
|                     # XXX: get_bars() should internally decrement dt by | ||||
|                     # 2k seconds and try again. | ||||
|             if _data_resetter_task: | ||||
|                 # don't double invoke the reset hack if another | ||||
|                 # requester task already has it covered. | ||||
|                 continue | ||||
|             else: | ||||
|                 _data_resetter_task = trio.lowlevel.current_task() | ||||
|                 unset_resetter = True | ||||
| 
 | ||||
|                 (first_bars, bars_array, first_dt, last_dt) = out | ||||
|                 # last_dt1 = last_dt | ||||
|                 # last_dt = first_dt | ||||
|             # spawn new data reset task | ||||
|             data_cs, reset_done = await nurse.start( | ||||
|                 partial( | ||||
|                     wait_on_data_reset, | ||||
|                     proxy, | ||||
|                     timeout=float('inf'), | ||||
|                 ) | ||||
|             ) | ||||
|             # sync wait on reset to complete | ||||
|             await reset_done.wait() | ||||
| 
 | ||||
|                 # volume cleaning since there's -ve entries, | ||||
|                 # wood luv to know what crookery that is.. | ||||
|                 vlm = bars_array['volume'] | ||||
|                 vlm[vlm < 0] = 0 | ||||
| 
 | ||||
|                 # TODO we should probably dig into forums to see what peeps | ||||
|                 # think this data "means" and then use it as an indicator of | ||||
|                 # sorts? dinkus has mentioned that $vlms for the day dont' | ||||
|                 # match other platforms nor the summary stat tws shows in | ||||
|                 # the monitor - it's probably worth investigating. | ||||
| 
 | ||||
|                 shm.push(bars_array, prepend=True) | ||||
|                 i += 1 | ||||
|     _data_resetter_task = None if unset_resetter else _data_resetter_task | ||||
|     return result, data_cs is not None | ||||
| 
 | ||||
| 
 | ||||
| asset_type_map = { | ||||
|  | @ -466,7 +501,9 @@ async def _setup_quote_stream( | |||
| 
 | ||||
|     to_trio.send_nowait(None) | ||||
| 
 | ||||
|     async with load_aio_clients() as accts2clients: | ||||
|     async with load_aio_clients( | ||||
|         disconnect_on_exit=False, | ||||
|     ) as accts2clients: | ||||
|         caccount_name, client = get_preferred_data_client(accts2clients) | ||||
|         contract = contract or (await client.find_contract(symbol)) | ||||
|         ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) | ||||
|  | @ -512,10 +549,11 @@ async def _setup_quote_stream( | |||
|                 # Manually do the dereg ourselves. | ||||
|                 teardown() | ||||
|             except trio.WouldBlock: | ||||
|                 log.warning( | ||||
|                     f'channel is blocking symbol feed for {symbol}?' | ||||
|                     f'\n{to_trio.statistics}' | ||||
|                 ) | ||||
|                 # log.warning( | ||||
|                 #     f'channel is blocking symbol feed for {symbol}?' | ||||
|                 #     f'\n{to_trio.statistics}' | ||||
|                 # ) | ||||
|                 pass | ||||
| 
 | ||||
|             # except trio.WouldBlock: | ||||
|             #     # for slow debugging purposes to avoid clobbering prompt | ||||
|  | @ -545,7 +583,8 @@ async def open_aio_quote_stream( | |||
|     from_aio = _quote_streams.get(symbol) | ||||
|     if from_aio: | ||||
| 
 | ||||
|         # if we already have a cached feed deliver a rx side clone to consumer | ||||
|         # if we already have a cached feed deliver a rx side clone | ||||
|         # to consumer | ||||
|         async with broadcast_receiver( | ||||
|             from_aio, | ||||
|             2**6, | ||||
|  | @ -736,17 +775,45 @@ async def stream_quotes( | |||
|             await trio.sleep_forever() | ||||
|             return  # we never expect feed to come up? | ||||
| 
 | ||||
|         async with open_aio_quote_stream( | ||||
|         cs: Optional[trio.CancelScope] = None | ||||
|         startup: bool = True | ||||
|         while ( | ||||
|             startup | ||||
|             or cs.cancel_called | ||||
|         ): | ||||
|             with trio.CancelScope() as cs: | ||||
|                 async with ( | ||||
|                     trio.open_nursery() as nurse, | ||||
|                     open_aio_quote_stream( | ||||
|                         symbol=sym, | ||||
|                         contract=con, | ||||
|         ) as stream: | ||||
| 
 | ||||
|                     ) as stream, | ||||
|                 ): | ||||
|                     # ugh, clear ticks since we've consumed them | ||||
|                     # (ahem, ib_insync is stateful trash) | ||||
|                     first_ticker.ticks = [] | ||||
| 
 | ||||
|                     # only on first entry at feed boot up | ||||
|                     if startup: | ||||
|                         startup = False | ||||
|                         task_status.started((init_msgs, first_quote)) | ||||
| 
 | ||||
|                     # start a stream restarter task which monitors the | ||||
|                     # data feed event. | ||||
|                     async def reset_on_feed(): | ||||
| 
 | ||||
|                         # TODO: this seems to be surpressed from the | ||||
|                         # traceback in ``tractor``? | ||||
|                         # assert 0 | ||||
| 
 | ||||
|                         rt_ev = proxy.status_event( | ||||
|                             'Market data farm connection is OK:usfarm' | ||||
|                         ) | ||||
|                         await rt_ev.wait() | ||||
|                         cs.cancel()  # cancel called should now be set | ||||
| 
 | ||||
|                     nurse.start_soon(reset_on_feed) | ||||
| 
 | ||||
|                     async with aclosing(stream): | ||||
|                         if syminfo.get('no_vlm', False): | ||||
| 
 | ||||
|  | @ -754,29 +821,31 @@ async def stream_quotes( | |||
|                             # include vlm data. | ||||
|                             atype = syminfo['asset_type'] | ||||
|                             log.info( | ||||
|                         f'Non-vlm asset {sym}@{atype}, skipping quote poll...' | ||||
|                                 f'No-vlm {sym}@{atype}, skipping quote poll' | ||||
|                             ) | ||||
| 
 | ||||
|                         else: | ||||
|                     # wait for real volume on feed (trading might be closed) | ||||
|                             # wait for real volume on feed (trading might be | ||||
|                             # closed) | ||||
|                             while True: | ||||
|                                 ticker = await stream.receive() | ||||
| 
 | ||||
|                         # for a real volume contract we rait for the first | ||||
|                         # "real" trade to take place | ||||
|                                 # for a real volume contract we rait for | ||||
|                                 # the first "real" trade to take place | ||||
|                                 if ( | ||||
|                                     # not calc_price | ||||
|                                     # and not ticker.rtTime | ||||
|                                     not ticker.rtTime | ||||
|                                 ): | ||||
|                             # spin consuming tickers until we get a real | ||||
|                             # market datum | ||||
|                                     # spin consuming tickers until we | ||||
|                                     # get a real market datum | ||||
|                                     log.debug(f"New unsent ticker: {ticker}") | ||||
|                                     continue | ||||
|                                 else: | ||||
|                             log.debug("Received first real volume tick") | ||||
|                             # ugh, clear ticks since we've consumed them | ||||
|                             # (ahem, ib_insync is truly stateful trash) | ||||
|                                     log.debug("Received first volume tick") | ||||
|                                     # ugh, clear ticks since we've | ||||
|                                     # consumed them (ahem, ib_insync is | ||||
|                                     # truly stateful trash) | ||||
|                                     ticker.ticks = [] | ||||
| 
 | ||||
|                                     # XXX: this works because we don't use | ||||
|  | @ -904,7 +973,14 @@ async def open_symbol_search( | |||
|                     except trio.WouldBlock: | ||||
|                         pass | ||||
| 
 | ||||
|                 if not pattern or pattern.isspace(): | ||||
|                 if ( | ||||
|                     not pattern | ||||
|                     or pattern.isspace() | ||||
| 
 | ||||
|                     # XXX: not sure if this is a bad assumption but it | ||||
|                     # seems to make search snappier? | ||||
|                     or len(pattern) < 1 | ||||
|                 ): | ||||
|                     log.warning('empty pattern received, skipping..') | ||||
| 
 | ||||
|                     # TODO: *BUG* if nothing is returned here the client | ||||
|  |  | |||
|  | @ -258,6 +258,7 @@ async def open_history_client( | |||
|         queries: int = 0 | ||||
| 
 | ||||
|         async def get_ohlc( | ||||
|             timeframe: float, | ||||
|             end_dt: Optional[datetime] = None, | ||||
|             start_dt: Optional[datetime] = None, | ||||
| 
 | ||||
|  |  | |||
|  | @ -138,25 +138,26 @@ def cli(ctx, brokers, loglevel, tl, configdir): | |||
| @click.pass_obj | ||||
| def services(config, tl, names): | ||||
| 
 | ||||
|     async def list_services(): | ||||
|     from .._daemon import open_piker_runtime | ||||
| 
 | ||||
|         async with tractor.get_arbiter( | ||||
|     async def list_services(): | ||||
|         async with ( | ||||
|             open_piker_runtime( | ||||
|                 name='service_query', | ||||
|                 loglevel=config['loglevel'] if tl else None, | ||||
|             ), | ||||
|             tractor.get_arbiter( | ||||
|                 *_tractor_kwargs['arbiter_addr'] | ||||
|         ) as portal: | ||||
|             ) as portal | ||||
|         ): | ||||
|             registry = await portal.run_from_ns('self', 'get_registry') | ||||
|             json_d = {} | ||||
|             for key, socket in registry.items(): | ||||
|                 # name, uuid = uid | ||||
|                 host, port = socket | ||||
|                 json_d[key] = f'{host}:{port}' | ||||
|             click.echo(f"{colorize_json(json_d)}") | ||||
| 
 | ||||
|     tractor.run( | ||||
|         list_services, | ||||
|         name='service_query', | ||||
|         loglevel=config['loglevel'] if tl else None, | ||||
|         arbiter_addr=_tractor_kwargs['arbiter_addr'], | ||||
|     ) | ||||
|     trio.run(list_services) | ||||
| 
 | ||||
| 
 | ||||
| def _load_clis() -> None: | ||||
|  |  | |||
|  | @ -21,16 +21,19 @@ This module is enabled for ``brokerd`` daemons. | |||
| 
 | ||||
| """ | ||||
| from __future__ import annotations | ||||
| from dataclasses import dataclass, field | ||||
| from datetime import datetime | ||||
| from contextlib import asynccontextmanager | ||||
| from dataclasses import ( | ||||
|     dataclass, | ||||
|     field, | ||||
| ) | ||||
| from datetime import datetime | ||||
| from functools import partial | ||||
| from pprint import pformat | ||||
| from types import ModuleType | ||||
| from typing import ( | ||||
|     Any, | ||||
|     AsyncIterator, Optional, | ||||
|     Generator, | ||||
|     AsyncIterator, | ||||
|     Callable, | ||||
|     Optional, | ||||
|     Awaitable, | ||||
|     TYPE_CHECKING, | ||||
|     Union, | ||||
|  | @ -39,7 +42,6 @@ from typing import ( | |||
| import trio | ||||
| from trio.abc import ReceiveChannel | ||||
| from trio_typing import TaskStatus | ||||
| import trimeter | ||||
| import tractor | ||||
| from tractor.trionics import maybe_open_context | ||||
| import pendulum | ||||
|  | @ -252,6 +254,7 @@ async def start_backfill( | |||
|     mod: ModuleType, | ||||
|     bfqsn: str, | ||||
|     shm: ShmArray, | ||||
|     timeframe: float, | ||||
| 
 | ||||
|     last_tsdb_dt: Optional[datetime] = None, | ||||
|     storage: Optional[Storage] = None, | ||||
|  | @ -262,11 +265,19 @@ async def start_backfill( | |||
| 
 | ||||
| ) -> int: | ||||
| 
 | ||||
|     hist: Callable[ | ||||
|         [int, datetime, datetime], | ||||
|         tuple[np.ndarray, str] | ||||
|     ] | ||||
|     config: dict[str, int] | ||||
|     async with mod.open_history_client(bfqsn) as (hist, config): | ||||
| 
 | ||||
|         # get latest query's worth of history all the way | ||||
|         # back to what is recorded in the tsdb | ||||
|         array, start_dt, end_dt = await hist(end_dt=None) | ||||
|         array, start_dt, end_dt = await hist( | ||||
|             timeframe, | ||||
|             end_dt=None, | ||||
|         ) | ||||
| 
 | ||||
|         times = array['time'] | ||||
| 
 | ||||
|  | @ -289,6 +300,9 @@ async def start_backfill( | |||
|         log.info(f'Pushing {to_push.size} to shm!') | ||||
|         shm.push(to_push) | ||||
| 
 | ||||
|         # TODO: *** THIS IS A BUG *** | ||||
|         # we need to only broadcast to subscribers for this fqsn.. | ||||
|         # otherwise all fsps get reset on every chart.. | ||||
|         for delay_s in sampler.subscribers: | ||||
|             await broadcast(delay_s) | ||||
| 
 | ||||
|  | @ -304,8 +318,8 @@ async def start_backfill( | |||
|                 raise ValueError( | ||||
|                     '`piker` only needs to support 1m and 1s sampling ' | ||||
|                     'but ur api is trying to deliver a longer ' | ||||
|                     f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' | ||||
|                     'do dat brudder.' | ||||
|                     f'timeframe of {step_size_s} seconds..\n' | ||||
|                     'So yuh.. dun do dat brudder.' | ||||
|                 ) | ||||
| 
 | ||||
|             # when no tsdb "last datum" is provided, we just load | ||||
|  | @ -319,96 +333,60 @@ async def start_backfill( | |||
|                 # do a decently sized backfill and load it into storage. | ||||
|                 periods = { | ||||
|                     1: {'days': 6}, | ||||
|                     60: {'years': 2}, | ||||
|                     60: {'years': 6}, | ||||
|                 } | ||||
| 
 | ||||
|             kwargs = periods[step_size_s] | ||||
|             last_tsdb_dt = start_dt.subtract(**kwargs) | ||||
| 
 | ||||
|         # configure async query throttling | ||||
|         erlangs = config.get('erlangs', 1) | ||||
|         rate = config.get('rate', 1) | ||||
|         frames = {} | ||||
| 
 | ||||
|         def iter_dts(start: datetime): | ||||
| 
 | ||||
|             while True: | ||||
| 
 | ||||
|                 hist_period = pendulum.period( | ||||
|                     start, | ||||
|                     last_tsdb_dt, | ||||
|                 ) | ||||
|                 dtrange = list(hist_period.range('seconds', frame_size_s)) | ||||
|                 log.debug(f'New datetime index:\n{pformat(dtrange)}') | ||||
| 
 | ||||
|                 for end_dt in dtrange: | ||||
|                     log.info(f'Yielding next frame start {end_dt}') | ||||
|                     start = yield end_dt | ||||
| 
 | ||||
|                     # if caller sends a new start date, reset to that | ||||
|                     if start is not None: | ||||
|                         log.warning(f'Resetting date range: {start}') | ||||
|                         break | ||||
|                 else: | ||||
|                     # from while | ||||
|                     return | ||||
| 
 | ||||
|         # pull new history frames until we hit latest | ||||
|         # already in the tsdb or a max count. | ||||
|         count = 0 | ||||
| 
 | ||||
|         # NOTE: when gaps are detected in the retreived history (by | ||||
|         # comparisor of the end - start versus the expected "frame size" | ||||
|         # in seconds) we need a way to alert the async request code not | ||||
|         # to continue to query for data "within the gap". This var is | ||||
|         # set in such cases such that further requests in that period | ||||
|         # are discarded and further we reset the "datetimem query frame | ||||
|         # index" in such cases to avoid needless noop requests. | ||||
|         earliest_end_dt: Optional[datetime] = start_dt | ||||
| 
 | ||||
|         async def get_ohlc_frame( | ||||
|             input_end_dt: datetime, | ||||
|             iter_dts_gen: Generator[datetime], | ||||
| 
 | ||||
|         ) -> np.ndarray: | ||||
| 
 | ||||
|             nonlocal count, frames, earliest_end_dt, frame_size_s | ||||
|             count += 1 | ||||
| 
 | ||||
|             if input_end_dt > earliest_end_dt: | ||||
|                 # if a request comes in for an inter-gap frame we | ||||
|                 # discard it since likely this request is still | ||||
|                 # lingering from before the reset of ``iter_dts()`` via | ||||
|                 # ``.send()`` below. | ||||
|                 log.info(f'Discarding request history ending @ {input_end_dt}') | ||||
| 
 | ||||
|                 # signals to ``trimeter`` loop to discard and | ||||
|                 # ``continue`` in it's schedule loop. | ||||
|                 return None | ||||
|         # rate = config.get('rate', 1) | ||||
|         # XXX: legacy from ``trimeter`` code but unsupported now. | ||||
|         # erlangs = config.get('erlangs', 1) | ||||
| 
 | ||||
|         # inline sequential loop where we simply pass the | ||||
|         # last retrieved start dt to the next request as | ||||
|         # it's end dt. | ||||
|         starts: set[datetime] = set() | ||||
|         while start_dt > last_tsdb_dt: | ||||
|             try: | ||||
|                 log.info( | ||||
|                     f'Requesting {step_size_s}s frame ending in {input_end_dt}' | ||||
|                     f'Requesting {step_size_s}s frame ending in {start_dt}' | ||||
|                 ) | ||||
|                 array, start_dt, end_dt = await hist(end_dt=input_end_dt) | ||||
|                 array, next_start_dt, end_dt = await hist( | ||||
|                     timeframe, | ||||
|                     end_dt=start_dt, | ||||
|                 ) | ||||
| 
 | ||||
|                 if next_start_dt in starts: | ||||
|                     start_dt = min(starts) | ||||
|                     print("SKIPPING DUPLICATE FRAME @ {next_start_dt}") | ||||
|                     continue | ||||
| 
 | ||||
|                 # only update new start point if new | ||||
|                 start_dt = next_start_dt | ||||
|                 starts.add(start_dt) | ||||
| 
 | ||||
|                 assert array['time'][0] == start_dt.timestamp() | ||||
| 
 | ||||
|             except NoData: | ||||
|                 # XXX: unhandled history gap (shouldn't happen?) | ||||
|                 log.warning( | ||||
|                     f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?' | ||||
|                     f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?' | ||||
|                 ) | ||||
|                 return None  # discard signal | ||||
|                 await tractor.breakpoint() | ||||
| 
 | ||||
|             except DataUnavailable as duerr: | ||||
|                 # broker is being a bish and we can't pull | ||||
|                 # any more.. | ||||
|                 log.warning('backend halted on data deliver !?!?') | ||||
|             except DataUnavailable:  # as duerr: | ||||
|                 # broker is being a bish and we can't pull any more.. | ||||
|                 log.warning( | ||||
|                     f'NO-MORE-DATA: backend {mod.name} halted history!?' | ||||
|                 ) | ||||
| 
 | ||||
|                 # ugh, what's a better way? | ||||
|                 # TODO: fwiw, we probably want a way to signal a throttle | ||||
|                 # condition (eg. with ib) so that we can halt the | ||||
|                 # request loop until the condition is resolved? | ||||
|                 return duerr | ||||
|                 return | ||||
| 
 | ||||
|             diff = end_dt - start_dt | ||||
|             frame_time_diff_s = diff.seconds | ||||
|  | @ -419,42 +397,12 @@ async def start_backfill( | |||
|                 # XXX: query result includes a start point prior to our | ||||
|                 # expected "frame size" and thus is likely some kind of | ||||
|                 # history gap (eg. market closed period, outage, etc.) | ||||
|                 # so indicate to the request loop that this gap is | ||||
|                 # expected by both, | ||||
|                 # - resetting the ``iter_dts()`` generator to start at | ||||
|                 #   the new start point delivered in this result | ||||
|                 # - setting the non-locally scoped ``earliest_end_dt`` | ||||
|                 #   to this new value so that the request loop doesn't | ||||
|                 #   get tripped up thinking there's an out of order | ||||
|                 #   request-result condition. | ||||
| 
 | ||||
|                 # so just report it to console for now. | ||||
|                 log.warning( | ||||
|                     f'History frame ending @ {end_dt} appears to have a gap:\n' | ||||
|                     f'{diff} ~= {frame_time_diff_s} seconds' | ||||
|                 ) | ||||
| 
 | ||||
|                 # reset dtrange gen to new start point | ||||
|                 try: | ||||
|                     next_end = iter_dts_gen.send(start_dt) | ||||
|                     log.info( | ||||
|                         f'Reset frame index to start at {start_dt}\n' | ||||
|                         f'Was at {next_end}' | ||||
|                     ) | ||||
| 
 | ||||
|                     # NOTE: manually set "earliest end datetime" index-value | ||||
|                     # to avoid the request loop getting confused about | ||||
|                     # new frames that are earlier in history - i.e. this | ||||
|                     # **is not** the case of out-of-order frames from | ||||
|                     # an async batch request. | ||||
|                     earliest_end_dt = start_dt | ||||
| 
 | ||||
|                 except StopIteration: | ||||
|                     # gen already terminated meaning we probably already | ||||
|                     # exhausted it via frame requests. | ||||
|                     log.info( | ||||
|                         "Datetime index already exhausted, can't reset.." | ||||
|                     ) | ||||
| 
 | ||||
|             to_push = diff_history( | ||||
|                 array, | ||||
|                 start_dt, | ||||
|  | @ -464,152 +412,11 @@ async def start_backfill( | |||
|             ln = len(to_push) | ||||
|             if ln: | ||||
|                 log.info(f'{ln} bars for {start_dt} -> {end_dt}') | ||||
|                 frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt) | ||||
|                 return to_push, start_dt, end_dt | ||||
| 
 | ||||
|             else: | ||||
|                 log.warning( | ||||
|                     f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' | ||||
|                 ) | ||||
|                 return None | ||||
| 
 | ||||
|         # initial dt index starts at the start of the first query result | ||||
|         idts = iter_dts(start_dt) | ||||
| 
 | ||||
|         async with trimeter.amap( | ||||
|             partial( | ||||
|                 get_ohlc_frame, | ||||
|                 # we close in the ``iter_dt()`` gen in so we can send | ||||
|                 # reset signals as needed for gap dection in the | ||||
|                 # history. | ||||
|                 iter_dts_gen=idts, | ||||
|             ), | ||||
|             idts, | ||||
| 
 | ||||
|             capture_outcome=True, | ||||
|             include_value=True, | ||||
| 
 | ||||
|             # better technical names bruv... | ||||
|             max_at_once=erlangs, | ||||
|             max_per_second=rate, | ||||
| 
 | ||||
|         ) as outcomes: | ||||
| 
 | ||||
|             # Then iterate over the return values, as they become available | ||||
|             # (i.e., not necessarily in the original order) | ||||
|             async for input_end_dt, outcome in outcomes: | ||||
| 
 | ||||
|                 try: | ||||
|                     out = outcome.unwrap() | ||||
| 
 | ||||
|                     if out is None: | ||||
|                         # skip signal | ||||
|                         continue | ||||
| 
 | ||||
|                     elif isinstance(out, DataUnavailable): | ||||
|                         # no data available case signal.. so just kill | ||||
|                         # further requests and basically just stop | ||||
|                         # trying... | ||||
|                         break | ||||
| 
 | ||||
|                 except Exception: | ||||
|                     log.exception('uhh trimeter bail') | ||||
|                     raise | ||||
|                 else: | ||||
|                     to_push, start_dt, end_dt = out | ||||
| 
 | ||||
|                 if not len(to_push): | ||||
|                     # diff returned no new data (i.e. we probablyl hit | ||||
|                     # the ``last_tsdb_dt`` point). | ||||
|                     # TODO: raise instead? | ||||
|                     log.warning(f'No history for range {start_dt} -> {end_dt}') | ||||
|                     continue | ||||
| 
 | ||||
|                 # pipeline-style pull frames until we need to wait for | ||||
|                 # the next in order to arrive. | ||||
|                 # i = end_dts.index(input_end_dt) | ||||
|                 # print(f'latest end_dt {end_dt} found at index {i}') | ||||
| 
 | ||||
|                 epochs = list(reversed(sorted(frames))) | ||||
|                 for epoch in epochs: | ||||
| 
 | ||||
|                     start = shm.array['time'][0] | ||||
|                     last_shm_prepend_dt = pendulum.from_timestamp(start) | ||||
|                     earliest_frame_queue_dt = pendulum.from_timestamp(epoch) | ||||
| 
 | ||||
|                     diff = start - epoch | ||||
| 
 | ||||
|                     if diff < 0: | ||||
|                         log.warning( | ||||
|                             'Discarding out of order frame:\n' | ||||
|                             f'{earliest_frame_queue_dt}' | ||||
|                         ) | ||||
|                         frames.pop(epoch) | ||||
|                         continue | ||||
| 
 | ||||
|                     if diff > step_size_s: | ||||
| 
 | ||||
|                         if earliest_end_dt < earliest_frame_queue_dt: | ||||
|                             # XXX: an expected gap was encountered (see | ||||
|                             # logic in ``get_ohlc_frame()``, so allow | ||||
|                             # this frame through to the storage layer. | ||||
|                             log.warning( | ||||
|                                 f'Expected history gap of {diff}s:\n' | ||||
|                                 f'{earliest_frame_queue_dt} <- ' | ||||
|                                 f'{earliest_end_dt}' | ||||
|                             ) | ||||
| 
 | ||||
|                         elif ( | ||||
|                             erlangs > 1 | ||||
|                         ): | ||||
|                             # we don't yet have the next frame to push | ||||
|                             # so break back to the async request loop | ||||
|                             # while we wait for more async frame-results | ||||
|                             # to arrive. | ||||
|                             if len(frames) >= erlangs: | ||||
|                                 log.warning( | ||||
|                                     'Frame count in async-queue is greater ' | ||||
|                                     'then erlangs?\n' | ||||
|                                     'There seems to be a gap between:\n' | ||||
|                                     f'{earliest_frame_queue_dt} <- ' | ||||
|                                     f'{last_shm_prepend_dt}\n' | ||||
|                                     'Conducting manual call for frame ending: ' | ||||
|                                     f'{last_shm_prepend_dt}' | ||||
|                                 ) | ||||
|                                 ( | ||||
|                                     to_push, | ||||
|                                     start_dt, | ||||
|                                     end_dt, | ||||
|                                 ) = await get_ohlc_frame( | ||||
|                                     input_end_dt=last_shm_prepend_dt, | ||||
|                                     iter_dts_gen=idts, | ||||
|                                 ) | ||||
|                                 last_epoch = to_push['time'][-1] | ||||
|                                 diff = start - last_epoch | ||||
| 
 | ||||
|                                 if diff > step_size_s: | ||||
|                                     await tractor.breakpoint() | ||||
|                                     raise DataUnavailable( | ||||
|                                         'An awkward frame was found:\n' | ||||
|                                         f'{start_dt} -> {end_dt}:\n{to_push}' | ||||
|                                     ) | ||||
| 
 | ||||
|                                 else: | ||||
|                                     frames[last_epoch] = ( | ||||
|                                         to_push, start_dt, end_dt) | ||||
|                                     break | ||||
| 
 | ||||
|                             expect_end = pendulum.from_timestamp(start) | ||||
|                             expect_start = expect_end.subtract( | ||||
|                                 seconds=frame_size_s) | ||||
|                             log.warning( | ||||
|                                 'waiting on out-of-order history frame:\n' | ||||
|                                 f'{expect_end - expect_start}' | ||||
|                             ) | ||||
|                             break | ||||
| 
 | ||||
|                     to_push, start_dt, end_dt = frames.pop(epoch) | ||||
|                     ln = len(to_push) | ||||
| 
 | ||||
|             # bail gracefully on shm allocation overrun/full condition | ||||
|             try: | ||||
|  | @ -624,10 +431,6 @@ async def start_backfill( | |||
|                 f'Shm pushed {ln} frame:\n' | ||||
|                 f'{start_dt} -> {end_dt}' | ||||
|             ) | ||||
|                     # keep track of most recent "prepended" ``start_dt`` | ||||
|                     # both for detecting gaps and ensuring async | ||||
|                     # frame-result order. | ||||
|                     earliest_end_dt = start_dt | ||||
| 
 | ||||
|             if ( | ||||
|                 storage is not None | ||||
|  | @ -640,10 +443,12 @@ async def start_backfill( | |||
|                 await storage.write_ohlcv( | ||||
|                     f'{bfqsn}.{mod.name}',  # lul.. | ||||
|                     to_push, | ||||
|                     timeframe, | ||||
|                 ) | ||||
| 
 | ||||
|         # TODO: can we only trigger this if the respective | ||||
|         # history in "in view"?!? | ||||
| 
 | ||||
|         # XXX: extremely important, there can be no checkpoints | ||||
|         # in the block above to avoid entering new ``frames`` | ||||
|         # values while we're pipelining the current ones to | ||||
|  | @ -651,7 +456,9 @@ async def start_backfill( | |||
|         for delay_s in sampler.subscribers: | ||||
|             await broadcast(delay_s) | ||||
| 
 | ||||
|         # short-circuit (for now) | ||||
|         bf_done.set() | ||||
|         return | ||||
| 
 | ||||
| 
 | ||||
| async def manage_history( | ||||
|  | @ -660,6 +467,7 @@ async def manage_history( | |||
|     fqsn: str, | ||||
|     some_data_ready: trio.Event, | ||||
|     feed_is_live: trio.Event, | ||||
|     timeframe: float = 60,  # in seconds | ||||
| 
 | ||||
|     task_status: TaskStatus = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
|  | @ -683,10 +491,10 @@ async def manage_history( | |||
|         readonly=False, | ||||
|     ) | ||||
|     # TODO: history validation | ||||
|     if not opened: | ||||
|         raise RuntimeError( | ||||
|             "Persistent shm for sym was already open?!" | ||||
|         ) | ||||
|     # if not opened: | ||||
|     #     raise RuntimeError( | ||||
|     #         "Persistent shm for sym was already open?!" | ||||
|     #     ) | ||||
| 
 | ||||
|     rt_shm, opened = maybe_open_shm_array( | ||||
|         key=f'{fqsn}_rt', | ||||
|  | @ -698,10 +506,10 @@ async def manage_history( | |||
|         readonly=False, | ||||
|         size=3*_secs_in_day, | ||||
|     ) | ||||
|     if not opened: | ||||
|         raise RuntimeError( | ||||
|             "Persistent shm for sym was already open?!" | ||||
|         ) | ||||
|     # if not opened: | ||||
|     #     raise RuntimeError( | ||||
|     #         "Persistent shm for sym was already open?!" | ||||
|     #     ) | ||||
| 
 | ||||
|     log.info('Scanning for existing `marketstored`') | ||||
| 
 | ||||
|  | @ -726,7 +534,10 @@ async def manage_history( | |||
|             # shm backfiller approach below. | ||||
| 
 | ||||
|             # start history anal and load missing new data via backend. | ||||
|             series, _, last_tsdb_dt = await storage.load(fqsn) | ||||
|             series, _, last_tsdb_dt = await storage.load( | ||||
|                 fqsn, | ||||
|                 timeframe=timeframe, | ||||
|             ) | ||||
| 
 | ||||
|             broker, symbol, expiry = unpack_fqsn(fqsn) | ||||
|             ( | ||||
|  | @ -739,6 +550,7 @@ async def manage_history( | |||
|                     mod, | ||||
|                     bfqsn, | ||||
|                     hist_shm, | ||||
|                     timeframe=timeframe, | ||||
|                     last_tsdb_dt=last_tsdb_dt, | ||||
|                     tsdb_is_up=True, | ||||
|                     storage=storage, | ||||
|  | @ -769,7 +581,6 @@ async def manage_history( | |||
|             else: | ||||
|                 dt_diff_s = 0 | ||||
| 
 | ||||
|             # await trio.sleep_forever() | ||||
|             # TODO: see if there's faster multi-field reads: | ||||
|             # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields | ||||
|             # re-index  with a `time` and index field | ||||
|  | @ -804,6 +615,7 @@ async def manage_history( | |||
|                     series = await storage.read_ohlcv( | ||||
|                         fqsn, | ||||
|                         end=end, | ||||
|                         timeframe=timeframe, | ||||
|                     ) | ||||
|                     history = list(series.values()) | ||||
|                     fastest = history[0] | ||||
|  | @ -856,6 +668,7 @@ async def manage_history( | |||
|                 mod, | ||||
|                 bfqsn, | ||||
|                 hist_shm, | ||||
|                 timeframe=timeframe, | ||||
|             ) | ||||
|         ) | ||||
| 
 | ||||
|  |  | |||
|  | @ -387,6 +387,7 @@ class Storage: | |||
|     async def load( | ||||
|         self, | ||||
|         fqsn: str, | ||||
|         timeframe: int, | ||||
| 
 | ||||
|     ) -> tuple[ | ||||
|         dict[int, np.ndarray],  # timeframe (in secs) to series | ||||
|  | @ -400,12 +401,16 @@ class Storage: | |||
|             # on first load we don't need to pull the max | ||||
|             # history per request size worth. | ||||
|             limit=3000, | ||||
|             timeframe=timeframe, | ||||
|         ) | ||||
|         log.info(f'Loaded tsdb history {tsdb_arrays}') | ||||
| 
 | ||||
|         if tsdb_arrays: | ||||
|             fastest = list(tsdb_arrays.values())[0] | ||||
|             times = fastest['Epoch'] | ||||
|         if len(tsdb_arrays): | ||||
|             # fastest = list(tsdb_arrays.values())[0] | ||||
|             # slowest = list(tsdb_arrays.values())[-1] | ||||
|             hist = tsdb_arrays[timeframe] | ||||
| 
 | ||||
|             times = hist['Epoch'] | ||||
|             first, last = times[0], times[-1] | ||||
|             first_tsdb_dt, last_tsdb_dt = map( | ||||
|                 pendulum.from_timestamp, [first, last] | ||||
|  | @ -420,9 +425,9 @@ class Storage: | |||
|         end: Optional[int] = None, | ||||
|         limit: int = int(800e3), | ||||
| 
 | ||||
|     ) -> tuple[ | ||||
|         MarketstoreClient, | ||||
|         Union[dict, np.ndarray] | ||||
|     ) -> dict[ | ||||
|         int, | ||||
|         Union[dict, np.ndarray], | ||||
|     ]: | ||||
|         client = self.client | ||||
|         syms = await client.list_symbols() | ||||
|  | @ -430,7 +435,8 @@ class Storage: | |||
|         if fqsn not in syms: | ||||
|             return {} | ||||
| 
 | ||||
|         tfstr = tf_in_1s[1] | ||||
|         # use the provided timeframe or 1s by default | ||||
|         tfstr = tf_in_1s.get(timeframe, tf_in_1s[1]) | ||||
| 
 | ||||
|         params = Params( | ||||
|             symbols=fqsn, | ||||
|  | @ -463,39 +469,52 @@ class Storage: | |||
|                 return {} | ||||
| 
 | ||||
|         else: | ||||
|             params.set('timeframe', tfstr) | ||||
|             try: | ||||
|                 result = await client.query(params) | ||||
|             except purerpc.grpclib.exceptions.UnknownError: | ||||
|                 # indicate there is no history for this timeframe | ||||
|                 return {} | ||||
| 
 | ||||
|         # Fill out a `numpy` array-results map keyed by timeframe | ||||
|         arrays = {} | ||||
| 
 | ||||
|         # TODO: it turns out column access on recarrays is actually slower: | ||||
|         # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist | ||||
|         # it might make sense to make these structured arrays? | ||||
|         # Fill out a `numpy` array-results map | ||||
|         arrays = {} | ||||
|         for fqsn, data_set in result.by_symbols().items(): | ||||
|             arrays.setdefault(fqsn, {})[ | ||||
|                 tf_in_1s.inverse[data_set.timeframe] | ||||
|             ] = data_set.array | ||||
| 
 | ||||
|         return arrays[fqsn][timeframe] if timeframe else arrays[fqsn] | ||||
|         return arrays[fqsn] | ||||
| 
 | ||||
|     async def delete_ts( | ||||
|         self, | ||||
|         key: str, | ||||
|         timeframe: Optional[Union[int, str]] = None, | ||||
|         fmt: str = 'OHLCV', | ||||
| 
 | ||||
|     ) -> bool: | ||||
| 
 | ||||
|         client = self.client | ||||
|         syms = await client.list_symbols() | ||||
|         print(syms) | ||||
|         # if key not in syms: | ||||
|         #     raise KeyError(f'`{fqsn}` table key not found?') | ||||
|         if key not in syms: | ||||
|             raise KeyError(f'`{key}` table key not found in\n{syms}?') | ||||
| 
 | ||||
|         return await client.destroy(tbk=key) | ||||
|         tbk = mk_tbk(( | ||||
|             key, | ||||
|             tf_in_1s.get(timeframe, tf_in_1s[60]), | ||||
|             fmt, | ||||
|         )) | ||||
|         return await client.destroy(tbk=tbk) | ||||
| 
 | ||||
|     async def write_ohlcv( | ||||
|         self, | ||||
|         fqsn: str, | ||||
|         ohlcv: np.ndarray, | ||||
|         timeframe: int, | ||||
|         append_and_duplicate: bool = True, | ||||
|         limit: int = int(800e3), | ||||
| 
 | ||||
|  | @ -525,7 +544,7 @@ class Storage: | |||
|             # write to db | ||||
|             resp = await self.client.write( | ||||
|                 to_push, | ||||
|                 tbk=f'{fqsn}/1Sec/OHLCV', | ||||
|                 tbk=f'{fqsn}/{tf_in_1s[timeframe]}/OHLCV', | ||||
| 
 | ||||
|                 # NOTE: will will append duplicates | ||||
|                 # for the same timestamp-index. | ||||
|  | @ -577,6 +596,7 @@ class Storage: | |||
|     # def delete_range(self, start_dt, end_dt) -> None: | ||||
|     #     ... | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_storage_client( | ||||
|     fqsn: str, | ||||
|  | @ -642,7 +662,7 @@ async def tsdb_history_update( | |||
|     ): | ||||
|         profiler(f'opened feed for {fqsn}') | ||||
| 
 | ||||
|         to_append = feed.shm.array | ||||
|         to_append = feed.hist_shm.array | ||||
|         to_prepend = None | ||||
| 
 | ||||
|         if fqsn: | ||||
|  | @ -651,7 +671,7 @@ async def tsdb_history_update( | |||
|                 fqsn = symbol.front_fqsn() | ||||
| 
 | ||||
|             # diff db history with shm and only write the missing portions | ||||
|             ohlcv = feed.shm.array | ||||
|             ohlcv = feed.hist_shm.array | ||||
| 
 | ||||
|             # TODO: use pg profiler | ||||
|             tsdb_arrays = await storage.read_ohlcv(fqsn) | ||||
|  |  | |||
							
								
								
									
										67
									
								
								piker/pp.py
								
								
								
								
							
							
						
						
									
										67
									
								
								piker/pp.py
								
								
								
								
							|  | @ -20,6 +20,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys.. | |||
| (looking at you `ib` and dirt-bird friends) | ||||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| from contextlib import contextmanager as cm | ||||
| from pprint import pformat | ||||
| import os | ||||
|  | @ -138,13 +139,31 @@ class Position(Struct): | |||
| 
 | ||||
|     # ordered record of known constituent trade messages | ||||
|     clears: dict[ | ||||
|         Union[str, int, Status],  # trade id | ||||
|         str | int,  # trade id | ||||
|         dict[str, Any],  # transaction history summaries | ||||
|     ] = {} | ||||
|     first_clear_dt: Optional[datetime] = None | ||||
| 
 | ||||
|     expiry: Optional[datetime] = None | ||||
| 
 | ||||
|     # @property | ||||
|     # def clears(self) -> dict[ | ||||
|     #     Union[str, int, Status],  # trade id | ||||
|     #     dict[str, Any],  # transaction history summaries | ||||
|     # ]: | ||||
|     #     ''' | ||||
|     #     Datetime sorted reference to internal clears table. | ||||
| 
 | ||||
|     #     ''' | ||||
|     #     # self._clears = {} | ||||
|     #     self._clears = dict(sorted( | ||||
|     #         self._clears.items(), | ||||
|     #         key=lambda entry: entry[1]['dt'], | ||||
|     #     )) | ||||
|     #         # self._clears[k] = v | ||||
| 
 | ||||
|     #     return self._clears | ||||
| 
 | ||||
|     def to_dict(self) -> dict: | ||||
|         return { | ||||
|             f: getattr(self, f) | ||||
|  | @ -219,6 +238,10 @@ class Position(Struct): | |||
| 
 | ||||
|         ''' | ||||
|         clears = list(self.clears.values()) | ||||
|         if not clears: | ||||
|             log.warning(f'No clears table for {self.symbol}!?') | ||||
|             return | ||||
| 
 | ||||
|         self.first_clear_dt = min(list(entry['dt'] for entry in clears)) | ||||
|         last_clear = clears[-1] | ||||
| 
 | ||||
|  | @ -623,6 +646,7 @@ class PpTable(Struct): | |||
| 
 | ||||
|     def to_toml( | ||||
|         self, | ||||
|         min_clears: bool = True, | ||||
|     ) -> dict[str, Any]: | ||||
| 
 | ||||
|         active, closed = self.dump_active() | ||||
|  | @ -635,7 +659,9 @@ class PpTable(Struct): | |||
| 
 | ||||
|             # keep the minimal amount of clears that make up this | ||||
|             # position since the last net-zero state. | ||||
|             if min_clears: | ||||
|                 pos.minimize_clears() | ||||
| 
 | ||||
|             pos.ensure_state() | ||||
| 
 | ||||
|             # serialize to pre-toml form | ||||
|  | @ -682,6 +708,8 @@ def load_pps_from_ledger( | |||
|     brokername: str, | ||||
|     acctname: str, | ||||
| 
 | ||||
|     table: Optional[PpTable] = None, | ||||
| 
 | ||||
|     # post normalization filter on ledger entries to be processed | ||||
|     filter_by: Optional[list[dict]] = None, | ||||
| 
 | ||||
|  | @ -698,7 +726,6 @@ def load_pps_from_ledger( | |||
|     ''' | ||||
|     with ( | ||||
|         open_trade_ledger(brokername, acctname) as ledger, | ||||
|         open_pps(brokername, acctname) as table, | ||||
|     ): | ||||
|         if not ledger: | ||||
|             # null case, no ledger file with content | ||||
|  | @ -716,6 +743,10 @@ def load_pps_from_ledger( | |||
|         else: | ||||
|             records = src_records | ||||
| 
 | ||||
|         if table is None: | ||||
|             with open_pps(brokername, acctname) as table: | ||||
|                 updated = table.update_from_trans(records) | ||||
|         else: | ||||
|             updated = table.update_from_trans(records) | ||||
| 
 | ||||
|     return records, updated | ||||
|  | @ -886,15 +917,27 @@ def open_pps( | |||
|         conf=conf, | ||||
|     ) | ||||
| 
 | ||||
|     # first pass populate all missing clears record tables | ||||
|     # for fqsn, entry in pps.items(): | ||||
|     #     # convert clears sub-tables (only in this form | ||||
|     #     # for toml re-presentation) back into a master table. | ||||
|     #     clears_list = entry.get('clears', []) | ||||
| 
 | ||||
|     #     # attempt to reload from ledger | ||||
|     #     if not clears_list: | ||||
|     #         trans, pos = load_pps_from_ledger( | ||||
|     #             brokername, | ||||
|     #             acctid, | ||||
|     #             filter_by=[entry['bsuid']], | ||||
|     #             table=table, | ||||
|     #         ) | ||||
|     #         # breakpoint() | ||||
| 
 | ||||
|     # unmarshal/load ``pps.toml`` config entries into object form | ||||
|     # and update `PpTable` obj entries. | ||||
|     for fqsn, entry in pps.items(): | ||||
|         bsuid = entry['bsuid'] | ||||
| 
 | ||||
|         # convert clears sub-tables (only in this form | ||||
|         # for toml re-presentation) back into a master table. | ||||
|         clears_list = entry['clears'] | ||||
| 
 | ||||
|         # index clears entries in "object" form by tid in a top | ||||
|         # level dict instead of a list (as is presented in our | ||||
|         # ``pps.toml``). | ||||
|  | @ -906,6 +949,18 @@ def open_pps( | |||
|         # processing of new clear events. | ||||
|         trans: list[Transaction] = [] | ||||
| 
 | ||||
|         # convert clears sub-tables (only in this form | ||||
|         # for toml re-presentation) back into a master table. | ||||
|         clears_list = entry['clears'] | ||||
| 
 | ||||
|         # # attempt to reload from ledger | ||||
|         # if not clears_list: | ||||
|         #     trans, pos = load_pps_from_ledger( | ||||
|         #         brokername, | ||||
|         #         acctid, | ||||
|         #         table=table, | ||||
|         #     ) | ||||
| 
 | ||||
|         for clears_table in clears_list: | ||||
|             tid = clears_table.pop('tid') | ||||
|             dtstr = clears_table['dt'] | ||||
|  |  | |||
|  | @ -249,14 +249,14 @@ async def graphics_update_loop( | |||
|     linked: LinkedSplits = godwidget.rt_linked | ||||
|     display_rate = godwidget.window.current_screen().refreshRate() | ||||
| 
 | ||||
|     chart = linked.chart | ||||
|     fast_chart = linked.chart | ||||
|     hist_chart = godwidget.hist_linked.chart | ||||
| 
 | ||||
|     ohlcv = feed.rt_shm | ||||
|     hist_ohlcv = feed.hist_shm | ||||
| 
 | ||||
|     # update last price sticky | ||||
|     last_price_sticky = chart._ysticks[chart.name] | ||||
|     last_price_sticky = fast_chart._ysticks[fast_chart.name] | ||||
|     last_price_sticky.update_from_data( | ||||
|         *ohlcv.array[-1][['index', 'close']] | ||||
|     ) | ||||
|  | @ -268,7 +268,7 @@ async def graphics_update_loop( | |||
| 
 | ||||
|     maxmin = partial( | ||||
|         chart_maxmin, | ||||
|         chart, | ||||
|         fast_chart, | ||||
|         ohlcv, | ||||
|         vlm_chart, | ||||
|     ) | ||||
|  | @ -282,15 +282,15 @@ async def graphics_update_loop( | |||
| 
 | ||||
|     last, volume = ohlcv.array[-1][['close', 'volume']] | ||||
| 
 | ||||
|     symbol = chart.linked.symbol | ||||
|     symbol = fast_chart.linked.symbol | ||||
| 
 | ||||
|     l1 = L1Labels( | ||||
|         chart, | ||||
|         fast_chart, | ||||
|         # determine precision/decimal lengths | ||||
|         digits=symbol.tick_size_digits, | ||||
|         size_digits=symbol.lot_size_digits, | ||||
|     ) | ||||
|     chart._l1_labels = l1 | ||||
|     fast_chart._l1_labels = l1 | ||||
| 
 | ||||
|     # TODO: | ||||
|     # - in theory we should be able to read buffer data faster | ||||
|  | @ -300,10 +300,10 @@ async def graphics_update_loop( | |||
|     #   levels this might be dark volume we need to | ||||
|     #   present differently -> likely dark vlm | ||||
| 
 | ||||
|     tick_size = chart.linked.symbol.tick_size | ||||
|     tick_size = fast_chart.linked.symbol.tick_size | ||||
|     tick_margin = 3 * tick_size | ||||
| 
 | ||||
|     chart.show() | ||||
|     fast_chart.show() | ||||
|     last_quote = time.time() | ||||
|     i_last = ohlcv.index | ||||
| 
 | ||||
|  | @ -313,7 +313,7 @@ async def graphics_update_loop( | |||
|         'maxmin': maxmin, | ||||
|         'ohlcv': ohlcv, | ||||
|         'hist_ohlcv': hist_ohlcv, | ||||
|         'chart': chart, | ||||
|         'chart': fast_chart, | ||||
|         'last_price_sticky': last_price_sticky, | ||||
|         'hist_last_price_sticky': hist_last_price_sticky, | ||||
|         'l1': l1, | ||||
|  | @ -333,7 +333,7 @@ async def graphics_update_loop( | |||
|         ds.vlm_chart = vlm_chart | ||||
|         ds.vlm_sticky = vlm_sticky | ||||
| 
 | ||||
|     chart.default_view() | ||||
|     fast_chart.default_view() | ||||
| 
 | ||||
|     # TODO: probably factor this into some kinda `DisplayState` | ||||
|     # API that can be reused at least in terms of pulling view | ||||
|  | @ -410,16 +410,16 @@ async def graphics_update_loop( | |||
|         last_quote = time.time() | ||||
| 
 | ||||
|         # chart isn't active/shown so skip render cycle and pause feed(s) | ||||
|         if chart.linked.isHidden(): | ||||
|         if fast_chart.linked.isHidden(): | ||||
|             # print('skipping update') | ||||
|             chart.pause_all_feeds() | ||||
|             fast_chart.pause_all_feeds() | ||||
|             continue | ||||
| 
 | ||||
|         ic = chart.view._ic | ||||
|         if ic: | ||||
|             chart.pause_all_feeds() | ||||
|             await ic.wait() | ||||
|             chart.resume_all_feeds() | ||||
|         # ic = fast_chart.view._ic | ||||
|         # if ic: | ||||
|         #     fast_chart.pause_all_feeds() | ||||
|         #     await ic.wait() | ||||
|         #     fast_chart.resume_all_feeds() | ||||
| 
 | ||||
|         # sync call to update all graphics/UX components. | ||||
|         graphics_update_cycle(ds) | ||||
|  | @ -502,6 +502,7 @@ def graphics_update_cycle( | |||
|             or trigger_all | ||||
|         ): | ||||
|             chart.increment_view(steps=i_diff) | ||||
|             chart.view._set_yrange(yrange=(mn, mx)) | ||||
| 
 | ||||
|             if vlm_chart: | ||||
|                 vlm_chart.increment_view(steps=i_diff) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue