From 063dfad5b4fdc8ed6c832818d368a78ff2073531 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Apr 2018 11:40:45 -0400 Subject: [PATCH] Make daemon registry cross-task --- piker/brokers/core.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 7cb16002..6797ead4 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -215,6 +215,10 @@ async def stream_quotes( while True: # use an event here to trigger exit? prequote_start = time.time() + if not any(tickers2qs.values()): + log.warn(f"No subs left for broker {brokermod.name}, exiting task") + break + tickers = list(tickers2qs.keys()) with trio.move_on_after(3) as cancel_scope: quotes = await get_quotes(tickers) @@ -274,15 +278,11 @@ async def stream_quotes( log.debug(f"Sleeping for {delay}") await trio.sleep(delay) - if not any(tickers2qs.values()): - log.warn( - f"No subscriptions left, tearing down {brokermod.name} daemon") - break - async def start_quoter( broker2tickersubs: dict, clients: dict, + dtasks: set, # daemon task registry nursery: "Nusery", stream: trio.SocketStream, ) -> None: @@ -292,7 +292,6 @@ async def start_quoter( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ - daemons = set() queue = StreamQueue(stream) # wrap in a shabby queue-like api log.info(f"Accepted new connection from {queue.peer}") async with queue.stream: @@ -352,12 +351,12 @@ async def start_quoter( # end of section to be trimmed out with #37 ########################################### - if broker not in daemons: # no quoter task yet + if broker not in dtasks: # no quoter task yet # task should begin on the next checkpoint/iteration log.info(f"Spawning quoter task for {brokermod.name}") nursery.start_soon( stream_quotes, brokermod, get_quotes, tickers2qs) - daemons.add(broker) + dtasks.add(broker) log.debug("Waiting on subscription request") else: @@ -370,7 +369,8 @@ async def start_quoter( # drop from broker subs dict if not any(tickers2qs.values()): log.info(f"No more subscriptions for {broker}") - broker2tickersubs.pop(broker) + broker2tickersubs.pop(broker, None) + dtasks.discard(broker) # TODO: move to AsyncExitStack in 3.7 for _, _, cntxmng, _ in clients.values(): @@ -385,12 +385,16 @@ async def _daemon_main() -> None: # global space for broker-daemon subscriptions broker2tickersubs = {} clients = {} + dtasks = set() async with trio.open_nursery() as nursery: listeners = await nursery.start( partial( trio.serve_tcp, - partial(start_quoter, broker2tickersubs, clients, nursery), + partial( + start_quoter, broker2tickersubs, clients, + dtasks, nursery + ), 1616, host='127.0.0.1' ) )