Better handle dynamic registry sampler broadcasts
In situations where clients are (dynamically) subscribing *while* broadcasts are starting to taking place we need to handle the `set`-modified-during-iteration case. This scenario seems to be more common during races on concurrent startup of multiple symbols. The solution here is to use another set to take note of subscribers which are successfully sent-to and then skipping them on re-try. This also contains an attempt to exception-handle throttled stream overruns caused by higher frequency feeds (like binance) pushing more quotes then can be handled during (UI) client startup.log_linearized_curve_overlays
parent
246d07021e
commit
5dd69b2295
|
@ -253,20 +253,30 @@ class Sampler:
|
|||
# f'consumers: {subs}'
|
||||
)
|
||||
borked: set[tractor.MsgStream] = set()
|
||||
for stream in subs:
|
||||
sent: set[tractor.MsgStream] = set()
|
||||
while True:
|
||||
try:
|
||||
await stream.send({
|
||||
'index': time_stamp or last_ts,
|
||||
'period': period_s,
|
||||
})
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
):
|
||||
log.error(
|
||||
f'{stream._ctx.chan.uid} dropped connection'
|
||||
)
|
||||
borked.add(stream)
|
||||
for stream in (subs - sent):
|
||||
try:
|
||||
await stream.send({
|
||||
'index': time_stamp or last_ts,
|
||||
'period': period_s,
|
||||
})
|
||||
sent.add(stream)
|
||||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
):
|
||||
log.error(
|
||||
f'{stream._ctx.chan.uid} dropped connection'
|
||||
)
|
||||
borked.add(stream)
|
||||
else:
|
||||
break
|
||||
except RuntimeError:
|
||||
log.warning(f'Client subs {subs} changed while broadcasting')
|
||||
continue
|
||||
|
||||
for stream in borked:
|
||||
try:
|
||||
|
@ -848,6 +858,16 @@ async def uniform_rate_send(
|
|||
# rate timing exactly lul
|
||||
try:
|
||||
await stream.send({sym: first_quote})
|
||||
except tractor.RemoteActorError as rme:
|
||||
if rme.type is not tractor._exceptions.StreamOverrun:
|
||||
raise
|
||||
ctx = stream._ctx
|
||||
chan = ctx.chan
|
||||
log.warning(
|
||||
'Throttled quote-stream overrun!\n'
|
||||
f'{sym}:{ctx.cid}@{chan.uid}'
|
||||
)
|
||||
|
||||
except (
|
||||
# NOTE: any of these can be raised by ``tractor``'s IPC
|
||||
# transport-layer and we want to be highly resilient
|
||||
|
|
|
@ -1589,6 +1589,9 @@ async def open_feed(
|
|||
(brokermod, bfqsns),
|
||||
) in zip(ctxs, providers.items()):
|
||||
|
||||
# NOTE: do it asap to avoid overruns during multi-feed setup?
|
||||
ctx._backpressure = backpressure
|
||||
|
||||
for fqsn, flume_msg in flumes_msg_dict.items():
|
||||
flume = Flume.from_msg(flume_msg)
|
||||
assert flume.symbol.fqsn == fqsn
|
||||
|
|
Loading…
Reference in New Issue