From 6c977cfb7b7d02d8fa20f0b305d7783ff3054c9c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 6 Jul 2018 17:25:40 -0400 Subject: [PATCH] Port broker core to tractor; fix chan unsub bug --- piker/brokers/core.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 506e1869..f92f3e53 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -9,10 +9,9 @@ from types import ModuleType from typing import Coroutine, Callable import trio +import tractor -from .. import tractor -from ..log import get_logger -from ..ipc import Channel +from ..log import get_logger, get_console_log from . import get_brokermod @@ -75,7 +74,7 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: async def stream_quotes( brokermod: ModuleType, get_quotes: Coroutine, - tickers2chans: {str: Channel}, + tickers2chans: {str: tractor.Channel}, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue cid: str = None, @@ -246,8 +245,10 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None): lambda ticker: ticker not in tickers, tickers2chans.copy() ): chanset = tickers2chans.get(ticker) - if chanset: - chanset.discard((chan, cid)) + # XXX: cid will be different on unsub call + for item in chanset.copy(): + if chan in item: + chanset.discard(item) if not chanset: # pop empty sets which will trigger bg quoter task termination @@ -257,7 +258,7 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None): async def start_quote_stream( broker: str, tickers: [str], - chan: 'Channel' = None, + chan: tractor.Channel = None, cid: str = None, ) -> None: """Handle per-broker quote stream subscriptions. @@ -266,8 +267,11 @@ async def start_quote_stream( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ + actor = tractor.current_actor() + # set log level after fork + get_console_log(actor.loglevel) # pull global vars from local actor - ss = tractor.current_actor().statespace + ss = actor.statespace broker2tickersubs = ss['broker2tickersubs'] clients = ss['clients'] dtasks = ss['dtasks']