diff --git a/piker/data/flows.py b/piker/data/flows.py index 01ed7851..9d8b3103 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -22,17 +22,11 @@ real-time data processing data-structures. """ from __future__ import annotations -from contextlib import asynccontextmanager as acm -from functools import partial from typing import ( - AsyncIterator, TYPE_CHECKING, ) import tractor -from tractor.trionics import ( - maybe_open_context, -) import pendulum import numpy as np @@ -45,9 +39,6 @@ from ._sharedmem import ( ShmArray, _Token, ) -from ._sampling import ( - open_sample_stream, -) # from .._profile import ( # Profiler, # pg_profile_enabled, @@ -151,26 +142,6 @@ class Flume(Struct): async def receive(self) -> dict: return await self.stream.receive() - @acm - async def index_stream( - self, - delay_s: float = 1, - - ) -> AsyncIterator[int]: - - if not self.feed: - raise RuntimeError('This flume is not part of any ``Feed``?') - - # TODO: maybe a public (property) API for this in ``tractor``? - portal = self.stream._ctx._portal - assert portal - - # XXX: this should be singleton on a host, - # a lone broker-daemon per provider should be - # created for all practical purposes - async with open_sample_stream(float(delay_s)) as stream: - yield stream - def get_ds_info( self, ) -> tuple[float, float, float]: