Disconnect stdlib's resource_tracker, fix .push()

Logic in `SharedArray.push()` was totally wrong.
Remove all the `multiprocessing.resource_tracker` crap such that we
aren't loading an extra process at every layer and we don't get tons of
errors when cleaning on in an SC way.
bar_select
Tyler Goodlet 2020-09-17 09:03:11 -04:00
parent 712e36b9d5
commit 17491ba819
1 changed files with 44 additions and 11 deletions

View File

@ -1,13 +1,36 @@
""" """
NumPy based shared memory for real-time FSP. NumPy compatible shared memory buffers for real-time FSP.
""" """
from sys import byteorder from sys import byteorder
from contextlib import contextmanager from contextlib import contextmanager
from typing import Tuple, Optional from typing import Tuple, Optional
from multiprocessing import shared_memory from multiprocessing import shared_memory
from multiprocessing import resource_tracker as mantracker
from _posixshmem import shm_unlink
import numpy as np import numpy as np
# from numpy.lib import recfunctions as rfn
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
base_ohlc_dtype = np.dtype( base_ohlc_dtype = np.dtype(
@ -88,22 +111,31 @@ class SharedArray:
self, self,
data: np.ndarray, data: np.ndarray,
) -> int: ) -> int:
# push array data and return updated index """Ring buffer like "push" to append data
into the buffer and return updated index.
"""
length = len(data) length = len(data)
self._array[self._i.value:length] = data # TODO: use .index for actual ring logic?
self._i.value += length index = self._i.value
end = index + length
self._array[index:end] = data[:]
self._i.value = end
return end
def close(self) -> None: def close(self) -> None:
self._i._shm.close() self._i._shm.close()
self._shm.close() self._shm.close()
def destroy(self) -> None: def destroy(self) -> None:
self._i._shm.unlink() if shared_memory._USE_POSIX:
self._shm.unlink() # We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._i._shm.name)
shm_unlink(self._shm.name)
# def flush(self) -> None: def flush(self) -> None:
# # flush to storage backend? # TODO: flush to storage backend like markestore?
# ... ...
@contextmanager @contextmanager
@ -155,7 +187,8 @@ def attach_shared_array(
dtype: np.dtype = base_ohlc_dtype, dtype: np.dtype = base_ohlc_dtype,
readonly: bool = True, readonly: bool = True,
) -> SharedArray: ) -> SharedArray:
"""Load and attach to an existing shared memory array. """Load and attach to an existing shared memory array previously
created by another process using ``open_shared_array``.
""" """
array_name, counter_name = token array_name, counter_name = token