Compare commits
3 Commits
956b974d6e
...
d767be9bd9
| Author | SHA1 | Date |
|---|---|---|
|
|
d767be9bd9 | |
|
|
4e8717262c | |
|
|
e3d87046f9 |
|
|
@ -289,15 +289,26 @@ def iter_by_dt(
|
||||||
else:
|
else:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# XXX: should never get here..
|
# XXX: we should never really get here bc it means some kinda
|
||||||
|
# bad txn-record (field) data..
|
||||||
|
#
|
||||||
|
# -> set the `debug_mode = True` if you want to trace such
|
||||||
|
# cases from REPL ;)
|
||||||
else:
|
else:
|
||||||
with maybe_open_crash_handler(pdb=True):
|
debug_mode: bool = False
|
||||||
raise ValueError(
|
report: str = (
|
||||||
f'Invalid txn time ??\n'
|
f'Invalid txn time ??\n'
|
||||||
f'txn-id: {k!r}\n'
|
f'txn-id: {k!r}\n'
|
||||||
f'{k!r}: {v!r}\n'
|
f'{k!r}: {v!r}\n'
|
||||||
)
|
)
|
||||||
# assert v is not None, f'No valid value for `{k}`!?'
|
if debug_mode:
|
||||||
|
with maybe_open_crash_handler(
|
||||||
|
pdb=debug_mode,
|
||||||
|
raise_on_exit=False,
|
||||||
|
):
|
||||||
|
raise ValueError(report)
|
||||||
|
else:
|
||||||
|
log.error(report)
|
||||||
|
|
||||||
if _invalid is not None:
|
if _invalid is not None:
|
||||||
_invalid.append(tx)
|
_invalid.append(tx)
|
||||||
|
|
@ -400,7 +411,10 @@ def open_ledger_dfs(
|
||||||
can update the ledger on exit.
|
can update the ledger on exit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
with maybe_open_crash_handler(pdb=debug_mode):
|
with maybe_open_crash_handler(
|
||||||
|
pdb=debug_mode,
|
||||||
|
# raise_on_exit=False,
|
||||||
|
):
|
||||||
if not ledger:
|
if not ledger:
|
||||||
import time
|
import time
|
||||||
from ._ledger import open_trade_ledger
|
from ._ledger import open_trade_ledger
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ from ._anal import (
|
||||||
dedupe as dedupe,
|
dedupe as dedupe,
|
||||||
detect_time_gaps as detect_time_gaps,
|
detect_time_gaps as detect_time_gaps,
|
||||||
pl2np as pl2np,
|
pl2np as pl2np,
|
||||||
|
np2pl as np2pl,
|
||||||
|
|
||||||
# `numpy` only
|
# `numpy` only
|
||||||
slice_from_time as slice_from_time,
|
slice_from_time as slice_from_time,
|
||||||
|
|
|
||||||
|
|
@ -886,7 +886,7 @@ async def load_tsdb_hist(
|
||||||
np.ndarray,
|
np.ndarray,
|
||||||
DateTime,
|
DateTime,
|
||||||
DateTime,
|
DateTime,
|
||||||
] | None:
|
]|None:
|
||||||
# loads a (large) frame of data from the tsdb depending
|
# loads a (large) frame of data from the tsdb depending
|
||||||
# on the db's query size limit; our "nativedb" (using
|
# on the db's query size limit; our "nativedb" (using
|
||||||
# parquet) generally can load the entire history into mem
|
# parquet) generally can load the entire history into mem
|
||||||
|
|
@ -899,7 +899,7 @@ async def load_tsdb_hist(
|
||||||
DateTime,
|
DateTime,
|
||||||
]
|
]
|
||||||
try:
|
try:
|
||||||
tsdb_entry: tuple | None = await storage.load(
|
tsdb_entry: tuple|None = await storage.load(
|
||||||
fqme,
|
fqme,
|
||||||
timeframe=timeframe,
|
timeframe=timeframe,
|
||||||
)
|
)
|
||||||
|
|
@ -1046,12 +1046,15 @@ async def tsdb_backfill(
|
||||||
last_tsdb_dt,
|
last_tsdb_dt,
|
||||||
) = tsdb_entry
|
) = tsdb_entry
|
||||||
|
|
||||||
|
# await tractor.pause()
|
||||||
|
|
||||||
# if there is a gap to backfill from the first
|
# if there is a gap to backfill from the first
|
||||||
# history frame until the last datum loaded from the tsdb
|
# history frame until the last datum loaded from the tsdb
|
||||||
# continue that now in the background
|
# continue that now in the background
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
tractor.trionics.collapse_eg(),
|
||||||
) as tn:
|
trio.open_nursery() as tn,
|
||||||
|
):
|
||||||
|
|
||||||
bf_done = await tn.start(
|
bf_done = await tn.start(
|
||||||
partial(
|
partial(
|
||||||
|
|
@ -1322,8 +1325,14 @@ async def manage_history(
|
||||||
# TODO: maybe it should be a subpkg of `.data`?
|
# TODO: maybe it should be a subpkg of `.data`?
|
||||||
from piker import storage
|
from piker import storage
|
||||||
|
|
||||||
|
storemod: ModuleType
|
||||||
|
client: StorageClient
|
||||||
|
tn: trio.Nursery
|
||||||
async with (
|
async with (
|
||||||
storage.open_storage_client() as (storemod, client),
|
storage.open_storage_client() as (
|
||||||
|
storemod,
|
||||||
|
client,
|
||||||
|
),
|
||||||
|
|
||||||
# NOTE: this nursery spawns a task per "timeframe" (aka
|
# NOTE: this nursery spawns a task per "timeframe" (aka
|
||||||
# sampling period) data set since normally differently
|
# sampling period) data set since normally differently
|
||||||
|
|
@ -1392,7 +1401,7 @@ async def manage_history(
|
||||||
some_data_ready.set()
|
some_data_ready.set()
|
||||||
|
|
||||||
# wait for a live feed before starting the sampler.
|
# wait for a live feed before starting the sampler.
|
||||||
await feed_is_live.wait()
|
# await feed_is_live.wait()
|
||||||
|
|
||||||
# yield back after client connect with filled shm
|
# yield back after client connect with filled shm
|
||||||
task_status.started((
|
task_status.started((
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue