diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index ff911edc..b1047b18 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -14,7 +14,7 @@ import tractor from ..cli import cli from .. import watchlists as wl from ..log import get_console_log, colorize_json, get_logger -from ..brokers.core import maybe_spawn_brokerd_as_subactor +from ..data import maybe_spawn_brokerd from ..brokers import core, get_brokermod, data log = get_logger('cli') @@ -99,7 +99,7 @@ def quote(config, tickers, df_output): @cli.command() @click.option('--df-output', '-df', flag_value=True, help='Output in `pandas.DataFrame` format') -@click.option('--count', '-c', default=100, +@click.option('--count', '-c', default=1000, help='Number of bars to retrieve') @click.argument('symbol', required=True) @click.pass_obj @@ -117,10 +117,11 @@ def bars(config, symbol, count, df_output): brokermod, symbol, count=count, + as_np=df_output ) ) - if not bars: + if not len(bars): log.error(f"No quotes could be found for {symbol}?") return @@ -154,7 +155,7 @@ def record(config, rate, name, dhost, filename): return async def main(tries): - async with maybe_spawn_brokerd_as_subactor( + async with maybe_spawn_brokerd( tries=tries, loglevel=loglevel ) as portal: # run app "main" diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 1250f439..67255a41 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -2,25 +2,17 @@ Broker high level cross-process API layer. This API should be kept "remote service compatible" meaning inputs to -routines here should be +routines should be primitive data types where possible. """ import inspect from types import ModuleType from typing import List, Dict, Any, Optional -from async_generator import asynccontextmanager -import tractor - from ..log import get_logger -from .data import DataFeed from . import get_brokermod -log = get_logger('broker.core') -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', -] +log = get_logger(__name__) async def api(brokername: str, methname: str, **kwargs) -> dict: @@ -28,12 +20,11 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: """ brokermod = get_brokermod(brokername) async with brokermod.get_client() as client: - - meth = getattr(client.api, methname, None) + meth = getattr(client, methname, None) if meth is None: log.debug( f"Couldn't find API method {methname} looking up on client") - meth = getattr(client, methname, None) + meth = getattr(client.api, methname, None) if meth is None: log.error(f"No api method `{methname}` could be found?") @@ -51,24 +42,6 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: return await meth(**kwargs) -@asynccontextmanager -async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): - """If no ``brokerd`` daemon-actor can be found spawn one in a - local subactor. - """ - async with tractor.open_nursery() as nursery: - async with tractor.find_actor('brokerd') as portal: - if not portal: - log.info( - "No broker daemon could be found, spawning brokerd..") - portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=_data_mods, - loglevel=loglevel, - ) - yield portal - - async def stocks_quote( brokermod: ModuleType, tickers: List[str] @@ -139,11 +112,11 @@ async def symbol_info( async def symbol_search( brokermod: ModuleType, - symbol: str, + pattern: str, **kwargs, ) -> Dict[str, Dict[str, Dict[str, Any]]]: """Return symbol info from broker. """ async with brokermod.get_client() as client: # TODO: support multiple asset type concurrent searches. - return await client.search_stocks(symbol, **kwargs) + return await client.search_stocks(pattern=pattern, **kwargs) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 0b18be3f..96bfa1e1 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -25,7 +25,7 @@ from ..log import get_logger, get_console_log from . import get_brokermod -log = get_logger('broker.data') +log = get_logger(__name__) async def wait_for_network( @@ -80,7 +80,7 @@ class BrokerFeed: @tractor.msg.pub(tasks=['stock', 'option']) -async def stream_requests( +async def stream_poll_requests( get_topics: typing.Callable, get_quotes: Coroutine, feed: BrokerFeed, @@ -90,6 +90,12 @@ async def stream_requests( """Stream requests for quotes for a set of symbols at the given ``rate`` (per second). + This routine is built for brokers who support quote polling for multiple + symbols per request. The ``get_topics()`` func is called to retreive the + set of symbols each iteration and ``get_quotes()`` is to retreive + the quotes. + + A stock-broker client ``get_quotes()`` async function must be provided which returns an async quote retrieval function. """ @@ -327,7 +333,7 @@ async def start_quote_stream( # push initial smoke quote response for client initialization await ctx.send_yield(payload) - await stream_requests( + await stream_poll_requests( # ``msg.pub`` required kwargs task_name=feed_type, @@ -394,15 +400,19 @@ class DataFeed: # subscribe for tickers (this performs a possible filtering # where invalid symbols are discarded) sd = await self.portal.run( - "piker.brokers.data", 'symbol_data', - broker=self.brokermod.name, tickers=symbols) + "piker.brokers.data", + 'symbol_data', + broker=self.brokermod.name, + tickers=symbols + ) self._symbol_data_cache.update(sd) if test: # stream from a local test file quote_gen = await self.portal.run( - "piker.brokers.data", 'stream_from_file', - filename=test + "piker.brokers.data", + 'stream_from_file', + filename=test, ) else: log.info(f"Starting new stream for {symbols}") diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 101f00a7..425df4af 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -8,7 +8,7 @@ import tractor from ..log import get_console_log, get_logger from ..brokers import get_brokermod, config -from ..brokers.core import _data_mods +from ..data import maybe_spawn_brokerd log = get_logger('cli') DEFAULT_BROKER = 'questrade' @@ -35,6 +35,7 @@ _context_defaults = dict( def pikerd(loglevel, host, tl): """Spawn the piker broker-daemon. """ + from ..data import _data_mods get_console_log(loglevel) tractor.run_daemon( rpc_module_paths=_data_mods, @@ -65,8 +66,12 @@ def cli(ctx, broker, loglevel, configdir): }) +def _load_clis() -> None: + from ..data import marketstore as _ + from ..brokers import cli as _ + from ..ui import cli as _ + from ..watchlists import cli as _ + + # load downstream cli modules -from ..brokers import cli as _ -from ..ui import cli as _ -from ..watchlists import cli as _ -from ..data import marketstore as _ +_load_clis() diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index b1c6463d..54dd0d7a 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -19,11 +19,11 @@ import trio import tractor from trio_websocket import open_websocket_url +from . import maybe_spawn_brokerd from ..cli import cli from .. import watchlists as wl from ..brokers.data import DataFeed from ..log import get_logger -from ..brokers.core import maybe_spawn_brokerd_as_subactor log = get_logger(__name__) @@ -138,7 +138,7 @@ def ingest(config, name, test_file, tl, url): symbols = watchlists[name] async def main(tries): - async with maybe_spawn_brokerd_as_subactor( + async with maybe_spawn_brokerd( tries=tries, loglevel=loglevel ) as portal: diff --git a/piker/ui/cli.py b/piker/ui/cli.py index 3782a8cf..2be6d436 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -8,7 +8,7 @@ import tractor from ..cli import cli from .. import watchlists as wl -from ..brokers.core import maybe_spawn_brokerd_as_subactor +from ..data import maybe_spawn_brokerd _config_dir = click.get_app_dir('piker') @@ -48,7 +48,8 @@ def monitor(config, rate, name, dhost, test, tl): from .kivy.monitor import _async_main async def main(tries): - async with maybe_spawn_brokerd_as_subactor( + async with maybe_spawn_brokerd( + brokername=brokermod.name, tries=tries, loglevel=loglevel ) as portal: # run app "main" @@ -84,7 +85,7 @@ def optschain(config, symbol, date, tl, rate, test): from .kivy.option_chain import _async_main async def main(tries): - async with maybe_spawn_brokerd_as_subactor( + async with maybe_spawn_brokerd( tries=tries, loglevel=loglevel ): # run app "main" @@ -116,4 +117,8 @@ def chart(config, symbol, date, tl, rate, test): """ from ._chart import main - main(symbol) + # global opts + loglevel = config['loglevel'] + brokername = config['broker'] + + main(sym=symbol, brokername=brokername, loglevel=loglevel)