Move `maybe_open_feed()` above for readability

agg_feedz
Tyler Goodlet 2022-11-12 13:43:09 -05:00
parent c088963cf2
commit 1e96ca32df
1 changed files with 51 additions and 51 deletions

View File

@ -1495,6 +1495,57 @@ async def install_brokerd_search(
yield
@acm
async def maybe_open_feed(
fqsns: list[str],
loglevel: Optional[str] = None,
**kwargs,
) -> (
Feed,
ReceiveChannel[dict[str, Any]],
):
'''
Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
'''
fqsn = fqsns[0]
async with maybe_open_context(
acm_func=open_feed,
kwargs={
'fqsns': fqsns,
'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'),
# XXX: super critical to have bool defaults here XD
'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True),
},
key=fqsn,
) as (cache_hit, feed):
if cache_hit:
log.info(f'Using cached feed for {fqsn}')
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with gather_contexts(
mngrs=[stream.subscribe() for stream in feed.streams.values()]
) as bstreams:
for bstream, flume in zip(bstreams, feed.flumes.values()):
flume.stream = bstream
yield feed
else:
yield feed
@acm
async def open_feed(
@ -1640,54 +1691,3 @@ async def open_feed(
assert len(feed.mods) == len(feed.portals) == len(feed.streams)
yield feed
@acm
async def maybe_open_feed(
fqsns: list[str],
loglevel: Optional[str] = None,
**kwargs,
) -> (
Feed,
ReceiveChannel[dict[str, Any]],
):
'''
Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
'''
fqsn = fqsns[0]
async with maybe_open_context(
acm_func=open_feed,
kwargs={
'fqsns': fqsns,
'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'),
# XXX: super critical to have bool defaults here XD
'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True),
},
key=fqsn,
) as (cache_hit, feed):
if cache_hit:
log.info(f'Using cached feed for {fqsn}')
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with gather_contexts(
mngrs=[stream.subscribe() for stream in feed.streams.values()]
) as bstreams:
for bstream, flume in zip(bstreams, feed.flumes.values()):
flume.stream = bstream
yield feed
else:
yield feed