Add position event relay to ib broker backend

basic_orders
Tyler Goodlet 2021-03-11 21:38:31 -05:00
parent 98bfee028a
commit 6265ae8057
1 changed files with 48 additions and 12 deletions

View File

@ -40,6 +40,7 @@ from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails from ib_insync.contract import Contract, ContractDetails
from ib_insync.order import Order from ib_insync.order import Order
from ib_insync.ticker import Ticker from ib_insync.ticker import Ticker
from ib_insync.objects import Position
import ib_insync as ibis import ib_insync as ibis
from ib_insync.wrapper import Wrapper from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client from ib_insync.client import Client as ib_Client
@ -449,9 +450,8 @@ class Client:
size: int, size: int,
# XXX: by default 0 tells ``ib_insync`` methods that there is no # XXX: by default 0 tells ``ib_insync`` methods that there is no
# existing order so ask the client to create a new one (which # existing order so ask the client to create a new one (which it
# it seems to do by allocating an int counter - collision # seems to do by allocating an int counter - collision prone..)
# prone..)
brid: int = None, brid: int = None,
) -> int: ) -> int:
"""Place an order and return integer request id provided by client. """Place an order and return integer request id provided by client.
@ -507,17 +507,21 @@ class Client:
""" """
self.inline_errors(to_trio) self.inline_errors(to_trio)
def push_tradesies(eventkit_obj, trade, fill=None): def push_tradesies(eventkit_obj, obj, fill=None):
"""Push events to trio task. """Push events to trio task.
""" """
if fill is not None: if fill is not None:
# execution details event # execution details event
item = ('fill', (trade, fill)) item = ('fill', (obj, fill))
else:
item = ('status', trade)
log.info(f'{eventkit_obj}: {item}') elif eventkit_obj.name() == 'positionEvent':
item = ('position', obj)
else:
item = ('status', obj)
log.info(f'eventkit event -> {eventkit_obj}: {item}')
try: try:
to_trio.send_nowait(item) to_trio.send_nowait(item)
@ -529,6 +533,7 @@ class Client:
for ev_name in [ for ev_name in [
'orderStatusEvent', # all order updates 'orderStatusEvent', # all order updates
'execDetailsEvent', # all "fill" updates 'execDetailsEvent', # all "fill" updates
'positionEvent', # avg price updates per symbol per account
# 'commissionReportEvent', # 'commissionReportEvent',
# XXX: ugh, it is a separate event from IB and it's # XXX: ugh, it is a separate event from IB and it's
@ -537,7 +542,6 @@ class Client:
# XXX: not sure yet if we need these # XXX: not sure yet if we need these
# 'updatePortfolioEvent', # 'updatePortfolioEvent',
# 'positionEvent',
# XXX: these all seem to be weird ib_insync intrernal # XXX: these all seem to be weird ib_insync intrernal
# events that we probably don't care that much about # events that we probably don't care that much about
@ -584,6 +588,15 @@ class Client:
self.ib.errorEvent.connect(push_err) self.ib.errorEvent.connect(push_err)
async def positions(
self,
account: str = '',
) -> List[Position]:
"""
Retrieve position info for ``account``.
"""
return self.ib.positions(account=account)
# default config ports # default config ports
_tws_port: int = 7497 _tws_port: int = 7497
@ -1155,6 +1168,18 @@ async def stream_and_write(
ticker.ticks = [] ticker.ticks = []
def pack_position(pos: Position) -> Dict[str, Any]:
con = pos.contract
return {
'broker': 'ib',
'account': pos.account,
'symbol': con.symbol,
'currency': con.currency,
'size': float(pos.position),
'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0),
}
@tractor.msg.pub( @tractor.msg.pub(
send_on_connect={'local_trades': 'start'} send_on_connect={'local_trades': 'start'}
) )
@ -1163,14 +1188,18 @@ async def stream_trades(
get_topics: Callable = None, get_topics: Callable = None,
) -> AsyncIterator[Dict[str, Any]]: ) -> AsyncIterator[Dict[str, Any]]:
global _trades_stream_is_live
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
stream = await _trio_run_client_method( stream = await _trio_run_client_method(
method='recv_trade_updates', method='recv_trade_updates',
) )
# deliver positions to subscriber before anything else
positions = await _trio_run_client_method(method='positions')
for pos in positions:
yield {'local_trades': ('position', pack_position(pos))}
action_map = {'BOT': 'buy', 'SLD': 'sell'} action_map = {'BOT': 'buy', 'SLD': 'sell'}
async for event_name, item in stream: async for event_name, item in stream:
@ -1230,12 +1259,19 @@ async def stream_trades(
# don't forward, it's pointless.. # don't forward, it's pointless..
continue continue
if msg['reqid'] < -1: elif event_name == 'position':
msg = pack_position(item)
if msg.get('reqid', 0) < -1:
# it's a trade event generated by TWS usage. # it's a trade event generated by TWS usage.
log.warning(f"TWS triggered trade:\n{pformat(msg)}") log.warning(f"TWS triggered trade:\n{pformat(msg)}")
msg['reqid'] = 'tws-' + str(-1 * msg['reqid']) msg['reqid'] = 'tws-' + str(-1 * msg['reqid'])
# mark msg as from "external system"
# TODO: probably something better then this..
msg['external'] = True
yield {'remote_trades': (event_name, msg)} yield {'remote_trades': (event_name, msg)}
continue continue