`ib` rt pps update hotfix..

Not sure this didn't get caught in usage, but basically real-time
updates got broken by a rework of `update_ledger_from_api_trades()`.
The issue is that the ledger was being updated **before** calling
`piker.pp.update_pps_conf()` which resulted in the `Position.size`
not being updated correctly since the [latest added] clears passed
in via the `trade_records` arg were already found in the `.clears` table
and thus were causing the loop to skip the `Position.lifo_update()`
call..

The solution here is to not update the ledger **until after** we call
`update_pps_conf()` - it's more read/writes but it's correct and we
figure out a less io heavy way to do the file writing later.

Further this includes a fix to avoid double emitting a pp update caused
by non-thorough logic that waits for a commission report to arrive
during a fill event; previously we were emitting the same message twice
due to the lack of a check for an existing comms report in the case
where the report arrives *after* the fill.
ib_rt_pp_update_hotfix
Tyler Goodlet 2022-07-05 16:09:32 -04:00
parent 479ad1bb15
commit 95dd0e6bd6
1 changed files with 47 additions and 17 deletions

View File

@ -295,7 +295,10 @@ async def update_ledger_from_api_trades(
trade_entries: list[dict[str, Any]], trade_entries: list[dict[str, Any]],
client: Union[Client, MethodProxy], client: Union[Client, MethodProxy],
) -> dict[str, pp.Transaction]: ) -> tuple[
dict[str, pp.Transaction],
dict[str, dict],
]:
conf = get_config() conf = get_config()
@ -326,14 +329,12 @@ async def update_ledger_from_api_trades(
# write recent session's trades to the user's (local) ledger file. # write recent session's trades to the user's (local) ledger file.
records: dict[str, pp.Transactions] = {} records: dict[str, pp.Transactions] = {}
for acctid, trades_by_id in entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
for acctid, trades_by_id in entries.items():
# normalize to transaction form # normalize to transaction form
records[acctid] = norm_trade_records(trades_by_id) records[acctid] = norm_trade_records(trades_by_id)
return records return records, entries
async def update_and_audit_msgs( async def update_and_audit_msgs(
@ -518,10 +519,14 @@ async def trades_dialogue(
new_trades = {} new_trades = {}
for account, proxy in proxies.items(): for account, proxy in proxies.items():
trades = await proxy.trades() trades = await proxy.trades()
new_trades.update(await update_ledger_from_api_trades( (
records_by_acct,
ledger_entries,
) = await update_ledger_from_api_trades(
trades, trades,
proxy, proxy,
)) )
new_trades.update(records_by_acct)
for acctid, trans in new_trades.items(): for acctid, trans in new_trades.items():
for t in trans: for t in trans:
@ -573,6 +578,16 @@ async def trades_dialogue(
tuple(name for name in accounts_def if name in accounts), tuple(name for name in accounts_def if name in accounts),
)) ))
# TODO: maybe just write on teardown?
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
# write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state!
for acctid, trades_by_id in ledger_entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
@ -609,10 +624,11 @@ async def emit_pp_update(
proxy = proxies[acctid] proxy = proxies[acctid]
acctname = acctid.strip('ib.') acctname = acctid.strip('ib.')
records = (await update_ledger_from_api_trades( records_by_acct, ledger_entries = await update_ledger_from_api_trades(
[trade_entry], [trade_entry],
proxy, proxy,
))[acctname] )
records = records_by_acct[acctname]
r = records[0] r = records[0]
# update and load all positions from `pps.toml`, cross check with # update and load all positions from `pps.toml`, cross check with
@ -627,6 +643,12 @@ async def emit_pp_update(
ledger_reload={r.bsuid: r.fqsn}, ledger_reload={r.bsuid: r.fqsn},
) )
# NOTE: write ledger with all new trades **AFTER** we've updated the
# `pps.toml` from the original ledger state!
for acctid, trades_by_id in ledger_entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
for pos in filter( for pos in filter(
bool, bool,
[active.get(r.bsuid), closed.get(r.bsuid)] [active.get(r.bsuid), closed.get(r.bsuid)]
@ -760,7 +782,7 @@ async def deliver_trade_events(
{ {
'contract': asdict(fill.contract), 'contract': asdict(fill.contract),
'execution': asdict(fill.execution), 'execution': asdict(fill.execution),
'commissionReport': asdict(fill.commissionReport), # 'commissionReport': asdict(fill.commissionReport),
# supposedly server fill time? # supposedly server fill time?
'broker_time': execu.time, 'broker_time': execu.time,
'name': 'ib', 'name': 'ib',
@ -813,6 +835,15 @@ async def deliver_trade_events(
trade_entry = ids2fills.setdefault(execid, {}) trade_entry = ids2fills.setdefault(execid, {})
fill_already_rx = bool(trade_entry) fill_already_rx = bool(trade_entry)
# only fire a pp msg update if,
# - we haven't already
# - the fill event has already arrived
# but it didn't yet have a commision report
# which we fill in now.
if (
fill_already_rx
and 'commissionReport' not in trade_entry
):
# no fill msg has arrived yet so just fill out the # no fill msg has arrived yet so just fill out the
# cost report for now and when the fill arrives a pp # cost report for now and when the fill arrives a pp
# msg can be emitted. # msg can be emitted.
@ -820,7 +851,6 @@ async def deliver_trade_events(
{'commissionReport': asdict(cr)} {'commissionReport': asdict(cr)}
) )
if fill_already_rx:
await emit_pp_update( await emit_pp_update(
ems_stream, ems_stream,
trade_entry, trade_entry,