Factor FSP subplot update code into func

This is in prep toward doing fsp graphics updates from the main quotes
update loop (where OHLC and volume are done). Updating fsp output from
that task should, for the majority of cases, be fine presuming the
processing is derived from the quote stream as a source. Further,
calling an update function on each fsp subplot/overlay is of course
faster then a full task switch - which is how it currently works with
a separate stream for every fsp output. This also will let us delay
adding full `Feed` support around fsp streams for the moment while still
getting quote throttling dictated by the quote stream.

Going forward, We can still support a separate task/fsp stream for
updates as needed (ex. some kind of fast external data source that isn't
synced with price data) but it should be enabled as needed required by
the user.
windows_testing_volume
Tyler Goodlet 2021-09-26 12:26:09 -04:00
parent 6751840568
commit 549ff4ef11
1 changed files with 161 additions and 139 deletions

View File

@ -20,6 +20,7 @@ Real-time display tasks for charting / graphics.
''' '''
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
# from pprint import pformat # from pprint import pformat
from functools import partial
import time import time
from types import ModuleType from types import ModuleType
from typing import Optional from typing import Optional
@ -341,76 +342,6 @@ def maybe_mk_fsp_shm(
return shm, opened return shm, opened
async def open_fspd_cluster(
linkedsplits: LinkedSplits,
fsps: dict[str, str],
sym: str,
src_shm: list,
brokermod: ModuleType,
group_status_key: str,
loglevel: str,
) -> None:
'''Create financial signal processing sub-actors (under flat tree)
for each entry in config and attach to local graphics update tasks.
Pass target entrypoint and historical data.
'''
linkedsplits.focus()
# spawns sub-processes which execute cpu bound fsp work
# which is streamed back to this parent.
async with (
tractor.open_nursery() as n,
trio.open_nursery() as ln,
):
# Currently we spawn an actor per fsp chain but
# likely we'll want to pool them eventually to
# scale horizonatlly once cores are used up.
for display_name, conf in fsps.items():
func_name = conf['func_name']
shm, opened = maybe_mk_fsp_shm(
sym,
field_name=func_name,
display_name=display_name,
readonly=True,
)
# XXX: fsp may have been opened by a duplicate chart.
# Error for now until we figure out how to wrap fsps as
# "feeds". assert opened, f"A chart for {key} likely
# already exists?"
conf['shm'] = shm
portal = await n.start_actor(
enable_modules=['piker.fsp._engine'],
name='fsp.' + display_name,
)
# init async
ln.start_soon(
update_chart_from_fsp,
portal,
linkedsplits,
brokermod,
sym,
src_shm,
func_name,
display_name,
conf,
group_status_key,
loglevel,
)
# blocks here until all fsp actors complete
@asynccontextmanager @asynccontextmanager
async def open_fsp_sidepane( async def open_fsp_sidepane(
@ -477,6 +408,84 @@ async def open_fsp_sidepane(
yield sidepane yield sidepane
async def open_fspd_cluster(
linkedsplits: LinkedSplits,
fsps: dict[str, str],
sym: str,
src_shm: list,
brokermod: ModuleType,
group_status_key: str,
loglevel: str,
# this con
display_in_own_task: bool = False,
) -> None:
'''Create sub-actors (under flat tree)
for each entry in config and attach to local graphics update tasks.
Pass target entrypoint and historical data.
'''
linkedsplits.focus()
# spawns sub-processes which execute cpu bound fsp work
# which is streamed back to this parent.
async with (
tractor.open_nursery() as n,
trio.open_nursery() as ln,
):
# Currently we spawn an actor per fsp chain but
# likely we'll want to pool them eventually to
# scale horizonatlly once cores are used up.
for display_name, conf in fsps.items():
func_name = conf['func_name']
shm, opened = maybe_mk_fsp_shm(
sym,
field_name=func_name,
display_name=display_name,
readonly=True,
)
# XXX: fsp may have been opened by a duplicate chart.
# Error for now until we figure out how to wrap fsps as
# "feeds". assert opened, f"A chart for {key} likely
# already exists?"
# conf['shm'] = shm
portal = await n.start_actor(
enable_modules=['piker.fsp._engine'],
name='fsp.' + display_name,
)
# init async
ln.start_soon(
partial(
update_chart_from_fsp,
portal,
linkedsplits,
brokermod,
sym,
src_shm,
func_name,
display_name,
conf=conf,
shm=shm,
is_overlay=conf.get('overlay', False),
group_status_key=group_status_key,
loglevel=loglevel,
)
)
# blocks here until all fsp actors complete
async def update_chart_from_fsp( async def update_chart_from_fsp(
portal: tractor.Portal, portal: tractor.Portal,
@ -488,6 +497,10 @@ async def update_chart_from_fsp(
func_name: str, func_name: str,
display_name: str, display_name: str,
conf: dict[str, dict], conf: dict[str, dict],
shm: ShmArray,
is_overlay: bool,
group_status_key: str, group_status_key: str,
loglevel: str, loglevel: str,
@ -512,7 +525,7 @@ async def update_chart_from_fsp(
# name as title of sub-chart # name as title of sub-chart
brokername=brokermod.name, brokername=brokermod.name,
src_shm_token=src_shm.token, src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token, dst_shm_token=shm.token,
symbol=sym, symbol=sym,
func_name=func_name, func_name=func_name,
loglevel=loglevel, loglevel=loglevel,
@ -520,14 +533,9 @@ async def update_chart_from_fsp(
) as (ctx, last_index), ) as (ctx, last_index),
ctx.open_stream() as stream, ctx.open_stream() as stream,
open_fsp_sidepane( open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane,
linkedsplits,
{display_name: conf},
) as sidepane,
): ):
shm = conf['shm'] if is_overlay:
if conf.get('overlay'):
chart = linkedsplits.chart chart = linkedsplits.chart
chart.draw_curve( chart.draw_curve(
name=display_name, name=display_name,
@ -535,10 +543,8 @@ async def update_chart_from_fsp(
overlay=True, overlay=True,
color='default_light', color='default_light',
) )
last_val_sticky = None
else: else:
chart = linkedsplits.add_plot( chart = linkedsplits.add_plot(
name=display_name, name=display_name,
array=shm.array, array=shm.array,
@ -562,30 +568,17 @@ async def update_chart_from_fsp(
# should **not** be the same sub-chart widget # should **not** be the same sub-chart widget
assert chart.name != linkedsplits.chart.name assert chart.name != linkedsplits.chart.name
# sticky only on sub-charts atm
last_val_sticky = chart._ysticks[chart.name]
# read from last calculated value
array = shm.array
# XXX: fsp func names must be unique meaning we don't have
# duplicates of the underlying data even if multiple
# sub-charts reference it under different 'named charts'.
value = array[func_name][-1]
last_val_sticky.update_from_data(-1, value)
chart.linked.focus()
# works also for overlays in which case data is looked up from
# internal chart array set....
chart.update_curve_from_array(
display_name,
shm.array,
array_key = func_name array_key = func_name
# first UI update, usually from shm pushed history
update_fsp_chart(
chart,
shm,
display_name,
array_key=array_key,
) )
chart.linked.resize_sidepanes() chart.linked.focus()
# TODO: figure out if we can roll our own `FillToThreshold` to # TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions. # get brush filled polygons for OS/OB conditions.
@ -608,53 +601,34 @@ async def update_chart_from_fsp(
chart._set_yrange() chart._set_yrange()
last = time.time()
done() done()
chart.linked.resize_sidepanes()
# i = 0
# update chart graphics # update chart graphics
i = 0
last = time.time()
async for value in stream: async for value in stream:
# chart isn't actively shown so just skip render cycle # chart isn't actively shown so just skip render cycle
if chart.linked.isHidden(): if chart.linked.isHidden():
# print(f'{i} unseen fsp cyclce') print(f'{i} unseen fsp cyclce')
# i += 1 i += 1
continue continue
else:
now = time.time() now = time.time()
period = now - last period = now - last
# if period <= 1/30:
if period <= 1/_clear_throttle_rate: if period <= 1/_clear_throttle_rate:
# faster then display refresh rate # faster then display refresh rate
# print(f'fsp too fast: {1/period}') print(f'fsp too fast: {1/period}')
continue continue
# TODO: provide a read sync mechanism to avoid this polling. # run synchronous update
# the underlying issue is that a backfill and subsequent shm update_fsp_chart(
# array first/last index update could result in an empty array chart,
# read here since the stream is never torn down on the shm,
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky:
last_val_sticky.update_from_data(-1, value)
# update graphics
chart.update_curve_from_array(
display_name, display_name,
array,
array_key=func_name, array_key=func_name,
) )
@ -662,6 +636,52 @@ async def update_chart_from_fsp(
last = time.time() last = time.time()
def update_fsp_chart(
chart: ChartPlotWidget,
shm: ShmArray,
display_name: str,
array_key: str,
) -> None:
array = shm.array
# XXX: is this a problem any more after porting to the
# ``tractor.Context`` api?
# TODO: provide a read sync mechanism to avoid this polling. the
# underlying issue is that a backfill (aka prepend) and subsequent
# shm array first/last index update could result in an empty array
# read here since the stream is never torn down on the re-compute
# steps.
# read_tries = 2
# while read_tries > 0:
# try:
# # read last
# array = shm.array
# value = array[-1][array_key]
# break
# except IndexError:
# read_tries -= 1
# continue
# update graphics
chart.update_curve_from_array(
display_name,
array,
array_key=array_key,
)
last_val_sticky = chart._ysticks.get(display_name)
if last_val_sticky:
# read from last calculated value
# XXX: fsp func names must be unique meaning we don't have
# duplicates of the underlying data even if multiple
# sub-charts reference it under different 'named charts'.
value = shm.array[array_key][-1]
last_val_sticky.update_from_data(-1, value)
async def check_for_new_bars(feed, ohlcv, linkedsplits): async def check_for_new_bars(feed, ohlcv, linkedsplits):
"""Task which updates from new bars in the shared ohlcv buffer every """Task which updates from new bars in the shared ohlcv buffer every
``delay_s`` seconds. ``delay_s`` seconds.
@ -702,6 +722,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits):
just_history=False, just_history=False,
) )
# main chart overlays
for name in price_chart._overlays: for name in price_chart._overlays:
price_chart.update_curve_from_array( price_chart.update_curve_from_array(
@ -709,6 +730,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits):
price_chart._arrays[name] price_chart._arrays[name]
) )
# each subplot
for name, chart in linkedsplits.subplots.items(): for name, chart in linkedsplits.subplots.items():
chart.update_curve_from_array( chart.update_curve_from_array(
chart.name, chart.name,