diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index aedeffc3..4139d60d 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1042,6 +1042,7 @@ tick_types = { # https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume 48: 'dark_trade', + # standard L1 ticks 0: 'bsize', 1: 'bid', 2: 'ask', @@ -1049,6 +1050,12 @@ tick_types = { 4: 'last', 5: 'size', 8: 'volume', + + # ``ib_insync`` already packs these into + # quotes under the following fields. + # 55: 'trades_per_min', # `'tradeRate'` + # 56: 'vlm_per_min', # `'volumeRate'` + # 89: 'shortable', # `'shortableShares'` } @@ -1069,6 +1076,10 @@ def normalize( new_ticks.append(td) + tbt = ticker.tickByTicks + if tbt: + print(f'tickbyticks:\n {ticker.tickByTicks}') + ticker.ticks = new_ticks # some contracts don't have volume so we may want to calculate @@ -1081,6 +1092,11 @@ def normalize( # serialize for transport data = asdict(ticker) + # convert named tuples to dicts for transport + tbts = data.get('tickByTicks') + if tbts: + data['tickByTicks'] = [tbt._asdict() for tbt in tbts] + # add time stamps for downstream latency measurements data['brokerd_ts'] = time.time() @@ -1263,7 +1279,18 @@ async def _setup_quote_stream( to_trio: trio.abc.SendChannel, symbol: str, - opts: tuple[int] = ('375', '233', '236'), + opts: tuple[int] = ( + '375', # RT trade volume (excludes utrades) + '233', # RT trade volume (includes utrades) + '236', # Shortable shares + + # these all appear to only be updated every 25s thus + # making them mostly useless and explains why the scanner + # is always slow XD + # '293', # Trade count for day + '294', # Trade rate / minute + '295', # Vlm rate / minute + ), contract: Optional[Contract] = None, ) -> trio.abc.ReceiveChannel: @@ -1281,6 +1308,12 @@ async def _setup_quote_stream( contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + # NOTE: it's batch-wise and slow af but I guess could + # be good for backchecking? Seems to be every 5s maybe? + # ticker: Ticker = client.ib.reqTickByTickData( + # contract, 'Last', + # ) + # # define a simple queue push routine that streams quote packets # # to trio over the ``to_trio`` memory channel. # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py index 56d64b75..677468ad 100644 --- a/piker/data/_normalize.py +++ b/piker/data/_normalize.py @@ -14,27 +14,67 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Stream format enforcement. -""" -from typing import AsyncIterator, Optional, Tuple - -import numpy as np +''' +from itertools import chain +from typing import AsyncIterator def iterticks( quote: dict, - types: Tuple[str] = ('trade', 'dark_trade'), + types: tuple[str] = ( + 'trade', + 'dark_trade', + ), + deduplicate_darks: bool = False, ) -> AsyncIterator: ''' Iterate through ticks delivered per quote cycle. ''' + if deduplicate_darks: + assert 'dark_trade' in types + # print(f"{quote}\n\n") ticks = quote.get('ticks', ()) + trades = {} + darks = {} + if ticks: + + # do a first pass and attempt to remove duplicate dark + # trades with the same tick signature. + if deduplicate_darks: + for tick in ticks: + ttype = tick.get('type') + + time = tick.get('time', None) + if time: + sig = ( + time, + tick['price'], + tick['size'] + ) + + if ttype == 'dark_trade': + darks[sig] = tick + + elif ttype == 'trade': + trades[sig] = tick + + # filter duplicates + for sig, tick in trades.items(): + tick = darks.pop(sig, None) + if tick: + ticks.remove(tick) + # print(f'DUPLICATE {tick}') + + # re-insert ticks + ticks.extend(list(chain(trades.values(), darks.values()))) + for tick in ticks: # print(f"{quote['symbol']}: {tick}") ttype = tick.get('type') diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index b29b0f7d..669f624e 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -252,7 +252,7 @@ async def sample_and_broadcast( try: stream.send_nowait((sym, quote)) except trio.WouldBlock: - ctx = getattr(sream, '_ctx', None) + ctx = getattr(stream, '_ctx', None) if ctx: log.warning( f'Feed overrun {bus.brokername} ->' @@ -371,7 +371,7 @@ async def uniform_rate_send( # we have a quote already so send it now. - measured_rate = 1 / (time.time() - last_send) + # measured_rate = 1 / (time.time() - last_send) # log.info( # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' # ) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index c741ba1c..5f7fdcd0 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -18,9 +18,10 @@ NumPy compatible shared memory buffers for real-time IPC streaming. """ +from __future__ import annotations from dataclasses import dataclass, asdict from sys import byteorder -from typing import List, Tuple, Optional +from typing import Optional from multiprocessing.shared_memory import SharedMemory, _USE_POSIX from multiprocessing import resource_tracker as mantracker @@ -29,6 +30,7 @@ if _USE_POSIX: import tractor import numpy as np +from pydantic import BaseModel, validator from ..log import get_logger from ._source import base_iohlc_dtype @@ -85,26 +87,34 @@ class SharedInt: shm_unlink(self._shm.name) -@dataclass -class _Token: - """Internal represenation of a shared memory "token" +class _Token(BaseModel): + ''' + Internal represenation of a shared memory "token" which can be used to key a system wide post shm entry. - """ + + ''' + class Config: + frozen = True + shm_name: str # this servers as a "key" value shm_first_index_name: str shm_last_index_name: str - dtype_descr: List[Tuple[str]] + dtype_descr: tuple - def __post_init__(self): - # np.array requires a list for dtype - self.dtype_descr = np.dtype(list(map(tuple, self.dtype_descr))).descr + @property + def dtype(self) -> np.dtype: + return np.dtype(list(map(tuple, self.dtype_descr))).descr def as_msg(self): - return asdict(self) + return self.dict() @classmethod - def from_msg(self, msg: dict) -> '_Token': - return msg if isinstance(msg, _Token) else _Token(**msg) + def from_msg(cls, msg: dict) -> _Token: + if isinstance(msg, _Token): + return msg + + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) + return _Token(**msg) # TODO: this api? @@ -127,15 +137,17 @@ def _make_token( key: str, dtype: Optional[np.dtype] = None, ) -> _Token: - """Create a serializable token that can be used + ''' + Create a serializable token that can be used to access a shared array. - """ + + ''' dtype = base_iohlc_dtype if dtype is None else dtype return _Token( - key, - key + "_first", - key + "_last", - np.dtype(dtype).descr + shm_name=key, + shm_first_index_name=key + "_first", + shm_last_index_name=key + "_last", + dtype_descr=np.dtype(dtype).descr ) @@ -178,10 +190,10 @@ class ShmArray: @property def _token(self) -> _Token: return _Token( - self._shm.name, - self._first._shm.name, - self._last._shm.name, - self._array.dtype.descr, + shm_name=self._shm.name, + shm_first_index_name=self._first._shm.name, + shm_last_index_name=self._last._shm.name, + dtype_descr=tuple(self._array.dtype.descr), ) @property @@ -402,16 +414,19 @@ def open_shm_array( def attach_shm_array( - token: Tuple[str, str, Tuple[str, str]], + token: tuple[str, str, tuple[str, str]], size: int = _default_size, readonly: bool = True, + ) -> ShmArray: - """Attach to an existing shared memory array previously + ''' + Attach to an existing shared memory array previously created by another process using ``open_shared_array``. No new shared mem is allocated but wrapper types for read/write access are constructed. - """ + + ''' token = _Token.from_msg(token) key = token.shm_name @@ -422,7 +437,7 @@ def attach_shm_array( shm = SharedMemory(name=key) shmarr = np.ndarray( (size,), - dtype=token.dtype_descr, + dtype=token.dtype, buffer=shm.buf ) shmarr.setflags(write=int(not readonly)) @@ -470,8 +485,10 @@ def maybe_open_shm_array( key: str, dtype: Optional[np.dtype] = None, **kwargs, -) -> Tuple[ShmArray, bool]: - """Attempt to attach to a shared memory block using a "key" lookup + +) -> tuple[ShmArray, bool]: + ''' + Attempt to attach to a shared memory block using a "key" lookup to registered blocks in the users overall "system" registry (presumes you don't have the block's explicit token). @@ -485,7 +502,8 @@ def maybe_open_shm_array( If you know the explicit ``_Token`` for your memory segment instead use ``attach_shm_array``. - """ + + ''' try: # see if we already know this key token = _known_tokens[key] diff --git a/piker/data/feed.py b/piker/data/feed.py index b3e1efd6..55f8b9b9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -567,7 +567,7 @@ async def open_feed( shm_token = data['shm_token'] # XXX: msgspec won't relay through the tuples XD - shm_token['dtype_descr'] = list( + shm_token['dtype_descr'] = tuple( map(tuple, shm_token['dtype_descr'])) assert shm_token == shm.token # sanity diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index f2c7cdc8..a332ec5f 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -40,6 +40,8 @@ from tractor.msg import NamespacePath from ..data._sharedmem import ( ShmArray, maybe_open_shm_array, + attach_shm_array, + _Token, ) from ..log import get_logger @@ -72,6 +74,13 @@ class Fsp: # - custom function wrappers, # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers + # actor-local map of source flow shm tokens + # + the consuming fsp *to* the consumers output + # shm flow. + _flow_registry: dict[ + tuple[_Token, str], _Token, + ] = {} + def __init__( self, func: Callable[..., Awaitable], @@ -93,7 +102,7 @@ class Fsp: self.config: dict[str, Any] = config # register with declared set. - _fsp_registry[self.ns_path] = func + _fsp_registry[self.ns_path] = self @property def name(self) -> str: @@ -111,6 +120,24 @@ class Fsp: ): return self.func(*args, **kwargs) + # TODO: lru_cache this? prettty sure it'll work? + def get_shm( + self, + src_shm: ShmArray, + + ) -> ShmArray: + ''' + Provide access to allocated shared mem array + for this "instance" of a signal processor for + the given ``key``. + + ''' + dst_token = self._flow_registry[ + (src_shm._token, self.name) + ] + shm = attach_shm_array(dst_token) + return shm + def fsp( wrapped=None, @@ -132,18 +159,27 @@ def fsp( return Fsp(wrapped, outputs=(wrapped.__name__,)) +def mk_fsp_shm_key( + sym: str, + target: Fsp + +) -> str: + uid = tractor.current_actor().uid + return f'{sym}.fsp.{target.name}.{".".join(uid)}' + + def maybe_mk_fsp_shm( sym: str, - target: fsp, + target: Fsp, readonly: bool = True, -) -> (ShmArray, bool): +) -> (str, ShmArray, bool): ''' Allocate a single row shm array for an symbol-fsp pair if none exists, otherwise load the shm already existing for that token. ''' - uid = tractor.current_actor().uid + assert isinstance(sym, str), '`sym` should be file-name-friendly `str`' # TODO: load output types from `Fsp` # - should `index` be a required internal field? @@ -152,7 +188,7 @@ def maybe_mk_fsp_shm( [(field_name, float) for field_name in target.outputs] ) - key = f'{sym}.fsp.{target.name}.{".".join(uid)}' + key = mk_fsp_shm_key(sym, target) shm, opened = maybe_open_shm_array( key, @@ -160,4 +196,4 @@ def maybe_mk_fsp_shm( dtype=fsp_dtype, readonly=True, ) - return shm, opened + return key, shm, opened diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index afd986e0..1b853c60 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -20,7 +20,10 @@ core task logic for processing chains ''' from dataclasses import dataclass from functools import partial -from typing import AsyncIterator, Callable, Optional +from typing import ( + AsyncIterator, Callable, Optional, + Union, +) import numpy as np import pyqtgraph as pg @@ -34,8 +37,11 @@ from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray -from ._api import Fsp -from ._api import _load_builtins +from ._api import ( + Fsp, + _load_builtins, + _Token, +) log = get_logger(__name__) @@ -96,33 +102,74 @@ async def fsp_compute( # to the async iterable? it's that or we do some kinda # async itertools style? filter_quotes_by_sym(symbol, quote_stream), + + # XXX: currently the ``ohlcv`` arg feed.shm, ) # Conduct a single iteration of fsp with historical bars input # and get historical output + history_output: Union[ + dict[str, np.ndarray], # multi-output case + np.ndarray, # single output case + ] history_output = await out_stream.__anext__() func_name = func.__name__ profiler(f'{func_name} generated history') # build struct array with an 'index' field to push as history - history = np.zeros( - len(history_output), - dtype=dst.array.dtype - ) # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # if the output array is multi-field then push # each respective field. - fields = getattr(history.dtype, 'fields', None) - if fields: + # await tractor.breakpoint() + fields = getattr(dst.array.dtype, 'fields', None).copy() + fields.pop('index') + # TODO: nptyping here! + history: Optional[np.ndarray] = None + if fields and len(fields) > 1 and fields: + if not isinstance(history_output, dict): + raise ValueError( + f'`{func_name}` is a multi-output FSP and should yield a ' + '`dict[str, np.ndarray]` for history' + ) + for key in fields.keys(): - if key in history.dtype.fields: - history[func_name] = history_output + if key in history_output: + output = history_output[key] + + if history is None: + + if output is None: + length = len(src.array) + else: + length = len(output) + + # using the first output, determine + # the length of the struct-array that + # will be pushed to shm. + history = np.zeros( + length, + dtype=dst.array.dtype + ) + + if output is None: + continue + + history[key] = output # single-key output stream else: + if not isinstance(history_output, np.ndarray): + raise ValueError( + f'`{func_name}` is a single output FSP and should yield an ' + '`np.ndarray` for history' + ) + history = np.zeros( + len(history_output), + dtype=dst.array.dtype + ) history[func_name] = history_output # TODO: XXX: @@ -197,6 +244,8 @@ async def cascade( ns_path: NamespacePath, + shm_registry: dict[str, _Token], + zero_on_step: bool = False, loglevel: Optional[str] = None, @@ -219,9 +268,21 @@ async def cascade( log.info( f'Registered FSP set:\n{lines}' ) - func: Fsp = reg.get( + + # update actor local flows table which registers + # readonly "instances" of this fsp for symbol/source + # so that consumer fsps can look it up by source + fsp. + # TODO: ugh i hate this wind/unwind to list over the wire + # but not sure how else to do it. + for (token, fsp_name, dst_token) in shm_registry: + Fsp._flow_registry[ + (_Token.from_msg(token), fsp_name) + ] = _Token.from_msg(dst_token) + + fsp: Fsp = reg.get( NamespacePath(ns_path) ) + func = fsp.func if not func: # TODO: assume it's a func target path diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 29e94f98..01e41c04 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -170,6 +170,32 @@ def _wma( return np.convolve(signal, weights, 'valid') +@fsp +async def wma( + + source, #: AsyncStream[np.ndarray], + length: int, + ohlcv: np.ndarray, # price time-frame "aware" + +) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? + ''' + Streaming weighted moving average. + + ``weights`` is a sequence of already scaled values. As an example + for the WMA often found in "techincal analysis": + ``weights = np.arange(1, N) * N*(N-1)/2``. + + ''' + # deliver historical output as "first yield" + yield _wma(ohlcv.array['close'], length) + + # begin real-time section + + async for quote in source: + for tick in iterticks(quote, type='trade'): + yield _wma(ohlcv.last(length)) + + @fsp async def rsi( @@ -224,29 +250,3 @@ async def rsi( down_ema_last=last_down_ema_close, ) yield rsi_out[-1:] - - -@fsp -async def wma( - - source, #: AsyncStream[np.ndarray], - length: int, - ohlcv: np.ndarray, # price time-frame "aware" - -) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? - ''' - Streaming weighted moving average. - - ``weights`` is a sequence of already scaled values. As an example - for the WMA often found in "techincal analysis": - ``weights = np.arange(1, N) * N*(N-1)/2``. - - ''' - # deliver historical output as "first yield" - yield _wma(ohlcv.array['close'], length) - - # begin real-time section - - async for quote in source: - for tick in iterticks(quote, type='trade'): - yield _wma(ohlcv.last(length)) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 7cf7d7b4..47211234 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -22,17 +22,25 @@ from tractor.trionics._broadcast import AsyncReceiver from ._api import fsp from ..data._normalize import iterticks from ..data._sharedmem import ShmArray +from ._momo import _wma +from ..log import get_logger + +log = get_logger(__name__) +# NOTE: is the same as our `wma` fsp, and if so which one is faster? +# Ohhh, this is an IIR style i think? So it has an anchor point +# effectively instead of a moving window/FIR style? def wap( signal: np.ndarray, weights: np.ndarray, ) -> np.ndarray: - """Weighted average price from signal and weights. + ''' + Weighted average price from signal and weights. - """ + ''' cum_weights = np.cumsum(weights) cum_weighted_input = np.cumsum(signal * weights) @@ -89,7 +97,10 @@ async def tina_vwap( # vwap_tot = h_vwap[-1] async for quote in source: - for tick in iterticks(quote, types=['trade']): + for tick in iterticks( + quote, + types=['trade'], + ): # c, h, l, v = ohlcv.array[-1][ # ['closes', 'high', 'low', 'volume'] @@ -107,8 +118,12 @@ async def tina_vwap( @fsp( - outputs=('dolla_vlm', 'dark_vlm'), - ohlc=False, + outputs=( + 'dolla_vlm', + 'dark_vlm', + 'trade_count', + 'dark_trade_count', + ), curve_style='step', ) async def dolla_vlm( @@ -132,14 +147,24 @@ async def dolla_vlm( v = a['volume'] # on first iteration yield history - yield chl3 * v + yield { + 'dolla_vlm': chl3 * v, + 'dark_vlm': None, + } i = ohlcv.index - output = vlm = 0 - dvlm = 0 + dvlm = vlm = 0 + dark_trade_count = trade_count = 0 async for quote in source: - for tick in iterticks(quote): + for tick in iterticks( + quote, + types=( + 'trade', + 'dark_trade', + ), + deduplicate_darks=True, + ): # this computes tick-by-tick weightings from here forward size = tick['size'] @@ -148,24 +173,30 @@ async def dolla_vlm( li = ohlcv.index if li > i: i = li - vlm = 0 - dvlm = 0 + trade_count = dark_trade_count = dvlm = vlm = 0 # TODO: for marginned instruments (futes, etfs?) we need to # show the margin $vlm by multiplying by whatever multiplier # is reported in the sym info. ttype = tick.get('type') + if ttype == 'dark_trade': - print(f'dark_trade: {tick}') - key = 'dark_vlm' dvlm += price * size - output = dvlm + yield 'dark_vlm', dvlm + + dark_trade_count += 1 + yield 'dark_trade_count', dark_trade_count + + # print(f'{dark_trade_count}th dark_trade: {tick}') else: - key = 'dolla_vlm' + # print(f'vlm: {tick}') vlm += price * size - output = vlm + yield 'dolla_vlm', vlm + + trade_count += 1 + yield 'trade_count', trade_count # TODO: plot both to compare? # c, h, l, v = ohlcv.last()[ @@ -174,4 +205,154 @@ async def dolla_vlm( # tina_lvlm = c+h+l/3 * v # print(f' tinal vlm: {tina_lvlm}') - yield key, output + +@fsp( + # TODO: eventually I guess we should support some kinda declarative + # graphics config syntax per output yah? That seems like a clean way + # to let users configure things? Not sure how exactly to offer that + # api as well as how to expose such a thing *inside* the body? + outputs=( + # pulled verbatim from `ib` for now + '1m_trade_rate', + '1m_vlm_rate', + + # our own instantaneous rate calcs which are all + # parameterized by a samples count (bars) period + 'trade_rate', + 'dark_trade_rate', + + 'dvlm_rate', + 'dark_dvlm_rate', + ), + curve_style='line', +) +async def flow_rates( + source: AsyncReceiver[dict], + ohlcv: ShmArray, # OHLC sampled history + + # TODO (idea): a dynamic generic / boxing type that can be updated by other + # FSPs, user input, and possibly any general event stream in + # real-time. Hint: ideally implemented with caching until mutated + # ;) + period: 'Param[int]' = 6, # noqa + + # TODO: support other means by providing a map + # to weights `partial()`-ed with `wma()`? + mean_type: str = 'arithmetic', + + # TODO (idea): a generic for declaring boxed fsps much like ``pytest`` + # fixtures? This probably needs a lot of thought if we want to offer + # a higher level composition syntax eventually (oh right gotta make + # an issue for that). + # ideas for how to allow composition / intercalling: + # - offer a `Fsp.get_history()` to do the first yield output? + # * err wait can we just have shm access directly? + # - how would it work if some consumer fsp wanted to dynamically + # change params which are input to the callee fsp? i guess we could + # lazy copy in that case? + # dvlm: 'Fsp[dolla_vlm]' + +) -> AsyncIterator[ + tuple[str, Union[np.ndarray, float]], +]: + # generally no history available prior to real-time calcs + yield { + # from ib + '1m_trade_rate': None, + '1m_vlm_rate': None, + + 'trade_rate': None, + 'dark_trade_rate': None, + + 'dvlm_rate': None, + 'dark_dvlm_rate': None, + } + + # TODO: 3.10 do ``anext()`` + quote = await source.__anext__() + + # ltr = 0 + # lvr = 0 + tr = quote.get('tradeRate') + yield '1m_trade_rate', tr or 0 + vr = quote.get('volumeRate') + yield '1m_vlm_rate', vr or 0 + + yield 'trade_rate', 0 + yield 'dark_trade_rate', 0 + yield 'dvlm_rate', 0 + yield 'dark_dvlm_rate', 0 + + # NOTE: in theory we could dynamically allocate a cascade based on + # this call but not sure if that's too "dynamic" in terms of + # validating cascade flows from message typing perspective. + + # attach to ``dolla_vlm`` fsp running + # on this same source flow. + dvlm_shm = dolla_vlm.get_shm(ohlcv) + + # precompute arithmetic mean weights (all ones) + seq = np.full((period,), 1) + weights = seq / seq.sum() + + async for quote in source: + if not quote: + log.error("OH WTF NO QUOTE IN FSP") + continue + + # dvlm_wma = _wma( + # dvlm_shm.array['dolla_vlm'], + # period, + # weights=weights, + # ) + # yield 'dvlm_rate', dvlm_wma[-1] + + if period > 1: + trade_rate_wma = _wma( + dvlm_shm.array['trade_count'], + period, + weights=weights, + ) + trade_rate = trade_rate_wma[-1] + # print(trade_rate) + yield 'trade_rate', trade_rate + else: + # instantaneous rate per sample step + count = dvlm_shm.array['trade_count'][-1] + yield 'trade_rate', count + + # TODO: skip this if no dark vlm is declared + # by symbol info (eg. in crypto$) + # dark_dvlm_wma = _wma( + # dvlm_shm.array['dark_vlm'], + # period, + # weights=weights, + # ) + # yield 'dark_dvlm_rate', dark_dvlm_wma[-1] + + if period > 1: + dark_trade_rate_wma = _wma( + dvlm_shm.array['dark_trade_count'], + period, + weights=weights, + ) + yield 'dark_trade_rate', dark_trade_rate_wma[-1] + else: + # instantaneous rate per sample step + dark_count = dvlm_shm.array['dark_trade_count'][-1] + yield 'dark_trade_rate', dark_count + + # XXX: ib specific schema we should + # probably pre-pack ourselves. + + # tr = quote.get('tradeRate') + # if tr is not None and tr != ltr: + # # print(f'trade rate: {tr}') + # yield '1m_trade_rate', tr + # ltr = tr + + # vr = quote.get('volumeRate') + # if vr is not None and vr != lvr: + # # print(f'vlm rate: {vr}') + # yield '1m_vlm_rate', vr + # lvr = vr diff --git a/piker/ui/_axes.py b/piker/ui/_axes.py index 56096b7d..2363cc84 100644 --- a/piker/ui/_axes.py +++ b/piker/ui/_axes.py @@ -44,10 +44,14 @@ class Axis(pg.AxisItem): self, linkedsplits, typical_max_str: str = '100 000.000', + text_color: str = 'bracket', **kwargs ) -> None: - super().__init__(**kwargs) + super().__init__( + # textPen=textPen, + **kwargs + ) # XXX: pretty sure this makes things slower # self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) @@ -74,15 +78,28 @@ class Axis(pg.AxisItem): }) self.setTickFont(_font.font) + # NOTE: this is for surrounding "border" self.setPen(_axis_pen) + # this is the text color - self.setTextPen(_axis_pen) + # self.setTextPen(pg.mkPen(hcolor(text_color))) + self.text_color = text_color + self.typical_br = _font._qfm.boundingRect(typical_max_str) # size the pertinent axis dimension to a "typical value" self.size_to_values() + @property + def text_color(self) -> str: + return self._text_color + + @text_color.setter + def text_color(self, text_color: str) -> None: + self.setTextPen(pg.mkPen(hcolor(text_color))) + self._text_color = text_color + def size_to_values(self) -> None: pass @@ -109,7 +126,8 @@ class PriceAxis(Axis): def set_title( self, title: str, - view: Optional[ChartView] = None + view: Optional[ChartView] = None, + color: Optional[str] = None, ) -> Label: ''' @@ -123,7 +141,7 @@ class PriceAxis(Axis): label = self.title = Label( view=view or self.linkedView(), fmt_str=title, - color='bracket', + color=color or self.text_color, parent=self, # update_on_range_change=False, ) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index f4a3c19e..a9350d97 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -33,6 +33,7 @@ from PyQt5.QtWidgets import ( import numpy as np import pyqtgraph as pg import trio +from pydantic import BaseModel from ._axes import ( DynamicDateAxis, @@ -614,17 +615,30 @@ class LinkedSplits(QWidget): cpw.sidepane.setMinimumWidth(sp_w) cpw.sidepane.setMaximumWidth(sp_w) -# import pydantic -# class Graphics(pydantic.BaseModel): +# class FlowsTable(pydantic.BaseModel): # ''' # Data-AGGRegate: high level API onto multiple (categorized) -# ``ShmArray``s with high level processing routines for -# graphics computations and display. +# ``Flow``s with high level processing routines for +# multi-graphics computations and display. # ''' -# arrays: dict[str, np.ndarray] = {} -# graphics: dict[str, pg.GraphicsObject] = {} +# flows: dict[str, np.ndarray] = {} + + +class Flow(BaseModel): + ''' + (FinancialSignal-)Flow compound type which wraps a real-time + graphics (curve) and its backing data stream together for high level + access and control. + + ''' + class Config: + arbitrary_types_allowed = True + + name: str + plot: pg.PlotItem + shm: Optional[ShmArray] = None # may be filled in "later" class ChartPlotWidget(pg.PlotWidget): @@ -721,8 +735,9 @@ class ChartPlotWidget(pg.PlotWidget): self.data_key: array, } self._graphics = {} # registry of underlying graphics + # registry of overlay curve names - self._overlays: dict[str, ShmArray] = {} + self._flows: dict[str, Flow] = {} self._feeds: dict[Symbol, Feed] = {} @@ -980,9 +995,6 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: this probably needs its own method? if overlay: - # anchor_at = ('bottom', 'left') - self._overlays[name] = None - if isinstance(overlay, pg.PlotItem): if overlay not in self.pi_overlay.overlays: raise RuntimeError( @@ -990,6 +1002,9 @@ class ChartPlotWidget(pg.PlotWidget): ) pi = overlay + # anchor_at = ('bottom', 'left') + self._flows[name] = Flow(name=name, plot=pi) + else: # anchor_at = ('top', 'left') @@ -1062,7 +1077,7 @@ class ChartPlotWidget(pg.PlotWidget): assert len(array) data_key = array_key or graphics_name - if graphics_name not in self._overlays: + if graphics_name not in self._flows: self._arrays[self.name] = array else: self._arrays[data_key] = array @@ -1164,9 +1179,15 @@ class ChartPlotWidget(pg.PlotWidget): # f"begin: {begin}, end: {end}, extra: {extra}" # ) - a = self._arrays[name or self.name] + # TODO: here we should instead look up the ``Flow.shm.array`` + # and read directly from shm to avoid copying to memory first + # and then reading it again here. + a = self._arrays.get(name or self.name) + if a is None: + return None + ifirst = a[0]['index'] - bars = a[lbar - ifirst:rbar - ifirst + 1] + bars = a[lbar - ifirst:(rbar - ifirst) + 1] if not len(bars): # likely no data loaded yet or extreme scrolling? diff --git a/piker/ui/_cursor.py b/piker/ui/_cursor.py index f6f8edde..e006858e 100644 --- a/piker/ui/_cursor.py +++ b/piker/ui/_cursor.py @@ -253,7 +253,7 @@ class ContentsLabels: and index < array[-1]['index'] ): # out of range - print('out of range?') + print('WTF out of range?') continue # array = chart._arrays[name] @@ -550,17 +550,20 @@ class Cursor(pg.GraphicsObject): for cursor in opts.get('cursors', ()): cursor.setIndex(ix) - # update the label on the bottom of the crosshair - axes = plot.plotItem.axes - + # Update the label on the bottom of the crosshair. # TODO: make this an up-front calc that we update - # on axis-widget resize events. + # on axis-widget resize events instead of on every mouse + # update cylce. + # left axis offset width for calcuating # absolute x-axis label placement. left_axis_width = 0 - left = axes.get('left') - if left: - left_axis_width = left['item'].width() + if len(plot.pi_overlay.overlays): + # breakpoint() + lefts = plot.pi_overlay.get_axes('left') + if lefts: + for left in lefts: + left_axis_width += left.width() # map back to abs (label-local) coordinates self.xaxis_label.update_label( diff --git a/piker/ui/_curve.py b/piker/ui/_curve.py index 15b19f76..7fc43e4e 100644 --- a/piker/ui/_curve.py +++ b/piker/ui/_curve.py @@ -24,6 +24,7 @@ import numpy as np import pyqtgraph as pg from PyQt5 import QtGui, QtWidgets from PyQt5.QtCore import ( + Qt, QLineF, QSizeF, QRectF, @@ -85,6 +86,14 @@ def step_path_arrays_from_1d( return x_out, y_out +_line_styles: dict[str, int] = { + 'solid': Qt.PenStyle.SolidLine, + 'dash': Qt.PenStyle.DashLine, + 'dot': Qt.PenStyle.DotLine, + 'dashdot': Qt.PenStyle.DashDotLine, +} + + # TODO: got a feeling that dropping this inheritance gets us even more speedups class FastAppendCurve(pg.PlotCurveItem): ''' @@ -106,6 +115,8 @@ class FastAppendCurve(pg.PlotCurveItem): step_mode: bool = False, color: str = 'default_lightest', fill_color: Optional[str] = None, + style: str = 'solid', + name: Optional[str] = None, **kwargs @@ -114,14 +125,22 @@ class FastAppendCurve(pg.PlotCurveItem): # TODO: we can probably just dispense with the parent since # we're basically only using the pen setting now... super().__init__(*args, **kwargs) - + self._name = name self._xrange: tuple[int, int] = self.dataBounds(ax=0) # all history of curve is drawn in single px thickness - self.setPen(hcolor(color)) + pen = pg.mkPen(hcolor(color)) + pen.setStyle(_line_styles[style]) + + if 'dash' in style: + pen.setDashPattern([8, 3]) + + self.setPen(pen) # last segment is drawn in 2px thickness for emphasis - self.last_step_pen = pg.mkPen(hcolor(color), width=2) + # self.last_step_pen = pg.mkPen(hcolor(color), width=2) + self.last_step_pen = pg.mkPen(pen, width=2) + self._last_line: QLineF = None self._last_step_rect: QRectF = None @@ -135,7 +154,12 @@ class FastAppendCurve(pg.PlotCurveItem): # interactions slower (such as zooming) and if so maybe if/when # we implement a "history" mode for the view we disable this in # that mode? - self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) + if step_mode: + # don't enable caching by default for the case where the + # only thing drawn is the "last" line segment which can + # have a weird artifact where it won't be fully drawn to its + # endpoint (something we saw on trade rate curves) + self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) def update_from_array( self, @@ -245,10 +269,13 @@ class FastAppendCurve(pg.PlotCurveItem): # self.path.connectPath(append_path) path.connectPath(append_path) - # XXX: pretty annoying but, without this there's little - # artefacts on the append updates to the curve... - self.setCacheMode(QtWidgets.QGraphicsItem.NoCache) - self.prepareGeometryChange() + self.disable_cache() + flip_cache = True + + if ( + self._step_mode + ): + self.disable_cache() flip_cache = True # print(f"update br: {self.path.boundingRect()}") @@ -273,6 +300,7 @@ class FastAppendCurve(pg.PlotCurveItem): x_last + 0.5, y_last ) else: + # print((x[-1], y_last)) self._last_line = QLineF( x[-2], y[-2], x[-1], y_last @@ -287,6 +315,12 @@ class FastAppendCurve(pg.PlotCurveItem): # XXX: seems to be needed to avoid artifacts (see above). self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) + def disable_cache(self) -> None: + # XXX: pretty annoying but, without this there's little + # artefacts on the append updates to the curve... + self.setCacheMode(QtWidgets.QGraphicsItem.NoCache) + self.prepareGeometryChange() + def boundingRect(self): if self.path is None: return QtGui.QPainterPath().boundingRect() @@ -323,6 +357,7 @@ class FastAppendCurve(pg.PlotCurveItem): p: QtGui.QPainter, opt: QtWidgets.QStyleOptionGraphicsItem, w: QtWidgets.QWidget + ) -> None: profiler = pg.debug.Profiler(disabled=not pg_profile_enabled()) @@ -340,11 +375,11 @@ class FastAppendCurve(pg.PlotCurveItem): # p.drawPath(self.path) # profiler('.drawPath()') - # else: p.setPen(self.last_step_pen) p.drawLine(self._last_line) profiler('.drawLine()') + # else: p.setPen(self.opts['pen']) p.drawPath(self.path) profiler('.drawPath()') diff --git a/piker/ui/_display.py b/piker/ui/_display.py index c2333350..3bfd327a 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -54,7 +54,7 @@ from ..log import get_logger log = get_logger(__name__) # TODO: load this from a config.toml! -_quote_throttle_rate: int = 58 # Hz +_quote_throttle_rate: int = 6 + 16 # Hz # a working tick-type-classes template @@ -106,7 +106,7 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view -async def update_linked_charts_graphics( +async def graphics_update_loop( linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -258,13 +258,18 @@ async def update_linked_charts_graphics( ) last_mx_vlm = mx_vlm_in_view - for curve_name, shm in vlm_chart._overlays.items(): + for curve_name, flow in vlm_chart._flows.items(): update_fsp_chart( vlm_chart, - shm, + flow.shm, curve_name, array_key=curve_name, ) + # is this even doing anything? + flow.plot.vb._set_yrange( + autoscale_linked_plots=False, + name=curve_name, + ) ticks_frame = quote.get('ticks', ()) @@ -411,14 +416,14 @@ async def update_linked_charts_graphics( # TODO: all overlays on all subplots.. # run synchronous update on all derived overlays - for curve_name, shm in chart._overlays.items(): + for curve_name, flow in chart._flows.items(): update_fsp_chart( chart, - shm, + flow.shm, curve_name, array_key=curve_name, ) - # chart._set_yrange() + # chart.view._set_yrange() async def check_for_new_bars( @@ -473,11 +478,11 @@ async def check_for_new_bars( ) # main chart overlays - for name in price_chart._overlays: - + # for name in price_chart._flows: + for curve_name in price_chart._flows: price_chart.update_curve_from_array( - name, - price_chart._arrays[name] + curve_name, + price_chart._arrays[curve_name] ) # each subplot @@ -614,7 +619,7 @@ async def display_symbol_data( # start graphics update loop after receiving first live quote ln.start_soon( - update_linked_charts_graphics, + graphics_update_loop, linkedsplits, feed.stream, ohlcv, diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 937b7e1e..eac8f27d 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -22,6 +22,7 @@ Financial signal processing cluster and real-time graphics management. ''' from contextlib import asynccontextmanager as acm from functools import partial +import inspect from itertools import cycle from typing import Optional, AsyncGenerator, Any @@ -37,6 +38,7 @@ from .._cacheables import maybe_open_context from ..calc import humanize from ..data._sharedmem import ( ShmArray, + _Token, try_read, ) from ._chart import ( @@ -50,7 +52,11 @@ from ._forms import ( ) from ..fsp._api import maybe_mk_fsp_shm, Fsp from ..fsp import cascade -from ..fsp._volume import tina_vwap, dolla_vlm +from ..fsp._volume import ( + tina_vwap, + dolla_vlm, + flow_rates, +) from ..log import get_logger log = get_logger(__name__) @@ -153,7 +159,11 @@ async def open_fsp_sidepane( sidepane.model = FspConfig() # just a logger for now until we get fsp configs up and running. - async def settings_change(key: str, value: str) -> bool: + async def settings_change( + key: str, + value: str + + ) -> bool: print(f'{key}: {value}') return True @@ -240,7 +250,7 @@ async def run_fsp_ui( **conf.get('chart_kwargs', {}) ) # specially store ref to shm for lookup in display loop - chart._overlays[name] = shm + chart._flows[name].shm = shm else: # create a new sub-chart widget for this fsp @@ -363,6 +373,7 @@ class FspAdmin: tuple, tuple[tractor.MsgStream, ShmArray] ] = {} + self._flow_registry: dict[_Token, str] = {} self.src_shm = src_shm def rr_next_portal(self) -> tractor.Portal: @@ -407,6 +418,11 @@ class FspAdmin: loglevel=loglevel, zero_on_step=conf.get('zero_on_step', False), + shm_registry=[ + (token.as_msg(), fsp_name, dst_token.as_msg()) + for (token, fsp_name), dst_token + in self._flow_registry.items() + ], ) as (ctx, last_index), ctx.open_stream() as stream, @@ -439,11 +455,15 @@ class FspAdmin: fqsn = self.linked.symbol.front_feed() # allocate an output shm array - dst_shm, opened = maybe_mk_fsp_shm( - fqsn, + key, dst_shm, opened = maybe_mk_fsp_shm( + '.'.join(fqsn), target=target, readonly=True, ) + self._flow_registry[ + (self.src_shm._token, target.name) + ] = dst_shm._token + # if not opened: # raise RuntimeError( # f'Already started FSP `{fqsn}:{func_name}`' @@ -555,15 +575,22 @@ async def open_vlm_displays( be spawned here. ''' + sig = inspect.signature(flow_rates.func) + params = sig.parameters + async with ( open_fsp_sidepane( linked, { - 'vlm': { + 'flows': { + + # TODO: add support for dynamically changing these 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. + u'\u03BC' + '_type': { + 'default_value': str(params['mean_type'].default), + }, + 'period': { + 'default_value': str(params['period'].default), + # make widget un-editable for now. 'widget_kwargs': {'readonly': True}, }, }, @@ -572,6 +599,12 @@ async def open_vlm_displays( ) as sidepane, open_fsp_admin(linked, ohlcv) as admin, ): + # TODO: support updates + # period_field = sidepane.fields['period'] + # period_field.setText( + # str(period_param.default) + # ) + # built-in vlm which we plot ASAP since it's # usually data provided directly with OHLC history. shm = ohlcv @@ -596,18 +629,19 @@ async def open_vlm_displays( names: list[str], ) -> tuple[float, float]: + mx = 0 for name in names: + mxmn = chart.maxmin(name=name) if mxmn: - mx = max(mxmn[1], mx) - - # if mx: - # return 0, mxmn[1] + ymax = mxmn[1] + if ymax > mx: + mx = ymax return 0, mx - chart.view._maxmin = partial(maxmin, names=['volume']) + chart.view.maxmin = partial(maxmin, names=['volume']) # TODO: fix the x-axis label issue where if you put # the axis on the left it's totally not lined up... @@ -648,8 +682,9 @@ async def open_vlm_displays( if dvlm: + tasks_ready = [] # spawn and overlay $ vlm on the same subchart - shm, started = await admin.start_engine_task( + dvlm_shm, started = await admin.start_engine_task( dolla_vlm, { # fsp engine conf @@ -663,11 +698,26 @@ async def open_vlm_displays( }, # loglevel, ) + tasks_ready.append(started) + + # FIXME: we should error on starting the same fsp right + # since it might collide with existing shm.. or wait we + # had this before?? + # dolla_vlm, + + tasks_ready.append(started) # profiler(f'created shm for fsp actor: {display_name}') - await started.wait() + # wait for all engine tasks to startup + async with trio.open_nursery() as n: + for event in tasks_ready: + n.start_soon(event.wait) - pi = chart.overlay_plotitem( + # dolla vlm overlay + # XXX: the main chart already contains a vlm "units" axis + # so here we add an overlay wth a y-range in + # $ liquidity-value units (normally a fiat like USD). + dvlm_pi = chart.overlay_plotitem( 'dolla_vlm', index=0, # place axis on inside (nearest to chart) axis_title=' $vlm', @@ -679,24 +729,29 @@ async def open_vlm_displays( digits=2, ), }, - ) + # all to be overlayed curve names + fields = [ + 'dolla_vlm', + 'dark_vlm', + ] + dvlm_rate_fields = [ + 'dvlm_rate', + 'dark_dvlm_rate', + ] + trade_rate_fields = [ + 'trade_rate', + 'dark_trade_rate', + ] + # add custom auto range handler - pi.vb._maxmin = partial( + dvlm_pi.vb._maxmin = partial( maxmin, # keep both regular and dark vlm in view - names=['dolla_vlm', 'dark_vlm'], + names=fields + dvlm_rate_fields, ) - curve, _ = chart.draw_curve( - name='dolla_vlm', - data=shm.array, - array_key='dolla_vlm', - overlay=pi, - step_mode=True, - # **conf.get('chart_kwargs', {}) - ) # TODO: is there a way to "sync" the dual axes such that only # one curve is needed? # hide the original vlm curve since the $vlm one is now @@ -704,48 +759,117 @@ async def open_vlm_displays( # liquidity events (well at least on low OHLC periods - 1s). vlm_curve.hide() - # TODO: we need a better API to do this.. - # specially store ref to shm for lookup in display loop - # since only a placeholder of `None` is entered in - # ``.draw_curve()``. - chart._overlays['dolla_vlm'] = shm + # use slightly less light (then bracket) gray + # for volume from "main exchange" and a more "bluey" + # gray for "dark" vlm. + vlm_color = 'i3' + dark_vlm_color = 'charcoal' - curve, _ = chart.draw_curve( + # add dvlm (step) curves to common view + def chart_curves( + names: list[str], + pi: pg.PlotItem, + shm: ShmArray, + step_mode: bool = False, + style: str = 'solid', - name='dark_vlm', - data=shm.array, - array_key='dark_vlm', - overlay=pi, - color='charcoal', # darker theme hue + ) -> None: + for name in names: + if 'dark' in name: + color = dark_vlm_color + elif 'rate' in name: + color = vlm_color + else: + color = 'bracket' + + curve, _ = chart.draw_curve( + # name='dolla_vlm', + name=name, + data=shm.array, + array_key=name, + overlay=pi, + color=color, + step_mode=step_mode, + style=style, + ) + + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + chart._flows[name].shm = shm + + chart_curves( + fields, + dvlm_pi, + dvlm_shm, step_mode=True, - # **conf.get('chart_kwargs', {}) ) - chart._overlays['dark_vlm'] = shm - # XXX: old dict-style config before it was moved into the - # helper task - # 'dolla_vlm': { - # 'func_name': 'dolla_vlm', - # 'zero_on_step': True, - # 'overlay': 'volume', - # 'separate_axes': True, - # 'params': { - # 'price_func': { - # 'default_value': 'chl3', - # # tell target ``Edit`` widget to not allow - # # edits for now. - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - # 'chart_kwargs': {'step_mode': True} - # }, - # } + # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is + # up since this one depends on it. - for name, axis_info in pi.axes.items(): - # lol this sux XD - axis = axis_info['item'] - if isinstance(axis, PriceAxis): - axis.size_to_values() + fr_shm, started = await admin.start_engine_task( + flow_rates, + { # fsp engine conf + 'func_name': 'flow_rates', + 'zero_on_step': True, + }, + # loglevel, + ) + await started.wait() + + chart_curves( + dvlm_rate_fields, + dvlm_pi, + fr_shm, + ) + + # Trade rate overlay + # XXX: requires an additional overlay for + # a trades-per-period (time) y-range. + tr_pi = chart.overlay_plotitem( + 'trade_rates', + + # TODO: dynamically update period (and thus this axis?) + # title from user input. + axis_title='clears', + + axis_side='left', + axis_kwargs={ + 'typical_max_str': ' 10.0 M ', + 'formatter': partial( + humanize, + digits=2, + ), + 'text_color': vlm_color, + }, + + ) + # add custom auto range handler + tr_pi.vb.maxmin = partial( + maxmin, + # keep both regular and dark vlm in view + names=trade_rate_fields, + ) + + chart_curves( + trade_rate_fields, + tr_pi, + fr_shm, + # step_mode=True, + + # dashed line to represent "individual trades" being + # more "granular" B) + style='dash', + ) + + for pi in (dvlm_pi, tr_pi): + for name, axis_info in pi.axes.items(): + # lol this sux XD + axis = axis_info['item'] + if isinstance(axis, PriceAxis): + axis.size_to_values() # built-in vlm fsps for target, conf in { diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index 4168a3ff..dca41855 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -342,7 +342,7 @@ class ChartView(ViewBox): wheelEventRelay = QtCore.Signal(object, object, object) event_relay_source: 'Optional[ViewBox]' = None - relays: dict[str, Signal] = {} + relays: dict[str, QtCore.Signal] = {} def __init__( self, @@ -421,6 +421,14 @@ class ChartView(ViewBox): if self._maxmin is None: self._maxmin = chart.maxmin + @property + def maxmin(self) -> Callable: + return self._maxmin + + @maxmin.setter + def maxmin(self, callback: Callable) -> None: + self._maxmin = callback + def wheelEvent( self, ev, @@ -474,7 +482,11 @@ class ChartView(ViewBox): # lastPos = ev.lastPos() # dif = pos - lastPos # dif = dif * -1 - center = Point(fn.invertQTransform(self.childGroup.transform()).map(ev.pos())) + center = Point( + fn.invertQTransform( + self.childGroup.transform() + ).map(ev.pos()) + ) # scale_y = 1.3 ** (center.y() * -1 / 20) self.scaleBy(s, center) @@ -674,7 +686,8 @@ class ChartView(ViewBox): # flag to prevent triggering sibling charts from the same linked # set from recursion errors. autoscale_linked_plots: bool = True, - autoscale_overlays: bool = False, + name: Optional[str] = None, + # autoscale_overlays: bool = False, ) -> None: ''' @@ -731,7 +744,12 @@ class ChartView(ViewBox): ) if set_range: - ylow, yhigh = self._maxmin() + + yrange = self._maxmin() + if yrange is None: + return + + ylow, yhigh = yrange # view margins: stay within a % of the "true range" diff = yhigh - ylow diff --git a/piker/ui/_overlay.py b/piker/ui/_overlay.py index 256909bd..65ec2364 100644 --- a/piker/ui/_overlay.py +++ b/piker/ui/_overlay.py @@ -103,11 +103,6 @@ class ComposedGridLayout: dict[str, AxisItem], ] = {} - self._axes2pi: dict[ - AxisItem, - dict[str, PlotItem], - ] = {} - # TODO: better name? # construct surrounding layouts for placing outer axes and # their legends and title labels. @@ -158,8 +153,8 @@ class ComposedGridLayout: for name, axis_info in plotitem.axes.items(): axis = axis_info['item'] # register this plot's (maybe re-placed) axes for lookup. - self._pi2axes.setdefault(index, {})[name] = axis - self._axes2pi.setdefault(index, {})[name] = plotitem + # print(f'inserting {name}:{axis} to index {index}') + self._pi2axes.setdefault(name, {})[index] = axis # enter plot into list for index tracking self.items.insert(index, plotitem) @@ -213,11 +208,12 @@ class ComposedGridLayout: # invert insert index for layouts which are # not-left-to-right, top-to-bottom insert oriented + insert_index = index if name in ('top', 'left'): - index = min(len(axes) - index, 0) - assert index >= 0 + insert_index = min(len(axes) - index, 0) + assert insert_index >= 0 - linlayout.insertItem(index, axis) + linlayout.insertItem(insert_index, axis) axes.insert(index, axis) self._register_item(index, plotitem) @@ -243,13 +239,15 @@ class ComposedGridLayout: plot: PlotItem, name: str, - ) -> AxisItem: + ) -> Optional[AxisItem]: ''' - Retrieve the named axis for overlayed ``plot``. + Retrieve the named axis for overlayed ``plot`` or ``None`` + if axis for that name is not shown. ''' index = self.items.index(plot) - return self._pi2axes[index][name] + named = self._pi2axes[name] + return named.get(index) def pop( self, @@ -341,7 +339,7 @@ def mk_relay_method( # halt/short circuit the graphicscene loop). Further the # surrounding handler for this signal must be allowed to execute # and get processed by **this consumer**. - print(f'{vb.name} rx relayed from {relayed_from.name}') + # print(f'{vb.name} rx relayed from {relayed_from.name}') ev.ignore() return slot( @@ -351,7 +349,7 @@ def mk_relay_method( ) if axis is not None: - print(f'{vb.name} handling axis event:\n{str(ev)}') + # print(f'{vb.name} handling axis event:\n{str(ev)}') ev.accept() return slot( vb, @@ -490,7 +488,6 @@ class PlotItemOverlay: vb.setZValue(1000) # XXX: critical for scene layering/relaying self.overlays: list[PlotItem] = [] - from piker.ui._overlay import ComposedGridLayout self.layout = ComposedGridLayout( root_plotitem, root_plotitem.layout, @@ -511,7 +508,7 @@ class PlotItemOverlay: ) -> None: - index = index or 0 + index = index or len(self.overlays) root = self.root_plotitem # layout: QGraphicsGridLayout = root.layout self.overlays.insert(index, plotitem) @@ -613,6 +610,26 @@ class PlotItemOverlay: ''' return self.layout.get_axis(plot, name) + def get_axes( + self, + name: str, + + ) -> list[AxisItem]: + ''' + Retrieve all axes for all plots with ``name: str``. + + If a particular overlay doesn't have a displayed named axis + then it is not delivered in the returned ``list``. + + ''' + axes = [] + for plot in self.overlays: + axis = self.layout.get_axis(plot, name) + if axis: + axes.append(axis) + + return axes + # TODO: i guess we need this if you want to detach existing plots # dynamically? XXX: untested as of now. def _disconnect_all( diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 6795f384..755e72f3 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -433,9 +433,12 @@ class OrderMode: [ 'notify-send', '-u', 'normal', - '-t', '10000', + '-t', '1616', 'piker', - f'alert: {msg}', + + # TODO: add in standard fill/exec info that maybe we + # pack in a broker independent way? + f'{msg["resp"]}: {msg["trigger_price"]}', ], ) log.runtime(result) @@ -666,7 +669,7 @@ async def open_order_mode( ) # vbox.setAlignment(feed_label, Qt.AlignBottom) # vbox.setAlignment(Qt.AlignBottom) - blank_h = chart.height() - ( + _ = chart.height() - ( form.height() + form.fill_bar.height() # feed_label.height()