Incrementally update flattend OHLC data

After much effort (and exhaustion) but failure to get a view into
our `numpy` OHLC struct-array, this instead allocates an in-thread-memory
array which is updated with flattened data every flow update cycle.

I need to report what I think is a bug to `numpy` core about the whole
view thing not working but, more or less this gets the same behaviour
and minimizes work to flatten the sampled data for line-graphics
drawing thus improving refresh latency when drawing large downsampled
curves.

TL;DR:
- add `ShmArray.ustruct()` to return a **copy of** (since a view doesn't
  work..) the (field filtered) shm array which is the same index-length
  as the source data.
- update the OHLC ds curve with view aware data sliced out from the
  pre-allocated and incrementally updated data (we had to add a last
  index var `._iflat` to track appends - this should be moved into
  a renderer eventually?).
incr_update_backup
Tyler Goodlet 2022-04-21 14:52:30 -04:00
parent b4c7d02fcb
commit 6f5bb9cbe0
2 changed files with 140 additions and 56 deletions

View File

@ -22,7 +22,6 @@ from __future__ import annotations
from sys import byteorder from sys import byteorder
from typing import Optional from typing import Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker
if _USE_POSIX: if _USE_POSIX:
from _posixshmem import shm_unlink from _posixshmem import shm_unlink
@ -30,6 +29,7 @@ if _USE_POSIX:
import tractor import tractor
import numpy as np import numpy as np
from pydantic import BaseModel from pydantic import BaseModel
from numpy.lib import recfunctions as rfn
from ..log import get_logger from ..log import get_logger
from ._source import base_iohlc_dtype from ._source import base_iohlc_dtype
@ -46,26 +46,33 @@ _default_size = 10 * _secs_in_day
_rt_buffer_start = int(9*_secs_in_day) _rt_buffer_start = int(9*_secs_in_day)
# Tell the "resource tracker" thing to fuck off. def cuckoff_mantracker():
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype): from multiprocessing import resource_tracker as mantracker
pass
def ensure_running(self): # Tell the "resource tracker" thing to fuck off.
pass class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey" # "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco # https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker() mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running mantracker.ensure_running = mantracker._resource_tracker.ensure_running
ensure_running = mantracker._resource_tracker.ensure_running ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd mantracker.getfd = mantracker._resource_tracker.getfd
cuckoff_mantracker()
class SharedInt: class SharedInt:
@ -191,7 +198,11 @@ class ShmArray:
self._post_init: bool = False self._post_init: bool = False
# pushing data does not write the index (aka primary key) # pushing data does not write the index (aka primary key)
self._write_fields = list(shmarr.dtype.fields.keys())[1:] dtype = shmarr.dtype
if dtype.fields:
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
else:
self._write_fields = None
# TODO: ringbuf api? # TODO: ringbuf api?
@ -237,6 +248,48 @@ class ShmArray:
return a return a
def ustruct(
self,
fields: Optional[list[str]] = None,
# type that all field values will be cast to
# in the returned view.
common_dtype: np.dtype = np.float,
) -> np.ndarray:
array = self._array
if fields:
selection = array[fields]
fcount = len(fields)
else:
selection = array
fcount = len(array.dtype.fields)
# XXX: manual ``.view()`` attempt that also doesn't work.
# uview = selection.view(
# dtype='<f16',
# ).reshape(-1, 4, order='A')
# assert len(selection) == len(uview)
u = rfn.structured_to_unstructured(
selection,
# dtype=float,
copy=True,
)
# unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf)
# array[:] = a[:]
return u
# return ShmArray(
# shmarr=u,
# first=self._first,
# last=self._last,
# shm=self._shm
# )
def last( def last(
self, self,
length: int = 1, length: int = 1,
@ -386,7 +439,11 @@ def open_shm_array(
create=True, create=True,
size=a.nbytes size=a.nbytes
) )
array = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) array = np.ndarray(
a.shape,
dtype=a.dtype,
buffer=shm.buf
)
array[:] = a[:] array[:] = a[:]
array.setflags(write=int(not readonly)) array.setflags(write=int(not readonly))

View File

@ -24,7 +24,6 @@ incremental update.
''' '''
from __future__ import annotations from __future__ import annotations
from functools import partial from functools import partial
import time
from typing import ( from typing import (
Optional, Optional,
Callable, Callable,
@ -38,7 +37,7 @@ from PyQt5.QtGui import QPainterPath
from ..data._sharedmem import ( from ..data._sharedmem import (
ShmArray, ShmArray,
# attach_shm_array open_shm_array,
) )
from .._profile import pg_profile_enabled, ms_slower_then from .._profile import pg_profile_enabled, ms_slower_then
from ._ohlc import ( from ._ohlc import (
@ -152,6 +151,7 @@ class Flow(msgspec.Struct): # , frozen=True):
render: bool = True # toggle for display loop render: bool = True # toggle for display loop
flat: Optional[ShmArray] = None flat: Optional[ShmArray] = None
x_basis: Optional[np.ndarray] = None x_basis: Optional[np.ndarray] = None
_iflat: int = 0
_last_uppx: float = 0 _last_uppx: float = 0
_in_ds: bool = False _in_ds: bool = False
@ -344,6 +344,7 @@ class Flow(msgspec.Struct): # , frozen=True):
graphics = self.graphics graphics = self.graphics
if isinstance(graphics, BarItems): if isinstance(graphics, BarItems):
fields = ['open', 'high', 'low', 'close']
# if no source data renderer exists create one. # if no source data renderer exists create one.
r = self._src_r r = self._src_r
if not r: if not r:
@ -357,17 +358,37 @@ class Flow(msgspec.Struct): # , frozen=True):
# create a flattened view onto the OHLC array # create a flattened view onto the OHLC array
# which can be read as a line-style format # which can be read as a line-style format
# shm = self.shm shm = self.shm
# self.flat = shm.unstruct_view(['open', 'high', 'low', 'close'])
# flat = self.flat = self.shm.unstruct_view(fields)
self.flat = self.shm.ustruct(fields)
self._iflat = self.shm._last.value
# import pdbpp # import pdbpp
# pdbpp.set_trace() # pdbpp.set_trace()
# x = self.x_basis = ( # assert len(flat._array) == len(self.shm._array[fields])
# np.broadcast_to(
# shm._array['index'][:, None], x = self.x_basis = (
# # self.flat._array.shape, np.broadcast_to(
# self.flat.shape, shm._array['index'][:, None],
# ) + np.array([-0.5, 0, 0, 0.5]) (
shm._array.size,
# 4, # only ohlc
self.flat.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
# fshm = self.flat = open_shm_array(
# f'{self.name}_flat',
# dtype=flattened.dtype,
# size=flattened.size,
# ) # )
# fshm.push(flattened)
# print(f'unstruct diff: {time.time() - start}')
# profiler('read unstr view bars to line')
# start = self.flat._first.value
ds_curve_r = Renderer( ds_curve_r = Renderer(
flow=self, flow=self,
@ -407,7 +428,7 @@ class Flow(msgspec.Struct): # , frozen=True):
# - if we're **not** downsampling then we simply want to # - if we're **not** downsampling then we simply want to
# render the bars graphics curve and update.. # render the bars graphics curve and update..
# - if insteam we are in a downsamplig state then we to # - if insteam we are in a downsamplig state then we to
x_gt = 8 x_gt = 6
uppx = curve.x_uppx() uppx = curve.x_uppx()
in_line = should_line = curve.isVisible() in_line = should_line = curve.isVisible()
if ( if (
@ -426,37 +447,43 @@ class Flow(msgspec.Struct): # , frozen=True):
# do graphics updates # do graphics updates
if should_line: if should_line:
# start = time.time()
# y = self.shm.unstruct_view(
# ['open', 'high', 'low', 'close'],
# )
# print(f'unstruct diff: {time.time() - start}')
# profiler('read unstr view bars to line')
# # start = self.flat._first.value
# x = self.x_basis[:y.size].flatten() # update flatted ohlc copy
# y = y.flatten() iflat, ishm = self._iflat, self.shm._last.value
# profiler('flattening bars to line') to_update = rfn.structured_to_unstructured(
# path, last = dsc_r.render(read) self.shm._array[iflat:ishm][fields]
# x, flat = ohlc_flat_view( )
# ohlc_shm=self.shm,
# x_basis=x_basis,
# )
# y = y.flatten()
# y_iv = y[ivl:ivr].flatten()
# x_iv = x[ivl:ivr].flatten()
# assert y.size == x.size
x, y = self.flat = ohlc_flatten(array) # print(to_update)
x_iv, y_iv = ohlc_flatten(in_view) self.flat[iflat:ishm][:] = to_update
profiler('flattened OHLC data') profiler('updated ustruct OHLC data')
y_flat = self.flat[:ishm]
x_flat = self.x_basis[:ishm]
self._iflat = ishm
y = y_flat.reshape(-1)
x = x_flat.reshape(-1)
profiler('flattened ustruct OHLC data')
y_iv_flat = y_flat[ivl:ivr]
x_iv_flat = x_flat[ivl:ivr]
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
profiler('flattened ustruct in-view OHLC data')
# x, y = ohlc_flatten(array)
# x_iv, y_iv = ohlc_flatten(in_view)
# profiler('flattened OHLC data')
curve.update_from_array( curve.update_from_array(
x, x,
y, y,
x_iv=x_iv, x_iv=x_iv,
y_iv=y_iv, y_iv=y_iv,
view_range=None, # hack view_range=(ivl, ivr), # hack
profiler=profiler, profiler=profiler,
) )
profiler('updated ds curve') profiler('updated ds curve')
@ -477,7 +504,7 @@ class Flow(msgspec.Struct): # , frozen=True):
# graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache) # graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) # graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
graphics.prepareGeometryChange() # graphics.prepareGeometryChange()
graphics.update() graphics.update()
if ( if (
@ -632,7 +659,7 @@ class Renderer(msgspec.Struct):
''' '''
# do full source data render to path # do full source data render to path
last_read = ( (
xfirst, xlast, array, xfirst, xlast, array,
ivl, ivr, in_view, ivl, ivr, in_view,
) = self.last_read ) = self.last_read