diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 97869bb9..513dded8 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -38,7 +38,7 @@ log = get_logger(__name__) @dataclass class OrderBook: - """Buy-side (client-side ?) order book ctl and tracking. + '''EMS-client-side order book ctl and tracking. A style similar to "model-view" is used here where this api is provided as a supervised control for an EMS actor which does all the @@ -48,7 +48,7 @@ class OrderBook: Currently, this is mostly for keeping local state to match the EMS and use received events to trigger graphics updates. - """ + ''' # mem channels used to relay order requests to the EMS daemon _to_ems: trio.abc.SendChannel _from_order_book: trio.abc.ReceiveChannel diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 06536859..91197d60 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -32,9 +32,8 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed, open_feed +from ..data.feed import Feed, maybe_open_feed from .._daemon import maybe_spawn_brokerd -from .._cacheables import maybe_open_ctx from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -959,15 +958,11 @@ async def _emsd_main( # spawn one task per broker feed async with ( - maybe_open_ctx( - key=(broker, symbol), - mngr=open_feed( - broker, - [symbol], - loglevel=loglevel, - ), + maybe_open_feed( + broker, + [symbol], loglevel=loglevel, - ) as feed, + ) as (feed, stream), ): # XXX: this should be initial price quote from target provider @@ -1011,7 +1006,7 @@ async def _emsd_main( brokerd_stream, ems_client_order_stream, - feed.stream, + stream, broker, symbol, book diff --git a/piker/data/feed.py b/piker/data/feed.py index c56c0720..07df13a1 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -441,7 +441,7 @@ async def open_feed( tick_throttle: Optional[float] = None, # Hz shielded_stream: bool = False, -) -> ReceiveChannel[dict[str, Any]]: +) -> Feed: ''' Open a "data feed" which provides streamed real-time quotes. @@ -522,7 +522,7 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) try: - yield feed, bstream + yield feed finally: # drop the infinite stream connection await ctx.cancel() @@ -538,7 +538,7 @@ async def maybe_open_feed( tick_throttle: Optional[float] = None, # Hz shielded_stream: bool = False, -) -> ReceiveChannel[dict[str, Any]]: +) -> (Feed, ReceiveChannel[dict[str, Any]]): '''Maybe open a data to a ``brokerd`` daemon only if there is no local one for the broker-symbol pair, if one is cached use it wrapped in a tractor broadcast receiver. @@ -553,12 +553,12 @@ async def maybe_open_feed( [sym], loglevel=loglevel, ), - ) as (cache_hit, (feed, stream)): + ) as (cache_hit, feed): if cache_hit: # add a new broadcast subscription for the quote stream # if this feed is likely already in use - async with stream.subscribe() as bstream: + async with feed.stream.subscribe() as bstream: yield feed, bstream else: yield feed, stream diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 312a0cef..3b5c359a 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -69,6 +69,7 @@ async def fsp_compute( ctx: tractor.Context, symbol: str, feed: Feed, + stream: trio.abc.ReceiveChannel, src: ShmArray, dst: ShmArray, @@ -93,14 +94,14 @@ async def fsp_compute( yield {} # task cancellation won't kill the channel - with stream.shield(): - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes + # since we shielded at the `open_feed()` call + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes out_stream = func( - filter_by_sym(symbol, feed.stream), + filter_by_sym(symbol, stream), feed.shm, ) @@ -164,7 +165,8 @@ async def cascade( dst_shm_token: Tuple[str, np.dtype], symbol: str, fsp_func_name: str, -) -> AsyncIterator[dict]: + +) -> None: """Chain streaming signal processors and deliver output to destination mem buf. @@ -175,7 +177,11 @@ async def cascade( func: Callable = _fsps[fsp_func_name] # open a data feed stream with requested broker - async with data.open_feed(brokername, [symbol]) as feed: + async with data.feed.maybe_open_feed( + brokername, + [symbol], + shielded_stream=True, + ) as (feed, stream): assert src.token == feed.shm.token @@ -186,6 +192,7 @@ async def cascade( ctx=ctx, symbol=symbol, feed=feed, + stream=stream, src=src, dst=dst,