Compare commits

...

3 Commits

Author SHA1 Message Date
Tyler Goodlet d767be9bd9 `.tsp._history`: drop `feed_is_live` syncing, another seg flag
The `await feed_is_live.wait()` is more or less pointless and would only
cause slower startup afaig (as-far-as-i-grok) so i'm masking it here.
This also removes the final `strict_exception_groups=False` use from the
non-tests code base, flipping to the `tractor.trionics` collapser once
and for all!
2025-10-02 19:53:08 -04:00
Tyler Goodlet 4e8717262c Woops, keep `np2pl` exposed from `.tsp` 2025-10-02 14:14:28 -04:00
Tyler Goodlet e3d87046f9 accounting.calc: `.error()` on bad txn-time fields..
Since i'm seeing IB records with a `None` value and i don't want to be
debugging every time order-mode boots up..

Also use `pdb=debug_mode` in `.open_ledger_dfs()`
2025-10-02 12:17:57 -04:00
3 changed files with 40 additions and 16 deletions

View File

@ -289,15 +289,26 @@ def iter_by_dt(
else: else:
continue continue
# XXX: should never get here.. # XXX: we should never really get here bc it means some kinda
# bad txn-record (field) data..
#
# -> set the `debug_mode = True` if you want to trace such
# cases from REPL ;)
else: else:
with maybe_open_crash_handler(pdb=True): debug_mode: bool = False
raise ValueError( report: str = (
f'Invalid txn time ??\n' f'Invalid txn time ??\n'
f'txn-id: {k!r}\n' f'txn-id: {k!r}\n'
f'{k!r}: {v!r}\n' f'{k!r}: {v!r}\n'
) )
# assert v is not None, f'No valid value for `{k}`!?' if debug_mode:
with maybe_open_crash_handler(
pdb=debug_mode,
raise_on_exit=False,
):
raise ValueError(report)
else:
log.error(report)
if _invalid is not None: if _invalid is not None:
_invalid.append(tx) _invalid.append(tx)
@ -400,7 +411,10 @@ def open_ledger_dfs(
can update the ledger on exit. can update the ledger on exit.
''' '''
with maybe_open_crash_handler(pdb=debug_mode): with maybe_open_crash_handler(
pdb=debug_mode,
# raise_on_exit=False,
):
if not ledger: if not ledger:
import time import time
from ._ledger import open_trade_ledger from ._ledger import open_trade_ledger

View File

@ -35,6 +35,7 @@ from ._anal import (
dedupe as dedupe, dedupe as dedupe,
detect_time_gaps as detect_time_gaps, detect_time_gaps as detect_time_gaps,
pl2np as pl2np, pl2np as pl2np,
np2pl as np2pl,
# `numpy` only # `numpy` only
slice_from_time as slice_from_time, slice_from_time as slice_from_time,

View File

@ -886,7 +886,7 @@ async def load_tsdb_hist(
np.ndarray, np.ndarray,
DateTime, DateTime,
DateTime, DateTime,
] | None: ]|None:
# loads a (large) frame of data from the tsdb depending # loads a (large) frame of data from the tsdb depending
# on the db's query size limit; our "nativedb" (using # on the db's query size limit; our "nativedb" (using
# parquet) generally can load the entire history into mem # parquet) generally can load the entire history into mem
@ -899,7 +899,7 @@ async def load_tsdb_hist(
DateTime, DateTime,
] ]
try: try:
tsdb_entry: tuple | None = await storage.load( tsdb_entry: tuple|None = await storage.load(
fqme, fqme,
timeframe=timeframe, timeframe=timeframe,
) )
@ -1046,12 +1046,15 @@ async def tsdb_backfill(
last_tsdb_dt, last_tsdb_dt,
) = tsdb_entry ) = tsdb_entry
# await tractor.pause()
# if there is a gap to backfill from the first # if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb # history frame until the last datum loaded from the tsdb
# continue that now in the background # continue that now in the background
async with trio.open_nursery( async with (
strict_exception_groups=False, tractor.trionics.collapse_eg(),
) as tn: trio.open_nursery() as tn,
):
bf_done = await tn.start( bf_done = await tn.start(
partial( partial(
@ -1322,8 +1325,14 @@ async def manage_history(
# TODO: maybe it should be a subpkg of `.data`? # TODO: maybe it should be a subpkg of `.data`?
from piker import storage from piker import storage
storemod: ModuleType
client: StorageClient
tn: trio.Nursery
async with ( async with (
storage.open_storage_client() as (storemod, client), storage.open_storage_client() as (
storemod,
client,
),
# NOTE: this nursery spawns a task per "timeframe" (aka # NOTE: this nursery spawns a task per "timeframe" (aka
# sampling period) data set since normally differently # sampling period) data set since normally differently
@ -1392,7 +1401,7 @@ async def manage_history(
some_data_ready.set() some_data_ready.set()
# wait for a live feed before starting the sampler. # wait for a live feed before starting the sampler.
await feed_is_live.wait() # await feed_is_live.wait()
# yield back after client connect with filled shm # yield back after client connect with filled shm
task_status.started(( task_status.started((