Factor subscription broadcasting into a func
parent
d4e0d4463f
commit
7d8cf3eaf8
|
@ -22,7 +22,7 @@ financial data flows.
|
|||
from __future__ import annotations
|
||||
from collections import Counter
|
||||
import time
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
@ -90,6 +90,7 @@ async def increment_ohlc_buffer(
|
|||
|
||||
total_s = 0 # total seconds counted
|
||||
lowest = min(sampler.ohlcv_shms.keys())
|
||||
lowest_shm = sampler.ohlcv_shms[lowest][0]
|
||||
ad = lowest - 0.001
|
||||
|
||||
with trio.CancelScope() as cs:
|
||||
|
@ -133,21 +134,33 @@ async def increment_ohlc_buffer(
|
|||
# write to the buffer
|
||||
shm.push(last)
|
||||
|
||||
# broadcast the buffer index step to any subscribers for
|
||||
# a given sample period.
|
||||
subs = sampler.subscribers.get(delay_s, ())
|
||||
await broadcast(delay_s, shm=lowest_shm)
|
||||
|
||||
for stream in subs:
|
||||
try:
|
||||
await stream.send({'index': shm._last.value})
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
):
|
||||
log.error(
|
||||
f'{stream._ctx.chan.uid} dropped connection'
|
||||
)
|
||||
subs.remove(stream)
|
||||
|
||||
async def broadcast(
|
||||
delay_s: int,
|
||||
shm: Optional[ShmArray] = None,
|
||||
|
||||
) -> None:
|
||||
# broadcast the buffer index step to any subscribers for
|
||||
# a given sample period.
|
||||
subs = sampler.subscribers.get(delay_s, ())
|
||||
|
||||
if shm is None:
|
||||
lowest = min(sampler.ohlcv_shms.keys())
|
||||
shm = sampler.ohlcv_shms[lowest][0]
|
||||
|
||||
for stream in subs:
|
||||
try:
|
||||
await stream.send({'index': shm._last.value})
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
):
|
||||
log.error(
|
||||
f'{stream._ctx.chan.uid} dropped connection'
|
||||
)
|
||||
subs.remove(stream)
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
|
Loading…
Reference in New Issue