Compare commits

..

No commits in common. "c933f2ad5601530dcd71255493bd1d70913040f1" and "5382815b2de322b8fd20c6e219243c9ba110576d" have entirely different histories.

6 changed files with 30 additions and 83 deletions

View File

@ -567,7 +567,6 @@ class Client:
) -> str: ) -> str:
return { return {
'USDTM': 'usdtm_futes', 'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes', # 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..? # ^-TODO-^ bc someone might want it..?
}[pair.venue] }[pair.venue]

View File

@ -42,6 +42,7 @@ from trio_typing import TaskStatus
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
) )
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
@ -110,7 +111,6 @@ class AggTrade(Struct, frozen=True):
async def stream_messages( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
) -> AsyncGenerator[NoBsWs, dict]: ) -> AsyncGenerator[NoBsWs, dict]:
# TODO: match syntax here! # TODO: match syntax here!
@ -221,8 +221,6 @@ def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
} }
# TODO, why aren't frame resp `log.info()`s showing in upstream
# code?!
@acm @acm
async def open_history_client( async def open_history_client(
mkt: MktPair, mkt: MktPair,
@ -465,8 +463,6 @@ async def stream_quotes(
): ):
init_msgs: list[FeedInit] = [] init_msgs: list[FeedInit] = []
for sym in symbols: for sym in symbols:
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym) mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec # build out init msgs according to latest spec
@ -515,6 +511,7 @@ async def stream_quotes(
# start streaming # start streaming
async for typ, quote in msg_gen: async for typ, quote in msg_gen:
# period = time.time() - last # period = time.time() - last
# hz = 1/period if period else float('inf') # hz = 1/period if period else float('inf')
# if hz > 60: # if hz > 60:
@ -550,7 +547,7 @@ async def open_symbol_search(
) )
# repack in fqme-keyed table # repack in fqme-keyed table
byfqme: dict[str, Pair] = {} byfqme: dict[start, Pair] = {}
for pair in pairs.values(): for pair in pairs.values():
byfqme[pair.bs_fqme] = pair byfqme[pair.bs_fqme] = pair

View File

@ -181,6 +181,7 @@ class FutesPair(Pair):
quoteAsset: str # 'USDT', quoteAsset: str # 'USDT',
quotePrecision: int # 8, quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000', requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500', triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'], underlyingSubType: list[str] # ['PoW'],

View File

@ -62,7 +62,7 @@ from piker._cacheables import (
) )
from piker.log import get_logger from piker.log import get_logger
from piker.data.validate import FeedInit from piker.data.validate import FeedInit
from piker.types import Struct # NOTE, this is already a `tractor.msg.Struct` from piker.types import Struct
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
@ -98,18 +98,9 @@ class KucoinMktPair(Struct, frozen=True):
def size_tick(self) -> Decimal: def size_tick(self) -> Decimal:
return Decimal(str(self.quoteMinSize)) return Decimal(str(self.quoteMinSize))
callauctionFirstStageStartTime: None|float
callauctionIsEnabled: bool
callauctionPriceCeiling: float|None
callauctionPriceFloor: float|None
callauctionSecondStageStartTime: float|None
callauctionThirdStageStartTime: float|None
enableTrading: bool enableTrading: bool
feeCategory: int
feeCurrency: str feeCurrency: str
isMarginEnabled: bool isMarginEnabled: bool
makerFeeCoefficient: float
market: str market: str
minFunds: float minFunds: float
name: str name: str
@ -119,10 +110,7 @@ class KucoinMktPair(Struct, frozen=True):
quoteIncrement: float quoteIncrement: float
quoteMaxSize: float quoteMaxSize: float
quoteMinSize: float quoteMinSize: float
st: bool
symbol: str # our bs_mktid, kucoin's internal id symbol: str # our bs_mktid, kucoin's internal id
takerFeeCoefficient: float
tradingStartTime: float|None
class AccountTrade(Struct, frozen=True): class AccountTrade(Struct, frozen=True):
@ -404,13 +392,7 @@ class Client:
pairs: dict[str, KucoinMktPair] = {} pairs: dict[str, KucoinMktPair] = {}
fqmes2mktids: bidict[str, str] = bidict() fqmes2mktids: bidict[str, str] = bidict()
for item in entries: for item in entries:
try:
pair = pairs[item['name']] = KucoinMktPair(**item) pair = pairs[item['name']] = KucoinMktPair(**item)
except TypeError as te:
raise TypeError(
'`KucoinMktPair` and reponse fields do not match ??\n'
f'{KucoinMktPair.fields_diff(item)}\n'
) from te
fqmes2mktids[ fqmes2mktids[
item['name'].lower().replace('-', '') item['name'].lower().replace('-', '')
] = pair.name ] = pair.name
@ -611,7 +593,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
''' '''
async with ( async with (
httpx.AsyncClient( httpx.AsyncClient(
base_url='https://api.kucoin.com/api', base_url=f'https://api.kucoin.com/api',
) as trio_client, ) as trio_client,
): ):
client = Client(httpx_client=trio_client) client = Client(httpx_client=trio_client)
@ -655,7 +637,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'}) await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.warning('Starting ping task for kucoin ws connection') log.info('Starting ping task for kucoin ws connection')
n.start_soon(ping_server) n.start_soon(ping_server)
yield yield
@ -667,14 +649,9 @@ async def open_ping_task(
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[ ) -> tuple[MktPair, KucoinMktPair]:
MktPair,
KucoinMktPair,
]:
''' '''
Query for and return both a `piker.accounting.MktPair` and Query for and return a `MktPair` and `KucoinMktPair`.
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
''' '''
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
@ -749,8 +726,6 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}') log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols: for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str) mkt, pair = await get_mkt_info(sym_str)
init_msgs.append( init_msgs.append(
FeedInit(mkt_info=mkt) FeedInit(mkt_info=mkt)
@ -758,11 +733,7 @@ async def stream_quotes(
ws: NoBsWs ws: NoBsWs
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
log.info('API reported ping_interval: {ping_interval}\n') connect_id = str(uuid4())
connect_id: str = str(uuid4())
typ: str
quote: dict
async with ( async with (
open_autorecon_ws( open_autorecon_ws(
( (
@ -776,37 +747,20 @@ async def stream_quotes(
), ),
) as ws, ) as ws,
open_ping_task(ws, ping_interval, connect_id), open_ping_task(ws, ping_interval, connect_id),
aclosing( aclosing(stream_messages(ws, sym_str)) as msg_gen,
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
): ):
typ, quote = await anext(iter_quotes) typ, quote = await anext(msg_gen)
while typ != 'trade':
# take care to not unblock here until we get a real # take care to not unblock here until we get a real
# trade quote? # trade quote
# ^TODO, remove this right? typ, quote = await anext(msg_gen)
# -[ ] what often blocks chart boot/new-feed switching
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
feed_is_live.set() feed_is_live.set()
# XXX NOTE, DO NOT include the `.<backend>` suffix! async for typ, msg in msg_gen:
# OW the sampling loop will not broadcast correctly.. await send_chan.send({sym_str: msg})
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm @acm
@ -861,7 +815,7 @@ async def subscribe(
) )
async def iter_normed_quotes( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
sym: str, sym: str,
@ -892,9 +846,6 @@ async def iter_normed_quotes(
yield 'trade', { yield 'trade', {
'symbol': sym, 'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price, 'last': trade_data.price,
'brokerd_ts': last_trade_ts, 'brokerd_ts': last_trade_ts,
'ticks': [ 'ticks': [
@ -987,7 +938,7 @@ async def open_history_client(
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
log.debug( print(
f'difference in time between load and processing' f'difference in time between load and processing'
f'{inow - times[-1]}' f'{inow - times[-1]}'
) )

View File

@ -104,15 +104,14 @@ def get_app_dir(
# `tractor`) with the testing dir and check for it whenever we # `tractor`) with the testing dir and check for it whenever we
# detect `pytest` is being used (which it isn't under normal # detect `pytest` is being used (which it isn't under normal
# operation). # operation).
# if "pytest" in sys.modules: if "pytest" in sys.modules:
# import tractor import tractor
# actor = tractor.current_actor(err_on_no_runtime=False) actor = tractor.current_actor(err_on_no_runtime=False)
# if actor: # runtime is up if actor: # runtime is up
# rvs = tractor._state._runtime_vars rvs = tractor._state._runtime_vars
# import pdbp; pdbp.set_trace() testdirpath = Path(rvs['piker_vars']['piker_test_dir'])
# testdirpath = Path(rvs['piker_vars']['piker_test_dir']) assert testdirpath.exists(), 'piker test harness might be borked!?'
# assert testdirpath.exists(), 'piker test harness might be borked!?' app_name = str(testdirpath)
# app_name = str(testdirpath)
if platform.system() == 'Windows': if platform.system() == 'Windows':
key = "APPDATA" if roaming else "LOCALAPPDATA" key = "APPDATA" if roaming else "LOCALAPPDATA"

View File

@ -144,4 +144,4 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" } asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" } tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
msgspec = { git = "https://github.com/jcrist/msgspec.git" } msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { path = "../tractor", editable = true } tractor = { path = "../tractor" }