parent
							
								
									609b91e848
								
							
						
					
					
						commit
						5fdec8012d
					
				|  | @ -0,0 +1,137 @@ | |||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| """ | ||||
| 
 | ||||
| """ | ||||
| 
 | ||||
| from typing import Any, Optional, Literal | ||||
| from contextlib import asynccontextmanager as acm | ||||
| 
 | ||||
| import asks | ||||
| import tractor | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| from fuzzywuzzy import process as fuzzy | ||||
| from cryptofeed.defines import ( | ||||
|     KUCOIN, | ||||
|     TRADES, | ||||
|     L2_BOOK | ||||
| ) | ||||
| from piker.data.cryptofeeds import mk_stream_quotes | ||||
| from piker._cacheables import open_cached_client | ||||
| from piker.log import get_logger | ||||
| from ._util import SymbolNotFound | ||||
| 
 | ||||
| _spawn_kwargs = { | ||||
|     "infect_asyncio": True, | ||||
| } | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| class Client: | ||||
|     def __init__(self) -> None: | ||||
|         self._pairs: dict[str, Any] = None | ||||
|         # TODO" Shouldn't have to write kucoin twice here | ||||
| 
 | ||||
|         # config = get_config('kucoin').get('kucoin', {}) | ||||
|         # | ||||
|         # if ('key_id' in config) and ('key_secret' in config): | ||||
|         #     self._key_id = config['key_id'] | ||||
|         #     self._key_secret = config['key_secret'] | ||||
|         # | ||||
|         # else: | ||||
|         #     self._key_id = None | ||||
|         #     self._key_secret = None | ||||
| 
 | ||||
|     async def symbol_info( | ||||
|         self, | ||||
|         sym: str = None,   | ||||
|     ) -> dict[str, Any]: | ||||
| 
 | ||||
|         if self._pairs: | ||||
|             return self._pairs | ||||
| 
 | ||||
|         entries = await self.request("GET", "/symbols") | ||||
|         if not entries: | ||||
|             raise SymbolNotFound(f'{sym} not found') | ||||
|          | ||||
|         syms = {item['name']: item for item in entries} | ||||
|         return syms | ||||
|   | ||||
| 
 | ||||
|     async def request(self, action: Literal["POST", "GET", "PUT", "DELETE"], route: str): | ||||
|         api_url = f"https://api.kucoin.com/api/v2{route}" | ||||
|         res = await asks.request(action, api_url) | ||||
|         return res.json()['data'] | ||||
| 
 | ||||
|     async def cache_symbols( | ||||
|         self, | ||||
|     ) -> dict: | ||||
|         if not self._pairs: | ||||
|             self._pairs = await self.symbol_info() | ||||
| 
 | ||||
|         return self._pairs | ||||
| 
 | ||||
|     async def search_symbols( | ||||
|         self, | ||||
|         pattern: str, | ||||
|         limit: int = 30, | ||||
|     ) -> dict[str, Any]: | ||||
|         data = await self.symbol_info() | ||||
| 
 | ||||
|         matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit) | ||||
|         # repack in dict form | ||||
|         return {item[0]["instrument_name"].lower(): item[0] for item in matches} | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def get_client(): | ||||
|     client = Client() | ||||
|     # Do we need to open a nursery here? | ||||
|     await client.cache_symbols() | ||||
|     yield client | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def open_symbol_search( | ||||
|     ctx: tractor.Context, | ||||
| ): | ||||
|     async with open_cached_client("kucoin") as client: | ||||
|         # load all symbols locally for fast search | ||||
|         cache = await client.cache_symbols() | ||||
|         await ctx.started() | ||||
| 
 | ||||
|         # async with ctx.open_stream() as stream: | ||||
|         #     async for pattern in stream: | ||||
|         #         # repack in dict form | ||||
|         #         await stream.send(await client.search_symbols(pattern)) | ||||
| 
 | ||||
| 
 | ||||
| async def stream_quotes( | ||||
|     send_chan: trio.abc.SendChannel, | ||||
|     symbols: list[str], | ||||
|     feed_is_live: trio.Event, | ||||
|     loglevel: str = None, | ||||
|     # startup sync | ||||
|     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, | ||||
| ): | ||||
|     return await mk_stream_quotes( | ||||
|         KUCOIN, | ||||
|         [L2_BOOK], | ||||
|         send_chan, | ||||
|         symbols, | ||||
|         feed_is_live, | ||||
|         loglevel, | ||||
|         task_status, | ||||
|     ) | ||||
|  | @ -0,0 +1,313 @@ | |||
| # piker: trading gear for hackers | ||||
| # Copyright (C) Guillermo Rodriguez (in stewardship for piker0) | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| 
 | ||||
| """ | ||||
| """ | ||||
| from os import walk | ||||
| from contextlib import asynccontextmanager as acm | ||||
| from datetime import datetime | ||||
| from types import ModuleType | ||||
| from typing import Any, Literal, Optional, Callable | ||||
| import time | ||||
| from functools import partial | ||||
| 
 | ||||
| import trio | ||||
| from trio_typing import TaskStatus | ||||
| from tractor.trionics import broadcast_receiver, maybe_open_context | ||||
| import pendulum | ||||
| from fuzzywuzzy import process as fuzzy | ||||
| import numpy as np | ||||
| import tractor | ||||
| from tractor import to_asyncio | ||||
| from cryptofeed import FeedHandler | ||||
| from cryptofeed.defines import TRADES, L2_BOOK | ||||
| from cryptofeed.symbols import Symbol | ||||
| import asyncio | ||||
| 
 | ||||
| from piker._cacheables import open_cached_client | ||||
| from piker.log import get_logger, get_console_log | ||||
| from piker.data import ShmArray | ||||
| from piker.brokers._util import ( | ||||
|     BrokerError, | ||||
|     DataUnavailable, | ||||
| ) | ||||
| from piker.pp import config | ||||
| 
 | ||||
| _spawn_kwargs = { | ||||
|     "infect_asyncio": True, | ||||
| } | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| def deribit_timestamp(when): | ||||
|     return int((when.timestamp() * 1000) + (when.microsecond / 1000)) | ||||
| 
 | ||||
| 
 | ||||
| # def str_to_cb_sym(name: str) -> Symbol: | ||||
| #     base, strike_price, expiry_date, option_type = name.split("-") | ||||
| # | ||||
| #     quote = base | ||||
| # | ||||
| #     if option_type == "put": | ||||
| #         option_type = PUT | ||||
| #     elif option_type == "call": | ||||
| #         option_type = CALL | ||||
| #     else: | ||||
| #         raise Exception("Couldn't parse option type") | ||||
| # | ||||
| #     return Symbol( | ||||
| #         base, | ||||
| #         quote, | ||||
| #         type=OPTION, | ||||
| #         strike_price=strike_price, | ||||
| #         option_type=option_type, | ||||
| #         expiry_date=expiry_date, | ||||
| #         expiry_normalize=False, | ||||
| #     ) | ||||
| # | ||||
| 
 | ||||
| 
 | ||||
| def piker_sym_to_cb_sym(symbol) -> Symbol: | ||||
|     return Symbol( | ||||
|         base=symbol['baseCurrency'], | ||||
|         quote=symbol['quoteCurrency'] | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def cb_sym_to_deribit_inst(sym: Symbol): | ||||
|     # cryptofeed normalized | ||||
|     cb_norm = ["F", "G", "H", "J", "K", "M", "N", "Q", "U", "V", "X", "Z"] | ||||
| 
 | ||||
|     # deribit specific | ||||
|     months = [ | ||||
|         "JAN", | ||||
|         "FEB", | ||||
|         "MAR", | ||||
|         "APR", | ||||
|         "MAY", | ||||
|         "JUN", | ||||
|         "JUL", | ||||
|         "AUG", | ||||
|         "SEP", | ||||
|         "OCT", | ||||
|         "NOV", | ||||
|         "DEC", | ||||
|     ] | ||||
| 
 | ||||
|     exp = sym.expiry_date | ||||
| 
 | ||||
|     # YYMDD | ||||
|     # 01234 | ||||
|     year, month, day = (exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) | ||||
| 
 | ||||
|     otype = "C" if sym.option_type == CALL else "P" | ||||
| 
 | ||||
|     return f"{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}" | ||||
| 
 | ||||
| 
 | ||||
| def get_config(exchange: str) -> dict[str, Any]: | ||||
|     conf, path = config.load() | ||||
| 
 | ||||
|     section = conf.get(exchange.lower()) | ||||
|     breakpoint() | ||||
| 
 | ||||
|     # TODO: document why we send this, basically because logging params for cryptofeed | ||||
|     conf["log"] = {} | ||||
|     conf["log"]["disabled"] = True | ||||
| 
 | ||||
|     if section is None: | ||||
|         log.warning(f"No config section found for deribit in {path}") | ||||
| 
 | ||||
|     return conf | ||||
| 
 | ||||
| 
 | ||||
| async def mk_stream_quotes( | ||||
|     exchange: str, | ||||
|     channels: list[str], | ||||
|     send_chan: trio.abc.SendChannel, | ||||
|     symbols: list[str], | ||||
|     feed_is_live: trio.Event, | ||||
|     loglevel: str = None, | ||||
|     # startup sync | ||||
|     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, | ||||
| ) -> None: | ||||
| 
 | ||||
|     # XXX: required to propagate ``tractor`` loglevel to piker logging | ||||
|     get_console_log(loglevel or tractor.current_actor().loglevel) | ||||
| 
 | ||||
|     sym = symbols[0] | ||||
| 
 | ||||
|     async with (open_cached_client(exchange.lower()) as client, send_chan as send_chan): | ||||
|         # create init message here | ||||
| 
 | ||||
|         cache = await client.cache_symbols() | ||||
| 
 | ||||
|         cf_syms = {} | ||||
|         for key, value in cache.items(): | ||||
|             cf_sym = key.lower().replace('-', '') | ||||
|             cf_syms[cf_sym] = value  | ||||
| 
 | ||||
|         cf_sym = cf_syms[sym] | ||||
| 
 | ||||
|         async with maybe_open_price_feed(cf_sym, exchange, channels) as stream: | ||||
| 
 | ||||
|             init_msgs = { | ||||
|                 # pass back token, and bool, signalling if we're the writer | ||||
|                 # and that history has been written | ||||
|                 sym: { | ||||
|                     'symbol_info': { | ||||
|                         'asset_type': 'crypto', | ||||
|                         'price_tick_size': 0.0005 | ||||
|                     }, | ||||
|                     'shm_write_opts': {'sum_tick_vml': False}, | ||||
|                     'fqsn': sym, | ||||
|                 }, | ||||
|             } | ||||
| 
 | ||||
|             # broker schemas to validate symbol data | ||||
|             quote_msg = {"symbol": cf_sym["name"], "last": 0, "ticks": []} | ||||
| 
 | ||||
|             task_status.started((init_msgs, quote_msg)) | ||||
| 
 | ||||
|             feed_is_live.set() | ||||
| 
 | ||||
|             async for typ, quote in stream: | ||||
|                 topic = quote["symbol"] | ||||
|                 await send_chan.send({topic: quote}) | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_price_feed( | ||||
|     symbol, exchange, channels | ||||
| ) -> trio.abc.ReceiveStream: | ||||
|     # TODO: add a predicate to maybe_open_context | ||||
|     # TODO: ensure we can dynamically pass down args here | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_price_feed, | ||||
|         kwargs={ | ||||
|             "symbol": symbol, | ||||
|             "exchange": exchange, | ||||
|             "channels": channels, | ||||
|         }, | ||||
|         key=symbol['name'], | ||||
|     ) as (cache_hit, feed): | ||||
|         if cache_hit: | ||||
|             yield broadcast_receiver(feed, 10) | ||||
|         else: | ||||
|             yield feed | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_price_feed(symbol: str, exchange, channels) -> trio.abc.ReceiveStream: | ||||
|     async with maybe_open_feed_handler(exchange) as fh: | ||||
|         async with to_asyncio.open_channel_from( | ||||
|             partial(aio_price_feed_relay, exchange, channels, fh, symbol) | ||||
|         ) as (first, chan): | ||||
|             yield chan | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_feed_handler(exchange: str) -> trio.abc.ReceiveStream: | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_feed_handler, | ||||
|         kwargs={ | ||||
|             'exchange': exchange, | ||||
|         }, | ||||
|         key="feedhandler", | ||||
|     ) as (cache_hit, fh): | ||||
|         yield fh | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_feed_handler(exchange: str): | ||||
|     fh = FeedHandler(config=get_config(exchange)) | ||||
|     yield fh | ||||
|     await to_asyncio.run_task(fh.stop_async) | ||||
| 
 | ||||
| 
 | ||||
| async def aio_price_feed_relay( | ||||
|     exchange: str, | ||||
|     channels: list[str], | ||||
|     fh: FeedHandler, | ||||
|     symbol: Symbol, | ||||
|     from_trio: asyncio.Queue, | ||||
|     to_trio: trio.abc.SendChannel, | ||||
| ) -> None: | ||||
|     async def _trade(data: dict, receipt_timestamp): | ||||
|         breakpoint() | ||||
|         # to_trio.send_nowait( | ||||
|         #     ( | ||||
|         #         "trade", | ||||
|         #         { | ||||
|         #             "symbol": cb_sym_to_deribit_inst( | ||||
|         #                 str_to_cb_sym(data.symbol) | ||||
|         #             ).lower(), | ||||
|         #             "last": data, | ||||
|         #             "broker_ts": time.time(), | ||||
|         #             "data": data.to_dict(), | ||||
|         #             "receipt": receipt_timestamp, | ||||
|         #         }, | ||||
|         #     ) | ||||
|         # ) | ||||
| 
 | ||||
|     async def _l1(data: dict, receipt_timestamp): | ||||
|         breakpoint() | ||||
|         # to_trio.send_nowait( | ||||
|         #     ( | ||||
|         #         "l1", | ||||
|         #         { | ||||
|         #             "symbol": cb_sym_to_deribit_inst( | ||||
|         #                 str_to_cb_sym(data.symbol) | ||||
|         #             ).lower(), | ||||
|         #             "ticks": [ | ||||
|         #                 { | ||||
|         #                     "type": "bid", | ||||
|         #                     "price": float(data.bid_price), | ||||
|         #                     "size": float(data.bid_size), | ||||
|         #                 }, | ||||
|         #                 { | ||||
|         #                     "type": "bsize", | ||||
|         #                     "price": float(data.bid_price), | ||||
|         #                     "size": float(data.bid_size), | ||||
|         #                 }, | ||||
|         #                 { | ||||
|         #                     "type": "ask", | ||||
|         #                     "price": float(data.ask_price), | ||||
|         #                     "size": float(data.ask_size), | ||||
|         #                 }, | ||||
|         #                 { | ||||
|         #                     "type": "asize", | ||||
|         #                     "price": float(data.ask_price), | ||||
|         #                     "size": float(data.ask_size), | ||||
|         #                 }, | ||||
|         #             ], | ||||
|         #         }, | ||||
|         #     ) | ||||
|         # ) | ||||
|     fh.add_feed( | ||||
|         exchange, | ||||
|         channels=channels, | ||||
|         symbols=[piker_sym_to_cb_sym(symbol)], | ||||
|         callbacks={TRADES: _trade, L2_BOOK: _l1}, | ||||
|     ) | ||||
| 
 | ||||
|     if not fh.running: | ||||
|         fh.run(start_loop=False, install_signal_handlers=False) | ||||
| 
 | ||||
|     # sync with trio | ||||
|     to_trio.send_nowait(None) | ||||
| 
 | ||||
|     await asyncio.sleep(float("inf")) | ||||
		Loading…
	
		Reference in New Issue