diff --git a/piker/ui/option_chain.py b/piker/ui/option_chain.py index d9d9bab6..00e5cf5a 100644 --- a/piker/ui/option_chain.py +++ b/piker/ui/option_chain.py @@ -124,13 +124,15 @@ class StrikeRow(BoxLayout): """ return int(self.strike) + def rowsitems(self): + return self._sub_rows.items() + class ExpiryButton(Cell): # must be set to allow 'plain bg colors' since default texture is grey background_normal = '' def on_press(self, value=None): - # import pdb; pdb.set_trace() last = self.chain._last_expiry if last: last.click_toggle = False @@ -146,24 +148,24 @@ class ExpiryButton(Cell): class DataFeed(object): - """Data feed client for streaming symbol data from a remote - broker data source. + """Data feed client for streaming symbol data from a (remote) + ``brokerd`` data daemon. """ def __init__(self, portal, brokermod): self.portal = portal self.brokermod = brokermod - self.sub = None + self._symbols = None self.quote_gen = None self._mutex = trio.StrictFIFOLock() - async def open_stream(self, symbols, rate=3, test=None): + async def open_stream(self, symbols, rate=1, test=None): async with self._mutex: try: - if self.quote_gen is not None and symbols != self.sub: + if self.quote_gen is not None and symbols != self._symbols: log.info( - f"Stopping pre-existing subscription for {self.sub}") + f"Stopping existing subscription for {self._symbols}") await self.quote_gen.aclose() - self.sub = symbols + self._symbols = symbols if test: # stream from a local test file @@ -172,7 +174,7 @@ class DataFeed(object): filename=test ) else: - log.info(f"Starting new stream for {self.sub}") + log.info(f"Starting new stream for {self._symbols}") # start live streaming from broker daemon quote_gen = await self.portal.run( "piker.brokers.data", @@ -203,6 +205,8 @@ class DataFeed(object): class OptionChain(object): """A real-time options chain UI. """ + _title = "option chain: {symbol}\t(press ? for help)" + def __init__( self, symbol: str, @@ -210,7 +214,7 @@ class OptionChain(object): widgets: dict, bidasks: Dict[str, List[str]], feed: DataFeed, - rate: int = 1, + rate: int, ): self.sub = (symbol, expiry) self.widgets = widgets @@ -220,15 +224,16 @@ class OptionChain(object): self._update_nursery = None self.feed = feed self._update_cs = None + self._quote_gen = None # TODO: this should be moved down to the data feed layer # right now it's only needed for the UI uupdate loop to cancel itself self._first_quotes = None self._last_expiry = None @asynccontextmanager - async def open_scope(self): - """Open an internal resource and update task scope required - to allow for dynamic real-time operation. + async def open_update_scope(self): + """Open an internal update task scope required to allow + for dynamic real-time operation. """ # assign us to each expiry button for key, button in ( @@ -238,20 +243,24 @@ class OptionChain(object): async with trio.open_nursery() as n: self._nursery = n - n.start_soon(self.start_updating) + n.start_soon(self._start_displaying, *self.sub) yield self n.cancel_scope.cancel() self._nursery = None + # make sure we always tear down our existing data feed await self.feed.quote_gen.aclose() - def clear(self): + def clear_strikes(self): """Clear the strike rows from the internal table. """ table = self.widgets['table'] table.clear() self._strikes2rows.clear() + def clear_expiries(self): + pass + def render_rows(self, records, displayables): """Render all strike rows in the internal table. """ @@ -275,7 +284,7 @@ class OptionChain(object): # using each contracts "symbol" so that the quote updater # task can look up the right row to update easily # See update_quotes() and ``Row`` internals for details. - for contract_type, row in strike_row._sub_rows.items(): + for contract_type, row in strike_row.rowsitems(): symbol = row._last_record['symbol'] table.symbols2rows[symbol] = row @@ -285,38 +294,39 @@ class OptionChain(object): log.debug("Finished rendering rows!") - async def start_feed( - self, - symbol: str, - expiry: str, - # max QT rate per API customer is approx 4 rps - # and usually 3 rps is allocated to the stock monitor - rate: int = 1, - test: str = None - ): - if self.feed.sub != self.sub: - return await self.feed.open_stream([(symbol, expiry)], rate=rate) - else: - feed = self.feed - return feed.quote_gen, feed.first_quotes + async def _start_displaying(self, symbol, expiry): + """Main routine to start displaying the real time updated strike + table. + + Clear any existing data feed subscription that is no longer needed + (eg. when clicking a new expiry button) spin up a new subscription, + populate the table and start updating it. + """ + # set window title + self.widgets['window'].set_title( + self._title.format(symbol=symbol) + ) - async def start_updating(self): if self._update_cs: log.warn("Cancelling existing update task") self._update_cs.cancel() await trio.sleep(0) - # drop all current rows - self.clear() + if self._quote_gen: + await self._quote_gen.aclose() + + self.clear_strikes() if self._nursery is None: raise RuntimeError( - "You must call await `start()` first!") + "You must call open this chain's update scope first!") n = self._nursery - log.debug(f"Waiting on first_quotes for {self.sub}") - quote_gen, first_quotes = await self.start_feed(*self.sub) - log.debug(f"Got first_quotes for {self.sub}") + log.debug(f"Waiting on first_quotes for {symbol}:{expiry}") + self._quote_gen, first_quotes = await self.feed.open_stream( + [(symbol, expiry)] + ) + log.debug(f"Got first_quotes for {symbol}:{expiry}") # redraw the UI records, displayables = zip(*[ @@ -333,7 +343,7 @@ class OptionChain(object): n, self.feed.brokermod.format_option_quote, self.widgets, - quote_gen, + self._quote_gen, symbol_data={}, first_quotes=first_quotes, ) @@ -341,21 +351,26 @@ class OptionChain(object): def start_displaying(self, symbol, expiry): self.sub = (symbol, expiry) - self._nursery.start_soon(self.start_updating) + self._nursery.start_soon(self._start_displaying, symbol, expiry) async def new_chain_ui( portal: tractor._portal.Portal, symbol: str, - expiry: str, - contracts, brokermod: types.ModuleType, nursery: trio._core._run.Nursery, - rate: int = 1, + rate: int = 2, ) -> None: """Create and return a new option chain UI. """ + # retreive all contracts just because we need a default when the + # UI starts up + all_contracts = await contracts(brokermod, symbol) + # start streaming soonest contract by default + expiry = next(iter(all_contracts)).expiry + widgets = {} + # define bid-ask "stacked" cells # (TODO: needs some rethinking and renaming for sure) bidasks = brokermod._option_bidasks @@ -369,17 +384,12 @@ async def new_chain_ui( feed, rate=rate, ) - - quote_gen, first_quotes = await chain.start_feed(symbol, expiry) + quote_gen, first_quotes = await chain.feed.open_stream([chain.sub]) records, displayables = zip(*[ brokermod.format_option_quote(quote, {}) for quote in first_quotes.values() ]) - # build out root UI - title = f"option chain: {symbol}\t(press ? for help)" - Window.set_title(title) - # use `monitor` styling for now from .monitor import _kv Builder.load_string(_kv) @@ -390,7 +400,7 @@ async def new_chain_ui( # TODO: figure out how to compact these buttons expiries = { key.expiry: key.expiry[:key.expiry.find('T')] - for key in contracts + for key in all_contracts } expiry_buttons = Row( record=expiries, @@ -439,6 +449,7 @@ async def new_chain_ui( ) container.add_widget(pager) widgets.update({ + 'window': Window, 'root': container, 'container': container, 'table': table, @@ -459,24 +470,16 @@ async def _async_main( This is started with cli cmd `piker options`. ''' - # retreive all contracts just because we need a default when the - # UI starts up - all_contracts = await contracts(brokermod, symbol) - # start streaming soonest contract by default - first_expiry = next(iter(all_contracts)).expiry - async with trio.open_nursery() as nursery: # set up a pager view for large ticker lists chain = await new_chain_ui( portal, symbol, - first_expiry, - all_contracts, brokermod, nursery, rate=rate, ) - async with chain.open_scope(): + async with chain.open_update_scope(): try: # Trio-kivy entry point. await async_runTouchApp(chain.widgets['root']) # run kivy