WIP fsp output throttling - not working yet

fsp_feeds
Tyler Goodlet 2021-09-09 10:47:41 -04:00
parent 91c005b3c1
commit 61827a20bf
1 changed files with 9 additions and 5 deletions

View File

@ -246,7 +246,7 @@ async def sample_and_broadcast(
if tick_throttle: if tick_throttle:
# this is a send mem chan that likely # this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below. # pushes to the ``uniform_rate_send()`` below.
await stream.send(quote) await stream.send((sym, quote))
else: else:
await stream.send({sym: quote}) await stream.send({sym: quote})
@ -285,10 +285,14 @@ async def uniform_rate_send(
sleep_period = 1/rate - 0.000616 sleep_period = 1/rate - 0.000616
last_send = time.time() last_send = time.time()
aname = stream._ctx.chan.uid[0]
fsp = False
if 'fsp' in aname:
fsp = True
while True: while True:
first_quote = await quote_stream.receive() sym, first_quote = await quote_stream.receive()
start = time.time() start = time.time()
# append quotes since last iteration into the last quote's # append quotes since last iteration into the last quote's
@ -301,7 +305,7 @@ async def uniform_rate_send(
# #
while True: while True:
try: try:
next_quote = quote_stream.receive_nowait() sym, next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks') ticks = next_quote.get('ticks')
if ticks: if ticks:
@ -312,12 +316,12 @@ async def uniform_rate_send(
rate = 1 / (now - last_send) rate = 1 / (now - last_send)
last_send = now last_send = now
# print(f'{rate} Hz sending quotes') # \n{first_quote}') # log.info(f'{rate} Hz sending quotes') # \n{first_quote}')
# TODO: now if only we could sync this to the display # TODO: now if only we could sync this to the display
# rate timing exactly lul # rate timing exactly lul
try: try:
await stream.send({first_quote['symbol']: first_quote}) await stream.send({sym: first_quote})
break break
except trio.ClosedResourceError: except trio.ClosedResourceError:
# if the feed consumer goes down then drop # if the feed consumer goes down then drop