Compare commits
	
		
			23 Commits 
		
	
	
		
			gitea_feat
			...
			clears_tab
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | c53071e43a | |
|  | 41ffccc59e | |
|  | 55856f5e8b | |
|  | e8ab28e456 | |
|  | d2b6216994 | |
|  | eb743759a4 | |
|  | 74910ba56c | |
|  | 28535fa977 | |
|  | 1d7e642dbd | |
|  | 69be65237f | |
|  | 96f5a8abb8 | |
|  | 13e886c967 | |
|  | 2c6b832e50 | |
|  | d5c3124722 | |
|  | c939e75ef9 | |
|  | 844f8beaa7 | |
|  | ac7ba500be | |
|  | 3301619647 | |
|  | 7f498766af | |
|  | 9270391e01 | |
|  | 0c061d8957 | |
|  | 87f7a03dbe | |
|  | 1adf5fb9c0 | 
|  | @ -195,9 +195,8 @@ async def open_piker_runtime( | ||||||
| 
 | 
 | ||||||
| ) -> Optional[tractor._portal.Portal]: | ) -> Optional[tractor._portal.Portal]: | ||||||
|     ''' |     ''' | ||||||
|     Start a piker actor who's runtime will automatically |     Start a piker actor who's runtime will automatically sync with | ||||||
|     sync with existing piker actors in local network |     existing piker actors on the local link based on configuration. | ||||||
|     based on configuration. |  | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     global _services |     global _services | ||||||
|  |  | ||||||
|  | @ -388,6 +388,7 @@ async def open_history_client( | ||||||
|     async with open_cached_client('binance') as client: |     async with open_cached_client('binance') as client: | ||||||
| 
 | 
 | ||||||
|         async def get_ohlc( |         async def get_ohlc( | ||||||
|  |             timeframe: float, | ||||||
|             end_dt: Optional[datetime] = None, |             end_dt: Optional[datetime] = None, | ||||||
|             start_dt: Optional[datetime] = None, |             start_dt: Optional[datetime] = None, | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -43,6 +43,7 @@ from bidict import bidict | ||||||
| import trio | import trio | ||||||
| import tractor | import tractor | ||||||
| from tractor import to_asyncio | from tractor import to_asyncio | ||||||
|  | import pendulum | ||||||
| import ib_insync as ibis | import ib_insync as ibis | ||||||
| from ib_insync.contract import ( | from ib_insync.contract import ( | ||||||
|     Contract, |     Contract, | ||||||
|  | @ -52,6 +53,7 @@ from ib_insync.contract import ( | ||||||
| from ib_insync.order import Order | from ib_insync.order import Order | ||||||
| from ib_insync.ticker import Ticker | from ib_insync.ticker import Ticker | ||||||
| from ib_insync.objects import ( | from ib_insync.objects import ( | ||||||
|  |     BarDataList, | ||||||
|     Position, |     Position, | ||||||
|     Fill, |     Fill, | ||||||
|     Execution, |     Execution, | ||||||
|  | @ -78,26 +80,11 @@ _time_units = { | ||||||
|     'h': ' hours', |     'h': ' hours', | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| _time_frames = { | _bar_sizes = { | ||||||
|     '1s': '1 Sec', |     1: '1 Sec', | ||||||
|     '5s': '5 Sec', |     60: '1 min', | ||||||
|     '30s': '30 Sec', |     60*60: '1 hour', | ||||||
|     '1m': 'OneMinute', |     24*60*60: '1 day', | ||||||
|     '2m': 'TwoMinutes', |  | ||||||
|     '3m': 'ThreeMinutes', |  | ||||||
|     '4m': 'FourMinutes', |  | ||||||
|     '5m': 'FiveMinutes', |  | ||||||
|     '10m': 'TenMinutes', |  | ||||||
|     '15m': 'FifteenMinutes', |  | ||||||
|     '20m': 'TwentyMinutes', |  | ||||||
|     '30m': 'HalfHour', |  | ||||||
|     '1h': 'OneHour', |  | ||||||
|     '2h': 'TwoHours', |  | ||||||
|     '4h': 'FourHours', |  | ||||||
|     'D': 'OneDay', |  | ||||||
|     'W': 'OneWeek', |  | ||||||
|     'M': 'OneMonth', |  | ||||||
|     'Y': 'OneYear', |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| _show_wap_in_history: bool = False | _show_wap_in_history: bool = False | ||||||
|  | @ -199,7 +186,8 @@ _adhoc_futes_set = { | ||||||
|     'lb.nymex',  # random len lumber |     'lb.nymex',  # random len lumber | ||||||
| 
 | 
 | ||||||
|     # metals |     # metals | ||||||
|     'xauusd.cmdty',  # gold spot |     # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 | ||||||
|  |     'xauusd.cmdty',  # london gold spot ^ | ||||||
|     'gc.nymex', |     'gc.nymex', | ||||||
|     'mgc.nymex',  # micro |     'mgc.nymex',  # micro | ||||||
| 
 | 
 | ||||||
|  | @ -257,14 +245,12 @@ _exch_skip_list = { | ||||||
|     'PSE', |     'PSE', | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 |  | ||||||
| 
 |  | ||||||
| _enters = 0 | _enters = 0 | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def bars_to_np(bars: list) -> np.ndarray: | def bars_to_np(bars: list) -> np.ndarray: | ||||||
|     ''' |     ''' | ||||||
|     Convert a "bars list thing" (``BarsList`` type from ibis) |     Convert a "bars list thing" (``BarDataList`` type from ibis) | ||||||
|     into a numpy struct array. |     into a numpy struct array. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|  | @ -284,6 +270,27 @@ def bars_to_np(bars: list) -> np.ndarray: | ||||||
|     return nparr |     return nparr | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # NOTE: pacing violations exist for higher sample rates: | ||||||
|  | # https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations | ||||||
|  | # Also see note on duration limits being lifted on 1m+ periods, | ||||||
|  | # but they say "use with discretion": | ||||||
|  | # https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd | ||||||
|  | _samplings: dict[int, tuple[str, str]] = { | ||||||
|  |     1: ( | ||||||
|  |         '1 secs', | ||||||
|  |         f'{int(2e3)} S', | ||||||
|  |         pendulum.duration(seconds=2e3), | ||||||
|  |     ), | ||||||
|  |     # TODO: benchmark >1 D duration on query to see if | ||||||
|  |     # throughput can be made faster during backfilling. | ||||||
|  |     60: ( | ||||||
|  |         '1 min', | ||||||
|  |         '1 D', | ||||||
|  |         pendulum.duration(days=1), | ||||||
|  |     ), | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| class Client: | class Client: | ||||||
|     ''' |     ''' | ||||||
|     IB wrapped for our broker backend API. |     IB wrapped for our broker backend API. | ||||||
|  | @ -338,19 +345,32 @@ class Client: | ||||||
|         start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", |         start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", | ||||||
|         end_dt: Union[datetime, str] = "", |         end_dt: Union[datetime, str] = "", | ||||||
| 
 | 
 | ||||||
|         sample_period_s: str = 1,  # ohlc sample period |         # ohlc sample period in seconds | ||||||
|         period_count: int = int(2e3),  # <- max per 1s sample query |         sample_period_s: int = 1, | ||||||
| 
 | 
 | ||||||
|     ) -> list[dict[str, Any]]: |         # optional "duration of time" equal to the | ||||||
|  |         # length of the returned history frame. | ||||||
|  |         duration: Optional[str] = None, | ||||||
|  | 
 | ||||||
|  |         **kwargs, | ||||||
|  | 
 | ||||||
|  |     ) -> tuple[BarDataList, np.ndarray, pendulum.Duration]: | ||||||
|         ''' |         ''' | ||||||
|         Retreive OHLCV bars for a fqsn over a range to the present. |         Retreive OHLCV bars for a fqsn over a range to the present. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|  |         # See API docs here: | ||||||
|  |         # https://interactivebrokers.github.io/tws-api/historical_data.html | ||||||
|         bars_kwargs = {'whatToShow': 'TRADES'} |         bars_kwargs = {'whatToShow': 'TRADES'} | ||||||
|  |         bars_kwargs.update(kwargs) | ||||||
|  |         bar_size, duration, dt_duration = _samplings[sample_period_s] | ||||||
| 
 | 
 | ||||||
|         global _enters |         global _enters | ||||||
|         # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}') |         # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}') | ||||||
|         print(f'REQUESTING BARS {_enters} @ end={end_dt}') |         print( | ||||||
|  |             f"REQUESTING {duration}'s worth {bar_size} BARS\n" | ||||||
|  |             f'{_enters} @ end={end_dt}"' | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         if not end_dt: |         if not end_dt: | ||||||
|             end_dt = '' |             end_dt = '' | ||||||
|  | @ -360,30 +380,20 @@ class Client: | ||||||
|         contract = (await self.find_contracts(fqsn))[0] |         contract = (await self.find_contracts(fqsn))[0] | ||||||
|         bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) |         bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) | ||||||
| 
 | 
 | ||||||
|         # _min = min(2000*100, count) |  | ||||||
|         bars = await self.ib.reqHistoricalDataAsync( |         bars = await self.ib.reqHistoricalDataAsync( | ||||||
|             contract, |             contract, | ||||||
|             endDateTime=end_dt, |             endDateTime=end_dt, | ||||||
|             formatDate=2, |             formatDate=2, | ||||||
| 
 | 
 | ||||||
|             # time history length values format: |  | ||||||
|             # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` |  | ||||||
| 
 |  | ||||||
|             # OHLC sampling values: |             # OHLC sampling values: | ||||||
|             # 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins, |             # 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins, | ||||||
|             # 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins, |             # 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins, | ||||||
|             # 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M |             # 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M | ||||||
|             # barSizeSetting='1 secs', |             barSizeSetting=bar_size, | ||||||
| 
 | 
 | ||||||
|             # durationStr='{count} S'.format(count=15000 * 5), |             # time history length values format: | ||||||
|             # durationStr='{count} D'.format(count=1), |             # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` | ||||||
|             # barSizeSetting='5 secs', |             durationStr=duration, | ||||||
| 
 |  | ||||||
|             durationStr='{count} S'.format(count=period_count), |  | ||||||
|             # barSizeSetting='5 secs', |  | ||||||
|             barSizeSetting='1 secs', |  | ||||||
| 
 |  | ||||||
|             # barSizeSetting='1 min', |  | ||||||
| 
 | 
 | ||||||
|             # always use extended hours |             # always use extended hours | ||||||
|             useRTH=False, |             useRTH=False, | ||||||
|  | @ -394,11 +404,21 @@ class Client: | ||||||
|             # whatToShow='TRADES', |             # whatToShow='TRADES', | ||||||
|         ) |         ) | ||||||
|         if not bars: |         if not bars: | ||||||
|             # TODO: raise underlying error here |             # NOTE: there's 2 cases here to handle (and this should be | ||||||
|             raise ValueError(f"No bars retreived for {fqsn}?") |             # read alongside the implementation of | ||||||
|  |             # ``.reqHistoricalDataAsync()``): | ||||||
|  |             # - no data is returned for the period likely due to | ||||||
|  |             # a weekend, holiday or other non-trading period prior to | ||||||
|  |             # ``end_dt`` which exceeds the ``duration``, | ||||||
|  |             # - a timeout occurred in which case insync internals return | ||||||
|  |             # an empty list thing with bars.clear()... | ||||||
|  |             return [], np.empty(0), dt_duration | ||||||
|  |             # TODO: we could maybe raise ``NoData`` instead if we | ||||||
|  |             # rewrite the method in the first case? right now there's no | ||||||
|  |             # way to detect a timeout. | ||||||
| 
 | 
 | ||||||
|         nparr = bars_to_np(bars) |         nparr = bars_to_np(bars) | ||||||
|         return bars, nparr |         return bars, nparr, dt_duration | ||||||
| 
 | 
 | ||||||
|     async def con_deats( |     async def con_deats( | ||||||
|         self, |         self, | ||||||
|  | @ -463,7 +483,7 @@ class Client: | ||||||
|         self, |         self, | ||||||
|         pattern: str, |         pattern: str, | ||||||
|         # how many contracts to search "up to" |         # how many contracts to search "up to" | ||||||
|         upto: int = 6, |         upto: int = 16, | ||||||
|         asdicts: bool = True, |         asdicts: bool = True, | ||||||
| 
 | 
 | ||||||
|     ) -> dict[str, ContractDetails]: |     ) -> dict[str, ContractDetails]: | ||||||
|  | @ -498,6 +518,16 @@ class Client: | ||||||
| 
 | 
 | ||||||
|                 exch = tract.exchange |                 exch = tract.exchange | ||||||
|                 if exch not in _exch_skip_list: |                 if exch not in _exch_skip_list: | ||||||
|  | 
 | ||||||
|  |                     # try to lookup any contracts from our adhoc set | ||||||
|  |                     # since often the exchange/venue is named slightly | ||||||
|  |                     # different (eg. BRR.CMECRYPTO` instead of just | ||||||
|  |                     # `.CME`). | ||||||
|  |                     info = _adhoc_symbol_map.get(sym) | ||||||
|  |                     if info: | ||||||
|  |                         con_kwargs, bars_kwargs = info | ||||||
|  |                         exch = con_kwargs['exchange'] | ||||||
|  | 
 | ||||||
|                     # try get all possible contracts for symbol as per, |                     # try get all possible contracts for symbol as per, | ||||||
|                     # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut |                     # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut | ||||||
|                     con = ibis.Future( |                     con = ibis.Future( | ||||||
|  | @ -1066,6 +1096,7 @@ async def load_aio_clients( | ||||||
|     # retry a few times to get the client going.. |     # retry a few times to get the client going.. | ||||||
|     connect_retries: int = 3, |     connect_retries: int = 3, | ||||||
|     connect_timeout: float = 0.5, |     connect_timeout: float = 0.5, | ||||||
|  |     disconnect_on_exit: bool = True, | ||||||
| 
 | 
 | ||||||
| ) -> dict[str, Client]: | ) -> dict[str, Client]: | ||||||
|     ''' |     ''' | ||||||
|  | @ -1207,10 +1238,11 @@ async def load_aio_clients( | ||||||
|     finally: |     finally: | ||||||
|         # TODO: for re-scans we'll want to not teardown clients which |         # TODO: for re-scans we'll want to not teardown clients which | ||||||
|         # are up and stable right? |         # are up and stable right? | ||||||
|         for acct, client in _accounts2clients.items(): |         if disconnect_on_exit: | ||||||
|             log.info(f'Disconnecting {acct}@{client}') |             for acct, client in _accounts2clients.items(): | ||||||
|             client.ib.disconnect() |                 log.info(f'Disconnecting {acct}@{client}') | ||||||
|             _client_cache.pop((host, port), None) |                 client.ib.disconnect() | ||||||
|  |                 _client_cache.pop((host, port), None) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def load_clients_for_trio( | async def load_clients_for_trio( | ||||||
|  |  | ||||||
|  | @ -305,7 +305,7 @@ async def update_ledger_from_api_trades( | ||||||
|         entry['listingExchange'] = pexch |         entry['listingExchange'] = pexch | ||||||
| 
 | 
 | ||||||
|     conf = get_config() |     conf = get_config() | ||||||
|     entries = trades_to_ledger_entries( |     entries = api_trades_to_ledger_entries( | ||||||
|         conf['accounts'].inverse, |         conf['accounts'].inverse, | ||||||
|         trade_entries, |         trade_entries, | ||||||
|     ) |     ) | ||||||
|  | @ -362,7 +362,7 @@ async def update_and_audit_msgs( | ||||||
|                 # if ib reports a lesser pp it's not as bad since we can |                 # if ib reports a lesser pp it's not as bad since we can | ||||||
|                 # presume we're at least not more in the shit then we |                 # presume we're at least not more in the shit then we | ||||||
|                 # thought. |                 # thought. | ||||||
|                 if diff: |                 if diff and pikersize: | ||||||
|                     reverse_split_ratio = pikersize / ibsize |                     reverse_split_ratio = pikersize / ibsize | ||||||
|                     split_ratio = 1/reverse_split_ratio |                     split_ratio = 1/reverse_split_ratio | ||||||
| 
 | 
 | ||||||
|  | @ -372,6 +372,7 @@ async def update_and_audit_msgs( | ||||||
|                         entry = f'split_ratio = 1/{int(reverse_split_ratio)}' |                         entry = f'split_ratio = 1/{int(reverse_split_ratio)}' | ||||||
| 
 | 
 | ||||||
|                     raise ValueError( |                     raise ValueError( | ||||||
|  |                     # log.error( | ||||||
|                         f'POSITION MISMATCH ib <-> piker ledger:\n' |                         f'POSITION MISMATCH ib <-> piker ledger:\n' | ||||||
|                         f'ib: {ibppmsg}\n' |                         f'ib: {ibppmsg}\n' | ||||||
|                         f'piker: {msg}\n' |                         f'piker: {msg}\n' | ||||||
|  | @ -1122,18 +1123,16 @@ def norm_trade_records( | ||||||
|             continue |             continue | ||||||
| 
 | 
 | ||||||
|         # timestamping is way different in API records |         # timestamping is way different in API records | ||||||
|  |         dtstr = record.get('datetime') | ||||||
|         date = record.get('date') |         date = record.get('date') | ||||||
|         if not date: |         flex_dtstr = record.get('dateTime') | ||||||
|             # probably a flex record with a wonky non-std timestamp.. |  | ||||||
|             date, ts = record['dateTime'].split(';') |  | ||||||
|             dt = pendulum.parse(date) |  | ||||||
|             ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' |  | ||||||
|             tsdt = pendulum.parse(ts) |  | ||||||
|             dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) |  | ||||||
| 
 | 
 | ||||||
|         else: |         if dtstr or date: | ||||||
|             # epoch_dt = pendulum.from_timestamp(record.get('time')) |             dt = pendulum.parse(dtstr or date) | ||||||
|             dt = pendulum.parse(date) | 
 | ||||||
|  |         elif flex_dtstr: | ||||||
|  |             # probably a flex record with a wonky non-std timestamp.. | ||||||
|  |             dt = parse_flex_dt(record['dateTime']) | ||||||
| 
 | 
 | ||||||
|         # special handling of symbol extraction from |         # special handling of symbol extraction from | ||||||
|         # flex records using some ad-hoc schema parsing. |         # flex records using some ad-hoc schema parsing. | ||||||
|  | @ -1182,69 +1181,58 @@ def norm_trade_records( | ||||||
|     return {r.tid: r for r in records} |     return {r.tid: r for r in records} | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def trades_to_ledger_entries( | def parse_flex_dt( | ||||||
|  |     record: str, | ||||||
|  | ) -> pendulum.datetime: | ||||||
|  |     date, ts = record.split(';') | ||||||
|  |     dt = pendulum.parse(date) | ||||||
|  |     ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' | ||||||
|  |     tsdt = pendulum.parse(ts) | ||||||
|  |     return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def api_trades_to_ledger_entries( | ||||||
|     accounts: bidict, |     accounts: bidict, | ||||||
|     trade_entries: list[object], |     trade_entries: list[object], | ||||||
|     source_type: str = 'api', |  | ||||||
| 
 | 
 | ||||||
| ) -> dict: | ) -> dict: | ||||||
|     ''' |     ''' | ||||||
|     Convert either of API execution objects or flex report |     Convert API execution objects entry objects into ``dict`` form, | ||||||
|     entry objects into ``dict`` form, pretty much straight up |     pretty much straight up without modification except add | ||||||
|     without modification. |     a `pydatetime` field from the parsed timestamp. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     trades_by_account = {} |     trades_by_account = {} | ||||||
| 
 |  | ||||||
|     for t in trade_entries: |     for t in trade_entries: | ||||||
|         if source_type == 'flex': |         # NOTE: example of schema we pull from the API client. | ||||||
|             entry = t.__dict__ |         # { | ||||||
|  |         #     'commissionReport': CommissionReport(... | ||||||
|  |         #     'contract': {... | ||||||
|  |         #     'execution': Execution(... | ||||||
|  |         #     'time': 1654801166.0 | ||||||
|  |         # } | ||||||
| 
 | 
 | ||||||
|             # XXX: LOL apparently ``toml`` has a bug |         # flatten all sub-dicts and values into one top level entry. | ||||||
|             # where a section key error will show up in the write |         entry = {} | ||||||
|             # if you leave a table key as an `int`? So i guess |         for section, val in t.items(): | ||||||
|             # cast to strs for all keys.. |             match section: | ||||||
|  |                 case 'contract' | 'execution' | 'commissionReport': | ||||||
|  |                     # sub-dict cases | ||||||
|  |                     entry.update(val) | ||||||
| 
 | 
 | ||||||
|             # oddly for some so-called "BookTrade" entries |                 case 'time': | ||||||
|             # this field seems to be blank, no cuckin clue. |                     # ib has wack ns timestamps, or is that us? | ||||||
|             # trade['ibExecID'] |                     continue | ||||||
|             tid = str(entry.get('ibExecID') or entry['tradeID']) |  | ||||||
|             # date = str(entry['tradeDate']) |  | ||||||
| 
 | 
 | ||||||
|             # XXX: is it going to cause problems if a account name |                 case _: | ||||||
|             # get's lost? The user should be able to find it based |                     entry[section] = val | ||||||
|             # on the actual exec history right? |  | ||||||
|             acctid = accounts[str(entry['accountId'])] |  | ||||||
| 
 | 
 | ||||||
|         elif source_type == 'api': |         tid = str(entry['execId']) | ||||||
|             # NOTE: example of schema we pull from the API client. |         dt = pendulum.from_timestamp(entry['time']) | ||||||
|             # { |         # TODO: why isn't this showing seconds in the str? | ||||||
|             #     'commissionReport': CommissionReport(... |         entry['pydatetime'] = dt | ||||||
|             #     'contract': {... |         entry['datetime'] = str(dt) | ||||||
|             #     'execution': Execution(... |         acctid = accounts[entry['acctNumber']] | ||||||
|             #     'time': 1654801166.0 |  | ||||||
|             # } |  | ||||||
| 
 |  | ||||||
|             # flatten all sub-dicts and values into one top level entry. |  | ||||||
|             entry = {} |  | ||||||
|             for section, val in t.items(): |  | ||||||
|                 match section: |  | ||||||
|                     case 'contract' | 'execution' | 'commissionReport': |  | ||||||
|                         # sub-dict cases |  | ||||||
|                         entry.update(val) |  | ||||||
| 
 |  | ||||||
|                     case 'time': |  | ||||||
|                         # ib has wack ns timestamps, or is that us? |  | ||||||
|                         continue |  | ||||||
| 
 |  | ||||||
|                     case _: |  | ||||||
|                         entry[section] = val |  | ||||||
| 
 |  | ||||||
|             tid = str(entry['execId']) |  | ||||||
|             dt = pendulum.from_timestamp(entry['time']) |  | ||||||
|             # TODO: why isn't this showing seconds in the str? |  | ||||||
|             entry['date'] = str(dt) |  | ||||||
|             acctid = accounts[entry['acctNumber']] |  | ||||||
| 
 | 
 | ||||||
|         if not tid: |         if not tid: | ||||||
|             # this is likely some kind of internal adjustment |             # this is likely some kind of internal adjustment | ||||||
|  | @ -1262,6 +1250,73 @@ def trades_to_ledger_entries( | ||||||
|             acctid, {} |             acctid, {} | ||||||
|         )[tid] = entry |         )[tid] = entry | ||||||
| 
 | 
 | ||||||
|  |     # sort entries in output by python based datetime | ||||||
|  |     for acctid in trades_by_account: | ||||||
|  |         trades_by_account[acctid] = dict(sorted( | ||||||
|  |             trades_by_account[acctid].items(), | ||||||
|  |             key=lambda entry: entry[1].pop('pydatetime'), | ||||||
|  |         )) | ||||||
|  | 
 | ||||||
|  |     return trades_by_account | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def flex_records_to_ledger_entries( | ||||||
|  |     accounts: bidict, | ||||||
|  |     trade_entries: list[object], | ||||||
|  | 
 | ||||||
|  | ) -> dict: | ||||||
|  |     ''' | ||||||
|  |     Convert flex report entry objects into ``dict`` form, pretty much | ||||||
|  |     straight up without modification except add a `pydatetime` field | ||||||
|  |     from the parsed timestamp. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     trades_by_account = {} | ||||||
|  |     for t in trade_entries: | ||||||
|  |         entry = t.__dict__ | ||||||
|  | 
 | ||||||
|  |         # XXX: LOL apparently ``toml`` has a bug | ||||||
|  |         # where a section key error will show up in the write | ||||||
|  |         # if you leave a table key as an `int`? So i guess | ||||||
|  |         # cast to strs for all keys.. | ||||||
|  | 
 | ||||||
|  |         # oddly for some so-called "BookTrade" entries | ||||||
|  |         # this field seems to be blank, no cuckin clue. | ||||||
|  |         # trade['ibExecID'] | ||||||
|  |         tid = str(entry.get('ibExecID') or entry['tradeID']) | ||||||
|  |         # date = str(entry['tradeDate']) | ||||||
|  | 
 | ||||||
|  |         # XXX: is it going to cause problems if a account name | ||||||
|  |         # get's lost? The user should be able to find it based | ||||||
|  |         # on the actual exec history right? | ||||||
|  |         acctid = accounts[str(entry['accountId'])] | ||||||
|  | 
 | ||||||
|  |         # probably a flex record with a wonky non-std timestamp.. | ||||||
|  |         dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime']) | ||||||
|  |         entry['datetime'] = str(dt) | ||||||
|  | 
 | ||||||
|  |         if not tid: | ||||||
|  |             # this is likely some kind of internal adjustment | ||||||
|  |             # transaction, likely one of the following: | ||||||
|  |             # - an expiry event that will show a "book trade" indicating | ||||||
|  |             #   some adjustment to cash balances: zeroing or itm settle. | ||||||
|  |             # - a manual cash balance position adjustment likely done by | ||||||
|  |             #   the user from the accounts window in TWS where they can | ||||||
|  |             #   manually set the avg price and size: | ||||||
|  |             #   https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST | ||||||
|  |             log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') | ||||||
|  |             continue | ||||||
|  | 
 | ||||||
|  |         trades_by_account.setdefault( | ||||||
|  |             acctid, {} | ||||||
|  |         )[tid] = entry | ||||||
|  | 
 | ||||||
|  |     for acctid in trades_by_account: | ||||||
|  |         trades_by_account[acctid] = dict(sorted( | ||||||
|  |             trades_by_account[acctid].items(), | ||||||
|  |             key=lambda entry: entry[1]['pydatetime'], | ||||||
|  |         )) | ||||||
|  | 
 | ||||||
|     return trades_by_account |     return trades_by_account | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -1308,15 +1363,16 @@ def load_flex_trades( | ||||||
|     ln = len(trade_entries) |     ln = len(trade_entries) | ||||||
|     log.info(f'Loaded {ln} trades from flex query') |     log.info(f'Loaded {ln} trades from flex query') | ||||||
| 
 | 
 | ||||||
|     trades_by_account = trades_to_ledger_entries( |     trades_by_account = flex_records_to_ledger_entries( | ||||||
|         # get reverse map to user account names |         conf['accounts'].inverse,  # reverse map to user account names | ||||||
|         conf['accounts'].inverse, |  | ||||||
|         trade_entries, |         trade_entries, | ||||||
|         source_type='flex', |  | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|  |     ledger_dict: Optional[dict] = None | ||||||
|  | 
 | ||||||
|     for acctid in trades_by_account: |     for acctid in trades_by_account: | ||||||
|         trades_by_id = trades_by_account[acctid] |         trades_by_id = trades_by_account[acctid] | ||||||
|  | 
 | ||||||
|         with open_trade_ledger('ib', acctid) as ledger_dict: |         with open_trade_ledger('ib', acctid) as ledger_dict: | ||||||
|             tid_delta = set(trades_by_id) - set(ledger_dict) |             tid_delta = set(trades_by_id) - set(ledger_dict) | ||||||
|             log.info( |             log.info( | ||||||
|  | @ -1324,9 +1380,11 @@ def load_flex_trades( | ||||||
|                 f'{pformat(tid_delta)}' |                 f'{pformat(tid_delta)}' | ||||||
|             ) |             ) | ||||||
|             if tid_delta: |             if tid_delta: | ||||||
|                 ledger_dict.update( |                 sorted_delta = dict(sorted( | ||||||
|                     {tid: trades_by_id[tid] for tid in tid_delta} |                     {tid: trades_by_id[tid] for tid in tid_delta}.items(), | ||||||
|                 ) |                     key=lambda entry: entry[1].pop('pydatetime'), | ||||||
|  |                 )) | ||||||
|  |                 ledger_dict.update(sorted_delta) | ||||||
| 
 | 
 | ||||||
|     return ledger_dict |     return ledger_dict | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -22,6 +22,7 @@ import asyncio | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm | ||||||
| from dataclasses import asdict | from dataclasses import asdict | ||||||
| from datetime import datetime | from datetime import datetime | ||||||
|  | from functools import partial | ||||||
| from math import isnan | from math import isnan | ||||||
| import time | import time | ||||||
| from typing import ( | from typing import ( | ||||||
|  | @ -38,7 +39,6 @@ import tractor | ||||||
| import trio | import trio | ||||||
| from trio_typing import TaskStatus | from trio_typing import TaskStatus | ||||||
| 
 | 
 | ||||||
| from piker.data._sharedmem import ShmArray |  | ||||||
| from .._util import SymbolNotFound, NoData | from .._util import SymbolNotFound, NoData | ||||||
| from .api import ( | from .api import ( | ||||||
|     # _adhoc_futes_set, |     # _adhoc_futes_set, | ||||||
|  | @ -111,24 +111,54 @@ async def open_history_client( | ||||||
|     that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. |     that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|  |     # TODO: | ||||||
|  |     # - add logic to handle tradable hours and only grab | ||||||
|  |     #   valid bars in the range? | ||||||
|  |     # - we want to avoid overrunning the underlying shm array buffer and | ||||||
|  |     #   we should probably calc the number of calls to make depending on | ||||||
|  |     #   that until we have the `marketstore` daemon in place in which case | ||||||
|  |     #   the shm size will be driven by user config and available sys | ||||||
|  |     #   memory. | ||||||
|  | 
 | ||||||
|     async with open_data_client() as proxy: |     async with open_data_client() as proxy: | ||||||
| 
 | 
 | ||||||
|  |         max_timeout: float = 2. | ||||||
|  |         mean: float = 0 | ||||||
|  |         count: int = 0 | ||||||
|  | 
 | ||||||
|         async def get_hist( |         async def get_hist( | ||||||
|  |             timeframe: float, | ||||||
|             end_dt: Optional[datetime] = None, |             end_dt: Optional[datetime] = None, | ||||||
|             start_dt: Optional[datetime] = None, |             start_dt: Optional[datetime] = None, | ||||||
| 
 | 
 | ||||||
|         ) -> tuple[np.ndarray, str]: |         ) -> tuple[np.ndarray, str]: | ||||||
|  |             nonlocal max_timeout, mean, count | ||||||
| 
 | 
 | ||||||
|             out, fails = await get_bars(proxy, symbol, end_dt=end_dt) |             query_start = time.time() | ||||||
|  |             out, timedout = await get_bars( | ||||||
|  |                 proxy, | ||||||
|  |                 symbol, | ||||||
|  |                 timeframe, | ||||||
|  |                 end_dt=end_dt, | ||||||
|  |             ) | ||||||
|  |             latency = time.time() - query_start | ||||||
|  |             if ( | ||||||
|  |                 not timedout | ||||||
|  |                 # and latency <= max_timeout | ||||||
|  |             ): | ||||||
|  |                 count += 1 | ||||||
|  |                 mean += latency / count | ||||||
|  |                 print( | ||||||
|  |                     f'HISTORY FRAME QUERY LATENCY: {latency}\n' | ||||||
|  |                     f'mean: {mean}' | ||||||
|  |                 ) | ||||||
| 
 | 
 | ||||||
|             # TODO: add logic here to handle tradable hours and only grab |  | ||||||
|             # valid bars in the range |  | ||||||
|             if out is None: |             if out is None: | ||||||
|                 # could be trying to retreive bars over weekend |                 # could be trying to retreive bars over weekend | ||||||
|                 log.error(f"Can't grab bars starting at {end_dt}!?!?") |                 log.error(f"Can't grab bars starting at {end_dt}!?!?") | ||||||
|                 raise NoData( |                 raise NoData( | ||||||
|                     f'{end_dt}', |                     f'{end_dt}', | ||||||
|                     frame_size=2000, |                     # frame_size=2000, | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|             bars, bars_array, first_dt, last_dt = out |             bars, bars_array, first_dt, last_dt = out | ||||||
|  | @ -145,7 +175,7 @@ async def open_history_client( | ||||||
|         # quite sure why.. needs some tinkering and probably |         # quite sure why.. needs some tinkering and probably | ||||||
|         # a lookthrough of the ``ib_insync`` machinery, for eg. maybe |         # a lookthrough of the ``ib_insync`` machinery, for eg. maybe | ||||||
|         # we have to do the batch queries on the `asyncio` side? |         # we have to do the batch queries on the `asyncio` side? | ||||||
|         yield get_hist, {'erlangs': 1, 'rate': 6} |         yield get_hist, {'erlangs': 1, 'rate': 3} | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| _pacing: str = ( | _pacing: str = ( | ||||||
|  | @ -154,261 +184,266 @@ _pacing: str = ( | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | async def wait_on_data_reset( | ||||||
|  |     proxy: MethodProxy, | ||||||
|  |     reset_type: str = 'data', | ||||||
|  |     timeout: float = 16, | ||||||
|  | 
 | ||||||
|  |     task_status: TaskStatus[ | ||||||
|  |         tuple[ | ||||||
|  |             trio.CancelScope, | ||||||
|  |             trio.Event, | ||||||
|  |         ] | ||||||
|  |     ] = trio.TASK_STATUS_IGNORED, | ||||||
|  | ) -> bool: | ||||||
|  | 
 | ||||||
|  |     # TODO: we might have to put a task lock around this | ||||||
|  |     # method.. | ||||||
|  |     hist_ev = proxy.status_event( | ||||||
|  |         'HMDS data farm connection is OK:ushmds' | ||||||
|  |     ) | ||||||
|  | 
 | ||||||
|  |     # XXX: other event messages we might want to try and | ||||||
|  |     # wait for but i wasn't able to get any of this | ||||||
|  |     # reliable.. | ||||||
|  |     # reconnect_start = proxy.status_event( | ||||||
|  |     #     'Market data farm is connecting:usfuture' | ||||||
|  |     # ) | ||||||
|  |     # live_ev = proxy.status_event( | ||||||
|  |     #     'Market data farm connection is OK:usfuture' | ||||||
|  |     # ) | ||||||
|  |     # try to wait on the reset event(s) to arrive, a timeout | ||||||
|  |     # will trigger a retry up to 6 times (for now). | ||||||
|  | 
 | ||||||
|  |     done = trio.Event() | ||||||
|  |     with trio.move_on_after(timeout) as cs: | ||||||
|  | 
 | ||||||
|  |         task_status.started((cs, done)) | ||||||
|  | 
 | ||||||
|  |         log.warning('Sending DATA RESET request') | ||||||
|  |         res = await data_reset_hack(reset_type=reset_type) | ||||||
|  | 
 | ||||||
|  |         if not res: | ||||||
|  |             log.warning( | ||||||
|  |                 'NO VNC DETECTED!\n' | ||||||
|  |                 'Manually press ctrl-alt-f on your IB java app' | ||||||
|  |             ) | ||||||
|  |             done.set() | ||||||
|  |             return False | ||||||
|  | 
 | ||||||
|  |         # TODO: not sure if waiting on other events | ||||||
|  |         # is all that useful here or not. | ||||||
|  |         # - in theory you could wait on one of the ones above first | ||||||
|  |         # to verify the reset request was sent? | ||||||
|  |         # - we need the same for real-time quote feeds which can | ||||||
|  |         # sometimes flake out and stop delivering.. | ||||||
|  |         for name, ev in [ | ||||||
|  |             ('history', hist_ev), | ||||||
|  |         ]: | ||||||
|  |             await ev.wait() | ||||||
|  |             log.info(f"{name} DATA RESET") | ||||||
|  |             done.set() | ||||||
|  |             return True | ||||||
|  | 
 | ||||||
|  |     if cs.cancel_called: | ||||||
|  |         log.warning( | ||||||
|  |             'Data reset task canceled?' | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |     done.set() | ||||||
|  |     return False | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | _data_resetter_task: trio.Task | None = None | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| async def get_bars( | async def get_bars( | ||||||
| 
 | 
 | ||||||
|     proxy: MethodProxy, |     proxy: MethodProxy, | ||||||
|     fqsn: str, |     fqsn: str, | ||||||
|  |     timeframe: int, | ||||||
| 
 | 
 | ||||||
|     # blank to start which tells ib to look up the latest datum |     # blank to start which tells ib to look up the latest datum | ||||||
|     end_dt: str = '', |     end_dt: str = '', | ||||||
| 
 | 
 | ||||||
|  |     # TODO: make this more dynamic based on measured frame rx latency.. | ||||||
|  |     timeout: float = 3,  # how long before we trigger a feed reset | ||||||
|  | 
 | ||||||
|  |     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, | ||||||
|  | 
 | ||||||
| ) -> (dict, np.ndarray): | ) -> (dict, np.ndarray): | ||||||
|     ''' |     ''' | ||||||
|     Retrieve historical data from a ``trio``-side task using |     Retrieve historical data from a ``trio``-side task using | ||||||
|     a ``MethoProxy``. |     a ``MethoProxy``. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     fails = 0 |     global _data_resetter_task | ||||||
|     bars: Optional[list] = None |  | ||||||
|     first_dt: datetime = None |  | ||||||
|     last_dt: datetime = None |  | ||||||
| 
 | 
 | ||||||
|     if end_dt: |     data_cs: trio.CancelScope | None = None | ||||||
|         last_dt = pendulum.from_timestamp(end_dt.timestamp()) |     result: tuple[ | ||||||
|  |         ibis.objects.BarDataList, | ||||||
|  |         np.ndarray, | ||||||
|  |         datetime, | ||||||
|  |         datetime, | ||||||
|  |     ] | None = None | ||||||
|  |     result_ready = trio.Event() | ||||||
| 
 | 
 | ||||||
|     for _ in range(10): |     async def query(): | ||||||
|         try: |         nonlocal result, data_cs, end_dt | ||||||
|             out = await proxy.bars( |         while True: | ||||||
|                 fqsn=fqsn, |             try: | ||||||
|                 end_dt=end_dt, |                 out = await proxy.bars( | ||||||
|             ) |                     fqsn=fqsn, | ||||||
|             if out: |                     end_dt=end_dt, | ||||||
|                 bars, bars_array = out |                     sample_period_s=timeframe, | ||||||
| 
 | 
 | ||||||
|             else: |                     # ideally we cancel the request just before we | ||||||
|                 await tractor.breakpoint() |                     # cancel on the ``trio``-side and trigger a data | ||||||
| 
 |                     # reset hack.. the problem is there's no way (with | ||||||
|             if bars_array is None: |                     # current impl) to detect a cancel case. | ||||||
|                 raise SymbolNotFound(fqsn) |                     # timeout=timeout, | ||||||
| 
 |  | ||||||
|             first_dt = pendulum.from_timestamp( |  | ||||||
|                 bars[0].date.timestamp()) |  | ||||||
| 
 |  | ||||||
|             last_dt = pendulum.from_timestamp( |  | ||||||
|                 bars[-1].date.timestamp()) |  | ||||||
| 
 |  | ||||||
|             time = bars_array['time'] |  | ||||||
|             assert time[-1] == last_dt.timestamp() |  | ||||||
|             assert time[0] == first_dt.timestamp() |  | ||||||
|             log.info( |  | ||||||
|                 f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|             return (bars, bars_array, first_dt, last_dt), fails |  | ||||||
| 
 |  | ||||||
|         except RequestError as err: |  | ||||||
|             msg = err.message |  | ||||||
| 
 |  | ||||||
|             if 'No market data permissions for' in msg: |  | ||||||
|                 # TODO: signalling for no permissions searches |  | ||||||
|                 raise NoData( |  | ||||||
|                     f'Symbol: {fqsn}', |  | ||||||
|                 ) |                 ) | ||||||
| 
 |  | ||||||
|             elif ( |  | ||||||
|                 err.code == 162 and |  | ||||||
|                 'HMDS query returned no data' in err.message |  | ||||||
|             ): |  | ||||||
|                 # XXX: this is now done in the storage mgmt layer |  | ||||||
|                 # and we shouldn't implicitly decrement the frame dt |  | ||||||
|                 # index since the upper layer may be doing so |  | ||||||
|                 # concurrently and we don't want to be delivering frames |  | ||||||
|                 # that weren't asked for. |  | ||||||
|                 log.warning( |  | ||||||
|                     f'NO DATA found ending @ {end_dt}\n' |  | ||||||
|                 ) |  | ||||||
| 
 |  | ||||||
|                 # try to decrement start point and look further back |  | ||||||
|                 # end_dt = last_dt = last_dt.subtract(seconds=2000) |  | ||||||
| 
 |  | ||||||
|                 raise NoData( |  | ||||||
|                     f'Symbol: {fqsn}', |  | ||||||
|                     frame_size=2000, |  | ||||||
|                 ) |  | ||||||
| 
 |  | ||||||
|             # elif ( |  | ||||||
|             #     err.code == 162 and |  | ||||||
|             #     'Trading TWS session is connected from a different IP |  | ||||||
|             #     address' in err.message |  | ||||||
|             # ): |  | ||||||
|             #     log.warning("ignoring ip address warning") |  | ||||||
|             #     continue |  | ||||||
| 
 |  | ||||||
|             elif _pacing in msg: |  | ||||||
| 
 |  | ||||||
|                 log.warning( |  | ||||||
|                     'History throttle rate reached!\n' |  | ||||||
|                     'Resetting farms with `ctrl-alt-f` hack\n' |  | ||||||
|                 ) |  | ||||||
|                 # TODO: we might have to put a task lock around this |  | ||||||
|                 # method.. |  | ||||||
|                 hist_ev = proxy.status_event( |  | ||||||
|                     'HMDS data farm connection is OK:ushmds' |  | ||||||
|                 ) |  | ||||||
| 
 |  | ||||||
|                 # XXX: other event messages we might want to try and |  | ||||||
|                 # wait for but i wasn't able to get any of this |  | ||||||
|                 # reliable.. |  | ||||||
|                 # reconnect_start = proxy.status_event( |  | ||||||
|                 #     'Market data farm is connecting:usfuture' |  | ||||||
|                 # ) |  | ||||||
|                 # live_ev = proxy.status_event( |  | ||||||
|                 #     'Market data farm connection is OK:usfuture' |  | ||||||
|                 # ) |  | ||||||
| 
 |  | ||||||
|                 # try to wait on the reset event(s) to arrive, a timeout |  | ||||||
|                 # will trigger a retry up to 6 times (for now). |  | ||||||
|                 tries: int = 2 |  | ||||||
|                 timeout: float = 10 |  | ||||||
| 
 |  | ||||||
|                 # try 3 time with a data reset then fail over to |  | ||||||
|                 # a connection reset. |  | ||||||
|                 for i in range(1, tries): |  | ||||||
| 
 |  | ||||||
|                     log.warning('Sending DATA RESET request') |  | ||||||
|                     await data_reset_hack(reset_type='data') |  | ||||||
| 
 |  | ||||||
|                     with trio.move_on_after(timeout) as cs: |  | ||||||
|                         for name, ev in [ |  | ||||||
|                             # TODO: not sure if waiting on other events |  | ||||||
|                             # is all that useful here or not. in theory |  | ||||||
|                             # you could wait on one of the ones above |  | ||||||
|                             # first to verify the reset request was |  | ||||||
|                             # sent? |  | ||||||
|                             ('history', hist_ev), |  | ||||||
|                         ]: |  | ||||||
|                             await ev.wait() |  | ||||||
|                             log.info(f"{name} DATA RESET") |  | ||||||
|                             break |  | ||||||
| 
 |  | ||||||
|                     if cs.cancelled_caught: |  | ||||||
|                         fails += 1 |  | ||||||
|                         log.warning( |  | ||||||
|                             f'Data reset {name} timeout, retrying {i}.' |  | ||||||
|                         ) |  | ||||||
| 
 |  | ||||||
|                         continue |  | ||||||
|                 else: |  | ||||||
| 
 |  | ||||||
|                     log.warning('Sending CONNECTION RESET') |  | ||||||
|                     res = await data_reset_hack(reset_type='connection') |  | ||||||
|                     if not res: |  | ||||||
|                         log.warning( |  | ||||||
|                             'NO VNC DETECTED!\n' |  | ||||||
|                             'Manually press ctrl-alt-f on your IB java app' |  | ||||||
|                         ) |  | ||||||
|                         # break |  | ||||||
| 
 |  | ||||||
|                     with trio.move_on_after(timeout) as cs: |  | ||||||
|                         for name, ev in [ |  | ||||||
|                             # TODO: not sure if waiting on other events |  | ||||||
|                             # is all that useful here or not. in theory |  | ||||||
|                             # you could wait on one of the ones above |  | ||||||
|                             # first to verify the reset request was |  | ||||||
|                             # sent? |  | ||||||
|                             ('history', hist_ev), |  | ||||||
|                         ]: |  | ||||||
|                             await ev.wait() |  | ||||||
|                             log.info(f"{name} DATA RESET") |  | ||||||
| 
 |  | ||||||
|                     if cs.cancelled_caught: |  | ||||||
|                         fails += 1 |  | ||||||
|                         log.warning('Data CONNECTION RESET timeout!?') |  | ||||||
| 
 |  | ||||||
|             else: |  | ||||||
|                 raise |  | ||||||
| 
 |  | ||||||
|     return None, None |  | ||||||
|     # else:  # throttle wasn't fixed so error out immediately |  | ||||||
|     #     raise _err |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| async def backfill_bars( |  | ||||||
| 
 |  | ||||||
|     fqsn: str, |  | ||||||
|     shm: ShmArray,  # type: ignore # noqa |  | ||||||
| 
 |  | ||||||
|     # TODO: we want to avoid overrunning the underlying shm array buffer |  | ||||||
|     # and we should probably calc the number of calls to make depending |  | ||||||
|     # on that until we have the `marketstore` daemon in place in which |  | ||||||
|     # case the shm size will be driven by user config and available sys |  | ||||||
|     # memory. |  | ||||||
|     count: int = 16, |  | ||||||
| 
 |  | ||||||
|     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, |  | ||||||
| 
 |  | ||||||
| ) -> None: |  | ||||||
|     ''' |  | ||||||
|     Fill historical bars into shared mem / storage afap. |  | ||||||
| 
 |  | ||||||
|     TODO: avoid pacing constraints: |  | ||||||
|     https://github.com/pikers/piker/issues/128 |  | ||||||
| 
 |  | ||||||
|     ''' |  | ||||||
|     # last_dt1 = None |  | ||||||
|     last_dt = None |  | ||||||
| 
 |  | ||||||
|     with trio.CancelScope() as cs: |  | ||||||
| 
 |  | ||||||
|         async with open_data_client() as proxy: |  | ||||||
| 
 |  | ||||||
|             out, fails = await get_bars(proxy, fqsn) |  | ||||||
| 
 |  | ||||||
|             if out is None: |  | ||||||
|                 raise RuntimeError("Could not pull currrent history?!") |  | ||||||
| 
 |  | ||||||
|             (first_bars, bars_array, first_dt, last_dt) = out |  | ||||||
|             vlm = bars_array['volume'] |  | ||||||
|             vlm[vlm < 0] = 0 |  | ||||||
|             last_dt = first_dt |  | ||||||
| 
 |  | ||||||
|             # write historical data to buffer |  | ||||||
|             shm.push(bars_array) |  | ||||||
| 
 |  | ||||||
|             task_status.started(cs) |  | ||||||
| 
 |  | ||||||
|             i = 0 |  | ||||||
|             while i < count: |  | ||||||
| 
 |  | ||||||
|                 out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) |  | ||||||
| 
 |  | ||||||
|                 if out is None: |                 if out is None: | ||||||
|                     # could be trying to retreive bars over weekend |                     raise NoData(f'{end_dt}') | ||||||
|                     # TODO: add logic here to handle tradable hours and |  | ||||||
|                     # only grab valid bars in the range |  | ||||||
|                     log.error(f"Can't grab bars starting at {first_dt}!?!?") |  | ||||||
| 
 | 
 | ||||||
|                     # XXX: get_bars() should internally decrement dt by |                 bars, bars_array, dt_duration = out | ||||||
|                     # 2k seconds and try again. | 
 | ||||||
|  |                 if not bars: | ||||||
|  |                     log.warning( | ||||||
|  |                         f'History is blank for {dt_duration} from {end_dt}' | ||||||
|  |                     ) | ||||||
|  |                     end_dt -= dt_duration | ||||||
|                     continue |                     continue | ||||||
| 
 | 
 | ||||||
|                 (first_bars, bars_array, first_dt, last_dt) = out |                 if bars_array is None: | ||||||
|                 # last_dt1 = last_dt |                     raise SymbolNotFound(fqsn) | ||||||
|                 # last_dt = first_dt |  | ||||||
| 
 | 
 | ||||||
|                 # volume cleaning since there's -ve entries, |                 first_dt = pendulum.from_timestamp( | ||||||
|                 # wood luv to know what crookery that is.. |                     bars[0].date.timestamp()) | ||||||
|                 vlm = bars_array['volume'] |  | ||||||
|                 vlm[vlm < 0] = 0 |  | ||||||
| 
 | 
 | ||||||
|                 # TODO we should probably dig into forums to see what peeps |                 last_dt = pendulum.from_timestamp( | ||||||
|                 # think this data "means" and then use it as an indicator of |                     bars[-1].date.timestamp()) | ||||||
|                 # sorts? dinkus has mentioned that $vlms for the day dont' |  | ||||||
|                 # match other platforms nor the summary stat tws shows in |  | ||||||
|                 # the monitor - it's probably worth investigating. |  | ||||||
| 
 | 
 | ||||||
|                 shm.push(bars_array, prepend=True) |                 time = bars_array['time'] | ||||||
|                 i += 1 |                 assert time[-1] == last_dt.timestamp() | ||||||
|  |                 assert time[0] == first_dt.timestamp() | ||||||
|  |                 log.info( | ||||||
|  |                     f'{len(bars)} bars retreived {first_dt} -> {last_dt}' | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |                 if data_cs: | ||||||
|  |                     data_cs.cancel() | ||||||
|  | 
 | ||||||
|  |                 result = (bars, bars_array, first_dt, last_dt) | ||||||
|  | 
 | ||||||
|  |                 # signal data reset loop parent task | ||||||
|  |                 result_ready.set() | ||||||
|  | 
 | ||||||
|  |                 return result | ||||||
|  | 
 | ||||||
|  |             except RequestError as err: | ||||||
|  |                 msg = err.message | ||||||
|  | 
 | ||||||
|  |                 if 'No market data permissions for' in msg: | ||||||
|  |                     # TODO: signalling for no permissions searches | ||||||
|  |                     raise NoData( | ||||||
|  |                         f'Symbol: {fqsn}', | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |                 elif err.code == 162: | ||||||
|  |                     if 'HMDS query returned no data' in err.message: | ||||||
|  |                         # XXX: this is now done in the storage mgmt | ||||||
|  |                         # layer and we shouldn't implicitly decrement | ||||||
|  |                         # the frame dt index since the upper layer may | ||||||
|  |                         # be doing so concurrently and we don't want to | ||||||
|  |                         # be delivering frames that weren't asked for. | ||||||
|  |                         log.warning( | ||||||
|  |                             f'NO DATA found ending @ {end_dt}\n' | ||||||
|  |                         ) | ||||||
|  | 
 | ||||||
|  |                         # try to decrement start point and look further back | ||||||
|  |                         # end_dt = end_dt.subtract(seconds=2000) | ||||||
|  |                         end_dt = end_dt.subtract(days=1) | ||||||
|  |                         print("SUBTRACTING DAY") | ||||||
|  |                         continue | ||||||
|  | 
 | ||||||
|  |                     elif 'API historical data query cancelled' in err.message: | ||||||
|  |                         log.warning( | ||||||
|  |                             'Query cancelled by IB (:eyeroll:):\n' | ||||||
|  |                             f'{err.message}' | ||||||
|  |                         ) | ||||||
|  |                         continue | ||||||
|  |                     elif ( | ||||||
|  |                         'Trading TWS session is connected from a different IP' | ||||||
|  |                             in err.message | ||||||
|  |                     ): | ||||||
|  |                         log.warning("ignoring ip address warning") | ||||||
|  |                         continue | ||||||
|  | 
 | ||||||
|  |                 # XXX: more or less same as above timeout case | ||||||
|  |                 elif _pacing in msg: | ||||||
|  |                     log.warning( | ||||||
|  |                         'History throttle rate reached!\n' | ||||||
|  |                         'Resetting farms with `ctrl-alt-f` hack\n' | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |                     # cancel any existing reset task | ||||||
|  |                     if data_cs: | ||||||
|  |                         data_cs.cancel() | ||||||
|  | 
 | ||||||
|  |                     # spawn new data reset task | ||||||
|  |                     data_cs, reset_done = await nurse.start( | ||||||
|  |                         partial( | ||||||
|  |                             wait_on_data_reset, | ||||||
|  |                             proxy, | ||||||
|  |                             timeout=float('inf'), | ||||||
|  |                             reset_type='connection' | ||||||
|  |                         ) | ||||||
|  |                     ) | ||||||
|  |                     continue | ||||||
|  | 
 | ||||||
|  |                 else: | ||||||
|  |                     raise | ||||||
|  | 
 | ||||||
|  |     # TODO: make this global across all history task/requests | ||||||
|  |     # such that simultaneous symbol queries don't try data resettingn | ||||||
|  |     # too fast.. | ||||||
|  |     unset_resetter: bool = False | ||||||
|  |     async with trio.open_nursery() as nurse: | ||||||
|  | 
 | ||||||
|  |         # start history request that we allow | ||||||
|  |         # to run indefinitely until a result is acquired | ||||||
|  |         nurse.start_soon(query) | ||||||
|  | 
 | ||||||
|  |         # start history reset loop which waits up to the timeout | ||||||
|  |         # for a result before triggering a data feed reset. | ||||||
|  |         while not result_ready.is_set(): | ||||||
|  | 
 | ||||||
|  |             with trio.move_on_after(timeout): | ||||||
|  |                 await result_ready.wait() | ||||||
|  |                 break | ||||||
|  | 
 | ||||||
|  |             if _data_resetter_task: | ||||||
|  |                 # don't double invoke the reset hack if another | ||||||
|  |                 # requester task already has it covered. | ||||||
|  |                 continue | ||||||
|  |             else: | ||||||
|  |                 _data_resetter_task = trio.lowlevel.current_task() | ||||||
|  |                 unset_resetter = True | ||||||
|  | 
 | ||||||
|  |             # spawn new data reset task | ||||||
|  |             data_cs, reset_done = await nurse.start( | ||||||
|  |                 partial( | ||||||
|  |                     wait_on_data_reset, | ||||||
|  |                     proxy, | ||||||
|  |                     timeout=float('inf'), | ||||||
|  |                 ) | ||||||
|  |             ) | ||||||
|  |             # sync wait on reset to complete | ||||||
|  |             await reset_done.wait() | ||||||
|  | 
 | ||||||
|  |     _data_resetter_task = None if unset_resetter else _data_resetter_task | ||||||
|  |     return result, data_cs is not None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| asset_type_map = { | asset_type_map = { | ||||||
|  | @ -466,7 +501,9 @@ async def _setup_quote_stream( | ||||||
| 
 | 
 | ||||||
|     to_trio.send_nowait(None) |     to_trio.send_nowait(None) | ||||||
| 
 | 
 | ||||||
|     async with load_aio_clients() as accts2clients: |     async with load_aio_clients( | ||||||
|  |         disconnect_on_exit=False, | ||||||
|  |     ) as accts2clients: | ||||||
|         caccount_name, client = get_preferred_data_client(accts2clients) |         caccount_name, client = get_preferred_data_client(accts2clients) | ||||||
|         contract = contract or (await client.find_contract(symbol)) |         contract = contract or (await client.find_contract(symbol)) | ||||||
|         ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) |         ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) | ||||||
|  | @ -512,10 +549,11 @@ async def _setup_quote_stream( | ||||||
|                 # Manually do the dereg ourselves. |                 # Manually do the dereg ourselves. | ||||||
|                 teardown() |                 teardown() | ||||||
|             except trio.WouldBlock: |             except trio.WouldBlock: | ||||||
|                 log.warning( |                 # log.warning( | ||||||
|                     f'channel is blocking symbol feed for {symbol}?' |                 #     f'channel is blocking symbol feed for {symbol}?' | ||||||
|                     f'\n{to_trio.statistics}' |                 #     f'\n{to_trio.statistics}' | ||||||
|                 ) |                 # ) | ||||||
|  |                 pass | ||||||
| 
 | 
 | ||||||
|             # except trio.WouldBlock: |             # except trio.WouldBlock: | ||||||
|             #     # for slow debugging purposes to avoid clobbering prompt |             #     # for slow debugging purposes to avoid clobbering prompt | ||||||
|  | @ -545,7 +583,8 @@ async def open_aio_quote_stream( | ||||||
|     from_aio = _quote_streams.get(symbol) |     from_aio = _quote_streams.get(symbol) | ||||||
|     if from_aio: |     if from_aio: | ||||||
| 
 | 
 | ||||||
|         # if we already have a cached feed deliver a rx side clone to consumer |         # if we already have a cached feed deliver a rx side clone | ||||||
|  |         # to consumer | ||||||
|         async with broadcast_receiver( |         async with broadcast_receiver( | ||||||
|             from_aio, |             from_aio, | ||||||
|             2**6, |             2**6, | ||||||
|  | @ -736,67 +775,97 @@ async def stream_quotes( | ||||||
|             await trio.sleep_forever() |             await trio.sleep_forever() | ||||||
|             return  # we never expect feed to come up? |             return  # we never expect feed to come up? | ||||||
| 
 | 
 | ||||||
|         async with open_aio_quote_stream( |         cs: Optional[trio.CancelScope] = None | ||||||
|             symbol=sym, |         startup: bool = True | ||||||
|             contract=con, |         while ( | ||||||
|         ) as stream: |             startup | ||||||
| 
 |             or cs.cancel_called | ||||||
|             # ugh, clear ticks since we've consumed them |         ): | ||||||
|             # (ahem, ib_insync is stateful trash) |             with trio.CancelScope() as cs: | ||||||
|             first_ticker.ticks = [] |                 async with ( | ||||||
| 
 |                     trio.open_nursery() as nurse, | ||||||
|             task_status.started((init_msgs, first_quote)) |                     open_aio_quote_stream( | ||||||
| 
 |                         symbol=sym, | ||||||
|             async with aclosing(stream): |                         contract=con, | ||||||
|                 if syminfo.get('no_vlm', False): |                     ) as stream, | ||||||
| 
 |                 ): | ||||||
|                     # generally speaking these feeds don't |  | ||||||
|                     # include vlm data. |  | ||||||
|                     atype = syminfo['asset_type'] |  | ||||||
|                     log.info( |  | ||||||
|                         f'Non-vlm asset {sym}@{atype}, skipping quote poll...' |  | ||||||
|                     ) |  | ||||||
| 
 |  | ||||||
|                 else: |  | ||||||
|                     # wait for real volume on feed (trading might be closed) |  | ||||||
|                     while True: |  | ||||||
|                         ticker = await stream.receive() |  | ||||||
| 
 |  | ||||||
|                         # for a real volume contract we rait for the first |  | ||||||
|                         # "real" trade to take place |  | ||||||
|                         if ( |  | ||||||
|                             # not calc_price |  | ||||||
|                             # and not ticker.rtTime |  | ||||||
|                             not ticker.rtTime |  | ||||||
|                         ): |  | ||||||
|                             # spin consuming tickers until we get a real |  | ||||||
|                             # market datum |  | ||||||
|                             log.debug(f"New unsent ticker: {ticker}") |  | ||||||
|                             continue |  | ||||||
|                         else: |  | ||||||
|                             log.debug("Received first real volume tick") |  | ||||||
|                             # ugh, clear ticks since we've consumed them |  | ||||||
|                             # (ahem, ib_insync is truly stateful trash) |  | ||||||
|                             ticker.ticks = [] |  | ||||||
| 
 |  | ||||||
|                             # XXX: this works because we don't use |  | ||||||
|                             # ``aclosing()`` above? |  | ||||||
|                             break |  | ||||||
| 
 |  | ||||||
|                     quote = normalize(ticker) |  | ||||||
|                     log.debug(f"First ticker received {quote}") |  | ||||||
| 
 |  | ||||||
|                 # tell caller quotes are now coming in live |  | ||||||
|                 feed_is_live.set() |  | ||||||
| 
 |  | ||||||
|                 # last = time.time() |  | ||||||
|                 async for ticker in stream: |  | ||||||
|                     quote = normalize(ticker) |  | ||||||
|                     await send_chan.send({quote['fqsn']: quote}) |  | ||||||
| 
 |  | ||||||
|                     # ugh, clear ticks since we've consumed them |                     # ugh, clear ticks since we've consumed them | ||||||
|                     ticker.ticks = [] |                     # (ahem, ib_insync is stateful trash) | ||||||
|                     # last = time.time() |                     first_ticker.ticks = [] | ||||||
|  | 
 | ||||||
|  |                     # only on first entry at feed boot up | ||||||
|  |                     if startup: | ||||||
|  |                         startup = False | ||||||
|  |                         task_status.started((init_msgs, first_quote)) | ||||||
|  | 
 | ||||||
|  |                     # start a stream restarter task which monitors the | ||||||
|  |                     # data feed event. | ||||||
|  |                     async def reset_on_feed(): | ||||||
|  | 
 | ||||||
|  |                         # TODO: this seems to be surpressed from the | ||||||
|  |                         # traceback in ``tractor``? | ||||||
|  |                         # assert 0 | ||||||
|  | 
 | ||||||
|  |                         rt_ev = proxy.status_event( | ||||||
|  |                             'Market data farm connection is OK:usfarm' | ||||||
|  |                         ) | ||||||
|  |                         await rt_ev.wait() | ||||||
|  |                         cs.cancel()  # cancel called should now be set | ||||||
|  | 
 | ||||||
|  |                     nurse.start_soon(reset_on_feed) | ||||||
|  | 
 | ||||||
|  |                     async with aclosing(stream): | ||||||
|  |                         if syminfo.get('no_vlm', False): | ||||||
|  | 
 | ||||||
|  |                             # generally speaking these feeds don't | ||||||
|  |                             # include vlm data. | ||||||
|  |                             atype = syminfo['asset_type'] | ||||||
|  |                             log.info( | ||||||
|  |                                 f'No-vlm {sym}@{atype}, skipping quote poll' | ||||||
|  |                             ) | ||||||
|  | 
 | ||||||
|  |                         else: | ||||||
|  |                             # wait for real volume on feed (trading might be | ||||||
|  |                             # closed) | ||||||
|  |                             while True: | ||||||
|  |                                 ticker = await stream.receive() | ||||||
|  | 
 | ||||||
|  |                                 # for a real volume contract we rait for | ||||||
|  |                                 # the first "real" trade to take place | ||||||
|  |                                 if ( | ||||||
|  |                                     # not calc_price | ||||||
|  |                                     # and not ticker.rtTime | ||||||
|  |                                     not ticker.rtTime | ||||||
|  |                                 ): | ||||||
|  |                                     # spin consuming tickers until we | ||||||
|  |                                     # get a real market datum | ||||||
|  |                                     log.debug(f"New unsent ticker: {ticker}") | ||||||
|  |                                     continue | ||||||
|  |                                 else: | ||||||
|  |                                     log.debug("Received first volume tick") | ||||||
|  |                                     # ugh, clear ticks since we've | ||||||
|  |                                     # consumed them (ahem, ib_insync is | ||||||
|  |                                     # truly stateful trash) | ||||||
|  |                                     ticker.ticks = [] | ||||||
|  | 
 | ||||||
|  |                                     # XXX: this works because we don't use | ||||||
|  |                                     # ``aclosing()`` above? | ||||||
|  |                                     break | ||||||
|  | 
 | ||||||
|  |                             quote = normalize(ticker) | ||||||
|  |                             log.debug(f"First ticker received {quote}") | ||||||
|  | 
 | ||||||
|  |                         # tell caller quotes are now coming in live | ||||||
|  |                         feed_is_live.set() | ||||||
|  | 
 | ||||||
|  |                         # last = time.time() | ||||||
|  |                         async for ticker in stream: | ||||||
|  |                             quote = normalize(ticker) | ||||||
|  |                             await send_chan.send({quote['fqsn']: quote}) | ||||||
|  | 
 | ||||||
|  |                             # ugh, clear ticks since we've consumed them | ||||||
|  |                             ticker.ticks = [] | ||||||
|  |                             # last = time.time() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def data_reset_hack( | async def data_reset_hack( | ||||||
|  | @ -904,7 +973,14 @@ async def open_symbol_search( | ||||||
|                     except trio.WouldBlock: |                     except trio.WouldBlock: | ||||||
|                         pass |                         pass | ||||||
| 
 | 
 | ||||||
|                 if not pattern or pattern.isspace(): |                 if ( | ||||||
|  |                     not pattern | ||||||
|  |                     or pattern.isspace() | ||||||
|  | 
 | ||||||
|  |                     # XXX: not sure if this is a bad assumption but it | ||||||
|  |                     # seems to make search snappier? | ||||||
|  |                     or len(pattern) < 1 | ||||||
|  |                 ): | ||||||
|                     log.warning('empty pattern received, skipping..') |                     log.warning('empty pattern received, skipping..') | ||||||
| 
 | 
 | ||||||
|                     # TODO: *BUG* if nothing is returned here the client |                     # TODO: *BUG* if nothing is returned here the client | ||||||
|  |  | ||||||
|  | @ -258,6 +258,7 @@ async def open_history_client( | ||||||
|         queries: int = 0 |         queries: int = 0 | ||||||
| 
 | 
 | ||||||
|         async def get_ohlc( |         async def get_ohlc( | ||||||
|  |             timeframe: float, | ||||||
|             end_dt: Optional[datetime] = None, |             end_dt: Optional[datetime] = None, | ||||||
|             start_dt: Optional[datetime] = None, |             start_dt: Optional[datetime] = None, | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -138,25 +138,26 @@ def cli(ctx, brokers, loglevel, tl, configdir): | ||||||
| @click.pass_obj | @click.pass_obj | ||||||
| def services(config, tl, names): | def services(config, tl, names): | ||||||
| 
 | 
 | ||||||
|     async def list_services(): |     from .._daemon import open_piker_runtime | ||||||
| 
 | 
 | ||||||
|         async with tractor.get_arbiter( |     async def list_services(): | ||||||
|             *_tractor_kwargs['arbiter_addr'] |         async with ( | ||||||
|         ) as portal: |             open_piker_runtime( | ||||||
|  |                 name='service_query', | ||||||
|  |                 loglevel=config['loglevel'] if tl else None, | ||||||
|  |             ), | ||||||
|  |             tractor.get_arbiter( | ||||||
|  |                 *_tractor_kwargs['arbiter_addr'] | ||||||
|  |             ) as portal | ||||||
|  |         ): | ||||||
|             registry = await portal.run_from_ns('self', 'get_registry') |             registry = await portal.run_from_ns('self', 'get_registry') | ||||||
|             json_d = {} |             json_d = {} | ||||||
|             for key, socket in registry.items(): |             for key, socket in registry.items(): | ||||||
|                 # name, uuid = uid |  | ||||||
|                 host, port = socket |                 host, port = socket | ||||||
|                 json_d[key] = f'{host}:{port}' |                 json_d[key] = f'{host}:{port}' | ||||||
|             click.echo(f"{colorize_json(json_d)}") |             click.echo(f"{colorize_json(json_d)}") | ||||||
| 
 | 
 | ||||||
|     tractor.run( |     trio.run(list_services) | ||||||
|         list_services, |  | ||||||
|         name='service_query', |  | ||||||
|         loglevel=config['loglevel'] if tl else None, |  | ||||||
|         arbiter_addr=_tractor_kwargs['arbiter_addr'], |  | ||||||
|     ) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def _load_clis() -> None: | def _load_clis() -> None: | ||||||
|  |  | ||||||
|  | @ -21,16 +21,19 @@ This module is enabled for ``brokerd`` daemons. | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
| from dataclasses import dataclass, field |  | ||||||
| from datetime import datetime |  | ||||||
| from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||||
|  | from dataclasses import ( | ||||||
|  |     dataclass, | ||||||
|  |     field, | ||||||
|  | ) | ||||||
|  | from datetime import datetime | ||||||
| from functools import partial | from functools import partial | ||||||
| from pprint import pformat |  | ||||||
| from types import ModuleType | from types import ModuleType | ||||||
| from typing import ( | from typing import ( | ||||||
|     Any, |     Any, | ||||||
|     AsyncIterator, Optional, |     AsyncIterator, | ||||||
|     Generator, |     Callable, | ||||||
|  |     Optional, | ||||||
|     Awaitable, |     Awaitable, | ||||||
|     TYPE_CHECKING, |     TYPE_CHECKING, | ||||||
|     Union, |     Union, | ||||||
|  | @ -39,7 +42,6 @@ from typing import ( | ||||||
| import trio | import trio | ||||||
| from trio.abc import ReceiveChannel | from trio.abc import ReceiveChannel | ||||||
| from trio_typing import TaskStatus | from trio_typing import TaskStatus | ||||||
| import trimeter |  | ||||||
| import tractor | import tractor | ||||||
| from tractor.trionics import maybe_open_context | from tractor.trionics import maybe_open_context | ||||||
| import pendulum | import pendulum | ||||||
|  | @ -252,6 +254,7 @@ async def start_backfill( | ||||||
|     mod: ModuleType, |     mod: ModuleType, | ||||||
|     bfqsn: str, |     bfqsn: str, | ||||||
|     shm: ShmArray, |     shm: ShmArray, | ||||||
|  |     timeframe: float, | ||||||
| 
 | 
 | ||||||
|     last_tsdb_dt: Optional[datetime] = None, |     last_tsdb_dt: Optional[datetime] = None, | ||||||
|     storage: Optional[Storage] = None, |     storage: Optional[Storage] = None, | ||||||
|  | @ -262,11 +265,19 @@ async def start_backfill( | ||||||
| 
 | 
 | ||||||
| ) -> int: | ) -> int: | ||||||
| 
 | 
 | ||||||
|  |     hist: Callable[ | ||||||
|  |         [int, datetime, datetime], | ||||||
|  |         tuple[np.ndarray, str] | ||||||
|  |     ] | ||||||
|  |     config: dict[str, int] | ||||||
|     async with mod.open_history_client(bfqsn) as (hist, config): |     async with mod.open_history_client(bfqsn) as (hist, config): | ||||||
| 
 | 
 | ||||||
|         # get latest query's worth of history all the way |         # get latest query's worth of history all the way | ||||||
|         # back to what is recorded in the tsdb |         # back to what is recorded in the tsdb | ||||||
|         array, start_dt, end_dt = await hist(end_dt=None) |         array, start_dt, end_dt = await hist( | ||||||
|  |             timeframe, | ||||||
|  |             end_dt=None, | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|         times = array['time'] |         times = array['time'] | ||||||
| 
 | 
 | ||||||
|  | @ -289,6 +300,9 @@ async def start_backfill( | ||||||
|         log.info(f'Pushing {to_push.size} to shm!') |         log.info(f'Pushing {to_push.size} to shm!') | ||||||
|         shm.push(to_push) |         shm.push(to_push) | ||||||
| 
 | 
 | ||||||
|  |         # TODO: *** THIS IS A BUG *** | ||||||
|  |         # we need to only broadcast to subscribers for this fqsn.. | ||||||
|  |         # otherwise all fsps get reset on every chart.. | ||||||
|         for delay_s in sampler.subscribers: |         for delay_s in sampler.subscribers: | ||||||
|             await broadcast(delay_s) |             await broadcast(delay_s) | ||||||
| 
 | 
 | ||||||
|  | @ -304,8 +318,8 @@ async def start_backfill( | ||||||
|                 raise ValueError( |                 raise ValueError( | ||||||
|                     '`piker` only needs to support 1m and 1s sampling ' |                     '`piker` only needs to support 1m and 1s sampling ' | ||||||
|                     'but ur api is trying to deliver a longer ' |                     'but ur api is trying to deliver a longer ' | ||||||
|                     f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' |                     f'timeframe of {step_size_s} seconds..\n' | ||||||
|                     'do dat brudder.' |                     'So yuh.. dun do dat brudder.' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|             # when no tsdb "last datum" is provided, we just load |             # when no tsdb "last datum" is provided, we just load | ||||||
|  | @ -319,96 +333,60 @@ async def start_backfill( | ||||||
|                 # do a decently sized backfill and load it into storage. |                 # do a decently sized backfill and load it into storage. | ||||||
|                 periods = { |                 periods = { | ||||||
|                     1: {'days': 6}, |                     1: {'days': 6}, | ||||||
|                     60: {'years': 2}, |                     60: {'years': 6}, | ||||||
|                 } |                 } | ||||||
| 
 | 
 | ||||||
|             kwargs = periods[step_size_s] |             kwargs = periods[step_size_s] | ||||||
|             last_tsdb_dt = start_dt.subtract(**kwargs) |             last_tsdb_dt = start_dt.subtract(**kwargs) | ||||||
| 
 | 
 | ||||||
|         # configure async query throttling |         # configure async query throttling | ||||||
|         erlangs = config.get('erlangs', 1) |         # rate = config.get('rate', 1) | ||||||
|         rate = config.get('rate', 1) |         # XXX: legacy from ``trimeter`` code but unsupported now. | ||||||
|         frames = {} |         # erlangs = config.get('erlangs', 1) | ||||||
| 
 |  | ||||||
|         def iter_dts(start: datetime): |  | ||||||
| 
 |  | ||||||
|             while True: |  | ||||||
| 
 |  | ||||||
|                 hist_period = pendulum.period( |  | ||||||
|                     start, |  | ||||||
|                     last_tsdb_dt, |  | ||||||
|                 ) |  | ||||||
|                 dtrange = list(hist_period.range('seconds', frame_size_s)) |  | ||||||
|                 log.debug(f'New datetime index:\n{pformat(dtrange)}') |  | ||||||
| 
 |  | ||||||
|                 for end_dt in dtrange: |  | ||||||
|                     log.info(f'Yielding next frame start {end_dt}') |  | ||||||
|                     start = yield end_dt |  | ||||||
| 
 |  | ||||||
|                     # if caller sends a new start date, reset to that |  | ||||||
|                     if start is not None: |  | ||||||
|                         log.warning(f'Resetting date range: {start}') |  | ||||||
|                         break |  | ||||||
|                 else: |  | ||||||
|                     # from while |  | ||||||
|                     return |  | ||||||
| 
 |  | ||||||
|         # pull new history frames until we hit latest |  | ||||||
|         # already in the tsdb or a max count. |  | ||||||
|         count = 0 |  | ||||||
| 
 |  | ||||||
|         # NOTE: when gaps are detected in the retreived history (by |  | ||||||
|         # comparisor of the end - start versus the expected "frame size" |  | ||||||
|         # in seconds) we need a way to alert the async request code not |  | ||||||
|         # to continue to query for data "within the gap". This var is |  | ||||||
|         # set in such cases such that further requests in that period |  | ||||||
|         # are discarded and further we reset the "datetimem query frame |  | ||||||
|         # index" in such cases to avoid needless noop requests. |  | ||||||
|         earliest_end_dt: Optional[datetime] = start_dt |  | ||||||
| 
 |  | ||||||
|         async def get_ohlc_frame( |  | ||||||
|             input_end_dt: datetime, |  | ||||||
|             iter_dts_gen: Generator[datetime], |  | ||||||
| 
 |  | ||||||
|         ) -> np.ndarray: |  | ||||||
| 
 |  | ||||||
|             nonlocal count, frames, earliest_end_dt, frame_size_s |  | ||||||
|             count += 1 |  | ||||||
| 
 |  | ||||||
|             if input_end_dt > earliest_end_dt: |  | ||||||
|                 # if a request comes in for an inter-gap frame we |  | ||||||
|                 # discard it since likely this request is still |  | ||||||
|                 # lingering from before the reset of ``iter_dts()`` via |  | ||||||
|                 # ``.send()`` below. |  | ||||||
|                 log.info(f'Discarding request history ending @ {input_end_dt}') |  | ||||||
| 
 |  | ||||||
|                 # signals to ``trimeter`` loop to discard and |  | ||||||
|                 # ``continue`` in it's schedule loop. |  | ||||||
|                 return None |  | ||||||
| 
 | 
 | ||||||
|  |         # inline sequential loop where we simply pass the | ||||||
|  |         # last retrieved start dt to the next request as | ||||||
|  |         # it's end dt. | ||||||
|  |         starts: set[datetime] = set() | ||||||
|  |         while start_dt > last_tsdb_dt: | ||||||
|             try: |             try: | ||||||
|                 log.info( |                 log.info( | ||||||
|                     f'Requesting {step_size_s}s frame ending in {input_end_dt}' |                     f'Requesting {step_size_s}s frame ending in {start_dt}' | ||||||
|                 ) |                 ) | ||||||
|                 array, start_dt, end_dt = await hist(end_dt=input_end_dt) |                 array, next_start_dt, end_dt = await hist( | ||||||
|  |                     timeframe, | ||||||
|  |                     end_dt=start_dt, | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |                 if next_start_dt in starts: | ||||||
|  |                     start_dt = min(starts) | ||||||
|  |                     print("SKIPPING DUPLICATE FRAME @ {next_start_dt}") | ||||||
|  |                     continue | ||||||
|  | 
 | ||||||
|  |                 # only update new start point if new | ||||||
|  |                 start_dt = next_start_dt | ||||||
|  |                 starts.add(start_dt) | ||||||
|  | 
 | ||||||
|                 assert array['time'][0] == start_dt.timestamp() |                 assert array['time'][0] == start_dt.timestamp() | ||||||
| 
 | 
 | ||||||
|             except NoData: |             except NoData: | ||||||
|  |                 # XXX: unhandled history gap (shouldn't happen?) | ||||||
|                 log.warning( |                 log.warning( | ||||||
|                     f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?' |                     f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?' | ||||||
|                 ) |                 ) | ||||||
|                 return None  # discard signal |                 await tractor.breakpoint() | ||||||
| 
 | 
 | ||||||
|             except DataUnavailable as duerr: |             except DataUnavailable:  # as duerr: | ||||||
|                 # broker is being a bish and we can't pull |                 # broker is being a bish and we can't pull any more.. | ||||||
|                 # any more.. |                 log.warning( | ||||||
|                 log.warning('backend halted on data deliver !?!?') |                     f'NO-MORE-DATA: backend {mod.name} halted history!?' | ||||||
|  |                 ) | ||||||
| 
 | 
 | ||||||
|                 # ugh, what's a better way? |                 # ugh, what's a better way? | ||||||
|                 # TODO: fwiw, we probably want a way to signal a throttle |                 # TODO: fwiw, we probably want a way to signal a throttle | ||||||
|                 # condition (eg. with ib) so that we can halt the |                 # condition (eg. with ib) so that we can halt the | ||||||
|                 # request loop until the condition is resolved? |                 # request loop until the condition is resolved? | ||||||
|                 return duerr |                 return | ||||||
| 
 | 
 | ||||||
|             diff = end_dt - start_dt |             diff = end_dt - start_dt | ||||||
|             frame_time_diff_s = diff.seconds |             frame_time_diff_s = diff.seconds | ||||||
|  | @ -419,42 +397,12 @@ async def start_backfill( | ||||||
|                 # XXX: query result includes a start point prior to our |                 # XXX: query result includes a start point prior to our | ||||||
|                 # expected "frame size" and thus is likely some kind of |                 # expected "frame size" and thus is likely some kind of | ||||||
|                 # history gap (eg. market closed period, outage, etc.) |                 # history gap (eg. market closed period, outage, etc.) | ||||||
|                 # so indicate to the request loop that this gap is |                 # so just report it to console for now. | ||||||
|                 # expected by both, |  | ||||||
|                 # - resetting the ``iter_dts()`` generator to start at |  | ||||||
|                 #   the new start point delivered in this result |  | ||||||
|                 # - setting the non-locally scoped ``earliest_end_dt`` |  | ||||||
|                 #   to this new value so that the request loop doesn't |  | ||||||
|                 #   get tripped up thinking there's an out of order |  | ||||||
|                 #   request-result condition. |  | ||||||
| 
 |  | ||||||
|                 log.warning( |                 log.warning( | ||||||
|                     f'History frame ending @ {end_dt} appears to have a gap:\n' |                     f'History frame ending @ {end_dt} appears to have a gap:\n' | ||||||
|                     f'{diff} ~= {frame_time_diff_s} seconds' |                     f'{diff} ~= {frame_time_diff_s} seconds' | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|                 # reset dtrange gen to new start point |  | ||||||
|                 try: |  | ||||||
|                     next_end = iter_dts_gen.send(start_dt) |  | ||||||
|                     log.info( |  | ||||||
|                         f'Reset frame index to start at {start_dt}\n' |  | ||||||
|                         f'Was at {next_end}' |  | ||||||
|                     ) |  | ||||||
| 
 |  | ||||||
|                     # NOTE: manually set "earliest end datetime" index-value |  | ||||||
|                     # to avoid the request loop getting confused about |  | ||||||
|                     # new frames that are earlier in history - i.e. this |  | ||||||
|                     # **is not** the case of out-of-order frames from |  | ||||||
|                     # an async batch request. |  | ||||||
|                     earliest_end_dt = start_dt |  | ||||||
| 
 |  | ||||||
|                 except StopIteration: |  | ||||||
|                     # gen already terminated meaning we probably already |  | ||||||
|                     # exhausted it via frame requests. |  | ||||||
|                     log.info( |  | ||||||
|                         "Datetime index already exhausted, can't reset.." |  | ||||||
|                     ) |  | ||||||
| 
 |  | ||||||
|             to_push = diff_history( |             to_push = diff_history( | ||||||
|                 array, |                 array, | ||||||
|                 start_dt, |                 start_dt, | ||||||
|  | @ -464,194 +412,53 @@ async def start_backfill( | ||||||
|             ln = len(to_push) |             ln = len(to_push) | ||||||
|             if ln: |             if ln: | ||||||
|                 log.info(f'{ln} bars for {start_dt} -> {end_dt}') |                 log.info(f'{ln} bars for {start_dt} -> {end_dt}') | ||||||
|                 frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt) |  | ||||||
|                 return to_push, start_dt, end_dt |  | ||||||
| 
 | 
 | ||||||
|             else: |             else: | ||||||
|                 log.warning( |                 log.warning( | ||||||
|                     f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' |                     f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {end_dt}' | ||||||
|                 ) |                 ) | ||||||
|                 return None |  | ||||||
| 
 | 
 | ||||||
|         # initial dt index starts at the start of the first query result |             # bail gracefully on shm allocation overrun/full condition | ||||||
|         idts = iter_dts(start_dt) |             try: | ||||||
|  |                 shm.push(to_push, prepend=True) | ||||||
|  |             except ValueError: | ||||||
|  |                 log.info( | ||||||
|  |                     f'Shm buffer overrun on: {start_dt} -> {end_dt}?' | ||||||
|  |                 ) | ||||||
|  |                 break | ||||||
| 
 | 
 | ||||||
|         async with trimeter.amap( |             log.info( | ||||||
|             partial( |                 f'Shm pushed {ln} frame:\n' | ||||||
|                 get_ohlc_frame, |                 f'{start_dt} -> {end_dt}' | ||||||
|                 # we close in the ``iter_dt()`` gen in so we can send |             ) | ||||||
|                 # reset signals as needed for gap dection in the |  | ||||||
|                 # history. |  | ||||||
|                 iter_dts_gen=idts, |  | ||||||
|             ), |  | ||||||
|             idts, |  | ||||||
| 
 | 
 | ||||||
|             capture_outcome=True, |             if ( | ||||||
|             include_value=True, |                 storage is not None | ||||||
|  |                 and write_tsdb | ||||||
|  |             ): | ||||||
|  |                 log.info( | ||||||
|  |                     f'Writing {ln} frame to storage:\n' | ||||||
|  |                     f'{start_dt} -> {end_dt}' | ||||||
|  |                 ) | ||||||
|  |                 await storage.write_ohlcv( | ||||||
|  |                     f'{bfqsn}.{mod.name}',  # lul.. | ||||||
|  |                     to_push, | ||||||
|  |                     timeframe, | ||||||
|  |                 ) | ||||||
| 
 | 
 | ||||||
|             # better technical names bruv... |         # TODO: can we only trigger this if the respective | ||||||
|             max_at_once=erlangs, |         # history in "in view"?!? | ||||||
|             max_per_second=rate, |  | ||||||
| 
 | 
 | ||||||
|         ) as outcomes: |         # XXX: extremely important, there can be no checkpoints | ||||||
| 
 |         # in the block above to avoid entering new ``frames`` | ||||||
|             # Then iterate over the return values, as they become available |         # values while we're pipelining the current ones to | ||||||
|             # (i.e., not necessarily in the original order) |         # memory... | ||||||
|             async for input_end_dt, outcome in outcomes: |         for delay_s in sampler.subscribers: | ||||||
| 
 |             await broadcast(delay_s) | ||||||
|                 try: |  | ||||||
|                     out = outcome.unwrap() |  | ||||||
| 
 |  | ||||||
|                     if out is None: |  | ||||||
|                         # skip signal |  | ||||||
|                         continue |  | ||||||
| 
 |  | ||||||
|                     elif isinstance(out, DataUnavailable): |  | ||||||
|                         # no data available case signal.. so just kill |  | ||||||
|                         # further requests and basically just stop |  | ||||||
|                         # trying... |  | ||||||
|                         break |  | ||||||
| 
 |  | ||||||
|                 except Exception: |  | ||||||
|                     log.exception('uhh trimeter bail') |  | ||||||
|                     raise |  | ||||||
|                 else: |  | ||||||
|                     to_push, start_dt, end_dt = out |  | ||||||
| 
 |  | ||||||
|                 if not len(to_push): |  | ||||||
|                     # diff returned no new data (i.e. we probablyl hit |  | ||||||
|                     # the ``last_tsdb_dt`` point). |  | ||||||
|                     # TODO: raise instead? |  | ||||||
|                     log.warning(f'No history for range {start_dt} -> {end_dt}') |  | ||||||
|                     continue |  | ||||||
| 
 |  | ||||||
|                 # pipeline-style pull frames until we need to wait for |  | ||||||
|                 # the next in order to arrive. |  | ||||||
|                 # i = end_dts.index(input_end_dt) |  | ||||||
|                 # print(f'latest end_dt {end_dt} found at index {i}') |  | ||||||
| 
 |  | ||||||
|                 epochs = list(reversed(sorted(frames))) |  | ||||||
|                 for epoch in epochs: |  | ||||||
| 
 |  | ||||||
|                     start = shm.array['time'][0] |  | ||||||
|                     last_shm_prepend_dt = pendulum.from_timestamp(start) |  | ||||||
|                     earliest_frame_queue_dt = pendulum.from_timestamp(epoch) |  | ||||||
| 
 |  | ||||||
|                     diff = start - epoch |  | ||||||
| 
 |  | ||||||
|                     if diff < 0: |  | ||||||
|                         log.warning( |  | ||||||
|                             'Discarding out of order frame:\n' |  | ||||||
|                             f'{earliest_frame_queue_dt}' |  | ||||||
|                         ) |  | ||||||
|                         frames.pop(epoch) |  | ||||||
|                         continue |  | ||||||
| 
 |  | ||||||
|                     if diff > step_size_s: |  | ||||||
| 
 |  | ||||||
|                         if earliest_end_dt < earliest_frame_queue_dt: |  | ||||||
|                             # XXX: an expected gap was encountered (see |  | ||||||
|                             # logic in ``get_ohlc_frame()``, so allow |  | ||||||
|                             # this frame through to the storage layer. |  | ||||||
|                             log.warning( |  | ||||||
|                                 f'Expected history gap of {diff}s:\n' |  | ||||||
|                                 f'{earliest_frame_queue_dt} <- ' |  | ||||||
|                                 f'{earliest_end_dt}' |  | ||||||
|                             ) |  | ||||||
| 
 |  | ||||||
|                         elif ( |  | ||||||
|                             erlangs > 1 |  | ||||||
|                         ): |  | ||||||
|                             # we don't yet have the next frame to push |  | ||||||
|                             # so break back to the async request loop |  | ||||||
|                             # while we wait for more async frame-results |  | ||||||
|                             # to arrive. |  | ||||||
|                             if len(frames) >= erlangs: |  | ||||||
|                                 log.warning( |  | ||||||
|                                     'Frame count in async-queue is greater ' |  | ||||||
|                                     'then erlangs?\n' |  | ||||||
|                                     'There seems to be a gap between:\n' |  | ||||||
|                                     f'{earliest_frame_queue_dt} <- ' |  | ||||||
|                                     f'{last_shm_prepend_dt}\n' |  | ||||||
|                                     'Conducting manual call for frame ending: ' |  | ||||||
|                                     f'{last_shm_prepend_dt}' |  | ||||||
|                                 ) |  | ||||||
|                                 ( |  | ||||||
|                                     to_push, |  | ||||||
|                                     start_dt, |  | ||||||
|                                     end_dt, |  | ||||||
|                                 ) = await get_ohlc_frame( |  | ||||||
|                                     input_end_dt=last_shm_prepend_dt, |  | ||||||
|                                     iter_dts_gen=idts, |  | ||||||
|                                 ) |  | ||||||
|                                 last_epoch = to_push['time'][-1] |  | ||||||
|                                 diff = start - last_epoch |  | ||||||
| 
 |  | ||||||
|                                 if diff > step_size_s: |  | ||||||
|                                     await tractor.breakpoint() |  | ||||||
|                                     raise DataUnavailable( |  | ||||||
|                                         'An awkward frame was found:\n' |  | ||||||
|                                         f'{start_dt} -> {end_dt}:\n{to_push}' |  | ||||||
|                                     ) |  | ||||||
| 
 |  | ||||||
|                                 else: |  | ||||||
|                                     frames[last_epoch] = ( |  | ||||||
|                                         to_push, start_dt, end_dt) |  | ||||||
|                                     break |  | ||||||
| 
 |  | ||||||
|                             expect_end = pendulum.from_timestamp(start) |  | ||||||
|                             expect_start = expect_end.subtract( |  | ||||||
|                                 seconds=frame_size_s) |  | ||||||
|                             log.warning( |  | ||||||
|                                 'waiting on out-of-order history frame:\n' |  | ||||||
|                                 f'{expect_end - expect_start}' |  | ||||||
|                             ) |  | ||||||
|                             break |  | ||||||
| 
 |  | ||||||
|                     to_push, start_dt, end_dt = frames.pop(epoch) |  | ||||||
|                     ln = len(to_push) |  | ||||||
| 
 |  | ||||||
|                     # bail gracefully on shm allocation overrun/full condition |  | ||||||
|                     try: |  | ||||||
|                         shm.push(to_push, prepend=True) |  | ||||||
|                     except ValueError: |  | ||||||
|                         log.info( |  | ||||||
|                             f'Shm buffer overrun on: {start_dt} -> {end_dt}?' |  | ||||||
|                         ) |  | ||||||
|                         break |  | ||||||
| 
 |  | ||||||
|                     log.info( |  | ||||||
|                         f'Shm pushed {ln} frame:\n' |  | ||||||
|                         f'{start_dt} -> {end_dt}' |  | ||||||
|                     ) |  | ||||||
|                     # keep track of most recent "prepended" ``start_dt`` |  | ||||||
|                     # both for detecting gaps and ensuring async |  | ||||||
|                     # frame-result order. |  | ||||||
|                     earliest_end_dt = start_dt |  | ||||||
| 
 |  | ||||||
|                     if ( |  | ||||||
|                         storage is not None |  | ||||||
|                         and write_tsdb |  | ||||||
|                     ): |  | ||||||
|                         log.info( |  | ||||||
|                             f'Writing {ln} frame to storage:\n' |  | ||||||
|                             f'{start_dt} -> {end_dt}' |  | ||||||
|                         ) |  | ||||||
|                         await storage.write_ohlcv( |  | ||||||
|                             f'{bfqsn}.{mod.name}',  # lul.. |  | ||||||
|                             to_push, |  | ||||||
|                         ) |  | ||||||
| 
 |  | ||||||
|                 # TODO: can we only trigger this if the respective |  | ||||||
|                 # history in "in view"?!? |  | ||||||
|                 # XXX: extremely important, there can be no checkpoints |  | ||||||
|                 # in the block above to avoid entering new ``frames`` |  | ||||||
|                 # values while we're pipelining the current ones to |  | ||||||
|                 # memory... |  | ||||||
|                 for delay_s in sampler.subscribers: |  | ||||||
|                     await broadcast(delay_s) |  | ||||||
| 
 | 
 | ||||||
|  |         # short-circuit (for now) | ||||||
|         bf_done.set() |         bf_done.set() | ||||||
|  |         return | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def manage_history( | async def manage_history( | ||||||
|  | @ -660,6 +467,7 @@ async def manage_history( | ||||||
|     fqsn: str, |     fqsn: str, | ||||||
|     some_data_ready: trio.Event, |     some_data_ready: trio.Event, | ||||||
|     feed_is_live: trio.Event, |     feed_is_live: trio.Event, | ||||||
|  |     timeframe: float = 60,  # in seconds | ||||||
| 
 | 
 | ||||||
|     task_status: TaskStatus = trio.TASK_STATUS_IGNORED, |     task_status: TaskStatus = trio.TASK_STATUS_IGNORED, | ||||||
| 
 | 
 | ||||||
|  | @ -683,10 +491,10 @@ async def manage_history( | ||||||
|         readonly=False, |         readonly=False, | ||||||
|     ) |     ) | ||||||
|     # TODO: history validation |     # TODO: history validation | ||||||
|     if not opened: |     # if not opened: | ||||||
|         raise RuntimeError( |     #     raise RuntimeError( | ||||||
|             "Persistent shm for sym was already open?!" |     #         "Persistent shm for sym was already open?!" | ||||||
|         ) |     #     ) | ||||||
| 
 | 
 | ||||||
|     rt_shm, opened = maybe_open_shm_array( |     rt_shm, opened = maybe_open_shm_array( | ||||||
|         key=f'{fqsn}_rt', |         key=f'{fqsn}_rt', | ||||||
|  | @ -698,10 +506,10 @@ async def manage_history( | ||||||
|         readonly=False, |         readonly=False, | ||||||
|         size=3*_secs_in_day, |         size=3*_secs_in_day, | ||||||
|     ) |     ) | ||||||
|     if not opened: |     # if not opened: | ||||||
|         raise RuntimeError( |     #     raise RuntimeError( | ||||||
|             "Persistent shm for sym was already open?!" |     #         "Persistent shm for sym was already open?!" | ||||||
|         ) |     #     ) | ||||||
| 
 | 
 | ||||||
|     log.info('Scanning for existing `marketstored`') |     log.info('Scanning for existing `marketstored`') | ||||||
| 
 | 
 | ||||||
|  | @ -726,7 +534,10 @@ async def manage_history( | ||||||
|             # shm backfiller approach below. |             # shm backfiller approach below. | ||||||
| 
 | 
 | ||||||
|             # start history anal and load missing new data via backend. |             # start history anal and load missing new data via backend. | ||||||
|             series, _, last_tsdb_dt = await storage.load(fqsn) |             series, _, last_tsdb_dt = await storage.load( | ||||||
|  |                 fqsn, | ||||||
|  |                 timeframe=timeframe, | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|             broker, symbol, expiry = unpack_fqsn(fqsn) |             broker, symbol, expiry = unpack_fqsn(fqsn) | ||||||
|             ( |             ( | ||||||
|  | @ -739,6 +550,7 @@ async def manage_history( | ||||||
|                     mod, |                     mod, | ||||||
|                     bfqsn, |                     bfqsn, | ||||||
|                     hist_shm, |                     hist_shm, | ||||||
|  |                     timeframe=timeframe, | ||||||
|                     last_tsdb_dt=last_tsdb_dt, |                     last_tsdb_dt=last_tsdb_dt, | ||||||
|                     tsdb_is_up=True, |                     tsdb_is_up=True, | ||||||
|                     storage=storage, |                     storage=storage, | ||||||
|  | @ -769,7 +581,6 @@ async def manage_history( | ||||||
|             else: |             else: | ||||||
|                 dt_diff_s = 0 |                 dt_diff_s = 0 | ||||||
| 
 | 
 | ||||||
|             # await trio.sleep_forever() |  | ||||||
|             # TODO: see if there's faster multi-field reads: |             # TODO: see if there's faster multi-field reads: | ||||||
|             # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields |             # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields | ||||||
|             # re-index  with a `time` and index field |             # re-index  with a `time` and index field | ||||||
|  | @ -804,6 +615,7 @@ async def manage_history( | ||||||
|                     series = await storage.read_ohlcv( |                     series = await storage.read_ohlcv( | ||||||
|                         fqsn, |                         fqsn, | ||||||
|                         end=end, |                         end=end, | ||||||
|  |                         timeframe=timeframe, | ||||||
|                     ) |                     ) | ||||||
|                     history = list(series.values()) |                     history = list(series.values()) | ||||||
|                     fastest = history[0] |                     fastest = history[0] | ||||||
|  | @ -856,6 +668,7 @@ async def manage_history( | ||||||
|                 mod, |                 mod, | ||||||
|                 bfqsn, |                 bfqsn, | ||||||
|                 hist_shm, |                 hist_shm, | ||||||
|  |                 timeframe=timeframe, | ||||||
|             ) |             ) | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -387,6 +387,7 @@ class Storage: | ||||||
|     async def load( |     async def load( | ||||||
|         self, |         self, | ||||||
|         fqsn: str, |         fqsn: str, | ||||||
|  |         timeframe: int, | ||||||
| 
 | 
 | ||||||
|     ) -> tuple[ |     ) -> tuple[ | ||||||
|         dict[int, np.ndarray],  # timeframe (in secs) to series |         dict[int, np.ndarray],  # timeframe (in secs) to series | ||||||
|  | @ -400,12 +401,16 @@ class Storage: | ||||||
|             # on first load we don't need to pull the max |             # on first load we don't need to pull the max | ||||||
|             # history per request size worth. |             # history per request size worth. | ||||||
|             limit=3000, |             limit=3000, | ||||||
|  |             timeframe=timeframe, | ||||||
|         ) |         ) | ||||||
|         log.info(f'Loaded tsdb history {tsdb_arrays}') |         log.info(f'Loaded tsdb history {tsdb_arrays}') | ||||||
| 
 | 
 | ||||||
|         if tsdb_arrays: |         if len(tsdb_arrays): | ||||||
|             fastest = list(tsdb_arrays.values())[0] |             # fastest = list(tsdb_arrays.values())[0] | ||||||
|             times = fastest['Epoch'] |             # slowest = list(tsdb_arrays.values())[-1] | ||||||
|  |             hist = tsdb_arrays[timeframe] | ||||||
|  | 
 | ||||||
|  |             times = hist['Epoch'] | ||||||
|             first, last = times[0], times[-1] |             first, last = times[0], times[-1] | ||||||
|             first_tsdb_dt, last_tsdb_dt = map( |             first_tsdb_dt, last_tsdb_dt = map( | ||||||
|                 pendulum.from_timestamp, [first, last] |                 pendulum.from_timestamp, [first, last] | ||||||
|  | @ -420,9 +425,9 @@ class Storage: | ||||||
|         end: Optional[int] = None, |         end: Optional[int] = None, | ||||||
|         limit: int = int(800e3), |         limit: int = int(800e3), | ||||||
| 
 | 
 | ||||||
|     ) -> tuple[ |     ) -> dict[ | ||||||
|         MarketstoreClient, |         int, | ||||||
|         Union[dict, np.ndarray] |         Union[dict, np.ndarray], | ||||||
|     ]: |     ]: | ||||||
|         client = self.client |         client = self.client | ||||||
|         syms = await client.list_symbols() |         syms = await client.list_symbols() | ||||||
|  | @ -430,7 +435,8 @@ class Storage: | ||||||
|         if fqsn not in syms: |         if fqsn not in syms: | ||||||
|             return {} |             return {} | ||||||
| 
 | 
 | ||||||
|         tfstr = tf_in_1s[1] |         # use the provided timeframe or 1s by default | ||||||
|  |         tfstr = tf_in_1s.get(timeframe, tf_in_1s[1]) | ||||||
| 
 | 
 | ||||||
|         params = Params( |         params = Params( | ||||||
|             symbols=fqsn, |             symbols=fqsn, | ||||||
|  | @ -463,39 +469,52 @@ class Storage: | ||||||
|                 return {} |                 return {} | ||||||
| 
 | 
 | ||||||
|         else: |         else: | ||||||
|             result = await client.query(params) |             params.set('timeframe', tfstr) | ||||||
|  |             try: | ||||||
|  |                 result = await client.query(params) | ||||||
|  |             except purerpc.grpclib.exceptions.UnknownError: | ||||||
|  |                 # indicate there is no history for this timeframe | ||||||
|  |                 return {} | ||||||
|  | 
 | ||||||
|  |         # Fill out a `numpy` array-results map keyed by timeframe | ||||||
|  |         arrays = {} | ||||||
| 
 | 
 | ||||||
|         # TODO: it turns out column access on recarrays is actually slower: |         # TODO: it turns out column access on recarrays is actually slower: | ||||||
|         # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist |         # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist | ||||||
|         # it might make sense to make these structured arrays? |         # it might make sense to make these structured arrays? | ||||||
|         # Fill out a `numpy` array-results map |  | ||||||
|         arrays = {} |  | ||||||
|         for fqsn, data_set in result.by_symbols().items(): |         for fqsn, data_set in result.by_symbols().items(): | ||||||
|             arrays.setdefault(fqsn, {})[ |             arrays.setdefault(fqsn, {})[ | ||||||
|                 tf_in_1s.inverse[data_set.timeframe] |                 tf_in_1s.inverse[data_set.timeframe] | ||||||
|             ] = data_set.array |             ] = data_set.array | ||||||
| 
 | 
 | ||||||
|         return arrays[fqsn][timeframe] if timeframe else arrays[fqsn] |         return arrays[fqsn] | ||||||
| 
 | 
 | ||||||
|     async def delete_ts( |     async def delete_ts( | ||||||
|         self, |         self, | ||||||
|         key: str, |         key: str, | ||||||
|         timeframe: Optional[Union[int, str]] = None, |         timeframe: Optional[Union[int, str]] = None, | ||||||
|  |         fmt: str = 'OHLCV', | ||||||
| 
 | 
 | ||||||
|     ) -> bool: |     ) -> bool: | ||||||
| 
 | 
 | ||||||
|         client = self.client |         client = self.client | ||||||
|         syms = await client.list_symbols() |         syms = await client.list_symbols() | ||||||
|         print(syms) |         print(syms) | ||||||
|         # if key not in syms: |         if key not in syms: | ||||||
|         #     raise KeyError(f'`{fqsn}` table key not found?') |             raise KeyError(f'`{key}` table key not found in\n{syms}?') | ||||||
| 
 | 
 | ||||||
|         return await client.destroy(tbk=key) |         tbk = mk_tbk(( | ||||||
|  |             key, | ||||||
|  |             tf_in_1s.get(timeframe, tf_in_1s[60]), | ||||||
|  |             fmt, | ||||||
|  |         )) | ||||||
|  |         return await client.destroy(tbk=tbk) | ||||||
| 
 | 
 | ||||||
|     async def write_ohlcv( |     async def write_ohlcv( | ||||||
|         self, |         self, | ||||||
|         fqsn: str, |         fqsn: str, | ||||||
|         ohlcv: np.ndarray, |         ohlcv: np.ndarray, | ||||||
|  |         timeframe: int, | ||||||
|         append_and_duplicate: bool = True, |         append_and_duplicate: bool = True, | ||||||
|         limit: int = int(800e3), |         limit: int = int(800e3), | ||||||
| 
 | 
 | ||||||
|  | @ -525,7 +544,7 @@ class Storage: | ||||||
|             # write to db |             # write to db | ||||||
|             resp = await self.client.write( |             resp = await self.client.write( | ||||||
|                 to_push, |                 to_push, | ||||||
|                 tbk=f'{fqsn}/1Sec/OHLCV', |                 tbk=f'{fqsn}/{tf_in_1s[timeframe]}/OHLCV', | ||||||
| 
 | 
 | ||||||
|                 # NOTE: will will append duplicates |                 # NOTE: will will append duplicates | ||||||
|                 # for the same timestamp-index. |                 # for the same timestamp-index. | ||||||
|  | @ -577,6 +596,7 @@ class Storage: | ||||||
|     # def delete_range(self, start_dt, end_dt) -> None: |     # def delete_range(self, start_dt, end_dt) -> None: | ||||||
|     #     ... |     #     ... | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| @acm | @acm | ||||||
| async def open_storage_client( | async def open_storage_client( | ||||||
|     fqsn: str, |     fqsn: str, | ||||||
|  | @ -642,7 +662,7 @@ async def tsdb_history_update( | ||||||
|     ): |     ): | ||||||
|         profiler(f'opened feed for {fqsn}') |         profiler(f'opened feed for {fqsn}') | ||||||
| 
 | 
 | ||||||
|         to_append = feed.shm.array |         to_append = feed.hist_shm.array | ||||||
|         to_prepend = None |         to_prepend = None | ||||||
| 
 | 
 | ||||||
|         if fqsn: |         if fqsn: | ||||||
|  | @ -651,7 +671,7 @@ async def tsdb_history_update( | ||||||
|                 fqsn = symbol.front_fqsn() |                 fqsn = symbol.front_fqsn() | ||||||
| 
 | 
 | ||||||
|             # diff db history with shm and only write the missing portions |             # diff db history with shm and only write the missing portions | ||||||
|             ohlcv = feed.shm.array |             ohlcv = feed.hist_shm.array | ||||||
| 
 | 
 | ||||||
|             # TODO: use pg profiler |             # TODO: use pg profiler | ||||||
|             tsdb_arrays = await storage.read_ohlcv(fqsn) |             tsdb_arrays = await storage.read_ohlcv(fqsn) | ||||||
|  |  | ||||||
							
								
								
									
										71
									
								
								piker/pp.py
								
								
								
								
							
							
						
						
									
										71
									
								
								piker/pp.py
								
								
								
								
							|  | @ -20,6 +20,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys.. | ||||||
| (looking at you `ib` and dirt-bird friends) | (looking at you `ib` and dirt-bird friends) | ||||||
| 
 | 
 | ||||||
| ''' | ''' | ||||||
|  | from __future__ import annotations | ||||||
| from contextlib import contextmanager as cm | from contextlib import contextmanager as cm | ||||||
| from pprint import pformat | from pprint import pformat | ||||||
| import os | import os | ||||||
|  | @ -138,13 +139,31 @@ class Position(Struct): | ||||||
| 
 | 
 | ||||||
|     # ordered record of known constituent trade messages |     # ordered record of known constituent trade messages | ||||||
|     clears: dict[ |     clears: dict[ | ||||||
|         Union[str, int, Status],  # trade id |         str | int,  # trade id | ||||||
|         dict[str, Any],  # transaction history summaries |         dict[str, Any],  # transaction history summaries | ||||||
|     ] = {} |     ] = {} | ||||||
|     first_clear_dt: Optional[datetime] = None |     first_clear_dt: Optional[datetime] = None | ||||||
| 
 | 
 | ||||||
|     expiry: Optional[datetime] = None |     expiry: Optional[datetime] = None | ||||||
| 
 | 
 | ||||||
|  |     # @property | ||||||
|  |     # def clears(self) -> dict[ | ||||||
|  |     #     Union[str, int, Status],  # trade id | ||||||
|  |     #     dict[str, Any],  # transaction history summaries | ||||||
|  |     # ]: | ||||||
|  |     #     ''' | ||||||
|  |     #     Datetime sorted reference to internal clears table. | ||||||
|  | 
 | ||||||
|  |     #     ''' | ||||||
|  |     #     # self._clears = {} | ||||||
|  |     #     self._clears = dict(sorted( | ||||||
|  |     #         self._clears.items(), | ||||||
|  |     #         key=lambda entry: entry[1]['dt'], | ||||||
|  |     #     )) | ||||||
|  |     #         # self._clears[k] = v | ||||||
|  | 
 | ||||||
|  |     #     return self._clears | ||||||
|  | 
 | ||||||
|     def to_dict(self) -> dict: |     def to_dict(self) -> dict: | ||||||
|         return { |         return { | ||||||
|             f: getattr(self, f) |             f: getattr(self, f) | ||||||
|  | @ -219,6 +238,10 @@ class Position(Struct): | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         clears = list(self.clears.values()) |         clears = list(self.clears.values()) | ||||||
|  |         if not clears: | ||||||
|  |             log.warning(f'No clears table for {self.symbol}!?') | ||||||
|  |             return | ||||||
|  | 
 | ||||||
|         self.first_clear_dt = min(list(entry['dt'] for entry in clears)) |         self.first_clear_dt = min(list(entry['dt'] for entry in clears)) | ||||||
|         last_clear = clears[-1] |         last_clear = clears[-1] | ||||||
| 
 | 
 | ||||||
|  | @ -623,6 +646,7 @@ class PpTable(Struct): | ||||||
| 
 | 
 | ||||||
|     def to_toml( |     def to_toml( | ||||||
|         self, |         self, | ||||||
|  |         min_clears: bool = True, | ||||||
|     ) -> dict[str, Any]: |     ) -> dict[str, Any]: | ||||||
| 
 | 
 | ||||||
|         active, closed = self.dump_active() |         active, closed = self.dump_active() | ||||||
|  | @ -635,7 +659,9 @@ class PpTable(Struct): | ||||||
| 
 | 
 | ||||||
|             # keep the minimal amount of clears that make up this |             # keep the minimal amount of clears that make up this | ||||||
|             # position since the last net-zero state. |             # position since the last net-zero state. | ||||||
|             pos.minimize_clears() |             if min_clears: | ||||||
|  |                 pos.minimize_clears() | ||||||
|  | 
 | ||||||
|             pos.ensure_state() |             pos.ensure_state() | ||||||
| 
 | 
 | ||||||
|             # serialize to pre-toml form |             # serialize to pre-toml form | ||||||
|  | @ -682,6 +708,8 @@ def load_pps_from_ledger( | ||||||
|     brokername: str, |     brokername: str, | ||||||
|     acctname: str, |     acctname: str, | ||||||
| 
 | 
 | ||||||
|  |     table: Optional[PpTable] = None, | ||||||
|  | 
 | ||||||
|     # post normalization filter on ledger entries to be processed |     # post normalization filter on ledger entries to be processed | ||||||
|     filter_by: Optional[list[dict]] = None, |     filter_by: Optional[list[dict]] = None, | ||||||
| 
 | 
 | ||||||
|  | @ -698,7 +726,6 @@ def load_pps_from_ledger( | ||||||
|     ''' |     ''' | ||||||
|     with ( |     with ( | ||||||
|         open_trade_ledger(brokername, acctname) as ledger, |         open_trade_ledger(brokername, acctname) as ledger, | ||||||
|         open_pps(brokername, acctname) as table, |  | ||||||
|     ): |     ): | ||||||
|         if not ledger: |         if not ledger: | ||||||
|             # null case, no ledger file with content |             # null case, no ledger file with content | ||||||
|  | @ -716,7 +743,11 @@ def load_pps_from_ledger( | ||||||
|         else: |         else: | ||||||
|             records = src_records |             records = src_records | ||||||
| 
 | 
 | ||||||
|         updated = table.update_from_trans(records) |         if table is None: | ||||||
|  |             with open_pps(brokername, acctname) as table: | ||||||
|  |                 updated = table.update_from_trans(records) | ||||||
|  |         else: | ||||||
|  |             updated = table.update_from_trans(records) | ||||||
| 
 | 
 | ||||||
|     return records, updated |     return records, updated | ||||||
| 
 | 
 | ||||||
|  | @ -886,15 +917,27 @@ def open_pps( | ||||||
|         conf=conf, |         conf=conf, | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|  |     # first pass populate all missing clears record tables | ||||||
|  |     # for fqsn, entry in pps.items(): | ||||||
|  |     #     # convert clears sub-tables (only in this form | ||||||
|  |     #     # for toml re-presentation) back into a master table. | ||||||
|  |     #     clears_list = entry.get('clears', []) | ||||||
|  | 
 | ||||||
|  |     #     # attempt to reload from ledger | ||||||
|  |     #     if not clears_list: | ||||||
|  |     #         trans, pos = load_pps_from_ledger( | ||||||
|  |     #             brokername, | ||||||
|  |     #             acctid, | ||||||
|  |     #             filter_by=[entry['bsuid']], | ||||||
|  |     #             table=table, | ||||||
|  |     #         ) | ||||||
|  |     #         # breakpoint() | ||||||
|  | 
 | ||||||
|     # unmarshal/load ``pps.toml`` config entries into object form |     # unmarshal/load ``pps.toml`` config entries into object form | ||||||
|     # and update `PpTable` obj entries. |     # and update `PpTable` obj entries. | ||||||
|     for fqsn, entry in pps.items(): |     for fqsn, entry in pps.items(): | ||||||
|         bsuid = entry['bsuid'] |         bsuid = entry['bsuid'] | ||||||
| 
 | 
 | ||||||
|         # convert clears sub-tables (only in this form |  | ||||||
|         # for toml re-presentation) back into a master table. |  | ||||||
|         clears_list = entry['clears'] |  | ||||||
| 
 |  | ||||||
|         # index clears entries in "object" form by tid in a top |         # index clears entries in "object" form by tid in a top | ||||||
|         # level dict instead of a list (as is presented in our |         # level dict instead of a list (as is presented in our | ||||||
|         # ``pps.toml``). |         # ``pps.toml``). | ||||||
|  | @ -906,6 +949,18 @@ def open_pps( | ||||||
|         # processing of new clear events. |         # processing of new clear events. | ||||||
|         trans: list[Transaction] = [] |         trans: list[Transaction] = [] | ||||||
| 
 | 
 | ||||||
|  |         # convert clears sub-tables (only in this form | ||||||
|  |         # for toml re-presentation) back into a master table. | ||||||
|  |         clears_list = entry['clears'] | ||||||
|  | 
 | ||||||
|  |         # # attempt to reload from ledger | ||||||
|  |         # if not clears_list: | ||||||
|  |         #     trans, pos = load_pps_from_ledger( | ||||||
|  |         #         brokername, | ||||||
|  |         #         acctid, | ||||||
|  |         #         table=table, | ||||||
|  |         #     ) | ||||||
|  | 
 | ||||||
|         for clears_table in clears_list: |         for clears_table in clears_list: | ||||||
|             tid = clears_table.pop('tid') |             tid = clears_table.pop('tid') | ||||||
|             dtstr = clears_table['dt'] |             dtstr = clears_table['dt'] | ||||||
|  |  | ||||||
|  | @ -249,14 +249,14 @@ async def graphics_update_loop( | ||||||
|     linked: LinkedSplits = godwidget.rt_linked |     linked: LinkedSplits = godwidget.rt_linked | ||||||
|     display_rate = godwidget.window.current_screen().refreshRate() |     display_rate = godwidget.window.current_screen().refreshRate() | ||||||
| 
 | 
 | ||||||
|     chart = linked.chart |     fast_chart = linked.chart | ||||||
|     hist_chart = godwidget.hist_linked.chart |     hist_chart = godwidget.hist_linked.chart | ||||||
| 
 | 
 | ||||||
|     ohlcv = feed.rt_shm |     ohlcv = feed.rt_shm | ||||||
|     hist_ohlcv = feed.hist_shm |     hist_ohlcv = feed.hist_shm | ||||||
| 
 | 
 | ||||||
|     # update last price sticky |     # update last price sticky | ||||||
|     last_price_sticky = chart._ysticks[chart.name] |     last_price_sticky = fast_chart._ysticks[fast_chart.name] | ||||||
|     last_price_sticky.update_from_data( |     last_price_sticky.update_from_data( | ||||||
|         *ohlcv.array[-1][['index', 'close']] |         *ohlcv.array[-1][['index', 'close']] | ||||||
|     ) |     ) | ||||||
|  | @ -268,7 +268,7 @@ async def graphics_update_loop( | ||||||
| 
 | 
 | ||||||
|     maxmin = partial( |     maxmin = partial( | ||||||
|         chart_maxmin, |         chart_maxmin, | ||||||
|         chart, |         fast_chart, | ||||||
|         ohlcv, |         ohlcv, | ||||||
|         vlm_chart, |         vlm_chart, | ||||||
|     ) |     ) | ||||||
|  | @ -282,15 +282,15 @@ async def graphics_update_loop( | ||||||
| 
 | 
 | ||||||
|     last, volume = ohlcv.array[-1][['close', 'volume']] |     last, volume = ohlcv.array[-1][['close', 'volume']] | ||||||
| 
 | 
 | ||||||
|     symbol = chart.linked.symbol |     symbol = fast_chart.linked.symbol | ||||||
| 
 | 
 | ||||||
|     l1 = L1Labels( |     l1 = L1Labels( | ||||||
|         chart, |         fast_chart, | ||||||
|         # determine precision/decimal lengths |         # determine precision/decimal lengths | ||||||
|         digits=symbol.tick_size_digits, |         digits=symbol.tick_size_digits, | ||||||
|         size_digits=symbol.lot_size_digits, |         size_digits=symbol.lot_size_digits, | ||||||
|     ) |     ) | ||||||
|     chart._l1_labels = l1 |     fast_chart._l1_labels = l1 | ||||||
| 
 | 
 | ||||||
|     # TODO: |     # TODO: | ||||||
|     # - in theory we should be able to read buffer data faster |     # - in theory we should be able to read buffer data faster | ||||||
|  | @ -300,10 +300,10 @@ async def graphics_update_loop( | ||||||
|     #   levels this might be dark volume we need to |     #   levels this might be dark volume we need to | ||||||
|     #   present differently -> likely dark vlm |     #   present differently -> likely dark vlm | ||||||
| 
 | 
 | ||||||
|     tick_size = chart.linked.symbol.tick_size |     tick_size = fast_chart.linked.symbol.tick_size | ||||||
|     tick_margin = 3 * tick_size |     tick_margin = 3 * tick_size | ||||||
| 
 | 
 | ||||||
|     chart.show() |     fast_chart.show() | ||||||
|     last_quote = time.time() |     last_quote = time.time() | ||||||
|     i_last = ohlcv.index |     i_last = ohlcv.index | ||||||
| 
 | 
 | ||||||
|  | @ -313,7 +313,7 @@ async def graphics_update_loop( | ||||||
|         'maxmin': maxmin, |         'maxmin': maxmin, | ||||||
|         'ohlcv': ohlcv, |         'ohlcv': ohlcv, | ||||||
|         'hist_ohlcv': hist_ohlcv, |         'hist_ohlcv': hist_ohlcv, | ||||||
|         'chart': chart, |         'chart': fast_chart, | ||||||
|         'last_price_sticky': last_price_sticky, |         'last_price_sticky': last_price_sticky, | ||||||
|         'hist_last_price_sticky': hist_last_price_sticky, |         'hist_last_price_sticky': hist_last_price_sticky, | ||||||
|         'l1': l1, |         'l1': l1, | ||||||
|  | @ -333,7 +333,7 @@ async def graphics_update_loop( | ||||||
|         ds.vlm_chart = vlm_chart |         ds.vlm_chart = vlm_chart | ||||||
|         ds.vlm_sticky = vlm_sticky |         ds.vlm_sticky = vlm_sticky | ||||||
| 
 | 
 | ||||||
|     chart.default_view() |     fast_chart.default_view() | ||||||
| 
 | 
 | ||||||
|     # TODO: probably factor this into some kinda `DisplayState` |     # TODO: probably factor this into some kinda `DisplayState` | ||||||
|     # API that can be reused at least in terms of pulling view |     # API that can be reused at least in terms of pulling view | ||||||
|  | @ -410,16 +410,16 @@ async def graphics_update_loop( | ||||||
|         last_quote = time.time() |         last_quote = time.time() | ||||||
| 
 | 
 | ||||||
|         # chart isn't active/shown so skip render cycle and pause feed(s) |         # chart isn't active/shown so skip render cycle and pause feed(s) | ||||||
|         if chart.linked.isHidden(): |         if fast_chart.linked.isHidden(): | ||||||
|             # print('skipping update') |             # print('skipping update') | ||||||
|             chart.pause_all_feeds() |             fast_chart.pause_all_feeds() | ||||||
|             continue |             continue | ||||||
| 
 | 
 | ||||||
|         ic = chart.view._ic |         # ic = fast_chart.view._ic | ||||||
|         if ic: |         # if ic: | ||||||
|             chart.pause_all_feeds() |         #     fast_chart.pause_all_feeds() | ||||||
|             await ic.wait() |         #     await ic.wait() | ||||||
|             chart.resume_all_feeds() |         #     fast_chart.resume_all_feeds() | ||||||
| 
 | 
 | ||||||
|         # sync call to update all graphics/UX components. |         # sync call to update all graphics/UX components. | ||||||
|         graphics_update_cycle(ds) |         graphics_update_cycle(ds) | ||||||
|  | @ -502,6 +502,7 @@ def graphics_update_cycle( | ||||||
|             or trigger_all |             or trigger_all | ||||||
|         ): |         ): | ||||||
|             chart.increment_view(steps=i_diff) |             chart.increment_view(steps=i_diff) | ||||||
|  |             chart.view._set_yrange(yrange=(mn, mx)) | ||||||
| 
 | 
 | ||||||
|             if vlm_chart: |             if vlm_chart: | ||||||
|                 vlm_chart.increment_view(steps=i_diff) |                 vlm_chart.increment_view(steps=i_diff) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue