go_httpx_binance #5

Closed
ntorres wants to merge 4 commits from go_httpx_binance into go_httpx
6 changed files with 177 additions and 110 deletions

View File

@ -50,7 +50,7 @@ __brokers__: list[str] = [
'binance',
'ib',
'kraken',
'kucoin'
'kucoin',
# broken but used to work
# 'questrade',
@ -71,7 +71,7 @@ def get_brokermod(brokername: str) -> ModuleType:
Return the imported broker module by name.
'''
module = import_module('.' + brokername, 'piker.brokers')
module: ModuleType = import_module('.' + brokername, 'piker.brokers')
# we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1]
return module

View File

@ -18,10 +18,11 @@
Handy cross-broker utils.
"""
from __future__ import annotations
from functools import partial
import json
import asks
import httpx
import logging
from ..log import (
@ -60,11 +61,11 @@ class NoData(BrokerError):
def __init__(
self,
*args,
info: dict,
info: dict|None = None,
) -> None:
super().__init__(*args)
self.info: dict = info
self.info: dict|None = info
# when raised, machinery can check if the backend
# set a "frame size" for doing datetime calcs.
@ -90,16 +91,18 @@ class DataThrottle(BrokerError):
def resproc(
resp: asks.response_objects.Response,
resp: httpx.Response,
log: logging.Logger,
return_json: bool = True,
log_resp: bool = False,
) -> asks.response_objects.Response:
"""Process response and return its json content.
) -> httpx.Response:
'''
Process response and return its json content.
Raise the appropriate error on non-200 OK responses.
"""
'''
if not resp.status_code == 200:
raise BrokerError(resp.body)
try:

View File

@ -43,7 +43,7 @@ import trio
from pendulum import (
now,
)
import asks
import httpx
from rapidfuzz import process as fuzzy
import numpy as np
@ -147,7 +147,7 @@ def binance_timestamp(
class Client:
'''
Async ReST API client using ``trio`` + ``asks`` B)
Async ReST API client using ``trio`` + ``httpx`` B)
Supports all of the spot, margin and futures endpoints depending
on method.
@ -158,6 +158,7 @@ class Client:
# TODO: change this to `Client.[mkt_]venue: MarketType`?
mkt_mode: MarketType = 'spot',
httpx_client: httpx.AsyncClient,
) -> None:
# build out pair info tables for each market type
@ -186,23 +187,7 @@ class Client:
# market symbols for use by search. See `.exch_info()`.
self._pairs: ChainMap[str, Pair] = ChainMap()
# spot EPs sesh
self._sesh = asks.Session(connections=4)
self._sesh.base_location: str = _spot_url
# spot testnet
self._test_sesh: asks.Session = asks.Session(connections=4)
self._test_sesh.base_location: str = _testnet_spot_url
# margin and extended spot endpoints session.
self._sapi_sesh = asks.Session(connections=4)
self._sapi_sesh.base_location: str = _spot_url
# futes EPs sesh
self._fapi_sesh = asks.Session(connections=4)
self._fapi_sesh.base_location: str = _futes_url
# futes testnet
self._test_fapi_sesh: asks.Session = asks.Session(connections=4)
self._test_fapi_sesh.base_location: str = _testnet_futes_url
self._create_sessions(httpx_client)
# global client "venue selection" mode.
# set this when you want to switch venues and not have to
@ -212,7 +197,7 @@ class Client:
# per 8
self.venue_sesh: dict[
str, # venue key
tuple[asks.Session, str] # session, eps path
tuple[httpx.AsyncClient, str] # session, eps path
] = {
'spot': (self._sesh, '/api/v3/'),
'spot_testnet': (self._test_sesh, '/fapi/v1/'),
@ -242,12 +227,21 @@ class Client:
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
self.conf: dict = get_config()
self._setup_api_keys()
def _setup_api_keys(
self
) -> None:
"""
Set up API keys for the configured venues and sessions.
"""
for key, subconf in self.conf.items():
if api_key := subconf.get('api_key', ''):
venue_keys: list[str] = self.confkey2venuekeys[key]
venue_key: str
sesh: asks.Session
sesh: httpx.AsyncClient
for venue_key in venue_keys:
sesh, _ = self.venue_sesh[venue_key]
@ -272,6 +266,26 @@ class Client:
]
testnet_sesh.headers.update(api_key_header)
def _create_sessions(
self,
httpx_client: httpx.AsyncClient
) -> None:
"""
Create the necessary AsyncClient sessions for different endpoints.
"""
# spot EPs sesh
self._sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_spot_url)
# spot testnet
self._test_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=__testnet_spot_url)
# margin and extended spot endpoints session.
self._sapi_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_spot_url)
# futes EPs sesh
self._fapi_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_futes_url)
# futes testnet
self._test_fapi_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_testnet_futes_url)
def _mk_sig(
self,
data: dict,
@ -360,7 +374,7 @@ class Client:
venue=venue_key,
)
sesh: asks.Session
sesh: httpx.AsyncClient
path: str
# Check if we're configured to route order requests to the

View File

@ -27,8 +27,8 @@ from typing import (
)
import time
import httpx
import pendulum
import asks
import numpy as np
import urllib.parse
import hashlib
@ -60,6 +60,11 @@ log = get_logger('piker.brokers.kraken')
# <uri>/<version>/
_url = 'https://api.kraken.com/0'
_headers: dict[str, str] = {
'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
}
# TODO: this is the only backend providing this right?
# in which case we should drop it from the defaults and
# instead make a custom fields descr in this module!
@ -135,16 +140,15 @@ class Client:
def __init__(
self,
config: dict[str, str],
httpx_client: httpx.AsyncClient,
name: str = '',
api_key: str = '',
secret: str = ''
) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url
self._sesh.headers.update({
'User-Agent':
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
self._sesh: httpx.AsyncClient = httpx_client
self._name = name
self._api_key = api_key
self._secret = secret
@ -166,10 +170,9 @@ class Client:
method: str,
data: dict,
) -> dict[str, Any]:
resp = await self._sesh.post(
path=f'/public/{method}',
resp: httpx.Response = await self._sesh.post(
url=f'/public/{method}',
json=data,
timeout=float('inf')
)
return resproc(resp, log)
@ -180,18 +183,18 @@ class Client:
uri_path: str
) -> dict[str, Any]:
headers = {
'Content-Type':
'application/x-www-form-urlencoded',
'API-Key':
self._api_key,
'API-Sign':
get_kraken_signature(uri_path, data, self._secret)
'Content-Type': 'application/x-www-form-urlencoded',
'API-Key': self._api_key,
'API-Sign': get_kraken_signature(
uri_path,
data,
self._secret,
),
}
resp = await self._sesh.post(
path=f'/private/{method}',
resp: httpx.Response = await self._sesh.post(
url=f'/private/{method}',
data=data,
headers=headers,
timeout=float('inf')
)
return resproc(resp, log)
@ -665,24 +668,36 @@ class Client:
@acm
async def get_client() -> Client:
conf = get_config()
if conf:
client = Client(
conf,
conf: dict[str, Any] = get_config()
async with httpx.AsyncClient(
base_url=_url,
headers=_headers,
# TODO: don't break these up and just do internal
# conf lookups instead..
name=conf['key_descr'],
api_key=conf['api_key'],
secret=conf['secret']
)
else:
client = Client({})
# TODO: is there a way to numerate this?
# https://www.python-httpx.org/advanced/clients/#why-use-a-client
# connections=4
) as trio_client:
if conf:
client = Client(
conf,
httpx_client=trio_client,
# at startup, load all symbols, and asset info in
# batch requests.
async with trio.open_nursery() as nurse:
nurse.start_soon(client.get_assets)
await client.get_mkt_pairs()
# TODO: don't break these up and just do internal
# conf lookups instead..
name=conf['key_descr'],
api_key=conf['api_key'],
secret=conf['secret']
)
else:
client = Client(
conf={},
httpx_client=trio_client,
)
yield client
# at startup, load all symbols, and asset info in
# batch requests.
async with trio.open_nursery() as nurse:
nurse.start_soon(client.get_assets)
await client.get_mkt_pairs()
yield client

View File

@ -612,18 +612,18 @@ async def open_trade_dialog(
# enter relay loop
await handle_order_updates(
client,
ws,
stream,
ems_stream,
apiflows,
ids,
reqids2txids,
acnt,
api_trans,
acctid,
acc_name,
token,
client=client,
ws=ws,
ws_stream=stream,
ems_stream=ems_stream,
apiflows=apiflows,
ids=ids,
reqids2txids=reqids2txids,
acnt=acnt,
ledger=ledger,
acctid=acctid,
acc_name=acc_name,
token=token,
)
@ -639,7 +639,8 @@ async def handle_order_updates(
# transaction records which will be updated
# on new trade clearing events (aka order "fills")
ledger_trans: dict[str, Transaction],
ledger: TransactionLedger,
# ledger_trans: dict[str, Transaction],
acctid: str,
acc_name: str,
token: str,
@ -699,7 +700,8 @@ async def handle_order_updates(
# if tid not in ledger_trans
}
for tid, trade in trades.items():
assert tid not in ledger_trans
# assert tid not in ledger_trans
assert tid not in ledger
txid = trade['ordertxid']
reqid = trade.get('userref')
@ -747,11 +749,17 @@ async def handle_order_updates(
client,
api_name_set='wsname',
)
ppmsgs = trades2pps(
acnt,
acctid,
new_trans,
ppmsgs: list[BrokerdPosition] = trades2pps(
acnt=acnt,
ledger=ledger,
acctid=acctid,
new_trans=new_trans,
)
# ppmsgs = trades2pps(
# acnt,
# acctid,
# new_trans,
# )
for pp_msg in ppmsgs:
await ems_stream.send(pp_msg)

View File

@ -16,10 +16,9 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Kucoin broker backend
Kucoin cex API backend.
'''
from contextlib import (
asynccontextmanager as acm,
aclosing,
@ -42,7 +41,7 @@ import wsproto
from uuid import uuid4
from trio_typing import TaskStatus
import asks
import httpx
from bidict import bidict
import numpy as np
import pendulum
@ -212,8 +211,12 @@ def get_config() -> BrokerConfig | None:
class Client:
def __init__(self) -> None:
self._config: BrokerConfig | None = get_config()
def __init__(
self,
httpx_client: httpx.AsyncClient,
) -> None:
self._http: httpx.AsyncClient = httpx_client
self._config: BrokerConfig|None = get_config()
self._pairs: dict[str, KucoinMktPair] = {}
self._fqmes2mktids: bidict[str, str] = bidict()
self._bars: list[list[float]] = []
@ -227,18 +230,24 @@ class Client:
) -> dict[str, str | bytes]:
'''
Generate authenticated request headers
Generate authenticated request headers:
https://docs.kucoin.com/#authentication
https://www.kucoin.com/docs/basic-info/connection-method/authentication/creating-a-request
https://www.kucoin.com/docs/basic-info/connection-method/authentication/signing-a-message
'''
if not self._config:
raise ValueError(
'No config found when trying to send authenticated request')
'No config found when trying to send authenticated request'
)
str_to_sign = (
str(int(time.time() * 1000))
+ action + f'/api/{api}/{endpoint.lstrip("/")}'
+
action
+
f'/api/{api}/{endpoint.lstrip("/")}'
)
signature = base64.b64encode(
@ -249,6 +258,7 @@ class Client:
).digest()
)
# TODO: can we cache this between calls?
passphrase = base64.b64encode(
hmac.new(
self._config.key_secret.encode('utf-8'),
@ -270,8 +280,10 @@ class Client:
self,
action: Literal['POST', 'GET'],
endpoint: str,
api: str = 'v2',
headers: dict = {},
) -> Any:
'''
Generic request wrapper for Kucoin API
@ -284,13 +296,17 @@ class Client:
api,
)
api_url = f'https://api.kucoin.com/api/{api}/{endpoint}'
res = await asks.request(action, api_url, headers=headers)
json = res.json()
if 'data' in json:
return json['data']
req_meth: Callable = getattr(
self._http,
action.lower(),
)
res = await req_meth(
url=f'/{api}/{endpoint}',
headers=headers,
)
json: dict = res.json()
if data := json.get('data'):
return data
else:
log.error(
f'Error making request to {api_url} ->\n'
@ -311,7 +327,7 @@ class Client:
'''
token_type = 'private' if private else 'public'
try:
data: dict[str, Any] | None = await self._request(
data: dict[str, Any]|None = await self._request(
'POST',
endpoint=f'bullet-{token_type}',
api='v1'
@ -349,8 +365,8 @@ class Client:
currencies: dict[str, Currency] = {}
entries: list[dict] = await self._request(
'GET',
api='v1',
endpoint='currencies',
api='v1',
)
for entry in entries:
curr = Currency(**entry).copy()
@ -366,7 +382,10 @@ class Client:
dict[str, KucoinMktPair],
bidict[str, KucoinMktPair],
]:
entries = await self._request('GET', 'symbols')
entries = await self._request(
'GET',
endpoint='symbols',
)
log.info(f' {len(entries)} Kucoin market pairs fetched')
pairs: dict[str, KucoinMktPair] = {}
@ -567,13 +586,21 @@ def fqme_to_kucoin_sym(
@acm
async def get_client() -> AsyncGenerator[Client, None]:
client = Client()
'''
Load an API `Client` preconfigured from user settings
async with trio.open_nursery() as n:
n.start_soon(client.get_mkt_pairs)
await client.get_currencies()
'''
async with (
httpx.AsyncClient(
base_url=f'https://api.kucoin.com/api',
) as trio_client,
):
client = Client(httpx_client=trio_client)
async with trio.open_nursery() as tn:
tn.start_soon(client.get_mkt_pairs)
await client.get_currencies()
yield client
yield client
@tractor.context