diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index 30a00633..a332ec5f 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -40,6 +40,8 @@ from tractor.msg import NamespacePath from ..data._sharedmem import ( ShmArray, maybe_open_shm_array, + attach_shm_array, + _Token, ) from ..log import get_logger @@ -72,6 +74,13 @@ class Fsp: # - custom function wrappers, # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers + # actor-local map of source flow shm tokens + # + the consuming fsp *to* the consumers output + # shm flow. + _flow_registry: dict[ + tuple[_Token, str], _Token, + ] = {} + def __init__( self, func: Callable[..., Awaitable], @@ -93,7 +102,7 @@ class Fsp: self.config: dict[str, Any] = config # register with declared set. - _fsp_registry[self.ns_path] = func + _fsp_registry[self.ns_path] = self @property def name(self) -> str: @@ -111,6 +120,24 @@ class Fsp: ): return self.func(*args, **kwargs) + # TODO: lru_cache this? prettty sure it'll work? + def get_shm( + self, + src_shm: ShmArray, + + ) -> ShmArray: + ''' + Provide access to allocated shared mem array + for this "instance" of a signal processor for + the given ``key``. + + ''' + dst_token = self._flow_registry[ + (src_shm._token, self.name) + ] + shm = attach_shm_array(dst_token) + return shm + def fsp( wrapped=None, @@ -132,19 +159,27 @@ def fsp( return Fsp(wrapped, outputs=(wrapped.__name__,)) +def mk_fsp_shm_key( + sym: str, + target: Fsp + +) -> str: + uid = tractor.current_actor().uid + return f'{sym}.fsp.{target.name}.{".".join(uid)}' + + def maybe_mk_fsp_shm( sym: str, - target: fsp, + target: Fsp, readonly: bool = True, -) -> (ShmArray, bool): +) -> (str, ShmArray, bool): ''' Allocate a single row shm array for an symbol-fsp pair if none exists, otherwise load the shm already existing for that token. ''' assert isinstance(sym, str), '`sym` should be file-name-friendly `str`' - uid = tractor.current_actor().uid # TODO: load output types from `Fsp` # - should `index` be a required internal field? @@ -153,7 +188,7 @@ def maybe_mk_fsp_shm( [(field_name, float) for field_name in target.outputs] ) - key = f'{sym}.fsp.{target.name}.{".".join(uid)}' + key = mk_fsp_shm_key(sym, target) shm, opened = maybe_open_shm_array( key, @@ -161,4 +196,4 @@ def maybe_mk_fsp_shm( dtype=fsp_dtype, readonly=True, ) - return shm, opened + return key, shm, opened