Pop subscriber streams on connection errors
parent
003fa6254f
commit
8b6fb83257
|
@ -248,12 +248,15 @@ async def sample_and_broadcast(
|
||||||
# so far seems like no since this should all
|
# so far seems like no since this should all
|
||||||
# be single-threaded.
|
# be single-threaded.
|
||||||
log.error(f'{stream._ctx.chan.uid} dropped connection')
|
log.error(f'{stream._ctx.chan.uid} dropped connection')
|
||||||
|
subs.remove((stream, tick_throttle))
|
||||||
|
|
||||||
|
|
||||||
async def uniform_rate_send(
|
async def uniform_rate_send(
|
||||||
|
|
||||||
rate: float,
|
rate: float,
|
||||||
quote_stream: trio.abc.ReceiveChannel,
|
quote_stream: trio.abc.ReceiveChannel,
|
||||||
stream: tractor.MsgStream,
|
stream: tractor.MsgStream,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
sleep_period = 1/rate - 0.000616
|
sleep_period = 1/rate - 0.000616
|
||||||
|
@ -289,8 +292,14 @@ async def uniform_rate_send(
|
||||||
|
|
||||||
# TODO: now if only we could sync this to the display
|
# TODO: now if only we could sync this to the display
|
||||||
# rate timing exactly lul
|
# rate timing exactly lul
|
||||||
await stream.send({first_quote['symbol']: first_quote})
|
try:
|
||||||
break
|
await stream.send({first_quote['symbol']: first_quote})
|
||||||
|
break
|
||||||
|
except trio.ClosedResourceError:
|
||||||
|
# if the feed consumer goes down then drop
|
||||||
|
# out of this rate limiter
|
||||||
|
log.warning(f'{stream} closed')
|
||||||
|
return
|
||||||
|
|
||||||
end = time.time()
|
end = time.time()
|
||||||
diff = end - start
|
diff = end - start
|
||||||
|
|
Loading…
Reference in New Issue