Cache fsp ``ShmArrays`` where possible

Minimize calling `.data._shmarray.attach_shm_array()` as much as is
possible to avoid the crash from #332. This is the suggested hack from
issue #359.

Resolves https://github.com/pikers/piker/issues/359
fsp_shm_caching
Tyler Goodlet 2022-07-15 17:10:09 -04:00
parent 90bc9b9730
commit e0491cf2e7
3 changed files with 28 additions and 11 deletions

View File

@ -78,7 +78,8 @@ class Fsp:
# + the consuming fsp *to* the consumers output # + the consuming fsp *to* the consumers output
# shm flow. # shm flow.
_flow_registry: dict[ _flow_registry: dict[
tuple[_Token, str], _Token, tuple[_Token, str],
tuple[_Token, Optional[ShmArray]],
] = {} ] = {}
def __init__( def __init__(
@ -120,7 +121,6 @@ class Fsp:
): ):
return self.func(*args, **kwargs) return self.func(*args, **kwargs)
# TODO: lru_cache this? prettty sure it'll work?
def get_shm( def get_shm(
self, self,
src_shm: ShmArray, src_shm: ShmArray,
@ -131,12 +131,27 @@ class Fsp:
for this "instance" of a signal processor for for this "instance" of a signal processor for
the given ``key``. the given ``key``.
The destination shm "token" and array are cached if possible to
minimize multiple stdlib/system calls.
''' '''
dst_token = self._flow_registry[ dst_token, maybe_array = self._flow_registry[
(src_shm._token, self.name) (src_shm._token, self.name)
] ]
shm = attach_shm_array(dst_token) if maybe_array is None:
return shm self._flow_registry[
(src_shm._token, self.name)
] = (
dst_token,
# "cache" the ``ShmArray`` such that
# we call the underlying "attach" code as few
# times as possible as per:
# - https://github.com/pikers/piker/issues/359
# - https://github.com/pikers/piker/issues/332
maybe_array := attach_shm_array(dst_token)
)
return maybe_array
def fsp( def fsp(

View File

@ -284,9 +284,10 @@ async def cascade(
# TODO: ugh i hate this wind/unwind to list over the wire # TODO: ugh i hate this wind/unwind to list over the wire
# but not sure how else to do it. # but not sure how else to do it.
for (token, fsp_name, dst_token) in shm_registry: for (token, fsp_name, dst_token) in shm_registry:
Fsp._flow_registry[ Fsp._flow_registry[(
(_Token.from_msg(token), fsp_name) _Token.from_msg(token),
] = _Token.from_msg(dst_token) fsp_name,
)] = _Token.from_msg(dst_token), None
fsp: Fsp = reg.get( fsp: Fsp = reg.get(
NamespacePath(ns_path) NamespacePath(ns_path)

View File

@ -469,9 +469,10 @@ class FspAdmin:
target=target, target=target,
readonly=True, readonly=True,
) )
self._flow_registry[ self._flow_registry[(
(self.src_shm._token, target.name) self.src_shm._token,
] = dst_shm._token target.name
)] = dst_shm._token
# if not opened: # if not opened:
# raise RuntimeError( # raise RuntimeError(