Move sample-broadcast routine into sampling module

cached_feeds
Tyler Goodlet 2021-04-06 14:20:33 -04:00
parent 71d02db126
commit 3147a49384
2 changed files with 93 additions and 79 deletions

View File

@ -57,6 +57,7 @@ from ._sampling import (
_incrementers, _incrementers,
increment_ohlc_buffer, increment_ohlc_buffer,
iter_ohlc_periods, iter_ohlc_periods,
sample_and_broadcast,
) )
__all__ = [ __all__ = [
@ -242,84 +243,8 @@ async def allocate_persistent_feed(
'shm_write_opts', {} 'shm_write_opts', {}
).get('sum_tick_vlm', True) ).get('sum_tick_vlm', True)
log.info("Started shared mem bar writer") # start sample loop
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm)
# iterate stream delivered by broker
async for quotes in quote_stream:
for sym, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
# start writing the shm buffer with appropriate
# trade data
for tick in quote['ticks']:
# if tick['type'] in ('utrade',):
# print(tick)
# write trade events to shm last OHLC sample
if tick['type'] in ('trade', 'utrade'):
last = tick['price']
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick.get('size', 0)
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
if sum_tick_vlm:
volume = v + new_v
else:
# presume backend takes care of summing
# it's own vlm
volume = quote['volume']
shm.array[[
'open',
'high',
'low',
'close',
'bar_wap', # can be optionally provided
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),
last,
quote.get('bar_wap', 0),
volume,
)
# XXX: we need to be very cautious here that no
# context-channel is left lingering which doesn't have
# a far end receiver actor-task. In such a case you can
# end up triggering backpressure which which will
# eventually block this producer end of the feed and
# thus other consumers still attached.
subs = bus.subscribers[sym]
for ctx in subs:
# print(f'sub is {ctx.chan.uid}')
try:
await ctx.send_yield({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
subs.remove(ctx)
log.error(f'{ctx.chan.uid} dropped connection')
@tractor.stream @tractor.stream

View File

@ -95,7 +95,7 @@ async def increment_ohlc_buffer(
if total_s % delay_s != 0: if total_s % delay_s != 0:
continue continue
# TODO: numa this! # TODO: ``numba`` this!
for shm in shms: for shm in shms:
# TODO: in theory we could make this faster by copying the # TODO: in theory we could make this faster by copying the
# "last" readable value into the underlying larger buffer's # "last" readable value into the underlying larger buffer's
@ -149,3 +149,92 @@ async def iter_ohlc_periods(
await trio.sleep_forever() await trio.sleep_forever()
finally: finally:
subs.remove(ctx) subs.remove(ctx)
async def sample_and_broadcast(
bus: '_FeedBus', # noqa
shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
sum_tick_vlm: bool = True,
) -> None:
log.info("Started shared mem bar writer")
# iterate stream delivered by broker
async for quotes in quote_stream:
# TODO: ``numba`` this!
for sym, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
# start writing the shm buffer with appropriate
# trade data
for tick in quote['ticks']:
# if tick['type'] in ('utrade',):
# print(tick)
# write trade events to shm last OHLC sample
if tick['type'] in ('trade', 'utrade'):
last = tick['price']
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick.get('size', 0)
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
if sum_tick_vlm:
volume = v + new_v
else:
# presume backend takes care of summing
# it's own vlm
volume = quote['volume']
shm.array[[
'open',
'high',
'low',
'close',
'bar_wap', # can be optionally provided
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),
last,
quote.get('bar_wap', 0),
volume,
)
# XXX: we need to be very cautious here that no
# context-channel is left lingering which doesn't have
# a far end receiver actor-task. In such a case you can
# end up triggering backpressure which which will
# eventually block this producer end of the feed and
# thus other consumers still attached.
subs = bus.subscribers[sym]
for ctx in subs:
# print(f'sub is {ctx.chan.uid}')
try:
await ctx.send_yield({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
subs.remove(ctx)
log.error(f'{ctx.chan.uid} dropped connection')