More prep for FSP feeds

The major change is moving the fsp "daemon" (more like wanna-be fspd)
endpoint to use the newer `tractor.Portal.open_context()` and
bi-directional streaming api.

There's a few other things in here too:
- make a helper for allocating single colume fsp shm arrays
- rename some some fsp related functions to be more explicit on their
  purposes
windows_testing_volume
Tyler Goodlet 2021-09-24 18:17:15 -04:00
parent d0bad2e98e
commit 6751840568
1 changed files with 114 additions and 51 deletions

View File

@ -53,6 +53,7 @@ from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
# TODO: load these from a config.toml!
_clear_throttle_rate: int = 58 # Hz _clear_throttle_rate: int = 58 # Hz
_book_throttle_rate: int = 16 # Hz _book_throttle_rate: int = 16 # Hz
@ -184,6 +185,13 @@ async def update_chart_from_quotes(
if vlm_chart: if vlm_chart:
# print(f"volume: {end['volume']}") # print(f"volume: {end['volume']}")
vlm_chart.update_curve_from_array('volume', array) vlm_chart.update_curve_from_array('volume', array)
# built-in tina $vlm FSP using chl3 typical price for ohlc step
# last = array[-1]
# chl3 = (last['close'] + last['high'] + last['low']) / 3
# v = last['volume']
# dv = last['volume'] * chl3
vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
if ( if (
@ -302,7 +310,38 @@ async def update_chart_from_quotes(
last_mx, last_mn = mx, mn last_mx, last_mn = mx, mn
async def fan_out_spawn_fsp_daemons( def maybe_mk_fsp_shm(
sym: str,
field_name: str,
display_name: Optional[str] = None,
readonly: bool = True,
) -> (ShmArray, bool):
'''Allocate a single row shm array for an symbol-fsp pair.
'''
uid = tractor.current_actor().uid
if not display_name:
display_name = field_name
# TODO: load function here and introspect
# return stream type(s)
# TODO: should `index` be a required internal field?
fsp_dtype = np.dtype([('index', int), (field_name, float)])
key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
)
return shm, opened
async def open_fspd_cluster(
linkedsplits: LinkedSplits, linkedsplits: LinkedSplits,
fsps: dict[str, str], fsps: dict[str, str],
@ -321,9 +360,8 @@ async def fan_out_spawn_fsp_daemons(
''' '''
linkedsplits.focus() linkedsplits.focus()
uid = tractor.current_actor().uid # spawns sub-processes which execute cpu bound fsp work
# which is streamed back to this parent.
# spawns sub-processes which execute cpu bound FSP code
async with ( async with (
tractor.open_nursery() as n, tractor.open_nursery() as n,
trio.open_nursery() as ln, trio.open_nursery() as ln,
@ -334,21 +372,12 @@ async def fan_out_spawn_fsp_daemons(
# scale horizonatlly once cores are used up. # scale horizonatlly once cores are used up.
for display_name, conf in fsps.items(): for display_name, conf in fsps.items():
fsp_func_name = conf['fsp_func_name'] func_name = conf['func_name']
# TODO: load function here and introspect shm, opened = maybe_mk_fsp_shm(
# return stream type(s) sym,
field_name=func_name,
# TODO: should `index` be a required internal field? display_name=display_name,
fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
key = f'{sym}.fsp.{display_name}.{".".join(uid)}'
# this is all sync currently
shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True, readonly=True,
) )
@ -366,13 +395,13 @@ async def fan_out_spawn_fsp_daemons(
# init async # init async
ln.start_soon( ln.start_soon(
run_fsp, update_chart_from_fsp,
portal, portal,
linkedsplits, linkedsplits,
brokermod, brokermod,
sym, sym,
src_shm, src_shm,
fsp_func_name, func_name,
display_name, display_name,
conf, conf,
group_status_key, group_status_key,
@ -383,7 +412,7 @@ async def fan_out_spawn_fsp_daemons(
@asynccontextmanager @asynccontextmanager
async def open_sidepane( async def open_fsp_sidepane(
linked: LinkedSplits, linked: LinkedSplits,
conf: dict[str, dict[str, str]], conf: dict[str, dict[str, str]],
@ -403,8 +432,11 @@ async def open_sidepane(
} }
# add parameters for selection "options" # add parameters for selection "options"
defaults = config.get('params', {}) params = config.get('params', {})
for name, default in defaults.items(): for name, config in params.items():
default = config['default_value']
kwargs = config.get('widget_kwargs', {})
# add to ORM schema # add to ORM schema
schema.update({ schema.update({
@ -412,6 +444,7 @@ async def open_sidepane(
'label': f'**{name}**:', 'label': f'**{name}**:',
'type': 'edit', 'type': 'edit',
'default_value': default, 'default_value': default,
'kwargs': kwargs,
}, },
}) })
@ -424,7 +457,7 @@ async def open_sidepane(
FspConfig = create_model( FspConfig = create_model(
'FspConfig', 'FspConfig',
name=display_name, name=display_name,
**defaults, **params,
) )
sidepane.model = FspConfig() sidepane.model = FspConfig()
@ -444,14 +477,15 @@ async def open_sidepane(
yield sidepane yield sidepane
async def run_fsp( async def update_chart_from_fsp(
portal: tractor.Portal,
portal: tractor._portal.Portal,
linkedsplits: LinkedSplits, linkedsplits: LinkedSplits,
brokermod: ModuleType, brokermod: ModuleType,
sym: str, sym: str,
src_shm: ShmArray, src_shm: ShmArray,
fsp_func_name: str, func_name: str,
display_name: str, display_name: str,
conf: dict[str, dict], conf: dict[str, dict],
group_status_key: str, group_status_key: str,
@ -470,7 +504,7 @@ async def run_fsp(
) )
async with ( async with (
portal.open_stream_from( portal.open_context(
# chaining entrypoint # chaining entrypoint
fsp.cascade, fsp.cascade,
@ -480,21 +514,17 @@ async def run_fsp(
src_shm_token=src_shm.token, src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token, dst_shm_token=conf['shm'].token,
symbol=sym, symbol=sym,
fsp_func_name=fsp_func_name, func_name=func_name,
loglevel=loglevel, loglevel=loglevel,
) as stream, ) as (ctx, last_index),
ctx.open_stream() as stream,
open_sidepane( open_fsp_sidepane(
linkedsplits, linkedsplits,
{display_name: conf}, {display_name: conf},
) as sidepane, ) as sidepane,
): ):
# receive last index for processed historical
# data-array as first msg
_ = await stream.receive()
shm = conf['shm'] shm = conf['shm']
if conf.get('overlay'): if conf.get('overlay'):
@ -513,7 +543,7 @@ async def run_fsp(
name=display_name, name=display_name,
array=shm.array, array=shm.array,
array_key=conf['fsp_func_name'], array_key=conf['func_name'],
sidepane=sidepane, sidepane=sidepane,
# curve by default # curve by default
@ -541,7 +571,7 @@ async def run_fsp(
# XXX: fsp func names must be unique meaning we don't have # XXX: fsp func names must be unique meaning we don't have
# duplicates of the underlying data even if multiple # duplicates of the underlying data even if multiple
# sub-charts reference it under different 'named charts'. # sub-charts reference it under different 'named charts'.
value = array[fsp_func_name][-1] value = array[func_name][-1]
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)
@ -552,7 +582,7 @@ async def run_fsp(
chart.update_curve_from_array( chart.update_curve_from_array(
display_name, display_name,
shm.array, shm.array,
array_key=fsp_func_name array_key=func_name
) )
chart.linked.resize_sidepanes() chart.linked.resize_sidepanes()
@ -563,11 +593,11 @@ async def run_fsp(
# generic fills between curve types while ``PlotCurveItem`` has # generic fills between curve types while ``PlotCurveItem`` has
# logic inside ``.paint()`` for ``self.opts['fillLevel']`` which # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
# might be the best solution? # might be the best solution?
# graphics = chart.update_from_array(chart.name, array[fsp_func_name]) # graphics = chart.update_from_array(chart.name, array[func_name])
# graphics.curve.setBrush(50, 50, 200, 100) # graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50) # graphics.curve.setFillLevel(50)
if fsp_func_name == 'rsi': if func_name == 'rsi':
from ._lines import level_line from ._lines import level_line
# add moveable over-[sold/bought] lines # add moveable over-[sold/bought] lines
# and labels only for the 70/30 lines # and labels only for the 70/30 lines
@ -611,7 +641,7 @@ async def run_fsp(
try: try:
# read last # read last
array = shm.array array = shm.array
value = array[-1][fsp_func_name] value = array[-1][func_name]
break break
except IndexError: except IndexError:
@ -625,7 +655,7 @@ async def run_fsp(
chart.update_curve_from_array( chart.update_curve_from_array(
display_name, display_name,
array, array,
array_key=fsp_func_name, array_key=func_name,
) )
# set time of last graphics update # set time of last graphics update
@ -711,12 +741,26 @@ async def maybe_open_vlm_display(
yield yield
return return
else: else:
async with open_sidepane(
shm, opened = maybe_mk_fsp_shm(
linked.symbol.key,
'$_vlm',
readonly=True,
)
async with open_fsp_sidepane(
linked, { linked, {
'volume': { 'volume': {
'params': { 'params': {
'price_func': 'ohl3'
} 'price_func': {
'default_value': 'ohl3',
# tell target ``Edit`` widget to not allow
# edits for now.
'widget_kwargs': {'readonly': True},
},
},
} }
}, },
) as sidepane: ) as sidepane:
@ -872,20 +916,39 @@ async def display_symbol_data(
# TODO: eventually we'll support some kind of n-compose syntax # TODO: eventually we'll support some kind of n-compose syntax
fsp_conf = { fsp_conf = {
'rsi': { 'rsi': {
'fsp_func_name': 'rsi',
'params': {'period': 14}, # literal python func ref lookup name
'func_name': 'rsi',
# map of parameters to place on the fsp sidepane widget
# which should map to dynamic inputs available to the
# fsp function at runtime.
'params': {
'period': {
'default_value': 14,
'widget_kwargs': {'readonly': True},
},
},
# ``ChartPlotWidget`` options passthrough
'chart_kwargs': { 'chart_kwargs': {
'static_yrange': (0, 100), 'static_yrange': (0, 100),
}, },
}, },
} }
if has_vlm(ohlcv): if has_vlm(ohlcv): # and provider != 'binance':
# binance is too fast atm for FSPs until we wrap
# the fsp streams as throttled ``Feeds``, see
#
# add VWAP to fsp config for downstream loading # add VWAP to fsp config for downstream loading
fsp_conf.update({ fsp_conf.update({
'vwap': { 'vwap': {
'fsp_func_name': 'vwap', 'func_name': 'vwap',
'overlay': True, 'overlay': True,
'anchor': 'session', 'anchor': 'session',
}, },
@ -904,7 +967,7 @@ async def display_symbol_data(
): ):
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
ln.start_soon( ln.start_soon(
fan_out_spawn_fsp_daemons, open_fspd_cluster,
linkedsplits, linkedsplits,
fsp_conf, fsp_conf,
sym, sym,