Compare commits
8 Commits
c210c286a5
...
887e1ea6b7
| Author | SHA1 | Date |
|---|---|---|
|
|
887e1ea6b7 | |
|
|
54da297304 | |
|
|
9f7c38a37b | |
|
|
8299c65818 | |
|
|
170c95da28 | |
|
|
e1cd3fd955 | |
|
|
8cefc1bdf8 | |
|
|
709269fcf7 |
|
|
@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
|
||||||
for fill in fills:
|
for fill in fills:
|
||||||
con: Contract = fill.contract
|
con: Contract = fill.contract
|
||||||
conid: str = con.conId
|
conid: str = con.conId
|
||||||
pexch: str | None = con.primaryExchange
|
pexch: str|None = con.primaryExchange or con.exchange
|
||||||
|
|
||||||
if not pexch:
|
if not pexch:
|
||||||
cons = await client.get_con(conid=conid)
|
cons = await client.get_con(conid=conid)
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ import hashlib
|
||||||
import hmac
|
import hmac
|
||||||
import base64
|
import base64
|
||||||
import tractor
|
import tractor
|
||||||
from tractor._exceptions import reg_err_types
|
# from tractor._exceptions import reg_err_types
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from piker import config
|
from piker import config
|
||||||
|
|
@ -109,37 +109,37 @@ def get_kraken_signature(
|
||||||
return sigdigest.decode()
|
return sigdigest.decode()
|
||||||
|
|
||||||
|
|
||||||
class InvalidKey(ValueError):
|
# class InvalidKey(ValueError):
|
||||||
'''
|
# '''
|
||||||
EAPI:Invalid key
|
# EAPI:Invalid key
|
||||||
|
|
||||||
This error is returned when the API key used for the call is
|
# This error is returned when the API key used for the call is
|
||||||
either expired or disabled, please review the API key in your
|
# either expired or disabled, please review the API key in your
|
||||||
Settings -> API tab of account management or generate a new one
|
# Settings -> API tab of account management or generate a new one
|
||||||
and update your application.
|
# and update your application.
|
||||||
|
|
||||||
'''
|
# '''
|
||||||
|
|
||||||
|
|
||||||
class InvalidSession(RuntimeError):
|
# class InvalidSession(RuntimeError):
|
||||||
'''
|
# '''
|
||||||
ESession:Invalid session
|
# ESession:Invalid session
|
||||||
|
|
||||||
This error is returned when the ws API key used for an authenticated
|
# This error is returned when the ws API key used for an authenticated
|
||||||
sub/endpoint becomes stale, normally after a sufficient network
|
# sub/endpoint becomes stale, normally after a sufficient network
|
||||||
disconnect/outage.
|
# disconnect/outage.
|
||||||
|
|
||||||
Normally the sub will need to be restarted, likely re-init of the
|
# Normally the sub will need to be restarted, likely re-init of the
|
||||||
auth handshake sequence.
|
# auth handshake sequence.
|
||||||
|
|
||||||
'''
|
# '''
|
||||||
subscription: dict
|
# subscription: dict
|
||||||
|
|
||||||
|
|
||||||
reg_err_types([
|
# reg_err_types([
|
||||||
InvalidKey,
|
# InvalidKey,
|
||||||
InvalidSession,
|
# InvalidSession,
|
||||||
])
|
# ])
|
||||||
|
|
||||||
|
|
||||||
class Client:
|
class Client:
|
||||||
|
|
@ -176,6 +176,7 @@ class Client:
|
||||||
self._api_key = api_key
|
self._api_key = api_key
|
||||||
self._secret = secret
|
self._secret = secret
|
||||||
self.conf: dict[str, str] = config
|
self.conf: dict[str, str] = config
|
||||||
|
self._ws_token: str|None = None
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pairs(self) -> dict[str, Pair]:
|
def pairs(self) -> dict[str, Pair]:
|
||||||
|
|
@ -263,13 +264,22 @@ class Client:
|
||||||
async def get_ws_token(
|
async def get_ws_token(
|
||||||
self,
|
self,
|
||||||
params: dict = {},
|
params: dict = {},
|
||||||
|
force_renewal: bool = False,
|
||||||
) -> str:
|
) -> str:
|
||||||
'''
|
'''
|
||||||
Get websocket token for authenticated data stream.
|
Get websocket token for authenticated data stream and cache
|
||||||
|
it for reuse.
|
||||||
|
|
||||||
Assert a value was actually received before return.
|
Assert a value was actually received before return.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
if (
|
||||||
|
not force_renewal
|
||||||
|
and
|
||||||
|
self._ws_token
|
||||||
|
):
|
||||||
|
return self._ws_token
|
||||||
|
|
||||||
resp = await self.endpoint(
|
resp = await self.endpoint(
|
||||||
'GetWebSocketsToken',
|
'GetWebSocketsToken',
|
||||||
{},
|
{},
|
||||||
|
|
@ -280,6 +290,8 @@ class Client:
|
||||||
# resp token for ws init
|
# resp token for ws init
|
||||||
token: str = resp['result']['token']
|
token: str = resp['result']['token']
|
||||||
assert token
|
assert token
|
||||||
|
|
||||||
|
self._ws_token: str = token
|
||||||
return token
|
return token
|
||||||
|
|
||||||
async def get_assets(
|
async def get_assets(
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ from bidict import bidict
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.devx.pformat import ppfmt
|
from tractor.devx.pformat import ppfmt
|
||||||
from tractor._exceptions import reg_err_types
|
# from tractor._exceptions import reg_err_types
|
||||||
|
|
||||||
from piker.accounting import (
|
from piker.accounting import (
|
||||||
Position,
|
Position,
|
||||||
|
|
@ -97,13 +97,6 @@ MsgUnion = Union[
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class TooFastEdit(Exception):
|
|
||||||
'Edit requests faster then api submissions'
|
|
||||||
|
|
||||||
|
|
||||||
reg_err_types([TooFastEdit])
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: make this wrap the `api.Client` and `ws` instances
|
# TODO: make this wrap the `api.Client` and `ws` instances
|
||||||
# and give it methods to submit cancel vs. add vs. edit
|
# and give it methods to submit cancel vs. add vs. edit
|
||||||
# requests?
|
# requests?
|
||||||
|
|
@ -135,19 +128,19 @@ async def handle_order_requests(
|
||||||
ws: NoBsWs,
|
ws: NoBsWs,
|
||||||
client: api.Client,
|
client: api.Client,
|
||||||
ems_order_stream: tractor.MsgStream,
|
ems_order_stream: tractor.MsgStream,
|
||||||
token: str,
|
|
||||||
apiflows: OrderDialogs,
|
apiflows: OrderDialogs,
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
reqids2txids: dict[int, str],
|
reqids2txids: dict[int, str],
|
||||||
|
toofastedit: set[int],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Process new order submission requests from the EMS
|
`trio.Task` which handles order ctl requests from the EMS and
|
||||||
and deliver acks or errors.
|
deliver acks or errors back on that IPC dialog.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# XXX: UGH, let's unify this.. with ``msgspec``!!!
|
# XXX: UGH, let's unify this.. with ``msgspec``!!!
|
||||||
msg: dict | Order
|
msg: dict|Order
|
||||||
async for msg in ems_order_stream:
|
async for msg in ems_order_stream:
|
||||||
log.info(f'Rx order msg:\n{pformat(msg)}')
|
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||||
match msg:
|
match msg:
|
||||||
|
|
@ -161,8 +154,13 @@ async def handle_order_requests(
|
||||||
txid = reqids2txids[reqid]
|
txid = reqids2txids[reqid]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# XXX: not sure if this block ever gets hit now?
|
# XXX: not sure if this block ever gets hit now?
|
||||||
|
# SEEMS TO on the race case with the update task?
|
||||||
|
# - update dark order quickly after
|
||||||
|
# triggered-submitted and then we have inavlid
|
||||||
|
# value in `reqids2txids` sent over ws.send()??
|
||||||
log.error('TOO FAST CANCEL/EDIT')
|
log.error('TOO FAST CANCEL/EDIT')
|
||||||
reqids2txids[reqid] = TooFastEdit(reqid)
|
toofastedit.add(reqid)
|
||||||
|
reqids2txids[reqid] = reqid
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdError(
|
BrokerdError(
|
||||||
oid=msg['oid'],
|
oid=msg['oid'],
|
||||||
|
|
@ -178,7 +176,7 @@ async def handle_order_requests(
|
||||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
await ws.send_msg({
|
await ws.send_msg({
|
||||||
'event': 'cancelOrder',
|
'event': 'cancelOrder',
|
||||||
'token': token,
|
'token': await client.get_ws_token(),
|
||||||
'reqid': reqid,
|
'reqid': reqid,
|
||||||
'txid': [txid], # should be txid from submission
|
'txid': [txid], # should be txid from submission
|
||||||
})
|
})
|
||||||
|
|
@ -200,13 +198,15 @@ async def handle_order_requests(
|
||||||
|
|
||||||
# XXX: not sure if this block ever gets hit now?
|
# XXX: not sure if this block ever gets hit now?
|
||||||
log.error('TOO FAST EDIT')
|
log.error('TOO FAST EDIT')
|
||||||
reqids2txids[reqid] = TooFastEdit(reqid)
|
reqids2txids[reqid] = reqid
|
||||||
|
toofastedit.add(reqid)
|
||||||
|
await tractor.pause()
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdError(
|
BrokerdError(
|
||||||
oid=msg['oid'],
|
oid=msg['oid'],
|
||||||
symbol=msg['symbol'],
|
symbol=msg['symbol'],
|
||||||
reason=(
|
reason=(
|
||||||
f'TooFastEdit reqid:{reqid}, cancelling..'
|
f'TooFastEdit reqid: {reqid}, cancelling..'
|
||||||
),
|
),
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
@ -252,7 +252,7 @@ async def handle_order_requests(
|
||||||
# https://docs.kraken.com/websockets/#message-addOrder
|
# https://docs.kraken.com/websockets/#message-addOrder
|
||||||
req = {
|
req = {
|
||||||
'event': ep,
|
'event': ep,
|
||||||
'token': token,
|
'token': await client.get_ws_token(),
|
||||||
|
|
||||||
'reqid': reqid, # remapped-to-int uid from ems
|
'reqid': reqid, # remapped-to-int uid from ems
|
||||||
# XXX: we set these to the same value since for us
|
# XXX: we set these to the same value since for us
|
||||||
|
|
@ -296,7 +296,8 @@ async def handle_order_requests(
|
||||||
symbol=msg['symbol'],
|
symbol=msg['symbol'],
|
||||||
reason=(
|
reason=(
|
||||||
'Invalid request msg:\n{msg}'
|
'Invalid request msg:\n{msg}'
|
||||||
))
|
),
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -328,7 +329,19 @@ async def subscribe(
|
||||||
'''
|
'''
|
||||||
# more specific logic for this in kraken's sync client:
|
# more specific logic for this in kraken's sync client:
|
||||||
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
|
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
|
||||||
assert token
|
latest_token: str = await client.get_ws_token()
|
||||||
|
if (
|
||||||
|
token
|
||||||
|
!=
|
||||||
|
latest_token
|
||||||
|
):
|
||||||
|
log.info(
|
||||||
|
f'RE-subscribing to WS connection..\n'
|
||||||
|
f'orig-token: {token!r}\n'
|
||||||
|
f'latest-token: {latest_token!r}\n'
|
||||||
|
)
|
||||||
|
token = latest_token
|
||||||
|
|
||||||
subnames: set[str] = set()
|
subnames: set[str] = set()
|
||||||
|
|
||||||
for name, sub_opts in subs:
|
for name, sub_opts in subs:
|
||||||
|
|
@ -336,7 +349,8 @@ async def subscribe(
|
||||||
'event': 'subscribe',
|
'event': 'subscribe',
|
||||||
'subscription': {
|
'subscription': {
|
||||||
'name': name,
|
'name': name,
|
||||||
'token': token,
|
# 'token': await client.get_ws_token(),
|
||||||
|
'token': latest_token,
|
||||||
**sub_opts,
|
**sub_opts,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -351,7 +365,9 @@ async def subscribe(
|
||||||
# wait on subscriptionn acks
|
# wait on subscriptionn acks
|
||||||
with trio.move_on_after(5):
|
with trio.move_on_after(5):
|
||||||
while True:
|
while True:
|
||||||
match (msg := await ws.recv_msg()):
|
msg: dict = await ws.recv_msg()
|
||||||
|
fmt_msg: str = ppfmt(msg)
|
||||||
|
match msg:
|
||||||
case {
|
case {
|
||||||
'event': 'subscriptionStatus',
|
'event': 'subscriptionStatus',
|
||||||
'status': 'subscribed',
|
'status': 'subscribed',
|
||||||
|
|
@ -382,23 +398,36 @@ async def subscribe(
|
||||||
exc = etype(
|
exc = etype(
|
||||||
f'{ev_msg}\n'
|
f'{ev_msg}\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f'{ppfmt(msg)}'
|
f'{fmt_msg}'
|
||||||
)
|
)
|
||||||
# !TODO, for `InvalidSession` we should
|
# !TODO, for `InvalidSession` we should
|
||||||
# attempt retries to resub and ensure all
|
# attempt retries to resub and ensure all
|
||||||
# sibling (task) `token` holders update
|
# sibling (task) `token` holders update
|
||||||
# their refs accoridingly!
|
# their refs accoridingly!
|
||||||
if isinstance(exc, api.InvalidSession):
|
match (etype_str, ev_msg):
|
||||||
# attempt ws-token refresh
|
case (
|
||||||
token: str = await client.get_ws_token()
|
'ESession',
|
||||||
await tractor.pause()
|
'Invalid session',
|
||||||
|
):
|
||||||
|
# attempt ws-token refresh
|
||||||
|
token: str = await client.get_ws_token(
|
||||||
|
force_renewal=True
|
||||||
|
)
|
||||||
|
await tractor.pause()
|
||||||
|
continue
|
||||||
|
|
||||||
|
case _:
|
||||||
|
log.warning(
|
||||||
|
f'Unhandled subscription-status,\n'
|
||||||
|
f'{fmt_msg}'
|
||||||
|
)
|
||||||
|
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Unknown ws event rxed?\n'
|
f'Unknown ws event rxed?\n'
|
||||||
f'{ppfmt(msg)}'
|
f'{fmt_msg}'
|
||||||
)
|
)
|
||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
@ -646,6 +675,10 @@ async def open_trade_dialog(
|
||||||
|
|
||||||
token: str = await client.get_ws_token()
|
token: str = await client.get_ws_token()
|
||||||
|
|
||||||
|
# XXX tracks EMS orders which are updated too quickly
|
||||||
|
# on the emds side with sync-issues on the kraken side.
|
||||||
|
toofastedit: set[int] = set()
|
||||||
|
|
||||||
ws: NoBsWs
|
ws: NoBsWs
|
||||||
async with (
|
async with (
|
||||||
ctx.open_stream() as ems_stream,
|
ctx.open_stream() as ems_stream,
|
||||||
|
|
@ -661,16 +694,16 @@ async def open_trade_dialog(
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
# task for processing inbound requests from ems
|
# task for processing inbound requests from ems
|
||||||
tn.start_soon(
|
tn.start_soon(partial(
|
||||||
handle_order_requests,
|
handle_order_requests,
|
||||||
ws,
|
ws=ws,
|
||||||
client,
|
client=client,
|
||||||
ems_stream,
|
ems_order_stream=ems_stream,
|
||||||
token,
|
apiflows=apiflows,
|
||||||
apiflows,
|
ids=ids,
|
||||||
ids,
|
reqids2txids=reqids2txids,
|
||||||
reqids2txids,
|
toofastedit=toofastedit,
|
||||||
)
|
))
|
||||||
|
|
||||||
# enter relay loop
|
# enter relay loop
|
||||||
await handle_order_updates(
|
await handle_order_updates(
|
||||||
|
|
@ -681,11 +714,11 @@ async def open_trade_dialog(
|
||||||
apiflows=apiflows,
|
apiflows=apiflows,
|
||||||
ids=ids,
|
ids=ids,
|
||||||
reqids2txids=reqids2txids,
|
reqids2txids=reqids2txids,
|
||||||
|
toofastedit=toofastedit,
|
||||||
acnt=acnt,
|
acnt=acnt,
|
||||||
ledger=ledger,
|
ledger=ledger,
|
||||||
acctid=acctid,
|
acctid=acctid,
|
||||||
acc_name=fqan,
|
acc_name=fqan,
|
||||||
token=token,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -697,6 +730,7 @@ async def handle_order_updates(
|
||||||
apiflows: OrderDialogs,
|
apiflows: OrderDialogs,
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
reqids2txids: bidict[int, str],
|
reqids2txids: bidict[int, str],
|
||||||
|
toofastedit: set[int],
|
||||||
acnt: Account,
|
acnt: Account,
|
||||||
|
|
||||||
# transaction records which will be updated
|
# transaction records which will be updated
|
||||||
|
|
@ -705,7 +739,6 @@ async def handle_order_updates(
|
||||||
# ledger_trans: dict[str, Transaction],
|
# ledger_trans: dict[str, Transaction],
|
||||||
acctid: str,
|
acctid: str,
|
||||||
acc_name: str,
|
acc_name: str,
|
||||||
token: str,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
|
@ -835,7 +868,7 @@ async def handle_order_updates(
|
||||||
for order_msg in order_msgs:
|
for order_msg in order_msgs:
|
||||||
log.info(
|
log.info(
|
||||||
f'`openOrders` msg update_{seq}:\n'
|
f'`openOrders` msg update_{seq}:\n'
|
||||||
f'{pformat(order_msg)}'
|
f'{ppfmt(order_msg)}'
|
||||||
)
|
)
|
||||||
txid, update_msg = list(order_msg.items())[0]
|
txid, update_msg = list(order_msg.items())[0]
|
||||||
|
|
||||||
|
|
@ -1005,10 +1038,8 @@ async def handle_order_updates(
|
||||||
# <-> ems dialog.
|
# <-> ems dialog.
|
||||||
if (
|
if (
|
||||||
status == 'open'
|
status == 'open'
|
||||||
and isinstance(
|
and
|
||||||
reqids2txids.get(reqid),
|
reqid in toofastedit
|
||||||
TooFastEdit
|
|
||||||
)
|
|
||||||
):
|
):
|
||||||
# TODO: don't even allow this case
|
# TODO: don't even allow this case
|
||||||
# by not moving the client side line
|
# by not moving the client side line
|
||||||
|
|
@ -1023,7 +1054,8 @@ async def handle_order_updates(
|
||||||
# https://docs.kraken.com/websockets/#message-cancelOrder
|
# https://docs.kraken.com/websockets/#message-cancelOrder
|
||||||
await ws.send_msg({
|
await ws.send_msg({
|
||||||
'event': 'cancelOrder',
|
'event': 'cancelOrder',
|
||||||
'token': token,
|
# 'token': token,
|
||||||
|
'token': await client.get_ws_token(),
|
||||||
'reqid': reqid or 0,
|
'reqid': reqid or 0,
|
||||||
'txid': [txid],
|
'txid': [txid],
|
||||||
})
|
})
|
||||||
|
|
@ -1169,7 +1201,8 @@ async def handle_order_updates(
|
||||||
txid
|
txid
|
||||||
|
|
||||||
# we throttle too-fast-requests on the ems side
|
# we throttle too-fast-requests on the ems side
|
||||||
and not isinstance(txid, TooFastEdit)
|
and
|
||||||
|
reqid in toofastedit
|
||||||
):
|
):
|
||||||
# client was editting too quickly
|
# client was editting too quickly
|
||||||
# so we instead cancel this order
|
# so we instead cancel this order
|
||||||
|
|
@ -1177,7 +1210,8 @@ async def handle_order_updates(
|
||||||
f'Cancelling {reqid}@{txid} due to:\n {event}')
|
f'Cancelling {reqid}@{txid} due to:\n {event}')
|
||||||
await ws.send_msg({
|
await ws.send_msg({
|
||||||
'event': 'cancelOrder',
|
'event': 'cancelOrder',
|
||||||
'token': token,
|
# 'token': token,
|
||||||
|
'token': await client.get_ws_token(),
|
||||||
'reqid': reqid or 0,
|
'reqid': reqid or 0,
|
||||||
'txid': [txid],
|
'txid': [txid],
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,6 @@ from collections import (
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from math import isnan
|
from math import isnan
|
||||||
from pprint import pformat
|
|
||||||
from time import time_ns
|
from time import time_ns
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
|
|
@ -43,6 +42,7 @@ import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
from tractor import trionics
|
from tractor import trionics
|
||||||
|
from tractor.devx.pformat import ppfmt
|
||||||
|
|
||||||
from ._util import (
|
from ._util import (
|
||||||
log, # sub-sys logger
|
log, # sub-sys logger
|
||||||
|
|
@ -490,7 +490,7 @@ async def open_brokerd_dialog(
|
||||||
msg = BrokerdPosition(**msg)
|
msg = BrokerdPosition(**msg)
|
||||||
log.info(
|
log.info(
|
||||||
f'loading pp for {brokermod.__name__}:\n'
|
f'loading pp for {brokermod.__name__}:\n'
|
||||||
f'{pformat(msg.to_dict())}',
|
f'{ppfmt(msg.to_dict())}',
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: state any mismatch here?
|
# TODO: state any mismatch here?
|
||||||
|
|
@ -840,7 +840,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
brokerd_msg: dict[str, Any]
|
brokerd_msg: dict[str, Any]
|
||||||
async for brokerd_msg in brokerd_trades_stream:
|
async for brokerd_msg in brokerd_trades_stream:
|
||||||
fmsg = pformat(brokerd_msg)
|
fmsg = ppfmt(brokerd_msg)
|
||||||
log.info(
|
log.info(
|
||||||
f'Rx brokerd trade msg:\n'
|
f'Rx brokerd trade msg:\n'
|
||||||
f'{fmsg}'
|
f'{fmsg}'
|
||||||
|
|
@ -1039,7 +1039,8 @@ async def translate_and_relay_brokerd_events(
|
||||||
)
|
)
|
||||||
|
|
||||||
status_msg.reqid = reqid # THIS LINE IS CRITICAL!
|
status_msg.reqid = reqid # THIS LINE IS CRITICAL!
|
||||||
status_msg.brokerd_msg = msg
|
if not status_msg.brokerd_msg:
|
||||||
|
status_msg.brokerd_msg = msg
|
||||||
status_msg.src = msg.broker_details['name']
|
status_msg.src = msg.broker_details['name']
|
||||||
|
|
||||||
if not status_msg.req:
|
if not status_msg.req:
|
||||||
|
|
@ -1072,7 +1073,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
else: # open
|
else: # open
|
||||||
# relayed from backend but probably not handled so
|
# relayed from backend but probably not handled so
|
||||||
# just log it
|
# just log it
|
||||||
log.info(f'{broker} opened order {msg}')
|
log.info(f'{broker!r} opened order {msg}')
|
||||||
|
|
||||||
# BrokerdFill
|
# BrokerdFill
|
||||||
case {
|
case {
|
||||||
|
|
@ -1185,7 +1186,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
}:
|
}:
|
||||||
msg = (
|
msg = (
|
||||||
f'Unhandled broker status for dialog {reqid}:\n'
|
f'Unhandled broker status for dialog {reqid}:\n'
|
||||||
f'{pformat(brokerd_msg)}'
|
f'{ppfmt(brokerd_msg)}'
|
||||||
)
|
)
|
||||||
if (
|
if (
|
||||||
oid := book._ems2brokerd_ids.inverse.get(reqid)
|
oid := book._ems2brokerd_ids.inverse.get(reqid)
|
||||||
|
|
@ -1194,7 +1195,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
# clearable limits..
|
# clearable limits..
|
||||||
if status_msg := book._active.get(oid):
|
if status_msg := book._active.get(oid):
|
||||||
msg += (
|
msg += (
|
||||||
f'last status msg: {pformat(status_msg)}\n\n'
|
f'last status msg: {ppfmt(status_msg)}\n\n'
|
||||||
f'this msg:{fmsg}\n'
|
f'this msg:{fmsg}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -1233,7 +1234,7 @@ async def process_client_order_cmds(
|
||||||
async for cmd in client_order_stream:
|
async for cmd in client_order_stream:
|
||||||
log.info(
|
log.info(
|
||||||
f'Received order cmd:\n'
|
f'Received order cmd:\n'
|
||||||
f'{pformat(cmd)}\n'
|
f'{ppfmt(cmd)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# CAWT DAMN we need struct support!
|
# CAWT DAMN we need struct support!
|
||||||
|
|
@ -1398,8 +1399,8 @@ async def process_client_order_cmds(
|
||||||
# handle relaying the ems side responses back to
|
# handle relaying the ems side responses back to
|
||||||
# the client/cmd sender from this request
|
# the client/cmd sender from this request
|
||||||
log.info(
|
log.info(
|
||||||
f'Sending live order to {broker}:\n'
|
f'Sending live order to {broker!r}:\n'
|
||||||
f'{pformat(msg)}'
|
f'{ppfmt(msg)}'
|
||||||
)
|
)
|
||||||
|
|
||||||
await brokerd_order_stream.send(msg)
|
await brokerd_order_stream.send(msg)
|
||||||
|
|
|
||||||
|
|
@ -168,7 +168,7 @@ async def _reconnect_forever(
|
||||||
nobsws: NoBsWs,
|
nobsws: NoBsWs,
|
||||||
reset_after: int, # msg recv timeout before reset attempt
|
reset_after: int, # msg recv timeout before reset attempt
|
||||||
|
|
||||||
fixture: AsyncContextManager | None = None,
|
fixture: AsyncContextManager|None = None,
|
||||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
@ -185,7 +185,7 @@ async def _reconnect_forever(
|
||||||
async def proxy_msgs(
|
async def proxy_msgs(
|
||||||
ws: WebSocketConnection,
|
ws: WebSocketConnection,
|
||||||
rent_cs: trio.CancelScope, # parent cancel scope
|
rent_cs: trio.CancelScope, # parent cancel scope
|
||||||
):
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Receive (under `timeout` deadline) all msgs from from underlying
|
Receive (under `timeout` deadline) all msgs from from underlying
|
||||||
websocket and relay them to (calling) parent task via ``trio``
|
websocket and relay them to (calling) parent task via ``trio``
|
||||||
|
|
@ -206,7 +206,7 @@ async def _reconnect_forever(
|
||||||
except nobsws.recon_errors:
|
except nobsws.recon_errors:
|
||||||
log.exception(
|
log.exception(
|
||||||
f'{src_mod}\n'
|
f'{src_mod}\n'
|
||||||
f'{url} connection bail with:'
|
f'{url!r} connection failed\n'
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await trio.sleep(0.5)
|
await trio.sleep(0.5)
|
||||||
|
|
@ -269,7 +269,7 @@ async def _reconnect_forever(
|
||||||
nobsws._ws = ws
|
nobsws._ws = ws
|
||||||
log.info(
|
log.info(
|
||||||
f'{src_mod}\n'
|
f'{src_mod}\n'
|
||||||
f'Connection success: {url}'
|
f'Connection success: {url!r}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# begin relay loop to forward msgs
|
# begin relay loop to forward msgs
|
||||||
|
|
@ -341,7 +341,7 @@ async def _reconnect_forever(
|
||||||
async def open_autorecon_ws(
|
async def open_autorecon_ws(
|
||||||
url: str,
|
url: str,
|
||||||
|
|
||||||
fixture: AsyncContextManager | None = None,
|
fixture: AsyncContextManager|None = None,
|
||||||
|
|
||||||
# time in sec between msgs received before
|
# time in sec between msgs received before
|
||||||
# we presume connection might need a reset.
|
# we presume connection might need a reset.
|
||||||
|
|
@ -361,7 +361,7 @@ async def open_autorecon_ws(
|
||||||
and restarts the full http(s) handshake on catches of certain
|
and restarts the full http(s) handshake on catches of certain
|
||||||
connetivity errors, or some user defined recv timeout.
|
connetivity errors, or some user defined recv timeout.
|
||||||
|
|
||||||
You can provide a ``fixture`` async-context-manager which will be
|
You can provide a `fixture` async-context-manager which will be
|
||||||
entered/exitted around each connection reset; eg. for
|
entered/exitted around each connection reset; eg. for
|
||||||
(re)requesting subscriptions without requiring streaming setup
|
(re)requesting subscriptions without requiring streaming setup
|
||||||
code to rerun.
|
code to rerun.
|
||||||
|
|
@ -402,7 +402,8 @@ async def open_autorecon_ws(
|
||||||
except NoBsWs.recon_errors as con_err:
|
except NoBsWs.recon_errors as con_err:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Entire ws-channel disconnect due to,\n'
|
f'Entire ws-channel disconnect due to,\n'
|
||||||
f'con_err: {con_err!r}\n'
|
f'\n'
|
||||||
|
f'{con_err!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -424,7 +425,7 @@ class JSONRPCResult(Struct):
|
||||||
async def open_jsonrpc_session(
|
async def open_jsonrpc_session(
|
||||||
url: str,
|
url: str,
|
||||||
start_id: int = 0,
|
start_id: int = 0,
|
||||||
response_type: type = JSONRPCResult,
|
response_type: Type[Struct] = JSONRPCResult,
|
||||||
msg_recv_timeout: float = float('inf'),
|
msg_recv_timeout: float = float('inf'),
|
||||||
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
|
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
|
||||||
# and options mkts are generally "slow moving"..
|
# and options mkts are generally "slow moving"..
|
||||||
|
|
@ -435,7 +436,10 @@ async def open_jsonrpc_session(
|
||||||
# broken and never restored with wtv init sequence is required to
|
# broken and never restored with wtv init sequence is required to
|
||||||
# re-establish a working req-resp session.
|
# re-establish a working req-resp session.
|
||||||
|
|
||||||
) -> Callable[[str, dict], dict]:
|
) -> Callable[
|
||||||
|
[str, dict],
|
||||||
|
dict,
|
||||||
|
]:
|
||||||
'''
|
'''
|
||||||
Init a json-RPC-over-websocket connection to the provided `url`.
|
Init a json-RPC-over-websocket connection to the provided `url`.
|
||||||
|
|
||||||
|
|
@ -531,14 +535,18 @@ async def open_jsonrpc_session(
|
||||||
'id': mid,
|
'id': mid,
|
||||||
} if not rpc_results.get(mid):
|
} if not rpc_results.get(mid):
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Unexpected ws msg: {json.dumps(msg, indent=4)}'
|
f'Unexpected ws msg?\n'
|
||||||
|
f'{json.dumps(msg, indent=4)}'
|
||||||
)
|
)
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'method': _,
|
'method': _,
|
||||||
'params': _,
|
'params': _,
|
||||||
}:
|
}:
|
||||||
log.debug(f'Recieved\n{msg}')
|
log.debug(
|
||||||
|
f'Recieved\n'
|
||||||
|
f'{msg!r}'
|
||||||
|
)
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'error': error
|
'error': error
|
||||||
|
|
@ -554,12 +562,15 @@ async def open_jsonrpc_session(
|
||||||
result['event'].set()
|
result['event'].set()
|
||||||
log.error(
|
log.error(
|
||||||
f'JSONRPC request failed\n'
|
f'JSONRPC request failed\n'
|
||||||
f'req: {req_msg}\n'
|
f'req: {req_msg!r}\n'
|
||||||
f'resp: {error}\n'
|
f'resp: {error!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
log.warning(
|
||||||
|
f'Unhandled JSON-RPC msg!?\n'
|
||||||
|
f'{msg!r}'
|
||||||
|
)
|
||||||
|
|
||||||
tn.start_soon(recv_task)
|
tn.start_soon(recv_task)
|
||||||
yield json_rpc
|
yield json_rpc
|
||||||
|
|
|
||||||
|
|
@ -342,7 +342,10 @@ class MainWindow(QMainWindow):
|
||||||
log.debug('Saved window geometry for next session')
|
log.debug('Saved window geometry for next session')
|
||||||
|
|
||||||
# raising KBI seems to get intercepted by by Qt so just use the system.
|
# raising KBI seems to get intercepted by by Qt so just use the system.
|
||||||
os.kill(os.getpid(), signal.SIGINT)
|
os.kill(
|
||||||
|
os.getpid(),
|
||||||
|
signal.SIGINT,
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def status_bar(self) -> QStatusBar:
|
def status_bar(self) -> QStatusBar:
|
||||||
|
|
@ -549,9 +552,9 @@ class MainWindow(QMainWindow):
|
||||||
|
|
||||||
# update godwidget and its children
|
# update godwidget and its children
|
||||||
if self.godwidget:
|
if self.godwidget:
|
||||||
# update search widget if it exists
|
# update search bg if it exists
|
||||||
if hasattr(self.godwidget, 'search') and self.godwidget.search:
|
if search := getattr(self.godwidget, 'search', None):
|
||||||
self.godwidget.search.update_fonts()
|
search.update_fonts()
|
||||||
|
|
||||||
# update order mode panes in all chart views
|
# update order mode panes in all chart views
|
||||||
self._update_chart_order_panes()
|
self._update_chart_order_panes()
|
||||||
|
|
@ -571,7 +574,10 @@ class MainWindow(QMainWindow):
|
||||||
return
|
return
|
||||||
|
|
||||||
# iterate through all linked splits (hist and rt)
|
# iterate through all linked splits (hist and rt)
|
||||||
for splits_name in ['hist_linked', 'rt_linked']:
|
for splits_name in [
|
||||||
|
'hist_linked',
|
||||||
|
'rt_linked',
|
||||||
|
]:
|
||||||
splits = getattr(self.godwidget, splits_name, None)
|
splits = getattr(self.godwidget, splits_name, None)
|
||||||
if not splits:
|
if not splits:
|
||||||
continue
|
continue
|
||||||
|
|
@ -583,12 +589,14 @@ class MainWindow(QMainWindow):
|
||||||
self._update_chart_axes(chart)
|
self._update_chart_axes(chart)
|
||||||
|
|
||||||
# update order pane
|
# update order pane
|
||||||
if hasattr(chart, 'view'):
|
if (
|
||||||
view = chart.view
|
(view := getattr(chart, 'view', None))
|
||||||
if hasattr(view, 'order_mode') and view.order_mode:
|
and
|
||||||
order_mode = view.order_mode
|
(order_mode := getattr(view, 'order_mode', None))
|
||||||
if hasattr(order_mode, 'pane') and order_mode.pane:
|
and
|
||||||
order_mode.pane.update_fonts()
|
(pane := getattr(order_mode, 'pane', None))
|
||||||
|
):
|
||||||
|
pane.update_fonts()
|
||||||
|
|
||||||
# also check subplots
|
# also check subplots
|
||||||
subplots = getattr(splits, 'subplots', {})
|
subplots = getattr(splits, 'subplots', {})
|
||||||
|
|
@ -597,54 +605,90 @@ class MainWindow(QMainWindow):
|
||||||
self._update_chart_axes(subplot_chart)
|
self._update_chart_axes(subplot_chart)
|
||||||
|
|
||||||
# update subplot order pane
|
# update subplot order pane
|
||||||
if hasattr(subplot_chart, 'view'):
|
if (
|
||||||
subplot_view = subplot_chart.view
|
(view := getattr(subplot_chart, 'view', None))
|
||||||
if hasattr(subplot_view, 'order_mode') and subplot_view.order_mode:
|
and
|
||||||
subplot_order_mode = subplot_view.order_mode
|
(order_mode := getattr(
|
||||||
if hasattr(subplot_order_mode, 'pane') and subplot_order_mode.pane:
|
view, 'order_mode', None,
|
||||||
subplot_order_mode.pane.update_fonts()
|
))
|
||||||
|
and
|
||||||
|
(pane := getattr(order_mode, 'pane', None))
|
||||||
|
):
|
||||||
|
pane.update_fonts()
|
||||||
|
|
||||||
# resize all sidepanes to match main chart's sidepane width
|
# resize all sidepanes to match the
|
||||||
# this ensures volume/subplot sidepanes match the main chart
|
# main chart's sidepane width; ensures
|
||||||
if splits and hasattr(splits, 'resize_sidepanes'):
|
# volume/subplot sidepanes match.
|
||||||
splits.resize_sidepanes()
|
if (
|
||||||
|
splits
|
||||||
|
and
|
||||||
|
(resize := getattr(
|
||||||
|
splits, 'resize_sidepanes', None,
|
||||||
|
))
|
||||||
|
):
|
||||||
|
resize()
|
||||||
|
|
||||||
def _update_chart_axes(self, chart) -> None:
|
def _update_chart_axes(self, chart) -> None:
|
||||||
'''Update axis fonts and sizing for a chart.'''
|
'''
|
||||||
|
Update axis fonts and sizing for a chart.
|
||||||
|
|
||||||
|
'''
|
||||||
from . import _style
|
from . import _style
|
||||||
|
|
||||||
# update price axis (right side)
|
# update price axis (right side)
|
||||||
if hasattr(chart, 'pi') and chart.pi:
|
if plot_item := getattr(chart, 'pi', None):
|
||||||
plot_item = chart.pi
|
|
||||||
# get all axes from plot item
|
# get all axes from plot item
|
||||||
for axis_name in ['left', 'right', 'bottom', 'top']:
|
for axis_name in [
|
||||||
axis = plot_item.getAxis(axis_name)
|
'left',
|
||||||
if axis and hasattr(axis, 'update_fonts'):
|
'right',
|
||||||
axis.update_fonts(_style._font)
|
'bottom',
|
||||||
|
'top',
|
||||||
|
]:
|
||||||
|
if (
|
||||||
|
(axis := plot_item.getAxis(axis_name))
|
||||||
|
and
|
||||||
|
(update_fonts := getattr(
|
||||||
|
axis, 'update_fonts', None,
|
||||||
|
))
|
||||||
|
):
|
||||||
|
update_fonts(_style._font)
|
||||||
|
|
||||||
# force plot item to recalculate its entire layout
|
# force plot item to recalculate
|
||||||
|
# its entire layout
|
||||||
plot_item.updateGeometry()
|
plot_item.updateGeometry()
|
||||||
|
|
||||||
# force chart widget to update
|
# force chart widget to update
|
||||||
if hasattr(chart, 'updateGeometry'):
|
if update_geom := getattr(chart, 'updateGeometry', None):
|
||||||
chart.updateGeometry()
|
update_geom()
|
||||||
|
|
||||||
# trigger a full scene update
|
# trigger a full scene update
|
||||||
if hasattr(chart, 'update'):
|
if update := getattr(chart, 'update', None):
|
||||||
chart.update()
|
update()
|
||||||
|
|
||||||
def _refresh_widget_fonts(self, widget: QWidget) -> None:
|
def _refresh_widget_fonts(
|
||||||
|
self,
|
||||||
|
widget: QWidget,
|
||||||
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Recursively update font sizes in all child widgets.
|
Recursively update font sizes in all
|
||||||
|
child widgets.
|
||||||
|
|
||||||
|
Handles widgets that have font-size
|
||||||
|
hardcoded in their stylesheets.
|
||||||
|
|
||||||
This handles widgets that have font-size hardcoded in their stylesheets.
|
|
||||||
'''
|
'''
|
||||||
from . import _style
|
from . import _style
|
||||||
|
|
||||||
# recursively process all children
|
# recursively process all children
|
||||||
for child in widget.findChildren(QWidget):
|
for child in widget.findChildren(QWidget):
|
||||||
# skip widgets that have their own update_fonts method (handled separately)
|
# skip widgets that have custom update
|
||||||
if hasattr(child, 'update_fonts'):
|
# methods; handled separately below.
|
||||||
|
if getattr(child, 'update_fonts', None):
|
||||||
|
log.debug(
|
||||||
|
f'Skipping sub-widget with'
|
||||||
|
f' custom font-update meth..\n'
|
||||||
|
f'{child!r}\n'
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# update child's stylesheet if it has font-size
|
# update child's stylesheet if it has font-size
|
||||||
|
|
@ -654,23 +698,35 @@ class MainWindow(QMainWindow):
|
||||||
# this is a heuristic - may need refinement
|
# this is a heuristic - may need refinement
|
||||||
try:
|
try:
|
||||||
child.setFont(_style._font.font)
|
child.setFont(_style._font.font)
|
||||||
except (AttributeError, RuntimeError):
|
except (
|
||||||
pass
|
AttributeError,
|
||||||
|
RuntimeError,
|
||||||
|
):
|
||||||
|
log.exception(
|
||||||
|
'Failed to update sub-widget font?\n'
|
||||||
|
)
|
||||||
|
|
||||||
# update child's font
|
# update child's font
|
||||||
try:
|
try:
|
||||||
child.setFont(_style._font.font)
|
child.setFont(_style._font.font)
|
||||||
except (AttributeError, RuntimeError):
|
except (
|
||||||
pass
|
AttributeError,
|
||||||
|
RuntimeError,
|
||||||
|
):
|
||||||
|
log.exception(
|
||||||
|
'Failed to update sub-widget font?\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# singleton app per actor
|
# singleton app per actor
|
||||||
_qt_win: QMainWindow = None
|
_qt_win: QMainWindow|None = None
|
||||||
|
|
||||||
|
|
||||||
def main_window() -> MainWindow:
|
def main_window() -> MainWindow:
|
||||||
'Return the actor-global Qt window.'
|
'''
|
||||||
|
Return the actor-global Qt window.
|
||||||
|
|
||||||
|
'''
|
||||||
global _qt_win
|
global _qt_win
|
||||||
assert _qt_win
|
assert _qt_win
|
||||||
return _qt_win
|
return _qt_win
|
||||||
|
|
|
||||||
|
|
@ -1022,13 +1022,22 @@ async def open_order_mode(
|
||||||
started.set()
|
started.set()
|
||||||
|
|
||||||
for oid, msg in ems_dialog_msgs.items():
|
for oid, msg in ems_dialog_msgs.items():
|
||||||
|
|
||||||
# HACK ALERT: ensure a resp field is filled out since
|
# HACK ALERT: ensure a resp field is filled out since
|
||||||
# techincally the call below expects a ``Status``. TODO:
|
# techincally the call below expects a ``Status``. TODO:
|
||||||
# parse into proper ``Status`` equivalents ems-side?
|
# parse into proper ``Status`` equivalents ems-side?
|
||||||
# msg.setdefault('resp', msg['broker_details']['resp'])
|
# msg.setdefault('resp', msg['broker_details']['resp'])
|
||||||
# msg.setdefault('oid', msg['broker_details']['oid'])
|
# msg.setdefault('oid', msg['broker_details']['oid'])
|
||||||
msg['brokerd_msg'] = msg
|
ya_msg: dict = msg.setdefault(
|
||||||
|
'brokerd_msg',
|
||||||
|
msg,
|
||||||
|
)
|
||||||
|
if msg is not ya_msg:
|
||||||
|
log.warning(
|
||||||
|
f'A `.brokerd_msg` was already set for ems-dialog msg?\n'
|
||||||
|
f'oid: {oid!r}\n'
|
||||||
|
f'ya_msg: {ya_msg!r}\n'
|
||||||
|
f'msg: {ya_msg!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
await process_trade_msg(
|
await process_trade_msg(
|
||||||
mode,
|
mode,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue