Support backend volume summing; handle disconnects

cached_feeds
Tyler Goodlet 2021-03-31 14:19:48 -04:00
parent 1d013126b9
commit 30dabbab44
1 changed files with 19 additions and 3 deletions

View File

@ -20,6 +20,7 @@ Data feed apis and infra.
We provide tsdb integrations for retrieving We provide tsdb integrations for retrieving
and storing data from your brokers as well as and storing data from your brokers as well as
sharing your feeds with other fellow pikers. sharing your feeds with other fellow pikers.
""" """
from dataclasses import dataclass, field from dataclasses import dataclass, field
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
@ -227,7 +228,9 @@ async def allocate_persistent_feed(
# begin shm write loop and broadcast to subscribers # begin shm write loop and broadcast to subscribers
sum_tick_vlm: bool = True sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
async with quote_stream: async with quote_stream:
@ -247,6 +250,9 @@ async def allocate_persistent_feed(
# start writing the shm buffer with appropriate trade data # start writing the shm buffer with appropriate trade data
for tick in quote['ticks']: for tick in quote['ticks']:
# if tick['type'] in ('utrade',):
# print(tick)
# write trade events to shm last OHLC sample # write trade events to shm last OHLC sample
if tick['type'] in ('trade', 'utrade'): if tick['type'] in ('trade', 'utrade'):
@ -268,24 +274,34 @@ async def allocate_persistent_feed(
if sum_tick_vlm: if sum_tick_vlm:
volume = v + new_v volume = v + new_v
else: else:
volume = v # presume backend takes care of summing
# it's own vlm
volume = quote['volume']
shm.array[[ shm.array[[
'open', 'open',
'high', 'high',
'low', 'low',
'close', 'close',
'bar_wap', # can be optionally provided
'volume', 'volume',
]][-1] = ( ]][-1] = (
o, o,
max(high, last), max(high, last),
min(low, last), min(low, last),
last, last,
quote.get('bar_wap', 0),
volume, volume,
) )
for ctx in bus.subscribers[sym]: for ctx in bus.subscribers[sym]:
await ctx.send_yield({sym: quote}) try:
await ctx.send_yield({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(f'{ctx.chan.uid} dropped connection')
@tractor.stream @tractor.stream