Add back in OHLCV dtype template and client side ws streamer
							parent
							
								
									35f7c3409a
								
							
						
					
					
						commit
						f9b799b53d
					
				| 
						 | 
				
			
			@ -26,12 +26,12 @@
 | 
			
		|||
from contextlib import asynccontextmanager
 | 
			
		||||
from typing import Any, Optional
 | 
			
		||||
import time
 | 
			
		||||
# from math import isnan
 | 
			
		||||
from math import isnan
 | 
			
		||||
 | 
			
		||||
# import msgpack
 | 
			
		||||
import msgpack
 | 
			
		||||
import numpy as np
 | 
			
		||||
import pandas as pd
 | 
			
		||||
# import tractor
 | 
			
		||||
import tractor
 | 
			
		||||
from trio_websocket import open_websocket_url
 | 
			
		||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -44,15 +44,29 @@ log = get_logger(__name__)
 | 
			
		|||
_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
 | 
			
		||||
_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids)
 | 
			
		||||
 | 
			
		||||
_tick_dt = [
 | 
			
		||||
    # these two are required for as a "primary key"
 | 
			
		||||
    ('Epoch', 'i8'),
 | 
			
		||||
    ('Nanoseconds', 'i4'),
 | 
			
		||||
    ('IsTrade', 'i1'),
 | 
			
		||||
    ('IsBid', 'i1'),
 | 
			
		||||
    ('Price', 'f4'),
 | 
			
		||||
    ('Size', 'f4')
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
_quote_dt = [
 | 
			
		||||
    # these two are required for as a "primary key"
 | 
			
		||||
    ('Epoch', 'i8'),
 | 
			
		||||
    ('Nanoseconds', 'i4'),
 | 
			
		||||
 | 
			
		||||
    ('IsTrade', 'i1'),
 | 
			
		||||
    ('IsBid', 'i1'),
 | 
			
		||||
    ('Price', 'f4'),
 | 
			
		||||
    ('Size', 'f4')
 | 
			
		||||
    ('Tick', 'i4'),  # do we need this?
 | 
			
		||||
    ('Last', 'f4'),
 | 
			
		||||
    ('Bid', 'f4'),
 | 
			
		||||
    ('Bsize', 'f4'),
 | 
			
		||||
    ('Asize', 'f4'),
 | 
			
		||||
    ('Ask', 'f4'),
 | 
			
		||||
    ('Size', 'i8'),
 | 
			
		||||
    ('Volume', 'f4'),
 | 
			
		||||
]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -69,11 +83,12 @@ def mk_tbk(keys: tuple[str, str, str]) -> str:
 | 
			
		|||
def quote_to_marketstore_structarray(
 | 
			
		||||
    quote: dict[str, Any],
 | 
			
		||||
    last_fill: Optional[float]
 | 
			
		||||
 | 
			
		||||
) -> np.array:
 | 
			
		||||
    '''
 | 
			
		||||
    Return marketstore writeable structarray from quote ``dict``.
 | 
			
		||||
    '''
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    if last_fill:
 | 
			
		||||
        # new fill bby
 | 
			
		||||
        now = timestamp(last_fill, unit='s')
 | 
			
		||||
| 
						 | 
				
			
			@ -82,7 +97,7 @@ def quote_to_marketstore_structarray(
 | 
			
		|||
        # this should get inserted upstream by the broker-client to
 | 
			
		||||
        # subtract from IPC latency
 | 
			
		||||
        now = time.time_ns()
 | 
			
		||||
        
 | 
			
		||||
 | 
			
		||||
    secs, ns = now / 10**9, now % 10**9
 | 
			
		||||
 | 
			
		||||
    # pack into list[tuple[str, Any]]
 | 
			
		||||
| 
						 | 
				
			
			@ -100,7 +115,8 @@ def quote_to_marketstore_structarray(
 | 
			
		|||
            # for ``np.int`` we use 0 as a null value
 | 
			
		||||
            none = 0
 | 
			
		||||
 | 
			
		||||
        val = quote.get(name, none)
 | 
			
		||||
        # casefold? see https://github.com/alpacahq/marketstore/issues/324
 | 
			
		||||
        val = quote.get(name.casefold(), none)
 | 
			
		||||
        array_input.append(val)
 | 
			
		||||
 | 
			
		||||
    return np.array([tuple(array_input)], dtype=_quote_dt)
 | 
			
		||||
| 
						 | 
				
			
			@ -119,6 +135,7 @@ def timestamp(date, **kwargs) -> int:
 | 
			
		|||
async def get_client(
 | 
			
		||||
    host: str = 'localhost',
 | 
			
		||||
    port: int = 5995
 | 
			
		||||
 | 
			
		||||
) -> MarketstoreClient:
 | 
			
		||||
    async with open_marketstore_client(host, port) as client:
 | 
			
		||||
        yield client
 | 
			
		||||
| 
						 | 
				
			
			@ -145,129 +162,184 @@ async def ingest_quote_stream(
 | 
			
		|||
                for tick in quote.get('ticks', ()):
 | 
			
		||||
                    ticktype = tick.get('type', 'n/a')
 | 
			
		||||
 | 
			
		||||
                    if ticktype == 'n/a':
 | 
			
		||||
                        # okkk..
 | 
			
		||||
                        continue
 | 
			
		||||
            # _quote_dt = [
 | 
			
		||||
            #     # these two are required for as a "primary key"
 | 
			
		||||
            #    ('Epoch', 'i8'),
 | 
			
		||||
            #    ('Nanoseconds', 'i4'),
 | 
			
		||||
            #    ('Tick', 'i4'),
 | 
			
		||||
            #
 | 
			
		||||
            #    ('Last', 'f4'),
 | 
			
		||||
            #    ('Bid', 'f4'),
 | 
			
		||||
            #    ('Bsize', 'f4'),
 | 
			
		||||
            #    ('Asize', 'f4'),
 | 
			
		||||
            #    ('Ask', 'f4'),
 | 
			
		||||
            #    ('Size', 'i8'),
 | 
			
		||||
            #    ('Volume', 'f4'),
 | 
			
		||||
            # ]
 | 
			
		||||
 | 
			
		||||
                    array = quote_to_marketstore_structarray({
 | 
			
		||||
                        'IsTrade': 1 if ticktype == 'trade' else 0,
 | 
			
		||||
                        'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
 | 
			
		||||
                        'Price': tick.get('price'),
 | 
			
		||||
                        'Size': tick.get('size')
 | 
			
		||||
                    }, last_fill=quote.get('broker_ts', None))
 | 
			
		||||
            # techtonic tick write
 | 
			
		||||
            array = quote_to_marketstore_structarray({
 | 
			
		||||
                'IsTrade': 1 if ticktype == 'trade' else 0,
 | 
			
		||||
                'IsBid': 1 if ticktype in ('bid', 'bsize') else 0,
 | 
			
		||||
                'Price': tick.get('price'),
 | 
			
		||||
                'Size': tick.get('size')
 | 
			
		||||
            }, last_fill=quote.get('broker_ts', None))
 | 
			
		||||
 | 
			
		||||
                    await ms_client.write(array, _tick_tbk)
 | 
			
		||||
            await ms_client.write(array, _tick_tbk)
 | 
			
		||||
 | 
			
		||||
            quote_cache = {
 | 
			
		||||
                'size': 0,
 | 
			
		||||
                'tick': 0
 | 
			
		||||
            }
 | 
			
		||||
            # start ingest to marketstore
 | 
			
		||||
            async for quotes in feed.stream:
 | 
			
		||||
                log.info(quotes)
 | 
			
		||||
                for symbol, quote in quotes.items():
 | 
			
		||||
 | 
			
		||||
                    for tick in quote.get('ticks', ()):
 | 
			
		||||
                        ticktype = tick.get('type')
 | 
			
		||||
                        price = tick.get('price')
 | 
			
		||||
                        size = tick.get('size')
 | 
			
		||||
 | 
			
		||||
                        if ticktype == 'n/a' or price == -1:
 | 
			
		||||
                            # okkk..
 | 
			
		||||
                            continue
 | 
			
		||||
 | 
			
		||||
                        # clearing price event
 | 
			
		||||
                        if ticktype == 'trade':
 | 
			
		||||
                            quote_cache['volume'] = quote['volume']
 | 
			
		||||
                            quote_cache['last'] = price
 | 
			
		||||
                            # quote_cache['broker_ts'] = quote['broker_ts']
 | 
			
		||||
 | 
			
		||||
                        # l1 book events
 | 
			
		||||
                        elif ticktype in ('ask', 'asize'):
 | 
			
		||||
                            quote_cache['ask'] = price
 | 
			
		||||
                            quote_cache['asize'] = size
 | 
			
		||||
 | 
			
		||||
                        elif ticktype in ('bid', 'bsize'):
 | 
			
		||||
                            quote_cache['bid'] = price
 | 
			
		||||
                            quote_cache['bsize'] = size
 | 
			
		||||
 | 
			
		||||
                    a = quote_to_marketstore_structarray(
 | 
			
		||||
                        quote_cache,
 | 
			
		||||
                        last_fill=quote.get('broker_ts', None)
 | 
			
		||||
                    )
 | 
			
		||||
                    log.info(a)
 | 
			
		||||
                    # breakpoint()
 | 
			
		||||
                    await ms_client.write(symbol, a)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# async def stream_quotes(
 | 
			
		||||
#     symbols: list[str],
 | 
			
		||||
#     timeframe: str = '1Min',
 | 
			
		||||
#     attr_group: str = 'TICK',
 | 
			
		||||
#     host: str = 'localhost',
 | 
			
		||||
#     port: int = 5993,
 | 
			
		||||
#     loglevel: str = None
 | 
			
		||||
 | 
			
		||||
# ) -> None:
 | 
			
		||||
#     '''
 | 
			
		||||
#     Open a symbol stream from a running instance of marketstore and
 | 
			
		||||
#     log to console.
 | 
			
		||||
 | 
			
		||||
#     '''
 | 
			
		||||
#     tbks: dict[str, str] = {
 | 
			
		||||
#         sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def stream_quotes(
 | 
			
		||||
    symbols: list[str],
 | 
			
		||||
    timeframe: str = '1Min',
 | 
			
		||||
    attr_group: str = 'TICK',
 | 
			
		||||
    host: str = 'localhost',
 | 
			
		||||
    port: int = 5993,
 | 
			
		||||
    loglevel: str = None
 | 
			
		||||
    diff_cached: bool = True,
 | 
			
		||||
    loglevel: str = None,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
    Open a symbol stream from a running instance of marketstore and
 | 
			
		||||
    log to console.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
			
		||||
    get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
			
		||||
 | 
			
		||||
    tbks: dict[str, str] = {
 | 
			
		||||
        sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
 | 
			
		||||
    tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
 | 
			
		||||
 | 
			
		||||
    async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
 | 
			
		||||
        # send subs topics to server
 | 
			
		||||
        resp = await ws.send_message(
 | 
			
		||||
            msgpack.dumps({'streams': list(tbks.values())})
 | 
			
		||||
        )
 | 
			
		||||
        log.info(resp)
 | 
			
		||||
 | 
			
		||||
        async def recv() -> dict[str, Any]:
 | 
			
		||||
            return msgpack.loads((await ws.get_message()), encoding='utf-8')
 | 
			
		||||
 | 
			
		||||
# async def stream_quotes(
 | 
			
		||||
#     symbols: list[str],
 | 
			
		||||
#     host: str = 'localhost',
 | 
			
		||||
#     port: int = 5993,
 | 
			
		||||
#     diff_cached: bool = True,
 | 
			
		||||
#     loglevel: str = None,
 | 
			
		||||
# ) -> None:
 | 
			
		||||
#     """Open a symbol stream from a running instance of marketstore and
 | 
			
		||||
#     log to console.
 | 
			
		||||
#     """
 | 
			
		||||
#     # XXX: required to propagate ``tractor`` loglevel to piker logging
 | 
			
		||||
#     get_console_log(loglevel or tractor.current_actor().loglevel)
 | 
			
		||||
# 
 | 
			
		||||
#     tbks: dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols}
 | 
			
		||||
# 
 | 
			
		||||
#     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
 | 
			
		||||
#         # send subs topics to server
 | 
			
		||||
#         resp = await ws.send_message(
 | 
			
		||||
#             msgpack.dumps({'streams': list(tbks.values())})
 | 
			
		||||
#         )
 | 
			
		||||
#         log.info(resp)
 | 
			
		||||
# 
 | 
			
		||||
#         async def recv() -> dict[str, Any]:
 | 
			
		||||
#             return msgpack.loads((await ws.get_message()), encoding='utf-8')
 | 
			
		||||
# 
 | 
			
		||||
#         streams = (await recv())['streams']
 | 
			
		||||
#         log.info(f"Subscribed to {streams}")
 | 
			
		||||
# 
 | 
			
		||||
#         _cache = {}
 | 
			
		||||
# 
 | 
			
		||||
#         while True:
 | 
			
		||||
#             msg = await recv()
 | 
			
		||||
# 
 | 
			
		||||
#             # unpack symbol and quote data
 | 
			
		||||
#             # key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
 | 
			
		||||
#             symbol = msg['key'].split('/')[0]
 | 
			
		||||
#             data = msg['data']
 | 
			
		||||
# 
 | 
			
		||||
#             # calc time stamp(s)
 | 
			
		||||
#             s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
 | 
			
		||||
#             ts = s * 10**9 + ns
 | 
			
		||||
#             data['broker_fill_time_ns'] = ts
 | 
			
		||||
# 
 | 
			
		||||
#             quote = {}
 | 
			
		||||
#             for k, v in data.items():
 | 
			
		||||
#                 if isnan(v):
 | 
			
		||||
#                     continue
 | 
			
		||||
# 
 | 
			
		||||
#                 quote[k.lower()] = v
 | 
			
		||||
# 
 | 
			
		||||
#             quote['symbol'] = symbol
 | 
			
		||||
# 
 | 
			
		||||
#             quotes = {}
 | 
			
		||||
# 
 | 
			
		||||
#             if diff_cached:
 | 
			
		||||
#                 last = _cache.setdefault(symbol, {})
 | 
			
		||||
#                 new = set(quote.items()) - set(last.items())
 | 
			
		||||
#                 if new:
 | 
			
		||||
#                     log.info(f"New quote {quote['symbol']}:\n{new}")
 | 
			
		||||
# 
 | 
			
		||||
#                     # only ship diff updates and other required fields
 | 
			
		||||
#                     payload = {k: quote[k] for k, v in new}
 | 
			
		||||
#                     payload['symbol'] = symbol
 | 
			
		||||
# 
 | 
			
		||||
#                     # if there was volume likely the last size of
 | 
			
		||||
#                     # shares traded is useful info and it's possible
 | 
			
		||||
#                     # that the set difference from above will disregard
 | 
			
		||||
#                     # a "size" value since the same # of shares were traded
 | 
			
		||||
#                     size = quote.get('size')
 | 
			
		||||
#                     volume = quote.get('volume')
 | 
			
		||||
#                     if size and volume:
 | 
			
		||||
#                         new_volume_since_last = max(
 | 
			
		||||
#                             volume - last.get('volume', 0), 0)
 | 
			
		||||
#                         log.warning(
 | 
			
		||||
#                             f"NEW VOLUME {symbol}:{new_volume_since_last}")
 | 
			
		||||
#                         payload['size'] = size
 | 
			
		||||
#                         payload['last'] = quote.get('last')
 | 
			
		||||
# 
 | 
			
		||||
#                     # XXX: we append to a list for the options case where the
 | 
			
		||||
#                     # subscription topic (key) is the same for all
 | 
			
		||||
#                     # expiries even though this is uncessary for the
 | 
			
		||||
#                     # stock case (different topic [i.e. symbol] for each
 | 
			
		||||
#                     # quote).
 | 
			
		||||
#                     quotes.setdefault(symbol, []).append(payload)
 | 
			
		||||
# 
 | 
			
		||||
#                     # update cache
 | 
			
		||||
#                     _cache[symbol].update(quote)
 | 
			
		||||
#             else:
 | 
			
		||||
#                 quotes = {
 | 
			
		||||
#                     symbol: [{key.lower(): val for key, val in quote.items()}]}
 | 
			
		||||
# 
 | 
			
		||||
#             if quotes:
 | 
			
		||||
#                 yield quotes
 | 
			
		||||
        streams = (await recv())['streams']
 | 
			
		||||
        log.info(f"Subscribed to {streams}")
 | 
			
		||||
 | 
			
		||||
        _cache = {}
 | 
			
		||||
 | 
			
		||||
        while True:
 | 
			
		||||
            msg = await recv()
 | 
			
		||||
 | 
			
		||||
            # unpack symbol and quote data
 | 
			
		||||
            # key is in format ``<SYMBOL>/<TIMEFRAME>/<ID>``
 | 
			
		||||
            symbol = msg['key'].split('/')[0]
 | 
			
		||||
            data = msg['data']
 | 
			
		||||
 | 
			
		||||
            # calc time stamp(s)
 | 
			
		||||
            s, ns = data.pop('Epoch'), data.pop('Nanoseconds')
 | 
			
		||||
            ts = s * 10**9 + ns
 | 
			
		||||
            data['broker_fill_time_ns'] = ts
 | 
			
		||||
 | 
			
		||||
            quote = {}
 | 
			
		||||
            for k, v in data.items():
 | 
			
		||||
                if isnan(v):
 | 
			
		||||
                    continue
 | 
			
		||||
 | 
			
		||||
                quote[k.lower()] = v
 | 
			
		||||
 | 
			
		||||
            quote['symbol'] = symbol
 | 
			
		||||
 | 
			
		||||
            quotes = {}
 | 
			
		||||
 | 
			
		||||
            if diff_cached:
 | 
			
		||||
                last = _cache.setdefault(symbol, {})
 | 
			
		||||
                new = set(quote.items()) - set(last.items())
 | 
			
		||||
                if new:
 | 
			
		||||
                    log.info(f"New quote {quote['symbol']}:\n{new}")
 | 
			
		||||
 | 
			
		||||
                    # only ship diff updates and other required fields
 | 
			
		||||
                    payload = {k: quote[k] for k, v in new}
 | 
			
		||||
                    payload['symbol'] = symbol
 | 
			
		||||
 | 
			
		||||
                    # if there was volume likely the last size of
 | 
			
		||||
                    # shares traded is useful info and it's possible
 | 
			
		||||
                    # that the set difference from above will disregard
 | 
			
		||||
                    # a "size" value since the same # of shares were traded
 | 
			
		||||
                    size = quote.get('size')
 | 
			
		||||
                    volume = quote.get('volume')
 | 
			
		||||
                    if size and volume:
 | 
			
		||||
                        new_volume_since_last = max(
 | 
			
		||||
                            volume - last.get('volume', 0), 0)
 | 
			
		||||
                        log.warning(
 | 
			
		||||
                            f"NEW VOLUME {symbol}:{new_volume_since_last}")
 | 
			
		||||
                        payload['size'] = size
 | 
			
		||||
                        payload['last'] = quote.get('last')
 | 
			
		||||
 | 
			
		||||
                    # XXX: we append to a list for the options case where the
 | 
			
		||||
                    # subscription topic (key) is the same for all
 | 
			
		||||
                    # expiries even though this is uncessary for the
 | 
			
		||||
                    # stock case (different topic [i.e. symbol] for each
 | 
			
		||||
                    # quote).
 | 
			
		||||
                    quotes.setdefault(symbol, []).append(payload)
 | 
			
		||||
 | 
			
		||||
                    # update cache
 | 
			
		||||
                    _cache[symbol].update(quote)
 | 
			
		||||
            else:
 | 
			
		||||
                quotes = {
 | 
			
		||||
                    symbol: [{key.lower(): val for key, val in quote.items()}]}
 | 
			
		||||
 | 
			
		||||
            if quotes:
 | 
			
		||||
                yield quotes
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue