diff --git a/piker/_daemon.py b/piker/_daemon.py index 4c37812d..07a584c3 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -142,8 +142,14 @@ async def maybe_open_runtime( Start the ``tractor`` runtime (a root actor) if none exists. """ + settings = _tractor_kwargs + settings.update(kwargs) + if not tractor.current_actor(err_on_no_runtime=False): - async with tractor.open_root_actor(loglevel=loglevel, **kwargs): + async with tractor.open_root_actor( + loglevel=loglevel, + **settings, + ): yield else: yield diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 6f3014d1..c2de7ded 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -207,6 +207,25 @@ class Client: return self._pairs + async def search_symbols( + self, + pattern: str, + limit: int = None, + ) -> Dict[str, Any]: + if self._pairs is not None: + data = self._pairs + else: + data = await self.symbol_info() + + matches = fuzzy.extractBests( + pattern, + data, + score_cutoff=50, + ) + # repack in dict form + return {item[0]['symbol']: item[0] + for item in matches} + async def bars( self, symbol: str, diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index b0083cfa..164a060b 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -30,7 +30,7 @@ import tractor from ..cli import cli from .. import watchlists as wl from ..log import get_console_log, colorize_json, get_logger -from .._daemon import maybe_spawn_brokerd +from .._daemon import maybe_spawn_brokerd, maybe_open_pikerd from ..brokers import core, get_brokermod, data log = get_logger('cli') @@ -273,13 +273,25 @@ def search(config, pattern): """Search for symbols from broker backend(s). """ # global opts - brokermod = config['brokermods'][0] + brokermods = config['brokermods'] - quotes = tractor.run( - partial(core.symbol_search, brokermod, pattern), - start_method='forkserver', - loglevel='info', + # define tractor entrypoint + async def main(func): + + async with maybe_open_pikerd( + loglevel=config['loglevel'], + ): + return await func() + + quotes = trio.run( + main, + partial( + core.symbol_search, + brokermods, + pattern, + ), ) + if not quotes: log.error(f"No matches could be found for {pattern}?") return diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 5189df85..59621b63 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -24,8 +24,12 @@ import inspect from types import ModuleType from typing import List, Dict, Any, Optional +import trio + from ..log import get_logger from . import get_brokermod +from .._daemon import maybe_spawn_brokerd +from .api import open_cached_client log = get_logger(__name__) @@ -126,13 +130,41 @@ async def symbol_info( return await client.symbol_info(symbol, **kwargs) +async def search_w_brokerd(name: str, pattern: str) -> dict: + + async with open_cached_client(name) as client: + + # TODO: support multiple asset type concurrent searches. + return await client.search_symbols(pattern=pattern) + + async def symbol_search( - brokermod: ModuleType, + brokermods: list[ModuleType], pattern: str, **kwargs, ) -> Dict[str, Dict[str, Dict[str, Any]]]: """Return symbol info from broker. """ - async with brokermod.get_client() as client: - # TODO: support multiple asset type concurrent searches. - return await client.search_stocks(pattern=pattern, **kwargs) + results = [] + + async def search_backend(brokername: str) -> None: + + async with maybe_spawn_brokerd( + brokername, + ) as portal: + + results.append(( + brokername, + await portal.run( + search_w_brokerd, + name=brokername, + pattern=pattern, + ), + )) + + async with trio.open_nursery() as n: + + for mod in brokermods: + n.start_soon(search_backend, mod.name) + + return results diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 189f800d..96398c44 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -52,7 +52,7 @@ from ..log import get_logger, get_console_log from .._daemon import maybe_spawn_brokerd from ..data._source import from_df from ..data._sharedmem import ShmArray -from ._util import SymbolNotFound +from ._util import SymbolNotFound, NoData log = get_logger(__name__) @@ -311,6 +311,18 @@ class Client: else: return {} + async def search_symbols( + self, + pattern: str, + # how many contracts to search "up to" + upto: int = 3, + asdicts: bool = True, + ) -> Dict[str, ContractDetails]: + + # TODO add search though our adhoc-locally defined symbol set + # for futes/cmdtys/ + return await self.search_stocks(pattern, upto, asdicts) + async def search_futes( self, pattern: str, @@ -862,6 +874,13 @@ async def get_bars( # throttling despite the rps being low break + elif 'No market data permissions for' in err.message: + + # TODO: signalling for no permissions searches + raise NoData(f'Symbol: {sym}') + break + + else: log.exception( "Data query rate reached: Press `ctrl-alt-f`" @@ -1133,8 +1152,10 @@ async def stream_quotes( # tell caller quotes are now coming in live feed_is_live.set() + # last = time.time() async with aclosing(stream): async for ticker in stream: + # print(f'ticker rate: {1/(time.time() - last)}') # print(ticker.vwap) quote = normalize( @@ -1149,6 +1170,7 @@ async def stream_quotes( # ugh, clear ticks since we've consumed them ticker.ticks = [] + # last = time.time() def pack_position(pos: Position) -> Dict[str, Any]: diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index f2adccf4..7e9afd5c 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -207,7 +207,7 @@ class Client: return self._pairs - async def search_stocks( + async def search_symbols( self, pattern: str, limit: int = None, diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 528df91e..3f09587c 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -628,7 +628,7 @@ class Client: f"Took {time.time() - start} seconds to retreive {len(bars)} bars") return bars - async def search_stocks( + async def search_symbols( self, pattern: str, # how many contracts to return