From d130f0449f15864260f5eeb381f8bc7cea6d0075 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 10:40:45 -0500 Subject: [PATCH] Expect registry of fsp "flows" to each engine task In order for fsp routines to be able to look up other "flows" in the cascade, we need a small registry-table which gives access to a map of a source stream + an fsp -> an output stream. Eventually we'll also likely want a dependency (injection) mechanism so that any fsp demanded can either be dynamically allocated or at the least waited upon before a consumer tries to access it. --- piker/fsp/_engine.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index f9ba0c39..1b853c60 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -37,8 +37,11 @@ from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray -from ._api import Fsp -from ._api import _load_builtins +from ._api import ( + Fsp, + _load_builtins, + _Token, +) log = get_logger(__name__) @@ -99,6 +102,8 @@ async def fsp_compute( # to the async iterable? it's that or we do some kinda # async itertools style? filter_quotes_by_sym(symbol, quote_stream), + + # XXX: currently the ``ohlcv`` arg feed.shm, ) @@ -239,6 +244,8 @@ async def cascade( ns_path: NamespacePath, + shm_registry: dict[str, _Token], + zero_on_step: bool = False, loglevel: Optional[str] = None, @@ -261,9 +268,21 @@ async def cascade( log.info( f'Registered FSP set:\n{lines}' ) - func: Fsp = reg.get( + + # update actor local flows table which registers + # readonly "instances" of this fsp for symbol/source + # so that consumer fsps can look it up by source + fsp. + # TODO: ugh i hate this wind/unwind to list over the wire + # but not sure how else to do it. + for (token, fsp_name, dst_token) in shm_registry: + Fsp._flow_registry[ + (_Token.from_msg(token), fsp_name) + ] = _Token.from_msg(dst_token) + + fsp: Fsp = reg.get( NamespacePath(ns_path) ) + func = fsp.func if not func: # TODO: assume it's a func target path