diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 1a35c760..27e0cfe7 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -45,6 +45,7 @@ from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray from ..data._web_bs import open_autorecon_ws, NoBsWs +from ..clearing._paper_engine import PaperBoi from ..clearing._messages import ( BrokerdPosition, BrokerdOrder, BrokerdStatus, BrokerdOrderAck, BrokerdError, BrokerdCancel, @@ -691,71 +692,7 @@ async def trades_dialogue( # Authenticated block async with get_client() as client: - if client._api_key: - acc_name = 'kraken.' + client._name - trades = await client.get_trades() - - position_msgs = pack_positions(acc_name, trades) - - await ctx.started((position_msgs, (acc_name,))) - - # Get websocket token for authenticated data stream - # Assert that a token was actually received - resp = await client.endpoint('GetWebSocketsToken', {}) - assert resp['error'] == [] - token = resp['result']['token'] - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - ## TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) - - # Process trades msg stream of ws - async with open_autorecon_ws( - 'wss://ws-auth.kraken.com/', - fixture=subscribe, - token=token, - ) as ws: - async for msg in process_trade_msgs(ws): - for trade in msg: - # check the type of packaged message - assert type(trade) == Trade - # prepare and send a status update for line update - trade_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), - - account='kraken.spot', - status='executed', - filled=float(trade.size), - reason='Order filled by kraken', - # remaining='' # TODO: not sure what to do here. - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - } - ) - - await ems_stream.send(trade_msg.dict()) - - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), - - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) - - await ems_stream.send(fill_msg.dict()) - - else: + if not client._api_key: log.error('Missing Kraken API key: Trades WS connection failed') await ctx.started(({}, {'paper',})) @@ -763,9 +700,85 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): + + client = PaperBoi( + 'kraken', + ems_stream, + _buys={}, + _sells={}, + + _reqids={}, + + # TODO: load paper positions from ``positions.toml`` + _positions={}, + ) + ## TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) + acc_name = 'kraken.' + client._name + trades = await client.get_trades() + + position_msgs = pack_positions(acc_name, trades) + + await ctx.started((position_msgs, (acc_name,))) + + # Get websocket token for authenticated data stream + # Assert that a token was actually received + resp = await client.endpoint('GetWebSocketsToken', {}) + assert resp['error'] == [] + token = resp['result']['token'] + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + ## TODO: maybe add multiple accounts + n.start_soon(handle_order_requests, client, ems_stream) + + # Process trades msg stream of ws + async with open_autorecon_ws( + 'wss://ws-auth.kraken.com/', + fixture=subscribe, + token=token, + ) as ws: + async for msg in process_trade_msgs(ws): + for trade in msg: + # check the type of packaged message + assert type(trade) == Trade + # prepare and send a status update for line update + trade_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), + + account='kraken.spot', + status='executed', + filled=float(trade.size), + reason='Order filled by kraken', + # remaining='' # TODO: not sure what to do here. + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + } + ) + + await ems_stream.send(trade_msg.dict()) + + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), + + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) + + await ems_stream.send(fill_msg.dict()) + async def stream_messages( ws: NoBsWs,