Drop `Flume.index_stream()`, `._sampling.open_sample_stream()` replaces it
parent
ba8284e73b
commit
7208b3a2d2
|
@ -22,17 +22,11 @@ real-time data processing data-structures.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from contextlib import asynccontextmanager as acm
|
|
||||||
from functools import partial
|
|
||||||
from typing import (
|
from typing import (
|
||||||
AsyncIterator,
|
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.trionics import (
|
|
||||||
maybe_open_context,
|
|
||||||
)
|
|
||||||
import pendulum
|
import pendulum
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
@ -45,9 +39,6 @@ from ._sharedmem import (
|
||||||
ShmArray,
|
ShmArray,
|
||||||
_Token,
|
_Token,
|
||||||
)
|
)
|
||||||
from ._sampling import (
|
|
||||||
open_sample_stream,
|
|
||||||
)
|
|
||||||
# from .._profile import (
|
# from .._profile import (
|
||||||
# Profiler,
|
# Profiler,
|
||||||
# pg_profile_enabled,
|
# pg_profile_enabled,
|
||||||
|
@ -151,26 +142,6 @@ class Flume(Struct):
|
||||||
async def receive(self) -> dict:
|
async def receive(self) -> dict:
|
||||||
return await self.stream.receive()
|
return await self.stream.receive()
|
||||||
|
|
||||||
@acm
|
|
||||||
async def index_stream(
|
|
||||||
self,
|
|
||||||
delay_s: float = 1,
|
|
||||||
|
|
||||||
) -> AsyncIterator[int]:
|
|
||||||
|
|
||||||
if not self.feed:
|
|
||||||
raise RuntimeError('This flume is not part of any ``Feed``?')
|
|
||||||
|
|
||||||
# TODO: maybe a public (property) API for this in ``tractor``?
|
|
||||||
portal = self.stream._ctx._portal
|
|
||||||
assert portal
|
|
||||||
|
|
||||||
# XXX: this should be singleton on a host,
|
|
||||||
# a lone broker-daemon per provider should be
|
|
||||||
# created for all practical purposes
|
|
||||||
async with open_sample_stream(float(delay_s)) as stream:
|
|
||||||
yield stream
|
|
||||||
|
|
||||||
def get_ds_info(
|
def get_ds_info(
|
||||||
self,
|
self,
|
||||||
) -> tuple[float, float, float]:
|
) -> tuple[float, float, float]:
|
||||||
|
|
Loading…
Reference in New Issue