diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 91197d60..f556dd3a 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -32,7 +32,7 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed, maybe_open_feed +from ..data.feed import Feed, maybe_open_feed, open_feed from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( @@ -958,13 +958,17 @@ async def _emsd_main( # spawn one task per broker feed async with ( - maybe_open_feed( + open_feed( + # maybe_open_feed( broker, [symbol], loglevel=loglevel, - ) as (feed, stream), + # ) as (feed, stream), + ) as feed, ): + stream = feed.stream + # XXX: this should be initial price quote from target provider first_quote = feed.first_quote diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 3b5c359a..60bc1ecb 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -177,11 +177,15 @@ async def cascade( func: Callable = _fsps[fsp_func_name] # open a data feed stream with requested broker - async with data.feed.maybe_open_feed( + # async with data.feed.maybe_open_feed( + async with data.feed.open_feed( brokername, [symbol], shielded_stream=True, - ) as (feed, stream): + # ) as (feed, stream): + ) as feed: + + stream = feed.stream assert src.token == feed.shm.token