diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 93128240..d881a866 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations +from contextlib import ExitStack from dataclasses import asdict from functools import partial from pprint import pformat @@ -35,8 +36,8 @@ from trio_typing import TaskStatus import tractor from ib_insync.contract import ( Contract, - Option, - Forex, + # Option, + # Forex, ) from ib_insync.order import ( Trade, @@ -47,11 +48,18 @@ from ib_insync.objects import ( Execution, CommissionReport, ) -from ib_insync.objects import Position +from ib_insync.objects import Position as IbPosition import pendulum from piker import config -from piker import pp +from piker.pp import ( + # update_pps_conf, + Position, + Transaction, + open_trade_ledger, + open_pps, + PpTable, +) from piker.log import get_console_log from piker.clearing._messages import ( BrokerdOrder, @@ -66,7 +74,8 @@ from piker.data._source import Symbol from .api import ( _accounts2clients, # _adhoc_futes_set, - _adhoc_symbol_map, + con2fqsn, + # _adhoc_symbol_map, log, get_config, open_client_proxies, @@ -76,49 +85,12 @@ from .api import ( def pack_position( - pos: Position + pos: IbPosition ) -> dict[str, Any]: + con = pos.contract - - if isinstance(con, Option): - # TODO: option symbol parsing and sane display: - symbol = con.localSymbol.replace(' ', '') - - else: - # TODO: lookup fqsn even for derivs. - symbol = con.symbol.lower() - - # TODO: probably write a mofo exchange mapper routine since ib - # can't get it's shit together like, ever. - - # try our best to figure out the exchange / venue - exch = (con.primaryExchange or con.exchange).lower() - if not exch: - - if isinstance(con, Forex): - # bc apparently it's not in the contract obj? - exch = 'idealfx' - - else: - # for wtv cucked reason some futes don't show their - # exchange (like CL.NYMEX) ... - entry = _adhoc_symbol_map.get( - con.symbol or con.localSymbol - ) - if entry: - meta, kwargs = entry - cid = meta.get('conId') - if cid: - assert con.conId == meta['conId'] - exch = meta['exchange'] - - assert exch, f'No clue:\n {con}' - fqsn = '.'.join((symbol, exch)) - - expiry = con.lastTradeDateOrContractMonth - if expiry: - fqsn += f'.{expiry}' + fqsn, calc_price = con2fqsn(con) # TODO: options contracts into a sane format.. return ( @@ -305,12 +277,10 @@ async def update_ledger_from_api_trades( client: Union[Client, MethodProxy], ) -> tuple[ - dict[str, pp.Transaction], + dict[str, Transaction], dict[str, dict], ]: - conf = get_config() - # XXX; ERRGGG.. # pack in the "primary/listing exchange" value from a # contract lookup since it seems this isn't available by @@ -331,39 +301,34 @@ async def update_ledger_from_api_trades( entry['listingExchange'] = pexch + conf = get_config() entries = trades_to_ledger_entries( conf['accounts'].inverse, trade_entries, ) - # write recent session's trades to the user's (local) ledger file. - records: dict[str, pp.Transactions] = {} + # normalize recent session's trades to the `Transaction` type + trans_by_acct: dict[str, dict[str, Transaction]] = {} for acctid, trades_by_id in entries.items(): # normalize to transaction form - records[acctid] = norm_trade_records(trades_by_id) + trans_by_acct[acctid] = norm_trade_records(trades_by_id) - return records, entries + return trans_by_acct, entries async def update_and_audit_msgs( acctid: str, # no `ib.` prefix is required! - pps: list[pp.Position], + pps: list[Position], cids2pps: dict[tuple[str, int], BrokerdPosition], validate: bool = False, ) -> list[BrokerdPosition]: msgs: list[BrokerdPosition] = [] - # pps: dict[int, pp.Position] = {} - for p in pps: bsuid = p.bsuid - # build trade-session-actor local table - # of pps from unique symbol ids. - # pps[bsuid] = p - # retreive equivalent ib reported position message # for comparison/audit versus the piker equivalent # breakeven pp calcs. @@ -479,7 +444,7 @@ async def trades_dialogue( client = aioclients[account] - async def open_stream( + async def open_trade_event_stream( task_status: TaskStatus[ trio.abc.ReceiveChannel ] = trio.TASK_STATUS_IGNORED, @@ -493,7 +458,7 @@ async def trades_dialogue( task_status.started(trade_event_stream) await trio.sleep_forever() - trade_event_stream = await nurse.start(open_stream) + trade_event_stream = await nurse.start(open_trade_event_stream) clients.append((client, trade_event_stream)) @@ -501,122 +466,147 @@ async def trades_dialogue( accounts.add(account) cids2pps: dict[str, BrokerdPosition] = {} - update_records: dict[str, bidict] = {} - # process pp value reported from ib's system. we only use these - # to cross-check sizing since average pricing on their end uses - # the so called (bs) "FIFO" style which more or less results in - # a price that's not useful for traders who want to not lose - # money.. xb - for client in aioclients.values(): - for pos in client.positions(): - - cid, msg = pack_position(pos) - acctid = msg.account = accounts_def.inverse[msg.account] - acctid = acctid.strip('ib.') - cids2pps[(acctid, cid)] = msg - assert msg.account in accounts, ( - f'Position for unknown account: {msg.account}') - - # collect all ib-pp reported positions so that we can be - # sure know which positions to update from the ledger if - # any are missing from the ``pps.toml`` - update_records.setdefault(acctid, bidict())[cid] = msg.symbol - - # update trades ledgers for all accounts from - # connected api clients which report trades for **this session**. - new_trades = {} - for account, proxy in proxies.items(): - trades = await proxy.trades() - ( - records_by_acct, - ledger_entries, - ) = await update_ledger_from_api_trades( - trades, - proxy, - ) - new_trades.update(records_by_acct) - - for acctid, trans in new_trades.items(): - for t in trans: - bsuid = t.bsuid - if bsuid in update_records: - assert update_records[bsuid] == t.fqsn - else: - update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn - - # load all positions from `pps.toml`, cross check with ib's - # positions data, and relay re-formatted pps as msgs to the ems. - # __2 cases__: - # - new trades have taken place this session that we want to - # always reprocess indempotently, - # - no new trades yet but we want to reload and audit any - # positions reported by ib's sys that may not yet be in - # piker's ``pps.toml`` state-file. - for acctid, to_update in update_records.items(): - trans = new_trades.get(acctid) - active, closed = pp.update_pps_conf( - 'ib', - acctid, - trade_records=trans, - ledger_reload=to_update, - ) - for pps in [active, closed]: - msgs = await update_and_audit_msgs( - acctid, - pps.values(), - cids2pps, - validate=True, - ) - all_positions.extend(msg for msg in msgs) - - if not all_positions and cids2pps: - raise RuntimeError( - 'Positions reported by ib but not found in `pps.toml`!?\n' - f'{pformat(cids2pps)}' - ) - - # log.info(f'Loaded {len(trades)} from this session') - # TODO: write trades to local ``trades.toml`` - # - use above per-session trades data and write to local file - # - get the "flex reports" working and pull historical data and - # also save locally. - - await ctx.started(( - all_positions, - tuple(name for name in accounts_def if name in accounts), - )) - - # TODO: maybe just write on teardown? - # we might also want to delegate a specific actor for - # ledger writing / reading for speed? - - # write ledger with all new trades **AFTER** we've updated the - # `pps.toml` from the original ledger state! - for acctid, trades_by_id in ledger_entries.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, + # Open a trade ledgers stack for appending trade records over + # multiple accounts. + # TODO: we probably want to generalize this into a "ledgers" api.. + ledgers: dict[str, dict] = {} + tables: dict[str, PpTable] = {} + with ( + ExitStack() as lstack, ): - # start order request handler **before** local trades event loop - n.start_soon(handle_order_requests, ems_stream, accounts_def) - # allocate event relay tasks for each client connection - for client, stream in clients: - n.start_soon( - deliver_trade_events, - stream, - ems_stream, - accounts_def, - cids2pps, - proxies, + # process pp value reported from ib's system. we only use these + # to cross-check sizing since average pricing on their end uses + # the so called (bs) "FIFO" style which more or less results in + # a price that's not useful for traders who want to not lose + # money.. xb + acctids = set() + for client in aioclients.values(): + for pos in client.positions(): + + # collect all ib-pp reported positions so that we can be + # sure know which positions to update from the ledger if + # any are missing from the ``pps.toml`` + cid, msg = pack_position(pos) + acctid = msg.account = accounts_def.inverse[msg.account] + acctid = acctid.strip('ib.') + acctids.add(acctid) + + cids2pps[(acctid, cid)] = msg + assert msg.account in accounts, ( + f'Position for unknown account: {msg.account}') + + for acctid in acctids: + # open ledger and pptable wrapper for each + # detected account. + ledgers[acctid] = lstack.enter_context( + open_trade_ledger('ib', acctid) + ) + tables[acctid] = lstack.enter_context( + open_pps('ib', acctid) ) - # block until cancelled - await trio.sleep_forever() + # update trades ledgers for all accounts from + # connected api clients which report trades for **this session**. + for account, proxy in proxies.items(): + + trades = await proxy.trades() + ( + trans_by_acct, + ready_for_ledger_entries, + ) = await update_ledger_from_api_trades( + trades, + proxy, + ) + acctid = acctid.strip('ib.') + ledgers[acctid].update(ready_for_ledger_entries) + + # WTF, yet again this key error is getting ignored?!?! + # tables[acctid].update_from_trans(trans_by_acct[account]) + + # this causes a hang.. + # - marketstored tries to kill container, cant, + # - ctrl-c makes pikerd get stuck... + # assert 0 + + trans = trans_by_acct.get(acctid) + if trans: + tables[acctid].update_from_trans(trans) + + # load all positions from `pps.toml`, cross check with ib's + # positions data, and relay re-formatted pps as msgs to the ems. + # __2 cases__: + # - new trades have taken place this session that we want to + # always reprocess indempotently, + # - no new trades yet but we want to reload and audit any + # positions reported by ib's sys that may not yet be in + # piker's ``pps.toml`` state-file. + for acctid in acctids: + + table = tables[acctid] + _, closed_pps = table.dump_active('ib') + active_pps = table.pps + + for pps in [active_pps, closed_pps]: + msgs = await update_and_audit_msgs( + acctid, + pps.values(), + cids2pps, + validate=True, + ) + all_positions.extend(msg for msg in msgs) + + if not all_positions and cids2pps: + raise RuntimeError( + 'Positions reported by ib but not found in `pps.toml`!?\n' + f'{pformat(cids2pps)}' + ) + + # log.info(f'Loaded {len(trades)} from this session') + # TODO: write trades to local ``trades.toml`` + # - use above per-session trades data and write to local file + # - get the "flex reports" working and pull historical data and + # also save locally. + + await ctx.started(( + all_positions, + tuple(name for name in accounts_def if name in accounts), + )) + + # TODO: maybe just write on teardown? + # we might also want to delegate a specific actor for + # ledger writing / reading for speed? + + # write ledger with all new trades **AFTER** we've updated the + # `pps.toml` from the original ledger state! + for acctid, trades_by_id in ready_for_ledger_entries.items(): + ledgers[acctid].update(trades_by_id) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + # start order request handler **before** local trades + # event loop + n.start_soon(handle_order_requests, ems_stream, accounts_def) + + # allocate event relay tasks for each client connection + for client, stream in clients: + n.start_soon( + deliver_trade_events, + stream, + ems_stream, + accounts_def, + cids2pps, + proxies, + + ledgers, + tables, + ) + + # block until cancelled + await trio.sleep_forever() async def emit_pp_update( @@ -626,44 +616,44 @@ async def emit_pp_update( proxies: dict, cids2pps: dict, + ledgers, + tables, + ) -> None: # compute and relay incrementally updated piker pp acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] proxy = proxies[acctid] - acctname = acctid.strip('ib.') - records_by_acct, ledger_entries = await update_ledger_from_api_trades( + acctid = acctid.strip('ib.') + ( + records_by_acct, + ready_for_ledger_entries, + ) = await update_ledger_from_api_trades( [trade_entry], proxy, ) - records = records_by_acct[acctname] - r = records[0] + trans = records_by_acct[acctid] + r = list(trans.values())[0] - # update and load all positions from `pps.toml`, cross check with - # ib's positions data, and relay re-formatted pps as msgs to the - # ems. we report both the open and closed updates in one map since - # for incremental update we may have just fully closed a pp and need - # to relay that msg as well! - active, closed = pp.update_pps_conf( - 'ib', - acctname, - trade_records=records, - ledger_reload={r.bsuid: r.fqsn}, - ) + table = tables[acctid] + table.update_from_trans(trans) + _, closed = table.dump_active('ib') + active = table.pps - # NOTE: write ledger with all new trades **AFTER** we've updated the - # `pps.toml` from the original ledger state! - for acctid, trades_by_id in ledger_entries.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) + # NOTE: update ledger with all new trades + for acctid, trades_by_id in ready_for_ledger_entries.items(): + ledger = ledgers[acctid] + ledger.update(trades_by_id) + # generate pp msgs and cross check with ib's positions data, relay + # re-formatted pps as msgs to the ems. for pos in filter( bool, [active.get(r.bsuid), closed.get(r.bsuid)] ): msgs = await update_and_audit_msgs( - acctname, + acctid, [pos], cids2pps, @@ -685,6 +675,9 @@ async def deliver_trade_events( cids2pps: dict[tuple[str, str], BrokerdPosition], proxies: dict[str, MethodProxy], + ledgers, + tables, + ) -> None: ''' Format and relay all trade events for a given client to emsd. @@ -834,6 +827,8 @@ async def deliver_trade_events( accounts_def, proxies, cids2pps, + ledgers, + tables, ) case 'cost': @@ -866,6 +861,8 @@ async def deliver_trade_events( accounts_def, proxies, cids2pps, + ledgers, + tables, ) case 'error': @@ -916,14 +913,13 @@ async def deliver_trade_events( def norm_trade_records( ledger: dict[str, Any], -) -> list[pp.Transaction]: +) -> list[Transaction]: ''' Normalize a flex report or API retrieved executions ledger into our standard record format. ''' - records: list[pp.Transaction] = [] - + records: dict[str, Transaction] = {} for tid, record in ledger.items(): conid = record.get('conId') or record['conid'] @@ -1001,7 +997,7 @@ def norm_trade_records( # which case, we can pull the fqsn from that table (see # `trades_dialogue()` above). - records.append(pp.Transaction( + records[tid] = Transaction( fqsn=fqsn, tid=tid, size=size, @@ -1010,7 +1006,7 @@ def norm_trade_records( dt=dt, expiry=expiry, bsuid=conid, - )) + ) return records @@ -1151,7 +1147,7 @@ def load_flex_trades( ledgers = {} for acctid, trades_by_id in trades_by_account.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: + with open_trade_ledger('ib', acctid) as ledger: ledger.update(trades_by_id) ledgers[acctid] = ledger