Port to new data apis

its_happening
Tyler Goodlet 2020-07-15 08:28:13 -04:00
parent b05a205d1b
commit d0a9afbb36
6 changed files with 49 additions and 55 deletions

View File

@ -14,7 +14,7 @@ import tractor
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from ..log import get_console_log, colorize_json, get_logger from ..log import get_console_log, colorize_json, get_logger
from ..brokers.core import maybe_spawn_brokerd_as_subactor from ..data import maybe_spawn_brokerd
from ..brokers import core, get_brokermod, data from ..brokers import core, get_brokermod, data
log = get_logger('cli') log = get_logger('cli')
@ -99,7 +99,7 @@ def quote(config, tickers, df_output):
@cli.command() @cli.command()
@click.option('--df-output', '-df', flag_value=True, @click.option('--df-output', '-df', flag_value=True,
help='Output in `pandas.DataFrame` format') help='Output in `pandas.DataFrame` format')
@click.option('--count', '-c', default=100, @click.option('--count', '-c', default=1000,
help='Number of bars to retrieve') help='Number of bars to retrieve')
@click.argument('symbol', required=True) @click.argument('symbol', required=True)
@click.pass_obj @click.pass_obj
@ -117,10 +117,11 @@ def bars(config, symbol, count, df_output):
brokermod, brokermod,
symbol, symbol,
count=count, count=count,
as_np=df_output
) )
) )
if not bars: if not len(bars):
log.error(f"No quotes could be found for {symbol}?") log.error(f"No quotes could be found for {symbol}?")
return return
@ -154,7 +155,7 @@ def record(config, rate, name, dhost, filename):
return return
async def main(tries): async def main(tries):
async with maybe_spawn_brokerd_as_subactor( async with maybe_spawn_brokerd(
tries=tries, loglevel=loglevel tries=tries, loglevel=loglevel
) as portal: ) as portal:
# run app "main" # run app "main"

View File

@ -2,25 +2,17 @@
Broker high level cross-process API layer. Broker high level cross-process API layer.
This API should be kept "remote service compatible" meaning inputs to This API should be kept "remote service compatible" meaning inputs to
routines here should be routines should be primitive data types where possible.
""" """
import inspect import inspect
from types import ModuleType from types import ModuleType
from typing import List, Dict, Any, Optional from typing import List, Dict, Any, Optional
from async_generator import asynccontextmanager
import tractor
from ..log import get_logger from ..log import get_logger
from .data import DataFeed
from . import get_brokermod from . import get_brokermod
log = get_logger('broker.core') log = get_logger(__name__)
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
]
async def api(brokername: str, methname: str, **kwargs) -> dict: async def api(brokername: str, methname: str, **kwargs) -> dict:
@ -28,12 +20,11 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
""" """
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
meth = getattr(client, methname, None)
meth = getattr(client.api, methname, None)
if meth is None: if meth is None:
log.debug( log.debug(
f"Couldn't find API method {methname} looking up on client") f"Couldn't find API method {methname} looking up on client")
meth = getattr(client, methname, None) meth = getattr(client.api, methname, None)
if meth is None: if meth is None:
log.error(f"No api method `{methname}` could be found?") log.error(f"No api method `{methname}` could be found?")
@ -51,24 +42,6 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
return await meth(**kwargs) return await meth(**kwargs)
@asynccontextmanager
async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None):
"""If no ``brokerd`` daemon-actor can be found spawn one in a
local subactor.
"""
async with tractor.open_nursery() as nursery:
async with tractor.find_actor('brokerd') as portal:
if not portal:
log.info(
"No broker daemon could be found, spawning brokerd..")
portal = await nursery.start_actor(
'brokerd',
rpc_module_paths=_data_mods,
loglevel=loglevel,
)
yield portal
async def stocks_quote( async def stocks_quote(
brokermod: ModuleType, brokermod: ModuleType,
tickers: List[str] tickers: List[str]
@ -139,11 +112,11 @@ async def symbol_info(
async def symbol_search( async def symbol_search(
brokermod: ModuleType, brokermod: ModuleType,
symbol: str, pattern: str,
**kwargs, **kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]: ) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return symbol info from broker. """Return symbol info from broker.
""" """
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
# TODO: support multiple asset type concurrent searches. # TODO: support multiple asset type concurrent searches.
return await client.search_stocks(symbol, **kwargs) return await client.search_stocks(pattern=pattern, **kwargs)

View File

@ -25,7 +25,7 @@ from ..log import get_logger, get_console_log
from . import get_brokermod from . import get_brokermod
log = get_logger('broker.data') log = get_logger(__name__)
async def wait_for_network( async def wait_for_network(
@ -80,7 +80,7 @@ class BrokerFeed:
@tractor.msg.pub(tasks=['stock', 'option']) @tractor.msg.pub(tasks=['stock', 'option'])
async def stream_requests( async def stream_poll_requests(
get_topics: typing.Callable, get_topics: typing.Callable,
get_quotes: Coroutine, get_quotes: Coroutine,
feed: BrokerFeed, feed: BrokerFeed,
@ -90,6 +90,12 @@ async def stream_requests(
"""Stream requests for quotes for a set of symbols at the given """Stream requests for quotes for a set of symbols at the given
``rate`` (per second). ``rate`` (per second).
This routine is built for brokers who support quote polling for multiple
symbols per request. The ``get_topics()`` func is called to retreive the
set of symbols each iteration and ``get_quotes()`` is to retreive
the quotes.
A stock-broker client ``get_quotes()`` async function must be A stock-broker client ``get_quotes()`` async function must be
provided which returns an async quote retrieval function. provided which returns an async quote retrieval function.
""" """
@ -327,7 +333,7 @@ async def start_quote_stream(
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
await ctx.send_yield(payload) await ctx.send_yield(payload)
await stream_requests( await stream_poll_requests(
# ``msg.pub`` required kwargs # ``msg.pub`` required kwargs
task_name=feed_type, task_name=feed_type,
@ -394,15 +400,19 @@ class DataFeed:
# subscribe for tickers (this performs a possible filtering # subscribe for tickers (this performs a possible filtering
# where invalid symbols are discarded) # where invalid symbols are discarded)
sd = await self.portal.run( sd = await self.portal.run(
"piker.brokers.data", 'symbol_data', "piker.brokers.data",
broker=self.brokermod.name, tickers=symbols) 'symbol_data',
broker=self.brokermod.name,
tickers=symbols
)
self._symbol_data_cache.update(sd) self._symbol_data_cache.update(sd)
if test: if test:
# stream from a local test file # stream from a local test file
quote_gen = await self.portal.run( quote_gen = await self.portal.run(
"piker.brokers.data", 'stream_from_file', "piker.brokers.data",
filename=test 'stream_from_file',
filename=test,
) )
else: else:
log.info(f"Starting new stream for {symbols}") log.info(f"Starting new stream for {symbols}")

View File

@ -8,7 +8,7 @@ import tractor
from ..log import get_console_log, get_logger from ..log import get_console_log, get_logger
from ..brokers import get_brokermod, config from ..brokers import get_brokermod, config
from ..brokers.core import _data_mods from ..data import maybe_spawn_brokerd
log = get_logger('cli') log = get_logger('cli')
DEFAULT_BROKER = 'questrade' DEFAULT_BROKER = 'questrade'
@ -35,6 +35,7 @@ _context_defaults = dict(
def pikerd(loglevel, host, tl): def pikerd(loglevel, host, tl):
"""Spawn the piker broker-daemon. """Spawn the piker broker-daemon.
""" """
from ..data import _data_mods
get_console_log(loglevel) get_console_log(loglevel)
tractor.run_daemon( tractor.run_daemon(
rpc_module_paths=_data_mods, rpc_module_paths=_data_mods,
@ -65,8 +66,12 @@ def cli(ctx, broker, loglevel, configdir):
}) })
# load downstream cli modules def _load_clis() -> None:
from ..data import marketstore as _
from ..brokers import cli as _ from ..brokers import cli as _
from ..ui import cli as _ from ..ui import cli as _
from ..watchlists import cli as _ from ..watchlists import cli as _
from ..data import marketstore as _
# load downstream cli modules
_load_clis()

View File

@ -19,11 +19,11 @@ import trio
import tractor import tractor
from trio_websocket import open_websocket_url from trio_websocket import open_websocket_url
from . import maybe_spawn_brokerd
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from ..brokers.data import DataFeed from ..brokers.data import DataFeed
from ..log import get_logger from ..log import get_logger
from ..brokers.core import maybe_spawn_brokerd_as_subactor
log = get_logger(__name__) log = get_logger(__name__)
@ -138,7 +138,7 @@ def ingest(config, name, test_file, tl, url):
symbols = watchlists[name] symbols = watchlists[name]
async def main(tries): async def main(tries):
async with maybe_spawn_brokerd_as_subactor( async with maybe_spawn_brokerd(
tries=tries, tries=tries,
loglevel=loglevel loglevel=loglevel
) as portal: ) as portal:

View File

@ -8,7 +8,7 @@ import tractor
from ..cli import cli from ..cli import cli
from .. import watchlists as wl from .. import watchlists as wl
from ..brokers.core import maybe_spawn_brokerd_as_subactor from ..data import maybe_spawn_brokerd
_config_dir = click.get_app_dir('piker') _config_dir = click.get_app_dir('piker')
@ -48,7 +48,8 @@ def monitor(config, rate, name, dhost, test, tl):
from .kivy.monitor import _async_main from .kivy.monitor import _async_main
async def main(tries): async def main(tries):
async with maybe_spawn_brokerd_as_subactor( async with maybe_spawn_brokerd(
brokername=brokermod.name,
tries=tries, loglevel=loglevel tries=tries, loglevel=loglevel
) as portal: ) as portal:
# run app "main" # run app "main"
@ -84,7 +85,7 @@ def optschain(config, symbol, date, tl, rate, test):
from .kivy.option_chain import _async_main from .kivy.option_chain import _async_main
async def main(tries): async def main(tries):
async with maybe_spawn_brokerd_as_subactor( async with maybe_spawn_brokerd(
tries=tries, loglevel=loglevel tries=tries, loglevel=loglevel
): ):
# run app "main" # run app "main"
@ -116,4 +117,8 @@ def chart(config, symbol, date, tl, rate, test):
""" """
from ._chart import main from ._chart import main
main(symbol) # global opts
loglevel = config['loglevel']
brokername = config['broker']
main(sym=symbol, brokername=brokername, loglevel=loglevel)