diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 248039d4..b378f5f2 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -489,26 +489,41 @@ class Client: formatDate=2, # timezone aware UTC datetime ) - async def get_quote( + async def get_sym_details( self, symbol: str, - ) -> Ticker: - """Return a single quote for symbol. + ) -> tuple[Contract, Ticker, ContractDetails]: - """ contract = await self.find_contract(symbol) - - details_fute = self.ib.reqContractDetailsAsync(contract) ticker: Ticker = self.ib.reqMktData( contract, snapshot=True, ) + details_fute = self.ib.reqContractDetailsAsync(contract) + details = (await details_fute)[0] + + return contract, ticker, details + + async def get_quote( + self, + symbol: str, + ) -> tuple[Contract, Ticker, ContractDetails]: + ''' + Return a single quote for symbol. + + ''' + contract, ticker, details = await self.get_sym_details(symbol) # ensure a last price gets filled in before we deliver quote - while isnan(ticker.last): - ticker = await ticker.updateEvent + for _ in range(25): + if isnan(ticker.last): + ticker = await ticker.updateEvent + await asyncio.sleep(0.2) + else: + log.warning( + f'Symbol {symbol} is not returning a quote ' + 'it may be outside trading hours?') - details = (await details_fute)[0] return contract, ticker, details # async to be consistent for the client proxy, and cuz why not. @@ -724,11 +739,13 @@ async def load_aio_clients( client_id: Optional[int] = None, ) -> Client: - '''Return an ``ib_insync.IB`` instance wrapped in our client API. + ''' + Return an ``ib_insync.IB`` instance wrapped in our client API. Client instances are cached for later use. TODO: consider doing this with a ctx mngr eventually? + ''' global _accounts2clients, _client_cache, _scan_ignore @@ -767,7 +784,7 @@ async def load_aio_clients( try_ports = list(ports.values()) ports = try_ports if port is None else [port] # we_connected = [] - connect_timeout = 0.5 if platform.system() != 'Windows' else 1 + connect_timeout = 1 if platform.system() != 'Windows' else 2 combos = list(itertools.product(hosts, ports)) # allocate new and/or reload disconnected but cached clients @@ -929,9 +946,10 @@ async def _trio_run_client_method( **kwargs, ) -> None: - """Asyncio entry point to run tasks against the ``ib_insync`` api. + ''' + Asyncio entry point to run tasks against the ``ib_insync`` api. - """ + ''' ca = tractor.current_actor() assert ca.is_infected_aio() @@ -1158,6 +1176,7 @@ async def backfill_bars( """ out, fails = await get_bars(sym) + if out is None: raise RuntimeError("Could not pull currrent history?!") @@ -1283,6 +1302,10 @@ async def _setup_quote_stream( # resulting in tracebacks spammed to console.. # Manually do the dereg ourselves. teardown() + except trio.WouldBlock: + log.warning(f'channel is blocking symbol feed for {symbol}?' + f'\n{to_trio.statistics}' + ) # except trio.WouldBlock: # # for slow debugging purposes to avoid clobbering prompt @@ -1350,16 +1373,13 @@ async def stream_quotes( # TODO: support multiple subscriptions sym = symbols[0] - contract, first_ticker, details = await _trio_run_client_method( - method='get_quote', - symbol=sym, - ) - - # stream = await start_aio_quote_stream(symbol=sym, contract=contract) - async with open_aio_quote_stream( - symbol=sym, contract=contract - ) as stream: + with trio.fail_after(16) as cs: + contract, first_ticker, details = await _trio_run_client_method( + method='get_quote', + symbol=sym, + ) + def mk_init_msgs() -> dict[str, dict]: # pass back some symbol info like min_tick, trading_hours, etc. syminfo = asdict(details) syminfo.update(syminfo['contract']) @@ -1387,6 +1407,34 @@ async def stream_quotes( 'symbol_info': syminfo, } } + return init_msgs + + if cs.cancelled_caught: + # it might be outside regular trading hours so see if we can at + # least grab history. + + contract, first_ticker, details = await _trio_run_client_method( + method='get_sym_details', + symbol=sym, + ) + + # init_msgs = mk_init_msgs() + + # try again but without timeout and then do feed startup once we + # get one. + contract, first_ticker, details = await _trio_run_client_method( + method='get_quote', + symbol=sym, + ) + + else: + init_msgs = mk_init_msgs() + + + # stream = await start_aio_quote_stream(symbol=sym, contract=contract) + async with open_aio_quote_stream( + symbol=sym, contract=contract + ) as stream: con = first_ticker.contract diff --git a/snippets/ib_data_reset.py b/snippets/ib_data_reset.py index f5d4ca39..a65321dc 100644 --- a/snippets/ib_data_reset.py +++ b/snippets/ib_data_reset.py @@ -25,6 +25,8 @@ import i3ipc i3 = i3ipc.Connection() t = i3.get_tree() +orig_win_id = t.find_focused().window + # for tws win_names: list[str] = [ 'Interactive Brokers', # tws running in i3 @@ -51,11 +53,20 @@ for name in win_names: # move mouse to bottom left of window (where there should # be nothing to click). - 'mousemove_relative', '--sync', str(w-3), str(h-3), + 'mousemove_relative', '--sync', str(w-4), str(h-4), # NOTE: we may need to stick a `--retry 3` in here.. - 'click', '--window', win_id, '1', + 'click', '--window', win_id, '--repeat', '3', '1', # hackzorzes 'key', 'ctrl+alt+f', - ]) + ], + timeout=1, + ) + +# re-activate and focus original window +subprocess.call([ + 'xdotool', + 'windowactivate', '--sync', str(orig_win_id), + 'click', '--window', str(orig_win_id), '1', +])