Merge pull request #250 from pikers/single_display_update_loop

Single display update loop
misc_ib_updates
goodboy 2022-01-23 15:56:11 -05:00 committed by GitHub
commit 4680d68824
4 changed files with 461 additions and 270 deletions

View File

@ -34,7 +34,6 @@ import trio
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
# from tractor import _broadcast
from pydantic import BaseModel from pydantic import BaseModel
from ..brokers import get_brokermod from ..brokers import get_brokermod
@@ -252,7 +251,7 @@ async def allocate_persistent_feed(
 @tractor.context
-async def attach_feed_bus(
+async def open_feed_bus(
     ctx: tractor.Context,
     brokername: str,
@@ -512,7 +511,7 @@ async def open_feed(
         portal.open_context(
-            attach_feed_bus,
+            open_feed_bus,
             brokername=brokername,
             symbol=sym,
             loglevel=loglevel,
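The renamed ``open_feed_bus`` endpoint is driven through ``tractor``'s context API, as the caller-side hunk above shows. Below is a minimal standalone sketch of that request/stream pattern using only calls that already appear in this diff (``@tractor.context``, ``portal.open_context``, ``ctx.open_stream``) plus ``ctx.started``/``stream.send``; the ``echo_feed``/``main`` names and the fake quote payloads are purely illustrative, and it assumes this code lives in a module importable by the spawned actor.

import trio
import tractor


@tractor.context
async def echo_feed(
    ctx: tractor.Context,
    symbol: str,
) -> None:
    # hand the first value back to the opener (it shows up as ``first``)
    await ctx.started(symbol)

    # stream a few fake "quotes" back to the caller
    async with ctx.open_stream() as stream:
        for i in range(3):
            await stream.send({'symbol': symbol, 'tick': i})


async def main():
    async with tractor.open_nursery() as n:
        portal = await n.start_actor(
            'feed_actor',
            # assumption: this module is importable under this name
            enable_modules=[__name__],
        )
        async with (
            portal.open_context(echo_feed, symbol='xbtusd') as (ctx, first),
            ctx.open_stream() as stream,
        ):
            assert first == 'xbtusd'
            async for msg in stream:
                print(msg)

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)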

View File

@@ -90,7 +90,7 @@ async def fsp_compute(
     func_name: str,
     func: Callable,
-    attach_stream: bool = True,
+    attach_stream: bool = False,
     task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
 ) -> None:

View File

@@ -85,11 +85,11 @@ async def _async_main(
     screen = godwidget.window.current_screen()

     # configure graphics update throttling based on display refresh rate
-    _display._clear_throttle_rate = min(
+    _display._quote_throttle_rate = min(
         round(screen.refreshRate()),
-        _display._clear_throttle_rate,
+        _display._quote_throttle_rate,
     )
-    log.info(f'Set graphics update rate to {_display._clear_throttle_rate} Hz')
+    log.info(f'Set graphics update rate to {_display._quote_throttle_rate} Hz')

     # TODO: do styling / themeing setup
     # _style.style_ze_sheets(godwidget)
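The hunk above caps the configured quote-throttle rate at the monitor's refresh rate so the chart never requests more redraws per second than the display can actually paint. A standalone sketch of that clamp, assuming a Qt environment is available (``QGuiApplication``/``QScreen.refreshRate()`` are standard Qt APIs; the function name is illustrative):

from PyQt5.QtGui import QGuiApplication


def clamp_update_rate(configured_hz: int) -> int:
    # never schedule more graphics updates per second than the
    # primary display can actually show
    screen = QGuiApplication.primaryScreen()
    refresh_hz = round(screen.refreshRate())
    return min(refresh_hz, configured_hz)


if __name__ == '__main__':
    app = QGuiApplication([])
    print(f'update rate: {clamp_update_rate(58)} Hz')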

View File

@@ -15,11 +15,14 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 '''
-Real-time display tasks for charting / graphics.
+real-time display tasks for charting graphics update.
+
+this module ties together quote and computational (fsp) streams with
+graphics update methods via our custom ``pyqtgraph`` charting api.
 '''
 from contextlib import asynccontextmanager
-# from pprint import pformat
+from functools import partial
 import time
 from types import ModuleType
 from typing import Optional
@@ -53,13 +56,136 @@ from ..log import get_logger
 log = get_logger(__name__)

-_clear_throttle_rate: int = 58  # Hz
-_book_throttle_rate: int = 16  # Hz
+# TODO: load this from a config.toml!
+_quote_throttle_rate: int = 58  # Hz
+
+
+def update_fsp_chart(
+    chart: ChartPlotWidget,
+    shm: ShmArray,
+    graphics_name: str,
+    array_key: Optional[str],
+
+) -> None:
+
+    array = shm.array
+
+    # update graphics
+    # NOTE: this does a length check internally which allows it
+    # staying above the last row check below..
+    chart.update_curve_from_array(
+        graphics_name,
+        array,
+        array_key=array_key or graphics_name,
+    )
+
+    try:
+        last_row = array[-1]
+    except IndexError:
+        # XXX: race condition with backfilling shm.
+        #
+        # the underlying issue is that a backfill (aka prepend) and
+        # subsequent shm array first/last index update could result in an
+        # empty array read here since the indices may be updated in such
+        # a way that a read delivers an empty array (though it seems like
+        # we *should* be able to prevent that?). also, as an alt and
+        # something we need anyway, maybe there should be some kind of
+        # signal that a prepend is taking place and this consumer can
+        # respond (eg. redrawing graphics) accordingly.
+        log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
+        return
+
+    # TODO: provide a read sync mechanism to avoid this polling. the
+    # underlying issue is that a backfill (aka prepend) and subsequent
+    # shm array first/last index update could result in an empty array
+    # read here since the stream is never torn down on the re-compute
+    # steps.
+    # read_tries = 2
+    # while read_tries > 0:
+    #     try:
+    #         # read last
+    #         array = shm.array
+    #         value = array[-1][array_key]
+    #         break
+    #     except IndexError:
+    #         read_tries -= 1
+    #         continue
+
+    # XXX: re: ``array_key``: fsp func names must be unique meaning we
+    # can't have duplicates of the underlying data even if multiple
+    # sub-charts reference it under different 'named charts'.
+
+    # read from last calculated value and update any label
+    last_val_sticky = chart._ysticks.get(graphics_name)
+    if last_val_sticky:
+        # array = shm.array[array_key]
+        # if len(array):
+        #     value = array[-1]
+        last = last_row[array_key]
+        last_val_sticky.update_from_data(-1, last)
+
+
+# _clses = {
+#     'clears': {'trade', 'utrade', 'last'},
+#     'last': {'last'},
+#     'bids': {'bid', 'bsize'},
+#     'asks': {'ask', 'asize'},
+# }
+
+# XXX: idea for frame type data structure we could use on the
+# wire instead of doing it here?
+# frames = {
+#     'index': ['type_a', 'type_c', 'type_n', 'type_n', 'type_c'],
+#     'type_a': [tick0, tick1, tick2, .., tickn],
+#     'type_b': [tick0, tick1, tick2, .., tickn],
+#     'type_c': [tick0, tick1, tick2, .., tickn],
+#     ...
+#     'type_n': [tick0, tick1, tick2, .., tickn],
+# }
+
+
+def chart_maxmin(
+    chart: ChartPlotWidget,
+    vlm_chart: Optional[ChartPlotWidget] = None,
+
+) -> tuple[
+    tuple[int, int, int, int],
+    float,
+    float,
+    float,
+]:
+    # TODO: implement this
+    # https://arxiv.org/abs/cs/0610046
+    # https://github.com/lemire/pythonmaxmin
+
+    array = chart._arrays['ohlc']
+    ifirst = array[0]['index']
+
+    last_bars_range = chart.bars_range()
+    l, lbar, rbar, r = last_bars_range
+    in_view = array[lbar - ifirst:rbar - ifirst + 1]
+
+    assert in_view.size
+
+    mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
+
+    # TODO: when we start using line charts, probably want to make
+    # this an overloaded call on our `DataView
+    # sym = chart.name
+    # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
+
+    mx_vlm_in_view = 0
+    if vlm_chart:
+        mx_vlm_in_view = np.max(in_view['volume'])
+
+    return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
+
+
 async def update_chart_from_quotes(
-    chart: ChartPlotWidget,
+    linked: LinkedSplits,
     stream: tractor.MsgStream,
     ohlcv: np.ndarray,
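The ``chart_maxmin`` helper added above simply scans the slice of the OHLC array that is currently in view for its high/low extrema (plus the max volume when a volume chart is attached). A standalone sketch of that scan over a plain structured numpy array, with a hand-picked bars range standing in for ``chart.bars_range()``:

import numpy as np

# fabricate a tiny OHLC-ish structured array (the real one lives in shm)
ohlc = np.array(
    [
        (i, 100 + i, 99 + i, 1_000 + 10 * i)
        for i in range(10)
    ],
    dtype=[('index', int), ('high', float), ('low', float), ('volume', float)],
)

# pretend bars 3..7 are currently in view
lbar, rbar = 3, 7
ifirst = ohlc[0]['index']
in_view = ohlc[lbar - ifirst:rbar - ifirst + 1]

mx = np.nanmax(in_view['high'])
mn = np.nanmin(in_view['low'])
mx_vlm = np.max(in_view['volume'])

print(f'y-range in view: ({max(mn, 0)}, {mx}), max vlm in view: {mx_vlm}')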
@@ -82,6 +208,8 @@ async def update_chart_from_quotes(
     #   - 1-5 sec bar lookback-autocorrection like tws does?
     #     (would require a background history checker task)
+    chart = linked.chart
+
     # update last price sticky
     last_price_sticky = chart._ysticks[chart.name]
     last_price_sticky.update_from_data(
@@ -91,32 +219,7 @@ async def update_chart_from_quotes(
     if vlm_chart:
         vlm_sticky = vlm_chart._ysticks['volume']

-    def maxmin():
-        # TODO: implement this
-        # https://arxiv.org/abs/cs/0610046
-        # https://github.com/lemire/pythonmaxmin
-
-        array = chart._arrays['ohlc']
-        ifirst = array[0]['index']
-
-        last_bars_range = chart.bars_range()
-        l, lbar, rbar, r = last_bars_range
-        in_view = array[lbar - ifirst:rbar - ifirst + 1]
-
-        assert in_view.size
-
-        mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
-
-        # TODO: when we start using line charts, probably want to make
-        # this an overloaded call on our `DataView
-        # sym = chart.name
-        # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
-
-        mx_vlm_in_view = 0
-        if vlm_chart:
-            mx_vlm_in_view = np.max(in_view['volume'])
-
-        return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
+    maxmin = partial(chart_maxmin, chart, vlm_chart)

     chart.default_view()
@@ -150,21 +253,27 @@ async def update_chart_from_quotes(
     tick_size = chart.linked.symbol.tick_size
     tick_margin = 3 * tick_size

-    last_ask = last_bid = last_clear = time.time()
     chart.show()
+    last_quote = time.time()

+    # NOTE: all code below this loop is expected to be synchronous
+    # and thus draw instructions are not picked up until the next
+    # wait / iteration.
     async for quotes in stream:

-        # chart isn't actively shown so just skip render cycle
+        now = time.time()
+        quote_period = now - last_quote
+        if quote_period <= 1/_quote_throttle_rate:
+            log.warning(f'TOO FAST: {1/quote_period}')
+        last_quote = now
+
+        # chart isn't active/shown so skip render cycle and pause feed(s)
         if chart.linked.isHidden():
             await chart.pause_all_feeds()
             continue

         for sym, quote in quotes.items():
-            now = time.time()
-            # brange, mx_in_view, mn_in_view = maxmin()
             (
                 brange,
                 mx_in_view,
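A standalone sketch of the wall-clock throttle check used in the loop above: the gap between consecutive quote events is compared against the configured render rate, and anything arriving faster is flagged (here simply logged, mirroring the warning above) while the "last quote" timestamp still advances every iteration. Only ``time.time()`` is assumed; the names are illustrative.

import time

_update_rate_hz = 58  # target max render rate


def throttled(events):
    '''Yield events, flagging any that arrive faster than the
    configured render rate.
    '''
    last = time.time()
    for event in events:
        now = time.time()
        period = now - last
        last = now
        if period <= 1 / _update_rate_hz:
            # arriving faster than we want to redraw
            print(f'TOO FAST: {1 / period:.1f} Hz')
        yield event


if __name__ == '__main__':
    for ev in throttled(range(5)):
        time.sleep(0.005)  # ~200 Hz producer, well above 58 Hz
        print('render', ev)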
@@ -184,6 +293,13 @@ async def update_chart_from_quotes(
             if vlm_chart:
                 # print(f"volume: {end['volume']}")
                 vlm_chart.update_curve_from_array('volume', array)
+
+                # built-in tina $vlm FSP using chl3 typical price for ohlc step
+                # last = array[-1]
+                # chl3 = (last['close'] + last['high'] + last['low']) / 3
+                # v = last['volume']
+                # dv = last['volume'] * chl3
+
                 vlm_sticky.update_from_data(*array[-1][['index', 'volume']])

                 if (
@@ -194,30 +310,75 @@ async def update_chart_from_quotes(
                     vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375))
                     last_mx_vlm = mx_vlm_in_view

-            for tick in quote.get('ticks', ()):
-                # log.info(
-                #     f"quotes: {pformat(quote['symbol'])}: {pformat(tick)}")
-                ticktype = tick.get('type')
+            ticks_frame = quote.get('ticks', ())
+            frames_by_type: dict[str, dict] = {}
+            lasts = {}
+
+            # build tick-type "frames" of tick sequences since
+            # likely the tick arrival rate is higher than our
+            # (throttled) quote stream rate.
+            for tick in ticks_frame:
                 price = tick.get('price')
-                size = tick.get('size')
+                ticktype = tick.get('type')

                 if ticktype == 'n/a' or price == -1:
                     # okkk..
                     continue

-                # clearing price event
-                if ticktype in ('trade', 'utrade', 'last'):
-
-                    # throttle clearing price updates to ~ max 60 FPS
-                    period = now - last_clear
-                    if period <= 1/_clear_throttle_rate:
-                        # faster then display refresh rate
-                        continue
-
-                    # print(f'passthrough {tick}\n{1/(now-last_clear)}')
-                    # set time of last graphics update
-                    last_clear = now
+                # keys are entered in oldest-event-inserted-first order
+                # since we iterate ``ticks_frame`` in standard order
+                # above. in other words the order of the keys is the order
+                # of tick events by type from the provider feed.
+                frames_by_type.setdefault(ticktype, []).append(tick)
+
+                # overwrites so the last tick per type is the entry
+                lasts[ticktype] = tick
+
+            # from pprint import pformat
+            # frame_counts = {
+            #     typ: len(frame) for typ, frame in frames_by_type.items()
+            # }
+            # print(f'{pformat(frame_counts)}')
+            # print(f'framed: {pformat(frames_by_type)}')
+            # print(f'lasts: {pformat(lasts)}')
+
+            # TODO: eventually we want to separate out the utrade (aka
+            # dark vlm prices) here and show them as an additional
+            # graphic.
+            clear_types = {'trade', 'utrade', 'last'}
+
+            # XXX: if we wanted to iterate in "latest" (i.e. most
+            # current) tick first order as an optimization where we only
+            # update from the last tick from each type class.
+            # last_clear_updated: bool = False
+            # for typ, tick in reversed(lasts.items()):
+
+            # iterate in FIFO order per frame
+            for typ, tick in lasts.items():
+                price = tick.get('price')
+                size = tick.get('size')
+
+                # compute max and min prices (including bid/ask) from
+                # tick frames to determine the y-range for chart
+                # auto-scaling.
+                # TODO: we need a streaming minmax algo here, see def above.
+                mx = max(price + tick_margin, mx)
+                mn = min(price - tick_margin, mn)
+
+                if typ in clear_types:
+
+                    # XXX: if we only wanted to update graphics from the
+                    # "current"/"latest received" clearing price tick
+                    # once (see alt iteration order above).
+                    # if last_clear_updated:
+                    #     continue
+                    # last_clear_updated = True
+
+                    # we only want to update graphics from the *last*
+                    # tick event that falls under the "clearing price"
+                    # set.

                     # update price sticky(s)
                     end = array[-1]
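The tick-frame grouping added above can be exercised on its own: ticks are bucketed by type in arrival order, while a second dict keeps only the most recent tick per type so graphics are updated once per type per render pass. A minimal sketch with fabricated example ticks (field names mirror the ones used above):

from typing import Any

ticks_frame: list[dict[str, Any]] = [
    {'type': 'bid', 'price': 100.0, 'size': 5},
    {'type': 'trade', 'price': 100.5, 'size': 1},
    {'type': 'ask', 'price': 101.0, 'size': 7},
    {'type': 'trade', 'price': 100.6, 'size': 2},
]

frames_by_type: dict[str, list[dict]] = {}
lasts: dict[str, dict] = {}

for tick in ticks_frame:
    # bucket every tick under its type, preserving arrival order
    frames_by_type.setdefault(tick['type'], []).append(tick)
    # overwrite so only the newest tick per type survives
    lasts[tick['type']] = tick

assert len(frames_by_type['trade']) == 2
assert lasts['trade']['price'] == 100.6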
@@ -235,33 +396,11 @@ async def update_chart_from_quotes(
                     # update vwap overlay line
                     chart.update_curve_from_array('bar_wap', ohlcv.array)

-                # l1 book events
-                # throttle the book graphics updates at a lower rate
-                # since they aren't as critical for a manual user
-                # viewing the chart
-                elif ticktype in ('ask', 'asize'):
-                    if (now - last_ask) <= 1/_book_throttle_rate:
-                        # print(f'skipping\n{tick}')
-                        continue
-
-                    # print(f'passthrough {tick}\n{1/(now-last_ask)}')
-                    last_ask = now
-
-                elif ticktype in ('bid', 'bsize'):
-                    if (now - last_bid) <= 1/_book_throttle_rate:
-                        continue
-
-                    # print(f'passthrough {tick}\n{1/(now-last_bid)}')
-                    last_bid = now
-
-                # compute max and min trade values to display in view
-                # TODO: we need a streaming minmax algorithm here, see
-                # def above.
-
-                # XXX: prettty sure this is correct?
+                # L1 book label-line updates
+                # XXX: is this correct for ib?
                 # if ticktype in ('trade', 'last'):
-                if ticktype in ('last',):  # 'size'):
+                # if ticktype in ('last',): # 'size'):
+                if typ in ('last',):  # 'size'):

                     label = {
                         l1.ask_label.fields['level']: l1.ask_label,
@@ -271,119 +410,90 @@ async def update_chart_from_quotes(
                     if label is not None:
                         label.update_fields({'level': price, 'size': size})

-                        # on trades should we be knocking down
+                        # TODO: on trades should we be knocking down
                         # the relevant L1 queue?
                         # label.size -= size

-                elif ticktype in ('ask', 'asize'):
+                # elif ticktype in ('ask', 'asize'):
+                elif typ in ('ask', 'asize'):
                     l1.ask_label.update_fields({'level': price, 'size': size})

-                elif ticktype in ('bid', 'bsize'):
+                # elif ticktype in ('bid', 'bsize'):
+                elif typ in ('bid', 'bsize'):
                     l1.bid_label.update_fields({'level': price, 'size': size})

-                # in view y-range checking for auto-scale
-                # update the max/min price in view to keep bid/ask on screen
-                mx = max(price + tick_margin, mx)
-                mn = min(price - tick_margin, mn)
-                if (mx > last_mx) or (
-                    mn < last_mn
-                ):
-                    # print(f'new y range: {(mn, mx)}')
-                    chart._set_yrange(
-                        yrange=(mn, mx),
-                        # TODO: we should probably scale
-                        # the view margin based on the size
-                        # of the true range? This way you can
-                        # slap in orders outside the current
-                        # L1 (only) book range.
-                        # range_margin=0.1,
-                    )
+                # check for y-range re-size
+                if (mx > last_mx) or (mn < last_mn):
+                    # print(f'new y range: {(mn, mx)}')
+                    chart._set_yrange(
+                        yrange=(mn, mx),
+                        # TODO: we should probably scale
+                        # the view margin based on the size
+                        # of the true range? This way you can
+                        # slap in orders outside the current
+                        # L1 (only) book range.
+                        # range_margin=0.1,
+                    )

                     last_mx, last_mn = mx, mn

+            # run synchronous update on all derived fsp subplots
+            for name, subchart in linked.subplots.items():
+                update_fsp_chart(
+                    subchart,
+                    subchart._shm,
+
+                    # XXX: do we really need separate names here?
+                    name,
+                    array_key=name,
+                )
+
+            # TODO: all overlays on all subplots..
+            # run synchronous update on all derived overlays
+            for curve_name, shm in chart._overlays.items():
+                update_fsp_chart(
+                    chart,
+                    shm,
+                    curve_name,
+                    array_key=curve_name,
+                )
+

-async def fan_out_spawn_fsp_daemons(
-    linkedsplits: LinkedSplits,
-    fsps: dict[str, str],
+def maybe_mk_fsp_shm(
     sym: str,
-    src_shm: list,
-    brokermod: ModuleType,
-    group_status_key: str,
-    loglevel: str,
+    field_name: str,
+    display_name: Optional[str] = None,
+    readonly: bool = True,

-) -> None:
-    '''Create financial signal processing sub-actors (under flat tree)
-    for each entry in config and attach to local graphics update tasks.
-
-    Pass target entrypoint and historical data.
+) -> (ShmArray, bool):
+    '''Allocate a single row shm array for a symbol-fsp pair if none
+    exists, otherwise load the shm already existing for that token.

     '''
-    linkedsplits.focus()
-
     uid = tractor.current_actor().uid
+    if not display_name:
+        display_name = field_name

-    # spawns sub-processes which execute cpu bound FSP code
-    async with (
-        tractor.open_nursery() as n,
-        trio.open_nursery() as ln,
-    ):
-        # Currently we spawn an actor per fsp chain but
-        # likely we'll want to pool them eventually to
-        # scale horizonatlly once cores are used up.
-        for display_name, conf in fsps.items():
-            fsp_func_name = conf['fsp_func_name']
-
-            # TODO: load function here and introspect
-            # return stream type(s)
-
-            # TODO: should `index` be a required internal field?
-            fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
-
-            key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
-
-            # this is all sync currently
-            shm, opened = maybe_open_shm_array(
-                key,
-                # TODO: create entry for each time frame
-                dtype=fsp_dtype,
-                readonly=True,
-            )
-
-            # XXX: fsp may have been opened by a duplicate chart.
-            # Error for now until we figure out how to wrap fsps as
-            # "feeds".  assert opened, f"A chart for {key} likely
-            # already exists?"
-            conf['shm'] = shm
-
-            portal = await n.start_actor(
-                enable_modules=['piker.fsp._engine'],
-                name='fsp.' + display_name,
-            )
-
-            # init async
-            ln.start_soon(
-                run_fsp,
-                portal,
-                linkedsplits,
-                brokermod,
-                sym,
-                src_shm,
-                fsp_func_name,
-                display_name,
-                conf,
-                group_status_key,
-                loglevel,
-            )
-
-    # blocks here until all fsp actors complete
+    # TODO: load function here and introspect
+    # return stream type(s)
+
+    # TODO: should `index` be a required internal field?
+    fsp_dtype = np.dtype([('index', int), (field_name, float)])
+
+    key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
+
+    shm, opened = maybe_open_shm_array(
+        key,
+        # TODO: create entry for each time frame
+        dtype=fsp_dtype,
+        readonly=True,
+    )
+    return shm, opened


 @asynccontextmanager
-async def open_sidepane(
+async def open_fsp_sidepane(
     linked: LinkedSplits,
     conf: dict[str, dict[str, str]],
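``maybe_mk_fsp_shm`` above derives a per-(symbol, fsp, actor) shm key and a single-field structured dtype before delegating the actual allocation to ``maybe_open_shm_array``. A minimal sketch of just that naming and dtype step, with a made-up uid tuple standing in for ``tractor.current_actor().uid``:

import numpy as np
from typing import Optional


def fsp_shm_key_and_dtype(
    sym: str,
    field_name: str,
    uid: tuple[str, str],  # stand-in for tractor.current_actor().uid
    display_name: Optional[str] = None,
) -> tuple[str, np.dtype]:
    display_name = display_name or field_name

    # one float column per fsp output, plus the shared integer index
    fsp_dtype = np.dtype([('index', int), (field_name, float)])

    # key is unique per symbol, fsp display name and owning actor
    key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
    return key, fsp_dtype


key, dtype = fsp_shm_key_and_dtype('xbtusd', 'rsi', ('brokerd', 'some-uuid'))
print(key)    # xbtusd.fsp.rsi.brokerd.some-uuid
print(dtype)  # e.g. [('index', '<i8'), ('rsi', '<f8')]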
@@ -403,8 +513,11 @@ async def open_sidepane(
     }

     # add parameters for selection "options"
-    defaults = config.get('params', {})
-    for name, default in defaults.items():
+    params = config.get('params', {})
+    for name, config in params.items():
+
+        default = config['default_value']
+        kwargs = config.get('widget_kwargs', {})

         # add to ORM schema
         schema.update({
@@ -412,6 +525,7 @@ async def open_sidepane(
                 'label': f'**{name}**:',
                 'type': 'edit',
                 'default_value': default,
+                'kwargs': kwargs,
             },
         })
@@ -424,7 +538,7 @@ async def open_sidepane(
     FspConfig = create_model(
         'FspConfig',
         name=display_name,
-        **defaults,
+        **params,
     )
     sidepane.model = FspConfig()
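The sidepane schema above is ultimately turned into a dynamic ``pydantic`` model via ``create_model``, one field per fsp parameter. A minimal sketch of that pattern with illustrative field names and defaults (explicit ``(type, default)`` pairs are used here instead of the raw config dicts):

from pydantic import create_model

# one field per fsp parameter, e.g. rsi's lookback period
FspConfig = create_model(
    'FspConfig',
    name=(str, 'rsi'),
    period=(int, 14),
)

config = FspConfig()
print(config.name, config.period)   # rsi 14
print(config.dict())                # {'name': 'rsi', 'period': 14}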
@@ -444,16 +558,96 @@ async def open_sidepane(
     yield sidepane


-async def run_fsp(
+async def open_fspd_cluster(
+
+    linkedsplits: LinkedSplits,
+    fsps: dict[str, str],
+    sym: str,
+    src_shm: list,
+    brokermod: ModuleType,
+    group_status_key: str,
+    loglevel: str,
+
+    # this con
+    display_in_own_task: bool = False,
+
+) -> None:
+    '''Create sub-actors (under flat tree)
+    for each entry in config and attach to local graphics update tasks.
+
+    Pass target entrypoint and historical data.
+
+    '''
+    linkedsplits.focus()
+
+    # spawns sub-processes which execute cpu bound fsp work
+    # which is streamed back to this parent.
+    async with (
+        tractor.open_nursery() as n,
+        trio.open_nursery() as ln,
+    ):
+        # Currently we spawn an actor per fsp chain but
+        # likely we'll want to pool them eventually to
+        # scale horizontally once cores are used up.
+        for display_name, conf in fsps.items():
+
+            func_name = conf['func_name']
+
+            shm, opened = maybe_mk_fsp_shm(
+                sym,
+                field_name=func_name,
+                display_name=display_name,
+                readonly=True,
+            )
+
+            # XXX: fsp may have been opened by a duplicate chart.
+            # Error for now until we figure out how to wrap fsps as
+            # "feeds".  assert opened, f"A chart for {key} likely
+            # already exists?"
+
+            portal = await n.start_actor(
+                enable_modules=['piker.fsp._engine'],
+                name='fsp.' + display_name,
+            )
+
+            # init async
+            ln.start_soon(
+                partial(
+                    update_chart_from_fsp,
+
+                    portal,
+                    linkedsplits,
+                    brokermod,
+                    sym,
+                    src_shm,
+                    func_name,
+                    display_name,
+
+                    conf=conf,
+                    shm=shm,
+                    is_overlay=conf.get('overlay', False),
+
+                    group_status_key=group_status_key,
+                    loglevel=loglevel,
+                )
+            )
+
+    # blocks here until all fsp actors complete
+
+
+async def update_chart_from_fsp(
+
-    portal: tractor._portal.Portal,
+    portal: tractor.Portal,
     linkedsplits: LinkedSplits,
     brokermod: ModuleType,
     sym: str,
     src_shm: ShmArray,
-    fsp_func_name: str,
+    func_name: str,
     display_name: str,
     conf: dict[str, dict],
+    shm: ShmArray,
+    is_overlay: bool,

     group_status_key: str,
     loglevel: str,
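``trio``'s ``Nursery.start_soon()`` only forwards positional arguments, which is why the cluster spawner above wraps ``update_chart_from_fsp`` in ``functools.partial`` to bind its keyword arguments up front. A tiny self-contained illustration of that pattern (the task names and parameters are made up):

from functools import partial

import trio


async def render_task(name: str, *, rate_hz: int, overlay: bool = False) -> None:
    # pretend to do one throttled render pass per task
    await trio.sleep(1 / rate_hz)
    print(f'{name}: rate={rate_hz}Hz overlay={overlay}')


async def main() -> None:
    async with trio.open_nursery() as nursery:
        for name in ('rsi', 'vwap'):
            # start_soon() takes no **kwargs, so bind them via partial
            nursery.start_soon(
                partial(
                    render_task,
                    name,
                    rate_hz=58,
                    overlay=(name == 'vwap'),
                )
            )


trio.run(main)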
@@ -478,22 +672,17 @@ async def run_fsp(
             # name as title of sub-chart
             brokername=brokermod.name,
             src_shm_token=src_shm.token,
-            dst_shm_token=conf['shm'].token,
+            dst_shm_token=shm.token,
             symbol=sym,
-            func_name=fsp_func_name,
+            func_name=func_name,
             loglevel=loglevel,

         ) as (ctx, last_index),
         ctx.open_stream() as stream,
-        open_sidepane(
-            linkedsplits,
-            {display_name: conf},
-        ) as sidepane,
+
+        open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane,
     ):
-        shm = conf['shm']

-        if conf.get('overlay'):
+        if is_overlay:
             chart = linkedsplits.chart
             chart.draw_curve(
                 name=display_name,
@@ -501,15 +690,15 @@ async def run_fsp(
                 overlay=True,
                 color='default_light',
             )
-            last_val_sticky = None
+            # specially store ref to shm for lookup in display loop
+            chart._overlays[display_name] = shm

         else:
             chart = linkedsplits.add_plot(
                 name=display_name,
                 array=shm.array,

-                array_key=conf['fsp_func_name'],
+                array_key=conf['func_name'],
                 sidepane=sidepane,

                 # curve by default
@@ -528,30 +717,17 @@ async def run_fsp(
         # should **not** be the same sub-chart widget
         assert chart.name != linkedsplits.chart.name

-        # sticky only on sub-charts atm
-        last_val_sticky = chart._ysticks[chart.name]
+        array_key = func_name

-        # read from last calculated value
-        array = shm.array
-
-        # XXX: fsp func names must be unique meaning we don't have
-        # duplicates of the underlying data even if multiple
-        # sub-charts reference it under different 'named charts'.
-        value = array[fsp_func_name][-1]
-        last_val_sticky.update_from_data(-1, value)
-
-        chart.linked.focus()
-
-        # works also for overlays in which case data is looked up from
-        # internal chart array set....
-        chart.update_curve_from_array(
+        # first UI update, usually from shm pushed history
+        update_fsp_chart(
+            chart,
+            shm,
             display_name,
-            shm.array,
-            array_key=fsp_func_name
+            array_key=array_key,
         )

-        chart.linked.resize_sidepanes()
+        chart.linked.focus()

         # TODO: figure out if we can roll our own `FillToThreshold` to
         # get brush filled polygons for OS/OB conditions.
@@ -559,11 +735,11 @@ async def run_fsp(
         # generic fills between curve types while ``PlotCurveItem`` has
         # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
        # might be the best solution?
-        # graphics = chart.update_from_array(chart.name, array[fsp_func_name])
+        # graphics = chart.update_from_array(chart.name, array[func_name])
         # graphics.curve.setBrush(50, 50, 200, 100)
         # graphics.curve.setFillLevel(50)

-        if fsp_func_name == 'rsi':
+        if func_name == 'rsi':
             from ._lines import level_line
             # add moveable over-[sold/bought] lines
             # and labels only for the 70/30 lines
@@ -574,58 +750,39 @@ async def run_fsp(
         chart._set_yrange()

-        last = time.time()
         done()
+        chart.linked.resize_sidepanes()

-        # i = 0
         # update chart graphics
+        i = 0
+        last = time.time()
         async for value in stream:

             # chart isn't actively shown so just skip render cycle
             if chart.linked.isHidden():
-                # print(f'{i} unseen fsp cyclce')
-                # i += 1
+                print(f'{i} unseen fsp cyclce')
+                i += 1
                 continue

-            now = time.time()
-            period = now - last
-
-            # if period <= 1/30:
-            if period <= 1/_clear_throttle_rate:
-                # faster then display refresh rate
-                # print(f'fsp too fast: {1/period}')
-                continue
-
-            # TODO: provide a read sync mechanism to avoid this polling.
-            # the underlying issue is that a backfill and subsequent shm
-            # array first/last index update could result in an empty array
-            # read here since the stream is never torn down on the
-            # re-compute steps.
-            read_tries = 2
-            while read_tries > 0:
-                try:
-                    # read last
-                    array = shm.array
-                    value = array[-1][fsp_func_name]
-                    break
-                except IndexError:
-                    read_tries -= 1
-                    continue
-
-            if last_val_sticky:
-                last_val_sticky.update_from_data(-1, value)
-
-            # update graphics
-            chart.update_curve_from_array(
-                display_name,
-                array,
-                array_key=fsp_func_name,
-            )
-
-            # set time of last graphics update
-            last = time.time()
+            else:
+                now = time.time()
+                period = now - last
+
+                if period <= 1/_quote_throttle_rate:
+                    # faster then display refresh rate
+                    print(f'fsp too fast: {1/period}')
+                    continue
+
+                # run synchronous update
+                update_fsp_chart(
+                    chart,
+                    shm,
+                    display_name,
+                    array_key=func_name,
+                )
+
+                # set time of last graphics update
+                last = time.time()


 async def check_for_new_bars(feed, ohlcv, linkedsplits):
@@ -668,6 +825,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits):
             just_history=False,
         )

+        # main chart overlays
         for name in price_chart._overlays:

             price_chart.update_curve_from_array(
@@ -675,6 +833,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits):
                 price_chart._arrays[name]
             )

+        # each subplot
         for name, chart in linkedsplits.subplots.items():
             chart.update_curve_from_array(
                 chart.name,
@@ -707,12 +866,26 @@ async def maybe_open_vlm_display(
         yield
         return
     else:
-        async with open_sidepane(
+        shm, opened = maybe_mk_fsp_shm(
+            linked.symbol.key,
+            '$_vlm',
+            readonly=True,
+        )
+
+        async with open_fsp_sidepane(
             linked, {
                 'volume': {

                     'params': {
-                        'price_func': 'ohl3'
-                    }
+
+                        'price_func': {
+                            'default_value': 'ohl3',
+
+                            # tell target ``Edit`` widget to not allow
+                            # edits for now.
+                            'widget_kwargs': {'readonly': True},
+                        },
+                    },
                 }
             },
         ) as sidepane:
@@ -813,7 +986,7 @@ async def display_symbol_data(
             loglevel=loglevel,

             # 60 FPS to limit context switches
-            tick_throttle=_clear_throttle_rate,
+            tick_throttle=_quote_throttle_rate,

         ) as feed,
     ):
@@ -868,20 +1041,39 @@ async def display_symbol_data(
     # TODO: eventually we'll support some kind of n-compose syntax
     fsp_conf = {
-        # 'rsi': {
-        #     'fsp_func_name': 'rsi',
-        #     'params': {'period': 14},
-        #     'chart_kwargs': {
-        #         'static_yrange': (0, 100),
-        #     },
-        # },
+
+        'rsi': {
+
+            # literal python func ref lookup name
+            'func_name': 'rsi',
+
+            # map of parameters to place on the fsp sidepane widget
+            # which should map to dynamic inputs available to the
+            # fsp function at runtime.
+            'params': {
+                'period': {
+                    'default_value': 14,
+                    'widget_kwargs': {'readonly': True},
+                },
+            },
+
+            # ``ChartPlotWidget`` options passthrough
+            'chart_kwargs': {
+                'static_yrange': (0, 100),
+            },
+        },
     }

-    if has_vlm(ohlcv):
+    if has_vlm(ohlcv):  # and provider != 'binance':
+        # binance is too fast atm for FSPs until we wrap
+        # the fsp streams as throttled ``Feeds``, see
+        #
+
         # add VWAP to fsp config for downstream loading
         fsp_conf.update({
             'vwap': {
-                'fsp_func_name': 'vwap',
+                'func_name': 'vwap',
                 'overlay': True,
                 'anchor': 'session',
             },
@@ -900,7 +1092,7 @@ async def display_symbol_data(
     ):
         # load initial fsp chain (otherwise known as "indicators")
         ln.start_soon(
-            fan_out_spawn_fsp_daemons,
+            open_fspd_cluster,
            linkedsplits,
             fsp_conf,
             sym,
@@ -913,7 +1105,7 @@ async def display_symbol_data(
         # start graphics update loop(s) after receiving first live quote
         ln.start_soon(
             update_chart_from_quotes,
-            chart,
+            linkedsplits,
             feed.stream,
             ohlcv,
             wap_in_history,