WIP - setup basic history and streaming client

emit_clear_ticks_only_on_ts_change
jaredgoldman 2023-03-02 23:01:17 -05:00
parent f3f8aeae8d
commit 79feb1073d
3 changed files with 273 additions and 182 deletions

View File

@ -24,7 +24,7 @@ __brokers__ = [
'binance',
'ib',
'kraken',
'kucoin'
# broken but used to work
# 'questrade',
# 'robinhood',
@ -35,7 +35,6 @@ __brokers__ = [
# iex
# deribit
# kucoin
# bitso
]

View File

@ -15,89 +15,196 @@
"""
from dataclasses import field
from typing import Any, Optional, Literal
from contextlib import asynccontextmanager as acm
from datetime import datetime
import time
import math
from os import path, walk
import asks
import tractor
import trio
from trio_typing import TaskStatus
from fuzzywuzzy import process as fuzzy
from cryptofeed.defines import (
KUCOIN,
TRADES,
L2_BOOK
from cryptofeed.defines import KUCOIN, TRADES, L2_BOOK
from cryptofeed.symbols import Symbol
import pendulum
import numpy as np
from piker.data.cryptofeeds import (
fqsn_to_cf_sym,
mk_stream_quotes,
get_config,
)
from piker.data.cryptofeeds import mk_stream_quotes
from piker._cacheables import open_cached_client
from piker.log import get_logger
from ._util import SymbolNotFound
from piker.pp import config
from ._util import DataUnavailable
_spawn_kwargs = {
"infect_asyncio": True,
}
log = get_logger(__name__)
_ohlc_dtype = [
('index', int),
('time', int),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', float),
('bar_wap', float), # will be zeroed by sampler if not filled
]
class Client:
def __init__(self) -> None:
self._pairs: dict[str, Any] = None
self._pairs: dict[str, Symbol] = {}
self._bars: list[list] = []
# TODO" Shouldn't have to write kucoin twice here
# config = get_config('kucoin').get('kucoin', {})
config = get_config("kucoin").get("kucoin", {})
#
# if ('key_id' in config) and ('key_secret' in config):
# self._key_id = config['key_id']
# self._key_secret = config['key_secret']
#
# else:
# self._key_id = None
# self._key_secret = None
async def request(self, action: Literal["POST", "GET", "PUT", "DELETE"], route: str, api_v: str = 'v2'):
if ("key_id" in config) and ("key_secret" in config):
self._key_id = config["key_id"]
self._key_secret = config["key_secret"]
else:
self._key_id = None
self._key_secret = None
async def _request(
self,
action: Literal["POST", "GET", "PUT", "DELETE"],
route: str,
api_v: str = "v2",
) -> Any:
api_url = f"https://api.kucoin.com/api/{api_v}{route}"
res = await asks.request(action, api_url)
return res.json()['data']
#breakpoint()
try:
return res.json()["data"]
except KeyError as e:
print(f'KUCOIN ERROR: {res.json()["msg"]}')
breakpoint()
async def symbol_info(
async def get_pairs(
self,
sym: str = None,
) -> dict[str, Any]:
if self._pairs:
return self._pairs
entries = await self.request("GET", "/symbols")
if not entries:
raise SymbolNotFound(f'{sym} not found')
syms = {item['name']: item for item in entries}
entries = await self._request("GET", "/symbols")
syms = {item["name"]: item for item in entries}
return syms
async def cache_symbols(
async def cache_pairs(
self,
) -> dict:
normalize: bool = True,
) -> dict[str, Symbol]:
if not self._pairs:
self._pairs = await self.symbol_info()
self._pairs = await self.get_pairs()
if normalize:
self._pairs = self.normalize_pairs(self._pairs)
return self._pairs
def normalize_pairs(self, pairs: dict[str, Symbol]) -> dict[str, Symbol]:
"""
Map crypfeeds symbols to fqsn strings
"""
norm_pairs = {}
for key, value in pairs.items():
fqsn = key.lower().replace("-", "")
norm_pairs[fqsn] = value
return norm_pairs
async def search_symbols(
self,
pattern: str,
limit: int = 30,
) -> dict[str, Any]:
data = await self.symbol_info()
data = await self.get_pairs()
matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit)
# repack in dict form
return {item[0]["instrument_name"].lower(): item[0] for item in matches}
return {item[0]["name"].lower(): item[0] for item in matches}
async def get_bars(
self,
fqsn: str,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
limit: int = 1000,
as_np: bool = True,
type: str = "1min",
):
if len(self._bars):
return self._bars
if end_dt is None:
end_dt = pendulum.now("UTC").add(minutes=1)
if start_dt is None:
start_dt = end_dt.start_of("minute").subtract(minutes=limit)
# Format datetime to unix
start_dt = math.trunc(time.mktime(start_dt.timetuple()))
end_dt = math.trunc(time.mktime(end_dt.timetuple()))
kucoin_sym = fqsn_to_cf_sym(fqsn, self._pairs)
url = f"/market/candles?type={type}&symbol={kucoin_sym}&startAt={start_dt}&endAt={end_dt}"
bars = await self._request(
"GET",
url,
api_v="v1",
)
new_bars = []
for i, bar in enumerate(bars[::-1]):
# TODO: implement struct/typecasting/validation here
data = {
'index': i,
'time': bar[0],
'open': bar[1],
'close': bar[2],
'high': bar[3],
'low': bar[4],
'volume': bar[5],
'amount': bar [6],
'bar_wap': 0.0,
}
row = []
for j, (field_name, field_type) in enumerate(_ohlc_dtype):
value = data[field_name]
match field_name:
case 'index' | 'time':
row.append(int(value))
# case 'time':
# dt_from_unix_ts = datetime.utcfromtimestamp(int(value))
# # convert unix time to epoch seconds
# row.append(int(dt_from_unix_ts.timestamp()))
case _:
row.append(float(value))
new_bars.append(tuple(row))
self._bars = array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars
return array
@acm
async def get_client():
client = Client()
# Do we need to open a nursery here?
await client.cache_symbols()
await client.cache_pairs()
yield client
@ -107,13 +214,13 @@ async def open_symbol_search(
):
async with open_cached_client("kucoin") as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
cache = await client.cache_pairs()
await ctx.started()
# async with ctx.open_stream() as stream:
# async for pattern in stream:
# # repack in dict form
# await stream.send(await client.search_symbols(pattern))
async with ctx.open_stream() as stream:
async for pattern in stream:
# repack in dict form
await stream.send(await client.search_symbols(pattern))
async def stream_quotes(
@ -126,7 +233,7 @@ async def stream_quotes(
):
return await mk_stream_quotes(
KUCOIN,
[L2_BOOK],
[L2_BOOK, TRADES],
send_chan,
symbols,
feed_is_live,
@ -134,5 +241,44 @@ async def stream_quotes(
task_status,
)
async def open_history_client():
@acm
async def open_history_client(
symbol: str,
type: str = "1m",
):
async with open_cached_client("kucoin") as client:
# call bars on kucoin
async def get_ohlc_history(
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
datetime | None, # start
datetime | None, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.get_bars(
symbol,
start_dt=start_dt,
end_dt=end_dt,
)
times = array['time']
if (
end_dt is None
):
inow = round(time.time())
print(f'difference in time between load and processing {inow - times[-1]}')
if (inow - times[-1]) > 60:
await tractor.breakpoint()
start_dt = pendulum.from_timestamp(times[0])
end_dt = pendulum.from_timestamp(times[-1])
return array, start_dt, end_dt
yield get_ohlc_history, {'erlangs': 3, 'rate': 3}

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# Copyright (C) Jared Goldman (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -35,6 +35,7 @@ from tractor import to_asyncio
from cryptofeed import FeedHandler
from cryptofeed.defines import TRADES, L2_BOOK
from cryptofeed.symbols import Symbol
from cryptofeed.types import OrderBook
import asyncio
from piker._cacheables import open_cached_client
@ -53,84 +54,34 @@ _spawn_kwargs = {
log = get_logger(__name__)
def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def fqsn_to_cb_sym(pair_data: Symbol) -> Symbol:
return Symbol(base=pair_data["baseCurrency"], quote=pair_data["quoteCurrency"])
# def str_to_cb_sym(name: str) -> Symbol:
# base, strike_price, expiry_date, option_type = name.split("-")
#
# quote = base
#
# if option_type == "put":
# option_type = PUT
# elif option_type == "call":
# option_type = CALL
# else:
# raise Exception("Couldn't parse option type")
#
# return Symbol(
# base,
# quote,
# type=OPTION,
# strike_price=strike_price,
# option_type=option_type,
# expiry_date=expiry_date,
# expiry_normalize=False,
# )
#
def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, Symbol]) -> str:
pair_data = pairs[fqsn]
return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"]
def piker_sym_to_cb_sym(symbol) -> Symbol:
return Symbol(
base=symbol['baseCurrency'],
quote=symbol['quoteCurrency']
)
def pair_data_to_cf_sym(sym_data: Symbol):
return sym_data["baseCurrency"] + "-" + sym_data["quoteCurrency"]
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ["F", "G", "H", "J", "K", "M", "N", "Q", "U", "V", "X", "Z"]
# deribit specific
months = [
"JAN",
"FEB",
"MAR",
"APR",
"MAY",
"JUN",
"JUL",
"AUG",
"SEP",
"OCT",
"NOV",
"DEC",
]
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = "C" if sym.option_type == CALL else "P"
return f"{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}"
def cf_sym_to_fqsn(sym: str) -> str:
return sym.lower().replace("-", "")
def get_config(exchange: str) -> dict[str, Any]:
conf, path = config.load()
section = conf.get(exchange.lower())
breakpoint()
# TODO: document why we send this, basically because logging params for cryptofeed
conf["log"] = {}
conf["log"]["disabled"] = True
if section is None:
log.warning(f"No config section found for deribit in {path}")
log.warning(f"No config section found for deribit in {exchange}")
return conf
@ -145,64 +96,53 @@ async def mk_stream_quotes(
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0]
async with (open_cached_client(exchange.lower()) as client, send_chan as send_chan):
# create init message here
pairs = await client.cache_pairs()
cache = await client.cache_symbols()
cf_syms = {}
for key, value in cache.items():
cf_sym = key.lower().replace('-', '')
cf_syms[cf_sym] = value
cf_sym = cf_syms[sym]
async with maybe_open_price_feed(cf_sym, exchange, channels) as stream:
pair_data = pairs[sym]
async with maybe_open_price_feed(pair_data, exchange, channels) as stream:
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'crypto',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
"symbol_info": {"asset_type": "crypto", "price_tick_size": 0.0005},
"shm_write_opts": {"sum_tick_vml": False},
"fqsn": sym,
},
}
# broker schemas to validate symbol data
quote_msg = {"symbol": cf_sym["name"], "last": 0, "ticks": []}
quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []}
task_status.started((init_msgs, quote_msg))
feed_is_live.set()
async for typ, quote in stream:
print(f'streaming {typ} quote: {quote}')
topic = quote["symbol"]
await send_chan.send({topic: quote})
@acm
async def maybe_open_price_feed(
symbol, exchange, channels
pair_data: Symbol, exchange: str, channels
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
# TODO: ensure we can dynamically pass down args here
async with maybe_open_context(
acm_func=open_price_feed,
kwargs={
"symbol": symbol,
"pair_data": pair_data,
"exchange": exchange,
"channels": channels,
},
key=symbol['name'],
key=pair_data["name"],
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
@ -211,10 +151,12 @@ async def maybe_open_price_feed(
@acm
async def open_price_feed(symbol: str, exchange, channels) -> trio.abc.ReceiveStream:
async def open_price_feed(
pair_data: Symbol, exchange, channels
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler(exchange) as fh:
async with to_asyncio.open_channel_from(
partial(aio_price_feed_relay, exchange, channels, fh, symbol)
partial(aio_price_feed_relay, pair_data, exchange, channels, fh)
) as (first, chan):
yield chan
@ -224,7 +166,7 @@ async def maybe_open_feed_handler(exchange: str) -> trio.abc.ReceiveStream:
async with maybe_open_context(
acm_func=open_feed_handler,
kwargs={
'exchange': exchange,
"exchange": exchange,
},
key="feedhandler",
) as (cache_hit, fh):
@ -239,74 +181,78 @@ async def open_feed_handler(exchange: str):
async def aio_price_feed_relay(
pair_data: Symbol,
exchange: str,
channels: list[str],
fh: FeedHandler,
symbol: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
breakpoint()
# to_trio.send_nowait(
# (
# "trade",
# {
# "symbol": cb_sym_to_deribit_inst(
# str_to_cb_sym(data.symbol)
# ).lower(),
# "last": data,
# "broker_ts": time.time(),
# "data": data.to_dict(),
# "receipt": receipt_timestamp,
# },
# )
# )
print(f' trade data: {data}')
to_trio.send_nowait(
(
"trade",
{
"symbol": cf_sym_to_fqsn(data.symbol),
"last": float(data.to_dict()['price']),
"broker_ts": time.time(),
"data": data.to_dict(),
"receipt": receipt_timestamp,
},
)
)
async def _l1(data: dict, receipt_timestamp):
breakpoint()
# to_trio.send_nowait(
# (
# "l1",
# {
# "symbol": cb_sym_to_deribit_inst(
# str_to_cb_sym(data.symbol)
# ).lower(),
# "ticks": [
# {
# "type": "bid",
# "price": float(data.bid_price),
# "size": float(data.bid_size),
# },
# {
# "type": "bsize",
# "price": float(data.bid_price),
# "size": float(data.bid_size),
# },
# {
# "type": "ask",
# "price": float(data.ask_price),
# "size": float(data.ask_size),
# },
# {
# "type": "asize",
# "price": float(data.ask_price),
# "size": float(data.ask_size),
# },
# ],
# },
# )
# )
print(f'l2 data: {data}')
bid = data.book.to_dict()['bid']
ask = data.book.to_dict()['ask']
l1_ask_price, l1_ask_size = next(iter(ask.items()))
l1_bid_price, l1_bid_size = next(iter(bid.items()))
to_trio.send_nowait(
(
"l1",
{
"symbol": cf_sym_to_fqsn(data.symbol),
"ticks": [
{
"type": "bid",
"price": float(l1_bid_price),
"size": float(l1_bid_size),
},
{
"type": "bsize",
"price": float(l1_bid_price),
"size": float(l1_bid_size),
},
{
"type": "ask",
"price": float(l1_ask_price),
"size": float(l1_ask_size),
},
{
"type": "asize",
"price": float(l1_ask_price),
"size": float(l1_ask_size),
},
]
}
)
)
fh.add_feed(
exchange,
channels=channels,
symbols=[piker_sym_to_cb_sym(symbol)],
callbacks={TRADES: _trade, L2_BOOK: _l1},
symbols=[pair_data_to_cf_sym(pair_data)],
callbacks={TRADES: _trade, L2_BOOK: _l1}
)
if not fh.running:
fh.run(start_loop=False, install_signal_handlers=False)
try:
fh.run(start_loop=False, install_signal_handlers=False)
except BaseExceptionGroup as e:
breakpoint()
# sync with trio
to_trio.send_nowait(None)