diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 160f3fc9..d16bf529 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -161,7 +161,7 @@ async def iter_ohlc_periods( async def sample_and_broadcast( - bus: '_FeedBus', # noqa + bus: '_FeedsBus', # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, sum_tick_vlm: bool = True, @@ -172,7 +172,6 @@ async def sample_and_broadcast( # iterate stream delivered by broker async for quotes in quote_stream: - # TODO: ``numba`` this! for sym, quote in quotes.items(): @@ -185,8 +184,12 @@ async def sample_and_broadcast( # start writing the shm buffer with appropriate # trade data - for tick in quote['ticks']: + # TODO: we should probably not write every single + # value to an OHLC sample stream XD + # for a tick stream sure.. but this is excessive.. + ticks = quote['ticks'] + for tick in ticks: ticktype = tick['type'] # write trade events to shm last OHLC sample @@ -246,7 +249,13 @@ async def sample_and_broadcast( if tick_throttle: # this is a send mem chan that likely # pushes to the ``uniform_rate_send()`` below. - await stream.send((sym, quote)) + try: + stream.send_nowait((sym, quote)) + except trio.WouldBlock: + log.warning( + f'Feed overrun {bus.brokername} ->' + f'{stream._ctx.channel.uid} !!!' + ) else: await stream.send({sym: quote}) @@ -258,7 +267,8 @@ async def sample_and_broadcast( except ( trio.BrokenResourceError, - trio.ClosedResourceError + trio.ClosedResourceError, + trio.EndOfChannel, ): # XXX: do we need to deregister here # if it's done in the fee bus code? @@ -268,6 +278,10 @@ async def sample_and_broadcast( f'{stream._ctx.chan.uid} dropped ' '`brokerd`-quotes-feed connection' ) + if tick_throttle: + assert stream.closed() + # await stream.aclose() + subs.remove((stream, tick_throttle)) @@ -283,56 +297,88 @@ async def uniform_rate_send( ) -> None: - sleep_period = 1/rate - 0.000616 + # TODO: compute the approx overhead latency per cycle + left_to_sleep = throttle_period = 1/rate - 0.000616 + + # send cycle state + first_quote = last_quote = None last_send = time.time() - aname = stream._ctx.chan.uid[0] - fsp = False - if 'fsp' in aname: - fsp = True + diff = 0 while True: - sym, first_quote = await quote_stream.receive() - start = time.time() + # compute the remaining time to sleep for this throttled cycle + left_to_sleep = throttle_period - diff - # append quotes since last iteration into the last quote's - # tick array/buffer. + if left_to_sleep > 0: + with trio.move_on_after(left_to_sleep) as cs: + sym, last_quote = await quote_stream.receive() + diff = time.time() - last_send - # TODO: once we decide to get fancy really we should have - # a shared mem tick buffer that is just continually filled and - # the UI just ready from it at it's display rate. - # we'll likely head toward this once we get this issue going: - # - while True: - try: - sym, next_quote = quote_stream.receive_nowait() - ticks = next_quote.get('ticks') + if not first_quote: + first_quote = last_quote - if ticks: - first_quote['ticks'].extend(ticks) + if (throttle_period - diff) > 0: + # received a quote but the send cycle period hasn't yet + # expired we aren't supposed to send yet so append + # to the tick frame. - except trio.WouldBlock: - now = time.time() - rate = 1 / (now - last_send) - last_send = now + # append quotes since last iteration into the last quote's + # tick array/buffer. + ticks = last_quote.get('ticks') - # log.info(f'{rate} Hz sending quotes') # \n{first_quote}') + # XXX: idea for frame type data structure we could + # use on the wire instead of a simple list? + # frames = { + # 'index': ['type_a', 'type_c', 'type_n', 'type_n'], - # TODO: now if only we could sync this to the display - # rate timing exactly lul - try: - await stream.send({sym: first_quote}) - break - except trio.ClosedResourceError: - # if the feed consumer goes down then drop - # out of this rate limiter - log.warning(f'{stream} closed') - return + # 'type_a': [tick0, tick1, tick2, .., tickn], + # 'type_b': [tick0, tick1, tick2, .., tickn], + # 'type_c': [tick0, tick1, tick2, .., tickn], + # ... + # 'type_n': [tick0, tick1, tick2, .., tickn], + # } - end = time.time() - diff = end - start + # TODO: once we decide to get fancy really we should + # have a shared mem tick buffer that is just + # continually filled and the UI just ready from it + # at it's display rate. + if ticks: + first_quote['ticks'].extend(ticks) - # throttle to provided transmit rate - period = max(sleep_period - diff, 0) - if period > 0: - await trio.sleep(period) + # send cycle isn't due yet so continue waiting + continue + + if cs.cancelled_caught: + # 2 cases: + # no quote has arrived yet this cycle so wait for + # the next one. + if not first_quote: + # if no last quote was received since the last send + # cycle **AND** if we timed out waiting for a most + # recent quote **but** the throttle cycle is now due to + # be sent -> we want to immediately send the next + # received quote ASAP. + sym, first_quote = await quote_stream.receive() + + # we have a quote already so send it now. + + measured_rate = 1 / (time.time() - last_send) + # log.info( + # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' + # ) + + # TODO: now if only we could sync this to the display + # rate timing exactly lul + try: + await stream.send({sym: first_quote}) + except trio.ClosedResourceError: + # if the feed consumer goes down then drop + # out of this rate limiter + log.warning(f'{stream} closed') + return + + # reset send cycle state + first_quote = last_quote = None + diff = 0 + last_send = time.time() diff --git a/piker/data/feed.py b/piker/data/feed.py index bf003b9a..fbf8035b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -65,15 +65,16 @@ log = get_logger(__name__) class _FeedsBus(BaseModel): - """Data feeds broadcaster and persistence management. + ''' + Data feeds broadcaster and persistence management. This is a brokerd side api used to manager persistent real-time streams that can be allocated and left alive indefinitely. - """ + ''' brokername: str nursery: trio.Nursery - feeds: dict[str, trio.CancelScope] = {} + feeds: dict[str, tuple[trio.CancelScope, dict, dict]] = {} task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() @@ -103,13 +104,13 @@ _bus: _FeedsBus = None def get_feed_bus( brokername: str, nursery: Optional[trio.Nursery] = None, + ) -> _FeedsBus: - """ + ''' Retreive broker-daemon-local data feeds bus from process global scope. Serialize task access to lock. - """ - + ''' global _bus if nursery is not None: @@ -131,11 +132,12 @@ async def _setup_persistent_brokerd( ctx: tractor.Context, brokername: str ) -> None: - """Allocate a actor-wide service nursery in ``brokerd`` + ''' + Allocate a actor-wide service nursery in ``brokerd`` such that feeds can be run in the background persistently by the broker backend as needed. - """ + ''' try: async with trio.open_nursery() as service_nursery: @@ -243,7 +245,10 @@ async def allocate_persistent_feed( ).get('sum_tick_vlm', True) # start sample loop - await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) + try: + await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) + finally: + log.warning(f'{symbol}@{brokername} feed task terminated') @tractor.context