Better handle dynamic registry sampler broadcasts

In situations where clients are (dynamically) subscribing *while*
broadcasts are starting to taking place we need to handle the
`set`-modified-during-iteration case. This scenario seems to be more
common during races on concurrent startup of multiple symbols. The
solution here is to use another set to take note of subscribers which
are successfully sent-to and then skipping them on re-try.

This also contains an attempt to exception-handle throttled stream
overruns caused by higher frequency feeds (like binance) pushing more
quotes then can be handled during (UI) client startup.
multichartz
Tyler Goodlet 2023-01-25 09:11:34 -05:00
parent d62aa071ae
commit 36fb8abe9d
2 changed files with 36 additions and 13 deletions

View File

@ -253,20 +253,30 @@ class Sampler:
# f'consumers: {subs}'
)
borked: set[tractor.MsgStream] = set()
for stream in subs:
sent: set[tractor.MsgStream] = set()
while True:
try:
await stream.send({
'index': time_stamp or last_ts,
'period': period_s,
})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
borked.add(stream)
for stream in (subs - sent):
try:
await stream.send({
'index': time_stamp or last_ts,
'period': period_s,
})
sent.add(stream)
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
borked.add(stream)
else:
break
except RuntimeError:
log.warning(f'Client subs {subs} changed while broadcasting')
continue
for stream in borked:
try:
@ -848,6 +858,16 @@ async def uniform_rate_send(
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
ctx = stream._ctx
chan = ctx.chan
log.warning(
'Throttled quote-stream overrun!\n'
f'{sym}:{ctx.cid}@{chan.uid}'
)
except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# transport-layer and we want to be highly resilient

View File

@ -1589,6 +1589,9 @@ async def open_feed(
(brokermod, bfqsns),
) in zip(ctxs, providers.items()):
# NOTE: do it asap to avoid overruns during multi-feed setup?
ctx._backpressure = backpressure
for fqsn, flume_msg in flumes_msg_dict.items():
flume = Flume.from_msg(flume_msg)
assert flume.symbol.fqsn == fqsn