Don't error on sub removal attempts, feeds need backpressure

fspd_cluster
Tyler Goodlet 2021-12-07 15:08:44 -05:00
parent 590db2c51b
commit 835ad7794c
1 changed files with 10 additions and 2 deletions

View File

@ -356,7 +356,10 @@ async def open_feed_bus(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle: if tick_throttle:
n.cancel_scope.cancel() n.cancel_scope.cancel()
try:
bus._subscribers[symbol].remove(sub) bus._subscribers[symbol].remove(sub)
except ValueError:
log.warning(f'{sub} for {symbol} was already removed?')
@asynccontextmanager @asynccontextmanager
@ -520,7 +523,12 @@ async def open_feed(
) as (ctx, (init_msg, first_quotes)), ) as (ctx, (init_msg, first_quotes)),
ctx.open_stream() as stream, ctx.open_stream(
# XXX: be explicit about stream backpressure since we should
# **never** overrun on feeds being too fast, which will
# pretty much always happen with HFT XD
backpressure=True
) as stream,
): ):
# we can only read from shm # we can only read from shm