Compare commits
	
		
			2 Commits 
		
	
	
		
			6555ccfbba
			...
			9232d09440
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						9232d09440 | |
| 
							
							
								 | 
						f96bd51442 | 
| 
						 | 
					@ -44,8 +44,10 @@ import trio
 | 
				
			||||||
from trio_typing import TaskStatus
 | 
					from trio_typing import TaskStatus
 | 
				
			||||||
import tractor
 | 
					import tractor
 | 
				
			||||||
from pendulum import (
 | 
					from pendulum import (
 | 
				
			||||||
 | 
					    Interval,
 | 
				
			||||||
    DateTime,
 | 
					    DateTime,
 | 
				
			||||||
    Duration,
 | 
					    Duration,
 | 
				
			||||||
 | 
					    duration as mk_duration,
 | 
				
			||||||
    from_timestamp,
 | 
					    from_timestamp,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
import numpy as np
 | 
					import numpy as np
 | 
				
			||||||
| 
						 | 
					@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
 | 
				
			||||||
        # pair, immediately stop backfilling?
 | 
					        # pair, immediately stop backfilling?
 | 
				
			||||||
        if (
 | 
					        if (
 | 
				
			||||||
            start_dt
 | 
					            start_dt
 | 
				
			||||||
            and end_dt < start_dt
 | 
					            and
 | 
				
			||||||
 | 
					            end_dt < start_dt
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            await tractor.pause()
 | 
					            await tractor.pause()
 | 
				
			||||||
            break
 | 
					            break
 | 
				
			||||||
| 
						 | 
					@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
 | 
				
			||||||
        except tractor.ContextCancelled:
 | 
					        except tractor.ContextCancelled:
 | 
				
			||||||
            # log.exception
 | 
					            # log.exception
 | 
				
			||||||
            await tractor.pause()
 | 
					            await tractor.pause()
 | 
				
			||||||
 | 
					            raise
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    null_segs_detected.set()
 | 
					    null_segs_detected.set()
 | 
				
			||||||
    # RECHECK for more null-gaps
 | 
					    # RECHECK for more null-gaps
 | 
				
			||||||
| 
						 | 
					@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def start_backfill(
 | 
					async def start_backfill(
 | 
				
			||||||
    get_hist,
 | 
					    get_hist,
 | 
				
			||||||
    frame_types: dict[str, Duration] | None,
 | 
					    def_frame_duration: Duration,
 | 
				
			||||||
    mod: ModuleType,
 | 
					    mod: ModuleType,
 | 
				
			||||||
    mkt: MktPair,
 | 
					    mkt: MktPair,
 | 
				
			||||||
    shm: ShmArray,
 | 
					    shm: ShmArray,
 | 
				
			||||||
| 
						 | 
					@ -379,22 +383,23 @@ async def start_backfill(
 | 
				
			||||||
        update_start_on_prepend: bool = False
 | 
					        update_start_on_prepend: bool = False
 | 
				
			||||||
        if backfill_until_dt is None:
 | 
					        if backfill_until_dt is None:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # TODO: drop this right and just expose the backfill
 | 
					            # TODO: per-provider default history-durations?
 | 
				
			||||||
            # limits inside a [storage] section in conf.toml?
 | 
					            # -[ ] inside the `open_history_client()` config allow
 | 
				
			||||||
            # when no tsdb "last datum" is provided, we just load
 | 
					            #    declaring the history duration limits instead of
 | 
				
			||||||
            # some near-term history.
 | 
					            #    guessing and/or applying the same limits to all?
 | 
				
			||||||
            # periods = {
 | 
					            #
 | 
				
			||||||
            #     1: {'days': 1},
 | 
					            # -[ ] allow declaring (default) per-provider backfill
 | 
				
			||||||
            #     60: {'days': 14},
 | 
					            #     limits inside a [storage] sub-section in conf.toml?
 | 
				
			||||||
            # }
 | 
					            #
 | 
				
			||||||
 | 
					            # NOTE, when no tsdb "last datum" is provided, we just
 | 
				
			||||||
            # do a decently sized backfill and load it into storage.
 | 
					            # load some near-term history by presuming a "decently
 | 
				
			||||||
 | 
					            # large" 60s duration limit and a much shorter 1s range.
 | 
				
			||||||
            periods = {
 | 
					            periods = {
 | 
				
			||||||
                1: {'days': 2},
 | 
					                1: {'days': 2},
 | 
				
			||||||
                60: {'years': 6},
 | 
					                60: {'years': 6},
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            period_duration: int = periods[timeframe]
 | 
					            period_duration: int = periods[timeframe]
 | 
				
			||||||
            update_start_on_prepend = True
 | 
					            update_start_on_prepend: bool = True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # NOTE: manually set the "latest" datetime which we intend to
 | 
					            # NOTE: manually set the "latest" datetime which we intend to
 | 
				
			||||||
            # backfill history "until" so as to adhere to the history
 | 
					            # backfill history "until" so as to adhere to the history
 | 
				
			||||||
| 
						 | 
					@ -416,7 +421,6 @@ async def start_backfill(
 | 
				
			||||||
                f'backfill_until_dt: {backfill_until_dt}\n'
 | 
					                f'backfill_until_dt: {backfill_until_dt}\n'
 | 
				
			||||||
                f'last_start_dt: {last_start_dt}\n'
 | 
					                f'last_start_dt: {last_start_dt}\n'
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                (
 | 
					                (
 | 
				
			||||||
                    array,
 | 
					                    array,
 | 
				
			||||||
| 
						 | 
					@ -426,37 +430,58 @@ async def start_backfill(
 | 
				
			||||||
                    timeframe,
 | 
					                    timeframe,
 | 
				
			||||||
                    end_dt=last_start_dt,
 | 
					                    end_dt=last_start_dt,
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					 | 
				
			||||||
            except NoData as _daterr:
 | 
					            except NoData as _daterr:
 | 
				
			||||||
                # 3 cases:
 | 
					                orig_last_start_dt: datetime = last_start_dt
 | 
				
			||||||
                # - frame in the middle of a legit venue gap
 | 
					                gap_report: str = (
 | 
				
			||||||
                # - history actually began at the `last_start_dt`
 | 
					                    f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
 | 
				
			||||||
                # - some other unknown error (ib blocking the
 | 
					                    f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
 | 
				
			||||||
                #   history bc they don't want you seeing how they
 | 
					                    f'last_start_dt: {orig_last_start_dt}\n\n'
 | 
				
			||||||
                #   cucked all the tinas..)
 | 
					                    f'bf_until: {backfill_until_dt}\n'
 | 
				
			||||||
                if dur := frame_types.get(timeframe):
 | 
					 | 
				
			||||||
                    # decrement by a frame's worth of duration and
 | 
					 | 
				
			||||||
                    # retry a few times.
 | 
					 | 
				
			||||||
                    last_start_dt.subtract(
 | 
					 | 
				
			||||||
                        seconds=dur.total_seconds()
 | 
					 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
                    log.warning(
 | 
					                # EMPTY FRAME signal with 3 (likely) causes:
 | 
				
			||||||
                        f'{mod.name} -> EMPTY FRAME for end_dt?\n'
 | 
					                #
 | 
				
			||||||
                        f'tf@fqme: {timeframe}@{mkt.fqme}\n'
 | 
					                # 1. range contains legit gap in venue history
 | 
				
			||||||
                        'bf_until <- last_start_dt:\n'
 | 
					                # 2. history actually (edge case) **began** at the
 | 
				
			||||||
                        f'{backfill_until_dt} <- {last_start_dt}\n'
 | 
					                #    value `last_start_dt`
 | 
				
			||||||
                        f'Decrementing `end_dt` by {dur} and retry..\n'
 | 
					                # 3. some other unknown error (ib blocking the
 | 
				
			||||||
 | 
					                #    history-query bc they don't want you seeing how
 | 
				
			||||||
 | 
					                #    they cucked all the tinas.. like with options
 | 
				
			||||||
 | 
					                #    hist)
 | 
				
			||||||
 | 
					                #
 | 
				
			||||||
 | 
					                if def_frame_duration:
 | 
				
			||||||
 | 
					                    # decrement by a duration's (frame) worth of time
 | 
				
			||||||
 | 
					                    # as maybe indicated by the backend to see if we
 | 
				
			||||||
 | 
					                    # can get older data before this possible
 | 
				
			||||||
 | 
					                    # "history gap".
 | 
				
			||||||
 | 
					                    last_start_dt: datetime = last_start_dt.subtract(
 | 
				
			||||||
 | 
					                        seconds=def_frame_duration.total_seconds()
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
 | 
					                    gap_report += (
 | 
				
			||||||
 | 
					                        f'Decrementing `end_dt` and retrying with,\n'
 | 
				
			||||||
 | 
					                        f'def_frame_duration: {def_frame_duration}\n'
 | 
				
			||||||
 | 
					                        f'(new) last_start_dt: {last_start_dt}\n'
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                    log.warning(gap_report)
 | 
				
			||||||
 | 
					                    # skip writing to shm/tsdb and try the next
 | 
				
			||||||
 | 
					                    # duration's worth of prior history.
 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    # await tractor.pause()
 | 
				
			||||||
 | 
					                    raise DataUnavailable(gap_report)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # broker says there never was or is no more history to pull
 | 
					            # broker says there never was or is no more history to pull
 | 
				
			||||||
            except DataUnavailable:
 | 
					            except DataUnavailable as due:
 | 
				
			||||||
 | 
					                message: str = due.args[0]
 | 
				
			||||||
                log.warning(
 | 
					                log.warning(
 | 
				
			||||||
                    f'NO-MORE-DATA in range?\n'
 | 
					                    f'Provider {mod.name!r} halted backfill due to,\n\n'
 | 
				
			||||||
                    f'`{mod.name}` halted history:\n'
 | 
					
 | 
				
			||||||
                    f'tf@fqme: {timeframe}@{mkt.fqme}\n'
 | 
					                    f'{message}\n'
 | 
				
			||||||
                    'bf_until <- last_start_dt:\n'
 | 
					
 | 
				
			||||||
                    f'{backfill_until_dt} <- {last_start_dt}\n'
 | 
					                    f'fqme: {mkt.fqme}\n'
 | 
				
			||||||
 | 
					                    f'timeframe: {timeframe}\n'
 | 
				
			||||||
 | 
					                    f'last_start_dt: {last_start_dt}\n'
 | 
				
			||||||
 | 
					                    f'bf_until: {backfill_until_dt}\n'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
                # UGH: what's a better way?
 | 
					                # UGH: what's a better way?
 | 
				
			||||||
                # TODO: backends are responsible for being correct on
 | 
					                # TODO: backends are responsible for being correct on
 | 
				
			||||||
| 
						 | 
					@ -465,34 +490,54 @@ async def start_backfill(
 | 
				
			||||||
                #     to halt the request loop until the condition is
 | 
					                #     to halt the request loop until the condition is
 | 
				
			||||||
                #     resolved or should the backend be entirely in
 | 
					                #     resolved or should the backend be entirely in
 | 
				
			||||||
                #     charge of solving such faults? yes, right?
 | 
					                #     charge of solving such faults? yes, right?
 | 
				
			||||||
                # if timeframe > 1:
 | 
					 | 
				
			||||||
                #     await tractor.pause()
 | 
					 | 
				
			||||||
                return
 | 
					                return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            time: np.ndarray = array['time']
 | 
				
			||||||
            assert (
 | 
					            assert (
 | 
				
			||||||
                array['time'][0]
 | 
					                time[0]
 | 
				
			||||||
                ==
 | 
					                ==
 | 
				
			||||||
                next_start_dt.timestamp()
 | 
					                next_start_dt.timestamp()
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            diff = last_start_dt - next_start_dt
 | 
					            assert time[-1] == next_end_dt.timestamp()
 | 
				
			||||||
            frame_time_diff_s = diff.seconds
 | 
					
 | 
				
			||||||
 | 
					            expected_dur: Interval = last_start_dt - next_start_dt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # frame's worth of sample-period-steps, in seconds
 | 
					            # frame's worth of sample-period-steps, in seconds
 | 
				
			||||||
            frame_size_s: float = len(array) * timeframe
 | 
					            frame_size_s: float = len(array) * timeframe
 | 
				
			||||||
            expected_frame_size_s: float = frame_size_s + timeframe
 | 
					            recv_frame_dur: Duration = (
 | 
				
			||||||
            if frame_time_diff_s > expected_frame_size_s:
 | 
					                from_timestamp(array[-1]['time'])
 | 
				
			||||||
 | 
					                -
 | 
				
			||||||
 | 
					                from_timestamp(array[0]['time'])
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            if (
 | 
				
			||||||
 | 
					                (lt_frame := (recv_frame_dur < expected_dur))
 | 
				
			||||||
 | 
					                or
 | 
				
			||||||
 | 
					                (null_frame := (frame_size_s == 0))
 | 
				
			||||||
 | 
					                # ^XXX, should NEVER hit now!
 | 
				
			||||||
 | 
					            ):
 | 
				
			||||||
                # XXX: query result includes a start point prior to our
 | 
					                # XXX: query result includes a start point prior to our
 | 
				
			||||||
                # expected "frame size" and thus is likely some kind of
 | 
					                # expected "frame size" and thus is likely some kind of
 | 
				
			||||||
                # history gap (eg. market closed period, outage, etc.)
 | 
					                # history gap (eg. market closed period, outage, etc.)
 | 
				
			||||||
                # so just report it to console for now.
 | 
					                # so just report it to console for now.
 | 
				
			||||||
 | 
					                if lt_frame:
 | 
				
			||||||
 | 
					                    reason = 'Possible GAP (or first-datum)'
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    assert null_frame
 | 
				
			||||||
 | 
					                    reason = 'NULL-FRAME'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                missing_dur: Interval = expected_dur.end - recv_frame_dur.end
 | 
				
			||||||
                log.warning(
 | 
					                log.warning(
 | 
				
			||||||
                    'GAP DETECTED:\n'
 | 
					                    f'{timeframe}s-series {reason} detected!\n'
 | 
				
			||||||
                    f'last_start_dt: {last_start_dt}\n'
 | 
					                    f'fqme: {mkt.fqme}\n'
 | 
				
			||||||
                    f'diff: {diff}\n'
 | 
					                    f'last_start_dt: {last_start_dt}\n\n'
 | 
				
			||||||
                    f'frame_time_diff_s: {frame_time_diff_s}\n'
 | 
					                    f'recv interval: {recv_frame_dur}\n'
 | 
				
			||||||
 | 
					                    f'expected interval: {expected_dur}\n\n'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    f'Missing duration of history of {missing_dur.in_words()!r}\n'
 | 
				
			||||||
 | 
					                    f'{missing_dur}\n'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					                # await tractor.pause()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            to_push = diff_history(
 | 
					            to_push = diff_history(
 | 
				
			||||||
                array,
 | 
					                array,
 | 
				
			||||||
| 
						 | 
					@ -567,7 +612,8 @@ async def start_backfill(
 | 
				
			||||||
            # long-term storage.
 | 
					            # long-term storage.
 | 
				
			||||||
            if (
 | 
					            if (
 | 
				
			||||||
                storage is not None
 | 
					                storage is not None
 | 
				
			||||||
                and write_tsdb
 | 
					                and
 | 
				
			||||||
 | 
					                write_tsdb
 | 
				
			||||||
            ):
 | 
					            ):
 | 
				
			||||||
                log.info(
 | 
					                log.info(
 | 
				
			||||||
                    f'Writing {ln} frame to storage:\n'
 | 
					                    f'Writing {ln} frame to storage:\n'
 | 
				
			||||||
| 
						 | 
					@ -688,7 +734,7 @@ async def back_load_from_tsdb(
 | 
				
			||||||
        last_tsdb_dt
 | 
					        last_tsdb_dt
 | 
				
			||||||
        and latest_start_dt
 | 
					        and latest_start_dt
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
        backfilled_size_s = (
 | 
					        backfilled_size_s: Duration = (
 | 
				
			||||||
            latest_start_dt - last_tsdb_dt
 | 
					            latest_start_dt - last_tsdb_dt
 | 
				
			||||||
        ).seconds
 | 
					        ).seconds
 | 
				
			||||||
        # if the shm buffer len is not large enough to contain
 | 
					        # if the shm buffer len is not large enough to contain
 | 
				
			||||||
| 
						 | 
					@ -911,6 +957,8 @@ async def tsdb_backfill(
 | 
				
			||||||
            f'{pformat(config)}\n'
 | 
					            f'{pformat(config)}\n'
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # concurrently load the provider's most-recent-frame AND any
 | 
				
			||||||
 | 
					        # pre-existing tsdb history already saved in `piker` storage.
 | 
				
			||||||
        dt_eps: list[DateTime, DateTime] = []
 | 
					        dt_eps: list[DateTime, DateTime] = []
 | 
				
			||||||
        async with trio.open_nursery() as tn:
 | 
					        async with trio.open_nursery() as tn:
 | 
				
			||||||
            tn.start_soon(
 | 
					            tn.start_soon(
 | 
				
			||||||
| 
						 | 
					@ -921,7 +969,6 @@ async def tsdb_backfill(
 | 
				
			||||||
                timeframe,
 | 
					                timeframe,
 | 
				
			||||||
                config,
 | 
					                config,
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					 | 
				
			||||||
            tsdb_entry: tuple = await load_tsdb_hist(
 | 
					            tsdb_entry: tuple = await load_tsdb_hist(
 | 
				
			||||||
                storage,
 | 
					                storage,
 | 
				
			||||||
                mkt,
 | 
					                mkt,
 | 
				
			||||||
| 
						 | 
					@ -950,6 +997,25 @@ async def tsdb_backfill(
 | 
				
			||||||
                mr_end_dt,
 | 
					                mr_end_dt,
 | 
				
			||||||
            ) = dt_eps
 | 
					            ) = dt_eps
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
 | 
				
			||||||
 | 
					            calced_frame_size: Duration = mk_duration(
 | 
				
			||||||
 | 
					                seconds=first_frame_dur_s,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            # NOTE, attempt to use the backend declared default frame
 | 
				
			||||||
 | 
					            # sizing (as allowed by their time-series query APIs) and
 | 
				
			||||||
 | 
					            # if not provided try to construct a default from the
 | 
				
			||||||
 | 
					            # first frame received above.
 | 
				
			||||||
 | 
					            def_frame_durs: dict[
 | 
				
			||||||
 | 
					                int,
 | 
				
			||||||
 | 
					                Duration,
 | 
				
			||||||
 | 
					            ]|None = config.get('frame_types', None)
 | 
				
			||||||
 | 
					            if def_frame_durs:
 | 
				
			||||||
 | 
					                def_frame_size: Duration = def_frame_durs[timeframe]
 | 
				
			||||||
 | 
					                assert def_frame_size == calced_frame_size
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                # use what we calced from first frame above.
 | 
				
			||||||
 | 
					                def_frame_size = calced_frame_size
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # NOTE: when there's no offline data, there's 2 cases:
 | 
					            # NOTE: when there's no offline data, there's 2 cases:
 | 
				
			||||||
            # - data backend doesn't support timeframe/sample
 | 
					            # - data backend doesn't support timeframe/sample
 | 
				
			||||||
            #   period (in which case `dt_eps` should be `None` and
 | 
					            #   period (in which case `dt_eps` should be `None` and
 | 
				
			||||||
| 
						 | 
					@ -980,7 +1046,7 @@ async def tsdb_backfill(
 | 
				
			||||||
                    partial(
 | 
					                    partial(
 | 
				
			||||||
                        start_backfill,
 | 
					                        start_backfill,
 | 
				
			||||||
                        get_hist=get_hist,
 | 
					                        get_hist=get_hist,
 | 
				
			||||||
                        frame_types=config.get('frame_types', None),
 | 
					                        def_frame_duration=def_frame_size,
 | 
				
			||||||
                        mod=mod,
 | 
					                        mod=mod,
 | 
				
			||||||
                        mkt=mkt,
 | 
					                        mkt=mkt,
 | 
				
			||||||
                        shm=shm,
 | 
					                        shm=shm,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue