Allocate 2 shm buffers for history and real-time

As part of supporting a "history view" chart which shows downsampled
datums alongside our 1s (or higher) sampled OHLC we need a separate
buffer to store a the slower history from broker backends. This begins
that design by allocating 2 buffers:
- `rt_shm: ShmArray` which maps to a `/dev/shm/` file with `_rt` suffix
- `hist_shm: ShmArray` which maps to a file with `_hist` suffix

Deliver both of these shms back from both `manage_history()` and load
them as `Feed.rt_shm`/`.hist_shm` on the client side.

Impl deats:
- init the rt buffer with the first datum from loaded history and
  assign all OHLC values to that row's 'close' and the vlm to 0.
- pass the hist buffer to the backfiller task
- only spawn **one** global sampler array-row increment task per
  `brokerd` and pass in the 1s delay which we presume is our lowest
  OHLC sample rate for now.
- drop `open_sample_step_stream()` and just move its body contents into
  `Feed.index_stream()`
history_view
Tyler Goodlet 2022-08-30 10:53:59 -04:00
parent 60052ff73a
commit 861fe791eb
1 changed files with 101 additions and 67 deletions

View File

@ -56,6 +56,7 @@ from ._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
attach_shm_array, attach_shm_array,
ShmArray, ShmArray,
_secs_in_day,
) )
from .ingest import get_ingestormod from .ingest import get_ingestormod
from .types import Struct from .types import Struct
@ -72,6 +73,7 @@ from ._sampling import (
iter_ohlc_periods, iter_ohlc_periods,
sample_and_broadcast, sample_and_broadcast,
uniform_rate_send, uniform_rate_send,
_default_delay_s,
) )
from ..brokers._util import ( from ..brokers._util import (
NoData, NoData,
@ -256,7 +258,7 @@ async def start_backfill(
write_tsdb: bool = True, write_tsdb: bool = True,
tsdb_is_up: bool = False, tsdb_is_up: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
) -> int: ) -> int:
@ -294,7 +296,7 @@ async def start_backfill(
bf_done = trio.Event() bf_done = trio.Event()
# let caller unblock and deliver latest history frame # let caller unblock and deliver latest history frame
task_status.started((shm, start_dt, end_dt, bf_done)) task_status.started((start_dt, end_dt, bf_done))
# based on the sample step size, maybe load a certain amount history # based on the sample step size, maybe load a certain amount history
if last_tsdb_dt is None: if last_tsdb_dt is None:
@ -544,7 +546,6 @@ async def start_backfill(
) )
frames.pop(epoch) frames.pop(epoch)
continue continue
# await tractor.breakpoint()
if diff > step_size_s: if diff > step_size_s:
@ -672,8 +673,8 @@ async def manage_history(
''' '''
# (maybe) allocate shm array for this broker/symbol which will # (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing. # be used for fast near-term history capture and processing.
shm, opened = maybe_open_shm_array( hist_shm, opened = maybe_open_shm_array(
key=fqsn, key=f'{fqsn}_hist',
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@ -687,6 +688,21 @@ async def manage_history(
"Persistent shm for sym was already open?!" "Persistent shm for sym was already open?!"
) )
rt_shm, opened = maybe_open_shm_array(
key=f'{fqsn}_rt',
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
size=3*_secs_in_day,
)
if not opened:
raise RuntimeError(
"Persistent shm for sym was already open?!"
)
log.info('Scanning for existing `marketstored`') log.info('Scanning for existing `marketstored`')
is_up = await check_for_service('marketstored') is_up = await check_for_service('marketstored')
@ -714,7 +730,6 @@ async def manage_history(
broker, symbol, expiry = unpack_fqsn(fqsn) broker, symbol, expiry = unpack_fqsn(fqsn)
( (
shm,
latest_start_dt, latest_start_dt,
latest_end_dt, latest_end_dt,
bf_done, bf_done,
@ -723,14 +738,14 @@ async def manage_history(
start_backfill, start_backfill,
mod, mod,
bfqsn, bfqsn,
shm, hist_shm,
last_tsdb_dt=last_tsdb_dt, last_tsdb_dt=last_tsdb_dt,
tsdb_is_up=True, tsdb_is_up=True,
storage=storage, storage=storage,
) )
) )
# if len(shm.array) < 2: # if len(hist_shm.array) < 2:
# TODO: there's an edge case here to solve where if the last # TODO: there's an edge case here to solve where if the last
# frame before market close (at least on ib) was pushed and # frame before market close (at least on ib) was pushed and
# there was only "1 new" row pushed from the first backfill # there was only "1 new" row pushed from the first backfill
@ -740,7 +755,7 @@ async def manage_history(
# the tsdb series and stash that somewhere as meta data on # the tsdb series and stash that somewhere as meta data on
# the shm buffer?.. no se. # the shm buffer?.. no se.
task_status.started(shm) task_status.started((hist_shm, rt_shm))
some_data_ready.set() some_data_ready.set()
await bf_done.wait() await bf_done.wait()
@ -758,7 +773,7 @@ async def manage_history(
# TODO: see if there's faster multi-field reads: # TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field # re-index with a `time` and index field
prepend_start = shm._first.value prepend_start = hist_shm._first.value
# sanity check on most-recent-data loading # sanity check on most-recent-data loading
assert prepend_start > dt_diff_s assert prepend_start > dt_diff_s
@ -768,7 +783,7 @@ async def manage_history(
fastest = history[0] fastest = history[0]
to_push = fastest[:prepend_start] to_push = fastest[:prepend_start]
shm.push( hist_shm.push(
to_push, to_push,
# insert the history pre a "days worth" of samples # insert the history pre a "days worth" of samples
@ -784,7 +799,7 @@ async def manage_history(
count = 0 count = 0
end = fastest['Epoch'][0] end = fastest['Epoch'][0]
while shm._first.value > 0: while hist_shm._first.value > 0:
count += 1 count += 1
series = await storage.read_ohlcv( series = await storage.read_ohlcv(
fqsn, fqsn,
@ -796,7 +811,7 @@ async def manage_history(
prepend_start -= len(to_push) prepend_start -= len(to_push)
to_push = fastest[:prepend_start] to_push = fastest[:prepend_start]
shm.push( hist_shm.push(
to_push, to_push,
# insert the history pre a "days worth" of samples # insert the history pre a "days worth" of samples
@ -840,12 +855,12 @@ async def manage_history(
start_backfill, start_backfill,
mod, mod,
bfqsn, bfqsn,
shm, hist_shm,
) )
) )
# yield back after client connect with filled shm # yield back after client connect with filled shm
task_status.started(shm) task_status.started((hist_shm, rt_shm))
# indicate to caller that feed can be delivered to # indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history # remote requesting client since we've loaded history
@ -922,7 +937,7 @@ async def allocate_persistent_feed(
# https://github.com/python-trio/trio/issues/2258 # https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon( # bus.nursery.start_soon(
# await bus.start_task( # await bus.start_task(
shm = await bus.nursery.start( hist_shm, rt_shm = await bus.nursery.start(
manage_history, manage_history,
mod, mod,
bus, bus,
@ -935,7 +950,8 @@ async def allocate_persistent_feed(
# can read directly from the memory which will be written by # can read directly from the memory which will be written by
# this task. # this task.
msg = init_msg[symbol] msg = init_msg[symbol]
msg['shm_token'] = shm.token msg['hist_shm_token'] = hist_shm.token
msg['rt_shm_token'] = rt_shm.token
# true fqsn # true fqsn
fqsn = '.'.join((bfqsn, brokername)) fqsn = '.'.join((bfqsn, brokername))
@ -971,6 +987,25 @@ async def allocate_persistent_feed(
# for ambiguous names we simply apply the retreived # for ambiguous names we simply apply the retreived
# feed to that name (for now). # feed to that name (for now).
sampler.ohlcv_shms.setdefault(
1,
[]
).append(rt_shm)
ohlckeys = ['open', 'high', 'low', 'close']
# set the rt (hft) shm array as append only
# (for now).
rt_shm._first.value = 0
rt_shm._last.value = 0
# push last sample from history to rt buffer just as a filler datum
# but we don't want a history sized datum outlier so set vlm to zero
# and ohlc to the close value.
rt_shm.push(hist_shm.array[-2:-1])
rt_shm.array[ohlckeys] = hist_shm.array['close'][-1]
rt_shm._array['volume'] = 0
task_status.started() task_status.started()
if not start_stream: if not start_stream:
@ -982,14 +1017,18 @@ async def allocate_persistent_feed(
# start shm incrementer task for OHLC style sampling # start shm incrementer task for OHLC style sampling
# at the current detected step period. # at the current detected step period.
times = shm.array['time'] times = hist_shm.array['time']
delay_s = 1 #times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
sampler.ohlcv_shms.setdefault(delay_s, []).append(hist_shm)
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm) # create buffer a single incrementer task broker backend
if sampler.incrementers.get(delay_s) is None: # (aka `brokerd`) using the lowest sampler period.
# await tractor.breakpoint()
# for delay_s in sampler.ohlcv_shms:
if sampler.incrementers.get(_default_delay_s) is None:
await bus.start_task( await bus.start_task(
increment_ohlc_buffer, increment_ohlc_buffer,
1, _default_delay_s,
) )
sum_tick_vlm: bool = init_msg.get( sum_tick_vlm: bool = init_msg.get(
@ -1000,7 +1039,7 @@ async def allocate_persistent_feed(
try: try:
await sample_and_broadcast( await sample_and_broadcast(
bus, bus,
shm, rt_shm,
quote_stream, quote_stream,
brokername, brokername,
sum_tick_vlm sum_tick_vlm
@ -1163,35 +1202,6 @@ async def open_feed_bus(
log.warning(f'{sub} for {symbol} was already removed?') log.warning(f'{sub} for {symbol} was already removed?')
@asynccontextmanager
async def open_sample_step_stream(
portal: tractor.Portal,
delay_s: int,
) -> tractor.ReceiveMsgStream:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_context(
acm_func=partial(
portal.open_context,
iter_ohlc_periods,
),
# kwargs={'delay_s': delay_s},
kwargs={'delay_s': 1},
) as (cache_hit, (ctx, first)):
async with ctx.open_stream() as istream:
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with istream.subscribe() as bistream:
yield bistream
else:
yield istream
@dataclass @dataclass
class Feed: class Feed:
''' '''
@ -1204,7 +1214,8 @@ class Feed:
''' '''
name: str name: str
shm: ShmArray hist_shm: ShmArray
rt_shm: ShmArray
mod: ModuleType mod: ModuleType
first_quotes: dict # symbol names to first quote dicts first_quotes: dict # symbol names to first quote dicts
_portal: tractor.Portal _portal: tractor.Portal
@ -1236,10 +1247,23 @@ class Feed:
delay_s = 1 #delay_s or self._max_sample_rate delay_s = 1 #delay_s or self._max_sample_rate
async with open_sample_step_stream( # XXX: this should be singleton on a host,
self.portal, # a lone broker-daemon per provider should be
delay_s, # created for all practical purposes
) as istream: async with maybe_open_context(
acm_func=partial(
self.portal.open_context,
iter_ohlc_periods,
),
kwargs={'delay_s': delay_s},
) as (cache_hit, (ctx, first)):
async with ctx.open_stream() as istream:
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with istream.subscribe() as bistream:
yield bistream
else:
yield istream yield istream
async def pause(self) -> None: async def pause(self) -> None:
@ -1338,15 +1362,21 @@ async def open_feed(
): ):
# we can only read from shm # we can only read from shm
shm = attach_shm_array( hist_shm = attach_shm_array(
token=init_msg[bfqsn]['shm_token'], token=init_msg[bfqsn]['hist_shm_token'],
readonly=True, readonly=True,
) )
rt_shm = attach_shm_array(
token=init_msg[bfqsn]['rt_shm_token'],
readonly=True,
)
assert fqsn in first_quotes assert fqsn in first_quotes
feed = Feed( feed = Feed(
name=brokername, name=brokername,
shm=shm, hist_shm=hist_shm,
rt_shm=rt_shm,
mod=mod, mod=mod,
first_quotes=first_quotes, first_quotes=first_quotes,
stream=stream, stream=stream,
@ -1364,7 +1394,7 @@ async def open_feed(
'actor_name': feed.portal.channel.uid[0], 'actor_name': feed.portal.channel.uid[0],
'host': host, 'host': host,
'port': port, 'port': port,
'shm': f'{humanize(feed.shm._shm.size)}', 'shm': f'{humanize(feed.hist_shm._shm.size)}',
'throttle_rate': feed.throttle_rate, 'throttle_rate': feed.throttle_rate,
}) })
feed.status.update(init_msg.pop('status', {})) feed.status.update(init_msg.pop('status', {}))
@ -1382,7 +1412,11 @@ async def open_feed(
feed.symbols[sym] = symbol feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this # cast shm dtype to list... can't member why we need this
shm_token = data['shm_token'] for shm_key, shm in [
('rt_shm_token', rt_shm),
('hist_shm_token', hist_shm),
]:
shm_token = data[shm_key]
# XXX: msgspec won't relay through the tuples XD # XXX: msgspec won't relay through the tuples XD
shm_token['dtype_descr'] = tuple( shm_token['dtype_descr'] = tuple(