Drop `Flume.index_stream()`, `._sampling.open_sample_stream()` replaces it
							parent
							
								
									3e17e52555
								
							
						
					
					
						commit
						63f0567418
					
				| 
						 | 
				
			
			@ -22,17 +22,11 @@ real-time data processing data-structures.
 | 
			
		|||
 | 
			
		||||
"""
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
from contextlib import asynccontextmanager as acm
 | 
			
		||||
from functools import partial
 | 
			
		||||
from typing import (
 | 
			
		||||
    AsyncIterator,
 | 
			
		||||
    TYPE_CHECKING,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import tractor
 | 
			
		||||
from tractor.trionics import (
 | 
			
		||||
    maybe_open_context,
 | 
			
		||||
)
 | 
			
		||||
import pendulum
 | 
			
		||||
import numpy as np
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -45,9 +39,6 @@ from ._sharedmem import (
 | 
			
		|||
    ShmArray,
 | 
			
		||||
    _Token,
 | 
			
		||||
)
 | 
			
		||||
from ._sampling import (
 | 
			
		||||
    open_sample_stream,
 | 
			
		||||
)
 | 
			
		||||
# from .._profile import (
 | 
			
		||||
#     Profiler,
 | 
			
		||||
#     pg_profile_enabled,
 | 
			
		||||
| 
						 | 
				
			
			@ -151,26 +142,6 @@ class Flume(Struct):
 | 
			
		|||
    async def receive(self) -> dict:
 | 
			
		||||
        return await self.stream.receive()
 | 
			
		||||
 | 
			
		||||
    @acm
 | 
			
		||||
    async def index_stream(
 | 
			
		||||
        self,
 | 
			
		||||
        delay_s: float = 1,
 | 
			
		||||
 | 
			
		||||
    ) -> AsyncIterator[int]:
 | 
			
		||||
 | 
			
		||||
        if not self.feed:
 | 
			
		||||
            raise RuntimeError('This flume is not part of any ``Feed``?')
 | 
			
		||||
 | 
			
		||||
        # TODO: maybe a public (property) API for this in ``tractor``?
 | 
			
		||||
        portal = self.stream._ctx._portal
 | 
			
		||||
        assert portal
 | 
			
		||||
 | 
			
		||||
        # XXX: this should be singleton on a host,
 | 
			
		||||
        # a lone broker-daemon per provider should be
 | 
			
		||||
        # created for all practical purposes
 | 
			
		||||
        async with open_sample_stream(float(delay_s)) as stream:
 | 
			
		||||
            yield stream
 | 
			
		||||
 | 
			
		||||
    def get_ds_info(
 | 
			
		||||
        self,
 | 
			
		||||
    ) -> tuple[float, float, float]:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue