Open feeds using `Portal.open_context()`

fqsns
Tyler Goodlet 2022-03-11 16:49:37 -05:00
parent 97c2a2da3e
commit cc026dfb1d
1 changed files with 10 additions and 9 deletions

View File

@ -519,19 +519,20 @@ async def open_sample_step_stream(
# created for all practical purposes # created for all practical purposes
async with maybe_open_context( async with maybe_open_context(
acm_func=partial( acm_func=partial(
portal.open_stream_from, portal.open_context,
iter_ohlc_periods, iter_ohlc_periods,
), ),
kwargs={'delay_s': delay_s}, kwargs={'delay_s': delay_s},
) as (cache_hit, istream): ) as (cache_hit, (ctx, first)):
if cache_hit: async with ctx.open_stream() as istream:
# add a new broadcast subscription for the quote stream if cache_hit:
# if this feed is likely already in use # add a new broadcast subscription for the quote stream
async with istream.subscribe() as bistream: # if this feed is likely already in use
yield bistream async with istream.subscribe() as bistream:
else: yield bistream
yield istream else:
yield istream
@dataclass @dataclass