Always load FSPs with the default (fast) sampling period
parent
861fe791eb
commit
f79c3617d6
|
@ -37,6 +37,7 @@ from .. import data
|
|||
from ..data import attach_shm_array
|
||||
from ..data.feed import Feed
|
||||
from ..data._sharedmem import ShmArray
|
||||
from ..data._sampling import _default_delay_s
|
||||
from ..data._source import Symbol
|
||||
from ._api import (
|
||||
Fsp,
|
||||
|
@ -105,7 +106,7 @@ async def fsp_compute(
|
|||
filter_quotes_by_sym(fqsn, quote_stream),
|
||||
|
||||
# XXX: currently the ``ohlcv`` arg
|
||||
feed.shm,
|
||||
feed.rt_shm,
|
||||
)
|
||||
|
||||
# Conduct a single iteration of fsp with historical bars input
|
||||
|
@ -313,7 +314,7 @@ async def cascade(
|
|||
|
||||
profiler(f'{func}: feed up')
|
||||
|
||||
assert src.token == feed.shm.token
|
||||
assert src.token == feed.rt_shm.token
|
||||
# last_len = new_len = len(src.array)
|
||||
|
||||
func_name = func.__name__
|
||||
|
@ -420,7 +421,11 @@ async def cascade(
|
|||
# detect sample period step for subscription to increment
|
||||
# signal
|
||||
times = src.array['time']
|
||||
delay_s = times[-1] - times[times != times[-1]][-1]
|
||||
if len(times) > 1:
|
||||
delay_s = times[-1] - times[times != times[-1]][-1]
|
||||
else:
|
||||
# our default "HFT" sample rate.
|
||||
delay_s = _default_delay_s
|
||||
|
||||
# Increment the underlying shared memory buffer on every
|
||||
# "increment" msg received from the underlying data feed.
|
||||
|
@ -431,7 +436,8 @@ async def cascade(
|
|||
profiler(f'{func_name}: sample stream up')
|
||||
profiler.finish()
|
||||
|
||||
async for _ in istream:
|
||||
async for i in istream:
|
||||
# log.runtime(f'FSP incrementing {i}')
|
||||
|
||||
# respawn the compute task if the source
|
||||
# array has been updated such that we compute
|
||||
|
|
Loading…
Reference in New Issue