diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index c17d9daa..9c98bebe 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -70,6 +70,7 @@ class Trade(BaseModel): def pack_positions( acc: str, trades: dict + ) -> list[Any]: positions: dict[str, float] = {} vols: dict[str, float] = {} @@ -104,8 +105,8 @@ def pack_positions( async def handle_order_requests( - client: Client, - ems_order_stream: tractor.MsgStream, + client: Client, + ems_order_stream: tractor.MsgStream, ) -> None: @@ -342,11 +343,13 @@ async def trades_dialogue( # TODO: maybe add multiple accounts n.start_soon(handle_order_requests, client, ems_stream) + # pull and deliver trades ledger acc_name = 'kraken.' + client._name trades = await client.get_trades() - + log.info( + f'Loaded {len(trades)} trades from account `{acc_name}`' + ) position_msgs = pack_positions(acc_name, trades) - await ctx.started((position_msgs, (acc_name,))) # Get websocket token for authenticated data stream @@ -355,74 +358,76 @@ async def trades_dialogue( # lol wtf is this.. assert resp['error'] == [] - token = resp['result']['token'] async with ( ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - # TODO: maybe add multiple accounts - n.start_soon(handle_order_requests, client, ems_stream) - - # Process trades msg stream of ws - async with open_autorecon_ws( + open_autorecon_ws( 'wss://ws-auth.kraken.com/', fixture=subscribe, token=token, - ) as ws: - async for msg in process_trade_msgs(ws): - for trade in msg: - # check the type of packaged message - assert type(trade) == Trade + ) as ws, + trio.open_nursery() as n, + ): + # task for processing inbound requests from ems + n.start_soon(handle_order_requests, client, ems_stream) + # begin trade event processing + async for msg in process_trade_msgs(ws): + for trade in msg: + match trade: # prepare and send a filled status update - filled_msg = BrokerdStatus( - reqid=trade.reqid, - time_ns=time.time_ns(), + case Trade(): + filled_msg = BrokerdStatus( + reqid=trade.reqid, + time_ns=time.time_ns(), - account='kraken.spot', - status='filled', - filled=float(trade.size), - reason='Order filled by kraken', - broker_details={ - 'name': 'kraken', - 'broker_time': trade.broker_time - }, + account=acc_name, + status='filled', + filled=float(trade.size), + reason='Order filled by kraken', + broker_details={ + 'name': 'kraken', + 'broker_time': trade.broker_time + }, - # TODO: figure out if kraken gives a count - # of how many units of underlying were - # filled. Alternatively we can decrement - # this value ourselves by associating and - # calcing from the diff with the original - # client-side request, see: - # https://github.com/pikers/piker/issues/296 - remaining=0, - ) + # TODO: figure out if kraken gives a count + # of how many units of underlying were + # filled. Alternatively we can decrement + # this value ourselves by associating and + # calcing from the diff with the original + # client-side request, see: + # https://github.com/pikers/piker/issues/296 + remaining=0, + ) + await ems_stream.send(filled_msg.dict()) - await ems_stream.send(filled_msg.dict()) + # send a fill msg for gui update + fill_msg = BrokerdFill( + reqid=trade.reqid, + time_ns=time.time_ns(), - # send a fill msg for gui update - fill_msg = BrokerdFill( - reqid=trade.reqid, - time_ns=time.time_ns(), + action=trade.action, + size=float(trade.size), + price=float(trade.price), + # TODO: maybe capture more msg data i.e fees? + broker_details={'name': 'kraken'}, + broker_time=float(trade.broker_time) + ) - action=trade.action, - size=float(trade.size), - price=float(trade.price), - # TODO: maybe capture more msg data i.e fees? - broker_details={'name': 'kraken'}, - broker_time=float(trade.broker_time) - ) + await ems_stream.send(fill_msg.dict()) - await ems_stream.send(fill_msg.dict()) + case _: + log.warning(f'Unhandled trades msg: {trade}') + await tractor.breakpoint() async def process_trade_msgs( ws: NoBsWs, ): ''' - Parse and pack data feed messages. + Parse and pack trades subscription messages, deliver framed + sequences of messages? ''' sequence_counter = 0