Refer to async exit stack via feed

kivy_mainline_and_py3.8
Tyler Goodlet 2018-12-10 01:49:19 -05:00
parent fb47ea2e5a
commit 54261ecc4c
1 changed files with 5 additions and 5 deletions

View File

@ -116,6 +116,7 @@ class DataFeed(typing.NamedTuple):
""" """
mod: ModuleType mod: ModuleType
client: object client: object
exit_stack: contextlib.AsyncExitStack
quoter_keys: List[str] = ['stock', 'option'] quoter_keys: List[str] = ['stock', 'option']
tasks: Dict[str, trio._core._run.Task] = dict.fromkeys( tasks: Dict[str, trio._core._run.Task] = dict.fromkeys(
quoter_keys, False) quoter_keys, False)
@ -271,8 +272,6 @@ async def get_cached_feed(
ss = tractor.current_actor().statespace ss = tractor.current_actor().statespace
feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) feeds = ss.setdefault('feeds', {'_lock': trio.Lock()})
lock = feeds['_lock'] lock = feeds['_lock']
feed_stacks = ss.setdefault('feed_stacks', {})
feed_stack = feed_stacks.setdefault(brokername, contextlib.AsyncExitStack())
async with lock: async with lock:
try: try:
feed = feeds[brokername] feed = feeds[brokername]
@ -281,11 +280,13 @@ async def get_cached_feed(
except KeyError: except KeyError:
log.info(f"Creating new client for broker {brokername}") log.info(f"Creating new client for broker {brokername}")
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
client = await feed_stack.enter_async_context( exit_stack = contextlib.AsyncExitStack()
client = await exit_stack.enter_async_context(
brokermod.get_client()) brokermod.get_client())
feed = DataFeed( feed = DataFeed(
mod=brokermod, mod=brokermod,
client=client, client=client,
exit_stack=exit_stack,
) )
feeds[brokername] = feed feeds[brokername] = feed
return feed return feed
@ -306,7 +307,6 @@ async def start_quote_stream(
Since most brokers seems to support batch quote requests we Since most brokers seems to support batch quote requests we
limit to one task per process for now. limit to one task per process for now.
""" """
actor = tractor.current_actor() actor = tractor.current_actor()
# set log level after fork # set log level after fork
get_console_log(actor.loglevel) get_console_log(actor.loglevel)
@ -376,7 +376,7 @@ async def start_quote_stream(
# broker2symbolsubs.pop(broker, None) # broker2symbolsubs.pop(broker, None)
# destroy the API client # destroy the API client
await feed_stack.aclose() await feed.exit_stack.aclose()
async def stream_to_file( async def stream_to_file(