diff --git a/piker/data/flows.py b/piker/data/flows.py index f1b8eabf..01ed7851 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -48,13 +48,13 @@ from ._sharedmem import ( from ._sampling import ( open_sample_stream, ) -from .._profile import ( - Profiler, - pg_profile_enabled, -) +# from .._profile import ( +# Profiler, +# pg_profile_enabled, +# ) if TYPE_CHECKING: - from pyqtgraph import PlotItem + # from pyqtgraph import PlotItem from .feed import Feed @@ -222,18 +222,18 @@ class Flume(Struct): def get_index( self, time_s: float, + array: np.ndarray, - ) -> int: + ) -> int | float: ''' Return array shm-buffer index for for epoch time. ''' - array = self.rt_shm.array times = array['time'] - mask = (times >= time_s) - - if any(mask): - return array['index'][mask][0] - - # just the latest index - return array['index'][-1] + first = np.searchsorted( + times, + time_s, + side='left', + ) + imx = times.shape[0] - 1 + return min(first, imx) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 1dd49872..ee0196f7 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -494,7 +494,7 @@ class OrderMode: uuid: str, price: float, - arrow_index: float, + time_s: float, pointing: Optional[str] = None, @@ -513,18 +513,26 @@ class OrderMode: ''' dialog = self.dialogs[uuid] lines = dialog.lines + chart = self.chart + # XXX: seems to fail on certain types of races? # assert len(lines) == 2 if lines: - flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn] + flume: Flume = self.feed.flumes[chart.linked.symbol.fqsn] _, _, ratio = flume.get_ds_info() - for i, chart in [ - (arrow_index, self.chart), - (flume.izero_hist - + - round((arrow_index - flume.izero_rt)/ratio), - self.hist_chart) + + for chart, shm in [ + (self.chart, flume.rt_shm), + (self.hist_chart, flume.hist_shm), ]: + viz = chart.get_viz(chart.name) + index_field = viz.index_field + arr = shm.array + index = flume.get_index(time_s, arr) + + if index_field == 'time': + i = arr['time'][index] + self.arrows.add( chart.plotItem, uuid, @@ -933,6 +941,8 @@ async def process_trade_msg( fmsg = pformat(msg) log.debug(f'Received order msg:\n{fmsg}') name = msg['name'] + viz = mode.chart.get_viz(mode.chart.name) + index_field = viz.index_field if name in ( 'position', @@ -1037,11 +1047,11 @@ async def process_trade_msg( # should only be one "fill" for an alert # add a triangle and remove the level line req = Order(**req) - index = flume.get_index(time.time()) + tm = time.time() mode.on_fill( oid, price=req.price, - arrow_index=index, + time_s=tm, ) mode.lines.remove_line(uuid=oid) msg.req = req @@ -1070,6 +1080,8 @@ async def process_trade_msg( details = msg.brokerd_msg # TODO: put the actual exchange timestamp? + # TODO: some kinda progress system? + # NOTE: currently the ``kraken`` openOrders sub # doesn't deliver their engine timestamp as part of # it's schema, so this value is **not** from them @@ -1080,15 +1092,11 @@ async def process_trade_msg( # a true backend one? This will require finagling # with how each backend tracks/summarizes time # stamps for the downstream API. - index = flume.get_index( - details['broker_time'] - ) - - # TODO: some kinda progress system + tm = details['broker_time'] mode.on_fill( oid, price=details['price'], - arrow_index=index, + time_s=tm, pointing='up' if action == 'buy' else 'down', )