Spawn and cache an fsp cluster ahead of time

Use a fixed worker count and don't respawn actors for every chart; instead
round-robin fsp tasks across the workers in a cluster and (for now) hope
for the best in terms of trio scheduling, though we should obviously route
via symbol-locality next. This is currently a boon for chart startup
times since actor creation is done ahead of time (AOT).

Additionally,
- use `zero_on_step` for dollar volume
- drop rsi on startup (again)
- add dollar volume (via fsp) alongside unit volume
- litter more profiling to fsp chart startup sequence
- pre-define tick type classes for update loop
fspd_cluster
Tyler Goodlet 2021-10-01 16:47:17 -04:00
parent 2eef6c76d0
commit 224c01e43e
1 changed files with 160 additions and 103 deletions

View File

@ -21,22 +21,23 @@ this module ties together quote and computational (fsp) streams with
graphics update methods via our custom ``pyqtgraph`` charting api. graphics update methods via our custom ``pyqtgraph`` charting api.
''' '''
from contextlib import asynccontextmanager from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
from itertools import cycle
import time import time
from types import ModuleType from types import ModuleType
from typing import Optional from typing import Optional, AsyncGenerator
import numpy as np import numpy as np
from pydantic import create_model from pydantic import create_model
import pyqtgraph as pg
import tractor import tractor
import trio import trio
from .. import brokers from .. import brokers
from ..data.feed import ( from .._cacheables import maybe_open_ctx
open_feed, from ..trionics import async_enter_all
# Feed, from ..data.feed import open_feed
)
from ._chart import ( from ._chart import (
ChartPlotWidget, ChartPlotWidget,
LinkedSplits, LinkedSplits,
@ -70,15 +71,6 @@ def update_fsp_chart(
array = shm.array array = shm.array
# update graphics
# NOTE: this does a length check internally which allows it
# staying above the last row check below..
chart.update_curve_from_array(
graphics_name,
array,
array_key=array_key or graphics_name,
)
try: try:
last_row = array[-1] last_row = array[-1]
except IndexError: except IndexError:
@ -95,22 +87,15 @@ def update_fsp_chart(
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
return return
# TODO: provide a read sync mechanism to avoid this polling. the # update graphics
# underlying issue is that a backfill (aka prepend) and subsequent # NOTE: this does a length check internally which allows it
# shm array first/last index update could result in an empty array # staying above the last row check below..
# read here since the stream is never torn down on the re-compute chart.update_curve_from_array(
# steps. graphics_name,
# read_tries = 2 array,
# while read_tries > 0: array_key=array_key or graphics_name,
# try: )
# # read last chart._set_yrange()
# array = shm.array
# value = array[-1][array_key]
# break
# except IndexError:
# read_tries -= 1
# continue
# XXX: re: ``array_key``: fsp func names must be unique meaning we # XXX: re: ``array_key``: fsp func names must be unique meaning we
# can't have duplicates of the underlying data even if multiple # can't have duplicates of the underlying data even if multiple
@ -126,24 +111,12 @@ def update_fsp_chart(
last_val_sticky.update_from_data(-1, last) last_val_sticky.update_from_data(-1, last)
# _clses = { # a working tick-type-classes template
# 'clears': {'trade', 'utrade', 'last'}, _tick_groups = {
# 'last': {'last'}, 'clears': {'trade', 'utrade', 'last'},
# 'bids': {'bid', 'bsize'}, 'bids': {'bid', 'bsize'},
# 'asks': {'ask', 'asize'}, 'asks': {'ask', 'asize'},
# } }
# XXX: idea for frame type data structure we could use on the
# wire instead of doing it here?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n', 'type_c'],
# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }
def chart_maxmin( def chart_maxmin(
@ -263,8 +236,12 @@ async def update_chart_from_quotes(
now = time.time() now = time.time()
quote_period = now - last_quote quote_period = now - last_quote
if quote_period <= 1/_quote_throttle_rate: quote_rate = round(1/quote_period, 1) if quote_period else float('inf')
log.warning(f'TOO FAST: {1/quote_period}') if (
quote_period <= 1/_quote_throttle_rate
and quote_rate > _quote_throttle_rate + 2
):
log.warning(f'High quote rate {symbol.key}: {quote_rate}')
last_quote = now last_quote = now
# chart isn't active/shown so skip render cycle and pause feed(s) # chart isn't active/shown so skip render cycle and pause feed(s)
@ -291,15 +268,7 @@ async def update_chart_from_quotes(
array = ohlcv.array array = ohlcv.array
if vlm_chart: if vlm_chart:
# print(f"volume: {end['volume']}")
vlm_chart.update_curve_from_array('volume', array) vlm_chart.update_curve_from_array('volume', array)
# built-in tina $vlm FSP using chl3 typical price for ohlc step
# last = array[-1]
# chl3 = (last['close'] + last['high'] + last['low']) / 3
# v = last['volume']
# dv = last['volume'] * chl3
vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
if ( if (
@ -346,7 +315,7 @@ async def update_chart_from_quotes(
# TODO: eventually we want to separate out the utrade (aka # TODO: eventually we want to separate out the utrade (aka
# dark vlm prices) here and show them as an additional # dark vlm prices) here and show them as an additional
# graphic. # graphic.
clear_types = {'trade', 'utrade', 'last'} clear_types = _tick_groups['clears']
# XXX: if we wanted to iterate in "latest" (i.e. most # XXX: if we wanted to iterate in "latest" (i.e. most
# current) tick first order as an optimization where we only # current) tick first order as an optimization where we only
@ -415,11 +384,11 @@ async def update_chart_from_quotes(
# label.size -= size # label.size -= size
# elif ticktype in ('ask', 'asize'): # elif ticktype in ('ask', 'asize'):
elif typ in ('ask', 'asize'): elif typ in _tick_groups['asks']:
l1.ask_label.update_fields({'level': price, 'size': size}) l1.ask_label.update_fields({'level': price, 'size': size})
# elif ticktype in ('bid', 'bsize'): # elif ticktype in ('bid', 'bsize'):
elif typ in ('bid', 'bsize'): elif typ in _tick_groups['bids']:
l1.bid_label.update_fields({'level': price, 'size': size}) l1.bid_label.update_fields({'level': price, 'size': size})
# check for y-range re-size # check for y-range re-size
@ -492,7 +461,7 @@ def maybe_mk_fsp_shm(
return shm, opened return shm, opened
@asynccontextmanager @acm
async def open_fsp_sidepane( async def open_fsp_sidepane(
linked: LinkedSplits, linked: LinkedSplits,
@ -558,8 +527,66 @@ async def open_fsp_sidepane(
yield sidepane yield sidepane
async def open_fspd_cluster( @acm
async def open_fsp_cluster(
    workers: int = 2

) -> AsyncGenerator[dict[str, tractor.Portal], None]:
    '''
    Spawn a fixed-size cluster of fsp worker actors and yield their
    portals.

    ``workers`` actors are started concurrently, each with the
    ``piker.fsp._engine`` module enabled. Every actor is named
    ``fsp_<index>.`` followed by the current actor's ``uid`` elements
    joined with ``'_'``; the yielded ``dict`` maps those names to the
    portal returned by ``ActorNursery.start_actor()``.

    '''
    profiler = pg.debug.Profiler(
        delayed=False,
        disabled=False
    )
    portals: dict[str, tractor.Portal] = {}

    # assumes ``uid`` is a sequence of ``str`` (it is passed to
    # ``str.join()`` below) -- TODO confirm against ``tractor``.
    uid = tractor.current_actor().uid

    async with tractor.open_nursery() as an:

        async def start(i: int) -> None:
            # one worker actor per index, keyed by its unique name
            key = f'fsp_{i}.' + '_'.join(uid)
            portals[key] = await an.start_actor(
                enable_modules=['piker.fsp._engine'],
                name=key,
            )

        # spawn all workers concurrently; the task nursery block only
        # exits once every ``start()`` task has completed.
        async with trio.open_nursery() as n:
            for index in range(workers):
                n.start_soon(start, index)

        # invariant: every spawn task registered exactly one portal
        assert len(portals) == workers
        profiler('started fsp cluster')

        # NOTE(review): nothing here explicitly cancels the spawned
        # actors after the ``yield``; teardown is left entirely to the
        # actor nursery's exit -- confirm that is the intended behaviour.
        yield portals
@acm
async def maybe_open_fsp_cluster(
    workers: int = 2,
    **kwargs,

) -> AsyncGenerator[dict[str, tractor.Portal], None]:
    '''
    Yield an fsp cluster portal map obtained through ``maybe_open_ctx()``
    keyed by the current actor's ``uid``.

    ``workers`` and any extra ``kwargs`` are forwarded verbatim to
    ``open_fsp_cluster()``. When ``maybe_open_ctx()`` reports a cache
    hit the re-use is logged; in both cases the same ``cluster_map``
    value is yielded to the caller.

    NOTE(review): ``open_fsp_cluster()`` currently only accepts
    ``workers``, so passing any extra keyword argument here will raise
    a ``TypeError`` when the manager is constructed.

    '''
    uid = tractor.current_actor().uid

    async with maybe_open_ctx(
        key=uid,  # for now make a cluster per client?
        mngr=open_fsp_cluster(
            workers,
            # loglevel=loglevel,
            **kwargs,
        ),
    ) as (cache_hit, cluster_map):

        if cache_hit:
            log.info('re-using existing fsp cluster')

        # same value is handed out whether or not it came from cache
        yield cluster_map
async def start_fsp_displays(
cluster_map: dict[str, tractor.Portal],
linkedsplits: LinkedSplits, linkedsplits: LinkedSplits,
fsps: dict[str, str], fsps: dict[str, str],
sym: str, sym: str,
@ -580,19 +607,22 @@ async def open_fspd_cluster(
''' '''
linkedsplits.focus() linkedsplits.focus()
# spawns sub-processes which execute cpu bound fsp work profiler = pg.debug.Profiler(
# which is streamed back to this parent. delayed=False,
async with ( disabled=False
tractor.open_nursery() as n, )
trio.open_nursery() as ln,
): async with trio.open_nursery() as n:
# Currently we spawn an actor per fsp chain but # Currently we spawn an actor per fsp chain but
# likely we'll want to pool them eventually to # likely we'll want to pool them eventually to
# scale horizonatlly once cores are used up. # scale horizonatlly once cores are used up.
for display_name, conf in fsps.items(): for (display_name, conf), (name, portal) in zip(
fsps.items(),
# rr to cluster for now..
cycle(cluster_map.items()),
):
func_name = conf['func_name'] func_name = conf['func_name']
shm, opened = maybe_mk_fsp_shm( shm, opened = maybe_mk_fsp_shm(
sym, sym,
field_name=func_name, field_name=func_name,
@ -600,18 +630,17 @@ async def open_fspd_cluster(
readonly=True, readonly=True,
) )
profiler(f'created shm for fsp actor: {display_name}')
# XXX: fsp may have been opened by a duplicate chart. # XXX: fsp may have been opened by a duplicate chart.
# Error for now until we figure out how to wrap fsps as # Error for now until we figure out how to wrap fsps as
# "feeds". assert opened, f"A chart for {key} likely # "feeds". assert opened, f"A chart for {key} likely
# already exists?" # already exists?"
portal = await n.start_actor( profiler(f'attached to fsp portal: {display_name}')
enable_modules=['piker.fsp._engine'],
name='fsp.' + display_name,
)
# init async # init async
ln.start_soon( n.start_soon(
partial( partial(
update_chart_from_fsp, update_chart_from_fsp,
@ -627,6 +656,7 @@ async def open_fspd_cluster(
is_overlay=conf.get('overlay', False), is_overlay=conf.get('overlay', False),
group_status_key=group_status_key, group_status_key=group_status_key,
loglevel=loglevel, loglevel=loglevel,
profiler=profiler,
) )
) )
@ -650,6 +680,7 @@ async def update_chart_from_fsp(
group_status_key: str, group_status_key: str,
loglevel: str, loglevel: str,
profiler: pg.debug.Profiler,
) -> None: ) -> None:
'''FSP stream chart update loop. '''FSP stream chart update loop.
@ -658,6 +689,9 @@ async def update_chart_from_fsp(
config map. config map.
''' '''
profiler(f'started chart task for fsp: {func_name}')
done = linkedsplits.window().status_bar.open_status( done = linkedsplits.window().status_bar.open_status(
f'loading fsp, {display_name}..', f'loading fsp, {display_name}..',
group_key=group_status_key, group_key=group_status_key,
@ -676,12 +710,15 @@ async def update_chart_from_fsp(
symbol=sym, symbol=sym,
func_name=func_name, func_name=func_name,
loglevel=loglevel, loglevel=loglevel,
zero_on_step=conf.get('zero_on_step', False),
) as (ctx, last_index), ) as (ctx, last_index),
ctx.open_stream() as stream, ctx.open_stream() as stream,
open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane,
): ):
profiler(f'fsp:{func_name} attached to fsp ctx-stream')
if is_overlay: if is_overlay:
chart = linkedsplits.chart chart = linkedsplits.chart
chart.draw_curve( chart.draw_curve(
@ -719,6 +756,8 @@ async def update_chart_from_fsp(
array_key = func_name array_key = func_name
profiler(f'fsp:{func_name} chart created')
# first UI update, usually from shm pushed history # first UI update, usually from shm pushed history
update_fsp_chart( update_fsp_chart(
chart, chart,
@ -753,6 +792,9 @@ async def update_chart_from_fsp(
done() done()
chart.linked.resize_sidepanes() chart.linked.resize_sidepanes()
profiler(f'fsp:{func_name} starting update loop')
profiler.finish()
# update chart graphics # update chart graphics
i = 0 i = 0
last = time.time() last = time.time()
@ -853,7 +895,7 @@ def has_vlm(ohlcv: ShmArray) -> bool:
return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm)))
@asynccontextmanager @acm
async def maybe_open_vlm_display( async def maybe_open_vlm_display(
linked: LinkedSplits, linked: LinkedSplits,
@ -875,12 +917,12 @@ async def maybe_open_vlm_display(
async with open_fsp_sidepane( async with open_fsp_sidepane(
linked, { linked, {
'volume': { '$_vlm': {
'params': { 'params': {
'price_func': { 'price_func': {
'default_value': 'ohl3', 'default_value': 'chl3',
# tell target ``Edit`` widget to not allow # tell target ``Edit`` widget to not allow
# edits for now. # edits for now.
'widget_kwargs': {'readonly': True}, 'widget_kwargs': {'readonly': True},
@ -979,17 +1021,19 @@ async def display_symbol_data(
# group_key=loading_sym_key, # group_key=loading_sym_key,
# ) # )
async with( async with async_enter_all(
open_feed( open_feed(
provider, provider,
[sym], [sym],
loglevel=loglevel, loglevel=loglevel,
# 60 FPS to limit context switches # limit to at least display's FPS
# avoiding needless Qt-in-guest-mode context switches
tick_throttle=_quote_throttle_rate, tick_throttle=_quote_throttle_rate,
),
maybe_open_fsp_cluster(),
) as feed, ) as (feed, cluster_map):
):
ohlcv: ShmArray = feed.shm ohlcv: ShmArray = feed.shm
bars = ohlcv.array bars = ohlcv.array
@ -1042,25 +1086,36 @@ async def display_symbol_data(
# TODO: eventually we'll support some kind of n-compose syntax # TODO: eventually we'll support some kind of n-compose syntax
fsp_conf = { fsp_conf = {
'rsi': { # 'rsi': {
# 'func_name': 'rsi', # literal python func ref lookup name
# literal python func ref lookup name # # map of parameters to place on the fsp sidepane widget
'func_name': 'rsi', # # which should map to dynamic inputs available to the
# # fsp function at runtime.
# 'params': {
# 'period': {
# 'default_value': 14,
# 'widget_kwargs': {'readonly': True},
# },
# },
# map of parameters to place on the fsp sidepane widget # # ``ChartPlotWidget`` options passthrough
# which should map to dynamic inputs available to the # 'chart_kwargs': {
# fsp function at runtime. # 'static_yrange': (0, 100),
# },
# },
'dolla_vlm': {
'func_name': 'dolla_vlm',
'zero_on_step': True,
'params': { 'params': {
'period': { 'price_func': {
'default_value': 14, 'default_value': 'chl3',
# tell target ``Edit`` widget to not allow
# edits for now.
'widget_kwargs': {'readonly': True}, 'widget_kwargs': {'readonly': True},
}, },
}, },
'chart_kwargs': {'style': 'step'}
# ``ChartPlotWidget`` options passthrough
'chart_kwargs': {
'static_yrange': (0, 100),
},
}, },
} }
@ -1072,11 +1127,11 @@ async def display_symbol_data(
# add VWAP to fsp config for downstream loading # add VWAP to fsp config for downstream loading
fsp_conf.update({ fsp_conf.update({
'vwap': { # 'vwap': {
'func_name': 'vwap', # 'func_name': 'vwap',
'overlay': True, # 'overlay': True,
'anchor': 'session', # 'anchor': 'session',
}, # },
}) })
# NOTE: we must immediately tell Qt to show the OHLC chart # NOTE: we must immediately tell Qt to show the OHLC chart
@ -1086,13 +1141,15 @@ async def display_symbol_data(
linkedsplits.focus() linkedsplits.focus()
await trio.sleep(0) await trio.sleep(0)
vlm_chart = None
async with ( async with (
trio.open_nursery() as ln, trio.open_nursery() as ln,
maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart, maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart,
): ):
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
ln.start_soon( ln.start_soon(
open_fspd_cluster, start_fsp_displays,
cluster_map,
linkedsplits, linkedsplits,
fsp_conf, fsp_conf,
sym, sym,