Start tinkering with `tractor.trionics.ipython_embed()`
In effort to get back to a usable REPL around the mkts client this adds usage of the new `tractor` integration api as well as logic for skipping backfilling if existing tsdb arrays are found.mkts_backup
parent
4d2b5f9196
commit
544c6c3180
|
@ -192,6 +192,22 @@ async def _setup_persistent_brokerd(
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
async def start_backfill(
|
||||||
|
mod: ModuleType,
|
||||||
|
fqsn: str,
|
||||||
|
shm: ShmArray,
|
||||||
|
|
||||||
|
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
) -> int:
|
||||||
|
|
||||||
|
return await mod.backfill_bars(
|
||||||
|
fqsn,
|
||||||
|
shm,
|
||||||
|
task_status=task_status,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def manage_history(
|
async def manage_history(
|
||||||
mod: ModuleType,
|
mod: ModuleType,
|
||||||
bus: _FeedsBus,
|
bus: _FeedsBus,
|
||||||
|
@ -222,7 +238,12 @@ async def manage_history(
|
||||||
)
|
)
|
||||||
|
|
||||||
log.info('Scanning for existing `marketstored`')
|
log.info('Scanning for existing `marketstored`')
|
||||||
|
|
||||||
is_up = await check_for_service('marketstored')
|
is_up = await check_for_service('marketstored')
|
||||||
|
|
||||||
|
# for now only do backfilling if no tsdb can be found
|
||||||
|
do_backfill = not is_up and opened
|
||||||
|
|
||||||
if is_up and opened:
|
if is_up and opened:
|
||||||
log.info('Found existing `marketstored`')
|
log.info('Found existing `marketstored`')
|
||||||
from . import marketstore
|
from . import marketstore
|
||||||
|
@ -231,6 +252,11 @@ async def manage_history(
|
||||||
fqsn,
|
fqsn,
|
||||||
) as (storage, tsdb_arrays):
|
) as (storage, tsdb_arrays):
|
||||||
|
|
||||||
|
# TODO: get this shit workin
|
||||||
|
from tractor.trionics import ipython_embed
|
||||||
|
await ipython_embed()
|
||||||
|
# await ipython_embed(ns=locals())
|
||||||
|
|
||||||
# TODO: history validation
|
# TODO: history validation
|
||||||
# assert opened, f'Persistent shm for {symbol} was already open?!'
|
# assert opened, f'Persistent shm for {symbol} was already open?!'
|
||||||
# if not opened:
|
# if not opened:
|
||||||
|
@ -238,6 +264,7 @@ async def manage_history(
|
||||||
# "Persistent shm for sym was already open?!"
|
# "Persistent shm for sym was already open?!"
|
||||||
# )
|
# )
|
||||||
|
|
||||||
|
|
||||||
if tsdb_arrays:
|
if tsdb_arrays:
|
||||||
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
||||||
fastest = list(tsdb_arrays[fqsn].values())[0]
|
fastest = list(tsdb_arrays[fqsn].values())[0]
|
||||||
|
@ -272,16 +299,28 @@ async def manage_history(
|
||||||
|
|
||||||
last_dt = datetime.fromtimestamp(last_s)
|
last_dt = datetime.fromtimestamp(last_s)
|
||||||
array, next_dt = await hist(end_dt=last_dt)
|
array, next_dt = await hist(end_dt=last_dt)
|
||||||
|
else:
|
||||||
|
do_backfill = True
|
||||||
|
|
||||||
|
|
||||||
|
# await tractor.breakpoint()
|
||||||
|
|
||||||
some_data_ready.set()
|
some_data_ready.set()
|
||||||
|
|
||||||
elif opened:
|
if do_backfill:
|
||||||
log.info('No existing `marketstored` found..')
|
log.info('No existing `marketstored` found..')
|
||||||
|
|
||||||
# start history backfill task ``backfill_bars()`` is
|
# start history backfill task ``backfill_bars()`` is
|
||||||
# a required backend func this must block until shm is
|
# a required backend func this must block until shm is
|
||||||
# filled with first set of ohlc bars
|
# filled with first set of ohlc bars
|
||||||
_ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
|
await bus.nursery.start(
|
||||||
|
start_backfill,
|
||||||
|
mod,
|
||||||
|
fqsn,
|
||||||
|
shm,
|
||||||
|
)
|
||||||
|
|
||||||
|
# _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm)
|
||||||
|
|
||||||
# yield back after client connect with filled shm
|
# yield back after client connect with filled shm
|
||||||
task_status.started(shm)
|
task_status.started(shm)
|
||||||
|
@ -366,7 +405,6 @@ async def allocate_persistent_feed(
|
||||||
bfqsn = init_msg[symbol]['fqsn'].lower()
|
bfqsn = init_msg[symbol]['fqsn'].lower()
|
||||||
init_msg[symbol]['fqsn'] = bfqsn
|
init_msg[symbol]['fqsn'] = bfqsn
|
||||||
|
|
||||||
|
|
||||||
# HISTORY, run 2 tasks:
|
# HISTORY, run 2 tasks:
|
||||||
# - a history loader / maintainer
|
# - a history loader / maintainer
|
||||||
# - a real-time streamer which consumers and sends new data to any
|
# - a real-time streamer which consumers and sends new data to any
|
||||||
|
|
Loading…
Reference in New Issue