Drop `Flume.index_stream()`; `._sampling.open_sample_stream()` replaces it

epoch_index_backup
Tyler Goodlet 2023-01-04 22:57:26 -05:00
parent c531f8a69a
commit 8ed48add18
1 changed file with 0 additions and 42 deletions

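For callers that relied on `Flume.index_stream()`, a rough migration sketch follows. The exact signature of `open_sample_stream()` is not shown in this diff, so the single period argument and the absolute import path below are assumptions inferred from the relative `._sampling` import in this module; only the replacement itself is stated by the commit title.

```python
# Hypothetical migration sketch: import path and argument are assumptions,
# not confirmed by this diff.
from piker.data._sampling import open_sample_stream

async def watch_sample_periods() -> None:
    # previously: ``async with flume.index_stream(delay_s=1) as istream:``
    async with open_sample_stream(1) as istream:
        async for msg in istream:
            ...  # handle each completed sample-period event
```

The new helper presumably centralizes the `maybe_open_context` caching and broadcast-subscription logic removed below, so each `Flume` no longer manages its own per-portal sampler stream.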

@@ -22,17 +22,11 @@ real-time data processing data-structures.
"""
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from functools import partial
from typing import (
    AsyncIterator,
    TYPE_CHECKING,
)

import tractor
from tractor.trionics import (
    maybe_open_context,
)
import pendulum
import numpy as np
@@ -45,9 +39,6 @@ from ._sharedmem import (
    ShmArray,
    _Token,
)
from ._sampling import (
    iter_ohlc_periods,
)
# from .._profile import (
#     Profiler,
#     pg_profile_enabled,
@@ -151,39 +142,6 @@ class Flume(Struct):
    async def receive(self) -> dict:
        return await self.stream.receive()

    @acm
    async def index_stream(
        self,
        delay_s: int = 1,
    ) -> AsyncIterator[int]:
        if not self.feed:
            raise RuntimeError('This flume is not part of any ``Feed``?')

        # TODO: maybe a public (property) API for this in ``tractor``?
        portal = self.stream._ctx._portal
        assert portal

        # XXX: this should be singleton on a host,
        # a lone broker-daemon per provider should be
        # created for all practical purposes
        async with maybe_open_context(
            acm_func=partial(
                portal.open_context,
                iter_ohlc_periods,
            ),
            kwargs={'delay_s': delay_s},
        ) as (cache_hit, (ctx, first)):
            async with ctx.open_stream() as istream:
                if cache_hit:
                    # add a new broadcast subscription for the quote stream
                    # if this feed is likely already in use
                    async with istream.subscribe() as bistream:
                        yield bistream
                else:
                    yield istream

    def get_ds_info(
        self,
    ) -> tuple[float, float, float]: