Get basic OHLCV writes working with `anyio` client
							parent
							
								
									236df4b6d6
								
							
						
					
					
						commit
						a0c3d5f32f
					
				| 
						 | 
				
			
			@ -21,10 +21,15 @@
 | 
			
		|||
- ticK data ingest routines
 | 
			
		||||
- websocket client for subscribing to write triggers
 | 
			
		||||
- todo: tick sequence stream-cloning for testing
 | 
			
		||||
- todo: docker container management automation
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from contextlib import asynccontextmanager
 | 
			
		||||
from typing import Any, Optional
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any,
 | 
			
		||||
    Optional,
 | 
			
		||||
    # Callable,
 | 
			
		||||
    TYPE_CHECKING,
 | 
			
		||||
)
 | 
			
		||||
import time
 | 
			
		||||
from math import isnan
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -36,7 +41,12 @@ from trio_websocket import open_websocket_url
 | 
			
		|||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
 | 
			
		||||
 | 
			
		||||
from ..log import get_logger, get_console_log
 | 
			
		||||
from ..data.feed import maybe_open_feed
 | 
			
		||||
from .feed import maybe_open_feed
 | 
			
		||||
from ._source import mk_fqsn, Symbol
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# if TYPE_CHECKING:
 | 
			
		||||
#     from ._sharedmem import ShmArray
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
| 
						 | 
				
			
			@ -83,7 +93,7 @@ _tick_map = {
 | 
			
		|||
_ohlcv_dt = [
 | 
			
		||||
    # these two are required for as a "primary key"
 | 
			
		||||
    ('Epoch', 'i8'),
 | 
			
		||||
    ('Nanoseconds', 'i4'),
 | 
			
		||||
    # ('Nanoseconds', 'i4'),
 | 
			
		||||
 | 
			
		||||
    # ohlcv sampling
 | 
			
		||||
    ('Open', 'f4'),
 | 
			
		||||
| 
						 | 
				
			
			@ -157,30 +167,87 @@ def timestamp(date, **kwargs) -> int:
 | 
			
		|||
@asynccontextmanager
 | 
			
		||||
async def get_client(
 | 
			
		||||
    host: str = 'localhost',
 | 
			
		||||
    port: int = 5993
 | 
			
		||||
    port: int = 5995
 | 
			
		||||
 | 
			
		||||
) -> MarketstoreClient:
 | 
			
		||||
    async with open_marketstore_client(host, port) as client:
 | 
			
		||||
    async with open_marketstore_client(
 | 
			
		||||
        host,
 | 
			
		||||
        port
 | 
			
		||||
    ) as client:
 | 
			
		||||
        yield client
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def backfill_history():
 | 
			
		||||
# class MarketStoreError(Exception):
 | 
			
		||||
#     "Generic marketstore client error"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# def err_on_resp(response: dict) -> None:
 | 
			
		||||
#     """Raise any errors found in responses from client request.
 | 
			
		||||
#     """
 | 
			
		||||
#     responses = response['responses']
 | 
			
		||||
#     if responses is not None:
 | 
			
		||||
#         for r in responses:
 | 
			
		||||
#             err = r['error']
 | 
			
		||||
#             if err:
 | 
			
		||||
#                 raise MarketStoreError(err)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def backfill_history(
 | 
			
		||||
    # symbol: Symbol
 | 
			
		||||
 | 
			
		||||
) -> list[str]:
 | 
			
		||||
    # TODO: 
 | 
			
		||||
    # - compute time-smaple step
 | 
			
		||||
    # - take ``Symbol`` as input
 | 
			
		||||
    # - backtrack into history using backend helper endpoint
 | 
			
		||||
 | 
			
		||||
    # broker = 'ib'
 | 
			
		||||
    # symbol = 'mnq.globex'
 | 
			
		||||
 | 
			
		||||
    broker = 'binance'
 | 
			
		||||
    symbol = 'btcusdt'
 | 
			
		||||
 | 
			
		||||
    fqsn = mk_fqsn(broker, symbol)
 | 
			
		||||
 | 
			
		||||
    async with (
 | 
			
		||||
        get_client() as msclient,
 | 
			
		||||
        maybe_open_feed(
 | 
			
		||||
            'ib',
 | 
			
		||||
            ['mnq.globex'],
 | 
			
		||||
            broker,
 | 
			
		||||
            [symbol],
 | 
			
		||||
            loglevel='info',
 | 
			
		||||
            # backpressure=False,
 | 
			
		||||
            start_stream=False,
 | 
			
		||||
        ) as (feed, stream),
 | 
			
		||||
    ):
 | 
			
		||||
        await tractor.breakpoint()
 | 
			
		||||
        await msclient.write(
 | 
			
		||||
            feed.shm.array,
 | 
			
		||||
            tbk='mnq.globex.ib/1Sec/OHLCV',
 | 
			
		||||
        ohlcv = feed.shm.array
 | 
			
		||||
        mkts_dt = np.dtype(_ohlcv_dt)
 | 
			
		||||
 | 
			
		||||
        syms = await msclient.list_symbols()
 | 
			
		||||
        log.info(f'Existing symbol set:\n{pformat(syms)}')
 | 
			
		||||
 | 
			
		||||
        # build mkts schema compat array
 | 
			
		||||
        mkts_array = np.zeros(
 | 
			
		||||
            len(ohlcv),
 | 
			
		||||
            dtype=mkts_dt,
 | 
			
		||||
        )
 | 
			
		||||
        # copy from shm array
 | 
			
		||||
        mkts_array[:] = ohlcv[[
 | 
			
		||||
            'time',
 | 
			
		||||
            'open',
 | 
			
		||||
            'high',
 | 
			
		||||
            'low',
 | 
			
		||||
            'close',
 | 
			
		||||
            'volume',
 | 
			
		||||
        ]]
 | 
			
		||||
 | 
			
		||||
        # write to db
 | 
			
		||||
        resp = await msclient.write(
 | 
			
		||||
            mkts_array,
 | 
			
		||||
            tbk=f'{fqsn}/1Min/OHLCV',
 | 
			
		||||
            isvariablelength=True,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # TODO: backfiller loop
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def ingest_quote_stream(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue