Add `Fsp._flow_registry` as actor-local table

Define the flows table as a class var (thus making it a "global" and/or
actor-local state) which can be accessed by any in process task. Add
`Fsp.get_shm()` to allow accessing output streams by source-token + fsp
routine reference and thus providing inter-fsp low level access to
real-time flows.
windows_fixes_yo
Tyler Goodlet 2022-02-04 11:11:34 -05:00
parent df6afe24a4
commit ebf3e00438
1 changed files with 41 additions and 6 deletions

View File

@ -40,6 +40,8 @@ from tractor.msg import NamespacePath
from ..data._sharedmem import ( from ..data._sharedmem import (
ShmArray, ShmArray,
maybe_open_shm_array, maybe_open_shm_array,
attach_shm_array,
_Token,
) )
from ..log import get_logger from ..log import get_logger
@ -72,6 +74,13 @@ class Fsp:
# - custom function wrappers, # - custom function wrappers,
# https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers
# actor-local map of source flow shm tokens
# + the consuming fsp *to* the consumers output
# shm flow.
_flow_registry: dict[
tuple[_Token, str], _Token,
] = {}
def __init__( def __init__(
self, self,
func: Callable[..., Awaitable], func: Callable[..., Awaitable],
@ -93,7 +102,7 @@ class Fsp:
self.config: dict[str, Any] = config self.config: dict[str, Any] = config
# register with declared set. # register with declared set.
_fsp_registry[self.ns_path] = func _fsp_registry[self.ns_path] = self
@property @property
def name(self) -> str: def name(self) -> str:
@ -111,6 +120,24 @@ class Fsp:
): ):
return self.func(*args, **kwargs) return self.func(*args, **kwargs)
# TODO: lru_cache this? prettty sure it'll work?
def get_shm(
self,
src_shm: ShmArray,
) -> ShmArray:
'''
Provide access to allocated shared mem array
for this "instance" of a signal processor for
the given ``key``.
'''
dst_token = self._flow_registry[
(src_shm._token, self.name)
]
shm = attach_shm_array(dst_token)
return shm
def fsp( def fsp(
wrapped=None, wrapped=None,
@ -132,19 +159,27 @@ def fsp(
return Fsp(wrapped, outputs=(wrapped.__name__,)) return Fsp(wrapped, outputs=(wrapped.__name__,))
def mk_fsp_shm_key(
sym: str,
target: Fsp
) -> str:
uid = tractor.current_actor().uid
return f'{sym}.fsp.{target.name}.{".".join(uid)}'
def maybe_mk_fsp_shm( def maybe_mk_fsp_shm(
sym: str, sym: str,
target: fsp, target: Fsp,
readonly: bool = True, readonly: bool = True,
) -> (ShmArray, bool): ) -> (str, ShmArray, bool):
''' '''
Allocate a single row shm array for an symbol-fsp pair if none Allocate a single row shm array for an symbol-fsp pair if none
exists, otherwise load the shm already existing for that token. exists, otherwise load the shm already existing for that token.
''' '''
assert isinstance(sym, str), '`sym` should be file-name-friendly `str`' assert isinstance(sym, str), '`sym` should be file-name-friendly `str`'
uid = tractor.current_actor().uid
# TODO: load output types from `Fsp` # TODO: load output types from `Fsp`
# - should `index` be a required internal field? # - should `index` be a required internal field?
@ -153,7 +188,7 @@ def maybe_mk_fsp_shm(
[(field_name, float) for field_name in target.outputs] [(field_name, float) for field_name in target.outputs]
) )
key = f'{sym}.fsp.{target.name}.{".".join(uid)}' key = mk_fsp_shm_key(sym, target)
shm, opened = maybe_open_shm_array( shm, opened = maybe_open_shm_array(
key, key,
@ -161,4 +196,4 @@ def maybe_mk_fsp_shm(
dtype=fsp_dtype, dtype=fsp_dtype,
readonly=True, readonly=True,
) )
return shm, opened return key, shm, opened