diff --git a/piker/data/feed.py b/piker/data/feed.py index a29f337a..ef027322 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -56,6 +56,7 @@ from ._sharedmem import ( maybe_open_shm_array, attach_shm_array, ShmArray, + _secs_in_day, ) from .ingest import get_ingestormod from .types import Struct @@ -72,6 +73,7 @@ from ._sampling import ( iter_ohlc_periods, sample_and_broadcast, uniform_rate_send, + _default_delay_s, ) from ..brokers._util import ( NoData, @@ -256,7 +258,7 @@ async def start_backfill( write_tsdb: bool = True, tsdb_is_up: bool = False, - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, ) -> int: @@ -294,7 +296,7 @@ async def start_backfill( bf_done = trio.Event() # let caller unblock and deliver latest history frame - task_status.started((shm, start_dt, end_dt, bf_done)) + task_status.started((start_dt, end_dt, bf_done)) # based on the sample step size, maybe load a certain amount history if last_tsdb_dt is None: @@ -544,7 +546,6 @@ async def start_backfill( ) frames.pop(epoch) continue - # await tractor.breakpoint() if diff > step_size_s: @@ -672,8 +673,8 @@ async def manage_history( ''' # (maybe) allocate shm array for this broker/symbol which will # be used for fast near-term history capture and processing. - shm, opened = maybe_open_shm_array( - key=fqsn, + hist_shm, opened = maybe_open_shm_array( + key=f'{fqsn}_hist', # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -687,6 +688,21 @@ async def manage_history( "Persistent shm for sym was already open?!" ) + rt_shm, opened = maybe_open_shm_array( + key=f'{fqsn}_rt', + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + + # we expect the sub-actor to write + readonly=False, + size=3*_secs_in_day, + ) + if not opened: + raise RuntimeError( + "Persistent shm for sym was already open?!" + ) + log.info('Scanning for existing `marketstored`') is_up = await check_for_service('marketstored') @@ -714,7 +730,6 @@ async def manage_history( broker, symbol, expiry = unpack_fqsn(fqsn) ( - shm, latest_start_dt, latest_end_dt, bf_done, @@ -723,14 +738,14 @@ async def manage_history( start_backfill, mod, bfqsn, - shm, + hist_shm, last_tsdb_dt=last_tsdb_dt, tsdb_is_up=True, storage=storage, ) ) - # if len(shm.array) < 2: + # if len(hist_shm.array) < 2: # TODO: there's an edge case here to solve where if the last # frame before market close (at least on ib) was pushed and # there was only "1 new" row pushed from the first backfill @@ -740,7 +755,7 @@ async def manage_history( # the tsdb series and stash that somewhere as meta data on # the shm buffer?.. no se. - task_status.started(shm) + task_status.started((hist_shm, rt_shm)) some_data_ready.set() await bf_done.wait() @@ -758,7 +773,7 @@ async def manage_history( # TODO: see if there's faster multi-field reads: # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # re-index with a `time` and index field - prepend_start = shm._first.value + prepend_start = hist_shm._first.value # sanity check on most-recent-data loading assert prepend_start > dt_diff_s @@ -768,7 +783,7 @@ async def manage_history( fastest = history[0] to_push = fastest[:prepend_start] - shm.push( + hist_shm.push( to_push, # insert the history pre a "days worth" of samples @@ -784,7 +799,7 @@ async def manage_history( count = 0 end = fastest['Epoch'][0] - while shm._first.value > 0: + while hist_shm._first.value > 0: count += 1 series = await storage.read_ohlcv( fqsn, @@ -796,7 +811,7 @@ async def manage_history( prepend_start -= len(to_push) to_push = fastest[:prepend_start] - shm.push( + hist_shm.push( to_push, # insert the history pre a "days worth" of samples @@ -840,12 +855,12 @@ async def manage_history( start_backfill, mod, bfqsn, - shm, + hist_shm, ) ) # yield back after client connect with filled shm - task_status.started(shm) + task_status.started((hist_shm, rt_shm)) # indicate to caller that feed can be delivered to # remote requesting client since we've loaded history @@ -922,7 +937,7 @@ async def allocate_persistent_feed( # https://github.com/python-trio/trio/issues/2258 # bus.nursery.start_soon( # await bus.start_task( - shm = await bus.nursery.start( + hist_shm, rt_shm = await bus.nursery.start( manage_history, mod, bus, @@ -935,7 +950,8 @@ async def allocate_persistent_feed( # can read directly from the memory which will be written by # this task. msg = init_msg[symbol] - msg['shm_token'] = shm.token + msg['hist_shm_token'] = hist_shm.token + msg['rt_shm_token'] = rt_shm.token # true fqsn fqsn = '.'.join((bfqsn, brokername)) @@ -971,6 +987,25 @@ async def allocate_persistent_feed( # for ambiguous names we simply apply the retreived # feed to that name (for now). + sampler.ohlcv_shms.setdefault( + 1, + [] + ).append(rt_shm) + ohlckeys = ['open', 'high', 'low', 'close'] + + # set the rt (hft) shm array as append only + # (for now). + rt_shm._first.value = 0 + rt_shm._last.value = 0 + + # push last sample from history to rt buffer just as a filler datum + # but we don't want a history sized datum outlier so set vlm to zero + # and ohlc to the close value. + rt_shm.push(hist_shm.array[-2:-1]) + + rt_shm.array[ohlckeys] = hist_shm.array['close'][-1] + rt_shm._array['volume'] = 0 + task_status.started() if not start_stream: @@ -982,14 +1017,18 @@ async def allocate_persistent_feed( # start shm incrementer task for OHLC style sampling # at the current detected step period. - times = shm.array['time'] - delay_s = 1 #times[-1] - times[times != times[-1]][-1] + times = hist_shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + sampler.ohlcv_shms.setdefault(delay_s, []).append(hist_shm) - sampler.ohlcv_shms.setdefault(delay_s, []).append(shm) - if sampler.incrementers.get(delay_s) is None: + # create buffer a single incrementer task broker backend + # (aka `brokerd`) using the lowest sampler period. + # await tractor.breakpoint() + # for delay_s in sampler.ohlcv_shms: + if sampler.incrementers.get(_default_delay_s) is None: await bus.start_task( increment_ohlc_buffer, - 1, + _default_delay_s, ) sum_tick_vlm: bool = init_msg.get( @@ -1000,7 +1039,7 @@ async def allocate_persistent_feed( try: await sample_and_broadcast( bus, - shm, + rt_shm, quote_stream, brokername, sum_tick_vlm @@ -1163,35 +1202,6 @@ async def open_feed_bus( log.warning(f'{sub} for {symbol} was already removed?') -@asynccontextmanager -async def open_sample_step_stream( - portal: tractor.Portal, - delay_s: int, - -) -> tractor.ReceiveMsgStream: - - # XXX: this should be singleton on a host, - # a lone broker-daemon per provider should be - # created for all practical purposes - async with maybe_open_context( - acm_func=partial( - portal.open_context, - iter_ohlc_periods, - ), - # kwargs={'delay_s': delay_s}, - kwargs={'delay_s': 1}, - - ) as (cache_hit, (ctx, first)): - async with ctx.open_stream() as istream: - if cache_hit: - # add a new broadcast subscription for the quote stream - # if this feed is likely already in use - async with istream.subscribe() as bistream: - yield bistream - else: - yield istream - - @dataclass class Feed: ''' @@ -1204,7 +1214,8 @@ class Feed: ''' name: str - shm: ShmArray + hist_shm: ShmArray + rt_shm: ShmArray mod: ModuleType first_quotes: dict # symbol names to first quote dicts _portal: tractor.Portal @@ -1236,11 +1247,24 @@ class Feed: delay_s = 1 #delay_s or self._max_sample_rate - async with open_sample_step_stream( - self.portal, - delay_s, - ) as istream: - yield istream + # XXX: this should be singleton on a host, + # a lone broker-daemon per provider should be + # created for all practical purposes + async with maybe_open_context( + acm_func=partial( + self.portal.open_context, + iter_ohlc_periods, + ), + kwargs={'delay_s': delay_s}, + ) as (cache_hit, (ctx, first)): + async with ctx.open_stream() as istream: + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with istream.subscribe() as bistream: + yield bistream + else: + yield istream async def pause(self) -> None: await self.stream.send('pause') @@ -1338,15 +1362,21 @@ async def open_feed( ): # we can only read from shm - shm = attach_shm_array( - token=init_msg[bfqsn]['shm_token'], + hist_shm = attach_shm_array( + token=init_msg[bfqsn]['hist_shm_token'], readonly=True, ) + rt_shm = attach_shm_array( + token=init_msg[bfqsn]['rt_shm_token'], + readonly=True, + ) + assert fqsn in first_quotes feed = Feed( name=brokername, - shm=shm, + hist_shm=hist_shm, + rt_shm=rt_shm, mod=mod, first_quotes=first_quotes, stream=stream, @@ -1364,7 +1394,7 @@ async def open_feed( 'actor_name': feed.portal.channel.uid[0], 'host': host, 'port': port, - 'shm': f'{humanize(feed.shm._shm.size)}', + 'shm': f'{humanize(feed.hist_shm._shm.size)}', 'throttle_rate': feed.throttle_rate, }) feed.status.update(init_msg.pop('status', {})) @@ -1382,13 +1412,17 @@ async def open_feed( feed.symbols[sym] = symbol # cast shm dtype to list... can't member why we need this - shm_token = data['shm_token'] + for shm_key, shm in [ + ('rt_shm_token', rt_shm), + ('hist_shm_token', hist_shm), + ]: + shm_token = data[shm_key] - # XXX: msgspec won't relay through the tuples XD - shm_token['dtype_descr'] = tuple( - map(tuple, shm_token['dtype_descr'])) + # XXX: msgspec won't relay through the tuples XD + shm_token['dtype_descr'] = tuple( + map(tuple, shm_token['dtype_descr'])) - assert shm_token == shm.token # sanity + assert shm_token == shm.token # sanity feed._max_sample_rate = 1