From 9e87b6515b6597c349aa80dd3f0cdc6ebb3914e3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 17 Jul 2023 17:31:34 -0400 Subject: [PATCH] ib: be symcache compat by using bypass attr Since there's no easy way to support it yet, we bypass symbology caching in for now and instead allow the `ib.ledger` routines to fill in `MktPair` and `Asset` entries ad-hoc for the purposes of txn ledger processing. --- piker/brokers/ib/__init__.py | 16 +- piker/brokers/ib/broker.py | 39 +++- piker/brokers/ib/ledger.py | 395 +++++++++++++++++++---------------- 3 files changed, 263 insertions(+), 187 deletions(-) diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index d42002a1..c98057a7 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -30,18 +30,27 @@ from .api import ( ) from .feed import ( open_history_client, - open_symbol_search, stream_quotes, + get_mkt_info, + open_symbol_search, ) from .broker import ( open_trade_dialog, ) from .ledger import ( + norm_trade, norm_trade_records, ) +# TODO: +# from .symbols import ( +# get_mkt_info, +# open_symbol_search, +# ) __all__ = [ 'get_client', + 'get_mkt_info', + 'norm_trade', 'norm_trade_records', 'open_trade_dialog', 'open_history_client', @@ -75,3 +84,8 @@ _spawn_kwargs = { # know if ``brokerd`` should be spawned with # ``tractor``'s aio mode. _infect_asyncio: bool = True + +# XXX NOTE: for now we disable symcache with this backend since +# there is no clearly simple nor practical way to download "all +# symbology info" for all supported venues.. +_no_symcache: bool = True diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index e4ac0598..842fdbc3 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -60,9 +60,13 @@ from piker.accounting import ( open_trade_ledger, TransactionLedger, iter_by_dt, - open_pps, + open_account, Account, ) +from piker.data._symcache import ( + open_symcache, + SymbologyCache, +) from piker.clearing._messages import ( Order, Status, @@ -295,6 +299,10 @@ async def update_ledger_from_api_trades( client: Union[Client, MethodProxy], accounts_def_inv: bidict[str, str], + # provided for ad-hoc insertions "as transactions are + # processed" + symcache: SymbologyCache | None = None, + ) -> tuple[ dict[str, Transaction], dict[str, dict], @@ -325,7 +333,7 @@ async def update_ledger_from_api_trades( # pack in the ``Contract.secType`` entry['asset_type'] = condict['secType'] - entries = api_trades_to_ledger_entries( + entries: dict[str, dict] = api_trades_to_ledger_entries( accounts_def_inv, trade_entries, ) @@ -334,7 +342,10 @@ async def update_ledger_from_api_trades( for acctid, trades_by_id in entries.items(): # normalize to transaction form - trans_by_acct[acctid] = norm_trade_records(trades_by_id) + trans_by_acct[acctid] = norm_trade_records( + trades_by_id, + symcache=symcache, + ) return trans_by_acct, entries @@ -547,11 +558,11 @@ async def open_trade_dialog( ) -> AsyncIterator[dict[str, Any]]: + # from piker.brokers import ( + # get_brokermod, + # ) accounts_def = config.load_accounts(['ib']) - # TODO: do this as part of `open_account()`!? - from piker.data._symcache import open_symcache - global _client_cache # deliver positions to subscriber before anything else @@ -565,12 +576,14 @@ async def open_trade_dialog( proxies, aioclients, ), + + # TODO: do this as part of `open_account()`!? open_symcache('ib', only_from_memcache=True) as symcache, ): # Open a trade ledgers stack for appending trade records over # multiple accounts. # TODO: we probably want to generalize this into a "ledgers" api.. - ledgers: dict[str, dict] = {} + ledgers: dict[str, TransactionLedger] = {} tables: dict[str, Account] = {} order_msgs: list[Status] = [] conf = get_config() @@ -617,7 +630,7 @@ async def open_trade_dialog( # positions reported by ib's sys that may not yet be in # piker's ``pps.toml`` state-file. tables[acctid] = lstack.enter_context( - open_pps( + open_account( 'ib', acctid, write_on_exit=True, @@ -640,7 +653,10 @@ async def open_trade_dialog( # update position table with latest ledger from all # gathered transactions: ledger file + api records. - trans: dict[str, Transaction] = norm_trade_records(ledger) + trans: dict[str, Transaction] = norm_trade_records( + ledger, + symcache=symcache, + ) # update trades ledgers for all accounts from connected # api clients which report trades for **this session**. @@ -655,6 +671,7 @@ async def open_trade_dialog( api_trades, proxy, accounts_def_inv, + symcache=symcache, ) # if new api_trades are detected from the API, prepare @@ -797,7 +814,11 @@ async def emit_pp_update( acnts: dict[str, Account], ) -> None: + ''' + Extract trade record from an API event, convert it into a `Transaction`, + update the backing ledger and finally emit a position update to the EMS. + ''' accounts_def_inv: bidict[str, str] = accounts_def.inverse accnum: str = trade_entry['execution']['acctNumber'] fq_acctid: str = accounts_def_inv[accnum] diff --git a/piker/brokers/ib/ledger.py b/piker/brokers/ib/ledger.py index e12bab13..aaeda153 100644 --- a/piker/brokers/ib/ledger.py +++ b/piker/brokers/ib/ledger.py @@ -28,6 +28,10 @@ from typing import ( from bidict import bidict import pendulum +from piker.data import ( + Struct, + SymbologyCache, +) from piker.accounting import ( Asset, dec_digits, @@ -39,8 +43,211 @@ from ._flex_reports import parse_flex_dt from ._util import log +def norm_trade( + tid: str, + record: dict[str, Any], + + # this is the dict that was returned from + # `Client.get_mkt_pairs()` and when running offline ledger + # processing from `.accounting`, this will be the table loaded + # into `SymbologyCache.pairs`. + pairs: dict[str, Struct], + symcache: SymbologyCache | None = None, + +) -> Transaction | None: + + conid = record.get('conId') or record['conid'] + comms = record.get('commission') + if comms is None: + comms = -1*record['ibCommission'] + + price = record.get('price') or record['tradePrice'] + + # the api doesn't do the -/+ on the quantity for you but flex + # records do.. are you fucking serious ib...!? + size = record.get('quantity') or record['shares'] * { + 'BOT': 1, + 'SLD': -1, + }[record['side']] + + symbol: str = record['symbol'] + exch: str = record.get('listingExchange') or record['exchange'] + + # NOTE: remove null values since `tomlkit` can't serialize + # them to file. + if dnc := record.pop('deltaNeutralContract', None): + record['deltaNeutralContract'] = dnc + + # likely an opts contract record from a flex report.. + # TODO: no idea how to parse ^ the strike part from flex.. + # (00010000 any, or 00007500 tsla, ..) + # we probably must do the contract lookup for this? + if ( + ' ' in symbol + or '--' in exch + ): + underlying, _, tail = symbol.partition(' ') + exch: str = 'opt' + expiry: str = tail[:6] + # otype = tail[6] + # strike = tail[7:] + + print(f'skipping opts contract {symbol}') + return None + + # timestamping is way different in API records + dtstr = record.get('datetime') + date = record.get('date') + flex_dtstr = record.get('dateTime') + + if dtstr or date: + dt = pendulum.parse(dtstr or date) + + elif flex_dtstr: + # probably a flex record with a wonky non-std timestamp.. + dt = parse_flex_dt(record['dateTime']) + + # special handling of symbol extraction from + # flex records using some ad-hoc schema parsing. + asset_type: str = record.get( + 'assetCategory' + ) or record.get('secType', 'STK') + + if (expiry := ( + record.get('lastTradeDateOrContractMonth') + or record.get('expiry') + ) + ): + expiry: str = str(expiry).strip(' ') + # NOTE: we directly use the (simple and usually short) + # date-string expiry token when packing the `MktPair` + # since we want the fqme to contain *that* token. + # It might make sense later to instead parse and then + # render different output str format(s) for this same + # purpose depending on asset-type-market down the road. + # Eg. for derivs we use the short token only for fqme + # but use the isoformat('T') for transactions and + # account file position entries? + # dt_str: str = pendulum.parse(expiry).isoformat('T') + + # XXX: pretty much all legacy market assets have a fiat + # currency (denomination) determined by their venue. + currency: str = record['currency'] + src = Asset( + name=currency.lower(), + atype='fiat', + tx_tick=Decimal('0.01'), + ) + + match asset_type: + case 'FUT': + # (flex) ledger entries don't have any simple 3-char key? + # TODO: XXX: WOA this is kinda hacky.. probably + # should figure out the correct future pair key more + # explicitly and consistently? + symbol: str = symbol[:3] + dst = Asset( + name=symbol.lower(), + atype='future', + tx_tick=Decimal('1'), + ) + + case 'STK': + dst = Asset( + name=symbol.lower(), + atype='stock', + tx_tick=Decimal('1'), + ) + + case 'CASH': + if currency not in symbol: + # likely a dict-casted `Forex` contract which + # has .symbol as the dst and .currency as the + # src. + name: str = symbol.lower() + else: + # likely a flex-report record which puts + # EUR.USD as the symbol field and just USD in + # the currency field. + name: str = symbol.lower().replace(f'.{src.name}', '') + + dst = Asset( + name=name, + atype='fiat', + tx_tick=Decimal('0.01'), + ) + + case 'OPT': + dst = Asset( + name=symbol.lower(), + atype='option', + tx_tick=Decimal('1'), + ) + + # try to build out piker fqme from record. + # src: str = record['currency'] + price_tick: Decimal = digits_to_dec(dec_digits(price)) + + # NOTE: can't serlialize `tomlkit.String` so cast to native + atype: str = str(dst.atype) + + mkt = MktPair( + bs_mktid=str(conid), + dst=dst, + + price_tick=price_tick, + # NOTE: for "legacy" assets, volume is normally discreet, not + # a float, but we keep a digit in case the suitz decide + # to get crazy and change it; we'll be kinda ready + # schema-wise.. + size_tick=Decimal('1'), + + src=src, # XXX: normally always a fiat + + _atype=atype, + + venue=exch, + expiry=expiry, + broker='ib', + + _fqme_without_src=(atype != 'fiat'), + ) + + fqme: str = mkt.fqme + + # XXX: if passed in, we fill out the symcache ad-hoc in order + # to make downstream accounting work.. + if symcache: + symcache.mktmaps[fqme] = mkt + symcache.assets[src.name] = src + symcache.assets[dst.name] = dst + + # NOTE: for flex records the normal fields for defining an fqme + # sometimes won't be available so we rely on two approaches for + # the "reverse lookup" of piker style fqme keys: + # - when dealing with API trade records received from + # `IB.trades()` we do a contract lookup at he time of processing + # - when dealing with flex records, it is assumed the record + # is at least a day old and thus the TWS position reporting system + # should already have entries if the pps are still open, in + # which case, we can pull the fqme from that table (see + # `trades_dialogue()` above). + return Transaction( + fqme=fqme, + tid=tid, + size=size, + price=price, + cost=comms, + dt=dt, + expiry=expiry, + bs_mktid=str(conid), + ) + + + def norm_trade_records( ledger: dict[str, Any], + symcache: SymbologyCache | None = None, ) -> dict[str, Transaction]: ''' @@ -53,188 +260,22 @@ def norm_trade_records( records: list[Transaction] = [] for tid, record in ledger.items(): - conid = record.get('conId') or record['conid'] - comms = record.get('commission') - if comms is None: - comms = -1*record['ibCommission'] - price = record.get('price') or record['tradePrice'] + txn = norm_trade( + tid, + record, - # the api doesn't do the -/+ on the quantity for you but flex - # records do.. are you fucking serious ib...!? - size = record.get('quantity') or record['shares'] * { - 'BOT': 1, - 'SLD': -1, - }[record['side']] + # NOTE: currently no symcache support + pairs={}, + symcache=symcache, + ) - symbol: str = record['symbol'] - exch: str = record.get('listingExchange') or record['exchange'] - - # NOTE: remove null values since `tomlkit` can't serialize - # them to file. - if dnc := record.pop('deltaNeutralContract', None): - record['deltaNeutralContract'] = dnc - - # likely an opts contract record from a flex report.. - # TODO: no idea how to parse ^ the strike part from flex.. - # (00010000 any, or 00007500 tsla, ..) - # we probably must do the contract lookup for this? - if ( - ' ' in symbol - or '--' in exch - ): - underlying, _, tail = symbol.partition(' ') - exch: str = 'opt' - expiry: str = tail[:6] - # otype = tail[6] - # strike = tail[7:] - - print(f'skipping opts contract {symbol}') + if txn is None: continue - # timestamping is way different in API records - dtstr = record.get('datetime') - date = record.get('date') - flex_dtstr = record.get('dateTime') - - if dtstr or date: - dt = pendulum.parse(dtstr or date) - - elif flex_dtstr: - # probably a flex record with a wonky non-std timestamp.. - dt = parse_flex_dt(record['dateTime']) - - # special handling of symbol extraction from - # flex records using some ad-hoc schema parsing. - asset_type: str = record.get( - 'assetCategory' - ) or record.get('secType', 'STK') - - if (expiry := ( - record.get('lastTradeDateOrContractMonth') - or record.get('expiry') - ) - ): - expiry: str = str(expiry).strip(' ') - # NOTE: we directly use the (simple and usually short) - # date-string expiry token when packing the `MktPair` - # since we want the fqme to contain *that* token. - # It might make sense later to instead parse and then - # render different output str format(s) for this same - # purpose depending on asset-type-market down the road. - # Eg. for derivs we use the short token only for fqme - # but use the isoformat('T') for transactions and - # account file position entries? - # dt_str: str = pendulum.parse(expiry).isoformat('T') - - # XXX: pretty much all legacy market assets have a fiat - # currency (denomination) determined by their venue. - currency: str = record['currency'] - src = Asset( - name=currency.lower(), - atype='fiat', - tx_tick=Decimal('0.01'), - ) - - match asset_type: - case 'FUT': - # (flex) ledger entries don't have any simple 3-char key? - # TODO: XXX: WOA this is kinda hacky.. probably - # should figure out the correct future pair key more - # explicitly and consistently? - symbol: str = symbol[:3] - dst = Asset( - name=symbol.lower(), - atype='future', - tx_tick=Decimal('1'), - ) - - case 'STK': - dst = Asset( - name=symbol.lower(), - atype='stock', - tx_tick=Decimal('1'), - ) - - case 'CASH': - if currency not in symbol: - # likely a dict-casted `Forex` contract which - # has .symbol as the dst and .currency as the - # src. - name: str = symbol.lower() - else: - # likely a flex-report record which puts - # EUR.USD as the symbol field and just USD in - # the currency field. - name: str = symbol.lower().replace(f'.{src.name}', '') - - dst = Asset( - name=name, - atype='fiat', - tx_tick=Decimal('0.01'), - ) - - case 'OPT': - dst = Asset( - name=symbol.lower(), - atype='option', - tx_tick=Decimal('1'), - ) - - # try to build out piker fqme from record. - # src: str = record['currency'] - price_tick: Decimal = digits_to_dec(dec_digits(price)) - - # NOTE: can't serlialize `tomlkit.String` so cast to native - atype: str = str(dst.atype) - - pair = MktPair( - bs_mktid=str(conid), - dst=dst, - - price_tick=price_tick, - # NOTE: for "legacy" assets, volume is normally discreet, not - # a float, but we keep a digit in case the suitz decide - # to get crazy and change it; we'll be kinda ready - # schema-wise.. - size_tick=Decimal('1'), - - src=src, # XXX: normally always a fiat - - _atype=atype, - - venue=exch, - expiry=expiry, - broker='ib', - - _fqme_without_src=(atype != 'fiat'), - ) - - fqme: str = pair.fqme - - # NOTE: for flex records the normal fields for defining an fqme - # sometimes won't be available so we rely on two approaches for - # the "reverse lookup" of piker style fqme keys: - # - when dealing with API trade records received from - # `IB.trades()` we do a contract lookup at he time of processing - # - when dealing with flex records, it is assumed the record - # is at least a day old and thus the TWS position reporting system - # should already have entries if the pps are still open, in - # which case, we can pull the fqme from that table (see - # `trades_dialogue()` above). - trans = Transaction( - fqme=fqme, - tid=tid, - size=size, - price=price, - cost=comms, - dt=dt, - expiry=expiry, - bs_mktid=str(conid), - ) insort( records, - trans, + txn, key=lambda t: t.dt ) @@ -258,14 +299,14 @@ def api_trades_to_ledger_entries( # instead of pre-casting to dicts? trade_entries: list[dict], -) -> dict: +) -> dict[str, dict]: ''' Convert API execution objects entry objects into ``dict`` form, pretty much straight up without modification except add a `pydatetime` field from the parsed timestamp. ''' - trades_by_account = {} + trades_by_account: dict[str, dict] = {} for t in trade_entries: # NOTE: example of schema we pull from the API client. # {