diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 30e57b9e..670eed6f 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -21,6 +21,7 @@ Kraken backend. from contextlib import asynccontextmanager as acm from dataclasses import asdict, field from datetime import datetime +from pprint import pformat from typing import Any, Optional, AsyncIterator, Callable, Union import time @@ -569,7 +570,10 @@ async def handle_order_requests( order: BrokerdOrder async for request_msg in ems_order_stream: - log.info(f'Received order request {request_msg}') + log.info( + 'Received order request:\n' + f'{pformat(request_msg)}' + ) action = request_msg['action'] @@ -628,6 +632,7 @@ async def handle_order_requests( # update the internal pairing of oid to krakens # txid with the new txid that is returned on edit reqid = resp['result']['txid'] + # deliver ack that order has been submitted to broker routing await ems_order_stream.send( BrokerdOrderAck( @@ -788,7 +793,10 @@ async def trades_dialogue( # Get websocket token for authenticated data stream # Assert that a token was actually received. resp = await client.endpoint('GetWebSocketsToken', {}) + + # lol wtf is this.. assert resp['error'] == [] + token = resp['result']['token'] async with ( diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index a5d04f0c..17f9be1a 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -80,7 +80,9 @@ def mk_check( return check_lt - raise ValueError('trigger: {trigger_price}, last: {known_last}') + raise ValueError( + f'trigger: {trigger_price}, last: {known_last}' + ) @dataclass @@ -561,7 +563,10 @@ async def translate_and_relay_brokerd_events( name = brokerd_msg['name'] - log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}') + log.info( + f'Received broker trade event:\n' + f'{pformat(brokerd_msg)}' + ) if name == 'position': @@ -613,19 +618,28 @@ async def translate_and_relay_brokerd_events( # packed at submission since we already know it ahead of # time paper = brokerd_msg['broker_details'].get('paper_info') + ext = brokerd_msg['broker_details'].get('external') if paper: # paperboi keeps the ems id up front oid = paper['oid'] - else: + elif ext: # may be an order msg specified as "external" to the # piker ems flow (i.e. generated by some other # external broker backend client (like tws for ib) - ext = brokerd_msg['broker_details'].get('external') - if ext: - log.error(f"External trade event {ext}") + log.error(f"External trade event {ext}") continue + + else: + # something is out of order, we don't have an oid for + # this broker-side message. + log.error( + 'Unknown oid:{oid} for msg:\n' + f'{pformat(brokerd_msg)}' + 'Unable to relay message to client side!?' + ) + else: # check for existing live flow entry entry = book._ems_entries.get(oid) @@ -823,7 +837,9 @@ async def process_client_order_cmds( if reqid: # send cancel to brokerd immediately! - log.info("Submitting cancel for live order {reqid}") + log.info( + f'Submitting cancel for live order {reqid}' + ) await brokerd_order_stream.send(msg.dict()) diff --git a/piker/config.py b/piker/config.py index cf946405..d1926dec 100644 --- a/piker/config.py +++ b/piker/config.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) 2018-present Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -16,6 +16,7 @@ """ Broker configuration mgmt. + """ import platform import sys @@ -50,7 +51,7 @@ def get_app_dir(app_name, roaming=True, force_posix=False): Unix (POSIX): ``~/.foo-bar`` Win XP (roaming): - ``C:\Documents and Settings\\Local Settings\Application Data\Foo Bar`` + ``C:\Documents and Settings\\Local Settings\Application Data\Foo`` Win XP (not roaming): ``C:\Documents and Settings\\Application Data\Foo Bar`` Win 7 (roaming): @@ -81,7 +82,8 @@ def get_app_dir(app_name, roaming=True, force_posix=False): folder = os.path.expanduser("~") return os.path.join(folder, app_name) if force_posix: - return os.path.join(os.path.expanduser("~/.{}".format(_posixify(app_name)))) + return os.path.join( + os.path.expanduser("~/.{}".format(_posixify(app_name)))) if sys.platform == "darwin": return os.path.join( os.path.expanduser("~/Library/Application Support"), app_name @@ -107,7 +109,12 @@ if _parent_user: ] ) -_file_name = 'brokers.toml' +_conf_names: set[str] = { + 'brokers', + 'trades', + 'watchlists', +} + _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') _context_defaults = dict( default_map={ @@ -129,23 +136,43 @@ def _override_config_dir( _config_dir = path -def get_broker_conf_path(): +def _conf_fn_w_ext( + name: str, +) -> str: + # change this if we ever change the config file format. + return f'{name}.toml' + + +def get_conf_path( + conf_name: str = 'brokers', + +) -> str: """Return the default config path normally under ``~/.config/piker`` on linux. Contains files such as: - brokers.toml - watchlists.toml + - trades.toml + + # maybe coming soon ;) - signals.toml - strats.toml """ - return os.path.join(_config_dir, _file_name) + assert conf_name in _conf_names + fn = _conf_fn_w_ext(conf_name) + return os.path.join( + _config_dir, + fn, + ) def repodir(): - """Return the abspath to the repo directory. - """ + ''' + Return the abspath to the repo directory. + + ''' dirpath = os.path.abspath( # we're 3 levels down in **this** module file dirname(dirname(os.path.realpath(__file__))) @@ -154,16 +181,27 @@ def repodir(): def load( + conf_name: str = 'brokers', path: str = None + ) -> (dict, str): - """Load broker config. - """ - path = path or get_broker_conf_path() + ''' + Load config file by name. + + ''' + path = path or get_conf_path(conf_name) if not os.path.isfile(path): - shutil.copyfile( - os.path.join(repodir(), 'config', 'brokers.toml'), - path, + fn = _conf_fn_w_ext(conf_name) + + template = os.path.join( + repodir(), + 'config', + fn ) + # try to copy in a template config to the user's directory + # if one exists. + if os.path.isfile(template): + shutil.copyfile(template, path) config = toml.load(path) log.debug(f"Read config file {path}") @@ -172,13 +210,17 @@ def load( def write( config: dict, # toml config as dict + name: str = 'brokers', path: str = None, + ) -> None: - """Write broker config to disk. + '''' + Write broker config to disk. Create a ``brokers.ini`` file if one does not exist. - """ - path = path or get_broker_conf_path() + + ''' + path = path or get_conf_path(name) dirname = os.path.dirname(path) if not os.path.isdir(dirname): log.debug(f"Creating config dir {_config_dir}") @@ -188,7 +230,10 @@ def write( raise ValueError( "Watch out you're trying to write a blank config!") - log.debug(f"Writing config file {path}") + log.debug( + f"Writing config `{name}` file to:\n" + f"{path}" + ) with open(path, 'w') as cf: return toml.dump(config, cf) @@ -218,4 +263,5 @@ def load_accounts( # our default paper engine entry accounts['paper'] = None + return accounts diff --git a/piker/data/_ahab.py b/piker/data/_ahab.py index 0f96ecaa..fea19a4d 100644 --- a/piker/data/_ahab.py +++ b/piker/data/_ahab.py @@ -98,8 +98,6 @@ async def open_docker( finally: if client: client.close() - for c in client.containers.list(): - c.kill() class Container: diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 8bc677cf..77b15d7f 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -22,7 +22,7 @@ financial data flows. from __future__ import annotations from collections import Counter import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Union import tractor import trio @@ -32,6 +32,7 @@ from ..log import get_logger if TYPE_CHECKING: from ._sharedmem import ShmArray + from .feed import _FeedsBus log = get_logger(__name__) @@ -142,11 +143,17 @@ async def broadcast( shm: Optional[ShmArray] = None, ) -> None: - # broadcast the buffer index step to any subscribers for - # a given sample period. + ''' + Broadcast the given ``shm: ShmArray``'s buffer index step to any + subscribers for a given sample period. + + The sent msg will include the first and last index which slice into + the buffer's non-empty data. + + ''' subs = sampler.subscribers.get(delay_s, ()) - last = -1 + first = last = -1 if shm is None: periods = sampler.ohlcv_shms.keys() @@ -156,11 +163,16 @@ async def broadcast( if periods: lowest = min(periods) shm = sampler.ohlcv_shms[lowest][0] + first = shm._first.value last = shm._last.value for stream in subs: try: - await stream.send({'index': last}) + await stream.send({ + 'first': first, + 'last': last, + 'index': last, + }) except ( trio.BrokenResourceError, trio.ClosedResourceError @@ -168,7 +180,12 @@ async def broadcast( log.error( f'{stream._ctx.chan.uid} dropped connection' ) - subs.remove(stream) + try: + subs.remove(stream) + except ValueError: + log.warning( + f'{stream._ctx.chan.uid} sub already removed!?' + ) @tractor.context @@ -203,7 +220,7 @@ async def iter_ohlc_periods( async def sample_and_broadcast( - bus: '_FeedsBus', # noqa + bus: _FeedsBus, # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, brokername: str, @@ -282,7 +299,13 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus._subscribers[broker_symbol.lower()] + subs: list[ + tuple[ + Union[tractor.MsgStream, trio.MemorySendChannel], + tractor.Context, + Optional[float], # tick throttle in Hz + ] + ] = bus._subscribers[broker_symbol.lower()] # NOTE: by default the broker backend doesn't append # it's own "name" into the fqsn schema (but maybe it @@ -291,7 +314,7 @@ async def sample_and_broadcast( bsym = f'{broker_symbol}.{brokername}' lags: int = 0 - for (stream, tick_throttle) in subs: + for (stream, ctx, tick_throttle) in subs: try: with trio.move_on_after(0.2) as cs: @@ -303,25 +326,41 @@ async def sample_and_broadcast( (bsym, quote) ) except trio.WouldBlock: - ctx = getattr(stream, '_ctx', None) + chan = ctx.chan if ctx: log.warning( f'Feed overrun {bus.brokername} ->' - f'{ctx.channel.uid} !!!' + f'{chan.uid} !!!' ) else: key = id(stream) overruns[key] += 1 log.warning( - f'Feed overrun {bus.brokername} -> ' + f'Feed overrun {broker_symbol}' + '@{bus.brokername} -> ' f'feed @ {tick_throttle} Hz' ) if overruns[key] > 6: - log.warning( - f'Dropping consumer {stream}' - ) - await stream.aclose() - raise trio.BrokenResourceError + # TODO: should we check for the + # context being cancelled? this + # could happen but the + # channel-ipc-pipe is still up. + if not chan.connected(): + log.warning( + 'Dropping broken consumer:\n' + f'{broker_symbol}:' + f'{ctx.cid}@{chan.uid}' + ) + await stream.aclose() + raise trio.BrokenResourceError + else: + log.warning( + 'Feed getting overrun bro!\n' + f'{broker_symbol}:' + f'{ctx.cid}@{chan.uid}' + ) + continue + else: await stream.send( {bsym: quote} @@ -337,11 +376,12 @@ async def sample_and_broadcast( trio.ClosedResourceError, trio.EndOfChannel, ): - ctx = getattr(stream, '_ctx', None) + chan = ctx.chan if ctx: log.warning( - f'{ctx.chan.uid} dropped ' - '`brokerd`-quotes-feed connection' + 'Dropped `brokerd`-quotes-feed connection:\n' + f'{broker_symbol}:' + f'{ctx.cid}@{chan.uid}' ) if tick_throttle: assert stream._closed @@ -354,7 +394,11 @@ async def sample_and_broadcast( try: subs.remove((stream, tick_throttle)) except ValueError: - log.error(f'{stream} was already removed from subs!?') + log.error( + f'Stream was already removed from subs!?\n' + f'{broker_symbol}:' + f'{ctx.cid}@{chan.uid}' + ) # TODO: a less naive throttler, here's some snippets: @@ -466,6 +510,7 @@ async def uniform_rate_send( # if the feed consumer goes down then drop # out of this rate limiter log.warning(f'{stream} closed') + await stream.aclose() return # reset send cycle state diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 8848ec1c..1172fc7b 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -20,6 +20,7 @@ NumPy compatible shared memory buffers for real-time IPC streaming. """ from __future__ import annotations from sys import byteorder +import time from typing import Optional from multiprocessing.shared_memory import SharedMemory, _USE_POSIX @@ -98,7 +99,12 @@ class SharedInt: if _USE_POSIX: # We manually unlink to bypass all the "resource tracker" # nonsense meant for non-SC systems. - shm_unlink(self._shm.name) + name = self._shm.name + try: + shm_unlink(name) + except FileNotFoundError: + # might be a teardown race here? + log.warning(f'Shm for {name} already unlinked?') class _Token(BaseModel): @@ -536,8 +542,26 @@ def attach_shm_array( if key in _known_tokens: assert _Token.from_msg(_known_tokens[key]) == token, "WTF" + # XXX: ugh, looks like due to the ``shm_open()`` C api we can't + # actually place files in a subdir, see discussion here: + # https://stackoverflow.com/a/11103289 + # attach to array buffer and view as per dtype - shm = SharedMemory(name=key) + _err: Optional[Exception] = None + for _ in range(3): + try: + shm = SharedMemory( + name=key, + create=False, + ) + break + except OSError as oserr: + _err = oserr + time.sleep(0.1) + else: + if _err: + raise _err + shmarr = np.ndarray( (size,), dtype=token.dtype, diff --git a/piker/data/_source.py b/piker/data/_source.py index 2f5f61ed..9afcb191 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -33,7 +33,7 @@ ohlc_fields = [ ('high', float), ('low', float), ('close', float), - ('volume', int), + ('volume', float), ('bar_wap', float), ] diff --git a/piker/data/feed.py b/piker/data/feed.py index 605349e9..1165fddc 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -33,6 +33,7 @@ from typing import ( Generator, Awaitable, TYPE_CHECKING, + Union, ) import trio @@ -40,12 +41,12 @@ from trio.abc import ReceiveChannel from trio_typing import TaskStatus import trimeter import tractor +from tractor.trionics import maybe_open_context from pydantic import BaseModel import pendulum import numpy as np from ..brokers import get_brokermod -from .._cacheables import maybe_open_context from ..calc import humanize from ..log import get_logger, get_console_log from .._daemon import ( @@ -116,7 +117,13 @@ class _FeedsBus(BaseModel): # https://github.com/samuelcolvin/pydantic/issues/2816 _subscribers: dict[ str, - list[tuple[tractor.MsgStream, Optional[float]]] + list[ + tuple[ + Union[tractor.MsgStream, trio.MemorySendChannel], + tractor.Context, + Optional[float], # tick throttle in Hz + ] + ] ] = {} async def start_task( @@ -228,7 +235,7 @@ def diff_history( # the + 1 is because ``last_tsdb_dt`` is pulled from # the last row entry for the ``'time'`` field retreived # from the tsdb. - to_push = array[abs(s_diff)+1:] + to_push = array[abs(s_diff) + 1:] else: # pass back only the portion of the array that is @@ -251,6 +258,7 @@ async def start_backfill( last_tsdb_dt: Optional[datetime] = None, storage: Optional[Storage] = None, write_tsdb: bool = True, + tsdb_is_up: bool = False, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -266,8 +274,8 @@ async def start_backfill( # sample period step size in seconds step_size_s = ( - pendulum.from_timestamp(times[-1]) - - pendulum.from_timestamp(times[-2]) + pendulum.from_timestamp(times[-1]) + - pendulum.from_timestamp(times[-2]) ).seconds # "frame"'s worth of sample period steps in seconds @@ -292,25 +300,33 @@ async def start_backfill( # let caller unblock and deliver latest history frame task_status.started((shm, start_dt, end_dt, bf_done)) + # based on the sample step size, maybe load a certain amount history if last_tsdb_dt is None: - # maybe a better default (they don't seem to define epoch?!) - - # based on the sample step size load a certain amount - # history - if step_size_s == 1: - last_tsdb_dt = pendulum.now().subtract(days=2) - - elif step_size_s == 60: - last_tsdb_dt = pendulum.now().subtract(years=2) - - else: + if step_size_s not in (1, 60): raise ValueError( '`piker` only needs to support 1m and 1s sampling ' 'but ur api is trying to deliver a longer ' f'timeframe of {step_size_s} ' 'seconds.. so ye, dun ' - 'do dat bruh.' + 'do dat brudder.' ) + # when no tsdb "last datum" is provided, we just load + # some near-term history. + periods = { + 1: {'days': 1}, + 60: {'days': 14}, + } + + if tsdb_is_up: + # do a decently sized backfill and load it into storage. + periods = { + 1: {'days': 6}, + 60: {'years': 2}, + } + + kwargs = periods[step_size_s] + last_tsdb_dt = start_dt.subtract(**kwargs) + # configure async query throttling erlangs = config.get('erlangs', 1) rate = config.get('rate', 1) @@ -328,7 +344,7 @@ async def start_backfill( log.debug(f'New datetime index:\n{pformat(dtrange)}') for end_dt in dtrange: - log.warning(f'Yielding next frame start {end_dt}') + log.info(f'Yielding next frame start {end_dt}') start = yield end_dt # if caller sends a new start date, reset to that @@ -568,8 +584,8 @@ async def start_backfill( start_dt, end_dt, ) = await get_ohlc_frame( - input_end_dt=last_shm_prepend_dt, - iter_dts_gen=idts, + input_end_dt=last_shm_prepend_dt, + iter_dts_gen=idts, ) last_epoch = to_push['time'][-1] diff = start - last_epoch @@ -712,6 +728,7 @@ async def manage_history( bfqsn, shm, last_tsdb_dt=last_tsdb_dt, + tsdb_is_up=True, storage=storage, ) ) @@ -795,6 +812,15 @@ async def manage_history( # manually trigger step update to update charts/fsps # which need an incremental update. + # NOTE: the way this works is super duper + # un-intuitive right now: + # - the broadcaster fires a msg to the fsp subsystem. + # - fsp subsys then checks for a sample step diff and + # possibly recomputes prepended history. + # - the fsp then sends back to the parent actor + # (usually a chart showing graphics for said fsp) + # which tells the chart to conduct a manual full + # graphics loop cycle. for delay_s in sampler.subscribers: await broadcast(delay_s) @@ -994,7 +1020,7 @@ async def open_feed_bus( brokername: str, symbol: str, # normally expected to the broker-specific fqsn loglevel: str, - tick_throttle: Optional[float] = None, + tick_throttle: Optional[float] = None, start_stream: bool = True, ) -> None: @@ -1098,10 +1124,10 @@ async def open_feed_bus( recv, stream, ) - sub = (send, tick_throttle) + sub = (send, ctx, tick_throttle) else: - sub = (stream, tick_throttle) + sub = (stream, ctx, tick_throttle) subs = bus._subscribers[bfqsn] subs.append(sub) @@ -1255,7 +1281,7 @@ async def install_brokerd_search( # a backend module? pause_period=getattr( brokermod, '_search_conf', {} - ).get('pause_period', 0.0616), + ).get('pause_period', 0.0616), ): yield diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index e1fb38d5..4d1c91ad 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -230,8 +230,8 @@ _ohlcv_dt = [ # ohlcv sampling ('Open', 'f4'), ('High', 'f4'), - ('Low', 'i8'), - ('Close', 'i8'), + ('Low', 'f4'), + ('Close', 'f4'), ('Volume', 'f4'), ] @@ -547,6 +547,17 @@ class Storage: if err: raise MarketStoreError(err) + # XXX: currently the only way to do this is through the CLI: + + # sudo ./marketstore connect --dir ~/.config/piker/data + # >> \show mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15 + # and this seems to block and use up mem.. + # >> \trim mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15 + + # relevant source code for this is here: + # https://github.com/alpacahq/marketstore/blob/master/cmd/connect/session/trim.go#L14 + # def delete_range(self, start_dt, end_dt) -> None: + # ... @acm async def open_storage_client( @@ -628,12 +639,13 @@ async def tsdb_history_update( tsdb_arrays = await storage.read_ohlcv(fqsn) # hist diffing if tsdb_arrays: - onesec = tsdb_arrays[1] - - # these aren't currently used but can be referenced from - # within the embedded ipython shell below. - to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]] - to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]] + for secs in (1, 60): + ts = tsdb_arrays.get(secs) + if ts is not None and len(ts): + # these aren't currently used but can be referenced from + # within the embedded ipython shell below. + to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]] + to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]] profiler('Finished db arrays diffs') diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 0776c7a2..cf45c40e 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -369,7 +369,12 @@ async def cascade( # always trigger UI refresh after history update, # see ``piker.ui._fsp.FspAdmin.open_chain()`` and # ``piker.ui._display.trigger_update()``. - await client_stream.send('update') + await client_stream.send({ + 'fsp_update': { + 'key': dst_shm_token, + 'first': dst._first.value, + 'last': dst._last.value, + }}) return tracker, index def is_synced( diff --git a/piker/log.py b/piker/log.py index 7c8bb798..804e09dc 100644 --- a/piker/log.py +++ b/piker/log.py @@ -25,10 +25,13 @@ from pygments import highlight, lexers, formatters # Makes it so we only see the full module name when using ``__name__`` # without the extra "piker." prefix. -_proj_name = 'piker' +_proj_name: str = 'piker' -def get_logger(name: str = None) -> logging.Logger: +def get_logger( + name: str = None, + +) -> logging.Logger: '''Return the package log or a sub-log for `name` if provided. ''' return tractor.log.get_logger(name=name, _root_name=_proj_name) diff --git a/piker/ui/_axes.py b/piker/ui/_axes.py index 93ac7af7..7ba52055 100644 --- a/piker/ui/_axes.py +++ b/piker/ui/_axes.py @@ -223,8 +223,9 @@ class DynamicDateAxis(Axis): ) -> list[str]: chart = self.linkedsplits.chart - bars = chart._arrays[chart.name] - shm = self.linkedsplits.chart._shm + flow = chart._flows[chart.name] + shm = flow.shm + bars = shm.array first = shm._first.value bars_len = len(bars) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index a3a97164..7b40f0d7 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -34,9 +34,7 @@ from PyQt5.QtWidgets import ( QVBoxLayout, QSplitter, ) -import msgspec import numpy as np -# from pydantic import BaseModel import pyqtgraph as pg import trio @@ -49,9 +47,13 @@ from ._cursor import ( Cursor, ContentsLabel, ) +from ..data._sharedmem import ShmArray from ._l1 import L1Labels from ._ohlc import BarItems -from ._curve import FastAppendCurve +from ._curve import ( + Curve, + StepCurve, +) from ._style import ( hcolor, CHART_MARGINS, @@ -60,15 +62,12 @@ from ._style import ( ) from ..data.feed import Feed from ..data._source import Symbol -from ..data._sharedmem import ( - ShmArray, - # _Token, -) from ..log import get_logger from ._interaction import ChartView from ._forms import FieldsForm from .._profile import pg_profile_enabled, ms_slower_then from ._overlay import PlotItemOverlay +from ._flows import Flow if TYPE_CHECKING: from ._display import DisplayState @@ -419,7 +418,7 @@ class LinkedSplits(QWidget): self, symbol: Symbol, - array: np.ndarray, + shm: ShmArray, sidepane: FieldsForm, style: str = 'bar', @@ -444,7 +443,7 @@ class LinkedSplits(QWidget): self.chart = self.add_plot( name=symbol.key, - array=array, + shm=shm, style=style, _is_main=True, @@ -472,7 +471,7 @@ class LinkedSplits(QWidget): self, name: str, - array: np.ndarray, + shm: ShmArray, array_key: Optional[str] = None, style: str = 'line', @@ -516,7 +515,6 @@ class LinkedSplits(QWidget): name=name, data_key=array_key or name, - array=array, parent=qframe, linkedsplits=self, axisItems=axes, @@ -580,7 +578,7 @@ class LinkedSplits(QWidget): graphics, data_key = cpw.draw_ohlc( name, - array, + shm, array_key=array_key ) self.cursor.contents_labels.add_label( @@ -594,7 +592,7 @@ class LinkedSplits(QWidget): add_label = True graphics, data_key = cpw.draw_curve( name, - array, + shm, array_key=array_key, color='default_light', ) @@ -603,7 +601,7 @@ class LinkedSplits(QWidget): add_label = True graphics, data_key = cpw.draw_curve( name, - array, + shm, array_key=array_key, step_mode=True, color='davies', @@ -691,7 +689,6 @@ class ChartPlotWidget(pg.PlotWidget): # the "data view" we generate graphics from name: str, - array: np.ndarray, data_key: str, linkedsplits: LinkedSplits, @@ -744,14 +741,6 @@ class ChartPlotWidget(pg.PlotWidget): self._max_l1_line_len: float = 0 # self.setViewportMargins(0, 0, 0, 0) - # self._ohlc = array # readonly view of ohlc data - - # TODO: move to Aggr above XD - # readonly view of data arrays - self._arrays = { - self.data_key: array, - } - self._graphics = {} # registry of underlying graphics # registry of overlay curve names self._flows: dict[str, Flow] = {} @@ -767,7 +756,6 @@ class ChartPlotWidget(pg.PlotWidget): # show background grid self.showGrid(x=False, y=True, alpha=0.3) - self.default_view() self.cv.enable_auto_yrange() self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem) @@ -816,14 +804,8 @@ class ChartPlotWidget(pg.PlotWidget): Return a range tuple for the bars present in view. ''' - l, r = self.view_range() - array = self._arrays[self.name] - start, stop = self._xrange = ( - array[0]['index'], - array[-1]['index'], - ) - lbar = max(l, start) - rbar = min(r, stop) + main_flow = self._flows[self.name] + ifirst, l, lbar, rbar, r, ilast = main_flow.datums_range() return l, lbar, rbar, r def curve_width_pxs( @@ -877,40 +859,51 @@ class ChartPlotWidget(pg.PlotWidget): def default_view( self, - steps_on_screen: Optional[int] = None + bars_from_y: int = 3000, ) -> None: ''' Set the view box to the "default" startup view of the scene. ''' - try: - index = self._arrays[self.name]['index'] - except IndexError: - log.warning(f'array for {self.name} not loaded yet?') + flow = self._flows.get(self.name) + if not flow: + log.warning(f'`Flow` for {self.name} not loaded yet?') return + index = flow.shm.array['index'] xfirst, xlast = index[0], index[-1] l, lbar, rbar, r = self.bars_range() - - marker_pos, l1_len = self.pre_l1_xs() - end = xlast + l1_len + 1 + view = self.view if ( rbar < 0 or l < xfirst + or l < 0 or (rbar - lbar) < 6 ): - # set fixed bars count on screen that approx includes as + # TODO: set fixed bars count on screen that approx includes as # many bars as possible before a downsample line is shown. - begin = xlast - round(6116 / 6) + begin = xlast - bars_from_y + view.setXRange( + min=begin, + max=xlast, + padding=0, + ) + # re-get range + l, lbar, rbar, r = self.bars_range() - else: - begin = end - (r - l) + # we get the L1 spread label "length" in view coords + # terms now that we've scaled either by user control + # or to the default set of bars as per the immediate block + # above. + marker_pos, l1_len = self.pre_l1_xs() + end = xlast + l1_len + 1 + begin = end - (r - l) # for debugging # print( - # f'bars range: {brange}\n' + # # f'bars range: {brange}\n' # f'xlast: {xlast}\n' # f'marker pos: {marker_pos}\n' # f'l1 len: {l1_len}\n' @@ -922,14 +915,13 @@ class ChartPlotWidget(pg.PlotWidget): if self._static_yrange == 'axis': self._static_yrange = None - view = self.view view.setXRange( min=begin, max=end, padding=0, ) - view._set_yrange() self.view.maybe_downsample_graphics() + view._set_yrange() try: self.linked.graphics_cycle() except IndexError: @@ -960,7 +952,7 @@ class ChartPlotWidget(pg.PlotWidget): def draw_ohlc( self, name: str, - data: np.ndarray, + shm: ShmArray, array_key: Optional[str] = None, @@ -980,15 +972,12 @@ class ChartPlotWidget(pg.PlotWidget): # the np array buffer to be drawn on next render cycle self.plotItem.addItem(graphics) - # draw after to allow self.scene() to work... - graphics.draw_from_data(data) - data_key = array_key or name - self._graphics[data_key] = graphics self._flows[data_key] = Flow( name=name, plot=self.plotItem, + _shm=shm, is_ohlc=True, graphics=graphics, ) @@ -1058,20 +1047,21 @@ class ChartPlotWidget(pg.PlotWidget): self, name: str, - data: np.ndarray, + shm: ShmArray, array_key: Optional[str] = None, overlay: bool = False, color: Optional[str] = None, add_label: bool = True, pi: Optional[pg.PlotItem] = None, + step_mode: bool = False, **pdi_kwargs, ) -> (pg.PlotDataItem, str): ''' Draw a "curve" (line plot graphics) for the provided data in - the input array ``data``. + the input shm array ``shm``. ''' color = color or self.pen_color or 'default_light' @@ -1081,40 +1071,26 @@ class ChartPlotWidget(pg.PlotWidget): data_key = array_key or name - # yah, we wrote our own B) - curve = FastAppendCurve( - y=data[data_key], - x=data['index'], - # antialias=True, + curve_type = { + None: Curve, + 'step': StepCurve, + # TODO: + # 'bars': BarsItems + }['step' if step_mode else None] + + curve = curve_type( name=name, - - # XXX: pretty sure this is just more overhead - # on data reads and makes graphics rendering no faster - # clipToView=True, - **pdi_kwargs, ) - # XXX: see explanation for different caching modes: - # https://stackoverflow.com/a/39410081 - # seems to only be useful if we don't re-generate the entire - # QPainterPath every time - # curve.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) - - # don't ever use this - it's a colossal nightmare of artefacts - # and is disastrous for performance. - # curve.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache) - - # register curve graphics and backing array for name - self._graphics[name] = curve - self._arrays[data_key] = data - pi = pi or self.plotItem self._flows[data_key] = Flow( name=name, plot=pi, + _shm=shm, is_ohlc=False, + # register curve graphics with this flow graphics=curve, ) @@ -1175,16 +1151,11 @@ class ChartPlotWidget(pg.PlotWidget): ) return last - def update_graphics_from_array( + def update_graphics_from_flow( self, graphics_name: str, - - array: Optional[np.ndarray] = None, array_key: Optional[str] = None, - use_vr: bool = True, - render: bool = True, - **kwargs, ) -> pg.GraphicsObject: @@ -1192,63 +1163,11 @@ class ChartPlotWidget(pg.PlotWidget): Update the named internal graphics from ``array``. ''' - if array is not None: - assert len(array) - - data_key = array_key or graphics_name - if graphics_name not in self._flows: - data_key = self.name - - if array is not None: - # write array to internal graphics table - self._arrays[data_key] = array - else: - array = self._arrays[data_key] - - # array key and graphics "name" might be different.. - graphics = self._graphics[graphics_name] - - # compute "in-view" indices - l, lbar, rbar, r = self.bars_range() - indexes = array['index'] - ifirst = indexes[0] - ilast = indexes[-1] - - lbar_i = max(l, ifirst) - ifirst - rbar_i = min(r, ilast) - ifirst - - # TODO: we could do it this way as well no? - # to_draw = array[lbar - ifirst:(rbar - ifirst) + 1] - in_view = array[lbar_i: rbar_i + 1] - - if ( - not in_view.size - or not render - ): - return graphics - - if isinstance(graphics, BarItems): - graphics.update_from_array( - array, - in_view, - view_range=(lbar_i, rbar_i) if use_vr else None, - - **kwargs, - ) - - else: - graphics.update_from_array( - x=array['index'], - y=array[data_key], - - x_iv=in_view['index'], - y_iv=in_view[data_key], - view_range=(lbar_i, rbar_i) if use_vr else None, - - **kwargs - ) - - return graphics + flow = self._flows[array_key or graphics_name] + return flow.update_graphics( + array_key=array_key, + **kwargs, + ) # def _label_h(self, yhigh: float, ylow: float) -> float: # # compute contents label "height" in view terms @@ -1295,7 +1214,7 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: this should go onto some sort of # data-view thinger..right? - ohlc = self._shm.array + ohlc = self._flows[self.name].shm.array # XXX: not sure why the time is so off here # looks like we're gonna have to do some fixing.. @@ -1325,7 +1244,9 @@ class ChartPlotWidget(pg.PlotWidget): def maxmin( self, name: Optional[str] = None, - bars_range: Optional[tuple[int, int, int, int]] = None, + bars_range: Optional[tuple[ + int, int, int, int, int, int + ]] = None, ) -> tuple[float, float]: ''' @@ -1334,16 +1255,14 @@ class ChartPlotWidget(pg.PlotWidget): If ``bars_range`` is provided use that range. ''' + # print(f'Chart[{self.name}].maxmin()') profiler = pg.debug.Profiler( - msg=f'`{str(self)}.maxmin()` loop cycle for: `{self.name}`', + msg=f'`{str(self)}.maxmin(name={name})`: `{self.name}`', disabled=not pg_profile_enabled(), - gt=ms_slower_then, + ms_threshold=ms_slower_then, delayed=True, ) - l, lbar, rbar, r = bars_range or self.bars_range() - profiler(f'{self.name} got bars range') - # TODO: here we should instead look up the ``Flow.shm.array`` # and read directly from shm to avoid copying to memory first # and then reading it again here. @@ -1353,112 +1272,26 @@ class ChartPlotWidget(pg.PlotWidget): flow is None ): log.error(f"flow {flow_key} doesn't exist in chart {self.name} !?") - res = 0, 0 + key = res = 0, 0 else: + ( + first, + l, + lbar, + rbar, + r, + last, + ) = bars_range or flow.datums_range() + profiler(f'{self.name} got bars range') + key = round(lbar), round(rbar) res = flow.maxmin(*key) - profiler(f'yrange mxmn: {key} -> {res}') if res == (None, None): log.error( f"{flow_key} no mxmn for bars_range => {key} !?" ) res = 0, 0 + profiler(f'yrange mxmn: {key} -> {res}') return res - - -# class FlowsTable(pydantic.BaseModel): -# ''' -# Data-AGGRegate: high level API onto multiple (categorized) -# ``Flow``s with high level processing routines for -# multi-graphics computations and display. - -# ''' -# flows: dict[str, np.ndarray] = {} - - -class Flow(msgspec.Struct): # , frozen=True): - ''' - (FinancialSignal-)Flow compound type which wraps a real-time - graphics (curve) and its backing data stream together for high level - access and control. - - The intention is for this type to eventually be capable of shm-passing - of incrementally updated graphics stream data between actors. - - ''' - name: str - plot: pg.PlotItem - is_ohlc: bool = False - graphics: pg.GraphicsObject - - # TODO: hackery to be able to set a shm later - # but whilst also allowing this type to hashable, - # likely will require serializable token that is used to attach - # to the underlying shm ref after startup? - _shm: Optional[ShmArray] = None # currently, may be filled in "later" - - # cache of y-range values per x-range input. - _mxmns: dict[tuple[int, int], tuple[float, float]] = {} - - @property - def shm(self) -> ShmArray: - return self._shm - - @shm.setter - def shm(self, shm: ShmArray) -> ShmArray: - self._shm = shm - - def maxmin( - self, - lbar, - rbar, - - ) -> tuple[float, float]: - ''' - Compute the cached max and min y-range values for a given - x-range determined by ``lbar`` and ``rbar``. - - ''' - rkey = (lbar, rbar) - cached_result = self._mxmns.get(rkey) - if cached_result: - return cached_result - - shm = self.shm - if shm is None: - mxmn = None - - else: # new block for profiling?.. - arr = shm.array - - # build relative indexes into shm array - # TODO: should we just add/use a method - # on the shm to do this? - ifirst = arr[0]['index'] - slice_view = arr[ - lbar - ifirst: - (rbar - ifirst) + 1 - ] - - if not slice_view.size: - mxmn = None - - else: - if self.is_ohlc: - ylow = np.min(slice_view['low']) - yhigh = np.max(slice_view['high']) - - else: - view = slice_view[self.name] - ylow = np.min(view) - yhigh = np.max(view) - - mxmn = ylow, yhigh - - if mxmn is not None: - # cache new mxmn result - self._mxmns[rkey] = mxmn - - return mxmn diff --git a/piker/ui/_compression.py b/piker/ui/_compression.py index adb42251..e9564359 100644 --- a/piker/ui/_compression.py +++ b/piker/ui/_compression.py @@ -138,50 +138,20 @@ def ohlc_flatten( return x, flat -def ohlc_to_m4_line( - ohlc: np.ndarray, - px_width: int, - - downsample: bool = False, - uppx: Optional[float] = None, - pretrace: bool = False, - -) -> tuple[np.ndarray, np.ndarray]: - ''' - Convert an OHLC struct-array to a m4 downsampled 1-d array. - - ''' - xpts, flat = ohlc_flatten( - ohlc, - use_mxmn=pretrace, - ) - - if downsample: - bins, x, y = ds_m4( - xpts, - flat, - px_width=px_width, - uppx=uppx, - log_scale=bool(uppx) - ) - x = np.broadcast_to(x[:, None], y.shape) - x = (x + np.array([-0.43, 0, 0, 0.43])).flatten() - y = y.flatten() - - return x, y - else: - return xpts, flat - - def ds_m4( x: np.ndarray, y: np.ndarray, + # units-per-pixel-x(dimension) + uppx: float, - # this is the width of the data in view - # in display-device-local pixel units. - px_width: int, - uppx: Optional[float] = None, - log_scale: bool = True, + # XXX: troll zone / easter egg.. + # want to mess with ur pal, pass in the actual + # pixel width here instead of uppx-proper (i.e. pass + # in our ``pg.GraphicsObject`` derivative's ``.px_width()`` + # gto mega-trip-out ur bud). Hint, it used to be implemented + # (wrongly) using "pixel width", so check the git history ;) + + xrange: Optional[float] = None, ) -> tuple[int, np.ndarray, np.ndarray]: ''' @@ -208,52 +178,49 @@ def ds_m4( # "i didn't show it in the sample code, but it's accounted for # in the start and end indices and number of bins" - # optionally log-scale down the "supposed pxs on screen" - # as the units-per-px (uppx) get's large. - if log_scale: - assert uppx, 'You must provide a `uppx` value to use log scaling!' - - # scaler = 2**7 / (1 + math.log(uppx, 2)) - scaler = round( - max( - # NOTE: found that a 16x px width brought greater - # detail, likely due to dpi scaling? - # px_width=px_width * 16, - 2**7 / (1 + math.log(uppx, 2)), - 1 - ) - ) - px_width *= scaler - - assert px_width > 1 # width of screen in pxs? + # should never get called unless actually needed + assert uppx > 1 # NOTE: if we didn't pre-slice the data to downsample # you could in theory pass these as the slicing params, # do we care though since we can always just pre-slice the # input? x_start = x[0] # x value start/lowest in domain - x_end = x[-1] # x end value/highest in domain + + if xrange is None: + x_end = x[-1] # x end value/highest in domain + xrange = (x_end - x_start) # XXX: always round up on the input pixels - px_width = math.ceil(px_width) + # lnx = len(x) + # uppx *= max(4 / (1 + math.log(uppx, 2)), 1) - x_range = x_end - x_start + pxw = math.ceil(xrange / uppx) - # ratio of indexed x-value to width of raster in pixels. - # this is more or less, uppx: units-per-pixel. - w = x_range / float(px_width) + # scale up the frame "width" directly with uppx + w = uppx # ensure we make more then enough # frames (windows) for the output pixel - frames = px_width + frames = pxw # if we have more and then exact integer's # (uniform quotient output) worth of datum-domain-points # per windows-frame, add one more window to ensure # we have room for all output down-samples. - pts_per_pixel, r = divmod(len(x), frames) + pts_per_pixel, r = divmod(xrange, frames) if r: + # while r: frames += 1 + pts_per_pixel, r = divmod(xrange, frames) + + # print( + # f'uppx: {uppx}\n' + # f'xrange: {xrange}\n' + # f'pxw: {pxw}\n' + # f'frames: {frames}\n' + # ) + assert frames >= (xrange / uppx) # call into ``numba`` nb, i_win, y_out = _m4( diff --git a/piker/ui/_cursor.py b/piker/ui/_cursor.py index a34c15c1..606ff3f2 100644 --- a/piker/ui/_cursor.py +++ b/piker/ui/_cursor.py @@ -43,8 +43,8 @@ log = get_logger(__name__) # latency (in terms of perceived lag in cross hair) so really be sure # there's an improvement if you want to change it! -_mouse_rate_limit = 120 # TODO; should we calc current screen refresh rate? -_debounce_delay = 1 / 40 +_mouse_rate_limit = 60 # TODO; should we calc current screen refresh rate? +_debounce_delay = 0 _ch_label_opac = 1 @@ -98,25 +98,30 @@ class LineDot(pg.CurvePoint): ev: QtCore.QEvent, ) -> bool: - if not isinstance( - ev, QtCore.QDynamicPropertyChangeEvent - ) or self.curve() is None: + + if ( + not isinstance(ev, QtCore.QDynamicPropertyChangeEvent) + or self.curve() is None + ): return False # TODO: get rid of this ``.getData()`` and # make a more pythonic api to retreive backing # numpy arrays... - (x, y) = self.curve().getData() - index = self.property('index') - # first = self._plot._arrays['ohlc'][0]['index'] - # first = x[0] - # i = index - first - if index: - i = round(index - x[0]) - if i > 0 and i < len(y): - newPos = (index, y[i]) - QtWidgets.QGraphicsItem.setPos(self, *newPos) - return True + # (x, y) = self.curve().getData() + # index = self.property('index') + # # first = self._plot._arrays['ohlc'][0]['index'] + # # first = x[0] + # # i = index - first + # if index: + # i = round(index - x[0]) + # if i > 0 and i < len(y): + # newPos = (index, y[i]) + # QtWidgets.QGraphicsItem.setPos( + # self, + # *newPos, + # ) + # return True return False @@ -254,13 +259,13 @@ class ContentsLabels: def update_labels( self, index: int, - # array_name: str, ) -> None: - # for name, (label, update) in self._labels.items(): for chart, name, label, update in self._labels: - array = chart._arrays[name] + flow = chart._flows[name] + array = flow.shm.array + if not ( index >= 0 and index < array[-1]['index'] @@ -269,8 +274,6 @@ class ContentsLabels: print('WTF out of range?') continue - # array = chart._arrays[name] - # call provided update func with data point try: label.show() @@ -472,9 +475,12 @@ class Cursor(pg.GraphicsObject): ) -> LineDot: # if this plot contains curves add line dot "cursors" to denote # the current sample under the mouse + main_flow = plot._flows[plot.name] + # read out last index + i = main_flow.shm.array[-1]['index'] cursor = LineDot( curve, - index=plot._arrays[plot.name][-1]['index'], + index=i, plot=plot ) plot.addItem(cursor) diff --git a/piker/ui/_curve.py b/piker/ui/_curve.py index 00a4ca7a..ac967bf7 100644 --- a/piker/ui/_curve.py +++ b/piker/ui/_curve.py @@ -18,83 +18,37 @@ Fast, smooth, sexy curves. """ -from typing import Optional +from contextlib import contextmanager as cm +from typing import Optional, Callable import numpy as np import pyqtgraph as pg -from PyQt5 import QtGui, QtWidgets +from PyQt5 import QtWidgets from PyQt5.QtWidgets import QGraphicsItem from PyQt5.QtCore import ( Qt, QLineF, QSizeF, QRectF, + # QRect, QPointF, ) - +from PyQt5.QtGui import ( + QPainter, + QPainterPath, +) from .._profile import pg_profile_enabled, ms_slower_then from ._style import hcolor -from ._compression import ( - # ohlc_to_m4_line, - ds_m4, -) +# from ._compression import ( +# # ohlc_to_m4_line, +# ds_m4, +# ) from ..log import get_logger log = get_logger(__name__) -def step_path_arrays_from_1d( - x: np.ndarray, - y: np.ndarray, - include_endpoints: bool = False, - -) -> (np.ndarray, np.ndarray): - ''' - Generate a "step mode" curve aligned with OHLC style bars - such that each segment spans each bar (aka "centered" style). - - ''' - y_out = y.copy() - x_out = x.copy() - x2 = np.empty( - # the data + 2 endpoints on either end for - # "termination of the path". - (len(x) + 1, 2), - # we want to align with OHLC or other sampling style - # bars likely so we need fractinal values - dtype=float, - ) - x2[0] = x[0] - 0.5 - x2[1] = x[0] + 0.5 - x2[1:] = x[:, np.newaxis] + 0.5 - - # flatten to 1-d - x_out = x2.reshape(x2.size) - - # we create a 1d with 2 extra indexes to - # hold the start and (current) end value for the steps - # on either end - y2 = np.empty((len(y), 2), dtype=y.dtype) - y2[:] = y[:, np.newaxis] - - y_out = np.empty( - 2*len(y) + 2, - dtype=y.dtype - ) - - # flatten and set 0 endpoints - y_out[1:-1] = y2.reshape(y2.size) - y_out[0] = 0 - y_out[-1] = 0 - - if not include_endpoints: - return x_out[:-1], y_out[:-1] - - else: - return x_out, y_out - - _line_styles: dict[str, int] = { 'solid': Qt.PenStyle.SolidLine, 'dash': Qt.PenStyle.DashLine, @@ -103,24 +57,43 @@ _line_styles: dict[str, int] = { } -class FastAppendCurve(pg.GraphicsObject): +class Curve(pg.GraphicsObject): ''' - A faster, append friendly version of ``pyqtgraph.PlotCurveItem`` - built for real-time data updates. + A faster, simpler, append friendly version of + ``pyqtgraph.PlotCurveItem`` built for highly customizable real-time + updates. - The main difference is avoiding regeneration of the entire - historical path where possible and instead only updating the "new" - segment(s) via a ``numpy`` array diff calc. Further the "last" - graphic segment is drawn independently such that near-term (high - frequency) discrete-time-sampled style updates don't trigger a full - path redraw. + This type is a much stripped down version of a ``pyqtgraph`` style + "graphics object" in the sense that the internal lower level + graphics which are drawn in the ``.paint()`` method are actually + rendered outside of this class entirely and instead are assigned as + state (instance vars) here and then drawn during a Qt graphics + cycle. + + The main motivation for this more modular, composed design is that + lower level graphics data can be rendered in different threads and + then read and drawn in this main thread without having to worry + about dealing with Qt's concurrency primitives. See + ``piker.ui._flows.Renderer`` for details and logic related to lower + level path generation and incremental update. The main differences in + the path generation code include: + + - avoiding regeneration of the entire historical path where possible + and instead only updating the "new" segment(s) via a ``numpy`` + array diff calc. + - here, the "last" graphics datum-segment is drawn independently + such that near-term (high frequency) discrete-time-sampled style + updates don't trigger a full path redraw. ''' + + # sub-type customization methods + sub_br: Optional[Callable] = None + sub_paint: Optional[Callable] = None + declare_paintables: Optional[Callable] = None + def __init__( self, - - x: np.ndarray, - y: np.ndarray, *args, step_mode: bool = False, @@ -134,27 +107,25 @@ class FastAppendCurve(pg.GraphicsObject): ) -> None: - # brutaaalll, see comments within.. - self._y = self.yData = y - self._x = self.xData = x - self._name = name - self.path: Optional[QtGui.QPainterPath] = None + # brutaaalll, see comments within.. + self.yData = None + self.xData = None + + # self._last_cap: int = 0 + self.path: Optional[QPainterPath] = None + + # additional path used for appends which tries to avoid + # triggering an update/redraw of the presumably larger + # historical ``.path`` above. self.use_fpath = use_fpath - self.fast_path: Optional[QtGui.QPainterPath] = None + self.fast_path: Optional[QPainterPath] = None # TODO: we can probably just dispense with the parent since # we're basically only using the pen setting now... super().__init__(*args, **kwargs) - # self._xrange: tuple[int, int] = self.dataBounds(ax=0) - self._xrange: Optional[tuple[int, int]] = None - - # self._last_draw = time.time() - self._in_ds: bool = False - self._last_uppx: float = 0 - # all history of curve is drawn in single px thickness pen = pg.mkPen(hcolor(color)) pen.setStyle(_line_styles[style]) @@ -168,29 +139,43 @@ class FastAppendCurve(pg.GraphicsObject): # self.last_step_pen = pg.mkPen(hcolor(color), width=2) self.last_step_pen = pg.mkPen(pen, width=2) - self._last_line: Optional[QLineF] = None - self._last_step_rect: Optional[QRectF] = None + # self._last_line: Optional[QLineF] = None + self._last_line = QLineF() + self._last_w: float = 1 # flat-top style histogram-like discrete curve - self._step_mode: bool = step_mode + # self._step_mode: bool = step_mode # self._fill = True self._brush = pg.functions.mkBrush(hcolor(fill_color or color)) + # NOTE: this setting seems to mostly prevent redraws on mouse + # interaction which is a huge boon for avg interaction latency. + # TODO: one question still remaining is if this makes trasform # interactions slower (such as zooming) and if so maybe if/when # we implement a "history" mode for the view we disable this in # that mode? - if step_mode: - # don't enable caching by default for the case where the - # only thing drawn is the "last" line segment which can - # have a weird artifact where it won't be fully drawn to its - # endpoint (something we saw on trade rate curves) - self.setCacheMode( - QGraphicsItem.DeviceCoordinateCache - ) + # don't enable caching by default for the case where the + # only thing drawn is the "last" line segment which can + # have a weird artifact where it won't be fully drawn to its + # endpoint (something we saw on trade rate curves) + self.setCacheMode(QGraphicsItem.DeviceCoordinateCache) - self.update() + # XXX: see explanation for different caching modes: + # https://stackoverflow.com/a/39410081 + # seems to only be useful if we don't re-generate the entire + # QPainterPath every time + # curve.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) + + # don't ever use this - it's a colossal nightmare of artefacts + # and is disastrous for performance. + # curve.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache) + + # allow sub-type customization + declare = self.declare_paintables + if declare: + declare() # TODO: probably stick this in a new parent # type which will contain our own version of @@ -214,9 +199,6 @@ class FastAppendCurve(pg.GraphicsObject): vr = self.viewRect() l, r = int(vr.left()), int(vr.right()) - if not self._xrange: - return 0 - start, stop = self._xrange lbar = max(l, start) rbar = min(r, stop) @@ -225,352 +207,10 @@ class FastAppendCurve(pg.GraphicsObject): QLineF(lbar, 0, rbar, 0) ).length() - def downsample( - self, - x, - y, - px_width, - uppx, - - ) -> tuple[np.ndarray, np.ndarray]: - - # downsample whenever more then 1 pixels per datum can be shown. - # always refresh data bounds until we get diffing - # working properly, see above.. - bins, x, y = ds_m4( - x, - y, - px_width=px_width, - uppx=uppx, - log_scale=bool(uppx) - ) - x = np.broadcast_to(x[:, None], y.shape) - # x = (x + np.array([-0.43, 0, 0, 0.43])).flatten() - x = (x + np.array([-0.5, 0, 0, 0.5])).flatten() - y = y.flatten() - - # presumably? - self._in_ds = True - return x, y - - def update_from_array( - self, - - # full array input history - x: np.ndarray, - y: np.ndarray, - - # pre-sliced array data that's "in view" - x_iv: np.ndarray, - y_iv: np.ndarray, - - view_range: Optional[tuple[int, int]] = None, - profiler: Optional[pg.debug.Profiler] = None, - - ) -> QtGui.QPainterPath: - ''' - Update curve from input 2-d data. - - Compare with a cached "x-range" state and (pre/a)ppend based on - a length diff. - - ''' - profiler = profiler or pg.debug.Profiler( - msg=f'FastAppendCurve.update_from_array(): `{self._name}`', - disabled=not pg_profile_enabled(), - gt=ms_slower_then, - ) - # flip_cache = False - - if self._xrange: - istart, istop = self._xrange - else: - self._xrange = istart, istop = x[0], x[-1] - # print(f"xrange: {self._xrange}") - - # XXX: lol brutal, the internals of `CurvePoint` (inherited by - # our `LineDot`) required ``.getData()`` to work.. - self.xData = x - self.yData = y - self._x, self._y = x, y - - if view_range: - profiler(f'view range slice {view_range}') - - # downsampling incremental state checking - uppx = self.x_uppx() - px_width = self.px_width() - uppx_diff = (uppx - self._last_uppx) - - should_ds = False - should_redraw = False - - # if a view range is passed, plan to draw the - # source ouput that's "in view" of the chart. - if view_range and not self._in_ds: - # print(f'{self._name} vr: {view_range}') - - # by default we only pull data up to the last (current) index - x_out, y_out = x_iv[:-1], y_iv[:-1] - - # step mode: draw flat top discrete "step" - # over the index space for each datum. - if self._step_mode: - # TODO: numba this bish - x_out, y_out = step_path_arrays_from_1d( - x_out, - y_out - ) - profiler('generated step arrays') - - should_redraw = True - profiler('sliced in-view array history') - - # x_last = x_iv[-1] - # y_last = y_iv[-1] - self._last_vr = view_range - - # self.disable_cache() - # flip_cache = True - - else: - self._xrange = x[0], x[-1] - - x_last = x[-1] - y_last = y[-1] - - # check for downsampling conditions - if ( - # std m4 downsample conditions - px_width - and uppx_diff >= 4 - or uppx_diff <= -3 - or self._step_mode and abs(uppx_diff) >= 4 - - ): - log.info( - f'{self._name} sampler change: {self._last_uppx} -> {uppx}' - ) - self._last_uppx = uppx - should_ds = True - - elif ( - uppx <= 2 - and self._in_ds - ): - # we should de-downsample back to our original - # source data so we clear our path data in prep - # to generate a new one from original source data. - should_redraw = True - should_ds = False - - # compute the length diffs between the first/last index entry in - # the input data and the last indexes we have on record from the - # last time we updated the curve index. - prepend_length = int(istart - x[0]) - append_length = int(x[-1] - istop) - - # no_path_yet = self.path is None - if ( - self.path is None - or should_redraw - or should_ds - or prepend_length > 0 - ): - if ( - not view_range - or self._in_ds - ): - # by default we only pull data up to the last (current) index - x_out, y_out = x[:-1], y[:-1] - - # step mode: draw flat top discrete "step" - # over the index space for each datum. - if self._step_mode: - x_out, y_out = step_path_arrays_from_1d( - x_out, - y_out, - ) - # TODO: numba this bish - profiler('generated step arrays') - - if should_redraw: - profiler('path reversion to non-ds') - if self.path: - self.path.clear() - - if self.fast_path: - self.fast_path.clear() - - if should_redraw and not should_ds: - if self._in_ds: - log.info(f'DEDOWN -> {self._name}') - - self._in_ds = False - - elif should_ds and px_width: - x_out, y_out = self.downsample( - x_out, - y_out, - px_width, - uppx, - ) - profiler(f'FULL PATH downsample redraw={should_ds}') - self._in_ds = True - - self.path = pg.functions.arrayToQPath( - x_out, - y_out, - connect='all', - finiteCheck=False, - path=self.path, - ) - profiler('generated fresh path') - # profiler(f'DRAW PATH IN VIEW -> {self._name}') - - # reserve mem allocs see: - # - https://doc.qt.io/qt-5/qpainterpath.html#reserve - # - https://doc.qt.io/qt-5/qpainterpath.html#capacity - # - https://doc.qt.io/qt-5/qpainterpath.html#clear - # XXX: right now this is based on had hoc checks on a - # hidpi 3840x2160 4k monitor but we should optimize for - # the target display(s) on the sys. - # if no_path_yet: - # self.path.reserve(int(500e3)) - - # TODO: get this piecewise prepend working - right now it's - # giving heck on vwap... - # elif prepend_length: - # breakpoint() - - # prepend_path = pg.functions.arrayToQPath( - # x[0:prepend_length], - # y[0:prepend_length], - # connect='all' - # ) - - # # swap prepend path in "front" - # old_path = self.path - # self.path = prepend_path - # # self.path.moveTo(new_x[0], new_y[0]) - # self.path.connectPath(old_path) - - elif ( - append_length > 0 - and not view_range - ): - new_x = x[-append_length - 2:-1] - new_y = y[-append_length - 2:-1] - - if self._step_mode: - new_x, new_y = step_path_arrays_from_1d( - new_x, - new_y, - ) - # [1:] since we don't need the vertical line normally at - # the beginning of the step curve taking the first (x, - # y) poing down to the x-axis **because** this is an - # appended path graphic. - new_x = new_x[1:] - new_y = new_y[1:] - - profiler('diffed append arrays') - - if should_ds: - new_x, new_y = self.downsample( - new_x, - new_y, - **should_ds, - ) - profiler(f'fast path downsample redraw={should_ds}') - - append_path = pg.functions.arrayToQPath( - new_x, - new_y, - connect='all', - finiteCheck=False, - path=self.fast_path, - ) - - if self.use_fpath: - # an attempt at trying to make append-updates faster.. - if self.fast_path is None: - self.fast_path = append_path - self.fast_path.reserve(int(6e3)) - else: - self.fast_path.connectPath(append_path) - size = self.fast_path.capacity() - profiler(f'connected fast path w size: {size}') - - # print(f"append_path br: {append_path.boundingRect()}") - # self.path.moveTo(new_x[0], new_y[0]) - # path.connectPath(append_path) - - # XXX: lol this causes a hang.. - # self.path = self.path.simplified() - else: - size = self.path.capacity() - profiler(f'connected history path w size: {size}') - self.path.connectPath(append_path) - - # other merging ideas: - # https://stackoverflow.com/questions/8936225/how-to-merge-qpainterpaths - # path.addPath(append_path) - # path.closeSubpath() - - # TODO: try out new work from `pyqtgraph` main which - # should repair horrid perf: - # https://github.com/pyqtgraph/pyqtgraph/pull/2032 - # ok, nope still horrible XD - # if self._fill: - # # XXX: super slow set "union" op - # self.path = self.path.united(append_path).simplified() - - # self.disable_cache() - # flip_cache = True - - # draw the "current" step graphic segment so it lines up with - # the "middle" of the current (OHLC) sample. - if self._step_mode: - self._last_line = QLineF( - x_last - 0.5, 0, - x_last + 0.5, 0, - ) - self._last_step_rect = QRectF( - x_last - 0.5, 0, - x_last + 0.5, y_last - ) - # print( - # f"path br: {self.path.boundingRect()}", - # f"fast path br: {self.fast_path.boundingRect()}", - # f"last rect br: {self._last_step_rect}", - # ) - else: - self._last_line = QLineF( - x[-2], y[-2], - x[-1], y_last - ) - - profiler('draw last segment') - - # trigger redraw of path - # do update before reverting to cache mode - # self.prepareGeometryChange() - self.update() - profiler('.update()') - - # if flip_cache: - # # XXX: seems to be needed to avoid artifacts (see above). - # self.setCacheMode(QGraphicsItem.DeviceCoordinateCache) - # XXX: lol brutal, the internals of `CurvePoint` (inherited by # our `LineDot`) required ``.getData()`` to work.. def getData(self): - return self._x, self._y - - # TODO: drop the above after ``Cursor`` re-work - def get_arrays(self) -> tuple[np.ndarray, np.ndarray]: - return self._x, self._y + return self.xData, self.yData def clear(self): ''' @@ -593,25 +233,18 @@ class FastAppendCurve(pg.GraphicsObject): # self.fast_path.clear() self.fast_path = None - # self.disable_cache() - # self.setCacheMode(QGraphicsItem.DeviceCoordinateCache) - - def disable_cache(self) -> None: - ''' - Disable the use of the pixel coordinate cache and trigger a geo event. - - ''' - # XXX: pretty annoying but, without this there's little - # artefacts on the append updates to the curve... + @cm + def reset_cache(self) -> None: self.setCacheMode(QtWidgets.QGraphicsItem.NoCache) - self.prepareGeometryChange() + yield + self.setCacheMode(QGraphicsItem.DeviceCoordinateCache) def boundingRect(self): ''' Compute and then cache our rect. ''' if self.path is None: - return QtGui.QPainterPath().boundingRect() + return QPainterPath().boundingRect() else: # dynamically override this method after initial # path is created to avoid requiring the above None check @@ -623,6 +256,7 @@ class FastAppendCurve(pg.GraphicsObject): Post init ``.boundingRect()```. ''' + # hb = self.path.boundingRect() hb = self.path.controlPointRect() hb_size = hb.size() @@ -630,17 +264,60 @@ class FastAppendCurve(pg.GraphicsObject): if fp: fhb = fp.controlPointRect() hb_size = fhb.size() + hb_size + # print(f'hb_size: {hb_size}') - w = hb_size.width() + 1 - h = hb_size.height() + 1 + # if self._last_step_rect: + # hb_size += self._last_step_rect.size() + + # if self._line: + # br = self._last_step_rect.bottomRight() + + # tl = QPointF( + # # self._vr[0], + # # hb.topLeft().y(), + # # 0, + # # hb_size.height() + 1 + # ) + + # br = self._last_step_rect.bottomRight() + + w = hb_size.width() + h = hb_size.height() + + sbr = self.sub_br + if sbr: + w, h = self.sub_br(w, h) + else: + # assume plain line graphic and use + # default unit step in each direction. + + # only on a plane line do we include + # and extra index step's worth of width + # since in the step case the end of the curve + # actually terminates earlier so we don't need + # this for the last step. + w += self._last_w + # ll = self._last_line + h += 1 # ll.y2() - ll.y1() + + # br = QPointF( + # self._vr[-1], + # # tl.x() + w, + # tl.y() + h, + # ) br = QRectF( # top left + # hb.topLeft() + # tl, QPointF(hb.topLeft()), + # br, # total size + # QSizeF(hb_size) + # hb_size, QSizeF(w, h) ) # print(f'bounding rect: {br}') @@ -648,40 +325,36 @@ class FastAppendCurve(pg.GraphicsObject): def paint( self, - p: QtGui.QPainter, + p: QPainter, opt: QtWidgets.QStyleOptionGraphicsItem, w: QtWidgets.QWidget ) -> None: profiler = pg.debug.Profiler( - msg=f'FastAppendCurve.paint(): `{self._name}`', + msg=f'Curve.paint(): `{self._name}`', disabled=not pg_profile_enabled(), - gt=ms_slower_then, + ms_threshold=ms_slower_then, ) - if ( - self._step_mode - and self._last_step_rect - ): - brush = self._brush + sub_paint = self.sub_paint + if sub_paint: + sub_paint(p, profiler) - # p.drawLines(*tuple(filter(bool, self._last_step_lines))) - # p.drawRect(self._last_step_rect) - p.fillRect(self._last_step_rect, brush) - profiler('.fillRect()') - - if self._last_line: - p.setPen(self.last_step_pen) - p.drawLine(self._last_line) - profiler('.drawLine()') - p.setPen(self._pen) + p.setPen(self.last_step_pen) + p.drawLine(self._last_line) + profiler('.drawLine()') + p.setPen(self._pen) path = self.path + # cap = path.capacity() + # if cap != self._last_cap: + # print(f'NEW CAPACITY: {self._last_cap} -> {cap}') + # self._last_cap = cap if path: p.drawPath(path) - profiler('.drawPath(path)') + profiler(f'.drawPath(path): {path.capacity()}') fp = self.fast_path if fp: @@ -695,3 +368,117 @@ class FastAppendCurve(pg.GraphicsObject): # if self._fill: # brush = self.opts['brush'] # p.fillPath(self.path, brush) + + def draw_last_datum( + self, + path: QPainterPath, + src_data: np.ndarray, + render_data: np.ndarray, + reset: bool, + array_key: str, + + ) -> None: + # default line draw last call + with self.reset_cache(): + x = render_data['index'] + y = render_data[array_key] + + # draw the "current" step graphic segment so it + # lines up with the "middle" of the current + # (OHLC) sample. + self._last_line = QLineF( + x[-2], y[-2], + x[-1], y[-1], + ) + + return x, y + + +# TODO: this should probably be a "downsampled" curve type +# that draws a bar-style (but for the px column) last graphics +# element such that the current datum in view can be shown +# (via it's max / min) even when highly zoomed out. +class FlattenedOHLC(Curve): + + def draw_last_datum( + self, + path: QPainterPath, + src_data: np.ndarray, + render_data: np.ndarray, + reset: bool, + array_key: str, + + ) -> None: + lasts = src_data[-2:] + x = lasts['index'] + y = lasts['close'] + + # draw the "current" step graphic segment so it + # lines up with the "middle" of the current + # (OHLC) sample. + self._last_line = QLineF( + x[-2], y[-2], + x[-1], y[-1] + ) + return x, y + + +class StepCurve(Curve): + + def declare_paintables( + self, + ) -> None: + self._last_step_rect = QRectF() + + def draw_last_datum( + self, + path: QPainterPath, + src_data: np.ndarray, + render_data: np.ndarray, + reset: bool, + array_key: str, + + w: float = 0.5, + + ) -> None: + + # TODO: remove this and instead place all step curve + # updating into pre-path data render callbacks. + # full input data + x = src_data['index'] + y = src_data[array_key] + + x_last = x[-1] + y_last = y[-1] + + # lol, commenting this makes step curves + # all "black" for me :eyeroll:.. + self._last_line = QLineF( + x_last - w, 0, + x_last + w, 0, + ) + self._last_step_rect = QRectF( + x_last - w, 0, + x_last + w, y_last, + ) + return x, y + + def sub_paint( + self, + p: QPainter, + profiler: pg.debug.Profiler, + + ) -> None: + # p.drawLines(*tuple(filter(bool, self._last_step_lines))) + # p.drawRect(self._last_step_rect) + p.fillRect(self._last_step_rect, self._brush) + profiler('.fillRect()') + + def sub_br( + self, + path_w: float, + path_h: float, + + ) -> (float, float): + # passthrough + return path_w, path_h diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 927ce5df..415827fb 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -32,7 +32,7 @@ import trio import pendulum import pyqtgraph as pg -from .. import brokers +# from .. import brokers from ..data.feed import open_feed from ._axes import YAxisLabel from ._chart import ( @@ -54,16 +54,16 @@ from ._forms import ( mk_order_pane_layout, ) from .order_mode import open_order_mode -# from .._profile import ( -# pg_profile_enabled, -# ms_slower_then, -# ) +from .._profile import ( + pg_profile_enabled, + ms_slower_then, +) from ..log import get_logger log = get_logger(__name__) # TODO: load this from a config.toml! -_quote_throttle_rate: int = 12 # Hz +_quote_throttle_rate: int = 22 # Hz # a working tick-type-classes template @@ -96,28 +96,19 @@ def chart_maxmin( Compute max and min datums "in view" for range limits. ''' - array = ohlcv_shm.array - ifirst = array[0]['index'] - last_bars_range = chart.bars_range() - l, lbar, rbar, r = last_bars_range - in_view = array[lbar - ifirst:rbar - ifirst + 1] + out = chart.maxmin() - if not in_view.size: - log.warning('Resetting chart to data') - chart.default_view() + if out is None: return (last_bars_range, 0, 0, 0) - mx, mn = ( - np.nanmax(in_view['high']), - np.nanmin(in_view['low'],) - ) + mn, mx = out mx_vlm_in_view = 0 if vlm_chart: - mx_vlm_in_view = np.max( - in_view['volume'] - ) + out = vlm_chart.maxmin() + if out: + _, mx_vlm_in_view = out return ( last_bars_range, @@ -272,6 +263,7 @@ async def graphics_update_loop( 'vars': { 'tick_margin': tick_margin, 'i_last': i_last, + 'i_last_append': i_last, 'last_mx_vlm': last_mx_vlm, 'last_mx': last_mx, 'last_mn': last_mn, @@ -318,6 +310,7 @@ def graphics_update_cycle( ds: DisplayState, wap_in_history: bool = False, trigger_all: bool = False, # flag used by prepend history updates + prepend_update_index: Optional[int] = None, ) -> None: # TODO: eventually optimize this whole graphics stack with ``numba`` @@ -327,9 +320,12 @@ def graphics_update_cycle( profiler = pg.debug.Profiler( msg=f'Graphics loop cycle for: `{chart.name}`', - disabled=True, # not pg_profile_enabled(), - gt=1/12 * 1e3, - # gt=ms_slower_then, + delayed=True, + disabled=not pg_profile_enabled(), + # disabled=True, + ms_threshold=ms_slower_then, + + # ms_threshold=1/12 * 1e3, ) # unpack multi-referenced components @@ -340,12 +336,12 @@ def graphics_update_cycle( vars = ds.vars tick_margin = vars['tick_margin'] - update_uppx = 6 + update_uppx = 16 for sym, quote in ds.quotes.items(): # compute the first available graphic's x-units-per-pixel - xpx = vlm_chart.view.x_uppx() + uppx = vlm_chart.view.x_uppx() # NOTE: vlm may be written by the ``brokerd`` backend # event though a tick sample is not emitted. @@ -364,26 +360,58 @@ def graphics_update_cycle( i_diff = i_step - vars['i_last'] vars['i_last'] = i_step + append_diff = i_step - vars['i_last_append'] + + # update the "last datum" (aka extending the flow graphic with + # new data) only if the number of unit steps is >= the number of + # such unit steps per pixel (aka uppx). Iow, if the zoom level + # is such that a datum(s) update to graphics wouldn't span + # to a new pixel, we don't update yet. + do_append = (append_diff >= uppx) + if do_append: + vars['i_last_append'] = i_step + + do_rt_update = uppx < update_uppx + # print( + # f'append_diff:{append_diff}\n' + # f'uppx:{uppx}\n' + # f'do_append: {do_append}' + # ) + + # TODO: we should only run mxmn when we know + # an update is due via ``do_append`` above. ( brange, mx_in_view, mn_in_view, mx_vlm_in_view, ) = ds.maxmin() - l, lbar, rbar, r = brange mx = mx_in_view + tick_margin mn = mn_in_view - tick_margin - profiler('maxmin call') - liv = r > i_step # the last datum is in view + + profiler('`ds.maxmin()` call') + + liv = r >= i_step # the last datum is in view + + if ( + prepend_update_index is not None + and lbar > prepend_update_index + ): + # on a history update (usually from the FSP subsys) + # if the segment of history that is being prepended + # isn't in view there is no reason to do a graphics + # update. + log.debug('Skipping prepend graphics cycle: frame not in view') + return # don't real-time "shift" the curve to the # left unless we get one of the following: if ( ( - i_diff > 0 # no new sample step - and xpx < 4 # chart is zoomed out very far - and r >= i_step # the last datum isn't in view + # i_diff > 0 # no new sample step + do_append + # and uppx < 4 # chart is zoomed out very far and liv ) or trigger_all @@ -393,6 +421,11 @@ def graphics_update_cycle( # and then iff update curves and shift? chart.increment_view(steps=i_diff) + if vlm_chart: + vlm_chart.increment_view(steps=i_diff) + + profiler('view incremented') + if vlm_chart: # always update y-label ds.vlm_sticky.update_from_data( @@ -401,17 +434,16 @@ def graphics_update_cycle( if ( ( - xpx < update_uppx or i_diff > 0 + do_rt_update + or do_append and liv ) or trigger_all ): # TODO: make it so this doesn't have to be called # once the $vlm is up? - vlm_chart.update_graphics_from_array( + vlm_chart.update_graphics_from_flow( 'volume', - array, - # UGGGh, see ``maxmin()`` impl in `._fsp` for # the overlayed plotitems... we need a better # bay to invoke a maxmin per overlay.. @@ -424,6 +456,7 @@ def graphics_update_cycle( # connected to update accompanying overlay # graphics.. ) + profiler('`vlm_chart.update_graphics_from_flow()`') if ( mx_vlm_in_view != vars['last_mx_vlm'] @@ -432,22 +465,29 @@ def graphics_update_cycle( vlm_chart.view._set_yrange( yrange=yrange, ) + profiler('`vlm_chart.view._set_yrange()`') # print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}') vars['last_mx_vlm'] = mx_vlm_in_view for curve_name, flow in vlm_chart._flows.items(): + + if not flow.render: + continue + update_fsp_chart( vlm_chart, flow, curve_name, array_key=curve_name, + # do_append=uppx < update_uppx, + do_append=do_append, ) # is this even doing anything? # (pretty sure it's the real-time # resizing from last quote?) fvb = flow.plot.vb fvb._set_yrange( - autoscale_linked_plots=False, + # autoscale_linked_plots=False, name=curve_name, ) @@ -496,13 +536,17 @@ def graphics_update_cycle( # update ohlc sampled price bars if ( - xpx < update_uppx - or i_diff > 0 + do_rt_update + or do_append or trigger_all ): - chart.update_graphics_from_array( + # TODO: we should always update the "last" datum + # since the current range should at least be updated + # to it's max/min on the last pixel. + chart.update_graphics_from_flow( chart.name, - array, + # do_append=uppx < update_uppx, + do_append=do_append, ) # iterate in FIFO order per tick-frame @@ -515,8 +559,9 @@ def graphics_update_cycle( # tick frames to determine the y-range for chart # auto-scaling. # TODO: we need a streaming minmax algo here, see def above. - mx = max(price + tick_margin, mx) - mn = min(price - tick_margin, mn) + if liv: + mx = max(price + tick_margin, mx) + mn = min(price - tick_margin, mn) if typ in clear_types: @@ -539,9 +584,8 @@ def graphics_update_cycle( if wap_in_history: # update vwap overlay line - chart.update_graphics_from_array( + chart.update_graphics_from_flow( 'bar_wap', - array, ) # L1 book label-line updates @@ -557,7 +601,7 @@ def graphics_update_cycle( if ( label is not None - # and liv + and liv ): label.update_fields( {'level': price, 'size': size} @@ -571,7 +615,7 @@ def graphics_update_cycle( typ in _tick_groups['asks'] # TODO: instead we could check if the price is in the # y-view-range? - # and liv + and liv ): l1.ask_label.update_fields({'level': price, 'size': size}) @@ -579,7 +623,7 @@ def graphics_update_cycle( typ in _tick_groups['bids'] # TODO: instead we could check if the price is in the # y-view-range? - # and liv + and liv ): l1.bid_label.update_fields({'level': price, 'size': size}) @@ -594,6 +638,7 @@ def graphics_update_cycle( main_vb._ic is None or not main_vb._ic.is_set() ): + # print(f'updating range due to mxmn') main_vb._set_yrange( # TODO: we should probably scale # the view margin based on the size @@ -604,10 +649,24 @@ def graphics_update_cycle( yrange=(mn, mx), ) - vars['last_mx'], vars['last_mn'] = mx, mn + # XXX: update this every draw cycle to make L1-always-in-view work. + vars['last_mx'], vars['last_mn'] = mx, mn # run synchronous update on all linked flows for curve_name, flow in chart._flows.items(): + + if ( + not (do_rt_update or do_append) + and liv + # even if we're downsampled bigly + # draw the last datum in the final + # px column to give the user the mx/mn + # range of that set. + ): + # always update the last datum-element + # graphic for all flows + flow.draw_last(array_key=curve_name) + # TODO: should the "main" (aka source) flow be special? if curve_name == chart.data_key: continue @@ -643,7 +702,7 @@ async def display_symbol_data( ) # historical data fetch - brokermod = brokers.get_brokermod(provider) + # brokermod = brokers.get_brokermod(provider) # ohlc_status_done = sbar.open_status( # 'retreiving OHLC history.. ', @@ -692,32 +751,31 @@ async def display_symbol_data( # create main OHLC chart chart = linked.plot_ohlc_main( symbol, - bars, + ohlcv, sidepane=pp_pane, ) + chart.default_view() chart._feeds[symbol.key] = feed chart.setFocus() # plot historical vwap if available wap_in_history = False - if brokermod._show_wap_in_history: + # XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?! + # if brokermod._show_wap_in_history: - if 'bar_wap' in bars.dtype.fields: - wap_in_history = True - chart.draw_curve( - name='bar_wap', - data=bars, - add_label=False, - ) + # if 'bar_wap' in bars.dtype.fields: + # wap_in_history = True + # chart.draw_curve( + # name='bar_wap', + # shm=ohlcv, + # color='default_light', + # add_label=False, + # ) # size view to data once at outset chart.cv._set_yrange() - # TODO: a data view api that makes this less shit - chart._shm = ohlcv - chart._flows[chart.data_key].shm = ohlcv - # NOTE: we must immediately tell Qt to show the OHLC chart # to avoid a race where the subplots get added/shown to # the linked set *before* the main price chart! @@ -780,6 +838,5 @@ async def display_symbol_data( sbar._status_groups[loading_sym_key][1]() # let the app run.. bby - chart.default_view() # linked.graphics_cycle() await trio.sleep_forever() diff --git a/piker/ui/_editors.py b/piker/ui/_editors.py index 9a99d2f7..03fd208e 100644 --- a/piker/ui/_editors.py +++ b/piker/ui/_editors.py @@ -343,7 +343,7 @@ class SelectRect(QtGui.QGraphicsRectItem): nbars = ixmx - ixmn + 1 chart = self._chart - data = chart._arrays[chart.name][ixmn:ixmx] + data = chart._flows[chart.name].shm.array[ixmn:ixmx] if len(data): std = data['close'].std() diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 7b69acef..1d1a9c3d 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -49,10 +49,6 @@ from . import _style log = get_logger(__name__) # pyqtgraph global config -# might as well enable this for now? -pg.useOpenGL = True -pg.enableExperimental = True - # engage core tweaks that give us better response # latency then the average pg user _do_overrides() diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py new file mode 100644 index 00000000..01bbbece --- /dev/null +++ b/piker/ui/_flows.py @@ -0,0 +1,1204 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +High level streaming graphics primitives. + +This is an intermediate layer which associates real-time low latency +graphics primitives with underlying FSP related data structures for fast +incremental update. + +''' +from __future__ import annotations +from typing import ( + Optional, + Callable, + Union, +) + +import msgspec +import numpy as np +from numpy.lib import recfunctions as rfn +import pyqtgraph as pg +from PyQt5.QtGui import QPainterPath +from PyQt5.QtCore import QLineF + +from ..data._sharedmem import ( + ShmArray, +) +from .._profile import ( + pg_profile_enabled, + # ms_slower_then, +) +from ._pathops import ( + gen_ohlc_qpath, + ohlc_to_line, + to_step_format, + xy_downsample, +) +from ._ohlc import ( + BarItems, + # bar_from_ohlc_row, +) +from ._curve import ( + Curve, + StepCurve, + FlattenedOHLC, +) +from ..log import get_logger + + +log = get_logger(__name__) + + +# class FlowsTable(msgspec.Struct): +# ''' +# Data-AGGRegate: high level API onto multiple (categorized) +# ``Flow``s with high level processing routines for +# multi-graphics computations and display. + +# ''' +# flows: dict[str, np.ndarray] = {} + + +def update_ohlc_to_line( + src_shm: ShmArray, + array_key: str, + src_update: np.ndarray, + slc: slice, + ln: int, + first: int, + last: int, + is_append: bool, + +) -> np.ndarray: + + fields = ['open', 'high', 'low', 'close'] + return ( + rfn.structured_to_unstructured(src_update[fields]), + slc, + ) + + +def ohlc_flat_to_xy( + r: Renderer, + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + +) -> tuple[ + np.ndarray, + np.nd.array, + str, +]: + # TODO: in the case of an existing ``.update_xy()`` + # should we be passing in array as an xy arrays tuple? + + # 2 more datum-indexes to capture zero at end + x_flat = r.x_data[r._xy_first:r._xy_last] + y_flat = r.y_data[r._xy_first:r._xy_last] + + # slice to view + ivl, ivr = vr + x_iv_flat = x_flat[ivl:ivr] + y_iv_flat = y_flat[ivl:ivr] + + # reshape to 1d for graphics rendering + y_iv = y_iv_flat.reshape(-1) + x_iv = x_iv_flat.reshape(-1) + + return x_iv, y_iv, 'all' + + +def render_baritems( + flow: Flow, + graphics: BarItems, + read: tuple[ + int, int, np.ndarray, + int, int, np.ndarray, + ], + profiler: pg.debug.Profiler, + **kwargs, + +) -> None: + ''' + Graphics management logic for a ``BarItems`` object. + + Mostly just logic to determine when and how to downsample an OHLC + lines curve into a flattened line graphic and when to display one + graphic or the other. + + TODO: this should likely be moved into some kind of better abstraction + layer, if not a `Renderer` then something just above it? + + ''' + bars = graphics + + # if no source data renderer exists create one. + self = flow + show_bars: bool = False + + r = self._src_r + if not r: + show_bars = True + # OHLC bars path renderer + r = self._src_r = Renderer( + flow=self, + format_xy=gen_ohlc_qpath, + last_read=read, + ) + + ds_curve_r = Renderer( + flow=self, + last_read=read, + + # incr update routines + allocate_xy=ohlc_to_line, + update_xy=update_ohlc_to_line, + format_xy=ohlc_flat_to_xy, + ) + + curve = FlattenedOHLC( + name=f'{flow.name}_ds_ohlc', + color=bars._color, + ) + curve.hide() + self.plot.addItem(curve) + + # baseline "line" downsampled OHLC curve that should + # kick on only when we reach a certain uppx threshold. + self._render_table = (ds_curve_r, curve) + + ds_r, curve = self._render_table + + # do checks for whether or not we require downsampling: + # - if we're **not** downsampling then we simply want to + # render the bars graphics curve and update.. + # - if instead we are in a downsamplig state then we to + x_gt = 6 + uppx = curve.x_uppx() + in_line = should_line = curve.isVisible() + if ( + should_line + and uppx < x_gt + ): + # print('FLIPPING TO BARS') + should_line = False + + elif ( + not should_line + and uppx >= x_gt + ): + # print('FLIPPING TO LINE') + should_line = True + + profiler(f'ds logic complete line={should_line}') + + # do graphics updates + if should_line: + r = ds_r + graphics = curve + profiler('updated ds curve') + + else: + graphics = bars + + if show_bars: + bars.show() + + changed_to_line = False + if ( + not in_line + and should_line + ): + # change to line graphic + log.info( + f'downsampling to line graphic {self.name}' + ) + bars.hide() + curve.show() + curve.update() + changed_to_line = True + + elif in_line and not should_line: + # change to bars graphic + log.info(f'showing bars graphic {self.name}') + curve.hide() + bars.show() + bars.update() + + return ( + graphics, + r, + {'read_from_key': False}, + should_line, + changed_to_line, + ) + + +def update_step_xy( + src_shm: ShmArray, + array_key: str, + y_update: np.ndarray, + slc: slice, + ln: int, + first: int, + last: int, + is_append: bool, + +) -> np.ndarray: + + # for a step curve we slice from one datum prior + # to the current "update slice" to get the previous + # "level". + if is_append: + start = max(last - 1, 0) + end = src_shm._last.value + new_y = src_shm._array[start:end][array_key] + slc = slice(start, end) + + else: + new_y = y_update + + return ( + np.broadcast_to( + new_y[:, None], (new_y.size, 2), + ), + slc, + ) + + +def step_to_xy( + r: Renderer, + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + +) -> tuple[ + np.ndarray, + np.nd.array, + str, +]: + + # 2 more datum-indexes to capture zero at end + x_step = r.x_data[r._xy_first:r._xy_last+2] + y_step = r.y_data[r._xy_first:r._xy_last+2] + + lasts = array[['index', array_key]] + last = lasts[array_key][-1] + y_step[-1] = last + + # slice out in-view data + ivl, ivr = vr + ys_iv = y_step[ivl:ivr+1] + xs_iv = x_step[ivl:ivr+1] + + # flatten to 1d + y_iv = ys_iv.reshape(ys_iv.size) + x_iv = xs_iv.reshape(xs_iv.size) + + # print( + # f'ys_iv : {ys_iv[-s:]}\n' + # f'y_iv: {y_iv[-s:]}\n' + # f'xs_iv: {xs_iv[-s:]}\n' + # f'x_iv: {x_iv[-s:]}\n' + # ) + + return x_iv, y_iv, 'all' + + +class Flow(msgspec.Struct): # , frozen=True): + ''' + (Financial Signal-)Flow compound type which wraps a real-time + shm array stream with displayed graphics (curves, charts) + for high level access and control as well as efficient incremental + update. + + The intention is for this type to eventually be capable of shm-passing + of incrementally updated graphics stream data between actors. + + ''' + name: str + plot: pg.PlotItem + graphics: Curve + _shm: ShmArray + + is_ohlc: bool = False + render: bool = True # toggle for display loop + + # downsampling state + _last_uppx: float = 0 + _in_ds: bool = False + + # map from uppx -> (downsampled data, incremental graphics) + _src_r: Optional[Renderer] = None + _render_table: dict[ + Optional[int], + tuple[Renderer, pg.GraphicsItem], + ] = (None, None) + + # TODO: hackery to be able to set a shm later + # but whilst also allowing this type to hashable, + # likely will require serializable token that is used to attach + # to the underlying shm ref after startup? + # _shm: Optional[ShmArray] = None # currently, may be filled in "later" + + # last read from shm (usually due to an update call) + _last_read: Optional[np.ndarray] = None + + # cache of y-range values per x-range input. + _mxmns: dict[tuple[int, int], tuple[float, float]] = {} + + @property + def shm(self) -> ShmArray: + return self._shm + + # TODO: remove this and only allow setting through + # private ``._shm`` attr? + @shm.setter + def shm(self, shm: ShmArray) -> ShmArray: + self._shm = shm + + def maxmin( + self, + lbar: int, + rbar: int, + + ) -> tuple[float, float]: + ''' + Compute the cached max and min y-range values for a given + x-range determined by ``lbar`` and ``rbar``. + + ''' + rkey = (lbar, rbar) + cached_result = self._mxmns.get(rkey) + if cached_result: + return cached_result + + shm = self.shm + if shm is None: + mxmn = None + + else: # new block for profiling?.. + arr = shm.array + + # build relative indexes into shm array + # TODO: should we just add/use a method + # on the shm to do this? + ifirst = arr[0]['index'] + slice_view = arr[ + lbar - ifirst: + (rbar - ifirst) + 1 + ] + + if not slice_view.size: + mxmn = None + + else: + if self.is_ohlc: + ylow = np.min(slice_view['low']) + yhigh = np.max(slice_view['high']) + + else: + view = slice_view[self.name] + ylow = np.min(view) + yhigh = np.max(view) + + mxmn = ylow, yhigh + + if mxmn is not None: + # cache new mxmn result + self._mxmns[rkey] = mxmn + + return mxmn + + def view_range(self) -> tuple[int, int]: + ''' + Return the indexes in view for the associated + plot displaying this flow's data. + + ''' + vr = self.plot.viewRect() + return int(vr.left()), int(vr.right()) + + def datums_range(self) -> tuple[ + int, int, int, int, int, int + ]: + ''' + Return a range tuple for the datums present in view. + + ''' + l, r = self.view_range() + + # TODO: avoid this and have shm passed + # in earlier. + if self.shm is None: + # haven't initialized the flow yet + return (0, l, 0, 0, r, 0) + + array = self.shm.array + index = array['index'] + start = index[0] + end = index[-1] + lbar = max(l, start) + rbar = min(r, end) + return ( + start, l, lbar, rbar, r, end, + ) + + def read( + self, + array_field: Optional[str] = None, + + ) -> tuple[ + int, int, np.ndarray, + int, int, np.ndarray, + ]: + # read call + array = self.shm.array + + indexes = array['index'] + ifirst = indexes[0] + ilast = indexes[-1] + + ifirst, l, lbar, rbar, r, ilast = self.datums_range() + + # get read-relative indices adjusting + # for master shm index. + lbar_i = max(l, ifirst) - ifirst + rbar_i = min(r, ilast) - ifirst + + if array_field: + array = array[array_field] + + # TODO: we could do it this way as well no? + # to_draw = array[lbar - ifirst:(rbar - ifirst) + 1] + in_view = array[lbar_i: rbar_i + 1] + + return ( + # abs indices + full data set + ifirst, ilast, array, + + # relative indices + in view datums + lbar_i, rbar_i, in_view, + ) + + def update_graphics( + self, + use_vr: bool = True, + render: bool = True, + array_key: Optional[str] = None, + + profiler: Optional[pg.debug.Profiler] = None, + do_append: bool = True, + + **kwargs, + + ) -> pg.GraphicsObject: + ''' + Read latest datums from shm and render to (incrementally) + render to graphics. + + ''' + profiler = pg.debug.Profiler( + msg=f'Flow.update_graphics() for {self.name}', + disabled=not pg_profile_enabled(), + ms_threshold=4, + # ms_threshold=ms_slower_then, + ) + # shm read and slice to view + read = ( + xfirst, xlast, src_array, + ivl, ivr, in_view, + ) = self.read() + + profiler('read src shm data') + + graphics = self.graphics + + if ( + not in_view.size + or not render + ): + # print('exiting early') + return graphics + + slice_to_head: int = -1 + should_redraw: bool = False + rkwargs = {} + + if isinstance(graphics, BarItems): + # XXX: special case where we change out graphics + # to a line after a certain uppx threshold. + ( + graphics, + r, + rkwargs, + should_line, + changed_to_line, + ) = render_baritems( + self, + graphics, + read, + profiler, + **kwargs, + ) + # bars = True + should_redraw = changed_to_line or not should_line + + else: + r = self._src_r + if not r: + # just using for ``.diff()`` atm.. + r = self._src_r = Renderer( + flow=self, + # TODO: rename this to something with ohlc + last_read=read, + ) + + # ``Curve`` derivative case(s): + array_key = array_key or self.name + # print(array_key) + + # ds update config + new_sample_rate: bool = False + should_ds: bool = r._in_ds + showing_src_data: bool = not r._in_ds + + # step_mode = getattr(graphics, '_step_mode', False) + step_mode = isinstance(graphics, StepCurve) + if step_mode: + + r.allocate_xy = to_step_format + r.update_xy = update_step_xy + r.format_xy = step_to_xy + + # TODO: append logic inside ``.render()`` isn't + # correct yet for step curves.. remove this to see it. + should_redraw = True + slice_to_head = -2 + + # downsampling incremental state checking + # check for and set std m4 downsample conditions + uppx = graphics.x_uppx() + uppx_diff = (uppx - self._last_uppx) + profiler(f'diffed uppx {uppx}') + if ( + uppx > 1 + and abs(uppx_diff) >= 1 + ): + log.debug( + f'{array_key} sampler change: {self._last_uppx} -> {uppx}' + ) + self._last_uppx = uppx + + new_sample_rate = True + showing_src_data = False + should_ds = True + should_redraw = True + + elif ( + uppx <= 2 + and self._in_ds + ): + # we should de-downsample back to our original + # source data so we clear our path data in prep + # to generate a new one from original source data. + new_sample_rate = True + showing_src_data = True + should_ds = False + should_redraw = True + + # MAIN RENDER LOGIC: + # - determine in view data and redraw on range change + # - determine downsampling ops if needed + # - (incrementally) update ``QPainterPath`` + + out = r.render( + read, + array_key, + profiler, + uppx=uppx, + # use_vr=True, + + # TODO: better way to detect and pass this? + # if we want to eventually cache renderers for a given uppx + # we should probably use this as a key + state? + should_redraw=should_redraw, + new_sample_rate=new_sample_rate, + should_ds=should_ds, + showing_src_data=showing_src_data, + + slice_to_head=slice_to_head, + do_append=do_append, + + **rkwargs, + ) + + if not out: + log.warning(f'{self.name} failed to render!?') + return graphics + + path, data, reset = out + + # XXX: SUPER UGGGHHH... without this we get stale cache + # graphics that don't update until you downsampler again.. + if reset: + with graphics.reset_cache(): + # assign output paths to graphicis obj + graphics.path = r.path + graphics.fast_path = r.fast_path + else: + # assign output paths to graphicis obj + graphics.path = r.path + graphics.fast_path = r.fast_path + + graphics.draw_last_datum( + path, + src_array, + data, + reset, + array_key, + ) + + # TODO: is this ever better? + # graphics.prepareGeometryChange() + # profiler('.prepareGeometryChange()') + + # TODO: does this actuallly help us in any way (prolly should + # look at the source / ask ogi). I think it avoid artifacts on + # wheel-scroll downsampling curve updates? + graphics.update() + profiler('.update()') + + # track downsampled state + self._in_ds = r._in_ds + + return graphics + + def draw_last( + self, + array_key: Optional[str] = None, + + ) -> None: + + # shm read and slice to view + ( + xfirst, xlast, src_array, + ivl, ivr, in_view, + ) = self.read() + + g = self.graphics + array_key = array_key or self.name + x, y = g.draw_last_datum( + g.path, + src_array, + src_array, + False, # never reset path + array_key, + ) + + if self._in_ds: + # we only care about the last pixel's + # worth of data since that's all the screen + # can represent on the last column where + # the most recent datum is being drawn. + uppx = self._last_uppx + y = y[-uppx:] + ymn, ymx = y.min(), y.max() + # print(f'drawing uppx={uppx} mxmn line: {ymn}, {ymx}') + g._last_line = QLineF( + x[-2], ymn, + x[-1], ymx, + ) + + +def by_index_and_key( + renderer: Renderer, + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + +) -> tuple[ + np.ndarray, + np.ndarray, + np.ndarray, +]: + return array['index'], array[array_key], 'all' + + +class Renderer(msgspec.Struct): + + flow: Flow + # last array view read + last_read: Optional[tuple] = None + + # default just returns index, and named array from data + format_xy: Callable[ + [np.ndarray, str], + tuple[np.ndarray] + ] = by_index_and_key + + # optional pre-graphics xy formatted data which + # is incrementally updated in sync with the source data. + allocate_xy: Optional[Callable[ + [int, slice], + tuple[np.ndarray, np.nd.array] + ]] = None + + update_xy: Optional[Callable[ + [int, slice], None] + ] = None + + x_data: Optional[np.ndarray] = None + y_data: Optional[np.ndarray] = None + + # indexes which slice into the above arrays (which are allocated + # based on source data shm input size) and allow retrieving + # incrementally updated data. + _xy_first: int = 0 + _xy_last: int = 0 + + # output graphics rendering, the main object + # processed in ``QGraphicsObject.paint()`` + path: Optional[QPainterPath] = None + fast_path: Optional[QPainterPath] = None + + # XXX: just ideas.. + # called on the final data (transform) output to convert + # to "graphical data form" a format that can be passed to + # the ``.draw()`` implementation. + # graphics_t: Optional[Callable[ShmArray, np.ndarray]] = None + # graphics_t_shm: Optional[ShmArray] = None + + # path graphics update implementation methods + # prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None + # append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None + + # downsampling state + _last_uppx: float = 0 + _in_ds: bool = False + + # incremental update state(s) + _last_vr: Optional[tuple[float, float]] = None + _last_ivr: Optional[tuple[float, float]] = None + + def diff( + self, + new_read: tuple[np.ndarray], + + ) -> tuple[ + np.ndarray, + np.ndarray, + ]: + ( + last_xfirst, + last_xlast, + last_array, + last_ivl, + last_ivr, + last_in_view, + ) = self.last_read + + # TODO: can the renderer just call ``Flow.read()`` directly? + # unpack latest source data read + ( + xfirst, + xlast, + array, + ivl, + ivr, + in_view, + ) = new_read + + # compute the length diffs between the first/last index entry in + # the input data and the last indexes we have on record from the + # last time we updated the curve index. + prepend_length = int(last_xfirst - xfirst) + append_length = int(xlast - last_xlast) + + # blah blah blah + # do diffing for prepend, append and last entry + return ( + slice(xfirst, last_xfirst), + prepend_length, + append_length, + slice(last_xlast, xlast), + ) + + def draw_path( + self, + x: np.ndarray, + y: np.ndarray, + connect: Union[str, np.ndarray] = 'all', + path: Optional[QPainterPath] = None, + redraw: bool = False, + + ) -> QPainterPath: + + path_was_none = path is None + + if redraw and path: + path.clear() + + # TODO: avoid this? + if self.fast_path: + self.fast_path.clear() + + # profiler('cleared paths due to `should_redraw=True`') + + path = pg.functions.arrayToQPath( + x, + y, + connect=connect, + finiteCheck=False, + + # reserve mem allocs see: + # - https://doc.qt.io/qt-5/qpainterpath.html#reserve + # - https://doc.qt.io/qt-5/qpainterpath.html#capacity + # - https://doc.qt.io/qt-5/qpainterpath.html#clear + # XXX: right now this is based on had hoc checks on a + # hidpi 3840x2160 4k monitor but we should optimize for + # the target display(s) on the sys. + # if no_path_yet: + # graphics.path.reserve(int(500e3)) + # path=path, # path re-use / reserving + ) + + # avoid mem allocs if possible + if path_was_none: + path.reserve(path.capacity()) + + return path + + def render( + self, + + new_read, + array_key: str, + profiler: pg.debug.Profiler, + uppx: float = 1, + + # redraw and ds flags + should_redraw: bool = False, + new_sample_rate: bool = False, + should_ds: bool = False, + showing_src_data: bool = True, + + do_append: bool = True, + slice_to_head: int = -1, + use_fpath: bool = True, + + # only render datums "in view" of the ``ChartView`` + use_vr: bool = True, + read_from_key: bool = True, + + ) -> list[QPainterPath]: + ''' + Render the current graphics path(s) + + There are (at least) 3 stages from source data to graphics data: + - a data transform (which can be stored in additional shm) + - a graphics transform which converts discrete basis data to + a `float`-basis view-coords graphics basis. (eg. ``ohlc_flatten()``, + ``step_path_arrays_from_1d()``, etc.) + + - blah blah blah (from notes) + + ''' + # TODO: can the renderer just call ``Flow.read()`` directly? + # unpack latest source data read + ( + xfirst, + xlast, + array, + ivl, + ivr, + in_view, + ) = new_read + + ( + pre_slice, + prepend_length, + append_length, + post_slice, + ) = self.diff(new_read) + + if self.update_xy: + + shm = self.flow.shm + + if self.y_data is None: + # we first need to allocate xy data arrays + # from the source data. + assert self.allocate_xy + self.x_data, self.y_data = self.allocate_xy( + shm, + array_key, + ) + self._xy_first = shm._first.value + self._xy_last = shm._last.value + profiler('allocated xy history') + + if prepend_length: + y_prepend = shm._array[pre_slice] + + if read_from_key: + y_prepend = y_prepend[array_key] + + xy_data, xy_slice = self.update_xy( + shm, + array_key, + + # this is the pre-sliced, "normally expected" + # new data that an updater would normally be + # expected to process, however in some cases (like + # step curves) the updater routine may want to do + # the source history-data reading itself, so we pass + # both here. + y_prepend, + + pre_slice, + prepend_length, + self._xy_first, + self._xy_last, + is_append=False, + ) + self.y_data[xy_slice] = xy_data + self._xy_first = shm._first.value + profiler('prepended xy history: {prepend_length}') + + if append_length: + y_append = shm._array[post_slice] + + if read_from_key: + y_append = y_append[array_key] + + xy_data, xy_slice = self.update_xy( + shm, + array_key, + + y_append, + post_slice, + append_length, + + self._xy_first, + self._xy_last, + is_append=True, + ) + # self.y_data[post_slice] = xy_data + # self.y_data[xy_slice or post_slice] = xy_data + self.y_data[xy_slice] = xy_data + self._xy_last = shm._last.value + profiler('appened xy history: {append_length}') + + if use_vr: + array = in_view + # else: + # ivl, ivr = xfirst, xlast + + hist = array[:slice_to_head] + + # xy-path data transform: convert source data to a format + # able to be passed to a `QPainterPath` rendering routine. + if not len(hist): + return + + x_out, y_out, connect = self.format_xy( + self, + # TODO: hist here should be the pre-sliced + # x/y_data in the case where allocate_xy is + # defined? + hist, + array_key, + (ivl, ivr), + ) + + profiler('sliced input arrays') + + if ( + use_vr + ): + # if a view range is passed, plan to draw the + # source ouput that's "in view" of the chart. + view_range = (ivl, ivr) + # print(f'{self._name} vr: {view_range}') + + profiler(f'view range slice {view_range}') + + vl, vr = view_range + + zoom_or_append = False + last_vr = self._last_vr + last_ivr = self._last_ivr or vl, vr + + # incremental in-view data update. + if last_vr: + # relative slice indices + lvl, lvr = last_vr + # abs slice indices + al, ar = last_ivr + + # left_change = abs(x_iv[0] - al) >= 1 + # right_change = abs(x_iv[-1] - ar) >= 1 + + if ( + # likely a zoom view change + (vr - lvr) > 2 or vl < lvl + # append / prepend update + # we had an append update where the view range + # didn't change but the data-viewed (shifted) + # underneath, so we need to redraw. + # or left_change and right_change and last_vr == view_range + + # not (left_change and right_change) and ivr + # ( + # or abs(x_iv[ivr] - livr) > 1 + ): + zoom_or_append = True + + self._last_vr = view_range + if len(x_out): + self._last_ivr = x_out[0], x_out[slice_to_head] + + # redraw conditions + if ( + prepend_length > 0 + or new_sample_rate + or append_length > 0 + or zoom_or_append + ): + should_redraw = True + + path = self.path + fast_path = self.fast_path + reset = False + + # redraw the entire source data if we have either of: + # - no prior path graphic rendered or, + # - we always intend to re-render the data only in view + if ( + path is None + or should_redraw + ): + # print(f"{self.flow.name} -> REDRAWING BRUH") + if new_sample_rate and showing_src_data: + log.info(f'DEDOWN -> {array_key}') + self._in_ds = False + + elif should_ds and uppx > 1: + + x_out, y_out = xy_downsample( + x_out, + y_out, + uppx, + ) + reset = True + profiler(f'FULL PATH downsample redraw={should_ds}') + self._in_ds = True + + path = self.draw_path( + x=x_out, + y=y_out, + connect=connect, + path=path, + redraw=True, + ) + + profiler( + 'generated fresh path. ' + f'(should_redraw: {should_redraw} ' + f'should_ds: {should_ds} new_sample_rate: {new_sample_rate})' + ) + + # TODO: get this piecewise prepend working - right now it's + # giving heck on vwap... + # elif prepend_length: + # breakpoint() + + # prepend_path = pg.functions.arrayToQPath( + # x[0:prepend_length], + # y[0:prepend_length], + # connect='all' + # ) + + # # swap prepend path in "front" + # old_path = graphics.path + # graphics.path = prepend_path + # # graphics.path.moveTo(new_x[0], new_y[0]) + # graphics.path.connectPath(old_path) + + elif ( + append_length > 0 + and do_append + and not should_redraw + ): + # print(f'{array_key} append len: {append_length}') + new_x = x_out[-append_length - 2:] # slice_to_head] + new_y = y_out[-append_length - 2:] # slice_to_head] + profiler('sliced append path') + + profiler( + f'diffed array input, append_length={append_length}' + ) + + # if should_ds: + # new_x, new_y = xy_downsample( + # new_x, + # new_y, + # uppx, + # ) + # profiler(f'fast path downsample redraw={should_ds}') + + append_path = self.draw_path( + x=new_x, + y=new_y, + connect=connect, + path=fast_path, + ) + profiler('generated append qpath') + + if use_fpath: + # an attempt at trying to make append-updates faster.. + if fast_path is None: + fast_path = append_path + # fast_path.reserve(int(6e3)) + else: + fast_path.connectPath(append_path) + size = fast_path.capacity() + profiler(f'connected fast path w size: {size}') + + # print(f"append_path br: {append_path.boundingRect()}") + # graphics.path.moveTo(new_x[0], new_y[0]) + # path.connectPath(append_path) + + # XXX: lol this causes a hang.. + # graphics.path = graphics.path.simplified() + else: + size = path.capacity() + profiler(f'connected history path w size: {size}') + path.connectPath(append_path) + + self.path = path + self.fast_path = fast_path + + # TODO: eventually maybe we can implement some kind of + # transform on the ``QPainterPath`` that will more or less + # detect the diff in "elements" terms? + # update diff state since we've now rendered paths. + self.last_read = new_read + + return self.path, array, reset diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 52763375..af03a9c6 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -75,6 +75,7 @@ def update_fsp_chart( flow, graphics_name: str, array_key: Optional[str], + **kwargs, ) -> None: @@ -93,10 +94,10 @@ def update_fsp_chart( # update graphics # NOTE: this does a length check internally which allows it # staying above the last row check below.. - chart.update_graphics_from_array( + chart.update_graphics_from_flow( graphics_name, - array, array_key=array_key or graphics_name, + **kwargs, ) # XXX: re: ``array_key``: fsp func names must be unique meaning we @@ -106,9 +107,6 @@ def update_fsp_chart( # read from last calculated value and update any label last_val_sticky = chart._ysticks.get(graphics_name) if last_val_sticky: - # array = shm.array[array_key] - # if len(array): - # value = array[-1] last = last_row[array_key] last_val_sticky.update_from_data(-1, last) @@ -246,20 +244,18 @@ async def run_fsp_ui( chart.draw_curve( name=name, - data=shm.array, + shm=shm, overlay=True, color='default_light', array_key=name, **conf.get('chart_kwargs', {}) ) - # specially store ref to shm for lookup in display loop - chart._flows[name].shm = shm else: # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( name=name, - array=shm.array, + shm=shm, array_key=name, sidepane=sidepane, @@ -271,12 +267,6 @@ async def run_fsp_ui( **conf.get('chart_kwargs', {}) ) - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - chart._flows[chart.data_key].shm = shm - # should **not** be the same sub-chart widget assert chart.name != linkedsplits.chart.name @@ -445,12 +435,16 @@ class FspAdmin: # wait for graceful shutdown signal async with stream.subscribe() as stream: async for msg in stream: - if msg == 'update': + info = msg.get('fsp_update') + if info: # if the chart isn't hidden try to update # the data on screen. if not self.linked.isHidden(): log.info(f'Re-syncing graphics for fsp: {ns_path}') - self.linked.graphics_cycle(trigger_all=True) + self.linked.graphics_cycle( + trigger_all=True, + prepend_update_index=info['first'], + ) else: log.info(f'recved unexpected fsp engine msg: {msg}') @@ -626,7 +620,7 @@ async def open_vlm_displays( shm = ohlcv chart = linked.add_plot( name='volume', - array=shm.array, + shm=shm, array_key='volume', sidepane=sidepane, @@ -639,10 +633,9 @@ async def open_vlm_displays( # the curve item internals are pretty convoluted. style='step', ) - chart._flows['volume'].shm = ohlcv # force 0 to always be in view - def maxmin( + def multi_maxmin( names: list[str], ) -> tuple[float, float]: @@ -658,7 +651,7 @@ async def open_vlm_displays( return 0, mx - chart.view.maxmin = partial(maxmin, names=['volume']) + chart.view.maxmin = partial(multi_maxmin, names=['volume']) # TODO: fix the x-axis label issue where if you put # the axis on the left it's totally not lined up... @@ -666,11 +659,6 @@ async def open_vlm_displays( # chart.hideAxis('right') # chart.showAxis('left') - # XXX: ONLY for sub-chart fsps, overlays have their - # data looked up from the chart's internal array set. - # TODO: we must get a data view api going STAT!! - chart._shm = shm - # send back new chart to caller task_status.started(chart) @@ -685,9 +673,9 @@ async def open_vlm_displays( last_val_sticky.update_from_data(-1, value) - vlm_curve = chart.update_graphics_from_array( + vlm_curve = chart.update_graphics_from_flow( 'volume', - shm.array, + # shm.array, ) # size view to data once at outset @@ -753,19 +741,20 @@ async def open_vlm_displays( 'dolla_vlm', 'dark_vlm', ] - dvlm_rate_fields = [ - 'dvlm_rate', - 'dark_dvlm_rate', - ] + # dvlm_rate_fields = [ + # 'dvlm_rate', + # 'dark_dvlm_rate', + # ] trade_rate_fields = [ 'trade_rate', 'dark_trade_rate', ] group_mxmn = partial( - maxmin, + multi_maxmin, # keep both regular and dark vlm in view - names=fields + dvlm_rate_fields, + names=fields, + # names=fields + dvlm_rate_fields, ) # add custom auto range handler @@ -795,9 +784,8 @@ async def open_vlm_displays( color = 'bracket' curve, _ = chart.draw_curve( - # name='dolla_vlm', name=name, - data=shm.array, + shm=shm, array_key=name, overlay=pi, color=color, @@ -812,7 +800,6 @@ async def open_vlm_displays( # ``.draw_curve()``. flow = chart._flows[name] assert flow.plot is pi - flow.shm = shm chart_curves( fields, @@ -834,11 +821,11 @@ async def open_vlm_displays( ) await started.wait() - chart_curves( - dvlm_rate_fields, - dvlm_pi, - fr_shm, - ) + # chart_curves( + # dvlm_rate_fields, + # dvlm_pi, + # fr_shm, + # ) # TODO: is there a way to "sync" the dual axes such that only # one curve is needed? @@ -847,7 +834,9 @@ async def open_vlm_displays( # liquidity events (well at least on low OHLC periods - 1s). vlm_curve.hide() chart.removeItem(vlm_curve) - chart._flows.pop('volume') + vflow = chart._flows['volume'] + vflow.render = False + # avoid range sorting on volume once disabled chart.view.disable_auto_yrange() @@ -874,7 +863,7 @@ async def open_vlm_displays( ) # add custom auto range handler tr_pi.vb.maxmin = partial( - maxmin, + multi_maxmin, # keep both regular and dark vlm in view names=trade_rate_fields, ) @@ -902,10 +891,10 @@ async def open_vlm_displays( # built-in vlm fsps for target, conf in { - tina_vwap: { - 'overlay': 'ohlc', # overlays with OHLCV (main) chart - 'anchor': 'session', - }, + # tina_vwap: { + # 'overlay': 'ohlc', # overlays with OHLCV (main) chart + # 'anchor': 'session', + # }, }.items(): started = await admin.open_fsp_chart( target, diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index 4872f595..a659612a 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -20,7 +20,6 @@ Chart view box primitives """ from __future__ import annotations from contextlib import asynccontextmanager -# import itertools import time from typing import Optional, Callable @@ -35,10 +34,9 @@ import trio from ..log import get_logger from .._profile import pg_profile_enabled, ms_slower_then -from ._style import _min_points_to_show +# from ._style import _min_points_to_show from ._editors import SelectRect from . import _event -from ._ohlc import BarItems log = get_logger(__name__) @@ -486,15 +484,18 @@ class ChartView(ViewBox): # don't zoom more then the min points setting l, lbar, rbar, r = chart.bars_range() - vl = r - l + # vl = r - l - if ev.delta() > 0 and vl <= _min_points_to_show: - log.debug("Max zoom bruh...") - return + # if ev.delta() > 0 and vl <= _min_points_to_show: + # log.debug("Max zoom bruh...") + # return - if ev.delta() < 0 and vl >= len(chart._arrays[chart.name]) + 666: - log.debug("Min zoom bruh...") - return + # if ( + # ev.delta() < 0 + # and vl >= len(chart._flows[chart.name].shm.array) + 666 + # ): + # log.debug("Min zoom bruh...") + # return # actual scaling factor s = 1.015 ** (ev.delta() * -1 / 20) # self.state['wheelScaleFactor']) @@ -568,11 +569,23 @@ class ChartView(ViewBox): self._resetTarget() self.scaleBy(s, focal) + + # XXX: the order of the next 2 lines i'm pretty sure + # matters, we want the resize to trigger before the graphics + # update, but i gotta feelin that because this one is signal + # based (and thus not necessarily sync invoked right away) + # that calling the resize method manually might work better. self.sigRangeChangedManually.emit(mask) - # self._ic.set() - # self._ic = None - # self.chart.resume_all_feeds() + # XXX: without this is seems as though sometimes + # when zooming in from far out (and maybe vice versa?) + # the signal isn't being fired enough since if you pan + # just after you'll see further downsampling code run + # (pretty noticeable on the OHLC ds curve) but with this + # that never seems to happen? Only question is how much this + # "double work" is causing latency when these missing event + # fires don't happen? + self.maybe_downsample_graphics() ev.accept() @@ -734,9 +747,8 @@ class ChartView(ViewBox): # flag to prevent triggering sibling charts from the same linked # set from recursion errors. - autoscale_linked_plots: bool = True, + autoscale_linked_plots: bool = False, name: Optional[str] = None, - # autoscale_overlays: bool = False, ) -> None: ''' @@ -747,9 +759,12 @@ class ChartView(ViewBox): data set. ''' + name = self.name + # print(f'YRANGE ON {name}') profiler = pg.debug.Profiler( + msg=f'`ChartView._set_yrange()`: `{name}`', disabled=not pg_profile_enabled(), - gt=ms_slower_then, + ms_threshold=ms_slower_then, delayed=True, ) set_range = True @@ -775,45 +790,22 @@ class ChartView(ViewBox): elif yrange is not None: ylow, yhigh = yrange - # calculate max, min y values in viewable x-range from data. - # Make sure min bars/datums on screen is adhered. - else: - br = bars_range or chart.bars_range() - profiler(f'got bars range: {br}') - - # TODO: maybe should be a method on the - # chart widget/item? - # if False: - # if autoscale_linked_plots: - # # avoid recursion by sibling plots - # linked = self.linkedsplits - # plots = list(linked.subplots.copy().values()) - # main = linked.chart - # if main: - # plots.append(main) - - # for chart in plots: - # if chart and not chart._static_yrange: - # chart.cv._set_yrange( - # bars_range=br, - # autoscale_linked_plots=False, - # ) - # profiler('autoscaled linked plots') - if set_range: + # XXX: only compute the mxmn range + # if none is provided as input! if not yrange: - # XXX: only compute the mxmn range - # if none is provided as input! + # flow = chart._flows[name] yrange = self._maxmin() if yrange is None: - log.warning(f'No yrange provided for {self.name}!?') + log.warning(f'No yrange provided for {name}!?') + print(f"WTF NO YRANGE {name}") return ylow, yhigh = yrange - profiler(f'maxmin(): {yrange}') + profiler(f'callback ._maxmin(): {yrange}') # view margins: stay within a % of the "true range" diff = yhigh - ylow @@ -830,6 +822,8 @@ class ChartView(ViewBox): self.setYRange(ylow, yhigh) profiler(f'set limits: {(ylow, yhigh)}') + profiler.finish() + def enable_auto_yrange( self, src_vb: Optional[ChartView] = None, @@ -843,17 +837,9 @@ class ChartView(ViewBox): if src_vb is None: src_vb = self - # such that when a linked chart changes its range - # this local view is also automatically changed and - # resized to data. - src_vb.sigXRangeChanged.connect(self._set_yrange) - # splitter(s) resizing src_vb.sigResized.connect(self._set_yrange) - # mouse wheel doesn't emit XRangeChanged - src_vb.sigRangeChangedManually.connect(self._set_yrange) - # TODO: a smarter way to avoid calling this needlessly? # 2 things i can think of: # - register downsample-able graphics specially and only @@ -864,15 +850,16 @@ class ChartView(ViewBox): self.maybe_downsample_graphics ) - def disable_auto_yrange( - self, - ) -> None: + # mouse wheel doesn't emit XRangeChanged + src_vb.sigRangeChangedManually.connect(self._set_yrange) - # self._chart._static_yrange = 'axis' + # src_vb.sigXRangeChanged.connect(self._set_yrange) + # src_vb.sigXRangeChanged.connect( + # self.maybe_downsample_graphics + # ) + + def disable_auto_yrange(self) -> None: - self.sigXRangeChanged.disconnect( - self._set_yrange, - ) self.sigResized.disconnect( self._set_yrange, ) @@ -883,6 +870,11 @@ class ChartView(ViewBox): self._set_yrange, ) + # self.sigXRangeChanged.disconnect(self._set_yrange) + # self.sigXRangeChanged.disconnect( + # self.maybe_downsample_graphics + # ) + def x_uppx(self) -> float: ''' Return the "number of x units" within a single @@ -890,7 +882,7 @@ class ChartView(ViewBox): graphics items which are our children. ''' - graphics = list(self._chart._graphics.values()) + graphics = [f.graphics for f in self._chart._flows.values()] if not graphics: return 0 @@ -901,25 +893,21 @@ class ChartView(ViewBox): else: return 0 - def maybe_downsample_graphics(self): - - uppx = self.x_uppx() - if ( - # we probably want to drop this once we are "drawing in - # view" for downsampled flows.. - uppx and uppx > 16 - and self._ic is not None - ): - # don't bother updating since we're zoomed out bigly and - # in a pan-interaction, in which case we shouldn't be - # doing view-range based rendering (at least not yet). - # print(f'{uppx} exiting early!') - return + def maybe_downsample_graphics( + self, + autoscale_overlays: bool = True, + ): profiler = pg.debug.Profiler( + msg=f'ChartView.maybe_downsample_graphics() for {self.name}', disabled=not pg_profile_enabled(), - gt=3, - delayed=True, + + # XXX: important to avoid not seeing underlying + # ``.update_graphics_from_flow()`` nested profiling likely + # due to the way delaying works and garbage collection of + # the profiler in the delegated method calls. + ms_threshold=6, + # ms_threshold=ms_slower_then, ) # TODO: a faster single-loop-iterator way of doing this XD @@ -928,19 +916,32 @@ class ChartView(ViewBox): plots = linked.subplots | {chart.name: chart} for chart_name, chart in plots.items(): for name, flow in chart._flows.items(): - graphics = flow.graphics - use_vr = False - if isinstance(graphics, BarItems): - use_vr = True + if ( + not flow.render + + # XXX: super important to be aware of this. + # or not flow.graphics.isVisible() + ): + continue # pass in no array which will read and render from the last # passed array (normally provided by the display loop.) - chart.update_graphics_from_array( + chart.update_graphics_from_flow( name, - use_vr=use_vr, - profiler=profiler, + use_vr=True, ) - profiler(f'range change updated {chart_name}:{name}') - profiler.finish() + # for each overlay on this chart auto-scale the + # y-range to max-min values. + if autoscale_overlays: + overlay = chart.pi_overlay + if overlay: + for pi in overlay.overlays: + pi.vb._set_yrange( + # TODO: get the range once up front... + # bars_range=br, + ) + profiler('autoscaled linked plots') + + profiler(f'<{chart_name}>.update_graphics_from_flow({name})') diff --git a/piker/ui/_ohlc.py b/piker/ui/_ohlc.py index 44dbb0c2..dbe4c18e 100644 --- a/piker/ui/_ohlc.py +++ b/piker/ui/_ohlc.py @@ -25,17 +25,13 @@ from typing import ( import numpy as np import pyqtgraph as pg -from numba import njit, float64, int64 # , optional from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5.QtCore import QLineF, QPointF -# from numba import types as ntypes -# from ..data._source import numba_ohlc_dtype +from PyQt5.QtGui import QPainterPath from .._profile import pg_profile_enabled, ms_slower_then from ._style import hcolor from ..log import get_logger -from ._curve import FastAppendCurve -from ._compression import ohlc_flatten if TYPE_CHECKING: from ._chart import LinkedSplits @@ -46,7 +42,8 @@ log = get_logger(__name__) def bar_from_ohlc_row( row: np.ndarray, - w: float + # 0.5 is no overlap between arms, 1.0 is full overlap + w: float = 0.43 ) -> tuple[QLineF]: ''' @@ -84,128 +81,11 @@ def bar_from_ohlc_row( return [hl, o, c] -@njit( - # TODO: for now need to construct this manually for readonly arrays, see - # https://github.com/numba/numba/issues/4511 - # ntypes.tuple((float64[:], float64[:], float64[:]))( - # numba_ohlc_dtype[::1], # contiguous - # int64, - # optional(float64), - # ), - nogil=True -) -def path_arrays_from_ohlc( - data: np.ndarray, - start: int64, - bar_gap: float64 = 0.43, - -) -> np.ndarray: - ''' - Generate an array of lines objects from input ohlc data. - - ''' - size = int(data.shape[0] * 6) - - x = np.zeros( - # data, - shape=size, - dtype=float64, - ) - y, c = x.copy(), x.copy() - - # TODO: report bug for assert @ - # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 - for i, q in enumerate(data[start:], start): - - # TODO: ask numba why this doesn't work.. - # open, high, low, close, index = q[ - # ['open', 'high', 'low', 'close', 'index']] - - open = q['open'] - high = q['high'] - low = q['low'] - close = q['close'] - index = float64(q['index']) - - istart = i * 6 - istop = istart + 6 - - # x,y detail the 6 points which connect all vertexes of a ohlc bar - x[istart:istop] = ( - index - bar_gap, - index, - index, - index, - index, - index + bar_gap, - ) - y[istart:istop] = ( - open, - open, - low, - high, - close, - close, - ) - - # specifies that the first edge is never connected to the - # prior bars last edge thus providing a small "gap"/"space" - # between bars determined by ``bar_gap``. - c[istart:istop] = (1, 1, 1, 1, 1, 0) - - return x, y, c - - -def gen_qpath( - data: np.ndarray, - start: int, # XXX: do we need this? - w: float, - path: Optional[QtGui.QPainterPath] = None, - -) -> QtGui.QPainterPath: - - path_was_none = path is None - - profiler = pg.debug.Profiler( - msg='gen_qpath ohlc', - disabled=not pg_profile_enabled(), - gt=ms_slower_then, - ) - - x, y, c = path_arrays_from_ohlc( - data, - start, - bar_gap=w, - ) - profiler("generate stream with numba") - - # TODO: numba the internals of this! - path = pg.functions.arrayToQPath( - x, - y, - connect=c, - path=path, - ) - - # avoid mem allocs if possible - if path_was_none: - path.reserve(path.capacity()) - - profiler("generate path with arrayToQPath") - - return path - - class BarItems(pg.GraphicsObject): ''' "Price range" bars graphics rendered from a OHLC sampled sequence. ''' - sigPlotChanged = QtCore.pyqtSignal(object) - - # 0.5 is no overlap between arms, 1.0 is full overlap - w: float = 0.43 - def __init__( self, linked: LinkedSplits, @@ -225,388 +105,13 @@ class BarItems(pg.GraphicsObject): self.last_bar_pen = pg.mkPen(hcolor(last_bar_color), width=2) self._name = name - self._ds_line_xy: Optional[ - tuple[np.ndarray, np.ndarray] - ] = None - - # NOTE: this prevents redraws on mouse interaction which is - # a huge boon for avg interaction latency. - - # TODO: one question still remaining is if this makes trasform - # interactions slower (such as zooming) and if so maybe if/when - # we implement a "history" mode for the view we disable this in - # that mode? self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) - - self._pi = plotitem - self.path = QtGui.QPainterPath() - self.fast_path = QtGui.QPainterPath() - - self._xrange: tuple[int, int] - self._yrange: tuple[float, float] - self._vrange = None - - # TODO: don't render the full backing array each time - # self._path_data = None + self.path = QPainterPath() self._last_bar_lines: Optional[tuple[QLineF, ...]] = None - # track the current length of drawable lines within the larger array - self.start_index: int = 0 - self.stop_index: int = 0 - - # downsampler-line state - self._in_ds: bool = False - self._ds_line: Optional[FastAppendCurve] = None - self._dsi: tuple[int, int] = 0, 0 - self._xs_in_px: float = 0 - - def draw_from_data( - self, - ohlc: np.ndarray, - start: int = 0, - - ) -> QtGui.QPainterPath: - ''' - Draw OHLC datum graphics from a ``np.ndarray``. - - This routine is usually only called to draw the initial history. - - ''' - hist, last = ohlc[:-1], ohlc[-1] - self.path = gen_qpath(hist, start, self.w) - - # save graphics for later reference and keep track - # of current internal "last index" - # self.start_index = len(ohlc) - index = ohlc['index'] - self._xrange = (index[0], index[-1]) - self._yrange = ( - np.nanmax(ohlc['high']), - np.nanmin(ohlc['low']), - ) - - # up to last to avoid double draw of last bar - self._last_bar_lines = bar_from_ohlc_row(last, self.w) - - x, y = self._ds_line_xy = ohlc_flatten(ohlc) - - # TODO: figuring out the most optimial size for the ideal - # curve-path by, - # - calcing the display's max px width `.screen()` - # - drawing a curve and figuring out it's capacity: - # https://doc.qt.io/qt-5/qpainterpath.html#capacity - # - reserving that cap for each curve-mapped-to-shm with - - # - leveraging clearing when needed to redraw the entire - # curve that does not release mem allocs: - # https://doc.qt.io/qt-5/qpainterpath.html#clear - curve = FastAppendCurve( - y=y, - x=x, - name='OHLC', - color=self._color, - ) - curve.hide() - self._pi.addItem(curve) - self._ds_line = curve - - self._ds_xrange = (index[0], index[-1]) - - # trigger render - # https://doc.qt.io/qt-5/qgraphicsitem.html#update - self.update() - - return self.path - def x_uppx(self) -> int: - if self._ds_line: - return self._ds_line.x_uppx() - else: - return 0 - - def update_from_array( - self, - - # full array input history - ohlc: np.ndarray, - - # pre-sliced array data that's "in view" - ohlc_iv: np.ndarray, - - view_range: Optional[tuple[int, int]] = None, - profiler: Optional[pg.debug.Profiler] = None, - - ) -> None: - ''' - Update the last datum's bar graphic from input data array. - - This routine should be interface compatible with - ``pg.PlotCurveItem.setData()``. Normally this method in - ``pyqtgraph`` seems to update all the data passed to the - graphics object, and then update/rerender, but here we're - assuming the prior graphics havent changed (OHLC history rarely - does) so this "should" be simpler and faster. - - This routine should be made (transitively) as fast as possible. - - ''' - profiler = profiler or pg.debug.Profiler( - disabled=not pg_profile_enabled(), - gt=ms_slower_then, - delayed=True, - ) - - # index = self.start_index - istart, istop = self._xrange - ds_istart, ds_istop = self._ds_xrange - - index = ohlc['index'] - first_index, last_index = index[0], index[-1] - - # length = len(ohlc) - # prepend_length = istart - first_index - # append_length = last_index - istop - - # ds_prepend_length = ds_istart - first_index - # ds_append_length = last_index - ds_istop - - flip_cache = False - - x_gt = 16 - if self._ds_line: - uppx = self._ds_line.x_uppx() - else: - uppx = 0 - - should_line = self._in_ds - if ( - self._in_ds - and uppx < x_gt - ): - should_line = False - - elif ( - not self._in_ds - and uppx >= x_gt - ): - should_line = True - - profiler('ds logic complete') - - if should_line: - # update the line graphic - # x, y = self._ds_line_xy = ohlc_flatten(ohlc_iv) - x, y = self._ds_line_xy = ohlc_flatten(ohlc) - x_iv, y_iv = self._ds_line_xy = ohlc_flatten(ohlc_iv) - profiler('flattening bars to line') - - # TODO: we should be diffing the amount of new data which - # needs to be downsampled. Ideally we actually are just - # doing all the ds-ing in sibling actors so that the data - # can just be read and rendered to graphics on events of our - # choice. - # diff = do_diff(ohlc, new_bit) - curve = self._ds_line - curve.update_from_array( - x=x, - y=y, - x_iv=x_iv, - y_iv=y_iv, - view_range=None, # hack - profiler=profiler, - ) - profiler('updated ds line') - - if not self._in_ds: - # hide bars and show line - self.hide() - # XXX: is this actually any faster? - # self._pi.removeItem(self) - - # TODO: a `.ui()` log level? - log.info( - f'downsampling to line graphic {self._name}' - ) - - # self._pi.addItem(curve) - curve.show() - curve.update() - self._in_ds = True - - # stop here since we don't need to update bars path any more - # as we delegate to the downsample line with updates. - profiler.finish() - # print('terminating early') - return - - else: - # we should be in bars mode - - if self._in_ds: - # flip back to bars graphics and hide the downsample line. - log.info(f'showing bars graphic {self._name}') - - curve = self._ds_line - curve.hide() - # self._pi.removeItem(curve) - - # XXX: is this actually any faster? - # self._pi.addItem(self) - self.show() - self._in_ds = False - - # generate in_view path - self.path = gen_qpath( - ohlc_iv, - 0, - self.w, - # path=self.path, - ) - - # TODO: to make the downsampling faster - # - allow mapping only a range of lines thus only drawing as - # many bars as exactly specified. - # - move ohlc "flattening" to a shmarr - # - maybe move all this embedded logic to a higher - # level type? - - # if prepend_length: - # # new history was added and we need to render a new path - # prepend_bars = ohlc[:prepend_length] - - # if ds_prepend_length: - # ds_prepend_bars = ohlc[:ds_prepend_length] - # pre_x, pre_y = ohlc_flatten(ds_prepend_bars) - # fx = np.concatenate((pre_x, fx)) - # fy = np.concatenate((pre_y, fy)) - # profiler('ds line prepend diff complete') - - # if append_length: - # # generate new graphics to match provided array - # # path appending logic: - # # we need to get the previous "current bar(s)" for the time step - # # and convert it to a sub-path to append to the historical set - # # new_bars = ohlc[istop - 1:istop + append_length - 1] - # append_bars = ohlc[-append_length - 1:-1] - # # print(f'ohlc bars to append size: {append_bars.size}\n') - - # if ds_append_length: - # ds_append_bars = ohlc[-ds_append_length - 1:-1] - # post_x, post_y = ohlc_flatten(ds_append_bars) - # print( - # f'ds curve to append sizes: {(post_x.size, post_y.size)}' - # ) - # fx = np.concatenate((fx, post_x)) - # fy = np.concatenate((fy, post_y)) - - # profiler('ds line append diff complete') - - profiler('array diffs complete') - - # does this work? - last = ohlc[-1] - # fy[-1] = last['close'] - - # # incremental update and cache line datums - # self._ds_line_xy = fx, fy - - # maybe downsample to line - # ds = self.maybe_downsample() - # if ds: - # # if we downsample to a line don't bother with - # # any more path generation / updates - # self._ds_xrange = first_index, last_index - # profiler('downsampled to line') - # return - - # print(in_view.size) - - # if self.path: - # self.path = path - # self.path.reserve(path.capacity()) - # self.path.swap(path) - - # path updates - # if prepend_length: - # # XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path - # # y value not matching the first value from - # # ohlc[prepend_length + 1] ??? - # prepend_path = gen_qpath(prepend_bars, 0, self.w) - # old_path = self.path - # self.path = prepend_path - # self.path.addPath(old_path) - # profiler('path PREPEND') - - # if append_length: - # append_path = gen_qpath(append_bars, 0, self.w) - - # self.path.moveTo( - # float(istop - self.w), - # float(append_bars[0]['open']) - # ) - # self.path.addPath(append_path) - - # profiler('path APPEND') - # fp = self.fast_path - # if fp is None: - # self.fast_path = append_path - - # else: - # fp.moveTo( - # float(istop - self.w), float(new_bars[0]['open']) - # ) - # fp.addPath(append_path) - - # self.setCacheMode(QtWidgets.QGraphicsItem.NoCache) - # flip_cache = True - - self._xrange = first_index, last_index - - # trigger redraw despite caching - self.prepareGeometryChange() - - # generate new lines objects for updatable "current bar" - self._last_bar_lines = bar_from_ohlc_row(last, self.w) - - # last bar update - i, o, h, l, last, v = last[ - ['index', 'open', 'high', 'low', 'close', 'volume'] - ] - # assert i == self.start_index - 1 - # assert i == last_index - body, larm, rarm = self._last_bar_lines - - # XXX: is there a faster way to modify this? - rarm.setLine(rarm.x1(), last, rarm.x2(), last) - - # writer is responsible for changing open on "first" volume of bar - larm.setLine(larm.x1(), o, larm.x2(), o) - - if l != h: # noqa - - if body is None: - body = self._last_bar_lines[0] = QLineF(i, l, i, h) - else: - # update body - body.setLine(i, l, i, h) - - # XXX: pretty sure this is causing an issue where the bar has - # a large upward move right before the next sample and the body - # is getting set to None since the next bar is flat but the shm - # array index update wasn't read by the time this code runs. Iow - # we're doing this removal of the body for a bar index that is - # now out of date / from some previous sample. It's weird - # though because i've seen it do this to bars i - 3 back? - - profiler('last bar set') - - self.update() - profiler('.update()') - - if flip_cache: - self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) - - profiler.finish() + # we expect the downsample curve report this. + return 0 def boundingRect(self): # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect @@ -630,16 +135,6 @@ class BarItems(pg.GraphicsObject): hb.bottomRight(), ) - # fp = self.fast_path - # if fp: - # fhb = fp.controlPointRect() - # print((hb_tl, hb_br)) - # print(fhb) - # hb_tl, hb_br = ( - # fhb.topLeft() + hb.topLeft(), - # fhb.bottomRight() + hb.bottomRight(), - # ) - # need to include last bar height or BR will be off mx_y = hb_br.y() mn_y = hb_tl.y() @@ -675,12 +170,9 @@ class BarItems(pg.GraphicsObject): ) -> None: - if self._in_ds: - return - profiler = pg.debug.Profiler( disabled=not pg_profile_enabled(), - gt=ms_slower_then, + ms_threshold=ms_slower_then, ) # p.setCompositionMode(0) @@ -692,13 +184,67 @@ class BarItems(pg.GraphicsObject): # lead to any perf gains other then when zoomed in to less bars # in view. p.setPen(self.last_bar_pen) - p.drawLines(*tuple(filter(bool, self._last_bar_lines))) - profiler('draw last bar') + if self._last_bar_lines: + p.drawLines(*tuple(filter(bool, self._last_bar_lines))) + profiler('draw last bar') p.setPen(self.bars_pen) p.drawPath(self.path) profiler(f'draw history path: {self.path.capacity()}') - # if self.fast_path: - # p.drawPath(self.fast_path) - # profiler('draw fast path') + def draw_last_datum( + self, + path: QPainterPath, + src_data: np.ndarray, + render_data: np.ndarray, + reset: bool, + array_key: str, + + fields: list[str] = [ + 'index', + 'open', + 'high', + 'low', + 'close', + ], + + ) -> None: + + # relevant fields + ohlc = src_data[fields] + last_row = ohlc[-1:] + + # individual values + last_row = i, o, h, l, last = ohlc[-1] + + # generate new lines objects for updatable "current bar" + self._last_bar_lines = bar_from_ohlc_row(last_row) + + # assert i == graphics.start_index - 1 + # assert i == last_index + body, larm, rarm = self._last_bar_lines + + # XXX: is there a faster way to modify this? + rarm.setLine(rarm.x1(), last, rarm.x2(), last) + + # writer is responsible for changing open on "first" volume of bar + larm.setLine(larm.x1(), o, larm.x2(), o) + + if l != h: # noqa + + if body is None: + body = self._last_bar_lines[0] = QLineF(i, l, i, h) + else: + # update body + body.setLine(i, l, i, h) + + # XXX: pretty sure this is causing an issue where the + # bar has a large upward move right before the next + # sample and the body is getting set to None since the + # next bar is flat but the shm array index update wasn't + # read by the time this code runs. Iow we're doing this + # removal of the body for a bar index that is now out of + # date / from some previous sample. It's weird though + # because i've seen it do this to bars i - 3 back? + + return ohlc['index'], ohlc['close'] diff --git a/piker/ui/_pathops.py b/piker/ui/_pathops.py new file mode 100644 index 00000000..83b46f43 --- /dev/null +++ b/piker/ui/_pathops.py @@ -0,0 +1,236 @@ +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +""" +Super fast ``QPainterPath`` generation related operator routines. + +""" +from __future__ import annotations +from typing import ( + # Optional, + TYPE_CHECKING, +) + +import numpy as np +from numpy.lib import recfunctions as rfn +from numba import njit, float64, int64 # , optional +# import pyqtgraph as pg +from PyQt5 import QtGui +# from PyQt5.QtCore import QLineF, QPointF + +from ..data._sharedmem import ( + ShmArray, +) +# from .._profile import pg_profile_enabled, ms_slower_then +from ._compression import ( + ds_m4, +) + +if TYPE_CHECKING: + from ._flows import Renderer + + +def xy_downsample( + x, + y, + uppx, + + x_spacer: float = 0.5, + +) -> tuple[np.ndarray, np.ndarray]: + + # downsample whenever more then 1 pixels per datum can be shown. + # always refresh data bounds until we get diffing + # working properly, see above.. + bins, x, y = ds_m4( + x, + y, + uppx, + ) + + # flatten output to 1d arrays suitable for path-graphics generation. + x = np.broadcast_to(x[:, None], y.shape) + x = (x + np.array( + [-x_spacer, 0, 0, x_spacer] + )).flatten() + y = y.flatten() + + return x, y + + +@njit( + # TODO: for now need to construct this manually for readonly arrays, see + # https://github.com/numba/numba/issues/4511 + # ntypes.tuple((float64[:], float64[:], float64[:]))( + # numba_ohlc_dtype[::1], # contiguous + # int64, + # optional(float64), + # ), + nogil=True +) +def path_arrays_from_ohlc( + data: np.ndarray, + start: int64, + bar_gap: float64 = 0.43, + +) -> np.ndarray: + ''' + Generate an array of lines objects from input ohlc data. + + ''' + size = int(data.shape[0] * 6) + + x = np.zeros( + # data, + shape=size, + dtype=float64, + ) + y, c = x.copy(), x.copy() + + # TODO: report bug for assert @ + # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 + for i, q in enumerate(data[start:], start): + + # TODO: ask numba why this doesn't work.. + # open, high, low, close, index = q[ + # ['open', 'high', 'low', 'close', 'index']] + + open = q['open'] + high = q['high'] + low = q['low'] + close = q['close'] + index = float64(q['index']) + + istart = i * 6 + istop = istart + 6 + + # x,y detail the 6 points which connect all vertexes of a ohlc bar + x[istart:istop] = ( + index - bar_gap, + index, + index, + index, + index, + index + bar_gap, + ) + y[istart:istop] = ( + open, + open, + low, + high, + close, + close, + ) + + # specifies that the first edge is never connected to the + # prior bars last edge thus providing a small "gap"/"space" + # between bars determined by ``bar_gap``. + c[istart:istop] = (1, 1, 1, 1, 1, 0) + + return x, y, c + + +def gen_ohlc_qpath( + r: Renderer, + data: np.ndarray, + array_key: str, # we ignore this + vr: tuple[int, int], + + start: int = 0, # XXX: do we need this? + # 0.5 is no overlap between arms, 1.0 is full overlap + w: float = 0.43, + +) -> QtGui.QPainterPath: + ''' + More or less direct proxy to ``path_arrays_from_ohlc()`` + but with closed in kwargs for line spacing. + + ''' + x, y, c = path_arrays_from_ohlc( + data, + start, + bar_gap=w, + ) + return x, y, c + + +def ohlc_to_line( + ohlc_shm: ShmArray, + data_field: str, + fields: list[str] = ['open', 'high', 'low', 'close'] + +) -> tuple[ + np.ndarray, + np.ndarray, +]: + ''' + Convert an input struct-array holding OHLC samples into a pair of + flattened x, y arrays with the same size (datums wise) as the source + data. + + ''' + y_out = ohlc_shm.ustruct(fields) + first = ohlc_shm._first.value + last = ohlc_shm._last.value + + # write pushed data to flattened copy + y_out[first:last] = rfn.structured_to_unstructured( + ohlc_shm.array[fields] + ) + + # generate an flat-interpolated x-domain + x_out = ( + np.broadcast_to( + ohlc_shm._array['index'][:, None], + ( + ohlc_shm._array.size, + # 4, # only ohlc + y_out.shape[1], + ), + ) + np.array([-0.5, 0, 0, 0.5]) + ) + assert y_out.any() + + return ( + x_out, + y_out, + ) + + +def to_step_format( + shm: ShmArray, + data_field: str, + index_field: str = 'index', + +) -> tuple[int, np.ndarray, np.ndarray]: + ''' + Convert an input 1d shm array to a "step array" format + for use by path graphics generation. + + ''' + i = shm._array['index'].copy() + out = shm._array[data_field].copy() + + x_out = np.broadcast_to( + i[:, None], + (i.size, 2), + ) + np.array([-0.5, 0.5]) + + y_out = np.empty((len(out), 2), dtype=out.dtype) + y_out[:] = out[:, np.newaxis] + + # start y at origin level + y_out[0, 0] = 0 + return x_out, y_out diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 3e230b71..a86fe816 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -873,7 +873,9 @@ async def process_trades_and_update_ui( mode.lines.remove_line(uuid=oid) # each clearing tick is responded individually - elif resp in ('broker_filled',): + elif resp in ( + 'broker_filled', + ): known_order = book._sent_orders.get(oid) if not known_order: