1344 lines
46 KiB
Python
1344 lines
46 KiB
Python
# piker: trading gear for hackers
|
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
"""
|
|
Order and trades endpoints for use with ``piker``'s EMS.
|
|
|
|
"""
|
|
from __future__ import annotations
|
|
from bisect import insort
|
|
from contextlib import ExitStack
|
|
from dataclasses import asdict
|
|
from functools import partial
|
|
from pprint import pformat
|
|
import time
|
|
from typing import (
|
|
Any,
|
|
Optional,
|
|
AsyncIterator,
|
|
Union,
|
|
)
|
|
|
|
from bidict import bidict
|
|
import trio
|
|
from trio_typing import TaskStatus
|
|
import tractor
|
|
from ib_insync.contract import (
|
|
Contract,
|
|
)
|
|
from ib_insync.order import (
|
|
Trade,
|
|
OrderStatus,
|
|
)
|
|
from ib_insync.objects import (
|
|
Fill,
|
|
Execution,
|
|
CommissionReport,
|
|
)
|
|
from ib_insync.objects import Position as IbPosition
|
|
import pendulum
|
|
|
|
from piker import config
|
|
from piker.pp import (
|
|
Position,
|
|
Transaction,
|
|
open_trade_ledger,
|
|
open_pps,
|
|
PpTable,
|
|
)
|
|
from piker.log import get_console_log
|
|
from piker.clearing._messages import (
|
|
Order,
|
|
Status,
|
|
BrokerdOrder,
|
|
BrokerdOrderAck,
|
|
BrokerdStatus,
|
|
BrokerdPosition,
|
|
BrokerdCancel,
|
|
BrokerdFill,
|
|
BrokerdError,
|
|
)
|
|
from piker.data._source import Symbol
|
|
from .api import (
|
|
_accounts2clients,
|
|
con2fqsn,
|
|
log,
|
|
get_config,
|
|
open_client_proxies,
|
|
Client,
|
|
MethodProxy,
|
|
)
|
|
|
|
|
|
def pack_position(
    pos: IbPosition

) -> tuple[int, BrokerdPosition]:
    '''
    Convert an ``ib_insync`` position record into a piker position msg.

    Returns a 2-tuple of the contract id (``pos.contract.conId``) and
    a ``BrokerdPosition`` built from the record. The average price is
    ``pos.avgCost`` divided by the contract multiplier, falling back to
    ``1.0`` when the multiplier is empty/falsy.

    '''
    con = pos.contract

    # only the fqsn is used here; the second value returned by
    # ``con2fqsn()`` is deliberately ignored.
    fqsn, _ = con2fqsn(con)

    # TODO: options contracts into a sane format..
    return (
        con.conId,
        BrokerdPosition(
            broker='ib',
            account=pos.account,
            symbol=fqsn,
            currency=con.currency,
            size=float(pos.position),
            avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
        ),
    )
|
|
|
|
|
|
async def handle_order_requests(

    ems_order_stream: tractor.MsgStream,
    accounts_def: dict[str, str],

) -> None:
    '''
    Consume order requests from the EMS and route them to the api
    client loaded for the requested account.

    - ``'buy'``/``'sell'`` requests are submitted as limit orders and
      answered with a ``BrokerdOrderAck`` (or a ``BrokerdError`` when
      the client returns no request id).
    - ``'cancel'`` requests are forwarded to the client.
    - a ``BrokerdError`` is relayed when the account name has no
      configured account number or no loaded client.

    '''
    request_msg: dict
    async for request_msg in ems_order_stream:
        log.info(f'Received order request {request_msg}')

        action = request_msg['action']
        account = request_msg['account']

        acct_number = accounts_def.get(account)
        if not acct_number:
            log.error(
                f'An IB account number for name {account} is not found?\n'
                'Make sure you have all TWS and GW instances running.'
            )
            await ems_order_stream.send(
                BrokerdError(
                    oid=request_msg['oid'],
                    symbol=request_msg['symbol'],
                    reason=f'No account found: `{account}` ?',
                )
            )
            continue

        client = _accounts2clients.get(account)
        if not client:
            log.error(
                f'An IB client for account name {account} is not found.\n'
                'Make sure you have all TWS and GW instances running.'
            )
            await ems_order_stream.send(BrokerdError(
                oid=request_msg['oid'],
                symbol=request_msg['symbol'],
                reason=f'No api client loaded for account: `{account}` ?',
            ))
            continue

        if action in {'buy', 'sell'}:
            # validate
            order = BrokerdOrder(**request_msg)

            # XXX: by default 0 tells ``ib_insync`` methods that
            # there is no existing order so ask the client to create
            # a new one (which it seems to do by allocating an int
            # counter - collision prone..)
            reqid = order.reqid
            if reqid is not None:
                reqid = int(reqid)

            # call our client api to submit the order
            reqid = client.submit_limit(
                oid=order.oid,
                symbol=order.symbol,
                price=order.price,
                action=order.action,
                size=order.size,
                account=acct_number,
                reqid=reqid,
            )
            if reqid is None:
                await ems_order_stream.send(BrokerdError(
                    oid=request_msg['oid'],
                    symbol=request_msg['symbol'],
                    reason='Order already active?',
                ))
                # the submission failed: do NOT follow the error up
                # with an ack carrying a ``None`` request id.
                continue

            # deliver ack that order has been submitted to broker routing
            await ems_order_stream.send(
                BrokerdOrderAck(
                    # ems order request id
                    oid=order.oid,
                    # broker specific request id
                    reqid=reqid,
                    account=account,
                )
            )

        # NOTE: this must be an ``elif``, otherwise every buy/sell
        # request also falls through to the "unknown command" branch.
        elif action == 'cancel':
            msg = BrokerdCancel(**request_msg)
            client.submit_cancel(reqid=int(msg.reqid))

        else:
            log.error(f'Unknown order command: {request_msg}')
|
|
|
|
|
|
async def recv_trade_updates(
|
|
|
|
client: Client,
|
|
to_trio: trio.abc.SendChannel,
|
|
|
|
) -> None:
|
|
"""Stream a ticker using the std L1 api.
|
|
"""
|
|
client.inline_errors(to_trio)
|
|
|
|
# sync with trio task
|
|
to_trio.send_nowait(None)
|
|
|
|
def push_tradesies(
|
|
eventkit_obj,
|
|
obj,
|
|
fill: Optional[Fill] = None,
|
|
report: Optional[CommissionReport] = None,
|
|
):
|
|
'''
|
|
Push events to trio task.
|
|
|
|
'''
|
|
match eventkit_obj.name():
|
|
|
|
case 'orderStatusEvent':
|
|
item = ('status', obj)
|
|
|
|
case 'commissionReportEvent':
|
|
assert report
|
|
item = ('cost', report)
|
|
|
|
case 'execDetailsEvent':
|
|
# execution details event
|
|
item = ('fill', (obj, fill))
|
|
|
|
case 'positionEvent':
|
|
item = ('position', obj)
|
|
|
|
case _:
|
|
log.error(f'Error unknown event {obj}')
|
|
return
|
|
|
|
log.info(f'eventkit event ->\n{pformat(item)}')
|
|
|
|
try:
|
|
to_trio.send_nowait(item)
|
|
except trio.BrokenResourceError:
|
|
log.exception(f'Disconnected from {eventkit_obj} updates')
|
|
eventkit_obj.disconnect(push_tradesies)
|
|
|
|
# hook up to the weird eventkit object - event stream api
|
|
for ev_name in [
|
|
'orderStatusEvent', # all order updates
|
|
'execDetailsEvent', # all "fill" updates
|
|
'positionEvent', # avg price updates per symbol per account
|
|
|
|
# XXX: ugh, it is a separate event from IB and it's
|
|
# emitted as follows:
|
|
# self.ib.commissionReportEvent.emit(trade, fill, report)
|
|
'commissionReportEvent',
|
|
|
|
# XXX: not sure yet if we need these
|
|
# 'updatePortfolioEvent',
|
|
|
|
# XXX: these all seem to be weird ib_insync internal
|
|
# events that we probably don't care that much about
|
|
# given the internal design is wonky af..
|
|
# 'newOrderEvent',
|
|
# 'orderModifyEvent',
|
|
# 'cancelOrderEvent',
|
|
# 'openOrderEvent',
|
|
]:
|
|
eventkit_obj = getattr(client.ib, ev_name)
|
|
handler = partial(push_tradesies, eventkit_obj)
|
|
eventkit_obj.connect(handler)
|
|
|
|
# let the engine run and stream
|
|
await client.ib.disconnectedEvent
|
|
|
|
|
|
async def update_ledger_from_api_trades(
    trade_entries: list[dict[str, Any]],
    client: Union[Client, MethodProxy],

) -> tuple[
    dict[str, Transaction],
    dict[str, dict],
]:
    '''
    Tag each api trade entry with a ``'listingExchange'`` value and
    convert the batch into ledger entries and normalized transactions.

    The passed ``trade_entries`` dicts are mutated in place. Returns
    a 2-tuple of the per-account normalized transactions and the
    per-account ledger entries.

    '''
    # XXX: the "primary/listing exchange" is not reliably delivered
    # by the `.fills()` method endpoint, so when it is blank fall back
    # to a contract lookup.
    for fill_entry in trade_entries:
        contract_info = fill_entry['contract']
        contract_id = contract_info['conId']
        listing_exch = contract_info['primaryExchange']

        if not listing_exch:
            found = await client.get_con(conid=contract_id)
            if not found:
                # for futes it seems like the primary is always empty?
                listing_exch = contract_info['exchange']
            else:
                first = found[0]
                listing_exch = first.primaryExchange or first.exchange

        fill_entry['listingExchange'] = listing_exch

    ledger_entries = trades_to_ledger_entries(
        get_config()['accounts'].inverse,
        trade_entries,
    )

    # normalize recent session's trades to the `Transaction` type
    trans_by_acct: dict[str, dict[str, Transaction]] = {
        acct: norm_trade_records(acct_trades)
        for acct, acct_trades in ledger_entries.items()
    }

    return trans_by_acct, ledger_entries
|
|
|
|
|
|
async def update_and_audit_msgs(
    acctid: str,  # no `ib.` prefix is required!
    pps: list[Position],
    cids2pps: dict[tuple[str, int], BrokerdPosition],
    validate: bool = False,

) -> list[BrokerdPosition]:
    '''
    Build one ``BrokerdPosition`` msg per piker position in ``pps``,
    optionally auditing each against the ib reported equivalent.

    The ib reported msg is looked up in ``cids2pps`` by
    ``(acctid, p.bsuid)``. Size and price always come from the piker
    position (``p.size`` / ``p.ppu``).

    Raises ``ValueError`` when ``validate`` is set and the ib reported
    size differs from the piker computed one.

    '''
    msgs: list[BrokerdPosition] = []
    for p in pps:
        bsuid = p.bsuid

        # retrieve equivalent ib reported position message
        # for comparison/audit versus the piker equivalent
        # breakeven pp calcs.
        ibppmsg = cids2pps.get((acctid, bsuid))

        if ibppmsg:
            msg = BrokerdPosition(
                broker='ib',

                # XXX: ok so this is annoying, we're relaying
                # an account name with the backend suffix prefixed
                # but when reading accounts from ledgers we don't
                # need it and/or it's prefixed in the section
                # table..
                account=ibppmsg.account,
                # XXX: the `.ib` is stripped..?
                symbol=ibppmsg.symbol,
                currency=ibppmsg.currency,
                size=p.size,
                avg_price=p.ppu,
            )
            msgs.append(msg)

            if validate:
                ibsize = ibppmsg.size
                pikersize = msg.size
                diff = pikersize - ibsize

                # if ib reports a lesser pp it's not as bad since we can
                # presume we're at least not more in the shit then we
                # thought.
                if diff:
                    report = (
                        f'POSITION MISMATCH ib <-> piker ledger:\n'
                        f'ib: {ibppmsg}\n'
                        f'piker: {msg}\n'
                    )

                    # a (reverse) split hint can only be computed when
                    # both sizes are non-zero; otherwise the divisions
                    # below would raise ``ZeroDivisionError`` and mask
                    # the mismatch report.
                    if ibsize and pikersize:
                        reverse_split_ratio = pikersize / ibsize
                        split_ratio = 1/reverse_split_ratio

                        if split_ratio >= reverse_split_ratio:
                            entry = f'split_ratio = {int(split_ratio)}'
                        else:
                            entry = (
                                f'split_ratio = 1/{int(reverse_split_ratio)}'
                            )

                        report += (
                            f'reverse_split_ratio: {reverse_split_ratio}\n'
                            f'split_ratio: {split_ratio}\n\n'
                            'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
                            'If you are expecting a (reverse) split in this '
                            'instrument you should probably put the following '
                            f'in the `pps.toml` section:\n{entry}'
                        )
                    else:
                        report += (
                            '\nFIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n'
                        )

                    raise ValueError(report)

            if ibppmsg.avg_price != msg.avg_price:

                # TODO: make this a "propoganda" log level?
                log.warning(
                    'The mega-cucks at IB want you to believe with their '
                    f'"FIFO" positioning for {msg.symbol}:\n'
                    f'"ib" mega-cucker avg price: {ibppmsg.avg_price}\n'
                    f'piker, LIFO breakeven PnL price: {msg.avg_price}'
                )

        else:
            # make brand new message
            msg = BrokerdPosition(
                broker='ib',

                # XXX: ok so this is annoying, we're relaying
                # an account name with the backend suffix prefixed
                # but when reading accounts from ledgers we don't
                # need it and/or it's prefixed in the section
                # table.. we should just strip this from the message
                # right since `.broker` is already included?
                account=f'ib.{acctid}',
                # XXX: the `.ib` is stripped..?
                symbol=p.symbol.front_fqsn(),
                # currency=ibppmsg.currency,
                size=p.size,
                avg_price=p.ppu,
            )
            if validate and p.size:
                # only logged (not raised): a non-zero piker position
                # which ib does not report at all.
                log.error(
                    f'UNEXPECTED POSITION says ib:\n'
                    f'piker: {msg}\n'
                    'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?\n'
                    'THEY LIQUIDATED YOU OR YOUR MISSING LEDGER RECORDS!?'
                )
            msgs.append(msg)

    return msgs
|
|
|
|
|
|
@tractor.context
async def trades_dialogue(

    ctx: tractor.Context,
    loglevel: Optional[str] = None,

) -> AsyncIterator[dict[str, Any]]:
    '''
    Trades/orders dialogue endpoint opened by the EMS.

    On startup this:
    - opens a trade ledger and a pp table for every detected account,
    - collects the currently open orders of every api client,
    - cross-checks ib reported positions against the ledger derived
      ones, pulling in this session's api trades when they differ,
    - delivers all position msgs plus the usable account names via
      ``ctx.started()``.

    It then opens the msg stream to the EMS, relays the open orders and
    starts the order request handler and trade event relay tasks, and
    finally blocks until cancelled.

    '''
    # XXX: required to propagate ``tractor`` loglevel to piker logging
    get_console_log(loglevel or tractor.current_actor().loglevel)

    accounts_def = config.load_accounts(['ib'])

    # deliver positions to subscriber before anything else
    all_positions = []
    accounts = set()
    acctids = set()
    cids2pps: dict[tuple[str, int], BrokerdPosition] = {}

    # TODO: this causes a massive tractor bug when you run marketstored
    # with ``--tsdb``... you should get:
    # - first error the assertion
    # - chart should get that error and die
    # - pikerd goes to debugger again from trio nursery multi-error
    # - hitting final control-c to kill daemon will lead to hang
    # assert 0

    # TODO: just write on teardown?
    # we might also want to delegate a specific actor for
    # ledger writing / reading for speed?
    async with (
        open_client_proxies() as (proxies, aioclients),
    ):
        # Open a trade ledgers stack for appending trade records over
        # multiple accounts.
        # TODO: we probably want to generalize this into a "ledgers" api..
        ledgers: dict[str, dict] = {}
        tables: dict[str, PpTable] = {}
        with (
            ExitStack() as lstack,
        ):
            # load ledgers and pps for all detected client-proxies
            for account, proxy in proxies.items():
                assert account in accounts_def
                accounts.add(account)

                # NOTE(review): ``str.strip()`` removes any of the chars
                # `i`, `b`, `.` from BOTH ends rather than the `ib.`
                # prefix. Left as is since the resulting key is shared
                # with ``emit_pp_update()`` and with the on-disk ledger
                # and pps files; change them all together.
                acctid = account.strip('ib.')
                acctids.add(acctid)

                # open ledger and pptable wrapper for each
                # detected account.
                ledger = ledgers[acctid] = lstack.enter_context(
                    open_trade_ledger('ib', acctid)
                )
                table = tables[acctid] = lstack.enter_context(
                    open_pps('ib', acctid)
                )

            # open orders collected over ALL clients; this list must not
            # be reset per account or only the last account's orders
            # would be relayed to the ems.
            order_msgs: list[Status] = []

            # one api client may serve several accounts, only read
            # its open orders once to avoid relaying duplicates.
            polled_clients: set = set()

            for account, proxy in proxies.items():
                client = aioclients[account]

                if client in polled_clients:
                    trades: list[Trade] = []
                else:
                    polled_clients.add(client)
                    trades = client.ib.openTrades()

                for trade in trades:

                    order = trade.order
                    quant = trade.order.totalQuantity
                    action = order.action.lower()
                    size = {
                        'sell': -1,
                        'buy': 1,
                    }[action] * quant
                    con = trade.contract

                    # TODO: in the case of the SMART venue (aka ib's
                    # router-clearing sys) we probably should handle
                    # showing such orders overtop of the fqsn for the
                    # primary exchange, how to map this easily is going
                    # to be a bit tricky though?
                    deats = await proxy.con_deats(contracts=[con])
                    fqsn = list(deats)[0]

                    reqid = order.orderId

                    # TODO: maybe embed a ``BrokerdOrder`` instead
                    # since then we can directly load it on the client
                    # side in the order mode loop?
                    msg = Status(
                        time_ns=time.time_ns(),
                        resp='open',
                        oid=str(reqid),
                        reqid=reqid,

                        # embedded order info
                        req=Order(
                            action=action,
                            exec_mode='live',
                            oid=str(reqid),
                            symbol=fqsn,
                            account=accounts_def.inverse[order.account],
                            price=order.lmtPrice,
                            size=size,
                        ),
                        src='ib',
                    )
                    order_msgs.append(msg)

                # process pp value reported from ib's system. we only use these
                # to cross-check sizing since average pricing on their end uses
                # the so called (bs) "FIFO" style which more or less results in
                # a price that's not useful for traders who want to not lose
                # money.. xb
                for pos in client.positions():

                    # collect all ib-pp reported positions so that we can be
                    # sure know which positions to update from the ledger if
                    # any are missing from the ``pps.toml``
                    bsuid, msg = pack_position(pos)

                    acctid = msg.account = accounts_def.inverse[msg.account]
                    acctid = acctid.strip('ib.')
                    cids2pps[(acctid, bsuid)] = msg
                    assert msg.account in accounts, (
                        f'Position for unknown account: {msg.account}')

                    ledger = ledgers[acctid]
                    table = tables[acctid]

                    pp = table.pps.get(bsuid)
                    if (
                        not pp
                        or pp.size != msg.size
                    ):
                        trans = norm_trade_records(ledger)
                        table.update_from_trans(trans)

                        # update trades ledgers for all accounts from connected
                        # api clients which report trades for **this session**.
                        trades = await proxy.trades()
                        (
                            trans_by_acct,
                            api_to_ledger_entries,
                        ) = await update_ledger_from_api_trades(
                            trades,
                            proxy,
                        )

                        # if new trades are detected from the API, prepare
                        # them for the ledger file and update the pptable.
                        if api_to_ledger_entries:
                            trade_entries = api_to_ledger_entries[acctid]

                            # write ledger with all new trades **AFTER**
                            # we've updated the `pps.toml` from the
                            # original ledger state! (i.e. this is
                            # currently done on exit)
                            ledger.update(trade_entries)

                            trans = trans_by_acct.get(acctid)
                            if trans:
                                table.update_from_trans(trans)

                        # XXX: not sure exactly why it wouldn't be in
                        # the updated output (maybe this is a bug?) but
                        # if you create a pos from TWS and then load it
                        # from the api trades it seems we get a key
                        # error from ``update[bsuid]`` ?
                        pp = table.pps.get(bsuid)
                        if not pp:
                            log.error(
                                f'The contract id for {msg} may have '
                                f'changed to {bsuid}\nYou may need to '
                                'adjust your ledger for this, skipping '
                                'for now.'
                            )
                            continue

                        # the entry is known to exist at this point.
                        pp = table.pps[bsuid]
                        if msg.size != pp.size:
                            log.error(
                                f'Position mismatch {pp.symbol.front_fqsn()}:\n'
                                f'ib: {msg.size}\n'
                                f'piker: {pp.size}\n'
                            )

                # NOTE(review): ``table`` and ``acctid`` here are whatever
                # the last position iteration (or, without positions, the
                # ledger loading loop above) left bound; verify this is
                # the intended account when a client reports none.
                active_pps, closed_pps = table.dump_active()

                # load all positions from `pps.toml`, cross check with
                # ib's positions data, and relay re-formatted pps as
                # msgs to the ems.
                # __2 cases__:
                # - new trades have taken place this session that we want to
                #   always reprocess indempotently,
                # - no new trades yet but we want to reload and audit any
                #   positions reported by ib's sys that may not yet be in
                #   piker's ``pps.toml`` state-file.
                for pps in [active_pps, closed_pps]:
                    msgs = await update_and_audit_msgs(
                        acctid,
                        pps.values(),
                        cids2pps,
                        validate=True,
                    )
                    all_positions.extend(msgs)

            if not all_positions and cids2pps:
                raise RuntimeError(
                    'Positions reported by ib but not found in `pps.toml`!?\n'
                    f'{pformat(cids2pps)}'
                )

            await ctx.started((
                all_positions,
                tuple(name for name in accounts_def if name in accounts),
            ))

            # proxy wrapper for starting trade event stream
            async def open_trade_event_stream(
                client: Client,
                task_status: TaskStatus[
                    trio.abc.ReceiveChannel
                ] = trio.TASK_STATUS_IGNORED,
            ):
                # each api client has a unique event stream
                async with tractor.to_asyncio.open_channel_from(
                    recv_trade_updates,
                    client=client,
                ) as (first, trade_event_stream):

                    task_status.started(trade_event_stream)
                    await trio.sleep_forever()

            async with (
                ctx.open_stream() as ems_stream,
                trio.open_nursery() as n,
            ):
                # relay existing open orders to ems
                for msg in order_msgs:
                    await ems_stream.send(msg)

                for client in set(aioclients.values()):
                    trade_event_stream = await n.start(
                        open_trade_event_stream,
                        client,
                    )

                    # start order request handler **before** local trades
                    # event loop
                    n.start_soon(
                        handle_order_requests,
                        ems_stream,
                        accounts_def,
                    )

                    # allocate event relay tasks for each client connection
                    n.start_soon(
                        deliver_trade_events,
                        n,
                        trade_event_stream,
                        ems_stream,
                        accounts_def,
                        cids2pps,
                        proxies,

                        ledgers,
                        tables,
                    )

                # TODO: make this thread-async!
                table.write_config()

                # block until cancelled
                await trio.sleep_forever()
|
|
|
|
|
|
async def emit_pp_update(
    ems_stream: tractor.MsgStream,
    trade_entry: dict,
    accounts_def: bidict,
    proxies: dict,
    cids2pps: dict,

    ledgers,
    tables,

) -> None:
    '''
    Apply a single api trade entry to the account's ledger and pp table
    and relay the resulting position msg to the EMS.

    When the entry does not normalize to any transaction, or no position
    msg can be built for it, the ledger is still updated but nothing is
    sent over ``ems_stream``.

    '''
    # compute and relay incrementally updated piker pp
    acctid = accounts_def.inverse[trade_entry['execution']['acctNumber']]
    proxy = proxies[acctid]

    # NOTE(review): ``str.strip()`` removes a char set from both ends,
    # not the `ib.` prefix; kept as is because the ledger/table keys are
    # built with the very same expression in ``trades_dialogue()``.
    acctid = acctid.strip('ib.')
    (
        records_by_acct,
        api_to_ledger_entries,
    ) = await update_ledger_from_api_trades(
        [trade_entry],
        proxy,
    )
    trans = records_by_acct[acctid]

    # ``None`` when the entry produced no transaction at all; indexing
    # an empty list here would crash the calling event loop.
    r = next(iter(trans.values()), None)

    if r is not None:
        table = tables[acctid]
        table.update_from_trans(trans)
        active, closed = table.dump_active()

    # NOTE: update ledger with all new trades
    # (a distinct loop name so the ``acctid`` above is not clobbered)
    for ledger_acctid, trades_by_id in api_to_ledger_entries.items():
        ledger = ledgers[ledger_acctid]
        ledger.update(trades_by_id)

    if r is None:
        log.warning(
            'No transaction could be derived from trade entry, '
            f'skipping pp update:\n{pformat(trade_entry)}'
        )
        return

    # generate pp msgs and cross check with ib's positions data, relay
    # re-formatted pps as msgs to the ems.
    msg: Optional[BrokerdPosition] = None
    for pos in filter(
        bool,
        [active.get(r.bsuid), closed.get(r.bsuid)]
    ):
        msgs = await update_and_audit_msgs(
            acctid,
            [pos],
            cids2pps,

            # ib pp event might not have arrived yet
            validate=False,
        )
        if msgs:
            msg = msgs[0]
            log.info(f'Emitting pp msg: {msg}')
            break

    if msg is None:
        # neither an active nor a closed position matched: there is
        # nothing to relay.
        log.error(
            f'No position found for {r.bsuid} in account {acctid}, '
            'not emitting a pp msg'
        )
        return

    await ems_stream.send(msg)
|
|
|
|
|
|
# Map of lower-cased ib order status names to the status names relayed
# in ``BrokerdStatus`` msgs. Statuses without an entry here are passed
# through lower-cased by ``deliver_trade_events()``.
_statuses: dict[str, str] = {
    'cancelled': 'canceled',
    'submitted': 'open',

    # XXX: just pass these through? they duplicate the actual fill
    # events other than in the `.remaining == 0` case, which is our
    # `'closed'` case.
    # 'filled': 'pending',
    # 'pendingsubmit': 'pending',

    # TODO: see a current ``ib_insync`` issue around this:
    # https://github.com/erdewit/ib_insync/issues/363
    'inactive': 'pending',
}
|
|
|
|
|
|
async def deliver_trade_events(
|
|
|
|
nurse: trio.Nursery,
|
|
trade_event_stream: trio.MemoryReceiveChannel,
|
|
ems_stream: tractor.MsgStream,
|
|
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
|
|
cids2pps: dict[tuple[str, str], BrokerdPosition],
|
|
proxies: dict[str, MethodProxy],
|
|
|
|
ledgers,
|
|
tables,
|
|
|
|
) -> None:
|
|
'''
|
|
Format and relay all trade events for a given client to emsd.
|
|
|
|
'''
|
|
action_map = {'BOT': 'buy', 'SLD': 'sell'}
|
|
ids2fills: dict[str, dict] = {}
|
|
|
|
# TODO: for some reason we can receive a ``None`` here when the
|
|
# ib-gw goes down? Not sure exactly how that's happening looking
|
|
# at the eventkit code above but we should probably handle it...
|
|
async for event_name, item in trade_event_stream:
|
|
log.info(f'ib sending {event_name}:\n{pformat(item)}')
|
|
|
|
match event_name:
|
|
# TODO: templating the ib statuses in comparison with other
|
|
# brokers is likely the way to go:
|
|
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
|
|
# short list:
|
|
# - PendingSubmit
|
|
# - PendingCancel
|
|
# - PreSubmitted (simulated orders)
|
|
# - ApiCancelled (cancelled by client before submission
|
|
# to routing)
|
|
# - Cancelled
|
|
# - Filled
|
|
# - Inactive (reject or cancelled but not by trader)
|
|
|
|
# XXX: here's some other sucky cases from the api
|
|
# - short-sale but securities haven't been located, in this
|
|
# case we should probably keep the order in some kind of
|
|
# weird state or cancel it outright?
|
|
|
|
# status='PendingSubmit', message=''),
|
|
# status='Cancelled', message='Error 404,
|
|
# reqId 1550: Order held while securities are located.'),
|
|
# status='PreSubmitted', message='')],
|
|
|
|
case 'status':
|
|
|
|
# XXX: begin normalization of nonsense ib_insync internal
|
|
# object-state tracking representations...
|
|
|
|
# unwrap needed data from ib_insync internal types
|
|
trade: Trade = item
|
|
status: OrderStatus = trade.orderStatus
|
|
ib_status_key = status.status.lower()
|
|
|
|
acctid = accounts_def.inverse[trade.order.account]
|
|
|
|
# double check there is no error when
|
|
# cancelling.. gawwwd
|
|
if ib_status_key == 'cancelled':
|
|
last_log = trade.log[-1]
|
|
if (
|
|
last_log.message
|
|
and 'Error' not in last_log.message
|
|
):
|
|
ib_status_key = trade.log[-2].status
|
|
print(ib_status_key)
|
|
|
|
elif ib_status_key == 'inactive':
|
|
async def sched_cancel():
|
|
log.warning(
|
|
'OH GAWD an inactive order..scheduling a cancel\n'
|
|
f'{pformat(item)}'
|
|
)
|
|
proxy = proxies[acctid]
|
|
await proxy.submit_cancel(reqid=trade.order.orderId)
|
|
await trio.sleep(1)
|
|
nurse.start_soon(sched_cancel)
|
|
|
|
nurse.start_soon(sched_cancel)
|
|
|
|
status_key = (
|
|
_statuses.get(ib_status_key)
|
|
or ib_status_key.lower()
|
|
)
|
|
|
|
remaining = status.remaining
|
|
if (
|
|
status_key == 'filled'
|
|
and remaining == 0
|
|
):
|
|
status_key = 'closed'
|
|
|
|
# skip duplicate filled updates - we get the deats
|
|
# from the execution details event
|
|
msg = BrokerdStatus(
|
|
|
|
reqid=trade.order.orderId,
|
|
time_ns=time.time_ns(), # cuz why not
|
|
account=accounts_def.inverse[trade.order.account],
|
|
|
|
# everyone doin camel case..
|
|
status=status_key, # force lower case
|
|
|
|
filled=status.filled,
|
|
reason=status.whyHeld,
|
|
|
|
# this seems to not be necessarily up to date in the
|
|
# execDetails event.. so we have to send it here I guess?
|
|
remaining=remaining,
|
|
|
|
broker_details={'name': 'ib'},
|
|
)
|
|
await ems_stream.send(msg)
|
|
|
|
case 'fill':
|
|
|
|
# for wtv reason this is a separate event type
|
|
# from IB, not sure why it's needed other then for extra
|
|
# complexity and over-engineering :eyeroll:.
|
|
# we may just end up dropping these events (or
|
|
# translating them to ``Status`` msgs) if we can
|
|
# show the equivalent status events are no more latent.
|
|
|
|
# unpack ib_insync types
|
|
# pep-0526 style:
|
|
# https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
|
|
trade: Trade
|
|
fill: Fill
|
|
|
|
# TODO: maybe we can use matching to better handle these cases.
|
|
trade, fill = item
|
|
execu: Execution = fill.execution
|
|
execid = execu.execId
|
|
|
|
# TODO:
|
|
# - normalize out commissions details?
|
|
# - this is the same as the unpacking loop above in
|
|
# ``trades_to_ledger_entries()`` no?
|
|
trade_entry = ids2fills.setdefault(execid, {})
|
|
cost_already_rx = bool(trade_entry)
|
|
|
|
# if the costs report was already received this
|
|
# should be not empty right?
|
|
comms = fill.commissionReport.commission
|
|
if cost_already_rx:
|
|
assert comms
|
|
|
|
trade_entry.update(
|
|
{
|
|
'contract': asdict(fill.contract),
|
|
'execution': asdict(fill.execution),
|
|
# 'commissionReport': asdict(fill.commissionReport),
|
|
# supposedly server fill time?
|
|
'broker_time': execu.time,
|
|
'name': 'ib',
|
|
}
|
|
)
|
|
|
|
msg = BrokerdFill(
|
|
# should match the value returned from `.submit_limit()`
|
|
reqid=execu.orderId,
|
|
time_ns=time.time_ns(), # cuz why not
|
|
|
|
action=action_map[execu.side],
|
|
size=execu.shares,
|
|
price=execu.price,
|
|
|
|
broker_details=trade_entry,
|
|
# XXX: required by order mode currently
|
|
broker_time=trade_entry['broker_time'],
|
|
|
|
)
|
|
await ems_stream.send(msg)
|
|
|
|
# 2 cases:
|
|
# - fill comes first or
|
|
# - comms report comes first
|
|
comms = fill.commissionReport.commission
|
|
if comms:
|
|
# UGHHH since the commision report object might be
|
|
# filled in **after** we already serialized to dict..
|
|
# def need something better for all this.
|
|
trade_entry.update(
|
|
{'commissionReport': asdict(fill.commissionReport)}
|
|
)
|
|
|
|
if comms or cost_already_rx:
|
|
# only send a pp update once we have a cost report
|
|
await emit_pp_update(
|
|
ems_stream,
|
|
trade_entry,
|
|
accounts_def,
|
|
proxies,
|
|
cids2pps,
|
|
ledgers,
|
|
tables,
|
|
)
|
|
|
|
case 'cost':
|
|
|
|
cr: CommissionReport = item
|
|
execid = cr.execId
|
|
|
|
trade_entry = ids2fills.setdefault(execid, {})
|
|
fill_already_rx = bool(trade_entry)
|
|
|
|
# only fire a pp msg update if,
|
|
# - we haven't already
|
|
# - the fill event has already arrived
|
|
# but it didn't yet have a commision report
|
|
# which we fill in now.
|
|
if (
|
|
fill_already_rx
|
|
and 'commissionReport' not in trade_entry
|
|
):
|
|
# no fill msg has arrived yet so just fill out the
|
|
# cost report for now and when the fill arrives a pp
|
|
# msg can be emitted.
|
|
trade_entry.update(
|
|
{'commissionReport': asdict(cr)}
|
|
)
|
|
|
|
await emit_pp_update(
|
|
ems_stream,
|
|
trade_entry,
|
|
accounts_def,
|
|
proxies,
|
|
cids2pps,
|
|
ledgers,
|
|
tables,
|
|
)
|
|
|
|
case 'error':
|
|
err: dict = item
|
|
|
|
# f$#$% gawd dammit insync..
|
|
con = err['contract']
|
|
if isinstance(con, Contract):
|
|
err['contract'] = asdict(con)
|
|
|
|
if err['reqid'] == -1:
|
|
log.error(f'TWS external order error:\n{pformat(err)}')
|
|
|
|
# TODO: we don't want to relay data feed / lookup errors
|
|
# so we need some further filtering logic here..
|
|
# for most cases the 'status' block above should take
|
|
# care of this.
|
|
# await ems_stream.send(BrokerdStatus(
|
|
# status='error',
|
|
# reqid=err['reqid'],
|
|
# reason=err['reason'],
|
|
# time_ns=time.time_ns(),
|
|
# account=accounts_def.inverse[trade.order.account],
|
|
# broker_details={'name': 'ib'},
|
|
# ))
|
|
|
|
case 'position':
|
|
|
|
cid, msg = pack_position(item)
|
|
log.info(f'New IB position msg: {msg}')
|
|
# cuck ib and it's shitty fifo sys for pps!
|
|
continue
|
|
|
|
case 'event':
|
|
|
|
# it's either a general system status event or an external
|
|
# trade event?
|
|
log.info(f"TWS system status: \n{pformat(item)}")
|
|
|
|
# TODO: support this again but needs parsing at the callback
|
|
# level...
|
|
# reqid = item.get('reqid', 0)
|
|
# if getattr(msg, 'reqid', 0) < -1:
|
|
# log.info(f"TWS triggered trade\n{pformat(msg)}")
|
|
|
|
# msg.reqid = 'tws-' + str(-1 * reqid)
|
|
|
|
# mark msg as from "external system"
|
|
# TODO: probably something better then this.. and start
|
|
# considering multiplayer/group trades tracking
|
|
# msg.broker_details['external_src'] = 'tws'
|
|
|
|
case _:
|
|
log.error(f'WTF: {event_name}: {item}')
|
|
|
|
|
|
def norm_trade_records(
    ledger: dict[str, Any],

) -> dict[str, Transaction]:
    '''
    Normalize a flex report or API retrieved executions
    ledger into our standard record format.

    Returns the ``Transaction`` records keyed by trade id, inserted in
    ascending datetime order. Records which look like options contracts
    are skipped.

    '''
    records: list[Transaction] = []
    for tid, record in ledger.items():

        conid = record.get('conId') or record['conid']
        comms = record.get('commission')
        if comms is None:
            comms = -1*record['ibCommission']
        price = record.get('price') or record['tradePrice']

        # the api doesn't do the -/+ on the quantity for you but flex
        # records do.. are you fucking serious ib...!?
        size = record.get('quantity') or record['shares'] * {
            'BOT': 1,
            'SLD': -1,
        }[record['side']]

        exch = record['exchange']
        lexch = record.get('listingExchange')

        suffix = lexch or exch
        symbol = record['symbol']

        # likely an opts contract record from a flex report..
        # TODO: no idea how to parse ^ the strike part from flex..
        # (00010000 any, or 00007500 tsla, ..)
        # we probably must do the contract lookup for this?
        if ' ' in symbol or '--' in exch:
            # underlying, _, tail = symbol.partition(' ')
            # expiry = tail[:6]
            # otype = tail[6]
            # strike = tail[7:]

            print(f'skipping opts contract {symbol}')
            continue

        # timestamping is way different in API records
        date = record.get('date')
        if not date:
            # probably a flex record with a wonky non-std timestamp..
            date, ts = record['dateTime'].split(';')
            dt = pendulum.parse(date)
            ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
            tsdt = pendulum.parse(ts)

            # NOTE: ``.set()`` returns a NEW datetime, the result must
            # be re-bound or the time of day is silently dropped.
            dt = dt.set(
                hour=tsdt.hour,
                minute=tsdt.minute,
                second=tsdt.second,
            )

        else:
            # epoch_dt = pendulum.from_timestamp(record.get('time'))
            dt = pendulum.parse(date)

        # special handling of symbol extraction from
        # flex records using some ad-hoc schema parsing.
        instr = record.get('assetCategory')
        if instr == 'FUT':
            symbol = record['description'][:3]

        # try to build out piker fqsn from record.
        expiry = record.get(
            'lastTradeDateOrContractMonth') or record.get('expiry')
        if expiry:
            expiry = str(expiry).strip(' ')
            suffix = f'{exch}.{expiry}'
            expiry = pendulum.parse(expiry)

        # NOTE: drop exactly the trailing `.ib` broker suffix;
        # ``.rstrip('.ib')`` would strip any run of the chars
        # `.`, `i`, `b` and so truncate keys ending in those letters.
        fqsn = Symbol.from_fqsn(
            fqsn=f'{symbol}.{suffix}.ib',
            info={},
        ).front_fqsn().removesuffix('.ib')

        # NOTE: for flex records the normal fields for defining an fqsn
        # sometimes won't be available so we rely on two approaches for
        # the "reverse lookup" of piker style fqsn keys:
        # - when dealing with API trade records received from
        #   `IB.trades()` we do a contract lookup at he time of processing
        # - when dealing with flex records, it is assumed the record
        #   is at least a day old and thus the TWS position reporting system
        #   should already have entries if the pps are still open, in
        #   which case, we can pull the fqsn from that table (see
        #   `trades_dialogue()` above).
        insort(
            records,
            Transaction(
                fqsn=fqsn,
                tid=tid,
                size=size,
                price=price,
                cost=comms,
                dt=dt,
                expiry=expiry,
                bsuid=conid,
            ),
            key=lambda t: t.dt
        )

    return {r.tid: r for r in records}
|
|
|
|
|
|
def trades_to_ledger_entries(
    accounts: bidict,
    trade_entries: list[object],
    source_type: str = 'api',

) -> dict:
    '''
    Convert either of API execution objects or flex report
    entry objects into ``dict`` form, pretty much straight up
    without modification.

    ``accounts`` is indexed with the account number found in each
    entry (``accountId`` for flex records, ``acctNumber`` for API
    records) and the value it returns is used as the top level key
    of the output.

    ``trade_entries`` are either objects exposing their fields via
    ``.__dict__`` (``source_type='flex'``) or ``dict``s with
    (possibly nested) sections (``source_type='api'``).

    Returns a table of the form ``{acctid: {tid: entry}}``; entries
    for which no trade/exec id can be found are logged and skipped.

    Raises ``ValueError`` for an unknown ``source_type`` and
    ``KeyError`` when an entry lacks one of the required fields or
    its account number is not in ``accounts``.

    '''
    if source_type not in ('flex', 'api'):
        # previously this fell through both branches and blew up
        # (or silently re-used the prior iteration's values) further
        # down, so fail early and explicitly instead.
        raise ValueError(
            f'Unknown ledger entry `source_type`: {source_type!r}, '
            "expected one of 'flex' or 'api'"
        )

    trades_by_account = {}

    for t in trade_entries:
        if source_type == 'flex':
            entry = t.__dict__

            # XXX: LOL apparently ``toml`` has a bug
            # where a section key error will show up in the write
            # if you leave a table key as an `int`? So i guess
            # cast to strs for all keys..

            # oddly for some so-called "BookTrade" entries
            # the ``ibExecID`` field seems to be blank, so fall
            # back to the ``tradeID`` in that case.
            raw_tid = entry.get('ibExecID') or entry['tradeID']

            # XXX: is it going to cause problems if a account name
            # get's lost? The user should be able to find it based
            # on the actual exec history right?
            acctid = accounts[str(entry['accountId'])]

        else:  # source_type == 'api'
            # NOTE: example of schema we pull from the API client.
            # {
            #     'commissionReport': CommissionReport(...
            #     'contract': {...
            #     'execution': Execution(...
            #     'time': 1654801166.0
            # }

            # flatten all sub-dicts and values into one top level entry.
            entry = {}
            for section, val in t.items():
                if section in ('contract', 'execution', 'commissionReport'):
                    # sub-dict cases
                    entry.update(val)

                elif section == 'time':
                    # ib has wack ns timestamps, or is that us?
                    # NOTE: the top level value is dropped, so the
                    # ``entry['time']`` read below comes from one of
                    # the flattened sub-dicts (presumably
                    # ``execution`` - TODO confirm).
                    continue

                else:
                    entry[section] = val

            raw_tid = entry['execId']
            dt = pendulum.from_timestamp(entry['time'])
            # TODO: why isn't this showing seconds in the str?
            entry['date'] = str(dt)
            acctid = accounts[entry['acctNumber']]

        # a missing (``None``) id must stay falsy: a naive ``str()``
        # cast would turn it into the truthy ``'None'`` and defeat
        # the skip below.
        tid = '' if raw_tid is None else str(raw_tid)

        if not tid:
            # this is likely some kind of internal adjustment
            # transaction, likely one of the following:
            # - an expiry event that will show a "book trade" indicating
            #   some adjustment to cash balances: zeroing or itm settle.
            # - a manual cash balance position adjustment likely done by
            #   the user from the accounts window in TWS where they can
            #   manually set the avg price and size:
            #   https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
            log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
            continue

        trades_by_account.setdefault(
            acctid, {}
        )[tid] = entry

    return trades_by_account
|
|
|
|
|
|
def load_flex_trades(
    path: Optional[str] = None,

) -> dict[str, Any]:
    '''
    Load the ``'Trade'`` entries of an IB "flex report" and write any
    not-yet-recorded ones into the per-account ``'ib'`` trade ledgers.

    When ``path`` is provided the report is read from that file,
    otherwise it is requested using the ``flex_token`` and
    ``flex_trades_query_id`` entries of the config returned by
    ``get_config()``.

    Returns the ledger ``dict`` of the last account processed, or an
    empty ``dict`` when the report contains no usable trades.

    Raises ``ValueError`` when no ``path`` is given and no
    ``flex_token`` is configured.

    '''
    from ib_insync import flexreport, util

    conf = get_config()

    if not path:
        # load ``brokers.toml`` and try to get the flex
        # token and query id that must be previously defined
        # by the user.
        token = conf.get('flex_token')
        if not token:
            raise ValueError(
                'You must specify a ``flex_token`` field in your '
                '`brokers.toml` in order to load your trade log, see our '
                'instructions for how to set this up here:\n'
                'PUT LINK HERE!'
            )

        qid = conf['flex_trades_query_id']

        # TODO: hack this into our logging
        # system like we do with the API client..
        util.logToConsole()

        # TODO: rewrite the query part of this with async..httpx?
        report = flexreport.FlexReport(
            token=token,
            queryId=qid,
        )

    else:
        # XXX: another project we could potentially look at,
        # https://pypi.org/project/ibflex/
        report = flexreport.FlexReport(path=path)

    trade_entries = report.extract('Trade')
    ln = len(trade_entries)
    log.info(f'Loaded {ln} trades from flex query')

    trades_by_account = trades_to_ledger_entries(
        # get reverse map to user account names
        conf['accounts'].inverse,
        trade_entries,
        source_type='flex',
    )

    # pre-bind the result so that a report without any trades returns
    # an empty ledger instead of raising ``UnboundLocalError``.
    ledger_dict: dict[str, Any] = {}

    for acctid, trades_by_id in trades_by_account.items():
        with open_trade_ledger('ib', acctid) as ledger_dict:
            tid_delta = set(trades_by_id) - set(ledger_dict)

            # only report (and write) when there actually is
            # something the ledger doesn't know about yet.
            if tid_delta:
                log.info(
                    'New trades detected\n'
                    f'{pformat(tid_delta)}'
                )
                ledger_dict.update(
                    {tid: trades_by_id[tid] for tid in tid_delta}
                )

    return ledger_dict
|
|
|
|
|
|
if __name__ == '__main__':
    import os
    import sys

    # every cli argument is taken to be the path to a flex report file.
    report_paths = sys.argv[1:]

    if not report_paths:
        # no files given: expect brokers.toml to have an entry and
        # pull from the web service.
        load_flex_trades()

    else:
        for report_path in report_paths:
            load_flex_trades(path=os.path.abspath(report_path))
|