Catch overruns on throttled feed subs too

Previously we would only detect overruns and drop subscriptions on
non-throttled feed subs, however you can get the same issue with
a wrapping throttler task:
- the intermediate mem chan can be blocked either by the throttler task
  being too slow, in which case we still want to warn about it
- the stream's IPC channel actually breaks and we still want to drop
  the connection and subscription so it doesn't be come a source of
  stale backpressure.
agg_feedz
Tyler Goodlet 2022-11-14 14:42:22 -05:00
parent f5cd63ad35
commit 326f153a47
1 changed files with 34 additions and 45 deletions

View File

@ -309,25 +309,27 @@ async def sample_and_broadcast(
volume, volume,
) )
# TODO: PUT THIS IN A ``_FeedsBus.broadcast()`` method!
# XXX: we need to be very cautious here that no # XXX: we need to be very cautious here that no
# context-channel is left lingering which doesn't have # context-channel is left lingering which doesn't have
# a far end receiver actor-task. In such a case you can # a far end receiver actor-task. In such a case you can
# end up triggering backpressure which which will # end up triggering backpressure which which will
# eventually block this producer end of the feed and # eventually block this producer end of the feed and
# thus other consumers still attached. # thus other consumers still attached.
sub_key: str = broker_symbol.lower()
subs: list[ subs: list[
tuple[ tuple[
Union[tractor.MsgStream, trio.MemorySendChannel], Union[tractor.MsgStream, trio.MemorySendChannel],
tractor.Context, tractor.Context,
float | None, # tick throttle in Hz float | None, # tick throttle in Hz
] ]
] = bus._subscribers[broker_symbol.lower()] ] = bus._subscribers[sub_key]
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqsn schema (but maybe it # it's own "name" into the fqsn schema (but maybe it
# should?) so we have to manually generate the correct # should?) so we have to manually generate the correct
# key here. # key here.
bsym = f'{broker_symbol}.{brokername}' fqsn = f'{broker_symbol}.{brokername}'
lags: int = 0 lags: int = 0
for (stream, ctx, tick_throttle) in subs: for (stream, ctx, tick_throttle) in subs:
@ -338,47 +340,38 @@ async def sample_and_broadcast(
# pushes to the ``uniform_rate_send()`` below. # pushes to the ``uniform_rate_send()`` below.
try: try:
stream.send_nowait( stream.send_nowait(
(bsym, quote) (fqsn, quote)
) )
except trio.WouldBlock: except trio.WouldBlock:
overruns[sub_key] += 1
chan = ctx.chan chan = ctx.chan
if ctx:
log.warning( log.warning(
f'Feed overrun {bus.brokername} ->' f'Feed OVERRUN {sub_key}'
f'{chan.uid} !!!' '@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n'
f'throttle = {tick_throttle} Hz'
) )
else:
key = id(stream) if overruns[sub_key] > 6:
overruns[key] += 1
log.warning(
f'Feed overrun {broker_symbol}'
'@{bus.brokername} -> '
f'feed @ {tick_throttle} Hz'
)
if overruns[key] > 6:
# TODO: should we check for the # TODO: should we check for the
# context being cancelled? this # context being cancelled? this
# could happen but the # could happen but the
# channel-ipc-pipe is still up. # channel-ipc-pipe is still up.
if not chan.connected(): if (
not chan.connected()
or ctx._cancel_called
):
log.warning( log.warning(
'Dropping broken consumer:\n' 'Dropping broken consumer:\n'
f'{broker_symbol}:' f'{sub_key}:'
f'{ctx.cid}@{chan.uid}' f'{ctx.cid}@{chan.uid}'
) )
await stream.aclose() await stream.aclose()
raise trio.BrokenResourceError raise trio.BrokenResourceError
else:
log.warning(
'Feed getting overrun bro!\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
)
continue
else: else:
await stream.send( await stream.send(
{bsym: quote} {fqsn: quote}
) )
if cs.cancelled_caught: if cs.cancelled_caught:
@ -406,13 +399,9 @@ async def sample_and_broadcast(
# so far seems like no since this should all # so far seems like no since this should all
# be single-threaded. Doing it anyway though # be single-threaded. Doing it anyway though
# since there seems to be some kinda race.. # since there seems to be some kinda race..
try: bus.remove_sub(
subs.remove((stream, tick_throttle)) sub_key,
except ValueError: (stream, ctx, tick_throttle),
log.error(
f'Stream was already removed from subs!?\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
) )