Deliver and utilise broker backend OHLC sample rate in init msg

cached_feeds
Tyler Goodlet 2021-04-03 01:23:18 -04:00
parent c05fc8991a
commit ce4144aace
1 changed files with 49 additions and 25 deletions

View File

@ -53,9 +53,10 @@ from ._sharedmem import (
)
from ._source import base_iohlc_dtype, Symbol
from ._buffer import (
_shms,
_incrementers,
increment_ohlc_buffer,
subscribe_ohlc_for_increment,
shm_incrementing,
iter_ohlc_periods,
)
__all__ = [
@ -64,7 +65,7 @@ __all__ = [
'attach_shm_array',
'open_shm_array',
'get_shm_token',
'subscribe_ohlc_for_increment',
# 'subscribe_ohlc_for_increment',
]
@ -181,10 +182,10 @@ async def allocate_persistent_feed(
readonly=False,
)
# assert opened
if not opened:
# do history validation?
pass
# do history validation?
assert opened, "Persistent shm for sym was already open?!"
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
send, quote_stream = trio.open_memory_channel(2**8)
feed_is_live = trio.Event()
@ -204,34 +205,48 @@ async def allocate_persistent_feed(
init_msg[symbol]['shm_token'] = shm.token
cs = trio.CancelScope()
# TODO: make this into a composed type which also
# contains the backfiller cs for individual super-based
# resspawns when needed.
bus.feeds[symbol] = (cs, init_msg, first_quote)
with cs:
if opened:
# start history backfill task
# ``backfill_bars()`` is a required backend func
await bus.nursery.start(mod.backfill_bars, symbol, shm)
# yield back control to starting nursery
task_status.started((init_msg, first_quote))
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
await bus.nursery.start(mod.backfill_bars, symbol, shm)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# pass OHLC sample rate in seconds
init_msg[symbol]['sample_rate'] = delay_s
# yield back control to starting nursery
task_status.started((init_msg, first_quote))
await feed_is_live.wait()
# tell incrementer task it can start
shm_incrementing(shm.token['shm_name']).set()
# # tell incrementer task it can start
# shm_incrementing(shm.token['shm_name']).set()
# start shm incrementingn for OHLC sampling
subscribe_ohlc_for_increment(shm, delay_s)
# subscribe_ohlc_for_increment(shm, delay_s)
# begin shm write loop and broadcast to subscribers
if opened:
_shms.setdefault(delay_s, []).append(shm)
if _incrementers.get(delay_s) is None:
cs = await bus.nursery.start(increment_ohlc_buffer, delay_s)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
# begin shm write loop and broadcast to subscribers
async with quote_stream:
log.info("Started shared mem bar writer")
@ -372,6 +387,7 @@ class Feed:
_brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
_max_sample_rate: int = 0
# cache of symbol info messages received as first message when
# a stream startsc.
@ -380,15 +396,19 @@ class Feed:
async def receive(self) -> dict:
return await self.stream.__anext__()
async def index_stream(self) -> AsyncIterator[int]:
async def index_stream(
self,
delay_s: Optional[int] = None
) -> AsyncIterator[int]:
if not self._index_stream:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
self._index_stream = await self._brokerd_portal.run(
increment_ohlc_buffer,
shm_token=self.shm.token,
topics=['index'],
iter_ohlc_periods,
delay_s=delay_s or self._max_sample_rate,
)
return self._index_stream
@ -459,9 +479,9 @@ async def open_feed(
loglevel=loglevel,
)
# TODO: we can't do this **and** be compate with
# ``tractor.msg.pub``, should we maybe just drop this after
# tests are in?
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
init_msg = await stream.receive()
shm = attach_shm_array(
@ -478,10 +498,12 @@ async def open_feed(
mod=mod,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
for sym, data in init_msg.items():
si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
symbol = Symbol(
key=sym,
@ -493,9 +515,11 @@ async def open_feed(
feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this
shm_token = data['shm_token']
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates)
yield feed