From 666587991acb879eb43b5d9d21524e854c7a7955 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Jul 2022 11:03:32 -0400 Subject: [PATCH 1/8] Avoid crash when no vnc server running --- piker/brokers/ib/feed.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index c3ac985f..4ec214bc 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -301,7 +301,13 @@ async def get_bars( else: log.warning('Sending CONNECTION RESET') - await data_reset_hack(reset_type='connection') + res = await data_reset_hack(reset_type='connection') + if not res: + log.warning( + 'NO VNC DETECTED!\n' + 'Manually press ctrl-alt-f on your IB java app' + ) + # break with trio.move_on_after(timeout) as cs: for name, ev in [ @@ -842,7 +848,10 @@ async def data_reset_hack( client.mouse.click() client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked - await tractor.to_asyncio.run_task(vnc_click_hack) + try: + await tractor.to_asyncio.run_task(vnc_click_hack) + except OSError: + return False # we don't really need the ``xdotool`` approach any more B) return True From 30ff793a22d13ccc237188cbd3d517c7253667e8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 21 Jul 2022 10:06:18 -0400 Subject: [PATCH 2/8] Port `ib` broker machinery to new ctx mngr pp api This drops the use of `pp.update_pps_conf()` (and friends) and instead moves to using the context style `open_trade_ledger()` and `open_pps()` managers for faster pp msg gen due to delayed file writing (which was the main source update latency). In order to make this work with potentially multiple accounts this also uses an exit stack which loads each ledger / `pps.toml` into an account id mapped `dict`; a POC for likely how we should implement some higher level position manager api. --- piker/brokers/ib/broker.py | 394 ++++++++++++++++++------------------- 1 file changed, 195 insertions(+), 199 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 93128240..d881a866 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS. """ from __future__ import annotations +from contextlib import ExitStack from dataclasses import asdict from functools import partial from pprint import pformat @@ -35,8 +36,8 @@ from trio_typing import TaskStatus import tractor from ib_insync.contract import ( Contract, - Option, - Forex, + # Option, + # Forex, ) from ib_insync.order import ( Trade, @@ -47,11 +48,18 @@ from ib_insync.objects import ( Execution, CommissionReport, ) -from ib_insync.objects import Position +from ib_insync.objects import Position as IbPosition import pendulum from piker import config -from piker import pp +from piker.pp import ( + # update_pps_conf, + Position, + Transaction, + open_trade_ledger, + open_pps, + PpTable, +) from piker.log import get_console_log from piker.clearing._messages import ( BrokerdOrder, @@ -66,7 +74,8 @@ from piker.data._source import Symbol from .api import ( _accounts2clients, # _adhoc_futes_set, - _adhoc_symbol_map, + con2fqsn, + # _adhoc_symbol_map, log, get_config, open_client_proxies, @@ -76,49 +85,12 @@ from .api import ( def pack_position( - pos: Position + pos: IbPosition ) -> dict[str, Any]: + con = pos.contract - - if isinstance(con, Option): - # TODO: option symbol parsing and sane display: - symbol = con.localSymbol.replace(' ', '') - - else: - # TODO: lookup fqsn even for derivs. - symbol = con.symbol.lower() - - # TODO: probably write a mofo exchange mapper routine since ib - # can't get it's shit together like, ever. - - # try our best to figure out the exchange / venue - exch = (con.primaryExchange or con.exchange).lower() - if not exch: - - if isinstance(con, Forex): - # bc apparently it's not in the contract obj? - exch = 'idealfx' - - else: - # for wtv cucked reason some futes don't show their - # exchange (like CL.NYMEX) ... - entry = _adhoc_symbol_map.get( - con.symbol or con.localSymbol - ) - if entry: - meta, kwargs = entry - cid = meta.get('conId') - if cid: - assert con.conId == meta['conId'] - exch = meta['exchange'] - - assert exch, f'No clue:\n {con}' - fqsn = '.'.join((symbol, exch)) - - expiry = con.lastTradeDateOrContractMonth - if expiry: - fqsn += f'.{expiry}' + fqsn, calc_price = con2fqsn(con) # TODO: options contracts into a sane format.. return ( @@ -305,12 +277,10 @@ async def update_ledger_from_api_trades( client: Union[Client, MethodProxy], ) -> tuple[ - dict[str, pp.Transaction], + dict[str, Transaction], dict[str, dict], ]: - conf = get_config() - # XXX; ERRGGG.. # pack in the "primary/listing exchange" value from a # contract lookup since it seems this isn't available by @@ -331,39 +301,34 @@ async def update_ledger_from_api_trades( entry['listingExchange'] = pexch + conf = get_config() entries = trades_to_ledger_entries( conf['accounts'].inverse, trade_entries, ) - # write recent session's trades to the user's (local) ledger file. - records: dict[str, pp.Transactions] = {} + # normalize recent session's trades to the `Transaction` type + trans_by_acct: dict[str, dict[str, Transaction]] = {} for acctid, trades_by_id in entries.items(): # normalize to transaction form - records[acctid] = norm_trade_records(trades_by_id) + trans_by_acct[acctid] = norm_trade_records(trades_by_id) - return records, entries + return trans_by_acct, entries async def update_and_audit_msgs( acctid: str, # no `ib.` prefix is required! - pps: list[pp.Position], + pps: list[Position], cids2pps: dict[tuple[str, int], BrokerdPosition], validate: bool = False, ) -> list[BrokerdPosition]: msgs: list[BrokerdPosition] = [] - # pps: dict[int, pp.Position] = {} - for p in pps: bsuid = p.bsuid - # build trade-session-actor local table - # of pps from unique symbol ids. - # pps[bsuid] = p - # retreive equivalent ib reported position message # for comparison/audit versus the piker equivalent # breakeven pp calcs. @@ -479,7 +444,7 @@ async def trades_dialogue( client = aioclients[account] - async def open_stream( + async def open_trade_event_stream( task_status: TaskStatus[ trio.abc.ReceiveChannel ] = trio.TASK_STATUS_IGNORED, @@ -493,7 +458,7 @@ async def trades_dialogue( task_status.started(trade_event_stream) await trio.sleep_forever() - trade_event_stream = await nurse.start(open_stream) + trade_event_stream = await nurse.start(open_trade_event_stream) clients.append((client, trade_event_stream)) @@ -501,122 +466,147 @@ async def trades_dialogue( accounts.add(account) cids2pps: dict[str, BrokerdPosition] = {} - update_records: dict[str, bidict] = {} - # process pp value reported from ib's system. we only use these - # to cross-check sizing since average pricing on their end uses - # the so called (bs) "FIFO" style which more or less results in - # a price that's not useful for traders who want to not lose - # money.. xb - for client in aioclients.values(): - for pos in client.positions(): - - cid, msg = pack_position(pos) - acctid = msg.account = accounts_def.inverse[msg.account] - acctid = acctid.strip('ib.') - cids2pps[(acctid, cid)] = msg - assert msg.account in accounts, ( - f'Position for unknown account: {msg.account}') - - # collect all ib-pp reported positions so that we can be - # sure know which positions to update from the ledger if - # any are missing from the ``pps.toml`` - update_records.setdefault(acctid, bidict())[cid] = msg.symbol - - # update trades ledgers for all accounts from - # connected api clients which report trades for **this session**. - new_trades = {} - for account, proxy in proxies.items(): - trades = await proxy.trades() - ( - records_by_acct, - ledger_entries, - ) = await update_ledger_from_api_trades( - trades, - proxy, - ) - new_trades.update(records_by_acct) - - for acctid, trans in new_trades.items(): - for t in trans: - bsuid = t.bsuid - if bsuid in update_records: - assert update_records[bsuid] == t.fqsn - else: - update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn - - # load all positions from `pps.toml`, cross check with ib's - # positions data, and relay re-formatted pps as msgs to the ems. - # __2 cases__: - # - new trades have taken place this session that we want to - # always reprocess indempotently, - # - no new trades yet but we want to reload and audit any - # positions reported by ib's sys that may not yet be in - # piker's ``pps.toml`` state-file. - for acctid, to_update in update_records.items(): - trans = new_trades.get(acctid) - active, closed = pp.update_pps_conf( - 'ib', - acctid, - trade_records=trans, - ledger_reload=to_update, - ) - for pps in [active, closed]: - msgs = await update_and_audit_msgs( - acctid, - pps.values(), - cids2pps, - validate=True, - ) - all_positions.extend(msg for msg in msgs) - - if not all_positions and cids2pps: - raise RuntimeError( - 'Positions reported by ib but not found in `pps.toml`!?\n' - f'{pformat(cids2pps)}' - ) - - # log.info(f'Loaded {len(trades)} from this session') - # TODO: write trades to local ``trades.toml`` - # - use above per-session trades data and write to local file - # - get the "flex reports" working and pull historical data and - # also save locally. - - await ctx.started(( - all_positions, - tuple(name for name in accounts_def if name in accounts), - )) - - # TODO: maybe just write on teardown? - # we might also want to delegate a specific actor for - # ledger writing / reading for speed? - - # write ledger with all new trades **AFTER** we've updated the - # `pps.toml` from the original ledger state! - for acctid, trades_by_id in ledger_entries.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, + # Open a trade ledgers stack for appending trade records over + # multiple accounts. + # TODO: we probably want to generalize this into a "ledgers" api.. + ledgers: dict[str, dict] = {} + tables: dict[str, PpTable] = {} + with ( + ExitStack() as lstack, ): - # start order request handler **before** local trades event loop - n.start_soon(handle_order_requests, ems_stream, accounts_def) - # allocate event relay tasks for each client connection - for client, stream in clients: - n.start_soon( - deliver_trade_events, - stream, - ems_stream, - accounts_def, - cids2pps, - proxies, + # process pp value reported from ib's system. we only use these + # to cross-check sizing since average pricing on their end uses + # the so called (bs) "FIFO" style which more or less results in + # a price that's not useful for traders who want to not lose + # money.. xb + acctids = set() + for client in aioclients.values(): + for pos in client.positions(): + + # collect all ib-pp reported positions so that we can be + # sure know which positions to update from the ledger if + # any are missing from the ``pps.toml`` + cid, msg = pack_position(pos) + acctid = msg.account = accounts_def.inverse[msg.account] + acctid = acctid.strip('ib.') + acctids.add(acctid) + + cids2pps[(acctid, cid)] = msg + assert msg.account in accounts, ( + f'Position for unknown account: {msg.account}') + + for acctid in acctids: + # open ledger and pptable wrapper for each + # detected account. + ledgers[acctid] = lstack.enter_context( + open_trade_ledger('ib', acctid) + ) + tables[acctid] = lstack.enter_context( + open_pps('ib', acctid) ) - # block until cancelled - await trio.sleep_forever() + # update trades ledgers for all accounts from + # connected api clients which report trades for **this session**. + for account, proxy in proxies.items(): + + trades = await proxy.trades() + ( + trans_by_acct, + ready_for_ledger_entries, + ) = await update_ledger_from_api_trades( + trades, + proxy, + ) + acctid = acctid.strip('ib.') + ledgers[acctid].update(ready_for_ledger_entries) + + # WTF, yet again this key error is getting ignored?!?! + # tables[acctid].update_from_trans(trans_by_acct[account]) + + # this causes a hang.. + # - marketstored tries to kill container, cant, + # - ctrl-c makes pikerd get stuck... + # assert 0 + + trans = trans_by_acct.get(acctid) + if trans: + tables[acctid].update_from_trans(trans) + + # load all positions from `pps.toml`, cross check with ib's + # positions data, and relay re-formatted pps as msgs to the ems. + # __2 cases__: + # - new trades have taken place this session that we want to + # always reprocess indempotently, + # - no new trades yet but we want to reload and audit any + # positions reported by ib's sys that may not yet be in + # piker's ``pps.toml`` state-file. + for acctid in acctids: + + table = tables[acctid] + _, closed_pps = table.dump_active('ib') + active_pps = table.pps + + for pps in [active_pps, closed_pps]: + msgs = await update_and_audit_msgs( + acctid, + pps.values(), + cids2pps, + validate=True, + ) + all_positions.extend(msg for msg in msgs) + + if not all_positions and cids2pps: + raise RuntimeError( + 'Positions reported by ib but not found in `pps.toml`!?\n' + f'{pformat(cids2pps)}' + ) + + # log.info(f'Loaded {len(trades)} from this session') + # TODO: write trades to local ``trades.toml`` + # - use above per-session trades data and write to local file + # - get the "flex reports" working and pull historical data and + # also save locally. + + await ctx.started(( + all_positions, + tuple(name for name in accounts_def if name in accounts), + )) + + # TODO: maybe just write on teardown? + # we might also want to delegate a specific actor for + # ledger writing / reading for speed? + + # write ledger with all new trades **AFTER** we've updated the + # `pps.toml` from the original ledger state! + for acctid, trades_by_id in ready_for_ledger_entries.items(): + ledgers[acctid].update(trades_by_id) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + # start order request handler **before** local trades + # event loop + n.start_soon(handle_order_requests, ems_stream, accounts_def) + + # allocate event relay tasks for each client connection + for client, stream in clients: + n.start_soon( + deliver_trade_events, + stream, + ems_stream, + accounts_def, + cids2pps, + proxies, + + ledgers, + tables, + ) + + # block until cancelled + await trio.sleep_forever() async def emit_pp_update( @@ -626,44 +616,44 @@ async def emit_pp_update( proxies: dict, cids2pps: dict, + ledgers, + tables, + ) -> None: # compute and relay incrementally updated piker pp acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] proxy = proxies[acctid] - acctname = acctid.strip('ib.') - records_by_acct, ledger_entries = await update_ledger_from_api_trades( + acctid = acctid.strip('ib.') + ( + records_by_acct, + ready_for_ledger_entries, + ) = await update_ledger_from_api_trades( [trade_entry], proxy, ) - records = records_by_acct[acctname] - r = records[0] + trans = records_by_acct[acctid] + r = list(trans.values())[0] - # update and load all positions from `pps.toml`, cross check with - # ib's positions data, and relay re-formatted pps as msgs to the - # ems. we report both the open and closed updates in one map since - # for incremental update we may have just fully closed a pp and need - # to relay that msg as well! - active, closed = pp.update_pps_conf( - 'ib', - acctname, - trade_records=records, - ledger_reload={r.bsuid: r.fqsn}, - ) + table = tables[acctid] + table.update_from_trans(trans) + _, closed = table.dump_active('ib') + active = table.pps - # NOTE: write ledger with all new trades **AFTER** we've updated the - # `pps.toml` from the original ledger state! - for acctid, trades_by_id in ledger_entries.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) + # NOTE: update ledger with all new trades + for acctid, trades_by_id in ready_for_ledger_entries.items(): + ledger = ledgers[acctid] + ledger.update(trades_by_id) + # generate pp msgs and cross check with ib's positions data, relay + # re-formatted pps as msgs to the ems. for pos in filter( bool, [active.get(r.bsuid), closed.get(r.bsuid)] ): msgs = await update_and_audit_msgs( - acctname, + acctid, [pos], cids2pps, @@ -685,6 +675,9 @@ async def deliver_trade_events( cids2pps: dict[tuple[str, str], BrokerdPosition], proxies: dict[str, MethodProxy], + ledgers, + tables, + ) -> None: ''' Format and relay all trade events for a given client to emsd. @@ -834,6 +827,8 @@ async def deliver_trade_events( accounts_def, proxies, cids2pps, + ledgers, + tables, ) case 'cost': @@ -866,6 +861,8 @@ async def deliver_trade_events( accounts_def, proxies, cids2pps, + ledgers, + tables, ) case 'error': @@ -916,14 +913,13 @@ async def deliver_trade_events( def norm_trade_records( ledger: dict[str, Any], -) -> list[pp.Transaction]: +) -> list[Transaction]: ''' Normalize a flex report or API retrieved executions ledger into our standard record format. ''' - records: list[pp.Transaction] = [] - + records: dict[str, Transaction] = {} for tid, record in ledger.items(): conid = record.get('conId') or record['conid'] @@ -1001,7 +997,7 @@ def norm_trade_records( # which case, we can pull the fqsn from that table (see # `trades_dialogue()` above). - records.append(pp.Transaction( + records[tid] = Transaction( fqsn=fqsn, tid=tid, size=size, @@ -1010,7 +1006,7 @@ def norm_trade_records( dt=dt, expiry=expiry, bsuid=conid, - )) + ) return records @@ -1151,7 +1147,7 @@ def load_flex_trades( ledgers = {} for acctid, trades_by_id in trades_by_account.items(): - with pp.open_trade_ledger('ib', acctid) as ledger: + with open_trade_ledger('ib', acctid) as ledger: ledger.update(trades_by_id) ledgers[acctid] = ledger From 9967adb371e92163f4b8c01bf73d1076a200aa8c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 21 Jul 2022 15:23:47 -0400 Subject: [PATCH 3/8] Lol, drop unintented accound name key layer from ledger ledger --- piker/brokers/ib/broker.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index d881a866..12c7dd38 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -306,7 +306,6 @@ async def update_ledger_from_api_trades( conf['accounts'].inverse, trade_entries, ) - # normalize recent session's trades to the `Transaction` type trans_by_acct: dict[str, dict[str, Transaction]] = {} @@ -519,16 +518,10 @@ async def trades_dialogue( trades, proxy, ) - acctid = acctid.strip('ib.') - ledgers[acctid].update(ready_for_ledger_entries) - # WTF, yet again this key error is getting ignored?!?! - # tables[acctid].update_from_trans(trans_by_acct[account]) - - # this causes a hang.. - # - marketstored tries to kill container, cant, - # - ctrl-c makes pikerd get stuck... - # assert 0 + acctid = account.strip('ib.') + ledger = ledgers[acctid] + ledger.update(ready_for_ledger_entries[acctid]) trans = trans_by_acct.get(acctid) if trans: From c7b84ab5004bbe4a428e12b4aada99100361b5ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Jul 2022 15:14:24 -0400 Subject: [PATCH 4/8] Port position calcs to new ctx mngr apis and drop multi-loop madness --- piker/brokers/ib/broker.py | 191 +++++++++++++++++++------------------ 1 file changed, 98 insertions(+), 93 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 12c7dd38..acf055dc 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -53,7 +53,6 @@ import pendulum from piker import config from piker.pp import ( - # update_pps_conf, Position, Transaction, open_trade_ledger, @@ -426,6 +425,8 @@ async def trades_dialogue( all_positions = [] accounts = set() clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] + acctids = set() + cids2pps: dict[str, BrokerdPosition] = {} # TODO: this causes a massive tractor bug when you run marketstored # with ``--tsdb``... you should get: @@ -435,37 +436,13 @@ async def trades_dialogue( # - hitting final control-c to kill daemon will lead to hang # assert 0 + # TODO: just write on teardown? + # we might also want to delegate a specific actor for + # ledger writing / reading for speed? async with ( trio.open_nursery() as nurse, open_client_proxies() as (proxies, aioclients), ): - for account, proxy in proxies.items(): - - client = aioclients[account] - - async def open_trade_event_stream( - task_status: TaskStatus[ - trio.abc.ReceiveChannel - ] = trio.TASK_STATUS_IGNORED, - ): - # each api client has a unique event stream - async with tractor.to_asyncio.open_channel_from( - recv_trade_updates, - client=client, - ) as (first, trade_event_stream): - - task_status.started(trade_event_stream) - await trio.sleep_forever() - - trade_event_stream = await nurse.start(open_trade_event_stream) - - clients.append((client, trade_event_stream)) - - assert account in accounts_def - accounts.add(account) - - cids2pps: dict[str, BrokerdPosition] = {} - # Open a trade ledgers stack for appending trade records over # multiple accounts. # TODO: we probably want to generalize this into a "ledgers" api.. @@ -474,73 +451,106 @@ async def trades_dialogue( with ( ExitStack() as lstack, ): + for account, proxy in proxies.items(): - # process pp value reported from ib's system. we only use these - # to cross-check sizing since average pricing on their end uses - # the so called (bs) "FIFO" style which more or less results in - # a price that's not useful for traders who want to not lose - # money.. xb - acctids = set() - for client in aioclients.values(): - for pos in client.positions(): + acctid = account.strip('ib.') + acctids.add(acctid) - # collect all ib-pp reported positions so that we can be - # sure know which positions to update from the ledger if - # any are missing from the ``pps.toml`` - cid, msg = pack_position(pos) - acctid = msg.account = accounts_def.inverse[msg.account] - acctid = acctid.strip('ib.') - acctids.add(acctid) - - cids2pps[(acctid, cid)] = msg - assert msg.account in accounts, ( - f'Position for unknown account: {msg.account}') - - for acctid in acctids: # open ledger and pptable wrapper for each # detected account. - ledgers[acctid] = lstack.enter_context( + ledger = ledgers[acctid] = lstack.enter_context( open_trade_ledger('ib', acctid) ) - tables[acctid] = lstack.enter_context( + table = tables[acctid] = lstack.enter_context( open_pps('ib', acctid) ) - # update trades ledgers for all accounts from - # connected api clients which report trades for **this session**. - for account, proxy in proxies.items(): + client = aioclients[account] + async def open_trade_event_stream( + task_status: TaskStatus[ + trio.abc.ReceiveChannel + ] = trio.TASK_STATUS_IGNORED, + ): + # each api client has a unique event stream + async with tractor.to_asyncio.open_channel_from( + recv_trade_updates, + client=client, + ) as (first, trade_event_stream): + + task_status.started(trade_event_stream) + await trio.sleep_forever() + + trade_event_stream = await nurse.start(open_trade_event_stream) + clients.append((client, trade_event_stream)) + + assert account in accounts_def + accounts.add(account) + + # update trades ledgers for all accounts from connected + # api clients which report trades for **this session**. trades = await proxy.trades() ( trans_by_acct, - ready_for_ledger_entries, + api_ready_for_ledger_entries, ) = await update_ledger_from_api_trades( trades, proxy, ) - acctid = account.strip('ib.') - ledger = ledgers[acctid] - ledger.update(ready_for_ledger_entries[acctid]) + # if new trades are detected from the API, prepare + # them for the ledger file and update the pptable. + if api_ready_for_ledger_entries: + trade_entries = api_ready_for_ledger_entries[acctid] + ledger.update(trade_entries) + trans = trans_by_acct.get(acctid) + if trans: + table.update_from_trans(trans) - trans = trans_by_acct.get(acctid) - if trans: - tables[acctid].update_from_trans(trans) + # process pp value reported from ib's system. we only use these + # to cross-check sizing since average pricing on their end uses + # the so called (bs) "FIFO" style which more or less results in + # a price that's not useful for traders who want to not lose + # money.. xb + # for client in aioclients.values(): + for pos in client.positions(): - # load all positions from `pps.toml`, cross check with ib's - # positions data, and relay re-formatted pps as msgs to the ems. - # __2 cases__: - # - new trades have taken place this session that we want to - # always reprocess indempotently, - # - no new trades yet but we want to reload and audit any - # positions reported by ib's sys that may not yet be in - # piker's ``pps.toml`` state-file. - for acctid in acctids: + # collect all ib-pp reported positions so that we can be + # sure know which positions to update from the ledger if + # any are missing from the ``pps.toml`` + bsuid, msg = pack_position(pos) + acctid = msg.account = accounts_def.inverse[msg.account] + acctid = acctid.strip('ib.') + cids2pps[(acctid, bsuid)] = msg + assert msg.account in accounts, ( + f'Position for unknown account: {msg.account}') + + table = tables[acctid] + pp = table.pps.get(bsuid) + if ( + not pp + or pp.size != msg.size + ): + trans = norm_trade_records(ledger) + updated = table.update_from_trans(trans) + pp = updated[bsuid] + assert msg.size == pp.size, 'WTF' + + # TODO: figure out why these don't match? + # assert pp.calc_be_price() == pp.be_price - table = tables[acctid] _, closed_pps = table.dump_active('ib') active_pps = table.pps + # load all positions from `pps.toml`, cross check with + # ib's positions data, and relay re-formatted pps as + # msgs to the ems. + # __2 cases__: + # - new trades have taken place this session that we want to + # always reprocess indempotently, + # - no new trades yet but we want to reload and audit any + # positions reported by ib's sys that may not yet be in + # piker's ``pps.toml`` state-file. for pps in [active_pps, closed_pps]: msgs = await update_and_audit_msgs( acctid, @@ -556,24 +566,14 @@ async def trades_dialogue( f'{pformat(cids2pps)}' ) - # log.info(f'Loaded {len(trades)} from this session') - # TODO: write trades to local ``trades.toml`` - # - use above per-session trades data and write to local file - # - get the "flex reports" working and pull historical data and - # also save locally. - await ctx.started(( all_positions, tuple(name for name in accounts_def if name in accounts), )) - # TODO: maybe just write on teardown? - # we might also want to delegate a specific actor for - # ledger writing / reading for speed? - # write ledger with all new trades **AFTER** we've updated the # `pps.toml` from the original ledger state! - for acctid, trades_by_id in ready_for_ledger_entries.items(): + for acctid, trades_by_id in api_ready_for_ledger_entries.items(): ledgers[acctid].update(trades_by_id) async with ( @@ -621,7 +621,7 @@ async def emit_pp_update( acctid = acctid.strip('ib.') ( records_by_acct, - ready_for_ledger_entries, + api_ready_for_ledger_entries, ) = await update_ledger_from_api_trades( [trade_entry], proxy, @@ -635,7 +635,7 @@ async def emit_pp_update( active = table.pps # NOTE: update ledger with all new trades - for acctid, trades_by_id in ready_for_ledger_entries.items(): + for acctid, trades_by_id in api_ready_for_ledger_entries.items(): ledger = ledgers[acctid] ledger.update(trades_by_id) @@ -1128,8 +1128,7 @@ def load_flex_trades( trade_entries = report.extract('Trade') ln = len(trade_entries) - # log.info(f'Loaded {ln} trades from flex query') - print(f'Loaded {ln} trades from flex query') + log.info(f'Loaded {ln} trades from flex query') trades_by_account = trades_to_ledger_entries( # get reverse map to user account names @@ -1138,14 +1137,20 @@ def load_flex_trades( source_type='flex', ) - ledgers = {} - for acctid, trades_by_id in trades_by_account.items(): - with open_trade_ledger('ib', acctid) as ledger: - ledger.update(trades_by_id) + for acctid in trades_by_account: + trades_by_id = trades_by_account[acctid] + with open_trade_ledger('ib', acctid) as ledger_dict: + tid_delta = set(trades_by_id) - set(ledger_dict) + log.info( + 'New trades detected\n' + f'{pformat(tid_delta)}' + ) + if tid_delta: + ledger_dict.update( + {tid: trades_by_id[tid] for tid in tid_delta} + ) - ledgers[acctid] = ledger - - return ledgers + return ledger_dict if __name__ == '__main__': From db5aacdb9cde6b3f2cc71189add0328aae63f578 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 26 Jul 2022 09:47:36 -0400 Subject: [PATCH 5/8] Only allow vnc client connections from localhost --- dockering/ib/run_x11_vnc.sh | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dockering/ib/run_x11_vnc.sh b/dockering/ib/run_x11_vnc.sh index 69b6da85..1005fb41 100755 --- a/dockering/ib/run_x11_vnc.sh +++ b/dockering/ib/run_x11_vnc.sh @@ -2,15 +2,19 @@ # start VNC server x11vnc \ - -ncache_cr \ - -listen localhost \ + -listen 127.0.0.1 \ + -allow 127.0.0.1 \ + -autoport 3003 \ + -no6 \ + -noipv6 \ -display :1 \ + -bg \ -forever \ -shared \ -logappend /var/log/x11vnc.log \ - -bg \ - -noipv6 \ - -autoport 3003 \ + -ncache_cr \ + -ncache \ + # can't use this because of ``asyncvnc`` issue: # https://github.com/barneygale/asyncvnc/issues/1 # -passwd 'ibcansmbz' From 279c899de53f2d5f518d486022bd960277f29a5f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 26 Jul 2022 12:06:53 -0400 Subject: [PATCH 6/8] Port to new PpTable.dump_active()` output, move order event task to child nursery --- piker/brokers/ib/broker.py | 110 ++++++++++++++++++------------------- 1 file changed, 54 insertions(+), 56 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index acf055dc..6efdf7b1 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -72,9 +72,7 @@ from piker.clearing._messages import ( from piker.data._source import Symbol from .api import ( _accounts2clients, - # _adhoc_futes_set, con2fqsn, - # _adhoc_symbol_map, log, get_config, open_client_proxies, @@ -440,7 +438,7 @@ async def trades_dialogue( # we might also want to delegate a specific actor for # ledger writing / reading for speed? async with ( - trio.open_nursery() as nurse, + # trio.open_nursery() as nurse, open_client_proxies() as (proxies, aioclients), ): # Open a trade ledgers stack for appending trade records over @@ -453,6 +451,8 @@ async def trades_dialogue( ): for account, proxy in proxies.items(): + assert account in accounts_def + accounts.add(account) acctid = account.strip('ib.') acctids.add(acctid) @@ -467,46 +467,6 @@ async def trades_dialogue( client = aioclients[account] - async def open_trade_event_stream( - task_status: TaskStatus[ - trio.abc.ReceiveChannel - ] = trio.TASK_STATUS_IGNORED, - ): - # each api client has a unique event stream - async with tractor.to_asyncio.open_channel_from( - recv_trade_updates, - client=client, - ) as (first, trade_event_stream): - - task_status.started(trade_event_stream) - await trio.sleep_forever() - - trade_event_stream = await nurse.start(open_trade_event_stream) - clients.append((client, trade_event_stream)) - - assert account in accounts_def - accounts.add(account) - - # update trades ledgers for all accounts from connected - # api clients which report trades for **this session**. - trades = await proxy.trades() - ( - trans_by_acct, - api_ready_for_ledger_entries, - ) = await update_ledger_from_api_trades( - trades, - proxy, - ) - - # if new trades are detected from the API, prepare - # them for the ledger file and update the pptable. - if api_ready_for_ledger_entries: - trade_entries = api_ready_for_ledger_entries[acctid] - ledger.update(trade_entries) - trans = trans_by_acct.get(acctid) - if trans: - table.update_from_trans(trans) - # process pp value reported from ib's system. we only use these # to cross-check sizing since average pricing on their end uses # the so called (bs) "FIFO" style which more or less results in @@ -534,13 +494,37 @@ async def trades_dialogue( trans = norm_trade_records(ledger) updated = table.update_from_trans(trans) pp = updated[bsuid] + + # update trades ledgers for all accounts from connected + # api clients which report trades for **this session**. + trades = await proxy.trades() + ( + trans_by_acct, + api_to_ledger_entries, + ) = await update_ledger_from_api_trades( + trades, + proxy, + ) + + # if new trades are detected from the API, prepare + # them for the ledger file and update the pptable. + if api_to_ledger_entries: + trade_entries = api_to_ledger_entries[acctid] + + # write ledger with all new trades **AFTER** + # we've updated the `pps.toml` from the + # original ledger state! (i.e. this is + # currently done on exit) + ledger.update(trade_entries) + + trans = trans_by_acct.get(acctid) + if trans: + table.update_from_trans(trans) + updated = table.update_from_trans(trans) + assert msg.size == pp.size, 'WTF' - # TODO: figure out why these don't match? - # assert pp.calc_be_price() == pp.be_price - - _, closed_pps = table.dump_active('ib') - active_pps = table.pps + active_pps, closed_pps = table.dump_active() # load all positions from `pps.toml`, cross check with # ib's positions data, and relay re-formatted pps as @@ -571,15 +555,28 @@ async def trades_dialogue( tuple(name for name in accounts_def if name in accounts), )) - # write ledger with all new trades **AFTER** we've updated the - # `pps.toml` from the original ledger state! - for acctid, trades_by_id in api_ready_for_ledger_entries.items(): - ledgers[acctid].update(trades_by_id) + # proxy wrapper for starting trade event stream + async def open_trade_event_stream( + task_status: TaskStatus[ + trio.abc.ReceiveChannel + ] = trio.TASK_STATUS_IGNORED, + ): + # each api client has a unique event stream + async with tractor.to_asyncio.open_channel_from( + recv_trade_updates, + client=client, + ) as (first, trade_event_stream): + + task_status.started(trade_event_stream) + await trio.sleep_forever() async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): + trade_event_stream = await n.start(open_trade_event_stream) + clients.append((client, trade_event_stream)) + # start order request handler **before** local trades # event loop n.start_soon(handle_order_requests, ems_stream, accounts_def) @@ -621,7 +618,7 @@ async def emit_pp_update( acctid = acctid.strip('ib.') ( records_by_acct, - api_ready_for_ledger_entries, + api_to_ledger_entries, ) = await update_ledger_from_api_trades( [trade_entry], proxy, @@ -631,11 +628,10 @@ async def emit_pp_update( table = tables[acctid] table.update_from_trans(trans) - _, closed = table.dump_active('ib') - active = table.pps + active, closed = table.dump_active() # NOTE: update ledger with all new trades - for acctid, trades_by_id in api_ready_for_ledger_entries.items(): + for acctid, trades_by_id in api_to_ledger_entries.items(): ledger = ledgers[acctid] ledger.update(trades_by_id) @@ -655,6 +651,7 @@ async def emit_pp_update( ) if msgs: msg = msgs[0] + log.info('Emitting pp msg: {msg}') break await ems_stream.send(msg) @@ -876,6 +873,7 @@ async def deliver_trade_events( case 'position': cid, msg = pack_position(item) + log.info(f'New IB position msg: {msg}') # acctid = msg.account = accounts_def.inverse[msg.account] # cuck ib and it's shitty fifo sys for pps! # await ems_stream.send(msg) From 7dbcbfdcd5747c8ff784f7f7de57b2f3d81d68b1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Jul 2022 08:44:30 -0400 Subject: [PATCH 7/8] Write `pps.toml` shortly after broker startup --- piker/brokers/ib/broker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 6efdf7b1..b604e4db 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -595,6 +595,9 @@ async def trades_dialogue( tables, ) + # TODO: make this thread-async! + table.write_config() + # block until cancelled await trio.sleep_forever() From d950c78b81fc41ff949f6e3acbdd6b2fb94fb80b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Jul 2022 11:17:14 -0400 Subject: [PATCH 8/8] Mention liquidation in error msg --- piker/brokers/ib/broker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index b604e4db..9f384166 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -397,7 +397,8 @@ async def update_and_audit_msgs( raise ValueError( f'UNEXPECTED POSITION ib <-> piker ledger:\n' f'piker: {msg}\n' - 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' + 'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n' + 'MAYBE THEY LIQUIDATED YOU BRO!??!' ) msgs.append(msg)