From ad9d6457823737fb4670f8257e512d5c7bfdab0e Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Thu, 2 Mar 2023 23:01:17 -0500 Subject: [PATCH] WIP - setup basic history and streaming client --- piker/brokers/__init__.py | 3 +- piker/brokers/kucoin.py | 226 +++++++++++++++++++++++++++++++------- piker/data/cryptofeeds.py | 226 +++++++++++++++----------------------- 3 files changed, 273 insertions(+), 182 deletions(-) diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index a35e4aea..c67f4003 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -24,7 +24,7 @@ __brokers__ = [ 'binance', 'ib', 'kraken', - + 'kucoin' # broken but used to work # 'questrade', # 'robinhood', @@ -35,7 +35,6 @@ __brokers__ = [ # iex # deribit - # kucoin # bitso ] diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 75d72037..edf5d508 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -15,89 +15,196 @@ """ +from dataclasses import field from typing import Any, Optional, Literal from contextlib import asynccontextmanager as acm +from datetime import datetime +import time +import math +from os import path, walk import asks import tractor import trio from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy -from cryptofeed.defines import ( - KUCOIN, - TRADES, - L2_BOOK +from cryptofeed.defines import KUCOIN, TRADES, L2_BOOK +from cryptofeed.symbols import Symbol +import pendulum +import numpy as np +from piker.data.cryptofeeds import ( + fqsn_to_cf_sym, + mk_stream_quotes, + get_config, ) -from piker.data.cryptofeeds import mk_stream_quotes from piker._cacheables import open_cached_client from piker.log import get_logger -from ._util import SymbolNotFound +from piker.pp import config +from ._util import DataUnavailable _spawn_kwargs = { "infect_asyncio": True, } log = get_logger(__name__) +_ohlc_dtype = [ + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', float), + ('bar_wap', float), # will be zeroed by sampler if not filled +] + class Client: def __init__(self) -> None: - self._pairs: dict[str, Any] = None + self._pairs: dict[str, Symbol] = {} + self._bars: list[list] = [] # TODO" Shouldn't have to write kucoin twice here - # config = get_config('kucoin').get('kucoin', {}) + config = get_config("kucoin").get("kucoin", {}) # - # if ('key_id' in config) and ('key_secret' in config): - # self._key_id = config['key_id'] - # self._key_secret = config['key_secret'] - # - # else: - # self._key_id = None - # self._key_secret = None - async def request(self, action: Literal["POST", "GET", "PUT", "DELETE"], route: str, api_v: str = 'v2'): + if ("key_id" in config) and ("key_secret" in config): + self._key_id = config["key_id"] + self._key_secret = config["key_secret"] + + else: + self._key_id = None + self._key_secret = None + + async def _request( + self, + action: Literal["POST", "GET", "PUT", "DELETE"], + route: str, + api_v: str = "v2", + ) -> Any: api_url = f"https://api.kucoin.com/api/{api_v}{route}" res = await asks.request(action, api_url) - return res.json()['data'] + #breakpoint() + try: + return res.json()["data"] + except KeyError as e: + print(f'KUCOIN ERROR: {res.json()["msg"]}') + breakpoint() - async def symbol_info( + async def get_pairs( self, - sym: str = None, ) -> dict[str, Any]: - if self._pairs: return self._pairs - entries = await self.request("GET", "/symbols") - if not entries: - raise SymbolNotFound(f'{sym} not found') - - syms = {item['name']: item for item in entries} + entries = await self._request("GET", "/symbols") + syms = {item["name"]: item for item in entries} return syms - async def cache_symbols( + async def cache_pairs( self, - ) -> dict: + normalize: bool = True, + ) -> dict[str, Symbol]: if not self._pairs: - self._pairs = await self.symbol_info() - + self._pairs = await self.get_pairs() + if normalize: + self._pairs = self.normalize_pairs(self._pairs) return self._pairs + def normalize_pairs(self, pairs: dict[str, Symbol]) -> dict[str, Symbol]: + """ + Map crypfeeds symbols to fqsn strings + + """ + norm_pairs = {} + + for key, value in pairs.items(): + fqsn = key.lower().replace("-", "") + norm_pairs[fqsn] = value + + return norm_pairs + async def search_symbols( self, pattern: str, limit: int = 30, ) -> dict[str, Any]: - data = await self.symbol_info() + data = await self.get_pairs() matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit) # repack in dict form - return {item[0]["instrument_name"].lower(): item[0] for item in matches} + return {item[0]["name"].lower(): item[0] for item in matches} + + async def get_bars( + self, + fqsn: str, + start_dt: Optional[datetime] = None, + end_dt: Optional[datetime] = None, + limit: int = 1000, + as_np: bool = True, + type: str = "1min", + ): + if len(self._bars): + return self._bars + + if end_dt is None: + end_dt = pendulum.now("UTC").add(minutes=1) + + if start_dt is None: + start_dt = end_dt.start_of("minute").subtract(minutes=limit) + + # Format datetime to unix + start_dt = math.trunc(time.mktime(start_dt.timetuple())) + end_dt = math.trunc(time.mktime(end_dt.timetuple())) + kucoin_sym = fqsn_to_cf_sym(fqsn, self._pairs) + url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}" + + bars = await self._request( + "GET", + url, + api_v="v1", + ) + + new_bars = [] + for i, bar in enumerate(bars[::-1]): + # TODO: implement struct/typecasting/validation here + + data = { + 'index': i, + 'time': bar[0], + 'open': bar[1], + 'close': bar[2], + 'high': bar[3], + 'low': bar[4], + 'volume': bar[5], + 'amount': bar [6], + 'bar_wap': 0.0, + } + + row = [] + for j, (field_name, field_type) in enumerate(_ohlc_dtype): + + value = data[field_name] + match field_name: + case 'index' | 'time': + row.append(int(value)) + # case 'time': + # dt_from_unix_ts = datetime.utcfromtimestamp(int(value)) + # # convert unix time to epoch seconds + # row.append(int(dt_from_unix_ts.timestamp())) + case _: + row.append(float(value)) + + new_bars.append(tuple(row)) + + self._bars = array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars + return array @acm async def get_client(): client = Client() # Do we need to open a nursery here? - await client.cache_symbols() + await client.cache_pairs() yield client @@ -107,13 +214,13 @@ async def open_symbol_search( ): async with open_cached_client("kucoin") as client: # load all symbols locally for fast search - cache = await client.cache_symbols() + cache = await client.cache_pairs() await ctx.started() - # async with ctx.open_stream() as stream: - # async for pattern in stream: - # # repack in dict form - # await stream.send(await client.search_symbols(pattern)) + async with ctx.open_stream() as stream: + async for pattern in stream: + # repack in dict form + await stream.send(await client.search_symbols(pattern)) async def stream_quotes( @@ -126,7 +233,7 @@ async def stream_quotes( ): return await mk_stream_quotes( KUCOIN, - [L2_BOOK], + [L2_BOOK, TRADES], send_chan, symbols, feed_is_live, @@ -134,5 +241,44 @@ async def stream_quotes( task_status, ) -async def open_history_client(): - + +@acm +async def open_history_client( + symbol: str, + type: str = "1m", +): + async with open_cached_client("kucoin") as client: + # call bars on kucoin + async def get_ohlc_history( + timeframe: float, + end_dt: datetime | None = None, + start_dt: datetime | None = None, + ) -> tuple[ + np.ndarray, + datetime | None, # start + datetime | None, # end + ]: + if timeframe != 60: + raise DataUnavailable('Only 1m bars are supported') + + array = await client.get_bars( + symbol, + start_dt=start_dt, + end_dt=end_dt, + ) + + times = array['time'] + + if ( + end_dt is None + ): + inow = round(time.time()) + print(f'difference in time between load and processing {inow - times[-1]}') + if (inow - times[-1]) > 60: + await tractor.breakpoint() + + start_dt = pendulum.from_timestamp(times[0]) + end_dt = pendulum.from_timestamp(times[-1]) + return array, start_dt, end_dt + + yield get_ohlc_history, {'erlangs': 3, 'rate': 3} diff --git a/piker/data/cryptofeeds.py b/piker/data/cryptofeeds.py index 931207c6..727c3a3c 100644 --- a/piker/data/cryptofeeds.py +++ b/piker/data/cryptofeeds.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) +# Copyright (C) Jared Goldman (in stewardship for piker0) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -35,6 +35,7 @@ from tractor import to_asyncio from cryptofeed import FeedHandler from cryptofeed.defines import TRADES, L2_BOOK from cryptofeed.symbols import Symbol +from cryptofeed.types import OrderBook import asyncio from piker._cacheables import open_cached_client @@ -53,84 +54,34 @@ _spawn_kwargs = { log = get_logger(__name__) -def deribit_timestamp(when): - return int((when.timestamp() * 1000) + (when.microsecond / 1000)) +def fqsn_to_cb_sym(pair_data: Symbol) -> Symbol: + return Symbol(base=pair_data["baseCurrency"], quote=pair_data["quoteCurrency"]) -# def str_to_cb_sym(name: str) -> Symbol: -# base, strike_price, expiry_date, option_type = name.split("-") -# -# quote = base -# -# if option_type == "put": -# option_type = PUT -# elif option_type == "call": -# option_type = CALL -# else: -# raise Exception("Couldn't parse option type") -# -# return Symbol( -# base, -# quote, -# type=OPTION, -# strike_price=strike_price, -# option_type=option_type, -# expiry_date=expiry_date, -# expiry_normalize=False, -# ) -# +def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, Symbol]) -> str: + pair_data = pairs[fqsn] + return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"] -def piker_sym_to_cb_sym(symbol) -> Symbol: - return Symbol( - base=symbol['baseCurrency'], - quote=symbol['quoteCurrency'] - ) +def pair_data_to_cf_sym(sym_data: Symbol): + return sym_data["baseCurrency"] + "-" + sym_data["quoteCurrency"] -def cb_sym_to_deribit_inst(sym: Symbol): - # cryptofeed normalized - cb_norm = ["F", "G", "H", "J", "K", "M", "N", "Q", "U", "V", "X", "Z"] - - # deribit specific - months = [ - "JAN", - "FEB", - "MAR", - "APR", - "MAY", - "JUN", - "JUL", - "AUG", - "SEP", - "OCT", - "NOV", - "DEC", - ] - - exp = sym.expiry_date - - # YYMDD - # 01234 - year, month, day = (exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) - - otype = "C" if sym.option_type == CALL else "P" - - return f"{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}" +def cf_sym_to_fqsn(sym: str) -> str: + return sym.lower().replace("-", "") def get_config(exchange: str) -> dict[str, Any]: conf, path = config.load() section = conf.get(exchange.lower()) - breakpoint() # TODO: document why we send this, basically because logging params for cryptofeed conf["log"] = {} conf["log"]["disabled"] = True if section is None: - log.warning(f"No config section found for deribit in {path}") + log.warning(f"No config section found for deribit in {exchange}") return conf @@ -145,64 +96,53 @@ async def mk_stream_quotes( # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) sym = symbols[0] async with (open_cached_client(exchange.lower()) as client, send_chan as send_chan): - # create init message here + pairs = await client.cache_pairs() - cache = await client.cache_symbols() - - cf_syms = {} - for key, value in cache.items(): - cf_sym = key.lower().replace('-', '') - cf_syms[cf_sym] = value - - cf_sym = cf_syms[sym] - - async with maybe_open_price_feed(cf_sym, exchange, channels) as stream: + pair_data = pairs[sym] + async with maybe_open_price_feed(pair_data, exchange, channels) as stream: init_msgs = { # pass back token, and bool, signalling if we're the writer # and that history has been written sym: { - 'symbol_info': { - 'asset_type': 'crypto', - 'price_tick_size': 0.0005 - }, - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym, + "symbol_info": {"asset_type": "crypto", "price_tick_size": 0.0005}, + "shm_write_opts": {"sum_tick_vml": False}, + "fqsn": sym, }, } # broker schemas to validate symbol data - quote_msg = {"symbol": cf_sym["name"], "last": 0, "ticks": []} - + quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []} task_status.started((init_msgs, quote_msg)) feed_is_live.set() async for typ, quote in stream: + print(f'streaming {typ} quote: {quote}') topic = quote["symbol"] await send_chan.send({topic: quote}) + @acm async def maybe_open_price_feed( - symbol, exchange, channels + pair_data: Symbol, exchange: str, channels ) -> trio.abc.ReceiveStream: # TODO: add a predicate to maybe_open_context # TODO: ensure we can dynamically pass down args here async with maybe_open_context( acm_func=open_price_feed, kwargs={ - "symbol": symbol, + "pair_data": pair_data, "exchange": exchange, "channels": channels, }, - key=symbol['name'], + key=pair_data["name"], ) as (cache_hit, feed): if cache_hit: yield broadcast_receiver(feed, 10) @@ -211,10 +151,12 @@ async def maybe_open_price_feed( @acm -async def open_price_feed(symbol: str, exchange, channels) -> trio.abc.ReceiveStream: +async def open_price_feed( + pair_data: Symbol, exchange, channels +) -> trio.abc.ReceiveStream: async with maybe_open_feed_handler(exchange) as fh: async with to_asyncio.open_channel_from( - partial(aio_price_feed_relay, exchange, channels, fh, symbol) + partial(aio_price_feed_relay, pair_data, exchange, channels, fh) ) as (first, chan): yield chan @@ -224,7 +166,7 @@ async def maybe_open_feed_handler(exchange: str) -> trio.abc.ReceiveStream: async with maybe_open_context( acm_func=open_feed_handler, kwargs={ - 'exchange': exchange, + "exchange": exchange, }, key="feedhandler", ) as (cache_hit, fh): @@ -239,74 +181,78 @@ async def open_feed_handler(exchange: str): async def aio_price_feed_relay( + pair_data: Symbol, exchange: str, channels: list[str], fh: FeedHandler, - symbol: Symbol, from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, ) -> None: async def _trade(data: dict, receipt_timestamp): - breakpoint() - # to_trio.send_nowait( - # ( - # "trade", - # { - # "symbol": cb_sym_to_deribit_inst( - # str_to_cb_sym(data.symbol) - # ).lower(), - # "last": data, - # "broker_ts": time.time(), - # "data": data.to_dict(), - # "receipt": receipt_timestamp, - # }, - # ) - # ) + print(f' trade data: {data}') + to_trio.send_nowait( + ( + "trade", + { + "symbol": cf_sym_to_fqsn(data.symbol), + "last": float(data.to_dict()['price']), + "broker_ts": time.time(), + "data": data.to_dict(), + "receipt": receipt_timestamp, + }, + ) + ) async def _l1(data: dict, receipt_timestamp): - breakpoint() - # to_trio.send_nowait( - # ( - # "l1", - # { - # "symbol": cb_sym_to_deribit_inst( - # str_to_cb_sym(data.symbol) - # ).lower(), - # "ticks": [ - # { - # "type": "bid", - # "price": float(data.bid_price), - # "size": float(data.bid_size), - # }, - # { - # "type": "bsize", - # "price": float(data.bid_price), - # "size": float(data.bid_size), - # }, - # { - # "type": "ask", - # "price": float(data.ask_price), - # "size": float(data.ask_size), - # }, - # { - # "type": "asize", - # "price": float(data.ask_price), - # "size": float(data.ask_size), - # }, - # ], - # }, - # ) - # ) + print(f'l2 data: {data}') + bid = data.book.to_dict()['bid'] + ask = data.book.to_dict()['ask'] + l1_ask_price, l1_ask_size = next(iter(ask.items())) + l1_bid_price, l1_bid_size = next(iter(bid.items())) + + to_trio.send_nowait( + ( + "l1", + { + "symbol": cf_sym_to_fqsn(data.symbol), + "ticks": [ + { + "type": "bid", + "price": float(l1_bid_price), + "size": float(l1_bid_size), + }, + { + "type": "bsize", + "price": float(l1_bid_price), + "size": float(l1_bid_size), + }, + { + "type": "ask", + "price": float(l1_ask_price), + "size": float(l1_ask_size), + }, + { + "type": "asize", + "price": float(l1_ask_price), + "size": float(l1_ask_size), + }, + ] + } + ) + ) + fh.add_feed( exchange, channels=channels, - symbols=[piker_sym_to_cb_sym(symbol)], - callbacks={TRADES: _trade, L2_BOOK: _l1}, + symbols=[pair_data_to_cf_sym(pair_data)], + callbacks={TRADES: _trade, L2_BOOK: _l1} ) if not fh.running: - fh.run(start_loop=False, install_signal_handlers=False) - + try: + fh.run(start_loop=False, install_signal_handlers=False) + except BaseExceptionGroup as e: + breakpoint() # sync with trio to_trio.send_nowait(None)