Fix divide-by-zero when quote read is too fast in throttle task

windows_testing_volume
wattygetlood 2021-09-16 16:35:24 -04:00 committed by Tyler Goodlet
parent 48ad97de2c
commit ad623119fa
1 changed files with 12 additions and 28 deletions

View File

@ -172,6 +172,7 @@ async def sample_and_broadcast(
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# TODO: ``numba`` this! # TODO: ``numba`` this!
for sym, quote in quotes.items(): for sym, quote in quotes.items():
@ -184,12 +185,8 @@ async def sample_and_broadcast(
# start writing the shm buffer with appropriate # start writing the shm buffer with appropriate
# trade data # trade data
for tick in quote['ticks']:
# TODO: we should probably not write every single
# value to an OHLC sample stream XD
# for a tick stream sure.. but this is excessive..
ticks = quote['ticks']
for tick in ticks:
ticktype = tick['type'] ticktype = tick['type']
# write trade events to shm last OHLC sample # write trade events to shm last OHLC sample
@ -261,8 +258,7 @@ async def sample_and_broadcast(
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError, trio.ClosedResourceError
trio.EndOfChannel,
): ):
# XXX: do we need to deregister here # XXX: do we need to deregister here
# if it's done in the fee bus code? # if it's done in the fee bus code?
@ -272,10 +268,6 @@ async def sample_and_broadcast(
f'{stream._ctx.chan.uid} dropped ' f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection' '`brokerd`-quotes-feed connection'
) )
if tick_throttle:
assert stream.closed()
# await stream.aclose()
subs.remove((stream, tick_throttle)) subs.remove((stream, tick_throttle))
@ -291,8 +283,12 @@ async def uniform_rate_send(
) -> None: ) -> None:
sleep_period = 1/rate - 0.0001 # 100us sleep_period = 1/rate - 0.000616
last_send = time.time() last_send = time.time()
aname = stream._ctx.chan.uid[0]
fsp = False
if 'fsp' in aname:
fsp = True
while True: while True:
@ -312,33 +308,21 @@ async def uniform_rate_send(
sym, next_quote = quote_stream.receive_nowait() sym, next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks') ticks = next_quote.get('ticks')
# XXX: idea for frame type data structure we could use on the
# wire instead of a simple list?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }
if ticks: if ticks:
first_quote['ticks'].extend(ticks) first_quote['ticks'].extend(ticks)
except trio.WouldBlock: except trio.WouldBlock:
now = time.time() now = time.time()
rate = 1 / (now - last_send) diff = now - last_send
rate = 1 / diff if diff else float('inf')
last_send = now
log.debug( # log.info(f'{rate} Hz sending quotes') # \n{first_quote}')
f'`{sym}` throttled send hz: {round(rate, ndigits=1)}'
)
# TODO: now if only we could sync this to the display # TODO: now if only we could sync this to the display
# rate timing exactly lul # rate timing exactly lul
try: try:
await stream.send({sym: first_quote}) await stream.send({sym: first_quote})
last_send = now
break break
except trio.ClosedResourceError: except trio.ClosedResourceError:
# if the feed consumer goes down then drop # if the feed consumer goes down then drop