`kraken`: don't presume src fiat symbol size in pos predicate

misc_brokerd_backend_repairs
Tyler Goodlet 2022-12-21 11:08:05 -05:00
parent f3ef73ef41
commit 70ad1a1860
1 changed files with 60 additions and 27 deletions

View File

@ -413,20 +413,27 @@ async def trades_dialogue(
) -> AsyncIterator[dict[str, Any]]: ) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to ``piker`` logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
async with get_client() as client: async with get_client() as client:
# TODO: make ems flip to paper mode via
# some returned signal if the user only wants to use
# the data feed or we return this?
# await ctx.started(({}, ['paper']))
if not client._api_key: if not client._api_key:
raise RuntimeError( raise RuntimeError(
'Missing Kraken API key in `brokers.toml`!?!?') 'Missing Kraken API key in `brokers.toml`!?!?')
# TODO: make ems flip to paper mode via
# some returned signal if the user only wants to use
# the data feed or we return this?
# else:
# await ctx.started(({}, ['paper']))
# NOTE: currently we expect the user to define a "source fiat"
# (much like the web UI let's you set an "account currency")
# such that all positions (nested or flat) will be translated to
# this source currency's terms.
src_fiat = client.conf['src_fiat']
# auth required block # auth required block
acctid = client._name acctid = client._name
acc_name = 'kraken.' + acctid acc_name = 'kraken.' + acctid
@ -444,10 +451,9 @@ async def trades_dialogue(
# NOTE: testing code for making sure the rt incremental update # NOTE: testing code for making sure the rt incremental update
# of positions, via newly generated msgs works. In order to test # of positions, via newly generated msgs works. In order to test
# this, # this,
# - delete the *ABSOLUTE LAST* entry from accont's corresponding # - delete the *ABSOLUTE LAST* entry from account's corresponding
# trade ledgers file (NOTE this MUST be the last record # trade ledgers file (NOTE this MUST be the last record
# delivered from the # delivered from the api ledger),
# api ledger),
# - open you ``pps.toml`` and find that same tid and delete it # - open you ``pps.toml`` and find that same tid and delete it
# from the pp's clears table, # from the pp's clears table,
# - set this flag to `True` # - set this flag to `True`
@ -486,27 +492,51 @@ async def trades_dialogue(
# and do diff with ledger to determine # and do diff with ledger to determine
# what amount of trades-transactions need # what amount of trades-transactions need
# to be reloaded. # to be reloaded.
sizes = await client.get_balances() balances = await client.get_balances()
for dst, size in sizes.items(): for dst, size in balances.items():
# we don't care about tracking positions # we don't care about tracking positions
# in the user's source fiat currency. # in the user's source fiat currency.
if dst == client.conf['src_fiat']: if dst == src_fiat:
continue continue
def has_pp(dst: str) -> Position | bool: def has_pp(
pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps} dst: str,
pair = pps_dst_assets.get(dst) size: float,
pp = table.pps.get(pair)
if ( ) -> Position | bool:
not pair or not pp
or not math.isclose(pp.size, size)
):
return False
return pp src2dst: dict[str, str] = {}
for bsuid in table.pps:
try:
dst_name_start = bsuid.rindex(src_fiat)
except IndexError:
# TODO: handle nested positions..(i.e.
# positions where the src fiat was used to
# buy some other dst which was furhter used
# to buy another dst..)
log.warning(
f'No src fiat {src_fiat} found in {bsuid}?'
)
continue
pos = has_pp(dst) _dst = bsuid[:dst_name_start]
if _dst != dst:
continue
src2dst[src_fiat] = dst
for src, dst in src2dst.items():
pair = f'{dst}{src_fiat}'
pp = table.pps.get(pair)
if (
pp
and math.isclose(pp.size, size)
):
return pp
return False
pos = has_pp(dst, size)
if not pos: if not pos:
# we have a balance for which there is no pp # we have a balance for which there is no pp
@ -514,12 +544,15 @@ async def trades_dialogue(
# ledger. # ledger.
updated = table.update_from_trans(ledger_trans) updated = table.update_from_trans(ledger_trans)
log.info(f'Updated pps from ledger:\n{pformat(updated)}') log.info(f'Updated pps from ledger:\n{pformat(updated)}')
pos = has_pp(dst) pos = has_pp(dst, size)
if not pos and not simulate_pp_update: if (
not pos
and not simulate_pp_update
):
# try reloading from API # try reloading from API
table.update_from_trans(api_trans) table.update_from_trans(api_trans)
pos = has_pp(dst) pos = has_pp(dst, size)
if not pos: if not pos:
# get transfers to make sense of abs balances. # get transfers to make sense of abs balances.
@ -557,7 +590,7 @@ async def trades_dialogue(
f'{pformat(updated)}' f'{pformat(updated)}'
) )
if not has_pp(dst): if not has_pp(dst, size):
raise ValueError( raise ValueError(
'Could not reproduce balance:\n' 'Could not reproduce balance:\n'
f'dst: {dst}, {size}\n' f'dst: {dst}, {size}\n'