Simplify throttle loop to a single while block

This should in theory result in increased burstiness since we remove
the plain `trio.sleep()` and instead always wait on the receive channel
as much as possible until the `trio.move_on_after()` (+ time diffing
calcs) times out and signals the next throttled send cycle. This also is
slightly easier to grok code-wise instead of the `try, except` and
another tight while loop until a `trio.WouldBlock`. The only simpler
way i can think to do it is with 2 tasks: 1 to collect ticks and the
other to read and send at the throttle rate.

Comment out the log msg for now to avoid latency and add much more
detailed comments. Add an overrun log msg to the main sample loop.
simpler_quote_throttle_logic
Tyler Goodlet 2021-12-09 08:05:57 -05:00
parent c808965a6f
commit bcc8d8a0d5
2 changed files with 106 additions and 55 deletions

View File

@ -161,7 +161,7 @@ async def iter_ohlc_periods(
async def sample_and_broadcast( async def sample_and_broadcast(
bus: '_FeedBus', # noqa bus: '_FeedsBus', # noqa
shm: ShmArray, shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
sum_tick_vlm: bool = True, sum_tick_vlm: bool = True,
@ -172,7 +172,6 @@ async def sample_and_broadcast(
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# TODO: ``numba`` this! # TODO: ``numba`` this!
for sym, quote in quotes.items(): for sym, quote in quotes.items():
@ -185,8 +184,12 @@ async def sample_and_broadcast(
# start writing the shm buffer with appropriate # start writing the shm buffer with appropriate
# trade data # trade data
for tick in quote['ticks']:
# TODO: we should probably not write every single
# value to an OHLC sample stream XD
# for a tick stream sure.. but this is excessive..
ticks = quote['ticks']
for tick in ticks:
ticktype = tick['type'] ticktype = tick['type']
# write trade events to shm last OHLC sample # write trade events to shm last OHLC sample
@ -246,7 +249,13 @@ async def sample_and_broadcast(
if tick_throttle: if tick_throttle:
# this is a send mem chan that likely # this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below. # pushes to the ``uniform_rate_send()`` below.
await stream.send((sym, quote)) try:
stream.send_nowait((sym, quote))
except trio.WouldBlock:
log.warning(
f'Feed overrun {bus.brokername} ->'
f'{stream._ctx.channel.uid} !!!'
)
else: else:
await stream.send({sym: quote}) await stream.send({sym: quote})
@ -258,7 +267,8 @@ async def sample_and_broadcast(
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError,
trio.EndOfChannel,
): ):
# XXX: do we need to deregister here # XXX: do we need to deregister here
# if it's done in the fee bus code? # if it's done in the fee bus code?
@ -268,6 +278,10 @@ async def sample_and_broadcast(
f'{stream._ctx.chan.uid} dropped ' f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection' '`brokerd`-quotes-feed connection'
) )
if tick_throttle:
assert stream.closed()
# await stream.aclose()
subs.remove((stream, tick_throttle)) subs.remove((stream, tick_throttle))
@ -283,56 +297,88 @@ async def uniform_rate_send(
) -> None: ) -> None:
sleep_period = 1/rate - 0.000616 # TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616
# send cycle state
first_quote = last_quote = None
last_send = time.time() last_send = time.time()
aname = stream._ctx.chan.uid[0] diff = 0
fsp = False
if 'fsp' in aname:
fsp = True
while True: while True:
sym, first_quote = await quote_stream.receive() # compute the remaining time to sleep for this throttled cycle
start = time.time() left_to_sleep = throttle_period - diff
# append quotes since last iteration into the last quote's if left_to_sleep > 0:
# tick array/buffer. with trio.move_on_after(left_to_sleep) as cs:
sym, last_quote = await quote_stream.receive()
diff = time.time() - last_send
# TODO: once we decide to get fancy really we should have if not first_quote:
# a shared mem tick buffer that is just continually filled and first_quote = last_quote
# the UI just ready from it at it's display rate.
# we'll likely head toward this once we get this issue going:
#
while True:
try:
sym, next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks')
if ticks: if (throttle_period - diff) > 0:
first_quote['ticks'].extend(ticks) # received a quote but the send cycle period hasn't yet
# expired we aren't supposed to send yet so append
# to the tick frame.
except trio.WouldBlock: # append quotes since last iteration into the last quote's
now = time.time() # tick array/buffer.
rate = 1 / (now - last_send) ticks = last_quote.get('ticks')
last_send = now
# log.info(f'{rate} Hz sending quotes') # \n{first_quote}') # XXX: idea for frame type data structure we could
# use on the wire instead of a simple list?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
# TODO: now if only we could sync this to the display # 'type_a': [tick0, tick1, tick2, .., tickn],
# rate timing exactly lul # 'type_b': [tick0, tick1, tick2, .., tickn],
try: # 'type_c': [tick0, tick1, tick2, .., tickn],
await stream.send({sym: first_quote}) # ...
break # 'type_n': [tick0, tick1, tick2, .., tickn],
except trio.ClosedResourceError: # }
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
return
end = time.time() # TODO: once we decide to get fancy really we should
diff = end - start # have a shared mem tick buffer that is just
# continually filled and the UI just ready from it
# at it's display rate.
if ticks:
first_quote['ticks'].extend(ticks)
# throttle to provided transmit rate # send cycle isn't due yet so continue waiting
period = max(sleep_period - diff, 0) continue
if period > 0:
await trio.sleep(period) if cs.cancelled_caught:
# 2 cases:
# no quote has arrived yet this cycle so wait for
# the next one.
if not first_quote:
# if no last quote was received since the last send
# cycle **AND** if we timed out waiting for a most
# recent quote **but** the throttle cycle is now due to
# be sent -> we want to immediately send the next
# received quote ASAP.
sym, first_quote = await quote_stream.receive()
# we have a quote already so send it now.
measured_rate = 1 / (time.time() - last_send)
# log.info(
# f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}'
# )
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
except trio.ClosedResourceError:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
return
# reset send cycle state
first_quote = last_quote = None
diff = 0
last_send = time.time()

View File

@ -65,15 +65,16 @@ log = get_logger(__name__)
class _FeedsBus(BaseModel): class _FeedsBus(BaseModel):
"""Data feeds broadcaster and persistence management. '''
Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time This is a brokerd side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely. streams that can be allocated and left alive indefinitely.
""" '''
brokername: str brokername: str
nursery: trio.Nursery nursery: trio.Nursery
feeds: dict[str, trio.CancelScope] = {} feeds: dict[str, tuple[trio.CancelScope, dict, dict]] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
@ -103,13 +104,13 @@ _bus: _FeedsBus = None
def get_feed_bus( def get_feed_bus(
brokername: str, brokername: str,
nursery: Optional[trio.Nursery] = None, nursery: Optional[trio.Nursery] = None,
) -> _FeedsBus: ) -> _FeedsBus:
""" '''
Retreive broker-daemon-local data feeds bus from process global Retreive broker-daemon-local data feeds bus from process global
scope. Serialize task access to lock. scope. Serialize task access to lock.
""" '''
global _bus global _bus
if nursery is not None: if nursery is not None:
@ -131,11 +132,12 @@ async def _setup_persistent_brokerd(
ctx: tractor.Context, ctx: tractor.Context,
brokername: str brokername: str
) -> None: ) -> None:
"""Allocate a actor-wide service nursery in ``brokerd`` '''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by such that feeds can be run in the background persistently by
the broker backend as needed. the broker backend as needed.
""" '''
try: try:
async with trio.open_nursery() as service_nursery: async with trio.open_nursery() as service_nursery:
@ -243,7 +245,10 @@ async def allocate_persistent_feed(
).get('sum_tick_vlm', True) ).get('sum_tick_vlm', True)
# start sample loop # start sample loop
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) try:
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm)
finally:
log.warning(f'{symbol}@{brokername} feed task terminated')
@tractor.context @tractor.context