From 30dabbab443e4a8940dc29e28bfb42116acffab9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 31 Mar 2021 14:19:48 -0400 Subject: [PATCH] Support backend volume summing; handle disconnects --- piker/data/__init__.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index fd98452f..afc74d75 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -20,6 +20,7 @@ Data feed apis and infra. We provide tsdb integrations for retrieving and storing data from your brokers as well as sharing your feeds with other fellow pikers. + """ from dataclasses import dataclass, field from contextlib import asynccontextmanager @@ -227,7 +228,9 @@ async def allocate_persistent_feed( # begin shm write loop and broadcast to subscribers - sum_tick_vlm: bool = True + sum_tick_vlm: bool = init_msg.get( + 'shm_write_opts', {} + ).get('sum_tick_vlm', True) async with quote_stream: @@ -247,6 +250,9 @@ async def allocate_persistent_feed( # start writing the shm buffer with appropriate trade data for tick in quote['ticks']: + # if tick['type'] in ('utrade',): + # print(tick) + # write trade events to shm last OHLC sample if tick['type'] in ('trade', 'utrade'): @@ -268,24 +274,34 @@ async def allocate_persistent_feed( if sum_tick_vlm: volume = v + new_v else: - volume = v + # presume backend takes care of summing + # it's own vlm + volume = quote['volume'] shm.array[[ 'open', 'high', 'low', 'close', + 'bar_wap', # can be optionally provided 'volume', ]][-1] = ( o, max(high, last), min(low, last), last, + quote.get('bar_wap', 0), volume, ) for ctx in bus.subscribers[sym]: - await ctx.send_yield({sym: quote}) + try: + await ctx.send_yield({sym: quote}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error(f'{ctx.chan.uid} dropped connection') @tractor.stream