Compare commits

..

No commits in common. "c9aacd6a842000052d63853b6210b1e677383769" and "039d06cc4859ae48da18cac2e2877ac0eae3498f" have entirely different histories.

1 changed files with 34 additions and 33 deletions

View File

@ -33,6 +33,7 @@ import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
import wsproto import wsproto
import itertools
import urllib.parse import urllib.parse
import hashlib import hashlib
import hmac import hmac
@ -160,6 +161,13 @@ def get_config() -> dict[str, Any]:
conf, path = config.load() conf, path = config.load()
section = conf.get('kraken') section = conf.get('kraken')
if section:
log.warning(
'Kraken order mode is currently disabled due to bug!\n'
'See https://github.com/pikers/piker/issues/299'
)
return {}
if section is None: if section is None:
log.warning(f'No config section found for kraken in {path}') log.warning(f'No config section found for kraken in {path}')
return {} return {}
@ -300,38 +308,34 @@ class Client:
async def submit_limit( async def submit_limit(
self, self,
oid: str,
symbol: str, symbol: str,
price: float, price: float,
action: str, action: str,
size: float, size: float,
reqid: str = None, reqid: int = None,
validate: bool = False # set True test call without a real submission ) -> int:
) -> dict:
''' '''
Place an order and return integer request id provided by client. Place an order and return integer request id provided by client.
''' '''
# Build common data dict for common keys from both endpoints # Build order data for kraken api
data = { data = {
"userref": reqid,
"ordertype": "limit",
"type": action,
"volume": str(size),
"pair": symbol, "pair": symbol,
"price": str(price), "price": str(price),
"validate": validate # set to True test AddOrder call without a real submission
"validate": False
} }
if reqid is None: return await self.endpoint('AddOrder', data)
# Build order data for kraken api
data["ordertype"] = "limit"
data["type"] = action
data["volume"] = str(size)
return await self.endpoint('AddOrder', data)
else:
# Edit order data for kraken api
data["txid"] = reqid
return await self.endpoint('EditOrder', data)
async def submit_cancel( async def submit_cancel(
self, self,
reqid: str, reqid: str,
) -> dict: ) -> None:
''' '''
Send cancel request for order id ``reqid``. Send cancel request for order id ``reqid``.
@ -542,6 +546,7 @@ async def handle_order_requests(
request_msg: dict request_msg: dict
order: BrokerdOrder order: BrokerdOrder
userref_counter = itertools.count()
async for request_msg in ems_order_stream: async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}') log.info(f'Received order request {request_msg}')
@ -570,14 +575,17 @@ async def handle_order_requests(
continue continue
# validate # validate
temp_id = next(userref_counter)
order = BrokerdOrder(**request_msg) order = BrokerdOrder(**request_msg)
# call our client api to submit the order # call our client api to submit the order
resp = await client.submit_limit( resp = await client.submit_limit(
oid=order.oid,
symbol=order.symbol, symbol=order.symbol,
price=order.price, price=order.price,
action=order.action, action=order.action,
size=order.size, size=order.size,
reqid=order.reqid, reqid=temp_id,
) )
err = resp['error'] err = resp['error']
@ -588,21 +596,16 @@ async def handle_order_requests(
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=order.oid, oid=order.oid,
reqid=order.reqid, reqid=temp_id,
symbol=order.symbol, symbol=order.symbol,
reason="Failed order submission", reason="Failed order submission",
broker_details=resp broker_details=resp
).dict() ).dict()
) )
else: else:
# TODO: handle multiple orders (cancels?) # TODO: handle multiple cancels
# txid is an array of strings # txid is an array of strings
if order.reqid is None: reqid = resp['result']['txid'][0]
reqid = resp['result']['txid'][0]
else:
# update the internal pairing of oid to krakens
# txid with the new txid that is returned on edit
reqid = resp['result']['txid']
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
BrokerdOrderAck( BrokerdOrderAck(
@ -639,9 +642,9 @@ async def handle_order_requests(
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg.oid, oid=order.oid,
reqid=msg.reqid, reqid=temp_id,
symbol=msg.symbol, symbol=order.symbol,
reason="Failed order cancel", reason="Failed order cancel",
broker_details=resp broker_details=resp
).dict() ).dict()
@ -665,11 +668,9 @@ async def handle_order_requests(
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg.oid, oid=order.oid,
reqid=msg.reqid, reqid=temp_id,
symbol=msg.symbol, symbol=order.symbol,
# TODO: maybe figure out if pending cancels will
# eventually get cancelled
reason="Order cancel is still pending?", reason="Order cancel is still pending?",
broker_details=resp broker_details=resp
).dict() ).dict()