diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 772ef2bd..c2b06d83 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -57,6 +57,7 @@ from ._sampling import ( _incrementers, increment_ohlc_buffer, iter_ohlc_periods, + sample_and_broadcast, ) __all__ = [ @@ -242,84 +243,8 @@ async def allocate_persistent_feed( 'shm_write_opts', {} ).get('sum_tick_vlm', True) - log.info("Started shared mem bar writer") - - # iterate stream delivered by broker - async for quotes in quote_stream: - for sym, quote in quotes.items(): - - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. - - # start writing the shm buffer with appropriate - # trade data - for tick in quote['ticks']: - - # if tick['type'] in ('utrade',): - # print(tick) - - # write trade events to shm last OHLC sample - if tick['type'] in ('trade', 'utrade'): - - last = tick['price'] - - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - - new_v = tick.get('size', 0) - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - if sum_tick_vlm: - volume = v + new_v - else: - # presume backend takes care of summing - # it's own vlm - volume = quote['volume'] - - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'bar_wap', # can be optionally provided - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - quote.get('bar_wap', 0), - volume, - ) - - # XXX: we need to be very cautious here that no - # context-channel is left lingering which doesn't have - # a far end receiver actor-task. In such a case you can - # end up triggering backpressure which which will - # eventually block this producer end of the feed and - # thus other consumers still attached. - subs = bus.subscribers[sym] - for ctx in subs: - # print(f'sub is {ctx.chan.uid}') - try: - await ctx.send_yield({sym: quote}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - subs.remove(ctx) - log.error(f'{ctx.chan.uid} dropped connection') + # start sample loop + await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) @tractor.stream diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index eccf4ca6..40951697 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -95,7 +95,7 @@ async def increment_ohlc_buffer( if total_s % delay_s != 0: continue - # TODO: numa this! + # TODO: ``numba`` this! for shm in shms: # TODO: in theory we could make this faster by copying the # "last" readable value into the underlying larger buffer's @@ -149,3 +149,92 @@ async def iter_ohlc_periods( await trio.sleep_forever() finally: subs.remove(ctx) + + +async def sample_and_broadcast( + bus: '_FeedBus', # noqa + shm: ShmArray, + quote_stream: trio.abc.ReceiveChannel, + sum_tick_vlm: bool = True, +) -> None: + + log.info("Started shared mem bar writer") + + # iterate stream delivered by broker + async for quotes in quote_stream: + + # TODO: ``numba`` this! + for sym, quote in quotes.items(): + + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. + + # start writing the shm buffer with appropriate + # trade data + for tick in quote['ticks']: + + # if tick['type'] in ('utrade',): + # print(tick) + + # write trade events to shm last OHLC sample + if tick['type'] in ('trade', 'utrade'): + + last = tick['price'] + + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + + new_v = tick.get('size', 0) + + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last + + if sum_tick_vlm: + volume = v + new_v + else: + # presume backend takes care of summing + # it's own vlm + volume = quote['volume'] + + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'bar_wap', # can be optionally provided + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + quote.get('bar_wap', 0), + volume, + ) + + # XXX: we need to be very cautious here that no + # context-channel is left lingering which doesn't have + # a far end receiver actor-task. In such a case you can + # end up triggering backpressure which which will + # eventually block this producer end of the feed and + # thus other consumers still attached. + subs = bus.subscribers[sym] + for ctx in subs: + # print(f'sub is {ctx.chan.uid}') + try: + await ctx.send_yield({sym: quote}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + subs.remove(ctx) + log.error(f'{ctx.chan.uid} dropped connection')