Add first-draft `trimeter`-based concurrent OHLC history fetching
							parent
							
								
									d77cfa3587
								
							
						
					
					
						commit
						7b1c0939bd
					
				|  | @ -35,6 +35,7 @@ from typing import ( | |||
| import trio | ||||
| from trio.abc import ReceiveChannel | ||||
| from trio_typing import TaskStatus | ||||
| import trimeter | ||||
| import tractor | ||||
| from pydantic import BaseModel | ||||
| import pendulum | ||||
|  | @ -263,32 +264,66 @@ async def start_backfill( | |||
|         # let caller unblock and deliver latest history frame | ||||
|         task_status.started((shm, start_dt, end_dt, bf_done)) | ||||
| 
 | ||||
|         times = array['time'] | ||||
|         step_size_s = ( | ||||
|             pendulum.from_timestamp(times[-1]) - | ||||
|             pendulum.from_timestamp(times[-2]) | ||||
|         ).seconds | ||||
|         frame_step_s = (end_dt - start_dt).seconds | ||||
| 
 | ||||
|         if last_tsdb_dt is None: | ||||
|             # maybe a better default (they don't seem to define epoch?!) | ||||
|             last_tsdb_dt = pendulum.now().subtract(days=1) | ||||
| 
 | ||||
|             # based on the sample step size, load a certain amount | ||||
|             # of history | ||||
|             if step_size_s == 1: | ||||
|                 last_tsdb_dt = pendulum.now().subtract(weeks=2) | ||||
| 
 | ||||
|             elif step_size_s == 60: | ||||
|                 last_tsdb_dt = pendulum.now().subtract(years=2) | ||||
| 
 | ||||
|             else: | ||||
|                 raise ValueError( | ||||
|                     '`piker` only needs to support 1m and 1s sampling ' | ||||
|                     'but ur api is trying to deliver a longer ' | ||||
|                     f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' | ||||
|                     'do dat bruh.' | ||||
|                 ) | ||||
| 
 | ||||
|         hist_period = pendulum.period( | ||||
|             start_dt.subtract(seconds=step_size_s), | ||||
|             last_tsdb_dt, | ||||
|         ) | ||||
|         end_dts = list(hist_period.range('seconds', frame_step_s)) | ||||
| 
 | ||||
|         # pull new history frames until we hit latest | ||||
|         # already in the tsdb or a max count. | ||||
|         # mx_fills = 16 | ||||
|         count = 0 | ||||
|         # while True: | ||||
|         while ( | ||||
|             end_dt > last_tsdb_dt | ||||
|             # and count < mx_fills | ||||
|         ): | ||||
|         frames = {} | ||||
| 
 | ||||
|         async def get_ohlc_frame( | ||||
|             input_end_dt: datetime, | ||||
| 
 | ||||
|         ) -> np.ndarray: | ||||
| 
 | ||||
|             nonlocal count | ||||
|             count += 1 | ||||
|             try: | ||||
|                 array, start_dt, end_dt = await hist(end_dt=start_dt) | ||||
|                 array, start_dt, end_dt = await hist(end_dt=input_end_dt) | ||||
|                 # if input_end_dt.timestamp() == end_dts[0].timestamp(): | ||||
|                 #     await tractor.breakpoint() | ||||
| 
 | ||||
|             except NoData: | ||||
|                 # decrement by the diff in time last delivered. | ||||
|                 end_dt = start_dt.subtract(seconds=(end_dt - start_dt).seconds) | ||||
|                 continue | ||||
|                 log.warning('no data for range {(end_dt - start_dt)} ?!?') | ||||
|                 # continue | ||||
| 
 | ||||
|             except DataUnavailable: | ||||
|                 # broker is being a bish and we can't pull | ||||
|                 # any more.. | ||||
|                 break | ||||
|                 log.warning('backend halted on data deliver !?!?') | ||||
|                 # break | ||||
| 
 | ||||
|             to_push = diff_history( | ||||
|                 array, | ||||
|  | @ -302,18 +337,74 @@ async def start_backfill( | |||
|             print(f"PULLING {count}") | ||||
|             log.info(f'Pushing {to_push.size} to shm!') | ||||
| 
 | ||||
|             if to_push.size < 1: | ||||
|                 break | ||||
|             frames[input_end_dt.timestamp()] = (to_push, start_dt, end_dt) | ||||
| 
 | ||||
|             # bail on shm allocation overrun | ||||
|             try: | ||||
|                 shm.push(to_push, prepend=True) | ||||
|             except ValueError: | ||||
|                 await tractor.breakpoint() | ||||
|                 break | ||||
|             return to_push, start_dt, end_dt | ||||
| 
 | ||||
|             for delay_s in sampler.subscribers: | ||||
|                 await broadcast(delay_s) | ||||
|             # if to_push.size < 1: | ||||
|             #     print('UHHH SIZE <1 BREAKING!?') | ||||
|             #     break | ||||
| 
 | ||||
|         rate = erlangs = 5 | ||||
|         async with trimeter.amap( | ||||
| 
 | ||||
|             get_ohlc_frame, | ||||
|             end_dts, | ||||
| 
 | ||||
|             capture_outcome=True, | ||||
|             include_value=True, | ||||
|             max_at_once=erlangs, | ||||
|             max_per_second=rate, | ||||
| 
 | ||||
|         ) as outcomes: | ||||
| 
 | ||||
|             # Then iterate over the return values, as they become available | ||||
|             # (i.e., not necessarily in the original order) | ||||
|             async for input_end_dt, outcome in outcomes: | ||||
|                 try: | ||||
|                     out = outcome.unwrap() | ||||
|                 except Exception: | ||||
|                     log.exception('uhh trimeter bail') | ||||
|                     raise | ||||
|                 else: | ||||
|                     to_push, start_dt, end_dt = out | ||||
| 
 | ||||
|                 # pipeline-style pull frames until we need to wait for | ||||
|                 # the next in order to arrive. | ||||
|                 i = end_dts.index(input_end_dt) | ||||
|                 print(f'latest end_dt {end_dt} found at index {i}') | ||||
| 
 | ||||
|                 for epoch in reversed(sorted(frames)): | ||||
|                     start = shm.array['time'][0] | ||||
| 
 | ||||
|                     # we don't yet have the next frame to push | ||||
|                     # so break back to the async request loop. | ||||
|                     diff = epoch - start | ||||
|                     if abs(diff) > step_size_s: | ||||
|                         if len(frames) > 20: | ||||
|                             log.warning( | ||||
|                                 f'there appears to be a history gap of {diff}?' | ||||
|                             ) | ||||
|                             # from pprint import pprint | ||||
|                             # await tractor.breakpoint() | ||||
|                         else: | ||||
|                             break | ||||
| 
 | ||||
|                     to_push, start_dt, end_dt = frames.pop(epoch) | ||||
|                     print(f'pushing frame ending at {end_dt}') | ||||
| 
 | ||||
|                     if not len(to_push): | ||||
|                         break | ||||
| 
 | ||||
|                     # bail on shm allocation overrun | ||||
|                     try: | ||||
|                         shm.push(to_push, prepend=True) | ||||
|                     except ValueError: | ||||
|                         await tractor.breakpoint() | ||||
|                         break | ||||
| 
 | ||||
|                     for delay_s in sampler.subscribers: | ||||
|                         await broadcast(delay_s) | ||||
| 
 | ||||
|         bf_done.set() | ||||
|         # update start index to include all tsdb history | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue