Update ledger *after* pps updates from new trades

Addressing same issue as in #350 where we need to compute position
updates using the *first read* from the ledger **before** we update it
to make sure `Position.lifo_update()` gets called and **not skipped**
because new trades were read as clears entries but haven't actually been
included in update calcs yet.. aka we call `Position.lifo_update()`.

Main change here is to convert `update_ledger()` into a context mngr so
that the ledger write is committed after pps updates using
`pp.update_pps_conf()`..

This is basically a hotfix to #346 as well.
tractor_typed_msg_hackin
Tyler Goodlet 2022-07-05 16:39:18 -04:00
parent e901547e9f
commit 65ff9a1fa1
1 changed files with 27 additions and 21 deletions

View File

@ -18,7 +18,10 @@
Order api and machinery Order api and machinery
''' '''
from contextlib import asynccontextmanager as acm from contextlib import (
asynccontextmanager as acm,
contextmanager as cm,
)
from functools import partial from functools import partial
from itertools import chain, count from itertools import chain, count
from pprint import pformat from pprint import pformat
@ -260,13 +263,13 @@ async def trades_dialogue(
log.info( log.info(
f'Loaded {len(trades)} trades from account `{acc_name}`' f'Loaded {len(trades)} trades from account `{acc_name}`'
) )
trans = await update_ledger(acctid, trades) with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf( active, closed = pp.update_pps_conf(
'kraken', 'kraken',
acctid, acctid,
trade_records=trans, trade_records=trans,
ledger_reload={}.fromkeys(t.bsuid for t in trans), ledger_reload={}.fromkeys(t.bsuid for t in trans),
) )
position_msgs: list[dict] = [] position_msgs: list[dict] = []
pps: dict[int, pp.Position] pps: dict[int, pp.Position]
@ -426,14 +429,14 @@ async def handle_order_updates(
await ems_stream.send(filled_msg.dict()) await ems_stream.send(filled_msg.dict())
# update ledger and position tracking # update ledger and position tracking
trans = await update_ledger(acctid, trades) with open_ledger(acctid, trades) as trans:
active, closed = pp.update_pps_conf( active, closed = pp.update_pps_conf(
'kraken', 'kraken',
acctid, acctid,
trade_records=trans, trade_records=trans,
ledger_reload={}.fromkeys( ledger_reload={}.fromkeys(
t.bsuid for t in trans), t.bsuid for t in trans),
) )
# emit any new pp msgs to ems # emit any new pp msgs to ems
for pos in filter( for pos in filter(
@ -743,7 +746,8 @@ def norm_trade_records(
return records return records
async def update_ledger( @cm
def open_ledger(
acctid: str, acctid: str,
trade_entries: list[dict[str, Any]], trade_entries: list[dict[str, Any]],
@ -756,8 +760,10 @@ async def update_ledger(
'kraken', 'kraken',
acctid, acctid,
) as ledger: ) as ledger:
ledger.update(trade_entries)
# normalize to transaction form # normalize to transaction form
records = norm_trade_records(trade_entries) records = norm_trade_records(trade_entries)
return records yield records
# update on exit
ledger.update(trade_entries)