Add a `DataFeed.call_client()` method

Allows for calling an actor local broker client's methods from a remote
actor.
kivy_mainline_and_py3.8
Tyler Goodlet 2019-02-09 21:38:00 -05:00
parent 026b015627
commit f6230dd6df
1 changed files with 45 additions and 24 deletions

View File

@ -25,7 +25,7 @@ log = get_logger('broker.data')
async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
"""Wait until the network comes back up. """Wait until the network (DNS) comes back up.
""" """
down = False down = False
while True: while True:
@ -220,29 +220,30 @@ async def get_cached_feed(
ss = tractor.current_actor().statespace ss = tractor.current_actor().statespace
feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) feeds = ss.setdefault('feeds', {'_lock': trio.Lock()})
lock = feeds['_lock'] lock = feeds['_lock']
feed = None
try: try:
try: async with lock:
async with lock: feed = feeds[brokername]
feed = feeds[brokername] log.info(f"Subscribing with existing `{brokername}` daemon")
log.info(f"Subscribing with existing `{brokername}` daemon") yield feed
yield feed except KeyError:
except KeyError: async with lock:
async with lock: log.info(f"Creating new client for broker {brokername}")
log.info(f"Creating new client for broker {brokername}") brokermod = get_brokermod(brokername)
brokermod = get_brokermod(brokername) exit_stack = contextlib.AsyncExitStack()
exit_stack = contextlib.AsyncExitStack() client = await exit_stack.enter_async_context(
client = await exit_stack.enter_async_context( brokermod.get_client())
brokermod.get_client()) feed = BrokerFeed(
feed = BrokerFeed( mod=brokermod,
mod=brokermod, client=client,
client=client, exit_stack=exit_stack,
exit_stack=exit_stack, )
) feeds[brokername] = feed
feeds[brokername] = feed yield feed
yield feed
finally: finally:
# destroy the API client if feed is not None:
await feed.exit_stack.aclose() # destroy the API client
await feed.exit_stack.aclose()
async def start_quote_stream( async def start_quote_stream(
@ -319,9 +320,18 @@ async def start_quote_stream(
f"Terminating stream quoter task for {feed.mod.name}") f"Terminating stream quoter task for {feed.mod.name}")
async def call_client(
broker: str,
methname: str,
**kwargs,
):
async with get_cached_feed(broker) as feed:
return await getattr(feed.client, methname)(**kwargs)
class DataFeed: class DataFeed:
"""Data feed client for streaming symbol data from a (remote) """Data feed client for streaming symbol data from and making API client calls
``brokerd`` data daemon. to a (remote) ``brokerd`` daemon.
""" """
_allowed = ('stock', 'option') _allowed = ('stock', 'option')
@ -400,6 +410,17 @@ class DataFeed:
]) ])
return records, displayables return records, displayables
async def call_client(self, method, **kwargs):
"""Call a broker ``Client`` method using RPC and return result.
"""
return await self.portal.run(
'piker.brokers.data',
'call_client',
broker=self.brokermod.name,
methname=method,
**kwargs
)
async def stream_to_file( async def stream_to_file(
watchlist_name: str, watchlist_name: str,