go_httpx_binance #5
|
@ -50,7 +50,7 @@ __brokers__: list[str] = [
|
|||
'binance',
|
||||
'ib',
|
||||
'kraken',
|
||||
'kucoin'
|
||||
'kucoin',
|
||||
|
||||
# broken but used to work
|
||||
# 'questrade',
|
||||
|
@ -71,7 +71,7 @@ def get_brokermod(brokername: str) -> ModuleType:
|
|||
Return the imported broker module by name.
|
||||
|
||||
'''
|
||||
module = import_module('.' + brokername, 'piker.brokers')
|
||||
module: ModuleType = import_module('.' + brokername, 'piker.brokers')
|
||||
# we only allow monkeying because it's for internal keying
|
||||
module.name = module.__name__.split('.')[-1]
|
||||
return module
|
||||
|
|
|
@ -18,10 +18,11 @@
|
|||
Handy cross-broker utils.
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from functools import partial
|
||||
|
||||
import json
|
||||
import asks
|
||||
import httpx
|
||||
import logging
|
||||
|
||||
from ..log import (
|
||||
|
@ -60,11 +61,11 @@ class NoData(BrokerError):
|
|||
def __init__(
|
||||
self,
|
||||
*args,
|
||||
info: dict,
|
||||
info: dict|None = None,
|
||||
|
||||
) -> None:
|
||||
super().__init__(*args)
|
||||
self.info: dict = info
|
||||
self.info: dict|None = info
|
||||
|
||||
# when raised, machinery can check if the backend
|
||||
# set a "frame size" for doing datetime calcs.
|
||||
|
@ -90,16 +91,18 @@ class DataThrottle(BrokerError):
|
|||
|
||||
|
||||
def resproc(
|
||||
resp: asks.response_objects.Response,
|
||||
resp: httpx.Response,
|
||||
log: logging.Logger,
|
||||
return_json: bool = True,
|
||||
log_resp: bool = False,
|
||||
|
||||
) -> asks.response_objects.Response:
|
||||
"""Process response and return its json content.
|
||||
) -> httpx.Response:
|
||||
'''
|
||||
Process response and return its json content.
|
||||
|
||||
Raise the appropriate error on non-200 OK responses.
|
||||
"""
|
||||
|
||||
'''
|
||||
if not resp.status_code == 200:
|
||||
raise BrokerError(resp.body)
|
||||
try:
|
||||
|
|
|
@ -43,7 +43,7 @@ import trio
|
|||
from pendulum import (
|
||||
now,
|
||||
)
|
||||
import asks
|
||||
import httpx
|
||||
from rapidfuzz import process as fuzzy
|
||||
import numpy as np
|
||||
|
||||
|
@ -147,7 +147,7 @@ def binance_timestamp(
|
|||
|
||||
class Client:
|
||||
'''
|
||||
Async ReST API client using ``trio`` + ``asks`` B)
|
||||
Async ReST API client using ``trio`` + ``httpx`` B)
|
||||
|
||||
Supports all of the spot, margin and futures endpoints depending
|
||||
on method.
|
||||
|
@ -158,6 +158,7 @@ class Client:
|
|||
|
||||
# TODO: change this to `Client.[mkt_]venue: MarketType`?
|
||||
mkt_mode: MarketType = 'spot',
|
||||
httpx_client: httpx.AsyncClient,
|
||||
|
||||
) -> None:
|
||||
# build out pair info tables for each market type
|
||||
|
@ -186,23 +187,7 @@ class Client:
|
|||
# market symbols for use by search. See `.exch_info()`.
|
||||
self._pairs: ChainMap[str, Pair] = ChainMap()
|
||||
|
||||
# spot EPs sesh
|
||||
self._sesh = asks.Session(connections=4)
|
||||
self._sesh.base_location: str = _spot_url
|
||||
# spot testnet
|
||||
self._test_sesh: asks.Session = asks.Session(connections=4)
|
||||
self._test_sesh.base_location: str = _testnet_spot_url
|
||||
|
||||
# margin and extended spot endpoints session.
|
||||
self._sapi_sesh = asks.Session(connections=4)
|
||||
self._sapi_sesh.base_location: str = _spot_url
|
||||
|
||||
# futes EPs sesh
|
||||
self._fapi_sesh = asks.Session(connections=4)
|
||||
self._fapi_sesh.base_location: str = _futes_url
|
||||
# futes testnet
|
||||
self._test_fapi_sesh: asks.Session = asks.Session(connections=4)
|
||||
self._test_fapi_sesh.base_location: str = _testnet_futes_url
|
||||
self._create_sessions(httpx_client)
|
||||
|
||||
# global client "venue selection" mode.
|
||||
# set this when you want to switch venues and not have to
|
||||
|
@ -212,7 +197,7 @@ class Client:
|
|||
# per 8
|
||||
self.venue_sesh: dict[
|
||||
str, # venue key
|
||||
tuple[asks.Session, str] # session, eps path
|
||||
tuple[httpx.AsyncClient, str] # session, eps path
|
||||
] = {
|
||||
'spot': (self._sesh, '/api/v3/'),
|
||||
'spot_testnet': (self._test_sesh, '/fapi/v1/'),
|
||||
|
@ -242,12 +227,21 @@ class Client:
|
|||
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
|
||||
self.conf: dict = get_config()
|
||||
|
||||
self._setup_api_keys()
|
||||
|
||||
def _setup_api_keys(
|
||||
self
|
||||
|
||||
) -> None:
|
||||
"""
|
||||
Set up API keys for the configured venues and sessions.
|
||||
"""
|
||||
for key, subconf in self.conf.items():
|
||||
if api_key := subconf.get('api_key', ''):
|
||||
venue_keys: list[str] = self.confkey2venuekeys[key]
|
||||
|
||||
venue_key: str
|
||||
sesh: asks.Session
|
||||
sesh: httpx.AsyncClient
|
||||
for venue_key in venue_keys:
|
||||
sesh, _ = self.venue_sesh[venue_key]
|
||||
|
||||
|
@ -272,6 +266,26 @@ class Client:
|
|||
]
|
||||
testnet_sesh.headers.update(api_key_header)
|
||||
|
||||
def _create_sessions(
|
||||
self,
|
||||
httpx_client: httpx.AsyncClient
|
||||
|
||||
) -> None:
|
||||
"""
|
||||
Create the necessary AsyncClient sessions for different endpoints.
|
||||
"""
|
||||
# spot EPs sesh
|
||||
self._sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_spot_url)
|
||||
# spot testnet
|
||||
self._test_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=__testnet_spot_url)
|
||||
# margin and extended spot endpoints session.
|
||||
self._sapi_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_spot_url)
|
||||
# futes EPs sesh
|
||||
self._fapi_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_futes_url)
|
||||
# futes testnet
|
||||
self._test_fapi_sesh: httpx.AsyncClient = httpx_client.AsyncClient(base_url=_testnet_futes_url)
|
||||
|
||||
|
||||
def _mk_sig(
|
||||
self,
|
||||
data: dict,
|
||||
|
@ -360,7 +374,7 @@ class Client:
|
|||
venue=venue_key,
|
||||
)
|
||||
|
||||
sesh: asks.Session
|
||||
sesh: httpx.AsyncClient
|
||||
path: str
|
||||
|
||||
# Check if we're configured to route order requests to the
|
||||
|
|
|
@ -27,8 +27,8 @@ from typing import (
|
|||
)
|
||||
import time
|
||||
|
||||
import httpx
|
||||
import pendulum
|
||||
import asks
|
||||
import numpy as np
|
||||
import urllib.parse
|
||||
import hashlib
|
||||
|
@ -60,6 +60,11 @@ log = get_logger('piker.brokers.kraken')
|
|||
|
||||
# <uri>/<version>/
|
||||
_url = 'https://api.kraken.com/0'
|
||||
|
||||
_headers: dict[str, str] = {
|
||||
'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
|
||||
}
|
||||
|
||||
# TODO: this is the only backend providing this right?
|
||||
# in which case we should drop it from the defaults and
|
||||
# instead make a custom fields descr in this module!
|
||||
|
@ -135,16 +140,15 @@ class Client:
|
|||
def __init__(
|
||||
self,
|
||||
config: dict[str, str],
|
||||
httpx_client: httpx.AsyncClient,
|
||||
|
||||
name: str = '',
|
||||
api_key: str = '',
|
||||
secret: str = ''
|
||||
) -> None:
|
||||
self._sesh = asks.Session(connections=4)
|
||||
self._sesh.base_location = _url
|
||||
self._sesh.headers.update({
|
||||
'User-Agent':
|
||||
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
|
||||
})
|
||||
|
||||
self._sesh: httpx.AsyncClient = httpx_client
|
||||
|
||||
self._name = name
|
||||
self._api_key = api_key
|
||||
self._secret = secret
|
||||
|
@ -166,10 +170,9 @@ class Client:
|
|||
method: str,
|
||||
data: dict,
|
||||
) -> dict[str, Any]:
|
||||
resp = await self._sesh.post(
|
||||
path=f'/public/{method}',
|
||||
resp: httpx.Response = await self._sesh.post(
|
||||
url=f'/public/{method}',
|
||||
json=data,
|
||||
timeout=float('inf')
|
||||
)
|
||||
return resproc(resp, log)
|
||||
|
||||
|
@ -180,18 +183,18 @@ class Client:
|
|||
uri_path: str
|
||||
) -> dict[str, Any]:
|
||||
headers = {
|
||||
'Content-Type':
|
||||
'application/x-www-form-urlencoded',
|
||||
'API-Key':
|
||||
self._api_key,
|
||||
'API-Sign':
|
||||
get_kraken_signature(uri_path, data, self._secret)
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
'API-Key': self._api_key,
|
||||
'API-Sign': get_kraken_signature(
|
||||
uri_path,
|
||||
data,
|
||||
self._secret,
|
||||
),
|
||||
}
|
||||
resp = await self._sesh.post(
|
||||
path=f'/private/{method}',
|
||||
resp: httpx.Response = await self._sesh.post(
|
||||
url=f'/private/{method}',
|
||||
data=data,
|
||||
headers=headers,
|
||||
timeout=float('inf')
|
||||
)
|
||||
return resproc(resp, log)
|
||||
|
||||
|
@ -665,24 +668,36 @@ class Client:
|
|||
@acm
|
||||
async def get_client() -> Client:
|
||||
|
||||
conf = get_config()
|
||||
if conf:
|
||||
client = Client(
|
||||
conf,
|
||||
conf: dict[str, Any] = get_config()
|
||||
async with httpx.AsyncClient(
|
||||
base_url=_url,
|
||||
headers=_headers,
|
||||
|
||||
# TODO: don't break these up and just do internal
|
||||
# conf lookups instead..
|
||||
name=conf['key_descr'],
|
||||
api_key=conf['api_key'],
|
||||
secret=conf['secret']
|
||||
)
|
||||
else:
|
||||
client = Client({})
|
||||
# TODO: is there a way to numerate this?
|
||||
# https://www.python-httpx.org/advanced/clients/#why-use-a-client
|
||||
# connections=4
|
||||
) as trio_client:
|
||||
if conf:
|
||||
client = Client(
|
||||
conf,
|
||||
httpx_client=trio_client,
|
||||
|
||||
# at startup, load all symbols, and asset info in
|
||||
# batch requests.
|
||||
async with trio.open_nursery() as nurse:
|
||||
nurse.start_soon(client.get_assets)
|
||||
await client.get_mkt_pairs()
|
||||
# TODO: don't break these up and just do internal
|
||||
# conf lookups instead..
|
||||
name=conf['key_descr'],
|
||||
api_key=conf['api_key'],
|
||||
secret=conf['secret']
|
||||
)
|
||||
else:
|
||||
client = Client(
|
||||
conf={},
|
||||
httpx_client=trio_client,
|
||||
)
|
||||
|
||||
yield client
|
||||
# at startup, load all symbols, and asset info in
|
||||
# batch requests.
|
||||
async with trio.open_nursery() as nurse:
|
||||
nurse.start_soon(client.get_assets)
|
||||
await client.get_mkt_pairs()
|
||||
|
||||
yield client
|
||||
|
|
|
@ -612,18 +612,18 @@ async def open_trade_dialog(
|
|||
|
||||
# enter relay loop
|
||||
await handle_order_updates(
|
||||
client,
|
||||
ws,
|
||||
stream,
|
||||
ems_stream,
|
||||
apiflows,
|
||||
ids,
|
||||
reqids2txids,
|
||||
acnt,
|
||||
api_trans,
|
||||
acctid,
|
||||
acc_name,
|
||||
token,
|
||||
client=client,
|
||||
ws=ws,
|
||||
ws_stream=stream,
|
||||
ems_stream=ems_stream,
|
||||
apiflows=apiflows,
|
||||
ids=ids,
|
||||
reqids2txids=reqids2txids,
|
||||
acnt=acnt,
|
||||
ledger=ledger,
|
||||
acctid=acctid,
|
||||
acc_name=acc_name,
|
||||
token=token,
|
||||
)
|
||||
|
||||
|
||||
|
@ -639,7 +639,8 @@ async def handle_order_updates(
|
|||
|
||||
# transaction records which will be updated
|
||||
# on new trade clearing events (aka order "fills")
|
||||
ledger_trans: dict[str, Transaction],
|
||||
ledger: TransactionLedger,
|
||||
# ledger_trans: dict[str, Transaction],
|
||||
acctid: str,
|
||||
acc_name: str,
|
||||
token: str,
|
||||
|
@ -699,7 +700,8 @@ async def handle_order_updates(
|
|||
# if tid not in ledger_trans
|
||||
}
|
||||
for tid, trade in trades.items():
|
||||
assert tid not in ledger_trans
|
||||
# assert tid not in ledger_trans
|
||||
assert tid not in ledger
|
||||
txid = trade['ordertxid']
|
||||
reqid = trade.get('userref')
|
||||
|
||||
|
@ -747,11 +749,17 @@ async def handle_order_updates(
|
|||
client,
|
||||
api_name_set='wsname',
|
||||
)
|
||||
ppmsgs = trades2pps(
|
||||
acnt,
|
||||
acctid,
|
||||
new_trans,
|
||||
ppmsgs: list[BrokerdPosition] = trades2pps(
|
||||
acnt=acnt,
|
||||
ledger=ledger,
|
||||
acctid=acctid,
|
||||
new_trans=new_trans,
|
||||
)
|
||||
# ppmsgs = trades2pps(
|
||||
# acnt,
|
||||
# acctid,
|
||||
# new_trans,
|
||||
# )
|
||||
for pp_msg in ppmsgs:
|
||||
await ems_stream.send(pp_msg)
|
||||
|
||||
|
|
|
@ -16,10 +16,9 @@
|
|||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Kucoin broker backend
|
||||
Kucoin cex API backend.
|
||||
|
||||
'''
|
||||
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
aclosing,
|
||||
|
@ -42,7 +41,7 @@ import wsproto
|
|||
from uuid import uuid4
|
||||
|
||||
from trio_typing import TaskStatus
|
||||
import asks
|
||||
import httpx
|
||||
from bidict import bidict
|
||||
import numpy as np
|
||||
import pendulum
|
||||
|
@ -212,8 +211,12 @@ def get_config() -> BrokerConfig | None:
|
|||
|
||||
class Client:
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._config: BrokerConfig | None = get_config()
|
||||
def __init__(
|
||||
self,
|
||||
httpx_client: httpx.AsyncClient,
|
||||
) -> None:
|
||||
self._http: httpx.AsyncClient = httpx_client
|
||||
self._config: BrokerConfig|None = get_config()
|
||||
self._pairs: dict[str, KucoinMktPair] = {}
|
||||
self._fqmes2mktids: bidict[str, str] = bidict()
|
||||
self._bars: list[list[float]] = []
|
||||
|
@ -227,18 +230,24 @@ class Client:
|
|||
|
||||
) -> dict[str, str | bytes]:
|
||||
'''
|
||||
Generate authenticated request headers
|
||||
Generate authenticated request headers:
|
||||
|
||||
https://docs.kucoin.com/#authentication
|
||||
https://www.kucoin.com/docs/basic-info/connection-method/authentication/creating-a-request
|
||||
https://www.kucoin.com/docs/basic-info/connection-method/authentication/signing-a-message
|
||||
|
||||
'''
|
||||
|
||||
if not self._config:
|
||||
raise ValueError(
|
||||
'No config found when trying to send authenticated request')
|
||||
'No config found when trying to send authenticated request'
|
||||
)
|
||||
|
||||
str_to_sign = (
|
||||
str(int(time.time() * 1000))
|
||||
+ action + f'/api/{api}/{endpoint.lstrip("/")}'
|
||||
+
|
||||
action
|
||||
+
|
||||
f'/api/{api}/{endpoint.lstrip("/")}'
|
||||
)
|
||||
|
||||
signature = base64.b64encode(
|
||||
|
@ -249,6 +258,7 @@ class Client:
|
|||
).digest()
|
||||
)
|
||||
|
||||
# TODO: can we cache this between calls?
|
||||
passphrase = base64.b64encode(
|
||||
hmac.new(
|
||||
self._config.key_secret.encode('utf-8'),
|
||||
|
@ -270,8 +280,10 @@ class Client:
|
|||
self,
|
||||
action: Literal['POST', 'GET'],
|
||||
endpoint: str,
|
||||
|
||||
api: str = 'v2',
|
||||
headers: dict = {},
|
||||
|
||||
) -> Any:
|
||||
'''
|
||||
Generic request wrapper for Kucoin API
|
||||
|
@ -284,13 +296,17 @@ class Client:
|
|||
api,
|
||||
)
|
||||
|
||||
api_url = f'https://api.kucoin.com/api/{api}/{endpoint}'
|
||||
|
||||
res = await asks.request(action, api_url, headers=headers)
|
||||
|
||||
json = res.json()
|
||||
if 'data' in json:
|
||||
return json['data']
|
||||
req_meth: Callable = getattr(
|
||||
self._http,
|
||||
action.lower(),
|
||||
)
|
||||
res = await req_meth(
|
||||
url=f'/{api}/{endpoint}',
|
||||
headers=headers,
|
||||
)
|
||||
json: dict = res.json()
|
||||
if data := json.get('data'):
|
||||
return data
|
||||
else:
|
||||
log.error(
|
||||
f'Error making request to {api_url} ->\n'
|
||||
|
@ -311,7 +327,7 @@ class Client:
|
|||
'''
|
||||
token_type = 'private' if private else 'public'
|
||||
try:
|
||||
data: dict[str, Any] | None = await self._request(
|
||||
data: dict[str, Any]|None = await self._request(
|
||||
'POST',
|
||||
endpoint=f'bullet-{token_type}',
|
||||
api='v1'
|
||||
|
@ -349,8 +365,8 @@ class Client:
|
|||
currencies: dict[str, Currency] = {}
|
||||
entries: list[dict] = await self._request(
|
||||
'GET',
|
||||
api='v1',
|
||||
endpoint='currencies',
|
||||
api='v1',
|
||||
)
|
||||
for entry in entries:
|
||||
curr = Currency(**entry).copy()
|
||||
|
@ -366,7 +382,10 @@ class Client:
|
|||
dict[str, KucoinMktPair],
|
||||
bidict[str, KucoinMktPair],
|
||||
]:
|
||||
entries = await self._request('GET', 'symbols')
|
||||
entries = await self._request(
|
||||
'GET',
|
||||
endpoint='symbols',
|
||||
)
|
||||
log.info(f' {len(entries)} Kucoin market pairs fetched')
|
||||
|
||||
pairs: dict[str, KucoinMktPair] = {}
|
||||
|
@ -567,13 +586,21 @@ def fqme_to_kucoin_sym(
|
|||
|
||||
@acm
|
||||
async def get_client() -> AsyncGenerator[Client, None]:
|
||||
client = Client()
|
||||
'''
|
||||
Load an API `Client` preconfigured from user settings
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
n.start_soon(client.get_mkt_pairs)
|
||||
await client.get_currencies()
|
||||
'''
|
||||
async with (
|
||||
httpx.AsyncClient(
|
||||
base_url=f'https://api.kucoin.com/api',
|
||||
) as trio_client,
|
||||
):
|
||||
client = Client(httpx_client=trio_client)
|
||||
async with trio.open_nursery() as tn:
|
||||
tn.start_soon(client.get_mkt_pairs)
|
||||
await client.get_currencies()
|
||||
|
||||
yield client
|
||||
yield client
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
|
Loading…
Reference in New Issue