Start brokers.api module
parent
fd21f4b0fe
commit
61e460a422
|
@ -0,0 +1,53 @@
|
||||||
|
"""
|
||||||
|
Actor-aware broker agnostic interface.
|
||||||
|
"""
|
||||||
|
from contextlib import asynccontextmanager, AsyncExitStack
|
||||||
|
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
from . import get_brokermod
|
||||||
|
from ..log import get_logger
|
||||||
|
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def get_cached_client(
|
||||||
|
brokername: str,
|
||||||
|
*args,
|
||||||
|
**kwargs,
|
||||||
|
) -> 'Client': # noqa
|
||||||
|
"""Get a cached broker client from the current actor's local vars.
|
||||||
|
|
||||||
|
If one has not been setup do it and cache it.
|
||||||
|
"""
|
||||||
|
# check if a cached client is in the local actor's statespace
|
||||||
|
ss = tractor.current_actor().statespace
|
||||||
|
clients = ss.setdefault('clients', {'_lock': trio.Lock()})
|
||||||
|
lock = clients['_lock']
|
||||||
|
client = None
|
||||||
|
try:
|
||||||
|
log.info(f"Loading existing `{brokername}` daemon")
|
||||||
|
async with lock:
|
||||||
|
client = clients[brokername]
|
||||||
|
client._consumers += 1
|
||||||
|
yield client
|
||||||
|
except KeyError:
|
||||||
|
log.info(f"Creating new client for broker {brokername}")
|
||||||
|
async with lock:
|
||||||
|
brokermod = get_brokermod(brokername)
|
||||||
|
exit_stack = AsyncExitStack()
|
||||||
|
client = await exit_stack.enter_async_context(
|
||||||
|
brokermod.get_client()
|
||||||
|
)
|
||||||
|
client._consumers = 0
|
||||||
|
client._exit_stack = exit_stack
|
||||||
|
clients[brokername] = client
|
||||||
|
yield client
|
||||||
|
finally:
|
||||||
|
client._consumers -= 1
|
||||||
|
if client._consumers <= 0:
|
||||||
|
# teardown the client
|
||||||
|
await client._exit_stack.aclose()
|
|
@ -31,6 +31,8 @@ from ._util import resproc, BrokerError, SymbolNotFound
|
||||||
from ..log import get_logger, colorize_json, get_console_log
|
from ..log import get_logger, colorize_json, get_console_log
|
||||||
from .._async_utils import async_lifo_cache
|
from .._async_utils import async_lifo_cache
|
||||||
from . import get_brokermod
|
from . import get_brokermod
|
||||||
|
from . import api
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -1024,46 +1026,6 @@ def format_option_quote(
|
||||||
return new, displayable
|
return new, displayable
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def get_cached_client(
|
|
||||||
brokername: str,
|
|
||||||
*args,
|
|
||||||
**kwargs,
|
|
||||||
) -> 'Client':
|
|
||||||
"""Get a cached broker client from the current actor's local vars.
|
|
||||||
|
|
||||||
If one has not been setup do it and cache it.
|
|
||||||
"""
|
|
||||||
# check if a cached client is in the local actor's statespace
|
|
||||||
ss = tractor.current_actor().statespace
|
|
||||||
clients = ss.setdefault('clients', {'_lock': trio.Lock()})
|
|
||||||
lock = clients['_lock']
|
|
||||||
client = None
|
|
||||||
try:
|
|
||||||
log.info(f"Loading existing `{brokername}` daemon")
|
|
||||||
async with lock:
|
|
||||||
client = clients[brokername]
|
|
||||||
client._consumers += 1
|
|
||||||
yield client
|
|
||||||
except KeyError:
|
|
||||||
log.info(f"Creating new client for broker {brokername}")
|
|
||||||
async with lock:
|
|
||||||
brokermod = get_brokermod(brokername)
|
|
||||||
exit_stack = contextlib.AsyncExitStack()
|
|
||||||
client = await exit_stack.enter_async_context(
|
|
||||||
brokermod.get_client()
|
|
||||||
)
|
|
||||||
client._consumers = 0
|
|
||||||
client._exit_stack = exit_stack
|
|
||||||
clients[brokername] = client
|
|
||||||
yield client
|
|
||||||
finally:
|
|
||||||
client._consumers -= 1
|
|
||||||
if client._consumers <= 0:
|
|
||||||
# teardown the client
|
|
||||||
await client._exit_stack.aclose()
|
|
||||||
|
|
||||||
|
|
||||||
async def smoke_quote(
|
async def smoke_quote(
|
||||||
get_quotes,
|
get_quotes,
|
||||||
tickers
|
tickers
|
||||||
|
@ -1214,7 +1176,7 @@ async def stream_quotes(
|
||||||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||||
get_console_log(loglevel)
|
get_console_log(loglevel)
|
||||||
|
|
||||||
async with get_cached_client('questrade') as client:
|
async with api.get_cached_client('questrade') as client:
|
||||||
if feed_type == 'stock':
|
if feed_type == 'stock':
|
||||||
formatter = format_stock_quote
|
formatter = format_stock_quote
|
||||||
get_quotes = await stock_quoter(client, symbols)
|
get_quotes = await stock_quoter(client, symbols)
|
||||||
|
|
Loading…
Reference in New Issue