Presume shortest delay input to `increment_ohlc_buffer()`
Instead of worrying about the increment period per shm subscription, just use the value passed as input and presume the caller knows that only one task is necessary and that the wakeup (sampling) period should be the shortest that is needed. It's very unlikely we don't want at least a 1s sampling (both in terms of task switching cost and general usage) which will eventually ship as the default "real-time" feed "timeframe". Further, this "fast" increment sampling task can handle all lower sampling periods (eg. 1m, 5m, 1H) based on the current implementation just the same. Also, add a global default sample period as `_defaul_delay_s` for use in other internal modules.history_view
parent
4d2708cd42
commit
60052ff73a
|
@ -37,6 +37,9 @@ if TYPE_CHECKING:
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
_default_delay_s: float = 1.0
|
||||||
|
|
||||||
|
|
||||||
class sampler:
|
class sampler:
|
||||||
'''
|
'''
|
||||||
Global sampling engine registry.
|
Global sampling engine registry.
|
||||||
|
@ -104,14 +107,18 @@ async def increment_ohlc_buffer(
|
||||||
# TODO: do we want to support dynamically
|
# TODO: do we want to support dynamically
|
||||||
# adding a "lower" lowest increment period?
|
# adding a "lower" lowest increment period?
|
||||||
await trio.sleep(ad)
|
await trio.sleep(ad)
|
||||||
total_s += lowest
|
total_s += delay_s
|
||||||
|
|
||||||
# increment all subscribed shm arrays
|
# increment all subscribed shm arrays
|
||||||
# TODO:
|
# TODO:
|
||||||
# - this in ``numba``
|
# - this in ``numba``
|
||||||
# - just lookup shms for this step instead of iterating?
|
# - just lookup shms for this step instead of iterating?
|
||||||
for delay_s, shms in sampler.ohlcv_shms.items():
|
for this_delay_s, shms in sampler.ohlcv_shms.items():
|
||||||
if total_s % delay_s != 0:
|
|
||||||
|
# short-circuit on any not-ready because slower sample
|
||||||
|
# rate consuming shm buffers.
|
||||||
|
if total_s % this_delay_s != 0:
|
||||||
|
# print(f'skipping `{this_delay_s}s` sample update')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# TODO: ``numba`` this!
|
# TODO: ``numba`` this!
|
||||||
|
@ -152,7 +159,6 @@ async def broadcast(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
subs = sampler.subscribers.get(delay_s, ())
|
subs = sampler.subscribers.get(delay_s, ())
|
||||||
|
|
||||||
first = last = -1
|
first = last = -1
|
||||||
|
|
||||||
if shm is None:
|
if shm is None:
|
||||||
|
|
Loading…
Reference in New Issue