diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 6da478d3..fd98452f 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -206,7 +206,6 @@ async def allocate_persistent_feed( bus.feeds[symbol] = (cs, init_msg, first_quote) with cs: - if opened: # start history backfill task # ``backfill_bars()`` is a required backend func @@ -246,43 +245,47 @@ async def allocate_persistent_feed( # while another consumer is serviced.. # start writing the shm buffer with appropriate trade data - for tick in iterticks(quote, types=('trade', 'utrade',)): - last = tick['price'] + for tick in quote['ticks']: - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] + # write trade events to shm last OHLC sample + if tick['type'] in ('trade', 'utrade'): - new_v = tick.get('size', 0) + last = tick['price'] - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] - if sum_tick_vlm: - volume = v + new_v - else: - volume = v + new_v = tick.get('size', 0) - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - volume, - ) + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last - for ctx in bus.subscribers[sym]: - await ctx.send_yield({sym: quote}) + if sum_tick_vlm: + volume = v + new_v + else: + volume = v + + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + volume, + ) + + for ctx in bus.subscribers[sym]: + await ctx.send_yield({sym: quote}) @tractor.stream