Define and pass a default `Flume._readonly: bool`

Allows opening with `.from_msg(readonly=False)` for write permissions
making underlyig shm arrays readonly. Also, make sure to pop the
`ShmArray` field entries prior to msg-ization, not sure how that worked
with the `Feed.flumes` equivalent..but?
distribute_dis
Tyler Goodlet 2023-12-06 17:25:49 -05:00
parent 6029f39a3f
commit 9e71e0768f
1 changed files with 29 additions and 18 deletions

View File

@ -44,13 +44,13 @@ if TYPE_CHECKING:
class Flume(Struct):
'''
Composite reference type which points to all the addressing handles
and other meta-data necessary for the read, measure and management
of a set of real-time updated data flows.
Composite reference type which points to all the addressing
handles and other meta-data necessary for the read, measure and
management of a set of real-time updated data flows.
Can be thought of as a "flow descriptor" or "flow frame" which
describes the high level properties of a set of data flows that can
be used seamlessly across process-memory boundaries.
describes the high level properties of a set of data flows that
can be used seamlessly across process-memory boundaries.
Each instance's sub-components normally includes:
- a msg oriented quote stream provided via an IPC transport
@ -73,6 +73,7 @@ class Flume(Struct):
# private shm refs loaded dynamically from tokens
_hist_shm: ShmArray | None = None
_rt_shm: ShmArray | None = None
_readonly: bool = True
stream: tractor.MsgStream | None = None
izero_hist: int = 0
@ -89,7 +90,7 @@ class Flume(Struct):
if self._rt_shm is None:
self._rt_shm = attach_shm_array(
token=self._rt_shm_token,
readonly=True,
readonly=self._readonly,
)
return self._rt_shm
@ -102,12 +103,10 @@ class Flume(Struct):
'No shm token has been set for the history buffer?'
)
if (
self._hist_shm is None
):
if self._hist_shm is None:
self._hist_shm = attach_shm_array(
token=self._hist_shm_token,
readonly=True,
readonly=self._readonly,
)
return self._hist_shm
@ -126,10 +125,10 @@ class Flume(Struct):
period and ratio between them.
'''
times = self.hist_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
hist_step_size_s = (end - start).seconds
times: np.ndarray = self.hist_shm.array['time']
end: float | int = pendulum.from_timestamp(times[-1])
start: float | int = pendulum.from_timestamp(times[times != times[-1]][-1])
hist_step_size_s: float = (end - start).seconds
times = self.rt_shm.array['time']
end = pendulum.from_timestamp(times[-1])
@ -149,17 +148,25 @@ class Flume(Struct):
msg = self.to_dict()
msg['mkt'] = self.mkt.to_dict()
# can't serialize the stream or feed objects, it's expected
# you'll have a ref to it since this msg should be rxed on
# a stream on whatever far end IPC..
# NOTE: pop all un-msg-serializable fields:
# - `tractor.MsgStream`
# - `Feed`
# - `Shmarray`
# it's expected the `.from_msg()` on the other side
# will get instead some kind of msg-compat version
# that it can load.
msg.pop('stream')
msg.pop('feed')
msg.pop('_rt_shm')
msg.pop('_hist_shm')
return msg
@classmethod
def from_msg(
cls,
msg: dict,
readonly: bool = True,
) -> dict:
'''
@ -170,7 +177,11 @@ class Flume(Struct):
mkt_msg = msg.pop('mkt')
from ..accounting import MktPair # cycle otherwise..
mkt = MktPair.from_msg(mkt_msg)
return cls(mkt=mkt, **msg)
msg |= {'_readonly': readonly}
return cls(
mkt=mkt,
**msg,
)
def get_index(
self,