Poll for sampling info at startup, tolerate races
Use the new `Feed.get_ds_info()` method in a poll loop to definitively get the inter-chart sampling info and avoid races with shm buffer backfilling. Also, factor the history increment closure-task into `graphics_update_loop()` which will make it clearer how to factor all the "should we update" logic into some `DisplayState` API.history_view
parent
2ef6460853
commit
31735f26d3
|
@ -26,14 +26,15 @@ from functools import partial
|
||||||
import time
|
import time
|
||||||
from typing import Optional, Any, Callable
|
from typing import Optional, Any, Callable
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
import trio
|
||||||
import pendulum
|
|
||||||
import pyqtgraph as pg
|
import pyqtgraph as pg
|
||||||
|
|
||||||
# from .. import brokers
|
# from .. import brokers
|
||||||
from ..data.feed import open_feed
|
from ..data.feed import (
|
||||||
|
open_feed,
|
||||||
|
Feed,
|
||||||
|
)
|
||||||
from ._axes import YAxisLabel
|
from ._axes import YAxisLabel
|
||||||
from ._chart import (
|
from ._chart import (
|
||||||
ChartPlotWidget,
|
ChartPlotWidget,
|
||||||
|
@ -153,11 +154,9 @@ class DisplayState:
|
||||||
|
|
||||||
async def graphics_update_loop(
|
async def graphics_update_loop(
|
||||||
|
|
||||||
linked: LinkedSplits,
|
nurse: trio.Nursery,
|
||||||
stream: tractor.MsgStream,
|
godwidget: GodWidget,
|
||||||
ohlcv: np.ndarray,
|
feed: Feed,
|
||||||
hist_ohlcv: np.ndarray,
|
|
||||||
|
|
||||||
wap_in_history: bool = False,
|
wap_in_history: bool = False,
|
||||||
vlm_chart: Optional[ChartPlotWidget] = None,
|
vlm_chart: Optional[ChartPlotWidget] = None,
|
||||||
|
|
||||||
|
@ -178,10 +177,14 @@ async def graphics_update_loop(
|
||||||
# of copying it from last bar's close
|
# of copying it from last bar's close
|
||||||
# - 1-5 sec bar lookback-autocorrection like tws does?
|
# - 1-5 sec bar lookback-autocorrection like tws does?
|
||||||
# (would require a background history checker task)
|
# (would require a background history checker task)
|
||||||
godwidget = linked.godwidget
|
linked: LinkedSplits = godwidget.rt_linked
|
||||||
display_rate = godwidget.window.current_screen().refreshRate()
|
display_rate = godwidget.window.current_screen().refreshRate()
|
||||||
|
|
||||||
chart = linked.chart
|
chart = linked.chart
|
||||||
|
hist_chart = godwidget.hist_linked.chart
|
||||||
|
|
||||||
|
ohlcv = feed.rt_shm
|
||||||
|
hist_ohlcv = feed.hist_shm
|
||||||
|
|
||||||
# update last price sticky
|
# update last price sticky
|
||||||
last_price_sticky = chart._ysticks[chart.name]
|
last_price_sticky = chart._ysticks[chart.name]
|
||||||
|
@ -257,7 +260,44 @@ async def graphics_update_loop(
|
||||||
|
|
||||||
chart.default_view()
|
chart.default_view()
|
||||||
|
|
||||||
|
# TODO: probably factor this into some kinda `DisplayState`
|
||||||
|
# API that can be reused at least in terms of pulling view
|
||||||
|
# params (eg ``.bars_range()``).
|
||||||
|
async def increment_history_view():
|
||||||
|
i_last_append = i_last = hist_ohlcv.index
|
||||||
|
_, hist_step_size_s, _ = feed.get_ds_info()
|
||||||
|
|
||||||
|
async with feed.index_stream(
|
||||||
|
int(hist_step_size_s)
|
||||||
|
) as istream:
|
||||||
|
async for msg in istream:
|
||||||
|
|
||||||
|
# increment the view position by the sample offset.
|
||||||
|
uppx = hist_chart.view.x_uppx()
|
||||||
|
l, lbar, rbar, r = hist_chart.bars_range()
|
||||||
|
|
||||||
|
i_step = hist_ohlcv.index
|
||||||
|
i_diff = i_step - i_last
|
||||||
|
i_last = i_step
|
||||||
|
liv = r >= i_step
|
||||||
|
append_diff = i_step - i_last_append
|
||||||
|
do_append = (append_diff >= uppx)
|
||||||
|
|
||||||
|
if do_append:
|
||||||
|
i_last_append = i_step
|
||||||
|
|
||||||
|
if (
|
||||||
|
# i_diff > 0 # no new sample step
|
||||||
|
do_append
|
||||||
|
# and uppx < 4 # chart is zoomed out very far
|
||||||
|
and liv
|
||||||
|
):
|
||||||
|
hist_chart.increment_view(steps=i_diff)
|
||||||
|
|
||||||
|
nurse.start_soon(increment_history_view)
|
||||||
|
|
||||||
# main real-time quotes update loop
|
# main real-time quotes update loop
|
||||||
|
stream: tractor.MsgStream = feed.stream
|
||||||
async for quotes in stream:
|
async for quotes in stream:
|
||||||
|
|
||||||
ds.quotes = quotes
|
ds.quotes = quotes
|
||||||
|
@ -736,27 +776,13 @@ async def display_symbol_data(
|
||||||
ohlcv: ShmArray = feed.rt_shm
|
ohlcv: ShmArray = feed.rt_shm
|
||||||
hist_ohlcv: ShmArray = feed.hist_shm
|
hist_ohlcv: ShmArray = feed.hist_shm
|
||||||
|
|
||||||
times = hist_ohlcv.array['time']
|
|
||||||
end = pendulum.from_timestamp(times[-1])
|
|
||||||
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
|
||||||
hist_step_size_s = (end - start).seconds
|
|
||||||
|
|
||||||
times = ohlcv.array['time']
|
|
||||||
end = pendulum.from_timestamp(times[-1])
|
|
||||||
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
|
||||||
rt_step_size_s = (end - start).seconds
|
|
||||||
|
|
||||||
ratio = hist_step_size_s / rt_step_size_s
|
|
||||||
|
|
||||||
# this value needs to be pulled once and only once during
|
# this value needs to be pulled once and only once during
|
||||||
# startup
|
# startup
|
||||||
end_index = feed.startup_hist_index
|
end_index = feed.startup_hist_index
|
||||||
|
|
||||||
# bars = ohlcv.array
|
|
||||||
symbol = feed.symbols[sym]
|
symbol = feed.symbols[sym]
|
||||||
fqsn = symbol.front_fqsn()
|
fqsn = symbol.front_fqsn()
|
||||||
|
|
||||||
|
|
||||||
step_size_s = 1
|
step_size_s = 1
|
||||||
tf_key = tf_in_1s[step_size_s]
|
tf_key = tf_in_1s[step_size_s]
|
||||||
|
|
||||||
|
@ -767,8 +793,8 @@ async def display_symbol_data(
|
||||||
f'step:{tf_key} '
|
f'step:{tf_key} '
|
||||||
)
|
)
|
||||||
|
|
||||||
linked = godwidget.rt_linked
|
rt_linked = godwidget.rt_linked
|
||||||
linked._symbol = symbol
|
rt_linked._symbol = symbol
|
||||||
|
|
||||||
# generate order mode side-pane UI
|
# generate order mode side-pane UI
|
||||||
# A ``FieldsForm`` form to configure order entry
|
# A ``FieldsForm`` form to configure order entry
|
||||||
|
@ -793,7 +819,7 @@ async def display_symbol_data(
|
||||||
)
|
)
|
||||||
|
|
||||||
# create main OHLC chart
|
# create main OHLC chart
|
||||||
chart = linked.plot_ohlc_main(
|
chart = rt_linked.plot_ohlc_main(
|
||||||
symbol,
|
symbol,
|
||||||
ohlcv,
|
ohlcv,
|
||||||
# in the case of history chart we explicitly set `False`
|
# in the case of history chart we explicitly set `False`
|
||||||
|
@ -801,7 +827,6 @@ async def display_symbol_data(
|
||||||
sidepane=pp_pane,
|
sidepane=pp_pane,
|
||||||
)
|
)
|
||||||
|
|
||||||
# chart.default_view()
|
|
||||||
chart._feeds[symbol.key] = feed
|
chart._feeds[symbol.key] = feed
|
||||||
chart.setFocus()
|
chart.setFocus()
|
||||||
|
|
||||||
|
@ -837,6 +862,18 @@ async def display_symbol_data(
|
||||||
# a weird placement of the region on the way-far-left..
|
# a weird placement of the region on the way-far-left..
|
||||||
# region.setClipItem(flow.graphics)
|
# region.setClipItem(flow.graphics)
|
||||||
|
|
||||||
|
# poll for datums load and timestep detection
|
||||||
|
for _ in range(10):
|
||||||
|
try:
|
||||||
|
_, _, ratio = feed.get_ds_info()
|
||||||
|
break
|
||||||
|
except IndexError:
|
||||||
|
await trio.sleep(0.001)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
raise RuntimeError(
|
||||||
|
'Failed to detect sampling periods from shm!?')
|
||||||
|
|
||||||
def update_pi_from_region():
|
def update_pi_from_region():
|
||||||
region.setZValue(10)
|
region.setZValue(10)
|
||||||
mn, mx = region.getRegion()
|
mn, mx = region.getRegion()
|
||||||
|
@ -877,14 +914,14 @@ async def display_symbol_data(
|
||||||
# NOTE: we must immediately tell Qt to show the OHLC chart
|
# NOTE: we must immediately tell Qt to show the OHLC chart
|
||||||
# to avoid a race where the subplots get added/shown to
|
# to avoid a race where the subplots get added/shown to
|
||||||
# the linked set *before* the main price chart!
|
# the linked set *before* the main price chart!
|
||||||
linked.show()
|
rt_linked.show()
|
||||||
linked.focus()
|
rt_linked.focus()
|
||||||
await trio.sleep(0)
|
await trio.sleep(0)
|
||||||
|
|
||||||
linked.splitter.insertWidget(0, hist_linked)
|
rt_linked.splitter.insertWidget(0, hist_linked)
|
||||||
# XXX: if we wanted it at the bottom?
|
# XXX: if we wanted it at the bottom?
|
||||||
# linked.splitter.addWidget(hist_linked)
|
# rt_linked.splitter.addWidget(hist_linked)
|
||||||
linked.focus()
|
rt_linked.focus()
|
||||||
|
|
||||||
vlm_chart: Optional[ChartPlotWidget] = None
|
vlm_chart: Optional[ChartPlotWidget] = None
|
||||||
async with trio.open_nursery() as ln:
|
async with trio.open_nursery() as ln:
|
||||||
|
@ -896,7 +933,7 @@ async def display_symbol_data(
|
||||||
):
|
):
|
||||||
vlm_chart = await ln.start(
|
vlm_chart = await ln.start(
|
||||||
open_vlm_displays,
|
open_vlm_displays,
|
||||||
linked,
|
rt_linked,
|
||||||
ohlcv,
|
ohlcv,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -904,7 +941,7 @@ async def display_symbol_data(
|
||||||
# from an input config.
|
# from an input config.
|
||||||
ln.start_soon(
|
ln.start_soon(
|
||||||
start_fsp_displays,
|
start_fsp_displays,
|
||||||
linked,
|
rt_linked,
|
||||||
ohlcv,
|
ohlcv,
|
||||||
loading_sym_key,
|
loading_sym_key,
|
||||||
loglevel,
|
loglevel,
|
||||||
|
@ -913,46 +950,13 @@ async def display_symbol_data(
|
||||||
# start graphics update loop after receiving first live quote
|
# start graphics update loop after receiving first live quote
|
||||||
ln.start_soon(
|
ln.start_soon(
|
||||||
graphics_update_loop,
|
graphics_update_loop,
|
||||||
linked,
|
ln,
|
||||||
feed.stream,
|
godwidget,
|
||||||
ohlcv,
|
feed,
|
||||||
hist_ohlcv,
|
|
||||||
wap_in_history,
|
wap_in_history,
|
||||||
vlm_chart,
|
vlm_chart,
|
||||||
)
|
)
|
||||||
|
|
||||||
async def increment_history_view():
|
|
||||||
i_last_append = i_last = hist_ohlcv.index
|
|
||||||
|
|
||||||
async with feed.index_stream(
|
|
||||||
int(hist_step_size_s)
|
|
||||||
) as istream:
|
|
||||||
async for msg in istream:
|
|
||||||
|
|
||||||
# increment the view position by the sample offset.
|
|
||||||
uppx = hist_chart.view.x_uppx()
|
|
||||||
l, lbar, rbar, r = hist_chart.bars_range()
|
|
||||||
|
|
||||||
i_step = hist_ohlcv.index
|
|
||||||
i_diff = i_step - i_last
|
|
||||||
i_last = i_step
|
|
||||||
liv = r >= i_step
|
|
||||||
append_diff = i_step - i_last_append
|
|
||||||
do_append = (append_diff >= uppx)
|
|
||||||
|
|
||||||
if do_append:
|
|
||||||
i_last_append = i_step
|
|
||||||
|
|
||||||
if (
|
|
||||||
# i_diff > 0 # no new sample step
|
|
||||||
do_append
|
|
||||||
# and uppx < 4 # chart is zoomed out very far
|
|
||||||
and liv
|
|
||||||
):
|
|
||||||
hist_chart.increment_view(steps=i_diff)
|
|
||||||
|
|
||||||
ln.start_soon(increment_history_view)
|
|
||||||
|
|
||||||
await trio.sleep(0)
|
await trio.sleep(0)
|
||||||
|
|
||||||
# size view to data prior to order mode init
|
# size view to data prior to order mode init
|
||||||
|
@ -973,19 +977,19 @@ async def display_symbol_data(
|
||||||
# let Qt run to render all widgets and make sure the
|
# let Qt run to render all widgets and make sure the
|
||||||
# sidepanes line up vertically.
|
# sidepanes line up vertically.
|
||||||
await trio.sleep(0)
|
await trio.sleep(0)
|
||||||
linked.resize_sidepanes()
|
rt_linked.resize_sidepanes()
|
||||||
linked.set_split_sizes()
|
rt_linked.set_split_sizes()
|
||||||
|
|
||||||
# NOTE: we pop the volume chart from the subplots set so
|
# NOTE: we pop the volume chart from the subplots set so
|
||||||
# that it isn't double rendered in the display loop
|
# that it isn't double rendered in the display loop
|
||||||
# above since we do a maxmin calc on the volume data to
|
# above since we do a maxmin calc on the volume data to
|
||||||
# determine if auto-range adjustements should be made.
|
# determine if auto-range adjustements should be made.
|
||||||
# linked.subplots.pop('volume', None)
|
# rt_linked.subplots.pop('volume', None)
|
||||||
|
|
||||||
# TODO: make this not so shit XD
|
# TODO: make this not so shit XD
|
||||||
# close group status
|
# close group status
|
||||||
sbar._status_groups[loading_sym_key][1]()
|
sbar._status_groups[loading_sym_key][1]()
|
||||||
|
|
||||||
# let the app run.. bby
|
# let the app run.. bby
|
||||||
# linked.graphics_cycle()
|
# rt_linked.graphics_cycle()
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
Loading…
Reference in New Issue