diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 6ac8083b..a39086de 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -18,7 +18,10 @@ Order api and machinery ''' -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + contextmanager as cm, +) from functools import partial from itertools import chain, count from pprint import pformat @@ -260,13 +263,13 @@ async def trades_dialogue( log.info( f'Loaded {len(trades)} trades from account `{acc_name}`' ) - trans = await update_ledger(acctid, trades) - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=trans, - ledger_reload={}.fromkeys(t.bsuid for t in trans), - ) + with open_ledger(acctid, trades) as trans: + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys(t.bsuid for t in trans), + ) position_msgs: list[dict] = [] pps: dict[int, pp.Position] @@ -426,14 +429,14 @@ async def handle_order_updates( await ems_stream.send(filled_msg.dict()) # update ledger and position tracking - trans = await update_ledger(acctid, trades) - active, closed = pp.update_pps_conf( - 'kraken', - acctid, - trade_records=trans, - ledger_reload={}.fromkeys( - t.bsuid for t in trans), - ) + with open_ledger(acctid, trades) as trans: + active, closed = pp.update_pps_conf( + 'kraken', + acctid, + trade_records=trans, + ledger_reload={}.fromkeys( + t.bsuid for t in trans), + ) # emit any new pp msgs to ems for pos in filter( @@ -743,7 +746,8 @@ def norm_trade_records( return records -async def update_ledger( +@cm +def open_ledger( acctid: str, trade_entries: list[dict[str, Any]], @@ -756,8 +760,10 @@ async def update_ledger( 'kraken', acctid, ) as ledger: - ledger.update(trade_entries) - # normalize to transaction form - records = norm_trade_records(trade_entries) - return records + # normalize to transaction form + records = norm_trade_records(trade_entries) + yield records + + # update on exit + ledger.update(trade_entries)