Port ib orders to new msgs and bidir streaming api

ems_to_bidir_streaming
Tyler Goodlet 2021-06-08 14:19:55 -04:00
parent 6e58f31fd8
commit db92683ede
1 changed files with 193 additions and 77 deletions

View File

@ -25,7 +25,7 @@ from contextlib import asynccontextmanager
from dataclasses import asdict from dataclasses import asdict
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable from typing import List, Dict, Any, Tuple, Optional, AsyncIterator
import asyncio import asyncio
from pprint import pformat from pprint import pformat
import inspect import inspect
@ -39,7 +39,8 @@ import tractor
from async_generator import aclosing from async_generator import aclosing
from ib_insync.wrapper import RequestError from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails, Option from ib_insync.contract import Contract, ContractDetails, Option
from ib_insync.order import Order from ib_insync.order import Order, Trade, OrderStatus
from ib_insync.objects import Fill, Execution
from ib_insync.ticker import Ticker from ib_insync.ticker import Ticker
from ib_insync.objects import Position from ib_insync.objects import Position
import ib_insync as ibis import ib_insync as ibis
@ -53,6 +54,12 @@ from .._daemon import maybe_spawn_brokerd
from ..data._source import from_df from ..data._source import from_df
from ..data._sharedmem import ShmArray from ..data._sharedmem import ShmArray
from ._util import SymbolNotFound, NoData from ._util import SymbolNotFound, NoData
from ..clearing._messages import (
BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdPosition, BrokerdCancel,
BrokerdFill,
# BrokerdError,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -472,7 +479,7 @@ class Client:
# XXX: by default 0 tells ``ib_insync`` methods that there is no # XXX: by default 0 tells ``ib_insync`` methods that there is no
# existing order so ask the client to create a new one (which it # existing order so ask the client to create a new one (which it
# seems to do by allocating an int counter - collision prone..) # seems to do by allocating an int counter - collision prone..)
brid: int = None, reqid: int = None,
) -> int: ) -> int:
"""Place an order and return integer request id provided by client. """Place an order and return integer request id provided by client.
@ -488,7 +495,7 @@ class Client:
trade = self.ib.placeOrder( trade = self.ib.placeOrder(
contract, contract,
Order( Order(
orderId=brid or 0, # stupid api devs.. orderId=reqid or 0, # stupid api devs..
action=action.upper(), # BUY/SELL action=action.upper(), # BUY/SELL
orderType='LMT', orderType='LMT',
lmtPrice=price, lmtPrice=price,
@ -582,6 +589,7 @@ class Client:
self, self,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
# connect error msgs # connect error msgs
def push_err( def push_err(
reqId: int, reqId: int,
@ -589,13 +597,16 @@ class Client:
errorString: str, errorString: str,
contract: Contract, contract: Contract,
) -> None: ) -> None:
log.error(errorString) log.error(errorString)
try: try:
to_trio.send_nowait(( to_trio.send_nowait((
'error', 'error',
# error "object" # error "object"
{'reqid': reqId, {'reqid': reqId,
'message': errorString, 'reason': errorString,
'contract': contract} 'contract': contract}
)) ))
except trio.BrokenResourceError: except trio.BrokenResourceError:
@ -635,6 +646,8 @@ async def _aio_get_client(
"""Return an ``ib_insync.IB`` instance wrapped in our client API. """Return an ``ib_insync.IB`` instance wrapped in our client API.
Client instances are cached for later use. Client instances are cached for later use.
TODO: consider doing this with a ctx mngr eventually?
""" """
# first check cache for existing client # first check cache for existing client
@ -848,7 +861,7 @@ async def get_bars(
end_dt: str = "", end_dt: str = "",
) -> (dict, np.ndarray): ) -> (dict, np.ndarray):
_err = None _err: Optional[Exception] = None
fails = 0 fails = 0
for _ in range(2): for _ in range(2):
@ -885,12 +898,12 @@ async def get_bars(
raise NoData(f'Symbol: {sym}') raise NoData(f'Symbol: {sym}')
break break
else: else:
log.exception( log.exception(
"Data query rate reached: Press `ctrl-alt-f`" "Data query rate reached: Press `ctrl-alt-f`"
"in TWS" "in TWS"
) )
print(_err)
# TODO: should probably create some alert on screen # TODO: should probably create some alert on screen
# and then somehow get that to trigger an event here # and then somehow get that to trigger an event here
@ -937,7 +950,7 @@ async def backfill_bars(
if fails is None or fails > 1: if fails is None or fails > 1:
break break
if out is (None, None): if out == (None, None):
# could be trying to retreive bars over weekend # could be trying to retreive bars over weekend
# TODO: add logic here to handle tradable hours and only grab # TODO: add logic here to handle tradable hours and only grab
# valid bars in the range # valid bars in the range
@ -1188,114 +1201,217 @@ def pack_position(pos: Position) -> Dict[str, Any]:
else: else:
symbol = con.symbol symbol = con.symbol
return { return BrokerdPosition(
'broker': 'ib', broker='ib',
'account': pos.account, account=pos.account,
'symbol': symbol, symbol=symbol,
'currency': con.currency, currency=con.currency,
'size': float(pos.position), size=float(pos.position),
'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0), avg_price=float(pos.avgCost) / float(con.multiplier or 1.0),
} )
@tractor.msg.pub( async def handle_order_requests(
send_on_connect={'local_trades': 'start'}
)
async def stream_trades(
ems_order_stream: tractor.MsgStream,
) -> None:
# request_msg: dict
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
action = request_msg['action']
if action in {'buy', 'sell'}:
# validate
order = BrokerdOrder(**request_msg)
# call our client api to submit the order
reqid = await _trio_run_client_method(
method='submit_limit',
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid=order.reqid,
)
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
time_ns=time.time_ns(),
).dict()
)
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
await _trio_run_client_method(
method='submit_cancel',
reqid=msg.reqid
)
else:
log.error(f'Unknown order command: {request_msg}')
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None, loglevel: str = None,
get_topics: Callable = None,
) -> AsyncIterator[Dict[str, Any]]: ) -> AsyncIterator[Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
stream = await _trio_run_client_method( ib_trade_events_stream = await _trio_run_client_method(
method='recv_trade_updates', method='recv_trade_updates',
) )
# deliver positions to subscriber before anything else # deliver positions to subscriber before anything else
positions = await _trio_run_client_method(method='positions') positions = await _trio_run_client_method(method='positions')
all_positions = {}
for pos in positions: for pos in positions:
yield {'local_trades': ('position', pack_position(pos))} msg = pack_position(pos)
all_positions[msg.symbol] = msg.dict()
await ctx.started(all_positions)
action_map = {'BOT': 'buy', 'SLD': 'sell'} action_map = {'BOT': 'buy', 'SLD': 'sell'}
async for event_name, item in stream: async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# start order request handler **before** local trades event loop
n.start_soon(handle_order_requests, ems_stream)
# XXX: begin normalization of nonsense ib_insync internal async for event_name, item in ib_trade_events_stream:
# object-state tracking representations...
if event_name == 'status': # XXX: begin normalization of nonsense ib_insync internal
# object-state tracking representations...
# unwrap needed data from ib_insync internal objects if event_name == 'status':
trade = item
status = trade.orderStatus
# skip duplicate filled updates - we get the deats # unwrap needed data from ib_insync internal types
# from the execution details event trade: Trade = item
msg = { status: OrderStatus = trade.orderStatus
'reqid': trade.order.orderId,
'status': status.status,
'filled': status.filled,
'reason': status.whyHeld,
# this seems to not be necessarily up to date in the # skip duplicate filled updates - we get the deats
# execDetails event.. so we have to send it here I guess? # from the execution details event
'remaining': status.remaining, msg = BrokerdStatus(
}
elif event_name == 'fill': reqid=trade.order.orderId,
time_ns=time.time_ns(), # cuz why not
status=status.status.lower(), # force lower case
trade, fill = item filled=status.filled,
execu = fill.execution reason=status.whyHeld,
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
remaining=status.remaining,
msg = { broker_details={'name': 'ib'},
'reqid': execu.orderId, )
'execid': execu.execId,
elif event_name == 'fill':
# for wtv reason this is a separate event type
# from IB, not sure why it's needed other then for extra
# complexity and over-engineering :eyeroll:.
# we may just end up dropping these events (or
# translating them to ``Status`` msgs) if we can
# show the equivalent status events are no more latent.
# unpack ib_insync types
# pep-0526 style:
# https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
trade: Trade
fill: Fill
trade, fill = item
execu: Execution = fill.execution
# TODO: normalize out commissions details?
details = {
'contract': asdict(fill.contract),
'execution': asdict(fill.execution),
'commissions': asdict(fill.commissionReport),
}
# supposedly IB server fill time # supposedly IB server fill time
'broker_time': execu.time, # converted to float by us details['broker_time'] = execu.time
# ns from main TCP handler by us inside ``ib_insync`` override details['name'] = 'ib'
'time': fill.time,
'time_ns': time.time_ns(), # cuz why not
'action': action_map[execu.side],
'size': execu.shares,
'price': execu.price,
}
elif event_name == 'error': msg = BrokerdFill(
msg = item # should match the value returned from `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
# f$#$% gawd dammit insync.. action=action_map[execu.side],
con = msg['contract'] size=execu.shares,
if isinstance(con, Contract): price=execu.price,
msg['contract'] = asdict(con)
if msg['reqid'] == -1: broker_details=details,
log.error(pformat(msg)) # XXX: required by order mode currently
broker_time=details['execution']['time'],
# don't forward, it's pointless.. )
continue
elif event_name == 'position': elif event_name == 'error':
msg = pack_position(item)
if msg.get('reqid', 0) < -1: err: dict = item
# it's a trade event generated by TWS usage.
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
msg['reqid'] = 'tws-' + str(-1 * msg['reqid']) # f$#$% gawd dammit insync..
con = err['contract']
if isinstance(con, Contract):
err['contract'] = asdict(con)
# mark msg as from "external system" if err['reqid'] == -1:
# TODO: probably something better then this.. log.error(f'TWS external order error:\n{pformat(err)}')
msg['external'] = True
yield {'remote_trades': (event_name, msg)} # don't forward for now, it's unecessary.. but if we wanted to,
continue # msg = BrokerdError(**err)
continue
yield {'local_trades': (event_name, msg)} elif event_name == 'position':
msg = pack_position(item)
# msg = BrokerdPosition(**item)
# if msg.get('reqid', 0) < -1:
if getattr(msg, 'reqid', 0) < -1:
# it's a trade event generated by TWS usage.
log.warning(f"TWS triggered trade:\n{pformat(msg)}")
msg['reqid'] = 'tws-' + str(-1 * msg['reqid'])
# mark msg as from "external system"
# TODO: probably something better then this.. and start
# considering multiplayer/group trades tracking
msg['external'] = True
continue
# XXX: we always serialize to a dict for msgpack
# translations, ideally we can move to an msgspec (or other)
# encoder # that can be enabled in ``tractor`` ahead of
# time so we can pass through the message types directly.
await ems_stream.send(msg.dict())
@tractor.context @tractor.context