WIP - setup basic history and streaming client
							parent
							
								
									c96d4387c5
								
							
						
					
					
						commit
						ad9d645782
					
				|  | @ -24,7 +24,7 @@ __brokers__ = [ | ||||||
|     'binance', |     'binance', | ||||||
|     'ib', |     'ib', | ||||||
|     'kraken', |     'kraken', | ||||||
| 
 |     'kucoin' | ||||||
|     # broken but used to work |     # broken but used to work | ||||||
|     # 'questrade', |     # 'questrade', | ||||||
|     # 'robinhood', |     # 'robinhood', | ||||||
|  | @ -35,7 +35,6 @@ __brokers__ = [ | ||||||
|     # iex |     # iex | ||||||
| 
 | 
 | ||||||
|     # deribit |     # deribit | ||||||
|     # kucoin |  | ||||||
|     # bitso |     # bitso | ||||||
| ] | ] | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -15,89 +15,196 @@ | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
| 
 | 
 | ||||||
|  | from dataclasses import field | ||||||
| from typing import Any, Optional, Literal | from typing import Any, Optional, Literal | ||||||
| from contextlib import asynccontextmanager as acm | from contextlib import asynccontextmanager as acm | ||||||
|  | from datetime import datetime | ||||||
|  | import time | ||||||
|  | import math | ||||||
|  | from os import path, walk | ||||||
| 
 | 
 | ||||||
| import asks | import asks | ||||||
| import tractor | import tractor | ||||||
| import trio | import trio | ||||||
| from trio_typing import TaskStatus | from trio_typing import TaskStatus | ||||||
| from fuzzywuzzy import process as fuzzy | from fuzzywuzzy import process as fuzzy | ||||||
| from cryptofeed.defines import ( | from cryptofeed.defines import KUCOIN, TRADES, L2_BOOK | ||||||
|     KUCOIN, | from cryptofeed.symbols import Symbol | ||||||
|     TRADES, | import pendulum | ||||||
|     L2_BOOK | import numpy as np | ||||||
|  | from piker.data.cryptofeeds import ( | ||||||
|  |     fqsn_to_cf_sym, | ||||||
|  |     mk_stream_quotes,  | ||||||
|  |     get_config, | ||||||
| ) | ) | ||||||
| from piker.data.cryptofeeds import mk_stream_quotes |  | ||||||
| from piker._cacheables import open_cached_client | from piker._cacheables import open_cached_client | ||||||
| from piker.log import get_logger | from piker.log import get_logger | ||||||
| from ._util import SymbolNotFound | from piker.pp import config | ||||||
|  | from ._util import DataUnavailable | ||||||
| 
 | 
 | ||||||
| _spawn_kwargs = { | _spawn_kwargs = { | ||||||
|     "infect_asyncio": True, |     "infect_asyncio": True, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
|  | _ohlc_dtype = [ | ||||||
|  |     ('index', int), | ||||||
|  |     ('time', int), | ||||||
|  |     ('open', float), | ||||||
|  |     ('high', float), | ||||||
|  |     ('low', float), | ||||||
|  |     ('close', float), | ||||||
|  |     ('volume', float), | ||||||
|  |     ('bar_wap', float),  # will be zeroed by sampler if not filled | ||||||
|  | ] | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| class Client: | class Client: | ||||||
|     def __init__(self) -> None: |     def __init__(self) -> None: | ||||||
|         self._pairs: dict[str, Any] = None |         self._pairs: dict[str, Symbol] = {} | ||||||
|  |         self._bars: list[list] = [] | ||||||
|         # TODO" Shouldn't have to write kucoin twice here |         # TODO" Shouldn't have to write kucoin twice here | ||||||
| 
 | 
 | ||||||
|         # config = get_config('kucoin').get('kucoin', {}) |         config = get_config("kucoin").get("kucoin", {}) | ||||||
|         # |         # | ||||||
|         # if ('key_id' in config) and ('key_secret' in config): |         if ("key_id" in config) and ("key_secret" in config): | ||||||
|         #     self._key_id = config['key_id'] |             self._key_id = config["key_id"] | ||||||
|         #     self._key_secret = config['key_secret'] |             self._key_secret = config["key_secret"] | ||||||
|         # | 
 | ||||||
|         # else: |         else: | ||||||
|         #     self._key_id = None |             self._key_id = None | ||||||
|         #     self._key_secret = None |             self._key_secret = None | ||||||
|     async def request(self, action: Literal["POST", "GET", "PUT", "DELETE"], route: str, api_v: str = 'v2'): | 
 | ||||||
|  |     async def _request( | ||||||
|  |         self, | ||||||
|  |         action: Literal["POST", "GET", "PUT", "DELETE"], | ||||||
|  |         route: str, | ||||||
|  |         api_v: str = "v2", | ||||||
|  |     ) -> Any: | ||||||
|         api_url = f"https://api.kucoin.com/api/{api_v}{route}" |         api_url = f"https://api.kucoin.com/api/{api_v}{route}" | ||||||
|         res = await asks.request(action, api_url) |         res = await asks.request(action, api_url) | ||||||
|         return res.json()['data'] |         #breakpoint() | ||||||
|  |         try: | ||||||
|  |             return res.json()["data"] | ||||||
|  |         except KeyError as e: | ||||||
|  |             print(f'KUCOIN ERROR: {res.json()["msg"]}') | ||||||
|  |             breakpoint() | ||||||
| 
 | 
 | ||||||
|     async def symbol_info( |     async def get_pairs( | ||||||
|         self, |         self, | ||||||
|         sym: str = None,   |  | ||||||
|     ) -> dict[str, Any]: |     ) -> dict[str, Any]: | ||||||
| 
 |  | ||||||
|         if self._pairs: |         if self._pairs: | ||||||
|             return self._pairs |             return self._pairs | ||||||
| 
 | 
 | ||||||
|         entries = await self.request("GET", "/symbols") |         entries = await self._request("GET", "/symbols") | ||||||
|         if not entries: |         syms = {item["name"]: item for item in entries} | ||||||
|             raise SymbolNotFound(f'{sym} not found') |  | ||||||
|          |  | ||||||
|         syms = {item['name']: item for item in entries} |  | ||||||
|         return syms |         return syms | ||||||
| 
 | 
 | ||||||
|     async def cache_symbols( |     async def cache_pairs( | ||||||
|         self, |         self, | ||||||
|     ) -> dict: |         normalize: bool = True, | ||||||
|  |     ) -> dict[str, Symbol]: | ||||||
|         if not self._pairs: |         if not self._pairs: | ||||||
|             self._pairs = await self.symbol_info() |             self._pairs = await self.get_pairs() | ||||||
| 
 |         if normalize: | ||||||
|  |             self._pairs = self.normalize_pairs(self._pairs) | ||||||
|         return self._pairs |         return self._pairs | ||||||
| 
 | 
 | ||||||
|  |     def normalize_pairs(self, pairs: dict[str, Symbol]) -> dict[str, Symbol]: | ||||||
|  |         """ | ||||||
|  |         Map crypfeeds symbols to fqsn strings | ||||||
|  | 
 | ||||||
|  |         """ | ||||||
|  |         norm_pairs = {} | ||||||
|  | 
 | ||||||
|  |         for key, value in pairs.items(): | ||||||
|  |             fqsn = key.lower().replace("-", "") | ||||||
|  |             norm_pairs[fqsn] = value | ||||||
|  | 
 | ||||||
|  |         return norm_pairs | ||||||
|  | 
 | ||||||
|     async def search_symbols( |     async def search_symbols( | ||||||
|         self, |         self, | ||||||
|         pattern: str, |         pattern: str, | ||||||
|         limit: int = 30, |         limit: int = 30, | ||||||
|     ) -> dict[str, Any]: |     ) -> dict[str, Any]: | ||||||
|         data = await self.symbol_info() |         data = await self.get_pairs() | ||||||
| 
 | 
 | ||||||
|         matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit) |         matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit) | ||||||
|         # repack in dict form |         # repack in dict form | ||||||
|         return {item[0]["instrument_name"].lower(): item[0] for item in matches} |         return {item[0]["name"].lower(): item[0] for item in matches} | ||||||
|  | 
 | ||||||
|  |     async def get_bars( | ||||||
|  |         self, | ||||||
|  |         fqsn: str, | ||||||
|  |         start_dt: Optional[datetime] = None, | ||||||
|  |         end_dt: Optional[datetime] = None, | ||||||
|  |         limit: int = 1000, | ||||||
|  |         as_np: bool = True, | ||||||
|  |         type: str = "1min", | ||||||
|  |     ): | ||||||
|  |         if len(self._bars): | ||||||
|  |             return self._bars | ||||||
|  | 
 | ||||||
|  |         if end_dt is None: | ||||||
|  |             end_dt = pendulum.now("UTC").add(minutes=1) | ||||||
|  | 
 | ||||||
|  |         if start_dt is None: | ||||||
|  |             start_dt = end_dt.start_of("minute").subtract(minutes=limit) | ||||||
|  | 
 | ||||||
|  |         # Format datetime to unix | ||||||
|  |         start_dt = math.trunc(time.mktime(start_dt.timetuple())) | ||||||
|  |         end_dt = math.trunc(time.mktime(end_dt.timetuple())) | ||||||
|  |         kucoin_sym = fqsn_to_cf_sym(fqsn, self._pairs)  | ||||||
|  |         url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}" | ||||||
|  | 
 | ||||||
|  |         bars = await self._request( | ||||||
|  |             "GET", | ||||||
|  |             url, | ||||||
|  |             api_v="v1", | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         new_bars = [] | ||||||
|  |         for i, bar in enumerate(bars[::-1]): | ||||||
|  |             # TODO: implement struct/typecasting/validation here | ||||||
|  |              | ||||||
|  |             data = { | ||||||
|  |                 'index': i, | ||||||
|  |                 'time': bar[0], | ||||||
|  |                 'open': bar[1], | ||||||
|  |                 'close': bar[2], | ||||||
|  |                 'high': bar[3], | ||||||
|  |                 'low': bar[4], | ||||||
|  |                 'volume': bar[5], | ||||||
|  |                 'amount': bar [6], | ||||||
|  |                 'bar_wap': 0.0, | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             row = [] | ||||||
|  |             for j, (field_name, field_type) in enumerate(_ohlc_dtype): | ||||||
|  |   | ||||||
|  |                 value = data[field_name]  | ||||||
|  |                 match field_name: | ||||||
|  |                     case 'index' | 'time': | ||||||
|  |                         row.append(int(value)) | ||||||
|  |                     # case 'time': | ||||||
|  |                         # dt_from_unix_ts = datetime.utcfromtimestamp(int(value)) | ||||||
|  |                         # # convert unix time to epoch seconds | ||||||
|  |                         # row.append(int(dt_from_unix_ts.timestamp())) | ||||||
|  |                     case _:  | ||||||
|  |                         row.append(float(value)) | ||||||
|  | 
 | ||||||
|  |             new_bars.append(tuple(row)) | ||||||
|  |          | ||||||
|  |         self._bars = array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars | ||||||
|  |         return array | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def get_client(): | async def get_client(): | ||||||
|     client = Client() |     client = Client() | ||||||
|     # Do we need to open a nursery here? |     # Do we need to open a nursery here? | ||||||
|     await client.cache_symbols() |     await client.cache_pairs() | ||||||
|     yield client |     yield client | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -107,13 +214,13 @@ async def open_symbol_search( | ||||||
| ): | ): | ||||||
|     async with open_cached_client("kucoin") as client: |     async with open_cached_client("kucoin") as client: | ||||||
|         # load all symbols locally for fast search |         # load all symbols locally for fast search | ||||||
|         cache = await client.cache_symbols() |         cache = await client.cache_pairs() | ||||||
|         await ctx.started() |         await ctx.started() | ||||||
| 
 | 
 | ||||||
|         # async with ctx.open_stream() as stream: |         async with ctx.open_stream() as stream: | ||||||
|         #     async for pattern in stream: |             async for pattern in stream: | ||||||
|         #         # repack in dict form |                 # repack in dict form | ||||||
|         #         await stream.send(await client.search_symbols(pattern)) |                 await stream.send(await client.search_symbols(pattern)) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def stream_quotes( | async def stream_quotes( | ||||||
|  | @ -126,7 +233,7 @@ async def stream_quotes( | ||||||
| ): | ): | ||||||
|     return await mk_stream_quotes( |     return await mk_stream_quotes( | ||||||
|         KUCOIN, |         KUCOIN, | ||||||
|         [L2_BOOK], |         [L2_BOOK, TRADES], | ||||||
|         send_chan, |         send_chan, | ||||||
|         symbols, |         symbols, | ||||||
|         feed_is_live, |         feed_is_live, | ||||||
|  | @ -134,5 +241,44 @@ async def stream_quotes( | ||||||
|         task_status, |         task_status, | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
| async def open_history_client(): |  | ||||||
| 
 | 
 | ||||||
|  | @acm | ||||||
|  | async def open_history_client( | ||||||
|  |     symbol: str, | ||||||
|  |     type: str = "1m", | ||||||
|  | ): | ||||||
|  |     async with open_cached_client("kucoin") as client: | ||||||
|  |         # call bars on kucoin | ||||||
|  |         async def get_ohlc_history( | ||||||
|  |             timeframe: float, | ||||||
|  |             end_dt: datetime | None = None, | ||||||
|  |             start_dt: datetime | None = None, | ||||||
|  |         ) -> tuple[ | ||||||
|  |             np.ndarray, | ||||||
|  |             datetime | None,  # start | ||||||
|  |             datetime | None,  # end | ||||||
|  |         ]: | ||||||
|  |             if timeframe != 60: | ||||||
|  |                 raise DataUnavailable('Only 1m bars are supported') | ||||||
|  | 
 | ||||||
|  |             array = await client.get_bars( | ||||||
|  |                 symbol, | ||||||
|  |                 start_dt=start_dt, | ||||||
|  |                 end_dt=end_dt, | ||||||
|  |             ) | ||||||
|  | 
 | ||||||
|  |             times = array['time'] | ||||||
|  | 
 | ||||||
|  |             if ( | ||||||
|  |                 end_dt is None | ||||||
|  |             ): | ||||||
|  |                 inow = round(time.time()) | ||||||
|  |                 print(f'difference in time between load and processing {inow - times[-1]}') | ||||||
|  |                 if (inow - times[-1]) > 60: | ||||||
|  |                     await tractor.breakpoint() | ||||||
|  | 
 | ||||||
|  |             start_dt = pendulum.from_timestamp(times[0]) | ||||||
|  |             end_dt = pendulum.from_timestamp(times[-1]) | ||||||
|  |             return array, start_dt, end_dt | ||||||
|  | 
 | ||||||
|  |         yield get_ohlc_history, {'erlangs': 3, 'rate': 3} | ||||||
|  |  | ||||||
|  | @ -1,5 +1,5 @@ | ||||||
| # piker: trading gear for hackers | # piker: trading gear for hackers | ||||||
| # Copyright (C) Guillermo Rodriguez (in stewardship for piker0) | # Copyright (C) Jared Goldman (in stewardship for piker0) | ||||||
| 
 | 
 | ||||||
| # This program is free software: you can redistribute it and/or modify | # This program is free software: you can redistribute it and/or modify | ||||||
| # it under the terms of the GNU Affero General Public License as published by | # it under the terms of the GNU Affero General Public License as published by | ||||||
|  | @ -35,6 +35,7 @@ from tractor import to_asyncio | ||||||
| from cryptofeed import FeedHandler | from cryptofeed import FeedHandler | ||||||
| from cryptofeed.defines import TRADES, L2_BOOK | from cryptofeed.defines import TRADES, L2_BOOK | ||||||
| from cryptofeed.symbols import Symbol | from cryptofeed.symbols import Symbol | ||||||
|  | from cryptofeed.types import OrderBook | ||||||
| import asyncio | import asyncio | ||||||
| 
 | 
 | ||||||
| from piker._cacheables import open_cached_client | from piker._cacheables import open_cached_client | ||||||
|  | @ -53,84 +54,34 @@ _spawn_kwargs = { | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def deribit_timestamp(when): | def fqsn_to_cb_sym(pair_data: Symbol) -> Symbol: | ||||||
|     return int((when.timestamp() * 1000) + (when.microsecond / 1000)) |     return Symbol(base=pair_data["baseCurrency"], quote=pair_data["quoteCurrency"]) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # def str_to_cb_sym(name: str) -> Symbol: | def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, Symbol]) -> str: | ||||||
| #     base, strike_price, expiry_date, option_type = name.split("-") |     pair_data = pairs[fqsn] | ||||||
| # |     return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"] | ||||||
| #     quote = base |  | ||||||
| # |  | ||||||
| #     if option_type == "put": |  | ||||||
| #         option_type = PUT |  | ||||||
| #     elif option_type == "call": |  | ||||||
| #         option_type = CALL |  | ||||||
| #     else: |  | ||||||
| #         raise Exception("Couldn't parse option type") |  | ||||||
| # |  | ||||||
| #     return Symbol( |  | ||||||
| #         base, |  | ||||||
| #         quote, |  | ||||||
| #         type=OPTION, |  | ||||||
| #         strike_price=strike_price, |  | ||||||
| #         option_type=option_type, |  | ||||||
| #         expiry_date=expiry_date, |  | ||||||
| #         expiry_normalize=False, |  | ||||||
| #     ) |  | ||||||
| # |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def piker_sym_to_cb_sym(symbol) -> Symbol: | def pair_data_to_cf_sym(sym_data: Symbol): | ||||||
|     return Symbol( |     return sym_data["baseCurrency"] + "-" + sym_data["quoteCurrency"] | ||||||
|         base=symbol['baseCurrency'], |  | ||||||
|         quote=symbol['quoteCurrency'] |  | ||||||
|     ) |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def cb_sym_to_deribit_inst(sym: Symbol): | def cf_sym_to_fqsn(sym: str) -> str: | ||||||
|     # cryptofeed normalized |     return sym.lower().replace("-", "") | ||||||
|     cb_norm = ["F", "G", "H", "J", "K", "M", "N", "Q", "U", "V", "X", "Z"] |  | ||||||
| 
 |  | ||||||
|     # deribit specific |  | ||||||
|     months = [ |  | ||||||
|         "JAN", |  | ||||||
|         "FEB", |  | ||||||
|         "MAR", |  | ||||||
|         "APR", |  | ||||||
|         "MAY", |  | ||||||
|         "JUN", |  | ||||||
|         "JUL", |  | ||||||
|         "AUG", |  | ||||||
|         "SEP", |  | ||||||
|         "OCT", |  | ||||||
|         "NOV", |  | ||||||
|         "DEC", |  | ||||||
|     ] |  | ||||||
| 
 |  | ||||||
|     exp = sym.expiry_date |  | ||||||
| 
 |  | ||||||
|     # YYMDD |  | ||||||
|     # 01234 |  | ||||||
|     year, month, day = (exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) |  | ||||||
| 
 |  | ||||||
|     otype = "C" if sym.option_type == CALL else "P" |  | ||||||
| 
 |  | ||||||
|     return f"{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}" |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def get_config(exchange: str) -> dict[str, Any]: | def get_config(exchange: str) -> dict[str, Any]: | ||||||
|     conf, path = config.load() |     conf, path = config.load() | ||||||
| 
 | 
 | ||||||
|     section = conf.get(exchange.lower()) |     section = conf.get(exchange.lower()) | ||||||
|     breakpoint() |  | ||||||
| 
 | 
 | ||||||
|     # TODO: document why we send this, basically because logging params for cryptofeed |     # TODO: document why we send this, basically because logging params for cryptofeed | ||||||
|     conf["log"] = {} |     conf["log"] = {} | ||||||
|     conf["log"]["disabled"] = True |     conf["log"]["disabled"] = True | ||||||
| 
 | 
 | ||||||
|     if section is None: |     if section is None: | ||||||
|         log.warning(f"No config section found for deribit in {path}") |         log.warning(f"No config section found for deribit in {exchange}") | ||||||
| 
 | 
 | ||||||
|     return conf |     return conf | ||||||
| 
 | 
 | ||||||
|  | @ -145,64 +96,53 @@ async def mk_stream_quotes( | ||||||
|     # startup sync |     # startup sync | ||||||
|     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, |     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, | ||||||
| ) -> None: | ) -> None: | ||||||
| 
 |  | ||||||
|     # XXX: required to propagate ``tractor`` loglevel to piker logging |     # XXX: required to propagate ``tractor`` loglevel to piker logging | ||||||
|     get_console_log(loglevel or tractor.current_actor().loglevel) |     get_console_log(loglevel or tractor.current_actor().loglevel) | ||||||
| 
 | 
 | ||||||
|     sym = symbols[0] |     sym = symbols[0] | ||||||
| 
 | 
 | ||||||
|     async with (open_cached_client(exchange.lower()) as client, send_chan as send_chan): |     async with (open_cached_client(exchange.lower()) as client, send_chan as send_chan): | ||||||
|         # create init message here |         pairs = await client.cache_pairs() | ||||||
| 
 | 
 | ||||||
|         cache = await client.cache_symbols() |         pair_data = pairs[sym] | ||||||
| 
 |  | ||||||
|         cf_syms = {} |  | ||||||
|         for key, value in cache.items(): |  | ||||||
|             cf_sym = key.lower().replace('-', '') |  | ||||||
|             cf_syms[cf_sym] = value  |  | ||||||
| 
 |  | ||||||
|         cf_sym = cf_syms[sym] |  | ||||||
| 
 |  | ||||||
|         async with maybe_open_price_feed(cf_sym, exchange, channels) as stream: |  | ||||||
| 
 | 
 | ||||||
|  |         async with maybe_open_price_feed(pair_data, exchange, channels) as stream: | ||||||
|             init_msgs = { |             init_msgs = { | ||||||
|                 # pass back token, and bool, signalling if we're the writer |                 # pass back token, and bool, signalling if we're the writer | ||||||
|                 # and that history has been written |                 # and that history has been written | ||||||
|                 sym: { |                 sym: { | ||||||
|                     'symbol_info': { |                     "symbol_info": {"asset_type": "crypto", "price_tick_size": 0.0005}, | ||||||
|                         'asset_type': 'crypto', |                     "shm_write_opts": {"sum_tick_vml": False}, | ||||||
|                         'price_tick_size': 0.0005 |                     "fqsn": sym, | ||||||
|                     }, |  | ||||||
|                     'shm_write_opts': {'sum_tick_vml': False}, |  | ||||||
|                     'fqsn': sym, |  | ||||||
|                 }, |                 }, | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             # broker schemas to validate symbol data |             # broker schemas to validate symbol data | ||||||
|             quote_msg = {"symbol": cf_sym["name"], "last": 0, "ticks": []} |             quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []} | ||||||
| 
 |  | ||||||
|             task_status.started((init_msgs, quote_msg)) |             task_status.started((init_msgs, quote_msg)) | ||||||
| 
 | 
 | ||||||
|             feed_is_live.set() |             feed_is_live.set() | ||||||
| 
 | 
 | ||||||
|             async for typ, quote in stream: |             async for typ, quote in stream: | ||||||
|  |                 print(f'streaming {typ} quote: {quote}') | ||||||
|                 topic = quote["symbol"] |                 topic = quote["symbol"] | ||||||
|                 await send_chan.send({topic: quote}) |                 await send_chan.send({topic: quote}) | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
| @acm | @acm | ||||||
| async def maybe_open_price_feed( | async def maybe_open_price_feed( | ||||||
|     symbol, exchange, channels |     pair_data: Symbol, exchange: str, channels | ||||||
| ) -> trio.abc.ReceiveStream: | ) -> trio.abc.ReceiveStream: | ||||||
|     # TODO: add a predicate to maybe_open_context |     # TODO: add a predicate to maybe_open_context | ||||||
|     # TODO: ensure we can dynamically pass down args here |     # TODO: ensure we can dynamically pass down args here | ||||||
|     async with maybe_open_context( |     async with maybe_open_context( | ||||||
|         acm_func=open_price_feed, |         acm_func=open_price_feed, | ||||||
|         kwargs={ |         kwargs={ | ||||||
|             "symbol": symbol, |             "pair_data": pair_data, | ||||||
|             "exchange": exchange, |             "exchange": exchange, | ||||||
|             "channels": channels, |             "channels": channels, | ||||||
|         }, |         }, | ||||||
|         key=symbol['name'], |         key=pair_data["name"], | ||||||
|     ) as (cache_hit, feed): |     ) as (cache_hit, feed): | ||||||
|         if cache_hit: |         if cache_hit: | ||||||
|             yield broadcast_receiver(feed, 10) |             yield broadcast_receiver(feed, 10) | ||||||
|  | @ -211,10 +151,12 @@ async def maybe_open_price_feed( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
| async def open_price_feed(symbol: str, exchange, channels) -> trio.abc.ReceiveStream: | async def open_price_feed( | ||||||
|  |     pair_data: Symbol, exchange, channels | ||||||
|  | ) -> trio.abc.ReceiveStream: | ||||||
|     async with maybe_open_feed_handler(exchange) as fh: |     async with maybe_open_feed_handler(exchange) as fh: | ||||||
|         async with to_asyncio.open_channel_from( |         async with to_asyncio.open_channel_from( | ||||||
|             partial(aio_price_feed_relay, exchange, channels, fh, symbol) |             partial(aio_price_feed_relay, pair_data, exchange, channels, fh) | ||||||
|         ) as (first, chan): |         ) as (first, chan): | ||||||
|             yield chan |             yield chan | ||||||
| 
 | 
 | ||||||
|  | @ -224,7 +166,7 @@ async def maybe_open_feed_handler(exchange: str) -> trio.abc.ReceiveStream: | ||||||
|     async with maybe_open_context( |     async with maybe_open_context( | ||||||
|         acm_func=open_feed_handler, |         acm_func=open_feed_handler, | ||||||
|         kwargs={ |         kwargs={ | ||||||
|             'exchange': exchange, |             "exchange": exchange, | ||||||
|         }, |         }, | ||||||
|         key="feedhandler", |         key="feedhandler", | ||||||
|     ) as (cache_hit, fh): |     ) as (cache_hit, fh): | ||||||
|  | @ -239,74 +181,78 @@ async def open_feed_handler(exchange: str): | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def aio_price_feed_relay( | async def aio_price_feed_relay( | ||||||
|  |     pair_data: Symbol, | ||||||
|     exchange: str, |     exchange: str, | ||||||
|     channels: list[str], |     channels: list[str], | ||||||
|     fh: FeedHandler, |     fh: FeedHandler, | ||||||
|     symbol: Symbol, |  | ||||||
|     from_trio: asyncio.Queue, |     from_trio: asyncio.Queue, | ||||||
|     to_trio: trio.abc.SendChannel, |     to_trio: trio.abc.SendChannel, | ||||||
| ) -> None: | ) -> None: | ||||||
|     async def _trade(data: dict, receipt_timestamp): |     async def _trade(data: dict, receipt_timestamp): | ||||||
|         breakpoint() |         print(f' trade data: {data}') | ||||||
|         # to_trio.send_nowait( |         to_trio.send_nowait( | ||||||
|         #     ( |             ( | ||||||
|         #         "trade", |                 "trade", | ||||||
|         #         { |                 { | ||||||
|         #             "symbol": cb_sym_to_deribit_inst( |                     "symbol": cf_sym_to_fqsn(data.symbol), | ||||||
|         #                 str_to_cb_sym(data.symbol) |                     "last": float(data.to_dict()['price']), | ||||||
|         #             ).lower(), |                     "broker_ts": time.time(), | ||||||
|         #             "last": data, |                     "data": data.to_dict(), | ||||||
|         #             "broker_ts": time.time(), |                     "receipt": receipt_timestamp, | ||||||
|         #             "data": data.to_dict(), |                 }, | ||||||
|         #             "receipt": receipt_timestamp, |             ) | ||||||
|         #         }, |         ) | ||||||
|         #     ) |  | ||||||
|         # ) |  | ||||||
| 
 | 
 | ||||||
|     async def _l1(data: dict, receipt_timestamp): |     async def _l1(data: dict, receipt_timestamp): | ||||||
|         breakpoint() |         print(f'l2 data: {data}') | ||||||
|         # to_trio.send_nowait( |         bid = data.book.to_dict()['bid'] | ||||||
|         #     ( |         ask = data.book.to_dict()['ask'] | ||||||
|         #         "l1", |         l1_ask_price, l1_ask_size = next(iter(ask.items())) | ||||||
|         #         { |         l1_bid_price, l1_bid_size = next(iter(bid.items())) | ||||||
|         #             "symbol": cb_sym_to_deribit_inst( | 
 | ||||||
|         #                 str_to_cb_sym(data.symbol) |         to_trio.send_nowait( | ||||||
|         #             ).lower(), |             (  | ||||||
|         #             "ticks": [ |                 "l1", | ||||||
|         #                 { |                 { | ||||||
|         #                     "type": "bid", |                     "symbol": cf_sym_to_fqsn(data.symbol), | ||||||
|         #                     "price": float(data.bid_price), |                     "ticks": [ | ||||||
|         #                     "size": float(data.bid_size), |                         { | ||||||
|         #                 }, |                             "type": "bid", | ||||||
|         #                 { |                             "price": float(l1_bid_price), | ||||||
|         #                     "type": "bsize", |                             "size": float(l1_bid_size), | ||||||
|         #                     "price": float(data.bid_price), |                         }, | ||||||
|         #                     "size": float(data.bid_size), |                         { | ||||||
|         #                 }, |                             "type": "bsize", | ||||||
|         #                 { |                             "price": float(l1_bid_price), | ||||||
|         #                     "type": "ask", |                             "size": float(l1_bid_size), | ||||||
|         #                     "price": float(data.ask_price), |                         }, | ||||||
|         #                     "size": float(data.ask_size), |                         { | ||||||
|         #                 }, |                             "type": "ask", | ||||||
|         #                 { |                             "price": float(l1_ask_price), | ||||||
|         #                     "type": "asize", |                             "size": float(l1_ask_size), | ||||||
|         #                     "price": float(data.ask_price), |                         }, | ||||||
|         #                     "size": float(data.ask_size), |                         { | ||||||
|         #                 }, |                             "type": "asize", | ||||||
|         #             ], |                             "price": float(l1_ask_price), | ||||||
|         #         }, |                             "size": float(l1_ask_size), | ||||||
|         #     ) |                         }, | ||||||
|         # ) |                     ] | ||||||
|  |                 } | ||||||
|  |             ) | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|     fh.add_feed( |     fh.add_feed( | ||||||
|         exchange, |         exchange, | ||||||
|         channels=channels, |         channels=channels, | ||||||
|         symbols=[piker_sym_to_cb_sym(symbol)], |         symbols=[pair_data_to_cf_sym(pair_data)], | ||||||
|         callbacks={TRADES: _trade, L2_BOOK: _l1}, |         callbacks={TRADES: _trade, L2_BOOK: _l1} | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     if not fh.running: |     if not fh.running: | ||||||
|         fh.run(start_loop=False, install_signal_handlers=False) |         try: | ||||||
| 
 |             fh.run(start_loop=False, install_signal_handlers=False) | ||||||
|  |         except BaseExceptionGroup as e: | ||||||
|  |             breakpoint() | ||||||
|     # sync with trio |     # sync with trio | ||||||
|     to_trio.send_nowait(None) |     to_trio.send_nowait(None) | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue