Factor subscription broadcasting into a func

m4_corrections
Tyler Goodlet 2022-04-16 18:33:26 -04:00
parent 5c84a5f8b4
commit 25be7f8d08
1 changed files with 28 additions and 15 deletions

View File

@ -22,7 +22,7 @@ financial data flows.
from __future__ import annotations
from collections import Counter
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
import tractor
import trio
@ -90,6 +90,7 @@ async def increment_ohlc_buffer(
total_s = 0 # total seconds counted
lowest = min(sampler.ohlcv_shms.keys())
lowest_shm = sampler.ohlcv_shms[lowest][0]
ad = lowest - 0.001
with trio.CancelScope() as cs:
@ -133,21 +134,33 @@ async def increment_ohlc_buffer(
# write to the buffer
shm.push(last)
# broadcast the buffer index step to any subscribers for
# a given sample period.
subs = sampler.subscribers.get(delay_s, ())
await broadcast(delay_s, shm=lowest_shm)
for stream in subs:
try:
await stream.send({'index': shm._last.value})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
subs.remove(stream)
async def broadcast(
delay_s: int,
shm: Optional[ShmArray] = None,
) -> None:
# broadcast the buffer index step to any subscribers for
# a given sample period.
subs = sampler.subscribers.get(delay_s, ())
if shm is None:
lowest = min(sampler.ohlcv_shms.keys())
shm = sampler.ohlcv_shms[lowest][0]
for stream in subs:
try:
await stream.send({'index': shm._last.value})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
subs.remove(stream)
@tractor.context