Expect fqsns through fsp machinery

fqsns
Tyler Goodlet 2022-03-18 15:04:15 -04:00
parent 998a5acd92
commit 6ac60fbe22
1 changed files with 7 additions and 6 deletions

View File

@ -37,6 +37,7 @@ from .. import data
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data._sharedmem import ShmArray
from ..data._source import Symbol
from ._api import (
Fsp,
_load_builtins,
@ -76,7 +77,7 @@ async def filter_quotes_by_sym(
async def fsp_compute(
ctx: tractor.Context,
symbol: str,
symbol: Symbol,
feed: Feed,
quote_stream: trio.abc.ReceiveChannel,
@ -95,13 +96,14 @@ async def fsp_compute(
disabled=True
)
fqsn = symbol.front_fqsn()
out_stream = func(
# TODO: do we even need this if we do the feed api right?
# shouldn't a local stream do this before we get a handle
# to the async iterable? it's that or we do some kinda
# async itertools style?
filter_quotes_by_sym(symbol, quote_stream),
filter_quotes_by_sym(fqsn, quote_stream),
# XXX: currently the ``ohlcv`` arg
feed.shm,
@ -235,8 +237,7 @@ async def cascade(
ctx: tractor.Context,
# data feed key
brokername: str,
symbol: str,
fqsn: str,
src_shm_token: dict,
dst_shm_token: tuple[str, np.dtype],
@ -289,8 +290,7 @@ async def cascade(
# open a data feed stream with requested broker
async with data.feed.maybe_open_feed(
brokername,
[symbol],
[fqsn],
# TODO throttle tick outputs from *this* daemon since
# it'll emit tons of ticks due to the throttle only
@ -299,6 +299,7 @@ async def cascade(
# tick_throttle=60,
) as (feed, quote_stream):
symbol = feed.symbols[fqsn]
profiler(f'{func}: feed up')