diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 370722b6..f47a7b25 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -14,7 +14,7 @@ import tractor from ..cli import cli from .. import watchlists as wl from ..log import get_console_log, colorize_json, get_logger -from ..brokers.core import maybe_spawn_brokerd_as_subactor +from ..data import maybe_spawn_brokerd from ..brokers import core, get_brokermod, data log = get_logger('cli') @@ -99,7 +99,7 @@ def quote(config, tickers, df_output): @cli.command() @click.option('--df-output', '-df', flag_value=True, help='Output in `pandas.DataFrame` format') -@click.option('--count', '-c', default=100, +@click.option('--count', '-c', default=1000, help='Number of bars to retrieve') @click.argument('symbol', required=True) @click.pass_obj @@ -117,10 +117,11 @@ def bars(config, symbol, count, df_output): brokermod, symbol, count=count, + as_np=df_output ) ) - if not bars: + if not len(bars): log.error(f"No quotes could be found for {symbol}?") return @@ -198,7 +199,7 @@ def record(config, rate, name, dhost, filename): return async def main(tries): - async with maybe_spawn_brokerd_as_subactor( + async with maybe_spawn_brokerd( tries=tries, loglevel=loglevel ) as portal: # run app "main" diff --git a/piker/brokers/core.py b/piker/brokers/core.py index d2c958e6..6162b4e5 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -1,23 +1,18 @@ """ -Broker high level API layer. +Broker high level cross-process API layer. + +This API should be kept "remote service compatible" meaning inputs to +routines should be primitive data types where possible. """ import inspect from types import ModuleType from typing import List, Dict, Any, Optional -from async_generator import asynccontextmanager -import tractor - from ..log import get_logger -from .data import DataFeed from . import get_brokermod -log = get_logger('broker.core') -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', -] +log = get_logger(__name__) async def api(brokername: str, methname: str, **kwargs) -> dict: @@ -25,12 +20,11 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: """ brokermod = get_brokermod(brokername) async with brokermod.get_client() as client: - - meth = getattr(client.api, methname, None) + meth = getattr(client, methname, None) if meth is None: log.warning( f"Couldn't find API method {methname} looking up on client") - meth = getattr(client, methname, None) + meth = getattr(client.api, methname, None) if meth is None: log.error(f"No api method `{methname}` could be found?") @@ -48,24 +42,6 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: return await meth(**kwargs) -@asynccontextmanager -async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): - """If no ``brokerd`` daemon-actor can be found spawn one in a - local subactor. - """ - async with tractor.open_nursery() as nursery: - async with tractor.find_actor('brokerd') as portal: - if not portal: - log.info( - "No broker daemon could be found, spawning brokerd..") - portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=_data_mods, - loglevel=loglevel, - ) - yield portal - - async def stocks_quote( brokermod: ModuleType, tickers: List[str] @@ -121,3 +97,26 @@ async def bars( """ async with brokermod.get_client() as client: return await client.bars(symbol, **kwargs) + + +async def symbol_info( + brokermod: ModuleType, + symbol: str, + **kwargs, +) -> Dict[str, Dict[str, Dict[str, Any]]]: + """Return symbol info from broker. + """ + async with brokermod.get_client() as client: + return await client.symbol_info(symbol, **kwargs) + + +async def symbol_search( + brokermod: ModuleType, + pattern: str, + **kwargs, +) -> Dict[str, Dict[str, Dict[str, Any]]]: + """Return symbol info from broker. + """ + async with brokermod.get_client() as client: + # TODO: support multiple asset type concurrent searches. + return await client.search_stocks(pattern=pattern, **kwargs) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 48856739..15e90238 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -25,7 +25,7 @@ from ..log import get_logger, get_console_log from . import get_brokermod -log = get_logger('broker.data') +log = get_logger(__name__) async def wait_for_network( @@ -80,7 +80,7 @@ class BrokerFeed: @tractor.msg.pub(tasks=['stock', 'option']) -async def stream_requests( +async def stream_poll_requests( get_topics: typing.Callable, get_quotes: Coroutine, feed: BrokerFeed, @@ -90,6 +90,12 @@ async def stream_requests( """Stream requests for quotes for a set of symbols at the given ``rate`` (per second). + This routine is built for brokers who support quote polling for multiple + symbols per request. The ``get_topics()`` func is called to retreive the + set of symbols each iteration and ``get_quotes()`` is to retreive + the quotes. + + A stock-broker client ``get_quotes()`` async function must be provided which returns an async quote retrieval function. """ @@ -327,7 +333,7 @@ async def start_quote_stream( # push initial smoke quote response for client initialization await ctx.send_yield(payload) - await stream_requests( + await stream_poll_requests( # ``msg.pub`` required kwargs task_name=feed_type, @@ -394,15 +400,19 @@ class DataFeed: # subscribe for tickers (this performs a possible filtering # where invalid symbols are discarded) sd = await self.portal.run( - "piker.brokers.data", 'symbol_data', - broker=self.brokermod.name, tickers=symbols) + "piker.brokers.data", + 'symbol_data', + broker=self.brokermod.name, + tickers=symbols + ) self._symbol_data_cache.update(sd) if test: # stream from a local test file quote_gen = await self.portal.run( - "piker.brokers.data", 'stream_from_file', - filename=test + "piker.brokers.data", + 'stream_from_file', + filename=test, ) else: log.info(f"Starting new stream for {symbols}") diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index cc528f8a..1e7ceef5 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -8,7 +8,6 @@ import tractor from ..log import get_console_log, get_logger from ..brokers import get_brokermod, config -from ..brokers.core import _data_mods log = get_logger('cli') DEFAULT_BROKER = 'questrade' @@ -34,6 +33,7 @@ _context_defaults = dict( def pikerd(loglevel, host, tl): """Spawn the piker broker-daemon. """ + from ..data import _data_mods get_console_log(loglevel) tractor.run_daemon( rpc_module_paths=_data_mods, @@ -64,7 +64,12 @@ def cli(ctx, broker, loglevel, configdir): }) +def _load_clis() -> None: + from ..data import marketstore as _ + from ..brokers import cli as _ # noqa + from ..ui import cli as _ # noqa + from ..watchlists import cli as _ # noqa + + # load downstream cli modules -from ..brokers import cli as _ -from ..watchlists import cli as _ -from ..data import marketstore as _ +_load_clis() diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index b1c6463d..54dd0d7a 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -19,11 +19,11 @@ import trio import tractor from trio_websocket import open_websocket_url +from . import maybe_spawn_brokerd from ..cli import cli from .. import watchlists as wl from ..brokers.data import DataFeed from ..log import get_logger -from ..brokers.core import maybe_spawn_brokerd_as_subactor log = get_logger(__name__) @@ -138,7 +138,7 @@ def ingest(config, name, test_file, tl, url): symbols = watchlists[name] async def main(tries): - async with maybe_spawn_brokerd_as_subactor( + async with maybe_spawn_brokerd( tries=tries, loglevel=loglevel ) as portal: diff --git a/piker/ui/cli.py b/piker/ui/cli.py new file mode 100644 index 00000000..2be6d436 --- /dev/null +++ b/piker/ui/cli.py @@ -0,0 +1,124 @@ +""" +Console interface to UI components. +""" +from functools import partial +import os +import click +import tractor + +from ..cli import cli +from .. import watchlists as wl +from ..data import maybe_spawn_brokerd + + +_config_dir = click.get_app_dir('piker') +_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') + + +def _kivy_import_hack(): + # Command line hacks to make it work. + # See the pkg mod. + from .kivy import kivy # noqa + + +@cli.command() +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--rate', '-r', default=3, help='Quote rate limit') +@click.option('--test', '-t', help='Test quote stream file') +@click.option('--dhost', '-dh', default='127.0.0.1', + help='Daemon host address to connect to') +@click.argument('name', nargs=1, required=True) +@click.pass_obj +def monitor(config, rate, name, dhost, test, tl): + """Start a real-time watchlist UI + """ + # global opts + brokermod = config['brokermod'] + loglevel = config['loglevel'] + log = config['log'] + + watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) + watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) + tickers = watchlists[name] + if not tickers: + log.error(f"No symbols found for watchlist `{name}`?") + return + + _kivy_import_hack() + from .kivy.monitor import _async_main + + async def main(tries): + async with maybe_spawn_brokerd( + brokername=brokermod.name, + tries=tries, loglevel=loglevel + ) as portal: + # run app "main" + await _async_main( + name, portal, tickers, + brokermod, rate, test=test, + ) + + tractor.run( + partial(main, tries=1), + name='monitor', + loglevel=loglevel if tl else None, + rpc_module_paths=['piker.ui.kivy.monitor'], + start_method='forkserver', + ) + + +@cli.command() +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--date', '-d', help='Contracts expiry date') +@click.option('--test', '-t', help='Test quote stream file') +@click.option('--rate', '-r', default=1, help='Logging level') +@click.argument('symbol', required=True) +@click.pass_obj +def optschain(config, symbol, date, tl, rate, test): + """Start an option chain UI + """ + # global opts + loglevel = config['loglevel'] + brokername = config['broker'] + + _kivy_import_hack() + from .kivy.option_chain import _async_main + + async def main(tries): + async with maybe_spawn_brokerd( + tries=tries, loglevel=loglevel + ): + # run app "main" + await _async_main( + symbol, + brokername, + rate=rate, + loglevel=loglevel, + test=test, + ) + + tractor.run( + partial(main, tries=1), + name='kivy-options-chain', + loglevel=loglevel if tl else None, + start_method='forkserver', + ) + + +@cli.command() +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--date', '-d', help='Contracts expiry date') +@click.option('--test', '-t', help='Test quote stream file') +@click.option('--rate', '-r', default=1, help='Logging level') +@click.argument('symbol', required=True) +@click.pass_obj +def chart(config, symbol, date, tl, rate, test): + """Start an option chain UI + """ + from ._chart import main + + # global opts + loglevel = config['loglevel'] + brokername = config['broker'] + + main(sym=symbol, brokername=brokername, loglevel=loglevel)