diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 18475886..f64ef7aa 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -33,7 +33,7 @@ import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto -from itertools import count +import itertools import urllib.parse import hashlib import hmac @@ -124,11 +124,11 @@ class Trade(BaseModel): Trade class that helps parse and validate ownTrades stream ''' - reqid: str # kraken order transaction id - action: str # buy or sell - price: str # price of asset - size: str # vol of asset - broker_time: str # e.g GTC, GTD + reqid: str # kraken order transaction id + action: str # buy or sell + price: str # price of asset + size: str # vol of asset + broker_time: str # e.g GTC, GTD @dataclass @@ -159,9 +159,15 @@ class OHLC: def get_config() -> dict[str, Any]: conf, path = config.load() - section = conf.get('kraken') + if section: + log.warning( + 'Kraken order mode is currently disabled due to bug!\n' + 'See https://github.com/pikers/piker/issues/299' + ) + return {} + if section is None: log.warning(f'No config section found for kraken in {path}') return {} @@ -242,7 +248,7 @@ class Client: uri_path: str ) -> Dict[str, Any]: headers = { - 'Content-Type': + 'Content-Type': 'application/x-www-form-urlencoded', 'API-Key': self._api_key, @@ -288,7 +294,7 @@ class Client: count = resp['result']['count'] break # update existing dict if num trades exceeds 100 - else: + else: trades.update(resp['result']['trades']) # increment the offset counter data['ofs'] += 50 @@ -446,12 +452,11 @@ class Client: raise SymbolNotFound(json['error'][0] + f': {symbol}') - @asynccontextmanager async def get_client() -> Client: section = get_config() - if section: + if section: client = Client( name=section['key_descr'], api_key=section['api_key'], @@ -541,7 +546,8 @@ async def handle_order_requests( request_msg: dict order: BrokerdOrder - userref_counter = count() + userref_counter = itertools.count() + async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') @@ -558,14 +564,20 @@ async def handle_order_requests( await ems_order_stream.send(BrokerdError( oid=request_msg['oid'], symbol=request_msg['symbol'], - reason=f'Kraken only, No account found: `{account}` ?', + + # reason=f'Kraken only, No account found: `{account}` ?', + reason=( + 'Kraken only, order mode disabled due to ' + 'https://github.com/pikers/piker/issues/299' + ), + ).dict()) continue # validate temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) - + # call our client api to submit the order resp = await client.submit_limit( oid=order.oid, @@ -578,7 +590,9 @@ async def handle_order_requests( err = resp['error'] if err: - log.error(f'Failed to submit order') + oid = order.oid + log.error(f'Failed to submit order: {oid}') + await ems_order_stream.send( BrokerdError( oid=order.oid, @@ -616,30 +630,16 @@ async def handle_order_requests( reqid=msg.reqid ) + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled. try: - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled - assert resp['error'] == [] - assert resp['result']['count'] == 1 + result = resp['result'] + count = result['count'] + + # check for 'error' key if we received no 'result' + except KeyError: + error = resp.get('error') - # TODO: Change this code using .get - try: - pending = resp['result']['pending'] - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - except KeyError: - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() - ) - except AssertionError: - log.error(f'Order cancel was not successful') await ems_order_stream.send( BrokerdError( oid=order.oid, @@ -650,8 +650,46 @@ async def handle_order_requests( ).dict() ) - else: - log.error(f'Unknown order command: {request_msg}') + if not error: + raise BrokerError(f'Unknown order cancel response: {resp}') + + else: + if not count: # no orders were cancelled? + + # XXX: what exactly is this from and why would we care? + # there doesn't seem to be any docs here? + # https://docs.kraken.com/rest/#operation/cancelOrder + + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + pending = result.get('pending') + if pending: + log.error(f'Order {oid} cancel was not yet successful') + + await ems_order_stream.send( + BrokerdError( + oid=order.oid, + reqid=temp_id, + symbol=order.symbol, + reason="Order cancel is still pending?", + broker_details=resp + ).dict() + ) + + else: # order cancel success case. + + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + else: + log.error(f'Unknown order command: {request_msg}') @tractor.context @@ -694,7 +732,7 @@ async def trades_dialogue( async with get_client() as client: if not client._api_key: log.error('Missing Kraken API key: Trades WS connection failed') - await ctx.started(({}, {'paper',})) + await ctx.started(({}, ['paper'])) async with ( ctx.open_stream() as ems_stream, @@ -713,7 +751,7 @@ async def trades_dialogue( _positions={}, ) - ## TODO: maybe add multiple accounts + # TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) acc_name = 'kraken.' + client._name @@ -724,7 +762,7 @@ async def trades_dialogue( await ctx.started((position_msgs, (acc_name,))) # Get websocket token for authenticated data stream - # Assert that a token was actually received + # Assert that a token was actually received. resp = await client.endpoint('GetWebSocketsToken', {}) assert resp['error'] == [] token = resp['result']['token'] @@ -733,7 +771,7 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): - ## TODO: maybe add multiple accounts + # TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) # Process trades msg stream of ws @@ -746,24 +784,8 @@ async def trades_dialogue( for trade in msg: # check the type of packaged message assert type(trade) == Trade - # prepare and send a status update for line update - trade_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), - - account='kraken.spot', - status='executed', - filled=float(trade.size), - reason='Order filled by kraken', - # remaining='' # TODO: not sure what to do here. - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - } - ) - - await ems_stream.send(trade_msg.dict()) + # prepare and send a filled status update filled_msg = BrokerdStatus( reqid=trade.reqid, time_ns=time.time_ns(), @@ -772,13 +794,21 @@ async def trades_dialogue( status='filled', filled=float(trade.size), reason='Order filled by kraken', - # remaining='' # TODO: not sure what to do here. broker_details={ 'name': 'kraken', 'broker_time': trade.broker_time - } + }, + + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, ) - + await ems_stream.send(filled_msg.dict()) # send a fill msg for gui update @@ -793,7 +823,7 @@ async def trades_dialogue( broker_details={'name': 'kraken'}, broker_time=float(trade.broker_time) ) - + await ems_stream.send(fill_msg.dict()) @@ -899,10 +929,10 @@ async def process_trade_msgs( async for msg in stream_messages(ws): try: - # check that we are on the ownTrades stream and that msgs are - # arriving in sequence with kraken - # For clarification the kraken ws api docs for this stream: - # https://docs.kraken.com/websockets/#message-ownTrades + # check that we are on the ownTrades stream and that msgs + # are arriving in sequence with kraken For clarification the + # kraken ws api docs for this stream: + # https://docs.kraken.com/websockets/#message-ownTrades assert msg[1] == 'ownTrades' assert msg[2]['sequence'] > sequence_counter sequence_counter += 1 diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index ec5f4afb..7a48768c 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -184,7 +184,7 @@ class BrokerdStatus(BaseModel): # { # 'submitted', # 'cancelled', - # 'executed', + # 'filled', # } status: str