From 6f172479ebab9e6d91be46a143bf9d429d0ed90c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 15 May 2022 13:32:39 -0400
Subject: [PATCH] Drop task-per-method `trio`-`asyncio` proxying

Use method proxies through the remaining endpoints and drop the old
spawn-a-task-per-method-call style helpers from the module.
---
 piker/brokers/ib.py | 415 ++++++++++++++++++++------------------------
 1 file changed, 185 insertions(+), 230 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index bb131fec..bf66485b 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -1143,6 +1143,13 @@ def get_preferred_data_client(
     clients: dict[str, Client],
 
 ) -> tuple[str, Client]:
+    '''
+    Load and return the (first found) `Client` instance that is
+    preferred and should be used for data by iterating, in priority
+    order, the ``ib.prefer_data_account: list[str]`` account names in
+    the user's ``brokers.toml`` file.
+
+    '''
     conf = get_config()
     data_accounts = conf['prefer_data_account']
 
@@ -1165,7 +1172,6 @@ async def open_data_client() -> MethodProxy:
     and deliver that client wrapped in a ``MethodProxy``.
 
     '''
-
     async with (
         open_client_proxies() as (proxies, clients),
     ):
@@ -1179,60 +1185,6 @@ async def open_data_client() -> MethodProxy:
         yield proxy
 
 
-async def _aio_run_client_method(
-    meth: str,
-    to_trio=None,
-    from_trio=None,
-    client=None,
-    **kwargs,
-) -> None:
-
-    async with load_aio_clients() as accts2clients:
-        client = list(accts2clients.values())[0]
-        async_meth = getattr(client, meth)
-
-        # handle streaming methods
-        args = tuple(inspect.getfullargspec(async_meth).args)
-        if to_trio and 'to_trio' in args:
-            kwargs['to_trio'] = to_trio
-
-        log.runtime(f'Running {meth}({kwargs})')
-        return await async_meth(**kwargs)
-
-
-async def _trio_run_client_method(
-    method: str,
-    client: Optional[Client] = None,
-    **kwargs,
-
-) -> None:
-    '''
-    Asyncio entry point to run tasks against the ``ib_insync`` api.
-
-    '''
-    ca = tractor.current_actor()
-    assert ca.is_infected_aio()
-
-    # if the method is an *async gen* stream for it
-    # meth = getattr(Client, method)
-
-    # args = tuple(inspect.getfullargspec(meth).args)
-
-    # if inspect.isasyncgenfunction(meth) or (
-    #     # if the method is an *async func* but manually
-    #     # streams back results, make sure to also stream it
-    #     'to_trio' in args
-    # ):
-    #     kwargs['_treat_as_stream'] = True
-
-    return await to_asyncio.run_task(
-        _aio_run_client_method,
-        meth=method,
-        client=client,
-        **kwargs
-    )
-
-
 class MethodProxy:
 
     def __init__(
@@ -1830,6 +1782,9 @@ async def _setup_quote_stream(
     '''
     Stream a ticker using the std L1 api.
 
+    This task is ``asyncio``-side and must be called from
+    ``tractor.to_asyncio.open_channel_from()``.
+
     '''
     global _quote_streams
 
@@ -1926,6 +1881,7 @@ async def open_aio_quote_stream(
             _setup_quote_stream,
             symbol=symbol,
             contract=contract,
+
         ) as (first, from_aio):
 
             # cache feed for later consumers
@@ -1956,122 +1912,120 @@ async def stream_quotes(
     sym = symbols[0]
     log.info(f'request for real-time quotes: {sym}')
 
-    con, first_ticker, details = await _trio_run_client_method(
-        method='get_sym_details',
-        symbol=sym,
-    )
-    first_quote = normalize(first_ticker)
-    # print(f'first quote: {first_quote}')
+    async with open_data_client() as proxy:
 
-    def mk_init_msgs() -> dict[str, dict]:
-        # pass back some symbol info like min_tick, trading_hours, etc.
-        syminfo = asdict(details)
-        syminfo.update(syminfo['contract'])
+        con, first_ticker, details = await proxy.get_sym_details(symbol=sym)
+        first_quote = normalize(first_ticker)
+        # print(f'first quote: {first_quote}')
 
-        # nested dataclass we probably don't need and that won't IPC serialize
-        syminfo.pop('secIdList')
+        def mk_init_msgs() -> dict[str, dict]:
+            # pass back some symbol info like min_tick, trading_hours, etc.
+            syminfo = asdict(details)
+            syminfo.update(syminfo['contract'])
 
-        # TODO: more consistent field translation
-        atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']]
+            # nested dataclass we probably don't need and that won't IPC
+            # serialize
+            syminfo.pop('secIdList')
 
-        # for stocks it seems TWS reports too small a tick size
-        # such that you can't submit orders with that granularity?
-        min_tick = 0.01 if atype == 'stock' else 0
+            # TODO: more consistent field translation
+            atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']]
 
-        syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)
+            # for stocks it seems TWS reports too small a tick size
+            # such that you can't submit orders with that granularity?
+            min_tick = 0.01 if atype == 'stock' else 0
 
-        # for "traditional" assets, volume is normally discreet, not a float
-        syminfo['lot_tick_size'] = 0.0
+            syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)
 
-        # TODO: for loop through all symbols passed in
-        init_msgs = {
-            # pass back token, and bool, signalling if we're the writer
-            # and that history has been written
-            sym: {
-                'symbol_info': syminfo,
-                'fqsn': first_quote['fqsn'],
+            # for "traditional" assets, volume is normally discrete, not
+            # a float
+            syminfo['lot_tick_size'] = 0.0
+
+            # TODO: for loop through all symbols passed in
+            init_msgs = {
+                # pass back token, and bool, signalling if we're the writer
+                # and that history has been written
+                sym: {
+                    'symbol_info': syminfo,
+                    'fqsn': first_quote['fqsn'],
+                }
             }
-        }
-        return init_msgs
+            return init_msgs
 
-    init_msgs = mk_init_msgs()
+        init_msgs = mk_init_msgs()
 
-    # TODO: we should instead spawn a task that waits on a feed to start
-    # and let it wait indefinitely..instead of this hard coded stuff.
-    with trio.move_on_after(1):
-        contract, first_ticker, details = await _trio_run_client_method(
-            method='get_quote',
-            symbol=sym,
-        )
+        # TODO: we should instead spawn a task that waits on a feed to start
+        # and let it wait indefinitely..instead of this hard coded stuff.
+        with trio.move_on_after(1):
+            contract, first_ticker, details = await proxy.get_quote(symbol=sym)
 
-    # it might be outside regular trading hours so see if we can at
-    # least grab history.
-    if isnan(first_ticker.last):
-        task_status.started((init_msgs, first_quote))
+        # it might be outside regular trading hours so see if we can at
+        # least grab history.
+        if isnan(first_ticker.last):
+            task_status.started((init_msgs, first_quote))
 
-        # it's not really live but this will unblock
-        # the brokerd feed task to tell the ui to update?
-        feed_is_live.set()
-
-        # block and let data history backfill code run.
-        await trio.sleep_forever()
-        return  # we never expect feed to come up?
-
-    async with open_aio_quote_stream(
-        symbol=sym,
-        contract=con,
-    ) as stream:
-
-        # ugh, clear ticks since we've consumed them
-        # (ahem, ib_insync is stateful trash)
-        first_ticker.ticks = []
-
-        task_status.started((init_msgs, first_quote))
-
-        async with aclosing(stream):
-            if type(first_ticker.contract) not in (
-                ibis.Commodity,
-                ibis.Forex
-            ):
-                # wait for real volume on feed (trading might be closed)
-                while True:
-                    ticker = await stream.receive()
-
-                    # for a real volume contract we rait for the first
-                    # "real" trade to take place
-                    if (
-                        # not calc_price
-                        # and not ticker.rtTime
-                        not ticker.rtTime
-                    ):
-                        # spin consuming tickers until we get a real
-                        # market datum
-                        log.debug(f"New unsent ticker: {ticker}")
-                        continue
-                    else:
-                        log.debug("Received first real volume tick")
-                        # ugh, clear ticks since we've consumed them
-                        # (ahem, ib_insync is truly stateful trash)
-                        ticker.ticks = []
-
-                        # XXX: this works because we don't use
-                        # ``aclosing()`` above?
-                        break
-
-                quote = normalize(ticker)
-                log.debug(f"First ticker received {quote}")
-
-            # tell caller quotes are now coming in live
+            # it's not really live but this will unblock
+            # the brokerd feed task to tell the ui to update?
             feed_is_live.set()
 
-            # last = time.time()
-            async for ticker in stream:
-                quote = normalize(ticker)
-                await send_chan.send({quote['fqsn']: quote})
+            # block and let data history backfill code run.
+            await trio.sleep_forever()
+            return  # we never expect feed to come up?
+
+        async with open_aio_quote_stream(
+            symbol=sym,
+            contract=con,
+        ) as stream:
+
+            # ugh, clear ticks since we've consumed them
+            # (ahem, ib_insync is stateful trash)
+            first_ticker.ticks = []
+
+            task_status.started((init_msgs, first_quote))
+
+            async with aclosing(stream):
+                if type(first_ticker.contract) not in (
+                    ibis.Commodity,
+                    ibis.Forex
+                ):
+                    # wait for real volume on feed (trading might be closed)
+                    while True:
+                        ticker = await stream.receive()
+
+                        # for a real volume contract we wait for the first
+                        # "real" trade to take place
+                        if (
+                            # not calc_price
+                            # and not ticker.rtTime
+                            not ticker.rtTime
+                        ):
+                            # spin consuming tickers until we get a real
+                            # market datum
+                            log.debug(f"New unsent ticker: {ticker}")
+                            continue
+                        else:
+                            log.debug("Received first real volume tick")
+                            # ugh, clear ticks since we've consumed them
+                            # (ahem, ib_insync is truly stateful trash)
+                            ticker.ticks = []
+
+                            # XXX: this works because we don't use
+                            # ``aclosing()`` above?
+                            break
+
+                    quote = normalize(ticker)
+                    log.debug(f"First ticker received {quote}")
+
+                # tell caller quotes are now coming in live
+                feed_is_live.set()
 
-            # ugh, clear ticks since we've consumed them
-            ticker.ticks = []
             # last = time.time()
+            async for ticker in stream:
+                quote = normalize(ticker)
+                await send_chan.send({quote['fqsn']: quote})
+
+                # ugh, clear ticks since we've consumed them
+                ticker.ticks = []
+                # last = time.time()
 
 
 def pack_position(
@@ -2466,95 +2420,96 @@ async def open_symbol_search(
     ctx: tractor.Context,
 
 ) -> None:
-    # load all symbols locally for fast search
+
+    # TODO: load user-defined symbol set locally for fast search?
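+    # note: all symbol queries below are serviced by the one
+    # ``MethodProxy`` opened right after ``ctx.started()``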
     await ctx.started({})
 
-    async with ctx.open_stream() as stream:
-
-        last = time.time()
-
-        async for pattern in stream:
-            log.debug(f'received {pattern}')
-            now = time.time()
-
-            assert pattern, 'IB can not accept blank search pattern'
-
-            # throttle search requests to no faster then 1Hz
-            diff = now - last
-            if diff < 1.0:
-                log.debug('throttle sleeping')
-                await trio.sleep(diff)
-                try:
-                    pattern = stream.receive_nowait()
-                except trio.WouldBlock:
-                    pass
-
-            if not pattern or pattern.isspace():
-                log.warning('empty pattern received, skipping..')
-
-                # TODO: *BUG* if nothing is returned here the client
-                # side will cache a null set result and not showing
-                # anything to the use on re-searches when this query
-                # timed out. We probably need a special "timeout" msg
-                # or something...
-
-                # XXX: this unblocks the far end search task which may
-                # hold up a multi-search nursery block
-                await stream.send({})
-
-                continue
-
-            log.debug(f'searching for {pattern}')
+    async with open_data_client() as proxy:
+        async with ctx.open_stream() as stream:
 
             last = time.time()
 
-            # async batch search using api stocks endpoint and module
-            # defined adhoc symbol set.
-            stock_results = []
+            async for pattern in stream:
+                log.debug(f'received {pattern}')
+                now = time.time()
 
-            async def stash_results(target: Awaitable[list]):
-                stock_results.extend(await target)
+                assert pattern, 'IB can not accept blank search pattern'
 
-            async with trio.open_nursery() as sn:
-                sn.start_soon(
-                    stash_results,
-                    _trio_run_client_method(
-                        method='search_symbols',
-                        pattern=pattern,
-                        upto=5,
+                # throttle search requests to no faster than 1Hz
+                diff = now - last
+                if diff < 1.0:
+                    log.debug('throttle sleeping')
+                    await trio.sleep(diff)
+                    try:
+                        pattern = stream.receive_nowait()
+                    except trio.WouldBlock:
+                        pass
+
+                if not pattern or pattern.isspace():
+                    log.warning('empty pattern received, skipping..')
+
+                    # TODO: *BUG* if nothing is returned here the client
+                    # side will cache a null set result and not show
+                    # anything to the user on re-searches when this query
+                    # timed out. We probably need a special "timeout" msg
+                    # or something...
+
+                    # XXX: this unblocks the far end search task which may
+                    # hold up a multi-search nursery block
+                    await stream.send({})
+
+                    continue
+
+                log.debug(f'searching for {pattern}')
+
+                last = time.time()
+
+                # async batch search using api stocks endpoint and module
+                # defined adhoc symbol set.
+                stock_results = []
+
+                async def stash_results(target: Awaitable[list]):
+                    stock_results.extend(await target)
+
+                async with trio.open_nursery() as sn:
+                    sn.start_soon(
+                        stash_results,
+                        proxy.search_symbols(
+                            pattern=pattern,
+                            upto=5,
+                        ),
                     )
-                )
-                # trigger async request
-                await trio.sleep(0)
+                    # trigger async request
+                    await trio.sleep(0)
 
-            # match against our ad-hoc set immediately
-            adhoc_matches = fuzzy.extractBests(
+                # match against our ad-hoc set immediately
+                adhoc_matches = fuzzy.extractBests(
+                    pattern,
+                    list(_adhoc_futes_set),
+                    score_cutoff=90,
+                )
+                log.info(f'fuzzy matched adhocs: {adhoc_matches}')
+                adhoc_match_results = {}
+                if adhoc_matches:
+                    # TODO: do we need to pull contract details?
+                    adhoc_match_results = {i[0]: {} for i in adhoc_matches}
+
+                log.debug(f'fuzzy matching stocks {stock_results}')
+                stock_matches = fuzzy.extractBests(
                 pattern,
-                list(_adhoc_futes_set),
-                score_cutoff=90,
+                    stock_results,
+                    score_cutoff=50,
                 )
-            log.info(f'fuzzy matched adhocs: {adhoc_matches}')
-            adhoc_match_results = {}
-            if adhoc_matches:
-                # TODO: do we need to pull contract details?
- adhoc_match_results = {i[0]: {} for i in adhoc_matches} - log.debug(f'fuzzy matching stocks {stock_results}') - stock_matches = fuzzy.extractBests( - pattern, - stock_results, - score_cutoff=50, - ) + matches = adhoc_match_results | { + item[0]: {} for item in stock_matches + } + # TODO: we used to deliver contract details + # {item[2]: item[0] for item in stock_matches} - matches = adhoc_match_results | { - item[0]: {} for item in stock_matches - } - # TODO: we used to deliver contract details - # {item[2]: item[0] for item in stock_matches} - - log.debug(f"sending matches: {matches.keys()}") - await stream.send(matches) + log.debug(f"sending matches: {matches.keys()}") + await stream.send(matches) async def data_reset_hack(
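
A minimal sketch of the calling convention this patch migrates the
endpoints to, using only the ``open_data_client()`` / ``MethodProxy``
api shown in the diff above (``sym`` stands in for any symbol string;
the dropped ``_trio_run_client_method()`` style it replaces is shown
commented out for contrast):

    # old style (dropped by this patch): a throwaway ``asyncio`` task
    # was spawned for every single api call:
    #
    # con, first_ticker, details = await _trio_run_client_method(
    #     method='get_sym_details',
    #     symbol=sym,
    # )

    # new style: open one long-lived proxy to the preferred data
    # client and relay every api method call through it from ``trio``:
    async with open_data_client() as proxy:
        con, first_ticker, details = await proxy.get_sym_details(symbol=sym)
        contract, ticker, details = await proxy.get_quote(symbol=sym)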