Move `Flume` to a new `.data._flumes` module
parent
8793b76ee2
commit
13e86fbe30
|
@ -0,0 +1,304 @@
|
||||||
|
# piker: trading gear for hackers
|
||||||
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Real-time data flow abstractions.
|
||||||
|
|
||||||
|
<writeup on "buffers, streams, flumes and flows..">
|
||||||
|
|
||||||
|
"""
|
||||||
|
from contextlib import asynccontextmanager as acm
|
||||||
|
from functools import partial
|
||||||
|
from typing import (
|
||||||
|
AsyncIterator,
|
||||||
|
TYPE_CHECKING,
|
||||||
|
)
|
||||||
|
|
||||||
|
import tractor
|
||||||
|
from tractor.trionics import (
|
||||||
|
maybe_open_context,
|
||||||
|
)
|
||||||
|
import pendulum
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from .types import Struct
|
||||||
|
from ._source import (
|
||||||
|
Symbol,
|
||||||
|
)
|
||||||
|
from ._sharedmem import (
|
||||||
|
attach_shm_array,
|
||||||
|
ShmArray,
|
||||||
|
_Token,
|
||||||
|
)
|
||||||
|
from ._sampling import (
|
||||||
|
iter_ohlc_periods,
|
||||||
|
)
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from pyqtgraph import PlotItem
|
||||||
|
from .feed import Feed
|
||||||
|
|
||||||
|
|
||||||
|
class Flume(Struct):
|
||||||
|
'''
|
||||||
|
Composite reference type which points to all the addressing handles
|
||||||
|
and other meta-data necessary for the read, measure and management
|
||||||
|
of a set of real-time updated data flows.
|
||||||
|
|
||||||
|
Can be thought of as a "flow descriptor" or "flow frame" which
|
||||||
|
describes the high level properties of a set of data flows that can
|
||||||
|
be used seamlessly across process-memory boundaries.
|
||||||
|
|
||||||
|
Each instance's sub-components normally includes:
|
||||||
|
- a msg oriented quote stream provided via an IPC transport
|
||||||
|
- history and real-time shm buffers which are both real-time
|
||||||
|
updated and backfilled.
|
||||||
|
- associated startup indexing information related to both buffer
|
||||||
|
real-time-append and historical prepend addresses.
|
||||||
|
- low level APIs to read and measure the updated data and manage
|
||||||
|
queuing properties.
|
||||||
|
|
||||||
|
'''
|
||||||
|
symbol: Symbol
|
||||||
|
first_quote: dict
|
||||||
|
_hist_shm_token: _Token
|
||||||
|
_rt_shm_token: _Token
|
||||||
|
|
||||||
|
# private shm refs loaded dynamically from tokens
|
||||||
|
_hist_shm: ShmArray | None = None
|
||||||
|
_rt_shm: ShmArray | None = None
|
||||||
|
|
||||||
|
stream: tractor.MsgStream | None = None
|
||||||
|
izero_hist: int = 0
|
||||||
|
izero_rt: int = 0
|
||||||
|
throttle_rate: int | None = None
|
||||||
|
|
||||||
|
# TODO: do we need this really if we can pull the `Portal` from
|
||||||
|
# ``tractor``'s internals?
|
||||||
|
feed: Feed | None = None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def rt_shm(self) -> ShmArray:
|
||||||
|
|
||||||
|
if self._rt_shm is None:
|
||||||
|
self._rt_shm = attach_shm_array(
|
||||||
|
token=self._rt_shm_token,
|
||||||
|
readonly=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
return self._rt_shm
|
||||||
|
|
||||||
|
@property
|
||||||
|
def hist_shm(self) -> ShmArray:
|
||||||
|
|
||||||
|
if self._hist_shm is None:
|
||||||
|
self._hist_shm = attach_shm_array(
|
||||||
|
token=self._hist_shm_token,
|
||||||
|
readonly=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
return self._hist_shm
|
||||||
|
|
||||||
|
async def receive(self) -> dict:
|
||||||
|
return await self.stream.receive()
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def index_stream(
|
||||||
|
self,
|
||||||
|
delay_s: int = 1,
|
||||||
|
|
||||||
|
) -> AsyncIterator[int]:
|
||||||
|
|
||||||
|
if not self.feed:
|
||||||
|
raise RuntimeError('This flume is not part of any ``Feed``?')
|
||||||
|
|
||||||
|
# TODO: maybe a public (property) API for this in ``tractor``?
|
||||||
|
portal = self.stream._ctx._portal
|
||||||
|
assert portal
|
||||||
|
|
||||||
|
# XXX: this should be singleton on a host,
|
||||||
|
# a lone broker-daemon per provider should be
|
||||||
|
# created for all practical purposes
|
||||||
|
async with maybe_open_context(
|
||||||
|
acm_func=partial(
|
||||||
|
portal.open_context,
|
||||||
|
iter_ohlc_periods,
|
||||||
|
),
|
||||||
|
kwargs={'delay_s': delay_s},
|
||||||
|
) as (cache_hit, (ctx, first)):
|
||||||
|
async with ctx.open_stream() as istream:
|
||||||
|
if cache_hit:
|
||||||
|
# add a new broadcast subscription for the quote stream
|
||||||
|
# if this feed is likely already in use
|
||||||
|
async with istream.subscribe() as bistream:
|
||||||
|
yield bistream
|
||||||
|
else:
|
||||||
|
yield istream
|
||||||
|
|
||||||
|
def get_ds_info(
|
||||||
|
self,
|
||||||
|
) -> tuple[float, float, float]:
|
||||||
|
'''
|
||||||
|
Compute the "downsampling" ratio info between the historical shm
|
||||||
|
buffer and the real-time (HFT) one.
|
||||||
|
|
||||||
|
Return a tuple of the fast sample period, historical sample
|
||||||
|
period and ratio between them.
|
||||||
|
|
||||||
|
'''
|
||||||
|
times = self.hist_shm.array['time']
|
||||||
|
end = pendulum.from_timestamp(times[-1])
|
||||||
|
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
||||||
|
hist_step_size_s = (end - start).seconds
|
||||||
|
|
||||||
|
times = self.rt_shm.array['time']
|
||||||
|
end = pendulum.from_timestamp(times[-1])
|
||||||
|
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
||||||
|
rt_step_size_s = (end - start).seconds
|
||||||
|
|
||||||
|
ratio = hist_step_size_s / rt_step_size_s
|
||||||
|
return (
|
||||||
|
rt_step_size_s,
|
||||||
|
hist_step_size_s,
|
||||||
|
ratio,
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: get native msgspec decoding for these workinn
|
||||||
|
def to_msg(self) -> dict:
|
||||||
|
msg = self.to_dict()
|
||||||
|
msg['symbol'] = msg['symbol'].to_dict()
|
||||||
|
|
||||||
|
# can't serialize the stream or feed objects, it's expected
|
||||||
|
# you'll have a ref to it since this msg should be rxed on
|
||||||
|
# a stream on whatever far end IPC..
|
||||||
|
msg.pop('stream')
|
||||||
|
msg.pop('feed')
|
||||||
|
return msg
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_msg(cls, msg: dict) -> dict:
|
||||||
|
symbol = Symbol(**msg.pop('symbol'))
|
||||||
|
return cls(
|
||||||
|
symbol=symbol,
|
||||||
|
**msg,
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_index(
|
||||||
|
self,
|
||||||
|
time_s: float,
|
||||||
|
|
||||||
|
) -> int:
|
||||||
|
'''
|
||||||
|
Return array shm-buffer index for for epoch time.
|
||||||
|
|
||||||
|
'''
|
||||||
|
array = self.rt_shm.array
|
||||||
|
times = array['time']
|
||||||
|
mask = (times >= time_s)
|
||||||
|
|
||||||
|
if any(mask):
|
||||||
|
return array['index'][mask][0]
|
||||||
|
|
||||||
|
# just the latest index
|
||||||
|
array['index'][-1]
|
||||||
|
|
||||||
|
def slice_from_time(
|
||||||
|
self,
|
||||||
|
array: np.ndarray,
|
||||||
|
start_t: float,
|
||||||
|
stop_t: float,
|
||||||
|
timeframe_s: int = 1,
|
||||||
|
return_data: bool = False,
|
||||||
|
|
||||||
|
) -> np.ndarray:
|
||||||
|
'''
|
||||||
|
Slice an input struct array providing only datums
|
||||||
|
"in view" of this chart.
|
||||||
|
|
||||||
|
'''
|
||||||
|
arr = {
|
||||||
|
1: self.rt_shm.array,
|
||||||
|
60: self.hist_shm.arry,
|
||||||
|
}[timeframe_s]
|
||||||
|
|
||||||
|
times = arr['time']
|
||||||
|
index = array['index']
|
||||||
|
|
||||||
|
# use advanced indexing to map the
|
||||||
|
# time range to the index range.
|
||||||
|
mask = (
|
||||||
|
(times >= start_t)
|
||||||
|
&
|
||||||
|
(times < stop_t)
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO: if we can ensure each time field has a uniform
|
||||||
|
# step we can instead do some arithmetic to determine
|
||||||
|
# the equivalent index like we used to?
|
||||||
|
# return array[
|
||||||
|
# lbar - ifirst:
|
||||||
|
# (rbar - ifirst) + 1
|
||||||
|
# ]
|
||||||
|
|
||||||
|
i_by_t = index[mask]
|
||||||
|
i_0 = i_by_t[0]
|
||||||
|
|
||||||
|
abs_slc = slice(
|
||||||
|
i_0,
|
||||||
|
i_by_t[-1],
|
||||||
|
)
|
||||||
|
# slice data by offset from the first index
|
||||||
|
# available in the passed datum set.
|
||||||
|
read_slc = slice(
|
||||||
|
0,
|
||||||
|
i_by_t[-1] - i_0,
|
||||||
|
)
|
||||||
|
if not return_data:
|
||||||
|
return (
|
||||||
|
abs_slc,
|
||||||
|
read_slc,
|
||||||
|
)
|
||||||
|
|
||||||
|
# also return the readable data from the timerange
|
||||||
|
return (
|
||||||
|
abs_slc,
|
||||||
|
read_slc,
|
||||||
|
arr[mask],
|
||||||
|
)
|
||||||
|
|
||||||
|
def view_data(
|
||||||
|
self,
|
||||||
|
plot: PlotItem,
|
||||||
|
timeframe_s: int = 1,
|
||||||
|
|
||||||
|
) -> np.ndarray:
|
||||||
|
|
||||||
|
# get far-side x-indices plot view
|
||||||
|
vr = plot.viewRect()
|
||||||
|
l = vr.left()
|
||||||
|
r = vr.right()
|
||||||
|
|
||||||
|
(
|
||||||
|
abs_slc,
|
||||||
|
buf_slc,
|
||||||
|
iv_arr,
|
||||||
|
) = self.slice_from_time(
|
||||||
|
start_t=l,
|
||||||
|
stop_t=r,
|
||||||
|
timeframe_s=timeframe_s,
|
||||||
|
return_data=True,
|
||||||
|
)
|
||||||
|
return iv_arr
|
|
@ -29,7 +29,6 @@ import time
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncIterator,
|
|
||||||
AsyncContextManager,
|
AsyncContextManager,
|
||||||
Callable,
|
Callable,
|
||||||
Optional,
|
Optional,
|
||||||
|
@ -57,11 +56,10 @@ from .._daemon import (
|
||||||
maybe_spawn_brokerd,
|
maybe_spawn_brokerd,
|
||||||
check_for_service,
|
check_for_service,
|
||||||
)
|
)
|
||||||
|
from ._flumes import Flume
|
||||||
from ._sharedmem import (
|
from ._sharedmem import (
|
||||||
maybe_open_shm_array,
|
maybe_open_shm_array,
|
||||||
attach_shm_array,
|
|
||||||
ShmArray,
|
ShmArray,
|
||||||
_Token,
|
|
||||||
_secs_in_day,
|
_secs_in_day,
|
||||||
)
|
)
|
||||||
from .ingest import get_ingestormod
|
from .ingest import get_ingestormod
|
||||||
|
@ -76,7 +74,6 @@ from ._sampling import (
|
||||||
sampler,
|
sampler,
|
||||||
broadcast,
|
broadcast,
|
||||||
increment_ohlc_buffer,
|
increment_ohlc_buffer,
|
||||||
iter_ohlc_periods,
|
|
||||||
sample_and_broadcast,
|
sample_and_broadcast,
|
||||||
uniform_rate_send,
|
uniform_rate_send,
|
||||||
_default_delay_s,
|
_default_delay_s,
|
||||||
|
@ -87,7 +84,6 @@ from ..brokers._util import (
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .marketstore import Storage
|
from .marketstore import Storage
|
||||||
from pyqtgraph import PlotItem
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -894,258 +890,6 @@ async def manage_history(
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
class Flume(Struct):
|
|
||||||
'''
|
|
||||||
Composite reference type which points to all the addressing handles
|
|
||||||
and other meta-data necessary for the read, measure and management
|
|
||||||
of a set of real-time updated data flows.
|
|
||||||
|
|
||||||
Can be thought of as a "flow descriptor" or "flow frame" which
|
|
||||||
describes the high level properties of a set of data flows that can
|
|
||||||
be used seamlessly across process-memory boundaries.
|
|
||||||
|
|
||||||
Each instance's sub-components normally includes:
|
|
||||||
- a msg oriented quote stream provided via an IPC transport
|
|
||||||
- history and real-time shm buffers which are both real-time
|
|
||||||
updated and backfilled.
|
|
||||||
- associated startup indexing information related to both buffer
|
|
||||||
real-time-append and historical prepend addresses.
|
|
||||||
- low level APIs to read and measure the updated data and manage
|
|
||||||
queuing properties.
|
|
||||||
|
|
||||||
'''
|
|
||||||
symbol: Symbol
|
|
||||||
first_quote: dict
|
|
||||||
_hist_shm_token: _Token
|
|
||||||
_rt_shm_token: _Token
|
|
||||||
|
|
||||||
# private shm refs loaded dynamically from tokens
|
|
||||||
_hist_shm: ShmArray | None = None
|
|
||||||
_rt_shm: ShmArray | None = None
|
|
||||||
|
|
||||||
stream: tractor.MsgStream | None = None
|
|
||||||
izero_hist: int = 0
|
|
||||||
izero_rt: int = 0
|
|
||||||
throttle_rate: int | None = None
|
|
||||||
|
|
||||||
# TODO: do we need this really if we can pull the `Portal` from
|
|
||||||
# ``tractor``'s internals?
|
|
||||||
feed: Feed | None = None
|
|
||||||
|
|
||||||
@property
|
|
||||||
def rt_shm(self) -> ShmArray:
|
|
||||||
|
|
||||||
if self._rt_shm is None:
|
|
||||||
self._rt_shm = attach_shm_array(
|
|
||||||
token=self._rt_shm_token,
|
|
||||||
readonly=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
return self._rt_shm
|
|
||||||
|
|
||||||
@property
|
|
||||||
def hist_shm(self) -> ShmArray:
|
|
||||||
|
|
||||||
if self._hist_shm is None:
|
|
||||||
self._hist_shm = attach_shm_array(
|
|
||||||
token=self._hist_shm_token,
|
|
||||||
readonly=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
return self._hist_shm
|
|
||||||
|
|
||||||
async def receive(self) -> dict:
|
|
||||||
return await self.stream.receive()
|
|
||||||
|
|
||||||
@acm
|
|
||||||
async def index_stream(
|
|
||||||
self,
|
|
||||||
delay_s: int = 1,
|
|
||||||
|
|
||||||
) -> AsyncIterator[int]:
|
|
||||||
|
|
||||||
if not self.feed:
|
|
||||||
raise RuntimeError('This flume is not part of any ``Feed``?')
|
|
||||||
|
|
||||||
# TODO: maybe a public (property) API for this in ``tractor``?
|
|
||||||
portal = self.stream._ctx._portal
|
|
||||||
assert portal
|
|
||||||
|
|
||||||
# XXX: this should be singleton on a host,
|
|
||||||
# a lone broker-daemon per provider should be
|
|
||||||
# created for all practical purposes
|
|
||||||
async with maybe_open_context(
|
|
||||||
acm_func=partial(
|
|
||||||
portal.open_context,
|
|
||||||
iter_ohlc_periods,
|
|
||||||
),
|
|
||||||
kwargs={'delay_s': delay_s},
|
|
||||||
) as (cache_hit, (ctx, first)):
|
|
||||||
async with ctx.open_stream() as istream:
|
|
||||||
if cache_hit:
|
|
||||||
# add a new broadcast subscription for the quote stream
|
|
||||||
# if this feed is likely already in use
|
|
||||||
async with istream.subscribe() as bistream:
|
|
||||||
yield bistream
|
|
||||||
else:
|
|
||||||
yield istream
|
|
||||||
|
|
||||||
def get_ds_info(
|
|
||||||
self,
|
|
||||||
) -> tuple[float, float, float]:
|
|
||||||
'''
|
|
||||||
Compute the "downsampling" ratio info between the historical shm
|
|
||||||
buffer and the real-time (HFT) one.
|
|
||||||
|
|
||||||
Return a tuple of the fast sample period, historical sample
|
|
||||||
period and ratio between them.
|
|
||||||
|
|
||||||
'''
|
|
||||||
times = self.hist_shm.array['time']
|
|
||||||
end = pendulum.from_timestamp(times[-1])
|
|
||||||
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
|
||||||
hist_step_size_s = (end - start).seconds
|
|
||||||
|
|
||||||
times = self.rt_shm.array['time']
|
|
||||||
end = pendulum.from_timestamp(times[-1])
|
|
||||||
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
|
||||||
rt_step_size_s = (end - start).seconds
|
|
||||||
|
|
||||||
ratio = hist_step_size_s / rt_step_size_s
|
|
||||||
return (
|
|
||||||
rt_step_size_s,
|
|
||||||
hist_step_size_s,
|
|
||||||
ratio,
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: get native msgspec decoding for these workinn
|
|
||||||
def to_msg(self) -> dict:
|
|
||||||
msg = self.to_dict()
|
|
||||||
msg['symbol'] = msg['symbol'].to_dict()
|
|
||||||
|
|
||||||
# can't serialize the stream or feed objects, it's expected
|
|
||||||
# you'll have a ref to it since this msg should be rxed on
|
|
||||||
# a stream on whatever far end IPC..
|
|
||||||
msg.pop('stream')
|
|
||||||
msg.pop('feed')
|
|
||||||
return msg
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_msg(cls, msg: dict) -> dict:
|
|
||||||
symbol = Symbol(**msg.pop('symbol'))
|
|
||||||
return cls(
|
|
||||||
symbol=symbol,
|
|
||||||
**msg,
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_index(
|
|
||||||
self,
|
|
||||||
time_s: float,
|
|
||||||
|
|
||||||
) -> int:
|
|
||||||
'''
|
|
||||||
Return array shm-buffer index for for epoch time.
|
|
||||||
|
|
||||||
'''
|
|
||||||
array = self.rt_shm.array
|
|
||||||
times = array['time']
|
|
||||||
mask = (times >= time_s)
|
|
||||||
|
|
||||||
if any(mask):
|
|
||||||
return array['index'][mask][0]
|
|
||||||
|
|
||||||
# just the latest index
|
|
||||||
array['index'][-1]
|
|
||||||
|
|
||||||
def slice_from_time(
|
|
||||||
self,
|
|
||||||
array: np.ndarray,
|
|
||||||
start_t: float,
|
|
||||||
stop_t: float,
|
|
||||||
timeframe_s: int = 1,
|
|
||||||
return_data: bool = False,
|
|
||||||
|
|
||||||
) -> np.ndarray:
|
|
||||||
'''
|
|
||||||
Slice an input struct array providing only datums
|
|
||||||
"in view" of this chart.
|
|
||||||
|
|
||||||
'''
|
|
||||||
arr = {
|
|
||||||
1: self.rt_shm.array,
|
|
||||||
60: self.hist_shm.arry,
|
|
||||||
}[timeframe_s]
|
|
||||||
|
|
||||||
times = arr['time']
|
|
||||||
index = array['index']
|
|
||||||
|
|
||||||
# use advanced indexing to map the
|
|
||||||
# time range to the index range.
|
|
||||||
mask = (
|
|
||||||
(times >= start_t)
|
|
||||||
&
|
|
||||||
(times < stop_t)
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: if we can ensure each time field has a uniform
|
|
||||||
# step we can instead do some arithmetic to determine
|
|
||||||
# the equivalent index like we used to?
|
|
||||||
# return array[
|
|
||||||
# lbar - ifirst:
|
|
||||||
# (rbar - ifirst) + 1
|
|
||||||
# ]
|
|
||||||
|
|
||||||
i_by_t = index[mask]
|
|
||||||
i_0 = i_by_t[0]
|
|
||||||
|
|
||||||
abs_slc = slice(
|
|
||||||
i_0,
|
|
||||||
i_by_t[-1],
|
|
||||||
)
|
|
||||||
# slice data by offset from the first index
|
|
||||||
# available in the passed datum set.
|
|
||||||
read_slc = slice(
|
|
||||||
0,
|
|
||||||
i_by_t[-1] - i_0,
|
|
||||||
)
|
|
||||||
if not return_data:
|
|
||||||
return (
|
|
||||||
abs_slc,
|
|
||||||
read_slc,
|
|
||||||
)
|
|
||||||
|
|
||||||
# also return the readable data from the timerange
|
|
||||||
return (
|
|
||||||
abs_slc,
|
|
||||||
read_slc,
|
|
||||||
arr[mask],
|
|
||||||
)
|
|
||||||
|
|
||||||
def view_data(
|
|
||||||
self,
|
|
||||||
plot: PlotItem,
|
|
||||||
timeframe_s: int = 1,
|
|
||||||
|
|
||||||
) -> np.ndarray:
|
|
||||||
|
|
||||||
# get far-side x-indices plot view
|
|
||||||
vr = plot.viewRect()
|
|
||||||
l = vr.left()
|
|
||||||
r = vr.right()
|
|
||||||
|
|
||||||
(
|
|
||||||
abs_slc,
|
|
||||||
buf_slc,
|
|
||||||
iv_arr,
|
|
||||||
) = self.slice_from_time(
|
|
||||||
start_t=l,
|
|
||||||
stop_t=r,
|
|
||||||
timeframe_s=timeframe_s,
|
|
||||||
return_data=True,
|
|
||||||
)
|
|
||||||
return iv_arr
|
|
||||||
|
|
||||||
|
|
||||||
async def allocate_persistent_feed(
|
async def allocate_persistent_feed(
|
||||||
bus: _FeedsBus,
|
bus: _FeedsBus,
|
||||||
sub_registered: trio.Event,
|
sub_registered: trio.Event,
|
||||||
|
|
Loading…
Reference in New Issue