Compare commits
	
		
			3 Commits 
		
	
	
		
			956b974d6e
			...
			d767be9bd9
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						d767be9bd9 | |
| 
							
							
								 | 
						4e8717262c | |
| 
							
							
								 | 
						e3d87046f9 | 
| 
						 | 
				
			
			@ -289,15 +289,26 @@ def iter_by_dt(
 | 
			
		|||
            else:
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
        # XXX: should never get here..
 | 
			
		||||
        # XXX: we should never really get here bc it means some kinda
 | 
			
		||||
        # bad txn-record (field) data..
 | 
			
		||||
        #
 | 
			
		||||
        # -> set the `debug_mode = True` if you want to trace such
 | 
			
		||||
        # cases from REPL ;)
 | 
			
		||||
        else:
 | 
			
		||||
            with maybe_open_crash_handler(pdb=True):
 | 
			
		||||
                raise ValueError(
 | 
			
		||||
                    f'Invalid txn time ??\n'
 | 
			
		||||
                    f'txn-id: {k!r}\n'
 | 
			
		||||
                    f'{k!r}: {v!r}\n'
 | 
			
		||||
                )
 | 
			
		||||
                    # assert v is not None, f'No valid value for `{k}`!?'
 | 
			
		||||
            debug_mode: bool = False
 | 
			
		||||
            report: str = (
 | 
			
		||||
                f'Invalid txn time ??\n'
 | 
			
		||||
                f'txn-id: {k!r}\n'
 | 
			
		||||
                f'{k!r}: {v!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            if debug_mode:
 | 
			
		||||
                with maybe_open_crash_handler(
 | 
			
		||||
                    pdb=debug_mode,
 | 
			
		||||
                    raise_on_exit=False,
 | 
			
		||||
                ):
 | 
			
		||||
                    raise ValueError(report)
 | 
			
		||||
            else:
 | 
			
		||||
                log.error(report)
 | 
			
		||||
 | 
			
		||||
            if _invalid is not None:
 | 
			
		||||
                _invalid.append(tx)
 | 
			
		||||
| 
						 | 
				
			
			@ -400,7 +411,10 @@ def open_ledger_dfs(
 | 
			
		|||
    can update the ledger on exit.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    with maybe_open_crash_handler(pdb=debug_mode):
 | 
			
		||||
    with maybe_open_crash_handler(
 | 
			
		||||
        pdb=debug_mode,
 | 
			
		||||
        # raise_on_exit=False,
 | 
			
		||||
    ):
 | 
			
		||||
        if not ledger:
 | 
			
		||||
            import time
 | 
			
		||||
            from ._ledger import open_trade_ledger
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -35,6 +35,7 @@ from ._anal import (
 | 
			
		|||
    dedupe as dedupe,
 | 
			
		||||
    detect_time_gaps as detect_time_gaps,
 | 
			
		||||
    pl2np as pl2np,
 | 
			
		||||
    np2pl as np2pl,
 | 
			
		||||
 | 
			
		||||
    # `numpy` only
 | 
			
		||||
    slice_from_time as slice_from_time,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -886,7 +886,7 @@ async def load_tsdb_hist(
 | 
			
		|||
    np.ndarray,
 | 
			
		||||
    DateTime,
 | 
			
		||||
    DateTime,
 | 
			
		||||
] | None:
 | 
			
		||||
]|None:
 | 
			
		||||
    # loads a (large) frame of data from the tsdb depending
 | 
			
		||||
    # on the db's query size limit; our "nativedb" (using
 | 
			
		||||
    # parquet) generally can load the entire history into mem
 | 
			
		||||
| 
						 | 
				
			
			@ -899,7 +899,7 @@ async def load_tsdb_hist(
 | 
			
		|||
        DateTime,
 | 
			
		||||
    ]
 | 
			
		||||
    try:
 | 
			
		||||
        tsdb_entry: tuple | None =  await storage.load(
 | 
			
		||||
        tsdb_entry: tuple|None =  await storage.load(
 | 
			
		||||
            fqme,
 | 
			
		||||
            timeframe=timeframe,
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			@ -1046,12 +1046,15 @@ async def tsdb_backfill(
 | 
			
		|||
                    last_tsdb_dt,
 | 
			
		||||
                ) = tsdb_entry
 | 
			
		||||
 | 
			
		||||
            # await tractor.pause()
 | 
			
		||||
 | 
			
		||||
            # if there is a gap to backfill from the first
 | 
			
		||||
            # history frame until the last datum loaded from the tsdb
 | 
			
		||||
            # continue that now in the background
 | 
			
		||||
            async with trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
            ) as tn:
 | 
			
		||||
            async with (
 | 
			
		||||
                tractor.trionics.collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as tn,
 | 
			
		||||
            ):
 | 
			
		||||
 | 
			
		||||
                bf_done = await tn.start(
 | 
			
		||||
                    partial(
 | 
			
		||||
| 
						 | 
				
			
			@ -1322,8 +1325,14 @@ async def manage_history(
 | 
			
		|||
    # TODO: maybe it should be a subpkg of `.data`?
 | 
			
		||||
    from piker import storage
 | 
			
		||||
 | 
			
		||||
    storemod: ModuleType
 | 
			
		||||
    client: StorageClient
 | 
			
		||||
    tn: trio.Nursery
 | 
			
		||||
    async with (
 | 
			
		||||
        storage.open_storage_client() as (storemod, client),
 | 
			
		||||
        storage.open_storage_client() as (
 | 
			
		||||
            storemod,
 | 
			
		||||
            client,
 | 
			
		||||
        ),
 | 
			
		||||
 | 
			
		||||
        # NOTE: this nursery spawns a task per "timeframe" (aka
 | 
			
		||||
        # sampling period) data set since normally differently
 | 
			
		||||
| 
						 | 
				
			
			@ -1392,7 +1401,7 @@ async def manage_history(
 | 
			
		|||
            some_data_ready.set()
 | 
			
		||||
 | 
			
		||||
            # wait for a live feed before starting the sampler.
 | 
			
		||||
            await feed_is_live.wait()
 | 
			
		||||
            # await feed_is_live.wait()
 | 
			
		||||
 | 
			
		||||
            # yield back after client connect with filled shm
 | 
			
		||||
            task_status.started((
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue