go_httpx #2

Open
goodboy wants to merge 13 commits from go_httpx into pyqt6
15 changed files with 594 additions and 307 deletions

View File

@ -50,7 +50,7 @@ __brokers__: list[str] = [
'binance', 'binance',
'ib', 'ib',
'kraken', 'kraken',
'kucoin' 'kucoin',
# broken but used to work # broken but used to work
# 'questrade', # 'questrade',
@ -71,7 +71,7 @@ def get_brokermod(brokername: str) -> ModuleType:
Return the imported broker module by name. Return the imported broker module by name.
''' '''
module = import_module('.' + brokername, 'piker.brokers') module: ModuleType = import_module('.' + brokername, 'piker.brokers')
# we only allow monkeying because it's for internal keying # we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1] module.name = module.__name__.split('.')[-1]
return module return module
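For reference, the lookup above is just a relative `importlib` import keyed off the `__brokers__` list; a hypothetical usage (assuming a `piker` install):

```python
# 'kraken' is one of the entries in `__brokers__` above
from piker.brokers import get_brokermod

mod = get_brokermod('kraken')  # -> import_module('.kraken', 'piker.brokers')
assert mod.name == 'kraken'    # tail of 'piker.brokers.kraken'
```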

View File

@ -18,10 +18,11 @@
Handy cross-broker utils. Handy cross-broker utils.
""" """
from __future__ import annotations
from functools import partial from functools import partial
import json import json
import asks import httpx
import logging import logging
from ..log import ( from ..log import (
@ -60,11 +61,11 @@ class NoData(BrokerError):
def __init__( def __init__(
self, self,
*args, *args,
info: dict, info: dict|None = None,
) -> None: ) -> None:
super().__init__(*args) super().__init__(*args)
self.info: dict = info self.info: dict|None = info
# when raised, machinery can check if the backend # when raised, machinery can check if the backend
# set a "frame size" for doing datetime calcs. # set a "frame size" for doing datetime calcs.
@ -90,16 +91,18 @@ class DataThrottle(BrokerError):
def resproc( def resproc(
resp: asks.response_objects.Response, resp: httpx.Response,
log: logging.Logger, log: logging.Logger,
return_json: bool = True, return_json: bool = True,
log_resp: bool = False, log_resp: bool = False,
) -> asks.response_objects.Response: ) -> httpx.Response:
"""Process response and return its json content. '''
Process response and return its json content.
Raise the appropriate error on non-200 OK responses. Raise the appropriate error on non-200 OK responses.
"""
'''
if not resp.status_code == 200: if not resp.status_code == 200:
raise BrokerError(resp.body) raise BrokerError(resp.body)
try: try:
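One porting detail worth flagging here: `asks` response objects expose a `.body` attribute but `httpx.Response` does not (it offers `.content`/`.text`), so the `resp.body` kept in the error branch above would raise `AttributeError` under `httpx`. A minimal sketch of how the handler likely needs to read, with the truncated JSON branch assumed:

```python
import logging
import httpx

def resproc(
    resp: httpx.Response,
    log: logging.Logger,
    return_json: bool = True,
    log_resp: bool = False,
) -> httpx.Response | dict:
    # `BrokerError` is defined earlier in this module
    if resp.status_code != 200:
        # `.text` stands in for the old `asks` `.body` attr
        raise BrokerError(resp.text)
    try:
        json_body: dict = resp.json()
    except ValueError:
        raise BrokerError(resp.text)

    if log_resp:
        log.debug(f'response: {json_body}')

    return json_body if return_json else resp
```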

View File

@ -1,8 +1,8 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) # Copyright (C)
# Guillermo Rodriguez (aka ze jefe) # Guillermo Rodriguez (aka ze jefe)
# Tyler Goodlet # Tyler Goodlet
# (in stewardship for pikers) # (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -25,14 +25,13 @@ from __future__ import annotations
from collections import ChainMap from collections import ChainMap
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
AsyncExitStack,
) )
from datetime import datetime from datetime import datetime
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable, Callable,
Hashable,
Sequence,
Type, Type,
) )
import hmac import hmac
@ -43,8 +42,7 @@ import trio
from pendulum import ( from pendulum import (
now, now,
) )
import asks import httpx
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
from piker import config from piker import config
@ -54,6 +52,7 @@ from piker.clearing._messages import (
from piker.accounting import ( from piker.accounting import (
Asset, Asset,
digits_to_dec, digits_to_dec,
MktPair,
) )
from piker.types import Struct from piker.types import Struct
from piker.data import ( from piker.data import (
@ -69,7 +68,6 @@ from .venues import (
PAIRTYPES, PAIRTYPES,
Pair, Pair,
MarketType, MarketType,
_spot_url, _spot_url,
_futes_url, _futes_url,
_testnet_futes_url, _testnet_futes_url,
@ -79,19 +77,18 @@ from .venues import (
log = get_logger('piker.brokers.binance') log = get_logger('piker.brokers.binance')
def get_config() -> dict: def get_config() -> dict[str, Any]:
conf: dict conf: dict
path: Path path: Path
conf, path = config.load( conf, path = config.load(
conf_name='brokers', conf_name='brokers',
touch_if_dne=True, touch_if_dne=True,
) )
section = conf.get('binance') section: dict = conf.get('binance')
if not section: if not section:
log.warning(f'No config section found for binance in {path}') log.warning(
f'No config section found for binance in {path}'
)
return {} return {}
return section return section
@ -147,7 +144,7 @@ def binance_timestamp(
class Client: class Client:
''' '''
Async ReST API client using ``trio`` + ``asks`` B) Async ReST API client using `trio` + `httpx` B)
Supports all of the spot, margin and futures endpoints depending Supports all of the spot, margin and futures endpoints depending
on method. on method.
@ -156,10 +153,17 @@ class Client:
def __init__( def __init__(
self, self,
venue_sessions: dict[
str, # venue key
tuple[httpx.AsyncClient, str] # session, eps path
],
conf: dict[str, Any],
# TODO: change this to `Client.[mkt_]venue: MarketType`? # TODO: change this to `Client.[mkt_]venue: MarketType`?
mkt_mode: MarketType = 'spot', mkt_mode: MarketType = 'spot',
) -> None: ) -> None:
self.conf = conf
# build out pair info tables for each market type # build out pair info tables for each market type
# and wrap in a chain-map view for search / query. # and wrap in a chain-map view for search / query.
self._spot_pairs: dict[str, Pair] = {} # spot info table self._spot_pairs: dict[str, Pair] = {} # spot info table
@ -186,44 +190,13 @@ class Client:
# market symbols for use by search. See `.exch_info()`. # market symbols for use by search. See `.exch_info()`.
self._pairs: ChainMap[str, Pair] = ChainMap() self._pairs: ChainMap[str, Pair] = ChainMap()
# spot EPs sesh
self._sesh = asks.Session(connections=4)
self._sesh.base_location: str = _spot_url
# spot testnet
self._test_sesh: asks.Session = asks.Session(connections=4)
self._test_sesh.base_location: str = _testnet_spot_url
# margin and extended spot endpoints session.
self._sapi_sesh = asks.Session(connections=4)
self._sapi_sesh.base_location: str = _spot_url
# futes EPs sesh
self._fapi_sesh = asks.Session(connections=4)
self._fapi_sesh.base_location: str = _futes_url
# futes testnet
self._test_fapi_sesh: asks.Session = asks.Session(connections=4)
self._test_fapi_sesh.base_location: str = _testnet_futes_url
# global client "venue selection" mode. # global client "venue selection" mode.
# set this when you want to switch venues and not have to # set this when you want to switch venues and not have to
# specify the venue for the next request. # specify the venue for the next request.
self.mkt_mode: MarketType = mkt_mode self.mkt_mode: MarketType = mkt_mode
# per 8 # per-mkt-venue API client table
self.venue_sesh: dict[ self.venue_sesh = venue_sessions
str, # venue key
tuple[asks.Session, str] # session, eps path
] = {
'spot': (self._sesh, '/api/v3/'),
'spot_testnet': (self._test_sesh, '/fapi/v1/'),
'margin': (self._sapi_sesh, '/sapi/v1/'),
'usdtm_futes': (self._fapi_sesh, '/fapi/v1/'),
'usdtm_futes_testnet': (self._test_fapi_sesh, '/fapi/v1/'),
# 'futes_coin': self._dapi, # TODO
}
# lookup for going from `.mkt_mode: str` to the config # lookup for going from `.mkt_mode: str` to the config
# subsection `key: str` # subsection `key: str`
@ -238,40 +211,6 @@ class Client:
'futes': ['usdtm_futes'], 'futes': ['usdtm_futes'],
} }
# for creating API keys see,
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
self.conf: dict = get_config()
for key, subconf in self.conf.items():
if api_key := subconf.get('api_key', ''):
venue_keys: list[str] = self.confkey2venuekeys[key]
venue_key: str
sesh: asks.Session
for venue_key in venue_keys:
sesh, _ = self.venue_sesh[venue_key]
api_key_header: dict = {
# taken from official:
# https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
"Content-Type": "application/json;charset=utf-8",
# TODO: prolly should just always query and copy
# in the real latest ver?
"User-Agent": "binance-connector/6.1.6smbz6",
"X-MBX-APIKEY": api_key,
}
sesh.headers.update(api_key_header)
# if `.use_testnet = true` in the config then
# also add headers for the testnet session which
# will be used for all order control
if subconf.get('use_testnet', False):
testnet_sesh, _ = self.venue_sesh[
venue_key + '_testnet'
]
testnet_sesh.headers.update(api_key_header)
def _mk_sig( def _mk_sig(
self, self,
data: dict, data: dict,
@ -290,7 +229,6 @@ class Client:
'to define the creds for auth-ed endpoints!?' 'to define the creds for auth-ed endpoints!?'
) )
# XXX: Info on security and authentication
# https://binance-docs.github.io/apidocs/#endpoint-security-type # https://binance-docs.github.io/apidocs/#endpoint-security-type
if not (api_secret := subconf.get('api_secret')): if not (api_secret := subconf.get('api_secret')):
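For context on what `_mk_sig()` computes (its body is elided from this hunk): Binance's signed endpoints expect an HMAC-SHA256 hex digest of the url-encoded query string, per the security docs linked above. A standalone sketch:

```python
import hashlib
import hmac
from urllib.parse import urlencode

def mk_binance_sig(data: dict, api_secret: str) -> str:
    # sign the url-encoded params with HMAC-SHA256 and hex-encode,
    # per https://binance-docs.github.io/apidocs/#endpoint-security-type
    query: str = urlencode(data)
    return hmac.new(
        api_secret.encode('utf-8'),
        query.encode('utf-8'),
        hashlib.sha256,
    ).hexdigest()
```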
@ -319,7 +257,7 @@ class Client:
params: dict, params: dict,
method: str = 'get', method: str = 'get',
venue: str | None = None, # if None use `.mkt_mode` state venue: str|None = None, # if None use `.mkt_mode` state
signed: bool = False, signed: bool = False,
allow_testnet: bool = False, allow_testnet: bool = False,
@ -330,8 +268,9 @@ class Client:
- /fapi/v3/ USD-M FUTURES, or - /fapi/v3/ USD-M FUTURES, or
- /api/v3/ SPOT/MARGIN - /api/v3/ SPOT/MARGIN
account/market endpoint request depending on either passed in `venue: str` account/market endpoint request depending on either passed in
or the current setting `.mkt_mode: str` setting, default `'spot'`. `venue: str` or the current setting `.mkt_mode: str` setting,
default `'spot'`.
Docs per venue API: Docs per venue API:
@ -360,9 +299,6 @@ class Client:
venue=venue_key, venue=venue_key,
) )
sesh: asks.Session
path: str
# Check if we're configured to route order requests to the # Check if we're configured to route order requests to the
# venue equivalent's testnet. # venue equivalent's testnet.
use_testnet: bool = False use_testnet: bool = False
@ -387,11 +323,12 @@ class Client:
# ctl machinery B) # ctl machinery B)
venue_key += '_testnet' venue_key += '_testnet'
sesh, path = self.venue_sesh[venue_key] client: httpx.AsyncClient
path: str
meth: Callable = getattr(sesh, method) client, path = self.venue_sesh[venue_key]
meth: Callable = getattr(client, method)
resp = await meth( resp = await meth(
path=path + endpoint, url=path + endpoint,
params=params, params=params,
timeout=float('inf'), timeout=float('inf'),
) )
@ -433,7 +370,15 @@ class Client:
item['filters'] = filters item['filters'] = filters
pair_type: Type = PAIRTYPES[venue] pair_type: Type = PAIRTYPES[venue]
pair: Pair = pair_type(**item) try:
pair: Pair = pair_type(**item)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
)
raise
pair_table[pair.symbol.upper()] = pair pair_table[pair.symbol.upper()] = pair
# update an additional top-level-cross-venue-table # update an additional top-level-cross-venue-table
@ -528,7 +473,9 @@ class Client:
''' '''
pair_table: dict[str, Pair] = self._venue2pairs[ pair_table: dict[str, Pair] = self._venue2pairs[
venue or self.mkt_mode venue
or
self.mkt_mode
] ]
if ( if (
expiry expiry
@ -547,9 +494,9 @@ class Client:
venues: list[str] = [venue] venues: list[str] = [venue]
# batch per-venue download of all exchange infos # batch per-venue download of all exchange infos
async with trio.open_nursery() as rn: async with trio.open_nursery() as tn:
for ven in venues: for ven in venues:
rn.start_soon( tn.start_soon(
self._cache_pairs, self._cache_pairs,
ven, ven,
) )
@ -602,11 +549,11 @@ class Client:
) -> dict[str, Any]: ) -> dict[str, Any]:
fq_pairs: dict = await self.exch_info() fq_pairs: dict[str, Pair] = await self.exch_info()
# TODO: cache this list like we were in # TODO: cache this list like we were in
# `open_symbol_search()`? # `open_symbol_search()`?
keys: list[str] = list(fq_pairs) # keys: list[str] = list(fq_pairs)
return match_from_pairs( return match_from_pairs(
pairs=fq_pairs, pairs=fq_pairs,
@ -614,9 +561,19 @@ class Client:
score_cutoff=50, score_cutoff=50,
) )
def pair2venuekey(
self,
pair: Pair,
) -> str:
return {
'USDTM': 'usdtm_futes',
# 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..?
}[pair.venue]
async def bars( async def bars(
self, self,
symbol: str, mkt: MktPair,
start_dt: datetime | None = None, start_dt: datetime | None = None,
end_dt: datetime | None = None, end_dt: datetime | None = None,
@ -646,16 +603,20 @@ class Client:
start_time = binance_timestamp(start_dt) start_time = binance_timestamp(start_dt)
end_time = binance_timestamp(end_dt) end_time = binance_timestamp(end_dt)
bs_pair: Pair = self._pairs[mkt.bs_fqme.upper()]
# https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data
bars = await self._api( bars = await self._api(
'klines', 'klines',
params={ params={
'symbol': symbol.upper(), # NOTE: always query using their native symbology!
'symbol': mkt.bs_mktid.upper(),
'interval': '1m', 'interval': '1m',
'startTime': start_time, 'startTime': start_time,
'endTime': end_time, 'endTime': end_time,
'limit': limit 'limit': limit
}, },
venue=self.pair2venuekey(bs_pair),
allow_testnet=False, allow_testnet=False,
) )
new_bars: list[tuple] = [] new_bars: list[tuple] = []
@ -972,17 +933,148 @@ class Client:
await self.close_listen_key(key) await self.close_listen_key(key)
_venue_urls: dict[str, str] = {
'spot': (
_spot_url,
'/api/v3/',
),
'spot_testnet': (
_testnet_spot_url,
'/fapi/v1/'
),
# margin and extended spot endpoints session.
# TODO: did this ever get implemented fully?
# 'margin': (
# _spot_url,
# '/sapi/v1/'
# ),
'usdtm_futes': (
_futes_url,
'/fapi/v1/',
),
'usdtm_futes_testnet': (
_testnet_futes_url,
'/fapi/v1/',
),
# TODO: for anyone who actually needs it ;P
# 'coin_futes': ()
}
def init_api_keys(
client: Client,
conf: dict[str, Any],
) -> None:
'''
Set up per-venue API keys for each http client according to the user's
`brokers.conf`.
For ex, to use spot-testnet and live usdt futures APIs:
```toml
[binance]
# spot test net
spot.use_testnet = true
spot.api_key = '<spot_api_key_from_binance_account>'
spot.api_secret = '<spot_api_key_password>'
# futes live
futes.use_testnet = false
accounts.usdtm = 'futes'
futes.api_key = '<futes_api_key_from_binance>'
futes.api_secret = '<futes_api_key_password>'
# if uncommented will use the built-in paper engine and not
# connect to `binance` API servers for order ctl.
# accounts.paper = 'paper'
```
'''
for key, subconf in conf.items():
if api_key := subconf.get('api_key', ''):
venue_keys: list[str] = client.confkey2venuekeys[key]
venue_key: str
client: httpx.AsyncClient
for venue_key in venue_keys:
client, _ = client.venue_sesh[venue_key]
api_key_header: dict = {
# taken from official:
# https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47
"Content-Type": "application/json;charset=utf-8",
# TODO: prolly should just always query and copy
# in the real latest ver?
"User-Agent": "binance-connector/6.1.6smbz6",
"X-MBX-APIKEY": api_key,
}
client.headers.update(api_key_header)
# if `.use_testnet = true` in the config then
# also add headers for the testnet session which
# will be used for all order control
if subconf.get('use_testnet', False):
testnet_sesh, _ = client.venue_sesh[
venue_key + '_testnet'
]
testnet_sesh.headers.update(api_key_header)
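Note that as committed, the loop above rebinds the `client` parameter (`client, _ = client.venue_sesh[venue_key]`) to an `httpx.AsyncClient`, so the `client.confkey2venuekeys[key]` / `client.venue_sesh[...]` lookups would fail on any subsequent config section or venue key. A sketch of the same loop with the session bound to its own name, assuming the `Client` attr shapes from this diff:

```python
def init_api_keys(
    client: Client,
    conf: dict[str, Any],
) -> None:
    for key, subconf in conf.items():
        if api_key := subconf.get('api_key', ''):
            venue_keys: list[str] = client.confkey2venuekeys[key]
            for venue_key in venue_keys:
                sesh: httpx.AsyncClient
                sesh, _ = client.venue_sesh[venue_key]
                api_key_header: dict = {
                    'Content-Type': 'application/json;charset=utf-8',
                    'User-Agent': 'binance-connector/6.1.6smbz6',
                    'X-MBX-APIKEY': api_key,
                }
                sesh.headers.update(api_key_header)

                # mirror headers onto the venue's testnet session
                # when order control is routed there
                if subconf.get('use_testnet', False):
                    testnet_sesh, _ = client.venue_sesh[
                        venue_key + '_testnet'
                    ]
                    testnet_sesh.headers.update(api_key_header)
```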
@acm @acm
async def get_client() -> Client: async def get_client(
mkt_mode: MarketType = 'spot',
) -> Client:
'''
Construct a single `piker` client which composes multiple underlying venue-
specific API clients for both live and test networks.
client = Client() '''
await client.exch_info() venue_sessions: dict[
log.info( str, # venue key
f'{client} in {client.mkt_mode} mode: caching exchange infos..\n' tuple[httpx.AsyncClient, str] # session, eps path
'Cached multi-market pairs:\n' ] = {}
f'spot: {len(client._spot_pairs)}\n' async with AsyncExitStack() as client_stack:
f'usdtm_futes: {len(client._ufutes_pairs)}\n' for name, (base_url, path) in _venue_urls.items():
f'Total: {len(client._pairs)}\n' api: httpx.AsyncClient = await client_stack.enter_async_context(
) httpx.AsyncClient(
base_url=base_url,
# headers={},
yield client # TODO: is there a way to numerate this?
# https://www.python-httpx.org/advanced/clients/#why-use-a-client
# connections=4
)
)
venue_sessions[name] = (
api,
path,
)
conf: dict[str, Any] = get_config()
# for creating API keys see,
# https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072
client = Client(
venue_sessions=venue_sessions,
conf=conf,
mkt_mode=mkt_mode,
)
init_api_keys(
client=client,
conf=conf,
)
fq_pairs: dict[str, Pair] = await client.exch_info()
assert fq_pairs
log.info(
f'Loaded multi-venue `Client` in mkt_mode={client.mkt_mode!r}\n\n'
f'Symbology Summary:\n'
f'------ - ------\n'
f'spot: {len(client._spot_pairs)}\n'
f'usdtm_futes: {len(client._ufutes_pairs)}\n'
'------ - ------\n'
f'total: {len(client._pairs)}\n'
)
yield client
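An example lifetime for the new composite client; `mkt_mode` values and `exch_info()` are as defined in this diff, the rest is illustrative:

```python
import trio

async def main() -> None:
    # entering `get_client()` opens one `httpx.AsyncClient` per
    # entry in `_venue_urls` via the `AsyncExitStack`
    async with get_client(mkt_mode='usdtm_futes') as client:
        pairs = await client.exch_info()
        print(f'cached {len(pairs)} pairs across venues')

trio.run(main)
```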

View File

@ -264,15 +264,20 @@ async def open_trade_dialog(
# do a open_symcache() call.. though maybe we can hide # do a open_symcache() call.. though maybe we can hide
# this in a new async version of open_account()? # this in a new async version of open_account()?
async with open_cached_client('binance') as client: async with open_cached_client('binance') as client:
subconf: dict = client.conf[venue_name] subconf: dict|None = client.conf.get(venue_name)
use_testnet = subconf.get('use_testnet', False)
# XXX: if no futes.api_key or spot.api_key has been set we # XXX: if no futes.api_key or spot.api_key has been set we
# always fall back to the paper engine! # always fall back to the paper engine!
if not subconf.get('api_key'): if (
not subconf
or
not subconf.get('api_key')
):
await ctx.started('paper') await ctx.started('paper')
return return
use_testnet: bool = subconf.get('use_testnet', False)
async with ( async with (
open_cached_client('binance') as client, open_cached_client('binance') as client,
): ):

View File

@ -48,6 +48,7 @@ import tractor
from piker.brokers import ( from piker.brokers import (
open_cached_client, open_cached_client,
NoData,
) )
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
@ -252,24 +253,30 @@ async def open_history_client(
else: else:
client.mkt_mode = 'spot' client.mkt_mode = 'spot'
# NOTE: always query using their native symbology! array: np.ndarray = await client.bars(
mktid: str = mkt.bs_mktid mkt=mkt,
array = await client.bars(
mktid,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
if array.size == 0:
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
times = array['time'] times = array['time']
if ( if not times.any():
end_dt is None raise ValueError(
): 'Bad frame with null-times?\n\n'
inow = round(time.time()) f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60: if (inow - times[-1]) > 60:
await tractor.pause() await tractor.pause()
start_dt = from_timestamp(times[0]) start_dt = from_timestamp(times[0])
end_dt = from_timestamp(times[-1]) end_dt = from_timestamp(times[-1])
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield get_ohlc, {'erlangs': 3, 'rate': 3}

View File

@ -137,10 +137,12 @@ class SpotPair(Pair, frozen=True):
quoteOrderQtyMarketAllowed: bool quoteOrderQtyMarketAllowed: bool
isSpotTradingAllowed: bool isSpotTradingAllowed: bool
isMarginTradingAllowed: bool isMarginTradingAllowed: bool
otoAllowed: bool
defaultSelfTradePreventionMode: str defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str] allowedSelfTradePreventionModes: list[str]
permissions: list[str] permissions: list[str]
permissionSets: list[list[str]]
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair' ns_path: str = 'piker.brokers.binance:SpotPair'

View File

@ -100,7 +100,7 @@ async def data_reset_hack(
log.warning( log.warning(
no_setup_msg no_setup_msg
+ +
f'REQUIRES A `vnc_addrs: array` ENTRY' 'REQUIRES A `vnc_addrs: array` ENTRY'
) )
vnc_host, vnc_port = vnc_sockaddr.get( vnc_host, vnc_port = vnc_sockaddr.get(
@ -259,7 +259,7 @@ def i3ipc_xdotool_manual_click_hack() -> None:
timeout=timeout, timeout=timeout,
) )
# re-activate and focus original window # re-activate and focus original window
subprocess.call([ subprocess.call([
'xdotool', 'xdotool',
'windowactivate', '--sync', str(orig_win_id), 'windowactivate', '--sync', str(orig_win_id),

View File

@ -287,9 +287,31 @@ class Client:
self.conf = config self.conf = config
# NOTE: the ib.client here is "throttled" to 45 rps by default # NOTE: the ib.client here is "throttled" to 45 rps by default
self.ib = ib self.ib: IB = ib
self.ib.RaiseRequestErrors: bool = True self.ib.RaiseRequestErrors: bool = True
# self._acnt_names: set[str] = {}
self._acnt_names: list[str] = []
@property
def acnts(self) -> list[str]:
# return list(self._acnt_names)
return self._acnt_names
def __repr__(self) -> str:
return (
f'<{type(self).__name__}('
f'ib={self.ib} '
f'acnts={self.acnts}'
# TODO: we need to mask out acnt-#s and other private
# infos if we're going to console this!
# f' |_.conf:\n'
# f' {pformat(self.conf)}\n'
')>'
)
async def get_fills(self) -> list[Fill]: async def get_fills(self) -> list[Fill]:
''' '''
Return list of recent `Fills` from trading session. Return list of recent `Fills` from trading session.
@ -376,55 +398,63 @@ class Client:
# whatToShow='MIDPOINT', # whatToShow='MIDPOINT',
# whatToShow='TRADES', # whatToShow='TRADES',
) )
log.info(
f'REQUESTING {ib_duration_str} worth {bar_size} BARS\n'
f'fqme: {fqme}\n'
f'global _enters: {_enters}\n'
f'kwargs: {pformat(kwargs)}\n'
)
bars = await self.ib.reqHistoricalDataAsync( bars = await self.ib.reqHistoricalDataAsync(
**kwargs, **kwargs,
) )
query_info: str = (
f'REQUESTING IB history BARS\n'
f' ------ - ------\n'
f'dt_duration: {dt_duration}\n'
f'ib_duration_str: {ib_duration_str}\n'
f'bar_size: {bar_size}\n'
f'fqme: {fqme}\n'
f'actor-global _enters: {_enters}\n'
f'kwargs: {pformat(kwargs)}\n'
)
# tail case if no history for range or none prior. # tail case if no history for range or none prior.
# NOTE: there's actually 3 cases here to handle (and
# this should be read alongside the implementation of
# `.reqHistoricalDataAsync()`):
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
# - no data exists for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - LITERALLY this is the start of the mkt's history!
if not bars: if not bars:
# NOTE: there's actually 3 cases here to handle (and # TODO: figure out wut's going on here.
# this should be read alongside the implementation of
# `.reqHistoricalDataAsync()`):
# - a timeout occurred in which case insync internals return
# an empty list thing with bars.clear()...
# - no data exists for the period likely due to
# a weekend, holiday or other non-trading period prior to
# ``end_dt`` which exceeds the ``duration``,
# - LITERALLY this is the start of the mkt's history!
# TODO: is this handy, a sync requester for tinkering
# with empty frame cases?
# def get_hist():
# return self.ib.reqHistoricalData(**kwargs)
# import pdbp
# pdbp.set_trace()
# sync requester for debugging empty frame cases log.critical(
def get_hist(): 'STUPID IB SAYS NO HISTORY\n\n'
return self.ib.reqHistoricalData(**kwargs) + query_info
)
assert get_hist
import pdbp
pdbp.set_trace()
return [], np.empty(0), dt_duration
# TODO: we could maybe raise ``NoData`` instead if we # TODO: we could maybe raise ``NoData`` instead if we
# rewrite the method in the first case? right now there's no # rewrite the method in the first case?
# way to detect a timeout. # right now there's no way to detect a timeout..
return [], np.empty(0), dt_duration
# NOTE XXX: ensure minimum duration in bars B) log.info(query_info)
# => we recursively call this method until we get at least # NOTE XXX: ensure minimum duration in bars?
# as many bars such that they sum in aggregate to the # => recursively call this method until we get at least as
# desired total time (duration) at most. # many bars such that they sum in aggregate to the
# XXX XXX XXX # desired total time (duration) at most.
# WHY DID WE EVEN NEED THIS ORIGINALLY!? # - if you query over a gap and get no data
# XXX XXX XXX # that may short circuit the history
# - if you query over a gap and get no data
# that may short circuit the history
if ( if (
end_dt # XXX XXX XXX
and False # => WHY DID WE EVEN NEED THIS ORIGINALLY!? <=
# XXX XXX XXX
False
and end_dt
): ):
nparr: np.ndarray = bars_to_np(bars) nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time'] times: np.ndarray = nparr['time']
@ -927,7 +957,10 @@ class Client:
warnset = True warnset = True
else: else:
log.info(f'Got first quote for {contract}') log.info(
'Got first quote for contract\n'
f'{contract}\n'
)
break break
else: else:
if timeouterr and raise_on_timeout: if timeouterr and raise_on_timeout:
@ -991,8 +1024,12 @@ class Client:
outsideRth=True, outsideRth=True,
optOutSmartRouting=True, optOutSmartRouting=True,
# TODO: need to understand this setting better as
# it pertains to shit ass mms..
routeMarketableToBbo=True, routeMarketableToBbo=True,
designatedLocation='SMART', designatedLocation='SMART',
# TODO: make all orders GTC? # TODO: make all orders GTC?
# https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba # https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba
# goodTillDate=f"yyyyMMdd-HH:mm:ss", # goodTillDate=f"yyyyMMdd-HH:mm:ss",
@ -1120,8 +1157,8 @@ def get_config() -> dict[str, Any]:
names = list(accounts.keys()) names = list(accounts.keys())
accts = section['accounts'] = bidict(accounts) accts = section['accounts'] = bidict(accounts)
log.info( log.info(
f'brokers.toml defines {len(accts)} accounts: ' f'{path} defines {len(accts)} account aliases:\n'
f'{pformat(names)}' f'{pformat(names)}\n'
) )
if section is None: if section is None:
@ -1188,7 +1225,7 @@ async def load_aio_clients(
try_ports = list(try_ports.values()) try_ports = list(try_ports.values())
_err = None _err = None
accounts_def = config.load_accounts(['ib']) accounts_def: dict[str, str] = config.load_accounts(['ib'])
ports = try_ports if port is None else [port] ports = try_ports if port is None else [port]
combos = list(itertools.product(hosts, ports)) combos = list(itertools.product(hosts, ports))
accounts_found: dict[str, Client] = {} accounts_found: dict[str, Client] = {}
@ -1227,7 +1264,9 @@ async def load_aio_clients(
client = Client(ib=ib, config=conf) client = Client(ib=ib, config=conf)
# update all actor-global caches # update all actor-global caches
log.info(f"Caching client for {sockaddr}") log.runtime(
f'Connected and caching `Client` @ {sockaddr!r}'
)
_client_cache[sockaddr] = client _client_cache[sockaddr] = client
break break
@ -1242,37 +1281,59 @@ async def load_aio_clients(
OSError, OSError,
) as ce: ) as ce:
_err = ce _err = ce
log.warning( message: str = (
f'Failed to connect on {host}:{port} for {i} time with,\n' f'Failed to connect on {host}:{port} after {i} tries with\n'
f'{ib.client.apiError.value()}\n' f'{ib.client.apiError.value()!r}\n\n'
'retrying with a new client id..') 'Retrying with a new client id..\n'
)
log.runtime(message)
else:
# XXX report loudly if we never established after all
# re-tries
log.warning(message)
# Pre-collect all accounts available for this # Pre-collect all accounts available for this
# connection and map account names to this client # connection and map account names to this client
# instance. # instance.
for value in ib.accountValues(): for value in ib.accountValues():
acct_number = value.account acct_number: str = value.account
entry = accounts_def.inverse.get(acct_number) acnt_alias: str = accounts_def.inverse.get(acct_number)
if not entry: if not acnt_alias:
# TODO: should we construct the below reco-ex from
# the existing config content?
_, path = config.load(
conf_name='brokers',
)
raise ValueError( raise ValueError(
'No section in brokers.toml for account:' 'No alias in account section for account!\n'
f' {acct_number}\n' f'Please add an acnt alias entry to your {path}\n'
f'Please add entry to continue using this API client' 'For example,\n\n'
'[ib.accounts]\n'
f'margin = {acct_number!r}\n'
'^^^^^^ <- you need this part!\n\n'
'This ensures `piker` will not leak private acnt info '
'to console output by default!\n'
) )
# surjection of account names to operating clients. # surjection of account names to operating clients.
if acct_number not in accounts_found: if acnt_alias not in accounts_found:
accounts_found[entry] = client accounts_found[acnt_alias] = client
# client._acnt_names.add(acnt_alias)
client._acnt_names.append(acnt_alias)
log.info( if accounts_found:
f'Loaded accounts for client @ {host}:{port}\n' log.info(
f'{pformat(accounts_found)}' f'Loaded accounts for api client\n\n'
) f'{pformat(accounts_found)}\n'
)
# XXX: why aren't we just updating this directly above # XXX: why aren't we just updating this directly above
# instead of using the intermediary `accounts_found`? # instead of using the intermediary `accounts_found`?
_accounts2clients.update(accounts_found) _accounts2clients.update(accounts_found)
# if we have no clients after the scan loop then error out. # if we have no clients after the scan loop then error out.
if not _client_cache: if not _client_cache:
@ -1472,7 +1533,7 @@ async def open_aio_client_method_relay(
msg: tuple[str, dict] | dict | None = await from_trio.get() msg: tuple[str, dict] | dict | None = await from_trio.get()
match msg: match msg:
case None: # termination sentinel case None: # termination sentinel
print('asyncio PROXY-RELAY SHUTDOWN') log.info('asyncio `Client` method-proxy SHUTDOWN!')
break break
case (meth_name, kwargs): case (meth_name, kwargs):

View File

@ -1183,7 +1183,14 @@ async def deliver_trade_events(
pos pos
and fill and fill
): ):
assert fill.commissionReport == cr now_cr: CommissionReport = fill.commissionReport
if (now_cr != cr):
log.warning(
'UhhHh ib updated the commission report mid-fill..?\n'
f'was: {pformat(cr)}\n'
f'now: {pformat(now_cr)}\n'
)
await emit_pp_update( await emit_pp_update(
ems_stream, ems_stream,
accounts_def, accounts_def,

View File

@ -671,8 +671,8 @@ async def _setup_quote_stream(
# making them mostly useless and explains why the scanner # making them mostly useless and explains why the scanner
# is always slow XD # is always slow XD
# '293', # Trade count for day # '293', # Trade count for day
'294', # Trade rate / minute # '294', # Trade rate / minute
'295', # Vlm rate / minute # '295', # Vlm rate / minute
), ),
contract: Contract | None = None, contract: Contract | None = None,
@ -915,9 +915,13 @@ async def stream_quotes(
if first_ticker: if first_ticker:
first_quote: dict = normalize(first_ticker) first_quote: dict = normalize(first_ticker)
log.info(
'Rxed init quote:\n' # TODO: we need a stack-oriented log levels filters for
f'{pformat(first_quote)}' # this!
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
'Rxed init quote:\n\n'
f'{pformat(first_quote)}\n'
) )
# NOTE: it might be outside regular trading hours for # NOTE: it might be outside regular trading hours for
@ -969,7 +973,11 @@ async def stream_quotes(
raise_on_timeout=True, raise_on_timeout=True,
) )
first_quote: dict = normalize(first_ticker) first_quote: dict = normalize(first_ticker)
log.info(
# TODO: we need a stack-oriented log levels filters for
# this!
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
'Rxed init quote:\n' 'Rxed init quote:\n'
f'{pformat(first_quote)}' f'{pformat(first_quote)}'
) )

View File

@ -31,7 +31,11 @@ from typing import (
) )
from bidict import bidict from bidict import bidict
import pendulum from pendulum import (
DateTime,
parse,
from_timestamp,
)
from ib_insync import ( from ib_insync import (
Contract, Contract,
Commodity, Commodity,
@ -66,10 +70,11 @@ tx_sort: Callable = partial(
iter_by_dt, iter_by_dt,
parsers={ parsers={
'dateTime': parse_flex_dt, 'dateTime': parse_flex_dt,
'datetime': pendulum.parse, 'datetime': parse,
# for some fucking 2022 and
# back options records...fuck me. # XXX: for some fucking 2022 and
'date': pendulum.parse, # back options records.. f@#$ me..
'date': parse,
} }
) )
@ -89,15 +94,38 @@ def norm_trade(
conid: str = str(record.get('conId') or record['conid'])
bs_mktid: str = str(conid) bs_mktid: str = str(conid)
comms = record.get('commission')
if comms is None:
comms = -1*record['ibCommission']
price = record.get('price') or record['tradePrice'] # NOTE: sometimes weird records (like BTTX?)
# have no field for this?
comms: float = -1 * (
record.get('commission')
or record.get('ibCommission')
or 0
)
if not comms:
log.warning(
'No commissions found for record?\n'
f'{pformat(record)}\n'
)
price: float = (
record.get('price')
or record.get('tradePrice')
)
if price is None:
log.warning(
'No `price` field found in record?\n'
'Skipping normalization..\n'
f'{pformat(record)}\n'
)
return None
# the api doesn't do the -/+ on the quantity for you but flex # the api doesn't do the -/+ on the quantity for you but flex
# records do.. are you fucking serious ib...!? # records do.. are you fucking serious ib...!?
size = record.get('quantity') or record['shares'] * { size: float|int = (
record.get('quantity')
or record['shares']
) * {
'BOT': 1, 'BOT': 1,
'SLD': -1, 'SLD': -1,
}[record['side']] }[record['side']]
@ -128,26 +156,31 @@ def norm_trade(
# otype = tail[6] # otype = tail[6]
# strike = tail[7:] # strike = tail[7:]
print(f'skipping opts contract {symbol}') log.warning(
f'Skipping option contract -> NO SUPPORT YET!\n'
f'{symbol}\n'
)
return None return None
# timestamping is way different in API records # timestamping is way different in API records
dtstr = record.get('datetime') dtstr: str = record.get('datetime')
date = record.get('date') date: str = record.get('date')
flex_dtstr = record.get('dateTime') flex_dtstr: str = record.get('dateTime')
if dtstr or date: if dtstr or date:
dt = pendulum.parse(dtstr or date) dt: DateTime = parse(dtstr or date)
elif flex_dtstr: elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp.. # probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime']) dt: DateTime = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from # special handling of symbol extraction from
# flex records using some ad-hoc schema parsing. # flex records using some ad-hoc schema parsing.
asset_type: str = record.get( asset_type: str = (
'assetCategory' record.get('assetCategory')
) or record.get('secType', 'STK') or record.get('secType')
or 'STK'
)
if (expiry := ( if (expiry := (
record.get('lastTradeDateOrContractMonth') record.get('lastTradeDateOrContractMonth')
@ -357,6 +390,7 @@ def norm_trade_records(
if txn is None: if txn is None:
continue continue
# inject txns sorted by datetime
insort( insort(
records, records,
txn, txn,
@ -405,7 +439,7 @@ def api_trades_to_ledger_entries(
txn_dict[attr_name] = val txn_dict[attr_name] = val
tid = str(txn_dict['execId']) tid = str(txn_dict['execId'])
dt = pendulum.from_timestamp(txn_dict['time']) dt = from_timestamp(txn_dict['time'])
txn_dict['datetime'] = str(dt) txn_dict['datetime'] = str(dt)
acctid = accounts[txn_dict['acctNumber']] acctid = accounts[txn_dict['acctNumber']]

View File

@ -209,7 +209,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
break break
ib_client = proxy._aio_ns.ib ib_client = proxy._aio_ns.ib
log.info(f'Using {ib_client} for symbol search') log.info(
f'Using API client for symbol-search\n'
f'{ib_client}\n'
)
last = time.time() last = time.time()
async for pattern in stream: async for pattern in stream:
@ -294,7 +297,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
elif stock_results: elif stock_results:
break break
# else: # else:
await tractor.pause() # await tractor.pause()
# # match against our ad-hoc set immediately # # match against our ad-hoc set immediately
# adhoc_matches = fuzzy.extract( # adhoc_matches = fuzzy.extract(
@ -522,7 +525,21 @@ async def get_mkt_info(
venue = con.primaryExchange or con.exchange venue = con.primaryExchange or con.exchange
price_tick: Decimal = Decimal(str(details.minTick)) price_tick: Decimal = Decimal(str(details.minTick))
# price_tick: Decimal = Decimal('0.01') ib_min_tick_gt_2: Decimal = Decimal('0.01')
if (
price_tick < ib_min_tick_gt_2
):
# TODO: we need to add some kinda dynamic rounding sys
# to our MktPair i guess?
# not sure where the logic should sit, but likely inside
# the `.clearing._ems` i suppose...
log.warning(
'IB seems to disallow a min price tick < 0.01 '
'when the price is > 2.0..?\n'
f'Decreasing min tick precision for {fqme} to 0.01'
)
# price_tick = ib_min_tick
# await tractor.pause()
if atype == 'stock': if atype == 'stock':
# XXX: GRRRR they don't support fractional share sizes for # XXX: GRRRR they don't support fractional share sizes for

View File

@ -27,8 +27,8 @@ from typing import (
) )
import time import time
import httpx
import pendulum import pendulum
import asks
import numpy as np import numpy as np
import urllib.parse import urllib.parse
import hashlib import hashlib
@ -60,6 +60,11 @@ log = get_logger('piker.brokers.kraken')
# <uri>/<version>/ # <uri>/<version>/
_url = 'https://api.kraken.com/0' _url = 'https://api.kraken.com/0'
_headers: dict[str, str] = {
'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
}
# TODO: this is the only backend providing this right? # TODO: this is the only backend providing this right?
# in which case we should drop it from the defaults and # in which case we should drop it from the defaults and
# instead make a custom fields descr in this module! # instead make a custom fields descr in this module!
@ -135,16 +140,15 @@ class Client:
def __init__( def __init__(
self, self,
config: dict[str, str], config: dict[str, str],
httpx_client: httpx.AsyncClient,
name: str = '', name: str = '',
api_key: str = '', api_key: str = '',
secret: str = '' secret: str = ''
) -> None: ) -> None:
self._sesh = asks.Session(connections=4)
self._sesh.base_location = _url self._sesh: httpx.AsyncClient = httpx_client
self._sesh.headers.update({
'User-Agent':
'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)'
})
self._name = name self._name = name
self._api_key = api_key self._api_key = api_key
self._secret = secret self._secret = secret
@ -166,10 +170,9 @@ class Client:
method: str, method: str,
data: dict, data: dict,
) -> dict[str, Any]: ) -> dict[str, Any]:
resp = await self._sesh.post( resp: httpx.Response = await self._sesh.post(
path=f'/public/{method}', url=f'/public/{method}',
json=data, json=data,
timeout=float('inf')
) )
return resproc(resp, log) return resproc(resp, log)
@ -180,18 +183,18 @@ class Client:
uri_path: str uri_path: str
) -> dict[str, Any]: ) -> dict[str, Any]:
headers = { headers = {
'Content-Type': 'Content-Type': 'application/x-www-form-urlencoded',
'application/x-www-form-urlencoded', 'API-Key': self._api_key,
'API-Key': 'API-Sign': get_kraken_signature(
self._api_key, uri_path,
'API-Sign': data,
get_kraken_signature(uri_path, data, self._secret) self._secret,
),
} }
resp = await self._sesh.post( resp: httpx.Response = await self._sesh.post(
path=f'/private/{method}', url=f'/private/{method}',
data=data, data=data,
headers=headers, headers=headers,
timeout=float('inf')
) )
return resproc(resp, log) return resproc(resp, log)
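`get_kraken_signature()` itself isn't shown in this diff; for reference, Kraken's documented scheme is HMAC-SHA512 over the URI path concatenated with a SHA256 of the nonce plus post data, keyed with the base64-decoded API secret. A standard-form sketch:

```python
import base64
import hashlib
import hmac
import urllib.parse

def get_kraken_signature(
    urlpath: str,   # e.g. '/0/private/Balance'
    data: dict,     # must include a 'nonce' entry
    secret: str,    # base64-encoded API secret
) -> str:
    # per https://docs.kraken.com/rest/#section/Authentication
    postdata: str = urllib.parse.urlencode(data)
    encoded: bytes = (str(data['nonce']) + postdata).encode()
    message: bytes = urlpath.encode() + hashlib.sha256(encoded).digest()
    mac = hmac.new(base64.b64decode(secret), message, hashlib.sha512)
    return base64.b64encode(mac.digest()).decode()
```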
@ -665,24 +668,36 @@ class Client:
@acm @acm
async def get_client() -> Client: async def get_client() -> Client:
conf = get_config() conf: dict[str, Any] = get_config()
if conf: async with httpx.AsyncClient(
client = Client( base_url=_url,
conf, headers=_headers,
# TODO: don't break these up and just do internal # TODO: is there a way to numerate this?
# conf lookups instead.. # https://www.python-httpx.org/advanced/clients/#why-use-a-client
name=conf['key_descr'], # connections=4
api_key=conf['api_key'], ) as trio_client:
secret=conf['secret'] if conf:
) client = Client(
else: conf,
client = Client({}) httpx_client=trio_client,
# at startup, load all symbols, and asset info in # TODO: don't break these up and just do internal
# batch requests. # conf lookups instead..
async with trio.open_nursery() as nurse: name=conf['key_descr'],
nurse.start_soon(client.get_assets) api_key=conf['api_key'],
await client.get_mkt_pairs() secret=conf['secret']
)
else:
client = Client(
conf={},
httpx_client=trio_client,
)
yield client # at startup, load all symbols, and asset info in
# batch requests.
async with trio.open_nursery() as nurse:
nurse.start_soon(client.get_assets)
await client.get_mkt_pairs()
yield client

View File

@ -612,18 +612,18 @@ async def open_trade_dialog(
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
client, client=client,
ws, ws=ws,
stream, ws_stream=stream,
ems_stream, ems_stream=ems_stream,
apiflows, apiflows=apiflows,
ids, ids=ids,
reqids2txids, reqids2txids=reqids2txids,
acnt, acnt=acnt,
api_trans, ledger=ledger,
acctid, acctid=acctid,
acc_name, acc_name=acc_name,
token, token=token,
) )
@ -639,7 +639,8 @@ async def handle_order_updates(
# transaction records which will be updated # transaction records which will be updated
# on new trade clearing events (aka order "fills") # on new trade clearing events (aka order "fills")
ledger_trans: dict[str, Transaction], ledger: TransactionLedger,
# ledger_trans: dict[str, Transaction],
acctid: str, acctid: str,
acc_name: str, acc_name: str,
token: str, token: str,
@ -699,7 +700,8 @@ async def handle_order_updates(
# if tid not in ledger_trans # if tid not in ledger_trans
} }
for tid, trade in trades.items(): for tid, trade in trades.items():
assert tid not in ledger_trans # assert tid not in ledger_trans
assert tid not in ledger
txid = trade['ordertxid'] txid = trade['ordertxid']
reqid = trade.get('userref') reqid = trade.get('userref')
@ -747,11 +749,17 @@ async def handle_order_updates(
client, client,
api_name_set='wsname', api_name_set='wsname',
) )
ppmsgs = trades2pps( ppmsgs: list[BrokerdPosition] = trades2pps(
acnt, acnt=acnt,
acctid, ledger=ledger,
new_trans, acctid=acctid,
new_trans=new_trans,
) )
# ppmsgs = trades2pps(
# acnt,
# acctid,
# new_trans,
# )
for pp_msg in ppmsgs: for pp_msg in ppmsgs:
await ems_stream.send(pp_msg) await ems_stream.send(pp_msg)

View File

@ -16,10 +16,9 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
Kucoin broker backend Kucoin cex API backend.
''' '''
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
aclosing, aclosing,
@ -42,7 +41,7 @@ import wsproto
from uuid import uuid4 from uuid import uuid4
from trio_typing import TaskStatus from trio_typing import TaskStatus
import asks import httpx
from bidict import bidict from bidict import bidict
import numpy as np import numpy as np
import pendulum import pendulum
@ -212,8 +211,12 @@ def get_config() -> BrokerConfig | None:
class Client: class Client:
def __init__(self) -> None: def __init__(
self._config: BrokerConfig | None = get_config() self,
httpx_client: httpx.AsyncClient,
) -> None:
self._http: httpx.AsyncClient = httpx_client
self._config: BrokerConfig|None = get_config()
self._pairs: dict[str, KucoinMktPair] = {} self._pairs: dict[str, KucoinMktPair] = {}
self._fqmes2mktids: bidict[str, str] = bidict() self._fqmes2mktids: bidict[str, str] = bidict()
self._bars: list[list[float]] = [] self._bars: list[list[float]] = []
@ -227,18 +230,24 @@ class Client:
) -> dict[str, str | bytes]: ) -> dict[str, str | bytes]:
''' '''
Generate authenticated request headers Generate authenticated request headers:
https://docs.kucoin.com/#authentication https://docs.kucoin.com/#authentication
https://www.kucoin.com/docs/basic-info/connection-method/authentication/creating-a-request
https://www.kucoin.com/docs/basic-info/connection-method/authentication/signing-a-message
''' '''
if not self._config: if not self._config:
raise ValueError( raise ValueError(
'No config found when trying to send authenticated request') 'No config found when trying to send authenticated request'
)
str_to_sign = ( str_to_sign = (
str(int(time.time() * 1000)) str(int(time.time() * 1000))
+ action + f'/api/{api}/{endpoint.lstrip("/")}' +
action
+
f'/api/{api}/{endpoint.lstrip("/")}'
) )
signature = base64.b64encode( signature = base64.b64encode(
@ -249,6 +258,7 @@ class Client:
).digest() ).digest()
) )
# TODO: can we cache this between calls?
passphrase = base64.b64encode( passphrase = base64.b64encode(
hmac.new( hmac.new(
self._config.key_secret.encode('utf-8'), self._config.key_secret.encode('utf-8'),
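For reference, the two HMAC blobs above feed into KuCoin's v2 header set (per the auth docs linked in the docstring; the function name here is illustrative):

```python
import base64
import hashlib
import hmac
import time

def kucoin_auth_headers(
    key_id: str,
    key_secret: str,
    key_passphrase: str,
    action: str,   # 'GET' | 'POST'
    path: str,     # e.g. '/api/v2/symbols'
) -> dict[str, str]:
    now_ms: str = str(int(time.time() * 1000))

    # sign `timestamp + method + path` with the API secret
    sig: bytes = base64.b64encode(
        hmac.new(
            key_secret.encode('utf-8'),
            (now_ms + action + path).encode('utf-8'),
            hashlib.sha256,
        ).digest()
    )
    # v2 keys also require the passphrase itself to be HMAC'ed
    passphrase: bytes = base64.b64encode(
        hmac.new(
            key_secret.encode('utf-8'),
            key_passphrase.encode('utf-8'),
            hashlib.sha256,
        ).digest()
    )
    return {
        'KC-API-SIGN': sig.decode(),
        'KC-API-TIMESTAMP': now_ms,
        'KC-API-KEY': key_id,
        'KC-API-PASSPHRASE': passphrase.decode(),
        'KC-API-KEY-VERSION': '2',
    }
```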
@ -270,8 +280,10 @@ class Client:
self, self,
action: Literal['POST', 'GET'], action: Literal['POST', 'GET'],
endpoint: str, endpoint: str,
api: str = 'v2', api: str = 'v2',
headers: dict = {}, headers: dict = {},
) -> Any: ) -> Any:
''' '''
Generic request wrapper for Kucoin API Generic request wrapper for Kucoin API
@ -284,14 +296,19 @@ class Client:
api, api,
) )
api_url = f'https://api.kucoin.com/api/{api}/{endpoint}' req_meth: Callable = getattr(
self._http,
res = await asks.request(action, api_url, headers=headers) action.lower(),
)
json = res.json() res = await req_meth(
if 'data' in json: url=f'/{api}/{endpoint}',
return json['data'] headers=headers,
)
json: dict = res.json()
if (data := json.get('data')) is not None:
return data
else: else:
api_url: str = self._http.base_url
log.error( log.error(
f'Error making request to {api_url} ->\n' f'Error making request to {api_url} ->\n'
f'{pformat(res)}' f'{pformat(res)}'
@ -311,7 +328,7 @@ class Client:
''' '''
token_type = 'private' if private else 'public' token_type = 'private' if private else 'public'
try: try:
data: dict[str, Any] | None = await self._request( data: dict[str, Any]|None = await self._request(
'POST', 'POST',
endpoint=f'bullet-{token_type}', endpoint=f'bullet-{token_type}',
api='v1' api='v1'
@ -349,8 +366,8 @@ class Client:
currencies: dict[str, Currency] = {} currencies: dict[str, Currency] = {}
entries: list[dict] = await self._request( entries: list[dict] = await self._request(
'GET', 'GET',
api='v1',
endpoint='currencies', endpoint='currencies',
api='v1',
) )
for entry in entries: for entry in entries:
curr = Currency(**entry).copy() curr = Currency(**entry).copy()
@ -366,7 +383,10 @@ class Client:
dict[str, KucoinMktPair], dict[str, KucoinMktPair],
bidict[str, KucoinMktPair], bidict[str, KucoinMktPair],
]: ]:
entries = await self._request('GET', 'symbols') entries = await self._request(
'GET',
endpoint='symbols',
)
log.info(f' {len(entries)} Kucoin market pairs fetched') log.info(f' {len(entries)} Kucoin market pairs fetched')
pairs: dict[str, KucoinMktPair] = {} pairs: dict[str, KucoinMktPair] = {}
@ -567,13 +587,21 @@ def fqme_to_kucoin_sym(
@acm @acm
async def get_client() -> AsyncGenerator[Client, None]: async def get_client() -> AsyncGenerator[Client, None]:
client = Client() '''
Load an API `Client` preconfigured from user settings
async with trio.open_nursery() as n: '''
n.start_soon(client.get_mkt_pairs) async with (
await client.get_currencies() httpx.AsyncClient(
base_url='https://api.kucoin.com/api',
) as trio_client,
):
client = Client(httpx_client=trio_client)
async with trio.open_nursery() as tn:
tn.start_soon(client.get_mkt_pairs)
await client.get_currencies()
yield client yield client
@tractor.context @tractor.context