From 326f153a47b37e4ce63593466f4440b3583295a9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Nov 2022 14:42:22 -0500 Subject: [PATCH] Catch overruns on throttled feed subs too Previously we would only detect overruns and drop subscriptions on non-throttled feed subs, however you can get the same issue with a wrapping throttler task: - the intermediate mem chan can be blocked either by the throttler task being too slow, in which case we still want to warn about it - the stream's IPC channel actually breaks and we still want to drop the connection and subscription so it doesn't become a source of stale backpressure. --- piker/data/_sampling.py | 79 ++++++++++++++++++----------------------- 1 file changed, 34 insertions(+), 45 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index edba3e1f..61b2bd2f 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -309,25 +309,27 @@ async def sample_and_broadcast( volume, ) + # TODO: PUT THIS IN A ``_FeedsBus.broadcast()`` method! # XXX: we need to be very cautious here that no # context-channel is left lingering which doesn't have # a far end receiver actor-task. In such a case you can # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. + sub_key: str = broker_symbol.lower() subs: list[ tuple[ Union[tractor.MsgStream, trio.MemorySendChannel], tractor.Context, float | None, # tick throttle in Hz ] - ] = bus._subscribers[broker_symbol.lower()] + ] = bus._subscribers[sub_key] # NOTE: by default the broker backend doesn't append # it's own "name" into the fqsn schema (but maybe it # should?) so we have to manually generate the correct # key here. - bsym = f'{broker_symbol}.{brokername}' + fqsn = f'{broker_symbol}.{brokername}' lags: int = 0 for (stream, ctx, tick_throttle) in subs: @@ -338,47 +340,38 @@ async def sample_and_broadcast( # pushes to the ``uniform_rate_send()`` below. 
try: stream.send_nowait( - (bsym, quote) + (fqsn, quote) ) except trio.WouldBlock: + overruns[sub_key] += 1 chan = ctx.chan - if ctx: - log.warning( - f'Feed overrun {bus.brokername} ->' - f'{chan.uid} !!!' - ) - else: - key = id(stream) - overruns[key] += 1 - log.warning( - f'Feed overrun {broker_symbol}' - '@{bus.brokername} -> ' - f'feed @ {tick_throttle} Hz' - ) - if overruns[key] > 6: - # TODO: should we check for the - # context being cancelled? this - # could happen but the - # channel-ipc-pipe is still up. - if not chan.connected(): - log.warning( - 'Dropping broken consumer:\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' - ) - await stream.aclose() - raise trio.BrokenResourceError - else: - log.warning( - 'Feed getting overrun bro!\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' - ) - continue + log.warning( + f'Feed OVERRUN {sub_key}' + '@{bus.brokername} -> \n' + f'feed @ {chan.uid}\n' + f'throttle = {tick_throttle} Hz' + ) + + if overruns[sub_key] > 6: + # TODO: should we check for the + # context being cancelled? this + # could happen but the + # channel-ipc-pipe is still up. + if ( + not chan.connected() + or ctx._cancel_called + ): + log.warning( + 'Dropping broken consumer:\n' + f'{sub_key}:' + f'{ctx.cid}@{chan.uid}' + ) + await stream.aclose() + raise trio.BrokenResourceError else: await stream.send( - {bsym: quote} + {fqsn: quote} ) if cs.cancelled_caught: @@ -406,14 +399,10 @@ async def sample_and_broadcast( # so far seems like no since this should all # be single-threaded. Doing it anyway though # since there seems to be some kinda race.. - try: - subs.remove((stream, tick_throttle)) - except ValueError: - log.error( - f'Stream was already removed from subs!?\n' - f'{broker_symbol}:' - f'{ctx.cid}@{chan.uid}' - ) + bus.remove_sub( + sub_key, + (stream, ctx, tick_throttle), + ) # TODO: a less naive throttler, here's some snippets: