From 90bc9b9730439974a68f5f147c6d8046a23bfe15 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Jul 2022 16:27:06 -0400 Subject: [PATCH 1/2] Only 4k seconds of 1s ohlc when no tsdb --- piker/data/feed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 2795535d..dfd47852 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -309,7 +309,7 @@ async def start_backfill( # when no tsdb "last datum" is provided, we just load # some near-term history. periods = { - 1: {'days': 1}, + 1: {'seconds': 4000}, 60: {'days': 14}, } From e0491cf2e77edd110cb3a2093fa7b4150c634b6c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Jul 2022 17:10:09 -0400 Subject: [PATCH 2/2] Cache fsp ``ShmArrays`` where possible Minimize calling `.data._shmarray.attach_shm_array()` as much as is possible to avoid the crash from #332. This is the suggested hack from issue #359. Resolves https://github.com/pikers/piker/issues/359 --- piker/fsp/_api.py | 25 ++++++++++++++++++++----- piker/fsp/_engine.py | 7 ++++--- piker/ui/_fsp.py | 7 ++++--- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index a332ec5f..f4e42bc1 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -78,7 +78,8 @@ class Fsp: # + the consuming fsp *to* the consumers output # shm flow. _flow_registry: dict[ - tuple[_Token, str], _Token, + tuple[_Token, str], + tuple[_Token, Optional[ShmArray]], ] = {} def __init__( @@ -120,7 +121,6 @@ class Fsp: ): return self.func(*args, **kwargs) - # TODO: lru_cache this? prettty sure it'll work? def get_shm( self, src_shm: ShmArray, @@ -131,12 +131,27 @@ class Fsp: for this "instance" of a signal processor for the given ``key``. + The destination shm "token" and array are cached if possible to + minimize multiple stdlib/system calls. + ''' - dst_token = self._flow_registry[ + dst_token, maybe_array = self._flow_registry[ (src_shm._token, self.name) ] - shm = attach_shm_array(dst_token) - return shm + if maybe_array is None: + self._flow_registry[ + (src_shm._token, self.name) + ] = ( + dst_token, + # "cache" the ``ShmArray`` such that + # we call the underlying "attach" code as few + # times as possible as per: + # - https://github.com/pikers/piker/issues/359 + # - https://github.com/pikers/piker/issues/332 + maybe_array := attach_shm_array(dst_token) + ) + + return maybe_array def fsp( diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 94d9bfa1..d9f3af26 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -284,9 +284,10 @@ async def cascade( # TODO: ugh i hate this wind/unwind to list over the wire # but not sure how else to do it. for (token, fsp_name, dst_token) in shm_registry: - Fsp._flow_registry[ - (_Token.from_msg(token), fsp_name) - ] = _Token.from_msg(dst_token) + Fsp._flow_registry[( + _Token.from_msg(token), + fsp_name, + )] = _Token.from_msg(dst_token), None fsp: Fsp = reg.get( NamespacePath(ns_path) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 5301db6a..f3165667 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -469,9 +469,10 @@ class FspAdmin: target=target, readonly=True, ) - self._flow_registry[ - (self.src_shm._token, target.name) - ] = dst_shm._token + self._flow_registry[( + self.src_shm._token, + target.name + )] = dst_shm._token # if not opened: # raise RuntimeError(