Rework ohlc sampling to launch from .start()

Avoid bothering with a trio event and expect the caller to do manual shm
registering with the write loop. Provide OHLC sample period indexing
through a re-branded pub-sub func ``iter_ohlc_periods()``.
cached_feeds
Tyler Goodlet 2021-04-03 01:18:51 -04:00
parent a8a3f098cf
commit c05fc8991a
1 changed files with 79 additions and 43 deletions

View File

@ -17,17 +17,25 @@
""" """
Data buffers for fast shared humpy. Data buffers for fast shared humpy.
""" """
from typing import Tuple, Callable, Dict from typing import Dict, List
# import time
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus
from ._sharedmem import ShmArray from ._sharedmem import ShmArray
from ..log import get_logger
_shms: Dict[int, ShmArray] = {} log = get_logger(__name__)
# TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn).
_shms: Dict[int, List[ShmArray]] = {}
_start_increment: Dict[str, trio.Event] = {} _start_increment: Dict[str, trio.Event] = {}
_incrementers: Dict[int, trio.CancelScope] = {}
_subscribers: Dict[str, tractor.Context] = {}
def shm_incrementing(shm_token_name: str) -> trio.Event: def shm_incrementing(shm_token_name: str) -> trio.Event:
@ -35,11 +43,9 @@ def shm_incrementing(shm_token_name: str) -> trio.Event:
return _start_increment.setdefault(shm_token_name, trio.Event()) return _start_increment.setdefault(shm_token_name, trio.Event())
@tractor.msg.pub
async def increment_ohlc_buffer( async def increment_ohlc_buffer(
shm_token: dict, delay_s: int,
get_topics: Callable[..., Tuple[str]], task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
# delay_s: Optional[float] = None,
): ):
"""Task which inserts new bars into the provide shared memory array """Task which inserts new bars into the provide shared memory array
every ``delay_s`` seconds. every ``delay_s`` seconds.
@ -54,14 +60,16 @@ async def increment_ohlc_buffer(
the underlying buffers will actually be incremented. the underlying buffers will actually be incremented.
""" """
# wait for brokerd to signal we should start sampling # # wait for brokerd to signal we should start sampling
await shm_incrementing(shm_token['shm_name']).wait() # await shm_incrementing(shm_token['shm_name']).wait()
# TODO: right now we'll spin printing bars if the last time stamp is # TODO: right now we'll spin printing bars if the last time stamp is
# before a large period of no market activity. Likely the best way # before a large period of no market activity. Likely the best way
# to solve this is to make this task aware of the instrument's # to solve this is to make this task aware of the instrument's
# tradable hours? # tradable hours?
global _incrementers
# adjust delay to compensate for trio processing time # adjust delay to compensate for trio processing time
ad = min(_shms.keys()) - 0.001 ad = min(_shms.keys()) - 0.001
@ -69,47 +77,75 @@ async def increment_ohlc_buffer(
lowest = min(_shms.keys()) lowest = min(_shms.keys())
ad = lowest - 0.001 ad = lowest - 0.001
while True: with trio.CancelScope() as cs:
# TODO: do we want to support dynamically
# adding a "lower" lowest increment period?
await trio.sleep(ad)
total_s += lowest
# increment all subscribed shm arrays # register this time period step as active
# TODO: this in ``numba`` _incrementers[delay_s] = cs
for delay_s, shms in _shms.items(): task_status.started(cs)
if total_s % delay_s != 0:
continue
# TODO: numa this! while True:
for shm in shms: # TODO: do we want to support dynamically
# TODO: in theory we could make this faster by copying the # adding a "lower" lowest increment period?
# "last" readable value into the underlying larger buffer's await trio.sleep(ad)
# next value and then incrementing the counter instead of total_s += lowest
# using ``.push()``?
# append new entry to buffer thus "incrementing" the bar # increment all subscribed shm arrays
array = shm.array # TODO: this in ``numba``
last = array[-1:][shm._write_fields].copy() for delay_s, shms in _shms.items():
# (index, t, close) = last[0][['index', 'time', 'close']] if total_s % delay_s != 0:
(t, close) = last[0][['time', 'close']] continue
# this copies non-std fields (eg. vwap) from the last datum # TODO: numa this!
last[ for shm in shms:
['time', 'volume', 'open', 'high', 'low', 'close'] # TODO: in theory we could make this faster by copying the
][0] = (t + delay_s, 0, close, close, close, close) # "last" readable value into the underlying larger buffer's
# next value and then incrementing the counter instead of
# using ``.push()``?
# write to the buffer # append new entry to buffer thus "incrementing" the bar
shm.push(last) array = shm.array
last = array[-1:][shm._write_fields].copy()
# (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# broadcast the buffer index step # this copies non-std fields (eg. vwap) from the last datum
yield {'index': shm._last.value} last[
['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer
shm.push(last)
# broadcast the buffer index step
# yield {'index': shm._last.value}
for ctx in _subscribers.get(delay_s, ()):
try:
await ctx.send_yield({'index': shm._last.value})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(f'{ctx.chan.uid} dropped connection')
def subscribe_ohlc_for_increment( @tractor.stream
shm: ShmArray, async def iter_ohlc_periods(
delay: int, ctx: tractor.Context,
delay_s: int,
) -> None: ) -> None:
"""Add an OHLC ``ShmArray`` to the increment set.
""" """
_shms.setdefault(delay, []).append(shm) Subscribe to OHLC sampling "step" events: when the time
aggregation period increments, this event stream emits an index
event.
"""
# add our subscription
global _subscribers
subs = _subscribers.setdefault(delay_s, [])
subs.append(ctx)
try:
# stream and block until cancelled
await trio.sleep_forever()
finally:
subs.remove(ctx)