From bfad676b7c44a63df39974a3222a7cf83cfe8d6b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 18 Jun 2022 15:53:56 -0400 Subject: [PATCH] Add expiry and datetime support to ledger parsing --- piker/brokers/ib/broker.py | 48 +++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index eac7a8a4..09922055 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -370,7 +370,7 @@ async def update_and_audit( symbol=ibppmsg.symbol, currency=ibppmsg.currency, size=p.size, - avg_price=p.avg_price, + avg_price=p.be_price, ) msgs.append(msg) @@ -474,17 +474,17 @@ async def trades_dialogue( # connected api clients. for account, proxy in proxies.items(): trades = await proxy.trades() - await update_ledger_from_api_trades( - trades, - proxy, - ) + if trades: + await update_ledger_from_api_trades( + trades, + proxy, + ) # load all positions from `pps.toml`, cross check with ib's # positions data, and relay re-formatted pps as msgs to the ems. - for acctid, by_fqsn in pp.get_pps( - 'ib', acctids=active_accts, - ).items(): + pps_by_account = pp.get_pps('ib', acctids=active_accts) + for acctid, by_fqsn in pps_by_account.items(): msgs = await update_and_audit(by_fqsn, cids2pps, validate=True) all_positions.extend(msg.dict() for msg in msgs) @@ -835,6 +835,20 @@ def norm_trade_records( print(f'skipping opts contract {symbol}') continue + # timestamping is way different in API records + date = record.get('date') + if not date: + # probably a flex record with a wonky non-std timestamp.. + date, ts = record['dateTime'].split(';') + dt = pendulum.parse(date) + ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}' + tsdt = pendulum.parse(ts) + dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second) + + else: + epoch_dt = pendulum.from_timestamp(record.get('time')) + dt = pendulum.parse(date) + # special handling of symbol extraction from # flex records using some ad-hoc schema parsing. instr = record.get('assetCategory') @@ -847,17 +861,23 @@ def norm_trade_records( if expiry: expiry = str(expiry).strip(' ') suffix = f'{exch}.{expiry}' + expiry = pendulum.parse(expiry) fqsn = Symbol.from_fqsn( fqsn=f'{symbol}.{suffix}.ib', info={}, ).front_fqsn().rstrip('.ib') - # NOTE: for flex records the normal fields won't be available so - # we have to do a lookup at some point to reverse map the conid - # to a fqsn? - - # con = await proxy.get_con(conid) + # NOTE: for flex records the normal fields for defining an fqsn + # sometimes won't be available so we rely on two approaches for + # the "reverse lookup" of piker style fqsn keys: + # - when dealing with API trade records received from + # `IB.trades()` we do a contract lookup at he time of processing + # - when dealing with flex records, it is assumed the record + # is at least a day old and thus the TWS position reporting system + # should already have entries if the pps are still open, in + # which case, we can pull the fqsn from that table (see + # `trades_dialogue()` above). records.append(pp.Transaction( fqsn=fqsn, @@ -865,6 +885,8 @@ def norm_trade_records( size=size, price=price, cost=comms, + dt=dt, + expiry=expiry, bsuid=conid, ))