From 5fdec8012dc8d7493655f7b18f37e0ecaed1d5cf Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Tue, 28 Feb 2023 12:42:37 -0500 Subject: [PATCH] Add cryptofeeds data feed module, Add Kucoin backend client wip --- piker/brokers/kucoin.py | 137 +++++++++++++++++ piker/data/cryptofeeds.py | 313 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 450 insertions(+) create mode 100644 piker/brokers/kucoin.py create mode 100644 piker/data/cryptofeeds.py diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py new file mode 100644 index 00000000..677e5456 --- /dev/null +++ b/piker/brokers/kucoin.py @@ -0,0 +1,137 @@ +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" + +""" + +from typing import Any, Optional, Literal +from contextlib import asynccontextmanager as acm + +import asks +import tractor +import trio +from trio_typing import TaskStatus +from fuzzywuzzy import process as fuzzy +from cryptofeed.defines import ( + KUCOIN, + TRADES, + L2_BOOK +) +from piker.data.cryptofeeds import mk_stream_quotes +from piker._cacheables import open_cached_client +from piker.log import get_logger +from ._util import SymbolNotFound + +_spawn_kwargs = { + "infect_asyncio": True, +} + +log = get_logger(__name__) + +class Client: + def __init__(self) -> None: + self._pairs: dict[str, Any] = None + # TODO" Shouldn't have to write kucoin twice here + + # config = get_config('kucoin').get('kucoin', {}) + # + # if ('key_id' in config) and ('key_secret' in config): + # self._key_id = config['key_id'] + # self._key_secret = config['key_secret'] + # + # else: + # self._key_id = None + # self._key_secret = None + + async def symbol_info( + self, + sym: str = None, + ) -> dict[str, Any]: + + if self._pairs: + return self._pairs + + entries = await self.request("GET", "/symbols") + if not entries: + raise SymbolNotFound(f'{sym} not found') + + syms = {item['name']: item for item in entries} + return syms + + + async def request(self, action: Literal["POST", "GET", "PUT", "DELETE"], route: str): + api_url = f"https://api.kucoin.com/api/v2{route}" + res = await asks.request(action, api_url) + return res.json()['data'] + + async def cache_symbols( + self, + ) -> dict: + if not self._pairs: + self._pairs = await self.symbol_info() + + return self._pairs + + async def search_symbols( + self, + pattern: str, + limit: int = 30, + ) -> dict[str, Any]: + data = await self.symbol_info() + + matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit) + # repack in dict form + return {item[0]["instrument_name"].lower(): item[0] for item in matches} + + +@acm +async def get_client(): + client = Client() + # Do we need to open a nursery here? + await client.cache_symbols() + yield client + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, +): + async with open_cached_client("kucoin") as client: + # load all symbols locally for fast search + cache = await client.cache_symbols() + await ctx.started() + + # async with ctx.open_stream() as stream: + # async for pattern in stream: + # # repack in dict form + # await stream.send(await client.search_symbols(pattern)) + + +async def stream_quotes( + send_chan: trio.abc.SendChannel, + symbols: list[str], + feed_is_live: trio.Event, + loglevel: str = None, + # startup sync + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, +): + return await mk_stream_quotes( + KUCOIN, + [L2_BOOK], + send_chan, + symbols, + feed_is_live, + loglevel, + task_status, + ) diff --git a/piker/data/cryptofeeds.py b/piker/data/cryptofeeds.py new file mode 100644 index 00000000..931207c6 --- /dev/null +++ b/piker/data/cryptofeeds.py @@ -0,0 +1,313 @@ +# piker: trading gear for hackers +# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +""" +from os import walk +from contextlib import asynccontextmanager as acm +from datetime import datetime +from types import ModuleType +from typing import Any, Literal, Optional, Callable +import time +from functools import partial + +import trio +from trio_typing import TaskStatus +from tractor.trionics import broadcast_receiver, maybe_open_context +import pendulum +from fuzzywuzzy import process as fuzzy +import numpy as np +import tractor +from tractor import to_asyncio +from cryptofeed import FeedHandler +from cryptofeed.defines import TRADES, L2_BOOK +from cryptofeed.symbols import Symbol +import asyncio + +from piker._cacheables import open_cached_client +from piker.log import get_logger, get_console_log +from piker.data import ShmArray +from piker.brokers._util import ( + BrokerError, + DataUnavailable, +) +from piker.pp import config + +_spawn_kwargs = { + "infect_asyncio": True, +} + +log = get_logger(__name__) + + +def deribit_timestamp(when): + return int((when.timestamp() * 1000) + (when.microsecond / 1000)) + + +# def str_to_cb_sym(name: str) -> Symbol: +# base, strike_price, expiry_date, option_type = name.split("-") +# +# quote = base +# +# if option_type == "put": +# option_type = PUT +# elif option_type == "call": +# option_type = CALL +# else: +# raise Exception("Couldn't parse option type") +# +# return Symbol( +# base, +# quote, +# type=OPTION, +# strike_price=strike_price, +# option_type=option_type, +# expiry_date=expiry_date, +# expiry_normalize=False, +# ) +# + + +def piker_sym_to_cb_sym(symbol) -> Symbol: + return Symbol( + base=symbol['baseCurrency'], + quote=symbol['quoteCurrency'] + ) + + +def cb_sym_to_deribit_inst(sym: Symbol): + # cryptofeed normalized + cb_norm = ["F", "G", "H", "J", "K", "M", "N", "Q", "U", "V", "X", "Z"] + + # deribit specific + months = [ + "JAN", + "FEB", + "MAR", + "APR", + "MAY", + "JUN", + "JUL", + "AUG", + "SEP", + "OCT", + "NOV", + "DEC", + ] + + exp = sym.expiry_date + + # YYMDD + # 01234 + year, month, day = (exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) + + otype = "C" if sym.option_type == CALL else "P" + + return f"{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}" + + +def get_config(exchange: str) -> dict[str, Any]: + conf, path = config.load() + + section = conf.get(exchange.lower()) + breakpoint() + + # TODO: document why we send this, basically because logging params for cryptofeed + conf["log"] = {} + conf["log"]["disabled"] = True + + if section is None: + log.warning(f"No config section found for deribit in {path}") + + return conf + + +async def mk_stream_quotes( + exchange: str, + channels: list[str], + send_chan: trio.abc.SendChannel, + symbols: list[str], + feed_is_live: trio.Event, + loglevel: str = None, + # startup sync + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, +) -> None: + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + sym = symbols[0] + + async with (open_cached_client(exchange.lower()) as client, send_chan as send_chan): + # create init message here + + cache = await client.cache_symbols() + + cf_syms = {} + for key, value in cache.items(): + cf_sym = key.lower().replace('-', '') + cf_syms[cf_sym] = value + + cf_sym = cf_syms[sym] + + async with maybe_open_price_feed(cf_sym, exchange, channels) as stream: + + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': { + 'asset_type': 'crypto', + 'price_tick_size': 0.0005 + }, + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, + }, + } + + # broker schemas to validate symbol data + quote_msg = {"symbol": cf_sym["name"], "last": 0, "ticks": []} + + task_status.started((init_msgs, quote_msg)) + + feed_is_live.set() + + async for typ, quote in stream: + topic = quote["symbol"] + await send_chan.send({topic: quote}) + +@acm +async def maybe_open_price_feed( + symbol, exchange, channels +) -> trio.abc.ReceiveStream: + # TODO: add a predicate to maybe_open_context + # TODO: ensure we can dynamically pass down args here + async with maybe_open_context( + acm_func=open_price_feed, + kwargs={ + "symbol": symbol, + "exchange": exchange, + "channels": channels, + }, + key=symbol['name'], + ) as (cache_hit, feed): + if cache_hit: + yield broadcast_receiver(feed, 10) + else: + yield feed + + +@acm +async def open_price_feed(symbol: str, exchange, channels) -> trio.abc.ReceiveStream: + async with maybe_open_feed_handler(exchange) as fh: + async with to_asyncio.open_channel_from( + partial(aio_price_feed_relay, exchange, channels, fh, symbol) + ) as (first, chan): + yield chan + + +@acm +async def maybe_open_feed_handler(exchange: str) -> trio.abc.ReceiveStream: + async with maybe_open_context( + acm_func=open_feed_handler, + kwargs={ + 'exchange': exchange, + }, + key="feedhandler", + ) as (cache_hit, fh): + yield fh + + +@acm +async def open_feed_handler(exchange: str): + fh = FeedHandler(config=get_config(exchange)) + yield fh + await to_asyncio.run_task(fh.stop_async) + + +async def aio_price_feed_relay( + exchange: str, + channels: list[str], + fh: FeedHandler, + symbol: Symbol, + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, +) -> None: + async def _trade(data: dict, receipt_timestamp): + breakpoint() + # to_trio.send_nowait( + # ( + # "trade", + # { + # "symbol": cb_sym_to_deribit_inst( + # str_to_cb_sym(data.symbol) + # ).lower(), + # "last": data, + # "broker_ts": time.time(), + # "data": data.to_dict(), + # "receipt": receipt_timestamp, + # }, + # ) + # ) + + async def _l1(data: dict, receipt_timestamp): + breakpoint() + # to_trio.send_nowait( + # ( + # "l1", + # { + # "symbol": cb_sym_to_deribit_inst( + # str_to_cb_sym(data.symbol) + # ).lower(), + # "ticks": [ + # { + # "type": "bid", + # "price": float(data.bid_price), + # "size": float(data.bid_size), + # }, + # { + # "type": "bsize", + # "price": float(data.bid_price), + # "size": float(data.bid_size), + # }, + # { + # "type": "ask", + # "price": float(data.ask_price), + # "size": float(data.ask_size), + # }, + # { + # "type": "asize", + # "price": float(data.ask_price), + # "size": float(data.ask_size), + # }, + # ], + # }, + # ) + # ) + fh.add_feed( + exchange, + channels=channels, + symbols=[piker_sym_to_cb_sym(symbol)], + callbacks={TRADES: _trade, L2_BOOK: _l1}, + ) + + if not fh.running: + fh.run(start_loop=False, install_signal_handlers=False) + + # sync with trio + to_trio.send_nowait(None) + + await asyncio.sleep(float("inf"))