From e344bdbf1b3c9505628394bd39955419854fcd07 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 25 Jul 2023 18:03:32 -0400 Subject: [PATCH] ib: rework trade handling, take ib position sizes as gospel Instead of casting to `dict`s and rewriting event names in the `push_tradesies()` handler, be transparent with event names (also defining and piker-equivalent mapping them in a redefined `_statuses` table) and types passing them directly to the `deliver_trade_events()` task and generally make event handler blocks much easier to grok with type annotations. To deal with the causality dilemma of *when to emit a pos msg* due to needing all of `execDetailsEvent, commissionReportEvent, positionEvent` but having no guarantee on received order, we implement a small task `clears: dict[Contract, tuple[Position, Fill]]` tracker table and (as before) only emit a position event once the "cost" can be accessed for the fill. We now ALWAYS relay any `Position` update from IB directly to ensure (at least) the cumsize is correct (since it appears we still have ongoing issues with computing this correctly via `.accounting.Position` updates..). Further related adjustments: - load (fiat) balances and startup positions into a new `IbAcnt` struct. - change `update_and_audit_pos_msg()` to blindly forward ib position event updates for the **the size** since it should always be considered the true gospel for accounting! - drop ib-has-no-position handling since it should never occur.. - move `update_ledger_from_api_trades()` to the `.ledger` submod and do processing of ib_insync `Fill` related objects instead of dict-casted versions instead doing the casting in `api_trades_to_ledger_entries()`. - `norm_trade()`: add `symcache.mktmaps[bs_mktid] = mkt` in since it turns out API (and sometimes FLEX) records don't contain the listing exchange/venue thus making it impossible to map an asset pair in the "position sense" (i.e. over multiple venues: qqq.nasdaq, qqq.arca, qqq.directedge) to an fqme when doing offline ledger processing; instead use frickin IB's internal int-id so there's no discrepancy. - also much better handle futures mkt trade flex records such that parsed `MktPair.fqme` is consistent. --- piker/brokers/ib/broker.py | 1091 +++++++++++++++++++----------------- piker/brokers/ib/ledger.py | 202 +++++-- 2 files changed, 719 insertions(+), 574 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 56c3d82a..e9c6c83f 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -26,18 +26,16 @@ from pprint import pformat import time from typing import ( Any, - Optional, AsyncIterator, - Union, ) from bidict import bidict import trio from trio_typing import TaskStatus import tractor +from tractor.to_asyncio import LinkedTaskChannel from ib_insync.contract import ( Contract, - Option, ) from ib_insync.order import ( Trade, @@ -47,23 +45,24 @@ from ib_insync.objects import ( Fill, Execution, CommissionReport, + Position as IbPosition, ) -from ib_insync.objects import Position as IbPosition from piker import config from piker.accounting import ( - # dec_digits, - # digits_to_dec, Position, Transaction, open_trade_ledger, TransactionLedger, open_account, Account, + Asset, + MktPair, ) -from piker.data._symcache import ( +from piker.data import ( open_symcache, SymbologyCache, + Struct, ) from piker.clearing._messages import ( Order, @@ -79,28 +78,32 @@ from piker.clearing._messages import ( from ._util import log from .api import ( _accounts2clients, - con2fqme, get_config, open_client_proxies, Client, MethodProxy, ) +from .symbols import ( + con2fqme, + # get_mkt_info, +) from .ledger import ( norm_trade_records, - api_trades_to_ledger_entries, tx_sort, + update_ledger_from_api_trades, ) def pack_position( - pos: IbPosition + pos: IbPosition, + accounts_def: bidict[str, str], ) -> tuple[ str, dict[str, Any] ]: - con = pos.contract + con: Contract = pos.contract fqme, calc_price = con2fqme(con) # TODO: options contracts into a sane format.. @@ -108,7 +111,7 @@ def pack_position( str(con.conId), BrokerdPosition( broker='ib', - account=pos.account, + account=accounts_def.inverse[pos.account], symbol=fqme, currency=con.currency, size=float(pos.position), @@ -128,9 +131,8 @@ async def handle_order_requests( async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') - action = request_msg['action'] - account = request_msg['account'] - + action: str = request_msg['action'] + account: str = request_msg['account'] acct_number = accounts_def.get(account) if not acct_number: log.error( @@ -213,8 +215,12 @@ async def recv_trade_updates( to_trio: trio.abc.SendChannel, ) -> None: - """Stream a ticker using the std L1 api. - """ + ''' + Receive and relay order control and positioning related events + from `ib_insync`, pack as tuples and push over mem-chan to our + trio relay task for processing and relay to EMS. + + ''' client.inline_errors(to_trio) # sync with trio task @@ -223,37 +229,40 @@ async def recv_trade_updates( def push_tradesies( eventkit_obj, obj, - fill: Optional[Fill] = None, - report: Optional[CommissionReport] = None, + fill: Fill | None = None, + report: CommissionReport | None = None, ): ''' Push events to trio task. ''' - match eventkit_obj.name(): + emit: tuple | object + event_name: str = eventkit_obj.name() + match event_name: case 'orderStatusEvent': - item = ('status', obj) + emit: Trade = obj case 'commissionReportEvent': assert report - item = ('cost', report) + emit: CommissionReport = report case 'execDetailsEvent': # execution details event - item = ('fill', (obj, fill)) + emit: tuple[Trade, Fill] = (obj, fill) case 'positionEvent': - item = ('position', obj) + emit: Position = obj case _: log.error(f'Error unknown event {obj}') return - log.info(f'eventkit event ->\n{pformat(item)}') + log.info(f'eventkit event ->\n{pformat(emit)}') try: - to_trio.send_nowait(item) + # emit event name + relevant ibis internal objects + to_trio.send_nowait((event_name, emit)) except trio.BrokenResourceError: log.exception(f'Disconnected from {eventkit_obj} updates') eventkit_obj.disconnect(push_tradesies) @@ -270,6 +279,8 @@ async def recv_trade_updates( 'commissionReportEvent', # XXX: not sure yet if we need these + # -> prolly not since the named tuple type doesn't offer + # much more then a few more pnl fields.. # 'updatePortfolioEvent', # XXX: these all seem to be weird ib_insync internal @@ -288,196 +299,114 @@ async def recv_trade_updates( await client.ib.disconnectedEvent -# TODO: maybe we should allow the `trade_entries` input to be -# a list of the actual `Contract` types instead, though a couple -# other callers will need to be changed as well. -async def update_ledger_from_api_trades( - trade_entries: list[dict[str, Any]], - client: Union[Client, MethodProxy], - accounts_def_inv: bidict[str, str], - - # provided for ad-hoc insertions "as transactions are - # processed" - symcache: SymbologyCache | None = None, - -) -> tuple[ - dict[str, Transaction], - dict[str, dict], -]: - # XXX; ERRGGG.. - # pack in the "primary/listing exchange" value from a - # contract lookup since it seems this isn't available by - # default from the `.fills()` method endpoint... - for entry in trade_entries: - condict = entry['contract'] - # print( - # f"{condict['symbol']}: GETTING CONTRACT INFO!\n" - # ) - conid = condict['conId'] - pexch = condict['primaryExchange'] - - if not pexch: - cons = await client.get_con(conid=conid) - if cons: - con = cons[0] - pexch = con.primaryExchange or con.exchange - else: - # for futes it seems like the primary is always empty? - pexch = condict['exchange'] - - entry['listingExchange'] = pexch - - # pack in the ``Contract.secType`` - entry['asset_type'] = condict['secType'] - - entries: dict[str, dict] = api_trades_to_ledger_entries( - accounts_def_inv, - trade_entries, - ) - # normalize recent session's trades to the `Transaction` type - trans_by_acct: dict[str, dict[str, Transaction]] = {} - - for acctid, trades_by_id in entries.items(): - # normalize to transaction form - trans_by_acct[acctid] = norm_trade_records( - trades_by_id, - symcache=symcache, - ) - - return trans_by_acct, entries - - -async def update_and_audit_msgs( +async def update_and_audit_pos_msg( acctid: str, # no `ib.` prefix is required! - pps: list[Position], - cids2pps: dict[tuple[str, int], BrokerdPosition], + pikerpos: Position, + ibpos: IbPosition, + cons2mkts: dict[Contract, MktPair], validate: bool = True, -) -> list[BrokerdPosition]: +) -> BrokerdPosition: - msgs: list[BrokerdPosition] = [] - p: Position - for p in pps: - bs_mktid = p.bs_mktid + # NOTE: lookup the ideal `MktPair` value, since multi-venue + # trade records can result in multiple MktpPairs (eg. qqq.arca.ib and + # qqq.nasdaq.ib can map to the same bs_mktid but have obviously + # different .fqme: str values..), instead allow caller to + # provide a table with the desired output mkt-map values; + # eventually this should probably come from a deterministically + # generated symcache.. + # TODO: figure out how make this not be so frickin CRAP by + # either allowing bs_mktid to be the position key or possibly + # be extra pendantic with the `Client._mkts` table? + con: Contract = ibpos.contract + mkt: MktPair = cons2mkts.get(con, pikerpos.mkt) + bs_fqme: str = mkt.bs_fqme - # retreive equivalent ib reported position message - # for comparison/audit versus the piker equivalent - # breakeven pp calcs. - ibppmsg = cids2pps.get((acctid, bs_mktid)) - if ibppmsg: - msg = BrokerdPosition( - broker='ib', + msg = BrokerdPosition( + broker='ib', - # XXX: ok so this is annoying, we're relaying - # an account name with the backend suffix prefixed - # but when reading accounts from ledgers we don't - # need it and/or it's prefixed in the section - # table.. - account=ibppmsg.account, + # TODO: probably forget about this once we drop this msg + # entirely from our set.. + # XXX: ok so this is annoying, we're relaying + # an account name with the backend suffix prefixed + # but when reading accounts from ledgers we don't + # need it and/or it's prefixed in the section + # table.. we should just strip this from the message + # right since `.broker` is already included? + account=f'ib.{acctid}', + # account=account_def.inverse[ibpos.account], - # XXX: the `.ib` is stripped..? - symbol=ibppmsg.symbol, + # XXX: the `.ib` is stripped..? + symbol=bs_fqme, - # remove.. - # currency=ibppmsg.currency, + # remove.. + # currency=ibpos.currency, - # NOTE: always take their size since it's usually the - # true gospel.. - # size=p.size, - size=ibppmsg.size, + # NOTE: always take their size since it's usually the + # true gospel.. this SHOULD be the same always as ours + # tho.. + # size=pikerpos.size, + size=ibpos.position, - avg_price=p.ppu, - ) - msgs.append(msg) + avg_price=pikerpos.ppu, + ) - ibfmtmsg = pformat(ibppmsg.to_dict()) - pikerfmtmsg = pformat(msg.to_dict()) + ibfmtmsg: str = pformat(ibpos._asdict()) + pikerfmtmsg: str = pformat(msg.to_dict()) - ibsize = ibppmsg.size - pikersize = msg.size - diff = pikersize - ibsize + ibsize: float = ibpos.position + pikersize: float = msg.size + diff: float = pikersize - ibsize - # if ib reports a lesser pp it's not as bad since we can - # presume we're at least not more in the shit then we - # thought. - if ( - diff - and ( - pikersize - or ibsize - ) - ): + # NOTE: compare equivalent ib reported position message for + # comparison/audit versus the piker equivalent breakeven pp + # calcs. if ib reports a lesser pp it's not as bad since we can + # presume we're at least not more in the shit then we thought. + if ( + diff + and ( + pikersize + or ibsize + ) + ): - # reverse_split_ratio = pikersize / ibsize - # split_ratio = 1/reverse_split_ratio - # if split_ratio >= reverse_split_ratio: - # entry = f'split_ratio = {int(split_ratio)}' - # else: - # entry = f'split_ratio = 1/{int(reverse_split_ratio)}' + # reverse_split_ratio = pikersize / ibsize + # split_ratio = 1/reverse_split_ratio + # if split_ratio >= reverse_split_ratio: + # entry = f'split_ratio = {int(split_ratio)}' + # else: + # entry = f'split_ratio = 1/{int(reverse_split_ratio)}' - msg.size = ibsize - logmsg: str = ( - f'Pos mismatch in ib vs. the piker ledger!\n' - f'IB:\n{ibfmtmsg}\n\n' - f'PIKER:\n{pikerfmtmsg}\n\n' + msg.size = ibsize + logmsg: str = ( + f'Pos mismatch in ib vs. the piker ledger!\n' + f'IB:\n{ibfmtmsg}\n\n' + f'PIKER:\n{pikerfmtmsg}\n\n' - # 'If you are expecting a (reverse) split in this ' - # 'instrument you should probably put the following' - # 'in the `pps.toml` section:\n' - # f'{entry}\n' - # f'reverse_split_ratio: {reverse_split_ratio}\n' - # f'split_ratio: {split_ratio}\n\n' - ) - - if validate: - raise ValueError(logmsg) - else: - # await tractor.pause() - log.error(logmsg) - - # TODO: make this a "propaganda" log level? - if ibppmsg.avg_price != msg.avg_price: - log.warning( - f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' - f'ib: {ibfmtmsg}\n' - '---------------------------\n' - f'piker: {pformat(msg.to_dict())}' - ) + # 'If you are expecting a (reverse) split in this ' + # 'instrument you should probably put the following' + # 'in the `pps.toml` section:\n' + # f'{entry}\n' + # f'reverse_split_ratio: {reverse_split_ratio}\n' + # f'split_ratio: {split_ratio}\n\n' + ) + if validate: + raise ValueError(logmsg) else: - # XXX: though it shouldn't be possible (means an error - # in our accounting subsys) create a new message for - # a supposed "missing position" that IB never reported. - msg = BrokerdPosition( - broker='ib', + # await tractor.pause() + log.error(logmsg) - # XXX: ok so this is annoying, we're relaying - # an account name with the backend suffix prefixed - # but when reading accounts from ledgers we don't - # need it and/or it's prefixed in the section - # table.. we should just strip this from the message - # right since `.broker` is already included? - account=f'ib.{acctid}', + # TODO: make this a "propaganda" log level? + if ibpos.avgCost != msg.avg_price: + log.warning( + f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' + f'ib: {ibfmtmsg}\n' + '---------------------------\n' + f'piker: {pformat(msg.to_dict())}' + ) - # XXX: the `.ib` is stripped..? - symbol=p.mkt.fqme, - - # TODO: we should remove from msg schema.. - # currency=ibppmsg.currency, - - size=p.size, - avg_price=p.ppu, - ) - if p.size: - logmsg: str = ( - f'UNEXPECTED POSITION says IB => {msg.symbol}\n' - 'Maybe they LIQUIDATED YOU or are missing ledger entries?\n' - ) - log.error(logmsg) - if validate: - raise ValueError(logmsg) - - return msgs + return msg async def aggr_open_orders( @@ -538,24 +467,54 @@ async def aggr_open_orders( return order_msgs -# proxy wrapper for starting trade event stream async def open_trade_event_stream( client: Client, task_status: TaskStatus[ - trio.abc.ReceiveChannel + LinkedTaskChannel ] = trio.TASK_STATUS_IGNORED, ): - # each api client has a unique event stream + ''' + Proxy wrapper for starting trade event stream from ib_insync + which spawns an asyncio task that registers an internal closure + (`push_tradies()`) which in turn relays trading events through + a `tractor.to_asyncio.LinkedTaskChannel` which the parent + (caller task) can use to process said events in trio-land. + + NOTE: each api client has a unique event stream. + + ''' + trade_event_stream: LinkedTaskChannel + async with tractor.to_asyncio.open_channel_from( recv_trade_updates, client=client, - ) as (_, trade_event_stream): - - # assert ibclient is client.ib + ) as ( + _, # first pushed val + trade_event_stream, + ): task_status.started(trade_event_stream) + # block forever to keep session trio-asyncio session + # up until cancelled or error on either side. await trio.sleep_forever() +class IbAcnt(Struct): + ''' + Wrapper around the useful info for doing accounting (mostly for + position tracking). + + ''' + key: str + balances: dict[ + str, # fiat or crypto name + float # current balance + ] + # TODO: do we need the asset instances? + # (float, Asset), + # ] + positions: dict[str, IbPosition] + + @tractor.context async def open_trade_dialog( ctx: tractor.Context, @@ -565,11 +524,11 @@ async def open_trade_dialog( accounts_def = config.load_accounts(['ib']) # deliver positions to subscriber before anything else - all_positions = [] - accounts = set() - acctids = set() - cids2pps: dict[str, BrokerdPosition] = {} + all_positions: list[BrokerdPosition] = [] + accounts: set[str] = set() + acctids: set[str] = set() + symcache: SymbologyCache async with ( open_client_proxies() as ( proxies, @@ -592,10 +551,12 @@ async def open_trade_dialog( ExitStack() as lstack, ): # load ledgers and pps for all detected client-proxies + account: str + proxy: MethodProxy for account, proxy in proxies.items(): assert account in accounts_def accounts.add(account) - acctid = account.strip('ib.') + acctid: str = account.strip('ib.') acctids.add(acctid) # open ledger and pptable wrapper for each @@ -628,67 +589,7 @@ async def open_trade_dialog( ) for account, proxy in proxies.items(): - client = aioclients[account] - - # order_msgs is filled in by this helper - await aggr_open_orders( - order_msgs, - client, - proxy, - accounts_def, - ) - acctid: str = account.strip('ib.') - ledger: dict = ledgers[acctid] - table: Account = tables[acctid] - - # update position table with latest ledger from all - # gathered transactions: ledger file + api records. - trans: dict[str, Transaction] = norm_trade_records( - ledger, - symcache=symcache, - ) - - # update trades ledgers for all accounts from connected - # api clients which report trades for **this session**. - api_trades: list[dict] = await proxy.trades() - if api_trades: - api_trans_by_acct: dict[str, Transaction] - api_to_ledger_entries: dict[str, dict] - ( - api_trans_by_acct, - api_to_ledger_entries, - ) = await update_ledger_from_api_trades( - api_trades, - proxy, - accounts_def_inv, - symcache=symcache, - ) - - # if new api_trades are detected from the API, prepare - # them for the ledger file and update the pptable. - if ( - api_to_ledger_entries - and (trade_entries := api_to_ledger_entries.get(acctid)) - ): - # TODO: fix this `tractor` BUG! - # https://github.com/goodboy/tractor/issues/354 - # await tractor.pp() - - # write ledger with all new api_trades - # **AFTER** we've updated the `pps.toml` - # from the original ledger state! (i.e. this - # is currently done on exit) - for tid, entry in trade_entries.items(): - ledger.setdefault(tid, {}).update(entry) - - if api_trans := api_trans_by_acct.get(acctid): - trans.update(api_trans) - - # update account (and thus pps) from all gathered transactions - table.update_from_ledger( - trans, - symcache=ledger.symcache, - ) + client: Client = aioclients[account] # process pp value reported from ib's system. we only # use these to cross-check sizing since average pricing @@ -699,47 +600,135 @@ async def open_trade_dialog( # sure know which positions to update from the ledger if # any are missing from the ``pps.toml`` # await tractor.pp() - + ib_positions: dict[str, IbPosition] = {} pos: IbPosition # named tuple subtype for pos in client.positions(): + bs_mktid: str = str(pos.contract.conId) + ib_positions[bs_mktid] = pos - # NOTE XXX: we skip options for now since we don't - # yet support the symbology nor the live feeds. - if isinstance(pos.contract, Option): - log.warning( - f'Option contracts not supported for now:\n' - f'{pos._asdict()}' - ) - continue - - bs_mktid, msg = pack_position(pos) - msg.account = accounts_def.inverse[msg.account] - acctid = msg.account.strip('ib.') - cids2pps[(acctid, bs_mktid)] = msg + bs_mktid, msg = pack_position(pos, accounts_def) + acctid: str = msg.account.strip('ib.') assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') + balances: dict[str, tuple[float, Asset]] = {} + for av in client.ib.accountValues(): + match av.tag: + case 'CashBalance': + balances[av.currency] = float(av.value) + + # TODO: if we want supposed forex pnls? + # case 'UnrealizedPnL': + # ... + + ibacnt = IbAcnt( + key=acctid, + balances=balances, + positions=ib_positions, + ) + # print( + # f'Current balances for {ibacnt.key}: {ibacnt.balances}' + # ) + + # order_msgs is filled in by this helper + await aggr_open_orders( + order_msgs, + client, + proxy, + accounts_def, + ) + acctid: str = account.strip('ib.') + ledger: dict = ledgers[acctid] + acnt: Account = tables[acctid] + + # update position table with latest ledger from all + # gathered transactions: ledger file + api records. + trans: dict[str, Transaction] = norm_trade_records( + ledger, + symcache=symcache, + ) + + # update trades ledgers for all accounts from connected + # api clients which report trades for **this session**. + api_fills: list[Fill] = await proxy.get_fills() + if api_fills: + api_trans_by_acct: dict[str, Transaction] + api_to_ledger_entries: dict[str, dict] + ( + api_trans_by_acct, + api_to_ledger_entries, + ) = await update_ledger_from_api_trades( + api_fills, + proxy, + accounts_def_inv, + symcache=symcache, + ) + + # if new api_fills are detected from the API, prepare + # them for the ledger file and update the pptable. + if ( + api_to_ledger_entries + and (trade_entries := api_to_ledger_entries.get(acctid)) + ): + # TODO: fix this `tractor` BUG! + # https://github.com/goodboy/tractor/issues/354 + # await tractor.pp() + + # write ledger with all new api_fills + # **AFTER** we've updated the `pps.toml` + # from the original ledger state! (i.e. this + # is currently done on exit) + for tid, entry in trade_entries.items(): + ledger.setdefault(tid, {}).update(entry) + + if api_trans := api_trans_by_acct.get(acctid): + trans.update(api_trans) + + # update account (and thus pps) from all gathered transactions + acnt.update_from_ledger( + trans, + symcache=ledger.symcache, + ) + # iterate all (newly) updated pps tables for every # client-account and build out position msgs to deliver to # EMS. - for acctid, table in tables.items(): - active_pps, closed_pps = table.dump_active() + for acctid, acnt in tables.items(): + active_pps, closed_pps = acnt.dump_active() for pps in [active_pps, closed_pps]: - msgs = await update_and_audit_msgs( - acctid, - pps.values(), - cids2pps, - validate=False, - ) - all_positions.extend(msg for msg in msgs if msg.size != 0) - - if not all_positions and cids2pps: - raise RuntimeError( - 'Positions reported by ib but not found in `pps.toml`!?\n' - f'{pformat(cids2pps)}' - ) + piker_pps: list[Position] = list(pps.values()) + for pikerpos in piker_pps: + # TODO: map from both the contract ID + # (bs_mktid) AND the piker-ified FQME ?? + # - they might change the fqme when bbby get's + # downlisted to pink XD + # - the bs_mktid can randomly change like in + # gnln.nasdaq.. + ibpos: IbPosition | None = ibacnt.positions.get( + pikerpos.bs_mktid + ) + if ibpos: + bs_mktid: str = str(ibpos.contract.conId) + msg = await update_and_audit_pos_msg( + acctid, + pikerpos, + ibpos, + cons2mkts=client._cons2mkts, + validate=False, + ) + if msg and msg.size != 0: + all_positions.append(msg) + elif ( + not ibpos + and pikerpos.cumsize + ): + logmsg: str = ( + f'UNEXPECTED POSITION says IB => {msg.symbol}\n' + 'Maybe they LIQUIDATED YOU or your ledger is wrong?\n' + ) + log.error(logmsg) await ctx.started(( all_positions, @@ -755,7 +744,7 @@ async def open_trade_dialog( await ems_stream.send(msg) for client in set(aioclients.values()): - trade_event_stream = await n.start( + trade_event_stream: LinkedTaskChannel = await n.start( open_trade_event_stream, client, ) @@ -771,21 +760,19 @@ async def open_trade_dialog( # allocate event relay tasks for each client connection n.start_soon( deliver_trade_events, - # n, + trade_event_stream, ems_stream, accounts_def, - cids2pps, proxies, - ledgers, tables, ) # write account and ledger files immediately! # TODO: make this thread-async! - for acctid, table in tables.items(): - table.write_config() + for acctid, acnt in tables.items(): + acnt.write_config() ledgers[acctid].write_config() # block until cancelled @@ -794,103 +781,196 @@ async def open_trade_dialog( async def emit_pp_update( ems_stream: tractor.MsgStream, - trade_entry: dict, + accounts_def: bidict[str, str], proxies: dict, - cids2pps: dict, ledgers: dict[str, dict[str, Any]], acnts: dict[str, Account], + ibpos: IbPosition, # required! + + # NEED it before we actually update the trade ledger + fill: Fill | None = None, + ) -> None: ''' - Extract trade record from an API event, convert it into a `Transaction`, - update the backing ledger and finally emit a position update to the EMS. + Emit a position update to the EMS either directly from + a `IbPosition` update (received from the API) or ideally from + a `piker.accounting.Position` update (once it's entirely bug + free xD) by extracting the trade record from the (optionally + provided) `Fill` event, convert it into a `Transaction`, update + the backing ledger and emit a msg for the account's `Position` + entry. ''' accounts_def_inv: bidict[str, str] = accounts_def.inverse - accnum: str = trade_entry['execution']['acctNumber'] + accnum: str = ibpos.account fq_acctid: str = accounts_def_inv[accnum] proxy: MethodProxy = proxies[fq_acctid] + client: Client = proxy._aio_ns - # compute and relay incrementally updated piker pp - ( - records_by_acct, - api_to_ledger_entries, - ) = await update_ledger_from_api_trades( - [trade_entry], - proxy, - accounts_def_inv, - ) - trans: dict[str, Transaction] = records_by_acct[fq_acctid] - tx: Transaction = list(trans.values())[0] - - acctid = fq_acctid.strip('ib.') - acnt = acnts[acctid] - ledger: dict = ledgers[acctid] - - acnt.update_from_ledger( - trans, - symcache=ledger.symcache - ) - - active, closed = acnt.dump_active() - - # NOTE: update ledger with all new trades - for fq_acctid, trades_by_id in api_to_ledger_entries.items(): - acctid: str = fq_acctid.strip('ib.') - ledger: dict = ledgers[acctid] - - # NOTE: don't override flex/previous entries with new API - # ones, just update with new fields! - for tid, tdict in trades_by_id.items(): - ledger.setdefault(tid, {}).update(tdict) - - # generate pp msgs and cross check with ib's positions data, relay - # re-formatted pps as msgs to the ems. - for pos in filter( - bool, - [ - active.get(tx.bs_mktid), - closed.get(tx.bs_mktid) - ] - ): - msgs = await update_and_audit_msgs( - acctid, - [pos], - cids2pps, - - # ib pp event might not have arrived yet - validate=False, + # XXX FILL CASE: + # compute and relay incrementally updated piker pos + # after doing accounting calcs + if fill: + ( + records_by_acct, + api_to_ledger_entries, + ) = await update_ledger_from_api_trades( + [fill], + proxy, + accounts_def_inv, ) - if msgs: - msg = msgs[0] - log.info(f'Emitting pp msg: {msg}') - break + trans: dict[str, Transaction] = records_by_acct[fq_acctid] + tx: Transaction = list(trans.values())[0] - await ems_stream.send(msg) + acctid: str = fq_acctid.strip('ib.') + acnt: Account = acnts[acctid] + ledger: TransactionLedger = ledgers[acctid] + + # write to disk/storage + ledger.write_config() + + # con: Contract = fill.contract + + acnt.update_from_ledger( + trans, + + # XXX: currently this is likely empty since we have no + # support! + symcache=ledger.symcache, + + # TODO: remove this hack by attempting to symcache an + # incrementally updated table? + _mktmap_table=client._contracts + ) + + # re-compute all positions that have changed state. + # TODO: likely we should change the API to return the + # position updates from `.update_from_ledger()`? + active, closed = acnt.dump_active() + + # NOTE: update ledger with all new trades + for fq_acctid, trades_by_id in api_to_ledger_entries.items(): + acctid: str = fq_acctid.strip('ib.') + ledger: dict = ledgers[acctid] + + # NOTE: don't override flex/previous entries with new API + # ones, just update with new fields! + for tid, tdict in trades_by_id.items(): + # ledger.setdefault(tid, {}).update(tdict) + ledger[tid].update(tdict) + + # generate pp msgs and cross check with ib's positions data, relay + # re-formatted pps as msgs to the ems. + msg: dict | None = None + for pos in filter( + bool, + [ + active.get(tx.bs_mktid), + closed.get(tx.bs_mktid) + ] + ): + msg = await update_and_audit_pos_msg( + acctid, + pos, + ibpos, + cons2mkts=client._cons2mkts, + + # ib pp event might not have arrived yet + validate=False, + ) + if msg: + log.info(f'Emitting pp msg: {msg}') + break + + # XXX NO FILL CASE: + # if just handed an `IbPosition`, pack it and relay for now + # since we always want the size to be up to date even if + # the ppu is wrong.. + else: + bs_mktid, msg = pack_position(ibpos, accounts_def) + + if msg: + await ems_stream.send(msg) + else: + await tractor.pause() +# NOTE: See `OrderStatus` def for key schema; +# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 +# => we remap statuses to the ems set via the below table: +# short list: +# - PendingSubmit +# - PendingCancel +# - PreSubmitted (simulated orders) +# - ApiCancelled (cancelled by client before submission +# to routing) +# - Cancelled +# - Filled +# - Inactive (reject or cancelled but not by trader) + +# XXX: here's some other sucky cases from the api +# - short-sale but securities haven't been located, in this +# case we should probably keep the order in some kind of +# weird state or cancel it outright? + +# status='PendingSubmit', message=''), +# status='Cancelled', message='Error 404, +# reqId 1550: Order held while securities are located.'), +# status='PreSubmitted', message='')], _statuses: dict[str, str] = { - 'cancelled': 'canceled', - 'submitted': 'open', + 'Filled': 'filled', + 'Cancelled': 'canceled', + 'Submitted': 'open', - # XXX: just pass these through? it duplicates actual fill events other - # then the case where you the `.remaining == 0` case which is our - # 'closed'` case. - # 'filled': 'pending', - # 'pendingsubmit': 'pending', + 'PendingSubmit': 'pending', + 'PendingCancel': 'pending', + 'PreSubmitted': 'pending', + 'ApiPending': 'pending', + 'ApiCancelled': 'pending', # TODO: see a current ``ib_insync`` issue around this: # https://github.com/erdewit/ib_insync/issues/363 - 'inactive': 'pending', + 'Inactive': 'pending', } + _action_map = { 'BOT': 'buy', 'SLD': 'sell', } +# TODO: try out cancelling inactive orders after delay: +# https://github.com/erdewit/ib_insync/issues/363 (was originally +# inside `deliver_trade_events` status handler block. +# acctid = accounts_def.inverse[trade.order.account] +# double check there is no error when +# cancelling.. gawwwd +# if ib_status_key == 'cancelled': +# last_log = trade.log[-1] +# if ( +# last_log.message +# and 'Error' not in last_log.message +# ): +# ib_status_key = trade.log[-2].status +# +# elif ib_status_key == 'inactive': +# +# async def sched_cancel(): +# log.warning( +# 'OH GAWD an inactive order.scheduling a cancel\n' +# f'{pformat(item)}' +# ) +# proxy = proxies[acctid] +# await proxy.submit_cancel(reqid=trade.order.orderId) +# await trio.sleep(1) +# nurse.start_soon(sched_cancel) +# +# nurse.start_soon(sched_cancel) + + # TODO: maybe just make this a flat func without an interal loop # and call it *from* the `trade_event_stream` loop? Might look # a lot nicer doing that from open_trade_dialog() instead of @@ -901,7 +981,6 @@ async def deliver_trade_events( trade_event_stream: trio.MemoryReceiveChannel, ems_stream: tractor.MsgStream, accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'` - cids2pps: dict[tuple[str, str], BrokerdPosition], proxies: dict[str, MethodProxy], ledgers, @@ -912,39 +991,23 @@ async def deliver_trade_events( Format and relay all trade events for a given client to emsd. ''' - ids2fills: dict[str, dict] = {} + # task local msg dialog tracking + clears: dict[ + Contract, + list[ + IbPosition | None, # filled by positionEvent + Fill | None, # filled by order status and exec details + ] + ] = {} + execid2con: dict[str, Contract] = {} # TODO: for some reason we can receive a ``None`` here when the # ib-gw goes down? Not sure exactly how that's happening looking # at the eventkit code above but we should probably handle it... async for event_name, item in trade_event_stream: - log.info(f'ib sending {event_name}:\n{pformat(item)}') + log.info(f'Relaying `{event_name}`:\n{pformat(item)}') match event_name: - # NOTE: we remap statuses to the ems set via the - # ``_statuses: dict`` above. - - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # XXX: here's some other sucky cases from the api - # - short-sale but securities haven't been located, in this - # case we should probably keep the order in some kind of - # weird state or cancel it outright? - - # status='PendingSubmit', message=''), - # status='Cancelled', message='Error 404, - # reqId 1550: Order held while securities are located.'), - # status='PreSubmitted', message='')], - - case 'status': + case 'orderStatusEvent': # XXX: begin normalization of nonsense ib_insync internal # object-state tracking representations... @@ -952,59 +1015,29 @@ async def deliver_trade_events( # unwrap needed data from ib_insync internal types trade: Trade = item status: OrderStatus = trade.orderStatus - ib_status_key = status.status.lower() - - # TODO: try out cancelling inactive orders after delay: - # https://github.com/erdewit/ib_insync/issues/363 - # acctid = accounts_def.inverse[trade.order.account] - - # double check there is no error when - # cancelling.. gawwwd - # if ib_status_key == 'cancelled': - # last_log = trade.log[-1] - # if ( - # last_log.message - # and 'Error' not in last_log.message - # ): - # ib_status_key = trade.log[-2].status - - # elif ib_status_key == 'inactive': - - # async def sched_cancel(): - # log.warning( - # 'OH GAWD an inactive order.scheduling a cancel\n' - # f'{pformat(item)}' - # ) - # proxy = proxies[acctid] - # await proxy.submit_cancel(reqid=trade.order.orderId) - # await trio.sleep(1) - # nurse.start_soon(sched_cancel) - - # nurse.start_soon(sched_cancel) - - status_key = ( - _statuses.get(ib_status_key.lower()) - or ib_status_key.lower() - ) - - remaining = status.remaining + status_str: str = _statuses[status.status] + remaining: float = status.remaining if ( - status_key == 'filled' + status_str == 'filled' ): fill: Fill = trade.fills[-1] execu: Execution = fill.execution - # execdict = asdict(execu) - # execdict.pop('acctNumber') fill_msg = BrokerdFill( + time_ns=time.time_ns(), # cuz why not + # NOTE: should match the value returned from # `.submit_limit()` reqid=execu.orderId, - time_ns=time.time_ns(), # cuz why not + action=_action_map[execu.side], size=execu.shares, price=execu.price, + + # DO we care? should this be in another + # msg like the position msg? # broker_details=execdict, + # XXX: required by order mode currently broker_time=execu.time, ) @@ -1013,7 +1046,7 @@ async def deliver_trade_events( if remaining == 0: # emit a closed status on filled statuses where # all units were cleared. - status_key = 'closed' + status_str = 'closed' # skip duplicate filled updates - we get the deats # from the execution details event @@ -1023,7 +1056,7 @@ async def deliver_trade_events( account=accounts_def.inverse[trade.order.account], # everyone doin camel case.. - status=status_key, # force lower case + status=status_str, filled=status.filled, reason=status.whyHeld, @@ -1037,107 +1070,134 @@ async def deliver_trade_events( await ems_stream.send(msg) continue - case 'fill': - # for wtv reason this is a separate event type - # from IB, not sure why it's needed other then for extra - # complexity and over-engineering :eyeroll:. - # we may just end up dropping these events (or - # translating them to ``Status`` msgs) if we can - # show the equivalent status events are no more latent. - - # unpack ib_insync types - # pep-0526 style: - # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations + # XXX: for wtv reason this is a separate event type + # from IB, not sure why it's needed other then for extra + # complexity and over-engineering :eyeroll:. + # we may just end up dropping these events (or + # translating them to ``Status`` msgs) if we can + # show the equivalent status events are no more latent. + case 'execDetailsEvent': + # unpack attrs pep-0526 style. trade: Trade + con: Contract = trade.contract fill: Fill trade, fill = item execu: Execution = fill.execution - execid = execu.execId + execid: str = execu.execId + report: CommissionReport = fill.commissionReport + + # always fill in id to con map so when commissions + # arrive we can maybe fire the pos update.. + execid2con[execid] = con # TODO: # - normalize out commissions details? # - this is the same as the unpacking loop above in # ``trades_to_ledger_entries()`` no? - trade_entry = ids2fills.setdefault(execid, {}) - cost_already_rx = bool(trade_entry) - - # if the costs report was already received this - # should be not empty right? - comms = fill.commissionReport.commission - if cost_already_rx: - assert comms - - trade_entry.update( - { - 'contract': asdict(fill.contract), - 'execution': asdict(fill.execution), - # 'commissionReport': asdict(fill.commissionReport), - # supposedly server fill time? - 'broker_time': execu.time, - 'name': 'ib', - } - ) # 2 cases: # - fill comes first or - # - comms report comes first - comms = fill.commissionReport.commission - if comms: - # UGHHH since the commision report object might be - # filled in **after** we already serialized to dict.. - # def need something better for all this. - trade_entry.update( - {'commissionReport': asdict(fill.commissionReport)} - ) + # - commission report comes first + clear: tuple = clears.setdefault( + con, + [None, fill], + ) + pos, _fill = clear - if comms or cost_already_rx: - # only send a pp update once we have a cost report + # NOTE: we have to handle the case where a pos msg + # has already been set (bc we already relayed rxed + # one before both the exec-deats AND the + # comms-report?) but the comms-report hasn't yet + # arrived, so we fill in the fill (XD) and wait for + # the cost to show up before relaying the pos msg + # to the EMS.. + if _fill is None: + clear[1] = fill + + cost: float = report.commission + if ( + pos + and fill + and cost + ): await emit_pp_update( ems_stream, - trade_entry, accounts_def, proxies, - cids2pps, - ledgers, tables, - ) - case 'cost': + ibpos=pos, + fill=fill, + ) + clears.pop(con) + + case 'commissionReportEvent': cr: CommissionReport = item - execid = cr.execId - - trade_entry = ids2fills.setdefault(execid, {}) - fill_already_rx = bool(trade_entry) + execid: str = cr.execId # only fire a pp msg update if, # - we haven't already # - the fill event has already arrived # but it didn't yet have a commision report # which we fill in now. + + # placehold i guess until someone who know wtf + # contract this is from can fill it in... + con: Contract | None = execid2con.setdefault(execid, None) if ( - fill_already_rx - and 'commissionReport' not in trade_entry + con + and (clear := clears.get(con)) ): - # no fill msg has arrived yet so just fill out the - # cost report for now and when the fill arrives a pp - # msg can be emitted. - trade_entry.update( - {'commissionReport': asdict(cr)} - ) + pos, fill = clear + if ( + pos + and fill + ): + assert fill.commissionReport == cr + await emit_pp_update( + ems_stream, + accounts_def, + proxies, + ledgers, + tables, - await emit_pp_update( - ems_stream, - trade_entry, - accounts_def, - proxies, - cids2pps, + ibpos=pos, + fill=fill, + ) + clears.pop(con) + # TODO: should we clean this? + # execid2con.pop(execid) - ledgers, - tables, - ) + # always update with latest ib pos msg info since + # we generally audit against it for sanity and + # testing AND we require it to be updated to avoid + # error msgs emitted from `update_and_audit_pos_msg()` + case 'positionEvent': + pos: IbPosition = item + con: Contract = pos.contract + + bs_mktid, ppmsg = pack_position(pos, accounts_def) + log.info(f'New IB position msg: {ppmsg}') + + _, fill = clears.setdefault( + con, + [pos, None], + ) + # only send a pos update once we've actually rxed + # the msg from IB since generally speaking we use + # their 'cumsize' as gospel. + await emit_pp_update( + ems_stream, + accounts_def, + proxies, + ledgers, + tables, + + ibpos=pos, + ) case 'error': err: dict = item @@ -1163,21 +1223,6 @@ async def deliver_trade_events( # broker_details={'name': 'ib'}, # )) - case 'position': - - pos: IbPosition = item - bs_mktid, msg = pack_position(pos) - log.info(f'New IB position msg: {msg}') - - # always update with latest ib pos msg info since - # we generally audit against it for sanity and - # testing AND we require it to be updated to avoid - # error msgs emitted from `update_and_audit_msgs()` - cids2pps[(msg.account, bs_mktid)] = msg - - # cuck ib and it's shitty fifo sys for pps! - continue - case 'event': # it's either a general system status event or an external diff --git a/piker/brokers/ib/ledger.py b/piker/brokers/ib/ledger.py index cc79122c..9143cce6 100644 --- a/piker/brokers/ib/ledger.py +++ b/piker/brokers/ib/ledger.py @@ -18,17 +18,26 @@ Trade transaction accounting and normalization. ''' +from __future__ import annotations from bisect import insort +from dataclasses import asdict from decimal import Decimal from functools import partial from pprint import pformat from typing import ( Any, Callable, + TYPE_CHECKING, ) from bidict import bidict import pendulum +from ib_insync.objects import ( + Contract, + Fill, + Execution, + CommissionReport, +) from piker.data import ( Struct, @@ -45,6 +54,12 @@ from piker.accounting import ( from ._flex_reports import parse_flex_dt from ._util import log +if TYPE_CHECKING: + from .api import ( + Client, + MethodProxy, + ) + tx_sort: Callable = partial( iter_by_dt, @@ -71,7 +86,8 @@ def norm_trade( ) -> Transaction | None: - conid = record.get('conId') or record['conid'] + conid: int = str(record.get('conId') or record['conid']) + bs_mktid: str = str(conid) comms = record.get('commission') if comms is None: comms = -1*record['ibCommission'] @@ -86,7 +102,11 @@ def norm_trade( }[record['side']] symbol: str = record['symbol'] - exch: str = record.get('listingExchange') or record['exchange'] + exch: str = ( + record.get('listingExchange') + or record.get('primaryExchange') + or record['exchange'] + ) # NOTE: remove null values since `tomlkit` can't serialize # them to file. @@ -156,11 +176,31 @@ def norm_trade( match asset_type: case 'FUT': - # (flex) ledger entries don't have any simple 3-char key? - # TODO: XXX: WOA this is kinda hacky.. probably - # should figure out the correct future pair key more - # explicitly and consistently? - symbol: str = symbol[:3] + # XXX (flex) ledger entries don't necessarily have any + # simple 3-char key.. sometimes the .symbol is some + # weird internal key that we probably don't want in the + # .fqme => we should probably just wrap `Contract` to + # this like we do other crypto$ backends XD + + # NOTE: at least older FLEX records should have + # this field.. no idea about API entries.. + local_symbol: str | None = record.get('localSymbol') + underlying_key: str = record.get('underlyingSymbol') + descr: str | None = record.get('description') + + if ( + not ( + local_symbol + and symbol in local_symbol + ) + and ( + descr + and symbol not in descr + ) + ): + con_key, exp_str = descr.split(' ') + symbol: str = underlying_key or con_key + dst = Asset( name=symbol.lower(), atype='future', @@ -206,8 +246,9 @@ def norm_trade( # NOTE: can't serlialize `tomlkit.String` so cast to native atype: str = str(dst.atype) + # if not (mkt := symcache.mktmaps.get(bs_mktid)): mkt = MktPair( - bs_mktid=str(conid), + bs_mktid=bs_mktid, dst=dst, price_tick=price_tick, @@ -232,7 +273,21 @@ def norm_trade( # XXX: if passed in, we fill out the symcache ad-hoc in order # to make downstream accounting work.. - if symcache: + if symcache is not None: + orig_mkt: MktPair | None = symcache.mktmaps.get(bs_mktid) + if ( + orig_mkt + and orig_mkt.fqme != mkt.fqme + ): + log.warning( + # print( + f'Contracts with common `conId`: {bs_mktid} mismatch..\n' + f'{orig_mkt.fqme} -> {mkt.fqme}\n' + # 'with DIFF:\n' + # f'{mkt - orig_mkt}' + ) + + symcache.mktmaps[bs_mktid] = mkt symcache.mktmaps[fqme] = mkt symcache.assets[src.name] = src symcache.assets[dst.name] = dst @@ -271,9 +326,7 @@ def norm_trade_records( extraction to fill in the `Transaction.sys: MktPair` field. ''' - # select: list[transactions] = [] records: list[Transaction] = [] - for tid, record in ledger.items(): txn = norm_trade( @@ -294,64 +347,54 @@ def norm_trade_records( key=lambda t: t.dt ) - # if ( - # atype == 'fiat' - # or atype == 'option' - # ): - # select.append(trans) - - # if select: - # breakpoint() - return {r.tid: r for r in records} def api_trades_to_ledger_entries( accounts: bidict[str, str], - - # TODO: maybe we should just be passing through the - # ``ib_insync.order.Trade`` instance directly here - # instead of pre-casting to dicts? - trade_entries: list[dict], + fills: list[Fill], ) -> dict[str, dict]: ''' - Convert API execution objects entry objects into ``dict`` form, - pretty much straight up without modification except add - a `pydatetime` field from the parsed timestamp. + Convert API execution objects entry objects into + flattened-``dict`` form, pretty much straight up without + modification except add a `pydatetime` field from the parsed + timestamp so that on write ''' trades_by_account: dict[str, dict] = {} - for t in trade_entries: - # NOTE: example of schema we pull from the API client. - # { - # 'commissionReport': CommissionReport(... - # 'contract': {... - # 'execution': Execution(... - # 'time': 1654801166.0 - # } + for fill in fills: - # flatten all sub-dicts and values into one top level entry. - entry = {} - for section, val in t.items(): - match section: + # NOTE: for the schema, see the defn for `Fill` which is + # a `NamedTuple` subtype + fdict: dict = fill._asdict() + + # flatten all (sub-)objects and convert to dicts. + # with values packed into one top level entry. + val: CommissionReport | Execution | Contract + txn_dict: dict[str, Any] = {} + for attr_name, val in fdict.items(): + match attr_name: + # value is a `@dataclass` subtype case 'contract' | 'execution' | 'commissionReport': - # sub-dict cases - entry.update(val) + txn_dict.update(asdict(val)) case 'time': # ib has wack ns timestamps, or is that us? continue + # TODO: we can remove this case right since there's + # only 4 fields on a `Fill`? case _: - entry[section] = val + txn_dict[attr_name] = val - tid = str(entry['execId']) - dt = pendulum.from_timestamp(entry['time']) - # TODO: why isn't this showing seconds in the str? - entry['pydatetime'] = dt - entry['datetime'] = str(dt) - acctid = accounts[entry['acctNumber']] + tid = str(txn_dict['execId']) + dt = pendulum.from_timestamp(txn_dict['time']) + txn_dict['datetime'] = str(dt) + acctid = accounts[txn_dict['acctNumber']] + + # NOTE: only inserted (then later popped) for sorting below! + txn_dict['pydatetime'] = dt if not tid: # this is likely some kind of internal adjustment @@ -362,13 +405,18 @@ def api_trades_to_ledger_entries( # the user from the accounts window in TWS where they can # manually set the avg price and size: # https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST - log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}') + log.warning( + 'Skipping ID-less ledger txn_dict:\n' + f'{pformat(txn_dict)}' + ) continue trades_by_account.setdefault( acctid, {} - )[tid] = entry + )[tid] = txn_dict + # TODO: maybe we should just bisect.insort() into a list of + # tuples and then return a dict of that? # sort entries in output by python based datetime for acctid in trades_by_account: trades_by_account[acctid] = dict(sorted( @@ -377,3 +425,55 @@ def api_trades_to_ledger_entries( )) return trades_by_account + + +async def update_ledger_from_api_trades( + fills: list[Fill], + client: Client | MethodProxy, + accounts_def_inv: bidict[str, str], + + # NOTE: provided for ad-hoc insertions "as transactions are + # processed" -> see `norm_trade()` signature requirements. + symcache: SymbologyCache | None = None, + +) -> tuple[ + dict[str, Transaction], + dict[str, dict], +]: + # XXX; ERRGGG.. + # pack in the "primary/listing exchange" value from a + # contract lookup since it seems this isn't available by + # default from the `.fills()` method endpoint... + fill: Fill + for fill in fills: + con: Contract = fill.contract + conid: str = con.conId + pexch: str | None = con.primaryExchange + + if not pexch: + cons = await client.get_con(conid=conid) + if cons: + con = cons[0] + pexch = con.primaryExchange or con.exchange + else: + # for futes it seems like the primary is always empty? + pexch: str = con.exchange + + # pack in the ``Contract.secType`` + # entry['asset_type'] = condict['secType'] + + entries: dict[str, dict] = api_trades_to_ledger_entries( + accounts_def_inv, + fills, + ) + # normalize recent session's trades to the `Transaction` type + trans_by_acct: dict[str, dict[str, Transaction]] = {} + + for acctid, trades_by_id in entries.items(): + # normalize to transaction form + trans_by_acct[acctid] = norm_trade_records( + trades_by_id, + symcache=symcache, + ) + + return trans_by_acct, entries