piker/piker/brokers/ib/broker.py

1312 lines
44 KiB
Python

# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Order and trades endpoints for use with ``piker``'s EMS.
"""
from __future__ import annotations
from contextlib import ExitStack
# from collections import ChainMap
from functools import partial
from pprint import pformat
import time
from typing import (
Any,
AsyncIterator,
)
from bidict import bidict
import trio
from trio_typing import TaskStatus
import tractor
from tractor.to_asyncio import LinkedTaskChannel
from ib_insync.contract import (
Contract,
)
from ib_insync.order import (
Trade,
OrderStatus,
)
from ib_insync.objects import (
Fill,
Execution,
CommissionReport,
Position as IbPosition,
)
from piker import config
from piker.types import Struct
from piker.accounting import (
Position,
Transaction,
open_trade_ledger,
TransactionLedger,
open_account,
Account,
Asset,
MktPair,
)
from piker.data import (
open_symcache,
SymbologyCache,
)
from piker.clearing import OrderDialogs
from piker.clearing._messages import (
Order,
Status,
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdPosition,
BrokerdCancel,
BrokerdFill,
BrokerdError,
)
from ._util import log
from .api import (
_accounts2clients,
get_config,
open_client_proxies,
Client,
MethodProxy,
)
from .symbols import (
con2fqme,
# get_mkt_info,
)
from .ledger import (
norm_trade_records,
tx_sort,
update_ledger_from_api_trades,
)
def pack_position(
pos: IbPosition,
accounts_def: bidict[str, str],
) -> tuple[
str,
dict[str, Any]
]:
con: Contract = pos.contract
fqme, calc_price = con2fqme(con)
# TODO: options contracts into a sane format..
return (
str(con.conId),
BrokerdPosition(
broker='ib',
account=accounts_def.inverse[pos.account],
symbol=fqme,
currency=con.currency,
size=float(pos.position),
avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
),
)
async def handle_order_requests(
ems_order_stream: tractor.MsgStream,
accounts_def: dict[str, str],
flows: OrderDialogs,
) -> None:
request_msg: dict
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action: str = request_msg['action']
account: str = request_msg['account']
acct_number = accounts_def.get(account)
oid: str = request_msg['oid']
if not acct_number:
log.error(
f'An IB account number for name {account} is not found?\n'
'Make sure you have all TWS and GW instances running.'
)
err_msg = BrokerdError(
oid=oid,
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
)
await ems_order_stream.send(err_msg)
continue
client = _accounts2clients.get(account)
if not client:
log.error(
f'An IB client for account name {account} is not found.\n'
'Make sure you have all TWS and GW instances running.'
)
err_msg = BrokerdError(
oid=oid,
symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?',
)
await ems_order_stream.send(err_msg)
continue
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid: int | None = order.reqid
if reqid is not None:
log.error(f'TYPE .reqid: {reqid} -> {type(reqid)}')
reqid = int(reqid)
# call our client api to submit the order
reqid = client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
account=acct_number,
reqid=reqid,
)
str_reqid: str = str(reqid)
if reqid is None:
err_msg = BrokerdError(
oid=oid,
symbol=request_msg['symbol'],
reason='Order already active?',
)
await ems_order_stream.send(err_msg)
# deliver ack that order has been submitted to broker routing
ack = BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=str_reqid,
account=account,
)
await ems_order_stream.send(ack)
flows.add_msg(str_reqid, order.to_dict())
flows.add_msg(str_reqid, ack.to_dict())
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
client.submit_cancel(reqid=int(msg.reqid))
else:
log.error(f'Unknown order command: {request_msg}')
async def recv_trade_updates(
client: Client,
to_trio: trio.abc.SendChannel,
) -> None:
'''
Receive and relay order control and positioning related events
from `ib_insync`, pack as tuples and push over mem-chan to our
trio relay task for processing and relay to EMS.
'''
client.inline_errors(to_trio)
# sync with trio task
to_trio.send_nowait(client.ib)
def push_tradesies(
eventkit_obj,
obj,
fill: Fill | None = None,
report: CommissionReport | None = None,
):
'''
Push events to trio task.
'''
emit: tuple | object
event_name: str = eventkit_obj.name()
match event_name:
case 'orderStatusEvent':
emit: Trade = obj
case 'commissionReportEvent':
assert report
emit: CommissionReport = report
case 'execDetailsEvent':
# execution details event
emit: tuple[Trade, Fill] = (obj, fill)
case 'positionEvent':
emit: Position = obj
case _:
log.error(f'Error unknown event {obj}')
return
log.info(f'eventkit event ->\n{pformat(emit)}')
try:
# emit event name + relevant ibis internal objects
to_trio.send_nowait((event_name, emit))
except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies)
# hook up to the weird eventkit object - event stream api
for ev_name in [
'orderStatusEvent', # all order updates
'execDetailsEvent', # all "fill" updates
'positionEvent', # avg price updates per symbol per account
# XXX: ugh, it is a separate event from IB and it's
# emitted as follows:
# self.ib.commissionReportEvent.emit(trade, fill, report)
'commissionReportEvent',
# XXX: not sure yet if we need these
# -> prolly not since the named tuple type doesn't offer
# much more then a few more pnl fields..
# 'updatePortfolioEvent',
# XXX: these all seem to be weird ib_insync internal
# events that we probably don't care that much about
# given the internal design is wonky af..
# 'newOrderEvent',
# 'orderModifyEvent',
# 'cancelOrderEvent',
# 'openOrderEvent',
]:
eventkit_obj = getattr(client.ib, ev_name)
handler = partial(push_tradesies, eventkit_obj)
eventkit_obj.connect(handler)
# let the engine run and stream
await client.ib.disconnectedEvent
async def update_and_audit_pos_msg(
acctid: str, # no `ib.` prefix is required!
pikerpos: Position,
ibpos: IbPosition,
cons2mkts: dict[Contract, MktPair],
validate: bool = True,
) -> BrokerdPosition:
# NOTE: lookup the ideal `MktPair` value, since multi-venue
# trade records can result in multiple MktpPairs (eg. qqq.arca.ib and
# qqq.nasdaq.ib can map to the same bs_mktid but have obviously
# different .fqme: str values..), instead allow caller to
# provide a table with the desired output mkt-map values;
# eventually this should probably come from a deterministically
# generated symcache..
# TODO: figure out how make this not be so frickin CRAP by
# either allowing bs_mktid to be the position key or possibly
# be extra pendantic with the `Client._mkts` table?
con: Contract = ibpos.contract
mkt: MktPair = cons2mkts.get(con, pikerpos.mkt)
bs_fqme: str = mkt.bs_fqme
msg = BrokerdPosition(
broker='ib',
# TODO: probably forget about this once we drop this msg
# entirely from our set..
# XXX: ok so this is annoying, we're relaying
# an account name with the backend suffix prefixed
# but when reading accounts from ledgers we don't
# need it and/or it's prefixed in the section
# table.. we should just strip this from the message
# right since `.broker` is already included?
account=f'ib.{acctid}',
# account=account_def.inverse[ibpos.account],
# XXX: the `.ib` is stripped..?
symbol=bs_fqme,
# remove..
# currency=ibpos.currency,
# NOTE: always take their size since it's usually the
# true gospel.. this SHOULD be the same always as ours
# tho..
# size=pikerpos.size,
size=ibpos.position,
avg_price=pikerpos.ppu,
)
ibfmtmsg: str = pformat(ibpos._asdict())
pikerfmtmsg: str = pformat(msg.to_dict())
ibsize: float = ibpos.position
pikersize: float = msg.size
diff: float = pikersize - ibsize
# NOTE: compare equivalent ib reported position message for
# comparison/audit versus the piker equivalent breakeven pp
# calcs. if ib reports a lesser pp it's not as bad since we can
# presume we're at least not more in the shit then we thought.
if (
diff
and (
pikersize
or ibsize
)
):
# reverse_split_ratio = pikersize / ibsize
# split_ratio = 1/reverse_split_ratio
# if split_ratio >= reverse_split_ratio:
# entry = f'split_ratio = {int(split_ratio)}'
# else:
# entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
msg.size = ibsize
logmsg: str = (
f'Pos mismatch in ib vs. the piker ledger!\n'
f'IB:\n{ibfmtmsg}\n\n'
f'PIKER:\n{pikerfmtmsg}\n\n'
# 'If you are expecting a (reverse) split in this '
# 'instrument you should probably put the following'
# 'in the `pps.toml` section:\n'
# f'{entry}\n'
# f'reverse_split_ratio: {reverse_split_ratio}\n'
# f'split_ratio: {split_ratio}\n\n'
)
if validate:
raise ValueError(logmsg)
else:
# await tractor.pause()
log.error(logmsg)
# TODO: make this a "propaganda" log level?
if ibpos.avgCost != msg.avg_price:
log.warning(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibfmtmsg}\n'
'---------------------------\n'
f'piker: {pformat(msg.to_dict())}'
)
return msg
async def aggr_open_orders(
order_msgs: list[Status],
client: Client,
proxy: MethodProxy,
accounts_def: bidict[str, str],
) -> None:
'''
Collect all open orders from client and fill in `order_msgs: list`.
'''
trades: list[Trade] = client.ib.openTrades()
for trade in trades:
order = trade.order
quant = trade.order.totalQuantity
action = order.action.lower()
size = {
'sell': -1,
'buy': 1,
}[action] * quant
con = trade.contract
# TODO: in the case of the SMART venue (aka ib's
# router-clearing sys) we probably should handle
# showing such orders overtop of the fqme for the
# primary exchange, how to map this easily is going
# to be a bit tricky though?
deats = await proxy.con_deats(contracts=[con])
fqme = list(deats)[0]
reqid: str = str(order.orderId)
# TODO: maybe embed a ``BrokerdOrder`` instead
# since then we can directly load it on the client
# side in the order mode loop?
msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=reqid,
reqid=reqid,
# embedded order info
req=Order(
action=action,
exec_mode='live',
oid=str(reqid),
symbol=fqme,
account=accounts_def.inverse[order.account],
price=order.lmtPrice,
size=size,
),
src='ib',
)
order_msgs.append(msg)
return order_msgs
async def open_trade_event_stream(
client: Client,
task_status: TaskStatus[
LinkedTaskChannel
] = trio.TASK_STATUS_IGNORED,
):
'''
Proxy wrapper for starting trade event stream from ib_insync
which spawns an asyncio task that registers an internal closure
(`push_tradies()`) which in turn relays trading events through
a `tractor.to_asyncio.LinkedTaskChannel` which the parent
(caller task) can use to process said events in trio-land.
NOTE: each api client has a unique event stream.
'''
trade_event_stream: LinkedTaskChannel
async with tractor.to_asyncio.open_channel_from(
recv_trade_updates,
client=client,
) as (
_, # first pushed val
trade_event_stream,
):
task_status.started(trade_event_stream)
# block forever to keep session trio-asyncio session
# up until cancelled or error on either side.
await trio.sleep_forever()
class IbAcnt(Struct):
'''
Wrapper around the useful info for doing accounting (mostly for
position tracking).
'''
key: str
balances: dict[
str, # fiat or crypto name
float # current balance
]
# TODO: do we need the asset instances?
# (float, Asset),
# ]
positions: dict[str, IbPosition]
@tractor.context
async def open_trade_dialog(
ctx: tractor.Context,
) -> AsyncIterator[dict[str, Any]]:
# task local msg dialog tracking
flows = OrderDialogs()
accounts_def = config.load_accounts(['ib'])
# deliver positions to subscriber before anything else
all_positions: list[BrokerdPosition] = []
accounts: set[str] = set()
acctids: set[str] = set()
symcache: SymbologyCache
async with (
open_client_proxies() as (
proxies,
aioclients,
),
# TODO: do this as part of `open_account()`!?
open_symcache('ib', only_from_memcache=True) as symcache,
):
# Open a trade ledgers stack for appending trade records over
# multiple accounts.
# TODO: we probably want to generalize this into a "ledgers" api..
ledgers: dict[str, TransactionLedger] = {}
tables: dict[str, Account] = {}
order_msgs: list[Status] = []
conf = get_config()
accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse
with (
ExitStack() as lstack,
):
# load ledgers and pps for all detected client-proxies
account: str
proxy: MethodProxy
for account, proxy in proxies.items():
assert account in accounts_def
accounts.add(account)
acctid: str = account.strip('ib.')
acctids.add(acctid)
# open ledger and pptable wrapper for each
# detected account.
ledger: TransactionLedger
ledger = ledgers[acctid] = lstack.enter_context(
open_trade_ledger(
'ib',
acctid,
tx_sort=tx_sort,
symcache=symcache,
)
)
# load all positions from `pps.toml`, cross check with
# ib's positions data, and relay re-formatted pps as
# msgs to the ems.
# __2 cases__:
# - new trades have taken place this session that we want to
# always reprocess indempotently,
# - no new trades yet but we want to reload and audit any
# positions reported by ib's sys that may not yet be in
# piker's ``pps.toml`` state-file.
tables[acctid] = lstack.enter_context(
open_account(
'ib',
acctid,
write_on_exit=True,
)
)
for account, proxy in proxies.items():
client: Client = aioclients[account]
# process pp value reported from ib's system. we only
# use these to cross-check sizing since average pricing
# on their end uses the so called (bs) "FIFO" style
# which more or less results in a price that's not
# useful for traders who want to not lose money.. xb
# -> collect all ib-pp reported positions so that we can be
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
# await tractor.pp()
ib_positions: dict[str, IbPosition] = {}
pos: IbPosition # named tuple subtype
for pos in client.positions():
bs_mktid: str = str(pos.contract.conId)
ib_positions[bs_mktid] = pos
bs_mktid, msg = pack_position(pos, accounts_def)
acctid: str = msg.account.strip('ib.')
assert msg.account in accounts, (
f'Position for unknown account: {msg.account}')
balances: dict[str, tuple[float, Asset]] = {}
for av in client.ib.accountValues():
match av.tag:
case 'CashBalance':
balances[av.currency] = float(av.value)
# TODO: if we want supposed forex pnls?
# case 'UnrealizedPnL':
# ...
ibacnt = IbAcnt(
key=acctid,
balances=balances,
positions=ib_positions,
)
# print(
# f'Current balances for {ibacnt.key}: {ibacnt.balances}'
# )
# order_msgs is filled in by this helper
await aggr_open_orders(
order_msgs,
client,
proxy,
accounts_def,
)
acctid: str = account.strip('ib.')
ledger: dict = ledgers[acctid]
acnt: Account = tables[acctid]
# update position table with latest ledger from all
# gathered transactions: ledger file + api records.
trans: dict[str, Transaction] = norm_trade_records(
ledger,
symcache=symcache,
)
# update trades ledgers for all accounts from connected
# api clients which report trades for **this session**.
api_fills: list[Fill] = await proxy.get_fills()
if api_fills:
api_trans_by_acct: dict[str, Transaction]
api_to_ledger_entries: dict[str, dict]
(
api_trans_by_acct,
api_to_ledger_entries,
) = await update_ledger_from_api_trades(
api_fills,
proxy,
accounts_def_inv,
symcache=symcache,
)
# if new api_fills are detected from the API, prepare
# them for the ledger file and update the pptable.
if (
api_to_ledger_entries
and (trade_entries := api_to_ledger_entries.get(acctid))
):
# TODO: fix this `tractor` BUG!
# https://github.com/goodboy/tractor/issues/354
# await tractor.pp()
# write ledger with all new api_fills
# **AFTER** we've updated the `pps.toml`
# from the original ledger state! (i.e. this
# is currently done on exit)
for tid, entry in trade_entries.items():
ledger.setdefault(tid, {}).update(entry)
if api_trans := api_trans_by_acct.get(acctid):
trans.update(api_trans)
# update account (and thus pps) from all gathered transactions
acnt.update_from_ledger(
trans,
symcache=ledger.symcache,
)
# iterate all (newly) updated pps tables for every
# client-account and build out position msgs to deliver to
# EMS.
for acctid, acnt in tables.items():
active_pps, closed_pps = acnt.dump_active()
for pps in [active_pps, closed_pps]:
piker_pps: list[Position] = list(pps.values())
for pikerpos in piker_pps:
# TODO: map from both the contract ID
# (bs_mktid) AND the piker-ified FQME ??
# - they might change the fqme when bbby get's
# downlisted to pink XD
# - the bs_mktid can randomly change like in
# gnln.nasdaq..
ibpos: IbPosition | None = ibacnt.positions.get(
pikerpos.bs_mktid
)
if ibpos:
bs_mktid: str = str(ibpos.contract.conId)
msg = await update_and_audit_pos_msg(
acctid,
pikerpos,
ibpos,
cons2mkts=client._cons2mkts,
validate=False,
)
if msg and msg.size != 0:
all_positions.append(msg)
elif (
not ibpos
and pikerpos.cumsize
):
logmsg: str = (
f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
)
log.error(logmsg)
await ctx.started((
all_positions,
tuple(name for name in accounts_def if name in accounts),
))
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# relay existing open orders to ems
for msg in order_msgs:
await ems_stream.send(msg)
for client in set(aioclients.values()):
trade_event_stream: LinkedTaskChannel = await n.start(
open_trade_event_stream,
client,
)
# start order request handler **before** local trades
# event loop
n.start_soon(
handle_order_requests,
ems_stream,
accounts_def,
flows,
)
# allocate event relay tasks for each client connection
n.start_soon(
deliver_trade_events,
trade_event_stream,
ems_stream,
accounts_def,
proxies,
ledgers,
tables,
flows,
)
# write account and ledger files immediately!
# TODO: make this thread-async!
for acctid, acnt in tables.items():
acnt.write_config()
ledgers[acctid].write_config()
# block until cancelled
await trio.sleep_forever()
async def emit_pp_update(
ems_stream: tractor.MsgStream,
accounts_def: bidict[str, str],
proxies: dict,
ledgers: dict[str, dict[str, Any]],
acnts: dict[str, Account],
ibpos: IbPosition, # required!
# NEED it before we actually update the trade ledger
fill: Fill | None = None,
) -> None:
'''
Emit a position update to the EMS either directly from
a `IbPosition` update (received from the API) or ideally from
a `piker.accounting.Position` update (once it's entirely bug
free xD) by extracting the trade record from the (optionally
provided) `Fill` event, convert it into a `Transaction`, update
the backing ledger and emit a msg for the account's `Position`
entry.
'''
accounts_def_inv: bidict[str, str] = accounts_def.inverse
accnum: str = ibpos.account
fq_acctid: str = accounts_def_inv[accnum]
proxy: MethodProxy = proxies[fq_acctid]
client: Client = proxy._aio_ns
# XXX FILL CASE:
# compute and relay incrementally updated piker pos
# after doing accounting calcs
if fill:
(
records_by_acct,
api_to_ledger_entries,
) = await update_ledger_from_api_trades(
[fill],
proxy,
accounts_def_inv,
)
trans: dict[str, Transaction] = records_by_acct[fq_acctid]
tx: Transaction = list(trans.values())[0]
acctid: str = fq_acctid.strip('ib.')
acnt: Account = acnts[acctid]
ledger: TransactionLedger = ledgers[acctid]
# write to disk/storage
ledger.write_config()
# con: Contract = fill.contract
# provide a backup fqme -> MktPair table in case the
# symcache does not (yet) have an entry for the current mkt
# txn.
backup_table: dict[str, MktPair] = {}
for tid, txn in trans.items():
fqme: str = txn.fqme
if fqme not in ledger.symcache.mktmaps:
# bs_mktid: str = txn.bs_mktid
backup_table[fqme] = client._cons2mkts[
client._contracts[fqme]
]
acnt.update_from_ledger(
trans,
# XXX: currently this is likely empty since we have no
# support!
symcache=ledger.symcache,
# TODO: remove this hack by attempting to symcache an
# incrementally updated table?
_mktmap_table=backup_table,
)
# re-compute all positions that have changed state.
# TODO: likely we should change the API to return the
# position updates from `.update_from_ledger()`?
active, closed = acnt.dump_active()
# NOTE: update ledger with all new trades
for fq_acctid, trades_by_id in api_to_ledger_entries.items():
acctid: str = fq_acctid.strip('ib.')
ledger: dict = ledgers[acctid]
# NOTE: don't override flex/previous entries with new API
# ones, just update with new fields or create new entry.
for tid, tdict in trades_by_id.items():
ledger.setdefault(tid, {}).update(tdict)
# generate pp msgs and cross check with ib's positions data, relay
# re-formatted pps as msgs to the ems.
msg: dict | None = None
for pos in filter(
bool,
[
active.get(tx.bs_mktid),
closed.get(tx.bs_mktid)
]
):
msg = await update_and_audit_pos_msg(
acctid,
pos,
ibpos,
cons2mkts=client._cons2mkts,
# ib pp event might not have arrived yet
validate=False,
)
if msg:
log.info(f'Emitting pp msg: {msg}')
break
# XXX NO FILL CASE:
# if just handed an `IbPosition`, pack it and relay for now
# since we always want the size to be up to date even if
# the ppu is wrong..
else:
bs_mktid, msg = pack_position(ibpos, accounts_def)
if msg:
await ems_stream.send(msg)
else:
await tractor.pause()
# NOTE: See `OrderStatus` def for key schema;
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# => we remap statuses to the ems set via the below table:
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# XXX: here's some other sucky cases from the api
# - short-sale but securities haven't been located, in this
# case we should probably keep the order in some kind of
# weird state or cancel it outright?
# status='PendingSubmit', message=''),
# status='Cancelled', message='Error 404,
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
_statuses: dict[str, str] = {
'Filled': 'filled',
'Cancelled': 'canceled',
'Submitted': 'open',
'PendingSubmit': 'pending',
'PendingCancel': 'pending',
'PreSubmitted': 'pending',
'ApiPending': 'pending',
'ApiCancelled': 'pending',
# TODO: see a current ``ib_insync`` issue around this:
# https://github.com/erdewit/ib_insync/issues/363
'Inactive': 'pending',
}
_action_map = {
'BOT': 'buy',
'SLD': 'sell',
}
# TODO: try out cancelling inactive orders after delay:
# https://github.com/erdewit/ib_insync/issues/363 (was originally
# inside `deliver_trade_events` status handler block.
# acctid = accounts_def.inverse[trade.order.account]
# double check there is no error when
# cancelling.. gawwwd
# if ib_status_key == 'cancelled':
# last_log = trade.log[-1]
# if (
# last_log.message
# and 'Error' not in last_log.message
# ):
# ib_status_key = trade.log[-2].status
#
# elif ib_status_key == 'inactive':
#
# async def sched_cancel():
# log.warning(
# 'OH GAWD an inactive order.scheduling a cancel\n'
# f'{pformat(item)}'
# )
# proxy = proxies[acctid]
# await proxy.submit_cancel(reqid=trade.order.orderId)
# await trio.sleep(1)
# nurse.start_soon(sched_cancel)
#
# nurse.start_soon(sched_cancel)
# TODO: maybe just make this a flat func without an interal loop
# and call it *from* the `trade_event_stream` loop? Might look
# a lot nicer doing that from open_trade_dialog() instead of
# starting a separate task?
async def deliver_trade_events(
# nurse: trio.Nursery,
trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream,
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
proxies: dict[str, MethodProxy],
ledgers,
tables,
flows: OrderDialogs,
) -> None:
'''
Format and relay all trade events for a given client to emsd.
'''
# task local msg dialog tracking
clears: dict[
Contract,
list[
IbPosition | None, # filled by positionEvent
Fill | None, # filled by order status and exec details
]
] = {}
execid2con: dict[str, Contract] = {}
# TODO: for some reason we can receive a ``None`` here when the
# ib-gw goes down? Not sure exactly how that's happening looking
# at the eventkit code above but we should probably handle it...
async for event_name, item in trade_event_stream:
log.info(f'Relaying `{event_name}`:\n{pformat(item)}')
match event_name:
case 'orderStatusEvent':
# XXX: begin normalization of nonsense ib_insync internal
# object-state tracking representations...
# unwrap needed data from ib_insync internal types
trade: Trade = item
reqid: str = str(trade.order.orderId)
status: OrderStatus = trade.orderStatus
status_str: str = _statuses[status.status]
remaining: float = status.remaining
if (
status_str == 'filled'
):
fill: Fill = trade.fills[-1]
execu: Execution = fill.execution
fill_msg = BrokerdFill(
time_ns=time.time_ns(), # cuz why not
# NOTE: should match the value returned from
# `.submit_limit()`
reqid=reqid,
action=_action_map[execu.side],
size=execu.shares,
price=execu.price,
# DO we care? should this be in another
# msg like the position msg?
# broker_details=execdict,
# XXX: required by order mode currently
broker_time=execu.time,
)
await ems_stream.send(fill_msg)
flows.add_msg(reqid, fill_msg.to_dict())
if remaining == 0:
# emit a closed status on filled statuses where
# all units were cleared.
status_str = 'closed'
# skip duplicate filled updates - we get the deats
# from the execution details event
msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(), # cuz why not
account=accounts_def.inverse[trade.order.account],
# everyone doin camel case..
status=status_str,
filled=status.filled,
reason=status.whyHeld,
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
remaining=remaining,
broker_details={'name': 'ib'},
)
await ems_stream.send(msg)
flows.add_msg(reqid, msg.to_dict())
# XXX: for wtv reason this is a separate event type
# from IB, not sure why it's needed other then for extra
# complexity and over-engineering :eyeroll:.
# we may just end up dropping these events (or
# translating them to ``Status`` msgs) if we can
# show the equivalent status events are no more latent.
case 'execDetailsEvent':
# unpack attrs pep-0526 style.
trade: Trade
con: Contract = trade.contract
fill: Fill
trade, fill = item
execu: Execution = fill.execution
execid: str = execu.execId
report: CommissionReport = fill.commissionReport
# always fill in id to con map so when commissions
# arrive we can maybe fire the pos update..
execid2con[execid] = con
# TODO:
# - normalize out commissions details?
# - this is the same as the unpacking loop above in
# ``trades_to_ledger_entries()`` no?
# 2 cases:
# - fill comes first or
# - commission report comes first
clear: tuple = clears.setdefault(
con,
[None, fill],
)
pos, _fill = clear
# NOTE: we have to handle the case where a pos msg
# has already been set (bc we already relayed rxed
# one before both the exec-deats AND the
# comms-report?) but the comms-report hasn't yet
# arrived, so we fill in the fill (XD) and wait for
# the cost to show up before relaying the pos msg
# to the EMS..
if _fill is None:
clear[1] = fill
cost: float = report.commission
if (
pos
and fill
and cost
):
await emit_pp_update(
ems_stream,
accounts_def,
proxies,
ledgers,
tables,
ibpos=pos,
fill=fill,
)
clears.pop(con)
case 'commissionReportEvent':
cr: CommissionReport = item
execid: str = cr.execId
# only fire a pp msg update if,
# - we haven't already
# - the fill event has already arrived
# but it didn't yet have a commision report
# which we fill in now.
# placehold i guess until someone who know wtf
# contract this is from can fill it in...
con: Contract | None = execid2con.setdefault(execid, None)
if (
con
and (clear := clears.get(con))
):
pos, fill = clear
if (
pos
and fill
):
now_cr: CommissionReport = fill.commissionReport
if (now_cr != cr):
log.warning(
'UhhHh ib updated the commission report mid-fill..?\n'
f'was: {pformat(cr)}\n'
f'now: {pformat(now_cr)}\n'
)
await emit_pp_update(
ems_stream,
accounts_def,
proxies,
ledgers,
tables,
ibpos=pos,
fill=fill,
)
clears.pop(con)
# TODO: should we clean this?
# execid2con.pop(execid)
# always update with latest ib pos msg info since
# we generally audit against it for sanity and
# testing AND we require it to be updated to avoid
# error msgs emitted from `update_and_audit_pos_msg()`
case 'positionEvent':
pos: IbPosition = item
con: Contract = pos.contract
bs_mktid, ppmsg = pack_position(pos, accounts_def)
log.info(f'New IB position msg: {ppmsg}')
_, fill = clears.setdefault(
con,
[pos, None],
)
# only send a pos update once we've actually rxed
# the msg from IB since generally speaking we use
# their 'cumsize' as gospel.
await emit_pp_update(
ems_stream,
accounts_def,
proxies,
ledgers,
tables,
ibpos=pos,
)
case 'error':
# NOTE: see impl deats in
# `Client.inline_errors()::push_err()`
err: dict = item
# never relay errors for non-broker related issues
# https://interactivebrokers.github.io/tws-api/message_codes.html
code: int = err['error_code']
if code in {
200, # uhh
# hist pacing / connectivity
162,
165,
# WARNING codes:
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# Attribute 'Outside Regular Trading Hours' is
# " 'ignored based on the order type and
# destination. PlaceOrder is now ' 'being
# processed.',
2109,
# XXX: lol this isn't even documented..
# 'No market data during competing live session'
1669,
}:
continue
reqid: str = str(err['reqid'])
reason: str = err['reason']
if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}')
flow: dict = dict(
flows.get(reqid)
or {}
)
# TODO: we don't want to relay data feed / lookup errors
# so we need some further filtering logic here..
# for most cases the 'status' block above should take
# care of this.
err_msg = BrokerdError(
reqid=reqid,
reason=reason,
broker_details={
'name': 'ib',
'flow': flow,
},
)
flows.add_msg(reqid, err_msg.to_dict())
await ems_stream.send(err_msg)
case 'event':
# it's either a general system status event or an external
# trade event?
log.info(f"TWS system status: \n{pformat(item)}")
# TODO: support this again but needs parsing at the callback
# level...
# reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid)
# mark msg as from "external system"
# TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking
# msg.broker_details['external_src'] = 'tws'
case _:
log.error(f'WTF: {event_name}: {item}')