Add API trade/exec entry parsing and ledger updates

Since "flex reports" are only available for the current session's trades
the day after, this adds support for also collecting trade execution
records for the current session and writing them to the equivalent
ledger file.

Summary:
- add `trades_to_records()` to handle parsing both flex and API event
  objects into a common record form.
- add `norm_trade_records()` to handle converting ledger entries into
  `TradeRecord` types from the new `piker.pps` mod (coming in next
  commit).
lifo_pps_ib
Tyler Goodlet 2022-06-10 13:25:08 -04:00
parent eb2bad5138
commit 88b4ccc768
3 changed files with 193 additions and 29 deletions

View File

@ -38,7 +38,10 @@ from .feed import (
open_symbol_search, open_symbol_search,
stream_quotes, stream_quotes,
) )
from .broker import trades_dialogue from .broker import (
trades_dialogue,
norm_trade_records,
)
__all__ = [ __all__ = [
'get_client', 'get_client',

View File

@ -483,6 +483,14 @@ class Client:
return con return con
async def get_con(
self,
conid: int,
) -> Contract:
return await self.ib.qualifyContractsAsync(
ibis.Contract(conId=conid)
)
async def find_contract( async def find_contract(
self, self,
pattern: str, pattern: str,

View File

@ -28,6 +28,7 @@ from typing import (
AsyncIterator, AsyncIterator,
) )
from bidict import bidict
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
@ -44,8 +45,10 @@ from ib_insync.objects import (
Execution, Execution,
) )
from ib_insync.objects import Position from ib_insync.objects import Position
import pendulum
from piker import config from piker import config
from piker.pp import TradeRecord
from piker.log import get_console_log from piker.log import get_console_log
from piker.clearing._messages import ( from piker.clearing._messages import (
BrokerdOrder, BrokerdOrder,
@ -56,6 +59,7 @@ from piker.clearing._messages import (
BrokerdFill, BrokerdFill,
BrokerdError, BrokerdError,
) )
from piker.data._source import Symbol
from .api import ( from .api import (
_accounts2clients, _accounts2clients,
_adhoc_futes_set, _adhoc_futes_set,
@ -64,6 +68,7 @@ from .api import (
open_client_proxies, open_client_proxies,
Client, Client,
) )
# from .feed import open_data_client
def pack_position( def pack_position(
@ -95,7 +100,6 @@ def pack_position(
symkey += f'.{expiry}' symkey += f'.{expiry}'
# TODO: options contracts into a sane format.. # TODO: options contracts into a sane format..
return BrokerdPosition( return BrokerdPosition(
broker='ib', broker='ib',
account=pos.account, account=pos.account,
@ -317,11 +321,72 @@ async def trades_dialogue(
all_positions.append(msg.dict()) all_positions.append(msg.dict())
trades: list[dict] = [] trades_by_account: dict = {}
conf = get_config()
for proxy in proxies.values(): for proxy in proxies.values():
trades.append(await proxy.trades()) trade_entries = await proxy.trades()
# {
# 'commissionReport': CommissionReport(
# execId='',
# commission=0.0,
# currency='',
# realizedPNL=0.0,
# yield_=0.0,
# yieldRedemptionDate=0),
# 'contract': {
# 'comboLegs': [],
# 'comboLegsDescrip': '',
# 'conId': 477837024,
# 'currency': 'USD',
# 'deltaNeutralContract': None,
# 'exchange': 'GLOBEX',
# 'includeExpired': False,
# 'lastTradeDateOrContractMonth': '20220617',
# 'localSymbol': 'MNQM2',
# 'multiplier': '2',
# 'primaryExchange': '',
# 'right': '?',
# 'secId': '',
# 'secIdType': '',
# 'secType': 'FUT',
# 'strike': 0.0,
# 'symbol': 'MNQ',
# 'tradingClass': 'MNQ'
# },
# 'execution': Execution(
# execId='0000e1a7.62a2315f.01.01',
# time=1654801166.0,
# acctNumber='DU5612476',
# exchange='GLOBEX',
# side='BOT',
# shares=1.0,
# price=12443.5,
# permId=778998556,
# clientId=6116,
# orderId=555,
# liquidation=0,
# cumQty=1.0,
# avgPrice=12443.5,
# orderRef='',
# evRule='',
# evMultiplier=0.0,
# modelCode='',
# lastLiquidity=1
# ),
# 'time': 1654801166.0
# }
trades_by_account.update(
trades_to_records(
conf['accounts'].inverse,
trade_entries,
)
)
log.info(f'Loaded {len(trades)} from this session') for acctid, trades_by_id in trades_by_account.items():
with config.open_trade_ledger('ib', acctid) as ledger:
ledger.update(trades_by_id)
# log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml`` # TODO: write trades to local ``trades.toml``
# - use above per-session trades data and write to local file # - use above per-session trades data and write to local file
# - get the "flex reports" working and pull historical data and # - get the "flex reports" working and pull historical data and
@ -514,12 +579,114 @@ async def deliver_trade_events(
await ems_stream.send(msg.dict()) await ems_stream.send(msg.dict())
def norm_trade_records(
ledger: dict[str, Any],
) -> dict[str, list[TradeRecord]]:
'''
Normalize a flex report or API retrieved executions
ledger into our standard record format.
'''
records: list[TradeRecord] = []
# async with open_data_client() as proxy:
for tid, record in ledger.items():
# date, time = record['dateTime']
# cost = record['cost']
# action = record['buySell']
conid = record.get('conId') or record['conid']
comms = record.get('ibCommission', 0)
price = record.get('price') or record['tradePrice']
size = record.get('shares') or record['quantity']
symbol = record['symbol']
# special handling of symbol extraction from
# flex records using some ad-hoc schema parsing.
instr = record.get('assetCategory')
if instr == 'FUT':
symbol = record['description'][:3]
# try to build out piker fqsn from record.
expiry = record.get('lastTradeDateOrContractMonth') or record['expiry']
exch = record.get('listingExchange') or record['exchange']
fqsn = Symbol.from_broker_info(
broker='ib',
symbol=symbol,
suffix=f'{exch}.{expiry}',
info={},
).front_fqsn()
# NOTE: for flex records the normal fields won't be available so
# we have to do a lookup at some point to reverse map the conid
# to a fqsn.
# con = await proxy.get_con(conid)
records.append(TradeRecord(
fqsn=fqsn,
tid=tid,
size=size,
price=price,
cost=comms,
symkey=conid,
))
return records
def trades_to_records(
accounts: bidict,
trade_entries: list[object],
source_type: str = 'api',
) -> dict:
trades_by_account = {}
for t in trade_entries:
if source_type == 'flex':
entry = t.__dict__
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave this as an ``int``?
tid = str(entry['tradeID'])
# date = str(entry['tradeDate'])
acctid = accounts[str(entry['accountId'])]
elif source_type == 'api':
entry = {}
for section, obj in t.items():
match section:
case 'commisionReport' | 'execution':
entry.update(asdict(obj))
case 'contract':
entry.update(obj)
tid = str(entry['execId'])
dt = pendulum.from_timestamp(entry['time'])
entry['date'] = str(dt)
acctid = accounts[entry['acctNumber']]
trades_by_account.setdefault(
acctid, {}
)[tid] = entry
return trades_by_account
def load_flex_trades( def load_flex_trades(
path: Optional[str] = None, path: Optional[str] = None,
) -> dict[str, str]: ) -> dict[str, str]:
from pprint import pprint
from ib_insync import flexreport, util from ib_insync import flexreport, util
conf = get_config() conf = get_config()
@ -555,33 +722,19 @@ def load_flex_trades(
report = flexreport.FlexReport(path=path) report = flexreport.FlexReport(path=path)
trade_entries = report.extract('Trade') trade_entries = report.extract('Trade')
trades_by_account = trades_to_records(
# get reverse map to user account names
conf['accounts'].inverse,
trade_entries,
source_type='flex',
)
# get reverse map to user account names # ln = len(trades)
accounts = conf['accounts'].inverse # log.info(f'Loaded {ln} trades from flex query')
trades_by_account = {}
for t in trade_entries:
# XXX: LOL apparently ``toml`` has a bug
# where a section key error will show up in the write
# if you leave this as an ``int``?
trade = t.__dict__
# oddly for some so-called "BookTrade" entries
# this field seems to be blank, no cuckin clue.
# trade['ibExecID']
tid = str(trade['tradeID'])
date = str(trade['tradeDate'])
acctid = accounts[str(trade['accountId'])]
trades_by_account.setdefault(
acctid, {}
).setdefault(date, {})[tid] = trade
ln = len(trades_by_account.values())
log.info(f'Loaded {ln} trades from flex query')
for acctid, trades_by_id in trades_by_account.items(): for acctid, trades_by_id in trades_by_account.items():
with config.open_trade_ledger('ib', acctid) as ledger: with config.open_trade_ledger('ib', acctid) as ledger:
ledger.update({'ib': trades_by_id}) ledger.update(trades_by_id)
if __name__ == '__main__': if __name__ == '__main__':