Only handle hist discrepancies when market is open
We obviously don't want to be debugging a sample-index issue when the market for the asset is closed, since a mismatch is then guaranteed. Pass the `feed_is_live: trio.Event` through the backfilling routines so we can first check that the live feed is active and avoid dropping into a breakpoint on false positives. Also, add a detailed warning log message for when a mismatch is *actually* being investigated.
parent 0fc06a98d4
commit 92ce1b3304
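As a rough sketch of the gating pattern described above (the helper and parameter names are illustrative assumptions, not the actual piker code): the backfiller receives a `trio.Event` that is set once live quotes arrive, and the sample-index sanity check simply bails while that event is unset, since a closed market is expected to lag wall-clock time.

    import time
    import trio

    async def maybe_check_sample_index(
        feed_is_live: trio.Event,
        last_sample_ts: float,
        step_size_s: float = 60,
    ) -> None:
        # hypothetical helper: a stale last-sample timestamp only
        # suggests a history-retrieval bug when the feed is confirmed
        # live; with the market closed a mismatch is guaranteed.
        if not feed_is_live.is_set():
            return

        diff = round(time.time()) - last_sample_ts
        if abs(diff) > step_size_s:
            # report the surrounding data and drop into a debugger
            # for investigation.
            ...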
@@ -207,7 +207,7 @@ def get_feed_bus(
 
 ) -> _FeedsBus:
     '''
-    Retreive broker-daemon-local data feeds bus from process global
+    Retrieve broker-daemon-local data feeds bus from process global
     scope. Serialize task access to lock.
 
     '''
@@ -250,6 +250,7 @@ async def start_backfill(
     shm: ShmArray,
     timeframe: float,
     sampler_stream: tractor.MsgStream,
+    feed_is_live: trio.Event,
 
     last_tsdb_dt: Optional[datetime] = None,
     storage: Optional[Storage] = None,
@@ -281,7 +282,14 @@ async def start_backfill(
             - pendulum.from_timestamp(times[-2])
         ).seconds
 
-        if step_size_s == 60:
+        # if the market is open (aka we have a live feed) but the
+        # history sample step index seems off we report the surrounding
+        # data and drop into a bp. this case shouldn't really ever
+        # happen if we're doing history retrieval correctly.
+        if (
+            step_size_s == 60
+            and feed_is_live.is_set()
+        ):
             inow = round(time.time())
             diff = inow - times[-1]
             if abs(diff) > 60:
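This hunk is truncated before the new warning itself; purely to illustrate the shape of the report (the `log` name and the message text are assumptions, not the commit's actual code), the mismatch branch looks roughly like:

    if abs(diff) > 60:
        # hypothetical report; the real warning text added by this
        # commit is not shown in the truncated hunk above.
        log.warning(
            f'History sample-index mismatch @ {step_size_s}s step:\n'
            f'now={inow}, last-sample-time={times[-1]}, diff={diff}s'
        )
        await tractor.breakpoint()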
@@ -499,6 +507,7 @@ async def basic_backfill(
     bfqsn: str,
     shms: dict[int, ShmArray],
     sampler_stream: tractor.MsgStream,
+    feed_is_live: trio.Event,
 
 ) -> None:
 
@@ -518,6 +527,7 @@ async def basic_backfill(
                     shm,
                     timeframe,
                     sampler_stream,
+                    feed_is_live,
                 )
             )
         except DataUnavailable:
@@ -534,6 +544,7 @@ async def tsdb_backfill(
     bfqsn: str,
     shms: dict[int, ShmArray],
     sampler_stream: tractor.MsgStream,
+    feed_is_live: trio.Event,
 
     task_status: TaskStatus[
         tuple[ShmArray, ShmArray]
@@ -568,6 +579,8 @@ async def tsdb_backfill(
                     shm,
                     timeframe,
                     sampler_stream,
+                    feed_is_live,
+
                     last_tsdb_dt=last_tsdb_dt,
                     tsdb_is_up=True,
                     storage=storage,
@@ -870,6 +883,7 @@ async def manage_history(
                         60: hist_shm,
                     },
                     sample_stream,
+                    feed_is_live,
                 )
 
                 # yield back after client connect with filled shm
@@ -904,6 +918,7 @@ async def manage_history(
                     60: hist_shm,
                 },
                 sample_stream,
+                feed_is_live,
             )
             task_status.started((
                 hist_zero_index,
@@ -1065,7 +1080,10 @@ async def allocate_persistent_feed(
     # seed the buffer with a history datum - this is most handy
     # for many backends which don't sample @ 1s OHLC but do have
     # slower data such as 1m OHLC.
-    if not len(rt_shm.array):
+    if (
+        not len(rt_shm.array)
+        and hist_shm.array.size
+    ):
         rt_shm.push(hist_shm.array[-3:-1])
         ohlckeys = ['open', 'high', 'low', 'close']
         rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
@@ -1076,6 +1094,9 @@ async def allocate_persistent_feed(
         rt_shm.array['time'][0] = ts
         rt_shm.array['time'][1] = ts + 1
 
+    elif hist_shm.array.size == 0:
+        await tractor.breakpoint()
+
     # wait the spawning parent task to register its subscriber
     # send-stream entry before we start the sample loop.
     await sub_registered.wait()