Many, many `ib` trade log schema hackz

I don't want to rant too much any more since it's pretty clear `ib` has
either zero concern for its (api) user's or a severely terrible data
management team and/or general inter-team coordination system, but this
patch more or less hacks the flex report records to be similar enough to
API "execution" / "fill" records such that they can be similarly
normalized and stored as well as processed for position calculations..

Dirty deats,
- use the `IB.fills()` method for pulling current session trade events
  since it's both recommended in the docs and does seem to capture
  more extensive meta-data.
- add a `update_ledger_from_api()` helper which does all the insane work
  of making sure api trade entries are usable both within piker's global
  fqsn system but also compatible with incremental updates of positions
  computed from trade ledgers derived from ib's "flex reports".
- add "auditting" of `ib`'s reported positioning API messages by
  comparison with piker's new "traders first" breakeven price style and
  complain via logging on mismatches.
- handle buy vs. sell arithmetic (via a +ve or -ve multiplier) to make
  "size" arithmetic work for API trade entries..
- draft out options contract transaction parsing but skip in pps
  generation for now.
- always use the "execution id" as ledger keys both in flex and api
  trade processing.
- for whatever weird reason `ib_insync` doesn't include the so called
  "primary exchange" in contracts reported in fill events, so do manual
  contract lookups in such cases such that pps entries can be placed
  in the right fqsn section...

Still ToDo:
- incremental update on trade clears / position updates
- pps audit from ledger depending on user config?
lifo_pps_ib
Tyler Goodlet 2022-06-14 16:23:46 -04:00
parent 05a1a4e3d8
commit 82b718d5a3
1 changed files with 198 additions and 88 deletions

View File

@ -26,6 +26,7 @@ from typing import (
Any,
Optional,
AsyncIterator,
Union,
)
from bidict import bidict
@ -48,8 +49,7 @@ from ib_insync.objects import Position
import pendulum
from piker import config
from piker.pp import update_pps_conf
from piker.pp import TradeRecord
from piker import pp
from piker.log import get_console_log
from piker.clearing._messages import (
BrokerdOrder,
@ -68,6 +68,7 @@ from .api import (
get_config,
open_client_proxies,
Client,
MethodProxy,
)
# from .feed import open_data_client
@ -87,27 +88,30 @@ def pack_position(
symbol = con.symbol.lower()
exch = (con.primaryExchange or con.exchange).lower()
symkey = '.'.join((symbol, exch))
fqsn = '.'.join((symbol, exch))
if not exch:
# attempt to lookup the symbol from our
# hacked set..
for sym in _adhoc_futes_set:
if symbol in sym:
symkey = sym
fqsn = sym
break
expiry = con.lastTradeDateOrContractMonth
if expiry:
symkey += f'.{expiry}'
fqsn += f'.{expiry}'
# TODO: options contracts into a sane format..
return BrokerdPosition(
broker='ib',
account=pos.account,
symbol=symkey,
currency=con.currency,
size=float(pos.position),
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
return (
con.conId,
BrokerdPosition(
broker='ib',
account=pos.account,
symbol=fqsn,
currency=con.currency,
size=float(pos.position),
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
),
)
@ -262,6 +266,70 @@ async def recv_trade_updates(
await client.ib.disconnectedEvent
async def update_ledger_from_api_trades(
clients: list[Union[Client, MethodProxy]],
ib_pp_msgs: dict[int, BrokerdPosition], # conid -> msg
) -> dict[str, Any]:
# construct piker pps from trade ledger, underneath using
# LIFO style breakeven pricing calcs.
conf = get_config()
# retreive new trade executions from the last session
# and/or day's worth of trading and convert into trade
# records suitable for a local ledger file.
trades_by_account: dict = {}
for client in clients:
trade_entries = await client.trades()
# XXX; ERRGGG..
# pack in the "primary/listing exchange" value from a
# contract lookup since it seems this isn't available by
# default from the `.fills()` method endpoint...
for entry in trade_entries:
condict = entry['contract']
conid = condict['conId']
pexch = condict['primaryExchange']
if not pexch:
con = (await client.get_con(conid=conid))[0]
pexch = con.primaryExchange
entry['listingExchange'] = pexch
records = trades_to_records(
conf['accounts'].inverse,
trade_entries,
)
trades_by_account.update(records)
# write recent session's trades to the user's (local) ledger file.
for acctid, trades_by_id in trades_by_account.items():
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
# (incrementally) update the user's pps in mem and
# in the `pps.toml`.
records = norm_trade_records(trades_by_id)
# remap stupid ledger fqsns (which are often
# filled with lesser venue/exchange values) to
# the ones we pull from the API via ib's reported
# positioning messages.
for r in records:
normed_msg = ib_pp_msgs[r.bsuid]
if normed_msg.symbol != r.fqsn:
log.warning(
f'Remapping ledger fqsn: {r.fqsn} -> {normed_msg.symbol}'
)
r.fqsn = normed_msg.symbol
pp.update_pps_conf('ib', acctid, records)
@tractor.context
async def trades_dialogue(
@ -311,7 +379,7 @@ async def trades_dialogue(
assert account in accounts_def
accounts.add(account)
pp_msgs = {}
cids2pps = {}
# process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses
@ -320,65 +388,65 @@ async def trades_dialogue(
# money.. xb
for client in aioclients.values():
for pos in client.positions():
msg = pack_position(pos)
msg.account = accounts_def.inverse[msg.account]
pp_msgs[msg.symbol] = msg
cid, msg = pack_position(pos)
msg.account = accounts_def.inverse[msg.account]
cids2pps[cid] = msg
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
# built-out piker pps from trade ledger, underneath using
# LIFO style breakeven pricing calcs.
trades_by_account: dict = {}
conf = get_config()
# update trades ledgers for all accounts from
# connected api clients.
await update_ledger_from_api_trades(
proxies.values(),
cids2pps, # pass these in to map to correct fqsns..
)
# retreive new trade executions from the last session
# and/or day's worth of trading and convert into trade
# records suitable for a local ledger file.
for proxy in proxies.values():
trade_entries = await proxy.trades()
records = trades_to_records(
conf['accounts'].inverse,
trade_entries,
)
trades_by_account.update(records)
# write recent session's trades to the user's (local) ledger
# file.
for acctid, trades_by_id in trades_by_account.items():
with config.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
# (incrementally) update the user's pps in mem and
# in the `pps.toml`.
records = norm_trade_records(trades_by_id)
active = update_pps_conf('ib', acctid, records)
# relay re-formatted pps as msgs to the ems.
for fqsn, pp in active.items():
ibppmsg = pp_msgs[fqsn.rstrip('.ib')]
# load all positions from `pps.toml`, cross check with ib's
# positions data, and relay re-formatted pps as msgs to the ems.
for acctid, by_fqsn in pp.get_pps('ib').items():
for fqsn, posdict in by_fqsn.items():
ibppmsg = cids2pps[posdict['bsuid']]
msg = BrokerdPosition(
broker='ib',
# account=acctid + '.ib',
# XXX: ok so this is annoying, we're relaying
# an account name with the backend suffix prefixed
# but when reading accounts from ledgers
# but when reading accounts from ledgers we don't
# need it and/or it's prefixed in the section
# table..
account=ibppmsg.account,
# XXX: the `.ib` is stripped..?
symbol=ibppmsg.symbol,
currency=ibppmsg.currency,
size=pp['size'],
avg_price=pp['avg_price'],
size=posdict['size'],
avg_price=posdict['avg_price'],
)
assert ibppmsg.size == msg.size
print(msg)
ibsize = ibppmsg.size
pikersize = msg.size
diff = pikersize - ibsize
# if ib reports a lesser pp it's not as bad since we can
# presume we're at least not more in the shit then we
# thought.
if diff:
raise ValueError(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibsize}\n'
f'piker: {pikersize}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
)
msg.size = ibsize
if ibppmsg.avg_price != msg.avg_price:
# TODO: make this a "propoganda" log level?
log.warning(
'The mega-cucks at IB want you to believe with their '
'"FIFO" positioning the following:\n'
f'"FIFO" positioning for {msg.symbol}:\n'
f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n'
f'piker, legitamous-ness, LIFO avg price: {msg.avg_price}'
f'piker, LIFO breakeven PnL price: {msg.avg_price}'
)
all_positions.append(msg.dict())
@ -545,7 +613,7 @@ async def deliver_trade_events(
continue
elif event_name == 'position':
msg = pack_position(item)
cid, msg = pack_position(item)
msg.account = accounts_def.inverse[msg.account]
elif event_name == 'event':
@ -579,25 +647,49 @@ async def deliver_trade_events(
def norm_trade_records(
ledger: dict[str, Any],
) -> dict[str, list[TradeRecord]]:
) -> dict[str, list[pp.Transaction]]:
'''
Normalize a flex report or API retrieved executions
ledger into our standard record format.
'''
records: list[TradeRecord] = []
# async with open_data_client() as proxy:
records: list[pp.Transaction] = []
for tid, record in ledger.items():
# date, time = record['dateTime']
# cost = record['cost']
# action = record['buySell']
conid = record.get('conId') or record['conid']
comms = record.get('ibCommission', 0)
comms = record.get('commission') or -1*record['ibCommission']
price = record.get('price') or record['tradePrice']
size = record.get('shares') or record['quantity']
# the api doesn't do the -/+ on the quantity for you but flex
# records do.. are you fucking serious ib...!?
size = record.get('quantity') or record['shares'] * {
'BOT': 1,
'SLD': -1,
}[record['side']]
exch = record['exchange']
lexch = record.get('listingExchange')
suffix = lexch or exch
symbol = record['symbol']
# likely an opts contract record from a flex report..
# TODO: no idea how to parse ^ the strike part from flex..
# (00010000 any, or 00007500 tsla, ..)
# we probably must do the contract lookup for this?
if ' ' in symbol or '--' in exch:
underlying, _, tail = symbol.partition(' ')
suffix = exch = 'opt'
expiry = tail[:6]
# otype = tail[6]
# strike = tail[7:]
print(f'skipping opts contract {symbol}')
continue
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
instr = record.get('assetCategory')
@ -605,15 +697,16 @@ def norm_trade_records(
symbol = record['description'][:3]
# try to build out piker fqsn from record.
expiry = record.get('lastTradeDateOrContractMonth') or record['expiry']
exch = record.get('listingExchange') or record['exchange']
expiry = record.get(
'lastTradeDateOrContractMonth') or record.get('expiry')
if expiry:
expiry = str(expiry).strip(' ')
suffix = f'{exch}.{expiry}'
fqsn = Symbol.from_broker_info(
broker='ib',
symbol=symbol,
suffix=f'{exch}.{expiry}',
fqsn = Symbol.from_fqsn(
fqsn=f'{symbol}.{suffix}.ib',
info={},
).front_fqsn()
).front_fqsn().rstrip('.ib')
# NOTE: for flex records the normal fields won't be available so
# we have to do a lookup at some point to reverse map the conid
@ -621,41 +714,50 @@ def norm_trade_records(
# con = await proxy.get_con(conid)
records.append(TradeRecord(
records.append(pp.Transaction(
fqsn=fqsn,
tid=tid,
size=size,
price=price,
cost=comms,
symkey=conid,
bsuid=conid,
))
return records
def trades_to_records(
accounts: bidict,
trade_entries: list[object],
source_type: str = 'api',
) -> dict:
'''
Convert either of API execution objects or flex report
entry objects into ``dict`` form, pretty much straight up
without modification.
'''
trades_by_account = {}
for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave a table key as an `int`? So i guess
# cast to strs for all keys..
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave this as an ``int``?
tid = str(entry['tradeID'])
tid = str(entry.get('ibExecID') or entry['tradeID'])
# date = str(entry['tradeDate'])
# XXX: is it going to cause problems if a account name
# get's lost? The user should be able to find it based
# on the actual exec history right?
acctid = accounts[str(entry['accountId'])]
elif source_type == 'api':
@ -667,17 +769,19 @@ def trades_to_records(
# 'time': 1654801166.0
# }
# flatten all sub-dicts and values into one top level entry.
entry = {}
for section, obj in t.items():
for section, val in t.items():
match section:
case 'commisionReport' | 'execution':
entry.update(asdict(obj))
case 'contract':
entry.update(obj)
case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases
entry.update(val)
case _:
entry[section] = val
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
# TODO: why isn't this showing seconds in the str?
entry['date'] = str(dt)
acctid = accounts[entry['acctNumber']]
@ -691,7 +795,7 @@ def trades_to_records(
def load_flex_trades(
path: Optional[str] = None,
) -> dict[str, str]:
) -> dict[str, Any]:
from ib_insync import flexreport, util
@ -704,10 +808,10 @@ def load_flex_trades(
token = conf.get('flex_token')
if not token:
raise ValueError(
'You must specify a ``flex_token`` field in your'
'`brokers.toml` in order load your trade log, see our'
'intructions for how to set this up here:\n'
'PUT LINK HERE!'
'You must specify a ``flex_token`` field in your'
'`brokers.toml` in order load your trade log, see our'
'intructions for how to set this up here:\n'
'PUT LINK HERE!'
)
qid = conf['flex_trades_query_id']
@ -728,6 +832,10 @@ def load_flex_trades(
report = flexreport.FlexReport(path=path)
trade_entries = report.extract('Trade')
ln = len(trade_entries)
# log.info(f'Loaded {ln} trades from flex query')
print(f'Loaded {ln} trades from flex query')
trades_by_account = trades_to_records(
# get reverse map to user account names
conf['accounts'].inverse,
@ -735,13 +843,15 @@ def load_flex_trades(
source_type='flex',
)
# ln = len(trades)
# log.info(f'Loaded {ln} trades from flex query')
ledgers = {}
for acctid, trades_by_id in trades_by_account.items():
with config.open_trade_ledger('ib', acctid) as ledger:
with pp.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
ledgers[acctid] = ledger
return ledgers
if __name__ == '__main__':
import sys