Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 9bc7de0cb3 Add notes about how to do mkts "trimming"
Which is basically just "deleting" rows from a column series.
You can only use the trim command from the `.cmd` cli and only with a so
called `LocalClient` currently; it's also sketchy af and caused
a machine to hang due to mem usage..

Ideally we can patch in this functionality for use by the rpc api
and have it not hang like this XD

Pertains to https://github.com/alpacahq/marketstore/issues/264
2022-05-16 14:31:23 -04:00
Tyler Goodlet 5c294f5ed4 Make vlm a float; discrete is so 80s 2022-05-16 14:31:04 -04:00
Tyler Goodlet b2833896a6 Fix output unpack 2022-05-16 08:12:07 -04:00
Tyler Goodlet 78defa00ec Drop old non-working flatten routine 2022-05-15 17:06:52 -04:00
Tyler Goodlet 3cb6b7221c Factor step format data gen into `to_step_format()`
Yet another path ops routine which converts a 1d array into a data
format suitable for rendering a "step curve" graphics path (aka a "bar
graph" but implemented as a continuous line).

Also, factor the `BarItems` rendering logic (which determines whether to
render the literal bars lines or a downsampled curve) into a routine
`render_baritems()` until we figure out the right abstraction layer for
it.
2022-05-15 16:54:50 -04:00
Tyler Goodlet 06d3cadcc0 Factor ohlc to line data conversion into `._pathops.ohlc_to_line()` 2022-05-15 15:45:06 -04:00
Tyler Goodlet fdd4255246 Drop commented `numba` imports 2022-05-15 15:44:19 -04:00
Tyler Goodlet e9c244ccd0 Move ohlc lines-curve generators into pathops mod 2022-05-15 15:21:25 -04:00
Tyler Goodlet ce68e612de Add `.ui._pathops` module
Starts a module for grouping together all our `QPainterpath` related
generation and data format operations for creation of fast curve
graphics. To start, drops `FastAppendCurve.downsample()` and moves
it to a new `._pathops.xy_downsample()`.
2022-05-15 15:15:14 -04:00
Tyler Goodlet 3bf907c10f Rename `._ohlc.gen_qpath()` -> `.gen_ohlc_qpath()` 2022-05-15 14:30:13 -04:00
7 changed files with 534 additions and 470 deletions

View File

@ -1791,7 +1791,7 @@ async def _setup_quote_stream(
to_trio.send_nowait(None) to_trio.send_nowait(None)
async with load_aio_clients() as accts2clients: async with load_aio_clients() as accts2clients:
client = get_preferred_data_client(accts2clients) caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))

View File

@ -33,7 +33,7 @@ ohlc_fields = [
('high', float), ('high', float),
('low', float), ('low', float),
('close', float), ('close', float),
('volume', int), ('volume', float),
('bar_wap', float), ('bar_wap', float),
] ]

View File

@ -230,8 +230,8 @@ _ohlcv_dt = [
# ohlcv sampling # ohlcv sampling
('Open', 'f4'), ('Open', 'f4'),
('High', 'f4'), ('High', 'f4'),
('Low', 'i8'), ('Low', 'f4'),
('Close', 'i8'), ('Close', 'f4'),
('Volume', 'f4'), ('Volume', 'f4'),
] ]
@ -547,6 +547,17 @@ class Storage:
if err: if err:
raise MarketStoreError(err) raise MarketStoreError(err)
# XXX: currently the only way to do this is through the CLI:
# sudo ./marketstore connect --dir ~/.config/piker/data
# >> \show mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
# and this seems to block and use up mem..
# >> \trim mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
# relevant source code for this is here:
# https://github.com/alpacahq/marketstore/blob/master/cmd/connect/session/trim.go#L14
# def delete_range(self, start_dt, end_dt) -> None:
# ...
@acm @acm
async def open_storage_client( async def open_storage_client(

View File

@ -34,10 +34,11 @@ from PyQt5.QtCore import (
from .._profile import pg_profile_enabled, ms_slower_then from .._profile import pg_profile_enabled, ms_slower_then
from ._style import hcolor from ._style import hcolor
from ._compression import ( # from ._compression import (
# ohlc_to_m4_line, # # ohlc_to_m4_line,
ds_m4, # ds_m4,
) # )
from ._pathops import xy_downsample
from ..log import get_logger from ..log import get_logger
@ -174,32 +175,6 @@ class FastAppendCurve(pg.GraphicsObject):
QLineF(lbar, 0, rbar, 0) QLineF(lbar, 0, rbar, 0)
).length() ).length()
def downsample(
self,
x,
y,
px_width,
uppx,
) -> tuple[np.ndarray, np.ndarray]:
# downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y = ds_m4(
x,
y,
px_width=px_width,
uppx=uppx,
# log_scale=bool(uppx)
)
x = np.broadcast_to(x[:, None], y.shape)
# x = (x + np.array([-0.43, 0, 0, 0.43])).flatten()
x = (x + np.array([-0.5, 0, 0, 0.5])).flatten()
y = y.flatten()
return x, y
def update_from_array( def update_from_array(
self, self,
@ -396,7 +371,8 @@ class FastAppendCurve(pg.GraphicsObject):
self._in_ds = False self._in_ds = False
elif should_ds and uppx and px_width > 1: elif should_ds and uppx and px_width > 1:
x_out, y_out = self.downsample(
x_out, y_out = xy_downsample(
x_out, x_out,
y_out, y_out,
px_width, px_width,
@ -461,7 +437,7 @@ class FastAppendCurve(pg.GraphicsObject):
) )
# if should_ds: # if should_ds:
# new_x, new_y = self.downsample( # new_x, new_y = xy_downsample(
# new_x, # new_x,
# new_y, # new_y,
# px_width, # px_width,

View File

@ -48,19 +48,18 @@ from ..data._sharedmem import (
) )
from .._profile import ( from .._profile import (
pg_profile_enabled, pg_profile_enabled,
ms_slower_then, # ms_slower_then,
)
from ._pathops import (
gen_ohlc_qpath,
ohlc_to_line,
to_step_format,
) )
from ._ohlc import ( from ._ohlc import (
BarItems, BarItems,
gen_qpath,
) )
from ._curve import ( from ._curve import (
FastAppendCurve, FastAppendCurve,
# step_path_arrays_from_1d,
)
from ._compression import (
# ohlc_flatten,
ds_m4,
) )
from ..log import get_logger from ..log import get_logger
@ -114,32 +113,241 @@ def rowarr_to_path(
) )
def mk_ohlc_flat_copy( def render_baritems(
ohlc_shm: ShmArray, flow: Flow,
graphics: BarItems,
read: tuple[
int, int, np.ndarray,
int, int, np.ndarray,
],
profiler: pg.debug.Profiler,
**kwargs,
# XXX: we bind this in currently.. ) -> None:
# x_basis: np.ndarray,
# vr: Optional[slice] = None,
) -> tuple[np.ndarray, np.ndarray]:
''' '''
Return flattened-non-copy view into an OHLC shm array. Graphics management logic for a ``BarItems`` object.
Mostly just logic to determine when and how to downsample an OHLC
lines curve into a flattened line graphic and when to display one
graphic or the other.
TODO: this should likely be moved into some kind of better abstraction
layer, if not a `Renderer` then something just above it?
''' '''
ohlc = ohlc_shm._array[['open', 'high', 'low', 'close']] (
# if vr: xfirst, xlast, array,
# ohlc = ohlc[vr] ivl, ivr, in_view,
# x = x_basis[vr] ) = read
unstructured = rfn.structured_to_unstructured( # if no source data renderer exists create one.
ohlc, self = flow
copy=False, r = self._src_r
) if not r:
# breakpoint() # OHLC bars path renderer
y = unstructured.flatten() r = self._src_r = Renderer(
# x = x_basis[:y.size] flow=self,
return y # TODO: rename this to something with ohlc
draw_path=gen_ohlc_qpath,
last_read=read,
)
ds_curve_r = Renderer(
flow=self,
# just swap in the flat view
# data_t=lambda array: self.gy.array,
last_read=read,
draw_path=partial(
rowarr_to_path,
x_basis=None,
),
)
curve = FastAppendCurve(
name='OHLC',
color=graphics._color,
)
curve.hide()
self.plot.addItem(curve)
# baseline "line" downsampled OHLC curve that should
# kick on only when we reach a certain uppx threshold.
self._render_table[0] = (
ds_curve_r,
curve,
)
dsc_r, curve = self._render_table[0]
# do checks for whether or not we require downsampling:
# - if we're **not** downsampling then we simply want to
# render the bars graphics curve and update..
# - if insteam we are in a downsamplig state then we to
x_gt = 6
uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
if (
should_line
and uppx < x_gt
):
print('FLIPPING TO BARS')
should_line = False
elif (
not should_line
and uppx >= x_gt
):
print('FLIPPING TO LINE')
should_line = True
profiler(f'ds logic complete line={should_line}')
# do graphics updates
if should_line:
fields = ['open', 'high', 'low', 'close']
if self.gy is None:
# create a flattened view onto the OHLC array
# which can be read as a line-style format
shm = self.shm
(
self._iflat_first,
self._iflat_last,
self.gx,
self.gy,
) = ohlc_to_line(
shm,
fields=fields,
)
# print(f'unstruct diff: {time.time() - start}')
gy = self.gy
# update flatted ohlc copy
(
iflat_first,
iflat,
ishm_last,
ishm_first,
) = (
self._iflat_first,
self._iflat_last,
self.shm._last.value,
self.shm._first.value
)
# check for shm prepend updates since last read.
if iflat_first != ishm_first:
# write newly prepended data to flattened copy
gy[
ishm_first:iflat_first
] = rfn.structured_to_unstructured(
self.shm._array[fields][ishm_first:iflat_first]
)
self._iflat_first = ishm_first
to_update = rfn.structured_to_unstructured(
self.shm._array[iflat:ishm_last][fields]
)
gy[iflat:ishm_last][:] = to_update
profiler('updated ustruct OHLC data')
# slice out up-to-last step contents
y_flat = gy[ishm_first:ishm_last]
x_flat = self.gx[ishm_first:ishm_last]
# update local last-index tracking
self._iflat_last = ishm_last
# reshape to 1d for graphics rendering
y = y_flat.reshape(-1)
x = x_flat.reshape(-1)
profiler('flattened ustruct OHLC data')
# do all the same for only in-view data
y_iv_flat = y_flat[ivl:ivr]
x_iv_flat = x_flat[ivl:ivr]
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
profiler('flattened ustruct in-view OHLC data')
# pass into curve graphics processing
curve.update_from_array(
x,
y,
x_iv=x_iv,
y_iv=y_iv,
view_range=(ivl, ivr), # hack
profiler=profiler,
# should_redraw=False,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs,
)
curve.show()
profiler('updated ds curve')
else:
# render incremental or in-view update
# and apply ouput (path) to graphics.
path, last = r.render(
read,
only_in_view=True,
)
graphics.path = path
graphics.draw_last(last)
# NOTE: on appends we used to have to flip the coords
# cache thought it doesn't seem to be required any more?
# graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# graphics.prepareGeometryChange()
graphics.update()
if (
not in_line
and should_line
):
# change to line graphic
log.info(
f'downsampling to line graphic {self.name}'
)
graphics.hide()
# graphics.update()
curve.show()
curve.update()
elif in_line and not should_line:
log.info(f'showing bars graphic {self.name}')
curve.hide()
graphics.show()
graphics.update()
# update our pre-downsample-ready data and then pass that
# new data the downsampler algo for incremental update.
# graphics.update_from_array(
# array,
# in_view,
# view_range=(ivl, ivr) if use_vr else None,
# **kwargs,
# )
# generate and apply path to graphics obj
# graphics.path, last = r.render(
# read,
# only_in_view=True,
# )
# graphics.draw_last(last)
class Flow(msgspec.Struct): # , frozen=True): class Flow(msgspec.Struct): # , frozen=True):
@ -357,271 +565,30 @@ class Flow(msgspec.Struct): # , frozen=True):
graphics = self.graphics graphics = self.graphics
if isinstance(graphics, BarItems): if isinstance(graphics, BarItems):
render_baritems(
# if no source data renderer exists create one. self,
r = self._src_r graphics,
if not r: read,
# OHLC bars path renderer profiler,
r = self._src_r = Renderer( **kwargs,
flow=self, )
# TODO: rename this to something with ohlc
draw_path=gen_qpath,
last_read=read,
)
ds_curve_r = Renderer(
flow=self,
# just swap in the flat view
# data_t=lambda array: self.gy.array,
last_read=read,
draw_path=partial(
rowarr_to_path,
x_basis=None,
),
)
curve = FastAppendCurve(
name='OHLC',
color=graphics._color,
)
curve.hide()
self.plot.addItem(curve)
# baseline "line" downsampled OHLC curve that should
# kick on only when we reach a certain uppx threshold.
self._render_table[0] = (
ds_curve_r,
curve,
)
dsc_r, curve = self._render_table[0]
# do checks for whether or not we require downsampling:
# - if we're **not** downsampling then we simply want to
# render the bars graphics curve and update..
# - if insteam we are in a downsamplig state then we to
x_gt = 6
uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
if (
should_line
and uppx < x_gt
):
print('FLIPPING TO BARS')
should_line = False
elif (
not should_line
and uppx >= x_gt
):
print('FLIPPING TO LINE')
should_line = True
profiler(f'ds logic complete line={should_line}')
# do graphics updates
if should_line:
fields = ['open', 'high', 'low', 'close']
if self.gy is None:
# create a flattened view onto the OHLC array
# which can be read as a line-style format
shm = self.shm
# flat = self.gy = self.shm.unstruct_view(fields)
self.gy = self.shm.ustruct(fields)
first = self._iflat_first = self.shm._first.value
last = self._iflat_last = self.shm._last.value
# write pushed data to flattened copy
self.gy[first:last] = rfn.structured_to_unstructured(
self.shm.array[fields]
)
# generate an flat-interpolated x-domain
self.gx = (
np.broadcast_to(
shm._array['index'][:, None],
(
shm._array.size,
# 4, # only ohlc
self.gy.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
assert self.gy.any()
# print(f'unstruct diff: {time.time() - start}')
# profiler('read unstr view bars to line')
# start = self.gy._first.value
# update flatted ohlc copy
(
iflat_first,
iflat,
ishm_last,
ishm_first,
) = (
self._iflat_first,
self._iflat_last,
self.shm._last.value,
self.shm._first.value
)
# check for shm prepend updates since last read.
if iflat_first != ishm_first:
# write newly prepended data to flattened copy
self.gy[
ishm_first:iflat_first
] = rfn.structured_to_unstructured(
self.shm._array[fields][ishm_first:iflat_first]
)
self._iflat_first = ishm_first
# # flat = self.gy = self.shm.unstruct_view(fields)
# self.gy = self.shm.ustruct(fields)
# # self._iflat_last = self.shm._last.value
# # self._iflat_first = self.shm._first.value
# # do an update for the most recent prepend
# # index
# iflat = ishm_first
to_update = rfn.structured_to_unstructured(
self.shm._array[iflat:ishm_last][fields]
)
self.gy[iflat:ishm_last][:] = to_update
profiler('updated ustruct OHLC data')
# slice out up-to-last step contents
y_flat = self.gy[ishm_first:ishm_last]
x_flat = self.gx[ishm_first:ishm_last]
# update local last-index tracking
self._iflat_last = ishm_last
# reshape to 1d for graphics rendering
y = y_flat.reshape(-1)
x = x_flat.reshape(-1)
profiler('flattened ustruct OHLC data')
# do all the same for only in-view data
y_iv_flat = y_flat[ivl:ivr]
x_iv_flat = x_flat[ivl:ivr]
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
profiler('flattened ustruct in-view OHLC data')
# legacy full-recompute-everytime method
# x, y = ohlc_flatten(array)
# x_iv, y_iv = ohlc_flatten(in_view)
# profiler('flattened OHLC data')
curve.update_from_array(
x,
y,
x_iv=x_iv,
y_iv=y_iv,
view_range=(ivl, ivr), # hack
profiler=profiler,
# should_redraw=False,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs,
)
curve.show()
profiler('updated ds curve')
else:
# render incremental or in-view update
# and apply ouput (path) to graphics.
path, last = r.render(
read,
only_in_view=True,
)
graphics.path = path
graphics.draw_last(last)
# NOTE: on appends we used to have to flip the coords
# cache thought it doesn't seem to be required any more?
# graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# graphics.prepareGeometryChange()
graphics.update()
if (
not in_line
and should_line
):
# change to line graphic
log.info(
f'downsampling to line graphic {self.name}'
)
graphics.hide()
# graphics.update()
curve.show()
curve.update()
elif in_line and not should_line:
log.info(f'showing bars graphic {self.name}')
curve.hide()
graphics.show()
graphics.update()
# update our pre-downsample-ready data and then pass that
# new data the downsampler algo for incremental update.
# graphics.update_from_array(
# array,
# in_view,
# view_range=(ivl, ivr) if use_vr else None,
# **kwargs,
# )
# generate and apply path to graphics obj
# graphics.path, last = r.render(
# read,
# only_in_view=True,
# )
# graphics.draw_last(last)
else: else:
# ``FastAppendCurve`` case: # ``FastAppendCurve`` case:
array_key = array_key or self.name array_key = array_key or self.name
uppx = graphics.x_uppx() uppx = graphics.x_uppx()
profiler('read uppx') profiler(f'read uppx {uppx}')
if graphics._step_mode and self.gy is None: if graphics._step_mode and self.gy is None:
self._iflat_first = self.shm._first.value
# create a flattened view onto the OHLC array
# which can be read as a line-style format
shm = self.shm shm = self.shm
(
# fields = ['index', array_key] self._iflat_first,
i = shm._array['index'].copy() self.gx,
out = shm._array[array_key].copy() self.gy,
) = to_step_format(
self.gx = np.broadcast_to( shm,
i[:, None], array_key,
(i.size, 2), )
) + np.array([-0.5, 0.5])
# self.gy = np.broadcast_to(
# out[:, None], (out.size, 2),
# )
self.gy = np.empty((len(out), 2), dtype=out.dtype)
self.gy[:] = out[:, np.newaxis]
# start y at origin level
self.gy[0, 0] = 0
profiler('generated step mode data') profiler('generated step mode data')
if graphics._step_mode: if graphics._step_mode:
@ -784,42 +751,11 @@ class Flow(msgspec.Struct): # , frozen=True):
profiler=profiler, profiler=profiler,
**kwargs **kwargs
) )
profiler(f'`graphics.update_from_array()` complete') profiler('`graphics.update_from_array()` complete')
return graphics return graphics
def xy_downsample(
x,
y,
px_width,
uppx,
x_spacer: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
# downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y = ds_m4(
x,
y,
px_width=px_width,
uppx=uppx,
log_scale=bool(uppx)
)
# flatten output to 1d arrays suitable for path-graphics generation.
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array(
[-x_spacer, 0, 0, x_spacer]
)).flatten()
y = y.flatten()
return x, y
class Renderer(msgspec.Struct): class Renderer(msgspec.Struct):
flow: Flow flow: Flow

View File

@ -25,17 +25,15 @@ from typing import (
import numpy as np import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
from numba import njit, float64, int64 # , optional
from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QLineF, QPointF from PyQt5.QtCore import QLineF, QPointF
# from numba import types as ntypes
# from ..data._source import numba_ohlc_dtype
from .._profile import pg_profile_enabled, ms_slower_then from .._profile import pg_profile_enabled, ms_slower_then
from ._style import hcolor from ._style import hcolor
from ..log import get_logger from ..log import get_logger
from ._curve import FastAppendCurve from ._curve import FastAppendCurve
from ._compression import ohlc_flatten from ._compression import ohlc_flatten
from ._pathops import gen_ohlc_qpath
if TYPE_CHECKING: if TYPE_CHECKING:
from ._chart import LinkedSplits from ._chart import LinkedSplits
@ -84,119 +82,6 @@ def bar_from_ohlc_row(
return [hl, o, c] return [hl, o, c]
@njit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# ntypes.tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
'''
Generate an array of lines objects from input ohlc data.
'''
size = int(data.shape[0] * 6)
x = np.zeros(
# data,
shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# TODO: report bug for assert @
# /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
for i, q in enumerate(data[start:], start):
# TODO: ask numba why this doesn't work..
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
open = q['open']
high = q['high']
low = q['low']
close = q['close']
index = float64(q['index'])
istart = i * 6
istop = istart + 6
# x,y detail the 6 points which connect all vertexes of a ohlc bar
x[istart:istop] = (
index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# specifies that the first edge is never connected to the
# prior bars last edge thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (1, 1, 1, 1, 1, 0)
return x, y, c
def gen_qpath(
data: np.ndarray,
start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43,
path: Optional[QtGui.QPainterPath] = None,
) -> QtGui.QPainterPath:
path_was_none = path is None
profiler = pg.debug.Profiler(
msg='gen_qpath ohlc',
disabled=not pg_profile_enabled(),
ms_threshold=ms_slower_then,
)
x, y, c = path_arrays_from_ohlc(
data,
start,
bar_gap=w,
)
profiler("generate stream with numba")
# TODO: numba the internals of this!
path = pg.functions.arrayToQPath(
x,
y,
connect=c,
path=path,
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
profiler("generate path with arrayToQPath")
return path
class BarItems(pg.GraphicsObject): class BarItems(pg.GraphicsObject):
''' '''
"Price range" bars graphics rendered from a OHLC sampled sequence. "Price range" bars graphics rendered from a OHLC sampled sequence.
@ -274,7 +159,7 @@ class BarItems(pg.GraphicsObject):
''' '''
hist, last = ohlc[:-1], ohlc[-1] hist, last = ohlc[:-1], ohlc[-1]
self.path = gen_qpath(hist, start, self.w) self.path = gen_ohlc_qpath(hist, start, self.w)
# save graphics for later reference and keep track # save graphics for later reference and keep track
# of current internal "last index" # of current internal "last index"

View File

@ -0,0 +1,256 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Super fast ``QPainterPath`` generation related operator routines.
"""
from typing import (
Optional,
)
import numpy as np
from numpy.lib import recfunctions as rfn
from numba import njit, float64, int64 # , optional
import pyqtgraph as pg
from PyQt5 import QtGui
# from PyQt5.QtCore import QLineF, QPointF
from ..data._sharedmem import (
ShmArray,
)
from .._profile import pg_profile_enabled, ms_slower_then
from ._compression import (
# ohlc_flatten,
ds_m4,
)
def xy_downsample(
x,
y,
px_width,
uppx,
x_spacer: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
# downsample whenever more then 1 pixels per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y = ds_m4(
x,
y,
px_width=px_width,
uppx=uppx,
# log_scale=bool(uppx)
)
# flatten output to 1d arrays suitable for path-graphics generation.
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array(
[-x_spacer, 0, 0, x_spacer]
)).flatten()
y = y.flatten()
return x, y
@njit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# ntypes.tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
'''
Generate an array of lines objects from input ohlc data.
'''
size = int(data.shape[0] * 6)
x = np.zeros(
# data,
shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# TODO: report bug for assert @
# /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
for i, q in enumerate(data[start:], start):
# TODO: ask numba why this doesn't work..
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
open = q['open']
high = q['high']
low = q['low']
close = q['close']
index = float64(q['index'])
istart = i * 6
istop = istart + 6
# x,y detail the 6 points which connect all vertexes of a ohlc bar
x[istart:istop] = (
index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# specifies that the first edge is never connected to the
# prior bars last edge thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (1, 1, 1, 1, 1, 0)
return x, y, c
def gen_ohlc_qpath(
data: np.ndarray,
start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43,
path: Optional[QtGui.QPainterPath] = None,
) -> QtGui.QPainterPath:
path_was_none = path is None
profiler = pg.debug.Profiler(
msg='gen_qpath ohlc',
disabled=not pg_profile_enabled(),
ms_threshold=ms_slower_then,
)
x, y, c = path_arrays_from_ohlc(
data,
start,
bar_gap=w,
)
profiler("generate stream with numba")
# TODO: numba the internals of this!
path = pg.functions.arrayToQPath(
x,
y,
connect=c,
path=path,
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
profiler("generate path with arrayToQPath")
return path
def ohlc_to_line(
ohlc_shm: ShmArray,
fields: list[str] = ['open', 'high', 'low', 'close']
) -> tuple[
int, # flattened first index
int, # flattened last index
np.ndarray,
np.ndarray,
]:
'''
Convert an input struct-array holding OHLC samples into a pair of
flattened x, y arrays with the same size (datums wise) as the source
data.
'''
y_out = ohlc_shm.ustruct(fields)
first = ohlc_shm._first.value
last = ohlc_shm._last.value
# write pushed data to flattened copy
y_out[first:last] = rfn.structured_to_unstructured(
ohlc_shm.array[fields]
)
# generate an flat-interpolated x-domain
x_out = (
np.broadcast_to(
ohlc_shm._array['index'][:, None],
(
ohlc_shm._array.size,
# 4, # only ohlc
y_out.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
assert y_out.any()
return (
first,
last,
x_out,
y_out,
)
def to_step_format(
shm: ShmArray,
data_field: str,
index_field: str = 'index',
) -> tuple[int, np.ndarray, np.ndarray]:
'''
Convert an input 1d shm array to a "step array" format
for use by path graphics generation.
'''
first = shm._first.value
i = shm._array['index'].copy()
out = shm._array[data_field].copy()
x_out = np.broadcast_to(
i[:, None],
(i.size, 2),
) + np.array([-0.5, 0.5])
y_out = np.empty((len(out), 2), dtype=out.dtype)
y_out[:] = out[:, np.newaxis]
# start y at origin level
y_out[0, 0] = 0
return first, x_out, y_out