diff --git a/piker/data/feed.py b/piker/data/feed.py index d26c7f37..4dc63da6 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1495,6 +1495,57 @@ async def install_brokerd_search( yield +@acm +async def maybe_open_feed( + + fqsns: list[str], + loglevel: Optional[str] = None, + + **kwargs, + +) -> ( + Feed, + ReceiveChannel[dict[str, Any]], +): + ''' + Maybe open a data to a ``brokerd`` daemon only if there is no + local one for the broker-symbol pair, if one is cached use it wrapped + in a tractor broadcast receiver. + + ''' + fqsn = fqsns[0] + + async with maybe_open_context( + acm_func=open_feed, + kwargs={ + 'fqsns': fqsns, + 'loglevel': loglevel, + 'tick_throttle': kwargs.get('tick_throttle'), + + # XXX: super critical to have bool defaults here XD + 'backpressure': kwargs.get('backpressure', True), + 'start_stream': kwargs.get('start_stream', True), + }, + key=fqsn, + + ) as (cache_hit, feed): + + if cache_hit: + log.info(f'Using cached feed for {fqsn}') + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + + async with gather_contexts( + mngrs=[stream.subscribe() for stream in feed.streams.values()] + ) as bstreams: + for bstream, flume in zip(bstreams, feed.flumes.values()): + flume.stream = bstream + + yield feed + else: + yield feed + + @acm async def open_feed( @@ -1640,54 +1691,3 @@ async def open_feed( assert len(feed.mods) == len(feed.portals) == len(feed.streams) yield feed - - -@acm -async def maybe_open_feed( - - fqsns: list[str], - loglevel: Optional[str] = None, - - **kwargs, - -) -> ( - Feed, - ReceiveChannel[dict[str, Any]], -): - ''' - Maybe open a data to a ``brokerd`` daemon only if there is no - local one for the broker-symbol pair, if one is cached use it wrapped - in a tractor broadcast receiver. - - ''' - fqsn = fqsns[0] - - async with maybe_open_context( - acm_func=open_feed, - kwargs={ - 'fqsns': fqsns, - 'loglevel': loglevel, - 'tick_throttle': kwargs.get('tick_throttle'), - - # XXX: super critical to have bool defaults here XD - 'backpressure': kwargs.get('backpressure', True), - 'start_stream': kwargs.get('start_stream', True), - }, - key=fqsn, - - ) as (cache_hit, feed): - - if cache_hit: - log.info(f'Using cached feed for {fqsn}') - # add a new broadcast subscription for the quote stream - # if this feed is likely already in use - - async with gather_contexts( - mngrs=[stream.subscribe() for stream in feed.streams.values()] - ) as bstreams: - for bstream, flume in zip(bstreams, feed.flumes.values()): - flume.stream = bstream - - yield feed - else: - yield feed