Compare commits
9 Commits
f8e05effbc
...
07502aab1e
Author | SHA1 | Date |
---|---|---|
|
07502aab1e | |
|
492d673ec0 | |
|
cf9cb67b5d | |
|
897d51d564 | |
|
17e29715af | |
|
68940e8a73 | |
|
42fdb204dc | |
|
0ea0f3f293 | |
|
b901dce551 |
|
@ -30,7 +30,8 @@ from types import ModuleType
|
|||
from typing import (
|
||||
Any,
|
||||
Iterator,
|
||||
Generator
|
||||
Generator,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import pendulum
|
||||
|
@ -59,8 +60,10 @@ from ..clearing._messages import (
|
|||
BrokerdPosition,
|
||||
)
|
||||
from piker.types import Struct
|
||||
from piker.data._symcache import SymbologyCache
|
||||
from ..log import get_logger
|
||||
from piker.log import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from piker.data._symcache import SymbologyCache
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
@ -493,6 +496,17 @@ class Account(Struct):
|
|||
|
||||
_mktmap_table: dict[str, MktPair] | None = None,
|
||||
|
||||
only_require: list[str]|True = True,
|
||||
# ^list of fqmes that are "required" to be processed from
|
||||
# this ledger pass; we often don't care about others and
|
||||
# definitely shouldn't always error in such cases.
|
||||
# (eg. broker backend loaded that doesn't yet supsport the
|
||||
# symcache but also, inside the paper engine we don't ad-hoc
|
||||
# request `get_mkt_info()` for every symbol in the ledger,
|
||||
# only the one for which we're simulating against).
|
||||
# TODO, not sure if there's a better soln for this, ideally
|
||||
# all backends get symcache support afap i guess..
|
||||
|
||||
) -> dict[str, Position]:
|
||||
'''
|
||||
Update the internal `.pps[str, Position]` table from input
|
||||
|
@ -535,11 +549,32 @@ class Account(Struct):
|
|||
if _mktmap_table is None:
|
||||
raise
|
||||
|
||||
required: bool = (
|
||||
only_require is True
|
||||
or (
|
||||
only_require is not True
|
||||
and
|
||||
fqme in only_require
|
||||
)
|
||||
)
|
||||
# XXX: caller is allowed to provide a fallback
|
||||
# mktmap table for the case where a new position is
|
||||
# being added and the preloaded symcache didn't
|
||||
# have this entry prior (eg. with frickin IB..)
|
||||
mkt = _mktmap_table[fqme]
|
||||
if (
|
||||
not (mkt := _mktmap_table.get(fqme))
|
||||
and
|
||||
required
|
||||
):
|
||||
raise
|
||||
|
||||
elif not required:
|
||||
continue
|
||||
|
||||
else:
|
||||
# should be an entry retreived somewhere
|
||||
assert mkt
|
||||
|
||||
|
||||
if not (pos := pps.get(bs_mktid)):
|
||||
|
||||
|
@ -656,7 +691,7 @@ class Account(Struct):
|
|||
def write_config(self) -> None:
|
||||
'''
|
||||
Write the current account state to the user's account TOML file, normally
|
||||
something like ``pps.toml``.
|
||||
something like `pps.toml`.
|
||||
|
||||
'''
|
||||
# TODO: show diff output?
|
||||
|
|
|
@ -567,6 +567,7 @@ class Client:
|
|||
) -> str:
|
||||
return {
|
||||
'USDTM': 'usdtm_futes',
|
||||
'SPOT': 'spot',
|
||||
# 'COINM': 'coin_futes',
|
||||
# ^-TODO-^ bc someone might want it..?
|
||||
}[pair.venue]
|
||||
|
|
|
@ -181,7 +181,6 @@ class FutesPair(Pair):
|
|||
quoteAsset: str # 'USDT',
|
||||
quotePrecision: int # 8,
|
||||
requiredMarginPercent: float # '5.0000',
|
||||
settlePlan: int # 0,
|
||||
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
|
||||
triggerProtect: float # '0.0500',
|
||||
underlyingSubType: list[str] # ['PoW'],
|
||||
|
|
|
@ -111,6 +111,10 @@ class KucoinMktPair(Struct, frozen=True):
|
|||
quoteMaxSize: float
|
||||
quoteMinSize: float
|
||||
symbol: str # our bs_mktid, kucoin's internal id
|
||||
feeCategory: int
|
||||
makerFeeCoefficient: float
|
||||
takerFeeCoefficient: float
|
||||
st: bool
|
||||
|
||||
|
||||
class AccountTrade(Struct, frozen=True):
|
||||
|
@ -593,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
|
|||
'''
|
||||
async with (
|
||||
httpx.AsyncClient(
|
||||
base_url=f'https://api.kucoin.com/api',
|
||||
base_url='https://api.kucoin.com/api',
|
||||
) as trio_client,
|
||||
):
|
||||
client = Client(httpx_client=trio_client)
|
||||
|
@ -637,7 +641,7 @@ async def open_ping_task(
|
|||
await trio.sleep((ping_interval - 1000) / 1000)
|
||||
await ws.send_msg({'id': connect_id, 'type': 'ping'})
|
||||
|
||||
log.info('Starting ping task for kucoin ws connection')
|
||||
log.warning('Starting ping task for kucoin ws connection')
|
||||
n.start_soon(ping_server)
|
||||
|
||||
yield
|
||||
|
@ -649,9 +653,14 @@ async def open_ping_task(
|
|||
async def get_mkt_info(
|
||||
fqme: str,
|
||||
|
||||
) -> tuple[MktPair, KucoinMktPair]:
|
||||
) -> tuple[
|
||||
MktPair,
|
||||
KucoinMktPair,
|
||||
]:
|
||||
'''
|
||||
Query for and return a `MktPair` and `KucoinMktPair`.
|
||||
Query for and return both a `piker.accounting.MktPair` and
|
||||
`KucoinMktPair` from provided `fqme: str`
|
||||
(fully-qualified-market-endpoint).
|
||||
|
||||
'''
|
||||
async with open_cached_client('kucoin') as client:
|
||||
|
@ -726,6 +735,8 @@ async def stream_quotes(
|
|||
|
||||
log.info(f'Starting up quote stream(s) for {symbols}')
|
||||
for sym_str in symbols:
|
||||
mkt: MktPair
|
||||
pair: KucoinMktPair
|
||||
mkt, pair = await get_mkt_info(sym_str)
|
||||
init_msgs.append(
|
||||
FeedInit(mkt_info=mkt)
|
||||
|
@ -733,7 +744,11 @@ async def stream_quotes(
|
|||
|
||||
ws: NoBsWs
|
||||
token, ping_interval = await client._get_ws_token()
|
||||
connect_id = str(uuid4())
|
||||
log.info('API reported ping_interval: {ping_interval}\n')
|
||||
|
||||
connect_id: str = str(uuid4())
|
||||
typ: str
|
||||
quote: dict
|
||||
async with (
|
||||
open_autorecon_ws(
|
||||
(
|
||||
|
@ -747,20 +762,37 @@ async def stream_quotes(
|
|||
),
|
||||
) as ws,
|
||||
open_ping_task(ws, ping_interval, connect_id),
|
||||
aclosing(stream_messages(ws, sym_str)) as msg_gen,
|
||||
aclosing(
|
||||
iter_normed_quotes(
|
||||
ws, sym_str
|
||||
)
|
||||
) as iter_quotes,
|
||||
):
|
||||
typ, quote = await anext(msg_gen)
|
||||
typ, quote = await anext(iter_quotes)
|
||||
|
||||
while typ != 'trade':
|
||||
# take care to not unblock here until we get a real
|
||||
# trade quote
|
||||
typ, quote = await anext(msg_gen)
|
||||
# take care to not unblock here until we get a real
|
||||
# trade quote?
|
||||
# ^TODO, remove this right?
|
||||
# -[ ] what often blocks chart boot/new-feed switching
|
||||
# since we'ere waiting for a live quote instead of just
|
||||
# loading history afap..
|
||||
# |_ XXX, not sure if we require a bit of rework to core
|
||||
# feed init logic or if backends justg gotta be
|
||||
# changed up.. feel like there was some causality
|
||||
# dilema prolly only seen with IB too..
|
||||
# while typ != 'trade':
|
||||
# typ, quote = await anext(iter_quotes)
|
||||
|
||||
task_status.started((init_msgs, quote))
|
||||
feed_is_live.set()
|
||||
|
||||
async for typ, msg in msg_gen:
|
||||
await send_chan.send({sym_str: msg})
|
||||
# XXX NOTE, DO NOT include the `.<backend>` suffix!
|
||||
# OW the sampling loop will not broadcast correctly..
|
||||
# since `bus._subscribers.setdefault(bs_fqme, set())`
|
||||
# is used inside `.data.open_feed_bus()` !!!
|
||||
topic: str = mkt.bs_fqme
|
||||
async for typ, quote in iter_quotes:
|
||||
await send_chan.send({topic: quote})
|
||||
|
||||
|
||||
@acm
|
||||
|
@ -815,7 +847,7 @@ async def subscribe(
|
|||
)
|
||||
|
||||
|
||||
async def stream_messages(
|
||||
async def iter_normed_quotes(
|
||||
ws: NoBsWs,
|
||||
sym: str,
|
||||
|
||||
|
@ -846,6 +878,9 @@ async def stream_messages(
|
|||
|
||||
yield 'trade', {
|
||||
'symbol': sym,
|
||||
# TODO, is 'last' even used elsewhere/a-good
|
||||
# semantic? can't we just read the ticks with our
|
||||
# .data.ticktools.frame_ticks()`/
|
||||
'last': trade_data.price,
|
||||
'brokerd_ts': last_trade_ts,
|
||||
'ticks': [
|
||||
|
@ -938,7 +973,7 @@ async def open_history_client(
|
|||
if end_dt is None:
|
||||
inow = round(time.time())
|
||||
|
||||
print(
|
||||
log.debug(
|
||||
f'difference in time between load and processing'
|
||||
f'{inow - times[-1]}'
|
||||
)
|
||||
|
|
|
@ -653,7 +653,11 @@ class Router(Struct):
|
|||
flume = feed.flumes[fqme]
|
||||
first_quote: dict = flume.first_quote
|
||||
book: DarkBook = self.get_dark_book(broker)
|
||||
book.lasts[fqme]: float = float(first_quote['last'])
|
||||
|
||||
if not (last := first_quote.get('last')):
|
||||
last: float = flume.rt_shm.array[-1]['close']
|
||||
|
||||
book.lasts[fqme]: float = float(last)
|
||||
|
||||
async with self.maybe_open_brokerd_dialog(
|
||||
brokermod=brokermod,
|
||||
|
@ -716,7 +720,7 @@ class Router(Struct):
|
|||
subs = self.subscribers[sub_key]
|
||||
|
||||
sent_some: bool = False
|
||||
for client_stream in subs:
|
||||
for client_stream in subs.copy():
|
||||
try:
|
||||
await client_stream.send(msg)
|
||||
sent_some = True
|
||||
|
@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events(
|
|||
status_msg.brokerd_msg = msg
|
||||
status_msg.src = msg.broker_details['name']
|
||||
|
||||
await router.client_broadcast(
|
||||
status_msg.req.symbol,
|
||||
status_msg,
|
||||
)
|
||||
if not status_msg.req:
|
||||
# likely some order change state?
|
||||
await tractor.pause()
|
||||
else:
|
||||
await router.client_broadcast(
|
||||
status_msg.req.symbol,
|
||||
status_msg,
|
||||
)
|
||||
|
||||
if status == 'closed':
|
||||
log.info(f'Execution for {oid} is complete!')
|
||||
|
|
|
@ -653,6 +653,7 @@ async def open_trade_dialog(
|
|||
# in) use manually constructed table from calling
|
||||
# the `.get_mkt_info()` provider EP above.
|
||||
_mktmap_table=mkt_by_fqme,
|
||||
only_require=list(mkt_by_fqme),
|
||||
)
|
||||
|
||||
pp_msgs: list[BrokerdPosition] = []
|
||||
|
|
|
@ -273,7 +273,7 @@ async def _reconnect_forever(
|
|||
nobsws._connected.set()
|
||||
await trio.sleep_forever()
|
||||
except HandshakeError:
|
||||
log.exception(f'Retrying connection')
|
||||
log.exception('Retrying connection')
|
||||
|
||||
# ws & nursery block ends
|
||||
|
||||
|
@ -359,8 +359,8 @@ async def open_autorecon_ws(
|
|||
|
||||
|
||||
'''
|
||||
JSONRPC response-request style machinery for transparent multiplexing of msgs
|
||||
over a NoBsWs.
|
||||
JSONRPC response-request style machinery for transparent multiplexing
|
||||
of msgs over a NoBsWs.
|
||||
|
||||
'''
|
||||
|
||||
|
@ -377,16 +377,25 @@ async def open_jsonrpc_session(
|
|||
url: str,
|
||||
start_id: int = 0,
|
||||
response_type: type = JSONRPCResult,
|
||||
request_type: Optional[type] = None,
|
||||
request_hook: Optional[Callable] = None,
|
||||
error_hook: Optional[Callable] = None,
|
||||
) -> Callable[[str, dict], dict]:
|
||||
'''
|
||||
Init a json-RPC-over-websocket connection to the provided `url`.
|
||||
|
||||
A `json_rpc: Callable[[str, dict], dict` is delivered to the
|
||||
caller for sending requests and a bg-`trio.Task` handles
|
||||
processing of response msgs including error reporting/raising in
|
||||
the parent/caller task.
|
||||
|
||||
'''
|
||||
# NOTE, store all request msgs so we can raise errors on the
|
||||
# caller side!
|
||||
req_msgs: dict[int, dict] = {}
|
||||
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
trio.open_nursery() as tn,
|
||||
open_autorecon_ws(url) as ws
|
||||
):
|
||||
rpc_id: Iterable = count(start_id)
|
||||
rpc_id: Iterable[int] = count(start_id)
|
||||
rpc_results: dict[int, dict] = {}
|
||||
|
||||
async def json_rpc(method: str, params: dict) -> dict:
|
||||
|
@ -394,26 +403,40 @@ async def open_jsonrpc_session(
|
|||
perform a json rpc call and wait for the result, raise exception in
|
||||
case of error field present on response
|
||||
'''
|
||||
nonlocal req_msgs
|
||||
|
||||
req_id: int = next(rpc_id)
|
||||
msg = {
|
||||
'jsonrpc': '2.0',
|
||||
'id': next(rpc_id),
|
||||
'id': req_id,
|
||||
'method': method,
|
||||
'params': params
|
||||
}
|
||||
_id = msg['id']
|
||||
|
||||
rpc_results[_id] = {
|
||||
result = rpc_results[_id] = {
|
||||
'result': None,
|
||||
'event': trio.Event()
|
||||
'error': None,
|
||||
'event': trio.Event(), # signal caller resp arrived
|
||||
}
|
||||
req_msgs[_id] = msg
|
||||
|
||||
await ws.send_msg(msg)
|
||||
|
||||
# wait for reponse before unblocking requester code
|
||||
await rpc_results[_id]['event'].wait()
|
||||
|
||||
ret = rpc_results[_id]['result']
|
||||
if (maybe_result := result['result']):
|
||||
ret = maybe_result
|
||||
del rpc_results[_id]
|
||||
|
||||
del rpc_results[_id]
|
||||
else:
|
||||
err = result['error']
|
||||
raise Exception(
|
||||
f'JSONRPC request failed\n'
|
||||
f'req: {msg}\n'
|
||||
f'resp: {err}\n'
|
||||
)
|
||||
|
||||
if ret.error is not None:
|
||||
raise Exception(json.dumps(ret.error, indent=4))
|
||||
|
@ -428,6 +451,7 @@ async def open_jsonrpc_session(
|
|||
the server side.
|
||||
|
||||
'''
|
||||
nonlocal req_msgs
|
||||
async for msg in ws:
|
||||
match msg:
|
||||
case {
|
||||
|
@ -451,19 +475,28 @@ async def open_jsonrpc_session(
|
|||
'params': _,
|
||||
}:
|
||||
log.debug(f'Recieved\n{msg}')
|
||||
if request_hook:
|
||||
await request_hook(request_type(**msg))
|
||||
|
||||
case {
|
||||
'error': error
|
||||
}:
|
||||
log.warning(f'Recieved\n{error}')
|
||||
if error_hook:
|
||||
await error_hook(response_type(**msg))
|
||||
# retreive orig request msg, set error
|
||||
# response in original "result" msg,
|
||||
# THEN FINALLY set the event to signal caller
|
||||
# to raise the error in the parent task.
|
||||
req_id: int = error['id']
|
||||
req_msg: dict = req_msgs[req_id]
|
||||
result: dict = rpc_results[req_id]
|
||||
result['error'] = error
|
||||
result['event'].set()
|
||||
log.error(
|
||||
f'JSONRPC request failed\n'
|
||||
f'req: {req_msg}\n'
|
||||
f'resp: {error}\n'
|
||||
)
|
||||
|
||||
case _:
|
||||
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
||||
|
||||
n.start_soon(recv_task)
|
||||
tn.start_soon(recv_task)
|
||||
yield json_rpc
|
||||
n.cancel_scope.cancel()
|
||||
tn.cancel_scope.cancel()
|
||||
|
|
|
@ -458,13 +458,15 @@ async def start_backfill(
|
|||
'bf_until <- last_start_dt:\n'
|
||||
f'{backfill_until_dt} <- {last_start_dt}\n'
|
||||
)
|
||||
|
||||
# ugh, what's a better way?
|
||||
# TODO: fwiw, we probably want a way to signal a throttle
|
||||
# condition (eg. with ib) so that we can halt the
|
||||
# request loop until the condition is resolved?
|
||||
if timeframe > 1:
|
||||
await tractor.pause()
|
||||
# UGH: what's a better way?
|
||||
# TODO: backends are responsible for being correct on
|
||||
# this right!?
|
||||
# -[ ] in the `ib` case we could maybe offer some way
|
||||
# to halt the request loop until the condition is
|
||||
# resolved or should the backend be entirely in
|
||||
# charge of solving such faults? yes, right?
|
||||
# if timeframe > 1:
|
||||
# await tractor.pause()
|
||||
return
|
||||
|
||||
assert (
|
||||
|
@ -572,15 +574,19 @@ async def start_backfill(
|
|||
f'{next_start_dt} -> {last_start_dt}'
|
||||
)
|
||||
|
||||
# always drop the src asset token for
|
||||
# NOTE, always drop the src asset token for
|
||||
# non-currency-pair like market types (for now)
|
||||
#
|
||||
# THAT IS, for now our table key schema is NOT
|
||||
# including the dst[/src] source asset token. SO,
|
||||
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
|
||||
# historical reasons ONLY.
|
||||
if mkt.dst.atype not in {
|
||||
'crypto',
|
||||
'crypto_currency',
|
||||
'fiat', # a "forex pair"
|
||||
'perpetual_future', # stupid "perps" from cex land
|
||||
}:
|
||||
# for now, our table key schema is not including
|
||||
# the dst[/src] source asset token.
|
||||
col_sym_key: str = mkt.get_fqme(
|
||||
delim_char='',
|
||||
without_src=True,
|
||||
|
|
Loading…
Reference in New Issue