Handle time-indexing for fill arrows

Call into a reworked `Flume.get_index()` for both the slow and fast
chart and do time index clipping to last datum where necessary.
epoch_index_backup
Tyler Goodlet 2022-12-09 14:17:36 -05:00
parent 95b9ae66b2
commit abf3b08328
2 changed files with 38 additions and 30 deletions

View File

@ -48,13 +48,13 @@ from ._sharedmem import (
from ._sampling import ( from ._sampling import (
iter_ohlc_periods, iter_ohlc_periods,
) )
from .._profile import ( # from .._profile import (
Profiler, # Profiler,
pg_profile_enabled, # pg_profile_enabled,
) # )
if TYPE_CHECKING: if TYPE_CHECKING:
from pyqtgraph import PlotItem # from pyqtgraph import PlotItem
from .feed import Feed from .feed import Feed
@ -235,18 +235,18 @@ class Flume(Struct):
def get_index( def get_index(
self, self,
time_s: float, time_s: float,
array: np.ndarray,
) -> int: ) -> int | float:
''' '''
Return array shm-buffer index for for epoch time. Return array shm-buffer index for for epoch time.
''' '''
array = self.rt_shm.array
times = array['time'] times = array['time']
mask = (times >= time_s) first = np.searchsorted(
times,
if any(mask): time_s,
return array['index'][mask][0] side='left',
)
# just the latest index imx = times.shape[0] - 1
return array['index'][-1] return min(first, imx)

View File

@ -494,7 +494,7 @@ class OrderMode:
uuid: str, uuid: str,
price: float, price: float,
arrow_index: float, time_s: float,
pointing: Optional[str] = None, pointing: Optional[str] = None,
@ -513,18 +513,26 @@ class OrderMode:
''' '''
dialog = self.dialogs[uuid] dialog = self.dialogs[uuid]
lines = dialog.lines lines = dialog.lines
chart = self.chart
# XXX: seems to fail on certain types of races? # XXX: seems to fail on certain types of races?
# assert len(lines) == 2 # assert len(lines) == 2
if lines: if lines:
flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn] flume: Flume = self.feed.flumes[chart.linked.symbol.fqsn]
_, _, ratio = flume.get_ds_info() _, _, ratio = flume.get_ds_info()
for i, chart in [
(arrow_index, self.chart), for chart, shm in [
(flume.izero_hist (self.chart, flume.rt_shm),
+ (self.hist_chart, flume.hist_shm),
round((arrow_index - flume.izero_rt)/ratio),
self.hist_chart)
]: ]:
viz = chart.get_viz(chart.name)
index_field = viz.index_field
arr = shm.array
index = flume.get_index(time_s, arr)
if index_field == 'time':
i = arr['time'][index]
self.arrows.add( self.arrows.add(
chart.plotItem, chart.plotItem,
uuid, uuid,
@ -933,6 +941,8 @@ async def process_trade_msg(
fmsg = pformat(msg) fmsg = pformat(msg)
log.debug(f'Received order msg:\n{fmsg}') log.debug(f'Received order msg:\n{fmsg}')
name = msg['name'] name = msg['name']
viz = mode.chart.get_viz(mode.chart.name)
index_field = viz.index_field
if name in ( if name in (
'position', 'position',
@ -1037,11 +1047,11 @@ async def process_trade_msg(
# should only be one "fill" for an alert # should only be one "fill" for an alert
# add a triangle and remove the level line # add a triangle and remove the level line
req = Order(**req) req = Order(**req)
index = flume.get_index(time.time()) tm = time.time()
mode.on_fill( mode.on_fill(
oid, oid,
price=req.price, price=req.price,
arrow_index=index, time_s=tm,
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
msg.req = req msg.req = req
@ -1070,6 +1080,8 @@ async def process_trade_msg(
details = msg.brokerd_msg details = msg.brokerd_msg
# TODO: put the actual exchange timestamp? # TODO: put the actual exchange timestamp?
# TODO: some kinda progress system?
# NOTE: currently the ``kraken`` openOrders sub # NOTE: currently the ``kraken`` openOrders sub
# doesn't deliver their engine timestamp as part of # doesn't deliver their engine timestamp as part of
# it's schema, so this value is **not** from them # it's schema, so this value is **not** from them
@ -1080,15 +1092,11 @@ async def process_trade_msg(
# a true backend one? This will require finagling # a true backend one? This will require finagling
# with how each backend tracks/summarizes time # with how each backend tracks/summarizes time
# stamps for the downstream API. # stamps for the downstream API.
index = flume.get_index( tm = details['broker_time']
details['broker_time']
)
# TODO: some kinda progress system
mode.on_fill( mode.on_fill(
oid, oid,
price=details['price'], price=details['price'],
arrow_index=index, time_s=tm,
pointing='up' if action == 'buy' else 'down', pointing='up' if action == 'buy' else 'down',
) )