Get sync-to-marketstore-tsdb history retrieval workinnn
							parent
							
								
									ac22190b60
								
							
						
					
					
						commit
						a6b8c03e0e
					
				| 
						 | 
					@ -38,6 +38,7 @@ from trio.abc import ReceiveChannel
 | 
				
			||||||
from trio_typing import TaskStatus
 | 
					from trio_typing import TaskStatus
 | 
				
			||||||
import tractor
 | 
					import tractor
 | 
				
			||||||
from pydantic import BaseModel
 | 
					from pydantic import BaseModel
 | 
				
			||||||
 | 
					import numpy as np
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from ..brokers import get_brokermod
 | 
					from ..brokers import get_brokermod
 | 
				
			||||||
from .._cacheables import maybe_open_context
 | 
					from .._cacheables import maybe_open_context
 | 
				
			||||||
| 
						 | 
					@ -278,13 +279,38 @@ async def manage_history(
 | 
				
			||||||
                # TODO: this should be used verbatim for the pure
 | 
					                # TODO: this should be used verbatim for the pure
 | 
				
			||||||
                # shm backfiller approach below.
 | 
					                # shm backfiller approach below.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                def diff_history(
 | 
				
			||||||
 | 
					                    array,
 | 
				
			||||||
 | 
					                    start_dt,
 | 
				
			||||||
 | 
					                    end_dt,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                ) -> np.ndarray:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    s_diff = (last_tsdb_dt - start_dt).seconds
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # if we detect a partial frame's worth of data
 | 
				
			||||||
 | 
					                    # that is new, slice out only that history and
 | 
				
			||||||
 | 
					                    # write to shm.
 | 
				
			||||||
 | 
					                    if s_diff > 0:
 | 
				
			||||||
 | 
					                        assert last_tsdb_dt > start_dt
 | 
				
			||||||
 | 
					                        selected = array['time'] > last_tsdb_dt.timestamp()
 | 
				
			||||||
 | 
					                        to_push = array[selected]
 | 
				
			||||||
 | 
					                        log.info(
 | 
				
			||||||
 | 
					                            f'Pushing partial frame {to_push.size} to shm'
 | 
				
			||||||
 | 
					                        )
 | 
				
			||||||
 | 
					                        return to_push
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    else:
 | 
				
			||||||
 | 
					                        return array
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # start history anal and load missing new data via backend.
 | 
					                # start history anal and load missing new data via backend.
 | 
				
			||||||
                async with open_history_client(fqsn) as hist:
 | 
					                async with open_history_client(fqsn) as hist:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # get latest query's worth of history all the way
 | 
					                    # get latest query's worth of history all the way
 | 
				
			||||||
                    # back to what is recorded in the tsdb
 | 
					                    # back to what is recorded in the tsdb
 | 
				
			||||||
                    array, start_dt, end_dt = await hist(end_dt='')
 | 
					                    array, start_dt, end_dt = await hist(end_dt='')
 | 
				
			||||||
                    shm.push(array)
 | 
					                    to_push = diff_history(array, start_dt, end_dt)
 | 
				
			||||||
 | 
					                    shm.push(to_push)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # let caller unblock and deliver latest history frame
 | 
					                    # let caller unblock and deliver latest history frame
 | 
				
			||||||
                    task_status.started(shm)
 | 
					                    task_status.started(shm)
 | 
				
			||||||
| 
						 | 
					@ -293,33 +319,13 @@ async def manage_history(
 | 
				
			||||||
                    # pull new history frames until we hit latest
 | 
					                    # pull new history frames until we hit latest
 | 
				
			||||||
                    # already in the tsdb
 | 
					                    # already in the tsdb
 | 
				
			||||||
                    while start_dt > last_tsdb_dt:
 | 
					                    while start_dt > last_tsdb_dt:
 | 
				
			||||||
 | 
					 | 
				
			||||||
                        array, start_dt, end_dt = await hist(end_dt=start_dt)
 | 
					                        array, start_dt, end_dt = await hist(end_dt=start_dt)
 | 
				
			||||||
                        s_diff = (last_tsdb_dt - start_dt).seconds
 | 
					                        to_push = diff_history(array, start_dt, end_dt)
 | 
				
			||||||
 | 
					                        shm.push(to_push, prepend=True)
 | 
				
			||||||
                        # if we detect a partial frame's worth of data
 | 
					 | 
				
			||||||
                        # that is new, slice out only that history and
 | 
					 | 
				
			||||||
                        # write to shm.
 | 
					 | 
				
			||||||
                        if s_diff > 0:
 | 
					 | 
				
			||||||
                            assert last_tsdb_dt > start_dt
 | 
					 | 
				
			||||||
                            selected = array['time'] > last_tsdb_dt.timestamp()
 | 
					 | 
				
			||||||
                            to_push = array[selected]
 | 
					 | 
				
			||||||
                            log.info(
 | 
					 | 
				
			||||||
                                f'Pushing partial frame {to_push.size} to shm'
 | 
					 | 
				
			||||||
                            )
 | 
					 | 
				
			||||||
                            shm.push(to_push, prepend=True)
 | 
					 | 
				
			||||||
                            break
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        else:
 | 
					 | 
				
			||||||
                            # write to shm
 | 
					 | 
				
			||||||
                            log.info(f'Pushing {array.size} datums to shm')
 | 
					 | 
				
			||||||
                            shm.push(array, prepend=True)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # TODO: see if there's faster multi-field reads:
 | 
					                    # TODO: see if there's faster multi-field reads:
 | 
				
			||||||
                    # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
 | 
					                    # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
 | 
				
			||||||
                    # re-index  with a `time` and index field
 | 
					                    # re-index  with a `time` and index field
 | 
				
			||||||
                    # await tractor.breakpoint()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    shm.push(
 | 
					                    shm.push(
 | 
				
			||||||
                        fastest[-shm._first.value:],
 | 
					                        fastest[-shm._first.value:],
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue