diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 90b40b2a..96f8101c 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -265,15 +265,14 @@ class Client: data['nonce'] = str(int(1000*time.time())) return await self._private(method, data, uri_path) - async def get_positions( + async def get_trades( self, data: Dict[str, Any] = {} - ) -> (Dict[str, Any], Dict[str, Any]): + ) -> Dict[str, Any]: data['ofs'] = 0 - positions = {} - vols = {} # Grab all trade history # https://docs.kraken.com/rest/#operation/getTradeHistory + # Kraken uses 'ofs' to refer to the offset while True: resp = await self.endpoint('TradesHistory', data) # grab the first 50 trades @@ -298,31 +297,7 @@ class Client: # make sure you grabbed all the trades assert count == len(trades.values()) - # positions - # TODO: Make sure to add option to include fees in positions calc - for trade in trades.values(): - sign = -1 if trade['type'] == 'sell' else 1 - # This catch is for populating the dict with new values - # as the plus assigment will fail if there no value - # tied to the key - try: - positions[trade['pair']] += sign * float(trade['cost']) - vols[trade['pair']] += sign * float(trade['vol']) - except KeyError: - positions[trade['pair']] = sign * float(trade['cost']) - vols[trade['pair']] = sign * float(trade['vol']) - # This cycles through the summed trades of an asset and then - # normalizes the price with the current volume of the asset - # you are holding. If you have no more of the asset, the balance - # is 0, then it sets the position to 0. - for pair in positions.keys(): - asset_balance = vols[pair] - if asset_balance == 0: - positions[pair] = 0 - else: - positions[pair] /= asset_balance - - return positions, vols + return trades async def submit_limit( self, @@ -488,21 +463,49 @@ async def get_client() -> Client: yield client -def pack_position( +def pack_positions( acc: str, - symkey: str, - pos: float, - vol: float -) -> dict[str, Any]: + trades: dict +) -> list[Any]: + positions: dict[str, float] = {} + vols: dict[str, float] = {} + costs: dict[str, float] = {} + position_msgs: list[Any] = [] - return BrokerdPosition( - broker='kraken', - account=acc, - symbol=symkey, - currency=symkey[-3:], - size=float(vol), - avg_price=float(pos), - ) + for trade in trades.values(): + sign = -1 if trade['type'] == 'sell' else 1 + # This catch is for populating the dict with new values + # as the plus assigment will fail if there no value + # tied to the key + pair = trade['pair'] + vol = float(trade['vol']) + # This is for the initial addition of a pair so the + # += operation does not fail. + vols[pair] = vols.setdefault(pair, 0) + costs[pair] = costs.setdefault(pair, 0) + positions[pair] = positions.setdefault(pair, 0) + vols[pair] += sign * vol + costs[pair] += sign * float(trade['cost']) + if vols[pair] != 0: + positions[pair] = costs[pair] / vols[pair] + else: + positions[pair] = 0 + + for ticker, pos in positions.items(): + norm_sym = normalize_symbol(ticker) + vol = float(vols[ticker]) + if vol != 0: + msg = BrokerdPosition( + broker='kraken', + account=acc, + symbol=norm_sym, + currency=norm_sym[-3:], + size=vol, + avg_price=float(pos), + ) + position_msgs.append(msg.dict()) + + return position_msgs def normalize_symbol( @@ -667,8 +670,6 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - # Generate - @asynccontextmanager async def subscribe(ws: wsproto.WSConnection, token: str): # XXX: setup subs @@ -699,17 +700,11 @@ async def trades_dialogue( # Authenticated block async with get_client() as client: acc_name = 'kraken.' + client._name - positions, vols = await client.get_positions() + trades = await client.get_trades() - all_positions = [] + position_msgs = pack_positions(acc_name, trades) - for ticker, pos in positions.items(): - norm_sym = normalize_symbol(ticker) - if float(vols[ticker]) != 0: - msg = pack_position(acc_name, norm_sym, pos, vols[ticker]) - all_positions.append(msg.dict()) - - await ctx.started((all_positions, (acc_name,))) + await ctx.started((position_msgs, (acc_name,))) # Get websocket token for authenticated data stream # Assert that a token was actually received