Broadcast all tick types to subs, not just trades

cached_feeds
Tyler Goodlet 2021-03-30 10:56:19 -04:00
parent 5fc2aba3ed
commit 4f51ca74f4
1 changed files with 35 additions and 32 deletions

View File

@ -206,7 +206,6 @@ async def allocate_persistent_feed(
bus.feeds[symbol] = (cs, init_msg, first_quote)
with cs:
if opened:
# start history backfill task
# ``backfill_bars()`` is a required backend func
@ -246,43 +245,47 @@ async def allocate_persistent_feed(
# while another consumer is serviced..
# start writing the shm buffer with appropriate trade data
for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price']
for tick in quote['ticks']:
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
# write trade events to shm last OHLC sample
if tick['type'] in ('trade', 'utrade'):
new_v = tick.get('size', 0)
last = tick['price']
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
if sum_tick_vlm:
volume = v + new_v
else:
volume = v
new_v = tick.get('size', 0)
shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),
last,
volume,
)
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
for ctx in bus.subscribers[sym]:
await ctx.send_yield({sym: quote})
if sum_tick_vlm:
volume = v + new_v
else:
volume = v
shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),
last,
volume,
)
for ctx in bus.subscribers[sym]:
await ctx.send_yield({sym: quote})
@tractor.stream