diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index c77f2140..bd992c95 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -413,20 +413,27 @@ async def trades_dialogue( ) -> AsyncIterator[dict[str, Any]]: - # XXX: required to propagate ``tractor`` loglevel to piker logging + # XXX: required to propagate ``tractor`` loglevel to ``piker`` logging get_console_log(loglevel or tractor.current_actor().loglevel) async with get_client() as client: - # TODO: make ems flip to paper mode via - # some returned signal if the user only wants to use - # the data feed or we return this? - # await ctx.started(({}, ['paper'])) - if not client._api_key: raise RuntimeError( 'Missing Kraken API key in `brokers.toml`!?!?') + # TODO: make ems flip to paper mode via + # some returned signal if the user only wants to use + # the data feed or we return this? + # else: + # await ctx.started(({}, ['paper'])) + + # NOTE: currently we expect the user to define a "source fiat" + # (much like the web UI let's you set an "account currency") + # such that all positions (nested or flat) will be translated to + # this source currency's terms. + src_fiat = client.conf['src_fiat'] + # auth required block acctid = client._name acc_name = 'kraken.' + acctid @@ -444,10 +451,9 @@ async def trades_dialogue( # NOTE: testing code for making sure the rt incremental update # of positions, via newly generated msgs works. In order to test # this, - # - delete the *ABSOLUTE LAST* entry from accont's corresponding + # - delete the *ABSOLUTE LAST* entry from account's corresponding # trade ledgers file (NOTE this MUST be the last record - # delivered from the - # api ledger), + # delivered from the api ledger), # - open you ``pps.toml`` and find that same tid and delete it # from the pp's clears table, # - set this flag to `True` @@ -486,27 +492,51 @@ async def trades_dialogue( # and do diff with ledger to determine # what amount of trades-transactions need # to be reloaded. - sizes = await client.get_balances() - for dst, size in sizes.items(): + balances = await client.get_balances() + for dst, size in balances.items(): # we don't care about tracking positions # in the user's source fiat currency. - if dst == client.conf['src_fiat']: + if dst == src_fiat: continue - def has_pp(dst: str) -> Position | bool: - pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps} - pair = pps_dst_assets.get(dst) - pp = table.pps.get(pair) + def has_pp( + dst: str, + size: float, - if ( - not pair or not pp - or not math.isclose(pp.size, size) - ): - return False + ) -> Position | bool: - return pp + src2dst: dict[str, str] = {} + for bsuid in table.pps: + try: + dst_name_start = bsuid.rindex(src_fiat) + except IndexError: + # TODO: handle nested positions..(i.e. + # positions where the src fiat was used to + # buy some other dst which was furhter used + # to buy another dst..) + log.warning( + f'No src fiat {src_fiat} found in {bsuid}?' + ) + continue - pos = has_pp(dst) + _dst = bsuid[:dst_name_start] + if _dst != dst: + continue + + src2dst[src_fiat] = dst + + for src, dst in src2dst.items(): + pair = f'{dst}{src_fiat}' + pp = table.pps.get(pair) + if ( + pp + and math.isclose(pp.size, size) + ): + return pp + + return False + + pos = has_pp(dst, size) if not pos: # we have a balance for which there is no pp @@ -514,12 +544,15 @@ async def trades_dialogue( # ledger. updated = table.update_from_trans(ledger_trans) log.info(f'Updated pps from ledger:\n{pformat(updated)}') - pos = has_pp(dst) + pos = has_pp(dst, size) - if not pos and not simulate_pp_update: + if ( + not pos + and not simulate_pp_update + ): # try reloading from API table.update_from_trans(api_trans) - pos = has_pp(dst) + pos = has_pp(dst, size) if not pos: # get transfers to make sense of abs balances. @@ -557,7 +590,7 @@ async def trades_dialogue( f'{pformat(updated)}' ) - if not has_pp(dst): + if not has_pp(dst, size): raise ValueError( 'Could not reproduce balance:\n' f'dst: {dst}, {size}\n'