Add `ShmArray.ustruct()`: return an unstructured array copy
We return a copy (since since a view doesn't seem to work..) of the (field filtered) shm array contents which is the same index-length as the source data. Further, fence off the resource tracker disable-hack into a helper routine.l1_precision_fix
parent
1657f51edc
commit
b910eceb3b
|
@ -22,7 +22,6 @@ from __future__ import annotations
|
|||
from sys import byteorder
|
||||
from typing import Optional
|
||||
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
|
||||
from multiprocessing import resource_tracker as mantracker
|
||||
|
||||
if _USE_POSIX:
|
||||
from _posixshmem import shm_unlink
|
||||
|
@ -30,6 +29,7 @@ if _USE_POSIX:
|
|||
import tractor
|
||||
import numpy as np
|
||||
from pydantic import BaseModel
|
||||
from numpy.lib import recfunctions as rfn
|
||||
|
||||
from ..log import get_logger
|
||||
from ._source import base_iohlc_dtype
|
||||
|
@ -46,26 +46,33 @@ _default_size = 10 * _secs_in_day
|
|||
_rt_buffer_start = int(9*_secs_in_day)
|
||||
|
||||
|
||||
# Tell the "resource tracker" thing to fuck off.
|
||||
class ManTracker(mantracker.ResourceTracker):
|
||||
def register(self, name, rtype):
|
||||
pass
|
||||
def cuckoff_mantracker():
|
||||
|
||||
def unregister(self, name, rtype):
|
||||
pass
|
||||
from multiprocessing import resource_tracker as mantracker
|
||||
|
||||
def ensure_running(self):
|
||||
pass
|
||||
# Tell the "resource tracker" thing to fuck off.
|
||||
class ManTracker(mantracker.ResourceTracker):
|
||||
def register(self, name, rtype):
|
||||
pass
|
||||
|
||||
def unregister(self, name, rtype):
|
||||
pass
|
||||
|
||||
def ensure_running(self):
|
||||
pass
|
||||
|
||||
|
||||
# "know your land and know your prey"
|
||||
# https://www.dailymotion.com/video/x6ozzco
|
||||
mantracker._resource_tracker = ManTracker()
|
||||
mantracker.register = mantracker._resource_tracker.register
|
||||
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
|
||||
ensure_running = mantracker._resource_tracker.ensure_running
|
||||
mantracker.unregister = mantracker._resource_tracker.unregister
|
||||
mantracker.getfd = mantracker._resource_tracker.getfd
|
||||
# "know your land and know your prey"
|
||||
# https://www.dailymotion.com/video/x6ozzco
|
||||
mantracker._resource_tracker = ManTracker()
|
||||
mantracker.register = mantracker._resource_tracker.register
|
||||
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
|
||||
ensure_running = mantracker._resource_tracker.ensure_running
|
||||
mantracker.unregister = mantracker._resource_tracker.unregister
|
||||
mantracker.getfd = mantracker._resource_tracker.getfd
|
||||
|
||||
|
||||
cuckoff_mantracker()
|
||||
|
||||
|
||||
class SharedInt:
|
||||
|
@ -191,7 +198,11 @@ class ShmArray:
|
|||
self._post_init: bool = False
|
||||
|
||||
# pushing data does not write the index (aka primary key)
|
||||
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
|
||||
dtype = shmarr.dtype
|
||||
if dtype.fields:
|
||||
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
|
||||
else:
|
||||
self._write_fields = None
|
||||
|
||||
# TODO: ringbuf api?
|
||||
|
||||
|
@ -237,6 +248,48 @@ class ShmArray:
|
|||
|
||||
return a
|
||||
|
||||
def ustruct(
|
||||
self,
|
||||
fields: Optional[list[str]] = None,
|
||||
|
||||
# type that all field values will be cast to
|
||||
# in the returned view.
|
||||
common_dtype: np.dtype = np.float,
|
||||
|
||||
) -> np.ndarray:
|
||||
|
||||
array = self._array
|
||||
|
||||
if fields:
|
||||
selection = array[fields]
|
||||
fcount = len(fields)
|
||||
else:
|
||||
selection = array
|
||||
fcount = len(array.dtype.fields)
|
||||
|
||||
# XXX: manual ``.view()`` attempt that also doesn't work.
|
||||
# uview = selection.view(
|
||||
# dtype='<f16',
|
||||
# ).reshape(-1, 4, order='A')
|
||||
|
||||
# assert len(selection) == len(uview)
|
||||
|
||||
u = rfn.structured_to_unstructured(
|
||||
selection,
|
||||
# dtype=float,
|
||||
copy=True,
|
||||
)
|
||||
|
||||
# unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf)
|
||||
# array[:] = a[:]
|
||||
return u
|
||||
# return ShmArray(
|
||||
# shmarr=u,
|
||||
# first=self._first,
|
||||
# last=self._last,
|
||||
# shm=self._shm
|
||||
# )
|
||||
|
||||
def last(
|
||||
self,
|
||||
length: int = 1,
|
||||
|
@ -386,7 +439,11 @@ def open_shm_array(
|
|||
create=True,
|
||||
size=a.nbytes
|
||||
)
|
||||
array = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
|
||||
array = np.ndarray(
|
||||
a.shape,
|
||||
dtype=a.dtype,
|
||||
buffer=shm.buf
|
||||
)
|
||||
array[:] = a[:]
|
||||
array.setflags(write=int(not readonly))
|
||||
|
||||
|
|
Loading…
Reference in New Issue