Unify backfilling logic into common task-routine
							parent
							
								
									c5ed9b5955
								
							
						
					
					
						commit
						f7b3215aa4
					
				| 
						 | 
				
			
			@ -32,7 +32,6 @@ from typing import (
 | 
			
		|||
    Awaitable,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import pendulum
 | 
			
		||||
import trio
 | 
			
		||||
from trio.abc import ReceiveChannel
 | 
			
		||||
from trio_typing import TaskStatus
 | 
			
		||||
| 
						 | 
				
			
			@ -194,21 +193,98 @@ async def _setup_persistent_brokerd(
 | 
			
		|||
        await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def diff_history(
 | 
			
		||||
    array,
 | 
			
		||||
    start_dt,
 | 
			
		||||
    end_dt,
 | 
			
		||||
    last_tsdb_dt: Optional[datetime] = None
 | 
			
		||||
 | 
			
		||||
) -> np.ndarray:
 | 
			
		||||
 | 
			
		||||
    if last_tsdb_dt:
 | 
			
		||||
        s_diff = (last_tsdb_dt - start_dt).seconds
 | 
			
		||||
 | 
			
		||||
        # if we detect a partial frame's worth of data
 | 
			
		||||
        # that is new, slice out only that history and
 | 
			
		||||
        # write to shm.
 | 
			
		||||
        if s_diff > 0:
 | 
			
		||||
            assert last_tsdb_dt > start_dt
 | 
			
		||||
            selected = array['time'] > last_tsdb_dt.timestamp()
 | 
			
		||||
            to_push = array[selected]
 | 
			
		||||
            log.info(
 | 
			
		||||
                f'Pushing partial frame {to_push.size} to shm'
 | 
			
		||||
            )
 | 
			
		||||
            return to_push
 | 
			
		||||
 | 
			
		||||
    return array
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def start_backfill(
 | 
			
		||||
    mod: ModuleType,
 | 
			
		||||
    fqsn: str,
 | 
			
		||||
    bfqsn: str,
 | 
			
		||||
    shm: ShmArray,
 | 
			
		||||
 | 
			
		||||
    last_tsdb_dt: Optional[datetime] = None,
 | 
			
		||||
    do_legacy: bool = False,
 | 
			
		||||
 | 
			
		||||
    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
 | 
			
		||||
) -> int:
 | 
			
		||||
 | 
			
		||||
    if do_legacy:
 | 
			
		||||
        return await mod.backfill_bars(
 | 
			
		||||
        fqsn,
 | 
			
		||||
            bfqsn,
 | 
			
		||||
            shm,
 | 
			
		||||
            task_status=task_status,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    async with mod.open_history_client(bfqsn) as hist:
 | 
			
		||||
 | 
			
		||||
        # get latest query's worth of history all the way
 | 
			
		||||
        # back to what is recorded in the tsdb
 | 
			
		||||
        array, start_dt, end_dt = await hist(end_dt=None)
 | 
			
		||||
 | 
			
		||||
        to_push = diff_history(
 | 
			
		||||
            array,
 | 
			
		||||
            start_dt,
 | 
			
		||||
            end_dt,
 | 
			
		||||
            last_tsdb_dt=last_tsdb_dt,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        log.info(f'Pushing {to_push.size} to shm!')
 | 
			
		||||
        shm.push(to_push)
 | 
			
		||||
 | 
			
		||||
        for delay_s in sampler.subscribers:
 | 
			
		||||
            await broadcast(delay_s)
 | 
			
		||||
 | 
			
		||||
        # let caller unblock and deliver latest history frame
 | 
			
		||||
        task_status.started(shm)
 | 
			
		||||
 | 
			
		||||
        # pull new history frames until we hit latest
 | 
			
		||||
        # already in the tsdb
 | 
			
		||||
        # while start_dt > last_tsdb_dt:
 | 
			
		||||
        while True:
 | 
			
		||||
            array, start_dt, end_dt = await hist(end_dt=start_dt)
 | 
			
		||||
            to_push = diff_history(
 | 
			
		||||
                array,
 | 
			
		||||
                start_dt,
 | 
			
		||||
                end_dt,
 | 
			
		||||
 | 
			
		||||
                # last_tsdb_dt=last_tsdb_dt,
 | 
			
		||||
                # XXX: hacky, just run indefinitely
 | 
			
		||||
                last_tsdb_dt=None,
 | 
			
		||||
            )
 | 
			
		||||
            log.info(f'Pushing {to_push.size} to shm!')
 | 
			
		||||
 | 
			
		||||
            # bail on shm allocation overrun
 | 
			
		||||
            try:
 | 
			
		||||
                shm.push(to_push, prepend=True)
 | 
			
		||||
            except ValueError:
 | 
			
		||||
                break
 | 
			
		||||
 | 
			
		||||
            for delay_s in sampler.subscribers:
 | 
			
		||||
                await broadcast(delay_s)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def manage_history(
 | 
			
		||||
    mod: ModuleType,
 | 
			
		||||
| 
						 | 
				
			
			@ -251,108 +327,42 @@ async def manage_history(
 | 
			
		|||
    # for now only do backfilling if no tsdb can be found
 | 
			
		||||
    do_legacy_backfill = not is_up and opened
 | 
			
		||||
 | 
			
		||||
    open_history_client = getattr(mod, 'open_history_client', None)
 | 
			
		||||
 | 
			
		||||
    bfqsn = fqsn.replace('.' + mod.name, '')
 | 
			
		||||
    open_history_client = getattr(mod, 'open_history_client', None)
 | 
			
		||||
 | 
			
		||||
    if is_up and opened and open_history_client:
 | 
			
		||||
 | 
			
		||||
        log.info('Found existing `marketstored`')
 | 
			
		||||
        from . import marketstore
 | 
			
		||||
 | 
			
		||||
        async with marketstore.open_storage_client(
 | 
			
		||||
            fqsn,
 | 
			
		||||
        ) as storage:
 | 
			
		||||
 | 
			
		||||
            tsdb_arrays = await storage.read_ohlcv(fqsn)
 | 
			
		||||
 | 
			
		||||
            if not tsdb_arrays:
 | 
			
		||||
                do_legacy_backfill = True
 | 
			
		||||
 | 
			
		||||
            else:
 | 
			
		||||
                log.info(f'Loaded tsdb history {tsdb_arrays}')
 | 
			
		||||
 | 
			
		||||
                fastest = list(tsdb_arrays.values())[0]
 | 
			
		||||
                times = fastest['Epoch']
 | 
			
		||||
                first, last = times[0], times[-1]
 | 
			
		||||
                first_tsdb_dt, last_tsdb_dt = map(
 | 
			
		||||
                    pendulum.from_timestamp, [first, last]
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            # TODO: this should be used verbatim for the pure
 | 
			
		||||
            # shm backfiller approach below.
 | 
			
		||||
 | 
			
		||||
                def diff_history(
 | 
			
		||||
                    array,
 | 
			
		||||
                    start_dt,
 | 
			
		||||
                    end_dt,
 | 
			
		||||
                    last_tsdb_dt: Optional[datetime] = None
 | 
			
		||||
 | 
			
		||||
                ) -> np.ndarray:
 | 
			
		||||
 | 
			
		||||
                    if last_tsdb_dt:
 | 
			
		||||
                        s_diff = (last_tsdb_dt - start_dt).seconds
 | 
			
		||||
 | 
			
		||||
                        # if we detect a partial frame's worth of data
 | 
			
		||||
                        # that is new, slice out only that history and
 | 
			
		||||
                        # write to shm.
 | 
			
		||||
                        if s_diff > 0:
 | 
			
		||||
                            assert last_tsdb_dt > start_dt
 | 
			
		||||
                            selected = array['time'] > last_tsdb_dt.timestamp()
 | 
			
		||||
                            to_push = array[selected]
 | 
			
		||||
                            log.info(
 | 
			
		||||
                                f'Pushing partial frame {to_push.size} to shm'
 | 
			
		||||
                            )
 | 
			
		||||
                            return to_push
 | 
			
		||||
 | 
			
		||||
                    return array
 | 
			
		||||
 | 
			
		||||
            # start history anal and load missing new data via backend.
 | 
			
		||||
            series, first_dt, last_dt = await storage.load(fqsn)
 | 
			
		||||
 | 
			
		||||
            broker, symbol, expiry = unpack_fqsn(fqsn)
 | 
			
		||||
 | 
			
		||||
                async with open_history_client(bfqsn) as hist:
 | 
			
		||||
 | 
			
		||||
                    # get latest query's worth of history all the way
 | 
			
		||||
                    # back to what is recorded in the tsdb
 | 
			
		||||
                    array, start_dt, end_dt = await hist(end_dt=None)
 | 
			
		||||
                    to_push = diff_history(
 | 
			
		||||
                        array,
 | 
			
		||||
                        start_dt,
 | 
			
		||||
                        end_dt,
 | 
			
		||||
                        last_tsdb_dt=last_tsdb_dt,
 | 
			
		||||
            await bus.nursery.start(
 | 
			
		||||
                partial(
 | 
			
		||||
                    start_backfill,
 | 
			
		||||
                    mod,
 | 
			
		||||
                    bfqsn,
 | 
			
		||||
                    shm,
 | 
			
		||||
                    last_tsdb_dt=last_dt,
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
                    log.info(f'Pushing {to_push.size} to shm!')
 | 
			
		||||
                    shm.push(to_push)
 | 
			
		||||
 | 
			
		||||
                    for delay_s in sampler.subscribers:
 | 
			
		||||
                        await broadcast(delay_s)
 | 
			
		||||
 | 
			
		||||
                    # let caller unblock and deliver latest history frame
 | 
			
		||||
            task_status.started(shm)
 | 
			
		||||
            some_data_ready.set()
 | 
			
		||||
 | 
			
		||||
                    # pull new history frames until we hit latest
 | 
			
		||||
                    # already in the tsdb
 | 
			
		||||
                    # while start_dt > last_tsdb_dt:
 | 
			
		||||
                    while True:
 | 
			
		||||
                        array, start_dt, end_dt = await hist(end_dt=start_dt)
 | 
			
		||||
                        to_push = diff_history(
 | 
			
		||||
                            array,
 | 
			
		||||
                            start_dt,
 | 
			
		||||
                            end_dt,
 | 
			
		||||
                            # last_tsdb_dt=last_tsdb_dt,
 | 
			
		||||
                            # just run indefinitely
 | 
			
		||||
                            last_tsdb_dt=None,
 | 
			
		||||
                        )
 | 
			
		||||
                        log.info(f'Pushing {to_push.size} to shm!')
 | 
			
		||||
                        shm.push(to_push, prepend=True)
 | 
			
		||||
                        for delay_s in sampler.subscribers:
 | 
			
		||||
                            await broadcast(delay_s)
 | 
			
		||||
 | 
			
		||||
            # TODO: see if there's faster multi-field reads:
 | 
			
		||||
            # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
 | 
			
		||||
            # re-index  with a `time` and index field
 | 
			
		||||
            history = list(series.values())
 | 
			
		||||
            if history:
 | 
			
		||||
                fastest = history[0]
 | 
			
		||||
                shm.push(
 | 
			
		||||
                    fastest[-shm._first.value:],
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -369,9 +379,7 @@ async def manage_history(
 | 
			
		|||
                        'Volume': 'volume',
 | 
			
		||||
                    },
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                    # TODO: write new data to tsdb to be ready to for next
 | 
			
		||||
                    # read.
 | 
			
		||||
                # TODO: write new data to tsdb to be ready to for next read.
 | 
			
		||||
 | 
			
		||||
    if do_legacy_backfill:
 | 
			
		||||
        # do a legacy incremental backfill from the provider.
 | 
			
		||||
| 
						 | 
				
			
			@ -385,6 +393,7 @@ async def manage_history(
 | 
			
		|||
            mod,
 | 
			
		||||
            bfqsn,
 | 
			
		||||
            shm,
 | 
			
		||||
            do_legacy=True,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # yield back after client connect with filled shm
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue