diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 8f321659..ff959ec0 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -112,11 +112,14 @@ async def stream_quotes( # TODO: at this point probably just just make this a class and # a lot of these functions should be methods. It will definitely # make stateful UI apps easier to implement -class DataFeed(typing.NamedTuple): - """A per broker "data feed" container. +class BrokerFeed(typing.NamedTuple): + """A per broker "client feed" container. A structure to keep track of components used by - real-time data daemons. + real-time data daemons. This is a backend "client" which pulls + data from broker specific data lakes: + + ``DataFeed`` <- tractor -> ``BrokerFeed`` <- broker IPC -> broker API """ mod: ModuleType client: object @@ -129,7 +132,7 @@ class DataFeed(typing.NamedTuple): async def fan_out_to_chans( - feed: DataFeed, + feed: BrokerFeed, get_quotes: Coroutine, symbols2chans: Dict[str, tractor.Channel], rate: int = 5, # delay between quote requests @@ -275,8 +278,8 @@ def modify_quote_stream(broker, feed_type, symbols, chan, cid): async def get_cached_feed( brokername: str, -) -> DataFeed: - """Get/create a ``DataFeed`` from/in the current actor. +) -> BrokerFeed: + """Get/create a ``BrokerFeed`` from/in the current actor. """ # check if a cached client is in the local actor's statespace ss = tractor.current_actor().statespace @@ -293,7 +296,7 @@ async def get_cached_feed( exit_stack = contextlib.AsyncExitStack() client = await exit_stack.enter_async_context( brokermod.get_client()) - feed = DataFeed( + feed = BrokerFeed( mod=brokermod, client=client, exit_stack=exit_stack, @@ -422,6 +425,89 @@ async def start_quote_stream( await feed.exit_stack.aclose() +class DataFeed(object): + """Data feed client for streaming symbol data from a (remote) + ``brokerd`` data daemon. + """ + _allowed = ('stock', 'option') + + def __init__(self, portal, brokermod): + self.portal = portal + self.brokermod = brokermod + self._quote_type = None + self._symbols = None + self.quote_gen = None + self._mutex = trio.StrictFIFOLock() + self._symbol_data_cache: Dict[str, Any] = {} + + async def open_stream(self, symbols, feed_type, rate=1, test=None): + if feed_type not in self._allowed: + raise ValueError(f"Only feed types {self._allowed} are supported") + + self._quote_type = feed_type + + async with self._mutex: + try: + if self.quote_gen is not None and symbols != self._symbols: + log.info( + f"Stopping existing subscription for {self._symbols}") + await self.quote_gen.aclose() + self._symbols = symbols + + if feed_type == 'stock' and not ( + all(symbol in self._symbol_data_cache + for symbol in symbols) + ): + # subscribe for tickers (this performs a possible filtering + # where invalid symbols are discarded) + sd = await self.portal.run( + "piker.brokers.data", 'symbol_data', + broker=self.brokermod.name, tickers=symbols) + self._symbol_data_cache.update(sd) + + if test: + # stream from a local test file + quote_gen = await self.portal.run( + "piker.brokers.data", 'stream_from_file', + filename=test + ) + else: + log.info(f"Starting new stream for {self._symbols}") + # start live streaming from broker daemon + quote_gen = await self.portal.run( + "piker.brokers.data", + 'start_quote_stream', + broker=self.brokermod.name, + symbols=symbols, + feed_type=feed_type, + rate=rate, + ) + + # get first quotes response + log.debug(f"Waiting on first quote for {symbols}...") + quotes = {} + # with trio.move_on_after(5): + quotes = await quote_gen.__anext__() + + self.quote_gen = quote_gen + self.first_quotes = quotes + return quote_gen, quotes + except Exception: + if self.quote_gen: + await self.quote_gen.aclose() + self.quote_gen = None + raise + + def format_quotes(self, quotes, symbol_data={}): + self._symbol_data_cache.update(symbol_data) + formatter = getattr(self.brokermod, f'format_{self._quote_type}_quote') + records, displayables = zip(*[ + formatter(quote, self._symbol_data_cache) + for quote in quotes.values() + ]) + return records, displayables + + async def stream_to_file( watchlist_name: str, filename: str, diff --git a/piker/ui/option_chain.py b/piker/ui/option_chain.py index 237c8431..d68808a9 100644 --- a/piker/ui/option_chain.py +++ b/piker/ui/option_chain.py @@ -18,6 +18,7 @@ from kivy.uix.label import Label from ..log import get_logger from ..brokers.core import contracts +from ..brokers.data import DataFeed from .pager import PagerView from .tabular import Row, HeaderCell, Cell, TickerTable @@ -146,67 +147,6 @@ class ExpiryButton(Cell): self.chain.start_displaying(self.chain.symbol, self.key) -class DataFeed(object): - """Data feed client for streaming symbol data from a (remote) - ``brokerd`` data daemon. - """ - def __init__(self, portal, brokermod): - self.portal = portal - self.brokermod = brokermod - self._symbols = None - self.quote_gen = None - self._mutex = trio.StrictFIFOLock() - - async def open_stream(self, symbols, rate=1, test=None): - async with self._mutex: - try: - if self.quote_gen is not None and symbols != self._symbols: - log.info( - f"Stopping existing subscription for {self._symbols}") - await self.quote_gen.aclose() - self._symbols = symbols - - if test: - # stream from a local test file - quote_gen = await self.portal.run( - "piker.brokers.data", 'stream_from_file', - filename=test - ) - else: - log.info(f"Starting new stream for {self._symbols}") - # start live streaming from broker daemon - quote_gen = await self.portal.run( - "piker.brokers.data", - 'start_quote_stream', - broker=self.brokermod.name, - symbols=symbols, - feed_type='option', - rate=rate, - ) - - # get first quotes response - log.debug(f"Waiting on first quote for {symbols}...") - quotes = {} - # with trio.move_on_after(5): - quotes = await quote_gen.__anext__() - - self.quote_gen = quote_gen - self.first_quotes = quotes - return quote_gen, quotes - except Exception: - if self.quote_gen: - await self.quote_gen.aclose() - self.quote_gen = None - raise - - def format_quotes(self, quotes): - records, displayables = zip(*[ - self.brokermod.format_option_quote(quote, {}) - for quote in quotes.values() - ]) - return records, displayables - - @asynccontextmanager async def find_local_monitor(): """Establish a portal to a local monitor for triggering @@ -276,7 +216,7 @@ class OptionChain(object): self._parent_nursery = nursery async with trio.open_nursery() as n: self._nursery = n - # fill out and start updatingn strike table + # fill out and start updating strike table n.start_soon( partial(self._start_displaying, symbol, expiry=expiry) ) @@ -450,7 +390,8 @@ class OptionChain(object): log.debug(f"Waiting on first_quotes for {symbol}:{expiry}") self._quote_gen, first_quotes = await self.feed.open_stream( - [(symbol, expiry)] + [(symbol, expiry)], + 'option', ) log.debug(f"Got first_quotes for {symbol}:{expiry}") records, displayables = self.feed.format_quotes(first_quotes)