Port `kraken` backend to `httpx`
parent
129cf58d41
commit
95ace5acb8
|
@ -27,8 +27,8 @@ from typing import (
|
|||
)
|
||||
import time
|
||||
|
||||
import httpx
|
||||
import pendulum
|
||||
import asks
|
||||
import numpy as np
|
||||
import urllib.parse
|
||||
import hashlib
|
||||
|
@ -60,6 +60,11 @@ log = get_logger('piker.brokers.kraken')
|
|||
|
||||
# <uri>/<version>/
|
||||
_url = 'https://api.kraken.com/0'
|
||||
|
||||
_headers: dict[str, str] = {
|
||||
'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
|
||||
}
|
||||
|
||||
# TODO: this is the only backend providing this right?
|
||||
# in which case we should drop it from the defaults and
|
||||
# instead make a custom fields descr in this module!
|
||||
|
@ -135,16 +140,15 @@ class Client:
|
|||
def __init__(
|
||||
self,
|
||||
config: dict[str, str],
|
||||
httpx_client: httpx.AsyncClient,
|
||||
|
||||
name: str = '',
|
||||
api_key: str = '',
|
||||
secret: str = ''
|
||||
) -> None:
|
||||
self._sesh = asks.Session(connections=4)
|
||||
self._sesh.base_location = _url
|
||||
self._sesh.headers.update({
|
||||
'User-Agent':
|
||||
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
|
||||
})
|
||||
|
||||
self._sesh: httpx.AsyncClient = httpx_client
|
||||
|
||||
self._name = name
|
||||
self._api_key = api_key
|
||||
self._secret = secret
|
||||
|
@ -167,9 +171,8 @@ class Client:
|
|||
data: dict,
|
||||
) -> dict[str, Any]:
|
||||
resp = await self._sesh.post(
|
||||
path=f'/public/{method}',
|
||||
url=f'/public/{method}',
|
||||
json=data,
|
||||
timeout=float('inf')
|
||||
)
|
||||
return resproc(resp, log)
|
||||
|
||||
|
@ -180,18 +183,18 @@ class Client:
|
|||
uri_path: str
|
||||
) -> dict[str, Any]:
|
||||
headers = {
|
||||
'Content-Type':
|
||||
'application/x-www-form-urlencoded',
|
||||
'API-Key':
|
||||
self._api_key,
|
||||
'API-Sign':
|
||||
get_kraken_signature(uri_path, data, self._secret)
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
'API-Key': self._api_key,
|
||||
'API-Sign': get_kraken_signature(
|
||||
uri_path,
|
||||
data,
|
||||
self._secret,
|
||||
),
|
||||
}
|
||||
resp = await self._sesh.post(
|
||||
path=f'/private/{method}',
|
||||
url=f'/private/{method}',
|
||||
data=data,
|
||||
headers=headers,
|
||||
timeout=float('inf')
|
||||
)
|
||||
return resproc(resp, log)
|
||||
|
||||
|
@ -665,24 +668,36 @@ class Client:
|
|||
@acm
|
||||
async def get_client() -> Client:
|
||||
|
||||
conf = get_config()
|
||||
if conf:
|
||||
client = Client(
|
||||
conf,
|
||||
conf: dict[str, Any] = get_config()
|
||||
async with httpx.AsyncClient(
|
||||
base_url=_url,
|
||||
headers=_headers,
|
||||
|
||||
# TODO: don't break these up and just do internal
|
||||
# conf lookups instead..
|
||||
name=conf['key_descr'],
|
||||
api_key=conf['api_key'],
|
||||
secret=conf['secret']
|
||||
)
|
||||
else:
|
||||
client = Client({})
|
||||
# TODO: is there a way to numerate this?
|
||||
# https://www.python-httpx.org/advanced/clients/#why-use-a-client
|
||||
# connections=4
|
||||
) as trio_client:
|
||||
if conf:
|
||||
client = Client(
|
||||
conf,
|
||||
httpx_client=trio_client,
|
||||
|
||||
# at startup, load all symbols, and asset info in
|
||||
# batch requests.
|
||||
async with trio.open_nursery() as nurse:
|
||||
nurse.start_soon(client.get_assets)
|
||||
await client.get_mkt_pairs()
|
||||
# TODO: don't break these up and just do internal
|
||||
# conf lookups instead..
|
||||
name=conf['key_descr'],
|
||||
api_key=conf['api_key'],
|
||||
secret=conf['secret']
|
||||
)
|
||||
else:
|
||||
client = Client(
|
||||
conf={},
|
||||
httpx_client=trio_client,
|
||||
)
|
||||
|
||||
yield client
|
||||
# at startup, load all symbols, and asset info in
|
||||
# batch requests.
|
||||
async with trio.open_nursery() as nurse:
|
||||
nurse.start_soon(client.get_assets)
|
||||
await client.get_mkt_pairs()
|
||||
|
||||
yield client
|
||||
|
|
|
@ -612,18 +612,18 @@ async def open_trade_dialog(
|
|||
|
||||
# enter relay loop
|
||||
await handle_order_updates(
|
||||
client,
|
||||
ws,
|
||||
stream,
|
||||
ems_stream,
|
||||
apiflows,
|
||||
ids,
|
||||
reqids2txids,
|
||||
acnt,
|
||||
api_trans,
|
||||
acctid,
|
||||
acc_name,
|
||||
token,
|
||||
client=client,
|
||||
ws=ws,
|
||||
ws_stream=stream,
|
||||
ems_stream=ems_stream,
|
||||
apiflows=apiflows,
|
||||
ids=ids,
|
||||
reqids2txids=reqids2txids,
|
||||
acnt=acnt,
|
||||
ledger=ledger,
|
||||
acctid=acctid,
|
||||
acc_name=acc_name,
|
||||
token=token,
|
||||
)
|
||||
|
||||
|
||||
|
@ -639,7 +639,8 @@ async def handle_order_updates(
|
|||
|
||||
# transaction records which will be updated
|
||||
# on new trade clearing events (aka order "fills")
|
||||
ledger_trans: dict[str, Transaction],
|
||||
ledger: TransactionLedger,
|
||||
# ledger_trans: dict[str, Transaction],
|
||||
acctid: str,
|
||||
acc_name: str,
|
||||
token: str,
|
||||
|
@ -699,7 +700,8 @@ async def handle_order_updates(
|
|||
# if tid not in ledger_trans
|
||||
}
|
||||
for tid, trade in trades.items():
|
||||
assert tid not in ledger_trans
|
||||
# assert tid not in ledger_trans
|
||||
assert tid not in ledger
|
||||
txid = trade['ordertxid']
|
||||
reqid = trade.get('userref')
|
||||
|
||||
|
@ -747,11 +749,17 @@ async def handle_order_updates(
|
|||
client,
|
||||
api_name_set='wsname',
|
||||
)
|
||||
ppmsgs = trades2pps(
|
||||
acnt,
|
||||
acctid,
|
||||
new_trans,
|
||||
ppmsgs: list[BrokerdPosition] = trades2pps(
|
||||
acnt=acnt,
|
||||
ledger=ledger,
|
||||
acctid=acctid,
|
||||
new_trans=new_trans,
|
||||
)
|
||||
# ppmsgs = trades2pps(
|
||||
# acnt,
|
||||
# acctid,
|
||||
# new_trans,
|
||||
# )
|
||||
for pp_msg in ppmsgs:
|
||||
await ems_stream.send(pp_msg)
|
||||
|
||||
|
|
Loading…
Reference in New Issue