Get real-time trade-oriented pp updates working

What a nightmare this was.. the main holdup was that cost (commission)
reports are fired independently from "fills" so you can't really emit
a proper, full position update until they both arrive.

Details:
- move `push_tradesies()` and the relay loop in `deliver_trade_events()`
  to the new py3.10 `match:` syntax B)
- subscribe for, and handle, `CommissionReport` events from `ib_insync`
  and repack them as a `cost` event type.
- handle cons with no primary/listing exchange (like futes) in
  `update_ledger_from_api_trades()` by falling back to the plain
  'exchange' field.
- drop the reverse fqsn lookup from the ib positions map; just use contract
  lookup for api trade logs since we're already connected..
- make validation in `update_and_audit()` optional via a flag.
- pass in the accounts def, ib pp msg table and the proxies table to the
  trade event relay task-loop.
- add `emit_pp_update()` to encapsulate a full api trade entry
  incremental update which calls into the `piker.pp` apis to:
  - update the ledger
  - update the pps.toml
  - generate a new `BrokerdPosition` msg to send to the ems
- adjust the trades relay loop to only emit pp updates once a cost report
  has arrived for the fill/execution, by maintaining a small table keyed
  per exec id (see the sketch below).
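
For the record, a minimal standalone sketch of that pairing idea (purely
illustrative: `pending`, `on_event()` and the simplified payload dicts are
stand-ins here, not the real `ids2fills` table or `emit_pp_update()`
signature):

```python
# sketch: buffer partial trade entries keyed by exec id and only
# "emit" a pp update once both the fill and its cost report are in.
from typing import Any

pending: dict[str, dict[str, Any]] = {}  # exec id -> partial trade entry


def emit_pp_update(entry: dict[str, Any]) -> None:
    # stand-in for the real ledger + pps.toml update and ems msg send
    print(f"pp update for {entry['execution']['execId']}")


def on_event(event_name: str, item: dict[str, Any]) -> None:
    match event_name:
        case 'fill':
            execid = item['execution']['execId']
            entry = pending.setdefault(execid, {})
            cost_already_rx = bool(entry)  # did the cost report beat us here?
            entry.update(item)
            if cost_already_rx or entry.get('commissionReport'):
                emit_pp_update(entry)

        case 'cost':
            execid = item['execId']
            entry = pending.setdefault(execid, {})
            fill_already_rx = bool(entry)
            entry['commissionReport'] = item
            if fill_already_rx:
                emit_pp_update(entry)


# either arrival order yields exactly one emitted update:
on_event('cost', {'execId': 'abc123', 'commission': 1.25})
on_event('fill', {'execution': {'execId': 'abc123', 'shares': 100}})
```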
lifo_pps_ib
Tyler Goodlet 2022-06-16 10:38:11 -04:00
parent 3991d8f911
commit fbee33b00d
1 changed file with 289 additions and 173 deletions


@@ -44,6 +44,7 @@ from ib_insync.order import (
 from ib_insync.objects import (
     Fill,
     Execution,
+    CommissionReport,
 )
 from ib_insync.objects import Position
 import pendulum
@@ -214,19 +215,35 @@ async def recv_trade_updates(
     # sync with trio task
     to_trio.send_nowait(None)

-    def push_tradesies(eventkit_obj, obj, fill=None):
-        """Push events to trio task.
+    def push_tradesies(
+        eventkit_obj,
+        obj,
+        fill: Optional[Fill] = None,
+        report: Optional[CommissionReport] = None,
+    ):
+        '''
+        Push events to trio task.

-        """
-        if fill is not None:
+        '''
+        match eventkit_obj.name():
+            case 'orderStatusEvent':
+                item = ('status', obj)
+
+            case 'commissionReportEvent':
+                assert report
+                item = ('cost', report)
+
+            case 'execDetailsEvent':
                 # execution details event
                 item = ('fill', (obj, fill))

-        elif eventkit_obj.name() == 'positionEvent':
+            case 'positionEvent':
                 item = ('position', obj)

-        else:
-            item = ('status', obj)
+            case _:
+                log.error(f'Error unknown event {obj}')
+                return

         log.info(f'eventkit event ->\n{pformat(item)}')
@@ -242,15 +259,15 @@ async def recv_trade_updates(
         'execDetailsEvent',  # all "fill" updates
         'positionEvent',  # avg price updates per symbol per account

-        # 'commissionReportEvent',
         # XXX: ugh, it is a separate event from IB and it's
         # emitted as follows:
         # self.ib.commissionReportEvent.emit(trade, fill, report)
+        'commissionReportEvent',

         # XXX: not sure yet if we need these
         # 'updatePortfolioEvent',

-        # XXX: these all seem to be weird ib_insync intrernal
+        # XXX: these all seem to be weird ib_insync internal
         # events that we probably don't care that much about
         # given the internal design is wonky af..
         # 'newOrderEvent',
@@ -267,7 +284,7 @@ async def recv_trade_updates(
 async def update_ledger_from_api_trades(
-    trade_entries: dict[str, Any],
+    trade_entries: list[dict[str, Any]],
     ib_pp_msgs: dict[int, BrokerdPosition],  # conid -> msg
     client: Union[Client, MethodProxy],
@@ -277,14 +294,6 @@ async def update_ledger_from_api_trades(
     # LIFO style breakeven pricing calcs.
     conf = get_config()

-    # retreive new trade executions from the last session
-    # and/or day's worth of trading and convert into trade
-    # records suitable for a local ledger file.
-    # trades_by_account: dict = {}
-    # for client in clients:
-    #     trade_entries = await client.trades()
-
     # XXX; ERRGGG..
     # pack in the "primary/listing exchange" value from a
     # contract lookup since it seems this isn't available by
@@ -295,8 +304,13 @@ async def update_ledger_from_api_trades(
         pexch = condict['primaryExchange']

         if not pexch:
-            con = (await client.get_con(conid=conid))[0]
-            pexch = con.primaryExchange
+            cons = await client.get_con(conid=conid)
+            if cons:
+                con = cons[0]
+                pexch = con.primaryExchange or con.exchange
+            else:
+                # for futes it seems like the primary is always empty?
+                pexch = condict['exchange']

         entry['listingExchange'] = pexch
@@ -304,38 +318,29 @@ async def update_ledger_from_api_trades(
         conf['accounts'].inverse,
         trade_entries,
     )
-    # trades_by_account.update(records)
+    actives = {}

     # write recent session's trades to the user's (local) ledger file.
     for acctid, trades_by_id in records.items():
         with pp.open_trade_ledger('ib', acctid) as ledger:
             ledger.update(trades_by_id)

-        # (incrementally) update the user's pps in mem and
-        # in the `pps.toml`.
+        # normalize
         records = norm_trade_records(trades_by_id)

-        # remap stupid ledger fqsns (which are often
-        # filled with lesser venue/exchange values) to
-        # the ones we pull from the API via ib's reported
-        # positioning messages.
-        for r in records:
-            normed_msg = ib_pp_msgs[r.bsuid]
-            if normed_msg.symbol != r.fqsn:
-                log.warning(
-                    f'Remapping ledger fqsn: {r.fqsn} -> {normed_msg.symbol}'
-                )
-                r.fqsn = normed_msg.symbol
-
+        # (incrementally) update the user's pps in mem and
+        # in the `pps.toml`.
         active = pp.update_pps_conf('ib', acctid, records)
+        actives.update(active)

-    return active
+    return actives


 async def update_and_audit(
     by_fqsn: dict[str, pp.Position],
     cids2pps: dict[int, BrokerdPosition],
+    validate: bool = False,

 ) -> list[BrokerdPosition]:
@@ -369,6 +374,9 @@ async def update_and_audit(
             size=p.size,
             avg_price=p.avg_price,
         )
+        msgs.append(msg)
+
+        if validate:
             ibsize = ibppmsg.size
             pikersize = msg.size
             diff = pikersize - ibsize
@@ -379,8 +387,8 @@ async def update_and_audit(
             if diff:
                 raise ValueError(
                     f'POSITION MISMATCH ib <-> piker ledger:\n'
-                    f'ib: {msg}\n'
-                    f'piker: {ibppmsg}\n'
+                    f'ib: {ibppmsg}\n'
+                    f'piker: {msg}\n'
                     'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
                 )
             msg.size = ibsize
@@ -395,8 +403,6 @@ async def update_and_audit(
                 f'piker, LIFO breakeven PnL price: {msg.avg_price}'
             )

-        msgs.append(msg)
-
     return msgs
@@ -449,8 +455,8 @@ async def trades_dialogue(
         assert account in accounts_def
         accounts.add(account)

-    cids2pps = {}
-    used_accounts = set()
+    cids2pps: dict[str, BrokerdPosition] = {}
+    active_accts: set[str] = set()

     # process pp value reported from ib's system. we only use these
     # to cross-check sizing since average pricing on their end uses
@@ -461,7 +467,7 @@ async def trades_dialogue(
         for pos in client.positions():
             cid, msg = pack_position(pos)
             acctid = msg.account = accounts_def.inverse[msg.account]
-            used_accounts.add(acctid)
+            active_accts.add(acctid)
             cids2pps[cid] = msg
             assert msg.account in accounts, (
                 f'Position for unknown account: {msg.account}')
@@ -469,8 +475,9 @@ async def trades_dialogue(
     # update trades ledgers for all accounts from
     # connected api clients.
     for account, proxy in proxies.items():
+        trades = await proxy.trades()
         await update_ledger_from_api_trades(
-            await proxy.trades(),
+            trades,
             cids2pps,  # pass these in to map to correct fqsns..
             proxy,
         )
@@ -478,10 +485,10 @@ async def trades_dialogue(
     # load all positions from `pps.toml`, cross check with ib's
     # positions data, and relay re-formatted pps as msgs to the ems.
     for acctid, by_fqsn in pp.get_pps(
-        'ib', acctids=used_accounts,
+        'ib', acctids=active_accts,
     ).items():
-        msgs = await update_and_audit(by_fqsn, cids2pps)
+        msgs = await update_and_audit(by_fqsn, cids2pps, validate=True)
         all_positions.extend(msg.dict() for msg in msgs)

     if not all_positions and cids2pps:
@@ -512,32 +519,73 @@ async def trades_dialogue(
                 deliver_trade_events,
                 stream,
                 ems_stream,
-                accounts_def
+                accounts_def,
+                cids2pps,
+                proxies,
             )

             # block until cancelled
             await trio.sleep_forever()


+async def emit_pp_update(
+    ems_stream: tractor.MsgStream,
+    trade_entry: dict,
+    accounts_def: bidict,
+    proxies: dict,
+    cids2pps: dict,
+
+) -> None:
+
+    # compute and relay incrementally updated piker pp
+    acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
+    proxy = proxies[acctid]
+    await update_ledger_from_api_trades(
+        [trade_entry],
+        cids2pps,  # pass these in to map to correct fqsns..
+        proxy,
+    )
+
+    # load all positions from `pps.toml`, cross check with
+    # ib's positions data, and relay re-formatted pps as
+    # msgs to the ems.
+    for acctid, by_fqsn in pp.get_pps(
+        'ib',
+        acctids={acctid},
+    ).items():
+
+        # should only be one right?
+        msgs = await update_and_audit(
+            by_fqsn,
+            cids2pps,
+            validate=False,
+        )
+        for msg in msgs:
+            await ems_stream.send(msg.dict())
+
+
 async def deliver_trade_events(
     trade_event_stream: trio.MemoryReceiveChannel,
     ems_stream: tractor.MsgStream,
     accounts_def: dict[str, str],
+    cids2pps: dict[str, BrokerdPosition],
+    proxies: dict[str, MethodProxy],

 ) -> None:
-    '''Format and relay all trade events for a given client to the EMS.
+    '''
+    Format and relay all trade events for a given client to emsd.

     '''
     action_map = {'BOT': 'buy', 'SLD': 'sell'}
+    ids2fills: dict[str, dict] = {}

     # TODO: for some reason we can receive a ``None`` here when the
     # ib-gw goes down? Not sure exactly how that's happening looking
     # at the eventkit code above but we should probably handle it...
     async for event_name, item in trade_event_stream:
         log.info(f'ib sending {event_name}:\n{pformat(item)}')

+        match event_name:
+
             # TODO: templating the ib statuses in comparison with other
             # brokers is likely the way to go:
             # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
@@ -561,7 +609,7 @@ async def deliver_trade_events(
             # reqId 1550: Order held while securities are located.'),
             # status='PreSubmitted', message='')],

-        if event_name == 'status':
+            case 'status':

                 # XXX: begin normalization of nonsense ib_insync internal
                 # object-state tracking representations...
@@ -590,8 +638,9 @@ async def deliver_trade_events(
                     broker_details={'name': 'ib'},
                 )
+                await ems_stream.send(msg.dict())

-        elif event_name == 'fill':
+            case 'fill':

                 # for wtv reason this is a separate event type
                 # from IB, not sure why it's needed other then for extra
@@ -605,17 +654,35 @@ async def deliver_trade_events(
                 # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
                 trade: Trade
                 fill: Fill
+
+                # TODO: maybe we can use matching to better handle these cases.
                 trade, fill = item
                 execu: Execution = fill.execution
+                execid = execu.execId

-            # TODO: normalize out commissions details?
-            details = {
+                # TODO:
+                # - normalize out commissions details?
+                # - this is the same as the unpacking loop above in
+                #   ``trades_to_records()`` no?
+                trade_entry = ids2fills.setdefault(execid, {})
+                cost_already_rx = bool(trade_entry)
+
+                # if the costs report was already received this
+                # should be not empty right?
+                comms = fill.commissionReport.commission
+                if cost_already_rx:
+                    assert comms
+
+                trade_entry.update(
+                    {
                         'contract': asdict(fill.contract),
                         'execution': asdict(fill.execution),
-                'commissions': asdict(fill.commissionReport),
-                'broker_time': execu.time,  # supposedly server fill time
+                        'commissionReport': asdict(fill.commissionReport),
+                        # supposedly server fill time?
+                        'broker_time': execu.time,
                         'name': 'ib',
                     }
+                )

                 msg = BrokerdFill(
                     # should match the value returned from `.submit_limit()`
@@ -626,14 +693,62 @@ async def deliver_trade_events(
                     size=execu.shares,
                     price=execu.price,
-                    broker_details=details,
+                    broker_details=trade_entry,
                     # XXX: required by order mode currently
-                    broker_time=details['broker_time'],
+                    broker_time=trade_entry['broker_time'],
                 )
+                await ems_stream.send(msg.dict())

-        elif event_name == 'error':
+                # 2 cases:
+                # - fill comes first or
+                # - comms report comes first
+                comms = fill.commissionReport.commission
+                if comms:
+                    # UGHHH since the commision report object might be
+                    # filled in **after** we already serialized to dict..
+                    # def need something better for all this.
+                    trade_entry.update(
+                        {'commissionReport': asdict(fill.commissionReport)}
+                    )
+
+                if comms or cost_already_rx:
+                    # only send a pp update once we have a cost report
+                    print("EMITTING PP")
+                    await emit_pp_update(
+                        ems_stream,
+                        trade_entry,
+                        accounts_def,
+                        proxies,
+                        cids2pps,
+                    )
+
+            case 'cost':
+
+                cr: CommissionReport = item
+                execid = cr.execId
+                trade_entry = ids2fills.setdefault(execid, {})
+                fill_already_rx = bool(trade_entry)
+
+                # no fill msg has arrived yet so just fill out the
+                # cost report for now and when the fill arrives a pp
+                # msg can be emitted.
+                trade_entry.update(
+                    {'commissionReport': asdict(cr)}
+                )
+
+                if fill_already_rx:
+                    print("EMITTING PP")
+                    await emit_pp_update(
+                        ems_stream,
+                        trade_entry,
+                        accounts_def,
+                        proxies,
+                        cids2pps,
+                    )
+
+            case 'error':
                 err: dict = item

                 # f$#$% gawd dammit insync..
@@ -647,14 +762,15 @@ async def deliver_trade_events(
                 # TODO: what schema for this msg if we're going to make it
                 # portable across all backends?
                 # msg = BrokerdError(**err)
+                continue

-        elif event_name == 'position':
+            case 'position':
+
                 cid, msg = pack_position(item)
-            msg.account = accounts_def.inverse[msg.account]
+                # acctid = msg.account = accounts_def.inverse[msg.account]
+                # cuck ib and it's shitty fifo sys for pps!
+                # await ems_stream.send(msg.dict())

-        elif event_name == 'event':
+            case 'event':

                 # it's either a general system status event or an external
                 # trade event?
@@ -666,8 +782,6 @@ async def deliver_trade_events(
                 # if getattr(msg, 'reqid', 0) < -1:
                 #     log.info(f"TWS triggered trade\n{pformat(msg.dict())}")

-            continue
-
                 # msg.reqid = 'tws-' + str(-1 * reqid)

                 # mark msg as from "external system"
@@ -675,11 +789,8 @@ async def deliver_trade_events(
                 # considering multiplayer/group trades tracking
                 # msg.broker_details['external_src'] = 'tws'

-        # XXX: we always serialize to a dict for msgpack
-        # translations, ideally we can move to an msgspec (or other)
-        # encoder # that can be enabled in ``tractor`` ahead of
-        # time so we can pass through the message types directly.
-        await ems_stream.send(msg.dict())
+            case _:
+                log.error(f'WTF: {event_name}: {item}')


 def norm_trade_records(
@@ -814,6 +925,11 @@ def trades_to_records(
             case 'contract' | 'execution' | 'commissionReport':
                 # sub-dict cases
                 entry.update(val)

+            case 'time':
+                # ib has wack ns timestamps, or is that us?
+                continue
+
             case _:
                 entry[section] = val