Always transmit index event even when no shm is registered

async_hist_loading
Tyler Goodlet 2022-02-28 06:56:25 -05:00
parent 786ffde4e6
commit 7a943f0e1e
1 changed files with 28 additions and 24 deletions

View File

@ -20,7 +20,6 @@ financial data flows.
""" """
import time import time
from typing import Dict, List
import tractor import tractor
import trio import trio
@ -35,15 +34,17 @@ log = get_logger(__name__)
# TODO: we could stick these in a composed type to avoid # TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn). # angering the "i hate module scoped variables crowd" (yawn).
_shms: Dict[int, List[ShmArray]] = {} _ohlcv_shms: dict[int, list[ShmArray]] = {}
_start_increment: Dict[str, trio.Event] = {}
_incrementers: Dict[int, trio.CancelScope] = {}
_subscribers: Dict[str, tractor.Context] = {}
# holds one-task-per-sample-period tasks which are spawned as-needed by
# data feed requests with a given detected time step usually from
# history loading.
_incrementers: dict[int, trio.CancelScope] = {}
def shm_incrementing(shm_token_name: str) -> trio.Event: # holds all the ``tractor.Context`` remote subscriptions for
global _start_increment # a particular sample period increment event: all subscribers are
return _start_increment.setdefault(shm_token_name, trio.Event()) # notified on a step.
_subscribers: dict[int, tractor.Context] = {}
async def increment_ohlc_buffer( async def increment_ohlc_buffer(
@ -72,13 +73,13 @@ async def increment_ohlc_buffer(
# to solve this is to make this task aware of the instrument's # to solve this is to make this task aware of the instrument's
# tradable hours? # tradable hours?
global _incrementers global _incrementers, _ohlcv_shms, _subscribers
# adjust delay to compensate for trio processing time # adjust delay to compensate for trio processing time
ad = min(_shms.keys()) - 0.001 ad = min(_ohlcv_shms.keys()) - 0.001
total_s = 0 # total seconds counted total_s = 0 # total seconds counted
lowest = min(_shms.keys()) lowest = min(_ohlcv_shms.keys())
ad = lowest - 0.001 ad = lowest - 0.001
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
@ -94,8 +95,10 @@ async def increment_ohlc_buffer(
total_s += lowest total_s += lowest
# increment all subscribed shm arrays # increment all subscribed shm arrays
# TODO: this in ``numba`` # TODO:
for delay_s, shms in _shms.items(): # - this in ``numba``
# - just lookup shms for this step instead of iterating?
for delay_s, shms in _ohlcv_shms.items():
if total_s % delay_s != 0: if total_s % delay_s != 0:
continue continue
@ -120,18 +123,19 @@ async def increment_ohlc_buffer(
# write to the buffer # write to the buffer
shm.push(last) shm.push(last)
# broadcast the buffer index step # broadcast the buffer index step to any subscribers for
subs = _subscribers.get(delay_s, ()) # a given sample period.
subs = _subscribers.get(delay_s, ())
for ctx in subs: for ctx in subs:
try: try:
await ctx.send_yield({'index': shm._last.value}) await ctx.send_yield({'index': shm._last.value})
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError
): ):
log.error(f'{ctx.chan.uid} dropped connection') log.error(f'{ctx.chan.uid} dropped connection')
subs.remove(ctx) subs.remove(ctx)
@tractor.stream @tractor.stream