From 3765c61f2dec7c3e25620a7b55b7e809b8a91335 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Jul 2022 16:39:18 -0400 Subject: [PATCH] Update ledger *after* pps updates from new trades Addressing same issue as in #350 where we need to compute position updates using the *first read* from the ledger **before** we update it to make sure `Position.lifo_update()` gets called and **not skipped** because new trades were read as clears entries but haven't actually been included in update calcs yet.. aka we call `Position.lifo_update()`. Main change here is to convert `update_ledger()` into a context mngr so that the ledger write is committed after pps updates using `pp.update_pps_conf()`.. This is basically a hotfix to #346 as well. --- piker/brokers/kraken/broker.py | 48 +++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 6ac8083b..a39086de 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -18,7 +18,10 @@ Order api and machinery ''' -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, +) from functools import partial from itertools import chain, count from pprint import pformat @@ -260,13 +263,13 @@ async def trades_dialogue( log.info( f'Loaded {len(trades)} trades from account `{acc_name}`' ) - trans = await update_ledger(acctid, trades) - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=trans, - ledger_reload={}.fromkeys(t.bsuid for t in trans), - ) + with open_ledger(acctid, trades) as trans: + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys(t.bsuid for t in trans), + ) position_msgs: list[dict] = [] pps: dict[int, pp.Position] @@ -426,14 +429,14 @@ async def handle_order_updates( await ems_stream.send(filled_msg.dict()) # update ledger and position tracking - trans = await update_ledger(acctid, trades) - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=trans, - ledger_reload={}.fromkeys( - t.bsuid for t in trans), - ) + with open_ledger(acctid, trades) as trans: + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys( + t.bsuid for t in trans), + ) # emit any new pp msgs to ems for pos in filter( @@ -743,7 +746,8 @@ def norm_trade_records( return records -async def update_ledger( +@cm +def open_ledger( acctid: str, trade_entries: list[dict[str, Any]], @@ -756,8 +760,10 @@ async def update_ledger( 'kraken', acctid, ) as ledger: - ledger.update(trade_entries) - # normalize to transaction form - records = norm_trade_records(trade_entries) - return records + # normalize to transaction form + records = norm_trade_records(trade_entries) + yield records + + # update on exit + ledger.update(trade_entries)