From cd3dbc92759a73a25c2f03fc5edd327e0d015cc8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Mar 2022 13:44:12 -0400 Subject: [PATCH] Ensure bfqsn is lower cased for feed api consumers Also, Start tinkering with `tractor.trionics.ipython_embed()` In effort to get back to a usable REPL around the mkts client this adds usage of the new `tractor` integration api as well as logic for skipping backfilling if existing tsdb arrays are found. --- piker/data/feed.py | 47 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 4 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 35d006de..ff8a543a 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -192,6 +192,22 @@ async def _setup_persistent_brokerd( await trio.sleep_forever() +async def start_backfill( + mod: ModuleType, + fqsn: str, + shm: ShmArray, + + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> int: + + return await mod.backfill_bars( + fqsn, + shm, + task_status=task_status, + ) + + async def manage_history( mod: ModuleType, bus: _FeedsBus, @@ -222,7 +238,12 @@ async def manage_history( ) log.info('Scanning for existing `marketstored`') + is_up = await check_for_service('marketstored') + + # for now only do backfilling if no tsdb can be found + do_backfill = not is_up and opened + if is_up and opened: log.info('Found existing `marketstored`') from . import marketstore @@ -231,6 +252,11 @@ async def manage_history( fqsn, ) as (storage, tsdb_arrays): + # TODO: get this shit workin + from tractor.trionics import ipython_embed + await ipython_embed() + # await ipython_embed(ns=locals()) + # TODO: history validation # assert opened, f'Persistent shm for {symbol} was already open?!' # if not opened: @@ -272,16 +298,27 @@ async def manage_history( last_dt = datetime.fromtimestamp(last_s) array, next_dt = await hist(end_dt=last_dt) + else: + do_backfill = True + + # await tractor.breakpoint() some_data_ready.set() - elif opened: + if do_backfill: log.info('No existing `marketstored` found..') # start history backfill task ``backfill_bars()`` is # a required backend func this must block until shm is # filled with first set of ohlc bars - _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm) + await bus.nursery.start( + start_backfill, + mod, + fqsn, + shm, + ) + + # _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm) # yield back after client connect with filled shm task_status.started(shm) @@ -361,8 +398,10 @@ async def allocate_persistent_feed( loglevel=loglevel, ) ) - # the broker-specific fully qualified symbol name - bfqsn = init_msg[symbol]['fqsn'] + # the broker-specific fully qualified symbol name, + # but ensure it is lower-cased for external use. + bfqsn = init_msg[symbol]['fqsn'].lower() + init_msg[symbol]['fqsn'] = bfqsn # HISTORY, run 2 tasks: # - a history loader / maintainer