Sync shm mod with dev version in `tractor`, drop buffer sizing vars, require `size: int` to all allocators

basic_buy_bot
Tyler Goodlet 2023-06-15 12:59:50 -04:00
parent e1be098406
commit 33ec27715b
1 changed files with 32 additions and 24 deletions

View File

@ -37,15 +37,6 @@ from ._source import def_iohlcv_fields
from .types import Struct
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 24)
# we try for a buncha times, but only on a run-every-other-day kinda week.
_days_worth = 16
_default_size = _days_worth * _secs_in_day
# where to start the new data append index
_rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
def cuckoff_mantracker():
'''
Disable all ``multiprocessing``` "resource tracking" machinery since
@ -70,7 +61,6 @@ def cuckoff_mantracker():
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
# ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
@ -442,10 +432,10 @@ class ShmArray:
def open_shm_array(
key: Optional[str] = None,
size: int = _default_size, # see above
dtype: Optional[np.dtype] = None,
size: int,
key: str | None = None,
dtype: np.dtype | None = None,
append_start_index: int | None = None,
readonly: bool = False,
) -> ShmArray:
@ -510,10 +500,13 @@ def open_shm_array(
# ``ShmArray._start.value: int = 0`` and the yet-to-be written
# real-time section will start at ``ShmArray.index: int``.
# this sets the index to 3/4 of the length of the buffer
# leaving a "days worth of second samples" for the real-time
# section.
last.value = first.value = _rt_buffer_start
# this sets the index to nearly 2/3rds into the the length of
# the buffer leaving at least a "days worth of second samples"
# for the real-time section.
if append_start_index is None:
append_start_index = round(size * 0.616)
last.value = first.value = append_start_index
shmarr = ShmArray(
array,
@ -527,7 +520,6 @@ def open_shm_array(
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
stack = tractor.current_actor().lifetime_stack
stack.callback(shmarr.close)
stack.callback(shmarr.destroy)
@ -622,7 +614,10 @@ def attach_shm_array(
def maybe_open_shm_array(
key: str,
dtype: Optional[np.dtype] = None,
size: int,
dtype: np.dtype | None = None,
append_start_index: int | None = None,
readonly: bool = False,
**kwargs,
) -> tuple[ShmArray, bool]:
@ -643,11 +638,16 @@ def maybe_open_shm_array(
use ``attach_shm_array``.
'''
size = kwargs.pop('size', _default_size)
try:
# see if we already know this key
token = _known_tokens[key]
return attach_shm_array(token=token, **kwargs), False
return (
attach_shm_array(
token=token,
readonly=readonly,
),
False,
)
except KeyError:
log.debug(f"Could not find {key} in shms cache")
if dtype:
@ -666,8 +666,16 @@ def maybe_open_shm_array(
# Attempt to open a block and expect
# to fail if a block has been allocated
# on the OS by someone else.
return open_shm_array(key=key, dtype=dtype, **kwargs), True
return (
open_shm_array(
key=key,
size=size,
dtype=dtype,
append_start_index=append_start_index,
readonly=readonly,
),
True,
)
def try_read(
array: np.ndarray