diff --git a/piker/ui/option_chain.py b/piker/ui/option_chain.py index 9a8b4d74..d9d9bab6 100644 --- a/piker/ui/option_chain.py +++ b/piker/ui/option_chain.py @@ -6,7 +6,6 @@ Launch with ``piker options ``. import types from functools import partial from typing import Dict, List -# import typing import trio from async_generator import asynccontextmanager @@ -30,7 +29,7 @@ async def modify_symbol(symbol): pass -class StrikeCell(Cell): +class StrikeCell(HeaderCell): """Strike cell""" @@ -61,9 +60,6 @@ class StrikeRow(BoxLayout): table=None, **kwargs, ) -> None: - # if self.is_populated(): - # raise TypeError(f"{self} can only append two sub-rows?") - # the 'contract_type' determines whether this # is a put or call row contract_type = record['contract_type'] @@ -75,6 +71,7 @@ class StrikeRow(BoxLayout): # reverse order of call side cells if contract_type == 'call': record = dict(list(reversed(list(record.items())))) + row = Row( record, bidasks=bidasks, @@ -99,7 +96,7 @@ class StrikeRow(BoxLayout): self.strike, StrikeCell( key=self.strike, text=str(self.strike), - is_header=True, + # is_header=True, # make centre strike cell nice and small size_hint=(1/10., 1), ) @@ -122,10 +119,25 @@ class StrikeRow(BoxLayout): self._sub_rows[record['contract_type']].update( record, displayable) + def get_field(self, key): + """Always sort on the lone field, the strike price. + """ + return int(self.strike) + + +class ExpiryButton(Cell): + # must be set to allow 'plain bg colors' since default texture is grey + background_normal = '' -class ExpiryButton(HeaderCell): def on_press(self, value=None): + # import pdb; pdb.set_trace() + last = self.chain._last_expiry + if last: + last.click_toggle = False + self.chain._last_expiry = self + log.info(f"Clicked {self}") + self.click_toggle = True if self.chain.sub[1] == self.key: log.info(f"Clicked {self} is already selected") return @@ -142,39 +154,50 @@ class DataFeed(object): self.brokermod = brokermod self.sub = None self.quote_gen = None + self._mutex = trio.StrictFIFOLock() async def open_stream(self, symbols, rate=3, test=None): - if self.quote_gen is not None and symbols != self.sub: - log.info(f"Stopping existing subscription for {self.sub}") - await self.quote_gen.aclose() - self.sub = symbols + async with self._mutex: + try: + if self.quote_gen is not None and symbols != self.sub: + log.info( + f"Stopping pre-existing subscription for {self.sub}") + await self.quote_gen.aclose() + self.sub = symbols - if test: - # stream from a local test file - quote_gen = await self.portal.run( - "piker.brokers.data", 'stream_from_file', - filename=test - ) - else: - # start live streaming from broker daemon - quote_gen = await self.portal.run( - "piker.brokers.data", - 'start_quote_stream', - broker=self.brokermod.name, - symbols=symbols, - feed_type='option', - rate=rate, - ) + if test: + # stream from a local test file + quote_gen = await self.portal.run( + "piker.brokers.data", 'stream_from_file', + filename=test + ) + else: + log.info(f"Starting new stream for {self.sub}") + # start live streaming from broker daemon + quote_gen = await self.portal.run( + "piker.brokers.data", + 'start_quote_stream', + broker=self.brokermod.name, + symbols=symbols, + feed_type='option', + rate=rate, + ) - # get first quotes response - log.debug(f"Waiting on first quote for {symbols}...") - quotes = await quote_gen.__anext__() + # get first quotes response + log.debug(f"Waiting on first quote for {symbols}...") + quotes = {} + with trio.move_on_after(5): + quotes = await quote_gen.__anext__() - self.quote_gen = quote_gen - self.first_quotes = quotes - # self.records = records - # self.displayables = displayables - return quote_gen, quotes + self.quote_gen = quote_gen + self.first_quotes = quotes + # self.records = records + # self.displayables = displayables + return quote_gen, quotes + except Exception: + if self.quote_gen: + await self.quote_gen.aclose() + raise class OptionChain(object): @@ -194,11 +217,13 @@ class OptionChain(object): self.bidasks = bidasks self._strikes2rows = {} self._nursery = None + self._update_nursery = None self.feed = feed self._update_cs = None # TODO: this should be moved down to the data feed layer # right now it's only needed for the UI uupdate loop to cancel itself self._first_quotes = None + self._last_expiry = None @asynccontextmanager async def open_scope(self): @@ -215,6 +240,7 @@ class OptionChain(object): self._nursery = n n.start_soon(self.start_updating) yield self + n.cancel_scope.cancel() self._nursery = None await self.feed.quote_gen.aclose() @@ -223,9 +249,8 @@ class OptionChain(object): """Clear the strike rows from the internal table. """ table = self.widgets['table'] - table.clear_widgets() - for strike in self._strikes2rows.copy(): - self._strikes2rows.pop(strike) + table.clear() + self._strikes2rows.clear() def render_rows(self, records, displayables): """Render all strike rows in the internal table. @@ -255,8 +280,7 @@ class OptionChain(object): table.symbols2rows[symbol] = row if strike not in self._strikes2rows: - # readding widgets is an error - table.add_widget(strike_row) + # re-adding widgets is an error self._strikes2rows[strike] = strike_row log.debug("Finished rendering rows!") @@ -278,9 +302,11 @@ class OptionChain(object): async def start_updating(self): if self._update_cs: + log.warn("Cancelling existing update task") self._update_cs.cancel() await trio.sleep(0) + # drop all current rows self.clear() if self._nursery is None: @@ -301,11 +327,10 @@ class OptionChain(object): with trio.open_cancel_scope() as cs: self._update_cs = cs - # start quote update loop await n.start( partial( update_quotes, - self._nursery, + n, self.feed.brokermod.format_option_quote, self.widgets, quote_gen,