Factor FSP subplot update code into func
This is in prep toward doing fsp graphics updates from the main quotes update loop (where OHLC and volume are done). Updating fsp output from that task should, for the majority of cases, be fine presuming the processing is derived from the quote stream as a source. Further, calling an update function on each fsp subplot/overlay is of course faster then a full task switch - which is how it currently works with a separate stream for every fsp output. This also will let us delay adding full `Feed` support around fsp streams for the moment while still getting quote throttling dictated by the quote stream. Going forward, We can still support a separate task/fsp stream for updates as needed (ex. some kind of fast external data source that isn't synced with price data) but it should be enabled as needed required by the user.single_display_update_loop
parent
4ea42a0a7e
commit
96937829eb
|
@ -20,6 +20,7 @@ Real-time display tasks for charting / graphics.
|
|||
'''
|
||||
from contextlib import asynccontextmanager
|
||||
# from pprint import pformat
|
||||
from functools import partial
|
||||
import time
|
||||
from types import ModuleType
|
||||
from typing import Optional
|
||||
|
@ -341,76 +342,6 @@ def maybe_mk_fsp_shm(
|
|||
return shm, opened
|
||||
|
||||
|
||||
async def open_fspd_cluster(
|
||||
|
||||
linkedsplits: LinkedSplits,
|
||||
fsps: dict[str, str],
|
||||
sym: str,
|
||||
src_shm: list,
|
||||
brokermod: ModuleType,
|
||||
group_status_key: str,
|
||||
loglevel: str,
|
||||
|
||||
) -> None:
|
||||
'''Create financial signal processing sub-actors (under flat tree)
|
||||
for each entry in config and attach to local graphics update tasks.
|
||||
|
||||
Pass target entrypoint and historical data.
|
||||
|
||||
'''
|
||||
linkedsplits.focus()
|
||||
|
||||
# spawns sub-processes which execute cpu bound fsp work
|
||||
# which is streamed back to this parent.
|
||||
async with (
|
||||
tractor.open_nursery() as n,
|
||||
trio.open_nursery() as ln,
|
||||
):
|
||||
|
||||
# Currently we spawn an actor per fsp chain but
|
||||
# likely we'll want to pool them eventually to
|
||||
# scale horizonatlly once cores are used up.
|
||||
for display_name, conf in fsps.items():
|
||||
|
||||
func_name = conf['func_name']
|
||||
|
||||
shm, opened = maybe_mk_fsp_shm(
|
||||
sym,
|
||||
field_name=func_name,
|
||||
display_name=display_name,
|
||||
readonly=True,
|
||||
)
|
||||
|
||||
# XXX: fsp may have been opened by a duplicate chart.
|
||||
# Error for now until we figure out how to wrap fsps as
|
||||
# "feeds". assert opened, f"A chart for {key} likely
|
||||
# already exists?"
|
||||
|
||||
conf['shm'] = shm
|
||||
|
||||
portal = await n.start_actor(
|
||||
enable_modules=['piker.fsp._engine'],
|
||||
name='fsp.' + display_name,
|
||||
)
|
||||
|
||||
# init async
|
||||
ln.start_soon(
|
||||
update_chart_from_fsp,
|
||||
portal,
|
||||
linkedsplits,
|
||||
brokermod,
|
||||
sym,
|
||||
src_shm,
|
||||
func_name,
|
||||
display_name,
|
||||
conf,
|
||||
group_status_key,
|
||||
loglevel,
|
||||
)
|
||||
|
||||
# blocks here until all fsp actors complete
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def open_fsp_sidepane(
|
||||
|
||||
|
@ -477,6 +408,84 @@ async def open_fsp_sidepane(
|
|||
yield sidepane
|
||||
|
||||
|
||||
async def open_fspd_cluster(
|
||||
|
||||
linkedsplits: LinkedSplits,
|
||||
fsps: dict[str, str],
|
||||
sym: str,
|
||||
src_shm: list,
|
||||
brokermod: ModuleType,
|
||||
group_status_key: str,
|
||||
loglevel: str,
|
||||
|
||||
# this con
|
||||
display_in_own_task: bool = False,
|
||||
|
||||
) -> None:
|
||||
'''Create sub-actors (under flat tree)
|
||||
for each entry in config and attach to local graphics update tasks.
|
||||
|
||||
Pass target entrypoint and historical data.
|
||||
|
||||
'''
|
||||
linkedsplits.focus()
|
||||
|
||||
# spawns sub-processes which execute cpu bound fsp work
|
||||
# which is streamed back to this parent.
|
||||
async with (
|
||||
tractor.open_nursery() as n,
|
||||
trio.open_nursery() as ln,
|
||||
):
|
||||
|
||||
# Currently we spawn an actor per fsp chain but
|
||||
# likely we'll want to pool them eventually to
|
||||
# scale horizonatlly once cores are used up.
|
||||
for display_name, conf in fsps.items():
|
||||
|
||||
func_name = conf['func_name']
|
||||
|
||||
shm, opened = maybe_mk_fsp_shm(
|
||||
sym,
|
||||
field_name=func_name,
|
||||
display_name=display_name,
|
||||
readonly=True,
|
||||
)
|
||||
|
||||
# XXX: fsp may have been opened by a duplicate chart.
|
||||
# Error for now until we figure out how to wrap fsps as
|
||||
# "feeds". assert opened, f"A chart for {key} likely
|
||||
# already exists?"
|
||||
|
||||
# conf['shm'] = shm
|
||||
|
||||
portal = await n.start_actor(
|
||||
enable_modules=['piker.fsp._engine'],
|
||||
name='fsp.' + display_name,
|
||||
)
|
||||
|
||||
# init async
|
||||
ln.start_soon(
|
||||
partial(
|
||||
update_chart_from_fsp,
|
||||
|
||||
portal,
|
||||
linkedsplits,
|
||||
brokermod,
|
||||
sym,
|
||||
src_shm,
|
||||
func_name,
|
||||
display_name,
|
||||
conf=conf,
|
||||
shm=shm,
|
||||
is_overlay=conf.get('overlay', False),
|
||||
group_status_key=group_status_key,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
)
|
||||
|
||||
# blocks here until all fsp actors complete
|
||||
|
||||
|
||||
async def update_chart_from_fsp(
|
||||
|
||||
portal: tractor.Portal,
|
||||
|
@ -488,6 +497,10 @@ async def update_chart_from_fsp(
|
|||
func_name: str,
|
||||
display_name: str,
|
||||
conf: dict[str, dict],
|
||||
|
||||
shm: ShmArray,
|
||||
is_overlay: bool,
|
||||
|
||||
group_status_key: str,
|
||||
loglevel: str,
|
||||
|
||||
|
@ -512,7 +525,7 @@ async def update_chart_from_fsp(
|
|||
# name as title of sub-chart
|
||||
brokername=brokermod.name,
|
||||
src_shm_token=src_shm.token,
|
||||
dst_shm_token=conf['shm'].token,
|
||||
dst_shm_token=shm.token,
|
||||
symbol=sym,
|
||||
func_name=func_name,
|
||||
loglevel=loglevel,
|
||||
|
@ -520,14 +533,9 @@ async def update_chart_from_fsp(
|
|||
) as (ctx, last_index),
|
||||
ctx.open_stream() as stream,
|
||||
|
||||
open_fsp_sidepane(
|
||||
linkedsplits,
|
||||
{display_name: conf},
|
||||
) as sidepane,
|
||||
open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane,
|
||||
):
|
||||
shm = conf['shm']
|
||||
|
||||
if conf.get('overlay'):
|
||||
if is_overlay:
|
||||
chart = linkedsplits.chart
|
||||
chart.draw_curve(
|
||||
name=display_name,
|
||||
|
@ -535,10 +543,8 @@ async def update_chart_from_fsp(
|
|||
overlay=True,
|
||||
color='default_light',
|
||||
)
|
||||
last_val_sticky = None
|
||||
|
||||
else:
|
||||
|
||||
chart = linkedsplits.add_plot(
|
||||
name=display_name,
|
||||
array=shm.array,
|
||||
|
@ -562,30 +568,17 @@ async def update_chart_from_fsp(
|
|||
# should **not** be the same sub-chart widget
|
||||
assert chart.name != linkedsplits.chart.name
|
||||
|
||||
# sticky only on sub-charts atm
|
||||
last_val_sticky = chart._ysticks[chart.name]
|
||||
array_key = func_name
|
||||
|
||||
# read from last calculated value
|
||||
array = shm.array
|
||||
|
||||
# XXX: fsp func names must be unique meaning we don't have
|
||||
# duplicates of the underlying data even if multiple
|
||||
# sub-charts reference it under different 'named charts'.
|
||||
value = array[func_name][-1]
|
||||
|
||||
last_val_sticky.update_from_data(-1, value)
|
||||
|
||||
chart.linked.focus()
|
||||
|
||||
# works also for overlays in which case data is looked up from
|
||||
# internal chart array set....
|
||||
chart.update_curve_from_array(
|
||||
# first UI update, usually from shm pushed history
|
||||
update_fsp_chart(
|
||||
chart,
|
||||
shm,
|
||||
display_name,
|
||||
shm.array,
|
||||
array_key=func_name
|
||||
array_key=array_key,
|
||||
)
|
||||
|
||||
chart.linked.resize_sidepanes()
|
||||
chart.linked.focus()
|
||||
|
||||
# TODO: figure out if we can roll our own `FillToThreshold` to
|
||||
# get brush filled polygons for OS/OB conditions.
|
||||
|
@ -608,53 +601,34 @@ async def update_chart_from_fsp(
|
|||
|
||||
chart._set_yrange()
|
||||
|
||||
last = time.time()
|
||||
|
||||
done()
|
||||
chart.linked.resize_sidepanes()
|
||||
|
||||
# i = 0
|
||||
# update chart graphics
|
||||
i = 0
|
||||
last = time.time()
|
||||
async for value in stream:
|
||||
|
||||
# chart isn't actively shown so just skip render cycle
|
||||
if chart.linked.isHidden():
|
||||
# print(f'{i} unseen fsp cyclce')
|
||||
# i += 1
|
||||
print(f'{i} unseen fsp cyclce')
|
||||
i += 1
|
||||
continue
|
||||
|
||||
else:
|
||||
now = time.time()
|
||||
period = now - last
|
||||
|
||||
# if period <= 1/30:
|
||||
if period <= 1/_clear_throttle_rate:
|
||||
# faster then display refresh rate
|
||||
# print(f'fsp too fast: {1/period}')
|
||||
print(f'fsp too fast: {1/period}')
|
||||
continue
|
||||
|
||||
# TODO: provide a read sync mechanism to avoid this polling.
|
||||
# the underlying issue is that a backfill and subsequent shm
|
||||
# array first/last index update could result in an empty array
|
||||
# read here since the stream is never torn down on the
|
||||
# re-compute steps.
|
||||
read_tries = 2
|
||||
while read_tries > 0:
|
||||
try:
|
||||
# read last
|
||||
array = shm.array
|
||||
value = array[-1][func_name]
|
||||
break
|
||||
|
||||
except IndexError:
|
||||
read_tries -= 1
|
||||
continue
|
||||
|
||||
if last_val_sticky:
|
||||
last_val_sticky.update_from_data(-1, value)
|
||||
|
||||
# update graphics
|
||||
chart.update_curve_from_array(
|
||||
# run synchronous update
|
||||
update_fsp_chart(
|
||||
chart,
|
||||
shm,
|
||||
display_name,
|
||||
array,
|
||||
array_key=func_name,
|
||||
)
|
||||
|
||||
|
@ -662,6 +636,52 @@ async def update_chart_from_fsp(
|
|||
last = time.time()
|
||||
|
||||
|
||||
def update_fsp_chart(
|
||||
chart: ChartPlotWidget,
|
||||
shm: ShmArray,
|
||||
display_name: str,
|
||||
array_key: str,
|
||||
|
||||
) -> None:
|
||||
|
||||
array = shm.array
|
||||
|
||||
# XXX: is this a problem any more after porting to the
|
||||
# ``tractor.Context`` api?
|
||||
# TODO: provide a read sync mechanism to avoid this polling. the
|
||||
# underlying issue is that a backfill (aka prepend) and subsequent
|
||||
# shm array first/last index update could result in an empty array
|
||||
# read here since the stream is never torn down on the re-compute
|
||||
# steps.
|
||||
# read_tries = 2
|
||||
# while read_tries > 0:
|
||||
# try:
|
||||
# # read last
|
||||
# array = shm.array
|
||||
# value = array[-1][array_key]
|
||||
# break
|
||||
|
||||
# except IndexError:
|
||||
# read_tries -= 1
|
||||
# continue
|
||||
|
||||
# update graphics
|
||||
chart.update_curve_from_array(
|
||||
display_name,
|
||||
array,
|
||||
array_key=array_key,
|
||||
)
|
||||
|
||||
last_val_sticky = chart._ysticks.get(display_name)
|
||||
if last_val_sticky:
|
||||
# read from last calculated value
|
||||
# XXX: fsp func names must be unique meaning we don't have
|
||||
# duplicates of the underlying data even if multiple
|
||||
# sub-charts reference it under different 'named charts'.
|
||||
value = shm.array[array_key][-1]
|
||||
last_val_sticky.update_from_data(-1, value)
|
||||
|
||||
|
||||
async def check_for_new_bars(feed, ohlcv, linkedsplits):
|
||||
"""Task which updates from new bars in the shared ohlcv buffer every
|
||||
``delay_s`` seconds.
|
||||
|
@ -702,6 +722,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits):
|
|||
just_history=False,
|
||||
)
|
||||
|
||||
# main chart overlays
|
||||
for name in price_chart._overlays:
|
||||
|
||||
price_chart.update_curve_from_array(
|
||||
|
@ -709,6 +730,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits):
|
|||
price_chart._arrays[name]
|
||||
)
|
||||
|
||||
# each subplot
|
||||
for name, chart in linkedsplits.subplots.items():
|
||||
chart.update_curve_from_array(
|
||||
chart.name,
|
||||
|
|
Loading…
Reference in New Issue