Update ledger *after* pps updates from new trades
Addressing same issue as in #350 where we need to compute position updates using the *first read* from the ledger **before** we update it to make sure `Position.lifo_update()` gets called and **not skipped** because new trades were read as clears entries but haven't actually been included in update calcs yet.. aka we call `Position.lifo_update()`. Main change here is to convert `update_ledger()` into a context mngr so that the ledger write is committed after pps updates using `pp.update_pps_conf()`.. This is basically a hotfix to #346 as well.kraken_ws_orders
parent
9fa9c27e4d
commit
bbcc55b24c
|
@ -18,7 +18,10 @@
|
||||||
Order api and machinery
|
Order api and machinery
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
contextmanager as cm,
|
||||||
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from itertools import chain, count
|
from itertools import chain, count
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
@ -259,13 +262,13 @@ async def trades_dialogue(
|
||||||
log.info(
|
log.info(
|
||||||
f'Loaded {len(trades)} trades from account `{acc_name}`'
|
f'Loaded {len(trades)} trades from account `{acc_name}`'
|
||||||
)
|
)
|
||||||
trans = await update_ledger(acctid, trades)
|
with open_ledger(acctid, trades) as trans:
|
||||||
active, closed = pp.update_pps_conf(
|
active, closed = pp.update_pps_conf(
|
||||||
'kraken',
|
'kraken',
|
||||||
acctid,
|
acctid,
|
||||||
trade_records=trans,
|
trade_records=trans,
|
||||||
ledger_reload={}.fromkeys(t.bsuid for t in trans),
|
ledger_reload={}.fromkeys(t.bsuid for t in trans),
|
||||||
)
|
)
|
||||||
|
|
||||||
position_msgs: list[dict] = []
|
position_msgs: list[dict] = []
|
||||||
pps: dict[int, pp.Position]
|
pps: dict[int, pp.Position]
|
||||||
|
@ -425,14 +428,14 @@ async def handle_order_updates(
|
||||||
await ems_stream.send(filled_msg)
|
await ems_stream.send(filled_msg)
|
||||||
|
|
||||||
# update ledger and position tracking
|
# update ledger and position tracking
|
||||||
trans = await update_ledger(acctid, trades)
|
with open_ledger(acctid, trades) as trans:
|
||||||
active, closed = pp.update_pps_conf(
|
active, closed = pp.update_pps_conf(
|
||||||
'kraken',
|
'kraken',
|
||||||
acctid,
|
acctid,
|
||||||
trade_records=trans,
|
trade_records=trans,
|
||||||
ledger_reload={}.fromkeys(
|
ledger_reload={}.fromkeys(
|
||||||
t.bsuid for t in trans),
|
t.bsuid for t in trans),
|
||||||
)
|
)
|
||||||
|
|
||||||
# emit any new pp msgs to ems
|
# emit any new pp msgs to ems
|
||||||
for pos in filter(
|
for pos in filter(
|
||||||
|
@ -742,7 +745,8 @@ def norm_trade_records(
|
||||||
return records
|
return records
|
||||||
|
|
||||||
|
|
||||||
async def update_ledger(
|
@cm
|
||||||
|
def open_ledger(
|
||||||
acctid: str,
|
acctid: str,
|
||||||
trade_entries: list[dict[str, Any]],
|
trade_entries: list[dict[str, Any]],
|
||||||
|
|
||||||
|
@ -755,8 +759,10 @@ async def update_ledger(
|
||||||
'kraken',
|
'kraken',
|
||||||
acctid,
|
acctid,
|
||||||
) as ledger:
|
) as ledger:
|
||||||
ledger.update(trade_entries)
|
|
||||||
|
|
||||||
# normalize to transaction form
|
# normalize to transaction form
|
||||||
records = norm_trade_records(trade_entries)
|
records = norm_trade_records(trade_entries)
|
||||||
return records
|
yield records
|
||||||
|
|
||||||
|
# update on exit
|
||||||
|
ledger.update(trade_entries)
|
||||||
|
|
Loading…
Reference in New Issue