diff --git a/piker/ui/_display.py b/piker/ui/_display.py index e3c6f342..0a4a9405 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -21,22 +21,23 @@ this module ties together quote and computational (fsp) streams with graphics update methods via our custom ``pyqtgraph`` charting api. ''' -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from functools import partial +from itertools import cycle import time from types import ModuleType -from typing import Optional +from typing import Optional, AsyncGenerator import numpy as np from pydantic import create_model +import pyqtgraph as pg import tractor import trio from .. import brokers -from ..data.feed import ( - open_feed, - # Feed, -) +from .._cacheables import maybe_open_ctx +from ..trionics import async_enter_all +from ..data.feed import open_feed from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -70,15 +71,6 @@ def update_fsp_chart( array = shm.array - # update graphics - # NOTE: this does a length check internally which allows it - # staying above the last row check below.. - chart.update_curve_from_array( - graphics_name, - array, - array_key=array_key or graphics_name, - ) - try: last_row = array[-1] except IndexError: @@ -95,22 +87,15 @@ def update_fsp_chart( log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') return - # TODO: provide a read sync mechanism to avoid this polling. the - # underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the stream is never torn down on the re-compute - # steps. - # read_tries = 2 - # while read_tries > 0: - # try: - # # read last - # array = shm.array - # value = array[-1][array_key] - # break - - # except IndexError: - # read_tries -= 1 - # continue + # update graphics + # NOTE: this does a length check internally which allows it + # staying above the last row check below.. + chart.update_curve_from_array( + graphics_name, + array, + array_key=array_key or graphics_name, + ) + chart._set_yrange() # XXX: re: ``array_key``: fsp func names must be unique meaning we # can't have duplicates of the underlying data even if multiple @@ -126,24 +111,12 @@ def update_fsp_chart( last_val_sticky.update_from_data(-1, last) -# _clses = { -# 'clears': {'trade', 'utrade', 'last'}, -# 'last': {'last'}, -# 'bids': {'bid', 'bsize'}, -# 'asks': {'ask', 'asize'}, -# } - -# XXX: idea for frame type data structure we could use on the -# wire instead of doing it here? -# frames = { -# 'index': ['type_a', 'type_c', 'type_n', 'type_n', 'type_c'], - -# 'type_a': [tick0, tick1, tick2, .., tickn], -# 'type_b': [tick0, tick1, tick2, .., tickn], -# 'type_c': [tick0, tick1, tick2, .., tickn], -# ... -# 'type_n': [tick0, tick1, tick2, .., tickn], -# } +# a working tick-type-classes template +_tick_groups = { + 'clears': {'trade', 'utrade', 'last'}, + 'bids': {'bid', 'bsize'}, + 'asks': {'ask', 'asize'}, +} def chart_maxmin( @@ -263,8 +236,12 @@ async def update_chart_from_quotes( now = time.time() quote_period = now - last_quote - if quote_period <= 1/_quote_throttle_rate: - log.warning(f'TOO FAST: {1/quote_period}') + quote_rate = round(1/quote_period, 1) if quote_period else float('inf') + if ( + quote_period <= 1/_quote_throttle_rate + and quote_rate > _quote_throttle_rate + 2 + ): + log.warning(f'High quote rate {symbol.key}: {quote_rate}') last_quote = now # chart isn't active/shown so skip render cycle and pause feed(s) @@ -291,15 +268,7 @@ async def update_chart_from_quotes( array = ohlcv.array if vlm_chart: - # print(f"volume: {end['volume']}") vlm_chart.update_curve_from_array('volume', array) - - # built-in tina $vlm FSP using chl3 typical price for ohlc step - # last = array[-1] - # chl3 = (last['close'] + last['high'] + last['low']) / 3 - # v = last['volume'] - # dv = last['volume'] * chl3 - vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) if ( @@ -346,7 +315,7 @@ async def update_chart_from_quotes( # TODO: eventually we want to separate out the utrade (aka # dark vlm prices) here and show them as an additional # graphic. - clear_types = {'trade', 'utrade', 'last'} + clear_types = _tick_groups['clears'] # XXX: if we wanted to iterate in "latest" (i.e. most # current) tick first order as an optimization where we only @@ -415,11 +384,11 @@ async def update_chart_from_quotes( # label.size -= size # elif ticktype in ('ask', 'asize'): - elif typ in ('ask', 'asize'): + elif typ in _tick_groups['asks']: l1.ask_label.update_fields({'level': price, 'size': size}) # elif ticktype in ('bid', 'bsize'): - elif typ in ('bid', 'bsize'): + elif typ in _tick_groups['bids']: l1.bid_label.update_fields({'level': price, 'size': size}) # check for y-range re-size @@ -492,7 +461,7 @@ def maybe_mk_fsp_shm( return shm, opened -@asynccontextmanager +@acm async def open_fsp_sidepane( linked: LinkedSplits, @@ -558,8 +527,66 @@ async def open_fsp_sidepane( yield sidepane -async def open_fspd_cluster( +@acm +async def open_fsp_cluster( + workers: int = 2 +) -> AsyncGenerator[int, dict[str, tractor.Portal]]: + + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + portals: dict[str, tractor.Portal] = {} + uid = tractor.current_actor().uid + + async with tractor.open_nursery() as an: + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" + async with trio.open_nursery() as n: + for index in range(workers): + + async def start(i) -> None: + key = f'fsp_{i}.' + '_'.join(uid) + portals[key] = await an.start_actor( + enable_modules=['piker.fsp._engine'], + name=key, + ) + + n.start_soon(start, index) + + assert len(portals) == workers + profiler('started fsp cluster') + yield portals + + +@acm +async def maybe_open_fsp_cluster( + workers: int = 2, + **kwargs, +) -> AsyncGenerator[int, dict[str, tractor.Portal]]: + + uid = tractor.current_actor().uid + async with maybe_open_ctx( + key=uid, # for now make a cluster per client? + mngr=open_fsp_cluster( + workers, + # loglevel=loglevel, + **kwargs, + ), + ) as (cache_hit, cluster_map): + if cache_hit: + log.info('re-using existing fsp cluster') + yield cluster_map + else: + yield cluster_map + + +async def start_fsp_displays( + + cluster_map: dict[str, tractor.Portal], linkedsplits: LinkedSplits, fsps: dict[str, str], sym: str, @@ -580,19 +607,22 @@ async def open_fspd_cluster( ''' linkedsplits.focus() - # spawns sub-processes which execute cpu bound fsp work - # which is streamed back to this parent. - async with ( - tractor.open_nursery() as n, - trio.open_nursery() as ln, - ): + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + + async with trio.open_nursery() as n: # Currently we spawn an actor per fsp chain but # likely we'll want to pool them eventually to # scale horizonatlly once cores are used up. - for display_name, conf in fsps.items(): + for (display_name, conf), (name, portal) in zip( + fsps.items(), + # rr to cluster for now.. + cycle(cluster_map.items()), + ): func_name = conf['func_name'] - shm, opened = maybe_mk_fsp_shm( sym, field_name=func_name, @@ -600,18 +630,17 @@ async def open_fspd_cluster( readonly=True, ) + profiler(f'created shm for fsp actor: {display_name}') + # XXX: fsp may have been opened by a duplicate chart. # Error for now until we figure out how to wrap fsps as # "feeds". assert opened, f"A chart for {key} likely # already exists?" - portal = await n.start_actor( - enable_modules=['piker.fsp._engine'], - name='fsp.' + display_name, - ) + profiler(f'attached to fsp portal: {display_name}') # init async - ln.start_soon( + n.start_soon( partial( update_chart_from_fsp, @@ -627,6 +656,7 @@ async def open_fspd_cluster( is_overlay=conf.get('overlay', False), group_status_key=group_status_key, loglevel=loglevel, + profiler=profiler, ) ) @@ -650,6 +680,7 @@ async def update_chart_from_fsp( group_status_key: str, loglevel: str, + profiler: pg.debug.Profiler, ) -> None: '''FSP stream chart update loop. @@ -658,6 +689,9 @@ async def update_chart_from_fsp( config map. ''' + + profiler(f'started chart task for fsp: {func_name}') + done = linkedsplits.window().status_bar.open_status( f'loading fsp, {display_name}..', group_key=group_status_key, @@ -676,12 +710,15 @@ async def update_chart_from_fsp( symbol=sym, func_name=func_name, loglevel=loglevel, + zero_on_step=conf.get('zero_on_step', False), ) as (ctx, last_index), ctx.open_stream() as stream, open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, ): + profiler(f'fsp:{func_name} attached to fsp ctx-stream') + if is_overlay: chart = linkedsplits.chart chart.draw_curve( @@ -719,6 +756,8 @@ async def update_chart_from_fsp( array_key = func_name + profiler(f'fsp:{func_name} chart created') + # first UI update, usually from shm pushed history update_fsp_chart( chart, @@ -753,6 +792,9 @@ async def update_chart_from_fsp( done() chart.linked.resize_sidepanes() + profiler(f'fsp:{func_name} starting update loop') + profiler.finish() + # update chart graphics i = 0 last = time.time() @@ -853,7 +895,7 @@ def has_vlm(ohlcv: ShmArray) -> bool: return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) -@asynccontextmanager +@acm async def maybe_open_vlm_display( linked: LinkedSplits, @@ -875,12 +917,12 @@ async def maybe_open_vlm_display( async with open_fsp_sidepane( linked, { - 'volume': { + '$_vlm': { 'params': { 'price_func': { - 'default_value': 'ohl3', + 'default_value': 'chl3', # tell target ``Edit`` widget to not allow # edits for now. 'widget_kwargs': {'readonly': True}, @@ -979,17 +1021,19 @@ async def display_symbol_data( # group_key=loading_sym_key, # ) - async with( + async with async_enter_all( open_feed( provider, [sym], loglevel=loglevel, - # 60 FPS to limit context switches + # limit to at least display's FPS + # avoiding needless Qt-in-guest-mode context switches tick_throttle=_quote_throttle_rate, + ), + maybe_open_fsp_cluster(), - ) as feed, - ): + ) as (feed, cluster_map): ohlcv: ShmArray = feed.shm bars = ohlcv.array @@ -1042,25 +1086,36 @@ async def display_symbol_data( # TODO: eventually we'll support some kind of n-compose syntax fsp_conf = { - 'rsi': { + # 'rsi': { + # 'func_name': 'rsi', # literal python func ref lookup name - # literal python func ref lookup name - 'func_name': 'rsi', + # # map of parameters to place on the fsp sidepane widget + # # which should map to dynamic inputs available to the + # # fsp function at runtime. + # 'params': { + # 'period': { + # 'default_value': 14, + # 'widget_kwargs': {'readonly': True}, + # }, + # }, - # map of parameters to place on the fsp sidepane widget - # which should map to dynamic inputs available to the - # fsp function at runtime. + # # ``ChartPlotWidget`` options passthrough + # 'chart_kwargs': { + # 'static_yrange': (0, 100), + # }, + # }, + 'dolla_vlm': { + 'func_name': 'dolla_vlm', + 'zero_on_step': True, 'params': { - 'period': { - 'default_value': 14, + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. 'widget_kwargs': {'readonly': True}, }, }, - - # ``ChartPlotWidget`` options passthrough - 'chart_kwargs': { - 'static_yrange': (0, 100), - }, + 'chart_kwargs': {'style': 'step'} }, } @@ -1072,11 +1127,11 @@ async def display_symbol_data( # add VWAP to fsp config for downstream loading fsp_conf.update({ - 'vwap': { - 'func_name': 'vwap', - 'overlay': True, - 'anchor': 'session', - }, + # 'vwap': { + # 'func_name': 'vwap', + # 'overlay': True, + # 'anchor': 'session', + # }, }) # NOTE: we must immediately tell Qt to show the OHLC chart @@ -1086,13 +1141,15 @@ async def display_symbol_data( linkedsplits.focus() await trio.sleep(0) + vlm_chart = None async with ( trio.open_nursery() as ln, maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart, ): # load initial fsp chain (otherwise known as "indicators") ln.start_soon( - open_fspd_cluster, + start_fsp_displays, + cluster_map, linkedsplits, fsp_conf, sym,