Set statespace defaults in `get_cached_feed()`
parent
eee19048f0
commit
b8815cde4a
|
@ -269,9 +269,10 @@ async def get_cached_feed(
|
||||||
"""
|
"""
|
||||||
# check if a cached client is in the local actor's statespace
|
# check if a cached client is in the local actor's statespace
|
||||||
ss = tractor.current_actor().statespace
|
ss = tractor.current_actor().statespace
|
||||||
feeds = ss['feeds']
|
feeds = ss.setdefault('feeds', {'_lock': trio.Lock()})
|
||||||
lock = feeds['_lock']
|
lock = feeds['_lock']
|
||||||
feed_stack = ss['feed_stacks'][brokername]
|
feed_stacks = ss.setdefault('feed_stacks', {})
|
||||||
|
feed_stack = feed_stacks.setdefault(brokername, contextlib.AsyncExitStack())
|
||||||
async with lock:
|
async with lock:
|
||||||
try:
|
try:
|
||||||
feed = feeds[brokername]
|
feed = feeds[brokername]
|
||||||
|
@ -305,18 +306,15 @@ async def start_quote_stream(
|
||||||
Since most brokers seems to support batch quote requests we
|
Since most brokers seems to support batch quote requests we
|
||||||
limit to one task per process for now.
|
limit to one task per process for now.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
# set log level after fork
|
# set log level after fork
|
||||||
get_console_log(actor.loglevel)
|
get_console_log(actor.loglevel)
|
||||||
# pull global vars from local actor
|
# pull global vars from local actor
|
||||||
ss = actor.statespace
|
ss = actor.statespace
|
||||||
# broker2symbolsubs = ss.setdefault('broker2symbolsubs', {})
|
|
||||||
ss.setdefault('feeds', {'_lock': trio.Lock()})
|
|
||||||
feed_stacks = ss.setdefault('feed_stacks', {})
|
|
||||||
symbols = list(symbols)
|
symbols = list(symbols)
|
||||||
log.info(
|
log.info(
|
||||||
f"{chan.uid} subscribed to {broker} for symbols {symbols}")
|
f"{chan.uid} subscribed to {broker} for symbols {symbols}")
|
||||||
feed_stack = feed_stacks.setdefault(broker, contextlib.AsyncExitStack())
|
|
||||||
# another actor task may have already created it
|
# another actor task may have already created it
|
||||||
feed = await get_cached_feed(broker)
|
feed = await get_cached_feed(broker)
|
||||||
symbols2chans = feed.subscriptions[feed_type]
|
symbols2chans = feed.subscriptions[feed_type]
|
||||||
|
|
Loading…
Reference in New Issue