Use `open_sample_stream()` to increment fsp buffers

branch: epoch_index_backup
Tyler Goodlet 2023-01-04 23:12:42 -05:00
parent 3328822e44
commit c0f1a29bfd
1 changed file with 10 additions and 9 deletions


@@ -39,7 +39,10 @@ from ..data.feed import (
     Feed,
 )
 from ..data._sharedmem import ShmArray
-from ..data._sampling import _default_delay_s
+from ..data._sampling import (
+    _default_delay_s,
+    open_sample_stream,
+)
 from ..data._source import Symbol
 from ._api import (
     Fsp,
@@ -336,7 +339,6 @@ async def cascade(
     symbol = flume.symbol
     assert src.token == flume.rt_shm.token
     profiler(f'{func}: feed up')

-    # last_len = new_len = len(src.array)
     func_name = func.__name__
     async with (
@@ -442,22 +444,21 @@ async def cascade(
         # signal
         times = src.array['time']
         if len(times) > 1:
-            delay_s = times[-1] - times[times != times[-1]][-1]
+            delay_s = float(times[-1] - times[times != times[-1]][-1])
         else:
             # our default "HFT" sample rate.
             delay_s = _default_delay_s

-        # Increment the underlying shared memory buffer on every
-        # "increment" msg received from the underlying data feed.
-        async with flume.index_stream(
-            int(delay_s)
-        ) as istream:
+        # sub and increment the underlying shared memory buffer
+        # on every step msg received from the global `samplerd`
+        # service.
+        async with open_sample_stream(float(delay_s)) as istream:

             profiler(f'{func_name}: sample stream up')
             profiler.finish()

             async for i in istream:
-                # log.runtime(f'FSP incrementing {i}')
+                print(f'FSP incrementing {i}')

                 # respawn the compute task if the source
                 # array has been updated such that we compute
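
For reference, the `delay_s` derivation in the hunk above is easier to follow with a concrete array. A minimal sketch using only numpy; the sample values are illustrative, and the trailing subscription comment simply restates the pattern from the diff rather than adding any new API:

```python
import numpy as np

# a shm ``time`` column where the most recent step is still being updated
# (so the last timestamp repeats), mirroring ``src.array['time']`` above
times = np.array([1.0, 2.0, 3.0, 3.0])

if len(times) > 1:
    # newest timestamp minus the most recent *distinct* timestamp
    # -> the sample period in seconds (1.0 here), cast to a builtin float
    delay_s = float(times[-1] - times[times != times[-1]][-1])
else:
    delay_s = 1.0  # stand-in for the ``_default_delay_s`` fallback

print(delay_s)  # 1.0

# that period is then passed to the global sampler subscription as in the diff:
#
#   async with open_sample_stream(float(delay_s)) as istream:
#       async for i in istream:
#           ...  # increment / recompute on each step msg
```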