diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 56e99fac..e35b0dec 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -269,9 +269,10 @@ async def get_cached_feed( """ # check if a cached client is in the local actor's statespace ss = tractor.current_actor().statespace - feeds = ss['feeds'] + feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) lock = feeds['_lock'] - feed_stack = ss['feed_stacks'][brokername] + feed_stacks = ss.setdefault('feed_stacks', {}) + feed_stack = feed_stacks.setdefault(brokername, contextlib.AsyncExitStack()) async with lock: try: feed = feeds[brokername] @@ -305,18 +306,15 @@ async def start_quote_stream( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ + actor = tractor.current_actor() # set log level after fork get_console_log(actor.loglevel) # pull global vars from local actor ss = actor.statespace - # broker2symbolsubs = ss.setdefault('broker2symbolsubs', {}) - ss.setdefault('feeds', {'_lock': trio.Lock()}) - feed_stacks = ss.setdefault('feed_stacks', {}) symbols = list(symbols) log.info( f"{chan.uid} subscribed to {broker} for symbols {symbols}") - feed_stack = feed_stacks.setdefault(broker, contextlib.AsyncExitStack()) # another actor task may have already created it feed = await get_cached_feed(broker) symbols2chans = feed.subscriptions[feed_type]