ib: rework trade handling, take ib position sizes as gospel

Instead of casting to `dict`s and rewriting event names in the
`push_tradesies()` handler, be transparent with event names (also
defining them and their piker-equivalent mappings in a redefined
`_statuses` table) and types, passing them directly to the
`deliver_trade_events()` task, and generally
make event handler blocks much easier to grok with type annotations. To
deal with the causality dilemma of *when to emit a pos msg* due to
needing all of `execDetailsEvent, commissionReportEvent, positionEvent`
but having no guarantee on received order, we implement a small task
`clears: dict[Contract, tuple[Position, Fill]]` tracker table and (as
before) only emit a position event once the "cost" can be accessed for
the fill. We now ALWAYS relay any `Position` update from IB directly to
ensure (at least) the cumsize is correct (since it appears we still have
ongoing issues with computing this correctly via `.accounting.Position`
updates..).

Further related adjustments:
- load (fiat) balances and startup positions into a new `IbAcnt` struct.
- change `update_and_audit_pos_msg()` to blindly forward ib position
  event updates for **the size** since it should always be
  considered the true gospel for accounting!
  - drop ib-has-no-position handling since it should never occur..
- move `update_ledger_from_api_trades()` to the `.ledger` submod and do
  processing of ib_insync `Fill` related objects instead of dict-casted
  versions, doing the casting in
  `api_trades_to_ledger_entries()` instead.
- `norm_trade()`: add `symcache.mktmaps[bs_mktid] = mkt` since it
  turns out API (and sometimes FLEX) records don't contain the listing
  exchange/venue thus making it impossible to map an asset pair in the
  "position sense" (i.e. over multiple venues: qqq.nasdaq, qqq.arca,
  qqq.directedge) to an fqme when doing offline ledger processing;
  instead use frickin IB's internal int-id so there's no discrepancy.
  - also much better handle futures mkt trade flex records such that
    parsed `MktPair.fqme` is consistent.
account_tests
Tyler Goodlet 2023-07-25 18:03:32 -04:00
parent b33be86b2f
commit e344bdbf1b
2 changed files with 719 additions and 574 deletions

File diff suppressed because it is too large Load Diff

View File

@ -18,17 +18,26 @@
Trade transaction accounting and normalization. Trade transaction accounting and normalization.
''' '''
from __future__ import annotations
from bisect import insort from bisect import insort
from dataclasses import asdict
from decimal import Decimal from decimal import Decimal
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
Callable, Callable,
TYPE_CHECKING,
) )
from bidict import bidict from bidict import bidict
import pendulum import pendulum
from ib_insync.objects import (
Contract,
Fill,
Execution,
CommissionReport,
)
from piker.data import ( from piker.data import (
Struct, Struct,
@ -45,6 +54,12 @@ from piker.accounting import (
from ._flex_reports import parse_flex_dt from ._flex_reports import parse_flex_dt
from ._util import log from ._util import log
if TYPE_CHECKING:
from .api import (
Client,
MethodProxy,
)
tx_sort: Callable = partial( tx_sort: Callable = partial(
iter_by_dt, iter_by_dt,
@ -71,7 +86,8 @@ def norm_trade(
) -> Transaction | None: ) -> Transaction | None:
conid = record.get('conId') or record['conid'] conid: int = str(record.get('conId') or record['conid'])
bs_mktid: str = str(conid)
comms = record.get('commission') comms = record.get('commission')
if comms is None: if comms is None:
comms = -1*record['ibCommission'] comms = -1*record['ibCommission']
@ -86,7 +102,11 @@ def norm_trade(
}[record['side']] }[record['side']]
symbol: str = record['symbol'] symbol: str = record['symbol']
exch: str = record.get('listingExchange') or record['exchange'] exch: str = (
record.get('listingExchange')
or record.get('primaryExchange')
or record['exchange']
)
# NOTE: remove null values since `tomlkit` can't serialize # NOTE: remove null values since `tomlkit` can't serialize
# them to file. # them to file.
@ -156,11 +176,31 @@ def norm_trade(
match asset_type: match asset_type:
case 'FUT': case 'FUT':
# (flex) ledger entries don't have any simple 3-char key? # XXX (flex) ledger entries don't necessarily have any
# TODO: XXX: WOA this is kinda hacky.. probably # simple 3-char key.. sometimes the .symbol is some
# should figure out the correct future pair key more # weird internal key that we probably don't want in the
# explicitly and consistently? # .fqme => we should probably just wrap `Contract` to
symbol: str = symbol[:3] # this like we do other crypto$ backends XD
# NOTE: at least older FLEX records should have
# this field.. no idea about API entries..
local_symbol: str | None = record.get('localSymbol')
underlying_key: str = record.get('underlyingSymbol')
descr: str | None = record.get('description')
if (
not (
local_symbol
and symbol in local_symbol
)
and (
descr
and symbol not in descr
)
):
con_key, exp_str = descr.split(' ')
symbol: str = underlying_key or con_key
dst = Asset( dst = Asset(
name=symbol.lower(), name=symbol.lower(),
atype='future', atype='future',
@ -206,8 +246,9 @@ def norm_trade(
# NOTE: can't serlialize `tomlkit.String` so cast to native # NOTE: can't serlialize `tomlkit.String` so cast to native
atype: str = str(dst.atype) atype: str = str(dst.atype)
# if not (mkt := symcache.mktmaps.get(bs_mktid)):
mkt = MktPair( mkt = MktPair(
bs_mktid=str(conid), bs_mktid=bs_mktid,
dst=dst, dst=dst,
price_tick=price_tick, price_tick=price_tick,
@ -232,7 +273,21 @@ def norm_trade(
# XXX: if passed in, we fill out the symcache ad-hoc in order # XXX: if passed in, we fill out the symcache ad-hoc in order
# to make downstream accounting work.. # to make downstream accounting work..
if symcache: if symcache is not None:
orig_mkt: MktPair | None = symcache.mktmaps.get(bs_mktid)
if (
orig_mkt
and orig_mkt.fqme != mkt.fqme
):
log.warning(
# print(
f'Contracts with common `conId`: {bs_mktid} mismatch..\n'
f'{orig_mkt.fqme} -> {mkt.fqme}\n'
# 'with DIFF:\n'
# f'{mkt - orig_mkt}'
)
symcache.mktmaps[bs_mktid] = mkt
symcache.mktmaps[fqme] = mkt symcache.mktmaps[fqme] = mkt
symcache.assets[src.name] = src symcache.assets[src.name] = src
symcache.assets[dst.name] = dst symcache.assets[dst.name] = dst
@ -271,9 +326,7 @@ def norm_trade_records(
extraction to fill in the `Transaction.sys: MktPair` field. extraction to fill in the `Transaction.sys: MktPair` field.
''' '''
# select: list[transactions] = []
records: list[Transaction] = [] records: list[Transaction] = []
for tid, record in ledger.items(): for tid, record in ledger.items():
txn = norm_trade( txn = norm_trade(
@ -294,64 +347,54 @@ def norm_trade_records(
key=lambda t: t.dt key=lambda t: t.dt
) )
# if (
# atype == 'fiat'
# or atype == 'option'
# ):
# select.append(trans)
# if select:
# breakpoint()
return {r.tid: r for r in records} return {r.tid: r for r in records}
def api_trades_to_ledger_entries( def api_trades_to_ledger_entries(
accounts: bidict[str, str], accounts: bidict[str, str],
fills: list[Fill],
# TODO: maybe we should just be passing through the
# ``ib_insync.order.Trade`` instance directly here
# instead of pre-casting to dicts?
trade_entries: list[dict],
) -> dict[str, dict]: ) -> dict[str, dict]:
''' '''
Convert API execution objects entry objects into ``dict`` form, Convert API execution objects entry objects into
pretty much straight up without modification except add flattened-``dict`` form, pretty much straight up without
a `pydatetime` field from the parsed timestamp. modification except add a `pydatetime` field from the parsed
timestamp so that on write
''' '''
trades_by_account: dict[str, dict] = {} trades_by_account: dict[str, dict] = {}
for t in trade_entries: for fill in fills:
# NOTE: example of schema we pull from the API client.
# {
# 'commissionReport': CommissionReport(...
# 'contract': {...
# 'execution': Execution(...
# 'time': 1654801166.0
# }
# flatten all sub-dicts and values into one top level entry. # NOTE: for the schema, see the defn for `Fill` which is
entry = {} # a `NamedTuple` subtype
for section, val in t.items(): fdict: dict = fill._asdict()
match section:
# flatten all (sub-)objects and convert to dicts.
# with values packed into one top level entry.
val: CommissionReport | Execution | Contract
txn_dict: dict[str, Any] = {}
for attr_name, val in fdict.items():
match attr_name:
# value is a `@dataclass` subtype
case 'contract' | 'execution' | 'commissionReport': case 'contract' | 'execution' | 'commissionReport':
# sub-dict cases txn_dict.update(asdict(val))
entry.update(val)
case 'time': case 'time':
# ib has wack ns timestamps, or is that us? # ib has wack ns timestamps, or is that us?
continue continue
# TODO: we can remove this case right since there's
# only 4 fields on a `Fill`?
case _: case _:
entry[section] = val txn_dict[attr_name] = val
tid = str(entry['execId']) tid = str(txn_dict['execId'])
dt = pendulum.from_timestamp(entry['time']) dt = pendulum.from_timestamp(txn_dict['time'])
# TODO: why isn't this showing seconds in the str? txn_dict['datetime'] = str(dt)
entry['pydatetime'] = dt acctid = accounts[txn_dict['acctNumber']]
entry['datetime'] = str(dt)
acctid = accounts[entry['acctNumber']] # NOTE: only inserted (then later popped) for sorting below!
txn_dict['pydatetime'] = dt
if not tid: if not tid:
# this is likely some kind of internal adjustment # this is likely some kind of internal adjustment
@ -362,13 +405,18 @@ def api_trades_to_ledger_entries(
# the user from the accounts window in TWS where they can # the user from the accounts window in TWS where they can
# manually set the avg price and size: # manually set the avg price and size:
# https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST # https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') log.warning(
'Skipping ID-less ledger txn_dict:\n'
f'{pformat(txn_dict)}'
)
continue continue
trades_by_account.setdefault( trades_by_account.setdefault(
acctid, {} acctid, {}
)[tid] = entry )[tid] = txn_dict
# TODO: maybe we should just bisect.insort() into a list of
# tuples and then return a dict of that?
# sort entries in output by python based datetime # sort entries in output by python based datetime
for acctid in trades_by_account: for acctid in trades_by_account:
trades_by_account[acctid] = dict(sorted( trades_by_account[acctid] = dict(sorted(
@ -377,3 +425,55 @@ def api_trades_to_ledger_entries(
)) ))
return trades_by_account return trades_by_account
async def update_ledger_from_api_trades(
fills: list[Fill],
client: Client | MethodProxy,
accounts_def_inv: bidict[str, str],
# NOTE: provided for ad-hoc insertions "as transactions are
# processed" -> see `norm_trade()` signature requirements.
symcache: SymbologyCache | None = None,
) -> tuple[
dict[str, Transaction],
dict[str, dict],
]:
# XXX; ERRGGG..
# pack in the "primary/listing exchange" value from a
# contract lookup since it seems this isn't available by
# default from the `.fills()` method endpoint...
fill: Fill
for fill in fills:
con: Contract = fill.contract
conid: str = con.conId
pexch: str | None = con.primaryExchange
if not pexch:
cons = await client.get_con(conid=conid)
if cons:
con = cons[0]
pexch = con.primaryExchange or con.exchange
else:
# for futes it seems like the primary is always empty?
pexch: str = con.exchange
# pack in the ``Contract.secType``
# entry['asset_type'] = condict['secType']
entries: dict[str, dict] = api_trades_to_ledger_entries(
accounts_def_inv,
fills,
)
# normalize recent session's trades to the `Transaction` type
trans_by_acct: dict[str, dict[str, Transaction]] = {}
for acctid, trades_by_id in entries.items():
# normalize to transaction form
trans_by_acct[acctid] = norm_trade_records(
trades_by_id,
symcache=symcache,
)
return trans_by_acct, entries