diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index aa07f9ef..2679e988 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -40,6 +40,7 @@ from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails from ib_insync.order import Order from ib_insync.ticker import Ticker +from ib_insync.objects import Position import ib_insync as ibis from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client @@ -449,9 +450,8 @@ class Client: size: int, # XXX: by default 0 tells ``ib_insync`` methods that there is no - # existing order so ask the client to create a new one (which - # it seems to do by allocating an int counter - collision - # prone..) + # existing order so ask the client to create a new one (which it + # seems to do by allocating an int counter - collision prone..) brid: int = None, ) -> int: """Place an order and return integer request id provided by client. @@ -507,17 +507,21 @@ class Client: """ self.inline_errors(to_trio) - def push_tradesies(eventkit_obj, trade, fill=None): + def push_tradesies(eventkit_obj, obj, fill=None): """Push events to trio task. """ if fill is not None: # execution details event - item = ('fill', (trade, fill)) - else: - item = ('status', trade) + item = ('fill', (obj, fill)) - log.info(f'{eventkit_obj}: {item}') + elif eventkit_obj.name() == 'positionEvent': + item = ('position', obj) + + else: + item = ('status', obj) + + log.info(f'eventkit event -> {eventkit_obj}: {item}') try: to_trio.send_nowait(item) @@ -529,6 +533,7 @@ class Client: for ev_name in [ 'orderStatusEvent', # all order updates 'execDetailsEvent', # all "fill" updates + 'positionEvent', # avg price updates per symbol per account # 'commissionReportEvent', # XXX: ugh, it is a separate event from IB and it's @@ -537,7 +542,6 @@ class Client: # XXX: not sure yet if we need these # 'updatePortfolioEvent', - # 'positionEvent', # XXX: these all seem to be weird ib_insync intrernal # events that we probably don't care that much about @@ -584,6 +588,15 @@ class Client: self.ib.errorEvent.connect(push_err) + async def positions( + self, + account: str = '', + ) -> List[Position]: + """ + Retrieve position info for ``account``. + """ + return self.ib.positions(account=account) + # default config ports _tws_port: int = 7497 @@ -1155,6 +1168,18 @@ async def stream_and_write( ticker.ticks = [] +def pack_position(pos: Position) -> Dict[str, Any]: + con = pos.contract + return { + 'broker': 'ib', + 'account': pos.account, + 'symbol': con.symbol, + 'currency': con.currency, + 'size': float(pos.position), + 'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0), + } + + @tractor.msg.pub( send_on_connect={'local_trades': 'start'} ) @@ -1163,14 +1188,18 @@ async def stream_trades( get_topics: Callable = None, ) -> AsyncIterator[Dict[str, Any]]: - global _trades_stream_is_live - # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) stream = await _trio_run_client_method( method='recv_trade_updates', ) + + # deliver positions to subscriber before anything else + positions = await _trio_run_client_method(method='positions') + for pos in positions: + yield {'local_trades': ('position', pack_position(pos))} + action_map = {'BOT': 'buy', 'SLD': 'sell'} async for event_name, item in stream: @@ -1230,12 +1259,19 @@ async def stream_trades( # don't forward, it's pointless.. continue - if msg['reqid'] < -1: + elif event_name == 'position': + msg = pack_position(item) + + if msg.get('reqid', 0) < -1: # it's a trade event generated by TWS usage. log.warning(f"TWS triggered trade:\n{pformat(msg)}") msg['reqid'] = 'tws-' + str(-1 * msg['reqid']) + # mark msg as from "external system" + # TODO: probably something better then this.. + msg['external'] = True + yield {'remote_trades': (event_name, msg)} continue