From 435b2a56e812f05503b489389e4a9080bf279ca0 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 20 Feb 2019 21:39:57 -0500
Subject: [PATCH] Remove stream opening lock on `DataFeed`

Fixes to `tractor` that resolve issues with async generators being
non-task safe make the need for the mutex lock in
`DataFeed.open_stream()` unnecessary. Also, don't bother pushing empty
quotes from the publisher; avoids hitting the network when possible.
---
 piker/brokers/data.py | 118 +++++++++++++++++++++++-------------------
 1 file changed, 64 insertions(+), 54 deletions(-)

diff --git a/piker/brokers/data.py b/piker/brokers/data.py
index 340ef73a..115b9ff7 100644
--- a/piker/brokers/data.py
+++ b/piker/brokers/data.py
@@ -1,5 +1,5 @@
 """
-Live data feed machinery
+Real-time data feed machinery
 """
 import time
 from functools import partial
@@ -9,7 +9,11 @@
 import socket
 import json
 from types import ModuleType
 import typing
-from typing import Coroutine, Callable, Dict, List, Any, Tuple
+from typing import (
+    Coroutine, Callable, Dict,
+    List, Any, Tuple, AsyncGenerator,
+    Sequence,
+)
 import contextlib
 from operator import itemgetter
@@ -77,7 +81,7 @@ async def stream_quotes(
     get_topics: typing.Callable,
     get_quotes: Coroutine,
     feed: BrokerFeed,
-    rate: int = 5,  # delay between quote requests
+    rate: int = 3,  # delay between quote requests
     diff_cached: bool = True,  # only deliver "new" quotes to the queue
 ) -> None:
     """Stream quotes for a sequence of tickers at the given ``rate``
@@ -135,11 +139,12 @@ async def stream_quotes(
                     # quote).
                     new_quotes.setdefault(quote['key'], []).append(quote)
         else:
-            log.info(f"Delivering quotes:\n{quotes}")
+            # log.debug(f"Delivering quotes:\n{quotes}")
             for quote in quotes:
                 new_quotes.setdefault(quote['key'], []).append(quote)
 
-        yield new_quotes
+        if new_quotes:
+            yield new_quotes
 
         # latency monitoring
         req_time = round(postquote_start - prequote_start, 3)
@@ -341,65 +346,70 @@ class DataFeed:
         self._quote_type = None
         self._symbols = None
         self.quote_gen = None
-        self._mutex = trio.StrictFIFOLock()
         self._symbol_data_cache: Dict[str, Any] = {}
 
-    async def open_stream(self, symbols, feed_type, rate=1, test=None):
+    async def open_stream(
+        self,
+        symbols: Sequence[str],
+        feed_type: str,
+        rate: int = 1,
+        diff_cached: bool = True,
+        test: bool = None,
+    ) -> (AsyncGenerator, dict):
         if feed_type not in self._allowed:
             raise ValueError(f"Only feed types {self._allowed} are supported")
 
         self._quote_type = feed_type
+        try:
+            if self.quote_gen is not None and symbols != self._symbols:
+                log.info(
+                    f"Stopping existing subscription for {self._symbols}")
+                await self.quote_gen.aclose()
+                self._symbols = symbols
 
-        async with self._mutex:
-            try:
-                if self.quote_gen is not None and symbols != self._symbols:
-                    log.info(
-                        f"Stopping existing subscription for {self._symbols}")
-                    await self.quote_gen.aclose()
-                    self._symbols = symbols
+            if feed_type == 'stock' and not (
+                all(symbol in self._symbol_data_cache
+                    for symbol in symbols)
+            ):
+                # subscribe for tickers (this performs a possible filtering
+                # where invalid symbols are discarded)
+                sd = await self.portal.run(
+                    "piker.brokers.data", 'symbol_data',
+                    broker=self.brokermod.name, tickers=symbols)
+                self._symbol_data_cache.update(sd)
 
-                if feed_type == 'stock' and not (
-                    all(symbol in self._symbol_data_cache
-                        for symbol in symbols)
-                ):
-                    # subscribe for tickers (this performs a possible filtering
-                    # where invalid symbols are discarded)
-                    sd = await self.portal.run(
-                        "piker.brokers.data", 'symbol_data',
-                        broker=self.brokermod.name, tickers=symbols)
-                    self._symbol_data_cache.update(sd)
+            if test:
+                # stream from a local test file
+                quote_gen = await self.portal.run(
+                    "piker.brokers.data", 'stream_from_file',
+                    filename=test
+                )
+            else:
+                log.info(f"Starting new stream for {symbols}")
+                # start live streaming from broker daemon
+                quote_gen = await self.portal.run(
+                    "piker.brokers.data",
+                    'start_quote_stream',
+                    broker=self.brokermod.name,
+                    symbols=symbols,
+                    feed_type=feed_type,
+                    diff_cached=diff_cached,
+                    rate=rate,
+                )
 
-                if test:
-                    # stream from a local test file
-                    quote_gen = await self.portal.run(
-                        "piker.brokers.data", 'stream_from_file',
-                        filename=test
-                    )
-                else:
-                    log.info(f"Starting new stream for {symbols}")
-                    # start live streaming from broker daemon
-                    quote_gen = await self.portal.run(
-                        "piker.brokers.data",
-                        'start_quote_stream',
-                        broker=self.brokermod.name,
-                        symbols=symbols,
-                        feed_type=feed_type,
-                        rate=rate,
-                    )
+            # get first quotes response
+            log.debug(f"Waiting on first quote for {symbols}...")
+            quotes = {}
+            quotes = await quote_gen.__anext__()
 
-                # get first quotes response
-                log.debug(f"Waiting on first quote for {symbols}...")
-                quotes = {}
-                quotes = await quote_gen.__anext__()
-
-                self.quote_gen = quote_gen
-                self.first_quotes = quotes
-                return quote_gen, quotes
-            except Exception:
-                if self.quote_gen:
-                    await self.quote_gen.aclose()
-                    self.quote_gen = None
-                raise
+            self.quote_gen = quote_gen
+            self.first_quotes = quotes
+            return quote_gen, quotes
+        except Exception:
+            if self.quote_gen:
+                await self.quote_gen.aclose()
+                self.quote_gen = None
+            raise
 
     def format_quotes(self, quotes, symbol_data={}):
         self._symbol_data_cache.update(symbol_data)
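
Below is a minimal, illustrative sketch (not part of the patch) of the "skip
empty batches" pattern the publisher change introduces: the helper names
(quote_publisher, get_quotes) and the sample quote data are stand-ins invented
for the demo, and only trio is assumed as a dependency.

    # illustrative demo only -- names and data are stand-ins, not piker APIs
    import trio


    async def quote_publisher(get_quotes, rate: int = 3):
        """Yield only non-empty quote batches, mirroring the
        ``if new_quotes: yield new_quotes`` change above."""
        while True:
            quotes = await get_quotes()
            new_quotes = {}
            for quote in quotes:
                new_quotes.setdefault(quote['key'], []).append(quote)
            if new_quotes:  # don't push empty dicts to subscribers
                yield new_quotes
            await trio.sleep(1 / rate)


    async def main():
        # canned responses standing in for a real broker request
        batches = iter([[], [{'key': 'SPY', 'last': 478.10}], []])

        async def get_quotes():
            return next(batches, [])

        agen = quote_publisher(get_quotes)
        async for batch in agen:
            print(batch)  # only the non-empty batch is ever delivered
            break
        await agen.aclose()


    trio.run(main)

Filtering on the publisher side means subscribers never wake up for a no-op
delivery, which is what avoids the extra network traffic mentioned in the
commit message.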