From 54261ecc4c87a7f9f3cee1794e616397286b3d86 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Dec 2018 01:49:19 -0500 Subject: [PATCH] Refer to async exit stack via feed --- piker/brokers/data.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index e35b0dec..b6e5ab6a 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -116,6 +116,7 @@ class DataFeed(typing.NamedTuple): """ mod: ModuleType client: object + exit_stack: contextlib.AsyncExitStack quoter_keys: List[str] = ['stock', 'option'] tasks: Dict[str, trio._core._run.Task] = dict.fromkeys( quoter_keys, False) @@ -271,8 +272,6 @@ async def get_cached_feed( ss = tractor.current_actor().statespace feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) lock = feeds['_lock'] - feed_stacks = ss.setdefault('feed_stacks', {}) - feed_stack = feed_stacks.setdefault(brokername, contextlib.AsyncExitStack()) async with lock: try: feed = feeds[brokername] @@ -281,11 +280,13 @@ async def get_cached_feed( except KeyError: log.info(f"Creating new client for broker {brokername}") brokermod = get_brokermod(brokername) - client = await feed_stack.enter_async_context( + exit_stack = contextlib.AsyncExitStack() + client = await exit_stack.enter_async_context( brokermod.get_client()) feed = DataFeed( mod=brokermod, client=client, + exit_stack=exit_stack, ) feeds[brokername] = feed return feed @@ -306,7 +307,6 @@ async def start_quote_stream( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ - actor = tractor.current_actor() # set log level after fork get_console_log(actor.loglevel) @@ -376,7 +376,7 @@ async def start_quote_stream( # broker2symbolsubs.pop(broker, None) # destroy the API client - await feed_stack.aclose() + await feed.exit_stack.aclose() async def stream_to_file(