From 85c2f6e79fa77d1faae05a9f9381b12998f150bd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Jun 2022 15:48:39 -0400 Subject: [PATCH] Factor trades endpoint into `.ib.broker.py` --- piker/brokers/ib/__init__.py | 3 +- piker/brokers/ib/broker.py | 593 +++++++++++++++++++++++++++++++++++ piker/brokers/ib/client.py | 543 +------------------------------- 3 files changed, 598 insertions(+), 541 deletions(-) create mode 100644 piker/brokers/ib/broker.py diff --git a/piker/brokers/ib/__init__.py b/piker/brokers/ib/__init__.py index 1d6bac33..2d9c198e 100644 --- a/piker/brokers/ib/__init__.py +++ b/piker/brokers/ib/__init__.py @@ -32,13 +32,13 @@ Sub-modules within break into the core functionalities: """ from .client import ( get_client, - trades_dialogue, ) from .feed import ( open_history_client, open_symbol_search, stream_quotes, ) +from .broker import trades_dialogue __all__ = [ 'get_client', @@ -53,6 +53,7 @@ __all__ = [ __enable_modules__: list[str] = [ 'client', 'feed', + 'broker', ] # passed to ``tractor.ActorNursery.start_actor()`` diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py new file mode 100644 index 00000000..6bd35b37 --- /dev/null +++ b/piker/brokers/ib/broker.py @@ -0,0 +1,593 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +""" +Order and trades endpoints for use with ``piker``'s EMS. + +""" +from __future__ import annotations +from dataclasses import asdict +from functools import partial +from pprint import pformat +import time +from typing import ( + Any, + Optional, + AsyncIterator, +) + +import trio +from trio_typing import TaskStatus +import tractor +from ib_insync.contract import ( + Contract, + Option, +) +from ib_insync.order import ( + Trade, + OrderStatus, +) +from ib_insync.objects import ( + Fill, + Execution, +) +from ib_insync.objects import Position + +from piker import config +from piker.log import get_console_log +from piker.clearing._messages import ( + BrokerdOrder, + BrokerdOrderAck, + BrokerdStatus, + BrokerdPosition, + BrokerdCancel, + BrokerdFill, + BrokerdError, +) +from .client import ( + _adhoc_futes_set, + log, + get_config, + open_client_proxies, + Client, +) + + +def pack_position( + pos: Position + +) -> dict[str, Any]: + con = pos.contract + + if isinstance(con, Option): + # TODO: option symbol parsing and sane display: + symbol = con.localSymbol.replace(' ', '') + + else: + # TODO: lookup fqsn even for derivs. + symbol = con.symbol.lower() + + exch = (con.primaryExchange or con.exchange).lower() + symkey = '.'.join((symbol, exch)) + if not exch: + # attempt to lookup the symbol from our + # hacked set.. + for sym in _adhoc_futes_set: + if symbol in sym: + symkey = sym + break + + expiry = con.lastTradeDateOrContractMonth + if expiry: + symkey += f'.{expiry}' + + # TODO: options contracts into a sane format.. 
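+
+    # NOTE: ``symkey`` ends up roughly '<symbol>.<exchange>[.<expiry>]',
+    # e.g. 'tsla.nasdaq' for a plain equity or 'mnq.globex.20220916'
+    # for a deriv carrying an expiry (illustrative values only, not
+    # from a live contract lookup).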
+ + return BrokerdPosition( + broker='ib', + account=pos.account, + symbol=symkey, + currency=con.currency, + size=float(pos.position), + avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + ) + + +async def handle_order_requests( + + ems_order_stream: tractor.MsgStream, + accounts_def: dict[str, str], + +) -> None: + + global _accounts2clients + + request_msg: dict + async for request_msg in ems_order_stream: + log.info(f'Received order request {request_msg}') + + action = request_msg['action'] + account = request_msg['account'] + + acct_number = accounts_def.get(account) + if not acct_number: + log.error( + f'An IB account number for name {account} is not found?\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No account found: `{account}` ?', + ).dict()) + continue + + client = _accounts2clients.get(account) + if not client: + log.error( + f'An IB client for account name {account} is not found.\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No api client loaded for account: `{account}` ?', + ).dict()) + continue + + if action in {'buy', 'sell'}: + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + reqid = client.submit_limit( + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + account=acct_number, + + # XXX: by default 0 tells ``ib_insync`` methods that + # there is no existing order so ask the client to create + # a new one (which it seems to do by allocating an int + # counter - collision prone..) + reqid=order.reqid, + ) + if reqid is None: + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason='Order already active?', + ).dict()) + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + # ems order request id + oid=order.oid, + # broker specific request id + reqid=reqid, + time_ns=time.time_ns(), + account=account, + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + client.submit_cancel(reqid=msg.reqid) + + else: + log.error(f'Unknown order command: {request_msg}') + + +async def recv_trade_updates( + + client: Client, + to_trio: trio.abc.SendChannel, + +) -> None: + """Stream a ticker using the std L1 api. + """ + client.inline_errors(to_trio) + + # sync with trio task + to_trio.send_nowait(None) + + def push_tradesies(eventkit_obj, obj, fill=None): + """Push events to trio task. 
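+
+        ``eventkit`` fires these callbacks with differing shapes:
+        ``execDetailsEvent`` passes a ``(Trade, Fill)`` pair (so
+        ``fill`` is non-None) while status and position events pass a
+        single object; each payload is tagged with a name so the trio
+        side can dispatch without re-inspecting types.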
+ + """ + if fill is not None: + # execution details event + item = ('fill', (obj, fill)) + + elif eventkit_obj.name() == 'positionEvent': + item = ('position', obj) + + else: + item = ('status', obj) + + log.info(f'eventkit event ->\n{pformat(item)}') + + try: + to_trio.send_nowait(item) + except trio.BrokenResourceError: + log.exception(f'Disconnected from {eventkit_obj} updates') + eventkit_obj.disconnect(push_tradesies) + + # hook up to the weird eventkit object - event stream api + for ev_name in [ + 'orderStatusEvent', # all order updates + 'execDetailsEvent', # all "fill" updates + 'positionEvent', # avg price updates per symbol per account + + # 'commissionReportEvent', + # XXX: ugh, it is a separate event from IB and it's + # emitted as follows: + # self.ib.commissionReportEvent.emit(trade, fill, report) + + # XXX: not sure yet if we need these + # 'updatePortfolioEvent', + + # XXX: these all seem to be weird ib_insync intrernal + # events that we probably don't care that much about + # given the internal design is wonky af.. + # 'newOrderEvent', + # 'orderModifyEvent', + # 'cancelOrderEvent', + # 'openOrderEvent', + ]: + eventkit_obj = getattr(client.ib, ev_name) + handler = partial(push_tradesies, eventkit_obj) + eventkit_obj.connect(handler) + + # let the engine run and stream + await client.ib.disconnectedEvent + + +@tractor.context +async def trades_dialogue( + + ctx: tractor.Context, + loglevel: str = None, + +) -> AsyncIterator[dict[str, Any]]: + + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + accounts_def = config.load_accounts(['ib']) + + global _accounts2clients + global _client_cache + + # deliver positions to subscriber before anything else + all_positions = [] + accounts = set() + clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] + + async with ( + trio.open_nursery() as nurse, + open_client_proxies() as (proxies, aioclients), + ): + # for account, client in _accounts2clients.items(): + for account, proxy in proxies.items(): + + client = aioclients[account] + + async def open_stream( + task_status: TaskStatus[ + trio.abc.ReceiveChannel + ] = trio.TASK_STATUS_IGNORED, + ): + # each api client has a unique event stream + async with tractor.to_asyncio.open_channel_from( + recv_trade_updates, + client=client, + ) as (first, trade_event_stream): + + task_status.started(trade_event_stream) + await trio.sleep_forever() + + trade_event_stream = await nurse.start(open_stream) + + clients.append((client, trade_event_stream)) + + assert account in accounts_def + accounts.add(account) + + for client in aioclients.values(): + for pos in client.positions(): + + msg = pack_position(pos) + msg.account = accounts_def.inverse[msg.account] + + assert msg.account in accounts, ( + f'Position for unknown account: {msg.account}') + + all_positions.append(msg.dict()) + + trades: list[dict] = [] + for proxy in proxies.values(): + trades.append(await proxy.trades()) + + log.info(f'Loaded {len(trades)} from this session') + # TODO: write trades to local ``trades.toml`` + # - use above per-session trades data and write to local file + # - get the "flex reports" working and pull historical data and + # also save locally. 
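+
+        # the first value delivered to the EMS (via ``ctx.started()``
+        # below) is a pair of the initial position msgs and the account
+        # names that have live api clients; all later updates flow over
+        # the bidirectional stream opened after this.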
+ + await ctx.started(( + all_positions, + tuple(name for name in accounts_def if name in accounts), + )) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + # start order request handler **before** local trades event loop + n.start_soon(handle_order_requests, ems_stream, accounts_def) + + # allocate event relay tasks for each client connection + for client, stream in clients: + n.start_soon( + deliver_trade_events, + stream, + ems_stream, + accounts_def + ) + + # block until cancelled + await trio.sleep_forever() + + +async def deliver_trade_events( + + trade_event_stream: trio.MemoryReceiveChannel, + ems_stream: tractor.MsgStream, + accounts_def: dict[str, str], + +) -> None: + '''Format and relay all trade events for a given client to the EMS. + + ''' + action_map = {'BOT': 'buy', 'SLD': 'sell'} + + # TODO: for some reason we can receive a ``None`` here when the + # ib-gw goes down? Not sure exactly how that's happening looking + # at the eventkit code above but we should probably handle it... + async for event_name, item in trade_event_stream: + + log.info(f'ib sending {event_name}:\n{pformat(item)}') + + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) + + # XXX: here's some other sucky cases from the api + # - short-sale but securities haven't been located, in this + # case we should probably keep the order in some kind of + # weird state or cancel it outright? + + # status='PendingSubmit', message=''), + # status='Cancelled', message='Error 404, + # reqId 1550: Order held while securities are located.'), + # status='PreSubmitted', message='')], + + if event_name == 'status': + + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... + + # unwrap needed data from ib_insync internal types + trade: Trade = item + status: OrderStatus = trade.orderStatus + + # skip duplicate filled updates - we get the deats + # from the execution details event + msg = BrokerdStatus( + + reqid=trade.order.orderId, + time_ns=time.time_ns(), # cuz why not + account=accounts_def.inverse[trade.order.account], + + # everyone doin camel case.. + status=status.status.lower(), # force lower case + + filled=status.filled, + reason=status.whyHeld, + + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? + remaining=status.remaining, + + broker_details={'name': 'ib'}, + ) + + elif event_name == 'fill': + + # for wtv reason this is a separate event type + # from IB, not sure why it's needed other then for extra + # complexity and over-engineering :eyeroll:. + # we may just end up dropping these events (or + # translating them to ``Status`` msgs) if we can + # show the equivalent status events are no more latent. + + # unpack ib_insync types + # pep-0526 style: + # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations + trade: Trade + fill: Fill + trade, fill = item + execu: Execution = fill.execution + + # TODO: normalize out commissions details? 
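+            # (``asdict()`` flattens the ``ib_insync`` dataclasses; the
+            # commission report in particular should carry the
+            # commission amount, currency and any realized PNL for the
+            # fill.)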
+ details = { + 'contract': asdict(fill.contract), + 'execution': asdict(fill.execution), + 'commissions': asdict(fill.commissionReport), + 'broker_time': execu.time, # supposedly server fill time + 'name': 'ib', + } + + msg = BrokerdFill( + # should match the value returned from `.submit_limit()` + reqid=execu.orderId, + time_ns=time.time_ns(), # cuz why not + + action=action_map[execu.side], + size=execu.shares, + price=execu.price, + + broker_details=details, + # XXX: required by order mode currently + broker_time=details['broker_time'], + + ) + + elif event_name == 'error': + + err: dict = item + + # f$#$% gawd dammit insync.. + con = err['contract'] + if isinstance(con, Contract): + err['contract'] = asdict(con) + + if err['reqid'] == -1: + log.error(f'TWS external order error:\n{pformat(err)}') + + # TODO: what schema for this msg if we're going to make it + # portable across all backends? + # msg = BrokerdError(**err) + continue + + elif event_name == 'position': + msg = pack_position(item) + msg.account = accounts_def.inverse[msg.account] + + elif event_name == 'event': + + # it's either a general system status event or an external + # trade event? + log.info(f"TWS system status: \n{pformat(item)}") + + # TODO: support this again but needs parsing at the callback + # level... + # reqid = item.get('reqid', 0) + # if getattr(msg, 'reqid', 0) < -1: + # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + + continue + + # msg.reqid = 'tws-' + str(-1 * reqid) + + # mark msg as from "external system" + # TODO: probably something better then this.. and start + # considering multiplayer/group trades tracking + # msg.broker_details['external_src'] = 'tws' + + # XXX: we always serialize to a dict for msgpack + # translations, ideally we can move to an msgspec (or other) + # encoder # that can be enabled in ``tractor`` ahead of + # time so we can pass through the message types directly. + await ems_stream.send(msg.dict()) + + +def load_flex_trades( + path: Optional[str] = None, + +) -> dict[str, str]: + + from pprint import pprint + from ib_insync import flexreport, util + + conf = get_config() + + if not path: + # load ``brokers.toml`` and try to get the flex + # token and query id that must be previously defined + # by the user. + token = conf.get('flex_token') + if not token: + raise ValueError( + 'You must specify a ``flex_token`` field in your' + '`brokers.toml` in order load your trade log, see our' + 'intructions for how to set this up here:\n' + 'PUT LINK HERE!' + ) + + qid = conf['flex_trades_query_id'] + + # TODO: hack this into our logging + # system like we do with the API client.. + util.logToConsole() + + # TODO: rewrite the query part of this with async..httpx? + report = flexreport.FlexReport( + token=token, + queryId=qid, + ) + + else: + # XXX: another project we could potentially look at, + # https://pypi.org/project/ibflex/ + report = flexreport.FlexReport(path=path) + + trade_entries = report.extract('Trade') + trades = { + # XXX: LOL apparently ``toml`` has a bug + # where a section key error will show up in the write + # if you leave this as an ``int``? + str(t.__dict__['tradeID']): t.__dict__ + for t in trade_entries + } + + ln = len(trades) + log.info(f'Loaded {ln} trades from flex query') + + trades_by_account = {} + for tid, trade in trades.items(): + trades_by_account.setdefault( + # oddly for some so-called "BookTrade" entries + # this field seems to be blank, no cuckin clue. 
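+            # (i.e. the exec id shown commented out below; the account
+            # id is used as the grouping key instead, giving one table
+            # per account in the ``trades.toml`` write below:)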
+ # trade['ibExecID'] + str(trade['accountId']), {} + )[tid] = trade + + section = {'ib': trades_by_account} + pprint(section) + + # TODO: load the config first and append in + # the new trades loaded here.. + try: + config.write(section, 'trades') + except KeyError: + import pdbpp; pdbpp.set_trace() # noqa + + +if __name__ == '__main__': + load_flex_trades() diff --git a/piker/brokers/ib/client.py b/piker/brokers/ib/client.py index 43626c04..5405ab27 100644 --- a/piker/brokers/ib/client.py +++ b/piker/brokers/ib/client.py @@ -29,8 +29,6 @@ import itertools from math import isnan from typing import ( Any, - Optional, - AsyncIterator, Union, ) import asyncio @@ -41,13 +39,11 @@ from types import SimpleNamespace import trio -from trio_typing import TaskStatus import tractor from tractor import to_asyncio from ib_insync.wrapper import RequestError -from ib_insync.contract import Contract, ContractDetails, Option -from ib_insync.order import Order, Trade, OrderStatus -from ib_insync.objects import Fill, Execution +from ib_insync.contract import Contract, ContractDetails +from ib_insync.order import Order from ib_insync.ticker import Ticker from ib_insync.objects import Position import ib_insync as ibis @@ -55,15 +51,9 @@ from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client import numpy as np - from piker import config -from piker.log import get_logger, get_console_log +from piker.log import get_logger from piker.data._source import base_ohlc_dtype -from piker.clearing._messages import ( - BrokerdOrder, BrokerdOrderAck, BrokerdStatus, - BrokerdPosition, BrokerdCancel, - BrokerdFill, BrokerdError, -) log = get_logger(__name__) @@ -809,70 +799,6 @@ class Client: return self.ib.positions(account=account) -async def recv_trade_updates( - - client: Client, - to_trio: trio.abc.SendChannel, - -) -> None: - """Stream a ticker using the std L1 api. - """ - client.inline_errors(to_trio) - - # sync with trio task - to_trio.send_nowait(None) - - def push_tradesies(eventkit_obj, obj, fill=None): - """Push events to trio task. - - """ - if fill is not None: - # execution details event - item = ('fill', (obj, fill)) - - elif eventkit_obj.name() == 'positionEvent': - item = ('position', obj) - - else: - item = ('status', obj) - - log.info(f'eventkit event ->\n{pformat(item)}') - - try: - to_trio.send_nowait(item) - except trio.BrokenResourceError: - log.exception(f'Disconnected from {eventkit_obj} updates') - eventkit_obj.disconnect(push_tradesies) - - # hook up to the weird eventkit object - event stream api - for ev_name in [ - 'orderStatusEvent', # all order updates - 'execDetailsEvent', # all "fill" updates - 'positionEvent', # avg price updates per symbol per account - - # 'commissionReportEvent', - # XXX: ugh, it is a separate event from IB and it's - # emitted as follows: - # self.ib.commissionReportEvent.emit(trade, fill, report) - - # XXX: not sure yet if we need these - # 'updatePortfolioEvent', - - # XXX: these all seem to be weird ib_insync intrernal - # events that we probably don't care that much about - # given the internal design is wonky af.. 
- # 'newOrderEvent', - # 'orderModifyEvent', - # 'cancelOrderEvent', - # 'openOrderEvent', - ]: - eventkit_obj = getattr(client.ib, ev_name) - handler = partial(push_tradesies, eventkit_obj) - eventkit_obj.connect(handler) - - # let the engine run and stream - await client.ib.disconnectedEvent - # per-actor API ep caching _client_cache: dict[tuple[str, int], Client] = {} _scan_ignore: set[tuple[str, int]] = set() @@ -1336,466 +1262,3 @@ async def get_client( # actor isn't in aio mode. async with open_data_client() as proxy: yield proxy - - -def pack_position( - pos: Position - -) -> dict[str, Any]: - con = pos.contract - - if isinstance(con, Option): - # TODO: option symbol parsing and sane display: - symbol = con.localSymbol.replace(' ', '') - - else: - # TODO: lookup fqsn even for derivs. - symbol = con.symbol.lower() - - exch = (con.primaryExchange or con.exchange).lower() - symkey = '.'.join((symbol, exch)) - if not exch: - # attempt to lookup the symbol from our - # hacked set.. - for sym in _adhoc_futes_set: - if symbol in sym: - symkey = sym - break - - expiry = con.lastTradeDateOrContractMonth - if expiry: - symkey += f'.{expiry}' - - # TODO: options contracts into a sane format.. - - return BrokerdPosition( - broker='ib', - account=pos.account, - symbol=symkey, - currency=con.currency, - size=float(pos.position), - avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), - ) - - -async def handle_order_requests( - - ems_order_stream: tractor.MsgStream, - accounts_def: dict[str, str], - -) -> None: - - global _accounts2clients - - request_msg: dict - async for request_msg in ems_order_stream: - log.info(f'Received order request {request_msg}') - - action = request_msg['action'] - account = request_msg['account'] - - acct_number = accounts_def.get(account) - if not acct_number: - log.error( - f'An IB account number for name {account} is not found?\n' - 'Make sure you have all TWS and GW instances running.' - ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=f'No account found: `{account}` ?', - ).dict()) - continue - - client = _accounts2clients.get(account) - if not client: - log.error( - f'An IB client for account name {account} is not found.\n' - 'Make sure you have all TWS and GW instances running.' - ) - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason=f'No api client loaded for account: `{account}` ?', - ).dict()) - continue - - if action in {'buy', 'sell'}: - # validate - order = BrokerdOrder(**request_msg) - - # call our client api to submit the order - reqid = client.submit_limit( - oid=order.oid, - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - account=acct_number, - - # XXX: by default 0 tells ``ib_insync`` methods that - # there is no existing order so ask the client to create - # a new one (which it seems to do by allocating an int - # counter - collision prone..) 
- reqid=order.reqid, - ) - if reqid is None: - await ems_order_stream.send(BrokerdError( - oid=request_msg['oid'], - symbol=request_msg['symbol'], - reason='Order already active?', - ).dict()) - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - # ems order request id - oid=order.oid, - # broker specific request id - reqid=reqid, - time_ns=time.time_ns(), - account=account, - ).dict() - ) - - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) - client.submit_cancel(reqid=msg.reqid) - - else: - log.error(f'Unknown order command: {request_msg}') - - -@tractor.context -async def trades_dialogue( - - ctx: tractor.Context, - loglevel: str = None, - -) -> AsyncIterator[dict[str, Any]]: - - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - accounts_def = config.load_accounts(['ib']) - - global _accounts2clients - global _client_cache - - # deliver positions to subscriber before anything else - all_positions = [] - accounts = set() - clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] - - async with ( - trio.open_nursery() as nurse, - open_client_proxies() as (proxies, aioclients), - ): - # for account, client in _accounts2clients.items(): - for account, proxy in proxies.items(): - - client = aioclients[account] - - async def open_stream( - task_status: TaskStatus[ - trio.abc.ReceiveChannel - ] = trio.TASK_STATUS_IGNORED, - ): - # each api client has a unique event stream - async with tractor.to_asyncio.open_channel_from( - recv_trade_updates, - client=client, - ) as (first, trade_event_stream): - - task_status.started(trade_event_stream) - await trio.sleep_forever() - - trade_event_stream = await nurse.start(open_stream) - - clients.append((client, trade_event_stream)) - - assert account in accounts_def - accounts.add(account) - - for client in aioclients.values(): - for pos in client.positions(): - - msg = pack_position(pos) - msg.account = accounts_def.inverse[msg.account] - - assert msg.account in accounts, ( - f'Position for unknown account: {msg.account}') - - all_positions.append(msg.dict()) - - trades: list[dict] = [] - for proxy in proxies.values(): - trades.append(await proxy.trades()) - - log.info(f'Loaded {len(trades)} from this session') - # TODO: write trades to local ``trades.toml`` - # - use above per-session trades data and write to local file - # - get the "flex reports" working and pull historical data and - # also save locally. - - await ctx.started(( - all_positions, - tuple(name for name in accounts_def if name in accounts), - )) - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - # start order request handler **before** local trades event loop - n.start_soon(handle_order_requests, ems_stream, accounts_def) - - # allocate event relay tasks for each client connection - for client, stream in clients: - n.start_soon( - deliver_trade_events, - stream, - ems_stream, - accounts_def - ) - - # block until cancelled - await trio.sleep_forever() - - -async def deliver_trade_events( - - trade_event_stream: trio.MemoryReceiveChannel, - ems_stream: tractor.MsgStream, - accounts_def: dict[str, str], - -) -> None: - '''Format and relay all trade events for a given client to the EMS. - - ''' - action_map = {'BOT': 'buy', 'SLD': 'sell'} - - # TODO: for some reason we can receive a ``None`` here when the - # ib-gw goes down? 
Not sure exactly how that's happening looking - # at the eventkit code above but we should probably handle it... - async for event_name, item in trade_event_stream: - - log.info(f'ib sending {event_name}:\n{pformat(item)}') - - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # XXX: here's some other sucky cases from the api - # - short-sale but securities haven't been located, in this - # case we should probably keep the order in some kind of - # weird state or cancel it outright? - - # status='PendingSubmit', message=''), - # status='Cancelled', message='Error 404, - # reqId 1550: Order held while securities are located.'), - # status='PreSubmitted', message='')], - - if event_name == 'status': - - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... - - # unwrap needed data from ib_insync internal types - trade: Trade = item - status: OrderStatus = trade.orderStatus - - # skip duplicate filled updates - we get the deats - # from the execution details event - msg = BrokerdStatus( - - reqid=trade.order.orderId, - time_ns=time.time_ns(), # cuz why not - account=accounts_def.inverse[trade.order.account], - - # everyone doin camel case.. - status=status.status.lower(), # force lower case - - filled=status.filled, - reason=status.whyHeld, - - # this seems to not be necessarily up to date in the - # execDetails event.. so we have to send it here I guess? - remaining=status.remaining, - - broker_details={'name': 'ib'}, - ) - - elif event_name == 'fill': - - # for wtv reason this is a separate event type - # from IB, not sure why it's needed other then for extra - # complexity and over-engineering :eyeroll:. - # we may just end up dropping these events (or - # translating them to ``Status`` msgs) if we can - # show the equivalent status events are no more latent. - - # unpack ib_insync types - # pep-0526 style: - # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations - trade: Trade - fill: Fill - trade, fill = item - execu: Execution = fill.execution - - # TODO: normalize out commissions details? - details = { - 'contract': asdict(fill.contract), - 'execution': asdict(fill.execution), - 'commissions': asdict(fill.commissionReport), - 'broker_time': execu.time, # supposedly server fill time - 'name': 'ib', - } - - msg = BrokerdFill( - # should match the value returned from `.submit_limit()` - reqid=execu.orderId, - time_ns=time.time_ns(), # cuz why not - - action=action_map[execu.side], - size=execu.shares, - price=execu.price, - - broker_details=details, - # XXX: required by order mode currently - broker_time=details['broker_time'], - - ) - - elif event_name == 'error': - - err: dict = item - - # f$#$% gawd dammit insync.. - con = err['contract'] - if isinstance(con, Contract): - err['contract'] = asdict(con) - - if err['reqid'] == -1: - log.error(f'TWS external order error:\n{pformat(err)}') - - # TODO: what schema for this msg if we're going to make it - # portable across all backends? 
- # msg = BrokerdError(**err) - continue - - elif event_name == 'position': - msg = pack_position(item) - msg.account = accounts_def.inverse[msg.account] - - elif event_name == 'event': - - # it's either a general system status event or an external - # trade event? - log.info(f"TWS system status: \n{pformat(item)}") - - # TODO: support this again but needs parsing at the callback - # level... - # reqid = item.get('reqid', 0) - # if getattr(msg, 'reqid', 0) < -1: - # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") - - continue - - # msg.reqid = 'tws-' + str(-1 * reqid) - - # mark msg as from "external system" - # TODO: probably something better then this.. and start - # considering multiplayer/group trades tracking - # msg.broker_details['external_src'] = 'tws' - - # XXX: we always serialize to a dict for msgpack - # translations, ideally we can move to an msgspec (or other) - # encoder # that can be enabled in ``tractor`` ahead of - # time so we can pass through the message types directly. - await ems_stream.send(msg.dict()) - - -def load_flex_trades( - path: Optional[str] = None, - -) -> dict[str, str]: - - from pprint import pprint - from ib_insync import flexreport, util - - conf = get_config() - - if not path: - # load ``brokers.toml`` and try to get the flex - # token and query id that must be previously defined - # by the user. - token = conf.get('flex_token') - if not token: - raise ValueError( - 'You must specify a ``flex_token`` field in your' - '`brokers.toml` in order load your trade log, see our' - 'intructions for how to set this up here:\n' - 'PUT LINK HERE!' - ) - - qid = conf['flex_trades_query_id'] - - # TODO: hack this into our logging - # system like we do with the API client.. - util.logToConsole() - - # TODO: rewrite the query part of this with async..httpx? - report = flexreport.FlexReport( - token=token, - queryId=qid, - ) - - else: - # XXX: another project we could potentially look at, - # https://pypi.org/project/ibflex/ - report = flexreport.FlexReport(path=path) - - trade_entries = report.extract('Trade') - trades = { - # XXX: LOL apparently ``toml`` has a bug - # where a section key error will show up in the write - # if you leave this as an ``int``? - str(t.__dict__['tradeID']): t.__dict__ - for t in trade_entries - } - - ln = len(trades) - log.info(f'Loaded {ln} trades from flex query') - - trades_by_account = {} - for tid, trade in trades.items(): - trades_by_account.setdefault( - # oddly for some so-called "BookTrade" entries - # this field seems to be blank, no cuckin clue. - # trade['ibExecID'] - str(trade['accountId']), {} - )[tid] = trade - - section = {'ib': trades_by_account} - pprint(section) - - # TODO: load the config first and append in - # the new trades loaded here.. - try: - config.write(section, 'trades') - except KeyError: - import pdbpp; pdbpp.set_trace() # noqa - - -if __name__ == '__main__': - load_flex_trades()