Factor data feeds endpoints into new sub-mod
							parent
							
								
									f87a2a810a
								
							
						
					
					
						commit
						d3caad6e11
					
				|  | @ -27,16 +27,16 @@ Sub-modules within break into the core functionalities: | |||
| ''' | ||||
| from .api import ( | ||||
|     get_client, | ||||
| ) | ||||
| from .feed import ( | ||||
|     open_history_client, | ||||
|     open_symbol_search, | ||||
|     stream_quotes, | ||||
| ) | ||||
| # TODO: | ||||
| # from .feed import ( | ||||
| # ) | ||||
| from .broker import ( | ||||
|     trades_dialogue, | ||||
|     # TODO: | ||||
| 
 | ||||
|     # TODO: part of pps/ledger work | ||||
|     # norm_trade_records, | ||||
| ) | ||||
| 
 | ||||
|  | @ -52,7 +52,6 @@ __all__ = [ | |||
| # tractor RPC enable arg | ||||
| __enable_modules__: list[str] = [ | ||||
|     'api', | ||||
|     # TODO: | ||||
|     # 'feed', | ||||
|     'feed', | ||||
|     'broker', | ||||
| ] | ||||
|  |  | |||
|  | @ -19,38 +19,34 @@ Kraken web API wrapping. | |||
| 
 | ||||
| ''' | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from dataclasses import asdict, field | ||||
| from dataclasses import field | ||||
| from datetime import datetime | ||||
| from typing import Any, Optional, Callable, Union | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Optional, | ||||
|     Union, | ||||
| ) | ||||
| import time | ||||
| 
 | ||||
| from trio_typing import TaskStatus | ||||
| import trio | ||||
| import pendulum | ||||
| import asks | ||||
| from fuzzywuzzy import process as fuzzy | ||||
| import numpy as np | ||||
| import tractor | ||||
| from pydantic.dataclasses import dataclass | ||||
| from pydantic import BaseModel | ||||
| import wsproto | ||||
| import urllib.parse | ||||
| import hashlib | ||||
| import hmac | ||||
| import base64 | ||||
| 
 | ||||
| from piker import config | ||||
| from piker._cacheables import open_cached_client | ||||
| from piker.brokers._util import ( | ||||
|     resproc, | ||||
|     SymbolNotFound, | ||||
|     BrokerError, | ||||
|     DataThrottle, | ||||
|     DataUnavailable, | ||||
| ) | ||||
| from piker.log import get_logger, get_console_log | ||||
| from piker.data import ShmArray | ||||
| from piker.data._web_bs import open_autorecon_ws, NoBsWs | ||||
| from piker.log import get_logger | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
|  | @ -76,47 +72,11 @@ _ohlc_dtype = [ | |||
| ohlc_dtype = np.dtype(_ohlc_dtype) | ||||
| 
 | ||||
| _show_wap_in_history = True | ||||
| 
 | ||||
| 
 | ||||
| _symbol_info_translation: dict[str, str] = { | ||||
|     'tick_decimals': 'pair_decimals', | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| # https://www.kraken.com/features/api#get-tradable-pairs | ||||
| class Pair(BaseModel): | ||||
|     altname: str  # alternate pair name | ||||
|     wsname: str  # WebSocket pair name (if available) | ||||
|     aclass_base: str  # asset class of base component | ||||
|     base: str  # asset id of base component | ||||
|     aclass_quote: str  # asset class of quote component | ||||
|     quote: str  # asset id of quote component | ||||
|     lot: str  # volume lot size | ||||
| 
 | ||||
|     pair_decimals: int  # scaling decimal places for pair | ||||
|     lot_decimals: int  # scaling decimal places for volume | ||||
| 
 | ||||
|     # amount to multiply lot volume by to get currency volume | ||||
|     lot_multiplier: float | ||||
| 
 | ||||
|     # array of leverage amounts available when buying | ||||
|     leverage_buy: list[int] | ||||
|     # array of leverage amounts available when selling | ||||
|     leverage_sell: list[int] | ||||
| 
 | ||||
|     # fee schedule array in [volume, percent fee] tuples | ||||
|     fees: list[tuple[int, float]] | ||||
| 
 | ||||
|     # maker fee schedule array in [volume, percent fee] tuples (if on | ||||
|     # maker/taker) | ||||
|     fees_maker: list[tuple[int, float]] | ||||
| 
 | ||||
|     fee_volume_currency: str  # volume discount currency | ||||
|     margin_call: str  # margin call level | ||||
|     margin_stop: str  # stop-out/liquidation margin level | ||||
|     ordermin: float  # minimum order volume for pair | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class OHLC: | ||||
|     ''' | ||||
|  | @ -485,380 +445,3 @@ def normalize_symbol( | |||
|             if sym in ticker: | ||||
|                 ticker = ticker.replace(sym, sym[1:]) | ||||
|         return ticker.lower() | ||||
| 
 | ||||
| 
 | ||||
| async def stream_messages( | ||||
|     ws: NoBsWs, | ||||
| ): | ||||
|     ''' | ||||
|     Message stream parser and heartbeat handler. | ||||
| 
 | ||||
|     Deliver ws subscription messages as well as handle heartbeat logic | ||||
|     though a single async generator. | ||||
| 
 | ||||
|     ''' | ||||
|     too_slow_count = last_hb = 0 | ||||
| 
 | ||||
|     while True: | ||||
| 
 | ||||
|         with trio.move_on_after(5) as cs: | ||||
|             msg = await ws.recv_msg() | ||||
| 
 | ||||
|         # trigger reconnection if heartbeat is laggy | ||||
|         if cs.cancelled_caught: | ||||
| 
 | ||||
|             too_slow_count += 1 | ||||
| 
 | ||||
|             if too_slow_count > 20: | ||||
|                 log.warning( | ||||
|                     "Heartbeat is too slow, resetting ws connection") | ||||
| 
 | ||||
|                 await ws._connect() | ||||
|                 too_slow_count = 0 | ||||
|                 continue | ||||
| 
 | ||||
|         if isinstance(msg, dict): | ||||
|             if msg.get('event') == 'heartbeat': | ||||
| 
 | ||||
|                 now = time.time() | ||||
|                 delay = now - last_hb | ||||
|                 last_hb = now | ||||
| 
 | ||||
|                 # XXX: why tf is this not printing without --tl flag? | ||||
|                 log.debug(f"Heartbeat after {delay}") | ||||
|                 # print(f"Heartbeat after {delay}") | ||||
| 
 | ||||
|                 continue | ||||
| 
 | ||||
|             err = msg.get('errorMessage') | ||||
|             if err: | ||||
|                 raise BrokerError(err) | ||||
|         else: | ||||
|             yield msg | ||||
| 
 | ||||
| 
 | ||||
| async def process_data_feed_msgs( | ||||
|     ws: NoBsWs, | ||||
| ): | ||||
|     ''' | ||||
|     Parse and pack data feed messages. | ||||
| 
 | ||||
|     ''' | ||||
|     async for msg in stream_messages(ws): | ||||
| 
 | ||||
|         chan_id, *payload_array, chan_name, pair = msg | ||||
| 
 | ||||
|         if 'ohlc' in chan_name: | ||||
| 
 | ||||
|             yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) | ||||
| 
 | ||||
|         elif 'spread' in chan_name: | ||||
| 
 | ||||
|             bid, ask, ts, bsize, asize = map(float, payload_array[0]) | ||||
| 
 | ||||
|             # TODO: really makes you think IB has a horrible API... | ||||
|             quote = { | ||||
|                 'symbol': pair.replace('/', ''), | ||||
|                 'ticks': [ | ||||
|                     {'type': 'bid', 'price': bid, 'size': bsize}, | ||||
|                     {'type': 'bsize', 'price': bid, 'size': bsize}, | ||||
| 
 | ||||
|                     {'type': 'ask', 'price': ask, 'size': asize}, | ||||
|                     {'type': 'asize', 'price': ask, 'size': asize}, | ||||
|                 ], | ||||
|             } | ||||
|             yield 'l1', quote | ||||
| 
 | ||||
|         # elif 'book' in msg[-2]: | ||||
|         #     chan_id, *payload_array, chan_name, pair = msg | ||||
|         #     print(msg) | ||||
| 
 | ||||
|         else: | ||||
|             print(f'UNHANDLED MSG: {msg}') | ||||
|             yield msg | ||||
| 
 | ||||
| 
 | ||||
| def normalize( | ||||
|     ohlc: OHLC, | ||||
| 
 | ||||
| ) -> dict: | ||||
|     quote = asdict(ohlc) | ||||
|     quote['broker_ts'] = quote['time'] | ||||
|     quote['brokerd_ts'] = time.time() | ||||
|     quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') | ||||
|     quote['last'] = quote['close'] | ||||
|     quote['bar_wap'] = ohlc.vwap | ||||
| 
 | ||||
|     # seriously eh? what's with this non-symmetry everywhere | ||||
|     # in subscription systems... | ||||
|     # XXX: piker style is always lowercases symbols. | ||||
|     topic = quote['pair'].replace('/', '').lower() | ||||
| 
 | ||||
|     # print(quote) | ||||
|     return topic, quote | ||||
| 
 | ||||
| 
 | ||||
| def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: | ||||
|     ''' | ||||
|     Create a request subscription packet dict. | ||||
| 
 | ||||
|     https://docs.kraken.com/websockets/#message-subscribe | ||||
| 
 | ||||
|     ''' | ||||
|     # eg. specific logic for this in kraken's sync client: | ||||
|     # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 | ||||
|     return { | ||||
|         'pair': pairs, | ||||
|         'event': 'subscribe', | ||||
|         'subscription': data, | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_history_client( | ||||
|     symbol: str, | ||||
| 
 | ||||
| ) -> tuple[Callable, int]: | ||||
| 
 | ||||
|     # TODO implement history getter for the new storage layer. | ||||
|     async with open_cached_client('kraken') as client: | ||||
| 
 | ||||
|         # lol, kraken won't send any more then the "last" | ||||
|         # 720 1m bars.. so we have to just ignore further | ||||
|         # requests of this type.. | ||||
|         queries: int = 0 | ||||
| 
 | ||||
|         async def get_ohlc( | ||||
|             end_dt: Optional[datetime] = None, | ||||
|             start_dt: Optional[datetime] = None, | ||||
| 
 | ||||
|         ) -> tuple[ | ||||
|             np.ndarray, | ||||
|             datetime,  # start | ||||
|             datetime,  # end | ||||
|         ]: | ||||
| 
 | ||||
|             nonlocal queries | ||||
|             if queries > 0: | ||||
|                 raise DataUnavailable | ||||
| 
 | ||||
|             count = 0 | ||||
|             while count <= 3: | ||||
|                 try: | ||||
|                     array = await client.bars( | ||||
|                         symbol, | ||||
|                         since=end_dt, | ||||
|                     ) | ||||
|                     count += 1 | ||||
|                     queries += 1 | ||||
|                     break | ||||
|                 except DataThrottle: | ||||
|                     log.warning(f'kraken OHLC throttle for {symbol}') | ||||
|                     await trio.sleep(1) | ||||
| 
 | ||||
|             start_dt = pendulum.from_timestamp(array[0]['time']) | ||||
|             end_dt = pendulum.from_timestamp(array[-1]['time']) | ||||
|             return array, start_dt, end_dt | ||||
| 
 | ||||
|         yield get_ohlc, {'erlangs': 1, 'rate': 1} | ||||
| 
 | ||||
| 
 | ||||
| async def backfill_bars( | ||||
| 
 | ||||
|     sym: str, | ||||
|     shm: ShmArray,  # type: ignore # noqa | ||||
|     count: int = 10,  # NOTE: any more and we'll overrun the underlying buffer | ||||
|     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Fill historical bars into shared mem / storage afap. | ||||
|     ''' | ||||
|     with trio.CancelScope() as cs: | ||||
|         async with open_cached_client('kraken') as client: | ||||
|             bars = await client.bars(symbol=sym) | ||||
|             shm.push(bars) | ||||
|             task_status.started(cs) | ||||
| 
 | ||||
| 
 | ||||
| async def stream_quotes( | ||||
| 
 | ||||
|     send_chan: trio.abc.SendChannel, | ||||
|     symbols: list[str], | ||||
|     feed_is_live: trio.Event, | ||||
|     loglevel: str = None, | ||||
| 
 | ||||
|     # backend specific | ||||
|     sub_type: str = 'ohlc', | ||||
| 
 | ||||
|     # startup sync | ||||
|     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Subscribe for ohlc stream of quotes for ``pairs``. | ||||
| 
 | ||||
|     ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>. | ||||
| 
 | ||||
|     ''' | ||||
|     # XXX: required to propagate ``tractor`` loglevel to piker logging | ||||
|     get_console_log(loglevel or tractor.current_actor().loglevel) | ||||
| 
 | ||||
|     ws_pairs = {} | ||||
|     sym_infos = {} | ||||
| 
 | ||||
|     async with open_cached_client('kraken') as client, send_chan as send_chan: | ||||
| 
 | ||||
|         # keep client cached for real-time section | ||||
|         for sym in symbols: | ||||
| 
 | ||||
|             # transform to upper since piker style is always lower | ||||
|             sym = sym.upper() | ||||
| 
 | ||||
|             si = Pair(**await client.symbol_info(sym))  # validation | ||||
|             syminfo = si.dict() | ||||
|             syminfo['price_tick_size'] = 1 / 10**si.pair_decimals | ||||
|             syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals | ||||
|             syminfo['asset_type'] = 'crypto' | ||||
|             sym_infos[sym] = syminfo | ||||
|             ws_pairs[sym] = si.wsname | ||||
| 
 | ||||
|         symbol = symbols[0].lower() | ||||
| 
 | ||||
|         init_msgs = { | ||||
|             # pass back token, and bool, signalling if we're the writer | ||||
|             # and that history has been written | ||||
|             symbol: { | ||||
|                 'symbol_info': sym_infos[sym], | ||||
|                 'shm_write_opts': {'sum_tick_vml': False}, | ||||
|                 'fqsn': sym, | ||||
|             }, | ||||
|         } | ||||
| 
 | ||||
|         @acm | ||||
|         async def subscribe(ws: wsproto.WSConnection): | ||||
|             # XXX: setup subs | ||||
|             # https://docs.kraken.com/websockets/#message-subscribe | ||||
|             # specific logic for this in kraken's shitty sync client: | ||||
|             # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 | ||||
|             ohlc_sub = make_sub( | ||||
|                 list(ws_pairs.values()), | ||||
|                 {'name': 'ohlc', 'interval': 1} | ||||
|             ) | ||||
| 
 | ||||
|             # TODO: we want to eventually allow unsubs which should | ||||
|             # be completely fine to request from a separate task | ||||
|             # since internally the ws methods appear to be FIFO | ||||
|             # locked. | ||||
|             await ws.send_msg(ohlc_sub) | ||||
| 
 | ||||
|             # trade data (aka L1) | ||||
|             l1_sub = make_sub( | ||||
|                 list(ws_pairs.values()), | ||||
|                 {'name': 'spread'}  # 'depth': 10} | ||||
|             ) | ||||
| 
 | ||||
|             # pull a first quote and deliver | ||||
|             await ws.send_msg(l1_sub) | ||||
| 
 | ||||
|             yield | ||||
| 
 | ||||
|             # unsub from all pairs on teardown | ||||
|             await ws.send_msg({ | ||||
|                 'pair': list(ws_pairs.values()), | ||||
|                 'event': 'unsubscribe', | ||||
|                 'subscription': ['ohlc', 'spread'], | ||||
|             }) | ||||
| 
 | ||||
|             # XXX: do we need to ack the unsub? | ||||
|             # await ws.recv_msg() | ||||
| 
 | ||||
|         # see the tips on reconnection logic: | ||||
|         # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds | ||||
|         ws: NoBsWs | ||||
|         async with open_autorecon_ws( | ||||
|             'wss://ws.kraken.com/', | ||||
|             fixture=subscribe, | ||||
|         ) as ws: | ||||
| 
 | ||||
|             # pull a first quote and deliver | ||||
|             msg_gen = process_data_feed_msgs(ws) | ||||
| 
 | ||||
|             # TODO: use ``anext()`` when it lands in 3.10! | ||||
|             typ, ohlc_last = await msg_gen.__anext__() | ||||
| 
 | ||||
|             topic, quote = normalize(ohlc_last) | ||||
| 
 | ||||
|             task_status.started((init_msgs,  quote)) | ||||
| 
 | ||||
|             # lol, only "closes" when they're margin squeezing clients ;P | ||||
|             feed_is_live.set() | ||||
| 
 | ||||
|             # keep start of last interval for volume tracking | ||||
|             last_interval_start = ohlc_last.etime | ||||
| 
 | ||||
|             # start streaming | ||||
|             async for typ, ohlc in msg_gen: | ||||
| 
 | ||||
|                 if typ == 'ohlc': | ||||
| 
 | ||||
|                     # TODO: can get rid of all this by using | ||||
|                     # ``trades`` subscription... | ||||
| 
 | ||||
|                     # generate tick values to match time & sales pane: | ||||
|                     # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m | ||||
|                     volume = ohlc.volume | ||||
| 
 | ||||
|                     # new OHLC sample interval | ||||
|                     if ohlc.etime > last_interval_start: | ||||
|                         last_interval_start = ohlc.etime | ||||
|                         tick_volume = volume | ||||
| 
 | ||||
|                     else: | ||||
|                         # this is the tick volume *within the interval* | ||||
|                         tick_volume = volume - ohlc_last.volume | ||||
| 
 | ||||
|                     ohlc_last = ohlc | ||||
|                     last = ohlc.close | ||||
| 
 | ||||
|                     if tick_volume: | ||||
|                         ohlc.ticks.append({ | ||||
|                             'type': 'trade', | ||||
|                             'price': last, | ||||
|                             'size': tick_volume, | ||||
|                         }) | ||||
| 
 | ||||
|                     topic, quote = normalize(ohlc) | ||||
| 
 | ||||
|                 elif typ == 'l1': | ||||
|                     quote = ohlc | ||||
|                     topic = quote['symbol'].lower() | ||||
| 
 | ||||
|                 await send_chan.send({topic: quote}) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def open_symbol_search( | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> Client: | ||||
|     async with open_cached_client('kraken') as client: | ||||
| 
 | ||||
|         # load all symbols locally for fast search | ||||
|         cache = await client.cache_symbols() | ||||
|         await ctx.started(cache) | ||||
| 
 | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             async for pattern in stream: | ||||
| 
 | ||||
|                 matches = fuzzy.extractBests( | ||||
|                     pattern, | ||||
|                     cache, | ||||
|                     score_cutoff=50, | ||||
|                 ) | ||||
|                 # repack in dict form | ||||
|                 await stream.send( | ||||
|                     {item[0]['altname']: item[0] | ||||
|                      for item in matches} | ||||
|                 ) | ||||
|  |  | |||
|  | @ -44,9 +44,11 @@ from .api import ( | |||
|     Client, | ||||
|     BrokerError, | ||||
|     get_client, | ||||
|     get_console_log, | ||||
|     log, | ||||
|     normalize_symbol, | ||||
| ) | ||||
| from .feed import ( | ||||
|     get_console_log, | ||||
|     open_autorecon_ws, | ||||
|     NoBsWs, | ||||
|     stream_messages, | ||||
|  |  | |||
|  | @ -0,0 +1,464 @@ | |||
| # piker: trading gear for hackers | ||||
| # Copyright (C) Tyler Goodlet (in stewardship for pikers) | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| ''' | ||||
| Real-time and historical data feed endpoints. | ||||
| 
 | ||||
| ''' | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from dataclasses import asdict | ||||
| from datetime import datetime | ||||
| from typing import ( | ||||
|     Any, | ||||
|     Optional, | ||||
|     Callable, | ||||
| ) | ||||
| import time | ||||
| 
 | ||||
| from fuzzywuzzy import process as fuzzy | ||||
| import numpy as np | ||||
| import pendulum | ||||
| from pydantic import BaseModel | ||||
| from trio_typing import TaskStatus | ||||
| import tractor | ||||
| import trio | ||||
| import wsproto | ||||
| 
 | ||||
| from piker._cacheables import open_cached_client | ||||
| from piker.brokers._util import ( | ||||
|     BrokerError, | ||||
|     DataThrottle, | ||||
|     DataUnavailable, | ||||
| ) | ||||
| from piker.log import get_console_log | ||||
| from piker.data import ShmArray | ||||
| from piker.data._web_bs import open_autorecon_ws, NoBsWs | ||||
| from .api import ( | ||||
|     Client, | ||||
|     log, | ||||
|     OHLC, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| # https://www.kraken.com/features/api#get-tradable-pairs | ||||
| class Pair(BaseModel): | ||||
|     altname: str  # alternate pair name | ||||
|     wsname: str  # WebSocket pair name (if available) | ||||
|     aclass_base: str  # asset class of base component | ||||
|     base: str  # asset id of base component | ||||
|     aclass_quote: str  # asset class of quote component | ||||
|     quote: str  # asset id of quote component | ||||
|     lot: str  # volume lot size | ||||
| 
 | ||||
|     pair_decimals: int  # scaling decimal places for pair | ||||
|     lot_decimals: int  # scaling decimal places for volume | ||||
| 
 | ||||
|     # amount to multiply lot volume by to get currency volume | ||||
|     lot_multiplier: float | ||||
| 
 | ||||
|     # array of leverage amounts available when buying | ||||
|     leverage_buy: list[int] | ||||
|     # array of leverage amounts available when selling | ||||
|     leverage_sell: list[int] | ||||
| 
 | ||||
|     # fee schedule array in [volume, percent fee] tuples | ||||
|     fees: list[tuple[int, float]] | ||||
| 
 | ||||
|     # maker fee schedule array in [volume, percent fee] tuples (if on | ||||
|     # maker/taker) | ||||
|     fees_maker: list[tuple[int, float]] | ||||
| 
 | ||||
|     fee_volume_currency: str  # volume discount currency | ||||
|     margin_call: str  # margin call level | ||||
|     margin_stop: str  # stop-out/liquidation margin level | ||||
|     ordermin: float  # minimum order volume for pair | ||||
| 
 | ||||
| 
 | ||||
| async def stream_messages( | ||||
|     ws: NoBsWs, | ||||
| ): | ||||
|     ''' | ||||
|     Message stream parser and heartbeat handler. | ||||
| 
 | ||||
|     Deliver ws subscription messages as well as handle heartbeat logic | ||||
|     though a single async generator. | ||||
| 
 | ||||
|     ''' | ||||
|     too_slow_count = last_hb = 0 | ||||
| 
 | ||||
|     while True: | ||||
| 
 | ||||
|         with trio.move_on_after(5) as cs: | ||||
|             msg = await ws.recv_msg() | ||||
| 
 | ||||
|         # trigger reconnection if heartbeat is laggy | ||||
|         if cs.cancelled_caught: | ||||
| 
 | ||||
|             too_slow_count += 1 | ||||
| 
 | ||||
|             if too_slow_count > 20: | ||||
|                 log.warning( | ||||
|                     "Heartbeat is too slow, resetting ws connection") | ||||
| 
 | ||||
|                 await ws._connect() | ||||
|                 too_slow_count = 0 | ||||
|                 continue | ||||
| 
 | ||||
|         if isinstance(msg, dict): | ||||
|             if msg.get('event') == 'heartbeat': | ||||
| 
 | ||||
|                 now = time.time() | ||||
|                 delay = now - last_hb | ||||
|                 last_hb = now | ||||
| 
 | ||||
|                 # XXX: why tf is this not printing without --tl flag? | ||||
|                 log.debug(f"Heartbeat after {delay}") | ||||
|                 # print(f"Heartbeat after {delay}") | ||||
| 
 | ||||
|                 continue | ||||
| 
 | ||||
|             err = msg.get('errorMessage') | ||||
|             if err: | ||||
|                 raise BrokerError(err) | ||||
|         else: | ||||
|             yield msg | ||||
| 
 | ||||
| 
 | ||||
| async def process_data_feed_msgs( | ||||
|     ws: NoBsWs, | ||||
| ): | ||||
|     ''' | ||||
|     Parse and pack data feed messages. | ||||
| 
 | ||||
|     ''' | ||||
|     async for msg in stream_messages(ws): | ||||
| 
 | ||||
|         chan_id, *payload_array, chan_name, pair = msg | ||||
| 
 | ||||
|         if 'ohlc' in chan_name: | ||||
| 
 | ||||
|             yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0]) | ||||
| 
 | ||||
|         elif 'spread' in chan_name: | ||||
| 
 | ||||
|             bid, ask, ts, bsize, asize = map(float, payload_array[0]) | ||||
| 
 | ||||
|             # TODO: really makes you think IB has a horrible API... | ||||
|             quote = { | ||||
|                 'symbol': pair.replace('/', ''), | ||||
|                 'ticks': [ | ||||
|                     {'type': 'bid', 'price': bid, 'size': bsize}, | ||||
|                     {'type': 'bsize', 'price': bid, 'size': bsize}, | ||||
| 
 | ||||
|                     {'type': 'ask', 'price': ask, 'size': asize}, | ||||
|                     {'type': 'asize', 'price': ask, 'size': asize}, | ||||
|                 ], | ||||
|             } | ||||
|             yield 'l1', quote | ||||
| 
 | ||||
|         # elif 'book' in msg[-2]: | ||||
|         #     chan_id, *payload_array, chan_name, pair = msg | ||||
|         #     print(msg) | ||||
| 
 | ||||
|         else: | ||||
|             print(f'UNHANDLED MSG: {msg}') | ||||
|             yield msg | ||||
| 
 | ||||
| 
 | ||||
| def normalize( | ||||
|     ohlc: OHLC, | ||||
| 
 | ||||
| ) -> dict: | ||||
|     quote = asdict(ohlc) | ||||
|     quote['broker_ts'] = quote['time'] | ||||
|     quote['brokerd_ts'] = time.time() | ||||
|     quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') | ||||
|     quote['last'] = quote['close'] | ||||
|     quote['bar_wap'] = ohlc.vwap | ||||
| 
 | ||||
|     # seriously eh? what's with this non-symmetry everywhere | ||||
|     # in subscription systems... | ||||
|     # XXX: piker style is always lowercases symbols. | ||||
|     topic = quote['pair'].replace('/', '').lower() | ||||
| 
 | ||||
|     # print(quote) | ||||
|     return topic, quote | ||||
| 
 | ||||
| 
 | ||||
| def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: | ||||
|     ''' | ||||
|     Create a request subscription packet dict. | ||||
| 
 | ||||
|     https://docs.kraken.com/websockets/#message-subscribe | ||||
| 
 | ||||
|     ''' | ||||
|     # eg. specific logic for this in kraken's sync client: | ||||
|     # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 | ||||
|     return { | ||||
|         'pair': pairs, | ||||
|         'event': 'subscribe', | ||||
|         'subscription': data, | ||||
|     } | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_history_client( | ||||
|     symbol: str, | ||||
| 
 | ||||
| ) -> tuple[Callable, int]: | ||||
| 
 | ||||
|     # TODO implement history getter for the new storage layer. | ||||
|     async with open_cached_client('kraken') as client: | ||||
| 
 | ||||
|         # lol, kraken won't send any more then the "last" | ||||
|         # 720 1m bars.. so we have to just ignore further | ||||
|         # requests of this type.. | ||||
|         queries: int = 0 | ||||
| 
 | ||||
|         async def get_ohlc( | ||||
|             end_dt: Optional[datetime] = None, | ||||
|             start_dt: Optional[datetime] = None, | ||||
| 
 | ||||
|         ) -> tuple[ | ||||
|             np.ndarray, | ||||
|             datetime,  # start | ||||
|             datetime,  # end | ||||
|         ]: | ||||
| 
 | ||||
|             nonlocal queries | ||||
|             if queries > 0: | ||||
|                 raise DataUnavailable | ||||
| 
 | ||||
|             count = 0 | ||||
|             while count <= 3: | ||||
|                 try: | ||||
|                     array = await client.bars( | ||||
|                         symbol, | ||||
|                         since=end_dt, | ||||
|                     ) | ||||
|                     count += 1 | ||||
|                     queries += 1 | ||||
|                     break | ||||
|                 except DataThrottle: | ||||
|                     log.warning(f'kraken OHLC throttle for {symbol}') | ||||
|                     await trio.sleep(1) | ||||
| 
 | ||||
|             start_dt = pendulum.from_timestamp(array[0]['time']) | ||||
|             end_dt = pendulum.from_timestamp(array[-1]['time']) | ||||
|             return array, start_dt, end_dt | ||||
| 
 | ||||
|         yield get_ohlc, {'erlangs': 1, 'rate': 1} | ||||
| 
 | ||||
| 
 | ||||
| async def backfill_bars( | ||||
| 
 | ||||
|     sym: str, | ||||
|     shm: ShmArray,  # type: ignore # noqa | ||||
|     count: int = 10,  # NOTE: any more and we'll overrun the underlying buffer | ||||
|     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Fill historical bars into shared mem / storage afap. | ||||
|     ''' | ||||
|     with trio.CancelScope() as cs: | ||||
|         async with open_cached_client('kraken') as client: | ||||
|             bars = await client.bars(symbol=sym) | ||||
|             shm.push(bars) | ||||
|             task_status.started(cs) | ||||
| 
 | ||||
| 
 | ||||
| async def stream_quotes( | ||||
| 
 | ||||
|     send_chan: trio.abc.SendChannel, | ||||
|     symbols: list[str], | ||||
|     feed_is_live: trio.Event, | ||||
|     loglevel: str = None, | ||||
| 
 | ||||
|     # backend specific | ||||
|     sub_type: str = 'ohlc', | ||||
| 
 | ||||
|     # startup sync | ||||
|     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Subscribe for ohlc stream of quotes for ``pairs``. | ||||
| 
 | ||||
|     ``pairs`` must be formatted <crypto_symbol>/<fiat_symbol>. | ||||
| 
 | ||||
|     ''' | ||||
|     # XXX: required to propagate ``tractor`` loglevel to piker logging | ||||
|     get_console_log(loglevel or tractor.current_actor().loglevel) | ||||
| 
 | ||||
|     ws_pairs = {} | ||||
|     sym_infos = {} | ||||
| 
 | ||||
|     async with open_cached_client('kraken') as client, send_chan as send_chan: | ||||
| 
 | ||||
|         # keep client cached for real-time section | ||||
|         for sym in symbols: | ||||
| 
 | ||||
|             # transform to upper since piker style is always lower | ||||
|             sym = sym.upper() | ||||
| 
 | ||||
|             si = Pair(**await client.symbol_info(sym))  # validation | ||||
|             syminfo = si.dict() | ||||
|             syminfo['price_tick_size'] = 1 / 10**si.pair_decimals | ||||
|             syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals | ||||
|             syminfo['asset_type'] = 'crypto' | ||||
|             sym_infos[sym] = syminfo | ||||
|             ws_pairs[sym] = si.wsname | ||||
| 
 | ||||
|         symbol = symbols[0].lower() | ||||
| 
 | ||||
|         init_msgs = { | ||||
|             # pass back token, and bool, signalling if we're the writer | ||||
|             # and that history has been written | ||||
|             symbol: { | ||||
|                 'symbol_info': sym_infos[sym], | ||||
|                 'shm_write_opts': {'sum_tick_vml': False}, | ||||
|                 'fqsn': sym, | ||||
|             }, | ||||
|         } | ||||
| 
 | ||||
|         @acm | ||||
|         async def subscribe(ws: wsproto.WSConnection): | ||||
|             # XXX: setup subs | ||||
|             # https://docs.kraken.com/websockets/#message-subscribe | ||||
|             # specific logic for this in kraken's shitty sync client: | ||||
|             # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 | ||||
|             ohlc_sub = make_sub( | ||||
|                 list(ws_pairs.values()), | ||||
|                 {'name': 'ohlc', 'interval': 1} | ||||
|             ) | ||||
| 
 | ||||
|             # TODO: we want to eventually allow unsubs which should | ||||
|             # be completely fine to request from a separate task | ||||
|             # since internally the ws methods appear to be FIFO | ||||
|             # locked. | ||||
|             await ws.send_msg(ohlc_sub) | ||||
| 
 | ||||
|             # trade data (aka L1) | ||||
|             l1_sub = make_sub( | ||||
|                 list(ws_pairs.values()), | ||||
|                 {'name': 'spread'}  # 'depth': 10} | ||||
|             ) | ||||
| 
 | ||||
|             # pull a first quote and deliver | ||||
|             await ws.send_msg(l1_sub) | ||||
| 
 | ||||
|             yield | ||||
| 
 | ||||
|             # unsub from all pairs on teardown | ||||
|             await ws.send_msg({ | ||||
|                 'pair': list(ws_pairs.values()), | ||||
|                 'event': 'unsubscribe', | ||||
|                 'subscription': ['ohlc', 'spread'], | ||||
|             }) | ||||
| 
 | ||||
|             # XXX: do we need to ack the unsub? | ||||
|             # await ws.recv_msg() | ||||
| 
 | ||||
|         # see the tips on reconnection logic: | ||||
|         # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds | ||||
|         ws: NoBsWs | ||||
|         async with open_autorecon_ws( | ||||
|             'wss://ws.kraken.com/', | ||||
|             fixture=subscribe, | ||||
|         ) as ws: | ||||
| 
 | ||||
|             # pull a first quote and deliver | ||||
|             msg_gen = process_data_feed_msgs(ws) | ||||
| 
 | ||||
|             # TODO: use ``anext()`` when it lands in 3.10! | ||||
|             typ, ohlc_last = await msg_gen.__anext__() | ||||
| 
 | ||||
|             topic, quote = normalize(ohlc_last) | ||||
| 
 | ||||
|             task_status.started((init_msgs,  quote)) | ||||
| 
 | ||||
|             # lol, only "closes" when they're margin squeezing clients ;P | ||||
|             feed_is_live.set() | ||||
| 
 | ||||
|             # keep start of last interval for volume tracking | ||||
|             last_interval_start = ohlc_last.etime | ||||
| 
 | ||||
|             # start streaming | ||||
|             async for typ, ohlc in msg_gen: | ||||
| 
 | ||||
|                 if typ == 'ohlc': | ||||
| 
 | ||||
|                     # TODO: can get rid of all this by using | ||||
|                     # ``trades`` subscription... | ||||
| 
 | ||||
|                     # generate tick values to match time & sales pane: | ||||
|                     # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m | ||||
|                     volume = ohlc.volume | ||||
| 
 | ||||
|                     # new OHLC sample interval | ||||
|                     if ohlc.etime > last_interval_start: | ||||
|                         last_interval_start = ohlc.etime | ||||
|                         tick_volume = volume | ||||
| 
 | ||||
|                     else: | ||||
|                         # this is the tick volume *within the interval* | ||||
|                         tick_volume = volume - ohlc_last.volume | ||||
| 
 | ||||
|                     ohlc_last = ohlc | ||||
|                     last = ohlc.close | ||||
| 
 | ||||
|                     if tick_volume: | ||||
|                         ohlc.ticks.append({ | ||||
|                             'type': 'trade', | ||||
|                             'price': last, | ||||
|                             'size': tick_volume, | ||||
|                         }) | ||||
| 
 | ||||
|                     topic, quote = normalize(ohlc) | ||||
| 
 | ||||
|                 elif typ == 'l1': | ||||
|                     quote = ohlc | ||||
|                     topic = quote['symbol'].lower() | ||||
| 
 | ||||
|                 await send_chan.send({topic: quote}) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def open_symbol_search( | ||||
|     ctx: tractor.Context, | ||||
| 
 | ||||
| ) -> Client: | ||||
|     async with open_cached_client('kraken') as client: | ||||
| 
 | ||||
|         # load all symbols locally for fast search | ||||
|         cache = await client.cache_symbols() | ||||
|         await ctx.started(cache) | ||||
| 
 | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             async for pattern in stream: | ||||
| 
 | ||||
|                 matches = fuzzy.extractBests( | ||||
|                     pattern, | ||||
|                     cache, | ||||
|                     score_cutoff=50, | ||||
|                 ) | ||||
|                 # repack in dict form | ||||
|                 await stream.send( | ||||
|                     {item[0]['altname']: item[0] | ||||
|                      for item in matches} | ||||
|                 ) | ||||
		Loading…
	
		Reference in New Issue