diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 48b20d80..f0a8d367 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -38,6 +38,7 @@ import contextlib import trio import tractor +from tractor.experimental import msgpub from async_generator import asynccontextmanager from ..log import get_logger, get_console_log @@ -98,7 +99,7 @@ class BrokerFeed: ) -@tractor.msg.pub(tasks=['stock', 'option']) +@msgpub(tasks=['stock', 'option']) async def stream_poll_requests( get_topics: Callable, get_quotes: Coroutine, @@ -293,7 +294,7 @@ async def start_quote_stream( await stream_poll_requests( - # ``msg.pub`` required kwargs + # ``trionics.msgpub`` required kwargs task_name=feed_type, ctx=ctx, topics=symbols, diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index b378f5f2..4dcf7b14 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1032,7 +1032,11 @@ async def get_client( # https://interactivebrokers.github.io/tws-api/tick_types.html tick_types = { 77: 'trade', - 48: 'utrade', + + # a "utrade" aka an off exchange "unreportable" (dark) vlm: + # https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume + 48: 'dark_trade', + 0: 'bsize', 1: 'bid', 2: 'ask', @@ -1046,13 +1050,17 @@ tick_types = { def normalize( ticker: Ticker, calc_price: bool = False + ) -> dict: # convert named tuples to dicts so we send usable keys new_ticks = [] for tick in ticker.ticks: if tick and not isinstance(tick, dict): td = tick._asdict() - td['type'] = tick_types.get(td['tickType'], 'n/a') + td['type'] = tick_types.get( + td['tickType'], + 'n/a', + ) new_ticks.append(td) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 3278e40b..24d2dab3 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -553,8 +553,6 @@ async def stream_quotes( quote = ohlc topic = quote['symbol'].lower() - # XXX: format required by ``tractor.msg.pub`` - # requires a ``Dict[topic: str, quote: dict]`` await send_chan.send({topic: quote}) diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py index 3474879e..56d64b75 100644 --- a/piker/data/_normalize.py +++ b/piker/data/_normalize.py @@ -25,14 +25,18 @@ import numpy as np def iterticks( quote: dict, - types: Tuple[str] = ('trade', 'utrade'), + types: Tuple[str] = ('trade', 'dark_trade'), + ) -> AsyncIterator: - """Iterate through ticks delivered per quote cycle. - """ + ''' + Iterate through ticks delivered per quote cycle. + + ''' # print(f"{quote}\n\n") ticks = quote.get('ticks', ()) if ticks: for tick in ticks: # print(f"{quote['symbol']}: {tick}") - if tick.get('type') in types: + ttype = tick.get('type') + if ttype in types: yield tick diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 5e702e08..b29b0f7d 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -176,12 +176,11 @@ async def sample_and_broadcast( # TODO: ``numba`` this! for sym, quote in quotes.items(): - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. + # TODO: in theory you can send the IPC msg *before* writing + # to the sharedmem array to decrease latency, however, that + # will require at least some way to prevent task switching + # at the yield such that the array write isn't delayed while + # another consumer is serviced.. # start writing the shm buffer with appropriate # trade data diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py new file mode 100644 index 00000000..f2c7cdc8 --- /dev/null +++ b/piker/fsp/_api.py @@ -0,0 +1,163 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship of pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +FSP (financial signal processing) apis. + +''' + +# TODO: things to figure the heck out: +# - how to handle non-plottable values (pyqtgraph has facility for this +# now in `arrayToQPath()`) +# - composition of fsps / implicit chaining syntax (we need an issue) + +from __future__ import annotations +from functools import partial +from typing import ( + Any, + Callable, + Awaitable, + Optional, +) + +import numpy as np +import tractor +from tractor.msg import NamespacePath + +from ..data._sharedmem import ( + ShmArray, + maybe_open_shm_array, +) +from ..log import get_logger + +log = get_logger(__name__) + +# global fsp registry filled out by @fsp decorator below +_fsp_registry = {} + + +def _load_builtins() -> dict[tuple, Callable]: + + # import to implicity trigger registration via ``@fsp`` + from . import _momo # noqa + from . import _volume # noqa + + return _fsp_registry + + +class Fsp: + ''' + "Financial signal processor" decorator wrapped async function. + + ''' + + # TODO: checkout the advanced features from ``wrapt``: + # - dynamic enable toggling, + # https://wrapt.readthedocs.io/en/latest/decorators.html#dynamically-disabling-decorators + # - custom object proxies, might be useful for implementing n-compose + # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-object-proxies + # - custom function wrappers, + # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers + + def __init__( + self, + func: Callable[..., Awaitable], + *, + outputs: tuple[str] = (), + display_name: Optional[str] = None, + **config, + + ) -> None: + + # TODO (maybe): + # - type introspection? + # - should we make this a wrapt object proxy? + self.func = func + self.__name__ = func.__name__ # XXX: must have func-object name + + self.ns_path: tuple[str, str] = NamespacePath.from_ref(func) + self.outputs = outputs + self.config: dict[str, Any] = config + + # register with declared set. + _fsp_registry[self.ns_path] = func + + @property + def name(self) -> str: + return self.__name__ + + def __call__( + self, + + # TODO: when we settle on py3.10 we should probably use the new + # type annots from pep 612: + # https://www.python.org/dev/peps/pep-0612/ + # instance, + *args, + **kwargs + ): + return self.func(*args, **kwargs) + + +def fsp( + wrapped=None, + *, + outputs: tuple[str] = (), + display_name: Optional[str] = None, + **config, + +) -> Fsp: + + if wrapped is None: + return partial( + Fsp, + outputs=outputs, + display_name=display_name, + **config, + ) + + return Fsp(wrapped, outputs=(wrapped.__name__,)) + + +def maybe_mk_fsp_shm( + sym: str, + target: fsp, + readonly: bool = True, + +) -> (ShmArray, bool): + ''' + Allocate a single row shm array for an symbol-fsp pair if none + exists, otherwise load the shm already existing for that token. + + ''' + uid = tractor.current_actor().uid + + # TODO: load output types from `Fsp` + # - should `index` be a required internal field? + fsp_dtype = np.dtype( + [('index', int)] + + [(field_name, float) for field_name in target.outputs] + ) + + key = f'{sym}.fsp.{target.name}.{".".join(uid)}' + + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + return shm, opened diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index aafaf76c..afd986e0 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -27,29 +27,18 @@ import pyqtgraph as pg import trio from trio_typing import TaskStatus import tractor +from tractor.msg import NamespacePath from ..log import get_logger, get_console_log from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray -from ._momo import _rsi, _wma -from ._volume import _tina_vwap, dolla_vlm +from ._api import Fsp +from ._api import _load_builtins log = get_logger(__name__) -_fsp_builtins = { - 'rsi': _rsi, - 'wma': _wma, - 'vwap': _tina_vwap, - 'dolla_vlm': dolla_vlm, -} - -# TODO: things to figure the heck out: -# - how to handle non-plottable values (pyqtgraph has facility for this -# now in `arrayToQPath()`) -# - composition of fsps / implicit chaining syntax (we need an issue) - @dataclass class TaskTracker: @@ -88,7 +77,6 @@ async def fsp_compute( src: ShmArray, dst: ShmArray, - func_name: str, func: Callable, attach_stream: bool = False, @@ -115,15 +103,27 @@ async def fsp_compute( # and get historical output history_output = await out_stream.__anext__() + func_name = func.__name__ profiler(f'{func_name} generated history') - # build a struct array which includes an 'index' field to push - # as history - history = np.array( - np.arange(len(history_output)), + # build struct array with an 'index' field to push as history + history = np.zeros( + len(history_output), dtype=dst.array.dtype ) - history[func_name] = history_output + + # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? + # if the output array is multi-field then push + # each respective field. + fields = getattr(history.dtype, 'fields', None) + if fields: + for key in fields.keys(): + if key in history.dtype.fields: + history[func_name] = history_output + + # single-key output stream + else: + history[func_name] = history_output # TODO: XXX: # THERE'S A BIG BUG HERE WITH THE `index` field since we're @@ -164,8 +164,9 @@ async def fsp_compute( async for processed in out_stream: log.debug(f"{func_name}: {processed}") + key, output = processed index = src.index - dst.array[-1][func_name] = processed + dst.array[-1][key] = output # NOTE: for now we aren't streaming this to the consumer # stream latest array index entry which basically just acts @@ -194,7 +195,7 @@ async def cascade( src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], - func_name: str, + ns_path: NamespacePath, zero_on_step: bool = False, loglevel: Optional[str] = None, @@ -213,10 +214,18 @@ async def cascade( src = attach_shm_array(token=src_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token) - func: Callable = _fsp_builtins.get(func_name) + reg = _load_builtins() + lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg]) + log.info( + f'Registered FSP set:\n{lines}' + ) + func: Fsp = reg.get( + NamespacePath(ns_path) + ) + if not func: # TODO: assume it's a func target path - raise ValueError('Unknown fsp target: {func_name}') + raise ValueError(f'Unknown fsp target: {ns_path}') # open a data feed stream with requested broker async with data.feed.maybe_open_feed( @@ -231,11 +240,12 @@ async def cascade( ) as (feed, quote_stream): - profiler(f'{func_name}: feed up') + profiler(f'{func}: feed up') assert src.token == feed.shm.token # last_len = new_len = len(src.array) + func_name = func.__name__ async with ( trio.open_nursery() as n, ): @@ -252,7 +262,7 @@ async def cascade( src=src, dst=dst, - func_name=func_name, + # func_name=func_name, func=func ) diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 2ee55e00..29e94f98 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -23,6 +23,7 @@ from typing import AsyncIterator, Optional import numpy as np from numba import jit, float64, optional, int64 +from ._api import fsp from ..data._normalize import iterticks from ..data._sharedmem import ShmArray @@ -106,7 +107,7 @@ def ema( # nopython=True, # nogil=True # ) -def rsi( +def _rsi( # TODO: use https://github.com/ramonhagenaars/nptyping signal: 'np.ndarray[float64]', @@ -146,7 +147,7 @@ def rsi( return rsi, up_ema[-1], down_ema[-1] -def wma( +def _wma( signal: np.ndarray, length: int, @@ -169,10 +170,8 @@ def wma( return np.convolve(signal, weights, 'valid') -# @piker.fsp.emit( -# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'], -# ) -async def _rsi( +@fsp +async def rsi( source: 'QuoteStream[Dict[str, Any]]', # noqa ohlcv: ShmArray, @@ -188,11 +187,11 @@ async def _rsi( sig = ohlcv.array['close'] # wilder says to seed the RSI EMAs with the SMA for the "period" - seed = wma(ohlcv.last(period)['close'], period)[0] + seed = _wma(ohlcv.last(period)['close'], period)[0] # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. - rsi_h, last_up_ema_close, last_down_ema_close = rsi( + rsi_h, last_up_ema_close, last_down_ema_close = _rsi( sig, period, seed, seed) up_ema_last = last_up_ema_close down_ema_last = last_down_ema_close @@ -218,7 +217,7 @@ async def _rsi( last_down_ema_close = down_ema_last index = ohlcv.index - rsi_out, up_ema_last, down_ema_last = rsi( + rsi_out, up_ema_last, down_ema_last = _rsi( sig, period=period, up_ema_last=last_up_ema_close, @@ -227,7 +226,8 @@ async def _rsi( yield rsi_out[-1:] -async def _wma( +@fsp +async def wma( source, #: AsyncStream[np.ndarray], length: int, @@ -243,10 +243,10 @@ async def _wma( ''' # deliver historical output as "first yield" - yield wma(ohlcv.array['close'], length) + yield _wma(ohlcv.array['close'], length) # begin real-time section async for quote in source: for tick in iterticks(quote, type='trade'): - yield wma(ohlcv.last(length)) + yield _wma(ohlcv.last(length)) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index e662343f..7cf7d7b4 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -19,6 +19,7 @@ from typing import AsyncIterator, Optional, Union import numpy as np from tractor.trionics._broadcast import AsyncReceiver +from ._api import fsp from ..data._normalize import iterticks from ..data._sharedmem import ShmArray @@ -50,7 +51,8 @@ def wap( ) -async def _tina_vwap( +@fsp +async def tina_vwap( source: AsyncReceiver[dict], ohlcv: ShmArray, # OHLC sampled history @@ -62,7 +64,8 @@ async def _tina_vwap( AsyncIterator[np.ndarray], float ]: - '''Streaming volume weighted moving average. + ''' + Streaming volume weighted moving average. Calling this "tina" for now since we're using HLC3 instead of tick. @@ -100,26 +103,25 @@ async def _tina_vwap( w_tot += price * size # yield ((((o + h + l) / 3) * v) weights_tot) / v_tot - yield w_tot / v_tot + yield 'tina_vwap', w_tot / v_tot -# @fsp.config( -# name='dolla_vlm', -# ohlc=False, -# style='step', -# ) +@fsp( + outputs=('dolla_vlm', 'dark_vlm'), + ohlc=False, + curve_style='step', +) async def dolla_vlm( source: AsyncReceiver[dict], ohlcv: ShmArray, # OHLC sampled history -) -> Union[ - AsyncIterator[np.ndarray], - float +) -> AsyncIterator[ + tuple[str, Union[np.ndarray, float]], ]: ''' "Dollar Volume", aka the volume in asset-currency-units (usually a fiat) computed from some price function for the sample step - *times* the asset unit volume. + *multiplied* (*) by the asset unit volume. Useful for comparing cross asset "money flow" in #s that are asset-currency-independent. @@ -129,11 +131,12 @@ async def dolla_vlm( chl3 = (a['close'] + a['high'] + a['low']) / 3 v = a['volume'] - # history + # on first iteration yield history yield chl3 * v i = ohlcv.index - lvlm = 0 + output = vlm = 0 + dvlm = 0 async for quote in source: for tick in iterticks(quote): @@ -145,14 +148,30 @@ async def dolla_vlm( li = ohlcv.index if li > i: i = li - lvlm = 0 + vlm = 0 + dvlm = 0 - c, h, l, v = ohlcv.last()[ - ['close', 'high', 'low', 'volume'] - ][0] + # TODO: for marginned instruments (futes, etfs?) we need to + # show the margin $vlm by multiplying by whatever multiplier + # is reported in the sym info. - lvlm += price * size - tina_lvlm = c+h+l/3 * v + ttype = tick.get('type') + if ttype == 'dark_trade': + print(f'dark_trade: {tick}') + key = 'dark_vlm' + dvlm += price * size + output = dvlm + + else: + key = 'dolla_vlm' + vlm += price * size + output = vlm + + # TODO: plot both to compare? + # c, h, l, v = ohlcv.last()[ + # ['close', 'high', 'low', 'volume'] + # ][0] + # tina_lvlm = c+h+l/3 * v # print(f' tinal vlm: {tina_lvlm}') - yield lvlm + yield key, output diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 22196f23..937b7e1e 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -28,7 +28,6 @@ from typing import Optional, AsyncGenerator, Any import numpy as np from pydantic import create_model import tractor -# from tractor.trionics import gather_contexts import pyqtgraph as pg import trio from trio_typing import TaskStatus @@ -38,57 +37,25 @@ from .._cacheables import maybe_open_context from ..calc import humanize from ..data._sharedmem import ( ShmArray, - maybe_open_shm_array, try_read, ) from ._chart import ( ChartPlotWidget, LinkedSplits, ) -from .. import fsp from ._forms import ( FieldsForm, mk_form, open_form_input_handling, ) +from ..fsp._api import maybe_mk_fsp_shm, Fsp +from ..fsp import cascade +from ..fsp._volume import tina_vwap, dolla_vlm from ..log import get_logger log = get_logger(__name__) -def maybe_mk_fsp_shm( - sym: str, - field_name: str, - display_name: Optional[str] = None, - readonly: bool = True, - -) -> (ShmArray, bool): - ''' - Allocate a single row shm array for an symbol-fsp pair if none - exists, otherwise load the shm already existing for that token. - - ''' - uid = tractor.current_actor().uid - if not display_name: - display_name = field_name - - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (field_name, float)]) - - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, - ) - return shm, opened - - def has_vlm(ohlcv: ShmArray) -> bool: # make sure that the instrument supports volume history # (sometimes this is not the case for some commodities and @@ -148,11 +115,11 @@ async def open_fsp_sidepane( assert len(conf) == 1 # for now # add (single) selection widget - for display_name, config in conf.items(): - schema[display_name] = { + for name, config in conf.items(): + schema[name] = { 'label': '**fsp**:', 'type': 'select', - 'default_value': [display_name], + 'default_value': [name], } # add parameters for selection "options" @@ -180,7 +147,7 @@ async def open_fsp_sidepane( # https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation FspConfig = create_model( 'FspConfig', - name=display_name, + name=name, **params, ) sidepane.model = FspConfig() @@ -228,8 +195,7 @@ async def run_fsp_ui( linkedsplits: LinkedSplits, shm: ShmArray, started: trio.Event, - func_name: str, - display_name: str, + target: Fsp, conf: dict[str, dict], loglevel: str, # profiler: pg.debug.Profiler, @@ -244,17 +210,18 @@ async def run_fsp_ui( config. ''' - # profiler(f'started UI task for fsp: {func_name}') + name = target.name + # profiler(f'started UI task for fsp: {name}') async with ( # side UI for parameters/controls open_fsp_sidepane( linkedsplits, - {display_name: conf}, + {name: conf}, ) as sidepane, ): await started.wait() - # profiler(f'fsp:{func_name} attached to fsp ctx-stream') + # profiler(f'fsp:{name} attached to fsp ctx-stream') overlay_with = conf.get('overlay', False) if overlay_with: @@ -264,24 +231,24 @@ async def run_fsp_ui( chart = linkedsplits.subplots[overlay_with] chart.draw_curve( - name=display_name, + name=name, data=shm.array, overlay=True, color='default_light', - array_key=func_name, + array_key=name, separate_axes=conf.get('separate_axes', False), **conf.get('chart_kwargs', {}) ) # specially store ref to shm for lookup in display loop - chart._overlays[display_name] = shm + chart._overlays[name] = shm else: # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( - name=display_name, + name=name, array=shm.array, - array_key=func_name, + array_key=name, sidepane=sidepane, # curve by default @@ -299,15 +266,15 @@ async def run_fsp_ui( # should **not** be the same sub-chart widget assert chart.name != linkedsplits.chart.name - array_key = func_name + array_key = name - # profiler(f'fsp:{func_name} chart created') + # profiler(f'fsp:{name} chart created') # first UI update, usually from shm pushed history update_fsp_chart( chart, shm, - display_name, + name, array_key=array_key, ) @@ -320,7 +287,7 @@ async def run_fsp_ui( # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[func_name]) + # graphics = chart.update_from_array(chart.name, array[name]) # graphics.curve.setBrush(50, 50, 200, 100) # graphics.curve.setFillLevel(50) @@ -410,7 +377,7 @@ class FspAdmin: started: trio.Event, dst_shm: ShmArray, conf: dict, - func_name: str, + target: Fsp, loglevel: str, ) -> None: @@ -420,11 +387,12 @@ class FspAdmin: ''' brokername, sym = self.linked.symbol.front_feed() + ns_path = str(target.ns_path) async with ( portal.open_context( # chaining entrypoint - fsp.cascade, + cascade, # data feed key brokername=brokername, @@ -435,7 +403,7 @@ class FspAdmin: dst_shm_token=dst_shm.token, # target - func_name=func_name, + ns_path=ns_path, loglevel=loglevel, zero_on_step=conf.get('zero_on_step', False), @@ -444,8 +412,13 @@ class FspAdmin: ctx.open_stream() as stream, ): # register output data - self._registry[(brokername, sym, func_name)] = ( - stream, dst_shm, complete) + self._registry[ + (brokername, sym, ns_path) + ] = ( + stream, + dst_shm, + complete + ) started.set() @@ -455,39 +428,38 @@ class FspAdmin: async def start_engine_task( self, - display_name: str, + target: Fsp, conf: dict[str, dict[str, Any]], worker_name: Optional[str] = None, - loglevel: str = 'error', + loglevel: str = 'info', ) -> (ShmArray, trio.Event): - # unpack FSP details from config dict - func_name = conf['func_name'] + fqsn = self.linked.symbol.front_feed() # allocate an output shm array dst_shm, opened = maybe_mk_fsp_shm( - self.linked.symbol.front_feed(), - field_name=func_name, - display_name=display_name, + fqsn, + target=target, readonly=True, ) - if not opened: - raise RuntimeError(f'Already started FSP {func_name}') + # if not opened: + # raise RuntimeError( + # f'Already started FSP `{fqsn}:{func_name}`' + # ) portal = self.cluster.get(worker_name) or self.rr_next_portal() complete = trio.Event() started = trio.Event() self.tn.start_soon( self.open_chain, - portal, complete, started, dst_shm, conf, - func_name, + target, loglevel, ) @@ -495,16 +467,16 @@ class FspAdmin: async def open_fsp_chart( self, - display_name: str, + + target: Fsp, + conf: dict, # yeah probably dumb.. loglevel: str = 'error', ) -> (trio.Event, ChartPlotWidget): - func_name = conf['func_name'] - shm, started = await self.start_engine_task( - display_name, + target, conf, loglevel, ) @@ -517,8 +489,7 @@ class FspAdmin: self.linked, shm, started, - func_name, - display_name, + target, conf=conf, loglevel=loglevel, @@ -621,14 +592,22 @@ async def open_vlm_displays( ) # force 0 to always be in view - def maxmin(name) -> tuple[float, float]: - mxmn = chart.maxmin(name=name) - if mxmn: - return 0, mxmn[1] + def maxmin( + names: list[str], - return 0, 0 + ) -> tuple[float, float]: + mx = 0 + for name in names: + mxmn = chart.maxmin(name=name) + if mxmn: + mx = max(mxmn[1], mx) - chart.view._maxmin = partial(maxmin, name='volume') + # if mx: + # return 0, mxmn[1] + + return 0, mx + + chart.view._maxmin = partial(maxmin, names=['volume']) # TODO: fix the x-axis label issue where if you put # the axis on the left it's totally not lined up... @@ -671,8 +650,8 @@ async def open_vlm_displays( # spawn and overlay $ vlm on the same subchart shm, started = await admin.start_engine_task( - 'dolla_vlm', - # linked.symbol.front_feed(), # data-feed symbol key + dolla_vlm, + { # fsp engine conf 'func_name': 'dolla_vlm', 'zero_on_step': True, @@ -704,18 +683,17 @@ async def open_vlm_displays( ) # add custom auto range handler - pi.vb._maxmin = partial(maxmin, name='dolla_vlm') + pi.vb._maxmin = partial( + maxmin, + # keep both regular and dark vlm in view + names=['dolla_vlm', 'dark_vlm'], + ) curve, _ = chart.draw_curve( - name='dolla_vlm', data=shm.array, - array_key='dolla_vlm', overlay=pi, - # color='bracket', - # TODO: this color or dark volume - # color='charcoal', step_mode=True, # **conf.get('chart_kwargs', {}) ) @@ -732,6 +710,17 @@ async def open_vlm_displays( # ``.draw_curve()``. chart._overlays['dolla_vlm'] = shm + curve, _ = chart.draw_curve( + + name='dark_vlm', + data=shm.array, + array_key='dark_vlm', + overlay=pi, + color='charcoal', # darker theme hue + step_mode=True, + # **conf.get('chart_kwargs', {}) + ) + chart._overlays['dark_vlm'] = shm # XXX: old dict-style config before it was moved into the # helper task # 'dolla_vlm': { @@ -759,15 +748,14 @@ async def open_vlm_displays( axis.size_to_values() # built-in vlm fsps - for display_name, conf in { - 'vwap': { - 'func_name': 'vwap', + for target, conf in { + tina_vwap: { 'overlay': 'ohlc', # overlays with OHLCV (main) chart 'anchor': 'session', }, }.items(): started = await admin.open_fsp_chart( - display_name, + target, conf, ) @@ -815,27 +803,26 @@ async def start_fsp_displays( disabled=False ) - # async with gather_contexts(( async with ( # NOTE: this admin internally opens an actor cluster open_fsp_admin(linked, ohlcv) as admin, ): statuses = [] - for display_name, conf in fsp_conf.items(): + for target, conf in fsp_conf.items(): started = await admin.open_fsp_chart( - display_name, + target, conf, ) done = linked.window().status_bar.open_status( - f'loading fsp, {display_name}..', + f'loading fsp, {target}..', group_key=group_status_key, ) statuses.append((started, done)) for fsp_loaded, status_cb in statuses: await fsp_loaded.wait() - profiler(f'attached to fsp portal: {display_name}') + profiler(f'attached to fsp portal: {target}') status_cb() # blocks on nursery until all fsp actors complete diff --git a/requirements.txt b/requirements.txt index ca910e01..680360b0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ # we require a pinned dev branch to get some edge features that # are often untested in tractor's CI and/or being tested by us # first before committing as core features in tractor's base. --e git+git://github.com/goodboy/tractor.git@piker_pin#egg=tractor +-e git+git://github.com/goodboy/tractor.git@master#egg=tractor # `pyqtgraph` peeps keep breaking, fixing, improving so might as well # pin this to a dev branch that we have more control over especially