`ib`: never override existing ledger records
If user has loaded from a flex report then we don't want the API records from the same period to override those; instead just update with any missing fields from the API schema. Also, always `str`-ify the contract id (what is set for the `.bs_mktid` *before* packing into transaction type to ensure when serialized to `pps.toml` there are no discrepancies at the codec level.. smhpre_overruns_ctxcancelled
							parent
							
								
									366de901df
								
							
						
					
					
						commit
						4236e5c3b1
					
				| 
						 | 
				
			
			@ -342,12 +342,6 @@ async def update_and_audit_msgs(
 | 
			
		|||
        # retreive equivalent ib reported position message
 | 
			
		||||
        # for comparison/audit versus the piker equivalent
 | 
			
		||||
        # breakeven pp calcs.
 | 
			
		||||
        # if (
 | 
			
		||||
        #     acctid == 'reg'
 | 
			
		||||
        #     and bs_mktid == 36285627
 | 
			
		||||
        # ):
 | 
			
		||||
        #     await tractor.breakpoint()
 | 
			
		||||
 | 
			
		||||
        ibppmsg = cids2pps.get((acctid, bs_mktid))
 | 
			
		||||
 | 
			
		||||
        if ibppmsg:
 | 
			
		||||
| 
						 | 
				
			
			@ -777,15 +771,15 @@ async def emit_pp_update(
 | 
			
		|||
    proxies: dict,
 | 
			
		||||
    cids2pps: dict,
 | 
			
		||||
 | 
			
		||||
    ledgers,
 | 
			
		||||
    tables,
 | 
			
		||||
    ledgers: dict[str, dict[str, Any]],
 | 
			
		||||
    tables: dict[str, PpTable],
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    # compute and relay incrementally updated piker pp
 | 
			
		||||
    accounts_def_inv: bidict[str, str] = accounts_def.inverse
 | 
			
		||||
    acctid = accounts_def_inv[trade_entry['execution']['acctNumber']]
 | 
			
		||||
    proxy = proxies[acctid]
 | 
			
		||||
    fq_acctid = accounts_def_inv[trade_entry['execution']['acctNumber']]
 | 
			
		||||
    proxy = proxies[fq_acctid]
 | 
			
		||||
    (
 | 
			
		||||
        records_by_acct,
 | 
			
		||||
        api_to_ledger_entries,
 | 
			
		||||
| 
						 | 
				
			
			@ -794,9 +788,10 @@ async def emit_pp_update(
 | 
			
		|||
        proxy,
 | 
			
		||||
        accounts_def_inv,
 | 
			
		||||
    )
 | 
			
		||||
    trans = records_by_acct[acctid]
 | 
			
		||||
    trans = records_by_acct[fq_acctid]
 | 
			
		||||
    r = list(trans.values())[0]
 | 
			
		||||
 | 
			
		||||
    acctid = fq_acctid.strip('ib.')
 | 
			
		||||
    table = tables[acctid]
 | 
			
		||||
    table.update_from_trans(trans)
 | 
			
		||||
    active, closed = table.dump_active()
 | 
			
		||||
| 
						 | 
				
			
			@ -804,7 +799,11 @@ async def emit_pp_update(
 | 
			
		|||
    # NOTE: update ledger with all new trades
 | 
			
		||||
    for acctid, trades_by_id in api_to_ledger_entries.items():
 | 
			
		||||
        ledger = ledgers[acctid]
 | 
			
		||||
        ledger.update(trades_by_id)
 | 
			
		||||
 | 
			
		||||
        for tid, tdict in trades_by_id.items():
 | 
			
		||||
            # NOTE: don't override flex/previous entries with new API
 | 
			
		||||
            # ones, just update with new fields!
 | 
			
		||||
            ledger.setdefaults(tid, {}).update(tdict)
 | 
			
		||||
 | 
			
		||||
    # generate pp msgs and cross check with ib's positions data, relay
 | 
			
		||||
    # re-formatted pps as msgs to the ems.
 | 
			
		||||
| 
						 | 
				
			
			@ -909,8 +908,8 @@ async def deliver_trade_events(
 | 
			
		|||
                # https://github.com/erdewit/ib_insync/issues/363
 | 
			
		||||
                # acctid = accounts_def.inverse[trade.order.account]
 | 
			
		||||
 | 
			
		||||
                # # double check there is no error when
 | 
			
		||||
                # # cancelling.. gawwwd
 | 
			
		||||
                # double check there is no error when
 | 
			
		||||
                # cancelling.. gawwwd
 | 
			
		||||
                # if ib_status_key == 'cancelled':
 | 
			
		||||
                #     last_log = trade.log[-1]
 | 
			
		||||
                #     if (
 | 
			
		||||
| 
						 | 
				
			
			@ -1050,6 +1049,7 @@ async def deliver_trade_events(
 | 
			
		|||
                        accounts_def,
 | 
			
		||||
                        proxies,
 | 
			
		||||
                        cids2pps,
 | 
			
		||||
 | 
			
		||||
                        ledgers,
 | 
			
		||||
                        tables,
 | 
			
		||||
                    )
 | 
			
		||||
| 
						 | 
				
			
			@ -1084,6 +1084,7 @@ async def deliver_trade_events(
 | 
			
		|||
                        accounts_def,
 | 
			
		||||
                        proxies,
 | 
			
		||||
                        cids2pps,
 | 
			
		||||
 | 
			
		||||
                        ledgers,
 | 
			
		||||
                        tables,
 | 
			
		||||
                    )
 | 
			
		||||
| 
						 | 
				
			
			@ -1145,7 +1146,7 @@ async def deliver_trade_events(
 | 
			
		|||
def norm_trade_records(
 | 
			
		||||
    ledger: dict[str, Any],
 | 
			
		||||
 | 
			
		||||
) -> list[Transaction]:
 | 
			
		||||
) -> dict[str, Transaction]:
 | 
			
		||||
    '''
 | 
			
		||||
    Normalize a flex report or API retrieved executions
 | 
			
		||||
    ledger into our standard record format.
 | 
			
		||||
| 
						 | 
				
			
			@ -1275,7 +1276,7 @@ def norm_trade_records(
 | 
			
		|||
                cost=comms,
 | 
			
		||||
                dt=dt,
 | 
			
		||||
                expiry=expiry,
 | 
			
		||||
                bs_mktid=conid,
 | 
			
		||||
                bs_mktid=str(conid),
 | 
			
		||||
            ),
 | 
			
		||||
            key=lambda t: t.dt
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue