From 95ace5acb8c4d26f8f259cf9d83b4b8b1778e81b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 May 2024 11:09:10 -0400 Subject: [PATCH 01/13] Port `kraken` backend to `httpx` --- piker/brokers/kraken/api.py | 85 ++++++++++++++++++++-------------- piker/brokers/kraken/broker.py | 44 +++++++++++------- 2 files changed, 76 insertions(+), 53 deletions(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 6414de8e..df2ebd6a 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -27,8 +27,8 @@ from typing import ( ) import time +import httpx import pendulum -import asks import numpy as np import urllib.parse import hashlib @@ -60,6 +60,11 @@ log = get_logger('piker.brokers.kraken') # // _url = 'https://api.kraken.com/0' + +_headers: dict[str, str] = { + 'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' +} + # TODO: this is the only backend providing this right? # in which case we should drop it from the defaults and # instead make a custom fields descr in this module! @@ -135,16 +140,15 @@ class Client: def __init__( self, config: dict[str, str], + httpx_client: httpx.AsyncClient, + name: str = '', api_key: str = '', secret: str = '' ) -> None: - self._sesh = asks.Session(connections=4) - self._sesh.base_location = _url - self._sesh.headers.update({ - 'User-Agent': - 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' - }) + + self._sesh: httpx.AsyncClient = httpx_client + self._name = name self._api_key = api_key self._secret = secret @@ -167,9 +171,8 @@ class Client: data: dict, ) -> dict[str, Any]: resp = await self._sesh.post( - path=f'/public/{method}', + url=f'/public/{method}', json=data, - timeout=float('inf') ) return resproc(resp, log) @@ -180,18 +183,18 @@ class Client: uri_path: str ) -> dict[str, Any]: headers = { - 'Content-Type': - 'application/x-www-form-urlencoded', - 'API-Key': - self._api_key, - 'API-Sign': - get_kraken_signature(uri_path, data, self._secret) + 'Content-Type': 'application/x-www-form-urlencoded', + 'API-Key': self._api_key, + 'API-Sign': get_kraken_signature( + uri_path, + data, + self._secret, + ), } resp = await self._sesh.post( - path=f'/private/{method}', + url=f'/private/{method}', data=data, headers=headers, - timeout=float('inf') ) return resproc(resp, log) @@ -665,24 +668,36 @@ class Client: @acm async def get_client() -> Client: - conf = get_config() - if conf: - client = Client( - conf, + conf: dict[str, Any] = get_config() + async with httpx.AsyncClient( + base_url=_url, + headers=_headers, - # TODO: don't break these up and just do internal - # conf lookups instead.. - name=conf['key_descr'], - api_key=conf['api_key'], - secret=conf['secret'] - ) - else: - client = Client({}) + # TODO: is there a way to numerate this? + # https://www.python-httpx.org/advanced/clients/#why-use-a-client + # connections=4 + ) as trio_client: + if conf: + client = Client( + conf, + httpx_client=trio_client, - # at startup, load all symbols, and asset info in - # batch requests. - async with trio.open_nursery() as nurse: - nurse.start_soon(client.get_assets) - await client.get_mkt_pairs() + # TODO: don't break these up and just do internal + # conf lookups instead.. + name=conf['key_descr'], + api_key=conf['api_key'], + secret=conf['secret'] + ) + else: + client = Client( + conf={}, + httpx_client=trio_client, + ) - yield client + # at startup, load all symbols, and asset info in + # batch requests. 
+ async with trio.open_nursery() as nurse: + nurse.start_soon(client.get_assets) + await client.get_mkt_pairs() + + yield client diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 53168c03..eb5963cd 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -612,18 +612,18 @@ async def open_trade_dialog( # enter relay loop await handle_order_updates( - client, - ws, - stream, - ems_stream, - apiflows, - ids, - reqids2txids, - acnt, - api_trans, - acctid, - acc_name, - token, + client=client, + ws=ws, + ws_stream=stream, + ems_stream=ems_stream, + apiflows=apiflows, + ids=ids, + reqids2txids=reqids2txids, + acnt=acnt, + ledger=ledger, + acctid=acctid, + acc_name=acc_name, + token=token, ) @@ -639,7 +639,8 @@ async def handle_order_updates( # transaction records which will be updated # on new trade clearing events (aka order "fills") - ledger_trans: dict[str, Transaction], + ledger: TransactionLedger, + # ledger_trans: dict[str, Transaction], acctid: str, acc_name: str, token: str, @@ -699,7 +700,8 @@ async def handle_order_updates( # if tid not in ledger_trans } for tid, trade in trades.items(): - assert tid not in ledger_trans + # assert tid not in ledger_trans + assert tid not in ledger txid = trade['ordertxid'] reqid = trade.get('userref') @@ -747,11 +749,17 @@ async def handle_order_updates( client, api_name_set='wsname', ) - ppmsgs = trades2pps( - acnt, - acctid, - new_trans, + ppmsgs: list[BrokerdPosition] = trades2pps( + acnt=acnt, + ledger=ledger, + acctid=acctid, + new_trans=new_trans, ) + # ppmsgs = trades2pps( + # acnt, + # acctid, + # new_trans, + # ) for pp_msg in ppmsgs: await ems_stream.send(pp_msg) -- 2.34.1 From e6af97c596761e42dc0b3097f6ae977c732736a1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 May 2024 11:09:30 -0400 Subject: [PATCH 02/13] Port `kucoin` backend to `httpx` --- piker/brokers/kucoin.py | 75 ++++++++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 24 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 28524a22..8bb36f33 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -16,10 +16,9 @@ # along with this program. If not, see . ''' -Kucoin broker backend +Kucoin cex API backend. 
''' - from contextlib import ( asynccontextmanager as acm, aclosing, @@ -42,7 +41,7 @@ import wsproto from uuid import uuid4 from trio_typing import TaskStatus -import asks +import httpx from bidict import bidict import numpy as np import pendulum @@ -212,8 +211,12 @@ def get_config() -> BrokerConfig | None: class Client: - def __init__(self) -> None: - self._config: BrokerConfig | None = get_config() + def __init__( + self, + httpx_client: httpx.AsyncClient, + ) -> None: + self._http: httpx.AsyncClient = httpx_client + self._config: BrokerConfig|None = get_config() self._pairs: dict[str, KucoinMktPair] = {} self._fqmes2mktids: bidict[str, str] = bidict() self._bars: list[list[float]] = [] @@ -227,18 +230,24 @@ class Client: ) -> dict[str, str | bytes]: ''' - Generate authenticated request headers + Generate authenticated request headers: + https://docs.kucoin.com/#authentication + https://www.kucoin.com/docs/basic-info/connection-method/authentication/creating-a-request + https://www.kucoin.com/docs/basic-info/connection-method/authentication/signing-a-message ''' - if not self._config: raise ValueError( - 'No config found when trying to send authenticated request') + 'No config found when trying to send authenticated request' + ) str_to_sign = ( str(int(time.time() * 1000)) - + action + f'/api/{api}/{endpoint.lstrip("/")}' + + + action + + + f'/api/{api}/{endpoint.lstrip("/")}' ) signature = base64.b64encode( @@ -249,6 +258,7 @@ class Client: ).digest() ) + # TODO: can we cache this between calls? passphrase = base64.b64encode( hmac.new( self._config.key_secret.encode('utf-8'), @@ -270,8 +280,10 @@ class Client: self, action: Literal['POST', 'GET'], endpoint: str, + api: str = 'v2', headers: dict = {}, + ) -> Any: ''' Generic request wrapper for Kucoin API @@ -284,13 +296,17 @@ class Client: api, ) - api_url = f'https://api.kucoin.com/api/{api}/{endpoint}' - - res = await asks.request(action, api_url, headers=headers) - - json = res.json() - if 'data' in json: - return json['data'] + req_meth: Callable = getattr( + self._http, + action.lower(), + ) + res = await req_meth( + url=f'/{api}/{endpoint}', + headers=headers, + ) + json: dict = res.json() + if data := json.get('data'): + return data else: log.error( f'Error making request to {api_url} ->\n' @@ -311,7 +327,7 @@ class Client: ''' token_type = 'private' if private else 'public' try: - data: dict[str, Any] | None = await self._request( + data: dict[str, Any]|None = await self._request( 'POST', endpoint=f'bullet-{token_type}', api='v1' @@ -349,8 +365,8 @@ class Client: currencies: dict[str, Currency] = {} entries: list[dict] = await self._request( 'GET', - api='v1', endpoint='currencies', + api='v1', ) for entry in entries: curr = Currency(**entry).copy() @@ -366,7 +382,10 @@ class Client: dict[str, KucoinMktPair], bidict[str, KucoinMktPair], ]: - entries = await self._request('GET', 'symbols') + entries = await self._request( + 'GET', + endpoint='symbols', + ) log.info(f' {len(entries)} Kucoin market pairs fetched') pairs: dict[str, KucoinMktPair] = {} @@ -567,13 +586,21 @@ def fqme_to_kucoin_sym( @acm async def get_client() -> AsyncGenerator[Client, None]: - client = Client() + ''' + Load an API `Client` preconfigured from user settings - async with trio.open_nursery() as n: - n.start_soon(client.get_mkt_pairs) - await client.get_currencies() + ''' + async with ( + httpx.AsyncClient( + base_url=f'https://api.kucoin.com/api', + ) as trio_client, + ): + client = Client(httpx_client=trio_client) + async with trio.open_nursery() as tn: 
+ tn.start_soon(client.get_mkt_pairs) + await client.get_currencies() - yield client + yield client @tractor.context -- 2.34.1 From 44b8c705214c4e71ec635ae9440ba0f807fbfedd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 May 2024 12:55:45 -0400 Subject: [PATCH 03/13] Change type-annots to use `httpx.Response` --- piker/brokers/__init__.py | 4 ++-- piker/brokers/_util.py | 17 ++++++++++------- piker/brokers/kraken/api.py | 4 ++-- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 8817842e..0c328d9f 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -50,7 +50,7 @@ __brokers__: list[str] = [ 'binance', 'ib', 'kraken', - 'kucoin' + 'kucoin', # broken but used to work # 'questrade', @@ -71,7 +71,7 @@ def get_brokermod(brokername: str) -> ModuleType: Return the imported broker module by name. ''' - module = import_module('.' + brokername, 'piker.brokers') + module: ModuleType = import_module('.' + brokername, 'piker.brokers') # we only allow monkeying because it's for internal keying module.name = module.__name__.split('.')[-1] return module diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index d35c2578..3588a87a 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -18,10 +18,11 @@ Handy cross-broker utils. """ +from __future__ import annotations from functools import partial import json -import asks +import httpx import logging from ..log import ( @@ -60,11 +61,11 @@ class NoData(BrokerError): def __init__( self, *args, - info: dict, + info: dict|None = None, ) -> None: super().__init__(*args) - self.info: dict = info + self.info: dict|None = info # when raised, machinery can check if the backend # set a "frame size" for doing datetime calcs. @@ -90,16 +91,18 @@ class DataThrottle(BrokerError): def resproc( - resp: asks.response_objects.Response, + resp: httpx.Response, log: logging.Logger, return_json: bool = True, log_resp: bool = False, -) -> asks.response_objects.Response: - """Process response and return its json content. +) -> httpx.Response: + ''' + Process response and return its json content. Raise the appropriate error on non-200 OK responses. 
- """ + + ''' if not resp.status_code == 200: raise BrokerError(resp.body) try: diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index df2ebd6a..4b16a2d0 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -170,7 +170,7 @@ class Client: method: str, data: dict, ) -> dict[str, Any]: - resp = await self._sesh.post( + resp: httpx.Response = await self._sesh.post( url=f'/public/{method}', json=data, ) @@ -191,7 +191,7 @@ class Client: self._secret, ), } - resp = await self._sesh.post( + resp: httpx.Response = await self._sesh.post( url=f'/private/{method}', data=data, headers=headers, -- 2.34.1 From 37ca081555603835054d8386aea7c65c254e71bc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 24 May 2024 12:24:25 -0400 Subject: [PATCH 04/13] Woops, fix missing `api_url` ref in error log --- piker/brokers/kucoin.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 8bb36f33..939d1c60 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -308,6 +308,7 @@ class Client: if data := json.get('data'): return data else: + api_url: str = self._http.base_url log.error( f'Error making request to {api_url} ->\n' f'{pformat(res)}' -- 2.34.1 From b7883325a9e1ce9136f757c2fe9d3c8bbe597949 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 May 2024 16:19:28 -0400 Subject: [PATCH 05/13] Woops, `data` can be an empty list XD --- piker/brokers/kucoin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 939d1c60..0f5961ae 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -305,7 +305,7 @@ class Client: headers=headers, ) json: dict = res.json() - if data := json.get('data'): + if (data := json.get('data')) is not None: return data else: api_url: str = self._http.base_url -- 2.34.1 From 19c343e8b2946a9a780517a8b2adb5922d17189c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 11 Jun 2024 10:28:56 -0400 Subject: [PATCH 06/13] binance: raise `NoData` on null hist arrays Like we do with other history backends to indicate lack of a data set. This avoids any raise that will will bring down the backloader task with some downstream error. Raise a `ValueError` on no time index for now. --- piker/brokers/binance/feed.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index 1416d6a7..9e36a626 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -48,6 +48,7 @@ import tractor from piker.brokers import ( open_cached_client, + NoData, ) from piker._cacheables import ( async_lifo_cache, @@ -254,22 +255,28 @@ async def open_history_client( # NOTE: always query using their native symbology! 
mktid: str = mkt.bs_mktid - array = await client.bars( + array: np.ndarray = await client.bars( mktid, start_dt=start_dt, end_dt=end_dt, ) + if array.size == 0: + raise NoData('No frame for {start_dt} -> {end_dt}\n') + times = array['time'] - if ( - end_dt is None - ): - inow = round(time.time()) + if not times.any(): + raise ValueError( + 'Bad frame with null-times?\n\n' + f'{times}' + ) + + if end_dt is None: + inow: int = round(time.time()) if (inow - times[-1]) > 60: await tractor.pause() start_dt = from_timestamp(times[0]) end_dt = from_timestamp(times[-1]) - return array, start_dt, end_dt yield get_ohlc, {'erlangs': 3, 'rate': 3} -- 2.34.1 From 0c0b7116e3eac577c3a01a976d973e1b67dc803d Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Tue, 21 May 2024 17:56:06 -0300 Subject: [PATCH 07/13] Added new fields to SpotPair class in venues --- piker/brokers/binance/venues.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/piker/brokers/binance/venues.py b/piker/brokers/binance/venues.py index 2529e520..dce0ea95 100644 --- a/piker/brokers/binance/venues.py +++ b/piker/brokers/binance/venues.py @@ -137,10 +137,12 @@ class SpotPair(Pair, frozen=True): quoteOrderQtyMarketAllowed: bool isSpotTradingAllowed: bool isMarginTradingAllowed: bool + otoAllowed: bool defaultSelfTradePreventionMode: str allowedSelfTradePreventionModes: list[str] permissions: list[str] + permissionSets: list[list[str]] # NOTE: see `.data._symcache.SymbologyCache.load()` for why ns_path: str = 'piker.brokers.binance:SpotPair' -- 2.34.1 From 5314cb79d4d965014755861f440d7d2055f02290 Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Wed, 22 May 2024 00:08:47 -0300 Subject: [PATCH 08/13] Added note to exception when missing field in SpotPair class --- piker/brokers/binance/api.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 0055668d..616be6b3 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -433,7 +433,11 @@ class Client: item['filters'] = filters pair_type: Type = PAIRTYPES[venue] - pair: Pair = pair_type(**item) + try: + pair: Pair = pair_type(**item) + except Exception as e: + e.add_note(f'\nDon\'t panic, check out this https://binance-docs.github.io/apidocs/spot/en/#exchange-information') + raise pair_table[pair.symbol.upper()] = pair # update an additional top-level-cross-venue-table -- 2.34.1 From ab1463d9429b0e8e9708a0dfdbc461cad1cf4b5e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Jun 2024 09:41:23 -0400 Subject: [PATCH 09/13] Port binance to `httpx` Like other backends use the `AsyncClient` for all venue specific client-sessions but change to allocating them inside `get_client()` using an `AsyncExitStack` and inserting directly in the `Client.venue_sesh: dict` table during init. Supporting impl tweaks: - remove most of the API client session building logic and instead make `Client.__init__()` take in a `venue_sessions: dict` (set it to `.venue_sesh`) and `conf: dict`, instead opting to do the http client configuration inside `get_client()` since all that code only needs to be run once. |_load config inside `get_client()` once. |_move session token creation into a new util func `init_api_keys()` and also call it from `get_client()` factory; toss in an ex. toml section config to the doc string. - define `_venue_urls: dict[str, str]` (content taken from old static `.venue_sesh` dict) at module level and feed them as `base_url: str` inputs to the client create loop. 
- adjust all call sigs in httpx-sesh-using methods, namely just `._api()`. - do a `.exch_info()` call in `get_client()` to cache the symbology set. Unrelated changes for various other outstanding buggers: - to get futures feeds correctly loading when selected from search (like 'XMRUSDT.USDTM.PERP'), expect a `MktPair` input to `Client.bars()` such that the exact venue-key can be looked up (via a new `.pair2venuekey()` meth) and then passed to `._api()`. - adjust `.broker.open_trade_dialog()` to failover to paper engine when there's no `api_key` key set for the `subconf` venue-key. --- piker/brokers/binance/api.py | 312 ++++++++++++++++++++------------ piker/brokers/binance/broker.py | 11 +- piker/brokers/binance/feed.py | 8 +- 3 files changed, 212 insertions(+), 119 deletions(-) diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 616be6b3..2d1c4ee6 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -1,8 +1,8 @@ # piker: trading gear for hackers # Copyright (C) -# Guillermo Rodriguez (aka ze jefe) -# Tyler Goodlet -# (in stewardship for pikers) +# Guillermo Rodriguez (aka ze jefe) +# Tyler Goodlet +# (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -25,14 +25,13 @@ from __future__ import annotations from collections import ChainMap from contextlib import ( asynccontextmanager as acm, + AsyncExitStack, ) from datetime import datetime from pprint import pformat from typing import ( Any, Callable, - Hashable, - Sequence, Type, ) import hmac @@ -43,8 +42,7 @@ import trio from pendulum import ( now, ) -import asks -from rapidfuzz import process as fuzzy +import httpx import numpy as np from piker import config @@ -54,6 +52,7 @@ from piker.clearing._messages import ( from piker.accounting import ( Asset, digits_to_dec, + MktPair, ) from piker.types import Struct from piker.data import ( @@ -69,7 +68,6 @@ from .venues import ( PAIRTYPES, Pair, MarketType, - _spot_url, _futes_url, _testnet_futes_url, @@ -79,19 +77,18 @@ from .venues import ( log = get_logger('piker.brokers.binance') -def get_config() -> dict: - +def get_config() -> dict[str, Any]: conf: dict path: Path conf, path = config.load( conf_name='brokers', touch_if_dne=True, ) - - section = conf.get('binance') - + section: dict = conf.get('binance') if not section: - log.warning(f'No config section found for binance in {path}') + log.warning( + f'No config section found for binance in {path}' + ) return {} return section @@ -147,7 +144,7 @@ def binance_timestamp( class Client: ''' - Async ReST API client using ``trio`` + ``asks`` B) + Async ReST API client using `trio` + `httpx` B) Supports all of the spot, margin and futures endpoints depending on method. @@ -156,10 +153,17 @@ class Client: def __init__( self, + venue_sessions: dict[ + str, # venue key + tuple[httpx.AsyncClient, str] # session, eps path + ], + conf: dict[str, Any], # TODO: change this to `Client.[mkt_]venue: MarketType`? mkt_mode: MarketType = 'spot', ) -> None: + self.conf = conf + # build out pair info tables for each market type # and wrap in a chain-map view for search / query. self._spot_pairs: dict[str, Pair] = {} # spot info table @@ -186,44 +190,13 @@ class Client: # market symbols for use by search. See `.exch_info()`. 
self._pairs: ChainMap[str, Pair] = ChainMap() - # spot EPs sesh - self._sesh = asks.Session(connections=4) - self._sesh.base_location: str = _spot_url - # spot testnet - self._test_sesh: asks.Session = asks.Session(connections=4) - self._test_sesh.base_location: str = _testnet_spot_url - - # margin and extended spot endpoints session. - self._sapi_sesh = asks.Session(connections=4) - self._sapi_sesh.base_location: str = _spot_url - - # futes EPs sesh - self._fapi_sesh = asks.Session(connections=4) - self._fapi_sesh.base_location: str = _futes_url - # futes testnet - self._test_fapi_sesh: asks.Session = asks.Session(connections=4) - self._test_fapi_sesh.base_location: str = _testnet_futes_url - # global client "venue selection" mode. # set this when you want to switch venues and not have to # specify the venue for the next request. self.mkt_mode: MarketType = mkt_mode - # per 8 - self.venue_sesh: dict[ - str, # venue key - tuple[asks.Session, str] # session, eps path - ] = { - 'spot': (self._sesh, '/api/v3/'), - 'spot_testnet': (self._test_sesh, '/fapi/v1/'), - - 'margin': (self._sapi_sesh, '/sapi/v1/'), - - 'usdtm_futes': (self._fapi_sesh, '/fapi/v1/'), - 'usdtm_futes_testnet': (self._test_fapi_sesh, '/fapi/v1/'), - - # 'futes_coin': self._dapi, # TODO - } + # per-mkt-venue API client table + self.venue_sesh = venue_sessions # lookup for going from `.mkt_mode: str` to the config # subsection `key: str` @@ -238,40 +211,6 @@ class Client: 'futes': ['usdtm_futes'], } - # for creating API keys see, - # https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072 - self.conf: dict = get_config() - - for key, subconf in self.conf.items(): - if api_key := subconf.get('api_key', ''): - venue_keys: list[str] = self.confkey2venuekeys[key] - - venue_key: str - sesh: asks.Session - for venue_key in venue_keys: - sesh, _ = self.venue_sesh[venue_key] - - api_key_header: dict = { - # taken from official: - # https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47 - "Content-Type": "application/json;charset=utf-8", - - # TODO: prolly should just always query and copy - # in the real latest ver? - "User-Agent": "binance-connector/6.1.6smbz6", - "X-MBX-APIKEY": api_key, - } - sesh.headers.update(api_key_header) - - # if `.use_tesnet = true` in the config then - # also add headers for the testnet session which - # will be used for all order control - if subconf.get('use_testnet', False): - testnet_sesh, _ = self.venue_sesh[ - venue_key + '_testnet' - ] - testnet_sesh.headers.update(api_key_header) - def _mk_sig( self, data: dict, @@ -290,7 +229,6 @@ class Client: 'to define the creds for auth-ed endpoints!?' ) - # XXX: Info on security and authentification # https://binance-docs.github.io/apidocs/#endpoint-security-type if not (api_secret := subconf.get('api_secret')): @@ -319,7 +257,7 @@ class Client: params: dict, method: str = 'get', - venue: str | None = None, # if None use `.mkt_mode` state + venue: str|None = None, # if None use `.mkt_mode` state signed: bool = False, allow_testnet: bool = False, @@ -330,8 +268,9 @@ class Client: - /fapi/v3/ USD-M FUTURES, or - /api/v3/ SPOT/MARGIN - account/market endpoint request depending on either passed in `venue: str` - or the current setting `.mkt_mode: str` setting, default `'spot'`. + account/market endpoint request depending on either passed in + `venue: str` or the current setting `.mkt_mode: str` setting, + default `'spot'`. 
Docs per venue API: @@ -360,9 +299,6 @@ class Client: venue=venue_key, ) - sesh: asks.Session - path: str - # Check if we're configured to route order requests to the # venue equivalent's testnet. use_testnet: bool = False @@ -387,11 +323,12 @@ class Client: # ctl machinery B) venue_key += '_testnet' - sesh, path = self.venue_sesh[venue_key] - - meth: Callable = getattr(sesh, method) + client: httpx.AsyncClient + path: str + client, path = self.venue_sesh[venue_key] + meth: Callable = getattr(client, method) resp = await meth( - path=path + endpoint, + url=path + endpoint, params=params, timeout=float('inf'), ) @@ -436,7 +373,11 @@ class Client: try: pair: Pair = pair_type(**item) except Exception as e: - e.add_note(f'\nDon\'t panic, check out this https://binance-docs.github.io/apidocs/spot/en/#exchange-information') + e.add_note( + "\nDon't panic, prolly stupid binance changed their symbology schema again..\n" + 'Check out their API docs here:\n\n' + 'https://binance-docs.github.io/apidocs/spot/en/#exchange-information' + ) raise pair_table[pair.symbol.upper()] = pair @@ -532,7 +473,9 @@ class Client: ''' pair_table: dict[str, Pair] = self._venue2pairs[ - venue or self.mkt_mode + venue + or + self.mkt_mode ] if ( expiry @@ -551,9 +494,9 @@ class Client: venues: list[str] = [venue] # batch per-venue download of all exchange infos - async with trio.open_nursery() as rn: + async with trio.open_nursery() as tn: for ven in venues: - rn.start_soon( + tn.start_soon( self._cache_pairs, ven, ) @@ -606,11 +549,11 @@ class Client: ) -> dict[str, Any]: - fq_pairs: dict = await self.exch_info() + fq_pairs: dict[str, Pair] = await self.exch_info() # TODO: cache this list like we were in # `open_symbol_search()`? - keys: list[str] = list(fq_pairs) + # keys: list[str] = list(fq_pairs) return match_from_pairs( pairs=fq_pairs, @@ -618,9 +561,19 @@ class Client: score_cutoff=50, ) + def pair2venuekey( + self, + pair: Pair, + ) -> str: + return { + 'USDTM': 'usdtm_futes', + # 'COINM': 'coin_futes', + # ^-TODO-^ bc someone might want it..? + }[pair.venue] + async def bars( self, - symbol: str, + mkt: MktPair, start_dt: datetime | None = None, end_dt: datetime | None = None, @@ -650,16 +603,20 @@ class Client: start_time = binance_timestamp(start_dt) end_time = binance_timestamp(end_dt) + bs_pair: Pair = self._pairs[mkt.bs_fqme.upper()] + # https://binance-docs.github.io/apidocs/spot/en/#kline-candlestick-data bars = await self._api( 'klines', params={ - 'symbol': symbol.upper(), + # NOTE: always query using their native symbology! + 'symbol': mkt.bs_mktid.upper(), 'interval': '1m', 'startTime': start_time, 'endTime': end_time, 'limit': limit }, + venue=self.pair2venuekey(bs_pair), allow_testnet=False, ) new_bars: list[tuple] = [] @@ -976,17 +933,148 @@ class Client: await self.close_listen_key(key) +_venue_urls: dict[str, str] = { + 'spot': ( + _spot_url, + '/api/v3/', + ), + 'spot_testnet': ( + _testnet_spot_url, + '/fapi/v1/' + ), + # margin and extended spot endpoints session. + # TODO: did this ever get implemented fully? + # 'margin': ( + # _spot_url, + # '/sapi/v1/' + # ), + + 'usdtm_futes': ( + _futes_url, + '/fapi/v1/', + ), + + 'usdtm_futes_testnet': ( + _testnet_futes_url, + '/fapi/v1/', + ), + + # TODO: for anyone who actually needs it ;P + # 'coin_futes': () +} + + +def init_api_keys( + client: Client, + conf: dict[str, Any], +) -> None: + ''' + Set up per-venue API keys each http client according to the user's + `brokers.conf`. 
+ + For ex, to use spot-testnet and live usdt futures APIs: + + ```toml + [binance] + # spot test net + spot.use_testnet = true + spot.api_key = '' + spot.api_secret = '' + + # futes live + futes.use_testnet = false + accounts.usdtm = 'futes' + futes.api_key = '' + futes.api_secret = ''' + + # if uncommented will use the built-in paper engine and not + # connect to `binance` API servers for order ctl. + # accounts.paper = 'paper' + ``` + + ''' + for key, subconf in conf.items(): + if api_key := subconf.get('api_key', ''): + venue_keys: list[str] = client.confkey2venuekeys[key] + + venue_key: str + client: httpx.AsyncClient + for venue_key in venue_keys: + client, _ = client.venue_sesh[venue_key] + + api_key_header: dict = { + # taken from official: + # https://github.com/binance/binance-futures-connector-python/blob/main/binance/api.py#L47 + "Content-Type": "application/json;charset=utf-8", + + # TODO: prolly should just always query and copy + # in the real latest ver? + "User-Agent": "binance-connector/6.1.6smbz6", + "X-MBX-APIKEY": api_key, + } + client.headers.update(api_key_header) + + # if `.use_tesnet = true` in the config then + # also add headers for the testnet session which + # will be used for all order control + if subconf.get('use_testnet', False): + testnet_sesh, _ = client.venue_sesh[ + venue_key + '_testnet' + ] + testnet_sesh.headers.update(api_key_header) + + @acm -async def get_client() -> Client: +async def get_client( + mkt_mode: MarketType = 'spot', +) -> Client: + ''' + Construct an single `piker` client which composes multiple underlying venue + specific API clients both for live and test networks. - client = Client() - await client.exch_info() - log.info( - f'{client} in {client.mkt_mode} mode: caching exchange infos..\n' - 'Cached multi-market pairs:\n' - f'spot: {len(client._spot_pairs)}\n' - f'usdtm_futes: {len(client._ufutes_pairs)}\n' - f'Total: {len(client._pairs)}\n' - ) + ''' + venue_sessions: dict[ + str, # venue key + tuple[httpx.AsyncClient, str] # session, eps path + ] = {} + async with AsyncExitStack() as client_stack: + for name, (base_url, path) in _venue_urls.items(): + api: httpx.AsyncClient = await client_stack.enter_async_context( + httpx.AsyncClient( + base_url=base_url, + # headers={}, - yield client + # TODO: is there a way to numerate this? + # https://www.python-httpx.org/advanced/clients/#why-use-a-client + # connections=4 + ) + ) + venue_sessions[name] = ( + api, + path, + ) + + conf: dict[str, Any] = get_config() + # for creating API keys see, + # https://www.binance.com/en/support/faq/how-to-create-api-keys-on-binance-360002502072 + client = Client( + venue_sessions=venue_sessions, + conf=conf, + mkt_mode=mkt_mode, + ) + init_api_keys( + client=client, + conf=conf, + ) + fq_pairs: dict[str, Pair] = await client.exch_info() + assert fq_pairs + log.info( + f'Loaded multi-venue `Client` in mkt_mode={client.mkt_mode!r}\n\n' + f'Symbology Summary:\n' + f'------ - ------\n' + f'spot: {len(client._spot_pairs)}\n' + f'usdtm_futes: {len(client._ufutes_pairs)}\n' + '------ - ------\n' + f'total: {len(client._pairs)}\n' + ) + yield client diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index ff6a2ff5..a13ce38f 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -264,15 +264,20 @@ async def open_trade_dialog( # do a open_symcache() call.. though maybe we can hide # this in a new async version of open_account()? 
async with open_cached_client('binance') as client: - subconf: dict = client.conf[venue_name] - use_testnet = subconf.get('use_testnet', False) + subconf: dict|None = client.conf.get(venue_name) # XXX: if no futes.api_key or spot.api_key has been set we # always fall back to the paper engine! - if not subconf.get('api_key'): + if ( + not subconf + or + not subconf.get('api_key') + ): await ctx.started('paper') return + use_testnet: bool = subconf.get('use_testnet', False) + async with ( open_cached_client('binance') as client, ): diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index 9e36a626..3a242e02 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -253,15 +253,15 @@ async def open_history_client( else: client.mkt_mode = 'spot' - # NOTE: always query using their native symbology! - mktid: str = mkt.bs_mktid array: np.ndarray = await client.bars( - mktid, + mkt=mkt, start_dt=start_dt, end_dt=end_dt, ) if array.size == 0: - raise NoData('No frame for {start_dt} -> {end_dt}\n') + raise NoData( + f'No frame for {start_dt} -> {end_dt}\n' + ) times = array['time'] if not times.any(): -- 2.34.1 From 97dd7e766aebe22c848e1ce9d910cecc7779aacc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 18 Jun 2024 10:00:18 -0400 Subject: [PATCH 10/13] ib: more trade record edge case handling - timestamps came as `'date'`-keyed from 2022 and before but now are `'datetime'`.. - some symbols seem to have no commission field, so handle that.. - when no `'price'` field found return `None` from `norm_trade()`. - add a warn log on mid-fill commission updates. --- piker/brokers/ib/broker.py | 9 ++++- piker/brokers/ib/ledger.py | 74 +++++++++++++++++++++++++++----------- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index b317da22..ddda9020 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -1183,7 +1183,14 @@ async def deliver_trade_events( pos and fill ): - assert fill.commissionReport == cr + now_cr: CommissionReport = fill.commissionReport + if (now_cr != cr): + log.warning( + 'UhhHh ib updated the commission report mid-fill..?\n' + f'was: {pformat(cr)}\n' + f'now: {pformat(now_cr)}\n' + ) + await emit_pp_update( ems_stream, accounts_def, diff --git a/piker/brokers/ib/ledger.py b/piker/brokers/ib/ledger.py index a767d551..d62b4ba7 100644 --- a/piker/brokers/ib/ledger.py +++ b/piker/brokers/ib/ledger.py @@ -31,7 +31,11 @@ from typing import ( ) from bidict import bidict -import pendulum +from pendulum import ( + DateTime, + parse, + from_timestamp, +) from ib_insync import ( Contract, Commodity, @@ -66,10 +70,11 @@ tx_sort: Callable = partial( iter_by_dt, parsers={ 'dateTime': parse_flex_dt, - 'datetime': pendulum.parse, - # for some some fucking 2022 and - # back options records...fuck me. - 'date': pendulum.parse, + 'datetime': parse, + + # XXX: for some some fucking 2022 and + # back options records.. f@#$ me.. + 'date': parse, } ) @@ -89,15 +94,38 @@ def norm_trade( conid: int = str(record.get('conId') or record['conid']) bs_mktid: str = str(conid) - comms = record.get('commission') - if comms is None: - comms = -1*record['ibCommission'] - price = record.get('price') or record['tradePrice'] + # NOTE: sometimes weird records (like BTTX?) + # have no field for this? 
+ comms: float = -1 * ( + record.get('commission') + or record.get('ibCommission') + or 0 + ) + if not comms: + log.warning( + 'No commissions found for record?\n' + f'{pformat(record)}\n' + ) + + price: float = ( + record.get('price') + or record.get('tradePrice') + ) + if price is None: + log.warning( + 'No `price` field found in record?\n' + 'Skipping normalization..\n' + f'{pformat(record)}\n' + ) + return None # the api doesn't do the -/+ on the quantity for you but flex # records do.. are you fucking serious ib...!? - size = record.get('quantity') or record['shares'] * { + size: float|int = ( + record.get('quantity') + or record['shares'] + ) * { 'BOT': 1, 'SLD': -1, }[record['side']] @@ -128,26 +156,31 @@ def norm_trade( # otype = tail[6] # strike = tail[7:] - print(f'skipping opts contract {symbol}') + log.warning( + f'Skipping option contract -> NO SUPPORT YET!\n' + f'{symbol}\n' + ) return None # timestamping is way different in API records - dtstr = record.get('datetime') - date = record.get('date') - flex_dtstr = record.get('dateTime') + dtstr: str = record.get('datetime') + date: str = record.get('date') + flex_dtstr: str = record.get('dateTime') if dtstr or date: - dt = pendulum.parse(dtstr or date) + dt: DateTime = parse(dtstr or date) elif flex_dtstr: # probably a flex record with a wonky non-std timestamp.. - dt = parse_flex_dt(record['dateTime']) + dt: DateTime = parse_flex_dt(record['dateTime']) # special handling of symbol extraction from # flex records using some ad-hoc schema parsing. - asset_type: str = record.get( - 'assetCategory' - ) or record.get('secType', 'STK') + asset_type: str = ( + record.get('assetCategory') + or record.get('secType') + or 'STK' + ) if (expiry := ( record.get('lastTradeDateOrContractMonth') @@ -357,6 +390,7 @@ def norm_trade_records( if txn is None: continue + # inject txns sorted by datetime insort( records, txn, @@ -405,7 +439,7 @@ def api_trades_to_ledger_entries( txn_dict[attr_name] = val tid = str(txn_dict['execId']) - dt = pendulum.from_timestamp(txn_dict['time']) + dt = from_timestamp(txn_dict['time']) txn_dict['datetime'] = str(dt) acctid = accounts[txn_dict['acctNumber']] -- 2.34.1 From 3531c2edc1797ea0bdd6e8dd13fa4a1df2c04ded Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 18 Jun 2024 10:03:34 -0400 Subject: [PATCH 11/13] ib: mask out trade and vlm rates for now --- piker/brokers/ib/feed.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 89d43b98..08751129 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -671,8 +671,8 @@ async def _setup_quote_stream( # making them mostly useless and explains why the scanner # is always slow XD # '293', # Trade count for day - '294', # Trade rate / minute - '295', # Vlm rate / minute + # '294', # Trade rate / minute + # '295', # Vlm rate / minute ), contract: Contract | None = None, -- 2.34.1 From f12c452d966b7f3cb29f232fef67b33bb431f51a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 18 Jun 2024 12:42:21 -0400 Subject: [PATCH 12/13] ib: warn about mkt precision cuckups that `Contract`s clearly deliver wrong.. 
--- piker/brokers/ib/symbols.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib/symbols.py b/piker/brokers/ib/symbols.py index 6fe86aa9..d7c09fef 100644 --- a/piker/brokers/ib/symbols.py +++ b/piker/brokers/ib/symbols.py @@ -294,7 +294,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None: elif stock_results: break # else: - await tractor.pause() + # await tractor.pause() # # match against our ad-hoc set immediately # adhoc_matches = fuzzy.extract( @@ -522,7 +522,21 @@ async def get_mkt_info( venue = con.primaryExchange or con.exchange price_tick: Decimal = Decimal(str(details.minTick)) - # price_tick: Decimal = Decimal('0.01') + ib_min_tick_gt_2: Decimal = Decimal('0.01') + if ( + price_tick < ib_min_tick_gt_2 + ): + # TODO: we need to add some kinda dynamic rounding sys + # to our MktPair i guess? + # not sure where the logic should sit, but likely inside + # the `.clearing._ems` i suppose... + log.warning( + 'IB seems to disallow a min price tick < 0.01 ' + 'when the price is > 2.0..?\n' + f'Decreasing min tick precision for {fqme} to 0.01' + ) + # price_tick = ib_min_tick + # await tractor.pause() if atype == 'stock': # XXX: GRRRR they don't support fractional share sizes for -- 2.34.1 From b57718077381093ea35397a076cd8f8acef0aafa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 20 Jun 2024 14:40:21 -0400 Subject: [PATCH 13/13] ib: `.api` mod and log-fmt cleaning About time we tidy'd a buncha status logging in this backend.. particularly for boot-up where there's lots of client-try-connect poll looping with account detection from the user config. `.api.Client` pprint and logging fmt improvements: - add `Client.__repr__()` which shows the minimally useful set of info from the underlying `.ib: IB` as well as a new `.acnts: list[str]` of the account aliases defined in the user's `brokers.toml`. - mk `.bars()` define a comprehensive `query_info: str` with all the request deats but only display if there's a problem with the response data. - mk `.get_config()` report both the config file path and the acnt aliases (NOT the actual account #s). - move all `.load_aio_clients()` client poll loop requests do `log.runtime()` statuses, only falling through to a `.warning()` when the loop fails to connect the client to the spec-ed API-gw addr, and |_ don't allow loading accounts for which the user has not defined an alias in `brokers.toml::[ib]`; raise a value-error in such cases with a message indicating how to mod the config. |_ only `log.info()` about acnts if some were loaded.. Other mod logging de-noising: - better status fmting in `.symbols.open_symbol_search()` with `repr(Client)`. - for `.feed.stream_quotes()` first quote reporting use `.runtime()`. 
--- piker/brokers/ib/_util.py | 4 +- piker/brokers/ib/api.py | 187 ++++++++++++++++++++++++------------ piker/brokers/ib/feed.py | 16 ++- piker/brokers/ib/symbols.py | 5 +- 4 files changed, 142 insertions(+), 70 deletions(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index a80bd514..2c71bc46 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -100,7 +100,7 @@ async def data_reset_hack( log.warning( no_setup_msg + - f'REQUIRES A `vnc_addrs: array` ENTRY' + 'REQUIRES A `vnc_addrs: array` ENTRY' ) vnc_host, vnc_port = vnc_sockaddr.get( @@ -259,7 +259,7 @@ def i3ipc_xdotool_manual_click_hack() -> None: timeout=timeout, ) - # re-activate and focus original window + # re-activate and focus original window subprocess.call([ 'xdotool', 'windowactivate', '--sync', str(orig_win_id), diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 2fe540bd..ae32b0ee 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -287,9 +287,31 @@ class Client: self.conf = config # NOTE: the ib.client here is "throttled" to 45 rps by default - self.ib = ib + self.ib: IB = ib self.ib.RaiseRequestErrors: bool = True + # self._acnt_names: set[str] = {} + self._acnt_names: list[str] = [] + + @property + def acnts(self) -> list[str]: + # return list(self._acnt_names) + return self._acnt_names + + def __repr__(self) -> str: + return ( + f'<{type(self).__name__}(' + f'ib={self.ib} ' + f'acnts={self.acnts}' + + # TODO: we need to mask out acnt-#s and other private + # infos if we're going to console this! + # f' |_.conf:\n' + # f' {pformat(self.conf)}\n' + + ')>' + ) + async def get_fills(self) -> list[Fill]: ''' Return list of rents `Fills` from trading session. @@ -376,55 +398,63 @@ class Client: # whatToShow='MIDPOINT', # whatToShow='TRADES', ) - log.info( - f'REQUESTING {ib_duration_str} worth {bar_size} BARS\n' - f'fqme: {fqme}\n' - f'global _enters: {_enters}\n' - f'kwargs: {pformat(kwargs)}\n' - ) - bars = await self.ib.reqHistoricalDataAsync( **kwargs, ) + query_info: str = ( + f'REQUESTING IB history BARS\n' + f' ------ - ------\n' + f'dt_duration: {dt_duration}\n' + f'ib_duration_str: {ib_duration_str}\n' + f'bar_size: {bar_size}\n' + f'fqme: {fqme}\n' + f'actor-global _enters: {_enters}\n' + f'kwargs: {pformat(kwargs)}\n' + ) # tail case if no history for range or none prior. + # NOTE: there's actually 3 cases here to handle (and + # this should be read alongside the implementation of + # `.reqHistoricalDataAsync()`): + # - a timeout occurred in which case insync internals return + # an empty list thing with bars.clear()... + # - no data exists for the period likely due to + # a weekend, holiday or other non-trading period prior to + # ``end_dt`` which exceeds the ``duration``, + # - LITERALLY this is the start of the mkt's history! if not bars: - # NOTE: there's actually 3 cases here to handle (and - # this should be read alongside the implementation of - # `.reqHistoricalDataAsync()`): - # - a timeout occurred in which case insync internals return - # an empty list thing with bars.clear()... - # - no data exists for the period likely due to - # a weekend, holiday or other non-trading period prior to - # ``end_dt`` which exceeds the ``duration``, - # - LITERALLY this is the start of the mkt's history! + # TODO: figure out wut's going on here. + # TODO: is this handy, a sync requester for tinkering + # with empty frame cases? 
+ # def get_hist(): + # return self.ib.reqHistoricalData(**kwargs) + # import pdbp + # pdbp.set_trace() - # sync requester for debugging empty frame cases - def get_hist(): - return self.ib.reqHistoricalData(**kwargs) + log.critical( + 'STUPID IB SAYS NO HISTORY\n\n' + + query_info + ) - assert get_hist - import pdbp - pdbp.set_trace() - - return [], np.empty(0), dt_duration # TODO: we could maybe raise ``NoData`` instead if we - # rewrite the method in the first case? right now there's no - # way to detect a timeout. + # rewrite the method in the first case? + # right now there's no way to detect a timeout.. + return [], np.empty(0), dt_duration - # NOTE XXX: ensure minimum duration in bars B) - # => we recursively call this method until we get at least - # as many bars such that they sum in aggregate to the the - # desired total time (duration) at most. - # XXX XXX XXX - # WHY DID WE EVEN NEED THIS ORIGINALLY!? - # XXX XXX XXX - # - if you query over a gap and get no data - # that may short circuit the history + log.info(query_info) + # NOTE XXX: ensure minimum duration in bars? + # => recursively call this method until we get at least as + # many bars such that they sum in aggregate to the the + # desired total time (duration) at most. + # - if you query over a gap and get no data + # that may short circuit the history if ( - end_dt - and False + # XXX XXX XXX + # => WHY DID WE EVEN NEED THIS ORIGINALLY!? <= + # XXX XXX XXX + False + and end_dt ): nparr: np.ndarray = bars_to_np(bars) times: np.ndarray = nparr['time'] @@ -927,7 +957,10 @@ class Client: warnset = True else: - log.info(f'Got first quote for {contract}') + log.info( + 'Got first quote for contract\n' + f'{contract}\n' + ) break else: if timeouterr and raise_on_timeout: @@ -991,8 +1024,12 @@ class Client: outsideRth=True, optOutSmartRouting=True, + # TODO: need to understand this setting better as + # it pertains to shit ass mms.. routeMarketableToBbo=True, + designatedLocation='SMART', + # TODO: make all orders GTC? 
# https://interactivebrokers.github.io/tws-api/classIBApi_1_1Order.html#a95539081751afb9980f4c6bd1655a6ba # goodTillDate=f"yyyyMMdd-HH:mm:ss", @@ -1120,8 +1157,8 @@ def get_config() -> dict[str, Any]: names = list(accounts.keys()) accts = section['accounts'] = bidict(accounts) log.info( - f'brokers.toml defines {len(accts)} accounts: ' - f'{pformat(names)}' + f'{path} defines {len(accts)} account aliases:\n' + f'{pformat(names)}\n' ) if section is None: @@ -1188,7 +1225,7 @@ async def load_aio_clients( try_ports = list(try_ports.values()) _err = None - accounts_def = config.load_accounts(['ib']) + accounts_def: dict[str, str] = config.load_accounts(['ib']) ports = try_ports if port is None else [port] combos = list(itertools.product(hosts, ports)) accounts_found: dict[str, Client] = {} @@ -1227,7 +1264,9 @@ async def load_aio_clients( client = Client(ib=ib, config=conf) # update all actor-global caches - log.info(f"Caching client for {sockaddr}") + log.runtime( + f'Connected and caching `Client` @ {sockaddr!r}' + ) _client_cache[sockaddr] = client break @@ -1242,37 +1281,59 @@ async def load_aio_clients( OSError, ) as ce: _err = ce - log.warning( - f'Failed to connect on {host}:{port} for {i} time with,\n' - f'{ib.client.apiError.value()}\n' - 'retrying with a new client id..') + message: str = ( + f'Failed to connect on {host}:{port} after {i} tries with\n' + f'{ib.client.apiError.value()!r}\n\n' + 'Retrying with a new client id..\n' + ) + log.runtime(message) + else: + # XXX report loudly if we never established after all + # re-tries + log.warning(message) # Pre-collect all accounts available for this # connection and map account names to this client # instance. for value in ib.accountValues(): - acct_number = value.account + acct_number: str = value.account - entry = accounts_def.inverse.get(acct_number) - if not entry: + acnt_alias: str = accounts_def.inverse.get(acct_number) + if not acnt_alias: + + # TODO: should we constuct the below reco-ex from + # the existing config content? + _, path = config.load( + conf_name='brokers', + ) raise ValueError( - 'No section in brokers.toml for account:' - f' {acct_number}\n' - f'Please add entry to continue using this API client' + 'No alias in account section for account!\n' + f'Please add an acnt alias entry to your {path}\n' + 'For example,\n\n' + + '[ib.accounts]\n' + 'margin = {accnt_number!r}\n' + '^^^^^^ <- you need this part!\n\n' + + 'This ensures `piker` will not leak private acnt info ' + 'to console output by default!\n' ) # surjection of account names to operating clients. - if acct_number not in accounts_found: - accounts_found[entry] = client + if acnt_alias not in accounts_found: + accounts_found[acnt_alias] = client + # client._acnt_names.add(acnt_alias) + client._acnt_names.append(acnt_alias) - log.info( - f'Loaded accounts for client @ {host}:{port}\n' - f'{pformat(accounts_found)}' - ) + if accounts_found: + log.info( + f'Loaded accounts for api client\n\n' + f'{pformat(accounts_found)}\n' + ) - # XXX: why aren't we just updating this directy above - # instead of using the intermediary `accounts_found`? - _accounts2clients.update(accounts_found) + # XXX: why aren't we just updating this directy above + # instead of using the intermediary `accounts_found`? + _accounts2clients.update(accounts_found) # if we have no clients after the scan loop then error out. 
if not _client_cache: @@ -1472,7 +1533,7 @@ async def open_aio_client_method_relay( msg: tuple[str, dict] | dict | None = await from_trio.get() match msg: case None: # termination sentinel - print('asyncio PROXY-RELAY SHUTDOWN') + log.info('asyncio `Client` method-proxy SHUTDOWN!') break case (meth_name, kwargs): diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 08751129..2c1a9224 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -915,9 +915,13 @@ async def stream_quotes( if first_ticker: first_quote: dict = normalize(first_ticker) - log.info( - 'Rxed init quote:\n' - f'{pformat(first_quote)}' + + # TODO: we need a stack-oriented log levels filters for + # this! + # log.info(message, filter={'stack': 'live_feed'}) ? + log.runtime( + 'Rxed init quote:\n\n' + f'{pformat(first_quote)}\n' ) # NOTE: it might be outside regular trading hours for @@ -969,7 +973,11 @@ async def stream_quotes( raise_on_timeout=True, ) first_quote: dict = normalize(first_ticker) - log.info( + + # TODO: we need a stack-oriented log levels filters for + # this! + # log.info(message, filter={'stack': 'live_feed'}) ? + log.runtime( 'Rxed init quote:\n' f'{pformat(first_quote)}' ) diff --git a/piker/brokers/ib/symbols.py b/piker/brokers/ib/symbols.py index d7c09fef..04ec74e4 100644 --- a/piker/brokers/ib/symbols.py +++ b/piker/brokers/ib/symbols.py @@ -209,7 +209,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None: break ib_client = proxy._aio_ns.ib - log.info(f'Using {ib_client} for symbol search') + log.info( + f'Using API client for symbol-search\n' + f'{ib_client}\n' + ) last = time.time() async for pattern in stream: -- 2.34.1
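
The shape all three ReST ports above (kraken, kucoin, binance) converge on is the
same: allocate the `httpx.AsyncClient` inside `get_client()`, hand it to the
backend's `Client`, warm any symbology/startup caches in a `trio` nursery, then
yield. The sketch below distills that pattern outside of piker; the `Client`
here is a simplified stand-in (not any real backend class) and the kraken public
`Time` endpoint is used only because it needs no auth. The binance patch extends
the same idea by entering one `AsyncClient` per venue via an `AsyncExitStack`
and keeping them in a `venue_sesh` table.

# minimal, standalone sketch of the `asks` -> `httpx` port pattern
from contextlib import asynccontextmanager as acm

import httpx
import trio


class Client:
    '''
    Simplified stand-in for a backend API client: it only *wraps* the
    session; building and tearing it down is `get_client()`'s job.

    '''
    def __init__(self, httpx_client: httpx.AsyncClient) -> None:
        self._sesh: httpx.AsyncClient = httpx_client

    async def _public(self, method: str) -> dict:
        # relative paths are joined onto the client's `base_url`
        resp: httpx.Response = await self._sesh.get(
            url=f'/public/{method}',
        )
        if resp.status_code != 200:
            raise RuntimeError(resp.text)
        return resp.json()

    async def get_time(self) -> dict:
        return await self._public('Time')


@acm
async def get_client():
    # the `AsyncClient` owns pooling, base-url and default headers,
    # replacing the old per-`Client` `asks.Session(connections=4)` setup.
    async with httpx.AsyncClient(
        base_url='https://api.kraken.com/0',
        headers={'User-Agent': 'httpx-port-sketch'},
    ) as trio_client:
        client = Client(httpx_client=trio_client)

        # warm startup caches concurrently before yielding,
        # mirroring the nursery usage in the kraken/kucoin ports.
        async with trio.open_nursery() as tn:
            tn.start_soon(client.get_time)

        yield client


async def main() -> None:
    async with get_client() as client:
        print(await client.get_time())


if __name__ == '__main__':
    trio.run(main)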