Add WIP backfiller from data feed helper
							parent
							
								
									fa69fca311
								
							
						
					
					
						commit
						3487f76147
					
				| 
						 | 
					@ -36,7 +36,7 @@ from trio_websocket import open_websocket_url
 | 
				
			||||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
 | 
					from anyio_marketstore import open_marketstore_client, MarketstoreClient
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from ..log import get_logger, get_console_log
 | 
					from ..log import get_logger, get_console_log
 | 
				
			||||||
from ..data import open_feed
 | 
					from ..data.feed import maybe_open_feed
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
log = get_logger(__name__)
 | 
					log = get_logger(__name__)
 | 
				
			||||||
| 
						 | 
					@ -71,6 +71,7 @@ _quote_dt = [
 | 
				
			||||||
    # ('brokerd_ts', 'i64'),
 | 
					    # ('brokerd_ts', 'i64'),
 | 
				
			||||||
    # ('VWAP', 'f4')
 | 
					    # ('VWAP', 'f4')
 | 
				
			||||||
]
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
 | 
					_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan)
 | 
				
			||||||
_tick_map = {
 | 
					_tick_map = {
 | 
				
			||||||
    'Up': 1,
 | 
					    'Up': 1,
 | 
				
			||||||
| 
						 | 
					@ -79,6 +80,19 @@ _tick_map = {
 | 
				
			||||||
    None: np.nan,
 | 
					    None: np.nan,
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					_ohlcv_dt = [
 | 
				
			||||||
 | 
					    # these two are required for as a "primary key"
 | 
				
			||||||
 | 
					    ('Epoch', 'i8'),
 | 
				
			||||||
 | 
					    ('Nanoseconds', 'i4'),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # ohlcv sampling
 | 
				
			||||||
 | 
					    ('Open', 'f4'),
 | 
				
			||||||
 | 
					    ('High', 'f4'),
 | 
				
			||||||
 | 
					    ('Low', 'i8'),
 | 
				
			||||||
 | 
					    ('Close', 'i8'),
 | 
				
			||||||
 | 
					    ('Volume', 'f4'),
 | 
				
			||||||
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def mk_tbk(keys: tuple[str, str, str]) -> str:
 | 
					def mk_tbk(keys: tuple[str, str, str]) -> str:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
| 
						 | 
					@ -143,13 +157,32 @@ def timestamp(date, **kwargs) -> int:
 | 
				
			||||||
@asynccontextmanager
 | 
					@asynccontextmanager
 | 
				
			||||||
async def get_client(
 | 
					async def get_client(
 | 
				
			||||||
    host: str = 'localhost',
 | 
					    host: str = 'localhost',
 | 
				
			||||||
    port: int = 5995
 | 
					    port: int = 5993
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> MarketstoreClient:
 | 
					) -> MarketstoreClient:
 | 
				
			||||||
    async with open_marketstore_client(host, port) as client:
 | 
					    async with open_marketstore_client(host, port) as client:
 | 
				
			||||||
        yield client
 | 
					        yield client
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async def backfill_history():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async with (
 | 
				
			||||||
 | 
					        get_client() as msclient,
 | 
				
			||||||
 | 
					        maybe_open_feed(
 | 
				
			||||||
 | 
					            'ib',
 | 
				
			||||||
 | 
					            ['mnq.globex'],
 | 
				
			||||||
 | 
					            loglevel='info',
 | 
				
			||||||
 | 
					            # backpressure=False,
 | 
				
			||||||
 | 
					            start_stream=False,
 | 
				
			||||||
 | 
					        ) as (feed, stream),
 | 
				
			||||||
 | 
					    ):
 | 
				
			||||||
 | 
					        await tractor.breakpoint()
 | 
				
			||||||
 | 
					        await msclient.write(
 | 
				
			||||||
 | 
					            feed.shm.array,
 | 
				
			||||||
 | 
					            tbk='mnq.globex.ib/1Sec/OHLCV',
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def ingest_quote_stream(
 | 
					async def ingest_quote_stream(
 | 
				
			||||||
    symbols: list[str],
 | 
					    symbols: list[str],
 | 
				
			||||||
    brokername: str,
 | 
					    brokername: str,
 | 
				
			||||||
| 
						 | 
					@ -162,7 +195,7 @@ async def ingest_quote_stream(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    async with (
 | 
					    async with (
 | 
				
			||||||
        open_feed(brokername, symbols, loglevel=loglevel) as feed,
 | 
					        maybe_open_feed(brokername, symbols, loglevel=loglevel) as feed,
 | 
				
			||||||
        get_client() as ms_client,
 | 
					        get_client() as ms_client,
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
        async for quotes in feed.stream:
 | 
					        async for quotes in feed.stream:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue