Remove dead OHLC index consumers from subs list on error
parent
f5beb22d6e
commit
d0ad5e43f9
|
@ -118,8 +118,9 @@ async def increment_ohlc_buffer(
|
||||||
shm.push(last)
|
shm.push(last)
|
||||||
|
|
||||||
# broadcast the buffer index step
|
# broadcast the buffer index step
|
||||||
# yield {'index': shm._last.value}
|
subs = _subscribers.get(delay_s, ())
|
||||||
for ctx in _subscribers.get(delay_s, ()):
|
|
||||||
|
for ctx in subs:
|
||||||
try:
|
try:
|
||||||
await ctx.send_yield({'index': shm._last.value})
|
await ctx.send_yield({'index': shm._last.value})
|
||||||
except (
|
except (
|
||||||
|
@ -127,6 +128,7 @@ async def increment_ohlc_buffer(
|
||||||
trio.ClosedResourceError
|
trio.ClosedResourceError
|
||||||
):
|
):
|
||||||
log.error(f'{ctx.chan.uid} dropped connection')
|
log.error(f'{ctx.chan.uid} dropped connection')
|
||||||
|
subs.remove(ctx)
|
||||||
|
|
||||||
|
|
||||||
@tractor.stream
|
@tractor.stream
|
||||||
|
|
Loading…
Reference in New Issue