Merge pull request #242 from pikers/simpler_quote_throttle_logic

Simplify throttle loop to a single `while` block
tinas_unite
goodboy 2021-12-09 16:21:35 -05:00 committed by GitHub
commit b8ed7da63c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 106 additions and 55 deletions

View File

@ -161,7 +161,7 @@ async def iter_ohlc_periods(
async def sample_and_broadcast(
bus: '_FeedBus', # noqa
bus: '_FeedsBus', # noqa
shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
sum_tick_vlm: bool = True,
@ -172,7 +172,6 @@ async def sample_and_broadcast(
# iterate stream delivered by broker
async for quotes in quote_stream:
# TODO: ``numba`` this!
for sym, quote in quotes.items():
@ -185,8 +184,12 @@ async def sample_and_broadcast(
# start writing the shm buffer with appropriate
# trade data
for tick in quote['ticks']:
# TODO: we should probably not write every single
# value to an OHLC sample stream XD
# for a tick stream sure.. but this is excessive..
ticks = quote['ticks']
for tick in ticks:
ticktype = tick['type']
# write trade events to shm last OHLC sample
@ -246,7 +249,13 @@ async def sample_and_broadcast(
if tick_throttle:
# this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below.
await stream.send((sym, quote))
try:
stream.send_nowait((sym, quote))
except trio.WouldBlock:
log.warning(
f'Feed overrun {bus.brokername} ->'
f'{stream._ctx.channel.uid} !!!'
)
else:
await stream.send({sym: quote})
@ -258,7 +267,8 @@ async def sample_and_broadcast(
except (
trio.BrokenResourceError,
trio.ClosedResourceError
trio.ClosedResourceError,
trio.EndOfChannel,
):
# XXX: do we need to deregister here
# if it's done in the fee bus code?
@ -268,6 +278,10 @@ async def sample_and_broadcast(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
if tick_throttle:
assert stream.closed()
# await stream.aclose()
subs.remove((stream, tick_throttle))
@ -283,56 +297,88 @@ async def uniform_rate_send(
) -> None:
sleep_period = 1/rate - 0.000616
# TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616
# send cycle state
first_quote = last_quote = None
last_send = time.time()
aname = stream._ctx.chan.uid[0]
fsp = False
if 'fsp' in aname:
fsp = True
diff = 0
while True:
sym, first_quote = await quote_stream.receive()
start = time.time()
# compute the remaining time to sleep for this throttled cycle
left_to_sleep = throttle_period - diff
if left_to_sleep > 0:
with trio.move_on_after(left_to_sleep) as cs:
sym, last_quote = await quote_stream.receive()
diff = time.time() - last_send
if not first_quote:
first_quote = last_quote
if (throttle_period - diff) > 0:
# received a quote but the send cycle period hasn't yet
# expired we aren't supposed to send yet so append
# to the tick frame.
# append quotes since last iteration into the last quote's
# tick array/buffer.
ticks = last_quote.get('ticks')
# TODO: once we decide to get fancy really we should have
# a shared mem tick buffer that is just continually filled and
# the UI just ready from it at it's display rate.
# we'll likely head toward this once we get this issue going:
#
while True:
try:
sym, next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks')
# XXX: idea for frame type data structure we could
# use on the wire instead of a simple list?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }
# TODO: once we decide to get fancy really we should
# have a shared mem tick buffer that is just
# continually filled and the UI just ready from it
# at it's display rate.
if ticks:
first_quote['ticks'].extend(ticks)
except trio.WouldBlock:
now = time.time()
rate = 1 / (now - last_send)
last_send = now
# send cycle isn't due yet so continue waiting
continue
# log.info(f'{rate} Hz sending quotes') # \n{first_quote}')
if cs.cancelled_caught:
# 2 cases:
# no quote has arrived yet this cycle so wait for
# the next one.
if not first_quote:
# if no last quote was received since the last send
# cycle **AND** if we timed out waiting for a most
# recent quote **but** the throttle cycle is now due to
# be sent -> we want to immediately send the next
# received quote ASAP.
sym, first_quote = await quote_stream.receive()
# we have a quote already so send it now.
measured_rate = 1 / (time.time() - last_send)
# log.info(
# f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}'
# )
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
return
end = time.time()
diff = end - start
# throttle to provided transmit rate
period = max(sleep_period - diff, 0)
if period > 0:
await trio.sleep(period)
# reset send cycle state
first_quote = last_quote = None
diff = 0
last_send = time.time()

View File

@ -65,15 +65,16 @@ log = get_logger(__name__)
class _FeedsBus(BaseModel):
"""Data feeds broadcaster and persistence management.
'''
Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely.
"""
'''
brokername: str
nursery: trio.Nursery
feeds: dict[str, trio.CancelScope] = {}
feeds: dict[str, tuple[trio.CancelScope, dict, dict]] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
@ -103,13 +104,13 @@ _bus: _FeedsBus = None
def get_feed_bus(
brokername: str,
nursery: Optional[trio.Nursery] = None,
) -> _FeedsBus:
"""
'''
Retreive broker-daemon-local data feeds bus from process global
scope. Serialize task access to lock.
"""
'''
global _bus
if nursery is not None:
@ -131,11 +132,12 @@ async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str
) -> None:
"""Allocate a actor-wide service nursery in ``brokerd``
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
"""
'''
try:
async with trio.open_nursery() as service_nursery:
@ -243,7 +245,10 @@ async def allocate_persistent_feed(
).get('sum_tick_vlm', True)
# start sample loop
try:
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm)
finally:
log.warning(f'{symbol}@{brokername} feed task terminated')
@tractor.context