Compare commits
	
		
			14 Commits 
		
	
	
		
			d475347e53
			...
			3d00d27a12
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 3d00d27a12 | |
|  | d545641d06 | |
|  | 54d4d9fd56 | |
|  | 5fc56cff98 | |
|  | f04d2d27fd | |
|  | 1d19f01b79 | |
|  | bceab54adb | |
|  | 84b1189e1a | |
|  | 1e16642c12 | |
|  | 7eb779a682 | |
|  | a82f2fa578 | |
|  | 193f5c1ae9 | |
|  | 8901360695 | |
|  | a14b599835 | 
|  | @ -29,6 +29,7 @@ import time | |||
| from typing import ( | ||||
|     Any, | ||||
|     AsyncIterator, | ||||
|     Callable, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
|  | @ -53,6 +54,9 @@ from ._util import ( | |||
|     get_console_log, | ||||
| ) | ||||
| from ..service import maybe_spawn_daemon | ||||
| from piker.log import ( | ||||
|     mk_repr, | ||||
| ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from ._sharedmem import ( | ||||
|  | @ -561,7 +565,6 @@ async def open_sample_stream( | |||
| 
 | ||||
| 
 | ||||
| async def sample_and_broadcast( | ||||
| 
 | ||||
|     bus: _FeedsBus,  # noqa | ||||
|     rt_shm: ShmArray, | ||||
|     hist_shm: ShmArray, | ||||
|  | @ -582,11 +585,22 @@ async def sample_and_broadcast( | |||
| 
 | ||||
|     overruns = Counter() | ||||
| 
 | ||||
|     # multiline nested `dict` formatter (since rn quote-msgs are | ||||
|     # just that). | ||||
|     pfmt: Callable[[str], str] = mk_repr() | ||||
| 
 | ||||
|     # iterate stream delivered by broker | ||||
|     async for quotes in quote_stream: | ||||
|         # print(quotes) | ||||
| 
 | ||||
|         # TODO: ``numba`` this! | ||||
|         # XXX WARNING XXX only enable for debugging bc ow can cost | ||||
|         # ALOT of perf with HF-feedz!!! | ||||
|         # | ||||
|         # log.info( | ||||
|         #     'Rx live quotes:\n' | ||||
|         #     f'{pfmt(quotes)}' | ||||
|         # ) | ||||
| 
 | ||||
|         # TODO: `numba` this! | ||||
|         for broker_symbol, quote in quotes.items(): | ||||
|             # TODO: in theory you can send the IPC msg *before* writing | ||||
|             # to the sharedmem array to decrease latency, however, that | ||||
|  | @ -659,6 +673,18 @@ async def sample_and_broadcast( | |||
|             sub_key: str = broker_symbol.lower() | ||||
|             subs: set[Sub] = bus.get_subs(sub_key) | ||||
| 
 | ||||
|             if not subs: | ||||
|                 all_bs_fqmes: list[str] = list( | ||||
|                     bus._subscribers.keys() | ||||
|                 ) | ||||
|                 log.warning( | ||||
|                     f'No subscribers for {brokername!r} live-quote ??\n' | ||||
|                     f'broker_symbol: {broker_symbol}\n\n' | ||||
| 
 | ||||
|                     f'Maybe the backend-sys symbol does not match one of,\n' | ||||
|                     f'{pfmt(all_bs_fqmes)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # NOTE: by default the broker backend doesn't append | ||||
|             # it's own "name" into the fqme schema (but maybe it | ||||
|             # should?) so we have to manually generate the correct | ||||
|  |  | |||
|  | @ -359,8 +359,8 @@ async def open_autorecon_ws( | |||
| 
 | ||||
| 
 | ||||
| ''' | ||||
| JSONRPC response-request style machinery for transparent multiplexing of msgs | ||||
| over a NoBsWs. | ||||
| JSONRPC response-request style machinery for transparent multiplexing | ||||
| of msgs over a `NoBsWs`. | ||||
| 
 | ||||
| ''' | ||||
| 
 | ||||
|  | @ -377,19 +377,35 @@ async def open_jsonrpc_session( | |||
|     url: str, | ||||
|     start_id: int = 0, | ||||
|     response_type: type = JSONRPCResult, | ||||
|     request_type: Optional[type] = None, | ||||
|     request_hook: Optional[Callable] = None, | ||||
|     error_hook: Optional[Callable] = None, | ||||
|     msg_recv_timeout: float = float('inf'), | ||||
|     # ^NOTE, since only `deribit` is using this jsonrpc stuff atm | ||||
|     # and options mkts are generally "slow moving".. | ||||
|     # | ||||
|     # FURTHER if we break the underlying ws connection then since we | ||||
|     # don't pass a `fixture` to the task that manages `NoBsWs`, i.e. | ||||
|     # `_reconnect_forever()`, the jsonrpc "transport pipe" get's | ||||
|     # broken and never restored with wtv init sequence is required to | ||||
|     # re-establish a working req-resp session. | ||||
| 
 | ||||
|     # request_type: Optional[type] = None, | ||||
|     # request_hook: Optional[Callable] = None, | ||||
|     # error_hook: Optional[Callable] = None, | ||||
| ) -> Callable[[str, dict], dict]: | ||||
| 
 | ||||
|     async with ( | ||||
|         trio.open_nursery() as n, | ||||
|         open_autorecon_ws(url) as ws | ||||
|         open_autorecon_ws( | ||||
|             url=url, | ||||
|             msg_recv_timeout=msg_recv_timeout, | ||||
|         ) as ws | ||||
|     ): | ||||
|         rpc_id: Iterable = count(start_id) | ||||
|         rpc_results: dict[int, dict] = {} | ||||
| 
 | ||||
|         async def json_rpc(method: str, params: dict) -> dict: | ||||
|         async def json_rpc( | ||||
|             method: str, | ||||
|             params: dict, | ||||
|         ) -> dict: | ||||
|             ''' | ||||
|             perform a json rpc call and wait for the result, raise exception in | ||||
|             case of error field present on response | ||||
|  |  | |||
|  | @ -540,7 +540,10 @@ async def open_feed_bus( | |||
|         # subscription since the backend isn't (yet) expected to | ||||
|         # append it's own name to the fqme, so we filter on keys | ||||
|         # which *do not* include that name (e.g .ib) . | ||||
|         bus._subscribers.setdefault(bs_fqme, set()) | ||||
|         bus._subscribers.setdefault( | ||||
|             bs_fqme, | ||||
|             set(), | ||||
|         ) | ||||
| 
 | ||||
|     # sync feed subscribers with flume handles | ||||
|     await ctx.started( | ||||
|  |  | |||
							
								
								
									
										28
									
								
								piker/log.py
								
								
								
								
							
							
						
						
									
										28
									
								
								piker/log.py
								
								
								
								
							|  | @ -18,7 +18,11 @@ | |||
| Log like a forester! | ||||
| """ | ||||
| import logging | ||||
| import reprlib | ||||
| import json | ||||
| from typing import ( | ||||
|     Callable, | ||||
| ) | ||||
| 
 | ||||
| import tractor | ||||
| from pygments import ( | ||||
|  | @ -84,3 +88,27 @@ def colorize_json( | |||
|         # likeable styles: algol_nu, tango, monokai | ||||
|         formatters.TerminalTrueColorFormatter(style=style) | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def mk_repr( | ||||
|     **repr_kws, | ||||
| ) -> Callable[[str], str]: | ||||
|     ''' | ||||
|     Allocate and deliver a `repr.Repr` instance with provided input | ||||
|     settings using the std-lib's `reprlib` mod, | ||||
|      * https://docs.python.org/3/library/reprlib.html | ||||
| 
 | ||||
|     ------ Ex. ------ | ||||
|     An up to 6-layer-nested `dict` as multi-line: | ||||
|     - https://stackoverflow.com/a/79102479 | ||||
|     - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel | ||||
| 
 | ||||
|     ''' | ||||
|     def_kws: dict[str, int] = dict( | ||||
|         indent=2, | ||||
|         maxlevel=6,  # recursion levels | ||||
|         maxstring=66,  # match editor line-len limit | ||||
|     ) | ||||
|     def_kws |= repr_kws | ||||
|     reprr = reprlib.Repr(**def_kws) | ||||
|     return reprr.repr | ||||
|  |  | |||
|  | @ -161,7 +161,12 @@ class NativeStorageClient: | |||
| 
 | ||||
|     def index_files(self): | ||||
|         for path in self._datadir.iterdir(): | ||||
|             if path.name in {'borked', 'expired',}: | ||||
|             if ( | ||||
|                 path.name in {'borked', 'expired',} | ||||
|                 or | ||||
|                 '.parquet' not in str(path) | ||||
|             ): | ||||
|                 # ignore all non-apache files (for now) | ||||
|                 continue | ||||
| 
 | ||||
|             key: str = path.name.rstrip('.parquet') | ||||
|  |  | |||
|  | @ -434,21 +434,32 @@ async def start_backfill( | |||
|                 # - some other unknown error (ib blocking the | ||||
|                 #   history bc they don't want you seeing how they | ||||
|                 #   cucked all the tinas..) | ||||
|                 if dur := frame_types.get(timeframe): | ||||
|                     # decrement by a frame's worth of duration and | ||||
|                     # retry a few times. | ||||
|                     last_start_dt.subtract( | ||||
|                 if ( | ||||
|                     frame_types | ||||
|                     and | ||||
|                     (dur := frame_types.get(timeframe)) | ||||
|                 ): | ||||
|                     # decrement by a duration's (frame) worth of time | ||||
|                     # as maybe indicated by the backend to see if we | ||||
|                     # can get older data before this possible | ||||
|                     # "history gap". | ||||
|                     orig_last_start_dt = last_start_dt | ||||
|                     last_start_dt = last_start_dt.subtract( | ||||
|                         seconds=dur.total_seconds() | ||||
|                     ) | ||||
|                     log.warning( | ||||
|                         f'{mod.name} -> EMPTY FRAME for end_dt?\n' | ||||
|                         f'tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                         'bf_until <- last_start_dt:\n' | ||||
|                         f'{backfill_until_dt} <- {last_start_dt}\n' | ||||
|                         f'Decrementing `end_dt` by {dur} and retry..\n' | ||||
|                         f'Decrementing `end_dt` by {dur} and retry..\n\n' | ||||
| 
 | ||||
|                         f'orig_last_start_dt: {orig_last_start_dt}\n' | ||||
|                         f'dur subtracted last_start_dt: {last_start_dt}\n' | ||||
|                         f'bf_until: {backfill_until_dt}\n' | ||||
|                     ) | ||||
|                     continue | ||||
| 
 | ||||
|                 raise | ||||
| 
 | ||||
|             # broker says there never was or is no more history to pull | ||||
|             except DataUnavailable: | ||||
|                 log.warning( | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue