Port feed layer to use new `samplerd` APIs

Always use `open_sample_stream()` to register fast and slow quote feed
buffers and get a sampler stream which we use to trigger
`Sampler.broadcast_all()` calls on the service side after backfill
events.
samplerd_service
Tyler Goodlet 2023-01-04 23:03:43 -05:00
parent 5ec1a72a3d
commit b3d1b1aa63
1 changed files with 82 additions and 79 deletions

View File

@ -74,10 +74,9 @@ from ._source import (
) )
from ..ui import _search from ..ui import _search
from ._sampling import ( from ._sampling import (
Sampler, open_sample_stream,
sample_and_broadcast, sample_and_broadcast,
uniform_rate_send, uniform_rate_send,
_default_delay_s,
) )
from ..brokers._util import ( from ..brokers._util import (
DataUnavailable, DataUnavailable,
@ -277,6 +276,7 @@ async def start_backfill(
bfqsn: str, bfqsn: str,
shm: ShmArray, shm: ShmArray,
timeframe: float, timeframe: float,
sampler_stream: tractor.MsgStream,
last_tsdb_dt: Optional[datetime] = None, last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None, storage: Optional[Storage] = None,
@ -325,7 +325,7 @@ async def start_backfill(
# TODO: *** THIS IS A BUG *** # TODO: *** THIS IS A BUG ***
# we need to only broadcast to subscribers for this fqsn.. # we need to only broadcast to subscribers for this fqsn..
# otherwise all fsps get reset on every chart.. # otherwise all fsps get reset on every chart..
await Sampler.broadcast_all() await sampler_stream.send('broadcast_all')
# signal that backfilling to tsdb's end datum is complete # signal that backfilling to tsdb's end datum is complete
bf_done = trio.Event() bf_done = trio.Event()
@ -493,7 +493,7 @@ async def start_backfill(
# in the block above to avoid entering new ``frames`` # in the block above to avoid entering new ``frames``
# values while we're pipelining the current ones to # values while we're pipelining the current ones to
# memory... # memory...
await Sampler.broadcast_all() await sampler_stream.send('broadcast_all')
# short-circuit (for now) # short-circuit (for now)
bf_done.set() bf_done.set()
@ -504,6 +504,7 @@ async def basic_backfill(
mod: ModuleType, mod: ModuleType,
bfqsn: str, bfqsn: str,
shms: dict[int, ShmArray], shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
) -> None: ) -> None:
@ -521,7 +522,8 @@ async def basic_backfill(
mod, mod,
bfqsn, bfqsn,
shm, shm,
timeframe=timeframe, timeframe,
sampler_stream,
) )
) )
except DataUnavailable: except DataUnavailable:
@ -537,6 +539,7 @@ async def tsdb_backfill(
fqsn: str, fqsn: str,
bfqsn: str, bfqsn: str,
shms: dict[int, ShmArray], shms: dict[int, ShmArray],
sampler_stream: tractor.MsgStream,
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ShmArray, ShmArray] tuple[ShmArray, ShmArray]
@ -569,7 +572,8 @@ async def tsdb_backfill(
mod, mod,
bfqsn, bfqsn,
shm, shm,
timeframe=timeframe, timeframe,
sampler_stream,
last_tsdb_dt=last_tsdb_dt, last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True, tsdb_is_up=True,
storage=storage, storage=storage,
@ -734,7 +738,7 @@ async def tsdb_backfill(
# (usually a chart showing graphics for said fsp) # (usually a chart showing graphics for said fsp)
# which tells the chart to conduct a manual full # which tells the chart to conduct a manual full
# graphics loop cycle. # graphics loop cycle.
await Sampler.broadcast_all() await sampler_stream.send('broadcast_all')
# TODO: write new data to tsdb to be ready to for next read. # TODO: write new data to tsdb to be ready to for next read.
@ -823,6 +827,17 @@ async def manage_history(
"Persistent shm for sym was already open?!" "Persistent shm for sym was already open?!"
) )
# register 1s and 1m buffers with the global incrementer task
async with open_sample_stream(
period_s=1,
cache_key=fqsn,
shms_by_period={
1.: rt_shm.token,
60.: hist_shm.token,
},
open_index_stream=True,
) as sample_stream:
log.info('Scanning for existing `marketstored`') log.info('Scanning for existing `marketstored`')
tsdb_is_up = await check_for_service('marketstored') tsdb_is_up = await check_for_service('marketstored')
@ -841,7 +856,11 @@ async def manage_history(
async with ( async with (
marketstore.open_storage_client(fqsn)as storage, marketstore.open_storage_client(fqsn)as storage,
): ):
hist_shm, rt_shm = await bus.nursery.start( # TODO: drop returning the output that we pass in?
(
hist_shm,
rt_shm,
) = await bus.nursery.start(
tsdb_backfill, tsdb_backfill,
mod, mod,
marketstore, marketstore,
@ -853,6 +872,7 @@ async def manage_history(
1: rt_shm, 1: rt_shm,
60: hist_shm, 60: hist_shm,
}, },
sample_stream,
) )
# yield back after client connect with filled shm # yield back after client connect with filled shm
@ -868,9 +888,9 @@ async def manage_history(
# data that can be used. # data that can be used.
some_data_ready.set() some_data_ready.set()
# history retreival loop depending on user interaction and thus # history retreival loop depending on user interaction
# a small RPC-prot for remotely controllinlg what data is loaded # and thus a small RPC-prot for remotely controllinlg
# for viewing. # what data is loaded for viewing.
await trio.sleep_forever() await trio.sleep_forever()
# load less history if no tsdb can be found # load less history if no tsdb can be found
@ -882,10 +902,11 @@ async def manage_history(
bus, bus,
mod, mod,
bfqsn, bfqsn,
shms={ {
1: rt_shm, 1: rt_shm,
60: hist_shm, 60: hist_shm,
}, },
sample_stream,
) )
task_status.started(( task_status.started((
hist_zero_index, hist_zero_index,
@ -997,6 +1018,7 @@ async def allocate_persistent_feed(
# https://github.com/python-trio/trio/issues/2258 # https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon( # bus.nursery.start_soon(
# await bus.start_task( # await bus.start_task(
( (
izero_hist, izero_hist,
hist_shm, hist_shm,
@ -1030,13 +1052,6 @@ async def allocate_persistent_feed(
# feed to that name (for now). # feed to that name (for now).
bus.feeds[symstr] = bus.feeds[bfqsn] = flume bus.feeds[symstr] = bus.feeds[bfqsn] = flume
# insert 1s ohlc into the increment buffer set
# to update and shift every second
Sampler.ohlcv_shms.setdefault(
1,
[]
).append(rt_shm)
task_status.started() task_status.started()
if not start_stream: if not start_stream:
@ -1046,18 +1061,6 @@ async def allocate_persistent_feed(
# the backend will indicate when real-time quotes have begun. # the backend will indicate when real-time quotes have begun.
await feed_is_live.wait() await feed_is_live.wait()
# insert 1m ohlc into the increment buffer set
# to shift every 60s.
Sampler.ohlcv_shms.setdefault(60, []).append(hist_shm)
# create buffer a single incrementer task broker backend
# (aka `brokerd`) using the lowest sampler period.
if Sampler.incrementers.get(_default_delay_s) is None:
await bus.start_task(
Sampler.increment_ohlc_buffer,
_default_delay_s,
)
sum_tick_vlm: bool = init_msg.get( sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {} 'shm_write_opts', {}
).get('sum_tick_vlm', True) ).get('sum_tick_vlm', True)