From 2eef6c76d05af4d8d805055a7721f92e2d59b958 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Oct 2021 10:27:08 -0400 Subject: [PATCH 01/12] Start trionics mod with an `async_enter_all` --- piker/trionics.py | 80 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 piker/trionics.py diff --git a/piker/trionics.py b/piker/trionics.py new file mode 100644 index 00000000..10f6a33d --- /dev/null +++ b/piker/trionics.py @@ -0,0 +1,80 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship of piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +sugarz for trio/tractor conc peeps. + +''' +from typing import AsyncContextManager +from typing import TypeVar +from contextlib import asynccontextmanager as acm + +import trio + + +# A regular invariant generic type +T = TypeVar("T") + + +async def _enter_and_sleep( + + mngr: AsyncContextManager[T], + to_yield: dict[int, T], + all_entered: trio.Event, + # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + +) -> T: + '''Open the async context manager deliver it's value + to this task's spawner and sleep until cancelled. + + ''' + async with mngr as value: + to_yield[id(mngr)] = value + + if all(to_yield.values()): + all_entered.set() + + # sleep until cancelled + await trio.sleep_forever() + + +@acm +async def async_enter_all( + + *mngrs: list[AsyncContextManager[T]], + +) -> tuple[T]: + + to_yield = {}.fromkeys(id(mngr) for mngr in mngrs) + + all_entered = trio.Event() + + async with trio.open_nursery() as n: + for mngr in mngrs: + n.start_soon( + _enter_and_sleep, + mngr, + to_yield, + all_entered, + ) + + # deliver control once all managers have started up + await all_entered.wait() + yield tuple(to_yield.values()) + + # tear down all sleeper tasks thus triggering individual + # mngr ``__aexit__()``s. + n.cancel_scope.cancel() From 224c01e43e24f04cf065277a22b156ac25d2c3a8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Oct 2021 16:47:17 -0400 Subject: [PATCH 02/12] Spawn and cache an fsp cluster ahead of time Use a fixed worker count and don't respawn for every chart, instead opting for a round-robin to tasks in a cluster and (for now) hoping for the best in terms of trio scheduling, though we should obviously route via symbol-locality next. This is currently a boon for chart spawning startup times since actor creation is done AOT. Additionally, - use `zero_on_step` for dollar volume - drop rsi on startup (again) - add dollar volume (via fsp) along side unit volume - litter more profiling to fsp chart startup sequence - pre-define tick type classes for update loop --- piker/ui/_display.py | 263 ++++++++++++++++++++++++++----------------- 1 file changed, 160 insertions(+), 103 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index e3c6f342..0a4a9405 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -21,22 +21,23 @@ this module ties together quote and computational (fsp) streams with graphics update methods via our custom ``pyqtgraph`` charting api. ''' -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from functools import partial +from itertools import cycle import time from types import ModuleType -from typing import Optional +from typing import Optional, AsyncGenerator import numpy as np from pydantic import create_model +import pyqtgraph as pg import tractor import trio from .. import brokers -from ..data.feed import ( - open_feed, - # Feed, -) +from .._cacheables import maybe_open_ctx +from ..trionics import async_enter_all +from ..data.feed import open_feed from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -70,15 +71,6 @@ def update_fsp_chart( array = shm.array - # update graphics - # NOTE: this does a length check internally which allows it - # staying above the last row check below.. - chart.update_curve_from_array( - graphics_name, - array, - array_key=array_key or graphics_name, - ) - try: last_row = array[-1] except IndexError: @@ -95,22 +87,15 @@ def update_fsp_chart( log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') return - # TODO: provide a read sync mechanism to avoid this polling. the - # underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the stream is never torn down on the re-compute - # steps. - # read_tries = 2 - # while read_tries > 0: - # try: - # # read last - # array = shm.array - # value = array[-1][array_key] - # break - - # except IndexError: - # read_tries -= 1 - # continue + # update graphics + # NOTE: this does a length check internally which allows it + # staying above the last row check below.. + chart.update_curve_from_array( + graphics_name, + array, + array_key=array_key or graphics_name, + ) + chart._set_yrange() # XXX: re: ``array_key``: fsp func names must be unique meaning we # can't have duplicates of the underlying data even if multiple @@ -126,24 +111,12 @@ def update_fsp_chart( last_val_sticky.update_from_data(-1, last) -# _clses = { -# 'clears': {'trade', 'utrade', 'last'}, -# 'last': {'last'}, -# 'bids': {'bid', 'bsize'}, -# 'asks': {'ask', 'asize'}, -# } - -# XXX: idea for frame type data structure we could use on the -# wire instead of doing it here? -# frames = { -# 'index': ['type_a', 'type_c', 'type_n', 'type_n', 'type_c'], - -# 'type_a': [tick0, tick1, tick2, .., tickn], -# 'type_b': [tick0, tick1, tick2, .., tickn], -# 'type_c': [tick0, tick1, tick2, .., tickn], -# ... -# 'type_n': [tick0, tick1, tick2, .., tickn], -# } +# a working tick-type-classes template +_tick_groups = { + 'clears': {'trade', 'utrade', 'last'}, + 'bids': {'bid', 'bsize'}, + 'asks': {'ask', 'asize'}, +} def chart_maxmin( @@ -263,8 +236,12 @@ async def update_chart_from_quotes( now = time.time() quote_period = now - last_quote - if quote_period <= 1/_quote_throttle_rate: - log.warning(f'TOO FAST: {1/quote_period}') + quote_rate = round(1/quote_period, 1) if quote_period else float('inf') + if ( + quote_period <= 1/_quote_throttle_rate + and quote_rate > _quote_throttle_rate + 2 + ): + log.warning(f'High quote rate {symbol.key}: {quote_rate}') last_quote = now # chart isn't active/shown so skip render cycle and pause feed(s) @@ -291,15 +268,7 @@ async def update_chart_from_quotes( array = ohlcv.array if vlm_chart: - # print(f"volume: {end['volume']}") vlm_chart.update_curve_from_array('volume', array) - - # built-in tina $vlm FSP using chl3 typical price for ohlc step - # last = array[-1] - # chl3 = (last['close'] + last['high'] + last['low']) / 3 - # v = last['volume'] - # dv = last['volume'] * chl3 - vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) if ( @@ -346,7 +315,7 @@ async def update_chart_from_quotes( # TODO: eventually we want to separate out the utrade (aka # dark vlm prices) here and show them as an additional # graphic. - clear_types = {'trade', 'utrade', 'last'} + clear_types = _tick_groups['clears'] # XXX: if we wanted to iterate in "latest" (i.e. most # current) tick first order as an optimization where we only @@ -415,11 +384,11 @@ async def update_chart_from_quotes( # label.size -= size # elif ticktype in ('ask', 'asize'): - elif typ in ('ask', 'asize'): + elif typ in _tick_groups['asks']: l1.ask_label.update_fields({'level': price, 'size': size}) # elif ticktype in ('bid', 'bsize'): - elif typ in ('bid', 'bsize'): + elif typ in _tick_groups['bids']: l1.bid_label.update_fields({'level': price, 'size': size}) # check for y-range re-size @@ -492,7 +461,7 @@ def maybe_mk_fsp_shm( return shm, opened -@asynccontextmanager +@acm async def open_fsp_sidepane( linked: LinkedSplits, @@ -558,8 +527,66 @@ async def open_fsp_sidepane( yield sidepane -async def open_fspd_cluster( +@acm +async def open_fsp_cluster( + workers: int = 2 +) -> AsyncGenerator[int, dict[str, tractor.Portal]]: + + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + portals: dict[str, tractor.Portal] = {} + uid = tractor.current_actor().uid + + async with tractor.open_nursery() as an: + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" + async with trio.open_nursery() as n: + for index in range(workers): + + async def start(i) -> None: + key = f'fsp_{i}.' + '_'.join(uid) + portals[key] = await an.start_actor( + enable_modules=['piker.fsp._engine'], + name=key, + ) + + n.start_soon(start, index) + + assert len(portals) == workers + profiler('started fsp cluster') + yield portals + + +@acm +async def maybe_open_fsp_cluster( + workers: int = 2, + **kwargs, +) -> AsyncGenerator[int, dict[str, tractor.Portal]]: + + uid = tractor.current_actor().uid + async with maybe_open_ctx( + key=uid, # for now make a cluster per client? + mngr=open_fsp_cluster( + workers, + # loglevel=loglevel, + **kwargs, + ), + ) as (cache_hit, cluster_map): + if cache_hit: + log.info('re-using existing fsp cluster') + yield cluster_map + else: + yield cluster_map + + +async def start_fsp_displays( + + cluster_map: dict[str, tractor.Portal], linkedsplits: LinkedSplits, fsps: dict[str, str], sym: str, @@ -580,19 +607,22 @@ async def open_fspd_cluster( ''' linkedsplits.focus() - # spawns sub-processes which execute cpu bound fsp work - # which is streamed back to this parent. - async with ( - tractor.open_nursery() as n, - trio.open_nursery() as ln, - ): + profiler = pg.debug.Profiler( + delayed=False, + disabled=False + ) + + async with trio.open_nursery() as n: # Currently we spawn an actor per fsp chain but # likely we'll want to pool them eventually to # scale horizonatlly once cores are used up. - for display_name, conf in fsps.items(): + for (display_name, conf), (name, portal) in zip( + fsps.items(), + # rr to cluster for now.. + cycle(cluster_map.items()), + ): func_name = conf['func_name'] - shm, opened = maybe_mk_fsp_shm( sym, field_name=func_name, @@ -600,18 +630,17 @@ async def open_fspd_cluster( readonly=True, ) + profiler(f'created shm for fsp actor: {display_name}') + # XXX: fsp may have been opened by a duplicate chart. # Error for now until we figure out how to wrap fsps as # "feeds". assert opened, f"A chart for {key} likely # already exists?" - portal = await n.start_actor( - enable_modules=['piker.fsp._engine'], - name='fsp.' + display_name, - ) + profiler(f'attached to fsp portal: {display_name}') # init async - ln.start_soon( + n.start_soon( partial( update_chart_from_fsp, @@ -627,6 +656,7 @@ async def open_fspd_cluster( is_overlay=conf.get('overlay', False), group_status_key=group_status_key, loglevel=loglevel, + profiler=profiler, ) ) @@ -650,6 +680,7 @@ async def update_chart_from_fsp( group_status_key: str, loglevel: str, + profiler: pg.debug.Profiler, ) -> None: '''FSP stream chart update loop. @@ -658,6 +689,9 @@ async def update_chart_from_fsp( config map. ''' + + profiler(f'started chart task for fsp: {func_name}') + done = linkedsplits.window().status_bar.open_status( f'loading fsp, {display_name}..', group_key=group_status_key, @@ -676,12 +710,15 @@ async def update_chart_from_fsp( symbol=sym, func_name=func_name, loglevel=loglevel, + zero_on_step=conf.get('zero_on_step', False), ) as (ctx, last_index), ctx.open_stream() as stream, open_fsp_sidepane(linkedsplits, {display_name: conf},) as sidepane, ): + profiler(f'fsp:{func_name} attached to fsp ctx-stream') + if is_overlay: chart = linkedsplits.chart chart.draw_curve( @@ -719,6 +756,8 @@ async def update_chart_from_fsp( array_key = func_name + profiler(f'fsp:{func_name} chart created') + # first UI update, usually from shm pushed history update_fsp_chart( chart, @@ -753,6 +792,9 @@ async def update_chart_from_fsp( done() chart.linked.resize_sidepanes() + profiler(f'fsp:{func_name} starting update loop') + profiler.finish() + # update chart graphics i = 0 last = time.time() @@ -853,7 +895,7 @@ def has_vlm(ohlcv: ShmArray) -> bool: return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) -@asynccontextmanager +@acm async def maybe_open_vlm_display( linked: LinkedSplits, @@ -875,12 +917,12 @@ async def maybe_open_vlm_display( async with open_fsp_sidepane( linked, { - 'volume': { + '$_vlm': { 'params': { 'price_func': { - 'default_value': 'ohl3', + 'default_value': 'chl3', # tell target ``Edit`` widget to not allow # edits for now. 'widget_kwargs': {'readonly': True}, @@ -979,17 +1021,19 @@ async def display_symbol_data( # group_key=loading_sym_key, # ) - async with( + async with async_enter_all( open_feed( provider, [sym], loglevel=loglevel, - # 60 FPS to limit context switches + # limit to at least display's FPS + # avoiding needless Qt-in-guest-mode context switches tick_throttle=_quote_throttle_rate, + ), + maybe_open_fsp_cluster(), - ) as feed, - ): + ) as (feed, cluster_map): ohlcv: ShmArray = feed.shm bars = ohlcv.array @@ -1042,25 +1086,36 @@ async def display_symbol_data( # TODO: eventually we'll support some kind of n-compose syntax fsp_conf = { - 'rsi': { + # 'rsi': { + # 'func_name': 'rsi', # literal python func ref lookup name - # literal python func ref lookup name - 'func_name': 'rsi', + # # map of parameters to place on the fsp sidepane widget + # # which should map to dynamic inputs available to the + # # fsp function at runtime. + # 'params': { + # 'period': { + # 'default_value': 14, + # 'widget_kwargs': {'readonly': True}, + # }, + # }, - # map of parameters to place on the fsp sidepane widget - # which should map to dynamic inputs available to the - # fsp function at runtime. + # # ``ChartPlotWidget`` options passthrough + # 'chart_kwargs': { + # 'static_yrange': (0, 100), + # }, + # }, + 'dolla_vlm': { + 'func_name': 'dolla_vlm', + 'zero_on_step': True, 'params': { - 'period': { - 'default_value': 14, + 'price_func': { + 'default_value': 'chl3', + # tell target ``Edit`` widget to not allow + # edits for now. 'widget_kwargs': {'readonly': True}, }, }, - - # ``ChartPlotWidget`` options passthrough - 'chart_kwargs': { - 'static_yrange': (0, 100), - }, + 'chart_kwargs': {'style': 'step'} }, } @@ -1072,11 +1127,11 @@ async def display_symbol_data( # add VWAP to fsp config for downstream loading fsp_conf.update({ - 'vwap': { - 'func_name': 'vwap', - 'overlay': True, - 'anchor': 'session', - }, + # 'vwap': { + # 'func_name': 'vwap', + # 'overlay': True, + # 'anchor': 'session', + # }, }) # NOTE: we must immediately tell Qt to show the OHLC chart @@ -1086,13 +1141,15 @@ async def display_symbol_data( linkedsplits.focus() await trio.sleep(0) + vlm_chart = None async with ( trio.open_nursery() as ln, maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart, ): # load initial fsp chain (otherwise known as "indicators") ln.start_soon( - open_fspd_cluster, + start_fsp_displays, + cluster_map, linkedsplits, fsp_conf, sym, From 162c58a8d87ac1e1267cc413effa5f85e9142532 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Oct 2021 10:32:34 -0400 Subject: [PATCH 03/12] Start testing out trionics helpers, put vlm before rsi --- piker/ui/_display.py | 83 ++++++++++++++++++++------------------------ 1 file changed, 38 insertions(+), 45 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 0a4a9405..a6379d46 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -37,7 +37,7 @@ import trio from .. import brokers from .._cacheables import maybe_open_ctx from ..trionics import async_enter_all -from ..data.feed import open_feed +from ..data.feed import open_feed, Feed from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -533,33 +533,19 @@ async def open_fsp_cluster( ) -> AsyncGenerator[int, dict[str, tractor.Portal]]: + from tractor._clustering import open_actor_cluster + profiler = pg.debug.Profiler( delayed=False, disabled=False ) - portals: dict[str, tractor.Portal] = {} - uid = tractor.current_actor().uid - - async with tractor.open_nursery() as an: - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" - async with trio.open_nursery() as n: - for index in range(workers): - - async def start(i) -> None: - key = f'fsp_{i}.' + '_'.join(uid) - portals[key] = await an.start_actor( - enable_modules=['piker.fsp._engine'], - name=key, - ) - - n.start_soon(start, index) - - assert len(portals) == workers + async with open_actor_cluster( + count=2, + names=['fsp_0', 'fsp_1'], + modules=['piker.fsp._engine'], + ) as cluster_map: profiler('started fsp cluster') - yield portals + yield cluster_map @acm @@ -827,11 +813,17 @@ async def update_chart_from_fsp( last = time.time() -async def check_for_new_bars(feed, ohlcv, linkedsplits): - """Task which updates from new bars in the shared ohlcv buffer every +async def check_for_new_bars( + feed: Feed, + ohlcv: np.ndarray, + linkedsplits: LinkedSplits, + +) -> None: + ''' + Task which updates from new bars in the shared ohlcv buffer every ``delay_s`` seconds. - """ + ''' # TODO: right now we'll spin printing bars if the last time # stamp is before a large period of no market activity. # Likely the best way to solve this is to make this task @@ -861,6 +853,7 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits): # XXX: this puts a flat bar on the current time step # TODO: if we eventually have an x-axis time-step "cursor" # we can get rid of this since it is extra overhead. + price_chart.update_ohlc_from_array( price_chart.name, ohlcv.array, @@ -1086,24 +1079,6 @@ async def display_symbol_data( # TODO: eventually we'll support some kind of n-compose syntax fsp_conf = { - # 'rsi': { - # 'func_name': 'rsi', # literal python func ref lookup name - - # # map of parameters to place on the fsp sidepane widget - # # which should map to dynamic inputs available to the - # # fsp function at runtime. - # 'params': { - # 'period': { - # 'default_value': 14, - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - - # # ``ChartPlotWidget`` options passthrough - # 'chart_kwargs': { - # 'static_yrange': (0, 100), - # }, - # }, 'dolla_vlm': { 'func_name': 'dolla_vlm', 'zero_on_step': True, @@ -1118,6 +1093,24 @@ async def display_symbol_data( 'chart_kwargs': {'style': 'step'} }, + 'rsi': { + 'func_name': 'rsi', # literal python func ref lookup name + + # map of parameters to place on the fsp sidepane widget + # which should map to dynamic inputs available to the + # fsp function at runtime. + 'params': { + 'period': { + 'default_value': 14, + 'widget_kwargs': {'readonly': True}, + }, + }, + + # ``ChartPlotWidget`` options passthrough + 'chart_kwargs': { + 'static_yrange': (0, 100), + }, + }, } if has_vlm(ohlcv): # and provider != 'binance': @@ -1144,7 +1137,7 @@ async def display_symbol_data( vlm_chart = None async with ( trio.open_nursery() as ln, - maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart, + # maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart, ): # load initial fsp chain (otherwise known as "indicators") ln.start_soon( From 8f023cd66f398f5993e34a64530ca4e17de932ee Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 28 Oct 2021 09:51:02 -0400 Subject: [PATCH 04/12] Factor out context cacher to `tractor.trionics` --- piker/_cacheables.py | 122 +------------------------------------------ piker/data/feed.py | 6 +-- piker/ui/_display.py | 4 +- 3 files changed, 7 insertions(+), 125 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 07ad2319..02ac9240 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -18,30 +18,18 @@ Cacheing apis and toolz. """ -# further examples of interest: -# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8 from collections import OrderedDict -from typing import ( - Any, - Hashable, - Optional, - TypeVar, - AsyncContextManager, -) from contextlib import ( asynccontextmanager, ) -import trio -from trio_typing import TaskStatus -import tractor +from tractor.trionics import maybe_open_context from .brokers import get_brokermod from .log import get_logger -T = TypeVar('T') log = get_logger(__name__) @@ -74,112 +62,6 @@ def async_lifo_cache(maxsize=128): return decorator -_cache: dict[str, 'Client'] = {} # noqa - - -class cache: - '''Globally (processs wide) cached, task access to a - kept-alive-while-in-use async resource. - - ''' - lock = trio.Lock() - users: int = 0 - values: dict[Any, Any] = {} - resources: dict[ - int, - Optional[tuple[trio.Nursery, trio.Event]] - ] = {} - no_more_users: Optional[trio.Event] = None - - @classmethod - async def run_ctx( - cls, - mng, - key, - task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, - - ) -> None: - async with mng as value: - - _, no_more_users = cls.resources[id(mng)] - cls.values[key] = value - task_status.started(value) - try: - await no_more_users.wait() - finally: - value = cls.values.pop(key) - # discard nursery ref so it won't be re-used (an error) - cls.resources.pop(id(mng)) - - -@asynccontextmanager -async def maybe_open_ctx( - - key: Hashable, - mngr: AsyncContextManager[T], - -) -> (bool, T): - '''Maybe open a context manager if there is not already a cached - version for the provided ``key``. Return the cached instance on - a cache hit. - - ''' - - await cache.lock.acquire() - - ctx_key = id(mngr) - - value = None - try: - # lock feed acquisition around task racing / ``trio``'s - # scheduler protocol - value = cache.values[key] - log.info(f'Reusing cached resource for {key}') - cache.users += 1 - cache.lock.release() - yield True, value - - except KeyError: - log.info(f'Allocating new resource for {key}') - - # **critical section** that should prevent other tasks from - # checking the cache until complete otherwise the scheduler - # may switch and by accident we create more then one feed. - - # TODO: avoid pulling from ``tractor`` internals and - # instead offer a "root nursery" in piker actors? - service_n = tractor.current_actor()._service_n - - # TODO: does this need to be a tractor "root nursery"? - ln = cache.resources.get(ctx_key) - assert not ln - - ln, _ = cache.resources[ctx_key] = (service_n, trio.Event()) - - value = await ln.start(cache.run_ctx, mngr, key) - cache.users += 1 - cache.lock.release() - - yield False, value - - finally: - cache.users -= 1 - - if cache.lock.locked(): - cache.lock.release() - - if value is not None: - # if no more consumers, teardown the client - if cache.users <= 0: - log.warning(f'De-allocating resource for {key}') - - # terminate mngr nursery - entry = cache.resources.get(ctx_key) - if entry: - _, no_more_users = entry - no_more_users.set() - - @asynccontextmanager async def open_cached_client( brokername: str, @@ -190,7 +72,7 @@ async def open_cached_client( ''' brokermod = get_brokermod(brokername) - async with maybe_open_ctx( + async with maybe_open_context( key=brokername, mngr=brokermod.get_client(), ) as (cache_hit, client): diff --git a/piker/data/feed.py b/piker/data/feed.py index f8cb0e9b..337a625c 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -37,7 +37,7 @@ import tractor from pydantic import BaseModel from ..brokers import get_brokermod -from .._cacheables import maybe_open_ctx +from .._cacheables import maybe_open_context from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, @@ -368,7 +368,7 @@ async def open_sample_step_stream( # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes - async with maybe_open_ctx( + async with maybe_open_context( key=delay_s, mngr=portal.open_stream_from( iter_ohlc_periods, @@ -590,7 +590,7 @@ async def maybe_open_feed( ''' sym = symbols[0].lower() - async with maybe_open_ctx( + async with maybe_open_context( key=(brokername, sym), mngr=open_feed( brokername, diff --git a/piker/ui/_display.py b/piker/ui/_display.py index a6379d46..463f243d 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -35,7 +35,7 @@ import tractor import trio from .. import brokers -from .._cacheables import maybe_open_ctx +from .._cacheables import maybe_open_context from ..trionics import async_enter_all from ..data.feed import open_feed, Feed from ._chart import ( @@ -555,7 +555,7 @@ async def maybe_open_fsp_cluster( ) -> AsyncGenerator[int, dict[str, tractor.Portal]]: uid = tractor.current_actor().uid - async with maybe_open_ctx( + async with maybe_open_context( key=uid, # for now make a cluster per client? mngr=open_fsp_cluster( workers, From ca467f45b6acf70bb817a36ae32baf7b0c10063e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 1 Nov 2021 10:03:58 -0400 Subject: [PATCH 05/12] Guard against empty array read in step update task --- piker/data/_sharedmem.py | 3 +-- piker/ui/_display.py | 47 ++++++++++++++++++++++++++++++---------- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 2169e262..5d38cbbd 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -272,9 +272,8 @@ class ShmArray: return end except ValueError as err: - # shoudl raise if diff detected + # should raise if diff detected self.diff_err_fields(data) - raise err def diff_err_fields( diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 463f243d..f6b8dce3 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -61,18 +61,19 @@ log = get_logger(__name__) _quote_throttle_rate: int = 58 # Hz -def update_fsp_chart( - chart: ChartPlotWidget, - shm: ShmArray, - graphics_name: str, - array_key: Optional[str], +def try_read(array: np.ndarray) -> Optional[np.ndarray]: + ''' + Try to read the last row from a shared mem array or ``None`` + if the array read returns a zero-length array result. -) -> None: - - array = shm.array + Can be used to check for backfilling race conditions where an array + is currently being (re-)written by a writer actor but the reader is + unaware and reads during the window where the first and last indexes + are being updated. + ''' try: - last_row = array[-1] + return array[-1] except IndexError: # XXX: race condition with backfilling shm. # @@ -85,6 +86,23 @@ def update_fsp_chart( # signal that a prepend is taking place and this consumer can # respond (eg. redrawing graphics) accordingly. log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') + + # the array read was emtpy + return None + + +def update_fsp_chart( + chart: ChartPlotWidget, + shm: ShmArray, + graphics_name: str, + array_key: Optional[str], + +) -> None: + + array = shm.array + last_row = try_read(array) + # guard against unreadable case + if not last_row: return # update graphics @@ -850,13 +868,17 @@ async def check_for_new_bars( # current bar) and then either write the current bar manually # or place a cursor for visual cue of the current time step. + array = ohlcv.array + # avoid unreadable race case on backfills + while not try_read(array): + await trio.sleep(0.01) + # XXX: this puts a flat bar on the current time step # TODO: if we eventually have an x-axis time-step "cursor" # we can get rid of this since it is extra overhead. - price_chart.update_ohlc_from_array( price_chart.name, - ohlcv.array, + array, just_history=False, ) @@ -870,6 +892,9 @@ async def check_for_new_bars( # each subplot for name, chart in linkedsplits.subplots.items(): + + # TODO: do we need the same unreadable guard as for the + # price chart (above) here? chart.update_curve_from_array( chart.name, chart._shm.array, From 00d6258a2420e66b66e86d36421462e6c2716bdc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 15:45:56 -0400 Subject: [PATCH 06/12] Stopgap: don't rerun Context.started() fsp calc task --- piker/fsp/_engine.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 00cccdbd..2b7056da 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -144,10 +144,13 @@ async def fsp_compute( profiler(f'{func_name} pushed history') profiler.finish() + # TODO: UGH, what is the right way to do something like this? + if not ctx._started_called: + await ctx.started(index) + # setup a respawn handle with trio.CancelScope() as cs: tracker = TaskTracker(trio.Event(), cs) - await ctx.started(index) task_status.started((tracker, index)) profiler(f'{func_name} yield last index') From 94572716e68741cd9221a11d33cfd906bdf85a8f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 15:48:03 -0400 Subject: [PATCH 07/12] Drop print around unshown fsp updates --- piker/ui/_display.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index f6b8dce3..c3335a1a 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -800,14 +800,11 @@ async def update_chart_from_fsp( profiler.finish() # update chart graphics - i = 0 last = time.time() async for value in stream: # chart isn't actively shown so just skip render cycle if chart.linked.isHidden(): - print(f'{i} unseen fsp cyclce') - i += 1 continue else: From 590db2c51bec3fc8a1cc15df8169693ec06709ab Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 9 Nov 2021 09:44:01 -0500 Subject: [PATCH 08/12] Add back in vwap --- piker/ui/_display.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index c3335a1a..3e057ddd 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -1142,11 +1142,11 @@ async def display_symbol_data( # add VWAP to fsp config for downstream loading fsp_conf.update({ - # 'vwap': { - # 'func_name': 'vwap', - # 'overlay': True, - # 'anchor': 'session', - # }, + 'vwap': { + 'func_name': 'vwap', + 'overlay': True, + 'anchor': 'session', + }, }) # NOTE: we must immediately tell Qt to show the OHLC chart From 835ad7794c499117c1e9d4ac39a5c60aac4fefb6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Dec 2021 15:08:44 -0500 Subject: [PATCH 09/12] Don't error on sub removal attempts, feeds need backpressure --- piker/data/feed.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 337a625c..0ee94ad9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -356,7 +356,10 @@ async def open_feed_bus( f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') if tick_throttle: n.cancel_scope.cancel() - bus._subscribers[symbol].remove(sub) + try: + bus._subscribers[symbol].remove(sub) + except ValueError: + log.warning(f'{sub} for {symbol} was already removed?') @asynccontextmanager @@ -520,7 +523,12 @@ async def open_feed( ) as (ctx, (init_msg, first_quotes)), - ctx.open_stream() as stream, + ctx.open_stream( + # XXX: be explicit about stream backpressure since we should + # **never** overrun on feeds being too fast, which will + # pretty much always happen with HFT XD + backpressure=True + ) as stream, ): # we can only read from shm From 5b368992f6fc1e681a38f01bb05bc44f01b3b400 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Dec 2021 15:10:37 -0500 Subject: [PATCH 10/12] docstr tweakz --- piker/ui/_app.py | 7 ++++--- piker/ui/_display.py | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/piker/ui/_app.py b/piker/ui/_app.py index 78db608c..998815ba 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -170,10 +170,11 @@ def _main( piker_loglevel: str, tractor_kwargs, ) -> None: - """Sync entry point to start a chart app. + ''' + Sync entry point to start a chart: a ``tractor`` + Qt runtime + entry point - """ - # ``tractor`` + Qt runtime entry point + ''' run_qtractor( func=_async_main, args=(sym, brokernames, piker_loglevel), diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 3e057ddd..85e7ac60 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -1014,7 +1014,8 @@ async def display_symbol_data( order_mode_started: trio.Event, ) -> None: - '''Spawn a real-time updated chart for ``symbol``. + ''' + Spawn a real-time updated chart for ``symbol``. Spawned ``LinkedSplits`` chart widgets can remain up but hidden so that multiple symbols can be viewed and switched between extremely From 422977d27ace39b01e4cbce4c50410e3410d90f7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 17 Dec 2021 13:34:04 -0500 Subject: [PATCH 11/12] Port to new `tractor.trionics.maybe_open_context()` api --- piker/_cacheables.py | 6 +++--- piker/data/feed.py | 27 +++++++++++++++------------ piker/ui/_display.py | 14 +++++++------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 02ac9240..ba7361c3 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -66,14 +66,14 @@ def async_lifo_cache(maxsize=128): async def open_cached_client( brokername: str, ) -> 'Client': # noqa - '''Get a cached broker client from the current actor's local vars. + ''' + Get a cached broker client from the current actor's local vars. If one has not been setup do it and cache it. ''' brokermod = get_brokermod(brokername) async with maybe_open_context( - key=brokername, - mngr=brokermod.get_client(), + acm_func=brokermod.get_client, ) as (cache_hit, client): yield client diff --git a/piker/data/feed.py b/piker/data/feed.py index 0ee94ad9..1e0c55b2 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -27,7 +27,6 @@ from types import ModuleType from typing import ( Any, Sequence, AsyncIterator, Optional, - Awaitable, Callable, ) import trio @@ -372,11 +371,12 @@ async def open_sample_step_stream( # a lone broker-daemon per provider should be # created for all practical purposes async with maybe_open_context( - key=delay_s, - mngr=portal.open_stream_from( + acm_func=partial( + portal.open_stream_from, iter_ohlc_periods, - delay_s=delay_s, # must be kwarg ), + + kwargs={'delay_s': delay_s}, ) as (cache_hit, istream): if cache_hit: # add a new broadcast subscription for the quote stream @@ -524,7 +524,7 @@ async def open_feed( ) as (ctx, (init_msg, first_quotes)), ctx.open_stream( - # XXX: be explicit about stream backpressure since we should + # XXX: be explicit about stream backpressure since we should # **never** overrun on feeds being too fast, which will # pretty much always happen with HFT XD backpressure=True @@ -574,6 +574,7 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) + # yield feed try: yield feed finally: @@ -599,16 +600,18 @@ async def maybe_open_feed( sym = symbols[0].lower() async with maybe_open_context( - key=(brokername, sym), - mngr=open_feed( - brokername, - [sym], - loglevel=loglevel, - **kwargs, - ), + acm_func=open_feed, + kwargs={ + 'brokername': brokername, + 'symbols': [sym], + 'loglevel': loglevel, + 'tick_throttle': kwargs.get('tick_throttle'), + }, + key=sym, ) as (cache_hit, feed): if cache_hit: + print('USING CACHED FEED') # add a new broadcast subscription for the quote stream # if this feed is likely already in use async with feed.stream.subscribe() as bstream: diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 85e7ac60..241cd370 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -572,14 +572,14 @@ async def maybe_open_fsp_cluster( **kwargs, ) -> AsyncGenerator[int, dict[str, tractor.Portal]]: - uid = tractor.current_actor().uid + kwargs.update( + {'workers': workers} + ) + async with maybe_open_context( - key=uid, # for now make a cluster per client? - mngr=open_fsp_cluster( - workers, - # loglevel=loglevel, - **kwargs, - ), + # for now make a cluster per client? + acm_func=open_fsp_cluster, + kwargs=kwargs, ) as (cache_hit, cluster_map): if cache_hit: log.info('re-using existing fsp cluster') From 119ff0ec2053a895d65ed0ab348b1a338d13edfb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 24 Jan 2022 06:20:06 -0500 Subject: [PATCH 12/12] Drop dollar vlm config; Add back basic vlm --- piker/ui/_display.py | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 241cd370..dcc0badd 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -1102,20 +1102,6 @@ async def display_symbol_data( # TODO: eventually we'll support some kind of n-compose syntax fsp_conf = { - 'dolla_vlm': { - 'func_name': 'dolla_vlm', - 'zero_on_step': True, - 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. - 'widget_kwargs': {'readonly': True}, - }, - }, - 'chart_kwargs': {'style': 'step'} - }, - 'rsi': { 'func_name': 'rsi', # literal python func ref lookup name @@ -1160,7 +1146,7 @@ async def display_symbol_data( vlm_chart = None async with ( trio.open_nursery() as ln, - # maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart, + maybe_open_vlm_display(linkedsplits, ohlcv) as vlm_chart, ): # load initial fsp chain (otherwise known as "indicators") ln.start_soon(