From bcc8d8a0d5133671b3365309a1588d0d9128c26e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Dec 2021 08:05:57 -0500 Subject: [PATCH] Simplify throttle loop to a single while block This should in theory result in increased burstiness since we remove the plain `trio.sleep()` and instead always wait on the receive channel as much as possible until the `trio.move_on_after()` (+ time diffing calcs) times out and signals the next throttled send cycle. This also is slightly easier to grok code-wise instead of the `try, except` and another tight while loop until a `trio.WouldBlock`. The only simpler way i can think to do it is with 2 tasks: 1 to collect ticks and the other to read and send at the throttle rate. Comment out the log msg for now to avoid latency and add much more detailed comments. Add an overrun log msg to the main sample loop. --- piker/data/_sampling.py | 138 ++++++++++++++++++++++++++-------------- piker/data/feed.py | 23 ++++--- 2 files changed, 106 insertions(+), 55 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 160f3fc9..d16bf529 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -161,7 +161,7 @@ async def iter_ohlc_periods( async def sample_and_broadcast( - bus: '_FeedBus', # noqa + bus: '_FeedsBus', # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, sum_tick_vlm: bool = True, @@ -172,7 +172,6 @@ async def sample_and_broadcast( # iterate stream delivered by broker async for quotes in quote_stream: - # TODO: ``numba`` this! for sym, quote in quotes.items(): @@ -185,8 +184,12 @@ async def sample_and_broadcast( # start writing the shm buffer with appropriate # trade data - for tick in quote['ticks']: + # TODO: we should probably not write every single + # value to an OHLC sample stream XD + # for a tick stream sure.. but this is excessive.. + ticks = quote['ticks'] + for tick in ticks: ticktype = tick['type'] # write trade events to shm last OHLC sample @@ -246,7 +249,13 @@ async def sample_and_broadcast( if tick_throttle: # this is a send mem chan that likely # pushes to the ``uniform_rate_send()`` below. - await stream.send((sym, quote)) + try: + stream.send_nowait((sym, quote)) + except trio.WouldBlock: + log.warning( + f'Feed overrun {bus.brokername} ->' + f'{stream._ctx.channel.uid} !!!' + ) else: await stream.send({sym: quote}) @@ -258,7 +267,8 @@ async def sample_and_broadcast( except ( trio.BrokenResourceError, - trio.ClosedResourceError + trio.ClosedResourceError, + trio.EndOfChannel, ): # XXX: do we need to deregister here # if it's done in the fee bus code? @@ -268,6 +278,10 @@ async def sample_and_broadcast( f'{stream._ctx.chan.uid} dropped ' '`brokerd`-quotes-feed connection' ) + if tick_throttle: + assert stream.closed() + # await stream.aclose() + subs.remove((stream, tick_throttle)) @@ -283,56 +297,88 @@ async def uniform_rate_send( ) -> None: - sleep_period = 1/rate - 0.000616 + # TODO: compute the approx overhead latency per cycle + left_to_sleep = throttle_period = 1/rate - 0.000616 + + # send cycle state + first_quote = last_quote = None last_send = time.time() - aname = stream._ctx.chan.uid[0] - fsp = False - if 'fsp' in aname: - fsp = True + diff = 0 while True: - sym, first_quote = await quote_stream.receive() - start = time.time() + # compute the remaining time to sleep for this throttled cycle + left_to_sleep = throttle_period - diff - # append quotes since last iteration into the last quote's - # tick array/buffer. + if left_to_sleep > 0: + with trio.move_on_after(left_to_sleep) as cs: + sym, last_quote = await quote_stream.receive() + diff = time.time() - last_send - # TODO: once we decide to get fancy really we should have - # a shared mem tick buffer that is just continually filled and - # the UI just ready from it at it's display rate. - # we'll likely head toward this once we get this issue going: - # - while True: - try: - sym, next_quote = quote_stream.receive_nowait() - ticks = next_quote.get('ticks') + if not first_quote: + first_quote = last_quote - if ticks: - first_quote['ticks'].extend(ticks) + if (throttle_period - diff) > 0: + # received a quote but the send cycle period hasn't yet + # expired we aren't supposed to send yet so append + # to the tick frame. - except trio.WouldBlock: - now = time.time() - rate = 1 / (now - last_send) - last_send = now + # append quotes since last iteration into the last quote's + # tick array/buffer. + ticks = last_quote.get('ticks') - # log.info(f'{rate} Hz sending quotes') # \n{first_quote}') + # XXX: idea for frame type data structure we could + # use on the wire instead of a simple list? + # frames = { + # 'index': ['type_a', 'type_c', 'type_n', 'type_n'], - # TODO: now if only we could sync this to the display - # rate timing exactly lul - try: - await stream.send({sym: first_quote}) - break - except trio.ClosedResourceError: - # if the feed consumer goes down then drop - # out of this rate limiter - log.warning(f'{stream} closed') - return + # 'type_a': [tick0, tick1, tick2, .., tickn], + # 'type_b': [tick0, tick1, tick2, .., tickn], + # 'type_c': [tick0, tick1, tick2, .., tickn], + # ... + # 'type_n': [tick0, tick1, tick2, .., tickn], + # } - end = time.time() - diff = end - start + # TODO: once we decide to get fancy really we should + # have a shared mem tick buffer that is just + # continually filled and the UI just ready from it + # at it's display rate. + if ticks: + first_quote['ticks'].extend(ticks) - # throttle to provided transmit rate - period = max(sleep_period - diff, 0) - if period > 0: - await trio.sleep(period) + # send cycle isn't due yet so continue waiting + continue + + if cs.cancelled_caught: + # 2 cases: + # no quote has arrived yet this cycle so wait for + # the next one. + if not first_quote: + # if no last quote was received since the last send + # cycle **AND** if we timed out waiting for a most + # recent quote **but** the throttle cycle is now due to + # be sent -> we want to immediately send the next + # received quote ASAP. + sym, first_quote = await quote_stream.receive() + + # we have a quote already so send it now. + + measured_rate = 1 / (time.time() - last_send) + # log.info( + # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' + # ) + + # TODO: now if only we could sync this to the display + # rate timing exactly lul + try: + await stream.send({sym: first_quote}) + except trio.ClosedResourceError: + # if the feed consumer goes down then drop + # out of this rate limiter + log.warning(f'{stream} closed') + return + + # reset send cycle state + first_quote = last_quote = None + diff = 0 + last_send = time.time() diff --git a/piker/data/feed.py b/piker/data/feed.py index bf003b9a..fbf8035b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -65,15 +65,16 @@ log = get_logger(__name__) class _FeedsBus(BaseModel): - """Data feeds broadcaster and persistence management. + ''' + Data feeds broadcaster and persistence management. This is a brokerd side api used to manager persistent real-time streams that can be allocated and left alive indefinitely. - """ + ''' brokername: str nursery: trio.Nursery - feeds: dict[str, trio.CancelScope] = {} + feeds: dict[str, tuple[trio.CancelScope, dict, dict]] = {} task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() @@ -103,13 +104,13 @@ _bus: _FeedsBus = None def get_feed_bus( brokername: str, nursery: Optional[trio.Nursery] = None, + ) -> _FeedsBus: - """ + ''' Retreive broker-daemon-local data feeds bus from process global scope. Serialize task access to lock. - """ - + ''' global _bus if nursery is not None: @@ -131,11 +132,12 @@ async def _setup_persistent_brokerd( ctx: tractor.Context, brokername: str ) -> None: - """Allocate a actor-wide service nursery in ``brokerd`` + ''' + Allocate a actor-wide service nursery in ``brokerd`` such that feeds can be run in the background persistently by the broker backend as needed. - """ + ''' try: async with trio.open_nursery() as service_nursery: @@ -243,7 +245,10 @@ async def allocate_persistent_feed( ).get('sum_tick_vlm', True) # start sample loop - await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) + try: + await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) + finally: + log.warning(f'{symbol}@{brokername} feed task terminated') @tractor.context