Compare commits
	
		
			5 Commits 
		
	
	
		
			gitea_feat
			...
			ib_native_
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | e1d57e8a8a | |
|  | ad458e3fcd | |
|  | f26c399ad3 | |
|  | a741ed3161 | |
|  | b9fbbeb44e | 
|  | @ -18,6 +18,7 @@ Order and trades endpoints for use with ``piker``'s EMS. | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
| from __future__ import annotations | from __future__ import annotations | ||||||
|  | from contextlib import ExitStack | ||||||
| from dataclasses import asdict | from dataclasses import asdict | ||||||
| from functools import partial | from functools import partial | ||||||
| from pprint import pformat | from pprint import pformat | ||||||
|  | @ -35,8 +36,8 @@ from trio_typing import TaskStatus | ||||||
| import tractor | import tractor | ||||||
| from ib_insync.contract import ( | from ib_insync.contract import ( | ||||||
|     Contract, |     Contract, | ||||||
|     Option, |     # Option, | ||||||
|     Forex, |     # Forex, | ||||||
| ) | ) | ||||||
| from ib_insync.order import ( | from ib_insync.order import ( | ||||||
|     Trade, |     Trade, | ||||||
|  | @ -47,11 +48,17 @@ from ib_insync.objects import ( | ||||||
|     Execution, |     Execution, | ||||||
|     CommissionReport, |     CommissionReport, | ||||||
| ) | ) | ||||||
| from ib_insync.objects import Position | from ib_insync.objects import Position as IbPosition | ||||||
| import pendulum | import pendulum | ||||||
| 
 | 
 | ||||||
| from piker import config | from piker import config | ||||||
| from piker import pp | from piker.pp import ( | ||||||
|  |     Position, | ||||||
|  |     Transaction, | ||||||
|  |     open_trade_ledger, | ||||||
|  |     open_pps, | ||||||
|  |     PpTable, | ||||||
|  | ) | ||||||
| from piker.log import get_console_log | from piker.log import get_console_log | ||||||
| from piker.clearing._messages import ( | from piker.clearing._messages import ( | ||||||
|     BrokerdOrder, |     BrokerdOrder, | ||||||
|  | @ -66,7 +73,8 @@ from piker.data._source import Symbol | ||||||
| from .api import ( | from .api import ( | ||||||
|     _accounts2clients, |     _accounts2clients, | ||||||
|     # _adhoc_futes_set, |     # _adhoc_futes_set, | ||||||
|     _adhoc_symbol_map, |     con2fqsn, | ||||||
|  |     # _adhoc_symbol_map, | ||||||
|     log, |     log, | ||||||
|     get_config, |     get_config, | ||||||
|     open_client_proxies, |     open_client_proxies, | ||||||
|  | @ -76,49 +84,12 @@ from .api import ( | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def pack_position( | def pack_position( | ||||||
|     pos: Position |     pos: IbPosition | ||||||
| 
 | 
 | ||||||
| ) -> dict[str, Any]: | ) -> dict[str, Any]: | ||||||
|  | 
 | ||||||
|     con = pos.contract |     con = pos.contract | ||||||
| 
 |     fqsn, calc_price = con2fqsn(con) | ||||||
|     if isinstance(con, Option): |  | ||||||
|         # TODO: option symbol parsing and sane display: |  | ||||||
|         symbol = con.localSymbol.replace(' ', '') |  | ||||||
| 
 |  | ||||||
|     else: |  | ||||||
|         # TODO: lookup fqsn even for derivs. |  | ||||||
|         symbol = con.symbol.lower() |  | ||||||
| 
 |  | ||||||
|     # TODO: probably write a mofo exchange mapper routine since ib |  | ||||||
|     # can't get it's shit together like, ever. |  | ||||||
| 
 |  | ||||||
|     # try our best to figure out the exchange / venue |  | ||||||
|     exch = (con.primaryExchange or con.exchange).lower() |  | ||||||
|     if not exch: |  | ||||||
| 
 |  | ||||||
|         if isinstance(con, Forex): |  | ||||||
|             # bc apparently it's not in the contract obj? |  | ||||||
|             exch = 'idealfx' |  | ||||||
| 
 |  | ||||||
|         else: |  | ||||||
|             # for wtv cucked reason some futes don't show their |  | ||||||
|             # exchange (like CL.NYMEX) ... |  | ||||||
|             entry = _adhoc_symbol_map.get( |  | ||||||
|                 con.symbol or con.localSymbol |  | ||||||
|             ) |  | ||||||
|             if entry: |  | ||||||
|                 meta, kwargs = entry |  | ||||||
|                 cid = meta.get('conId') |  | ||||||
|                 if cid: |  | ||||||
|                     assert con.conId == meta['conId'] |  | ||||||
|                 exch = meta['exchange'] |  | ||||||
| 
 |  | ||||||
|     assert exch, f'No clue:\n {con}' |  | ||||||
|     fqsn = '.'.join((symbol, exch)) |  | ||||||
| 
 |  | ||||||
|     expiry = con.lastTradeDateOrContractMonth |  | ||||||
|     if expiry: |  | ||||||
|         fqsn += f'.{expiry}' |  | ||||||
| 
 | 
 | ||||||
|     # TODO: options contracts into a sane format.. |     # TODO: options contracts into a sane format.. | ||||||
|     return ( |     return ( | ||||||
|  | @ -305,12 +276,10 @@ async def update_ledger_from_api_trades( | ||||||
|     client: Union[Client, MethodProxy], |     client: Union[Client, MethodProxy], | ||||||
| 
 | 
 | ||||||
| ) -> tuple[ | ) -> tuple[ | ||||||
|     dict[str, pp.Transaction], |     dict[str, Transaction], | ||||||
|     dict[str, dict], |     dict[str, dict], | ||||||
| ]: | ]: | ||||||
| 
 | 
 | ||||||
|     conf = get_config() |  | ||||||
| 
 |  | ||||||
|     # XXX; ERRGGG.. |     # XXX; ERRGGG.. | ||||||
|     # pack in the "primary/listing exchange" value from a |     # pack in the "primary/listing exchange" value from a | ||||||
|     # contract lookup since it seems this isn't available by |     # contract lookup since it seems this isn't available by | ||||||
|  | @ -331,39 +300,33 @@ async def update_ledger_from_api_trades( | ||||||
| 
 | 
 | ||||||
|         entry['listingExchange'] = pexch |         entry['listingExchange'] = pexch | ||||||
| 
 | 
 | ||||||
|  |     conf = get_config() | ||||||
|     entries = trades_to_ledger_entries( |     entries = trades_to_ledger_entries( | ||||||
|         conf['accounts'].inverse, |         conf['accounts'].inverse, | ||||||
|         trade_entries, |         trade_entries, | ||||||
|     ) |     ) | ||||||
| 
 |     # normalize recent session's trades to the `Transaction` type | ||||||
|     # write recent session's trades to the user's (local) ledger file. |     trans_by_acct: dict[str, dict[str, Transaction]] = {} | ||||||
|     records: dict[str, pp.Transactions] = {} |  | ||||||
| 
 | 
 | ||||||
|     for acctid, trades_by_id in entries.items(): |     for acctid, trades_by_id in entries.items(): | ||||||
|         # normalize to transaction form |         # normalize to transaction form | ||||||
|         records[acctid] = norm_trade_records(trades_by_id) |         trans_by_acct[acctid] = norm_trade_records(trades_by_id) | ||||||
| 
 | 
 | ||||||
|     return records, entries |     return trans_by_acct, entries | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def update_and_audit_msgs( | async def update_and_audit_msgs( | ||||||
|     acctid: str,  # no `ib.` prefix is required! |     acctid: str,  # no `ib.` prefix is required! | ||||||
|     pps: list[pp.Position], |     pps: list[Position], | ||||||
|     cids2pps: dict[tuple[str, int], BrokerdPosition], |     cids2pps: dict[tuple[str, int], BrokerdPosition], | ||||||
|     validate: bool = False, |     validate: bool = False, | ||||||
| 
 | 
 | ||||||
| ) -> list[BrokerdPosition]: | ) -> list[BrokerdPosition]: | ||||||
| 
 | 
 | ||||||
|     msgs: list[BrokerdPosition] = [] |     msgs: list[BrokerdPosition] = [] | ||||||
|     # pps: dict[int, pp.Position] = {} |  | ||||||
| 
 |  | ||||||
|     for p in pps: |     for p in pps: | ||||||
|         bsuid = p.bsuid |         bsuid = p.bsuid | ||||||
| 
 | 
 | ||||||
|         # build trade-session-actor local table |  | ||||||
|         # of pps from unique symbol ids. |  | ||||||
|         # pps[bsuid] = p |  | ||||||
| 
 |  | ||||||
|         # retreive equivalent ib reported position message |         # retreive equivalent ib reported position message | ||||||
|         # for comparison/audit versus the piker equivalent |         # for comparison/audit versus the piker equivalent | ||||||
|         # breakeven pp calcs. |         # breakeven pp calcs. | ||||||
|  | @ -462,6 +425,8 @@ async def trades_dialogue( | ||||||
|     all_positions = [] |     all_positions = [] | ||||||
|     accounts = set() |     accounts = set() | ||||||
|     clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] |     clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] | ||||||
|  |     acctids = set() | ||||||
|  |     cids2pps: dict[str, BrokerdPosition] = {} | ||||||
| 
 | 
 | ||||||
|     # TODO: this causes a massive tractor bug when you run marketstored |     # TODO: this causes a massive tractor bug when you run marketstored | ||||||
|     # with ``--tsdb``... you should get: |     # with ``--tsdb``... you should get: | ||||||
|  | @ -471,152 +436,170 @@ async def trades_dialogue( | ||||||
|     # - hitting final control-c to kill daemon will lead to hang |     # - hitting final control-c to kill daemon will lead to hang | ||||||
|     # assert 0 |     # assert 0 | ||||||
| 
 | 
 | ||||||
|  |     # TODO: just write on teardown? | ||||||
|  |     # we might also want to delegate a specific actor for | ||||||
|  |     # ledger writing / reading for speed? | ||||||
|     async with ( |     async with ( | ||||||
|         trio.open_nursery() as nurse, |         trio.open_nursery() as nurse, | ||||||
|         open_client_proxies() as (proxies, aioclients), |         open_client_proxies() as (proxies, aioclients), | ||||||
|     ): |     ): | ||||||
|         for account, proxy in proxies.items(): |         # Open a trade ledgers stack for appending trade records over | ||||||
| 
 |         # multiple accounts. | ||||||
|             client = aioclients[account] |         # TODO: we probably want to generalize this into a "ledgers" api.. | ||||||
| 
 |         ledgers: dict[str, dict] = {} | ||||||
|             async def open_stream( |         tables: dict[str, PpTable] = {} | ||||||
|                 task_status: TaskStatus[ |         with ( | ||||||
|                     trio.abc.ReceiveChannel |             ExitStack() as lstack, | ||||||
|                 ] = trio.TASK_STATUS_IGNORED, |  | ||||||
|             ): |  | ||||||
|                 # each api client has a unique event stream |  | ||||||
|                 async with tractor.to_asyncio.open_channel_from( |  | ||||||
|                     recv_trade_updates, |  | ||||||
|                     client=client, |  | ||||||
|                 ) as (first, trade_event_stream): |  | ||||||
| 
 |  | ||||||
|                     task_status.started(trade_event_stream) |  | ||||||
|                     await trio.sleep_forever() |  | ||||||
| 
 |  | ||||||
|             trade_event_stream = await nurse.start(open_stream) |  | ||||||
| 
 |  | ||||||
|             clients.append((client, trade_event_stream)) |  | ||||||
| 
 |  | ||||||
|             assert account in accounts_def |  | ||||||
|             accounts.add(account) |  | ||||||
| 
 |  | ||||||
|         cids2pps: dict[str, BrokerdPosition] = {} |  | ||||||
|         update_records: dict[str, bidict] = {} |  | ||||||
| 
 |  | ||||||
|         # process pp value reported from ib's system. we only use these |  | ||||||
|         # to cross-check sizing since average pricing on their end uses |  | ||||||
|         # the so called (bs) "FIFO" style which more or less results in |  | ||||||
|         # a price that's not useful for traders who want to not lose |  | ||||||
|         # money.. xb |  | ||||||
|         for client in aioclients.values(): |  | ||||||
|             for pos in client.positions(): |  | ||||||
| 
 |  | ||||||
|                 cid, msg = pack_position(pos) |  | ||||||
|                 acctid = msg.account = accounts_def.inverse[msg.account] |  | ||||||
|                 acctid = acctid.strip('ib.') |  | ||||||
|                 cids2pps[(acctid, cid)] = msg |  | ||||||
|                 assert msg.account in accounts, ( |  | ||||||
|                     f'Position for unknown account: {msg.account}') |  | ||||||
| 
 |  | ||||||
|                 # collect all ib-pp reported positions so that we can be |  | ||||||
|                 # sure know which positions to update from the ledger if |  | ||||||
|                 # any are missing from the ``pps.toml`` |  | ||||||
|                 update_records.setdefault(acctid, bidict())[cid] = msg.symbol |  | ||||||
| 
 |  | ||||||
|         # update trades ledgers for all accounts from |  | ||||||
|         # connected api clients which report trades for **this session**. |  | ||||||
|         new_trades = {} |  | ||||||
|         for account, proxy in proxies.items(): |  | ||||||
|             trades = await proxy.trades() |  | ||||||
|             ( |  | ||||||
|                 records_by_acct, |  | ||||||
|                 ledger_entries, |  | ||||||
|             ) = await update_ledger_from_api_trades( |  | ||||||
|                 trades, |  | ||||||
|                 proxy, |  | ||||||
|             ) |  | ||||||
|             new_trades.update(records_by_acct) |  | ||||||
| 
 |  | ||||||
|         for acctid, trans in new_trades.items(): |  | ||||||
|             for t in trans: |  | ||||||
|                 bsuid = t.bsuid |  | ||||||
|                 if bsuid in update_records: |  | ||||||
|                     assert update_records[bsuid] == t.fqsn |  | ||||||
|                 else: |  | ||||||
|                     update_records.setdefault(acctid, bidict())[bsuid] = t.fqsn |  | ||||||
| 
 |  | ||||||
|         # load all positions from `pps.toml`, cross check with ib's |  | ||||||
|         # positions data, and relay re-formatted pps as msgs to the ems. |  | ||||||
|         # __2 cases__: |  | ||||||
|         # - new trades have taken place this session that we want to |  | ||||||
|         #   always reprocess indempotently, |  | ||||||
|         # - no new trades yet but we want to reload and audit any |  | ||||||
|         #   positions reported by ib's sys that may not yet be in |  | ||||||
|         #   piker's ``pps.toml`` state-file. |  | ||||||
|         for acctid, to_update in update_records.items(): |  | ||||||
|             trans = new_trades.get(acctid) |  | ||||||
|             active, closed = pp.update_pps_conf( |  | ||||||
|                 'ib', |  | ||||||
|                 acctid, |  | ||||||
|                 trade_records=trans, |  | ||||||
|                 ledger_reload=to_update, |  | ||||||
|             ) |  | ||||||
|             for pps in [active, closed]: |  | ||||||
|                 msgs = await update_and_audit_msgs( |  | ||||||
|                     acctid, |  | ||||||
|                     pps.values(), |  | ||||||
|                     cids2pps, |  | ||||||
|                     validate=True, |  | ||||||
|                 ) |  | ||||||
|                 all_positions.extend(msg for msg in msgs) |  | ||||||
| 
 |  | ||||||
|         if not all_positions and cids2pps: |  | ||||||
|             raise RuntimeError( |  | ||||||
|                 'Positions reported by ib but not found in `pps.toml`!?\n' |  | ||||||
|                 f'{pformat(cids2pps)}' |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|         # log.info(f'Loaded {len(trades)} from this session') |  | ||||||
|         # TODO: write trades to local ``trades.toml`` |  | ||||||
|         # - use above per-session trades data and write to local file |  | ||||||
|         # - get the "flex reports" working and pull historical data and |  | ||||||
|         # also save locally. |  | ||||||
| 
 |  | ||||||
|         await ctx.started(( |  | ||||||
|             all_positions, |  | ||||||
|             tuple(name for name in accounts_def if name in accounts), |  | ||||||
|         )) |  | ||||||
| 
 |  | ||||||
|         # TODO: maybe just write on teardown? |  | ||||||
|         # we might also want to delegate a specific actor for |  | ||||||
|         # ledger writing / reading for speed? |  | ||||||
| 
 |  | ||||||
|         # write ledger with all new trades **AFTER** we've updated the |  | ||||||
|         # `pps.toml` from the original ledger state! |  | ||||||
|         for acctid, trades_by_id in ledger_entries.items(): |  | ||||||
|             with pp.open_trade_ledger('ib', acctid) as ledger: |  | ||||||
|                 ledger.update(trades_by_id) |  | ||||||
| 
 |  | ||||||
|         async with ( |  | ||||||
|             ctx.open_stream() as ems_stream, |  | ||||||
|             trio.open_nursery() as n, |  | ||||||
|         ): |         ): | ||||||
|             # start order request handler **before** local trades event loop |             for account, proxy in proxies.items(): | ||||||
|             n.start_soon(handle_order_requests, ems_stream, accounts_def) |  | ||||||
| 
 | 
 | ||||||
|             # allocate event relay tasks for each client connection |                 acctid = account.strip('ib.') | ||||||
|             for client, stream in clients: |                 acctids.add(acctid) | ||||||
|                 n.start_soon( | 
 | ||||||
|                     deliver_trade_events, |                 # open ledger and pptable wrapper for each | ||||||
|                     stream, |                 # detected account. | ||||||
|                     ems_stream, |                 ledger = ledgers[acctid] = lstack.enter_context( | ||||||
|                     accounts_def, |                     open_trade_ledger('ib', acctid) | ||||||
|                     cids2pps, |                 ) | ||||||
|                     proxies, |                 table = tables[acctid] = lstack.enter_context( | ||||||
|  |                     open_pps('ib', acctid) | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|             # block until cancelled |                 client = aioclients[account] | ||||||
|             await trio.sleep_forever() | 
 | ||||||
|  |                 async def open_trade_event_stream( | ||||||
|  |                     task_status: TaskStatus[ | ||||||
|  |                         trio.abc.ReceiveChannel | ||||||
|  |                     ] = trio.TASK_STATUS_IGNORED, | ||||||
|  |                 ): | ||||||
|  |                     # each api client has a unique event stream | ||||||
|  |                     async with tractor.to_asyncio.open_channel_from( | ||||||
|  |                         recv_trade_updates, | ||||||
|  |                         client=client, | ||||||
|  |                     ) as (first, trade_event_stream): | ||||||
|  | 
 | ||||||
|  |                         task_status.started(trade_event_stream) | ||||||
|  |                         await trio.sleep_forever() | ||||||
|  | 
 | ||||||
|  |                 trade_event_stream = await nurse.start(open_trade_event_stream) | ||||||
|  |                 clients.append((client, trade_event_stream)) | ||||||
|  | 
 | ||||||
|  |                 assert account in accounts_def | ||||||
|  |                 accounts.add(account) | ||||||
|  | 
 | ||||||
|  |                 # update trades ledgers for all accounts from connected | ||||||
|  |                 # api clients which report trades for **this session**. | ||||||
|  |                 trades = await proxy.trades() | ||||||
|  |                 ( | ||||||
|  |                     trans_by_acct, | ||||||
|  |                     api_ready_for_ledger_entries, | ||||||
|  |                 ) = await update_ledger_from_api_trades( | ||||||
|  |                     trades, | ||||||
|  |                     proxy, | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |                 # if new trades are detected from the API, prepare | ||||||
|  |                 # them for the ledger file and update the pptable. | ||||||
|  |                 if api_ready_for_ledger_entries: | ||||||
|  |                     trade_entries = api_ready_for_ledger_entries[acctid] | ||||||
|  |                     ledger.update(trade_entries) | ||||||
|  |                     trans = trans_by_acct.get(acctid) | ||||||
|  |                     if trans: | ||||||
|  |                         table.update_from_trans(trans) | ||||||
|  | 
 | ||||||
|  |                 # process pp value reported from ib's system. we only use these | ||||||
|  |                 # to cross-check sizing since average pricing on their end uses | ||||||
|  |                 # the so called (bs) "FIFO" style which more or less results in | ||||||
|  |                 # a price that's not useful for traders who want to not lose | ||||||
|  |                 # money.. xb | ||||||
|  |                 # for client in aioclients.values(): | ||||||
|  |                 for pos in client.positions(): | ||||||
|  | 
 | ||||||
|  |                     # collect all ib-pp reported positions so that we can be | ||||||
|  |                     # sure know which positions to update from the ledger if | ||||||
|  |                     # any are missing from the ``pps.toml`` | ||||||
|  |                     bsuid, msg = pack_position(pos) | ||||||
|  |                     acctid = msg.account = accounts_def.inverse[msg.account] | ||||||
|  |                     acctid = acctid.strip('ib.') | ||||||
|  |                     cids2pps[(acctid, bsuid)] = msg | ||||||
|  |                     assert msg.account in accounts, ( | ||||||
|  |                         f'Position for unknown account: {msg.account}') | ||||||
|  | 
 | ||||||
|  |                     table = tables[acctid] | ||||||
|  |                     pp = table.pps.get(bsuid) | ||||||
|  |                     if ( | ||||||
|  |                         not pp | ||||||
|  |                         or pp.size != msg.size | ||||||
|  |                     ): | ||||||
|  |                         trans = norm_trade_records(ledger) | ||||||
|  |                         updated = table.update_from_trans(trans) | ||||||
|  |                         pp = updated[bsuid] | ||||||
|  |                         assert msg.size == pp.size, 'WTF' | ||||||
|  | 
 | ||||||
|  |                         # TODO: figure out why these don't match? | ||||||
|  |                         # assert pp.calc_be_price() == pp.be_price | ||||||
|  | 
 | ||||||
|  |                 _, closed_pps = table.dump_active('ib') | ||||||
|  |                 active_pps = table.pps | ||||||
|  | 
 | ||||||
|  |                 # load all positions from `pps.toml`, cross check with | ||||||
|  |                 # ib's positions data, and relay re-formatted pps as | ||||||
|  |                 # msgs to the ems. | ||||||
|  |                 # __2 cases__: | ||||||
|  |                 # - new trades have taken place this session that we want to | ||||||
|  |                 #   always reprocess indempotently, | ||||||
|  |                 # - no new trades yet but we want to reload and audit any | ||||||
|  |                 #   positions reported by ib's sys that may not yet be in | ||||||
|  |                 #   piker's ``pps.toml`` state-file. | ||||||
|  |                 for pps in [active_pps, closed_pps]: | ||||||
|  |                     msgs = await update_and_audit_msgs( | ||||||
|  |                         acctid, | ||||||
|  |                         pps.values(), | ||||||
|  |                         cids2pps, | ||||||
|  |                         validate=True, | ||||||
|  |                     ) | ||||||
|  |                     all_positions.extend(msg for msg in msgs) | ||||||
|  | 
 | ||||||
|  |             if not all_positions and cids2pps: | ||||||
|  |                 raise RuntimeError( | ||||||
|  |                     'Positions reported by ib but not found in `pps.toml`!?\n' | ||||||
|  |                     f'{pformat(cids2pps)}' | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |             await ctx.started(( | ||||||
|  |                 all_positions, | ||||||
|  |                 tuple(name for name in accounts_def if name in accounts), | ||||||
|  |             )) | ||||||
|  | 
 | ||||||
|  |             # write ledger with all new trades **AFTER** we've updated the | ||||||
|  |             # `pps.toml` from the original ledger state! | ||||||
|  |             for acctid, trades_by_id in api_ready_for_ledger_entries.items(): | ||||||
|  |                 ledgers[acctid].update(trades_by_id) | ||||||
|  | 
 | ||||||
|  |             async with ( | ||||||
|  |                 ctx.open_stream() as ems_stream, | ||||||
|  |                 trio.open_nursery() as n, | ||||||
|  |             ): | ||||||
|  |                 # start order request handler **before** local trades | ||||||
|  |                 # event loop | ||||||
|  |                 n.start_soon(handle_order_requests, ems_stream, accounts_def) | ||||||
|  | 
 | ||||||
|  |                 # allocate event relay tasks for each client connection | ||||||
|  |                 for client, stream in clients: | ||||||
|  |                     n.start_soon( | ||||||
|  |                         deliver_trade_events, | ||||||
|  |                         stream, | ||||||
|  |                         ems_stream, | ||||||
|  |                         accounts_def, | ||||||
|  |                         cids2pps, | ||||||
|  |                         proxies, | ||||||
|  | 
 | ||||||
|  |                         ledgers, | ||||||
|  |                         tables, | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |                 # block until cancelled | ||||||
|  |                 await trio.sleep_forever() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def emit_pp_update( | async def emit_pp_update( | ||||||
|  | @ -626,44 +609,44 @@ async def emit_pp_update( | ||||||
|     proxies: dict, |     proxies: dict, | ||||||
|     cids2pps: dict, |     cids2pps: dict, | ||||||
| 
 | 
 | ||||||
|  |     ledgers, | ||||||
|  |     tables, | ||||||
|  | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
| 
 | 
 | ||||||
|     # compute and relay incrementally updated piker pp |     # compute and relay incrementally updated piker pp | ||||||
|     acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] |     acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']] | ||||||
|     proxy = proxies[acctid] |     proxy = proxies[acctid] | ||||||
| 
 | 
 | ||||||
|     acctname = acctid.strip('ib.') |     acctid = acctid.strip('ib.') | ||||||
|     records_by_acct, ledger_entries = await update_ledger_from_api_trades( |     ( | ||||||
|  |         records_by_acct, | ||||||
|  |         api_ready_for_ledger_entries, | ||||||
|  |     ) = await update_ledger_from_api_trades( | ||||||
|         [trade_entry], |         [trade_entry], | ||||||
|         proxy, |         proxy, | ||||||
|     ) |     ) | ||||||
|     records = records_by_acct[acctname] |     trans = records_by_acct[acctid] | ||||||
|     r = records[0] |     r = list(trans.values())[0] | ||||||
| 
 | 
 | ||||||
|     # update and load all positions from `pps.toml`, cross check with |     table = tables[acctid] | ||||||
|     # ib's positions data, and relay re-formatted pps as msgs to the |     table.update_from_trans(trans) | ||||||
|     # ems. we report both the open and closed updates in one map since |     _, closed = table.dump_active('ib') | ||||||
|     # for incremental update we may have just fully closed a pp and need |     active = table.pps | ||||||
|     # to relay that msg as well! |  | ||||||
|     active, closed = pp.update_pps_conf( |  | ||||||
|         'ib', |  | ||||||
|         acctname, |  | ||||||
|         trade_records=records, |  | ||||||
|         ledger_reload={r.bsuid: r.fqsn}, |  | ||||||
|     ) |  | ||||||
| 
 | 
 | ||||||
|     # NOTE: write ledger with all new trades **AFTER** we've updated the |     # NOTE: update ledger with all new trades | ||||||
|     # `pps.toml` from the original ledger state! |     for acctid, trades_by_id in api_ready_for_ledger_entries.items(): | ||||||
|     for acctid, trades_by_id in ledger_entries.items(): |         ledger = ledgers[acctid] | ||||||
|         with pp.open_trade_ledger('ib', acctid) as ledger: |         ledger.update(trades_by_id) | ||||||
|             ledger.update(trades_by_id) |  | ||||||
| 
 | 
 | ||||||
|  |     # generate pp msgs and cross check with ib's positions data, relay | ||||||
|  |     # re-formatted pps as msgs to the ems. | ||||||
|     for pos in filter( |     for pos in filter( | ||||||
|         bool, |         bool, | ||||||
|         [active.get(r.bsuid), closed.get(r.bsuid)] |         [active.get(r.bsuid), closed.get(r.bsuid)] | ||||||
|     ): |     ): | ||||||
|         msgs = await update_and_audit_msgs( |         msgs = await update_and_audit_msgs( | ||||||
|             acctname, |             acctid, | ||||||
|             [pos], |             [pos], | ||||||
|             cids2pps, |             cids2pps, | ||||||
| 
 | 
 | ||||||
|  | @ -685,6 +668,9 @@ async def deliver_trade_events( | ||||||
|     cids2pps: dict[tuple[str, str], BrokerdPosition], |     cids2pps: dict[tuple[str, str], BrokerdPosition], | ||||||
|     proxies: dict[str, MethodProxy], |     proxies: dict[str, MethodProxy], | ||||||
| 
 | 
 | ||||||
|  |     ledgers, | ||||||
|  |     tables, | ||||||
|  | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|     ''' |     ''' | ||||||
|     Format and relay all trade events for a given client to emsd. |     Format and relay all trade events for a given client to emsd. | ||||||
|  | @ -834,6 +820,8 @@ async def deliver_trade_events( | ||||||
|                         accounts_def, |                         accounts_def, | ||||||
|                         proxies, |                         proxies, | ||||||
|                         cids2pps, |                         cids2pps, | ||||||
|  |                         ledgers, | ||||||
|  |                         tables, | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|             case 'cost': |             case 'cost': | ||||||
|  | @ -866,6 +854,8 @@ async def deliver_trade_events( | ||||||
|                         accounts_def, |                         accounts_def, | ||||||
|                         proxies, |                         proxies, | ||||||
|                         cids2pps, |                         cids2pps, | ||||||
|  |                         ledgers, | ||||||
|  |                         tables, | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|             case 'error': |             case 'error': | ||||||
|  | @ -916,14 +906,13 @@ async def deliver_trade_events( | ||||||
| def norm_trade_records( | def norm_trade_records( | ||||||
|     ledger: dict[str, Any], |     ledger: dict[str, Any], | ||||||
| 
 | 
 | ||||||
| ) -> list[pp.Transaction]: | ) -> list[Transaction]: | ||||||
|     ''' |     ''' | ||||||
|     Normalize a flex report or API retrieved executions |     Normalize a flex report or API retrieved executions | ||||||
|     ledger into our standard record format. |     ledger into our standard record format. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     records: list[pp.Transaction] = [] |     records: dict[str, Transaction] = {} | ||||||
| 
 |  | ||||||
|     for tid, record in ledger.items(): |     for tid, record in ledger.items(): | ||||||
| 
 | 
 | ||||||
|         conid = record.get('conId') or record['conid'] |         conid = record.get('conId') or record['conid'] | ||||||
|  | @ -1001,7 +990,7 @@ def norm_trade_records( | ||||||
|         #   which case, we can pull the fqsn from that table (see |         #   which case, we can pull the fqsn from that table (see | ||||||
|         #   `trades_dialogue()` above). |         #   `trades_dialogue()` above). | ||||||
| 
 | 
 | ||||||
|         records.append(pp.Transaction( |         records[tid] = Transaction( | ||||||
|             fqsn=fqsn, |             fqsn=fqsn, | ||||||
|             tid=tid, |             tid=tid, | ||||||
|             size=size, |             size=size, | ||||||
|  | @ -1010,7 +999,7 @@ def norm_trade_records( | ||||||
|             dt=dt, |             dt=dt, | ||||||
|             expiry=expiry, |             expiry=expiry, | ||||||
|             bsuid=conid, |             bsuid=conid, | ||||||
|         )) |         ) | ||||||
| 
 | 
 | ||||||
|     return records |     return records | ||||||
| 
 | 
 | ||||||
|  | @ -1139,8 +1128,7 @@ def load_flex_trades( | ||||||
| 
 | 
 | ||||||
|     trade_entries = report.extract('Trade') |     trade_entries = report.extract('Trade') | ||||||
|     ln = len(trade_entries) |     ln = len(trade_entries) | ||||||
|     # log.info(f'Loaded {ln} trades from flex query') |     log.info(f'Loaded {ln} trades from flex query') | ||||||
|     print(f'Loaded {ln} trades from flex query') |  | ||||||
| 
 | 
 | ||||||
|     trades_by_account = trades_to_ledger_entries( |     trades_by_account = trades_to_ledger_entries( | ||||||
|         # get reverse map to user account names |         # get reverse map to user account names | ||||||
|  | @ -1149,14 +1137,20 @@ def load_flex_trades( | ||||||
|         source_type='flex', |         source_type='flex', | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
|     ledgers = {} |     for acctid in trades_by_account: | ||||||
|     for acctid, trades_by_id in trades_by_account.items(): |         trades_by_id = trades_by_account[acctid] | ||||||
|         with pp.open_trade_ledger('ib', acctid) as ledger: |         with open_trade_ledger('ib', acctid) as ledger_dict: | ||||||
|             ledger.update(trades_by_id) |             tid_delta = set(trades_by_id) - set(ledger_dict) | ||||||
|  |             log.info( | ||||||
|  |                 'New trades detected\n' | ||||||
|  |                 f'{pformat(tid_delta)}' | ||||||
|  |             ) | ||||||
|  |             if tid_delta: | ||||||
|  |                 ledger_dict.update( | ||||||
|  |                     {tid: trades_by_id[tid] for tid in tid_delta} | ||||||
|  |                 ) | ||||||
| 
 | 
 | ||||||
|         ledgers[acctid] = ledger |     return ledger_dict | ||||||
| 
 |  | ||||||
|     return ledgers |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||||
|  |  | ||||||
|  | @ -301,7 +301,13 @@ async def get_bars( | ||||||
|                 else: |                 else: | ||||||
| 
 | 
 | ||||||
|                     log.warning('Sending CONNECTION RESET') |                     log.warning('Sending CONNECTION RESET') | ||||||
|                     await data_reset_hack(reset_type='connection') |                     res = await data_reset_hack(reset_type='connection') | ||||||
|  |                     if not res: | ||||||
|  |                         log.warning( | ||||||
|  |                             'NO VNC DETECTED!\n' | ||||||
|  |                             'Manually press ctrl-alt-f on your IB java app' | ||||||
|  |                         ) | ||||||
|  |                         # break | ||||||
| 
 | 
 | ||||||
|                     with trio.move_on_after(timeout) as cs: |                     with trio.move_on_after(timeout) as cs: | ||||||
|                         for name, ev in [ |                         for name, ev in [ | ||||||
|  | @ -570,7 +576,6 @@ def normalize( | ||||||
| 
 | 
 | ||||||
|     # check for special contract types |     # check for special contract types | ||||||
|     con = ticker.contract |     con = ticker.contract | ||||||
| 
 |  | ||||||
|     fqsn, calc_price = con2fqsn(con) |     fqsn, calc_price = con2fqsn(con) | ||||||
| 
 | 
 | ||||||
|     # convert named tuples to dicts so we send usable keys |     # convert named tuples to dicts so we send usable keys | ||||||
|  | @ -842,7 +847,10 @@ async def data_reset_hack( | ||||||
|             client.mouse.click() |             client.mouse.click() | ||||||
|             client.keyboard.press('Ctrl', 'Alt', key)  # keys are stacked |             client.keyboard.press('Ctrl', 'Alt', key)  # keys are stacked | ||||||
| 
 | 
 | ||||||
|     await tractor.to_asyncio.run_task(vnc_click_hack) |     try: | ||||||
|  |         await tractor.to_asyncio.run_task(vnc_click_hack) | ||||||
|  |     except OSError: | ||||||
|  |         return False | ||||||
| 
 | 
 | ||||||
|     # we don't really need the ``xdotool`` approach any more B) |     # we don't really need the ``xdotool`` approach any more B) | ||||||
|     return True |     return True | ||||||
|  | @ -857,15 +865,24 @@ async def open_symbol_search( | ||||||
|     # TODO: load user defined symbol set locally for fast search? |     # TODO: load user defined symbol set locally for fast search? | ||||||
|     await ctx.started({}) |     await ctx.started({}) | ||||||
| 
 | 
 | ||||||
|     async with open_data_client() as proxy: |     # async with open_data_client() as proxy: | ||||||
|  |     async with ( | ||||||
|  |         open_client_proxies() as (proxies, clients), | ||||||
|  |     ): | ||||||
|         async with ctx.open_stream() as stream: |         async with ctx.open_stream() as stream: | ||||||
| 
 | 
 | ||||||
|  |             # await tractor.breakpoint() | ||||||
|  |             proxy = proxies['ib.algopaper'] | ||||||
|  | 
 | ||||||
|             last = time.time() |             last = time.time() | ||||||
| 
 | 
 | ||||||
|             async for pattern in stream: |             async for pattern in stream: | ||||||
|                 log.debug(f'received {pattern}') |                 log.info(f'received {pattern}') | ||||||
|                 now = time.time() |                 now = time.time() | ||||||
| 
 | 
 | ||||||
|  |                 # this causes tractor hang... | ||||||
|  |                 # assert 0 | ||||||
|  | 
 | ||||||
|                 assert pattern, 'IB can not accept blank search pattern' |                 assert pattern, 'IB can not accept blank search pattern' | ||||||
| 
 | 
 | ||||||
|                 # throttle search requests to no faster then 1Hz |                 # throttle search requests to no faster then 1Hz | ||||||
|  | @ -893,7 +910,7 @@ async def open_symbol_search( | ||||||
| 
 | 
 | ||||||
|                     continue |                     continue | ||||||
| 
 | 
 | ||||||
|                 log.debug(f'searching for {pattern}') |                 log.info(f'searching for {pattern}') | ||||||
| 
 | 
 | ||||||
|                 last = time.time() |                 last = time.time() | ||||||
| 
 | 
 | ||||||
|  | @ -904,17 +921,25 @@ async def open_symbol_search( | ||||||
|                 async def stash_results(target: Awaitable[list]): |                 async def stash_results(target: Awaitable[list]): | ||||||
|                     stock_results.extend(await target) |                     stock_results.extend(await target) | ||||||
| 
 | 
 | ||||||
|                 async with trio.open_nursery() as sn: |                 for i in range(10): | ||||||
|                     sn.start_soon( |                     with trio.move_on_after(3) as cs: | ||||||
|                         stash_results, |                         async with trio.open_nursery() as sn: | ||||||
|                         proxy.search_symbols( |                             sn.start_soon( | ||||||
|                             pattern=pattern, |                                 stash_results, | ||||||
|                             upto=5, |                                 proxy.search_symbols( | ||||||
|                         ), |                                     pattern=pattern, | ||||||
|                     ) |                                     upto=5, | ||||||
|  |                                 ), | ||||||
|  |                             ) | ||||||
| 
 | 
 | ||||||
|                     # trigger async request |                             # trigger async request | ||||||
|                     await trio.sleep(0) |                             await trio.sleep(0) | ||||||
|  | 
 | ||||||
|  |                     if cs.cancelled_caught: | ||||||
|  |                         log.warning(f'Search timeout? {proxy._aio_ns.ib.client}') | ||||||
|  |                         continue | ||||||
|  |                     else: | ||||||
|  |                         break | ||||||
| 
 | 
 | ||||||
|                     # # match against our ad-hoc set immediately |                     # # match against our ad-hoc set immediately | ||||||
|                     # adhoc_matches = fuzzy.extractBests( |                     # adhoc_matches = fuzzy.extractBests( | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue