ib: be symcache compat by using bypass attr

Since there's no easy way to support it yet, we bypass symbology caching
in for now and instead allow the `ib.ledger` routines to fill in
`MktPair` and `Asset` entries ad-hoc for the purposes of txn ledger
processing.
account_tests
Tyler Goodlet 2023-07-17 17:31:34 -04:00
parent a05a82486d
commit 9e87b6515b
3 changed files with 263 additions and 187 deletions

View File

@ -30,18 +30,27 @@ from .api import (
)
from .feed import (
open_history_client,
open_symbol_search,
stream_quotes,
get_mkt_info,
open_symbol_search,
)
from .broker import (
open_trade_dialog,
)
from .ledger import (
norm_trade,
norm_trade_records,
)
# TODO:
# from .symbols import (
# get_mkt_info,
# open_symbol_search,
# )
__all__ = [
'get_client',
'get_mkt_info',
'norm_trade',
'norm_trade_records',
'open_trade_dialog',
'open_history_client',
@ -75,3 +84,8 @@ _spawn_kwargs = {
# know if ``brokerd`` should be spawned with
# ``tractor``'s aio mode.
_infect_asyncio: bool = True
# XXX NOTE: for now we disable symcache with this backend since
# there is no clearly simple nor practical way to download "all
# symbology info" for all supported venues..
_no_symcache: bool = True

View File

@ -60,9 +60,13 @@ from piker.accounting import (
open_trade_ledger,
TransactionLedger,
iter_by_dt,
open_pps,
open_account,
Account,
)
from piker.data._symcache import (
open_symcache,
SymbologyCache,
)
from piker.clearing._messages import (
Order,
Status,
@ -295,6 +299,10 @@ async def update_ledger_from_api_trades(
client: Union[Client, MethodProxy],
accounts_def_inv: bidict[str, str],
# provided for ad-hoc insertions "as transactions are
# processed"
symcache: SymbologyCache | None = None,
) -> tuple[
dict[str, Transaction],
dict[str, dict],
@ -325,7 +333,7 @@ async def update_ledger_from_api_trades(
# pack in the ``Contract.secType``
entry['asset_type'] = condict['secType']
entries = api_trades_to_ledger_entries(
entries: dict[str, dict] = api_trades_to_ledger_entries(
accounts_def_inv,
trade_entries,
)
@ -334,7 +342,10 @@ async def update_ledger_from_api_trades(
for acctid, trades_by_id in entries.items():
# normalize to transaction form
trans_by_acct[acctid] = norm_trade_records(trades_by_id)
trans_by_acct[acctid] = norm_trade_records(
trades_by_id,
symcache=symcache,
)
return trans_by_acct, entries
@ -547,11 +558,11 @@ async def open_trade_dialog(
) -> AsyncIterator[dict[str, Any]]:
# from piker.brokers import (
# get_brokermod,
# )
accounts_def = config.load_accounts(['ib'])
# TODO: do this as part of `open_account()`!?
from piker.data._symcache import open_symcache
global _client_cache
# deliver positions to subscriber before anything else
@ -565,12 +576,14 @@ async def open_trade_dialog(
proxies,
aioclients,
),
# TODO: do this as part of `open_account()`!?
open_symcache('ib', only_from_memcache=True) as symcache,
):
# Open a trade ledgers stack for appending trade records over
# multiple accounts.
# TODO: we probably want to generalize this into a "ledgers" api..
ledgers: dict[str, dict] = {}
ledgers: dict[str, TransactionLedger] = {}
tables: dict[str, Account] = {}
order_msgs: list[Status] = []
conf = get_config()
@ -617,7 +630,7 @@ async def open_trade_dialog(
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
tables[acctid] = lstack.enter_context(
open_pps(
open_account(
'ib',
acctid,
write_on_exit=True,
@ -640,7 +653,10 @@ async def open_trade_dialog(
# update position table with latest ledger from all
# gathered transactions: ledger file + api records.
trans: dict[str, Transaction] = norm_trade_records(ledger)
trans: dict[str, Transaction] = norm_trade_records(
ledger,
symcache=symcache,
)
# update trades ledgers for all accounts from connected
# api clients which report trades for **this session**.
@ -655,6 +671,7 @@ async def open_trade_dialog(
api_trades,
proxy,
accounts_def_inv,
symcache=symcache,
)
# if new api_trades are detected from the API, prepare
@ -797,7 +814,11 @@ async def emit_pp_update(
acnts: dict[str, Account],
) -> None:
'''
Extract trade record from an API event, convert it into a `Transaction`,
update the backing ledger and finally emit a position update to the EMS.
'''
accounts_def_inv: bidict[str, str] = accounts_def.inverse
accnum: str = trade_entry['execution']['acctNumber']
fq_acctid: str = accounts_def_inv[accnum]

View File

@ -28,6 +28,10 @@ from typing import (
from bidict import bidict
import pendulum
from piker.data import (
Struct,
SymbologyCache,
)
from piker.accounting import (
Asset,
dec_digits,
@ -39,8 +43,211 @@ from ._flex_reports import parse_flex_dt
from ._util import log
def norm_trade(
tid: str,
record: dict[str, Any],
# this is the dict that was returned from
# `Client.get_mkt_pairs()` and when running offline ledger
# processing from `.accounting`, this will be the table loaded
# into `SymbologyCache.pairs`.
pairs: dict[str, Struct],
symcache: SymbologyCache | None = None,
) -> Transaction | None:
conid = record.get('conId') or record['conid']
comms = record.get('commission')
if comms is None:
comms = -1*record['ibCommission']
price = record.get('price') or record['tradePrice']
# the api doesn't do the -/+ on the quantity for you but flex
# records do.. are you fucking serious ib...!?
size = record.get('quantity') or record['shares'] * {
'BOT': 1,
'SLD': -1,
}[record['side']]
symbol: str = record['symbol']
exch: str = record.get('listingExchange') or record['exchange']
# NOTE: remove null values since `tomlkit` can't serialize
# them to file.
if dnc := record.pop('deltaNeutralContract', None):
record['deltaNeutralContract'] = dnc
# likely an opts contract record from a flex report..
# TODO: no idea how to parse ^ the strike part from flex..
# (00010000 any, or 00007500 tsla, ..)
# we probably must do the contract lookup for this?
if (
' ' in symbol
or '--' in exch
):
underlying, _, tail = symbol.partition(' ')
exch: str = 'opt'
expiry: str = tail[:6]
# otype = tail[6]
# strike = tail[7:]
print(f'skipping opts contract {symbol}')
return None
# timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date')
flex_dtstr = record.get('dateTime')
if dtstr or date:
dt = pendulum.parse(dtstr or date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
asset_type: str = record.get(
'assetCategory'
) or record.get('secType', 'STK')
if (expiry := (
record.get('lastTradeDateOrContractMonth')
or record.get('expiry')
)
):
expiry: str = str(expiry).strip(' ')
# NOTE: we directly use the (simple and usually short)
# date-string expiry token when packing the `MktPair`
# since we want the fqme to contain *that* token.
# It might make sense later to instead parse and then
# render different output str format(s) for this same
# purpose depending on asset-type-market down the road.
# Eg. for derivs we use the short token only for fqme
# but use the isoformat('T') for transactions and
# account file position entries?
# dt_str: str = pendulum.parse(expiry).isoformat('T')
# XXX: pretty much all legacy market assets have a fiat
# currency (denomination) determined by their venue.
currency: str = record['currency']
src = Asset(
name=currency.lower(),
atype='fiat',
tx_tick=Decimal('0.01'),
)
match asset_type:
case 'FUT':
# (flex) ledger entries don't have any simple 3-char key?
# TODO: XXX: WOA this is kinda hacky.. probably
# should figure out the correct future pair key more
# explicitly and consistently?
symbol: str = symbol[:3]
dst = Asset(
name=symbol.lower(),
atype='future',
tx_tick=Decimal('1'),
)
case 'STK':
dst = Asset(
name=symbol.lower(),
atype='stock',
tx_tick=Decimal('1'),
)
case 'CASH':
if currency not in symbol:
# likely a dict-casted `Forex` contract which
# has .symbol as the dst and .currency as the
# src.
name: str = symbol.lower()
else:
# likely a flex-report record which puts
# EUR.USD as the symbol field and just USD in
# the currency field.
name: str = symbol.lower().replace(f'.{src.name}', '')
dst = Asset(
name=name,
atype='fiat',
tx_tick=Decimal('0.01'),
)
case 'OPT':
dst = Asset(
name=symbol.lower(),
atype='option',
tx_tick=Decimal('1'),
)
# try to build out piker fqme from record.
# src: str = record['currency']
price_tick: Decimal = digits_to_dec(dec_digits(price))
# NOTE: can't serlialize `tomlkit.String` so cast to native
atype: str = str(dst.atype)
mkt = MktPair(
bs_mktid=str(conid),
dst=dst,
price_tick=price_tick,
# NOTE: for "legacy" assets, volume is normally discreet, not
# a float, but we keep a digit in case the suitz decide
# to get crazy and change it; we'll be kinda ready
# schema-wise..
size_tick=Decimal('1'),
src=src, # XXX: normally always a fiat
_atype=atype,
venue=exch,
expiry=expiry,
broker='ib',
_fqme_without_src=(atype != 'fiat'),
)
fqme: str = mkt.fqme
# XXX: if passed in, we fill out the symcache ad-hoc in order
# to make downstream accounting work..
if symcache:
symcache.mktmaps[fqme] = mkt
symcache.assets[src.name] = src
symcache.assets[dst.name] = dst
# NOTE: for flex records the normal fields for defining an fqme
# sometimes won't be available so we rely on two approaches for
# the "reverse lookup" of piker style fqme keys:
# - when dealing with API trade records received from
# `IB.trades()` we do a contract lookup at he time of processing
# - when dealing with flex records, it is assumed the record
# is at least a day old and thus the TWS position reporting system
# should already have entries if the pps are still open, in
# which case, we can pull the fqme from that table (see
# `trades_dialogue()` above).
return Transaction(
fqme=fqme,
tid=tid,
size=size,
price=price,
cost=comms,
dt=dt,
expiry=expiry,
bs_mktid=str(conid),
)
def norm_trade_records(
ledger: dict[str, Any],
symcache: SymbologyCache | None = None,
) -> dict[str, Transaction]:
'''
@ -53,188 +260,22 @@ def norm_trade_records(
records: list[Transaction] = []
for tid, record in ledger.items():
conid = record.get('conId') or record['conid']
comms = record.get('commission')
if comms is None:
comms = -1*record['ibCommission']
price = record.get('price') or record['tradePrice']
txn = norm_trade(
tid,
record,
# the api doesn't do the -/+ on the quantity for you but flex
# records do.. are you fucking serious ib...!?
size = record.get('quantity') or record['shares'] * {
'BOT': 1,
'SLD': -1,
}[record['side']]
# NOTE: currently no symcache support
pairs={},
symcache=symcache,
)
symbol: str = record['symbol']
exch: str = record.get('listingExchange') or record['exchange']
# NOTE: remove null values since `tomlkit` can't serialize
# them to file.
if dnc := record.pop('deltaNeutralContract', None):
record['deltaNeutralContract'] = dnc
# likely an opts contract record from a flex report..
# TODO: no idea how to parse ^ the strike part from flex..
# (00010000 any, or 00007500 tsla, ..)
# we probably must do the contract lookup for this?
if (
' ' in symbol
or '--' in exch
):
underlying, _, tail = symbol.partition(' ')
exch: str = 'opt'
expiry: str = tail[:6]
# otype = tail[6]
# strike = tail[7:]
print(f'skipping opts contract {symbol}')
if txn is None:
continue
# timestamping is way different in API records
dtstr = record.get('datetime')
date = record.get('date')
flex_dtstr = record.get('dateTime')
if dtstr or date:
dt = pendulum.parse(dtstr or date)
elif flex_dtstr:
# probably a flex record with a wonky non-std timestamp..
dt = parse_flex_dt(record['dateTime'])
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
asset_type: str = record.get(
'assetCategory'
) or record.get('secType', 'STK')
if (expiry := (
record.get('lastTradeDateOrContractMonth')
or record.get('expiry')
)
):
expiry: str = str(expiry).strip(' ')
# NOTE: we directly use the (simple and usually short)
# date-string expiry token when packing the `MktPair`
# since we want the fqme to contain *that* token.
# It might make sense later to instead parse and then
# render different output str format(s) for this same
# purpose depending on asset-type-market down the road.
# Eg. for derivs we use the short token only for fqme
# but use the isoformat('T') for transactions and
# account file position entries?
# dt_str: str = pendulum.parse(expiry).isoformat('T')
# XXX: pretty much all legacy market assets have a fiat
# currency (denomination) determined by their venue.
currency: str = record['currency']
src = Asset(
name=currency.lower(),
atype='fiat',
tx_tick=Decimal('0.01'),
)
match asset_type:
case 'FUT':
# (flex) ledger entries don't have any simple 3-char key?
# TODO: XXX: WOA this is kinda hacky.. probably
# should figure out the correct future pair key more
# explicitly and consistently?
symbol: str = symbol[:3]
dst = Asset(
name=symbol.lower(),
atype='future',
tx_tick=Decimal('1'),
)
case 'STK':
dst = Asset(
name=symbol.lower(),
atype='stock',
tx_tick=Decimal('1'),
)
case 'CASH':
if currency not in symbol:
# likely a dict-casted `Forex` contract which
# has .symbol as the dst and .currency as the
# src.
name: str = symbol.lower()
else:
# likely a flex-report record which puts
# EUR.USD as the symbol field and just USD in
# the currency field.
name: str = symbol.lower().replace(f'.{src.name}', '')
dst = Asset(
name=name,
atype='fiat',
tx_tick=Decimal('0.01'),
)
case 'OPT':
dst = Asset(
name=symbol.lower(),
atype='option',
tx_tick=Decimal('1'),
)
# try to build out piker fqme from record.
# src: str = record['currency']
price_tick: Decimal = digits_to_dec(dec_digits(price))
# NOTE: can't serlialize `tomlkit.String` so cast to native
atype: str = str(dst.atype)
pair = MktPair(
bs_mktid=str(conid),
dst=dst,
price_tick=price_tick,
# NOTE: for "legacy" assets, volume is normally discreet, not
# a float, but we keep a digit in case the suitz decide
# to get crazy and change it; we'll be kinda ready
# schema-wise..
size_tick=Decimal('1'),
src=src, # XXX: normally always a fiat
_atype=atype,
venue=exch,
expiry=expiry,
broker='ib',
_fqme_without_src=(atype != 'fiat'),
)
fqme: str = pair.fqme
# NOTE: for flex records the normal fields for defining an fqme
# sometimes won't be available so we rely on two approaches for
# the "reverse lookup" of piker style fqme keys:
# - when dealing with API trade records received from
# `IB.trades()` we do a contract lookup at he time of processing
# - when dealing with flex records, it is assumed the record
# is at least a day old and thus the TWS position reporting system
# should already have entries if the pps are still open, in
# which case, we can pull the fqme from that table (see
# `trades_dialogue()` above).
trans = Transaction(
fqme=fqme,
tid=tid,
size=size,
price=price,
cost=comms,
dt=dt,
expiry=expiry,
bs_mktid=str(conid),
)
insort(
records,
trans,
txn,
key=lambda t: t.dt
)
@ -258,14 +299,14 @@ def api_trades_to_ledger_entries(
# instead of pre-casting to dicts?
trade_entries: list[dict],
) -> dict:
) -> dict[str, dict]:
'''
Convert API execution objects entry objects into ``dict`` form,
pretty much straight up without modification except add
a `pydatetime` field from the parsed timestamp.
'''
trades_by_account = {}
trades_by_account: dict[str, dict] = {}
for t in trade_entries:
# NOTE: example of schema we pull from the API client.
# {