diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 2aab0ecf..a1d615f5 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -17,6 +17,7 @@ """ Data buffers for fast shared humpy. """ +import time from typing import Dict, List import tractor @@ -152,10 +153,12 @@ async def iter_ohlc_periods( async def sample_and_broadcast( + bus: '_FeedBus', # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, sum_tick_vlm: bool = True, + ) -> None: log.info("Started shared mem bar writer") @@ -177,11 +180,10 @@ async def sample_and_broadcast( # trade data for tick in quote['ticks']: - # if tick['type'] in ('utrade',): - # print(tick) + ticktype = tick['type'] # write trade events to shm last OHLC sample - if tick['type'] in ('trade', 'utrade'): + if ticktype in ('trade', 'utrade'): last = tick['price'] @@ -229,16 +231,71 @@ async def sample_and_broadcast( # thus other consumers still attached. subs = bus._subscribers[sym.lower()] - for stream in subs: - # print(f'sub is {ctx.chan.uid}') - try: - await stream.send({sym: quote}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - # XXX: do we need to deregister here - # if it's done in the fee bus code? - # so far seems like no since this should all - # be single-threaded. - log.error(f'{stream._ctx.chan.uid} dropped connection') + for (stream, tick_throttle) in subs: + + if tick_throttle: + await stream.send(quote) + + else: + try: + await stream.send({sym: quote}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + # XXX: do we need to deregister here + # if it's done in the fee bus code? + # so far seems like no since this should all + # be single-threaded. + log.error(f'{stream._ctx.chan.uid} dropped connection') + + +async def uniform_rate_send( + rate: float, + quote_stream: trio.abc.ReceiveChannel, + stream: tractor.MsgStream, +) -> None: + + sleep_period = 1/rate - 0.000616 + last_send = time.time() + + while True: + + first_quote = await quote_stream.receive() + start = time.time() + + # append quotes since last iteration into the last quote's + # tick array/buffer. + + # TODO: once we decide to get fancy really we should have + # a shared mem tick buffer that is just continually filled and + # the UI just ready from it at it's display rate. + # we'll likely head toward this once we get this issue going: + # + while True: + try: + next_quote = quote_stream.receive_nowait() + ticks = next_quote.get('ticks') + + if ticks: + first_quote['ticks'].extend(ticks) + + except trio.WouldBlock: + now = time.time() + rate = 1 / (now - last_send) + last_send = now + + # print(f'{rate} Hz sending quotes\n{first_quote}') + + # TODO: now if only we could sync this to the display + # rate timing exactly lul + await stream.send({first_quote['symbol']: first_quote}) + break + + end = time.time() + diff = end - start + + # throttle to provided transmit rate + period = max(sleep_period - diff, 0) + if period > 0: + await trio.sleep(period)