`ib`: never override existing ledger records

If user has loaded from a flex report then we don't want the API records
from the same period to override those; instead just update with any
missing fields from the API schema.

Also, always `str`-ify the contract id (what is set for the `.bs_mktid`
*before* packing into transaction type to ensure when serialized to
`pps.toml` there are no discrepancies at the codec level.. smh
rekt_pps
Tyler Goodlet 2023-03-23 12:52:53 -04:00
parent f3049016d6
commit aa5f25231a
1 changed files with 17 additions and 16 deletions

View File

@ -342,12 +342,6 @@ async def update_and_audit_msgs(
# retreive equivalent ib reported position message
# for comparison/audit versus the piker equivalent
# breakeven pp calcs.
# if (
# acctid == 'reg'
# and bs_mktid == 36285627
# ):
# await tractor.breakpoint()
ibppmsg = cids2pps.get((acctid, bs_mktid))
if ibppmsg:
@ -777,15 +771,15 @@ async def emit_pp_update(
proxies: dict,
cids2pps: dict,
ledgers,
tables,
ledgers: dict[str, dict[str, Any]],
tables: dict[str, PpTable],
) -> None:
# compute and relay incrementally updated piker pp
accounts_def_inv: bidict[str, str] = accounts_def.inverse
acctid = accounts_def_inv[trade_entry['execution']['acctNumber']]
proxy = proxies[acctid]
fq_acctid = accounts_def_inv[trade_entry['execution']['acctNumber']]
proxy = proxies[fq_acctid]
(
records_by_acct,
api_to_ledger_entries,
@ -794,9 +788,10 @@ async def emit_pp_update(
proxy,
accounts_def_inv,
)
trans = records_by_acct[acctid]
trans = records_by_acct[fq_acctid]
r = list(trans.values())[0]
acctid = fq_acctid.strip('ib.')
table = tables[acctid]
table.update_from_trans(trans)
active, closed = table.dump_active()
@ -804,7 +799,11 @@ async def emit_pp_update(
# NOTE: update ledger with all new trades
for acctid, trades_by_id in api_to_ledger_entries.items():
ledger = ledgers[acctid]
ledger.update(trades_by_id)
for tid, tdict in trades_by_id.items():
# NOTE: don't override flex/previous entries with new API
# ones, just update with new fields!
ledger.setdefaults(tid, {}).update(tdict)
# generate pp msgs and cross check with ib's positions data, relay
# re-formatted pps as msgs to the ems.
@ -909,8 +908,8 @@ async def deliver_trade_events(
# https://github.com/erdewit/ib_insync/issues/363
# acctid = accounts_def.inverse[trade.order.account]
# # double check there is no error when
# # cancelling.. gawwwd
# double check there is no error when
# cancelling.. gawwwd
# if ib_status_key == 'cancelled':
# last_log = trade.log[-1]
# if (
@ -1050,6 +1049,7 @@ async def deliver_trade_events(
accounts_def,
proxies,
cids2pps,
ledgers,
tables,
)
@ -1084,6 +1084,7 @@ async def deliver_trade_events(
accounts_def,
proxies,
cids2pps,
ledgers,
tables,
)
@ -1145,7 +1146,7 @@ async def deliver_trade_events(
def norm_trade_records(
ledger: dict[str, Any],
) -> list[Transaction]:
) -> dict[str, Transaction]:
'''
Normalize a flex report or API retrieved executions
ledger into our standard record format.
@ -1275,7 +1276,7 @@ def norm_trade_records(
cost=comms,
dt=dt,
expiry=expiry,
bs_mktid=conid,
bs_mktid=str(conid),
),
key=lambda t: t.dt
)