From 4f387ea2bee8a3e7895d692c942dc126ee9f5d09 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Apr 2018 01:06:14 -0400 Subject: [PATCH] Fix subscriptions and connection handling Oh boy where to start. - Handle broken streams in the `StreamQueue` gracefully; terminate the async generator. - When a stream queue connection is unwritable discard its subscriptions inside the quoter task - If all subscriptions are discarded for a broker then tear down its quoter task - Use listener parent nursery for spawning quoter tasks - Make broker subs data structures global/shared between conn handler tasks - Register the `tickers2qs` entry *after* instantiating broker client(s) (avoids race condition when multiple client connections are coming online simultaneously) - Push smoke quotes to every client not just the first that connects - Track quoter tasks in a cross-task set - Handle unsubscriptions more correctly --- piker/brokers/core.py | 195 +++++++++++++++++++++++++++--------------- 1 file changed, 124 insertions(+), 71 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index d3c6e6c0..7cb16002 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -82,8 +82,12 @@ class StreamQueue: """ unpacker = msgpack.Unpacker(raw=False) while True: - data = await self.stream.receive_some(2**10) - log.trace(f"Data is {data}") + try: + data = await self.stream.receive_some(2**10) + log.trace(f"Data is {data}") + except trio.BrokenStreamError: + log.error(f"Stream connection {self.peer} broke") + return if data == b'': log.debug("Stream connection was closed") @@ -118,7 +122,6 @@ class Client: self._sockaddr = sockaddr self._startup_seq = startup_seq self._autorecon = auto_reconnect - self.stream = None self.squeue = None async def connect(self, sockaddr: tuple = None, **kwargs): @@ -139,13 +142,15 @@ class Client: await self._reconnect() return await self.recv() + async def aclose(self, *args): + await self.squeue.stream.aclose() + async def
__aenter__(self): await self.connect(self._sockaddr) return self async def __aexit__(self, *args): - await self.squeue.stream.__aexit__() - self.stream = None + await self.aclose(*args) async def _reconnect(self): """Handle connection failures by polling until a reconnect can be @@ -163,11 +168,12 @@ class Client: else: log.warn("Stream connection re-established!") break - except OSError: + except (OSError, ConnectionRefusedError): if not down: down = True log.warn( - "Connection went down, waiting for re-establishment") + f"Connection to {self._sockaddr} went down, waiting" + " for re-establishment") await trio.sleep(1) async def aiter_recv(self): @@ -244,7 +250,16 @@ async def stream_quotes( # deliver to each subscriber if q_payloads: for queue, payload in q_payloads.items(): - await queue.put(payload) + try: + await queue.put(payload) + except ( + # That's right, anything you can think of... + trio.ClosedStreamError, ConnectionResetError, + ConnectionRefusedError, + ): + log.warn(f"{queue.peer} went down?") + for qset in tickers2qs.values(): + qset.discard(queue) req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) @@ -259,86 +274,124 @@ async def stream_quotes( log.debug(f"Sleeping for {delay}") await trio.sleep(delay) + if not any(tickers2qs.values()): + log.warn( + f"No subscriptions left, tearing down {brokermod.name} daemon") + break -async def start_quoter(stream: trio.SocketStream) -> None: + +async def start_quoter( + broker2tickersubs: dict, + clients: dict, + nursery: "Nusery", + stream: trio.SocketStream, +) -> None: """Handle per-broker quote stream subscriptions. Spawns new quoter tasks for each broker backend on-demand. + Since most brokers seems to support batch quote requests we + limit to one task per process for now. 
""" - broker2tickersubs = {} - tickers2qs = {} - clients = {} - + daemons = set() queue = StreamQueue(stream) # wrap in a shabby queue-like api - log.debug(f"Accepted new connection from {queue.peer}") - async with trio.open_nursery() as nursery: - async with queue.stream: - async for (broker, tickers) in queue: - log.info( - f"{queue.peer} subscribed to {broker} for tickers {tickers}") + log.info(f"Accepted new connection from {queue.peer}") + async with queue.stream: + async for broker, tickers in queue: + log.info( + f"{queue.peer} subscribed to {broker} for tickers {tickers}") - if broker not in broker2tickersubs: - tickers2qs = broker2tickersubs.setdefault( - broker, {}.fromkeys(tickers, {queue,})) - brokermod = get_brokermod(broker) - log.info(f"Spawning quote streamer for broker {broker}") + if broker not in broker2tickersubs: + brokermod = get_brokermod(broker) + log.info(f"Spawning quote streamer for broker {broker}") - # TODO: move to AsyncExitStack in 3.7 - client = await brokermod.get_client().__aenter__() - get_quotes = await brokermod.quoter(client, tickers) - else: - brokermod, client, get_quotes = clients[broker] - tickers2qs = broker2tickersubs[broker] - # update map from each symbol to requesting client's queue - for ticker in tickers: - tickers2qs.setdefault(ticker, set()).add(queue) - # remove stale ticker subscriptions - for ticker in set(tickers2qs) - set(tickers): - tickers2qs[ticker].remove(queue) - - # run a single quote filtering out any bad tickers - quotes = await get_quotes(tickers) - # pop any tickers that aren't returned in the first quote - for ticker in set(tickers) - set(quotes): - log.warn( - f"Symbol `{ticker}` not found by broker `{brokermod.name}`") - tickers2qs.pop(ticker) - - # pop any tickers that return "empty" quotes - payload = {} - for symbol, quote in quotes.items(): - if quote is None: - log.warn( - f"Symbol `{symbol}` not found by broker" - f" `{brokermod.name}`") - tickers2qs.pop(symbol, None) - continue - 
payload[symbol] = quote - - if broker not in clients: # no quoter task yet - clients[broker] = (brokermod, client, get_quotes) - # push initial quotes response for client initialization - await queue.put(payload) - - # task should begin on the next checkpoint/iteration - log.info(f"Spawning quoter task for {brokermod.name}") - nursery.start_soon( - stream_quotes, brokermod, get_quotes, tickers2qs) + # TODO: move to AsyncExitStack in 3.7 + client_cntxmng = brokermod.get_client() + client = await client_cntxmng.__aenter__() + get_quotes = await brokermod.quoter(client, tickers) + clients[broker] = ( + brokermod, client, client_cntxmng, get_quotes) + tickers2qs = broker2tickersubs.setdefault( + broker, {}.fromkeys(tickers, {queue,})) else: - log.info(f"{queue.peer} was disconnected") - nursery.cancel_scope.cancel() + log.info(f"Subscribing with existing `{broker}` daemon") + brokermod, client, _, get_quotes = clients[broker] + tickers2qs = broker2tickersubs[broker] + # update map from each symbol to requesting new client's queue + for ticker in tickers: + tickers2qs.setdefault(ticker, set()).add(queue) - # TODO: move to AsyncExitStack in 3.7 - for _, client, _ in clients.values(): - await client.__aexit__() + # beginning of section to be trimmed out with #37 + ################################################# + # get a single quote filtering out any bad tickers + # NOTE: this code is always run for every new client + # subscription even when a broker quoter task is already running + # since the new client needs to know what symbols are accepted + log.warn(f"Retrieving smoke quote for {queue.peer}") + quotes = await get_quotes(tickers) + # pop any tickers that aren't returned in the first quote + tickers = set(tickers) - set(quotes) + for ticker in tickers: + log.warn( + f"Symbol `{ticker}` not found by broker `{brokermod.name}`") + tickers2qs.pop(ticker, None) + + # pop any tickers that return "empty" quotes + payload = {} + for symbol, quote in quotes.items(): + if 
quote is None: + log.warn( + f"Symbol `{symbol}` not found by broker" + f" `{brokermod.name}`") + tickers2qs.pop(symbol, None) + continue + payload[symbol] = quote + + # push initial quotes response for client initialization + await queue.put(payload) + + # end of section to be trimmed out with #37 + ########################################### + + if broker not in daemons: # no quoter task yet + # task should begin on the next checkpoint/iteration + log.info(f"Spawning quoter task for {brokermod.name}") + nursery.start_soon( + stream_quotes, brokermod, get_quotes, tickers2qs) + daemons.add(broker) + + log.debug("Waiting on subscription request") + else: + log.info(f"client @ {queue.peer} disconnected") + # drop any lingering subscriptions + for ticker, qset in tickers2qs.items(): + qset.discard(queue) + + # if there are no more subscriptions with this broker + # drop from broker subs dict + if not any(tickers2qs.values()): + log.info(f"No more subscriptions for {broker}") + broker2tickersubs.pop(broker) + + # TODO: move to AsyncExitStack in 3.7 + for _, _, cntxmng, _ in clients.values(): + # FIXME: yes I know it's totally wrong... + await cntxmng.__aexit__(None, None, None) async def _daemon_main() -> None: """Entry point for the broker daemon which waits for connections before spawning micro-services. """ + # global space for broker-daemon subscriptions + broker2tickersubs = {} + clients = {} + async with trio.open_nursery() as nursery: listeners = await nursery.start( - partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1') + partial( + trio.serve_tcp, + partial(start_quoter, broker2tickersubs, clients, nursery), + 1616, host='127.0.0.1' + ) ) log.debug(f"Spawned {listeners}")