diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 3c4cb944..340ef73a 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -25,7 +25,7 @@ log = get_logger('broker.data') async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: - """Wait until the network comes back up. + """Wait until the network (DNS) comes back up. """ down = False while True: @@ -220,29 +220,30 @@ async def get_cached_feed( ss = tractor.current_actor().statespace feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) lock = feeds['_lock'] + feed = None try: - try: - async with lock: - feed = feeds[brokername] - log.info(f"Subscribing with existing `{brokername}` daemon") - yield feed - except KeyError: - async with lock: - log.info(f"Creating new client for broker {brokername}") - brokermod = get_brokermod(brokername) - exit_stack = contextlib.AsyncExitStack() - client = await exit_stack.enter_async_context( - brokermod.get_client()) - feed = BrokerFeed( - mod=brokermod, - client=client, - exit_stack=exit_stack, - ) - feeds[brokername] = feed - yield feed + async with lock: + feed = feeds[brokername] + log.info(f"Subscribing with existing `{brokername}` daemon") + yield feed + except KeyError: + async with lock: + log.info(f"Creating new client for broker {brokername}") + brokermod = get_brokermod(brokername) + exit_stack = contextlib.AsyncExitStack() + client = await exit_stack.enter_async_context( + brokermod.get_client()) + feed = BrokerFeed( + mod=brokermod, + client=client, + exit_stack=exit_stack, + ) + feeds[brokername] = feed + yield feed finally: - # destroy the API client - await feed.exit_stack.aclose() + if feed is not None: + # destroy the API client + await feed.exit_stack.aclose() async def start_quote_stream( @@ -319,9 +320,18 @@ async def start_quote_stream( f"Terminating stream quoter task for {feed.mod.name}") +async def call_client( + broker: str, + methname: str, + **kwargs, +): + async with get_cached_feed(broker) as feed: + return await getattr(feed.client, methname)(**kwargs) + + class DataFeed: - """Data feed client for streaming symbol data from a (remote) - ``brokerd`` data daemon. + """Data feed client for streaming symbol data from and making API client calls + to a (remote) ``brokerd`` daemon. """ _allowed = ('stock', 'option') @@ -400,6 +410,17 @@ class DataFeed: ]) return records, displayables + async def call_client(self, method, **kwargs): + """Call a broker ``Client`` method using RPC and return result. + """ + return await self.portal.run( + 'piker.brokers.data', + 'call_client', + broker=self.brokermod.name, + methname=method, + **kwargs + ) + async def stream_to_file( watchlist_name: str,