providers_sync: required API updates and .brokers refinements #56

Merged
goodboy merged 15 commits from providers_sync into main 2026-01-07 18:23:14 +00:00
10 changed files with 101 additions and 37 deletions

View File

@ -98,13 +98,14 @@ async def open_cached_client(
If one has not been setup do it and cache it. If one has not been setup do it and cache it.
''' '''
brokermod = get_brokermod(brokername) brokermod: ModuleType = get_brokermod(brokername)
# TODO: make abstract or `typing.Protocol`
# client: Client
async with maybe_open_context( async with maybe_open_context(
acm_func=brokermod.get_client, acm_func=brokermod.get_client,
kwargs=kwargs, kwargs=kwargs,
) as (cache_hit, client): ) as (cache_hit, client):
if cache_hit: if cache_hit:
log.runtime(f'Reusing existing {client}') log.runtime(f'Reusing existing {client}')

View File

@ -374,9 +374,14 @@ class Client:
pair: Pair = pair_type(**item) pair: Pair = pair_type(**item)
except Exception as e: except Exception as e:
e.add_note( e.add_note(
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n" f'\n'
'Check out their API docs here:\n\n' f'New or removed field we need to codify!\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information' f'pair-type: {pair_type!r}\n'
f'\n'
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
f'Check out their API docs here:\n'
f'\n'
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
) )
raise raise
pair_table[pair.symbol.upper()] = pair pair_table[pair.symbol.upper()] = pair

View File

@ -94,13 +94,15 @@ class L1(Struct):
# validation type # validation type
# https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Aggregate-Trade-Streams#response-example
class AggTrade(Struct, frozen=True): class AggTrade(Struct, frozen=True):
e: str # Event type e: str # Event type
E: int # Event time E: int # Event time
s: str # Symbol s: str # Symbol
a: int # Aggregate trade ID a: int # Aggregate trade ID
p: float # Price p: float # Price
q: float # Quantity q: float # Quantity with all the market trades
nq: float # Normal quantity without the trades involving RPI orders
f: int # First trade ID f: int # First trade ID
l: int # noqa Last trade ID l: int # noqa Last trade ID
T: int # Trade time T: int # Trade time

View File

@ -97,6 +97,16 @@ class Pair(Struct, frozen=True, kw_only=True):
baseAsset: str baseAsset: str
baseAssetPrecision: int baseAssetPrecision: int
permissionSets: list[list[str]]
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
# will become non-optional 2025-08-28?
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
pegInstructionsAllowed: bool = False
# https://developers.binance.com/docs/binance-spot-api-docs#2025-12-02
opoAllowed: bool = False
filters: dict[ filters: dict[
str, str,
str | int | float, str | int | float,
@ -142,7 +152,11 @@ class SpotPair(Pair, frozen=True):
defaultSelfTradePreventionMode: str defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str] allowedSelfTradePreventionModes: list[str]
permissions: list[str] permissions: list[str]
permissionSets: list[list[str]]
# can the paint botz creat liq gaps even easier on this asset?
# Bp
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
amendAllowed: bool
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair' ns_path: str = 'piker.brokers.binance:SpotPair'
@ -209,7 +223,10 @@ class FutesPair(Pair):
assert pair == self.pair # sanity assert pair == self.pair # sanity
return f'{expiry}' return f'{expiry}'
case 'PERPETUAL': case (
'PERPETUAL'
| 'TRADIFI_PERPETUAL'
):
return 'PERP' return 'PERP'
case '': case '':
@ -238,7 +255,10 @@ class FutesPair(Pair):
margin: str = self.marginAsset margin: str = self.marginAsset
match ctype: match ctype:
case 'PERPETUAL': case (
'PERPETUAL'
| 'TRADIFI_PERPETUAL'
):
return f'{margin}M' return f'{margin}M'
case ( case (

View File

@ -471,11 +471,15 @@ def search(
''' '''
# global opts # global opts
brokermods = list(config['brokermods'].values()) brokermods: list[ModuleType] = list(config['brokermods'].values())
# TODO: this is coming from the `search --pdb` NOT from
# the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum
# assert pdb
# define tractor entrypoint # define tractor entrypoint
async def main(func): async def main(func):
async with maybe_open_pikerd( async with maybe_open_pikerd(
loglevel=config['loglevel'], loglevel=config['loglevel'],
debug_mode=pdb, debug_mode=pdb,

View File

@ -587,7 +587,7 @@ async def get_bars(
data_cs.cancel() data_cs.cancel()
# spawn new data reset task # spawn new data reset task
data_cs, reset_done = await nurse.start( data_cs, reset_done = await tn.start(
partial( partial(
wait_on_data_reset, wait_on_data_reset,
proxy, proxy,
@ -607,11 +607,11 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn # such that simultaneous symbol queries don't try data resettingn
# too fast.. # too fast..
unset_resetter: bool = False unset_resetter: bool = False
async with trio.open_nursery() as nurse: async with trio.open_nursery() as tn:
# start history request that we allow # start history request that we allow
# to run indefinitely until a result is acquired # to run indefinitely until a result is acquired
nurse.start_soon(query) tn.start_soon(query)
# start history reset loop which waits up to the timeout # start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset. # for a result before triggering a data feed reset.
@ -631,7 +631,7 @@ async def get_bars(
unset_resetter: bool = True unset_resetter: bool = True
# spawn new data reset task # spawn new data reset task
data_cs, reset_done = await nurse.start( data_cs, reset_done = await tn.start(
partial( partial(
wait_on_data_reset, wait_on_data_reset,
proxy, proxy,
@ -705,7 +705,9 @@ async def _setup_quote_stream(
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown(): def teardown():
ticker.updateEvent.disconnect(push) ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`") log.error(
f'Disconnected stream for `{symbol}`'
)
client.ib.cancelMktData(contract) client.ib.cancelMktData(contract)
# decouple broadcast mem chan # decouple broadcast mem chan
@ -761,7 +763,10 @@ async def open_aio_quote_stream(
symbol: str, symbol: str,
contract: Contract | None = None, contract: Contract | None = None,
) -> trio.abc.ReceiveStream: ) -> (
trio.abc.Channel| # iface
tractor.to_asyncio.LinkedTaskChannel # actually
):
from tractor.trionics import broadcast_receiver from tractor.trionics import broadcast_receiver
global _quote_streams global _quote_streams
@ -778,6 +783,7 @@ async def open_aio_quote_stream(
yield from_aio yield from_aio
return return
from_aio: tractor.to_asyncio.LinkedTaskChannel
async with tractor.to_asyncio.open_channel_from( async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream, _setup_quote_stream,
symbol=symbol, symbol=symbol,
@ -983,17 +989,18 @@ async def stream_quotes(
) )
cs: trio.CancelScope | None = None cs: trio.CancelScope | None = None
startup: bool = True startup: bool = True
iter_quotes: trio.abc.Channel
while ( while (
startup startup
or cs.cancel_called or cs.cancel_called
): ):
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
async with ( async with (
trio.open_nursery() as nurse, trio.open_nursery() as tn,
open_aio_quote_stream( open_aio_quote_stream(
symbol=sym, symbol=sym,
contract=con, contract=con,
) as stream, ) as iter_quotes,
): ):
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
@ -1021,9 +1028,9 @@ async def stream_quotes(
await rt_ev.wait() await rt_ev.wait()
cs.cancel() # cancel called should now be set cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed) tn.start_soon(reset_on_feed)
async with aclosing(stream): async with aclosing(iter_quotes):
# if syminfo.get('no_vlm', False): # if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']: if not init_msg.shm_write_opts['has_vlm']:
@ -1038,19 +1045,21 @@ async def stream_quotes(
# wait for real volume on feed (trading might be # wait for real volume on feed (trading might be
# closed) # closed)
while True: while True:
ticker = await stream.receive() ticker = await iter_quotes.receive()
# for a real volume contract we rait for # for a real volume contract we rait for
# the first "real" trade to take place # the first "real" trade to take place
if ( if (
# not calc_price # not calc_price
# and not ticker.rtTime # and not ticker.rtTime
not ticker.rtTime False
# not ticker.rtTime
): ):
# spin consuming tickers until we # spin consuming tickers until we
# get a real market datum # get a real market datum
log.debug(f"New unsent ticker: {ticker}") log.debug(f"New unsent ticker: {ticker}")
continue continue
else: else:
log.debug("Received first volume tick") log.debug("Received first volume tick")
# ugh, clear ticks since we've # ugh, clear ticks since we've
@ -1066,13 +1075,18 @@ async def stream_quotes(
log.debug(f"First ticker received {quote}") log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live # tell data-layer spawner-caller that live
# quotes are now streaming. # quotes are now active desptie not having
# necessarily received a first vlm/clearing
# tick.
ticker = await iter_quotes.receive()
feed_is_live.set() feed_is_live.set()
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# last = time.time() # last = time.time()
async for ticker in stream: async for ticker in iter_quotes:
quote = normalize(ticker) quote = normalize(ticker)
fqme = quote['fqme'] fqme: str = quote['fqme']
await send_chan.send({fqme: quote}) await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them

View File

@ -34,6 +34,7 @@ import urllib.parse
import hashlib import hashlib
import hmac import hmac
import base64 import base64
import tractor
import trio import trio
from piker import config from piker import config
@ -372,8 +373,7 @@ class Client:
# 1658347714, 'status': 'Success'}]} # 1658347714, 'status': 'Success'}]}
if xfers: if xfers:
import tractor await tractor.pause()
await tractor.pp()
trans: dict[str, Transaction] = {} trans: dict[str, Transaction] = {}
for entry in xfers: for entry in xfers:
@ -501,7 +501,8 @@ class Client:
for xkey, data in resp['result'].items(): for xkey, data in resp['result'].items():
# NOTE: always cache in pairs tables for faster lookup # NOTE: always cache in pairs tables for faster lookup
pair = Pair(xname=xkey, **data) with tractor.devx.maybe_open_crash_handler(): # as bxerr:
pair = Pair(xname=xkey, **data)
# register the above `Pair` structs for all # register the above `Pair` structs for all
# key-sets/monikers: a set of 4 (frickin) tables # key-sets/monikers: a set of 4 (frickin) tables

View File

@ -175,9 +175,8 @@ async def handle_order_requests(
case { case {
'account': 'kraken.spot' as account, 'account': 'kraken.spot' as account,
'action': action, 'action': 'buy'|'sell',
} if action in {'buy', 'sell'}: }:
# validate # validate
order = BrokerdOrder(**msg) order = BrokerdOrder(**msg)
@ -262,6 +261,12 @@ async def handle_order_requests(
} | extra } | extra
log.info(f'Submitting WS order request:\n{pformat(req)}') log.info(f'Submitting WS order request:\n{pformat(req)}')
# NOTE HOWTO, debug order requests
#
# if 'XRP' in pair:
# await tractor.pause()
await ws.send_msg(req) await ws.send_msg(req)
# placehold for sanity checking in relay loop # placehold for sanity checking in relay loop
@ -544,7 +549,7 @@ async def open_trade_dialog(
# to be reloaded. # to be reloaded.
balances: dict[str, float] = await client.get_balances() balances: dict[str, float] = await client.get_balances()
verify_balances( await verify_balances(
acnt, acnt,
src_fiat, src_fiat,
balances, balances,
@ -1085,6 +1090,8 @@ async def handle_order_updates(
f'Failed to {action} order {reqid}:\n' f'Failed to {action} order {reqid}:\n'
f'{errmsg}' f'{errmsg}'
) )
# if tractor._state.debug_mode():
# await tractor.pause()
symbol: str = 'N/A' symbol: str = 'N/A'
if chain := apiflows.get(reqid): if chain := apiflows.get(reqid):

View File

@ -21,7 +21,6 @@ Symbology defs and search.
from decimal import Decimal from decimal import Decimal
import tractor import tractor
from rapidfuzz import process as fuzzy
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
@ -41,8 +40,13 @@ from piker.accounting._mktinfo import (
) )
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct): class Pair(Struct):
'''
A tradable asset pair as schema-defined by,
https://docs.kraken.com/api/docs/rest-api/get-tradable-asset-pairs
'''
xname: str # idiotic bs_mktid equiv i guess? xname: str # idiotic bs_mktid equiv i guess?
altname: str # alternate pair name altname: str # alternate pair name
wsname: str # WebSocket pair name (if available) wsname: str # WebSocket pair name (if available)
@ -53,7 +57,6 @@ class Pair(Struct):
lot: str # volume lot size lot: str # volume lot size
cost_decimals: int cost_decimals: int
costmin: float
pair_decimals: int # scaling decimal places for pair pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume lot_decimals: int # scaling decimal places for volume
@ -79,6 +82,7 @@ class Pair(Struct):
tick_size: float # min price step size tick_size: float # min price step size
status: str status: str
costmin: str|None = None # XXX, only some mktpairs?
short_position_limit: float = 0 short_position_limit: float = 0
long_position_limit: float = float('inf') long_position_limit: float = float('inf')

View File

@ -37,6 +37,12 @@ import tractor
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
import numpy as np import numpy as np
import wrapt import wrapt
# TODO, port to `httpx`/`trio-websocket` whenver i get back to
# writing a proper ws-api streamer for this backend (since the data
# feeds are free now) as per GH feat-req:
# https://github.com/pikers/piker/issues/509
#
import asks import asks
from ..calc import humanize, percent_change from ..calc import humanize, percent_change