diff --git a/piker/data/flows.py b/piker/data/flows.py index f857d2f0..677b2f69 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -44,13 +44,13 @@ if TYPE_CHECKING: class Flume(Struct): ''' - Composite reference type which points to all the addressing handles - and other meta-data necessary for the read, measure and management - of a set of real-time updated data flows. + Composite reference type which points to all the addressing + handles and other meta-data necessary for the read, measure and + management of a set of real-time updated data flows. Can be thought of as a "flow descriptor" or "flow frame" which - describes the high level properties of a set of data flows that can - be used seamlessly across process-memory boundaries. + describes the high level properties of a set of data flows that + can be used seamlessly across process-memory boundaries. Each instance's sub-components normally includes: - a msg oriented quote stream provided via an IPC transport @@ -73,6 +73,7 @@ class Flume(Struct): # private shm refs loaded dynamically from tokens _hist_shm: ShmArray | None = None _rt_shm: ShmArray | None = None + _readonly: bool = True stream: tractor.MsgStream | None = None izero_hist: int = 0 @@ -89,7 +90,7 @@ class Flume(Struct): if self._rt_shm is None: self._rt_shm = attach_shm_array( token=self._rt_shm_token, - readonly=True, + readonly=self._readonly, ) return self._rt_shm @@ -102,12 +103,10 @@ class Flume(Struct): 'No shm token has been set for the history buffer?' ) - if ( - self._hist_shm is None - ): + if self._hist_shm is None: self._hist_shm = attach_shm_array( token=self._hist_shm_token, - readonly=True, + readonly=self._readonly, ) return self._hist_shm @@ -126,10 +125,10 @@ class Flume(Struct): period and ratio between them. ''' - times = self.hist_shm.array['time'] - end = pendulum.from_timestamp(times[-1]) - start = pendulum.from_timestamp(times[times != times[-1]][-1]) - hist_step_size_s = (end - start).seconds + times: np.ndarray = self.hist_shm.array['time'] + end: float | int = pendulum.from_timestamp(times[-1]) + start: float | int = pendulum.from_timestamp(times[times != times[-1]][-1]) + hist_step_size_s: float = (end - start).seconds times = self.rt_shm.array['time'] end = pendulum.from_timestamp(times[-1]) @@ -149,17 +148,25 @@ class Flume(Struct): msg = self.to_dict() msg['mkt'] = self.mkt.to_dict() - # can't serialize the stream or feed objects, it's expected - # you'll have a ref to it since this msg should be rxed on - # a stream on whatever far end IPC.. + # NOTE: pop all un-msg-serializable fields: + # - `tractor.MsgStream` + # - `Feed` + # - `Shmarray` + # it's expected the `.from_msg()` on the other side + # will get instead some kind of msg-compat version + # that it can load. msg.pop('stream') msg.pop('feed') + msg.pop('_rt_shm') + msg.pop('_hist_shm') + return msg @classmethod def from_msg( cls, msg: dict, + readonly: bool = True, ) -> dict: ''' @@ -170,7 +177,11 @@ class Flume(Struct): mkt_msg = msg.pop('mkt') from ..accounting import MktPair # cycle otherwise.. mkt = MktPair.from_msg(mkt_msg) - return cls(mkt=mkt, **msg) + msg |= {'_readonly': readonly} + return cls( + mkt=mkt, + **msg, + ) def get_index( self,