Compare commits
	
		
			5 Commits 
		
	
	
		
			gitea_feat
			...
			ib_native_
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | e1d57e8a8a | |
|  | ad458e3fcd | |
|  | f26c399ad3 | |
|  | a741ed3161 | |
|  | b9fbbeb44e | 
|  | @ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS. | |||
| 
 | ||||
| """ | ||||
| from __future__ import annotations | ||||
| from contextlib import ExitStack | ||||
| from dataclasses import asdict | ||||
| from functools import partial | ||||
| from pprint import pformat | ||||
|  | @ -35,8 +36,8 @@ from trio_typing import TaskStatus | |||
| import tractor | ||||
| from ib_insync.contract import ( | ||||
|     Contract, | ||||
|     Option, | ||||
|     Forex, | ||||
|     # Option, | ||||
|     # Forex, | ||||
| ) | ||||
| from ib_insync.order import ( | ||||
|     Trade, | ||||
|  | @ -47,11 +48,17 @@ from ib_insync.objects import ( | |||
|     Execution, | ||||
|     CommissionReport, | ||||
| ) | ||||
| from ib_insync.objects import Position | ||||
| from ib_insync.objects import Position as IbPosition | ||||
| import pendulum | ||||
| 
 | ||||
| from piker import config | ||||
| from piker import pp | ||||
| from piker.pp import ( | ||||
|     Position, | ||||
|     Transaction, | ||||
|     open_trade_ledger, | ||||
|     open_pps, | ||||
|     PpTable, | ||||
| ) | ||||
| from piker.log import get_console_log | ||||
| from piker.clearing._messages import ( | ||||
|     BrokerdOrder, | ||||
|  | @ -66,7 +73,8 @@ from piker.data._source import Symbol | |||
| from .api import ( | ||||
|     _accounts2clients, | ||||
|     # _adhoc_futes_set, | ||||
|     _adhoc_symbol_map, | ||||
|     con2fqsn, | ||||
|     # _adhoc_symbol_map, | ||||
|     log, | ||||
|     get_config, | ||||
|     open_client_proxies, | ||||
|  | @ -76,49 +84,12 @@ from .api import ( | |||
| 
 | ||||
| 
 | ||||
| def pack_position( | ||||
|     pos: Position | ||||
|     pos: IbPosition | ||||
| 
 | ||||
| ) -> dict[str, Any]: | ||||
| 
 | ||||
|     con = pos.contract | ||||
| 
 | ||||
|     if isinstance(con, Option): | ||||
|         # TODO: option symbol parsing and sane display: | ||||
|         symbol = con.localSymbol.replace(' ', '') | ||||
| 
 | ||||
|     else: | ||||
|         # TODO: lookup fqsn even for derivs. | ||||
|         symbol = con.symbol.lower() | ||||
| 
 | ||||
|     # TODO: probably write a mofo exchange mapper routine since ib | ||||
|     # can't get it's shit together like, ever. | ||||
| 
 | ||||
|     # try our best to figure out the exchange / venue | ||||
|     exch = (con.primaryExchange or con.exchange).lower() | ||||
|     if not exch: | ||||
| 
 | ||||
|         if isinstance(con, Forex): | ||||
|             # bc apparently it's not in the contract obj? | ||||
|             exch = 'idealfx' | ||||
| 
 | ||||
|         else: | ||||
|             # for wtv cucked reason some futes don't show their | ||||
|             # exchange (like CL.NYMEX) ... | ||||
|             entry = _adhoc_symbol_map.get( | ||||
|                 con.symbol or con.localSymbol | ||||
|             ) | ||||
|             if entry: | ||||
|                 meta, kwargs = entry | ||||
|                 cid = meta.get('conId') | ||||
|                 if cid: | ||||
|                     assert con.conId == meta['conId'] | ||||
|                 exch = meta['exchange'] | ||||
| 
 | ||||
|     assert exch, f'No clue:\n {con}' | ||||
|     fqsn = '.'.join((symbol, exch)) | ||||
| 
 | ||||
|     expiry = con.lastTradeDateOrContractMonth | ||||
|     if expiry: | ||||
|         fqsn += f'.{expiry}' | ||||
|     fqsn, calc_price = con2fqsn(con) | ||||
| 
 | ||||
|     # TODO: options contracts into a sane format.. | ||||
|     return ( | ||||
|  | @ -305,12 +276,10 @@ async def update_ledger_from_api_trades( | |||
|     client: Union[Client, MethodProxy], | ||||
| 
 | ||||
| ) -> tuple[ | ||||
|     dict[str, pp.Transaction], | ||||
|     dict[str, Transaction], | ||||
|     dict[str, dict], | ||||
| ]: | ||||
| 
 | ||||
|     conf = get_config() | ||||
| 
 | ||||
|     # XXX; ERRGGG.. | ||||
|     # pack in the "primary/listing exchange" value from a | ||||
|     # contract lookup since it seems this isn't available by | ||||
|  | @ -331,39 +300,33 @@ async def update_ledger_from_api_trades( | |||
| 
 | ||||
|         entry['listingExchange'] = pexch | ||||
| 
 | ||||
|     conf = get_config() | ||||
|     entries = trades_to_ledger_entries( | ||||
|         conf['accounts'].inverse, | ||||
|         trade_entries, | ||||
|     ) | ||||
| 
 | ||||
|     # write recent session's trades to the user's (local) ledger file. | ||||
|     records: dict[str, pp.Transactions] = {} | ||||
|     # normalize recent session's trades to the `Transaction` type | ||||
|     trans_by_acct: dict[str, dict[str, Transaction]] = {} | ||||
| 
 | ||||
|     for acctid, trades_by_id in entries.items(): | ||||
|         # normalize to transaction form | ||||
|         records[acctid] = norm_trade_records(trades_by_id) | ||||
|         trans_by_acct[acctid] = norm_trade_records(trades_by_id) | ||||
| 
 | ||||
|     return records, entries | ||||
|     return trans_by_acct, entries | ||||
| 
 | ||||
| 
 | ||||
| async def update_and_audit_msgs( | ||||
|     acctid: str,  # no `ib.` prefix is required! | ||||
|     pps: list[pp.Position], | ||||
|     pps: list[Position], | ||||
|     cids2pps: dict[tuple[str, int], BrokerdPosition], | ||||
|     validate: bool = False, | ||||
| 
 | ||||
| ) -> list[BrokerdPosition]: | ||||
| 
 | ||||
|     msgs: list[BrokerdPosition] = [] | ||||
|     # pps: dict[int, pp.Position] = {} | ||||
| 
 | ||||
|     for p in pps: | ||||
|         bsuid = p.bsuid | ||||
| 
 | ||||
|         # build trade-session-actor local table | ||||
|         # of pps from unique symbol ids. | ||||
|         # pps[bsuid] = p | ||||
| 
 | ||||
|         # retreive equivalent ib reported position message | ||||
|         # for comparison/audit versus the piker equivalent | ||||
|         # breakeven pp calcs. | ||||
|  | @ -462,6 +425,8 @@ async def trades_dialogue( | |||
|     all_positions = [] | ||||
|     accounts = set() | ||||
|     clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] | ||||
|     acctids = set() | ||||
|     cids2pps: dict[str, BrokerdPosition] = {} | ||||
| 
 | ||||
|     # TODO: this causes a massive tractor bug when you run marketstored | ||||
|     # with ``--tsdb``... you should get: | ||||
|  | @ -471,15 +436,38 @@ async def trades_dialogue( | |||
|     # - hitting final control-c to kill daemon will lead to hang | ||||
|     # assert 0 | ||||
| 
 | ||||
|     # TODO: just write on teardown? | ||||
|     # we might also want to delegate a specific actor for | ||||
|     # ledger writing / reading for speed? | ||||
|     async with ( | ||||
|         trio.open_nursery() as nurse, | ||||
|         open_client_proxies() as (proxies, aioclients), | ||||
|     ): | ||||
|         # Open a trade ledgers stack for appending trade records over | ||||
|         # multiple accounts. | ||||
|         # TODO: we probably want to generalize this into a "ledgers" api.. | ||||
|         ledgers: dict[str, dict] = {} | ||||
|         tables: dict[str, PpTable] = {} | ||||
|         with ( | ||||
|             ExitStack() as lstack, | ||||
|         ): | ||||
|             for account, proxy in proxies.items(): | ||||
| 
 | ||||
|                 acctid = account.strip('ib.') | ||||
|                 acctids.add(acctid) | ||||
| 
 | ||||
|                 # open ledger and pptable wrapper for each | ||||
|                 # detected account. | ||||
|                 ledger = ledgers[acctid] = lstack.enter_context( | ||||
|                     open_trade_ledger('ib', acctid) | ||||
|                 ) | ||||
|                 table = tables[acctid] = lstack.enter_context( | ||||
|                     open_pps('ib', acctid) | ||||
|                 ) | ||||
| 
 | ||||
|                 client = aioclients[account] | ||||
| 
 | ||||
|             async def open_stream( | ||||
|                 async def open_trade_event_stream( | ||||
|                     task_status: TaskStatus[ | ||||
|                         trio.abc.ReceiveChannel | ||||
|                     ] = trio.TASK_STATUS_IGNORED, | ||||
|  | @ -493,75 +481,77 @@ async def trades_dialogue( | |||
|                         task_status.started(trade_event_stream) | ||||
|                         await trio.sleep_forever() | ||||
| 
 | ||||
|             trade_event_stream = await nurse.start(open_stream) | ||||
| 
 | ||||
|                 trade_event_stream = await nurse.start(open_trade_event_stream) | ||||
|                 clients.append((client, trade_event_stream)) | ||||
| 
 | ||||
|                 assert account in accounts_def | ||||
|                 accounts.add(account) | ||||
| 
 | ||||
|         cids2pps: dict[str, BrokerdPosition] = {} | ||||
|         update_records: dict[str, bidict] = {} | ||||
|                 # update trades ledgers for all accounts from connected | ||||
|                 # api clients which report trades for **this session**. | ||||
|                 trades = await proxy.trades() | ||||
|                 ( | ||||
|                     trans_by_acct, | ||||
|                     api_ready_for_ledger_entries, | ||||
|                 ) = await update_ledger_from_api_trades( | ||||
|                     trades, | ||||
|                     proxy, | ||||
|                 ) | ||||
| 
 | ||||
|                 # if new trades are detected from the API, prepare | ||||
|                 # them for the ledger file and update the pptable. | ||||
|                 if api_ready_for_ledger_entries: | ||||
|                     trade_entries = api_ready_for_ledger_entries[acctid] | ||||
|                     ledger.update(trade_entries) | ||||
|                     trans = trans_by_acct.get(acctid) | ||||
|                     if trans: | ||||
|                         table.update_from_trans(trans) | ||||
| 
 | ||||
|                 # process pp value reported from ib's system. we only use these | ||||
|                 # to cross-check sizing since average pricing on their end uses | ||||
|                 # the so called (bs) "FIFO" style which more or less results in | ||||
|                 # a price that's not useful for traders who want to not lose | ||||
|                 # money.. xb | ||||
|         for client in aioclients.values(): | ||||
|                 # for client in aioclients.values(): | ||||
|                 for pos in client.positions(): | ||||
| 
 | ||||
|                 cid, msg = pack_position(pos) | ||||
|                 acctid = msg.account = accounts_def.inverse[msg.account] | ||||
|                 acctid = acctid.strip('ib.') | ||||
|                 cids2pps[(acctid, cid)] = msg | ||||
|                 assert msg.account in accounts, ( | ||||
|                     f'Position for unknown account: {msg.account}') | ||||
| 
 | ||||
|                     # collect all ib-pp reported positions so that we can be | ||||
|                     # sure know which positions to update from the ledger if | ||||
|                     # any are missing from the ``pps.toml`` | ||||
|                 update_records.setdefault(acctid, bidict())[cid] = msg.symbol | ||||
|                     bsuid, msg = pack_position(pos) | ||||
|                     acctid = msg.account = accounts_def.inverse[msg.account] | ||||
|                     acctid = acctid.strip('ib.') | ||||
|                     cids2pps[(acctid, bsuid)] = msg | ||||
|                     assert msg.account in accounts, ( | ||||
|                         f'Position for unknown account: {msg.account}') | ||||
| 
 | ||||
|         # update trades ledgers for all accounts from | ||||
|         # connected api clients which report trades for **this session**. | ||||
|         new_trades = {} | ||||
|         for account, proxy in proxies.items(): | ||||
|             trades = await proxy.trades() | ||||
|             ( | ||||
|                 records_by_acct, | ||||
|                 ledger_entries, | ||||
|             ) = await update_ledger_from_api_trades( | ||||
|                 trades, | ||||
|                 proxy, | ||||
|             ) | ||||
|             new_trades.update(records_by_acct) | ||||
|                     table = tables[acctid] | ||||
|                     pp = table.pps.get(bsuid) | ||||
|                     if ( | ||||
|                         not pp | ||||
|                         or pp.size != msg.size | ||||
|                     ): | ||||
|                         trans = norm_trade_records(ledger) | ||||
|                         updated = table.update_from_trans(trans) | ||||
|                         pp = updated[bsuid] | ||||
|                         assert msg.size == pp.size, 'WTF' | ||||
| 
 | ||||
|         for acctid, trans in new_trades.items(): | ||||
|             for t in trans: | ||||
|                 bsuid = t.bsuid | ||||
|                 if bsuid in update_records: | ||||
|                     assert update_records[bsuid] == t.fqsn | ||||
|                 else: | ||||
|                     update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn | ||||
|                         # TODO: figure out why these don't match? | ||||
|                         # assert pp.calc_be_price() == pp.be_price | ||||
| 
 | ||||
|         # load all positions from `pps.toml`, cross check with ib's | ||||
|         # positions data, and relay re-formatted pps as msgs to the ems. | ||||
|                 _, closed_pps = table.dump_active('ib') | ||||
|                 active_pps = table.pps | ||||
| 
 | ||||
|                 # load all positions from `pps.toml`, cross check with | ||||
|                 # ib's positions data, and relay re-formatted pps as | ||||
|                 # msgs to the ems. | ||||
|                 # __2 cases__: | ||||
|                 # - new trades have taken place this session that we want to | ||||
|                 #   always reprocess indempotently, | ||||
|                 # - no new trades yet but we want to reload and audit any | ||||
|                 #   positions reported by ib's sys that may not yet be in | ||||
|                 #   piker's ``pps.toml`` state-file. | ||||
|         for acctid, to_update in update_records.items(): | ||||
|             trans = new_trades.get(acctid) | ||||
|             active, closed = pp.update_pps_conf( | ||||
|                 'ib', | ||||
|                 acctid, | ||||
|                 trade_records=trans, | ||||
|                 ledger_reload=to_update, | ||||
|             ) | ||||
|             for pps in [active, closed]: | ||||
|                 for pps in [active_pps, closed_pps]: | ||||
|                     msgs = await update_and_audit_msgs( | ||||
|                         acctid, | ||||
|                         pps.values(), | ||||
|  | @ -576,32 +566,22 @@ async def trades_dialogue( | |||
|                     f'{pformat(cids2pps)}' | ||||
|                 ) | ||||
| 
 | ||||
|         # log.info(f'Loaded {len(trades)} from this session') | ||||
|         # TODO: write trades to local ``trades.toml`` | ||||
|         # - use above per-session trades data and write to local file | ||||
|         # - get the "flex reports" working and pull historical data and | ||||
|         # also save locally. | ||||
| 
 | ||||
|             await ctx.started(( | ||||
|                 all_positions, | ||||
|                 tuple(name for name in accounts_def if name in accounts), | ||||
|             )) | ||||
| 
 | ||||
|         # TODO: maybe just write on teardown? | ||||
|         # we might also want to delegate a specific actor for | ||||
|         # ledger writing / reading for speed? | ||||
| 
 | ||||
|             # write ledger with all new trades **AFTER** we've updated the | ||||
|             # `pps.toml` from the original ledger state! | ||||
|         for acctid, trades_by_id in ledger_entries.items(): | ||||
|             with pp.open_trade_ledger('ib', acctid) as ledger: | ||||
|                 ledger.update(trades_by_id) | ||||
|             for acctid, trades_by_id in api_ready_for_ledger_entries.items(): | ||||
|                 ledgers[acctid].update(trades_by_id) | ||||
| 
 | ||||
|             async with ( | ||||
|                 ctx.open_stream() as ems_stream, | ||||
|                 trio.open_nursery() as n, | ||||
|             ): | ||||
|             # start order request handler **before** local trades event loop | ||||
|                 # start order request handler **before** local trades | ||||
|                 # event loop | ||||
|                 n.start_soon(handle_order_requests, ems_stream, accounts_def) | ||||
| 
 | ||||
|                 # allocate event relay tasks for each client connection | ||||
|  | @ -613,6 +593,9 @@ async def trades_dialogue( | |||
|                         accounts_def, | ||||
|                         cids2pps, | ||||
|                         proxies, | ||||
| 
 | ||||
|                         ledgers, | ||||
|                         tables, | ||||
|                     ) | ||||
| 
 | ||||
|                 # block until cancelled | ||||
|  | @ -626,44 +609,44 @@ async def emit_pp_update( | |||
|     proxies: dict, | ||||
|     cids2pps: dict, | ||||
| 
 | ||||
|     ledgers, | ||||
|     tables, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     # compute and relay incrementally updated piker pp | ||||
|     acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] | ||||
|     proxy = proxies[acctid] | ||||
| 
 | ||||
|     acctname = acctid.strip('ib.') | ||||
|     records_by_acct, ledger_entries = await update_ledger_from_api_trades( | ||||
|     acctid = acctid.strip('ib.') | ||||
|     ( | ||||
|         records_by_acct, | ||||
|         api_ready_for_ledger_entries, | ||||
|     ) = await update_ledger_from_api_trades( | ||||
|         [trade_entry], | ||||
|         proxy, | ||||
|     ) | ||||
|     records = records_by_acct[acctname] | ||||
|     r = records[0] | ||||
|     trans = records_by_acct[acctid] | ||||
|     r = list(trans.values())[0] | ||||
| 
 | ||||
|     # update and load all positions from `pps.toml`, cross check with | ||||
|     # ib's positions data, and relay re-formatted pps as msgs to the | ||||
|     # ems. we report both the open and closed updates in one map since | ||||
|     # for incremental update we may have just fully closed a pp and need | ||||
|     # to relay that msg as well! | ||||
|     active, closed = pp.update_pps_conf( | ||||
|         'ib', | ||||
|         acctname, | ||||
|         trade_records=records, | ||||
|         ledger_reload={r.bsuid: r.fqsn}, | ||||
|     ) | ||||
|     table = tables[acctid] | ||||
|     table.update_from_trans(trans) | ||||
|     _, closed = table.dump_active('ib') | ||||
|     active = table.pps | ||||
| 
 | ||||
|     # NOTE: write ledger with all new trades **AFTER** we've updated the | ||||
|     # `pps.toml` from the original ledger state! | ||||
|     for acctid, trades_by_id in ledger_entries.items(): | ||||
|         with pp.open_trade_ledger('ib', acctid) as ledger: | ||||
|     # NOTE: update ledger with all new trades | ||||
|     for acctid, trades_by_id in api_ready_for_ledger_entries.items(): | ||||
|         ledger = ledgers[acctid] | ||||
|         ledger.update(trades_by_id) | ||||
| 
 | ||||
|     # generate pp msgs and cross check with ib's positions data, relay | ||||
|     # re-formatted pps as msgs to the ems. | ||||
|     for pos in filter( | ||||
|         bool, | ||||
|         [active.get(r.bsuid), closed.get(r.bsuid)] | ||||
|     ): | ||||
|         msgs = await update_and_audit_msgs( | ||||
|             acctname, | ||||
|             acctid, | ||||
|             [pos], | ||||
|             cids2pps, | ||||
| 
 | ||||
|  | @ -685,6 +668,9 @@ async def deliver_trade_events( | |||
|     cids2pps: dict[tuple[str, str], BrokerdPosition], | ||||
|     proxies: dict[str, MethodProxy], | ||||
| 
 | ||||
|     ledgers, | ||||
|     tables, | ||||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Format and relay all trade events for a given client to emsd. | ||||
|  | @ -834,6 +820,8 @@ async def deliver_trade_events( | |||
|                         accounts_def, | ||||
|                         proxies, | ||||
|                         cids2pps, | ||||
|                         ledgers, | ||||
|                         tables, | ||||
|                     ) | ||||
| 
 | ||||
|             case 'cost': | ||||
|  | @ -866,6 +854,8 @@ async def deliver_trade_events( | |||
|                         accounts_def, | ||||
|                         proxies, | ||||
|                         cids2pps, | ||||
|                         ledgers, | ||||
|                         tables, | ||||
|                     ) | ||||
| 
 | ||||
|             case 'error': | ||||
|  | @ -916,14 +906,13 @@ async def deliver_trade_events( | |||
| def norm_trade_records( | ||||
|     ledger: dict[str, Any], | ||||
| 
 | ||||
| ) -> list[pp.Transaction]: | ||||
| ) -> list[Transaction]: | ||||
|     ''' | ||||
|     Normalize a flex report or API retrieved executions | ||||
|     ledger into our standard record format. | ||||
| 
 | ||||
|     ''' | ||||
|     records: list[pp.Transaction] = [] | ||||
| 
 | ||||
|     records: dict[str, Transaction] = {} | ||||
|     for tid, record in ledger.items(): | ||||
| 
 | ||||
|         conid = record.get('conId') or record['conid'] | ||||
|  | @ -1001,7 +990,7 @@ def norm_trade_records( | |||
|         #   which case, we can pull the fqsn from that table (see | ||||
|         #   `trades_dialogue()` above). | ||||
| 
 | ||||
|         records.append(pp.Transaction( | ||||
|         records[tid] = Transaction( | ||||
|             fqsn=fqsn, | ||||
|             tid=tid, | ||||
|             size=size, | ||||
|  | @ -1010,7 +999,7 @@ def norm_trade_records( | |||
|             dt=dt, | ||||
|             expiry=expiry, | ||||
|             bsuid=conid, | ||||
|         )) | ||||
|         ) | ||||
| 
 | ||||
|     return records | ||||
| 
 | ||||
|  | @ -1139,8 +1128,7 @@ def load_flex_trades( | |||
| 
 | ||||
|     trade_entries = report.extract('Trade') | ||||
|     ln = len(trade_entries) | ||||
|     # log.info(f'Loaded {ln} trades from flex query') | ||||
|     print(f'Loaded {ln} trades from flex query') | ||||
|     log.info(f'Loaded {ln} trades from flex query') | ||||
| 
 | ||||
|     trades_by_account = trades_to_ledger_entries( | ||||
|         # get reverse map to user account names | ||||
|  | @ -1149,14 +1137,20 @@ def load_flex_trades( | |||
|         source_type='flex', | ||||
|     ) | ||||
| 
 | ||||
|     ledgers = {} | ||||
|     for acctid, trades_by_id in trades_by_account.items(): | ||||
|         with pp.open_trade_ledger('ib', acctid) as ledger: | ||||
|             ledger.update(trades_by_id) | ||||
|     for acctid in trades_by_account: | ||||
|         trades_by_id = trades_by_account[acctid] | ||||
|         with open_trade_ledger('ib', acctid) as ledger_dict: | ||||
|             tid_delta = set(trades_by_id) - set(ledger_dict) | ||||
|             log.info( | ||||
|                 'New trades detected\n' | ||||
|                 f'{pformat(tid_delta)}' | ||||
|             ) | ||||
|             if tid_delta: | ||||
|                 ledger_dict.update( | ||||
|                     {tid: trades_by_id[tid] for tid in tid_delta} | ||||
|                 ) | ||||
| 
 | ||||
|         ledgers[acctid] = ledger | ||||
| 
 | ||||
|     return ledgers | ||||
|     return ledger_dict | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|  |  | |||
|  | @ -301,7 +301,13 @@ async def get_bars( | |||
|                 else: | ||||
| 
 | ||||
|                     log.warning('Sending CONNECTION RESET') | ||||
|                     await data_reset_hack(reset_type='connection') | ||||
|                     res = await data_reset_hack(reset_type='connection') | ||||
|                     if not res: | ||||
|                         log.warning( | ||||
|                             'NO VNC DETECTED!\n' | ||||
|                             'Manually press ctrl-alt-f on your IB java app' | ||||
|                         ) | ||||
|                         # break | ||||
| 
 | ||||
|                     with trio.move_on_after(timeout) as cs: | ||||
|                         for name, ev in [ | ||||
|  | @ -570,7 +576,6 @@ def normalize( | |||
| 
 | ||||
|     # check for special contract types | ||||
|     con = ticker.contract | ||||
| 
 | ||||
|     fqsn, calc_price = con2fqsn(con) | ||||
| 
 | ||||
|     # convert named tuples to dicts so we send usable keys | ||||
|  | @ -842,7 +847,10 @@ async def data_reset_hack( | |||
|             client.mouse.click() | ||||
|             client.keyboard.press('Ctrl', 'Alt', key)  # keys are stacked | ||||
| 
 | ||||
|     try: | ||||
|         await tractor.to_asyncio.run_task(vnc_click_hack) | ||||
|     except OSError: | ||||
|         return False | ||||
| 
 | ||||
|     # we don't really need the ``xdotool`` approach any more B) | ||||
|     return True | ||||
|  | @ -857,15 +865,24 @@ async def open_symbol_search( | |||
|     # TODO: load user defined symbol set locally for fast search? | ||||
|     await ctx.started({}) | ||||
| 
 | ||||
|     async with open_data_client() as proxy: | ||||
|     # async with open_data_client() as proxy: | ||||
|     async with ( | ||||
|         open_client_proxies() as (proxies, clients), | ||||
|     ): | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             # await tractor.breakpoint() | ||||
|             proxy = proxies['ib.algopaper'] | ||||
| 
 | ||||
|             last = time.time() | ||||
| 
 | ||||
|             async for pattern in stream: | ||||
|                 log.debug(f'received {pattern}') | ||||
|                 log.info(f'received {pattern}') | ||||
|                 now = time.time() | ||||
| 
 | ||||
|                 # this causes tractor hang... | ||||
|                 # assert 0 | ||||
| 
 | ||||
|                 assert pattern, 'IB can not accept blank search pattern' | ||||
| 
 | ||||
|                 # throttle search requests to no faster then 1Hz | ||||
|  | @ -893,7 +910,7 @@ async def open_symbol_search( | |||
| 
 | ||||
|                     continue | ||||
| 
 | ||||
|                 log.debug(f'searching for {pattern}') | ||||
|                 log.info(f'searching for {pattern}') | ||||
| 
 | ||||
|                 last = time.time() | ||||
| 
 | ||||
|  | @ -904,6 +921,8 @@ async def open_symbol_search( | |||
|                 async def stash_results(target: Awaitable[list]): | ||||
|                     stock_results.extend(await target) | ||||
| 
 | ||||
|                 for i in range(10): | ||||
|                     with trio.move_on_after(3) as cs: | ||||
|                         async with trio.open_nursery() as sn: | ||||
|                             sn.start_soon( | ||||
|                                 stash_results, | ||||
|  | @ -916,6 +935,12 @@ async def open_symbol_search( | |||
|                             # trigger async request | ||||
|                             await trio.sleep(0) | ||||
| 
 | ||||
|                     if cs.cancelled_caught: | ||||
|                         log.warning(f'Search timeout? {proxy._aio_ns.ib.client}') | ||||
|                         continue | ||||
|                     else: | ||||
|                         break | ||||
| 
 | ||||
|                     # # match against our ad-hoc set immediately | ||||
|                     # adhoc_matches = fuzzy.extractBests( | ||||
|                     #     pattern, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue