From 116f7fd40f078f480b60a8608840814bcedca42e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 22 Mar 2023 14:09:23 -0400 Subject: [PATCH] WIP: refactor ib pp load init --- piker/brokers/ib/broker.py | 276 ++++++++++++++++++++++--------------- 1 file changed, 163 insertions(+), 113 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 66dfe212..7cd857d7 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -59,7 +59,7 @@ from piker.accounting import ( open_pps, PpTable, ) -from piker.log import get_console_log +from .._util import get_console_log from piker.clearing._messages import ( Order, Status, @@ -281,18 +281,21 @@ async def recv_trade_updates( async def update_ledger_from_api_trades( trade_entries: list[dict[str, Any]], client: Union[Client, MethodProxy], + accounts_def_inv: bidict[str, str], ) -> tuple[ dict[str, Transaction], dict[str, dict], ]: - # XXX; ERRGGG.. # pack in the "primary/listing exchange" value from a # contract lookup since it seems this isn't available by # default from the `.fills()` method endpoint... for entry in trade_entries: condict = entry['contract'] + # print( + # f"{condict['symbol']}: GETTING CONTRACT INFO!\n" + # ) conid = condict['conId'] pexch = condict['primaryExchange'] @@ -310,9 +313,8 @@ async def update_ledger_from_api_trades( # pack in the ``Contract.secType`` entry['asset_type'] = condict['secType'] - conf = get_config() entries = api_trades_to_ledger_entries( - conf['accounts'].inverse, + accounts_def_inv, trade_entries, ) # normalize recent session's trades to the `Transaction` type @@ -340,9 +342,16 @@ async def update_and_audit_msgs( # retreive equivalent ib reported position message # for comparison/audit versus the piker equivalent # breakeven pp calcs. + # if ( + # acctid == 'reg' + # and bs_mktid == 36285627 + # ): + # await tractor.breakpoint() + ibppmsg = cids2pps.get((acctid, bs_mktid)) if ibppmsg: + symbol = ibppmsg.symbol msg = BrokerdPosition( broker='ib', @@ -353,7 +362,7 @@ async def update_and_audit_msgs( # table.. account=ibppmsg.account, # XXX: the `.ib` is stripped..? - symbol=ibppmsg.symbol, + symbol=symbol, currency=ibppmsg.currency, size=p.size, avg_price=p.ppu, @@ -432,6 +441,81 @@ async def update_and_audit_msgs( return msgs +async def aggr_open_orders( + order_msgs: list[Status], + client: Client, + proxy: MethodProxy, + accounts_def: bidict[str, str], + +) -> None: + ''' + Collect all open orders from client and fill in `order_msgs: list`. + + ''' + trades: list[Trade] = client.ib.openTrades() + for trade in trades: + order = trade.order + quant = trade.order.totalQuantity + action = order.action.lower() + size = { + 'sell': -1, + 'buy': 1, + }[action] * quant + con = trade.contract + + # TODO: in the case of the SMART venue (aka ib's + # router-clearing sys) we probably should handle + # showing such orders overtop of the fqsn for the + # primary exchange, how to map this easily is going + # to be a bit tricky though? + deats = await proxy.con_deats(contracts=[con]) + fqsn = list(deats)[0] + + reqid = order.orderId + + # TODO: maybe embed a ``BrokerdOrder`` instead + # since then we can directly load it on the client + # side in the order mode loop? + msg = Status( + time_ns=time.time_ns(), + resp='open', + oid=str(reqid), + reqid=reqid, + + # embedded order info + req=Order( + action=action, + exec_mode='live', + oid=str(reqid), + symbol=fqsn, + account=accounts_def.inverse[order.account], + price=order.lmtPrice, + size=size, + ), + src='ib', + ) + order_msgs.append(msg) + + return order_msgs + + +# proxy wrapper for starting trade event stream +async def open_trade_event_stream( + client: Client, + task_status: TaskStatus[ + trio.abc.ReceiveChannel + ] = trio.TASK_STATUS_IGNORED, +): + # each api client has a unique event stream + async with tractor.to_asyncio.open_channel_from( + recv_trade_updates, + client=client, + ) as (first, trade_event_stream): + + task_status.started(trade_event_stream) + await trio.sleep_forever() + + @tractor.context async def trades_dialogue( @@ -465,7 +549,10 @@ async def trades_dialogue( # we might also want to delegate a specific actor for # ledger writing / reading for speed? async with ( - open_client_proxies() as (proxies, aioclients), + open_client_proxies() as ( + proxies, + aioclients, + ), ): # Open a trade ledgers stack for appending trade records over # multiple accounts. @@ -473,6 +560,9 @@ async def trades_dialogue( ledgers: dict[str, dict] = {} tables: dict[str, PpTable] = {} order_msgs: list[Status] = [] + conf = get_config() + accounts_def_inv = conf['accounts'].inverse + with ( ExitStack() as lstack, ): @@ -491,7 +581,17 @@ async def trades_dialogue( acctid, ) ) - table = tables[acctid] = lstack.enter_context( + + # load all positions from `pps.toml`, cross check with + # ib's positions data, and relay re-formatted pps as + # msgs to the ems. + # __2 cases__: + # - new trades have taken place this session that we want to + # always reprocess indempotently, + # - no new trades yet but we want to reload and audit any + # positions reported by ib's sys that may not yet be in + # piker's ``pps.toml`` state-file. + tables[acctid] = lstack.enter_context( open_pps( 'ib', acctid, @@ -501,57 +601,54 @@ async def trades_dialogue( for account, proxy in proxies.items(): client = aioclients[account] - trades: list[Trade] = client.ib.openTrades() - for trade in trades: - order = trade.order - quant = trade.order.totalQuantity - action = order.action.lower() - size = { - 'sell': -1, - 'buy': 1, - }[action] * quant - con = trade.contract - # TODO: in the case of the SMART venue (aka ib's - # router-clearing sys) we probably should handle - # showing such orders overtop of the fqsn for the - # primary exchange, how to map this easily is going - # to be a bit tricky though? - deats = await proxy.con_deats(contracts=[con]) - fqsn = list(deats)[0] + # order_msgs is filled in by this helper + await aggr_open_orders( + order_msgs, + client, + proxy, + accounts_def, + ) + acctid: str = account.strip('ib.') + ledger: dict = ledgers[acctid] + table: PpTable = tables[acctid] - reqid = order.orderId - - # TODO: maybe embed a ``BrokerdOrder`` instead - # since then we can directly load it on the client - # side in the order mode loop? - msg = Status( - time_ns=time.time_ns(), - resp='open', - oid=str(reqid), - reqid=reqid, - - # embedded order info - req=Order( - action=action, - exec_mode='live', - oid=str(reqid), - symbol=fqsn, - account=accounts_def.inverse[order.account], - price=order.lmtPrice, - size=size, - ), - src='ib', + # update trades ledgers for all accounts from connected + # api clients which report trades for **this session**. + trades = await proxy.trades() + if trades: + ( + trans_by_acct, + api_to_ledger_entries, + ) = await update_ledger_from_api_trades( + trades, + proxy, + accounts_def_inv, ) - order_msgs.append(msg) - # process pp value reported from ib's system. we only use these - # to cross-check sizing since average pricing on their end uses - # the so called (bs) "FIFO" style which more or less results in - # a price that's not useful for traders who want to not lose - # money.. xb + # if new trades are detected from the API, prepare + # them for the ledger file and update the pptable. + if api_to_ledger_entries: + trade_entries = api_to_ledger_entries.get(acctid) + + if trade_entries: + # write ledger with all new trades + # **AFTER** we've updated the + # `pps.toml` from the original + # ledger state! (i.e. this is + # currently done on exit) + ledger.update(trade_entries) + + trans = trans_by_acct.get(acctid) + if trans: + table.update_from_trans(trans) + + # process pp value reported from ib's system. we only + # use these to cross-check sizing since average pricing + # on their end uses the so called (bs) "FIFO" style + # which more or less results in a price that's not + # useful for traders who want to not lose money.. xb for pos in client.positions(): - # collect all ib-pp reported positions so that we can be # sure know which positions to update from the ledger if # any are missing from the ``pps.toml`` @@ -560,13 +657,14 @@ async def trades_dialogue( acctid = msg.account = accounts_def.inverse[msg.account] acctid = acctid.strip('ib.') cids2pps[(acctid, bs_mktid)] = msg + assert msg.account in accounts, ( f'Position for unknown account: {msg.account}') - ledger = ledgers[acctid] - table = tables[acctid] + ledger: dict = ledgers[acctid] + table: PpTable = tables[acctid] + pp: Position = table.pps.get(bs_mktid) - pp = table.pps.get(bs_mktid) if ( not pp or pp.size != msg.size @@ -574,33 +672,6 @@ async def trades_dialogue( trans = norm_trade_records(ledger) table.update_from_trans(trans) - # update trades ledgers for all accounts from connected - # api clients which report trades for **this session**. - trades = await proxy.trades() - ( - trans_by_acct, - api_to_ledger_entries, - ) = await update_ledger_from_api_trades( - trades, - proxy, - ) - - # if new trades are detected from the API, prepare - # them for the ledger file and update the pptable. - if api_to_ledger_entries: - trade_entries = api_to_ledger_entries.get(acctid) - - if trade_entries: - # write ledger with all new trades **AFTER** - # we've updated the `pps.toml` from the - # original ledger state! (i.e. this is - # currently done on exit) - ledger.update(trade_entries) - - trans = trans_by_acct.get(acctid) - if trans: - table.update_from_trans(trans) - # XXX: not sure exactly why it wouldn't be in # the updated output (maybe this is a bug?) but # if you create a pos from TWS and then load it @@ -630,17 +701,12 @@ async def trades_dialogue( f'piker: {pp.size}\n' ) + # iterate all (newly) updated pps tables for every + # client-account and build out position msgs to deliver to + # EMS. + for acctid, table in tables.items(): active_pps, closed_pps = table.dump_active() - # load all positions from `pps.toml`, cross check with - # ib's positions data, and relay re-formatted pps as - # msgs to the ems. - # __2 cases__: - # - new trades have taken place this session that we want to - # always reprocess indempotently, - # - no new trades yet but we want to reload and audit any - # positions reported by ib's sys that may not yet be in - # piker's ``pps.toml`` state-file. for pps in [active_pps, closed_pps]: msgs = await update_and_audit_msgs( acctid, @@ -661,22 +727,6 @@ async def trades_dialogue( tuple(name for name in accounts_def if name in accounts), )) - # proxy wrapper for starting trade event stream - async def open_trade_event_stream( - client: Client, - task_status: TaskStatus[ - trio.abc.ReceiveChannel - ] = trio.TASK_STATUS_IGNORED, - ): - # each api client has a unique event stream - async with tractor.to_asyncio.open_channel_from( - recv_trade_updates, - client=client, - ) as (first, trade_event_stream): - - task_status.started(trade_event_stream) - await trio.sleep_forever() - async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, @@ -723,7 +773,7 @@ async def trades_dialogue( async def emit_pp_update( ems_stream: tractor.MsgStream, trade_entry: dict, - accounts_def: bidict, + accounts_def: bidict[str, str], proxies: dict, cids2pps: dict, @@ -733,16 +783,16 @@ async def emit_pp_update( ) -> None: # compute and relay incrementally updated piker pp - acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] + accounts_def_inv: bidict[str, str] = accounts_def.inverse + acctid = accounts_def_inv[trade_entry['execution']['acctNumber']] proxy = proxies[acctid] - - acctid = acctid.strip('ib.') ( records_by_acct, api_to_ledger_entries, ) = await update_ledger_from_api_trades( [trade_entry], proxy, + accounts_def_inv, ) trans = records_by_acct[acctid] r = list(trans.values())[0] @@ -1244,7 +1294,7 @@ def parse_flex_dt( def api_trades_to_ledger_entries( - accounts: bidict, + accounts: bidict[str, str], # TODO: maybe we should just be passing through the # ``ib_insync.order.Trade`` instance directly here