Move FSP related graphics management into new mod
parent
b7f27f201f
commit
7a41c83f84
|
@ -29,7 +29,6 @@ from types import ModuleType
|
|||
from typing import Optional, AsyncGenerator
|
||||
|
||||
import numpy as np
|
||||
from pydantic import create_model
|
||||
import pyqtgraph as pg
|
||||
import tractor
|
||||
import trio
|
||||
|
@ -45,12 +44,17 @@ from ._chart import (
|
|||
)
|
||||
from .. import fsp
|
||||
from ._l1 import L1Labels
|
||||
from ..data._sharedmem import ShmArray, maybe_open_shm_array
|
||||
from ._fsp import (
|
||||
update_fsp_chart,
|
||||
maybe_mk_fsp_shm,
|
||||
open_fsp_sidepane,
|
||||
has_vlm,
|
||||
maybe_open_vlm_display,
|
||||
)
|
||||
from ..data._sharedmem import ShmArray, try_read
|
||||
from ._forms import (
|
||||
FieldsForm,
|
||||
mk_form,
|
||||
mk_order_pane_layout,
|
||||
open_form_input_handling,
|
||||
)
|
||||
from .order_mode import open_order_mode
|
||||
from ..log import get_logger
|
||||
|
@ -61,78 +65,6 @@ log = get_logger(__name__)
|
|||
_quote_throttle_rate: int = 58 # Hz
|
||||
|
||||
|
||||
def try_read(
|
||||
array: np.ndarray
|
||||
|
||||
) -> Optional[np.ndarray]:
|
||||
'''
|
||||
Try to read the last row from a shared mem array or ``None``
|
||||
if the array read returns a zero-length array result.
|
||||
|
||||
Can be used to check for backfilling race conditions where an array
|
||||
is currently being (re-)written by a writer actor but the reader is
|
||||
unaware and reads during the window where the first and last indexes
|
||||
are being updated.
|
||||
|
||||
'''
|
||||
try:
|
||||
return array[-1]
|
||||
except IndexError:
|
||||
# XXX: race condition with backfilling shm.
|
||||
#
|
||||
# the underlying issue is that a backfill (aka prepend) and subsequent
|
||||
# shm array first/last index update could result in an empty array
|
||||
# read here since the indices may be updated in such a way that
|
||||
# a read delivers an empty array (though it seems like we
|
||||
# *should* be able to prevent that?). also, as and alt and
|
||||
# something we need anyway, maybe there should be some kind of
|
||||
# signal that a prepend is taking place and this consumer can
|
||||
# respond (eg. redrawing graphics) accordingly.
|
||||
|
||||
# the array read was emtpy
|
||||
return None
|
||||
|
||||
|
||||
def update_fsp_chart(
|
||||
chart: ChartPlotWidget,
|
||||
shm: ShmArray,
|
||||
graphics_name: str,
|
||||
array_key: Optional[str],
|
||||
|
||||
) -> None:
|
||||
|
||||
array = shm.array
|
||||
last_row = try_read(array)
|
||||
|
||||
# guard against unreadable case
|
||||
if not last_row:
|
||||
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
|
||||
return
|
||||
|
||||
# update graphics
|
||||
# NOTE: this does a length check internally which allows it
|
||||
# staying above the last row check below..
|
||||
chart.update_curve_from_array(
|
||||
graphics_name,
|
||||
array,
|
||||
array_key=array_key or graphics_name,
|
||||
)
|
||||
chart.cv._set_yrange()
|
||||
|
||||
# XXX: re: ``array_key``: fsp func names must be unique meaning we
|
||||
# can't have duplicates of the underlying data even if multiple
|
||||
# sub-charts reference it under different 'named charts'.
|
||||
|
||||
# read from last calculated value and update any label
|
||||
last_val_sticky = chart._ysticks.get(graphics_name)
|
||||
if last_val_sticky:
|
||||
# array = shm.array[array_key]
|
||||
# if len(array):
|
||||
# value = array[-1]
|
||||
last = last_row[array_key]
|
||||
last_val_sticky.update_from_data(-1, last)
|
||||
|
||||
|
||||
# a working tick-type-classes template
|
||||
_tick_groups = {
|
||||
'clears': {'trade', 'utrade', 'last'},
|
||||
|
@ -155,7 +87,7 @@ def chart_maxmin(
|
|||
# https://arxiv.org/abs/cs/0610046
|
||||
# https://github.com/lemire/pythonmaxmin
|
||||
|
||||
array = chart._arrays[chart.name]
|
||||
array = chart._arrays['ohlc']
|
||||
ifirst = array[0]['index']
|
||||
|
||||
last_bars_range = chart.bars_range()
|
||||
|
@ -212,7 +144,6 @@ async def update_chart_from_quotes(
|
|||
|
||||
if vlm_chart:
|
||||
vlm_sticky = vlm_chart._ysticks['volume']
|
||||
vlm_view = vlm_chart.view
|
||||
|
||||
maxmin = partial(chart_maxmin, chart, vlm_chart)
|
||||
|
||||
|
@ -249,7 +180,6 @@ async def update_chart_from_quotes(
|
|||
tick_margin = 3 * tick_size
|
||||
|
||||
chart.show()
|
||||
view = chart.view
|
||||
last_quote = time.time()
|
||||
|
||||
async for quotes in stream:
|
||||
|
@ -297,10 +227,8 @@ async def update_chart_from_quotes(
|
|||
mx_vlm_in_view != last_mx_vlm or
|
||||
mx_vlm_in_view > last_mx_vlm
|
||||
):
|
||||
print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
|
||||
vlm_view._set_yrange(
|
||||
yrange=(0, mx_vlm_in_view * 1.375)
|
||||
)
|
||||
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
|
||||
vlm_chart._set_yrange(yrange=(0, mx_vlm_in_view * 1.375))
|
||||
last_mx_vlm = mx_vlm_in_view
|
||||
|
||||
ticks_frame = quote.get('ticks', ())
|
||||
|
@ -416,12 +344,9 @@ async def update_chart_from_quotes(
|
|||
l1.bid_label.update_fields({'level': price, 'size': size})
|
||||
|
||||
# check for y-range re-size
|
||||
if (
|
||||
(mx > last_mx) or (mn < last_mn)
|
||||
and not chart._static_yrange == 'axis'
|
||||
):
|
||||
print(f'new y range: {(mn, mx)}')
|
||||
view._set_yrange(
|
||||
if (mx > last_mx) or (mn < last_mn):
|
||||
# print(f'new y range: {(mn, mx)}')
|
||||
chart._set_yrange(
|
||||
yrange=(mn, mx),
|
||||
# TODO: we should probably scale
|
||||
# the view margin based on the size
|
||||
|
@ -443,7 +368,6 @@ async def update_chart_from_quotes(
|
|||
name,
|
||||
array_key=name,
|
||||
)
|
||||
subchart.cv._set_yrange()
|
||||
|
||||
# TODO: all overlays on all subplots..
|
||||
|
||||
|
@ -455,105 +379,6 @@ async def update_chart_from_quotes(
|
|||
curve_name,
|
||||
array_key=curve_name,
|
||||
)
|
||||
# chart._set_yrange()
|
||||
|
||||
|
||||
def maybe_mk_fsp_shm(
|
||||
sym: str,
|
||||
field_name: str,
|
||||
display_name: Optional[str] = None,
|
||||
readonly: bool = True,
|
||||
|
||||
) -> (ShmArray, bool):
|
||||
'''
|
||||
Allocate a single row shm array for an symbol-fsp pair if none
|
||||
exists, otherwise load the shm already existing for that token.
|
||||
|
||||
'''
|
||||
uid = tractor.current_actor().uid
|
||||
if not display_name:
|
||||
display_name = field_name
|
||||
|
||||
# TODO: load function here and introspect
|
||||
# return stream type(s)
|
||||
|
||||
# TODO: should `index` be a required internal field?
|
||||
fsp_dtype = np.dtype([('index', int), (field_name, float)])
|
||||
|
||||
key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
|
||||
|
||||
shm, opened = maybe_open_shm_array(
|
||||
key,
|
||||
# TODO: create entry for each time frame
|
||||
dtype=fsp_dtype,
|
||||
readonly=True,
|
||||
)
|
||||
return shm, opened
|
||||
|
||||
|
||||
@acm
|
||||
async def open_fsp_sidepane(
|
||||
linked: LinkedSplits,
|
||||
conf: dict[str, dict[str, str]],
|
||||
|
||||
) -> FieldsForm:
|
||||
|
||||
schema = {}
|
||||
|
||||
assert len(conf) == 1 # for now
|
||||
|
||||
# add (single) selection widget
|
||||
for display_name, config in conf.items():
|
||||
schema[display_name] = {
|
||||
'label': '**fsp**:',
|
||||
'type': 'select',
|
||||
'default_value': [display_name],
|
||||
}
|
||||
|
||||
# add parameters for selection "options"
|
||||
params = config.get('params', {})
|
||||
for name, config in params.items():
|
||||
|
||||
default = config['default_value']
|
||||
kwargs = config.get('widget_kwargs', {})
|
||||
|
||||
# add to ORM schema
|
||||
schema.update({
|
||||
name: {
|
||||
'label': f'**{name}**:',
|
||||
'type': 'edit',
|
||||
'default_value': default,
|
||||
'kwargs': kwargs,
|
||||
},
|
||||
})
|
||||
|
||||
sidepane: FieldsForm = mk_form(
|
||||
parent=linked.godwidget,
|
||||
fields_schema=schema,
|
||||
)
|
||||
|
||||
# https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation
|
||||
FspConfig = create_model(
|
||||
'FspConfig',
|
||||
name=display_name,
|
||||
**params,
|
||||
)
|
||||
sidepane.model = FspConfig()
|
||||
|
||||
# just a logger for now until we get fsp configs up and running.
|
||||
async def settings_change(key: str, value: str) -> bool:
|
||||
print(f'{key}: {value}')
|
||||
return True
|
||||
|
||||
# TODO:
|
||||
async with (
|
||||
open_form_input_handling(
|
||||
sidepane,
|
||||
focus_next=linked.godwidget,
|
||||
on_value_change=settings_change,
|
||||
)
|
||||
):
|
||||
yield sidepane
|
||||
|
||||
|
||||
@acm
|
||||
|
@ -799,7 +624,7 @@ async def update_chart_from_fsp(
|
|||
level_line(chart, 70, orient_v='bottom')
|
||||
level_line(chart, 80, orient_v='top')
|
||||
|
||||
chart.cv._set_yrange()
|
||||
chart._set_yrange()
|
||||
done() # status updates
|
||||
|
||||
profiler(f'fsp:{func_name} starting update loop')
|
||||
|
@ -908,93 +733,6 @@ async def check_for_new_bars(
|
|||
price_chart.increment_view()
|
||||
|
||||
|
||||
def has_vlm(ohlcv: ShmArray) -> bool:
|
||||
# make sure that the instrument supports volume history
|
||||
# (sometimes this is not the case for some commodities and
|
||||
# derivatives)
|
||||
volm = ohlcv.array['volume']
|
||||
return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm)))
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_vlm_display(
|
||||
linked: LinkedSplits,
|
||||
ohlcv: ShmArray,
|
||||
|
||||
) -> ChartPlotWidget:
|
||||
|
||||
if not has_vlm(ohlcv):
|
||||
log.warning(f"{linked.symbol.key} does not seem to have volume info")
|
||||
yield
|
||||
return
|
||||
else:
|
||||
|
||||
shm, opened = maybe_mk_fsp_shm(
|
||||
linked.symbol.key,
|
||||
'vlm',
|
||||
readonly=True,
|
||||
)
|
||||
|
||||
async with open_fsp_sidepane(
|
||||
linked, {
|
||||
'vlm': {
|
||||
'params': {
|
||||
'price_func': {
|
||||
'default_value': 'chl3',
|
||||
# tell target ``Edit`` widget to not allow
|
||||
# edits for now.
|
||||
'widget_kwargs': {'readonly': True},
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
) as sidepane:
|
||||
|
||||
# built-in $vlm
|
||||
shm = ohlcv
|
||||
chart = linked.add_plot(
|
||||
name='volume',
|
||||
array=shm.array,
|
||||
|
||||
array_key='volume',
|
||||
sidepane=sidepane,
|
||||
|
||||
# curve by default
|
||||
ohlc=False,
|
||||
|
||||
# Draw vertical bars from zero.
|
||||
# we do this internally ourselves since
|
||||
# the curve item internals are pretty convoluted.
|
||||
style='step',
|
||||
)
|
||||
|
||||
# XXX: ONLY for sub-chart fsps, overlays have their
|
||||
# data looked up from the chart's internal array set.
|
||||
# TODO: we must get a data view api going STAT!!
|
||||
chart._shm = shm
|
||||
|
||||
# should **not** be the same sub-chart widget
|
||||
assert chart.name != linked.chart.name
|
||||
|
||||
# sticky only on sub-charts atm
|
||||
last_val_sticky = chart._ysticks[chart.name]
|
||||
|
||||
# read from last calculated value
|
||||
value = shm.array['volume'][-1]
|
||||
|
||||
last_val_sticky.update_from_data(-1, value)
|
||||
|
||||
chart.update_curve_from_array(
|
||||
'volume',
|
||||
shm.array,
|
||||
)
|
||||
|
||||
# size view to data once at outset
|
||||
chart.cv._set_yrange()
|
||||
|
||||
yield chart
|
||||
|
||||
|
||||
async def display_symbol_data(
|
||||
godwidget: GodWidget,
|
||||
provider: str,
|
||||
|
@ -1079,7 +817,7 @@ async def display_symbol_data(
|
|||
)
|
||||
|
||||
# size view to data once at outset
|
||||
chart.cv._set_yrange()
|
||||
chart._set_yrange()
|
||||
|
||||
# TODO: a data view api that makes this less shit
|
||||
chart._shm = ohlcv
|
||||
|
|
|
@ -0,0 +1,272 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
FSP UI and graphics components.
|
||||
|
||||
Financial signal processing cluster and real-time graphics management.
|
||||
|
||||
'''
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
from pydantic import create_model
|
||||
import tractor
|
||||
|
||||
from ..data._sharedmem import (
|
||||
ShmArray,
|
||||
maybe_open_shm_array,
|
||||
try_read,
|
||||
)
|
||||
from ._chart import (
|
||||
ChartPlotWidget,
|
||||
LinkedSplits,
|
||||
# GodWidget,
|
||||
)
|
||||
from ._forms import (
|
||||
FieldsForm,
|
||||
mk_form,
|
||||
open_form_input_handling,
|
||||
)
|
||||
from ..log import get_logger
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
def maybe_mk_fsp_shm(
|
||||
sym: str,
|
||||
field_name: str,
|
||||
display_name: Optional[str] = None,
|
||||
readonly: bool = True,
|
||||
|
||||
) -> (ShmArray, bool):
|
||||
'''
|
||||
Allocate a single row shm array for an symbol-fsp pair if none
|
||||
exists, otherwise load the shm already existing for that token.
|
||||
|
||||
'''
|
||||
uid = tractor.current_actor().uid
|
||||
if not display_name:
|
||||
display_name = field_name
|
||||
|
||||
# TODO: load function here and introspect
|
||||
# return stream type(s)
|
||||
|
||||
# TODO: should `index` be a required internal field?
|
||||
fsp_dtype = np.dtype([('index', int), (field_name, float)])
|
||||
|
||||
key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
|
||||
|
||||
shm, opened = maybe_open_shm_array(
|
||||
key,
|
||||
# TODO: create entry for each time frame
|
||||
dtype=fsp_dtype,
|
||||
readonly=True,
|
||||
)
|
||||
return shm, opened
|
||||
|
||||
|
||||
def has_vlm(ohlcv: ShmArray) -> bool:
|
||||
# make sure that the instrument supports volume history
|
||||
# (sometimes this is not the case for some commodities and
|
||||
# derivatives)
|
||||
volm = ohlcv.array['volume']
|
||||
return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm)))
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_vlm_display(
|
||||
linked: LinkedSplits,
|
||||
ohlcv: ShmArray,
|
||||
|
||||
) -> ChartPlotWidget:
|
||||
|
||||
if not has_vlm(ohlcv):
|
||||
log.warning(f"{linked.symbol.key} does not seem to have volume info")
|
||||
yield
|
||||
return
|
||||
else:
|
||||
|
||||
shm, opened = maybe_mk_fsp_shm(
|
||||
linked.symbol.key,
|
||||
'vlm',
|
||||
readonly=True,
|
||||
)
|
||||
|
||||
async with open_fsp_sidepane(
|
||||
linked, {
|
||||
'vlm': {
|
||||
'params': {
|
||||
'price_func': {
|
||||
'default_value': 'chl3',
|
||||
# tell target ``Edit`` widget to not allow
|
||||
# edits for now.
|
||||
'widget_kwargs': {'readonly': True},
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
) as sidepane:
|
||||
|
||||
# built-in $vlm
|
||||
shm = ohlcv
|
||||
chart = linked.add_plot(
|
||||
name='volume',
|
||||
array=shm.array,
|
||||
|
||||
array_key='volume',
|
||||
sidepane=sidepane,
|
||||
|
||||
# curve by default
|
||||
ohlc=False,
|
||||
|
||||
# Draw vertical bars from zero.
|
||||
# we do this internally ourselves since
|
||||
# the curve item internals are pretty convoluted.
|
||||
style='step',
|
||||
)
|
||||
|
||||
# XXX: ONLY for sub-chart fsps, overlays have their
|
||||
# data looked up from the chart's internal array set.
|
||||
# TODO: we must get a data view api going STAT!!
|
||||
chart._shm = shm
|
||||
|
||||
# should **not** be the same sub-chart widget
|
||||
assert chart.name != linked.chart.name
|
||||
|
||||
# sticky only on sub-charts atm
|
||||
last_val_sticky = chart._ysticks[chart.name]
|
||||
|
||||
# read from last calculated value
|
||||
value = shm.array['volume'][-1]
|
||||
|
||||
last_val_sticky.update_from_data(-1, value)
|
||||
|
||||
chart.update_curve_from_array(
|
||||
'volume',
|
||||
shm.array,
|
||||
)
|
||||
|
||||
# size view to data once at outset
|
||||
chart._set_yrange()
|
||||
|
||||
yield chart
|
||||
|
||||
|
||||
def update_fsp_chart(
|
||||
chart: ChartPlotWidget,
|
||||
shm: ShmArray,
|
||||
graphics_name: str,
|
||||
array_key: Optional[str],
|
||||
|
||||
) -> None:
|
||||
|
||||
array = shm.array
|
||||
last_row = try_read(array)
|
||||
|
||||
# guard against unreadable case
|
||||
if not last_row:
|
||||
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
|
||||
return
|
||||
|
||||
# update graphics
|
||||
# NOTE: this does a length check internally which allows it
|
||||
# staying above the last row check below..
|
||||
chart.update_curve_from_array(
|
||||
graphics_name,
|
||||
array,
|
||||
array_key=array_key or graphics_name,
|
||||
)
|
||||
chart._set_yrange()
|
||||
|
||||
# XXX: re: ``array_key``: fsp func names must be unique meaning we
|
||||
# can't have duplicates of the underlying data even if multiple
|
||||
# sub-charts reference it under different 'named charts'.
|
||||
|
||||
# read from last calculated value and update any label
|
||||
last_val_sticky = chart._ysticks.get(graphics_name)
|
||||
if last_val_sticky:
|
||||
# array = shm.array[array_key]
|
||||
# if len(array):
|
||||
# value = array[-1]
|
||||
last = last_row[array_key]
|
||||
last_val_sticky.update_from_data(-1, last)
|
||||
|
||||
|
||||
@acm
|
||||
async def open_fsp_sidepane(
|
||||
linked: LinkedSplits,
|
||||
conf: dict[str, dict[str, str]],
|
||||
|
||||
) -> FieldsForm:
|
||||
|
||||
schema = {}
|
||||
|
||||
assert len(conf) == 1 # for now
|
||||
|
||||
# add (single) selection widget
|
||||
for display_name, config in conf.items():
|
||||
schema[display_name] = {
|
||||
'label': '**fsp**:',
|
||||
'type': 'select',
|
||||
'default_value': [display_name],
|
||||
}
|
||||
|
||||
# add parameters for selection "options"
|
||||
params = config.get('params', {})
|
||||
for name, config in params.items():
|
||||
|
||||
default = config['default_value']
|
||||
kwargs = config.get('widget_kwargs', {})
|
||||
|
||||
# add to ORM schema
|
||||
schema.update({
|
||||
name: {
|
||||
'label': f'**{name}**:',
|
||||
'type': 'edit',
|
||||
'default_value': default,
|
||||
'kwargs': kwargs,
|
||||
},
|
||||
})
|
||||
|
||||
sidepane: FieldsForm = mk_form(
|
||||
parent=linked.godwidget,
|
||||
fields_schema=schema,
|
||||
)
|
||||
|
||||
# https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation
|
||||
FspConfig = create_model(
|
||||
'FspConfig',
|
||||
name=display_name,
|
||||
**params,
|
||||
)
|
||||
sidepane.model = FspConfig()
|
||||
|
||||
# just a logger for now until we get fsp configs up and running.
|
||||
async def settings_change(key: str, value: str) -> bool:
|
||||
print(f'{key}: {value}')
|
||||
return True
|
||||
|
||||
# TODO:
|
||||
async with (
|
||||
open_form_input_handling(
|
||||
sidepane,
|
||||
focus_next=linked.godwidget,
|
||||
on_value_change=settings_change,
|
||||
)
|
||||
):
|
||||
yield sidepane
|
Loading…
Reference in New Issue