Support pp audits with multiple accounts
parent
2063b9d8bb
commit
f32b4d37cb
|
@ -336,8 +336,9 @@ async def update_ledger_from_api_trades(
|
||||||
|
|
||||||
|
|
||||||
async def update_and_audit(
|
async def update_and_audit(
|
||||||
|
acctid: str,
|
||||||
by_fqsn: dict[str, pp.Position],
|
by_fqsn: dict[str, pp.Position],
|
||||||
cids2pps: dict[int, BrokerdPosition],
|
cids2pps: dict[tuple[str, int], BrokerdPosition],
|
||||||
validate: bool = False,
|
validate: bool = False,
|
||||||
|
|
||||||
) -> list[BrokerdPosition]:
|
) -> list[BrokerdPosition]:
|
||||||
|
@ -355,7 +356,7 @@ async def update_and_audit(
|
||||||
# retreive equivalent ib reported position message
|
# retreive equivalent ib reported position message
|
||||||
# for comparison/audit versus the piker equivalent
|
# for comparison/audit versus the piker equivalent
|
||||||
# breakeven pp calcs.
|
# breakeven pp calcs.
|
||||||
ibppmsg = cids2pps[bsuid]
|
ibppmsg = cids2pps[(acctid, bsuid)]
|
||||||
|
|
||||||
msg = BrokerdPosition(
|
msg = BrokerdPosition(
|
||||||
broker='ib',
|
broker='ib',
|
||||||
|
@ -466,7 +467,7 @@ async def trades_dialogue(
|
||||||
cid, msg = pack_position(pos)
|
cid, msg = pack_position(pos)
|
||||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||||
active_accts.add(acctid)
|
active_accts.add(acctid)
|
||||||
cids2pps[cid] = msg
|
cids2pps[(acctid.strip('ib.'), cid)] = msg
|
||||||
assert msg.account in accounts, (
|
assert msg.account in accounts, (
|
||||||
f'Position for unknown account: {msg.account}')
|
f'Position for unknown account: {msg.account}')
|
||||||
|
|
||||||
|
@ -485,7 +486,12 @@ async def trades_dialogue(
|
||||||
pps_by_account = pp.get_pps('ib', acctids=active_accts)
|
pps_by_account = pp.get_pps('ib', acctids=active_accts)
|
||||||
|
|
||||||
for acctid, by_fqsn in pps_by_account.items():
|
for acctid, by_fqsn in pps_by_account.items():
|
||||||
msgs = await update_and_audit(by_fqsn, cids2pps, validate=True)
|
msgs = await update_and_audit(
|
||||||
|
acctid,
|
||||||
|
by_fqsn,
|
||||||
|
cids2pps,
|
||||||
|
validate=True,
|
||||||
|
)
|
||||||
all_positions.extend(msg.dict() for msg in msgs)
|
all_positions.extend(msg.dict() for msg in msgs)
|
||||||
|
|
||||||
if not all_positions and cids2pps:
|
if not all_positions and cids2pps:
|
||||||
|
@ -551,6 +557,7 @@ async def emit_pp_update(
|
||||||
if p.bsuid == trade_entry['contract']['conId']:
|
if p.bsuid == trade_entry['contract']['conId']:
|
||||||
# should only be one right?
|
# should only be one right?
|
||||||
msgs = await update_and_audit(
|
msgs = await update_and_audit(
|
||||||
|
acctid,
|
||||||
{fqsn: p},
|
{fqsn: p},
|
||||||
cids2pps,
|
cids2pps,
|
||||||
validate=False,
|
validate=False,
|
||||||
|
@ -566,7 +573,7 @@ async def deliver_trade_events(
|
||||||
trade_event_stream: trio.MemoryReceiveChannel,
|
trade_event_stream: trio.MemoryReceiveChannel,
|
||||||
ems_stream: tractor.MsgStream,
|
ems_stream: tractor.MsgStream,
|
||||||
accounts_def: dict[str, str],
|
accounts_def: dict[str, str],
|
||||||
cids2pps: dict[str, BrokerdPosition],
|
cids2pps: dict[tuple[str, str], BrokerdPosition],
|
||||||
proxies: dict[str, MethodProxy],
|
proxies: dict[str, MethodProxy],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
Loading…
Reference in New Issue