Add order cancellation and error support

basic_orders
Tyler Goodlet 2021-01-14 12:56:27 -05:00
parent 140f3231e7
commit ee8caa80d4
1 changed files with 139 additions and 32 deletions

View File

@ -100,8 +100,9 @@ class NonShittyWrapper(Wrapper):
def tcpDataArrived(self): def tcpDataArrived(self):
"""Override time stamps to be floats for now. """Override time stamps to be floats for now.
""" """
# use a float to store epoch time instead of datetime # use a ns int to store epoch time instead of datetime
self.lastTime = time.time() self.lastTime = time.time_ns()
for ticker in self.pendingTickers: for ticker in self.pendingTickers:
ticker.rtTime = None ticker.rtTime = None
ticker.ticks = [] ticker.ticks = []
@ -109,6 +110,18 @@ class NonShittyWrapper(Wrapper):
ticker.domTicks = [] ticker.domTicks = []
self.pendingTickers = set() self.pendingTickers = set()
def execDetails(
self,
reqId: int,
contract: Contract,
execu,
):
"""
Get rid of datetime on executions.
"""
execu.time = execu.time.timestamp()
return super().execDetails(reqId, contract, execu)
class NonShittyIB(ibis.IB): class NonShittyIB(ibis.IB):
"""The beginning of overriding quite a few decisions in this lib. """The beginning of overriding quite a few decisions in this lib.
@ -121,7 +134,7 @@ class NonShittyIB(ibis.IB):
# XXX: just to override this wrapper # XXX: just to override this wrapper
self.wrapper = NonShittyWrapper(self) self.wrapper = NonShittyWrapper(self)
self.client = ib_Client(self.wrapper) self.client = ib_Client(self.wrapper)
self.errorEvent += self._onError # self.errorEvent += self._onError
self.client.apiEnd += self.disconnectedEvent self.client.apiEnd += self.disconnectedEvent
self._logger = logging.getLogger('ib_insync.ib') self._logger = logging.getLogger('ib_insync.ib')
@ -220,9 +233,6 @@ class Client:
df = ibis.util.df(bars) df = ibis.util.df(bars)
return bars, from_df(df) return bars, from_df(df)
def onError(self, reqId, errorCode, errorString, contract) -> None:
breakpoint()
async def search_stocks( async def search_stocks(
self, self,
pattern: str, pattern: str,
@ -348,9 +358,6 @@ class Client:
exch = 'SMART' if not exch else exch exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0] contract = (await self.ib.qualifyContractsAsync(con))[0]
# head = await self.get_head_time(contract)
# print(head)
except IndexError: except IndexError:
raise ValueError(f"No contract could be found {con}") raise ValueError(f"No contract could be found {con}")
@ -423,9 +430,11 @@ class Client:
ticker = await ticker.updateEvent ticker = await ticker.updateEvent
return contract, ticker return contract, ticker
def submit_limit( # async to be consistent for the client proxy, and cuz why not.
async def submit_limit(
self, self,
contract: Contract, oid: str,
symbol: str,
price: float, price: float,
action: str = 'BUY', action: str = 'BUY',
quantity: int = 100, quantity: int = 100,
@ -433,29 +442,70 @@ class Client:
"""Place an order and return integer request id provided by client. """Place an order and return integer request id provided by client.
""" """
try:
contract = self._contracts[symbol]
except KeyError:
# require that the symbol has been previously cached by
# a data feed request - ensure we aren't making orders
# against non-known prices.
raise RuntimeError("Can not order {symbol}, no live feed?")
# contract.exchange = 'SMART'
trade = self.ib.placeOrder( trade = self.ib.placeOrder(
contract,
Order( Order(
self, # orderId=oid,
action=action.upper(), # BUY/SELL
orderType='LMT', orderType='LMT',
action=action,
totalQuantity=quantity,
lmtPrice=price, lmtPrice=price,
) totalQuantity=quantity,
outsideRth=True,
optOutSmartRouting=True,
routeMarketableToBbo=True,
designatedLocation='SMART',
),
) )
return trade.order.orderId return trade.order.orderId
async def submit_cancel(
self,
oid: str,
) -> None:
"""Send cancel request for order id ``oid``.
"""
self.ib.cancelOrder(
Order(
orderId=oid,
clientId=self.ib.client.clientId,
)
)
async def recv_trade_updates( async def recv_trade_updates(
self, self,
to_trio, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
"""Stream a ticker using the std L1 api. """Stream a ticker using the std L1 api.
""" """
# contract = contract or (await self.find_contract(symbol)) # contract = contract or (await self.find_contract(symbol))
self.inline_errors(to_trio)
def push(eventkit_obj, trade): def push_tradesies(eventkit_obj, trade, fill=None):
"""Push events to trio task. """Push events to trio task.
""" """
# if fill is not None:
# heyoo we executed, and thanks to ib_insync
# we have to handle the callback signature differently
# due to its consistently non-consistent design.
# yet again convert the datetime since they aren't
# ipc serializable...
# fill.time = fill.time.timestamp
# trade.fill = fill
print(f'{eventkit_obj}: {trade}') print(f'{eventkit_obj}: {trade}')
log.debug(trade) log.debug(trade)
if trade is None: if trade is None:
@ -468,21 +518,62 @@ class Client:
# resulting in tracebacks spammed to console.. # resulting in tracebacks spammed to console..
# Manually do the dereg ourselves. # Manually do the dereg ourselves.
log.exception(f'Disconnected from {eventkit_obj} updates') log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push) eventkit_obj.disconnect(push_tradesies)
# hook up to the weird eventkit object - event stream api # hook up to the weird eventkit object - event stream api
for ev_name in [ for ev_name in [
# 'newOrderEvent', 'orderModifyEvent', 'cancelOrderEvent', 'orderStatusEvent',
'openOrderEvent', 'orderStatusEvent', 'execDetailsEvent', 'execDetailsEvent',
# 'commissionReportEvent', 'updatePortfolioEvent', 'positionEvent', # XXX: not sure yet if we need these
# 'commissionReportEvent',
# 'updatePortfolioEvent',
# 'positionEvent',
# XXX: these all seem to be weird ib_insync intrernal
# events that we probably don't care that much about
# given the internal design is wonky af..
# 'newOrderEvent',
# 'orderModifyEvent',
# 'cancelOrderEvent',
# 'openOrderEvent',
]: ]:
eventkit_obj = getattr(self.ib, ev_name) eventkit_obj = getattr(self.ib, ev_name)
handler = partial(push, eventkit_obj) handler = partial(push_tradesies, eventkit_obj)
eventkit_obj.connect(handler) eventkit_obj.connect(handler)
# let the engine run and stream # let the engine run and stream
await self.ib.disconnectedEvent await self.ib.disconnectedEvent
def inline_errors(
self,
to_trio: trio.abc.SendChannel,
) -> None:
# connect error msgs
def push_err(
reqId: int,
errorCode: int,
errorString: str,
contract: Contract,
) -> None:
log.error(errorString)
try:
to_trio.send_nowait(
{'error': {
'brid': reqId,
'message': errorString,
'contract': contract,
}}
)
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
log.exception('Disconnected from errorEvent updates')
self.ib.errorEvent.disconnect(push_err)
self.ib.errorEvent.connect(push_err)
# default config ports # default config ports
_tws_port: int = 7497 _tws_port: int = 7497
@ -536,11 +627,13 @@ async def _aio_get_client(
else: else:
raise ConnectionRefusedError(_err) raise ConnectionRefusedError(_err)
# create and cache
try: try:
client = Client(ib) client = Client(ib)
_client_cache[(host, port)] = client _client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}") log.debug(f"Caching client for {(host, port)}")
yield client yield client
except BaseException: except BaseException:
ib.disconnect() ib.disconnect()
raise raise
@ -607,8 +700,7 @@ class _MethodProxy:
**kwargs **kwargs
) -> Any: ) -> Any:
return await self._portal.run( return await self._portal.run(
__name__, _trio_run_client_method,
'_trio_run_client_method',
method=meth, method=meth,
**kwargs **kwargs
) )
@ -641,7 +733,8 @@ async def get_client(
infect_asyncio=True, infect_asyncio=True,
**kwargs **kwargs
) as portal: ) as portal:
yield get_client_proxy(portal) proxy_client = get_client_proxy(portal)
yield proxy_client
# https://interactivebrokers.github.io/tws-api/tick_types.html # https://interactivebrokers.github.io/tws-api/tick_types.html
@ -997,11 +1090,25 @@ async def stream_trades(
method='recv_trade_updates', method='recv_trade_updates',
) )
# more great work by our friend ib_insync... # init startup msg
# brutallll bby. yield {'trade_events': 'started'}
none = await stream.__anext__()
print(f'Cuz sending {none} makes sense..')
async for trade_event in stream: async for event in stream:
msg = asdict(trade_event) from pprint import pprint
yield {'all': msg}
if not isinstance(event, dict):
# remove trade log entries for now until we figure out if we
# even want to retreive them this way and because they're using
# datetimes
event = asdict(event)
pprint(event)
event.pop('log', None)
# fills = event.get('fills')
# if fills:
# await tractor.breakpoint()
# for fill in fills:
# fill['time'] = fill['time'].timestamp
# exec = fill.pop('execution')
yield {'trade_events': event}