diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index f9ba0c39..1b853c60 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -37,8 +37,11 @@ from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray -from ._api import Fsp -from ._api import _load_builtins +from ._api import ( + Fsp, + _load_builtins, + _Token, +) log = get_logger(__name__) @@ -99,6 +102,8 @@ async def fsp_compute( # to the async iterable? it's that or we do some kinda # async itertools style? filter_quotes_by_sym(symbol, quote_stream), + + # XXX: currently the ``ohlcv`` arg feed.shm, ) @@ -239,6 +244,8 @@ async def cascade( ns_path: NamespacePath, + shm_registry: dict[str, _Token], + zero_on_step: bool = False, loglevel: Optional[str] = None, @@ -261,9 +268,21 @@ async def cascade( log.info( f'Registered FSP set:\n{lines}' ) - func: Fsp = reg.get( + + # update actor local flows table which registers + # readonly "instances" of this fsp for symbol/source + # so that consumer fsps can look it up by source + fsp. + # TODO: ugh i hate this wind/unwind to list over the wire + # but not sure how else to do it. + for (token, fsp_name, dst_token) in shm_registry: + Fsp._flow_registry[ + (_Token.from_msg(token), fsp_name) + ] = _Token.from_msg(dst_token) + + fsp: Fsp = reg.get( NamespacePath(ns_path) ) + func = fsp.func if not func: # TODO: assume it's a func target path