From 95ace5acb8c4d26f8f259cf9d83b4b8b1778e81b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 May 2024 11:09:10 -0400 Subject: [PATCH] Port `kraken` backend to `httpx` --- piker/brokers/kraken/api.py | 85 ++++++++++++++++++++-------------- piker/brokers/kraken/broker.py | 44 +++++++++++------- 2 files changed, 76 insertions(+), 53 deletions(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 6414de8e..df2ebd6a 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -27,8 +27,8 @@ from typing import ( ) import time +import httpx import pendulum -import asks import numpy as np import urllib.parse import hashlib @@ -60,6 +60,11 @@ log = get_logger('piker.brokers.kraken') # // _url = 'https://api.kraken.com/0' + +_headers: dict[str, str] = { + 'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' +} + # TODO: this is the only backend providing this right? # in which case we should drop it from the defaults and # instead make a custom fields descr in this module! @@ -135,16 +140,15 @@ class Client: def __init__( self, config: dict[str, str], + httpx_client: httpx.AsyncClient, + name: str = '', api_key: str = '', secret: str = '' ) -> None: - self._sesh = asks.Session(connections=4) - self._sesh.base_location = _url - self._sesh.headers.update({ - 'User-Agent': - 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' - }) + + self._sesh: httpx.AsyncClient = httpx_client + self._name = name self._api_key = api_key self._secret = secret @@ -167,9 +171,8 @@ class Client: data: dict, ) -> dict[str, Any]: resp = await self._sesh.post( - path=f'/public/{method}', + url=f'/public/{method}', json=data, - timeout=float('inf') ) return resproc(resp, log) @@ -180,18 +183,18 @@ class Client: uri_path: str ) -> dict[str, Any]: headers = { - 'Content-Type': - 'application/x-www-form-urlencoded', - 'API-Key': - self._api_key, - 'API-Sign': - get_kraken_signature(uri_path, data, self._secret) + 'Content-Type': 'application/x-www-form-urlencoded', + 'API-Key': self._api_key, + 'API-Sign': get_kraken_signature( + uri_path, + data, + self._secret, + ), } resp = await self._sesh.post( - path=f'/private/{method}', + url=f'/private/{method}', data=data, headers=headers, - timeout=float('inf') ) return resproc(resp, log) @@ -665,24 +668,36 @@ class Client: @acm async def get_client() -> Client: - conf = get_config() - if conf: - client = Client( - conf, + conf: dict[str, Any] = get_config() + async with httpx.AsyncClient( + base_url=_url, + headers=_headers, - # TODO: don't break these up and just do internal - # conf lookups instead.. - name=conf['key_descr'], - api_key=conf['api_key'], - secret=conf['secret'] - ) - else: - client = Client({}) + # TODO: is there a way to numerate this? + # https://www.python-httpx.org/advanced/clients/#why-use-a-client + # connections=4 + ) as trio_client: + if conf: + client = Client( + conf, + httpx_client=trio_client, - # at startup, load all symbols, and asset info in - # batch requests. - async with trio.open_nursery() as nurse: - nurse.start_soon(client.get_assets) - await client.get_mkt_pairs() + # TODO: don't break these up and just do internal + # conf lookups instead.. + name=conf['key_descr'], + api_key=conf['api_key'], + secret=conf['secret'] + ) + else: + client = Client( + conf={}, + httpx_client=trio_client, + ) - yield client + # at startup, load all symbols, and asset info in + # batch requests. + async with trio.open_nursery() as nurse: + nurse.start_soon(client.get_assets) + await client.get_mkt_pairs() + + yield client diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 53168c03..eb5963cd 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -612,18 +612,18 @@ async def open_trade_dialog( # enter relay loop await handle_order_updates( - client, - ws, - stream, - ems_stream, - apiflows, - ids, - reqids2txids, - acnt, - api_trans, - acctid, - acc_name, - token, + client=client, + ws=ws, + ws_stream=stream, + ems_stream=ems_stream, + apiflows=apiflows, + ids=ids, + reqids2txids=reqids2txids, + acnt=acnt, + ledger=ledger, + acctid=acctid, + acc_name=acc_name, + token=token, ) @@ -639,7 +639,8 @@ async def handle_order_updates( # transaction records which will be updated # on new trade clearing events (aka order "fills") - ledger_trans: dict[str, Transaction], + ledger: TransactionLedger, + # ledger_trans: dict[str, Transaction], acctid: str, acc_name: str, token: str, @@ -699,7 +700,8 @@ async def handle_order_updates( # if tid not in ledger_trans } for tid, trade in trades.items(): - assert tid not in ledger_trans + # assert tid not in ledger_trans + assert tid not in ledger txid = trade['ordertxid'] reqid = trade.get('userref') @@ -747,11 +749,17 @@ async def handle_order_updates( client, api_name_set='wsname', ) - ppmsgs = trades2pps( - acnt, - acctid, - new_trans, + ppmsgs: list[BrokerdPosition] = trades2pps( + acnt=acnt, + ledger=ledger, + acctid=acctid, + new_trans=new_trans, ) + # ppmsgs = trades2pps( + # acnt, + # acctid, + # new_trans, + # ) for pp_msg in ppmsgs: await ems_stream.send(pp_msg)