Add breakpoint on bcast lag for testing
parent
32f72dd3e8
commit
fa88d91b8d
|
@ -233,9 +233,11 @@ async def sample_and_broadcast(
|
||||||
# thus other consumers still attached.
|
# thus other consumers still attached.
|
||||||
subs = bus._subscribers[sym.lower()]
|
subs = bus._subscribers[sym.lower()]
|
||||||
|
|
||||||
|
lags = 0
|
||||||
for (stream, tick_throttle) in subs:
|
for (stream, tick_throttle) in subs:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
with trio.move_on_after(0.2) as cs:
|
||||||
if tick_throttle:
|
if tick_throttle:
|
||||||
# this is a send mem chan that likely
|
# this is a send mem chan that likely
|
||||||
# pushes to the ``uniform_rate_send()`` below.
|
# pushes to the ``uniform_rate_send()`` below.
|
||||||
|
@ -244,6 +246,11 @@ async def sample_and_broadcast(
|
||||||
else:
|
else:
|
||||||
await stream.send({sym: quote})
|
await stream.send({sym: quote})
|
||||||
|
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
lags += 1
|
||||||
|
if lags > 10:
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
trio.ClosedResourceError
|
trio.ClosedResourceError
|
||||||
|
|
Loading…
Reference in New Issue