Add `update_and_audit()` in prep for rt per-trade-event pp udpates

lifo_pps_ib
Tyler Goodlet 2022-06-15 11:56:49 -04:00
parent 7b2e8f1ba5
commit 3991d8f911
1 changed files with 102 additions and 72 deletions

View File

@ -267,8 +267,9 @@ async def recv_trade_updates(
async def update_ledger_from_api_trades( async def update_ledger_from_api_trades(
clients: list[Union[Client, MethodProxy]], trade_entries: dict[str, Any],
ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg
client: Union[Client, MethodProxy],
) -> dict[str, Any]: ) -> dict[str, Any]:
@ -279,10 +280,10 @@ async def update_ledger_from_api_trades(
# retreive new trade executions from the last session # retreive new trade executions from the last session
# and/or day's worth of trading and convert into trade # and/or day's worth of trading and convert into trade
# records suitable for a local ledger file. # records suitable for a local ledger file.
trades_by_account: dict = {} # trades_by_account: dict = {}
for client in clients: # for client in clients:
trade_entries = await client.trades() # trade_entries = await client.trades()
# XXX; ERRGGG.. # XXX; ERRGGG..
# pack in the "primary/listing exchange" value from a # pack in the "primary/listing exchange" value from a
@ -303,10 +304,10 @@ async def update_ledger_from_api_trades(
conf['accounts'].inverse, conf['accounts'].inverse,
trade_entries, trade_entries,
) )
trades_by_account.update(records) # trades_by_account.update(records)
# write recent session's trades to the user's (local) ledger file. # write recent session's trades to the user's (local) ledger file.
for acctid, trades_by_id in trades_by_account.items(): for acctid, trades_by_id in records.items():
with pp.open_trade_ledger('ib', acctid) as ledger: with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id) ledger.update(trades_by_id)
@ -327,7 +328,76 @@ async def update_ledger_from_api_trades(
) )
r.fqsn = normed_msg.symbol r.fqsn = normed_msg.symbol
pp.update_pps_conf('ib', acctid, records) active = pp.update_pps_conf('ib', acctid, records)
return active
async def update_and_audit(
by_fqsn: dict[str, pp.Position],
cids2pps: dict[int, BrokerdPosition],
) -> list[BrokerdPosition]:
msgs: list[BrokerdPosition] = []
pps: dict[int, pp.Position] = {}
for fqsn, p in by_fqsn.items():
bsuid = p.bsuid
# build trade-session-actor local table
# of pps from unique symbol ids.
pps[bsuid] = p
# retreive equivalent ib reported position message
# for comparison/audit versus the piker equivalent
# breakeven pp calcs.
ibppmsg = cids2pps[bsuid]
msg = BrokerdPosition(
broker='ib',
# XXX: ok so this is annoying, we're relaying
# an account name with the backend suffix prefixed
# but when reading accounts from ledgers we don't
# need it and/or it's prefixed in the section
# table..
account=ibppmsg.account,
# XXX: the `.ib` is stripped..?
symbol=ibppmsg.symbol,
currency=ibppmsg.currency,
size=p.size,
avg_price=p.avg_price,
)
ibsize = ibppmsg.size
pikersize = msg.size
diff = pikersize - ibsize
# if ib reports a lesser pp it's not as bad since we can
# presume we're at least not more in the shit then we
# thought.
if diff:
raise ValueError(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {msg}\n'
f'piker: {ibppmsg}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
)
msg.size = ibsize
if ibppmsg.avg_price != msg.avg_price:
# TODO: make this a "propoganda" log level?
log.warning(
'The mega-cucks at IB want you to believe with their '
f'"FIFO" positioning for {msg.symbol}:\n'
f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n'
f'piker, LIFO breakeven PnL price: {msg.avg_price}'
)
msgs.append(msg)
return msgs
@tractor.context @tractor.context
@ -389,7 +459,6 @@ async def trades_dialogue(
# money.. xb # money.. xb
for client in aioclients.values(): for client in aioclients.values():
for pos in client.positions(): for pos in client.positions():
cid, msg = pack_position(pos) cid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account] acctid = msg.account = accounts_def.inverse[msg.account]
used_accounts.add(acctid) used_accounts.add(acctid)
@ -399,9 +468,11 @@ async def trades_dialogue(
# update trades ledgers for all accounts from # update trades ledgers for all accounts from
# connected api clients. # connected api clients.
for account, proxy in proxies.items():
await update_ledger_from_api_trades( await update_ledger_from_api_trades(
proxies.values(), await proxy.trades(),
cids2pps, # pass these in to map to correct fqsns.. cids2pps, # pass these in to map to correct fqsns..
proxy,
) )
# load all positions from `pps.toml`, cross check with ib's # load all positions from `pps.toml`, cross check with ib's
@ -409,51 +480,9 @@ async def trades_dialogue(
for acctid, by_fqsn in pp.get_pps( for acctid, by_fqsn in pp.get_pps(
'ib', acctids=used_accounts, 'ib', acctids=used_accounts,
).items(): ).items():
for fqsn, posdict in by_fqsn.items():
ibppmsg = cids2pps[posdict['bsuid']]
msg = BrokerdPosition(
broker='ib',
# XXX: ok so this is annoying, we're relaying msgs = await update_and_audit(by_fqsn, cids2pps)
# an account name with the backend suffix prefixed all_positions.extend(msg.dict() for msg in msgs)
# but when reading accounts from ledgers we don't
# need it and/or it's prefixed in the section
# table..
account=ibppmsg.account,
# XXX: the `.ib` is stripped..?
symbol=ibppmsg.symbol,
currency=ibppmsg.currency,
size=posdict['size'],
avg_price=posdict['avg_price'],
)
print(msg)
ibsize = ibppmsg.size
pikersize = msg.size
diff = pikersize - ibsize
# if ib reports a lesser pp it's not as bad since we can
# presume we're at least not more in the shit then we
# thought.
if diff:
raise ValueError(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibsize}\n'
f'piker: {pikersize}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
)
msg.size = ibsize
if ibppmsg.avg_price != msg.avg_price:
# TODO: make this a "propoganda" log level?
log.warning(
'The mega-cucks at IB want you to believe with their '
f'"FIFO" positioning for {msg.symbol}:\n'
f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n'
f'piker, LIFO breakeven PnL price: {msg.avg_price}'
)
all_positions.append(msg.dict())
if not all_positions and cids2pps: if not all_positions and cids2pps:
raise RuntimeError( raise RuntimeError(
@ -621,6 +650,7 @@ async def deliver_trade_events(
continue continue
elif event_name == 'position': elif event_name == 'position':
cid, msg = pack_position(item) cid, msg = pack_position(item)
msg.account = accounts_def.inverse[msg.account] msg.account = accounts_def.inverse[msg.account]