From f2fff5a5fa9f0162aac1cba6c2d276d307517676 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 27 Jun 2023 13:21:59 -0400 Subject: [PATCH] ib._ledger: move trades transaction processing helpers into new module --- piker/brokers/ib/__init__.py | 2 + piker/brokers/ib/broker.py | 292 +++++------------------------------ piker/brokers/ib/ledger.py | 250 ++++++++++++++++++++++++++++++ 3 files changed, 289 insertions(+), 255 deletions(-) create mode 100644 piker/brokers/ib/ledger.py diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 07ed8af5..d42002a1 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -35,6 +35,8 @@ from .feed import ( ) from .broker import ( open_trade_dialog, +) +from .ledger import ( norm_trade_records, ) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 21d4baa5..9be0e13e 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -19,10 +19,8 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations -from bisect import insort from contextlib import ExitStack from dataclasses import asdict -from decimal import Decimal from functools import partial from pprint import pformat import time @@ -55,8 +53,8 @@ import pendulum from piker import config from piker.accounting import ( - dec_digits, - digits_to_dec, + # dec_digits, + # digits_to_dec, Position, Transaction, open_trade_ledger, @@ -76,9 +74,6 @@ from piker.clearing._messages import ( BrokerdFill, BrokerdError, ) -from piker.accounting import ( - MktPair, -) from ._util import log from .api import ( _accounts2clients, @@ -89,6 +84,10 @@ from .api import ( MethodProxy, ) from ._flex_reports import parse_flex_dt +from .ledger import ( + norm_trade_records, + api_trades_to_ledger_entries, +) @@ -546,17 +545,6 @@ async def open_trade_dialog( acctids = set() cids2pps: dict[str, BrokerdPosition] = {} - # TODO: this causes a massive tractor bug when you run marketstored - # with ``--tsdb``... you should get: - # - first error the assertion - # - chart should get that error and die - # - pikerd goes to debugger again from trio nursery multi-error - # - hitting final control-c to kill daemon will lead to hang - # assert 0 - - # TODO: just write on teardown? - # we might also want to delegate a specific actor for - # ledger writing / reading for speed? async with ( open_client_proxies() as ( proxies, @@ -630,15 +618,19 @@ async def open_trade_dialog( ledger: dict = ledgers[acctid] table: PpTable = tables[acctid] + # update position table with latest ledger from all + # gathered transactions: ledger file + api records. + trans: dict[str, Transaction] = norm_trade_records(ledger) + # update trades ledgers for all accounts from connected # api clients which report trades for **this session**. api_trades = await proxy.trades() if api_trades: - trans_by_acct: dict[str, Transaction] + api_trans_by_acct: dict[str, Transaction] api_to_ledger_entries: dict[str, dict] ( - trans_by_acct, + api_trans_by_acct, api_to_ledger_entries, ) = await update_ledger_from_api_trades( api_trades, @@ -648,29 +640,26 @@ async def open_trade_dialog( # if new api_trades are detected from the API, prepare # them for the ledger file and update the pptable. - if api_to_ledger_entries: - trade_entries = api_to_ledger_entries.get(acctid) + if ( + api_to_ledger_entries + and (trade_entries := api_to_ledger_entries.get(acctid)) + ): # TODO: fix this `tractor` BUG! # https://github.com/goodboy/tractor/issues/354 # await tractor.pp() - if trade_entries: - # write ledger with all new api_trades - # **AFTER** we've updated the `pps.toml` - # from the original ledger state! (i.e. this - # is currently done on exit) + # write ledger with all new api_trades + # **AFTER** we've updated the `pps.toml` + # from the original ledger state! (i.e. this + # is currently done on exit) + for tid, entry in trade_entries.items(): + ledger.setdefault(tid, {}).update(entry) - for tid, entry in trade_entries.items(): - ledger.setdefault(tid, {}).update(entry) + if api_trans := api_trans_by_acct.get(acctid): + trans.update(api_trans) - trans = trans_by_acct.get(acctid) - if trans: - table.update_from_trans(trans) - - # update position table with latest ledger from all - # gathered transactions: ledger file + api records. - trans: dict[str, Transaction] = norm_trade_records(ledger) + # update account (and thus pps) from all gathered transactions table.update_from_trans(trans) # process pp value reported from ib's system. we only @@ -765,8 +754,11 @@ async def open_trade_dialog( tables, ) + # write account and ledger files immediately! # TODO: make this thread-async! - table.write_config() + for acctid, table in tables.items(): + table.write_config() + ledgers[acctid].write_config() # block until cancelled await trio.sleep_forever() @@ -784,10 +776,12 @@ async def emit_pp_update( ) -> None: - # compute and relay incrementally updated piker pp accounts_def_inv: bidict[str, str] = accounts_def.inverse - fq_acctid = accounts_def_inv[trade_entry['execution']['acctNumber']] - proxy = proxies[fq_acctid] + accnum: str = trade_entry['execution']['acctNumber'] + fq_acctid: str = accounts_def_inv[accnum] + proxy: MethodProxy = proxies[fq_acctid] + + # compute and relay incrementally updated piker pp ( records_by_acct, api_to_ledger_entries, @@ -796,8 +790,8 @@ async def emit_pp_update( proxy, accounts_def_inv, ) - trans = records_by_acct[fq_acctid] - r = list(trans.values())[0] + trans: dict[str, Transaction] = records_by_acct[fq_acctid] + tx: Transaction = list(trans.values())[0] acctid = fq_acctid.strip('ib.') table = tables[acctid] @@ -818,7 +812,7 @@ async def emit_pp_update( # re-formatted pps as msgs to the ems. for pos in filter( bool, - [active.get(r.bs_mktid), closed.get(r.bs_mktid)] + [active.get(tx.bs_mktid), closed.get(tx.bs_mktid)] ): msgs = await update_and_audit_msgs( acctid, @@ -1150,215 +1144,3 @@ async def deliver_trade_events( case _: log.error(f'WTF: {event_name}: {item}') - - -def norm_trade_records( - ledger: dict[str, Any], - -) -> dict[str, Transaction]: - ''' - Normalize a flex report or API retrieved executions - ledger into our standard record format. - - ''' - records: list[Transaction] = [] - - for tid, record in ledger.items(): - conid = record.get('conId') or record['conid'] - comms = record.get('commission') - if comms is None: - comms = -1*record['ibCommission'] - - price = record.get('price') or record['tradePrice'] - - # the api doesn't do the -/+ on the quantity for you but flex - # records do.. are you fucking serious ib...!? - size = record.get('quantity') or record['shares'] * { - 'BOT': 1, - 'SLD': -1, - }[record['side']] - - exch = record['exchange'] - lexch = record.get('listingExchange') - - # NOTE: remove null values since `tomlkit` can't serialize - # them to file. - dnc = record.pop('deltaNeutralContract', False) - if dnc is not None: - record['deltaNeutralContract'] = dnc - - suffix = lexch or exch - symbol = record['symbol'] - - # likely an opts contract record from a flex report.. - # TODO: no idea how to parse ^ the strike part from flex.. - # (00010000 any, or 00007500 tsla, ..) - # we probably must do the contract lookup for this? - if ' ' in symbol or '--' in exch: - underlying, _, tail = symbol.partition(' ') - suffix = exch = 'opt' - expiry = tail[:6] - # otype = tail[6] - # strike = tail[7:] - - print(f'skipping opts contract {symbol}') - continue - - # timestamping is way different in API records - dtstr = record.get('datetime') - date = record.get('date') - flex_dtstr = record.get('dateTime') - - if dtstr or date: - dt = pendulum.parse(dtstr or date) - - elif flex_dtstr: - # probably a flex record with a wonky non-std timestamp.. - dt = parse_flex_dt(record['dateTime']) - - # special handling of symbol extraction from - # flex records using some ad-hoc schema parsing. - asset_type: str = record.get( - 'assetCategory' - ) or record.get('secType', 'STK') - - # TODO: XXX: WOA this is kinda hacky.. probably - # should figure out the correct future pair key more - # explicitly and consistently? - if asset_type == 'FUT': - # (flex) ledger entries don't have any simple 3-char key? - symbol = record['symbol'][:3] - asset_type: str = 'future' - - elif asset_type == 'STK': - asset_type: str = 'stock' - - # try to build out piker fqme from record. - expiry = ( - record.get('lastTradeDateOrContractMonth') - or record.get('expiry') - ) - - if expiry: - expiry = str(expiry).strip(' ') - suffix = f'{exch}.{expiry}' - expiry = pendulum.parse(expiry) - - # src: str = record['currency'] - price_tick: Decimal = digits_to_dec(dec_digits(price)) - - pair = MktPair.from_fqme( - fqme=f'{symbol}.{suffix}.ib', - bs_mktid=str(conid), - _atype=str(asset_type), # XXX: can't serlialize `tomlkit.String` - - price_tick=price_tick, - # NOTE: for "legacy" assets, volume is normally discreet, not - # a float, but we keep a digit in case the suitz decide - # to get crazy and change it; we'll be kinda ready - # schema-wise.. - size_tick='1', - ) - - fqme = pair.fqme - - # NOTE: for flex records the normal fields for defining an fqme - # sometimes won't be available so we rely on two approaches for - # the "reverse lookup" of piker style fqme keys: - # - when dealing with API trade records received from - # `IB.trades()` we do a contract lookup at he time of processing - # - when dealing with flex records, it is assumed the record - # is at least a day old and thus the TWS position reporting system - # should already have entries if the pps are still open, in - # which case, we can pull the fqme from that table (see - # `trades_dialogue()` above). - insort( - records, - Transaction( - fqme=fqme, - sym=pair, - tid=tid, - size=size, - price=price, - cost=comms, - dt=dt, - expiry=expiry, - bs_mktid=str(conid), - ), - key=lambda t: t.dt - ) - - return {r.tid: r for r in records} - - -def api_trades_to_ledger_entries( - accounts: bidict[str, str], - - # TODO: maybe we should just be passing through the - # ``ib_insync.order.Trade`` instance directly here - # instead of pre-casting to dicts? - trade_entries: list[dict], - -) -> dict: - ''' - Convert API execution objects entry objects into ``dict`` form, - pretty much straight up without modification except add - a `pydatetime` field from the parsed timestamp. - - ''' - trades_by_account = {} - for t in trade_entries: - # NOTE: example of schema we pull from the API client. - # { - # 'commissionReport': CommissionReport(... - # 'contract': {... - # 'execution': Execution(... - # 'time': 1654801166.0 - # } - - # flatten all sub-dicts and values into one top level entry. - entry = {} - for section, val in t.items(): - match section: - case 'contract' | 'execution' | 'commissionReport': - # sub-dict cases - entry.update(val) - - case 'time': - # ib has wack ns timestamps, or is that us? - continue - - case _: - entry[section] = val - - tid = str(entry['execId']) - dt = pendulum.from_timestamp(entry['time']) - # TODO: why isn't this showing seconds in the str? - entry['pydatetime'] = dt - entry['datetime'] = str(dt) - acctid = accounts[entry['acctNumber']] - - if not tid: - # this is likely some kind of internal adjustment - # transaction, likely one of the following: - # - an expiry event that will show a "book trade" indicating - # some adjustment to cash balances: zeroing or itm settle. - # - a manual cash balance position adjustment likely done by - # the user from the accounts window in TWS where they can - # manually set the avg price and size: - # https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST - log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') - continue - - trades_by_account.setdefault( - acctid, {} - )[tid] = entry - - # sort entries in output by python based datetime - for acctid in trades_by_account: - trades_by_account[acctid] = dict(sorted( - trades_by_account[acctid].items(), - key=lambda entry: entry[1].pop('pydatetime'), - )) - - return trades_by_account diff --git a/piker/brokers/ib/ledger.py b/piker/brokers/ib/ledger.py new file mode 100644 index 00000000..2d1c1003 --- /dev/null +++ b/piker/brokers/ib/ledger.py @@ -0,0 +1,250 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Trade transaction accounting and normalization. + +''' +from bisect import insort +from decimal import Decimal +from pprint import pformat +from typing import ( + Any, +) + +from bidict import bidict +import pendulum + +from piker.accounting import ( + dec_digits, + digits_to_dec, + Transaction, + MktPair, +) +from ._flex_reports import parse_flex_dt +from ._util import log + + +def norm_trade_records( + ledger: dict[str, Any], + +) -> dict[str, Transaction]: + ''' + Normalize a flex report or API retrieved executions + ledger into our standard record format. + + ''' + records: list[Transaction] = [] + + for tid, record in ledger.items(): + conid = record.get('conId') or record['conid'] + comms = record.get('commission') + if comms is None: + comms = -1*record['ibCommission'] + + price = record.get('price') or record['tradePrice'] + + # the api doesn't do the -/+ on the quantity for you but flex + # records do.. are you fucking serious ib...!? + size = record.get('quantity') or record['shares'] * { + 'BOT': 1, + 'SLD': -1, + }[record['side']] + + exch = record['exchange'] + lexch = record.get('listingExchange') + + # NOTE: remove null values since `tomlkit` can't serialize + # them to file. + dnc = record.pop('deltaNeutralContract', False) + if dnc is not None: + record['deltaNeutralContract'] = dnc + + suffix = lexch or exch + symbol = record['symbol'] + + # likely an opts contract record from a flex report.. + # TODO: no idea how to parse ^ the strike part from flex.. + # (00010000 any, or 00007500 tsla, ..) + # we probably must do the contract lookup for this? + if ' ' in symbol or '--' in exch: + underlying, _, tail = symbol.partition(' ') + suffix = exch = 'opt' + expiry = tail[:6] + # otype = tail[6] + # strike = tail[7:] + + print(f'skipping opts contract {symbol}') + continue + + # timestamping is way different in API records + dtstr = record.get('datetime') + date = record.get('date') + flex_dtstr = record.get('dateTime') + + if dtstr or date: + dt = pendulum.parse(dtstr or date) + + elif flex_dtstr: + # probably a flex record with a wonky non-std timestamp.. + dt = parse_flex_dt(record['dateTime']) + + # special handling of symbol extraction from + # flex records using some ad-hoc schema parsing. + asset_type: str = record.get( + 'assetCategory' + ) or record.get('secType', 'STK') + + # TODO: XXX: WOA this is kinda hacky.. probably + # should figure out the correct future pair key more + # explicitly and consistently? + if asset_type == 'FUT': + # (flex) ledger entries don't have any simple 3-char key? + symbol = record['symbol'][:3] + asset_type: str = 'future' + + elif asset_type == 'STK': + asset_type: str = 'stock' + + # try to build out piker fqme from record. + expiry = ( + record.get('lastTradeDateOrContractMonth') + or record.get('expiry') + ) + + if expiry: + expiry = str(expiry).strip(' ') + suffix = f'{exch}.{expiry}' + expiry = pendulum.parse(expiry) + + # src: str = record['currency'] + price_tick: Decimal = digits_to_dec(dec_digits(price)) + + pair = MktPair.from_fqme( + fqme=f'{symbol}.{suffix}.ib', + bs_mktid=str(conid), + _atype=str(asset_type), # XXX: can't serlialize `tomlkit.String` + + price_tick=price_tick, + # NOTE: for "legacy" assets, volume is normally discreet, not + # a float, but we keep a digit in case the suitz decide + # to get crazy and change it; we'll be kinda ready + # schema-wise.. + size_tick='1', + ) + + fqme = pair.fqme + + # NOTE: for flex records the normal fields for defining an fqme + # sometimes won't be available so we rely on two approaches for + # the "reverse lookup" of piker style fqme keys: + # - when dealing with API trade records received from + # `IB.trades()` we do a contract lookup at he time of processing + # - when dealing with flex records, it is assumed the record + # is at least a day old and thus the TWS position reporting system + # should already have entries if the pps are still open, in + # which case, we can pull the fqme from that table (see + # `trades_dialogue()` above). + insort( + records, + Transaction( + fqme=fqme, + sym=pair, + tid=tid, + size=size, + price=price, + cost=comms, + dt=dt, + expiry=expiry, + bs_mktid=str(conid), + ), + key=lambda t: t.dt + ) + + return {r.tid: r for r in records} + + +def api_trades_to_ledger_entries( + accounts: bidict[str, str], + + # TODO: maybe we should just be passing through the + # ``ib_insync.order.Trade`` instance directly here + # instead of pre-casting to dicts? + trade_entries: list[dict], + +) -> dict: + ''' + Convert API execution objects entry objects into ``dict`` form, + pretty much straight up without modification except add + a `pydatetime` field from the parsed timestamp. + + ''' + trades_by_account = {} + for t in trade_entries: + # NOTE: example of schema we pull from the API client. + # { + # 'commissionReport': CommissionReport(... + # 'contract': {... + # 'execution': Execution(... + # 'time': 1654801166.0 + # } + + # flatten all sub-dicts and values into one top level entry. + entry = {} + for section, val in t.items(): + match section: + case 'contract' | 'execution' | 'commissionReport': + # sub-dict cases + entry.update(val) + + case 'time': + # ib has wack ns timestamps, or is that us? + continue + + case _: + entry[section] = val + + tid = str(entry['execId']) + dt = pendulum.from_timestamp(entry['time']) + # TODO: why isn't this showing seconds in the str? + entry['pydatetime'] = dt + entry['datetime'] = str(dt) + acctid = accounts[entry['acctNumber']] + + if not tid: + # this is likely some kind of internal adjustment + # transaction, likely one of the following: + # - an expiry event that will show a "book trade" indicating + # some adjustment to cash balances: zeroing or itm settle. + # - a manual cash balance position adjustment likely done by + # the user from the accounts window in TWS where they can + # manually set the avg price and size: + # https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST + log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') + continue + + trades_by_account.setdefault( + acctid, {} + )[tid] = entry + + # sort entries in output by python based datetime + for acctid in trades_by_account: + trades_by_account[acctid] = dict(sorted( + trades_by_account[acctid].items(), + key=lambda entry: entry[1].pop('pydatetime'), + )) + + return trades_by_account