diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 0c328d9f..94e4cbe1 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -98,13 +98,14 @@ async def open_cached_client( If one has not been setup do it and cache it. ''' - brokermod = get_brokermod(brokername) + brokermod: ModuleType = get_brokermod(brokername) + + # TODO: make abstract or `typing.Protocol` + # client: Client async with maybe_open_context( acm_func=brokermod.get_client, kwargs=kwargs, - ) as (cache_hit, client): - if cache_hit: log.runtime(f'Reusing existing {client}') diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index f163162e..78be9ef8 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -374,9 +374,14 @@ class Client: pair: Pair = pair_type(**item) except Exception as e: e.add_note( - "\nDon't panic, prolly stupid binance changed their symbology schema again..\n" - 'Check out their API docs here:\n\n' - 'https://binance-docs.github.io/apidocs/spot/en/#exchange-information' + f'\n' + f'New or removed field we need to codify!\n' + f'pair-type: {pair_type!r}\n' + f'\n' + f"Don't panic, prolly stupid binance changed their symbology schema again..\n" + f'Check out their API docs here:\n' + f'\n' + f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n' ) raise pair_table[pair.symbol.upper()] = pair diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index b7d68edb..79e5cbeb 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -94,13 +94,15 @@ class L1(Struct): # validation type +# https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Aggregate-Trade-Streams#response-example class AggTrade(Struct, frozen=True): e: str # Event type E: int # Event time s: str # Symbol a: int # Aggregate trade ID p: float # Price - q: float # Quantity + q: float # Quantity with all the market trades + nq: float # Normal quantity without the trades involving RPI orders f: int # First trade ID l: int # noqa Last trade ID T: int # Trade time diff --git a/piker/brokers/binance/venues.py b/piker/brokers/binance/venues.py index 2c025fe1..efe27967 100644 --- a/piker/brokers/binance/venues.py +++ b/piker/brokers/binance/venues.py @@ -97,6 +97,16 @@ class Pair(Struct, frozen=True, kw_only=True): baseAsset: str baseAssetPrecision: int + permissionSets: list[list[str]] + + # https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26 + # will become non-optional 2025-08-28? + # https://developers.binance.com/docs/binance-spot-api-docs#future-changes + pegInstructionsAllowed: bool = False + + # https://developers.binance.com/docs/binance-spot-api-docs#2025-12-02 + opoAllowed: bool = False + filters: dict[ str, str | int | float, @@ -142,7 +152,11 @@ class SpotPair(Pair, frozen=True): defaultSelfTradePreventionMode: str allowedSelfTradePreventionModes: list[str] permissions: list[str] - permissionSets: list[list[str]] + + # can the paint botz creat liq gaps even easier on this asset? + # Bp + # https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority + amendAllowed: bool # NOTE: see `.data._symcache.SymbologyCache.load()` for why ns_path: str = 'piker.brokers.binance:SpotPair' @@ -209,7 +223,10 @@ class FutesPair(Pair): assert pair == self.pair # sanity return f'{expiry}' - case 'PERPETUAL': + case ( + 'PERPETUAL' + | 'TRADIFI_PERPETUAL' + ): return 'PERP' case '': @@ -238,7 +255,10 @@ class FutesPair(Pair): margin: str = self.marginAsset match ctype: - case 'PERPETUAL': + case ( + 'PERPETUAL' + | 'TRADIFI_PERPETUAL' + ): return f'{margin}M' case ( diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index d54b2203..626b4ff8 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -471,11 +471,15 @@ def search( ''' # global opts - brokermods = list(config['brokermods'].values()) + brokermods: list[ModuleType] = list(config['brokermods'].values()) + + # TODO: this is coming from the `search --pdb` NOT from + # the `piker --pdb` XD .. + # -[ ] pull from the parent click ctx's values..dumdum + # assert pdb # define tractor entrypoint async def main(func): - async with maybe_open_pikerd( loglevel=config['loglevel'], debug_mode=pdb, diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 2c1a9224..062b2c2e 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -587,7 +587,7 @@ async def get_bars( data_cs.cancel() # spawn new data reset task - data_cs, reset_done = await nurse.start( + data_cs, reset_done = await tn.start( partial( wait_on_data_reset, proxy, @@ -607,11 +607,11 @@ async def get_bars( # such that simultaneous symbol queries don't try data resettingn # too fast.. unset_resetter: bool = False - async with trio.open_nursery() as nurse: + async with trio.open_nursery() as tn: # start history request that we allow # to run indefinitely until a result is acquired - nurse.start_soon(query) + tn.start_soon(query) # start history reset loop which waits up to the timeout # for a result before triggering a data feed reset. @@ -631,7 +631,7 @@ async def get_bars( unset_resetter: bool = True # spawn new data reset task - data_cs, reset_done = await nurse.start( + data_cs, reset_done = await tn.start( partial( wait_on_data_reset, proxy, @@ -705,7 +705,9 @@ async def _setup_quote_stream( # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore def teardown(): ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") + log.error( + f'Disconnected stream for `{symbol}`' + ) client.ib.cancelMktData(contract) # decouple broadcast mem chan @@ -761,7 +763,10 @@ async def open_aio_quote_stream( symbol: str, contract: Contract | None = None, -) -> trio.abc.ReceiveStream: +) -> ( + trio.abc.Channel| # iface + tractor.to_asyncio.LinkedTaskChannel # actually +): from tractor.trionics import broadcast_receiver global _quote_streams @@ -778,6 +783,7 @@ async def open_aio_quote_stream( yield from_aio return + from_aio: tractor.to_asyncio.LinkedTaskChannel async with tractor.to_asyncio.open_channel_from( _setup_quote_stream, symbol=symbol, @@ -983,17 +989,18 @@ async def stream_quotes( ) cs: trio.CancelScope | None = None startup: bool = True + iter_quotes: trio.abc.Channel while ( startup or cs.cancel_called ): with trio.CancelScope() as cs: async with ( - trio.open_nursery() as nurse, + trio.open_nursery() as tn, open_aio_quote_stream( symbol=sym, contract=con, - ) as stream, + ) as iter_quotes, ): # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) @@ -1021,9 +1028,9 @@ async def stream_quotes( await rt_ev.wait() cs.cancel() # cancel called should now be set - nurse.start_soon(reset_on_feed) + tn.start_soon(reset_on_feed) - async with aclosing(stream): + async with aclosing(iter_quotes): # if syminfo.get('no_vlm', False): if not init_msg.shm_write_opts['has_vlm']: @@ -1038,19 +1045,21 @@ async def stream_quotes( # wait for real volume on feed (trading might be # closed) while True: - ticker = await stream.receive() + ticker = await iter_quotes.receive() # for a real volume contract we rait for # the first "real" trade to take place if ( # not calc_price # and not ticker.rtTime - not ticker.rtTime + False + # not ticker.rtTime ): # spin consuming tickers until we # get a real market datum log.debug(f"New unsent ticker: {ticker}") continue + else: log.debug("Received first volume tick") # ugh, clear ticks since we've @@ -1066,13 +1075,18 @@ async def stream_quotes( log.debug(f"First ticker received {quote}") # tell data-layer spawner-caller that live - # quotes are now streaming. + # quotes are now active desptie not having + # necessarily received a first vlm/clearing + # tick. + ticker = await iter_quotes.receive() feed_is_live.set() + fqme: str = quote['fqme'] + await send_chan.send({fqme: quote}) # last = time.time() - async for ticker in stream: + async for ticker in iter_quotes: quote = normalize(ticker) - fqme = quote['fqme'] + fqme: str = quote['fqme'] await send_chan.send({fqme: quote}) # ugh, clear ticks since we've consumed them diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 4b16a2d0..5c29e9c7 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -34,6 +34,7 @@ import urllib.parse import hashlib import hmac import base64 +import tractor import trio from piker import config @@ -372,8 +373,7 @@ class Client: # 1658347714, 'status': 'Success'}]} if xfers: - import tractor - await tractor.pp() + await tractor.pause() trans: dict[str, Transaction] = {} for entry in xfers: @@ -501,7 +501,8 @@ class Client: for xkey, data in resp['result'].items(): # NOTE: always cache in pairs tables for faster lookup - pair = Pair(xname=xkey, **data) + with tractor.devx.maybe_open_crash_handler(): # as bxerr: + pair = Pair(xname=xkey, **data) # register the above `Pair` structs for all # key-sets/monikers: a set of 4 (frickin) tables diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index eb5963cd..0aaf5730 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -175,9 +175,8 @@ async def handle_order_requests( case { 'account': 'kraken.spot' as account, - 'action': action, - } if action in {'buy', 'sell'}: - + 'action': 'buy'|'sell', + }: # validate order = BrokerdOrder(**msg) @@ -262,6 +261,12 @@ async def handle_order_requests( } | extra log.info(f'Submitting WS order request:\n{pformat(req)}') + + # NOTE HOWTO, debug order requests + # + # if 'XRP' in pair: + # await tractor.pause() + await ws.send_msg(req) # placehold for sanity checking in relay loop @@ -544,7 +549,7 @@ async def open_trade_dialog( # to be reloaded. balances: dict[str, float] = await client.get_balances() - verify_balances( + await verify_balances( acnt, src_fiat, balances, @@ -1085,6 +1090,8 @@ async def handle_order_updates( f'Failed to {action} order {reqid}:\n' f'{errmsg}' ) + # if tractor._state.debug_mode(): + # await tractor.pause() symbol: str = 'N/A' if chain := apiflows.get(reqid): diff --git a/piker/brokers/kraken/symbols.py b/piker/brokers/kraken/symbols.py index 80176b4d..0cd4cdb1 100644 --- a/piker/brokers/kraken/symbols.py +++ b/piker/brokers/kraken/symbols.py @@ -21,7 +21,6 @@ Symbology defs and search. from decimal import Decimal import tractor -from rapidfuzz import process as fuzzy from piker._cacheables import ( async_lifo_cache, @@ -41,8 +40,13 @@ from piker.accounting._mktinfo import ( ) -# https://www.kraken.com/features/api#get-tradable-pairs class Pair(Struct): + ''' + A tradable asset pair as schema-defined by, + + https://docs.kraken.com/api/docs/rest-api/get-tradable-asset-pairs + + ''' xname: str # idiotic bs_mktid equiv i guess? altname: str # alternate pair name wsname: str # WebSocket pair name (if available) @@ -53,7 +57,6 @@ class Pair(Struct): lot: str # volume lot size cost_decimals: int - costmin: float pair_decimals: int # scaling decimal places for pair lot_decimals: int # scaling decimal places for volume @@ -79,6 +82,7 @@ class Pair(Struct): tick_size: float # min price step size status: str + costmin: str|None = None # XXX, only some mktpairs? short_position_limit: float = 0 long_position_limit: float = float('inf') diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 31133f23..97ef5a3a 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -37,6 +37,12 @@ import tractor from async_generator import asynccontextmanager import numpy as np import wrapt + +# TODO, port to `httpx`/`trio-websocket` whenver i get back to +# writing a proper ws-api streamer for this backend (since the data +# feeds are free now) as per GH feat-req: +# https://github.com/pikers/piker/issues/509 +# import asks from ..calc import humanize, percent_change