From 61e460a4223b2c60072bf95a75e70924f516645d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Aug 2020 14:28:02 -0400 Subject: [PATCH] Start brokers.api module --- piker/brokers/api.py | 53 ++++++++++++++++++++++++++++++++++++++ piker/brokers/questrade.py | 44 +++---------------------------- 2 files changed, 56 insertions(+), 41 deletions(-) create mode 100644 piker/brokers/api.py diff --git a/piker/brokers/api.py b/piker/brokers/api.py new file mode 100644 index 00000000..29fe5577 --- /dev/null +++ b/piker/brokers/api.py @@ -0,0 +1,53 @@ +""" +Actor-aware broker agnostic interface. +""" +from contextlib import asynccontextmanager, AsyncExitStack + +import trio +import tractor + +from . import get_brokermod +from ..log import get_logger + + +log = get_logger(__name__) + + +@asynccontextmanager +async def get_cached_client( + brokername: str, + *args, + **kwargs, +) -> 'Client': # noqa + """Get a cached broker client from the current actor's local vars. + + If one has not been setup do it and cache it. + """ + # check if a cached client is in the local actor's statespace + ss = tractor.current_actor().statespace + clients = ss.setdefault('clients', {'_lock': trio.Lock()}) + lock = clients['_lock'] + client = None + try: + log.info(f"Loading existing `{brokername}` daemon") + async with lock: + client = clients[brokername] + client._consumers += 1 + yield client + except KeyError: + log.info(f"Creating new client for broker {brokername}") + async with lock: + brokermod = get_brokermod(brokername) + exit_stack = AsyncExitStack() + client = await exit_stack.enter_async_context( + brokermod.get_client() + ) + client._consumers = 0 + client._exit_stack = exit_stack + clients[brokername] = client + yield client + finally: + client._consumers -= 1 + if client._consumers <= 0: + # teardown the client + await client._exit_stack.aclose() diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 98989741..6063b9f6 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -31,6 +31,8 @@ from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json, get_console_log from .._async_utils import async_lifo_cache from . import get_brokermod +from . import api + log = get_logger(__name__) @@ -1024,46 +1026,6 @@ def format_option_quote( return new, displayable -@asynccontextmanager -async def get_cached_client( - brokername: str, - *args, - **kwargs, -) -> 'Client': - """Get a cached broker client from the current actor's local vars. - - If one has not been setup do it and cache it. - """ - # check if a cached client is in the local actor's statespace - ss = tractor.current_actor().statespace - clients = ss.setdefault('clients', {'_lock': trio.Lock()}) - lock = clients['_lock'] - client = None - try: - log.info(f"Loading existing `{brokername}` daemon") - async with lock: - client = clients[brokername] - client._consumers += 1 - yield client - except KeyError: - log.info(f"Creating new client for broker {brokername}") - async with lock: - brokermod = get_brokermod(brokername) - exit_stack = contextlib.AsyncExitStack() - client = await exit_stack.enter_async_context( - brokermod.get_client() - ) - client._consumers = 0 - client._exit_stack = exit_stack - clients[brokername] = client - yield client - finally: - client._consumers -= 1 - if client._consumers <= 0: - # teardown the client - await client._exit_stack.aclose() - - async def smoke_quote( get_quotes, tickers @@ -1214,7 +1176,7 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel) - async with get_cached_client('questrade') as client: + async with api.get_cached_client('questrade') as client: if feed_type == 'stock': formatter = format_stock_quote get_quotes = await stock_quoter(client, symbols)