# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

'''
Data visualization APIs

'''
from __future__ import annotations
from functools import lru_cache
from math import (
    ceil,
    floor,
)
from typing import (
    Optional,
    Literal,
    TYPE_CHECKING,
)

import msgspec
import numpy as np
import pyqtgraph as pg
from PyQt5.QtCore import QLineF

from ..data._sharedmem import (
    ShmArray,
)
from ..data.feed import Flume
from ..data._formatters import (
    IncrementalFormatter,
    OHLCBarsFmtr,  # Plain OHLC renderer
    OHLCBarsAsCurveFmtr,  # OHLC converted to line
    StepCurveFmtr,  # "step" curve (like for vlm)
)
from ..data._pathops import (
    slice_from_time,
)
from ._ohlc import (
    BarItems,
)
from ._curve import (
    Curve,
    StepCurve,
    FlattenedOHLC,
)
from ._render import Renderer
from ..log import get_logger
from .._profile import (
    Profiler,
    pg_profile_enabled,
    ms_slower_then,
)


if TYPE_CHECKING:
    from ._interaction import ChartView
    from ._chart import ChartPlotWidget
    from ._display import DisplayState


log = get_logger(__name__)


def render_baritems(
    viz: Viz,
    graphics: BarItems,
    read: tuple[
        int, int, np.ndarray,
        int, int, np.ndarray,
    ],
    profiler: Profiler,
    **kwargs,

) -> tuple[
    BarItems | FlattenedOHLC,
    Renderer,
    bool,
    bool,
]:
    '''
    Graphics management logic for a ``BarItems`` object.

    Mostly just logic to determine when and how to downsample an OHLC
    lines curve into a flattened line graphic and when to display one
    graphic or the other.

    On the first call for a given ``viz`` this lazily creates the bars
    renderer (stored on ``viz._src_r``) plus a hidden ``FlattenedOHLC``
    curve and its renderer (stored as a pair on ``viz._alt_r``).

    Returns a 4-tuple of:
    - the graphics object which should currently be displayed,
    - the renderer paired with that graphics object,
    - whether the caller should force a redraw,
    - whether we are (now) in "line" (flattened/downsampled) mode.

    NOTE: the ``read`` and ``**kwargs`` inputs are accepted for call
    signature compatibility but are not used in this function.

    TODO: this should likely be moved into some kind of better abstraction
    layer, if not a `Renderer` then something just above it?

    '''
    bars = graphics

    self = viz  # TODO: make this a ``Viz`` method?
    r = self._src_r
    first_render: bool = False

    # if no source data renderer exists create one.
    if not r:
        first_render = True

        # OHLC bars path renderer
        r = self._src_r = Renderer(
            viz=self,
            fmtr=OHLCBarsFmtr(
                shm=viz.shm,
                viz=viz,
            ),
        )

        ds_curve_r = Renderer(
            viz=self,
            fmtr=OHLCBarsAsCurveFmtr(
                shm=viz.shm,
                viz=viz,
            ),
        )

        curve = FlattenedOHLC(
            name=f'{viz.name}_ds_ohlc',
            color=bars._color,
        )
        viz.ds_graphics = curve
        curve.hide()
        self.plot.addItem(curve)

        # baseline "line" downsampled OHLC curve that should
        # kick on only when we reach a certain uppx threshold.
        self._alt_r = (ds_curve_r, curve)

    ds_r, curve = self._alt_r

    # print(
    #     f'r: {r.fmtr.xy_slice}\n'
    #     f'ds_r: {ds_r.fmtr.xy_slice}\n'
    # )

    # do checks for whether or not we require downsampling:
    # - if we're **not** downsampling then we simply want to
    #   render the bars graphics curve and update..
    # - if instead we are in a downsampling state then we want to
    #   render (and show) the flattened line curve in place of the
    #   bars.
    x_gt = 6 * (self.index_step() or 1)
    uppx = curve.x_uppx()
    # print(f'BARS UPPX: {uppx}')
    in_line = should_line = curve.isVisible()

    if (
        in_line
        and uppx < x_gt
    ):
        # print('FLIPPING TO BARS')
        should_line = False
        viz._in_ds = False

    elif (
        not in_line
        and uppx >= x_gt
    ):
        # print('FLIPPING TO LINE')
        should_line = True
        viz._in_ds = True

    profiler(f'ds logic complete line={should_line}')

    # do graphics updates
    if should_line:
        r = ds_r
        graphics = curve
        profiler('updated ds curve')

    else:
        graphics = bars

    if first_render:
        bars.show()

    changed_to_line = False
    if (
        not in_line
        and should_line
    ):
        # change to line graphic
        log.info(
            f'downsampling to line graphic {self.name}'
        )
        bars.hide()
        curve.show()
        curve.update()
        changed_to_line = True

    elif (
        in_line
        and not should_line
    ):
        # change to bars graphic
        log.info(
            f'showing bars graphic {self.name}\n'
            f'first bars render?: {first_render}'
        )
        curve.hide()
        bars.show()
        bars.update()

    # XXX: is this required?
    viz._in_ds = should_line

    # always redraw when showing bars, and on the bars -> line flip.
    should_redraw = (
        changed_to_line
        or not should_line
    )
    return (
        graphics,
        r,
        should_redraw,
        should_line,
    )


# the set of index/time step sizes recognized by the scan in
# ``Viz.index_step()``.
_sample_rates: set[float] = {1, 60}


class Viz(msgspec.Struct):  # , frozen=True):
    '''
    (Data) "Visualization" compound type which wraps a real-time
    shm array stream with displayed graphics (curves, charts)
    for high level access and control as well as efficient incremental
    update.

    The intention is for this type to eventually be capable of shm-passing
    of incrementally updated graphics stream data between actors.

    '''
    name: str
    plot: pg.PlotItem
    _shm: ShmArray
    flume: Flume
    graphics: Curve | BarItems

    # for tracking y-mn/mx for y-axis auto-ranging
    yrange: tuple[float, float] | None = None

    # in some cases a viz may want to change its
    # graphical "type" or, "form" when downsampling, to
    # start this is only ever an interpolation line.
    ds_graphics: Optional[Curve] = None

    is_ohlc: bool = False
    render: bool = True  # toggle for display loop

    _index_field: Literal[
        'index',
        'time',

        # TODO: idea is to re-index all time series to a common
        # longest-len-int-index where we avoid gaps and instead
        # graph on the 0 -> N domain of the array index super set.
        # 'gapless',

    ] = 'time'

    # downsampling state
    _last_uppx: float = 0
    _in_ds: bool = False
    _index_step: float | None = None

    # map from uppx -> (downsampled data, incremental graphics)
    _src_r: Renderer | None = None
    _alt_r: tuple[
        Renderer,
        pg.GraphicsItem
    ] | None = None

    # cache of y-range values per x-range input.
    _mxmns: dict[
        tuple[int, int],
        tuple[float, float],
    ] = {}

    # cache of median calcs from input read slice hashes
    # see `.median()`
    _meds: dict[
        int,
        float,
    ] = {}

    # to make lru_cache-ing work, see
    # https://docs.python.org/3/faq/programming.html#how-do-i-cache-method-calls
    def __eq__(self, other):
        # defer to the other operand (instead of raising an
        # ``AttributeError``) when compared against a non-``Viz``.
        if not isinstance(other, Viz):
            return NotImplemented

        return self._shm._token == other._shm._token

    def __hash__(self):
        return hash(self._shm._token)

    @property
    def shm(self) -> ShmArray:
        return self._shm

    @property
    def index_field(self) -> str:
        return self._index_field

    def index_step(
        self,
        reset: bool = False,

    ) -> float:
        '''
        Return the (cached) step size between samples of the index
        field, detected by scanning the first 16 entries of the source
        array for a difference matching one of ``_sample_rates``.

        Pass ``reset=True`` to force a re-scan of the source data.

        When no known step size can be detected the last known value
        (or ``1`` when there is none) is returned and nothing is cached
        such that a later call can retry the scan.

        Raises ``ValueError`` when more than one known step size is
        found in the scanned sample.

        '''
        # attempt to detect the best step size by scanning a sample of
        # the source data.
        if (
            self._index_step is None
            or reset
        ):
            index = self.shm.array[self.index_field]
            isample = index[:16]

            mxdiff: None | float = None
            for step in np.diff(isample):
                if step in _sample_rates:
                    if (
                        mxdiff is not None
                        and step != mxdiff
                    ):
                        raise ValueError(
                            f'Multiple step sizes detected? {mxdiff}, {step}'
                        )
                    mxdiff = step

            if mxdiff is None:
                # too few samples or no diff matching a known rate:
                # don't cache so that the next call re-scans.
                log.warning(
                    f'{self.name}: no known index step detected in '
                    f'`{self.index_field}` sample, falling back..'
                )
                return self._index_step or 1

            self._index_step = max(mxdiff, 1)

        return self._index_step

    def maxmin(
        self,

        x_range: slice | tuple[int, int] | None = None,
        i_read_range: tuple[int, int] | None = None,
        use_caching: bool = True,

    ) -> tuple[
        tuple[int, int],
        slice,
        tuple[float, float],
    ] | None:
        '''
        Compute (or look up from cache) the min and max y-values for
        a given x-range.

        The range is taken from, in order of precedence,
        - ``i_read_range``: array-relative (start, stop) read indices,
        - ``x_range``: (left, right) values in the index field's domain,
        - the datums currently in view via ``.datums_range()``.

        Returns a 3-tuple of ``(ixrng, read_slc, (ymin, ymax))`` where
        ``ixrng`` is the x-range used as the cache key and ``read_slc``
        the array slice that was read, or ``None`` if no range can be
        determined (yet).

        '''
        name = self.name
        profiler = Profiler(
            msg=f'`Viz[{name}].maxmin()`',
            disabled=not pg_profile_enabled(),
            ms_threshold=4,
            delayed=True,
        )

        shm = self.shm
        if shm is None:
            return None

        do_print: bool = False
        arr = shm.array

        if i_read_range is not None:
            read_slc = slice(*i_read_range)
            index = arr[read_slc][self.index_field]
            if not index.size:
                return None
            ixrng = (index[0], index[-1])

        else:
            if x_range is None:
                (
                    l,
                    _,
                    lbar,
                    rbar,
                    _,
                    r,
                ) = self.datums_range()

                profiler(f'{self.name} got bars range')
                x_range = lbar, rbar

            # TODO: hash the slice instead maybe?
            # https://stackoverflow.com/a/29980872
            lbar, rbar = ixrng = round(x_range[0]), round(x_range[1])

        if use_caching:
            cached_result = self._mxmns.get(ixrng)
            if cached_result:
                if do_print:
                    print(
                        f'{self.name} CACHED maxmin\n'
                        f'{ixrng} -> {cached_result}'
                    )
                read_slc, mxmn = cached_result
                return (
                    ixrng,
                    read_slc,
                    mxmn,
                )

        if i_read_range is None:
            # get relative slice indexes into array
            if self.index_field == 'time':
                read_slc = slice_from_time(
                    arr,
                    start_t=lbar,
                    stop_t=rbar,
                    step=self.index_step(),
                )

            else:
                ifirst = arr[0]['index']
                read_slc = slice(
                    lbar - ifirst,
                    (rbar - ifirst) + 1
                )

        slice_view = arr[read_slc]

        if not slice_view.size:
            log.warning(
                f'{self.name} no maxmin in view?\n'
                f"{name} no mxmn for bars_range => {ixrng} !?"
            )
            return None

        elif self.yrange:
            # a pre-set y-range takes precedence over scanning the data.
            mxmn = self.yrange
            if do_print:
                print(
                    f'{self.name} M4 maxmin:\n'
                    f'{ixrng} -> {mxmn}'
                )
        else:
            if self.is_ohlc:
                ylow = np.min(slice_view['low'])
                yhigh = np.max(slice_view['high'])

            else:
                view = slice_view[self.name]
                ylow = np.min(view)
                yhigh = np.max(view)

            mxmn = ylow, yhigh
            if (
                do_print
            ):
                s = 3
                print(
                    f'{self.name} MANUAL ohlc={self.is_ohlc} maxmin:\n'
                    f'{ixrng} -> {mxmn}\n'
                    f'read_slc: {read_slc}\n'
                    # f'abs_slc: {slice_view["index"]}\n'
                    f'first {s}:\n{slice_view[:s]}\n'
                    f'last {s}:\n{slice_view[-s:]}\n'
                )

        # cache result for input range
        assert mxmn
        self._mxmns[ixrng] = (read_slc, mxmn)

        profiler(f'yrange mxmn cacheing: {x_range} -> {mxmn}')
        return (
            ixrng,
            read_slc,
            mxmn,
        )

    # NOTE: caching on a method keys on ``self``; this relies on the
    # ``__eq__()``/``__hash__()`` overrides above.
    @lru_cache(maxsize=6116)
    def median_from_range(
        self,
        start: int,
        stop: int,

    ) -> float:
        '''
        Return the (cached) median over ``self.shm.array[start:stop]``
        of the ``'close'`` field for OHLC data, otherwise of the field
        named ``self.name``.

        '''
        in_view = self.shm.array[start:stop]
        if self.is_ohlc:
            return np.median(in_view['close'])
        else:
            return np.median(in_view[self.name])

    def view_range(self) -> tuple[int, int]:
        '''
        Return the start and stop x-indexes for the managed ``ViewBox``.

        '''
        vr = self.plot.viewRect()
        return (
            vr.left(),
            vr.right(),
        )

    def bars_range(self) -> tuple[int, int, int, int]:
        '''
        Return a range tuple for the left-view, left-datum, right-datum
        and right-view x-indices.

        '''
        l, start, datum_start, datum_stop, stop, r = self.datums_range()
        return l, datum_start, datum_stop, r

    def datums_range(
        self,
        view_range: None | tuple[float, float] = None,
        index_field: str | None = None,
        array: None | np.ndarray = None,

    ) -> tuple[
        int, int, int, int, int, int
    ]:
        '''
        Return a range tuple for the datums present in view.

        The output is ordered as: left view x, first datum, leftmost
        datum in view, rightmost datum in view, last datum, right
        view x.

        The view range, index field and source array default to
        ``.view_range()``, ``.index_field`` and ``.shm.array``
        respectively when not provided.

        '''
        l, r = view_range or self.view_range()

        index_field: str = index_field or self.index_field
        if index_field == 'index':
            l, r = round(l), round(r)

        if array is None:
            array = self.shm.array

        index = array[index_field]
        first = floor(index[0])
        last = ceil(index[-1])

        # first and last datums in view determined by
        # l / r view range.
        leftmost = floor(l)
        rightmost = ceil(r)

        # invalid view state
        if (
            r < l
            or l < 0
            or r < 0
            or (l > last and r > last)
        ):
            leftmost = first
            rightmost = last
        else:
            # clamp the in-view datums to the data's range while
            # keeping at least one step between left and right.
            rightmost = max(
                min(last, rightmost),
                first,
            )

            leftmost = min(
                max(first, leftmost),
                last,
                rightmost - 1,
            )

            assert leftmost < rightmost

        return (
            l,  # left x-in-view
            first,  # first datum
            leftmost,
            rightmost,
            last,  # last_datum
            r,  # right-x-in-view
        )

    def read(
        self,
        array_field: Optional[str] = None,
        index_field: str | None = None,
        profiler: None | Profiler = None,

    ) -> tuple[
        int, int, np.ndarray,
        int, int, np.ndarray,
    ]:
        '''
        Read the underlying shm array buffer and return the data plus
        indexes for the first and last which has been written to.

        The output is ordered as: absolute start and stop indices, the
        full array (or only its ``array_field`` when provided), the
        read-relative start and stop indices and finally the in-view
        slice of the (full) array.

        '''
        index_field: str = index_field or self.index_field
        vr = l, r = self.view_range()

        # readable data
        array = self.shm.array

        if profiler:
            profiler('self.shm.array READ')

        (
            l,
            ifirst,
            lbar,
            rbar,
            ilast,
            r,
        ) = self.datums_range(
            view_range=vr,
            index_field=index_field,
            array=array,
        )

        if profiler:
            profiler('self.datums_range()')

        abs_slc = slice(ifirst, ilast)

        # TODO: support time slicing
        if index_field == 'time':
            read_slc = slice_from_time(
                array,
                start_t=lbar,
                stop_t=rbar,
            )

            # TODO: maybe we should return this from the slicer call
            # above?
            in_view = array[read_slc]
            if in_view.size:
                abs_indx = in_view['index']
                abs_slc = slice(
                    int(abs_indx[0]),
                    int(abs_indx[-1]),
                )

            if profiler:
                profiler(
                    '`slice_from_time('
                    f'start_t={lbar}'
                    f'stop_t={rbar})'
                )

        # array-index slicing
        # TODO: can we do time based indexing using arithmetic presuming
        # a uniform time stamp step size?
        else:
            # get read-relative indices adjusting for master shm index.
            lbar_i = max(l, ifirst) - ifirst
            rbar_i = min(r, ilast) - ifirst

            # NOTE: the slice here does NOT include the extra ``+ 1``
            # BUT the ``in_view`` slice DOES..
            read_slc = slice(lbar_i, rbar_i)
            in_view = array[lbar_i: rbar_i + 1]
            # in_view = array[lbar_i-1: rbar_i+1]
            # XXX: same as ^
            # to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
            if profiler:
                profiler('index arithmetic for slicing')

        if array_field:
            array = array[array_field]

        return (
            # abs indices + full data set
            abs_slc.start,
            abs_slc.stop,
            array,

            # relative (read) indices + in view data
            read_slc.start,
            read_slc.stop,
            in_view,
        )

    def update_graphics(
        self,
        render: bool = True,
        array_key: str | None = None,

        profiler: Profiler | None = None,
        do_append: bool = True,

        **kwargs,

    ) -> tuple[
        bool,
        tuple[int, int],
        pg.GraphicsObject,
    ]:
        '''
        Read latest datums from shm and (incrementally) render them to
        graphics.

        Returns a 3-tuple of: whether a render took place, the
        read-relative (left, right) in-view indices and the graphics
        object which was (or would have been) updated.

        '''
        # NOTE: any ``profiler`` passed in by the caller is currently
        # replaced by this fresh instance.
        profiler = Profiler(
            msg=f'Viz.update_graphics() for {self.name}',
            disabled=not pg_profile_enabled(),
            ms_threshold=ms_slower_then,
            # ms_threshold=4,
        )
        # shm read and slice to view
        read = (
            xfirst,
            xlast,
            src_array,
            ivl,
            ivr,
            in_view,
        ) = self.read(profiler=profiler)

        profiler('read src shm data')

        graphics = self.graphics

        if (
            not in_view.size
            or not render
        ):
            # print(f'{self.name} not in view (exiting early)')
            return (
                False,
                (ivl, ivr),
                graphics,
            )

        should_redraw: bool = False
        ds_allowed: bool = True  # guard for m4 activation

        # TODO: probably specialize ``Renderer`` types instead of
        # these logic checks?
        # - put these blocks into a `.load_renderer()` meth?
        # - consider a OHLCRenderer, StepCurveRenderer, Renderer?
        r = self._src_r
        if isinstance(graphics, BarItems):
            # XXX: special case where we change out graphics
            # to a line after a certain uppx threshold.
            (
                graphics,
                r,
                should_redraw,
                ds_allowed,  # in line mode?
            ) = render_baritems(
                self,
                graphics,
                read,
                profiler,
                **kwargs,
            )

        elif not r:
            if isinstance(graphics, StepCurve):

                r = self._src_r = Renderer(
                    viz=self,
                    fmtr=StepCurveFmtr(
                        shm=self.shm,
                        viz=self,
                    ),
                )

            else:
                r = self._src_r
                if not r:
                    # just using for ``.diff()`` atm..
                    r = self._src_r = Renderer(
                        viz=self,
                        fmtr=IncrementalFormatter(
                            shm=self.shm,
                            viz=self,
                        ),
                    )

        # ``Curve`` derivative case(s):
        array_key = array_key or self.name

        # ds update config
        new_sample_rate: bool = False
        should_ds: bool = r._in_ds
        showing_src_data: bool = not r._in_ds

        # downsampling incremental state checking
        # check for and set std m4 downsample conditions
        uppx = graphics.x_uppx()
        uppx_diff = (uppx - self._last_uppx)
        profiler(f'diffed uppx {uppx}')
        if (
            uppx > 1
            and abs(uppx_diff) >= 1
            and ds_allowed
        ):
            log.debug(
                f'{array_key} sampler change: {self._last_uppx} -> {uppx}'
            )
            self._last_uppx = uppx

            new_sample_rate = True
            showing_src_data = False
            should_ds = True
            should_redraw = True

        # "back to source" case:
        # this more or less skips use of the m4 downsampler
        # inside ``Renderer.render()`` which results in a path
        # drawn verbatim to match the xy source data.
        elif (
            uppx <= 2
            and self._in_ds
        ):
            # we should de-downsample back to our original
            # source data so we clear our path data in prep
            # to generate a new one from original source data.
            new_sample_rate = True
            should_ds = False
            should_redraw = True
            showing_src_data = True

        # MAIN RENDER LOGIC:
        # - determine in view data and redraw on range change
        # - determine downsampling ops if needed
        # - (incrementally) update ``QPainterPath``

        out = r.render(
            read,
            array_key,
            profiler,
            uppx=uppx,

            # TODO: better way to detect and pass this?
            # if we want to eventually cache renderers for a given uppx
            # we should probably use this as a key + state?
            should_redraw=should_redraw,
            new_sample_rate=new_sample_rate,
            should_ds=should_ds,
            showing_src_data=showing_src_data,

            do_append=do_append,
        )

        if not out:
            log.warning(f'{self.name} failed to render!?')
            return (
                False,
                (ivl, ivr),
                graphics,
            )

        path, reset_cache = out

        # XXX: SUPER UGGGHHH... without this we get stale cache
        # graphics that "smear" across the view horizontally
        # when panning and the first datum is out of view..
        # NOTE: the renderer's ``reset_cache`` output is overridden
        # here so the cache-reset branch below is never taken.
        reset_cache = False
        if (
            reset_cache
        ):
            # assign output paths to graphics obj but
            # after a coords-cache reset.
            with graphics.reset_cache():
                graphics.path = r.path
                graphics.fast_path = r.fast_path

                self.draw_last(
                    array_key=array_key,
                    last_read=read,
                    reset_cache=reset_cache,
                )
        else:
            # assign output paths to graphics obj
            graphics.path = r.path
            graphics.fast_path = r.fast_path

            self.draw_last(
                array_key=array_key,
                last_read=read,
                reset_cache=reset_cache,
            )
            # graphics.draw_last_datum(
            #     path,
            #     src_array,
            #     reset_cache,
            #     array_key,
            #     index_field=self.index_field,
            # )

        # TODO: does this actually help us in any way (prolly should
        # look at the source / ask ogi). I think it avoids artifacts on
        # wheel-scroll downsampling curve updates?
        # TODO: is this ever better?
        graphics.prepareGeometryChange()
        profiler('.prepareGeometryChange()')

        graphics.update()
        profiler('.update()')

        # track downsampled state
        self._in_ds = r._in_ds

        return (
            True,
            (ivl, ivr),
            graphics,
        )

    def draw_last(
        self,
        array_key: str | None = None,
        last_read: tuple | None = None,
        reset_cache: bool = False,
        only_last_uppx: bool = False,

    ) -> None:
        '''
        Update the graphics element for the most recent datum(s).

        When downsampling (or when ``only_last_uppx`` is set) a single
        line spanning the min/max of the last uppx's worth of datums
        is assigned to the graphics object's ``._last_line``, otherwise
        drawing is delegated to the graphics object's
        ``.draw_last_datum()``.

        '''
        # shm read and slice to view
        (
            xfirst,
            xlast,
            src_array,
            ivl,
            ivr,
            in_view,
        ) = last_read or self.read()

        array_key = array_key or self.name

        gfx = self.graphics

        # the renderer is downsampling we choose
        # to always try and update a single (interpolating)
        # line segment that spans and tries to display
        # the last uppx's worth of datums.
        # we only care about the last pixel's
        # worth of data since that's all the screen
        # can represent on the last column where
        # the most recent datum is being drawn.
        uppx = ceil(gfx.x_uppx())

        if (
            (self._in_ds or only_last_uppx)
            and uppx > 0
        ):
            alt_renderer = self._alt_r
            if alt_renderer:
                renderer, gfx = alt_renderer
            else:
                renderer = self._src_r

            fmtr = renderer.fmtr
            x = fmtr.x_1d
            y = fmtr.y_1d

            iuppx = ceil(uppx)
            if alt_renderer:
                iuppx = ceil(uppx / fmtr.flat_index_ratio)

            y = y[-iuppx:]
            ymn, ymx = y.min(), y.max()
            try:
                x_start = x[-iuppx]
            except IndexError:
                # we're less then an x-px wide so just grab the start
                # datum index.
                x_start = x[0]

            gfx._last_line = QLineF(
                x_start, ymn,
                x[-1], ymx,
            )
            # print(
            #     f'updating DS curve {self.name}@{time_step}s\n'
            #     f'drawing uppx={uppx} mxmn line: {ymn}, {ymx}'
            # )

        else:
            x, y = gfx.draw_last_datum(
                gfx.path,
                src_array,
                reset_cache,  # never reset path
                array_key,
                self.index_field,
            )
            # print(f'updating NOT DS curve {self.name}')

        gfx.update()

    def default_view(
        self,
        bars_from_y: int = int(616 * 3/8),
        y_offset: int = 0,  # in datums
        do_ds: bool = True,

    ) -> None:
        '''
        Set the plot's viewbox to a "default" startup setting where
        we try to show the underlying data range sanely.

        NOTE: the ``bars_from_y`` input is currently unused.

        '''
        shm: ShmArray = self.shm
        array: np.ndarray = shm.array
        view: ChartView = self.plot.vb
        (
            vl,
            first_datum,
            datum_start,
            datum_stop,
            last_datum,
            vr,
        ) = self.datums_range(array=array)

        # invalid case: view is not ordered correctly
        # return and expect caller to sort it out.
        if (
            vl > vr
        ):
            log.warning(
                'Skipping `.default_view()` viewbox not initialized..\n'
                f'l -> r: {vl} -> {vr}\n'
                f'datum_start -> datum_stop: {datum_start} -> {datum_stop}\n'
            )
            return

        chartw: ChartPlotWidget = self.plot.getViewWidget()
        index_field = self.index_field
        step = self.index_step()

        if index_field == 'time':
            # transform l -> r view range values into
            # data index domain to determine how view
            # should be reset to better match data.
            read_slc = slice_from_time(
                array,
                start_t=vl,
                stop_t=vr,
                step=step,
            )
        else:
            read_slc = slice(0, datum_stop - datum_start + 1)

        index_iv = array[index_field][read_slc]
        uppx: float = self.graphics.x_uppx() or 1

        # l->r distance in scene units, no larger then data span
        data_diff = last_datum - first_datum
        rl_diff = vr - vl
        rescale_to_data: bool = False
        # new_uppx: float = 1

        if rl_diff > data_diff:
            rescale_to_data = True
            rl_diff = data_diff
            new_uppx: float = data_diff / self.px_width()

        # orient by offset from the y-axis including
        # space to compensate for the L1 labels.
        if not y_offset:
            _, l1_offset = chartw.pre_l1_xs()

            offset = l1_offset

            if (
                rescale_to_data
            ):
                offset = (offset / uppx) * new_uppx

        else:
            offset = (y_offset * step) + uppx*step

        # align right side of view to the rightmost datum + the selected
        # offset from above.
        r_reset = (self.graphics.x_last() or last_datum) + offset

        # no data is in view so check for the only 2 sane cases:
        # - entire view is LEFT of data
        # - entire view is RIGHT of data
        if index_iv.size == 0:
            log.warning(f'No data in view for {vl} -> {vr}')

            # 2 cases either the view is to the left or right of the
            # data set.
            if (
                vl <= first_datum
                and vr <= first_datum
            ):
                l_reset = first_datum

            elif (
                vl >= last_datum
                and vr >= last_datum
            ):
                l_reset = r_reset - rl_diff

            else:
                log.warning(f'Unknown view state {vl} -> {vr}')
                return
        else:
            # maintain the l->r view distance
            l_reset = r_reset - rl_diff

        # remove any custom user yrange settings
        if chartw._static_yrange == 'axis':
            chartw._static_yrange = None

        view.setXRange(
            min=l_reset,
            max=r_reset,
            padding=0,
        )

        if do_ds:
            view.interact_graphics_cycle()
            # view._set_yrange(viz=self)

    def incr_info(
        self,
        ds: DisplayState,
        update_uppx: float = 16,
        is_1m: bool = False,

    ) -> tuple:
        '''
        Return a slew of graphics related data-flow metrics to do with
        incrementally updating a data view.

        Output info includes,
        ----------------------

        uppx: float
            x-domain units-per-pixel.

        liv: bool
            telling if the "last datum" is in view.

        do_px_step: bool
            recent data append(s) are enough that the next physical
            pixel-column should be used for drawing.

        i_diff_t: float
            the difference between the last globally recorded time stamp
            and the current one.

        append_diff: int
            diff between last recorded "append index" (the index at which
            `do_px_step` was last returned `True`) and the current index.

        do_rt_update: bool
            `True` only when the uppx is less then some threshold
            defined by `update_uppx`.

        should_tread: bool
            determines the first step, globally across all callers, that
            a set of data views should be "treaded", shifted in the
            x-domain such that the last datum in view is always in the
            same spot in non-view/scene (aka GUI coord) terms.

        '''
        # get most recent right datum index in-view
        l, start, datum_start, datum_stop, stop, r = self.datums_range()
        lasts = self.shm.array[-1]
        i_step = lasts['index']  # last index-specific step.
        i_step_t = lasts['time']  # last time step.

        # fqsn = self.flume.symbol.fqsn

        # check if "last (is) in view" -> is a real-time update necessary?
        if self.index_field == 'index':
            liv = (r >= i_step)
        else:
            liv = (r >= i_step_t)

        # compute the first available graphic obj's x-units-per-pixel
        # TODO: make this not loop through all vizs each time!
        uppx = self.plot.vb.x_uppx()

        # NOTE: this used to be implemented in a dedicated
        # "increment task": ``check_for_new_bars()`` but it doesn't
        # make sense to do a whole task switch when we can just do
        # this simple index-diff and all the fsp sub-curve graphics
        # are diffed on each draw cycle anyway; so updates to the
        # "curve" length is already automatic.
        globalz = ds.globalz
        varz = ds.hist_vars if is_1m else ds.vars

        last_key = 'i_last_slow_t' if is_1m else 'i_last_t'
        glast = globalz[last_key]

        # calc datums diff since last global increment
        i_diff_t: float = i_step_t - glast

        # when the current step is now greater then the last we have
        # read from the display state globals, we presume that the
        # underlying source shm buffer has added a new sample and thus
        # we should increment the global view a step (i.e. tread the
        # view in place to keep the current datum at the same spot on
        # screen).
        should_tread: bool = False
        if i_diff_t > 0:
            globalz[last_key] = i_step_t
            should_tread = True

        # update the "last datum" (aka extending the vizs graphic with
        # new data) only if the number of unit steps is >= the number of
        # such unit steps per pixel (aka uppx). Iow, if the zoom level
        # is such that a datum(s) update to graphics wouldn't span
        # to a new pixel, we don't update yet.
        i_last_append = varz['i_last_append']
        append_diff: int = i_step - i_last_append

        do_px_step = (append_diff * self.index_step()) >= uppx
        do_rt_update = (uppx < update_uppx)

        if (
            do_px_step
        ):
            varz['i_last_append'] = i_step

            # print(
            #     f'DOING APPEND => {fqsn}\n'
            #     f'i_step: {i_step}\n'
            #     f'i_step_t: {i_step_t}\n'
            #     f'glast: {glast}\n'
            #     f'last_append: {i_last_append}\n'
            #     f'r: {r}\n'
            #     '-----------------------------\n'
            #     f'uppx: {uppx}\n'
            #     f'liv: {liv}\n'
            #     f'do_px_step: {do_px_step}\n'
            #     f'i_diff_t: {i_diff_t}\n'
            #     f'do_rt_update: {do_rt_update}\n'
            #     f'append_diff: {append_diff}\n'
            #     f'should_tread: {should_tread}\n'
            # )

        varz['i_last'] = i_step

        # TODO: pack this into a struct?
        return (
            uppx,
            liv,
            do_px_step,
            i_diff_t,
            append_diff,
            do_rt_update,
            should_tread,
        )

    def px_width(self) -> float:
        '''
        Return the width of the view box containing
        this graphic in pixel units.

        Returns ``0`` when the plot has no view box.

        '''
        vb = self.plot.vb
        if not vb:
            return 0

        vl, vr = self.view_range()

        return vb.mapViewToDevice(
            QLineF(
                vl, 0,
                vr, 0,
            )
        ).length()