Port feed layer to use new `samplerd` APIs

Always use `open_sample_stream()` to register fast and slow quote feed
buffers and get a sampler stream which we use to trigger
`Sampler.broadcast_all()` calls on the service side after backfill
events.
epoch_index_backup
Tyler Goodlet 2023-01-04 23:03:43 -05:00
parent 8ed48add18
commit 3328822e44
1 changed files with 82 additions and 79 deletions

View File

@ -74,10 +74,9 @@ from ._source import (
)
from ..ui import _search
from ._sampling import (
Sampler,
open_sample_stream,
sample_and_broadcast,
uniform_rate_send,
_default_delay_s,
)
from ..brokers._util import (
DataUnavailable,
@ -277,6 +276,7 @@ async def start_backfill(
bfqsn: str,
shm: ShmArray,
timeframe: float,
sampler_stream: tractor.MsgStream,
last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
@ -325,7 +325,7 @@ async def start_backfill(
# TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqsn..
# otherwise all fsps get reset on every chart..
await Sampler.broadcast_all()
await sampler_stream.send('broadcast_all')
# signal that backfilling to tsdb's end datum is complete
bf_done = trio.Event()
@ -493,7 +493,7 @@ async def start_backfill(
# in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to
# memory...
await Sampler.broadcast_all()
await sampler_stream.send('broadcast_all')
# short-circuit (for now)
bf_done.set()
@ -504,6 +504,7 @@ async def basic_backfill(
mod: ModuleType,
bfqsn: str,
shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
) -> None:
@ -521,7 +522,8 @@ async def basic_backfill(
mod,
bfqsn,
shm,
timeframe=timeframe,
timeframe,
sampler_stream,
)
)
except DataUnavailable:
@ -537,6 +539,7 @@ async def tsdb_backfill(
fqsn: str,
bfqsn: str,
shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
@ -569,7 +572,8 @@ async def tsdb_backfill(
mod,
bfqsn,
shm,
timeframe=timeframe,
timeframe,
sampler_stream,
last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True,
storage=storage,
@ -734,7 +738,7 @@ async def tsdb_backfill(
# (usually a chart showing graphics for said fsp)
# which tells the chart to conduct a manual full
# graphics loop cycle.
await Sampler.broadcast_all()
await sampler_stream.send('broadcast_all')
# TODO: write new data to tsdb to be ready for the next read.
@ -823,6 +827,17 @@ async def manage_history(
"Persistent shm for sym was already open?!"
)
# register 1s and 1m buffers with the global incrementer task
async with open_sample_stream(
period_s=1,
cache_key=fqsn,
shms_by_period={
1.: rt_shm.token,
60.: hist_shm.token,
},
open_index_stream=True,
) as sample_stream:
log.info('Scanning for existing `marketstored`')
tsdb_is_up = await check_for_service('marketstored')
@ -841,7 +856,11 @@ async def manage_history(
async with (
marketstore.open_storage_client(fqsn)as storage,
):
hist_shm, rt_shm = await bus.nursery.start(
# TODO: drop returning the output that we pass in?
(
hist_shm,
rt_shm,
) = await bus.nursery.start(
tsdb_backfill,
mod,
marketstore,
@ -853,6 +872,7 @@ async def manage_history(
1: rt_shm,
60: hist_shm,
},
sample_stream,
)
# yield back after client connect with filled shm
@ -868,9 +888,9 @@ async def manage_history(
# data that can be used.
some_data_ready.set()
# history retreival loop depending on user interaction and thus
# a small RPC-prot for remotely controllinlg what data is loaded
# for viewing.
# history retrieval loop depending on user interaction
# and thus a small RPC-protocol for remotely controlling
# what data is loaded for viewing.
await trio.sleep_forever()
# load less history if no tsdb can be found
@ -882,10 +902,11 @@ async def manage_history(
bus,
mod,
bfqsn,
shms={
{
1: rt_shm,
60: hist_shm,
},
sample_stream,
)
task_status.started((
hist_zero_index,
@ -997,6 +1018,7 @@ async def allocate_persistent_feed(
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
(
izero_hist,
hist_shm,
@ -1029,13 +1051,6 @@ async def allocate_persistent_feed(
# feed to that name (for now).
bus.feeds[symstr] = bus.feeds[bfqsn] = flume
# insert 1s ohlc into the increment buffer set
# to update and shift every second
Sampler.ohlcv_shms.setdefault(
1,
[]
).append(rt_shm)
task_status.started()
if not start_stream:
@ -1045,18 +1060,6 @@ async def allocate_persistent_feed(
# the backend will indicate when real-time quotes have begun.
await feed_is_live.wait()
# insert 1m ohlc into the increment buffer set
# to shift every 60s.
Sampler.ohlcv_shms.setdefault(60, []).append(hist_shm)
# create buffer a single incrementer task broker backend
# (aka `brokerd`) using the lowest sampler period.
if Sampler.incrementers.get(_default_delay_s) is None:
await bus.start_task(
Sampler.increment_ohlc_buffer,
_default_delay_s,
)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)