Expect fqsns through fsp machinery

mkts_backup
Tyler Goodlet 2022-03-18 15:04:15 -04:00
parent a8cb6c2056
commit d03cd23571
1 changed files with 7 additions and 6 deletions

View File

@ -37,6 +37,7 @@ from .. import data
from ..data import attach_shm_array from ..data import attach_shm_array
from ..data.feed import Feed from ..data.feed import Feed
from ..data._sharedmem import ShmArray from ..data._sharedmem import ShmArray
from ..data._source import Symbol
from ._api import ( from ._api import (
Fsp, Fsp,
_load_builtins, _load_builtins,
@ -76,7 +77,7 @@ async def filter_quotes_by_sym(
async def fsp_compute( async def fsp_compute(
ctx: tractor.Context, ctx: tractor.Context,
symbol: str, symbol: Symbol,
feed: Feed, feed: Feed,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -95,13 +96,14 @@ async def fsp_compute(
disabled=True disabled=True
) )
fqsn = symbol.front_fqsn()
out_stream = func( out_stream = func(
# TODO: do we even need this if we do the feed api right? # TODO: do we even need this if we do the feed api right?
# shouldn't a local stream do this before we get a handle # shouldn't a local stream do this before we get a handle
# to the async iterable? it's that or we do some kinda # to the async iterable? it's that or we do some kinda
# async itertools style? # async itertools style?
filter_quotes_by_sym(symbol, quote_stream), filter_quotes_by_sym(fqsn, quote_stream),
# XXX: currently the ``ohlcv`` arg # XXX: currently the ``ohlcv`` arg
feed.shm, feed.shm,
@ -241,8 +243,7 @@ async def cascade(
ctx: tractor.Context, ctx: tractor.Context,
# data feed key # data feed key
brokername: str, fqsn: str,
symbol: str,
src_shm_token: dict, src_shm_token: dict,
dst_shm_token: tuple[str, np.dtype], dst_shm_token: tuple[str, np.dtype],
@ -298,8 +299,7 @@ async def cascade(
# open a data feed stream with requested broker # open a data feed stream with requested broker
async with data.feed.maybe_open_feed( async with data.feed.maybe_open_feed(
brokername, [fqsn],
[symbol],
# TODO throttle tick outputs from *this* daemon since # TODO throttle tick outputs from *this* daemon since
# it'll emit tons of ticks due to the throttle only # it'll emit tons of ticks due to the throttle only
@ -308,6 +308,7 @@ async def cascade(
# tick_throttle=60, # tick_throttle=60,
) as (feed, quote_stream): ) as (feed, quote_stream):
symbol = feed.symbols[fqsn]
profiler(f'{func}: feed up') profiler(f'{func}: feed up')