diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index f1dd49d7..7e75c283 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -37,6 +37,7 @@ from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray +from ..data._source import Symbol from ._api import ( Fsp, _load_builtins, @@ -76,7 +77,7 @@ async def filter_quotes_by_sym( async def fsp_compute( ctx: tractor.Context, - symbol: str, + symbol: Symbol, feed: Feed, quote_stream: trio.abc.ReceiveChannel, @@ -95,13 +96,14 @@ async def fsp_compute( disabled=True ) + fqsn = symbol.front_fqsn() out_stream = func( # TODO: do we even need this if we do the feed api right? # shouldn't a local stream do this before we get a handle # to the async iterable? it's that or we do some kinda # async itertools style? - filter_quotes_by_sym(symbol, quote_stream), + filter_quotes_by_sym(fqsn, quote_stream), # XXX: currently the ``ohlcv`` arg feed.shm, @@ -235,8 +237,7 @@ async def cascade( ctx: tractor.Context, # data feed key - brokername: str, - symbol: str, + fqsn: str, src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], @@ -289,8 +290,7 @@ async def cascade( # open a data feed stream with requested broker async with data.feed.maybe_open_feed( - brokername, - [symbol], + [fqsn], # TODO throttle tick outputs from *this* daemon since # it'll emit tons of ticks due to the throttle only @@ -299,6 +299,7 @@ async def cascade( # tick_throttle=60, ) as (feed, quote_stream): + symbol = feed.symbols[fqsn] profiler(f'{func}: feed up')