diff --git a/dockering/ib/run_x11_vnc.sh b/dockering/ib/run_x11_vnc.sh index 69b6da85..1005fb41 100755 --- a/dockering/ib/run_x11_vnc.sh +++ b/dockering/ib/run_x11_vnc.sh @@ -2,15 +2,19 @@ # start VNC server x11vnc \ - -ncache_cr \ - -listen localhost \ + -listen 127.0.0.1 \ + -allow 127.0.0.1 \ + -autoport 3003 \ + -no6 \ + -noipv6 \ -display :1 \ + -bg \ -forever \ -shared \ -logappend /var/log/x11vnc.log \ - -bg \ - -noipv6 \ - -autoport 3003 \ + -ncache_cr \ + -ncache \ + # can't use this because of ``asyncvnc`` issue: # https://github.com/barneygale/asyncvnc/issues/1 # -passwd 'ibcansmbz' diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 93128240..9f384166 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations +from contextlib import ExitStack from dataclasses import asdict from functools import partial from pprint import pformat @@ -35,8 +36,8 @@ from trio_typing import TaskStatus import tractor from ib_insync.contract import ( Contract, - Option, - Forex, + # Option, + # Forex, ) from ib_insync.order import ( Trade, @@ -47,11 +48,17 @@ from ib_insync.objects import ( Execution, CommissionReport, ) -from ib_insync.objects import Position +from ib_insync.objects import Position as IbPosition import pendulum from piker import config -from piker import pp +from piker.pp import ( + Position, + Transaction, + open_trade_ledger, + open_pps, + PpTable, +) from piker.log import get_console_log from piker.clearing._messages import ( BrokerdOrder, @@ -65,8 +72,7 @@ from piker.clearing._messages import ( from piker.data._source import Symbol from .api import ( _accounts2clients, - # _adhoc_futes_set, - _adhoc_symbol_map, + con2fqsn, log, get_config, open_client_proxies, @@ -76,49 +82,12 @@ from .api import ( def pack_position( - pos: Position + pos: IbPosition ) -> dict[str, Any]: + con = pos.contract - - if isinstance(con, Option): - # TODO: option symbol parsing and sane display: - symbol = con.localSymbol.replace(' ', '') - - else: - # TODO: lookup fqsn even for derivs. - symbol = con.symbol.lower() - - # TODO: probably write a mofo exchange mapper routine since ib - # can't get it's shit together like, ever. - - # try our best to figure out the exchange / venue - exch = (con.primaryExchange or con.exchange).lower() - if not exch: - - if isinstance(con, Forex): - # bc apparently it's not in the contract obj? - exch = 'idealfx' - - else: - # for wtv cucked reason some futes don't show their - # exchange (like CL.NYMEX) ... - entry = _adhoc_symbol_map.get( - con.symbol or con.localSymbol - ) - if entry: - meta, kwargs = entry - cid = meta.get('conId') - if cid: - assert con.conId == meta['conId'] - exch = meta['exchange'] - - assert exch, f'No clue:\n {con}' - fqsn = '.'.join((symbol, exch)) - - expiry = con.lastTradeDateOrContractMonth - if expiry: - fqsn += f'.{expiry}' + fqsn, calc_price = con2fqsn(con) # TODO: options contracts into a sane format.. return ( @@ -305,12 +274,10 @@ async def update_ledger_from_api_trades( client: Union[Client, MethodProxy], ) -> tuple[ - dict[str, pp.Transaction], + dict[str, Transaction], dict[str, dict], ]: - conf = get_config() - # XXX; ERRGGG.. # pack in the "primary/listing exchange" value from a # contract lookup since it seems this isn't available by @@ -331,39 +298,33 @@ async def update_ledger_from_api_trades( entry['listingExchange'] = pexch + conf = get_config() entries = trades_to_ledger_entries( conf['accounts'].inverse, trade_entries, ) - - # write recent session's trades to the user's (local) ledger file. - records: dict[str, pp.Transactions] = {} + # normalize recent session's trades to the `Transaction` type + trans_by_acct: dict[str, dict[str, Transaction]] = {} for acctid, trades_by_id in entries.items(): # normalize to transaction form - records[acctid] = norm_trade_records(trades_by_id) + trans_by_acct[acctid] = norm_trade_records(trades_by_id) - return records, entries + return trans_by_acct, entries async def update_and_audit_msgs( acctid: str, # no `ib.` prefix is required! - pps: list[pp.Position], + pps: list[Position], cids2pps: dict[tuple[str, int], BrokerdPosition], validate: bool = False, ) -> list[BrokerdPosition]: msgs: list[BrokerdPosition] = [] - # pps: dict[int, pp.Position] = {} - for p in pps: bsuid = p.bsuid - # build trade-session-actor local table - # of pps from unique symbol ids. - # pps[bsuid] = p - # retreive equivalent ib reported position message # for comparison/audit versus the piker equivalent # breakeven pp calcs. @@ -436,7 +397,8 @@ async def update_and_audit_msgs( raise ValueError( f'UNEXPECTED POSITION ib <-> piker ledger:\n' f'piker: {msg}\n' - 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n' + 'MAYBE THEY LIQUIDATED YOU BRO!??!' ) msgs.append(msg) @@ -462,6 +424,8 @@ async def trades_dialogue( all_positions = [] accounts = set() clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] + acctids = set() + cids2pps: dict[str, BrokerdPosition] = {} # TODO: this causes a massive tractor bug when you run marketstored # with ``--tsdb``... you should get: @@ -471,15 +435,129 @@ async def trades_dialogue( # - hitting final control-c to kill daemon will lead to hang # assert 0 + # TODO: just write on teardown? + # we might also want to delegate a specific actor for + # ledger writing / reading for speed? async with ( - trio.open_nursery() as nurse, + # trio.open_nursery() as nurse, open_client_proxies() as (proxies, aioclients), ): - for account, proxy in proxies.items(): + # Open a trade ledgers stack for appending trade records over + # multiple accounts. + # TODO: we probably want to generalize this into a "ledgers" api.. + ledgers: dict[str, dict] = {} + tables: dict[str, PpTable] = {} + with ( + ExitStack() as lstack, + ): + for account, proxy in proxies.items(): - client = aioclients[account] + assert account in accounts_def + accounts.add(account) + acctid = account.strip('ib.') + acctids.add(acctid) - async def open_stream( + # open ledger and pptable wrapper for each + # detected account. + ledger = ledgers[acctid] = lstack.enter_context( + open_trade_ledger('ib', acctid) + ) + table = tables[acctid] = lstack.enter_context( + open_pps('ib', acctid) + ) + + client = aioclients[account] + + # process pp value reported from ib's system. we only use these + # to cross-check sizing since average pricing on their end uses + # the so called (bs) "FIFO" style which more or less results in + # a price that's not useful for traders who want to not lose + # money.. xb + # for client in aioclients.values(): + for pos in client.positions(): + + # collect all ib-pp reported positions so that we can be + # sure know which positions to update from the ledger if + # any are missing from the ``pps.toml`` + bsuid, msg = pack_position(pos) + acctid = msg.account = accounts_def.inverse[msg.account] + acctid = acctid.strip('ib.') + cids2pps[(acctid, bsuid)] = msg + assert msg.account in accounts, ( + f'Position for unknown account: {msg.account}') + + table = tables[acctid] + pp = table.pps.get(bsuid) + if ( + not pp + or pp.size != msg.size + ): + trans = norm_trade_records(ledger) + updated = table.update_from_trans(trans) + pp = updated[bsuid] + + # update trades ledgers for all accounts from connected + # api clients which report trades for **this session**. + trades = await proxy.trades() + ( + trans_by_acct, + api_to_ledger_entries, + ) = await update_ledger_from_api_trades( + trades, + proxy, + ) + + # if new trades are detected from the API, prepare + # them for the ledger file and update the pptable. + if api_to_ledger_entries: + trade_entries = api_to_ledger_entries[acctid] + + # write ledger with all new trades **AFTER** + # we've updated the `pps.toml` from the + # original ledger state! (i.e. this is + # currently done on exit) + ledger.update(trade_entries) + + trans = trans_by_acct.get(acctid) + if trans: + table.update_from_trans(trans) + updated = table.update_from_trans(trans) + + assert msg.size == pp.size, 'WTF' + + active_pps, closed_pps = table.dump_active() + + # load all positions from `pps.toml`, cross check with + # ib's positions data, and relay re-formatted pps as + # msgs to the ems. + # __2 cases__: + # - new trades have taken place this session that we want to + # always reprocess indempotently, + # - no new trades yet but we want to reload and audit any + # positions reported by ib's sys that may not yet be in + # piker's ``pps.toml`` state-file. + for pps in [active_pps, closed_pps]: + msgs = await update_and_audit_msgs( + acctid, + pps.values(), + cids2pps, + validate=True, + ) + all_positions.extend(msg for msg in msgs) + + if not all_positions and cids2pps: + raise RuntimeError( + 'Positions reported by ib but not found in `pps.toml`!?\n' + f'{pformat(cids2pps)}' + ) + + await ctx.started(( + all_positions, + tuple(name for name in accounts_def if name in accounts), + )) + + # proxy wrapper for starting trade event stream + async def open_trade_event_stream( task_status: TaskStatus[ trio.abc.ReceiveChannel ] = trio.TASK_STATUS_IGNORED, @@ -493,130 +571,36 @@ async def trades_dialogue( task_status.started(trade_event_stream) await trio.sleep_forever() - trade_event_stream = await nurse.start(open_stream) + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + trade_event_stream = await n.start(open_trade_event_stream) + clients.append((client, trade_event_stream)) - clients.append((client, trade_event_stream)) + # start order request handler **before** local trades + # event loop + n.start_soon(handle_order_requests, ems_stream, accounts_def) - assert account in accounts_def - accounts.add(account) + # allocate event relay tasks for each client connection + for client, stream in clients: + n.start_soon( + deliver_trade_events, + stream, + ems_stream, + accounts_def, + cids2pps, + proxies, - cids2pps: dict[str, BrokerdPosition] = {} - update_records: dict[str, bidict] = {} + ledgers, + tables, + ) - # process pp value reported from ib's system. we only use these - # to cross-check sizing since average pricing on their end uses - # the so called (bs) "FIFO" style which more or less results in - # a price that's not useful for traders who want to not lose - # money.. xb - for client in aioclients.values(): - for pos in client.positions(): + # TODO: make this thread-async! + table.write_config() - cid, msg = pack_position(pos) - acctid = msg.account = accounts_def.inverse[msg.account] - acctid = acctid.strip('ib.') - cids2pps[(acctid, cid)] = msg - assert msg.account in accounts, ( - f'Position for unknown account: {msg.account}') - - # collect all ib-pp reported positions so that we can be - # sure know which positions to update from the ledger if - # any are missing from the ``pps.toml`` - update_records.setdefault(acctid, bidict())[cid] = msg.symbol - - # update trades ledgers for all accounts from - # connected api clients which report trades for **this session**. - new_trades = {} - for account, proxy in proxies.items(): - trades = await proxy.trades() - ( - records_by_acct, - ledger_entries, - ) = await update_ledger_from_api_trades( - trades, - proxy, - ) - new_trades.update(records_by_acct) - - for acctid, trans in new_trades.items(): - for t in trans: - bsuid = t.bsuid - if bsuid in update_records: - assert update_records[bsuid] == t.fqsn - else: - update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn - - # load all positions from `pps.toml`, cross check with ib's - # positions data, and relay re-formatted pps as msgs to the ems. - # __2 cases__: - # - new trades have taken place this session that we want to - # always reprocess indempotently, - # - no new trades yet but we want to reload and audit any - # positions reported by ib's sys that may not yet be in - # piker's ``pps.toml`` state-file. - for acctid, to_update in update_records.items(): - trans = new_trades.get(acctid) - active, closed = pp.update_pps_conf( - 'ib', - acctid, - trade_records=trans, - ledger_reload=to_update, - ) - for pps in [active, closed]: - msgs = await update_and_audit_msgs( - acctid, - pps.values(), - cids2pps, - validate=True, - ) - all_positions.extend(msg for msg in msgs) - - if not all_positions and cids2pps: - raise RuntimeError( - 'Positions reported by ib but not found in `pps.toml`!?\n' - f'{pformat(cids2pps)}' - ) - - # log.info(f'Loaded {len(trades)} from this session') - # TODO: write trades to local ``trades.toml`` - # - use above per-session trades data and write to local file - # - get the "flex reports" working and pull historical data and - # also save locally. - - await ctx.started(( - all_positions, - tuple(name for name in accounts_def if name in accounts), - )) - - # TODO: maybe just write on teardown? - # we might also want to delegate a specific actor for - # ledger writing / reading for speed? - - # write ledger with all new trades **AFTER** we've updated the - # `pps.toml` from the original ledger state! - for acctid, trades_by_id in ledger_entries.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - # start order request handler **before** local trades event loop - n.start_soon(handle_order_requests, ems_stream, accounts_def) - - # allocate event relay tasks for each client connection - for client, stream in clients: - n.start_soon( - deliver_trade_events, - stream, - ems_stream, - accounts_def, - cids2pps, - proxies, - ) - - # block until cancelled - await trio.sleep_forever() + # block until cancelled + await trio.sleep_forever() async def emit_pp_update( @@ -626,44 +610,43 @@ async def emit_pp_update( proxies: dict, cids2pps: dict, + ledgers, + tables, + ) -> None: # compute and relay incrementally updated piker pp acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] proxy = proxies[acctid] - acctname = acctid.strip('ib.') - records_by_acct, ledger_entries = await update_ledger_from_api_trades( + acctid = acctid.strip('ib.') + ( + records_by_acct, + api_to_ledger_entries, + ) = await update_ledger_from_api_trades( [trade_entry], proxy, ) - records = records_by_acct[acctname] - r = records[0] + trans = records_by_acct[acctid] + r = list(trans.values())[0] - # update and load all positions from `pps.toml`, cross check with - # ib's positions data, and relay re-formatted pps as msgs to the - # ems. we report both the open and closed updates in one map since - # for incremental update we may have just fully closed a pp and need - # to relay that msg as well! - active, closed = pp.update_pps_conf( - 'ib', - acctname, - trade_records=records, - ledger_reload={r.bsuid: r.fqsn}, - ) + table = tables[acctid] + table.update_from_trans(trans) + active, closed = table.dump_active() - # NOTE: write ledger with all new trades **AFTER** we've updated the - # `pps.toml` from the original ledger state! - for acctid, trades_by_id in ledger_entries.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) + # NOTE: update ledger with all new trades + for acctid, trades_by_id in api_to_ledger_entries.items(): + ledger = ledgers[acctid] + ledger.update(trades_by_id) + # generate pp msgs and cross check with ib's positions data, relay + # re-formatted pps as msgs to the ems. for pos in filter( bool, [active.get(r.bsuid), closed.get(r.bsuid)] ): msgs = await update_and_audit_msgs( - acctname, + acctid, [pos], cids2pps, @@ -672,6 +655,7 @@ async def emit_pp_update( ) if msgs: msg = msgs[0] + log.info('Emitting pp msg: {msg}') break await ems_stream.send(msg) @@ -685,6 +669,9 @@ async def deliver_trade_events( cids2pps: dict[tuple[str, str], BrokerdPosition], proxies: dict[str, MethodProxy], + ledgers, + tables, + ) -> None: ''' Format and relay all trade events for a given client to emsd. @@ -834,6 +821,8 @@ async def deliver_trade_events( accounts_def, proxies, cids2pps, + ledgers, + tables, ) case 'cost': @@ -866,6 +855,8 @@ async def deliver_trade_events( accounts_def, proxies, cids2pps, + ledgers, + tables, ) case 'error': @@ -886,6 +877,7 @@ async def deliver_trade_events( case 'position': cid, msg = pack_position(item) + log.info(f'New IB position msg: {msg}') # acctid = msg.account = accounts_def.inverse[msg.account] # cuck ib and it's shitty fifo sys for pps! # await ems_stream.send(msg) @@ -916,14 +908,13 @@ async def deliver_trade_events( def norm_trade_records( ledger: dict[str, Any], -) -> list[pp.Transaction]: +) -> list[Transaction]: ''' Normalize a flex report or API retrieved executions ledger into our standard record format. ''' - records: list[pp.Transaction] = [] - + records: dict[str, Transaction] = {} for tid, record in ledger.items(): conid = record.get('conId') or record['conid'] @@ -1001,7 +992,7 @@ def norm_trade_records( # which case, we can pull the fqsn from that table (see # `trades_dialogue()` above). - records.append(pp.Transaction( + records[tid] = Transaction( fqsn=fqsn, tid=tid, size=size, @@ -1010,7 +1001,7 @@ def norm_trade_records( dt=dt, expiry=expiry, bsuid=conid, - )) + ) return records @@ -1139,8 +1130,7 @@ def load_flex_trades( trade_entries = report.extract('Trade') ln = len(trade_entries) - # log.info(f'Loaded {ln} trades from flex query') - print(f'Loaded {ln} trades from flex query') + log.info(f'Loaded {ln} trades from flex query') trades_by_account = trades_to_ledger_entries( # get reverse map to user account names @@ -1149,14 +1139,20 @@ def load_flex_trades( source_type='flex', ) - ledgers = {} - for acctid, trades_by_id in trades_by_account.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) + for acctid in trades_by_account: + trades_by_id = trades_by_account[acctid] + with open_trade_ledger('ib', acctid) as ledger_dict: + tid_delta = set(trades_by_id) - set(ledger_dict) + log.info( + 'New trades detected\n' + f'{pformat(tid_delta)}' + ) + if tid_delta: + ledger_dict.update( + {tid: trades_by_id[tid] for tid in tid_delta} + ) - ledgers[acctid] = ledger - - return ledgers + return ledger_dict if __name__ == '__main__': diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index c3ac985f..4ec214bc 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -301,7 +301,13 @@ async def get_bars( else: log.warning('Sending CONNECTION RESET') - await data_reset_hack(reset_type='connection') + res = await data_reset_hack(reset_type='connection') + if not res: + log.warning( + 'NO VNC DETECTED!\n' + 'Manually press ctrl-alt-f on your IB java app' + ) + # break with trio.move_on_after(timeout) as cs: for name, ev in [ @@ -842,7 +848,10 @@ async def data_reset_hack( client.mouse.click() client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked - await tractor.to_asyncio.run_task(vnc_click_hack) + try: + await tractor.to_asyncio.run_task(vnc_click_hack) + except OSError: + return False # we don't really need the ``xdotool`` approach any more B) return True