Compare commits

17 commits in range: ef9bc7d1ed...30483c3218
Commit SHAs in this range (newest first):

- 30483c3218
- 8b61b4e79b
- 3edb6926e4
- 9fb2fc1437
- 48c7fb023e
- 7de20ebe42
- 5e371f1d73
- 6c221bb348
- e391c896f8
- 5633f5614d
- 76735189de
- d49608f74e
- bf0ac93aa3
- d7179d47b0
- c390e87536
- 5e4a6d61c7
- 3caaa30b03

The combined diff for this range follows.
|  | @ -98,13 +98,14 @@ async def open_cached_client( | |||
|     If one has not been setup do it and cache it. | ||||
| 
 | ||||
|     ''' | ||||
|     brokermod = get_brokermod(brokername) | ||||
|     brokermod: ModuleType = get_brokermod(brokername) | ||||
| 
 | ||||
|     # TODO: make abstract or `typing.Protocol` | ||||
|     # client: Client | ||||
|     async with maybe_open_context( | ||||
|         acm_func=brokermod.get_client, | ||||
|         kwargs=kwargs, | ||||
| 
 | ||||
|     ) as (cache_hit, client): | ||||
| 
 | ||||
|         if cache_hit: | ||||
|             log.runtime(f'Reusing existing {client}') | ||||
| 
 | ||||
|  |  | |||
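
A minimal usage sketch for the `open_cached_client()` hunk above: nested entries for the same broker go through `maybe_open_context()` and so should yield the same underlying client, with the inner entry hitting the "Reusing existing ..." runtime log path. The `'kraken'` backend name and the import path are assumptions for illustration, not part of this changeset.

```python
import trio

from piker.brokers import open_cached_client  # assumed import path


async def main() -> None:
    async with open_cached_client('kraken') as client:
        # the second (nested) entry is a cache hit per `maybe_open_context()`,
        # so both names should point at the same client instance.
        async with open_cached_client('kraken') as client2:
            assert client is client2


trio.run(main)
```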
|  | @ -471,11 +471,15 @@ def search( | |||
| 
 | ||||
|     ''' | ||||
|     # global opts | ||||
|     brokermods = list(config['brokermods'].values()) | ||||
|     brokermods: list[ModuleType] = list(config['brokermods'].values()) | ||||
| 
 | ||||
|     # TODO: this is coming from the `search --pdb` NOT from | ||||
|     # the `piker --pdb` XD .. | ||||
|     # -[ ] pull from the parent click ctx's values..dumdum | ||||
|     # assert pdb | ||||
| 
 | ||||
|     # define tractor entrypoint | ||||
|     async def main(func): | ||||
| 
 | ||||
|         async with maybe_open_pikerd( | ||||
|             loglevel=config['loglevel'], | ||||
|             debug_mode=pdb, | ||||
|  |  | |||
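
For the `# -[ ] pull from the parent click ctx's values..` TODO above, a hedged sketch of reading the root `--pdb` flag via click's context tree instead of re-declaring it on the subcommand; the command and option names here are illustrative only.

```python
import click


@click.command()
@click.argument('pattern')
@click.pass_context
def search(ctx: click.Context, pattern: str) -> None:
    # walk up to the root group's parsed params, e.g. `piker --pdb ... search`
    root_params: dict = ctx.find_root().params
    pdb: bool = root_params.get('pdb', False)
    click.echo(f'debug-mode: {pdb}, searching for {pattern!r}')
```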
|  | @ -22,7 +22,9 @@ routines should be primitive data types where possible. | |||
| """ | ||||
| import inspect | ||||
| from types import ModuleType | ||||
| from typing import List, Dict, Any, Optional | ||||
| from typing import ( | ||||
|     Any, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| 
 | ||||
|  | @ -34,8 +36,10 @@ from ..accounting import MktPair | |||
| 
 | ||||
| 
 | ||||
| async def api(brokername: str, methname: str, **kwargs) -> dict: | ||||
|     """Make (proxy through) a broker API call by name and return its result. | ||||
|     """ | ||||
|     ''' | ||||
|     Make (proxy through) a broker API call by name and return its result. | ||||
| 
 | ||||
|     ''' | ||||
|     brokermod = get_brokermod(brokername) | ||||
|     async with brokermod.get_client() as client: | ||||
|         meth = getattr(client, methname, None) | ||||
|  | @ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: | |||
| 
 | ||||
| async def stocks_quote( | ||||
|     brokermod: ModuleType, | ||||
|     tickers: List[str] | ||||
| ) -> Dict[str, Dict[str, Any]]: | ||||
|     """Return quotes dict for ``tickers``. | ||||
|     """ | ||||
|     tickers: list[str] | ||||
| 
 | ||||
| ) -> dict[str, dict[str, Any]]: | ||||
|     ''' | ||||
|     Return a `dict` of snapshot quotes for the provided input | ||||
|     `tickers`: a `list` of fqmes. | ||||
| 
 | ||||
|     ''' | ||||
|     async with brokermod.get_client() as client: | ||||
|         return await client.quote(tickers) | ||||
| 
 | ||||
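
A throwaway driver sketch for `stocks_quote()` as typed above (the broker name, tickers and the `get_brokermod` import path are assumptions): resolve a backend module, then fetch a snapshot-quote `dict` keyed by symbol.

```python
import trio

from piker.brokers import get_brokermod  # assumed import path


async def main() -> None:
    brokermod = get_brokermod('questrade')
    quotes: dict[str, dict] = await stocks_quote(
        brokermod,
        tickers=['AAPL', 'TSLA'],
    )
    for sym, quote in quotes.items():
        print(sym, quote)


trio.run(main)
```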
|  | @ -74,13 +82,15 @@ async def stocks_quote( | |||
| async def option_chain( | ||||
|     brokermod: ModuleType, | ||||
|     symbol: str, | ||||
|     date: Optional[str] = None, | ||||
| ) -> Dict[str, Dict[str, Dict[str, Any]]]: | ||||
|     """Return option chain for ``symbol`` for ``date``. | ||||
|     date: str|None = None, | ||||
| ) -> dict[str, dict[str, dict[str, Any]]]: | ||||
|     ''' | ||||
|     Return option chain for ``symbol`` for ``date``. | ||||
| 
 | ||||
|     By default all expiries are returned. If ``date`` is provided | ||||
|     then contract quotes for that single expiry are returned. | ||||
|     """ | ||||
| 
 | ||||
|     ''' | ||||
|     async with brokermod.get_client() as client: | ||||
|         if date: | ||||
|             id = int((await client.tickers2ids([symbol]))[symbol]) | ||||
|  | @ -98,7 +108,7 @@ async def option_chain( | |||
| # async def contracts( | ||||
| #     brokermod: ModuleType, | ||||
| #     symbol: str, | ||||
| # ) -> Dict[str, Dict[str, Dict[str, Any]]]: | ||||
| # ) -> dict[str, dict[str, dict[str, Any]]]: | ||||
| #     """Return option contracts (all expiries) for ``symbol``. | ||||
| #     """ | ||||
| #     async with brokermod.get_client() as client: | ||||
|  | @ -110,15 +120,24 @@ async def bars( | |||
|     brokermod: ModuleType, | ||||
|     symbol: str, | ||||
|     **kwargs, | ||||
| ) -> Dict[str, Dict[str, Dict[str, Any]]]: | ||||
|     """Return option contracts (all expiries) for ``symbol``. | ||||
|     """ | ||||
| ) -> dict[str, dict[str, dict[str, Any]]]: | ||||
|     ''' | ||||
|     Return option contracts (all expiries) for ``symbol``. | ||||
| 
 | ||||
|     ''' | ||||
|     async with brokermod.get_client() as client: | ||||
|         return await client.bars(symbol, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| async def search_w_brokerd(name: str, pattern: str) -> dict: | ||||
| async def search_w_brokerd( | ||||
|     name: str, | ||||
|     pattern: str, | ||||
| ) -> dict: | ||||
| 
 | ||||
|     # TODO: WHY NOT WORK!?! | ||||
|     # when we `step` through the next block? | ||||
|     # import tractor | ||||
|     # await tractor.pause() | ||||
|     async with open_cached_client(name) as client: | ||||
| 
 | ||||
|         # TODO: support multiple asset type concurrent searches. | ||||
|  | @ -130,12 +149,12 @@ async def symbol_search( | |||
|     pattern: str, | ||||
|     **kwargs, | ||||
| 
 | ||||
| ) -> Dict[str, Dict[str, Dict[str, Any]]]: | ||||
| ) -> dict[str, dict[str, dict[str, Any]]]: | ||||
|     ''' | ||||
|     Return symbol info from broker. | ||||
| 
 | ||||
|     ''' | ||||
|     results = [] | ||||
|     results: list[str] = [] | ||||
| 
 | ||||
|     async def search_backend( | ||||
|         brokermod: ModuleType | ||||
|  | @ -143,6 +162,13 @@ async def symbol_search( | |||
| 
 | ||||
|         brokername: str = mod.name | ||||
| 
 | ||||
|         # TODO: figure this the FUCK OUT | ||||
|         # -> ok so obvi in the root actor any async task that's | ||||
|         # spawned outside the main tractor-root-actor task needs to | ||||
|         # call this.. | ||||
|         # await tractor.devx._debug.maybe_init_greenback() | ||||
|         # tractor.pause_from_sync() | ||||
| 
 | ||||
|         async with maybe_spawn_brokerd( | ||||
|             mod.name, | ||||
|             infect_asyncio=getattr( | ||||
|  | @ -162,7 +188,6 @@ async def symbol_search( | |||
|             )) | ||||
| 
 | ||||
|     async with trio.open_nursery() as n: | ||||
| 
 | ||||
|         for mod in brokermods: | ||||
|             n.start_soon(search_backend, mod.name) | ||||
| 
 | ||||
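
The `symbol_search()` body above fans one task out per backend and joins them on nursery exit; a distilled, self-contained version of that pattern (with a fake per-backend search standing in for the `brokerd` round trip) looks like:

```python
import trio


async def fan_out_search(
    pattern: str,
    backends: list[str],
) -> list[tuple[str, str]]:
    results: list[tuple[str, str]] = []

    async def search_one(name: str) -> None:
        await trio.sleep(0.1)  # stand-in for the brokerd RPC round trip
        results.append((name, f'{pattern}-match-from-{name}'))

    async with trio.open_nursery() as tn:
        for name in backends:
            tn.start_soon(search_one, name)

    # nursery exit means every backend search completed
    return results


print(trio.run(fan_out_search, 'xmr', ['kraken', 'binance']))
```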
|  | @ -172,11 +197,13 @@ async def symbol_search( | |||
| async def mkt_info( | ||||
|     brokermod: ModuleType, | ||||
|     fqme: str, | ||||
| 
 | ||||
|     **kwargs, | ||||
| 
 | ||||
| ) -> MktPair: | ||||
|     ''' | ||||
|     Return MktPair info from broker including src and dst assets. | ||||
|     Return the `piker.accounting.MktPair` info struct from a given | ||||
|     backend broker tradable src/dst asset pair. | ||||
| 
 | ||||
|     ''' | ||||
|     async with open_cached_client(brokermod.name) as client: | ||||
|  |  | |||
|  | @ -587,7 +587,7 @@ async def get_bars( | |||
|                         data_cs.cancel() | ||||
| 
 | ||||
|                     # spawn new data reset task | ||||
|                     data_cs, reset_done = await nurse.start( | ||||
|                     data_cs, reset_done = await tn.start( | ||||
|                         partial( | ||||
|                             wait_on_data_reset, | ||||
|                             proxy, | ||||
|  | @ -607,11 +607,11 @@ async def get_bars( | |||
|     # such that simultaneous symbol queries don't try data resettingn | ||||
|     # too fast.. | ||||
|     unset_resetter: bool = False | ||||
|     async with trio.open_nursery() as nurse: | ||||
|     async with trio.open_nursery() as tn: | ||||
| 
 | ||||
|         # start history request that we allow | ||||
|         # to run indefinitely until a result is acquired | ||||
|         nurse.start_soon(query) | ||||
|         tn.start_soon(query) | ||||
| 
 | ||||
|         # start history reset loop which waits up to the timeout | ||||
|         # for a result before triggering a data feed reset. | ||||
|  | @ -631,7 +631,7 @@ async def get_bars( | |||
|                 unset_resetter: bool = True | ||||
| 
 | ||||
|             # spawn new data reset task | ||||
|             data_cs, reset_done = await nurse.start( | ||||
|             data_cs, reset_done = await tn.start( | ||||
|                 partial( | ||||
|                     wait_on_data_reset, | ||||
|                     proxy, | ||||
|  | @ -705,7 +705,9 @@ async def _setup_quote_stream( | |||
|         # to_trio, from_aio = trio.open_memory_channel(2**8)  # type: ignore | ||||
|         def teardown(): | ||||
|             ticker.updateEvent.disconnect(push) | ||||
|             log.error(f"Disconnected stream for `{symbol}`") | ||||
|             log.error( | ||||
|                 f'Disconnected stream for `{symbol}`' | ||||
|             ) | ||||
|             client.ib.cancelMktData(contract) | ||||
| 
 | ||||
|             # decouple broadcast mem chan | ||||
|  | @ -761,7 +763,10 @@ async def open_aio_quote_stream( | |||
|     symbol: str, | ||||
|     contract: Contract | None = None, | ||||
| 
 | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| ) -> ( | ||||
|     trio.abc.Channel|  # iface | ||||
|     tractor.to_asyncio.LinkedTaskChannel  # actually | ||||
| ): | ||||
| 
 | ||||
|     from tractor.trionics import broadcast_receiver | ||||
|     global _quote_streams | ||||
|  | @ -778,6 +783,7 @@ async def open_aio_quote_stream( | |||
|             yield from_aio | ||||
|             return | ||||
| 
 | ||||
|     from_aio: tractor.to_asyncio.LinkedTaskChannel | ||||
|     async with tractor.to_asyncio.open_channel_from( | ||||
|         _setup_quote_stream, | ||||
|         symbol=symbol, | ||||
|  | @ -983,17 +989,18 @@ async def stream_quotes( | |||
|         ) | ||||
|         cs: trio.CancelScope | None = None | ||||
|         startup: bool = True | ||||
|         iter_quotes: trio.abc.Channel | ||||
|         while ( | ||||
|             startup | ||||
|             or cs.cancel_called | ||||
|         ): | ||||
|             with trio.CancelScope() as cs: | ||||
|                 async with ( | ||||
|                     trio.open_nursery() as nurse, | ||||
|                     trio.open_nursery() as tn, | ||||
|                     open_aio_quote_stream( | ||||
|                         symbol=sym, | ||||
|                         contract=con, | ||||
|                     ) as stream, | ||||
|                     ) as iter_quotes, | ||||
|                 ): | ||||
|                     # ugh, clear ticks since we've consumed them | ||||
|                     # (ahem, ib_insync is stateful trash) | ||||
|  | @ -1021,9 +1028,9 @@ async def stream_quotes( | |||
|                         await rt_ev.wait() | ||||
|                         cs.cancel()  # cancel called should now be set | ||||
| 
 | ||||
|                     nurse.start_soon(reset_on_feed) | ||||
|                     tn.start_soon(reset_on_feed) | ||||
| 
 | ||||
|                     async with aclosing(stream): | ||||
|                     async with aclosing(iter_quotes): | ||||
|                         # if syminfo.get('no_vlm', False): | ||||
|                         if not init_msg.shm_write_opts['has_vlm']: | ||||
| 
 | ||||
|  | @ -1038,19 +1045,21 @@ async def stream_quotes( | |||
|                             # wait for real volume on feed (trading might be | ||||
|                             # closed) | ||||
|                             while True: | ||||
|                                 ticker = await stream.receive() | ||||
|                                 ticker = await iter_quotes.receive() | ||||
| 
 | ||||
|                                 # for a real volume contract we rait for | ||||
|                                 # the first "real" trade to take place | ||||
|                                 if ( | ||||
|                                     # not calc_price | ||||
|                                     # and not ticker.rtTime | ||||
|                                     not ticker.rtTime | ||||
|                                     False | ||||
|                                     # not ticker.rtTime | ||||
|                                 ): | ||||
|                                     # spin consuming tickers until we | ||||
|                                     # get a real market datum | ||||
|                                     log.debug(f"New unsent ticker: {ticker}") | ||||
|                                     continue | ||||
| 
 | ||||
|                                 else: | ||||
|                                     log.debug("Received first volume tick") | ||||
|                                     # ugh, clear ticks since we've | ||||
|  | @ -1066,13 +1075,18 @@ async def stream_quotes( | |||
|                             log.debug(f"First ticker received {quote}") | ||||
| 
 | ||||
|                         # tell data-layer spawner-caller that live | ||||
|                         # quotes are now streaming. | ||||
|                         # quotes are now active desptie not having | ||||
|                         # necessarily received a first vlm/clearing | ||||
|                         # tick. | ||||
|                         ticker = await iter_quotes.receive() | ||||
|                         feed_is_live.set() | ||||
|                         fqme: str = quote['fqme'] | ||||
|                         await send_chan.send({fqme: quote}) | ||||
| 
 | ||||
|                         # last = time.time() | ||||
|                         async for ticker in stream: | ||||
|                         async for ticker in iter_quotes: | ||||
|                             quote = normalize(ticker) | ||||
|                             fqme = quote['fqme'] | ||||
|                             fqme: str = quote['fqme'] | ||||
|                             await send_chan.send({fqme: quote}) | ||||
| 
 | ||||
|                             # ugh, clear ticks since we've consumed them | ||||
|  |  | |||
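
The `stream_quotes()` changes above rename `nurse` to `tn` and `stream` to `iter_quotes`, but the control flow is unchanged: a sibling task cancels the surrounding scope when ib signals real-time data has resumed, which forces a re-entry of the stream block. A distilled sketch of that restart loop (the timings and run cap are illustrative):

```python
import trio


async def stream_with_resets(max_runs: int = 3) -> None:
    startup: bool = True
    runs: int = 0
    cs: trio.CancelScope | None = None

    while startup or cs.cancel_called:
        startup = False
        runs += 1
        if runs > max_runs:
            break

        with trio.CancelScope() as cs:
            async with trio.open_nursery() as tn:

                async def reset_on_feed() -> None:
                    # stand-in for awaiting ib's "RT data resumed" event
                    await trio.sleep(0.1)
                    cs.cancel()

                tn.start_soon(reset_on_feed)
                # stand-in for the quote iteration loop
                await trio.sleep_forever()


trio.run(stream_with_resets)
```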
|  | @ -544,7 +544,7 @@ async def open_trade_dialog( | |||
|             # to be reloaded. | ||||
|             balances: dict[str, float] = await client.get_balances() | ||||
| 
 | ||||
|             verify_balances( | ||||
|             await verify_balances( | ||||
|                 acnt, | ||||
|                 src_fiat, | ||||
|                 balances, | ||||
|  |  | |||
|  | @ -37,6 +37,12 @@ import tractor | |||
| from async_generator import asynccontextmanager | ||||
| import numpy as np | ||||
| import wrapt | ||||
| 
 | ||||
| # TODO, port to `httpx`/`trio-websocket` whenver i get back to | ||||
| # writing a proper ws-api streamer for this backend (since the data | ||||
| # feeds are free now) as per GH feat-req: | ||||
| # https://github.com/pikers/piker/issues/509 | ||||
| # | ||||
| import asks | ||||
| 
 | ||||
| from ..calc import humanize, percent_change | ||||
|  |  | |||
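
On the `httpx`/`trio-websocket` port mentioned in the TODO above: `httpx.AsyncClient` runs under trio via anyio, so a plain REST GET in the eventual rewrite might look roughly like the sketch below (the endpoint URL is a placeholder, not this backend's actual API).

```python
import httpx
import trio


async def fetch_json(url: str) -> dict:
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        return resp.json()


if __name__ == '__main__':
    print(trio.run(fetch_json, 'https://api.example.com/v1/markets/quotes'))
```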
|  | @ -273,7 +273,7 @@ async def _reconnect_forever( | |||
|                 nobsws._connected.set() | ||||
|                 await trio.sleep_forever() | ||||
|         except HandshakeError: | ||||
|             log.exception(f'Retrying connection') | ||||
|             log.exception('Retrying connection') | ||||
| 
 | ||||
|         # ws & nursery block ends | ||||
| 
 | ||||
|  | @ -359,8 +359,8 @@ async def open_autorecon_ws( | |||
| 
 | ||||
| 
 | ||||
| ''' | ||||
| JSONRPC response-request style machinery for transparent multiplexing of msgs | ||||
| over a NoBsWs. | ||||
| JSONRPC response-request style machinery for transparent multiplexing | ||||
| of msgs over a `NoBsWs`. | ||||
| 
 | ||||
| ''' | ||||
| 
 | ||||
|  | @ -377,43 +377,82 @@ async def open_jsonrpc_session( | |||
|     url: str, | ||||
|     start_id: int = 0, | ||||
|     response_type: type = JSONRPCResult, | ||||
|     request_type: Optional[type] = None, | ||||
|     request_hook: Optional[Callable] = None, | ||||
|     error_hook: Optional[Callable] = None, | ||||
|     msg_recv_timeout: float = float('inf'), | ||||
|     # ^NOTE, since only `deribit` is using this jsonrpc stuff atm | ||||
|     # and options mkts are generally "slow moving".. | ||||
|     # | ||||
|     # FURTHER if we break the underlying ws connection then since we | ||||
|     # don't pass a `fixture` to the task that manages `NoBsWs`, i.e. | ||||
|     # `_reconnect_forever()`, the jsonrpc "transport pipe" get's | ||||
|     # broken and never restored with wtv init sequence is required to | ||||
|     # re-establish a working req-resp session. | ||||
| 
 | ||||
| ) -> Callable[[str, dict], dict]: | ||||
|     ''' | ||||
|     Init a json-RPC-over-websocket connection to the provided `url`. | ||||
| 
 | ||||
|     A `json_rpc: Callable[[str, dict], dict` is delivered to the | ||||
|     caller for sending requests and a bg-`trio.Task` handles | ||||
|     processing of response msgs including error reporting/raising in | ||||
|     the parent/caller task. | ||||
| 
 | ||||
|     ''' | ||||
|     # NOTE, store all request msgs so we can raise errors on the | ||||
|     # caller side! | ||||
|     req_msgs: dict[int, dict] = {} | ||||
| 
 | ||||
|     async with ( | ||||
|         trio.open_nursery() as n, | ||||
|         open_autorecon_ws(url) as ws | ||||
|         trio.open_nursery() as tn, | ||||
|         open_autorecon_ws( | ||||
|             url=url, | ||||
|             msg_recv_timeout=msg_recv_timeout, | ||||
|         ) as ws | ||||
|     ): | ||||
|         rpc_id: Iterable = count(start_id) | ||||
|         rpc_id: Iterable[int] = count(start_id) | ||||
|         rpc_results: dict[int, dict] = {} | ||||
| 
 | ||||
|         async def json_rpc(method: str, params: dict) -> dict: | ||||
|         async def json_rpc( | ||||
|             method: str, | ||||
|             params: dict, | ||||
|         ) -> dict: | ||||
|             ''' | ||||
|             perform a json rpc call and wait for the result, raise exception in | ||||
|             case of error field present on response | ||||
|             ''' | ||||
|             nonlocal req_msgs | ||||
| 
 | ||||
|             req_id: int = next(rpc_id) | ||||
|             msg = { | ||||
|                 'jsonrpc': '2.0', | ||||
|                 'id': next(rpc_id), | ||||
|                 'id': req_id, | ||||
|                 'method': method, | ||||
|                 'params': params | ||||
|             } | ||||
|             _id = msg['id'] | ||||
| 
 | ||||
|             rpc_results[_id] = { | ||||
|             result = rpc_results[_id] = { | ||||
|                 'result': None, | ||||
|                 'event': trio.Event() | ||||
|                 'error': None, | ||||
|                 'event': trio.Event(),  # signal caller resp arrived | ||||
|             } | ||||
|             req_msgs[_id] = msg | ||||
| 
 | ||||
|             await ws.send_msg(msg) | ||||
| 
 | ||||
|             # wait for reponse before unblocking requester code | ||||
|             await rpc_results[_id]['event'].wait() | ||||
| 
 | ||||
|             ret = rpc_results[_id]['result'] | ||||
|             if (maybe_result := result['result']): | ||||
|                 ret = maybe_result | ||||
|                 del rpc_results[_id] | ||||
| 
 | ||||
|             del rpc_results[_id] | ||||
|             else: | ||||
|                 err = result['error'] | ||||
|                 raise Exception( | ||||
|                     f'JSONRPC request failed\n' | ||||
|                     f'req: {msg}\n' | ||||
|                     f'resp: {err}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             if ret.error is not None: | ||||
|                 raise Exception(json.dumps(ret.error, indent=4)) | ||||
|  | @ -428,6 +467,7 @@ async def open_jsonrpc_session( | |||
|             the server side. | ||||
| 
 | ||||
|             ''' | ||||
|             nonlocal req_msgs | ||||
|             async for msg in ws: | ||||
|                 match msg: | ||||
|                     case { | ||||
|  | @ -451,19 +491,28 @@ async def open_jsonrpc_session( | |||
|                         'params': _, | ||||
|                     }: | ||||
|                         log.debug(f'Recieved\n{msg}') | ||||
|                         if request_hook: | ||||
|                             await request_hook(request_type(**msg)) | ||||
| 
 | ||||
|                     case { | ||||
|                         'error': error | ||||
|                     }: | ||||
|                         log.warning(f'Recieved\n{error}') | ||||
|                         if error_hook: | ||||
|                             await error_hook(response_type(**msg)) | ||||
|                         # retreive orig request msg, set error | ||||
|                         # response in original "result" msg, | ||||
|                         # THEN FINALLY set the event to signal caller | ||||
|                         # to raise the error in the parent task. | ||||
|                         req_id: int = error['id'] | ||||
|                         req_msg: dict = req_msgs[req_id] | ||||
|                         result: dict = rpc_results[req_id] | ||||
|                         result['error'] = error | ||||
|                         result['event'].set() | ||||
|                         log.error( | ||||
|                             f'JSONRPC request failed\n' | ||||
|                             f'req: {req_msg}\n' | ||||
|                             f'resp: {error}\n' | ||||
|                         ) | ||||
| 
 | ||||
|                     case _: | ||||
|                         log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') | ||||
| 
 | ||||
|         n.start_soon(recv_task) | ||||
|         tn.start_soon(recv_task) | ||||
|         yield json_rpc | ||||
|         n.cancel_scope.cancel() | ||||
|         tn.cancel_scope.cancel() | ||||
|  |  | |||
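
Usage sketch for the reworked `open_jsonrpc_session()` above: the `request_hook`/`error_hook` callbacks are gone, a finite `msg_recv_timeout` can be passed through to the underlying `NoBsWs`, and failed requests now raise in the calling task. The import path, endpoint and method below are illustrative (deribit being the only current user per the inline note).

```python
import trio

from piker.data._web_bs import open_jsonrpc_session  # assumed module path


async def get_server_time() -> None:
    async with open_jsonrpc_session(
        url='wss://test.deribit.com/ws/api/v2',
        msg_recv_timeout=30,  # finite, vs. the `inf` default
    ) as json_rpc:
        resp = await json_rpc(
            method='public/get_time',
            params={},
        )
        print(resp)


trio.run(get_server_time)
```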
|  | @ -386,6 +386,8 @@ def ldshm( | |||
|             open_annot_ctl() as actl, | ||||
|         ): | ||||
|             shm_df: pl.DataFrame | None = None | ||||
|             tf2aids: dict[float, dict] = {} | ||||
| 
 | ||||
|             for ( | ||||
|                 shmfile, | ||||
|                 shm, | ||||
|  | @ -526,16 +528,17 @@ def ldshm( | |||
|                             new_df, | ||||
|                             step_gaps, | ||||
|                         ) | ||||
| 
 | ||||
|                         # last chance manual overwrites in REPL | ||||
|                         await tractor.pause() | ||||
|                         # await tractor.pause() | ||||
|                         assert aids | ||||
|                         tf2aids[period_s] = aids | ||||
| 
 | ||||
|                 else: | ||||
|                     # allow interaction even when no ts problems. | ||||
|                     await tractor.pause() | ||||
|                     # assert not diff | ||||
|                     assert not diff | ||||
| 
 | ||||
|             await tractor.pause() | ||||
|             log.info('Exiting TSP shm anal-izer!') | ||||
| 
 | ||||
|             if shm_df is None: | ||||
|                 log.error( | ||||
|  |  | |||
|  | @ -161,7 +161,13 @@ class NativeStorageClient: | |||
| 
 | ||||
|     def index_files(self): | ||||
|         for path in self._datadir.iterdir(): | ||||
|             if path.name in {'borked', 'expired',}: | ||||
|             if ( | ||||
|                 path.is_dir() | ||||
|                 or | ||||
|                 '.parquet' not in str(path) | ||||
|                 # or | ||||
|                 # path.name in {'borked', 'expired',} | ||||
|             ): | ||||
|                 continue | ||||
| 
 | ||||
|             key: str = path.name.rstrip('.parquet') | ||||
|  |  | |||
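
The new `index_files()` filter above skips directories and non-parquet paths; a standalone sketch of the same intent using plain `pathlib` follows. Note that the retained `path.name.rstrip('.parquet')` strips a character set rather than a suffix, so an isolated version would more likely use `Path.stem` (or `str.removesuffix()`); that substitution is this sketch's assumption, not part of the diff.

```python
from pathlib import Path


def parquet_keys(datadir: Path) -> list[str]:
    keys: list[str] = []
    for path in datadir.iterdir():
        if (
            path.is_dir()
            or path.suffix != '.parquet'
        ):
            continue

        # filename minus the `.parquet` suffix
        keys.append(path.stem)

    return keys
```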
|  | @ -44,8 +44,10 @@ import trio | |||
| from trio_typing import TaskStatus | ||||
| import tractor | ||||
| from pendulum import ( | ||||
|     Interval, | ||||
|     DateTime, | ||||
|     Duration, | ||||
|     duration as mk_duration, | ||||
|     from_timestamp, | ||||
| ) | ||||
| import numpy as np | ||||
|  | @ -214,7 +216,8 @@ async def maybe_fill_null_segments( | |||
|         # pair, immediately stop backfilling? | ||||
|         if ( | ||||
|             start_dt | ||||
|             and end_dt < start_dt | ||||
|             and | ||||
|             end_dt < start_dt | ||||
|         ): | ||||
|             await tractor.pause() | ||||
|             break | ||||
|  | @ -262,6 +265,7 @@ async def maybe_fill_null_segments( | |||
|         except tractor.ContextCancelled: | ||||
|             # log.exception | ||||
|             await tractor.pause() | ||||
|             raise | ||||
| 
 | ||||
|     null_segs_detected.set() | ||||
|     # RECHECK for more null-gaps | ||||
|  | @ -349,7 +353,7 @@ async def maybe_fill_null_segments( | |||
| 
 | ||||
| async def start_backfill( | ||||
|     get_hist, | ||||
|     frame_types: dict[str, Duration] | None, | ||||
|     def_frame_duration: Duration, | ||||
|     mod: ModuleType, | ||||
|     mkt: MktPair, | ||||
|     shm: ShmArray, | ||||
|  | @ -379,22 +383,23 @@ async def start_backfill( | |||
|         update_start_on_prepend: bool = False | ||||
|         if backfill_until_dt is None: | ||||
| 
 | ||||
|             # TODO: drop this right and just expose the backfill | ||||
|             # limits inside a [storage] section in conf.toml? | ||||
|             # when no tsdb "last datum" is provided, we just load | ||||
|             # some near-term history. | ||||
|             # periods = { | ||||
|             #     1: {'days': 1}, | ||||
|             #     60: {'days': 14}, | ||||
|             # } | ||||
| 
 | ||||
|             # do a decently sized backfill and load it into storage. | ||||
|             # TODO: per-provider default history-durations? | ||||
|             # -[ ] inside the `open_history_client()` config allow | ||||
|             #    declaring the history duration limits instead of | ||||
|             #    guessing and/or applying the same limits to all? | ||||
|             # | ||||
|             # -[ ] allow declaring (default) per-provider backfill | ||||
|             #     limits inside a [storage] sub-section in conf.toml? | ||||
|             # | ||||
|             # NOTE, when no tsdb "last datum" is provided, we just | ||||
|             # load some near-term history by presuming a "decently | ||||
|             # large" 60s duration limit and a much shorter 1s range. | ||||
|             periods = { | ||||
|                 1: {'days': 2}, | ||||
|                 60: {'years': 6}, | ||||
|             } | ||||
|             period_duration: int = periods[timeframe] | ||||
|             update_start_on_prepend = True | ||||
|             update_start_on_prepend: bool = True | ||||
| 
 | ||||
|             # NOTE: manually set the "latest" datetime which we intend to | ||||
|             # backfill history "until" so as to adhere to the history | ||||
|  | @ -416,7 +421,6 @@ async def start_backfill( | |||
|                 f'backfill_until_dt: {backfill_until_dt}\n' | ||||
|                 f'last_start_dt: {last_start_dt}\n' | ||||
|             ) | ||||
| 
 | ||||
|             try: | ||||
|                 ( | ||||
|                     array, | ||||
|  | @ -426,71 +430,114 @@ async def start_backfill( | |||
|                     timeframe, | ||||
|                     end_dt=last_start_dt, | ||||
|                 ) | ||||
| 
 | ||||
|             except NoData as _daterr: | ||||
|                 # 3 cases: | ||||
|                 # - frame in the middle of a legit venue gap | ||||
|                 # - history actually began at the `last_start_dt` | ||||
|                 # - some other unknown error (ib blocking the | ||||
|                 #   history bc they don't want you seeing how they | ||||
|                 #   cucked all the tinas..) | ||||
|                 if dur := frame_types.get(timeframe): | ||||
|                     # decrement by a frame's worth of duration and | ||||
|                     # retry a few times. | ||||
|                     last_start_dt.subtract( | ||||
|                         seconds=dur.total_seconds() | ||||
|                 orig_last_start_dt: datetime = last_start_dt | ||||
|                 gap_report: str = ( | ||||
|                     f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' | ||||
|                     f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                     f'last_start_dt: {orig_last_start_dt}\n\n' | ||||
|                     f'bf_until: {backfill_until_dt}\n' | ||||
|                 ) | ||||
|                 # EMPTY FRAME signal with 3 (likely) causes: | ||||
|                 # | ||||
|                 # 1. range contains legit gap in venue history | ||||
|                 # 2. history actually (edge case) **began** at the | ||||
|                 #    value `last_start_dt` | ||||
|                 # 3. some other unknown error (ib blocking the | ||||
|                 #    history-query bc they don't want you seeing how | ||||
|                 #    they cucked all the tinas.. like with options | ||||
|                 #    hist) | ||||
|                 # | ||||
|                 if def_frame_duration: | ||||
|                     # decrement by a duration's (frame) worth of time | ||||
|                     # as maybe indicated by the backend to see if we | ||||
|                     # can get older data before this possible | ||||
|                     # "history gap". | ||||
|                     last_start_dt: datetime = last_start_dt.subtract( | ||||
|                         seconds=def_frame_duration.total_seconds() | ||||
|                     ) | ||||
|                     log.warning( | ||||
|                         f'{mod.name} -> EMPTY FRAME for end_dt?\n' | ||||
|                         f'tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                         'bf_until <- last_start_dt:\n' | ||||
|                         f'{backfill_until_dt} <- {last_start_dt}\n' | ||||
|                         f'Decrementing `end_dt` by {dur} and retry..\n' | ||||
|                     gap_report += ( | ||||
|                         f'Decrementing `end_dt` and retrying with,\n' | ||||
|                         f'def_frame_duration: {def_frame_duration}\n' | ||||
|                         f'(new) last_start_dt: {last_start_dt}\n' | ||||
|                     ) | ||||
|                     log.warning(gap_report) | ||||
|                     # skip writing to shm/tsdb and try the next | ||||
|                     # duration's worth of prior history. | ||||
|                     continue | ||||
| 
 | ||||
|             # broker says there never was or is no more history to pull | ||||
|             except DataUnavailable: | ||||
|                 log.warning( | ||||
|                     f'NO-MORE-DATA in range?\n' | ||||
|                     f'`{mod.name}` halted history:\n' | ||||
|                     f'tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                     'bf_until <- last_start_dt:\n' | ||||
|                     f'{backfill_until_dt} <- {last_start_dt}\n' | ||||
|                 ) | ||||
|                 else: | ||||
|                     # await tractor.pause() | ||||
|                     raise DataUnavailable(gap_report) | ||||
| 
 | ||||
|                 # ugh, what's a better way? | ||||
|                 # TODO: fwiw, we probably want a way to signal a throttle | ||||
|                 # condition (eg. with ib) so that we can halt the | ||||
|                 # request loop until the condition is resolved? | ||||
|                 if timeframe > 1: | ||||
|                     await tractor.pause() | ||||
|             # broker says there never was or is no more history to pull | ||||
|             except DataUnavailable as due: | ||||
|                 message: str = due.args[0] | ||||
|                 log.warning( | ||||
|                     f'Provider {mod.name!r} halted backfill due to,\n\n' | ||||
| 
 | ||||
|                     f'{message}\n' | ||||
| 
 | ||||
|                     f'fqme: {mkt.fqme}\n' | ||||
|                     f'timeframe: {timeframe}\n' | ||||
|                     f'last_start_dt: {last_start_dt}\n' | ||||
|                     f'bf_until: {backfill_until_dt}\n' | ||||
|                 ) | ||||
|                 # UGH: what's a better way? | ||||
|                 # TODO: backends are responsible for being correct on | ||||
|                 # this right!? | ||||
|                 # -[ ] in the `ib` case we could maybe offer some way | ||||
|                 #     to halt the request loop until the condition is | ||||
|                 #     resolved or should the backend be entirely in | ||||
|                 #     charge of solving such faults? yes, right? | ||||
|                 return | ||||
| 
 | ||||
|             time: np.ndarray = array['time'] | ||||
|             assert ( | ||||
|                 array['time'][0] | ||||
|                 time[0] | ||||
|                 == | ||||
|                 next_start_dt.timestamp() | ||||
|             ) | ||||
| 
 | ||||
|             diff = last_start_dt - next_start_dt | ||||
|             frame_time_diff_s = diff.seconds | ||||
|             assert time[-1] == next_end_dt.timestamp() | ||||
| 
 | ||||
|             expected_dur: Interval = last_start_dt - next_start_dt | ||||
| 
 | ||||
|             # frame's worth of sample-period-steps, in seconds | ||||
|             frame_size_s: float = len(array) * timeframe | ||||
|             expected_frame_size_s: float = frame_size_s + timeframe | ||||
|             if frame_time_diff_s > expected_frame_size_s: | ||||
| 
 | ||||
|             recv_frame_dur: Duration = ( | ||||
|                 from_timestamp(array[-1]['time']) | ||||
|                 - | ||||
|                 from_timestamp(array[0]['time']) | ||||
|             ) | ||||
|             if ( | ||||
|                 (lt_frame := (recv_frame_dur < expected_dur)) | ||||
|                 or | ||||
|                 (null_frame := (frame_size_s == 0)) | ||||
|                 # ^XXX, should NEVER hit now! | ||||
|             ): | ||||
|                 # XXX: query result includes a start point prior to our | ||||
|                 # expected "frame size" and thus is likely some kind of | ||||
|                 # history gap (eg. market closed period, outage, etc.) | ||||
|                 # so just report it to console for now. | ||||
|                 if lt_frame: | ||||
|                     reason = 'Possible GAP (or first-datum)' | ||||
|                 else: | ||||
|                     assert null_frame | ||||
|                     reason = 'NULL-FRAME' | ||||
| 
 | ||||
|                 missing_dur: Interval = expected_dur.end - recv_frame_dur.end | ||||
|                 log.warning( | ||||
|                     'GAP DETECTED:\n' | ||||
|                     f'last_start_dt: {last_start_dt}\n' | ||||
|                     f'diff: {diff}\n' | ||||
|                     f'frame_time_diff_s: {frame_time_diff_s}\n' | ||||
|                     f'{timeframe}s-series {reason} detected!\n' | ||||
|                     f'fqme: {mkt.fqme}\n' | ||||
|                     f'last_start_dt: {last_start_dt}\n\n' | ||||
|                     f'recv interval: {recv_frame_dur}\n' | ||||
|                     f'expected interval: {expected_dur}\n\n' | ||||
| 
 | ||||
|                     f'Missing duration of history of {missing_dur.in_words()!r}\n' | ||||
|                     f'{missing_dur}\n' | ||||
|                 ) | ||||
|                 # await tractor.pause() | ||||
| 
 | ||||
|             to_push = diff_history( | ||||
|                 array, | ||||
|  | @ -565,22 +612,27 @@ async def start_backfill( | |||
|             # long-term storage. | ||||
|             if ( | ||||
|                 storage is not None | ||||
|                 and write_tsdb | ||||
|                 and | ||||
|                 write_tsdb | ||||
|             ): | ||||
|                 log.info( | ||||
|                     f'Writing {ln} frame to storage:\n' | ||||
|                     f'{next_start_dt} -> {last_start_dt}' | ||||
|                 ) | ||||
| 
 | ||||
|                 # always drop the src asset token for | ||||
|                 # NOTE, always drop the src asset token for | ||||
|                 # non-currency-pair like market types (for now) | ||||
|                 # | ||||
|                 # THAT IS, for now our table key schema is NOT | ||||
|                 # including the dst[/src] source asset token. SO, | ||||
|                 # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for | ||||
|                 # historical reasons ONLY. | ||||
|                 if mkt.dst.atype not in { | ||||
|                     'crypto', | ||||
|                     'crypto_currency', | ||||
|                     'fiat',  # a "forex pair" | ||||
|                     'perpetual_future',  # stupid "perps" from cex land | ||||
|                 }: | ||||
|                     # for now, our table key schema is not including | ||||
|                     # the dst[/src] source asset token. | ||||
|                     col_sym_key: str = mkt.get_fqme( | ||||
|                         delim_char='', | ||||
|                         without_src=True, | ||||
|  | @ -685,7 +737,7 @@ async def back_load_from_tsdb( | |||
|         last_tsdb_dt | ||||
|         and latest_start_dt | ||||
|     ): | ||||
|         backfilled_size_s = ( | ||||
|         backfilled_size_s: Duration = ( | ||||
|             latest_start_dt - last_tsdb_dt | ||||
|         ).seconds | ||||
|         # if the shm buffer len is not large enough to contain | ||||
|  | @ -908,6 +960,8 @@ async def tsdb_backfill( | |||
|             f'{pformat(config)}\n' | ||||
|         ) | ||||
| 
 | ||||
|         # concurrently load the provider's most-recent-frame AND any | ||||
|         # pre-existing tsdb history already saved in `piker` storage. | ||||
|         dt_eps: list[DateTime, DateTime] = [] | ||||
|         async with trio.open_nursery() as tn: | ||||
|             tn.start_soon( | ||||
|  | @ -918,7 +972,6 @@ async def tsdb_backfill( | |||
|                 timeframe, | ||||
|                 config, | ||||
|             ) | ||||
| 
 | ||||
|             tsdb_entry: tuple = await load_tsdb_hist( | ||||
|                 storage, | ||||
|                 mkt, | ||||
|  | @ -947,6 +1000,25 @@ async def tsdb_backfill( | |||
|                 mr_end_dt, | ||||
|             ) = dt_eps | ||||
| 
 | ||||
|             first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds | ||||
|             calced_frame_size: Duration = mk_duration( | ||||
|                 seconds=first_frame_dur_s, | ||||
|             ) | ||||
|             # NOTE, attempt to use the backend declared default frame | ||||
|             # sizing (as allowed by their time-series query APIs) and | ||||
|             # if not provided try to construct a default from the | ||||
|             # first frame received above. | ||||
|             def_frame_durs: dict[ | ||||
|                 int, | ||||
|                 Duration, | ||||
|             ]|None = config.get('frame_types', None) | ||||
|             if def_frame_durs: | ||||
|                 def_frame_size: Duration = def_frame_durs[timeframe] | ||||
|                 assert def_frame_size == calced_frame_size | ||||
|             else: | ||||
|                 # use what we calced from first frame above. | ||||
|                 def_frame_size = calced_frame_size | ||||
| 
 | ||||
|             # NOTE: when there's no offline data, there's 2 cases: | ||||
|             # - data backend doesn't support timeframe/sample | ||||
|             #   period (in which case `dt_eps` should be `None` and | ||||
|  | @ -977,7 +1049,7 @@ async def tsdb_backfill( | |||
|                     partial( | ||||
|                         start_backfill, | ||||
|                         get_hist=get_hist, | ||||
|                         frame_types=config.get('frame_types', None), | ||||
|                         def_frame_duration=def_frame_size, | ||||
|                         mod=mod, | ||||
|                         mkt=mkt, | ||||
|                         shm=shm, | ||||
|  |  | |||
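
The `def_frame_duration` plumbing above takes a per-timeframe query-duration limit from the backend's history-client config when one is declared (`frame_types`), and otherwise falls back to the span of the first received frame. A sketch of both paths, with made-up placeholder values:

```python
from pendulum import (
    duration as mk_duration,
    now,
)

# what a backend might declare in its `open_history_client()` config
config: dict = {
    'frame_types': {
        1: mk_duration(seconds=2000),  # max span per 1s-bar query
        60: mk_duration(days=30),      # max span per 1m-bar query
    },
}

# fallback: derive a duration from the first frame's endpoints
mr_start_dt = now().subtract(days=2)
mr_end_dt = now()
calced_frame_size = mk_duration(
    seconds=(mr_end_dt - mr_start_dt).total_seconds(),
)

def_frame_durs: dict | None = config.get('frame_types', None)
if def_frame_durs:
    def_frame_duration = def_frame_durs[60]
else:
    def_frame_duration = calced_frame_size

print(def_frame_duration)
```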
|  | @ -616,6 +616,18 @@ def detect_price_gaps( | |||
|     # ]) | ||||
|     ... | ||||
| 
 | ||||
| # TODO: probably just use the null_segs impl above? | ||||
| def detect_vlm_gaps( | ||||
|     df: pl.DataFrame, | ||||
|     col: str = 'volume', | ||||
| 
 | ||||
| ) -> pl.DataFrame: | ||||
| 
 | ||||
|     vnull: pl.DataFrame = w_dts.filter( | ||||
|         pl.col(col) == 0 | ||||
|     ) | ||||
|     return vnull | ||||
| 
 | ||||
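
As written, `detect_vlm_gaps()` above filters a `w_dts` frame that is never bound inside the hunk; presumably it is meant to be the datetime-annotated frame that `with_dts()` produces (as `dedupe()` below uses). A self-contained stand-in under that assumption:

```python
import polars as pl


def zero_volume_rows(
    df: pl.DataFrame,
    col: str = 'volume',
) -> pl.DataFrame:
    # rows with no recorded volume, i.e. likely venue/feed gaps
    return df.filter(pl.col(col) == 0)


df = pl.DataFrame({
    'time': [1, 2, 3, 4],
    'volume': [10.0, 0.0, 3.5, 0.0],
})
print(zero_volume_rows(df))
```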
| 
 | ||||
| def dedupe( | ||||
|     src_df: pl.DataFrame, | ||||
|  | @ -626,7 +638,6 @@ def dedupe( | |||
| 
 | ||||
| ) -> tuple[ | ||||
|     pl.DataFrame,  # with dts | ||||
|     pl.DataFrame,  # gaps | ||||
|     pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal) | ||||
|     int,  # len diff between input and deduped | ||||
| ]: | ||||
|  | @ -639,19 +650,22 @@ def dedupe( | |||
|     ''' | ||||
|     wdts: pl.DataFrame = with_dts(src_df) | ||||
| 
 | ||||
|     # maybe sort on any time field | ||||
|     if sort: | ||||
|         wdts = wdts.sort(by='time') | ||||
|         # TODO: detect out-of-order segments which were corrected! | ||||
|         # -[ ] report in log msg | ||||
|         # -[ ] possibly return segment sections which were moved? | ||||
|     deduped = wdts | ||||
| 
 | ||||
|     # remove duplicated datetime samples/sections | ||||
|     deduped: pl.DataFrame = wdts.unique( | ||||
|         subset=['dt'], | ||||
|         # subset=['dt'], | ||||
|         subset=['time'], | ||||
|         maintain_order=True, | ||||
|     ) | ||||
| 
 | ||||
|     # maybe sort on any time field | ||||
|     if sort: | ||||
|         deduped = deduped.sort(by='time') | ||||
|         # TODO: detect out-of-order segments which were corrected! | ||||
|         # -[ ] report in log msg | ||||
|         # -[ ] possibly return segment sections which were moved? | ||||
| 
 | ||||
|     diff: int = ( | ||||
|         wdts.height | ||||
|         - | ||||
|  |  | |||
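
A tiny demo of the de-duplication core that `dedupe()` now applies (optional time-sort, then `unique()` on the `time` column with order preserved), using a throwaway frame; the real function also annotates datetime columns via `with_dts()` and returns the row-count diff.

```python
import polars as pl

src = pl.DataFrame({
    'time': [100, 160, 160, 220],
    'close': [1.0, 1.1, 1.1, 1.2],
})

deduped = (
    src
    .sort(by='time')
    .unique(subset=['time'], maintain_order=True)
)
diff: int = src.height - deduped.height
print(deduped)
print(f'dropped {diff} duplicate row(s)')
```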