Add expiry and datetime support to ledger parsing

lifo_pps_ib
Tyler Goodlet 2022-06-18 15:53:56 -04:00
parent c617a06905
commit bfad676b7c
1 changed files with 35 additions and 13 deletions

View File

@ -370,7 +370,7 @@ async def update_and_audit(
symbol=ibppmsg.symbol, symbol=ibppmsg.symbol,
currency=ibppmsg.currency, currency=ibppmsg.currency,
size=p.size, size=p.size,
avg_price=p.avg_price, avg_price=p.be_price,
) )
msgs.append(msg) msgs.append(msg)
@ -474,6 +474,7 @@ async def trades_dialogue(
# connected api clients. # connected api clients.
for account, proxy in proxies.items(): for account, proxy in proxies.items():
trades = await proxy.trades() trades = await proxy.trades()
if trades:
await update_ledger_from_api_trades( await update_ledger_from_api_trades(
trades, trades,
proxy, proxy,
@ -481,10 +482,9 @@ async def trades_dialogue(
# load all positions from `pps.toml`, cross check with ib's # load all positions from `pps.toml`, cross check with ib's
# positions data, and relay re-formatted pps as msgs to the ems. # positions data, and relay re-formatted pps as msgs to the ems.
for acctid, by_fqsn in pp.get_pps( pps_by_account = pp.get_pps('ib', acctids=active_accts)
'ib', acctids=active_accts,
).items():
for acctid, by_fqsn in pps_by_account.items():
msgs = await update_and_audit(by_fqsn, cids2pps, validate=True) msgs = await update_and_audit(by_fqsn, cids2pps, validate=True)
all_positions.extend(msg.dict() for msg in msgs) all_positions.extend(msg.dict() for msg in msgs)
@ -835,6 +835,20 @@ def norm_trade_records(
print(f'skipping opts contract {symbol}') print(f'skipping opts contract {symbol}')
continue continue
# timestamping is way different in API records
date = record.get('date')
if not date:
# probably a flex record with a wonky non-std timestamp..
date, ts = record['dateTime'].split(';')
dt = pendulum.parse(date)
ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
tsdt = pendulum.parse(ts)
dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
else:
epoch_dt = pendulum.from_timestamp(record.get('time'))
dt = pendulum.parse(date)
# special handling of symbol extraction from # special handling of symbol extraction from
# flex records using some ad-hoc schema parsing. # flex records using some ad-hoc schema parsing.
instr = record.get('assetCategory') instr = record.get('assetCategory')
@ -847,17 +861,23 @@ def norm_trade_records(
if expiry: if expiry:
expiry = str(expiry).strip(' ') expiry = str(expiry).strip(' ')
suffix = f'{exch}.{expiry}' suffix = f'{exch}.{expiry}'
expiry = pendulum.parse(expiry)
fqsn = Symbol.from_fqsn( fqsn = Symbol.from_fqsn(
fqsn=f'{symbol}.{suffix}.ib', fqsn=f'{symbol}.{suffix}.ib',
info={}, info={},
).front_fqsn().rstrip('.ib') ).front_fqsn().rstrip('.ib')
# NOTE: for flex records the normal fields won't be available so # NOTE: for flex records the normal fields for defining an fqsn
# we have to do a lookup at some point to reverse map the conid # sometimes won't be available so we rely on two approaches for
# to a fqsn? # the "reverse lookup" of piker style fqsn keys:
# - when dealing with API trade records received from
# con = await proxy.get_con(conid) # `IB.trades()` we do a contract lookup at he time of processing
# - when dealing with flex records, it is assumed the record
# is at least a day old and thus the TWS position reporting system
# should already have entries if the pps are still open, in
# which case, we can pull the fqsn from that table (see
# `trades_dialogue()` above).
records.append(pp.Transaction( records.append(pp.Transaction(
fqsn=fqsn, fqsn=fqsn,
@ -865,6 +885,8 @@ def norm_trade_records(
size=size, size=size,
price=price, price=price,
cost=comms, cost=comms,
dt=dt,
expiry=expiry,
bsuid=conid, bsuid=conid,
)) ))