diff --git a/piker/brokers/core.py b/piker/brokers/core.py index f92f3e53..f10524f5 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -6,7 +6,7 @@ import inspect from functools import partial import socket from types import ModuleType -from typing import Coroutine, Callable +from typing import Coroutine, Callable, List, Dict, Any import trio import tractor @@ -19,14 +19,21 @@ log = get_logger('broker.core') async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: - """Make (proxy through) an api call by name and return its result. + """Make (proxy through) a broker API call by name and return its result. """ async with brokermod.get_client() as client: + meth = getattr(client.api, methname, None) + if meth is None: + log.warning( + "Couldn't find API method {methname} looking up on client") + meth = getattr(client, methname, None) + if meth is None: log.error(f"No api method `{methname}` could be found?") return - elif not kwargs: + + if not kwargs: # verify kwargs requirements are met sig = inspect.signature(meth) if sig.parameters: @@ -38,7 +45,10 @@ async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: return await meth(**kwargs) -async def quote(brokermod: ModuleType, tickers: [str]) -> dict: +async def stocks_quote( + brokermod: ModuleType, + tickers: List[str] +) -> Dict[str, Dict[str, Any]]: """Return quotes dict for ``tickers``. """ async with brokermod.get_client() as client: @@ -74,7 +84,7 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: async def stream_quotes( brokermod: ModuleType, get_quotes: Coroutine, - tickers2chans: {str: tractor.Channel}, + tickers2chans: Dict[str, tractor.Channel], rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue cid: str = None, @@ -82,8 +92,8 @@ async def stream_quotes( """Stream quotes for a sequence of tickers at the given ``rate`` per second. - A broker-client ``quoter`` async context manager must be provided which - returns an async quote function. + A stock-broker client ``get_quotes()`` async context manager must be + provided which returns an async quote retrieval function. """ broker_limit = getattr(brokermod, '_rate_limit', float('inf')) if broker_limit < rate: @@ -133,7 +143,7 @@ async def stream_quotes( {'yield': {}, 'cid': cid} )['yield'][symbol] = quote - # deliver to each subscriber + # deliver to each subscriber (fan out) if chan_payloads: for chan, payload in chan_payloads.items(): try: @@ -147,6 +157,7 @@ async def stream_quotes( for chanset in tickers2chans.values(): chanset.discard((chan, cid)) + # latency monitoring req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) tot = req_time + proc_time @@ -164,8 +175,7 @@ async def stream_quotes( async def get_cached_client(broker, tickers): - """Get the current actor's cached broker client if available or create a - new one. + """Get or create the current actor's cached broker client. """ # check if a cached client is in the local actor's statespace clients = tractor.current_actor().statespace.setdefault('clients', {}) @@ -232,6 +242,8 @@ async def smoke_quote(get_quotes, tickers, broker): def modify_quote_stream(broker, tickers, chan=None, cid=None): """Absolute symbol subscription list for each quote stream. + + Effectively a consumer subscription api. """ log.info(f"{chan} changed symbol subscription to {tickers}") ss = tractor.current_actor().statespace @@ -261,7 +273,8 @@ async def start_quote_stream( chan: tractor.Channel = None, cid: str = None, ) -> None: - """Handle per-broker quote stream subscriptions. + """Handle per-broker quote stream subscriptions using a "lazy" pub-sub + pattern. Spawns new quoter tasks for each broker backend on-demand. Since most brokers seems to support batch quote requests we @@ -286,7 +299,7 @@ async def start_quote_stream( log.info(f"Subscribing with existing `{broker}` daemon") tickers2chans = broker2tickersubs[broker] - # do a smoke quote (not this mutates the input list and filters out bad + # do a smoke quote (note this mutates the input list and filters out bad # symbols for now) payload = await smoke_quote(get_quotes, tickers, broker) # push initial smoke quote response for client initialization @@ -296,11 +309,9 @@ async def start_quote_stream( modify_quote_stream(broker, tickers, chan=chan, cid=cid) try: - if broker not in dtasks: # no quoter task yet - # task should begin on the next checkpoint/iteration - # with trio.open_cancel_scope(shield=True): + if broker not in dtasks: + # no quoter task yet so start a daemon task log.info(f"Spawning quoter task for {brokermod.name}") - # await actor._root_nursery.start(partial( async with trio.open_nursery() as nursery: nursery.start_soon(partial( stream_quotes, brokermod, get_quotes, tickers2chans, @@ -325,21 +336,3 @@ async def start_quote_stream( log.info(f"No more subscriptions for {broker}") broker2tickersubs.pop(broker, None) dtasks.discard(broker) - - -async def _test_price_stream(broker, symbols, *, chan=None, cid=None): - """Test function for initial tractor draft. - """ - brokermod = get_brokermod(broker) - client_cntxmng = brokermod.get_client() - client = await client_cntxmng.__aenter__() - get_quotes = await brokermod.quoter(client, symbols) - log.info(f"Spawning quoter task for {brokermod.name}") - assert chan - tickers2chans = {}.fromkeys(symbols, {(chan, cid), }) - - async with trio.open_nursery() as nursery: - nursery.start_soon( - partial( - stream_quotes, brokermod, get_quotes, tickers2chans, cid=cid) - ) diff --git a/piker/cli.py b/piker/cli.py index 717d2ef6..74bd1d69 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -90,7 +90,7 @@ def api(meth, kwargs, loglevel, broker, keys): help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--df-output', '-df', flag_value=True, - help='Ouput in `pandas.DataFrame` format') + help='Output in `pandas.DataFrame` format') @click.argument('tickers', nargs=-1, required=True) def quote(loglevel, broker, tickers, df_output): """Retreive symbol quotes on the console in either json or dataframe @@ -98,7 +98,7 @@ def quote(loglevel, broker, tickers, df_output): """ brokermod = get_brokermod(broker) get_console_log(loglevel) - quotes = trio.run(partial(core.quote, brokermod, tickers)) + quotes = trio.run(partial(core.stocks_quote, brokermod, tickers)) if not quotes: log.error(f"No quotes could be found for {tickers}?") return