Use fqsn in `.manage_history()`

Allocate and `.started()` return the `ShmArray` from here as well in
prep for tsdb integration.
fqsns
Tyler Goodlet 2022-03-08 09:31:12 -05:00
parent b16167b8f3
commit e9d64ffee8
1 changed files with 24 additions and 27 deletions

View File

@ -191,10 +191,8 @@ async def _setup_persistent_brokerd(
async def manage_history( async def manage_history(
mod: ModuleType, mod: ModuleType,
shm: ShmArray,
bus: _FeedsBus, bus: _FeedsBus,
symbol: str, symbol: str,
we_opened_shm: bool,
some_data_ready: trio.Event, some_data_ready: trio.Event,
feed_is_live: trio.Event, feed_is_live: trio.Event,
@ -208,21 +206,30 @@ async def manage_history(
buffer. buffer.
''' '''
task_status.started() fqsn = mk_fqsn(mod.name, symbol)
opened = we_opened_shm # (maybe) allocate shm array for this broker/symbol which will
# TODO: history validation # be used for fast near-term history capture and processing.
# assert opened, f'Persistent shm for {symbol} was already open?!' shm, opened = maybe_open_shm_array(
# if not opened: key=fqsn,
# raise RuntimeError("Persistent shm for sym was already open?!")
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
)
if opened: if opened:
# ask broker backend for new history log.info('No existing `marketstored` found..')
# start history backfill task ``backfill_bars()`` is # start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is # a required backend func this must block until shm is
# filled with first set of ohlc bars # filled with first set of ohlc bars
cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) _ = await bus.nursery.start(mod.backfill_bars, symbol, shm)
# yield back after client connect with filled shm
task_status.started(shm)
# indicate to caller that feed can be delivered to # indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history # remote requesting client since we've loaded history
@ -243,13 +250,12 @@ async def manage_history(
# start shm incrementing for OHLC sampling at the current # start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne. # detected sampling period if one dne.
if sampler.incrementers.get(delay_s) is None: if sampler.incrementers.get(delay_s) is None:
cs = await bus.start_task( await bus.start_task(
increment_ohlc_buffer, increment_ohlc_buffer,
delay_s, delay_s,
) )
await trio.sleep_forever() await trio.sleep_forever()
cs.cancel()
async def allocate_persistent_feed( async def allocate_persistent_feed(
@ -281,18 +287,6 @@ async def allocate_persistent_feed(
fqsn = mk_fqsn(brokername, symbol) fqsn = mk_fqsn(brokername, symbol)
# (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array(
key=fqsn,
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
)
# mem chan handed to broker backend so it can push real-time # mem chan handed to broker backend so it can push real-time
# quotes to this task for sampling and history storage (see below). # quotes to this task for sampling and history storage (see below).
send, quote_stream = trio.open_memory_channel(10) send, quote_stream = trio.open_memory_channel(10)
@ -311,13 +305,11 @@ async def allocate_persistent_feed(
# bus.nursery.start_soon( # bus.nursery.start_soon(
# await bus.start_task( # await bus.start_task(
await bus.nursery.start( shm = await bus.nursery.start(
manage_history, manage_history,
mod, mod,
shm,
bus, bus,
symbol, symbol,
opened,
some_data_ready, some_data_ready,
feed_is_live, feed_is_live,
) )
@ -454,6 +446,11 @@ async def open_feed_bus(
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
# re-send to trigger display loop cycle (necessary especially
# when the mkt is closed and no real-time messages are
# expected).
await stream.send(first_quotes)
if tick_throttle: if tick_throttle:
# open a bg task which receives quotes over a mem chan # open a bg task which receives quotes over a mem chan