Merge pull request #360 from pikers/fsp_shm_caching

Fsp shm caching
ahab_you_bad_boi
goodboy 2022-07-19 09:55:27 -04:00 committed by GitHub
commit f2dba44169
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 29 additions and 12 deletions

View File

@ -309,7 +309,7 @@ async def start_backfill(
# when no tsdb "last datum" is provided, we just load
# some near-term history.
periods = {
1: {'days': 1},
1: {'seconds': 4000},
60: {'days': 14},
}

View File

@ -78,7 +78,8 @@ class Fsp:
# + the consuming fsp *to* the consumers output
# shm flow.
_flow_registry: dict[
tuple[_Token, str], _Token,
tuple[_Token, str],
tuple[_Token, Optional[ShmArray]],
] = {}
def __init__(
@ -120,7 +121,6 @@ class Fsp:
):
return self.func(*args, **kwargs)
# TODO: lru_cache this? prettty sure it'll work?
def get_shm(
self,
src_shm: ShmArray,
@ -131,12 +131,27 @@ class Fsp:
for this "instance" of a signal processor for
the given ``key``.
The destination shm "token" and array are cached if possible to
minimize multiple stdlib/system calls.
'''
dst_token = self._flow_registry[
dst_token, maybe_array = self._flow_registry[
(src_shm._token, self.name)
]
shm = attach_shm_array(dst_token)
return shm
if maybe_array is None:
self._flow_registry[
(src_shm._token, self.name)
] = (
dst_token,
# "cache" the ``ShmArray`` such that
# we call the underlying "attach" code as few
# times as possible as per:
# - https://github.com/pikers/piker/issues/359
# - https://github.com/pikers/piker/issues/332
maybe_array := attach_shm_array(dst_token)
)
return maybe_array
def fsp(

View File

@ -284,9 +284,10 @@ async def cascade(
# TODO: ugh i hate this wind/unwind to list over the wire
# but not sure how else to do it.
for (token, fsp_name, dst_token) in shm_registry:
Fsp._flow_registry[
(_Token.from_msg(token), fsp_name)
] = _Token.from_msg(dst_token)
Fsp._flow_registry[(
_Token.from_msg(token),
fsp_name,
)] = _Token.from_msg(dst_token), None
fsp: Fsp = reg.get(
NamespacePath(ns_path)

View File

@ -469,9 +469,10 @@ class FspAdmin:
target=target,
readonly=True,
)
self._flow_registry[
(self.src_shm._token, target.name)
] = dst_shm._token
self._flow_registry[(
self.src_shm._token,
target.name
)] = dst_shm._token
# if not opened:
# raise RuntimeError(