Compare commits
5 Commits
5466acb764
...
573ee3e7a3
| Author | SHA1 | Date |
|---|---|---|
|
|
573ee3e7a3 | |
|
|
d326eaccca | |
|
|
c210c286a5 | |
|
|
12b015cec4 | |
|
|
c5fa262474 |
|
|
@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
|
||||||
for fill in fills:
|
for fill in fills:
|
||||||
con: Contract = fill.contract
|
con: Contract = fill.contract
|
||||||
conid: str = con.conId
|
conid: str = con.conId
|
||||||
pexch: str|None = con.primaryExchange or con.exchange
|
pexch: str | None = con.primaryExchange
|
||||||
|
|
||||||
if not pexch:
|
if not pexch:
|
||||||
cons = await client.get_con(conid=conid)
|
cons = await client.get_con(conid=conid)
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ import hashlib
|
||||||
import hmac
|
import hmac
|
||||||
import base64
|
import base64
|
||||||
import tractor
|
import tractor
|
||||||
# from tractor._exceptions import reg_err_types
|
from tractor._exceptions import reg_err_types
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from piker import config
|
from piker import config
|
||||||
|
|
@ -109,37 +109,37 @@ def get_kraken_signature(
|
||||||
return sigdigest.decode()
|
return sigdigest.decode()
|
||||||
|
|
||||||
|
|
||||||
# class InvalidKey(ValueError):
|
class InvalidKey(ValueError):
|
||||||
# '''
|
'''
|
||||||
# EAPI:Invalid key
|
EAPI:Invalid key
|
||||||
|
|
||||||
# This error is returned when the API key used for the call is
|
This error is returned when the API key used for the call is
|
||||||
# either expired or disabled, please review the API key in your
|
either expired or disabled, please review the API key in your
|
||||||
# Settings -> API tab of account management or generate a new one
|
Settings -> API tab of account management or generate a new one
|
||||||
# and update your application.
|
and update your application.
|
||||||
|
|
||||||
# '''
|
'''
|
||||||
|
|
||||||
|
|
||||||
# class InvalidSession(RuntimeError):
|
class InvalidSession(RuntimeError):
|
||||||
# '''
|
'''
|
||||||
# ESession:Invalid session
|
ESession:Invalid session
|
||||||
|
|
||||||
# This error is returned when the ws API key used for an authenticated
|
This error is returned when the ws API key used for an authenticated
|
||||||
# sub/endpoint becomes stale, normally after a sufficient network
|
sub/endpoint becomes stale, normally after a sufficient network
|
||||||
# disconnect/outage.
|
disconnect/outage.
|
||||||
|
|
||||||
# Normally the sub will need to be restarted, likely re-init of the
|
Normally the sub will need to be restarted, likely re-init of the
|
||||||
# auth handshake sequence.
|
auth handshake sequence.
|
||||||
|
|
||||||
# '''
|
'''
|
||||||
# subscription: dict
|
subscription: dict
|
||||||
|
|
||||||
|
|
||||||
# reg_err_types([
|
reg_err_types([
|
||||||
# InvalidKey,
|
InvalidKey,
|
||||||
# InvalidSession,
|
InvalidSession,
|
||||||
# ])
|
])
|
||||||
|
|
||||||
|
|
||||||
class Client:
|
class Client:
|
||||||
|
|
@ -176,7 +176,6 @@ class Client:
|
||||||
self._api_key = api_key
|
self._api_key = api_key
|
||||||
self._secret = secret
|
self._secret = secret
|
||||||
self.conf: dict[str, str] = config
|
self.conf: dict[str, str] = config
|
||||||
self._ws_token: str|None = None
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pairs(self) -> dict[str, Pair]:
|
def pairs(self) -> dict[str, Pair]:
|
||||||
|
|
@ -264,22 +263,13 @@ class Client:
|
||||||
async def get_ws_token(
|
async def get_ws_token(
|
||||||
self,
|
self,
|
||||||
params: dict = {},
|
params: dict = {},
|
||||||
force_renewal: bool = False,
|
|
||||||
) -> str:
|
) -> str:
|
||||||
'''
|
'''
|
||||||
Get websocket token for authenticated data stream and cache
|
Get websocket token for authenticated data stream.
|
||||||
it for reuse.
|
|
||||||
|
|
||||||
Assert a value was actually received before return.
|
Assert a value was actually received before return.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if (
|
|
||||||
not force_renewal
|
|
||||||
and
|
|
||||||
self._ws_token
|
|
||||||
):
|
|
||||||
return self._ws_token
|
|
||||||
|
|
||||||
resp = await self.endpoint(
|
resp = await self.endpoint(
|
||||||
'GetWebSocketsToken',
|
'GetWebSocketsToken',
|
||||||
{},
|
{},
|
||||||
|
|
@ -290,8 +280,6 @@ class Client:
|
||||||
# resp token for ws init
|
# resp token for ws init
|
||||||
token: str = resp['result']['token']
|
token: str = resp['result']['token']
|
||||||
assert token
|
assert token
|
||||||
|
|
||||||
self._ws_token: str = token
|
|
||||||
return token
|
return token
|
||||||
|
|
||||||
async def get_assets(
|
async def get_assets(
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ from bidict import bidict
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.devx.pformat import ppfmt
|
from tractor.devx.pformat import ppfmt
|
||||||
# from tractor._exceptions import reg_err_types
|
from tractor._exceptions import reg_err_types
|
||||||
|
|
||||||
from piker.accounting import (
|
from piker.accounting import (
|
||||||
Position,
|
Position,
|
||||||
|
|
@ -97,6 +97,13 @@ MsgUnion = Union[
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class TooFastEdit(Exception):
|
||||||
|
'Edit requests faster then api submissions'
|
||||||
|
|
||||||
|
|
||||||
|
reg_err_types([TooFastEdit])
|
||||||
|
|
||||||
|
|
||||||
# TODO: make this wrap the `api.Client` and `ws` instances
|
# TODO: make this wrap the `api.Client` and `ws` instances
|
||||||
# and give it methods to submit cancel vs. add vs. edit
|
# and give it methods to submit cancel vs. add vs. edit
|
||||||
# requests?
|
# requests?
|
||||||
|
|
@ -128,19 +135,19 @@ async def handle_order_requests(
|
||||||
ws: NoBsWs,
|
ws: NoBsWs,
|
||||||
client: api.Client,
|
client: api.Client,
|
||||||
ems_order_stream: tractor.MsgStream,
|
ems_order_stream: tractor.MsgStream,
|
||||||
|
token: str,
|
||||||
apiflows: OrderDialogs,
|
apiflows: OrderDialogs,
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
reqids2txids: dict[int, str],
|
reqids2txids: dict[int, str],
|
||||||
toofastedit: set[int],
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
`trio.Task` which handles order ctl requests from the EMS and
|
Process new order submission requests from the EMS
|
||||||
deliver acks or errors back on that IPC dialog.
|
and deliver acks or errors.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# XXX: UGH, let's unify this.. with ``msgspec``!!!
|
# XXX: UGH, let's unify this.. with ``msgspec``!!!
|
||||||
msg: dict|Order
|
msg: dict | Order
|
||||||
async for msg in ems_order_stream:
|
async for msg in ems_order_stream:
|
||||||
log.info(f'Rx order msg:\n{pformat(msg)}')
|
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||||
match msg:
|
match msg:
|
||||||
|
|
@ -154,13 +161,8 @@ async def handle_order_requests(
|
||||||
txid = reqids2txids[reqid]
|
txid = reqids2txids[reqid]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# XXX: not sure if this block ever gets hit now?
|
# XXX: not sure if this block ever gets hit now?
|
||||||
# SEEMS TO on the race case with the update task?
|
|
||||||
# - update dark order quickly after
|
|
||||||
# triggered-submitted and then we have inavlid
|
|
||||||
# value in `reqids2txids` sent over ws.send()??
|
|
||||||
log.error('TOO FAST CANCEL/EDIT')
|
log.error('TOO FAST CANCEL/EDIT')
|
||||||
toofastedit.add(reqid)
|
reqids2txids[reqid] = TooFastEdit(reqid)
|
||||||
reqids2txids[reqid] = reqid
|
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdError(
|
BrokerdError(
|
||||||
oid=msg['oid'],
|
oid=msg['oid'],
|
||||||
|
|
@ -176,7 +178,7 @@ async def handle_order_requests(
|
||||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
await ws.send_msg({
|
await ws.send_msg({
|
||||||
'event': 'cancelOrder',
|
'event': 'cancelOrder',
|
||||||
'token': await client.get_ws_token(),
|
'token': token,
|
||||||
'reqid': reqid,
|
'reqid': reqid,
|
||||||
'txid': [txid], # should be txid from submission
|
'txid': [txid], # should be txid from submission
|
||||||
})
|
})
|
||||||
|
|
@ -198,15 +200,13 @@ async def handle_order_requests(
|
||||||
|
|
||||||
# XXX: not sure if this block ever gets hit now?
|
# XXX: not sure if this block ever gets hit now?
|
||||||
log.error('TOO FAST EDIT')
|
log.error('TOO FAST EDIT')
|
||||||
reqids2txids[reqid] = reqid
|
reqids2txids[reqid] = TooFastEdit(reqid)
|
||||||
toofastedit.add(reqid)
|
|
||||||
await tractor.pause()
|
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdError(
|
BrokerdError(
|
||||||
oid=msg['oid'],
|
oid=msg['oid'],
|
||||||
symbol=msg['symbol'],
|
symbol=msg['symbol'],
|
||||||
reason=(
|
reason=(
|
||||||
f'TooFastEdit reqid: {reqid}, cancelling..'
|
f'TooFastEdit reqid:{reqid}, cancelling..'
|
||||||
),
|
),
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
@ -252,7 +252,7 @@ async def handle_order_requests(
|
||||||
# https://docs.kraken.com/websockets/#message-addOrder
|
# https://docs.kraken.com/websockets/#message-addOrder
|
||||||
req = {
|
req = {
|
||||||
'event': ep,
|
'event': ep,
|
||||||
'token': await client.get_ws_token(),
|
'token': token,
|
||||||
|
|
||||||
'reqid': reqid, # remapped-to-int uid from ems
|
'reqid': reqid, # remapped-to-int uid from ems
|
||||||
# XXX: we set these to the same value since for us
|
# XXX: we set these to the same value since for us
|
||||||
|
|
@ -296,8 +296,7 @@ async def handle_order_requests(
|
||||||
symbol=msg['symbol'],
|
symbol=msg['symbol'],
|
||||||
reason=(
|
reason=(
|
||||||
'Invalid request msg:\n{msg}'
|
'Invalid request msg:\n{msg}'
|
||||||
),
|
))
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -329,19 +328,7 @@ async def subscribe(
|
||||||
'''
|
'''
|
||||||
# more specific logic for this in kraken's sync client:
|
# more specific logic for this in kraken's sync client:
|
||||||
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
|
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
|
||||||
latest_token: str = await client.get_ws_token()
|
assert token
|
||||||
if (
|
|
||||||
token
|
|
||||||
!=
|
|
||||||
latest_token
|
|
||||||
):
|
|
||||||
log.info(
|
|
||||||
f'RE-subscribing to WS connection..\n'
|
|
||||||
f'orig-token: {token!r}\n'
|
|
||||||
f'latest-token: {latest_token!r}\n'
|
|
||||||
)
|
|
||||||
token = latest_token
|
|
||||||
|
|
||||||
subnames: set[str] = set()
|
subnames: set[str] = set()
|
||||||
|
|
||||||
for name, sub_opts in subs:
|
for name, sub_opts in subs:
|
||||||
|
|
@ -349,8 +336,7 @@ async def subscribe(
|
||||||
'event': 'subscribe',
|
'event': 'subscribe',
|
||||||
'subscription': {
|
'subscription': {
|
||||||
'name': name,
|
'name': name,
|
||||||
# 'token': await client.get_ws_token(),
|
'token': token,
|
||||||
'token': latest_token,
|
|
||||||
**sub_opts,
|
**sub_opts,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -365,9 +351,7 @@ async def subscribe(
|
||||||
# wait on subscriptionn acks
|
# wait on subscriptionn acks
|
||||||
with trio.move_on_after(5):
|
with trio.move_on_after(5):
|
||||||
while True:
|
while True:
|
||||||
msg: dict = await ws.recv_msg()
|
match (msg := await ws.recv_msg()):
|
||||||
fmt_msg: str = ppfmt(msg)
|
|
||||||
match msg:
|
|
||||||
case {
|
case {
|
||||||
'event': 'subscriptionStatus',
|
'event': 'subscriptionStatus',
|
||||||
'status': 'subscribed',
|
'status': 'subscribed',
|
||||||
|
|
@ -398,36 +382,23 @@ async def subscribe(
|
||||||
exc = etype(
|
exc = etype(
|
||||||
f'{ev_msg}\n'
|
f'{ev_msg}\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f'{fmt_msg}'
|
f'{ppfmt(msg)}'
|
||||||
)
|
)
|
||||||
# !TODO, for `InvalidSession` we should
|
# !TODO, for `InvalidSession` we should
|
||||||
# attempt retries to resub and ensure all
|
# attempt retries to resub and ensure all
|
||||||
# sibling (task) `token` holders update
|
# sibling (task) `token` holders update
|
||||||
# their refs accoridingly!
|
# their refs accoridingly!
|
||||||
match (etype_str, ev_msg):
|
if isinstance(exc, api.InvalidSession):
|
||||||
case (
|
|
||||||
'ESession',
|
|
||||||
'Invalid session',
|
|
||||||
):
|
|
||||||
# attempt ws-token refresh
|
# attempt ws-token refresh
|
||||||
token: str = await client.get_ws_token(
|
token: str = await client.get_ws_token()
|
||||||
force_renewal=True
|
|
||||||
)
|
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
continue
|
|
||||||
|
|
||||||
case _:
|
|
||||||
log.warning(
|
|
||||||
f'Unhandled subscription-status,\n'
|
|
||||||
f'{fmt_msg}'
|
|
||||||
)
|
|
||||||
|
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Unknown ws event rxed?\n'
|
f'Unknown ws event rxed?\n'
|
||||||
f'{fmt_msg}'
|
f'{ppfmt(msg)}'
|
||||||
)
|
)
|
||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
@ -675,10 +646,6 @@ async def open_trade_dialog(
|
||||||
|
|
||||||
token: str = await client.get_ws_token()
|
token: str = await client.get_ws_token()
|
||||||
|
|
||||||
# XXX tracks EMS orders which are updated too quickly
|
|
||||||
# on the emds side with sync-issues on the kraken side.
|
|
||||||
toofastedit: set[int] = set()
|
|
||||||
|
|
||||||
ws: NoBsWs
|
ws: NoBsWs
|
||||||
async with (
|
async with (
|
||||||
ctx.open_stream() as ems_stream,
|
ctx.open_stream() as ems_stream,
|
||||||
|
|
@ -694,16 +661,16 @@ async def open_trade_dialog(
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
# task for processing inbound requests from ems
|
# task for processing inbound requests from ems
|
||||||
tn.start_soon(partial(
|
tn.start_soon(
|
||||||
handle_order_requests,
|
handle_order_requests,
|
||||||
ws=ws,
|
ws,
|
||||||
client=client,
|
client,
|
||||||
ems_order_stream=ems_stream,
|
ems_stream,
|
||||||
apiflows=apiflows,
|
token,
|
||||||
ids=ids,
|
apiflows,
|
||||||
reqids2txids=reqids2txids,
|
ids,
|
||||||
toofastedit=toofastedit,
|
reqids2txids,
|
||||||
))
|
)
|
||||||
|
|
||||||
# enter relay loop
|
# enter relay loop
|
||||||
await handle_order_updates(
|
await handle_order_updates(
|
||||||
|
|
@ -714,11 +681,11 @@ async def open_trade_dialog(
|
||||||
apiflows=apiflows,
|
apiflows=apiflows,
|
||||||
ids=ids,
|
ids=ids,
|
||||||
reqids2txids=reqids2txids,
|
reqids2txids=reqids2txids,
|
||||||
toofastedit=toofastedit,
|
|
||||||
acnt=acnt,
|
acnt=acnt,
|
||||||
ledger=ledger,
|
ledger=ledger,
|
||||||
acctid=acctid,
|
acctid=acctid,
|
||||||
acc_name=fqan,
|
acc_name=fqan,
|
||||||
|
token=token,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -730,7 +697,6 @@ async def handle_order_updates(
|
||||||
apiflows: OrderDialogs,
|
apiflows: OrderDialogs,
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
reqids2txids: bidict[int, str],
|
reqids2txids: bidict[int, str],
|
||||||
toofastedit: set[int],
|
|
||||||
acnt: Account,
|
acnt: Account,
|
||||||
|
|
||||||
# transaction records which will be updated
|
# transaction records which will be updated
|
||||||
|
|
@ -739,6 +705,7 @@ async def handle_order_updates(
|
||||||
# ledger_trans: dict[str, Transaction],
|
# ledger_trans: dict[str, Transaction],
|
||||||
acctid: str,
|
acctid: str,
|
||||||
acc_name: str,
|
acc_name: str,
|
||||||
|
token: str,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
|
@ -868,7 +835,7 @@ async def handle_order_updates(
|
||||||
for order_msg in order_msgs:
|
for order_msg in order_msgs:
|
||||||
log.info(
|
log.info(
|
||||||
f'`openOrders` msg update_{seq}:\n'
|
f'`openOrders` msg update_{seq}:\n'
|
||||||
f'{ppfmt(order_msg)}'
|
f'{pformat(order_msg)}'
|
||||||
)
|
)
|
||||||
txid, update_msg = list(order_msg.items())[0]
|
txid, update_msg = list(order_msg.items())[0]
|
||||||
|
|
||||||
|
|
@ -1038,8 +1005,10 @@ async def handle_order_updates(
|
||||||
# <-> ems dialog.
|
# <-> ems dialog.
|
||||||
if (
|
if (
|
||||||
status == 'open'
|
status == 'open'
|
||||||
and
|
and isinstance(
|
||||||
reqid in toofastedit
|
reqids2txids.get(reqid),
|
||||||
|
TooFastEdit
|
||||||
|
)
|
||||||
):
|
):
|
||||||
# TODO: don't even allow this case
|
# TODO: don't even allow this case
|
||||||
# by not moving the client side line
|
# by not moving the client side line
|
||||||
|
|
@ -1054,8 +1023,7 @@ async def handle_order_updates(
|
||||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
await ws.send_msg({
|
await ws.send_msg({
|
||||||
'event': 'cancelOrder',
|
'event': 'cancelOrder',
|
||||||
# 'token': token,
|
'token': token,
|
||||||
'token': await client.get_ws_token(),
|
|
||||||
'reqid': reqid or 0,
|
'reqid': reqid or 0,
|
||||||
'txid': [txid],
|
'txid': [txid],
|
||||||
})
|
})
|
||||||
|
|
@ -1201,8 +1169,7 @@ async def handle_order_updates(
|
||||||
txid
|
txid
|
||||||
|
|
||||||
# we throttle too-fast-requests on the ems side
|
# we throttle too-fast-requests on the ems side
|
||||||
and
|
and not isinstance(txid, TooFastEdit)
|
||||||
reqid in toofastedit
|
|
||||||
):
|
):
|
||||||
# client was editting too quickly
|
# client was editting too quickly
|
||||||
# so we instead cancel this order
|
# so we instead cancel this order
|
||||||
|
|
@ -1210,8 +1177,7 @@ async def handle_order_updates(
|
||||||
f'Cancelling {reqid}@{txid} due to:\n {event}')
|
f'Cancelling {reqid}@{txid} due to:\n {event}')
|
||||||
await ws.send_msg({
|
await ws.send_msg({
|
||||||
'event': 'cancelOrder',
|
'event': 'cancelOrder',
|
||||||
# 'token': token,
|
'token': token,
|
||||||
'token': await client.get_ws_token(),
|
|
||||||
'reqid': reqid or 0,
|
'reqid': reqid or 0,
|
||||||
'txid': [txid],
|
'txid': [txid],
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -1022,22 +1022,13 @@ async def open_order_mode(
|
||||||
started.set()
|
started.set()
|
||||||
|
|
||||||
for oid, msg in ems_dialog_msgs.items():
|
for oid, msg in ems_dialog_msgs.items():
|
||||||
|
|
||||||
# HACK ALERT: ensure a resp field is filled out since
|
# HACK ALERT: ensure a resp field is filled out since
|
||||||
# techincally the call below expects a ``Status``. TODO:
|
# techincally the call below expects a ``Status``. TODO:
|
||||||
# parse into proper ``Status`` equivalents ems-side?
|
# parse into proper ``Status`` equivalents ems-side?
|
||||||
# msg.setdefault('resp', msg['broker_details']['resp'])
|
# msg.setdefault('resp', msg['broker_details']['resp'])
|
||||||
# msg.setdefault('oid', msg['broker_details']['oid'])
|
# msg.setdefault('oid', msg['broker_details']['oid'])
|
||||||
ya_msg: dict = msg.setdefault(
|
msg['brokerd_msg'] = msg
|
||||||
'brokerd_msg',
|
|
||||||
msg,
|
|
||||||
)
|
|
||||||
if msg is not ya_msg:
|
|
||||||
log.warning(
|
|
||||||
f'A `.brokerd_msg` was already set for ems-dialog msg?\n'
|
|
||||||
f'oid: {oid!r}\n'
|
|
||||||
f'ya_msg: {ya_msg!r}\n'
|
|
||||||
f'msg: {ya_msg!r}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
await process_trade_msg(
|
await process_trade_msg(
|
||||||
mode,
|
mode,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue