Expect registry of fsp "flows" to each engine task

In order for fsp routines to be able to look up other "flows" in the
cascade, we need a small registry-table which gives access to a map of
a source stream + an fsp -> an output stream. Eventually we'll also
likely want a dependency (injection) mechanism so that any fsp demanded
can either be dynamically allocated or at the least waited upon before
a consumer tries to access it.
windows_fixes_yo
Tyler Goodlet 2022-02-04 10:40:45 -05:00
parent efb743fd85
commit d130f0449f
1 changed files with 22 additions and 3 deletions

View File

@ -37,8 +37,11 @@ from .. import data
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data._sharedmem import ShmArray
from ._api import Fsp
from ._api import _load_builtins
from ._api import (
Fsp,
_load_builtins,
_Token,
)
log = get_logger(__name__)
@ -99,6 +102,8 @@ async def fsp_compute(
# to the async iterable? it's that or we do some kinda
# async itertools style?
filter_quotes_by_sym(symbol, quote_stream),
# XXX: currently the ``ohlcv`` arg
feed.shm,
)
@ -239,6 +244,8 @@ async def cascade(
ns_path: NamespacePath,
shm_registry: dict[str, _Token],
zero_on_step: bool = False,
loglevel: Optional[str] = None,
@ -261,9 +268,21 @@ async def cascade(
log.info(
f'Registered FSP set:\n{lines}'
)
func: Fsp = reg.get(
# update actor local flows table which registers
# readonly "instances" of this fsp for symbol/source
# so that consumer fsps can look it up by source + fsp.
# TODO: ugh i hate this wind/unwind to list over the wire
# but not sure how else to do it.
for (token, fsp_name, dst_token) in shm_registry:
Fsp._flow_registry[
(_Token.from_msg(token), fsp_name)
] = _Token.from_msg(dst_token)
fsp: Fsp = reg.get(
NamespacePath(ns_path)
)
func = fsp.func
if not func:
# TODO: assume it's a func target path