Make `Flume.index_stream()` defer to new sampling api

samplerd_service
Tyler Goodlet 2023-01-10 15:36:39 -05:00
parent c8c641a038
commit 045b76bab5
1 changed files with 5 additions and 17 deletions

View File

@ -21,6 +21,7 @@ real-time data processing data-structures.
"Streams, flumes, cascades and flows.." "Streams, flumes, cascades and flows.."
""" """
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
from typing import ( from typing import (
@ -45,7 +46,7 @@ from ._sharedmem import (
_Token, _Token,
) )
from ._sampling import ( from ._sampling import (
iter_ohlc_periods, open_sample_stream,
) )
if TYPE_CHECKING: if TYPE_CHECKING:
@ -149,7 +150,7 @@ class Flume(Struct):
@acm @acm
async def index_stream( async def index_stream(
self, self,
delay_s: int = 1, delay_s: float = 1,
) -> AsyncIterator[int]: ) -> AsyncIterator[int]:
@ -163,21 +164,8 @@ class Flume(Struct):
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
async with maybe_open_context( async with open_sample_stream(float(delay_s)) as stream:
acm_func=partial( yield stream
portal.open_context,
iter_ohlc_periods,
),
kwargs={'delay_s': delay_s},
) as (cache_hit, (ctx, first)):
async with ctx.open_stream() as istream:
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with istream.subscribe() as bistream:
yield bistream
else:
yield istream
def get_ds_info( def get_ds_info(
self, self,