From 88306a6c1ecd1872e73c3382e4c0f91980fe763d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 9 Apr 2022 16:56:05 -0400 Subject: [PATCH] Drop invalid status msg, linting cleanups --- piker/brokers/kraken.py | 126 +++++++++++++++++++--------------------- 1 file changed, 59 insertions(+), 67 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 18475886..ce80679d 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -124,11 +124,11 @@ class Trade(BaseModel): Trade class that helps parse and validate ownTrades stream ''' - reqid: str # kraken order transaction id - action: str # buy or sell - price: str # price of asset - size: str # vol of asset - broker_time: str # e.g GTC, GTD + reqid: str # kraken order transaction id + action: str # buy or sell + price: str # price of asset + size: str # vol of asset + broker_time: str # e.g GTC, GTD @dataclass @@ -242,7 +242,7 @@ class Client: uri_path: str ) -> Dict[str, Any]: headers = { - 'Content-Type': + 'Content-Type': 'application/x-www-form-urlencoded', 'API-Key': self._api_key, @@ -288,7 +288,7 @@ class Client: count = resp['result']['count'] break # update existing dict if num trades exceeds 100 - else: + else: trades.update(resp['result']['trades']) # increment the offset counter data['ofs'] += 50 @@ -446,12 +446,11 @@ class Client: raise SymbolNotFound(json['error'][0] + f': {symbol}') - @asynccontextmanager async def get_client() -> Client: section = get_config() - if section: + if section: client = Client( name=section['key_descr'], api_key=section['api_key'], @@ -565,7 +564,7 @@ async def handle_order_requests( # validate temp_id = next(userref_counter) order = BrokerdOrder(**request_msg) - + # call our client api to submit the order resp = await client.submit_limit( oid=order.oid, @@ -578,7 +577,9 @@ async def handle_order_requests( err = resp['error'] if err: - log.error(f'Failed to submit order') + oid = order.oid + log.error(f'Failed to submit order: {oid}') + await ems_order_stream.send( BrokerdError( oid=order.oid, @@ -616,42 +617,41 @@ async def handle_order_requests( reqid=msg.reqid ) - try: - # Check to make sure there was no error returned by - # the kraken endpoint. Assert one order was cancelled - assert resp['error'] == [] - assert resp['result']['count'] == 1 + # Check to make sure there was no error returned by + # the kraken endpoint. Assert one order was cancelled + assert resp['error'] == [] + assert resp['result']['count'] == 1 + + # Check to make sure the cancellation is NOT pending, + # then send the confirmation to the ems order stream + pending = resp['result'].get('pending') + if pending: + + oid = order.oid + log.error(f'Order {oid} cancel was not successful') - # TODO: Change this code using .get - try: - pending = resp['result']['pending'] - # Check to make sure the cancellation is NOT pending, - # then send the confirmation to the ems order stream - except KeyError: - await ems_order_stream.send( - BrokerdStatus( - reqid=msg.reqid, - account=msg.account, - time_ns=time.time_ns(), - status='cancelled', - reason='Order cancelled', - broker_details={'name': 'kraken'} - ).dict() - ) - except AssertionError: - log.error(f'Order cancel was not successful') await ems_order_stream.send( BrokerdError( - oid=order.oid, + oid=oid, reqid=temp_id, symbol=order.symbol, reason="Failed order cancel", broker_details=resp ).dict() ) - - else: - log.error(f'Unknown order command: {request_msg}') + else: + await ems_order_stream.send( + BrokerdStatus( + reqid=msg.reqid, + account=msg.account, + time_ns=time.time_ns(), + status='cancelled', + reason='Order cancelled', + broker_details={'name': 'kraken'} + ).dict() + ) + else: + log.error(f'Unknown order command: {request_msg}') @tractor.context @@ -694,7 +694,7 @@ async def trades_dialogue( async with get_client() as client: if not client._api_key: log.error('Missing Kraken API key: Trades WS connection failed') - await ctx.started(({}, {'paper',})) + await ctx.started(({}, ['paper'])) async with ( ctx.open_stream() as ems_stream, @@ -713,7 +713,7 @@ async def trades_dialogue( _positions={}, ) - ## TODO: maybe add multiple accounts + # TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) acc_name = 'kraken.' + client._name @@ -724,7 +724,7 @@ async def trades_dialogue( await ctx.started((position_msgs, (acc_name,))) # Get websocket token for authenticated data stream - # Assert that a token was actually received + # Assert that a token was actually received. resp = await client.endpoint('GetWebSocketsToken', {}) assert resp['error'] == [] token = resp['result']['token'] @@ -733,7 +733,7 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): - ## TODO: maybe add multiple accounts + # TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) # Process trades msg stream of ws @@ -746,24 +746,8 @@ async def trades_dialogue( for trade in msg: # check the type of packaged message assert type(trade) == Trade - # prepare and send a status update for line update - trade_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), - - account='kraken.spot', - status='executed', - filled=float(trade.size), - reason='Order filled by kraken', - # remaining='' # TODO: not sure what to do here. - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - } - ) - - await ems_stream.send(trade_msg.dict()) + # prepare and send a filled status update filled_msg = BrokerdStatus( reqid=trade.reqid, time_ns=time.time_ns(), @@ -772,13 +756,21 @@ async def trades_dialogue( status='filled', filled=float(trade.size), reason='Order filled by kraken', - # remaining='' # TODO: not sure what to do here. broker_details={ 'name': 'kraken', 'broker_time': trade.broker_time - } + }, + + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, ) - + await ems_stream.send(filled_msg.dict()) # send a fill msg for gui update @@ -793,7 +785,7 @@ async def trades_dialogue( broker_details={'name': 'kraken'}, broker_time=float(trade.broker_time) ) - + await ems_stream.send(fill_msg.dict()) @@ -899,10 +891,10 @@ async def process_trade_msgs( async for msg in stream_messages(ws): try: - # check that we are on the ownTrades stream and that msgs are - # arriving in sequence with kraken - # For clarification the kraken ws api docs for this stream: - # https://docs.kraken.com/websockets/#message-ownTrades + # check that we are on the ownTrades stream and that msgs + # are arriving in sequence with kraken For clarification the + # kraken ws api docs for this stream: + # https://docs.kraken.com/websockets/#message-ownTrades assert msg[1] == 'ownTrades' assert msg[2]['sequence'] > sequence_counter sequence_counter += 1