Start brokers.api module

its_happening
Tyler Goodlet 2020-08-21 14:28:02 -04:00
parent f5ad56a257
commit bfdd2c43cc
2 changed files with 56 additions and 41 deletions

View File

@ -0,0 +1,53 @@
"""
Actor-aware broker agnostic interface.
"""
from contextlib import asynccontextmanager, AsyncExitStack
import trio
import tractor
from . import get_brokermod
from ..log import get_logger
log = get_logger(__name__)
@asynccontextmanager
async def get_cached_client(
brokername: str,
*args,
**kwargs,
) -> 'Client': # noqa
"""Get a cached broker client from the current actor's local vars.
If one has not been setup do it and cache it.
"""
# check if a cached client is in the local actor's statespace
ss = tractor.current_actor().statespace
clients = ss.setdefault('clients', {'_lock': trio.Lock()})
lock = clients['_lock']
client = None
try:
log.info(f"Loading existing `{brokername}` daemon")
async with lock:
client = clients[brokername]
client._consumers += 1
yield client
except KeyError:
log.info(f"Creating new client for broker {brokername}")
async with lock:
brokermod = get_brokermod(brokername)
exit_stack = AsyncExitStack()
client = await exit_stack.enter_async_context(
brokermod.get_client()
)
client._consumers = 0
client._exit_stack = exit_stack
clients[brokername] = client
yield client
finally:
client._consumers -= 1
if client._consumers <= 0:
# teardown the client
await client._exit_stack.aclose()

View File

@ -31,6 +31,8 @@ from ._util import resproc, BrokerError, SymbolNotFound
from ..log import get_logger, colorize_json, get_console_log
from .._async_utils import async_lifo_cache
from . import get_brokermod
from . import api
log = get_logger(__name__)
@ -1025,46 +1027,6 @@ def format_option_quote(
return new, displayable
@asynccontextmanager
async def get_cached_client(
brokername: str,
*args,
**kwargs,
) -> 'Client':
"""Get a cached broker client from the current actor's local vars.
If one has not been setup do it and cache it.
"""
# check if a cached client is in the local actor's statespace
ss = tractor.current_actor().statespace
clients = ss.setdefault('clients', {'_lock': trio.Lock()})
lock = clients['_lock']
client = None
try:
log.info(f"Loading existing `{brokername}` daemon")
async with lock:
client = clients[brokername]
client._consumers += 1
yield client
except KeyError:
log.info(f"Creating new client for broker {brokername}")
async with lock:
brokermod = get_brokermod(brokername)
exit_stack = contextlib.AsyncExitStack()
client = await exit_stack.enter_async_context(
brokermod.get_client()
)
client._consumers = 0
client._exit_stack = exit_stack
clients[brokername] = client
yield client
finally:
client._consumers -= 1
if client._consumers <= 0:
# teardown the client
await client._exit_stack.aclose()
async def smoke_quote(
get_quotes,
tickers
@ -1215,7 +1177,7 @@ async def stream_quotes(
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel)
async with get_cached_client('questrade') as client:
async with api.get_cached_client('questrade') as client:
if feed_type == 'stock':
formatter = format_stock_quote
get_quotes = await stock_quoter(client, symbols)