Ensure bfqsn is lower cased for feed api consumers

Also, Start tinkering with `tractor.trionics.ipython_embed()`

In effort to get back to a usable REPL around the mkts client
this adds usage of the new `tractor` integration api as well as logic
for skipping backfilling if existing tsdb arrays are found.
l1_precision_fix
Tyler Goodlet 2022-03-24 13:44:12 -04:00
parent 6dc6d00a9b
commit 6cdd017cd6
1 changed files with 43 additions and 4 deletions

View File

@ -192,6 +192,22 @@ async def _setup_persistent_brokerd(
await trio.sleep_forever() await trio.sleep_forever()
async def start_backfill(
mod: ModuleType,
fqsn: str,
shm: ShmArray,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> int:
return await mod.backfill_bars(
fqsn,
shm,
task_status=task_status,
)
async def manage_history( async def manage_history(
mod: ModuleType, mod: ModuleType,
bus: _FeedsBus, bus: _FeedsBus,
@ -222,7 +238,12 @@ async def manage_history(
) )
log.info('Scanning for existing `marketstored`') log.info('Scanning for existing `marketstored`')
is_up = await check_for_service('marketstored') is_up = await check_for_service('marketstored')
# for now only do backfilling if no tsdb can be found
do_backfill = not is_up and opened
if is_up and opened: if is_up and opened:
log.info('Found existing `marketstored`') log.info('Found existing `marketstored`')
from . import marketstore from . import marketstore
@ -231,6 +252,11 @@ async def manage_history(
fqsn, fqsn,
) as (storage, tsdb_arrays): ) as (storage, tsdb_arrays):
# TODO: get this shit workin
from tractor.trionics import ipython_embed
await ipython_embed()
# await ipython_embed(ns=locals())
# TODO: history validation # TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!' # assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened: # if not opened:
@ -272,16 +298,27 @@ async def manage_history(
last_dt = datetime.fromtimestamp(last_s) last_dt = datetime.fromtimestamp(last_s)
array, next_dt = await hist(end_dt=last_dt) array, next_dt = await hist(end_dt=last_dt)
else:
do_backfill = True
# await tractor.breakpoint()
some_data_ready.set() some_data_ready.set()
elif opened: if do_backfill:
log.info('No existing `marketstored` found..') log.info('No existing `marketstored` found..')
# start history backfill task ``backfill_bars()`` is # start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is # a required backend func this must block until shm is
# filled with first set of ohlc bars # filled with first set of ohlc bars
_ = await bus.nursery.start(mod.backfill_bars, fqsn, shm) await bus.nursery.start(
start_backfill,
mod,
fqsn,
shm,
)
# _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
# yield back after client connect with filled shm # yield back after client connect with filled shm
task_status.started(shm) task_status.started(shm)
@ -361,8 +398,10 @@ async def allocate_persistent_feed(
loglevel=loglevel, loglevel=loglevel,
) )
) )
# the broker-specific fully qualified symbol name # the broker-specific fully qualified symbol name,
bfqsn = init_msg[symbol]['fqsn'] # but ensure it is lower-cased for external use.
bfqsn = init_msg[symbol]['fqsn'].lower()
init_msg[symbol]['fqsn'] = bfqsn
# HISTORY, run 2 tasks: # HISTORY, run 2 tasks:
# - a history loader / maintainer # - a history loader / maintainer