diff --git a/piker/brokers/core.py b/piker/brokers/core.py index b493a9e7..2fe6b736 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -5,16 +5,25 @@ import inspect from types import ModuleType from typing import List, Dict, Any, Optional +from async_generator import asynccontextmanager +import tractor + from ..log import get_logger from .data import DataFeed +from . import get_brokermod log = get_logger('broker.core') +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', +] -async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: +async def api(brokername: str, methname: str, **kwargs) -> dict: """Make (proxy through) a broker API call by name and return its result. """ + brokermod = get_brokermod(brokername) async with brokermod.get_client() as client: meth = getattr(client.api, methname, None) @@ -39,6 +48,24 @@ async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: return await meth(**kwargs) +@asynccontextmanager +async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): + """If no ``brokerd`` daemon-actor can be found spawn one in a + local subactor. + """ + async with tractor.open_nursery() as nursery: + async with tractor.find_actor('brokerd') as portal: + if not portal: + log.info( + "No broker daemon could be found, spawning brokerd..") + portal = await nursery.start_actor( + 'brokerd', + rpc_module_paths=_data_mods, + loglevel=loglevel, + ) + yield portal + + async def stocks_quote( brokermod: ModuleType, tickers: List[str]