From fa88d91b8d9d022bf6b5e248bbb26a49f7ba164a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Sep 2021 10:16:43 -0400 Subject: [PATCH] Add breakpoint on bcast lag for testing --- piker/data/_sampling.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index bf9ecbba..4085571b 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -233,16 +233,23 @@ async def sample_and_broadcast( # thus other consumers still attached. subs = bus._subscribers[sym.lower()] + lags = 0 for (stream, tick_throttle) in subs: try: - if tick_throttle: - # this is a send mem chan that likely - # pushes to the ``uniform_rate_send()`` below. - await stream.send(quote) + with trio.move_on_after(0.2) as cs: + if tick_throttle: + # this is a send mem chan that likely + # pushes to the ``uniform_rate_send()`` below. + await stream.send(quote) - else: - await stream.send({sym: quote}) + else: + await stream.send({sym: quote}) + + if cs.cancelled_caught: + lags += 1 + if lags > 10: + await tractor.breakpoint() except ( trio.BrokenResourceError,