diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 9a77f5fb..8fda1dec 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -673,23 +673,28 @@ class Client: # against non-known prices. raise RuntimeError("Can not order {symbol}, no live feed?") - trade = self.ib.placeOrder( - contract, - Order( - orderId=reqid or 0, # stupid api devs.. - action=action.upper(), # BUY/SELL - # lookup the literal account number by name here. - account=account, - orderType='LMT', - lmtPrice=price, - totalQuantity=size, - outsideRth=True, + try: + trade = self.ib.placeOrder( + contract, + Order( + orderId=reqid or 0, # stupid api devs.. + action=action.upper(), # BUY/SELL + # lookup the literal account number by name here. + account=account, + orderType='LMT', + lmtPrice=price, + totalQuantity=size, + outsideRth=True, - optOutSmartRouting=True, - routeMarketableToBbo=True, - designatedLocation='SMART', - ), - ) + optOutSmartRouting=True, + routeMarketableToBbo=True, + designatedLocation='SMART', + ), + ) + except AssertionError: # errrg insync.. + log.warning(f'order for {reqid} already complete?') + # will trigger an error in ems request handler task. + return None # ib doesn't support setting your own id outside # their own weird client int counting ids.. @@ -1116,7 +1121,25 @@ class MethodProxy: chan = self.chan # send through method + ``kwargs: dict`` as pair await chan.send((meth, kwargs)) - return await chan.receive() + msg = await chan.receive() + res = msg.get('result') + if res: + return res + + err = msg.get('error') + if not err: + raise ValueError(f'Received unexpected asyncio msg {msg}') + + raise err + + async def wait_for_data_reset(self) -> None: + ''' + Send hacker hot keys to ib program and wait + for the event that declares the data feeds to be + back up before unblocking. + + ''' + ... async def open_aio_client_method_relay( @@ -1132,6 +1155,9 @@ async def open_aio_client_method_relay( ): to_trio.send_nowait(client) + # TODO: separate channel for error handling? + # client.inline_errors(to_trio) + # relay all method requests to ``asyncio``-side client and # deliver back results while not to_trio._closed: @@ -1143,10 +1169,18 @@ async def open_aio_client_method_relay( meth_name, kwargs = msg meth = getattr(client, meth_name) - resp = await meth(**kwargs) + try: + resp = await meth(**kwargs) + # echo the msg back + to_trio.send_nowait({'result': resp}) - # echo the msg back - to_trio.send_nowait(resp) + except ( + RequestError, + + # TODO: relay all errors to trio? + # BaseException, + ) as err: + to_trio.send_nowait({'error': err}) @acm @@ -1361,6 +1395,7 @@ async def get_bars( return await get() except RequestError as err: + # why do we always need to rebind this? _err = err # TODO: retreive underlying ``ib_insync`` error? @@ -1409,6 +1444,10 @@ async def get_bars( if not in_throttle: await tractor.breakpoint() + # TODO: wait on data con reset event + # then begin backfilling again. + # await proxy.wait_for_data() + in_throttle = True fails += 1 continue @@ -1463,6 +1502,7 @@ async def backfill_bars( # on that until we have the `marketstore` daemon in place in which # case the shm size will be driven by user config and available sys # memory. + # count: int = 120, count: int = 22, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -1745,7 +1785,9 @@ async def stream_quotes( init_msgs = mk_init_msgs() - with trio.move_on_after(1): + # TODO: we should instead spawn a task that waits on a feed to start + # and let it wait indefinitely..instead of this hard coded stuff. + with trio.move_on_after(6): contract, first_ticker, details = await _trio_run_client_method( method='get_quote', symbol=sym, @@ -1922,6 +1964,12 @@ async def handle_order_requests( # counter - collision prone..) reqid=order.reqid, ) + if reqid is None: + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason='Order already active?', + ).dict()) # deliver ack that order has been submitted to broker routing await ems_order_stream.send(