Fixes for newly opened and closed pps

Before we weren't emitting pp msgs when a position went back to "net
zero" (aka the size is zero) nor when a new one was opened (wasn't
previously loaded from the `pps.toml`). This reworks a bunch of the
incremental update logic as well as ports to the changes in the
`piker.pp` module:

- rename a few of the normalizing helpers to be more explicit.
- drop calling `pp.get_pps()` in the trades dialog task and instead
  create msgs iteratively, per account, by iterating through collected
  position and API trade records and calling instead
  `pp.update_pps_conf()`.
- always from-ledger-update both positions reported from ib's pp sys and
  session api trades detected on ems-trade-dialog startup.
- `update_ledger_from_api_trades()` now does **just** that: only updates
  the trades ledger and returns the transaction set.
- `update_and_audit_msgs()` now only the input list of msgs and properly
  generates new msgs for newly created positions that weren't previously
  loaded from the `pps.toml`.
lifo_pps_ib
Tyler Goodlet 2022-06-22 18:18:02 -04:00
parent a12e6800ff
commit f9c4b3cc96
1 changed files with 156 additions and 101 deletions

View File

@ -286,8 +286,7 @@ async def update_ledger_from_api_trades(
trade_entries: list[dict[str, Any]], trade_entries: list[dict[str, Any]],
client: Union[Client, MethodProxy], client: Union[Client, MethodProxy],
) -> dict[str, Any]: ) -> dict[str, pp.Transaction]:
# construct piker pps from trade ledger, underneath using # construct piker pps from trade ledger, underneath using
# LIFO style breakeven pricing calcs. # LIFO style breakeven pricing calcs.
conf = get_config() conf = get_config()
@ -312,52 +311,47 @@ async def update_ledger_from_api_trades(
entry['listingExchange'] = pexch entry['listingExchange'] = pexch
records = trades_to_records( entries = trades_to_ledger_entries(
conf['accounts'].inverse, conf['accounts'].inverse,
trade_entries, trade_entries,
) )
actives = {}
# write recent session's trades to the user's (local) ledger file. # write recent session's trades to the user's (local) ledger file.
for acctid, trades_by_id in records.items(): records: dict[str, pp.Transactions] = {}
for acctid, trades_by_id in entries.items():
with pp.open_trade_ledger('ib', acctid) as ledger: with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id) ledger.update(trades_by_id)
# normalize # normalize to transaction form
records = norm_trade_records(trades_by_id) records[acctid] = norm_trade_records(trades_by_id)
# (incrementally) update the user's pps in mem and return records
# in the `pps.toml`.
active = pp.update_pps_conf('ib', acctid, records)
actives.update(active)
return actives
async def update_and_audit( async def update_and_audit_msgs(
acctid: str, acctid: str, # no `ib.` prefix is required!
by_fqsn: dict[str, pp.Position], pps: list[pp.Position],
cids2pps: dict[tuple[str, int], BrokerdPosition], cids2pps: dict[tuple[str, int], BrokerdPosition],
validate: bool = False, validate: bool = False,
) -> list[BrokerdPosition]: ) -> list[BrokerdPosition]:
msgs: list[BrokerdPosition] = [] msgs: list[BrokerdPosition] = []
pps: dict[int, pp.Position] = {} # pps: dict[int, pp.Position] = {}
for fqsn, p in by_fqsn.items(): for p in pps:
bsuid = p.bsuid bsuid = p.bsuid
# build trade-session-actor local table # build trade-session-actor local table
# of pps from unique symbol ids. # of pps from unique symbol ids.
pps[bsuid] = p # pps[bsuid] = p
# retreive equivalent ib reported position message # retreive equivalent ib reported position message
# for comparison/audit versus the piker equivalent # for comparison/audit versus the piker equivalent
# breakeven pp calcs. # breakeven pp calcs.
ibppmsg = cids2pps[(acctid, bsuid)] ibppmsg = cids2pps.get((acctid, bsuid))
if ibppmsg:
msg = BrokerdPosition( msg = BrokerdPosition(
broker='ib', broker='ib',
@ -402,6 +396,26 @@ async def update_and_audit(
f'piker, LIFO breakeven PnL price: {msg.avg_price}' f'piker, LIFO breakeven PnL price: {msg.avg_price}'
) )
else:
# make brand new message
msg = BrokerdPosition(
broker='ib',
# XXX: ok so this is annoying, we're relaying
# an account name with the backend suffix prefixed
# but when reading accounts from ledgers we don't
# need it and/or it's prefixed in the section
# table.. we should just strip this from the message
# right since `.broker` is already included?
account=f'ib.{acctid}',
# XXX: the `.ib` is stripped..?
symbol=p.symbol.front_fqsn(),
# currency=ibppmsg.currency,
size=p.size,
avg_price=p.be_price,
)
msgs.append(msg)
return msgs return msgs
@ -455,7 +469,7 @@ async def trades_dialogue(
accounts.add(account) accounts.add(account)
cids2pps: dict[str, BrokerdPosition] = {} cids2pps: dict[str, BrokerdPosition] = {}
active_accts: set[str] = set() update_records: dict[str, bidict] = {}
# process pp value reported from ib's system. we only use these # process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses # to cross-check sizing since average pricing on their end uses
@ -464,31 +478,57 @@ async def trades_dialogue(
# money.. xb # money.. xb
for client in aioclients.values(): for client in aioclients.values():
for pos in client.positions(): for pos in client.positions():
cid, msg = pack_position(pos) cid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account] acctid = msg.account = accounts_def.inverse[msg.account]
active_accts.add(acctid) acctid = acctid.strip('ib.')
cids2pps[(acctid.strip('ib.'), cid)] = msg cids2pps[(acctid, cid)] = msg
assert msg.account in accounts, ( assert msg.account in accounts, (
f'Position for unknown account: {msg.account}') f'Position for unknown account: {msg.account}')
# collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
update_records.setdefault(acctid, bidict())[cid] = msg.symbol
# update trades ledgers for all accounts from # update trades ledgers for all accounts from
# connected api clients. # connected api clients which report trades for **this session**.
new_trades = {}
for account, proxy in proxies.items(): for account, proxy in proxies.items():
trades = await proxy.trades() trades = await proxy.trades()
if trades: new_trades.update(await update_ledger_from_api_trades(
await update_ledger_from_api_trades(
trades, trades,
proxy, proxy,
) ))
for acctid, trans in new_trades.items():
for t in trans:
bsuid = t.bsuid
if bsuid in update_records:
assert update_records[bsuid] == t.fqsn
else:
update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn
# load all positions from `pps.toml`, cross check with ib's # load all positions from `pps.toml`, cross check with ib's
# positions data, and relay re-formatted pps as msgs to the ems. # positions data, and relay re-formatted pps as msgs to the ems.
pps_by_account = pp.get_pps('ib', acctids=active_accts) # __2 cases__:
# - new trades have taken place this session that we want to
for acctid, by_fqsn in pps_by_account.items(): # always reprocess indempotently,
msgs = await update_and_audit( # - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
for acctid, to_update in update_records.items():
trans = new_trades.get(acctid)
active, closed = pp.update_pps_conf(
'ib',
acctid, acctid,
by_fqsn, trade_records=trans,
ledger_reload=to_update,
)
for pps in [active, closed]:
msgs = await update_and_audit_msgs(
acctid,
pps.values(),
cids2pps, cids2pps,
validate=True, validate=True,
) )
@ -496,7 +536,9 @@ async def trades_dialogue(
if not all_positions and cids2pps: if not all_positions and cids2pps:
raise RuntimeError( raise RuntimeError(
'Positions report by ib but not found in `pps.toml` !?') 'Positions reported by ib but not found in `pps.toml`!?\n'
f'{pformat(cids2pps)}'
)
# log.info(f'Loaded {len(trades)} from this session') # log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml`` # TODO: write trades to local ``trades.toml``
@ -543,26 +585,39 @@ async def emit_pp_update(
# compute and relay incrementally updated piker pp # compute and relay incrementally updated piker pp
acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
proxy = proxies[acctid] proxy = proxies[acctid]
await update_ledger_from_api_trades(
acctname = acctid.strip('ib.')
records = (await update_ledger_from_api_trades(
[trade_entry], [trade_entry],
proxy, proxy,
) ))[acctname]
# load all positions from `pps.toml`, cross check with r = records[0]
# ib's positions data, and relay re-formatted pps as
# msgs to the ems.
by_acct = pp.get_pps('ib', acctids={acctid})
acctname = acctid.strip('ib.')
by_fqsn = by_acct[acctname]
for fqsn, p in by_fqsn.items(): # update and load all positions from `pps.toml`, cross check with
if p.bsuid == trade_entry['contract']['conId']: # ib's positions data, and relay re-formatted pps as msgs to the
# should only be one right? # ems. we report both the open and closed updates in one map since
msgs = await update_and_audit( # for incremental update we may have just fully closed a pp and need
# to relay that msg as well!
active, closed = pp.update_pps_conf(
'ib',
acctname, acctname,
{fqsn: p}, trade_records=records,
ledger_reload={r.bsuid: r.fqsn},
)
for pos in filter(
bool,
[active.get(r.bsuid), closed.get(r.bsuid)]
):
msgs = await update_and_audit_msgs(
acctname,
[pos],
cids2pps, cids2pps,
# ib pp event might not have arrived yet
validate=False, validate=False,
) )
if msgs:
msg = msgs[0] msg = msgs[0]
break break
@ -669,7 +724,7 @@ async def deliver_trade_events(
# TODO: # TODO:
# - normalize out commissions details? # - normalize out commissions details?
# - this is the same as the unpacking loop above in # - this is the same as the unpacking loop above in
# ``trades_to_records()`` no? # ``trades_to_ledger_entries()`` no?
trade_entry = ids2fills.setdefault(execid, {}) trade_entry = ids2fills.setdefault(execid, {})
cost_already_rx = bool(trade_entry) cost_already_rx = bool(trade_entry)
@ -800,7 +855,7 @@ async def deliver_trade_events(
def norm_trade_records( def norm_trade_records(
ledger: dict[str, Any], ledger: dict[str, Any],
) -> dict[str, list[pp.Transaction]]: ) -> list[pp.Transaction]:
''' '''
Normalize a flex report or API retrieved executions Normalize a flex report or API retrieved executions
ledger into our standard record format. ledger into our standard record format.
@ -899,7 +954,7 @@ def norm_trade_records(
return records return records
def trades_to_records( def trades_to_ledger_entries(
accounts: bidict, accounts: bidict,
trade_entries: list[object], trade_entries: list[object],
source_type: str = 'api', source_type: str = 'api',
@ -1026,7 +1081,7 @@ def load_flex_trades(
# log.info(f'Loaded {ln} trades from flex query') # log.info(f'Loaded {ln} trades from flex query')
print(f'Loaded {ln} trades from flex query') print(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_records( trades_by_account = trades_to_ledger_entries(
# get reverse map to user account names # get reverse map to user account names
conf['accounts'].inverse, conf['accounts'].inverse,
trade_entries, trade_entries,