From 2baa1b460571191f504eee845004f4c89b9c0548 Mon Sep 17 00:00:00 2001 From: Konstantine Tsafatinos Date: Mon, 28 Mar 2022 18:28:19 -0400 Subject: [PATCH] fix hang when kraken is not in config --- piker/brokers/kraken.py | 133 ++++++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 59 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 4a8d65d5..1a35c760 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -450,11 +450,14 @@ class Client: async def get_client() -> Client: section = get_config() - client = Client( - name=section['key_descr'], - api_key=section['api_key'], - secret=section['secret'] - ) + if section: + client = Client( + name=section['key_descr'], + api_key=section['api_key'], + secret=section['secret'] + ) + else: + client = Client() # at startup, load all symbols locally for fast search await client.cache_symbols() @@ -688,68 +691,80 @@ async def trades_dialogue( # Authenticated block async with get_client() as client: - acc_name = 'kraken.' + client._name - trades = await client.get_trades() + if client._api_key: + acc_name = 'kraken.' + client._name + trades = await client.get_trades() - position_msgs = pack_positions(acc_name, trades) + position_msgs = pack_positions(acc_name, trades) - await ctx.started((position_msgs, (acc_name,))) + await ctx.started((position_msgs, (acc_name,))) - # Get websocket token for authenticated data stream - # Assert that a token was actually received - resp = await client.endpoint('GetWebSocketsToken', {}) - assert resp['error'] == [] - token = resp['result']['token'] + # Get websocket token for authenticated data stream + # Assert that a token was actually received + resp = await client.endpoint('GetWebSocketsToken', {}) + assert resp['error'] == [] + token = resp['result']['token'] - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - ## TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + ## TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) - # Process trades msg stream of ws - async with open_autorecon_ws( - 'wss://ws-auth.kraken.com/', - fixture=subscribe, - token=token, - ) as ws: - async for msg in process_trade_msgs(ws): - for trade in msg: - # check the type of packaged message - assert type(trade) == Trade - # prepare and send a status update for line update - trade_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), + # Process trades msg stream of ws + async with open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=subscribe, + token=token, + ) as ws: + async for msg in process_trade_msgs(ws): + for trade in msg: + # check the type of packaged message + assert type(trade) == Trade + # prepare and send a status update for line update + trade_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), - account='kraken.spot', - status='executed', - filled=float(trade.size), - reason='Order filled by kraken', - # remaining='' # TODO: not sure what to do here. - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - } - ) - - await ems_stream.send(trade_msg.dict()) + account='kraken.spot', + status='executed', + filled=float(trade.size), + reason='Order filled by kraken', + # remaining='' # TODO: not sure what to do here. + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + } + ) + + await ems_stream.send(trade_msg.dict()) - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) - - await ems_stream.send(fill_msg.dict()) + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) + + await ems_stream.send(fill_msg.dict()) + + else: + log.error('Missing Kraken API key: Trades WS connection failed') + await ctx.started(({}, {'paper',})) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + ## TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) async def stream_messages(