Remove dead OHLC index consumers from subs list on error

pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-02 22:08:59 -04:00
parent fe0d66e847
commit 2a9d24ccac
1 changed files with 4 additions and 2 deletions

View File

@ -118,8 +118,9 @@ async def increment_ohlc_buffer(
shm.push(last)
# broadcast the buffer index step
# yield {'index': shm._last.value}
for ctx in _subscribers.get(delay_s, ()):
subs = _subscribers.get(delay_s, ())
for ctx in subs:
try:
await ctx.send_yield({'index': shm._last.value})
except (
@ -127,6 +128,7 @@ async def increment_ohlc_buffer(
trio.ClosedResourceError
):
log.error(f'{ctx.chan.uid} dropped connection')
subs.remove(ctx)
@tractor.stream