Better handle dynamic registry sampler broadcasts
In situations where clients are (dynamically) subscribing *while* broadcasts are starting to taking place we need to handle the `set`-modified-during-iteration case. This scenario seems to be more common during races on concurrent startup of multiple symbols. The solution here is to use another set to take note of subscribers which are successfully sent-to and then skipping them on re-try. This also contains an attempt to exception-handle throttled stream overruns caused by higher frequency feeds (like binance) pushing more quotes then can be handled during (UI) client startup.storage_cli
parent
1e078a3c30
commit
63e705bab0
|
@ -253,12 +253,17 @@ class Sampler:
|
||||||
# f'consumers: {subs}'
|
# f'consumers: {subs}'
|
||||||
)
|
)
|
||||||
borked: set[tractor.MsgStream] = set()
|
borked: set[tractor.MsgStream] = set()
|
||||||
for stream in subs:
|
sent: set[tractor.MsgStream] = set()
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
for stream in (subs - sent):
|
||||||
try:
|
try:
|
||||||
await stream.send({
|
await stream.send({
|
||||||
'index': time_stamp or last_ts,
|
'index': time_stamp or last_ts,
|
||||||
'period': period_s,
|
'period': period_s,
|
||||||
})
|
})
|
||||||
|
sent.add(stream)
|
||||||
|
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
trio.ClosedResourceError
|
trio.ClosedResourceError
|
||||||
|
@ -267,6 +272,11 @@ class Sampler:
|
||||||
f'{stream._ctx.chan.uid} dropped connection'
|
f'{stream._ctx.chan.uid} dropped connection'
|
||||||
)
|
)
|
||||||
borked.add(stream)
|
borked.add(stream)
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
except RuntimeError:
|
||||||
|
log.warning(f'Client subs {subs} changed while broadcasting')
|
||||||
|
continue
|
||||||
|
|
||||||
for stream in borked:
|
for stream in borked:
|
||||||
try:
|
try:
|
||||||
|
@ -848,6 +858,16 @@ async def uniform_rate_send(
|
||||||
# rate timing exactly lul
|
# rate timing exactly lul
|
||||||
try:
|
try:
|
||||||
await stream.send({sym: first_quote})
|
await stream.send({sym: first_quote})
|
||||||
|
except tractor.RemoteActorError as rme:
|
||||||
|
if rme.type is not tractor._exceptions.StreamOverrun:
|
||||||
|
raise
|
||||||
|
ctx = stream._ctx
|
||||||
|
chan = ctx.chan
|
||||||
|
log.warning(
|
||||||
|
'Throttled quote-stream overrun!\n'
|
||||||
|
f'{sym}:{ctx.cid}@{chan.uid}'
|
||||||
|
)
|
||||||
|
|
||||||
except (
|
except (
|
||||||
# NOTE: any of these can be raised by ``tractor``'s IPC
|
# NOTE: any of these can be raised by ``tractor``'s IPC
|
||||||
# transport-layer and we want to be highly resilient
|
# transport-layer and we want to be highly resilient
|
||||||
|
|
|
@ -1589,6 +1589,9 @@ async def open_feed(
|
||||||
(brokermod, bfqsns),
|
(brokermod, bfqsns),
|
||||||
) in zip(ctxs, providers.items()):
|
) in zip(ctxs, providers.items()):
|
||||||
|
|
||||||
|
# NOTE: do it asap to avoid overruns during multi-feed setup?
|
||||||
|
ctx._backpressure = backpressure
|
||||||
|
|
||||||
for fqsn, flume_msg in flumes_msg_dict.items():
|
for fqsn, flume_msg in flumes_msg_dict.items():
|
||||||
flume = Flume.from_msg(flume_msg)
|
flume = Flume.from_msg(flume_msg)
|
||||||
assert flume.symbol.fqsn == fqsn
|
assert flume.symbol.fqsn == fqsn
|
||||||
|
|
Loading…
Reference in New Issue