Handle time-indexing for fill arrows
Call into a reworked `Flume.get_index()` for both the slow and fast chart and do time index clipping to last datum where necessary.epoch_indexing_and_dataviz_layer
							parent
							
								
									9fcc6f9c44
								
							
						
					
					
						commit
						3bed142d15
					
				|  | @ -48,13 +48,13 @@ from ._sharedmem import ( | |||
| from ._sampling import ( | ||||
|     open_sample_stream, | ||||
| ) | ||||
| from .._profile import ( | ||||
|     Profiler, | ||||
|     pg_profile_enabled, | ||||
| ) | ||||
| # from .._profile import ( | ||||
| #     Profiler, | ||||
| #     pg_profile_enabled, | ||||
| # ) | ||||
| 
 | ||||
| if TYPE_CHECKING: | ||||
|     from pyqtgraph import PlotItem | ||||
|     # from pyqtgraph import PlotItem | ||||
|     from .feed import Feed | ||||
| 
 | ||||
| 
 | ||||
|  | @ -222,18 +222,18 @@ class Flume(Struct): | |||
|     def get_index( | ||||
|         self, | ||||
|         time_s: float, | ||||
|         array: np.ndarray, | ||||
| 
 | ||||
|     ) -> int: | ||||
|     ) -> int | float: | ||||
|         ''' | ||||
|         Return array shm-buffer index for for epoch time. | ||||
| 
 | ||||
|         ''' | ||||
|         array = self.rt_shm.array | ||||
|         times = array['time'] | ||||
|         mask = (times >= time_s) | ||||
| 
 | ||||
|         if any(mask): | ||||
|             return array['index'][mask][0] | ||||
| 
 | ||||
|         # just the latest index | ||||
|         return array['index'][-1] | ||||
|         first = np.searchsorted( | ||||
|             times, | ||||
|             time_s, | ||||
|             side='left', | ||||
|         ) | ||||
|         imx = times.shape[0] - 1 | ||||
|         return min(first, imx) | ||||
|  |  | |||
|  | @ -494,7 +494,7 @@ class OrderMode: | |||
| 
 | ||||
|         uuid: str, | ||||
|         price: float, | ||||
|         arrow_index: float, | ||||
|         time_s: float, | ||||
| 
 | ||||
|         pointing: Optional[str] = None, | ||||
| 
 | ||||
|  | @ -513,18 +513,26 @@ class OrderMode: | |||
|         ''' | ||||
|         dialog = self.dialogs[uuid] | ||||
|         lines = dialog.lines | ||||
|         chart = self.chart | ||||
| 
 | ||||
|         # XXX: seems to fail on certain types of races? | ||||
|         # assert len(lines) == 2 | ||||
|         if lines: | ||||
|             flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn] | ||||
|             flume: Flume = self.feed.flumes[chart.linked.symbol.fqsn] | ||||
|             _, _, ratio = flume.get_ds_info() | ||||
|             for i, chart in [ | ||||
|                 (arrow_index, self.chart), | ||||
|                 (flume.izero_hist | ||||
|                  + | ||||
|                  round((arrow_index - flume.izero_rt)/ratio), | ||||
|                  self.hist_chart) | ||||
| 
 | ||||
|             for chart, shm in [ | ||||
|                 (self.chart, flume.rt_shm), | ||||
|                 (self.hist_chart, flume.hist_shm), | ||||
|             ]: | ||||
|                 viz = chart.get_viz(chart.name) | ||||
|                 index_field = viz.index_field | ||||
|                 arr = shm.array | ||||
|                 index = flume.get_index(time_s, arr) | ||||
| 
 | ||||
|                 if index_field == 'time': | ||||
|                     i = arr['time'][index] | ||||
| 
 | ||||
|                 self.arrows.add( | ||||
|                     chart.plotItem, | ||||
|                     uuid, | ||||
|  | @ -933,6 +941,8 @@ async def process_trade_msg( | |||
|     fmsg = pformat(msg) | ||||
|     log.debug(f'Received order msg:\n{fmsg}') | ||||
|     name = msg['name'] | ||||
|     viz = mode.chart.get_viz(mode.chart.name) | ||||
|     index_field = viz.index_field | ||||
| 
 | ||||
|     if name in ( | ||||
|         'position', | ||||
|  | @ -1037,11 +1047,11 @@ async def process_trade_msg( | |||
|             # should only be one "fill" for an alert | ||||
|             # add a triangle and remove the level line | ||||
|             req = Order(**req) | ||||
|             index = flume.get_index(time.time()) | ||||
|             tm = time.time() | ||||
|             mode.on_fill( | ||||
|                 oid, | ||||
|                 price=req.price, | ||||
|                 arrow_index=index, | ||||
|                 time_s=tm, | ||||
|             ) | ||||
|             mode.lines.remove_line(uuid=oid) | ||||
|             msg.req = req | ||||
|  | @ -1070,6 +1080,8 @@ async def process_trade_msg( | |||
|             details = msg.brokerd_msg | ||||
| 
 | ||||
|             # TODO: put the actual exchange timestamp? | ||||
|             # TODO: some kinda progress system? | ||||
| 
 | ||||
|             # NOTE: currently the ``kraken`` openOrders sub | ||||
|             # doesn't deliver their engine timestamp as part of | ||||
|             # it's schema, so this value is **not** from them | ||||
|  | @ -1080,15 +1092,11 @@ async def process_trade_msg( | |||
|             # a true backend one? This will require finagling | ||||
|             # with how each backend tracks/summarizes time | ||||
|             # stamps for the downstream API. | ||||
|             index = flume.get_index( | ||||
|                 details['broker_time'] | ||||
|             ) | ||||
| 
 | ||||
|             # TODO: some kinda progress system | ||||
|             tm = details['broker_time'] | ||||
|             mode.on_fill( | ||||
|                 oid, | ||||
|                 price=details['price'], | ||||
|                 arrow_index=index, | ||||
|                 time_s=tm, | ||||
|                 pointing='up' if action == 'buy' else 'down', | ||||
|             ) | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue