Remove stream opening lock on `DataFeed`

Fixes to `tractor` that make async generators task safe remove the
need for the mutex lock in `DataFeed.open_stream()`. Also, don't
bother pushing empty quotes from the publisher; this avoids hitting
the network when possible.
kivy_mainline_and_py3.8
Tyler Goodlet 2019-02-20 21:39:57 -05:00
parent b2322d885c
commit 435b2a56e8
1 changed file with 64 additions and 54 deletions
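Before the diff itself, the publisher-side change in `stream_quotes()` is easiest to see in isolation: quotes are grouped per symbol key and a batch is only yielded when non-empty, so subscribers are never pushed a no-op update over the wire. A minimal sketch of that pattern follows; `fetch_batches` is a hypothetical stand-in for the broker polling loop, not code from this commit:

```python
from typing import Any, AsyncGenerator, Dict, List

async def fetch_batches() -> AsyncGenerator[List[Dict[str, Any]], None]:
    # hypothetical stand-in for the broker polling loop
    yield [{'key': 'AAPL', 'last': 150.0}]
    yield []  # a poll cycle that produced nothing new

async def publish_quotes(
    source: AsyncGenerator[List[Dict[str, Any]], None],
) -> AsyncGenerator[Dict[str, List[dict]], None]:
    async for quotes in source:
        new_quotes: Dict[str, List[dict]] = {}
        # group ticks per symbol key, as stream_quotes() does
        for quote in quotes:
            new_quotes.setdefault(quote['key'], []).append(quote)
        # the change in this commit: skip empty batches entirely
        if new_quotes:
            yield new_quotes
```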

```diff
@@ -1,5 +1,5 @@
 """
-Live data feed machinery
+Real-time data feed machinery
 """
 import time
 from functools import partial
@@ -9,7 +9,11 @@ import socket
 import json
 from types import ModuleType
 import typing
-from typing import Coroutine, Callable, Dict, List, Any, Tuple
+from typing import (
+    Coroutine, Callable, Dict,
+    List, Any, Tuple, AsyncGenerator,
+    Sequence,
+)
 import contextlib
 from operator import itemgetter
 
@@ -77,7 +81,7 @@ async def stream_quotes(
     get_topics: typing.Callable,
     get_quotes: Coroutine,
     feed: BrokerFeed,
-    rate: int = 5,  # delay between quote requests
+    rate: int = 3,  # delay between quote requests
     diff_cached: bool = True,  # only deliver "new" quotes to the queue
 ) -> None:
     """Stream quotes for a sequence of tickers at the given ``rate``
@@ -135,11 +139,12 @@ async def stream_quotes(
             # quote).
             new_quotes.setdefault(quote['key'], []).append(quote)
         else:
-            log.info(f"Delivering quotes:\n{quotes}")
+            # log.debug(f"Delivering quotes:\n{quotes}")
             for quote in quotes:
                 new_quotes.setdefault(quote['key'], []).append(quote)
 
-        yield new_quotes
+        if new_quotes:
+            yield new_quotes
 
         # latency monitoring
         req_time = round(postquote_start - prequote_start, 3)
@@ -341,65 +346,70 @@ class DataFeed:
         self._quote_type = None
         self._symbols = None
         self.quote_gen = None
-        self._mutex = trio.StrictFIFOLock()
         self._symbol_data_cache: Dict[str, Any] = {}
 
-    async def open_stream(self, symbols, feed_type, rate=1, test=None):
+    async def open_stream(
+        self,
+        symbols: Sequence[str],
+        feed_type: str,
+        rate: int = 1,
+        diff_cached: bool = True,
+        test: bool = None,
+    ) -> (AsyncGenerator, dict):
         if feed_type not in self._allowed:
             raise ValueError(f"Only feed types {self._allowed} are supported")
 
         self._quote_type = feed_type
-        async with self._mutex:
-            try:
-                if self.quote_gen is not None and symbols != self._symbols:
-                    log.info(
-                        f"Stopping existing subscription for {self._symbols}")
-                    await self.quote_gen.aclose()
-                    self._symbols = symbols
+        try:
+            if self.quote_gen is not None and symbols != self._symbols:
+                log.info(
+                    f"Stopping existing subscription for {self._symbols}")
+                await self.quote_gen.aclose()
+                self._symbols = symbols
 
-                if feed_type == 'stock' and not (
-                    all(symbol in self._symbol_data_cache
-                        for symbol in symbols)
-                ):
-                    # subscribe for tickers (this performs a possible filtering
-                    # where invalid symbols are discarded)
-                    sd = await self.portal.run(
-                        "piker.brokers.data", 'symbol_data',
-                        broker=self.brokermod.name, tickers=symbols)
-                    self._symbol_data_cache.update(sd)
+            if feed_type == 'stock' and not (
+                all(symbol in self._symbol_data_cache
+                    for symbol in symbols)
+            ):
+                # subscribe for tickers (this performs a possible filtering
+                # where invalid symbols are discarded)
+                sd = await self.portal.run(
+                    "piker.brokers.data", 'symbol_data',
+                    broker=self.brokermod.name, tickers=symbols)
+                self._symbol_data_cache.update(sd)
 
-                if test:
-                    # stream from a local test file
-                    quote_gen = await self.portal.run(
-                        "piker.brokers.data", 'stream_from_file',
-                        filename=test
-                    )
-                else:
-                    log.info(f"Starting new stream for {symbols}")
-                    # start live streaming from broker daemon
-                    quote_gen = await self.portal.run(
-                        "piker.brokers.data",
-                        'start_quote_stream',
-                        broker=self.brokermod.name,
-                        symbols=symbols,
-                        feed_type=feed_type,
-                        rate=rate,
-                    )
+            if test:
+                # stream from a local test file
+                quote_gen = await self.portal.run(
+                    "piker.brokers.data", 'stream_from_file',
+                    filename=test
+                )
+            else:
+                log.info(f"Starting new stream for {symbols}")
+                # start live streaming from broker daemon
+                quote_gen = await self.portal.run(
+                    "piker.brokers.data",
+                    'start_quote_stream',
+                    broker=self.brokermod.name,
+                    symbols=symbols,
+                    feed_type=feed_type,
+                    diff_cached=diff_cached,
+                    rate=rate,
+                )
 
-                # get first quotes response
-                log.debug(f"Waiting on first quote for {symbols}...")
-                quotes = {}
-                quotes = await quote_gen.__anext__()
+            # get first quotes response
+            log.debug(f"Waiting on first quote for {symbols}...")
+            quotes = {}
+            quotes = await quote_gen.__anext__()
 
-                self.quote_gen = quote_gen
-                self.first_quotes = quotes
-                return quote_gen, quotes
-            except Exception:
-                if self.quote_gen:
-                    await self.quote_gen.aclose()
-                    self.quote_gen = None
-                raise
+            self.quote_gen = quote_gen
+            self.first_quotes = quotes
+            return quote_gen, quotes
+        except Exception:
+            if self.quote_gen:
+                await self.quote_gen.aclose()
+                self.quote_gen = None
+            raise
 
     def format_quotes(self, quotes, symbol_data={}):
         self._symbol_data_cache.update(symbol_data)
```
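For context, a consumer of the reworked `open_stream()` might look like the sketch below; the symbols, rate, and driver code are illustrative assumptions, not part of this commit. The point is that with `tractor`'s async generators now task safe, concurrent callers no longer need the removed `StrictFIFOLock` to serialize stream opening:

```python
# Illustrative only: `feed` is assumed to be a DataFeed already wired
# to a broker daemon portal.
async def watch(feed) -> None:
    quote_gen, first_quotes = await feed.open_stream(
        ['AAPL', 'SPY'],  # assumed symbols
        'stock',
        rate=3,
    )
    print(f"initial snapshot: {first_quotes}")
    async for quotes in quote_gen:
        # empty batches are no longer published, so every delivery
        # carries at least one fresh quote keyed by symbol
        for key, ticks in quotes.items():
            print(key, ticks)
```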