Fix divide-by-zero when quote read is too fast in throttle task
parent
2f73f809f1
commit
68ed1164a1
|
@ -172,6 +172,7 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
# iterate stream delivered by broker
|
# iterate stream delivered by broker
|
||||||
async for quotes in quote_stream:
|
async for quotes in quote_stream:
|
||||||
|
|
||||||
# TODO: ``numba`` this!
|
# TODO: ``numba`` this!
|
||||||
for sym, quote in quotes.items():
|
for sym, quote in quotes.items():
|
||||||
|
|
||||||
|
@ -184,12 +185,8 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
# start writing the shm buffer with appropriate
|
# start writing the shm buffer with appropriate
|
||||||
# trade data
|
# trade data
|
||||||
|
for tick in quote['ticks']:
|
||||||
|
|
||||||
# TODO: we should probably not write every single
|
|
||||||
# value to an OHLC sample stream XD
|
|
||||||
# for a tick stream sure.. but this is excessive..
|
|
||||||
ticks = quote['ticks']
|
|
||||||
for tick in ticks:
|
|
||||||
ticktype = tick['type']
|
ticktype = tick['type']
|
||||||
|
|
||||||
# write trade events to shm last OHLC sample
|
# write trade events to shm last OHLC sample
|
||||||
|
@ -261,8 +258,7 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError
|
||||||
trio.EndOfChannel,
|
|
||||||
):
|
):
|
||||||
# XXX: do we need to deregister here
|
# XXX: do we need to deregister here
|
||||||
# if it's done in the fee bus code?
|
# if it's done in the fee bus code?
|
||||||
|
@ -272,10 +268,6 @@ async def sample_and_broadcast(
|
||||||
f'{stream._ctx.chan.uid} dropped '
|
f'{stream._ctx.chan.uid} dropped '
|
||||||
'`brokerd`-quotes-feed connection'
|
'`brokerd`-quotes-feed connection'
|
||||||
)
|
)
|
||||||
if tick_throttle:
|
|
||||||
assert stream.closed()
|
|
||||||
# await stream.aclose()
|
|
||||||
|
|
||||||
subs.remove((stream, tick_throttle))
|
subs.remove((stream, tick_throttle))
|
||||||
|
|
||||||
|
|
||||||
|
@ -291,8 +283,12 @@ async def uniform_rate_send(
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
sleep_period = 1/rate - 0.0001 # 100us
|
sleep_period = 1/rate - 0.000616
|
||||||
last_send = time.time()
|
last_send = time.time()
|
||||||
|
aname = stream._ctx.chan.uid[0]
|
||||||
|
fsp = False
|
||||||
|
if 'fsp' in aname:
|
||||||
|
fsp = True
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
|
||||||
|
@ -312,33 +308,21 @@ async def uniform_rate_send(
|
||||||
sym, next_quote = quote_stream.receive_nowait()
|
sym, next_quote = quote_stream.receive_nowait()
|
||||||
ticks = next_quote.get('ticks')
|
ticks = next_quote.get('ticks')
|
||||||
|
|
||||||
# XXX: idea for frame type data structure we could use on the
|
|
||||||
# wire instead of a simple list?
|
|
||||||
# frames = {
|
|
||||||
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
|
|
||||||
|
|
||||||
# 'type_a': [tick0, tick1, tick2, .., tickn],
|
|
||||||
# 'type_b': [tick0, tick1, tick2, .., tickn],
|
|
||||||
# 'type_c': [tick0, tick1, tick2, .., tickn],
|
|
||||||
# ...
|
|
||||||
# 'type_n': [tick0, tick1, tick2, .., tickn],
|
|
||||||
# }
|
|
||||||
if ticks:
|
if ticks:
|
||||||
first_quote['ticks'].extend(ticks)
|
first_quote['ticks'].extend(ticks)
|
||||||
|
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
rate = 1 / (now - last_send)
|
diff = now - last_send
|
||||||
|
rate = 1 / diff if diff else float('inf')
|
||||||
|
last_send = now
|
||||||
|
|
||||||
log.debug(
|
# log.info(f'{rate} Hz sending quotes') # \n{first_quote}')
|
||||||
f'`{sym}` throttled send hz: {round(rate, ndigits=1)}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: now if only we could sync this to the display
|
# TODO: now if only we could sync this to the display
|
||||||
# rate timing exactly lul
|
# rate timing exactly lul
|
||||||
try:
|
try:
|
||||||
await stream.send({sym: first_quote})
|
await stream.send({sym: first_quote})
|
||||||
last_send = now
|
|
||||||
break
|
break
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
# if the feed consumer goes down then drop
|
# if the feed consumer goes down then drop
|
||||||
|
|
Loading…
Reference in New Issue