Handle time-indexing for fill arrows
Call into a reworked `Flume.get_index()` for both the slow and fast chart and do time index clipping to last datum where necessary.pre_viz_calls
							parent
							
								
									7aaa782af0
								
							
						
					
					
						commit
						ecf7898de9
					
				| 
						 | 
					@ -48,13 +48,13 @@ from ._sharedmem import (
 | 
				
			||||||
from ._sampling import (
 | 
					from ._sampling import (
 | 
				
			||||||
    open_sample_stream,
 | 
					    open_sample_stream,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from .._profile import (
 | 
					# from .._profile import (
 | 
				
			||||||
    Profiler,
 | 
					#     Profiler,
 | 
				
			||||||
    pg_profile_enabled,
 | 
					#     pg_profile_enabled,
 | 
				
			||||||
)
 | 
					# )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if TYPE_CHECKING:
 | 
					if TYPE_CHECKING:
 | 
				
			||||||
    from pyqtgraph import PlotItem
 | 
					    # from pyqtgraph import PlotItem
 | 
				
			||||||
    from .feed import Feed
 | 
					    from .feed import Feed
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -222,18 +222,18 @@ class Flume(Struct):
 | 
				
			||||||
    def get_index(
 | 
					    def get_index(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        time_s: float,
 | 
					        time_s: float,
 | 
				
			||||||
 | 
					        array: np.ndarray,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ) -> int:
 | 
					    ) -> int | float:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        Return array shm-buffer index for for epoch time.
 | 
					        Return array shm-buffer index for for epoch time.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        array = self.rt_shm.array
 | 
					 | 
				
			||||||
        times = array['time']
 | 
					        times = array['time']
 | 
				
			||||||
        mask = (times >= time_s)
 | 
					        first = np.searchsorted(
 | 
				
			||||||
 | 
					            times,
 | 
				
			||||||
        if any(mask):
 | 
					            time_s,
 | 
				
			||||||
            return array['index'][mask][0]
 | 
					            side='left',
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
        # just the latest index
 | 
					        imx = times.shape[0] - 1
 | 
				
			||||||
        return array['index'][-1]
 | 
					        return min(first, imx)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -494,7 +494,7 @@ class OrderMode:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        uuid: str,
 | 
					        uuid: str,
 | 
				
			||||||
        price: float,
 | 
					        price: float,
 | 
				
			||||||
        arrow_index: float,
 | 
					        time_s: float,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        pointing: Optional[str] = None,
 | 
					        pointing: Optional[str] = None,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -513,18 +513,26 @@ class OrderMode:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        dialog = self.dialogs[uuid]
 | 
					        dialog = self.dialogs[uuid]
 | 
				
			||||||
        lines = dialog.lines
 | 
					        lines = dialog.lines
 | 
				
			||||||
 | 
					        chart = self.chart
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # XXX: seems to fail on certain types of races?
 | 
					        # XXX: seems to fail on certain types of races?
 | 
				
			||||||
        # assert len(lines) == 2
 | 
					        # assert len(lines) == 2
 | 
				
			||||||
        if lines:
 | 
					        if lines:
 | 
				
			||||||
            flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn]
 | 
					            flume: Flume = self.feed.flumes[chart.linked.symbol.fqsn]
 | 
				
			||||||
            _, _, ratio = flume.get_ds_info()
 | 
					            _, _, ratio = flume.get_ds_info()
 | 
				
			||||||
            for i, chart in [
 | 
					
 | 
				
			||||||
                (arrow_index, self.chart),
 | 
					            for chart, shm in [
 | 
				
			||||||
                (flume.izero_hist
 | 
					                (self.chart, flume.rt_shm),
 | 
				
			||||||
                 +
 | 
					                (self.hist_chart, flume.hist_shm),
 | 
				
			||||||
                 round((arrow_index - flume.izero_rt)/ratio),
 | 
					 | 
				
			||||||
                 self.hist_chart)
 | 
					 | 
				
			||||||
            ]:
 | 
					            ]:
 | 
				
			||||||
 | 
					                viz = chart.get_viz(chart.name)
 | 
				
			||||||
 | 
					                index_field = viz.index_field
 | 
				
			||||||
 | 
					                arr = shm.array
 | 
				
			||||||
 | 
					                index = flume.get_index(time_s, arr)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if index_field == 'time':
 | 
				
			||||||
 | 
					                    i = arr['time'][index]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                self.arrows.add(
 | 
					                self.arrows.add(
 | 
				
			||||||
                    chart.plotItem,
 | 
					                    chart.plotItem,
 | 
				
			||||||
                    uuid,
 | 
					                    uuid,
 | 
				
			||||||
| 
						 | 
					@ -933,6 +941,8 @@ async def process_trade_msg(
 | 
				
			||||||
    fmsg = pformat(msg)
 | 
					    fmsg = pformat(msg)
 | 
				
			||||||
    log.debug(f'Received order msg:\n{fmsg}')
 | 
					    log.debug(f'Received order msg:\n{fmsg}')
 | 
				
			||||||
    name = msg['name']
 | 
					    name = msg['name']
 | 
				
			||||||
 | 
					    viz = mode.chart.get_viz(mode.chart.name)
 | 
				
			||||||
 | 
					    index_field = viz.index_field
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if name in (
 | 
					    if name in (
 | 
				
			||||||
        'position',
 | 
					        'position',
 | 
				
			||||||
| 
						 | 
					@ -1037,11 +1047,11 @@ async def process_trade_msg(
 | 
				
			||||||
            # should only be one "fill" for an alert
 | 
					            # should only be one "fill" for an alert
 | 
				
			||||||
            # add a triangle and remove the level line
 | 
					            # add a triangle and remove the level line
 | 
				
			||||||
            req = Order(**req)
 | 
					            req = Order(**req)
 | 
				
			||||||
            index = flume.get_index(time.time())
 | 
					            tm = time.time()
 | 
				
			||||||
            mode.on_fill(
 | 
					            mode.on_fill(
 | 
				
			||||||
                oid,
 | 
					                oid,
 | 
				
			||||||
                price=req.price,
 | 
					                price=req.price,
 | 
				
			||||||
                arrow_index=index,
 | 
					                time_s=tm,
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            mode.lines.remove_line(uuid=oid)
 | 
					            mode.lines.remove_line(uuid=oid)
 | 
				
			||||||
            msg.req = req
 | 
					            msg.req = req
 | 
				
			||||||
| 
						 | 
					@ -1070,6 +1080,8 @@ async def process_trade_msg(
 | 
				
			||||||
            details = msg.brokerd_msg
 | 
					            details = msg.brokerd_msg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # TODO: put the actual exchange timestamp?
 | 
					            # TODO: put the actual exchange timestamp?
 | 
				
			||||||
 | 
					            # TODO: some kinda progress system?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # NOTE: currently the ``kraken`` openOrders sub
 | 
					            # NOTE: currently the ``kraken`` openOrders sub
 | 
				
			||||||
            # doesn't deliver their engine timestamp as part of
 | 
					            # doesn't deliver their engine timestamp as part of
 | 
				
			||||||
            # it's schema, so this value is **not** from them
 | 
					            # it's schema, so this value is **not** from them
 | 
				
			||||||
| 
						 | 
					@ -1080,15 +1092,11 @@ async def process_trade_msg(
 | 
				
			||||||
            # a true backend one? This will require finagling
 | 
					            # a true backend one? This will require finagling
 | 
				
			||||||
            # with how each backend tracks/summarizes time
 | 
					            # with how each backend tracks/summarizes time
 | 
				
			||||||
            # stamps for the downstream API.
 | 
					            # stamps for the downstream API.
 | 
				
			||||||
            index = flume.get_index(
 | 
					            tm = details['broker_time']
 | 
				
			||||||
                details['broker_time']
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # TODO: some kinda progress system
 | 
					 | 
				
			||||||
            mode.on_fill(
 | 
					            mode.on_fill(
 | 
				
			||||||
                oid,
 | 
					                oid,
 | 
				
			||||||
                price=details['price'],
 | 
					                price=details['price'],
 | 
				
			||||||
                arrow_index=index,
 | 
					                time_s=tm,
 | 
				
			||||||
                pointing='up' if action == 'buy' else 'down',
 | 
					                pointing='up' if action == 'buy' else 'down',
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue