From 9e4786e62fc077d1d33b07551a7cd711beb7c8ff Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Dec 2018 13:04:05 -0500 Subject: [PATCH] Initial dynamic option chain UI draft There's still a ton to polish (and some bugs to fix) but this is a first working draft of a real-time option chain! Insights and todos: - `kivy` widgets need to be cached and reused (eg. rows, cells, etc.) for speed since it seems creating new ones constantly is quite taxing on the CPU - the chain will tear down and re-setup the option data feed stream each time a different contract expiry button set is clicked - there's still some weird bug with row highlighting where it seems rows added from a new expiry set (which weren't previously rendered) aren't being highlighted reliably --- piker/ui/monitor.py | 22 +- piker/ui/option_chain.py | 453 +++++++++++++++++++++++++++------------ 2 files changed, 333 insertions(+), 142 deletions(-) diff --git a/piker/ui/monitor.py b/piker/ui/monitor.py index de371c06..180c88c6 100644 --- a/piker/ui/monitor.py +++ b/piker/ui/monitor.py @@ -304,6 +304,9 @@ class Row(GridLayout, HoverBehavior): cell.key = key self._cell_widgets[key] = cell + def iter_cells(self): + return self._cell_widgets.items() + def get_cell(self, key): return self._cell_widgets.get(key) @@ -356,7 +359,7 @@ class Row(GridLayout, HoverBehavior): """Highlight layout on enter. """ log.debug( - f"Entered row {type(self)} through {self.border_point}") + f"Entered row {self} through {self.border_point}") # don't highlight header row if getattr(self, 'is_header', None): self.hovered = False @@ -365,7 +368,7 @@ class Row(GridLayout, HoverBehavior): """Un-highlight layout on exit. """ log.debug( - f"Left row {type(self)} through {self.border_point}") + f"Left row {self} through {self.border_point}") class TickerTable(GridLayout): @@ -424,7 +427,7 @@ class TickerTable(GridLayout): symbol name. Most naive algo possible for the moment. """ for symbol, row in self.symbols2rows.items(): - if patt in symbol: + if patt in symbol: yield symbol, row def get_row(self, symbol: str) -> Row: @@ -442,10 +445,12 @@ async def update_quotes( widgets: dict, agen: AsyncGeneratorType, symbol_data: dict, - first_quotes: dict + first_quotes: dict, + task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ): """Process live quotes by updating ticker rows. """ + log.debug("Initializing UI update loop") table = widgets['table'] flash_keys = {'low', 'high'} @@ -521,9 +526,8 @@ async def update_quotes( color_row(row, record, {}) cache[sym] = row - # render all rows once up front - table.render_rows(cache) - + log.debug("Finished initializing update loop") + task_status.started() # real-time cell update loop async for quotes in agen: # new quotes data only for symbol, quote in quotes.items(): @@ -538,6 +542,7 @@ async def update_quotes( log.debug("Waiting on quotes") log.warn("Data feed connection dropped") + # XXX: if we're cancelled this should never get called nursery.cancel_scope.cancel() @@ -617,7 +622,8 @@ async def _async_main( for ticker_record in first_quotes: table.append_row( ticker_record['symbol'], - Row(ticker_record, headers=('symbol',), bidasks=bidasks, table=table) + Row(ticker_record, headers=('symbol',), + bidasks=bidasks, table=table) ) # associate the col headers row with the ticker table even though diff --git a/piker/ui/option_chain.py b/piker/ui/option_chain.py index 5e1764cf..9a8b4d74 100644 --- a/piker/ui/option_chain.py +++ b/piker/ui/option_chain.py @@ -5,8 +5,11 @@ Launch with ``piker options ``. """ import types from functools import partial +from typing import Dict, List +# import typing import trio +from async_generator import asynccontextmanager import tractor from kivy.uix.boxlayout import BoxLayout from kivy.lang import Builder @@ -27,129 +30,328 @@ async def modify_symbol(symbol): pass -class ExpiryButton(HeaderCell): - def on_press(self, value=None): - log.info(f"Clicked {self}") - - class StrikeCell(Cell): """Strike cell""" _no_display = ['symbol', 'contract_type', 'strike', 'time', 'open'] +_strike_row_cache = {} +_strike_cell_cache = {} class StrikeRow(BoxLayout): """A 'row' composed of two ``Row``s sandwiching a ``StrikeCell`. """ + _row_cache = {} + def __init__(self, strike, **kwargs): super().__init__(orientation='horizontal', **kwargs) self.strike = strike # store 2 rows: 1 for call, 1 for put self._sub_rows = {} - self.table = None + self._widgets_added = False def append_sub_row( self, record: dict, + displayable: dict, bidasks=None, headers=(), table=None, **kwargs, ) -> None: - if self.is_populated(): - raise TypeError(f"{self} can only append two sub-rows?") + # if self.is_populated(): + # raise TypeError(f"{self} can only append two sub-rows?") # the 'contract_type' determines whether this # is a put or call row contract_type = record['contract_type'] - # reverse order of call side cells - if contract_type == 'call': - record = dict(list(reversed(list(record.items())))) + # We want to only create a few ``Row`` widgets as possible to + # speed up rendering; we cache sub rows after creation. + row = self._row_cache.get((self.strike, contract_type)) + if not row: + # reverse order of call side cells + if contract_type == 'call': + record = dict(list(reversed(list(record.items())))) + row = Row( + record, + bidasks=bidasks, + headers=headers, + table=table, + no_cell=_no_display, + **kwargs + ) + self._row_cache[(self.strike, contract_type)] = row + else: + # must update the internal cells + row.update(record, displayable) - row = Row( - record, - bidasks=bidasks, - headers=headers, - table=table, - no_cell=_no_display, - **kwargs - ) # reassign widget for when rendered in the update loop row.widget = self self._sub_rows[contract_type] = row - if self.is_populated(): + + if self.is_populated() and not self._widgets_added: # calls on the left self.add_widget(self._sub_rows['call']) - # strikes in the middle - self.add_widget( - StrikeCell( + strike_cell = _strike_cell_cache.setdefault( + self.strike, StrikeCell( key=self.strike, text=str(self.strike), is_header=True, # make centre strike cell nice and small - size_hint=(1/8., 1), + size_hint=(1/10., 1), ) ) + # strikes in the middle + self.add_widget(strike_cell) # puts on the right self.add_widget(self._sub_rows['put']) + self._widgets_added = True def is_populated(self): """Bool determing if both a put and call subrow have beed appended. """ return len(self._sub_rows) == 2 + def has_widgets(self): + return self._widgets_added + def update(self, record, displayable): self._sub_rows[record['contract_type']].update( record, displayable) -async def _async_main( - symbol: str, +class ExpiryButton(HeaderCell): + def on_press(self, value=None): + log.info(f"Clicked {self}") + if self.chain.sub[1] == self.key: + log.info(f"Clicked {self} is already selected") + return + log.info(f"Subscribing for {self.chain.sub}") + self.chain.start_displaying(self.chain.sub[0], self.key) + + +class DataFeed(object): + """Data feed client for streaming symbol data from a remote + broker data source. + """ + def __init__(self, portal, brokermod): + self.portal = portal + self.brokermod = brokermod + self.sub = None + self.quote_gen = None + + async def open_stream(self, symbols, rate=3, test=None): + if self.quote_gen is not None and symbols != self.sub: + log.info(f"Stopping existing subscription for {self.sub}") + await self.quote_gen.aclose() + self.sub = symbols + + if test: + # stream from a local test file + quote_gen = await self.portal.run( + "piker.brokers.data", 'stream_from_file', + filename=test + ) + else: + # start live streaming from broker daemon + quote_gen = await self.portal.run( + "piker.brokers.data", + 'start_quote_stream', + broker=self.brokermod.name, + symbols=symbols, + feed_type='option', + rate=rate, + ) + + # get first quotes response + log.debug(f"Waiting on first quote for {symbols}...") + quotes = await quote_gen.__anext__() + + self.quote_gen = quote_gen + self.first_quotes = quotes + # self.records = records + # self.displayables = displayables + return quote_gen, quotes + + +class OptionChain(object): + """A real-time options chain UI. + """ + def __init__( + self, + symbol: str, + expiry: str, + widgets: dict, + bidasks: Dict[str, List[str]], + feed: DataFeed, + rate: int = 1, + ): + self.sub = (symbol, expiry) + self.widgets = widgets + self.bidasks = bidasks + self._strikes2rows = {} + self._nursery = None + self.feed = feed + self._update_cs = None + # TODO: this should be moved down to the data feed layer + # right now it's only needed for the UI uupdate loop to cancel itself + self._first_quotes = None + + @asynccontextmanager + async def open_scope(self): + """Open an internal resource and update task scope required + to allow for dynamic real-time operation. + """ + # assign us to each expiry button + for key, button in ( + self.widgets['expiry_buttons']._cell_widgets.items() + ): + button.chain = self + + async with trio.open_nursery() as n: + self._nursery = n + n.start_soon(self.start_updating) + yield self + + self._nursery = None + await self.feed.quote_gen.aclose() + + def clear(self): + """Clear the strike rows from the internal table. + """ + table = self.widgets['table'] + table.clear_widgets() + for strike in self._strikes2rows.copy(): + self._strikes2rows.pop(strike) + + def render_rows(self, records, displayables): + """Render all strike rows in the internal table. + """ + log.debug("Rendering rows") + table = self.widgets['table'] + for record, display in zip( + sorted(records, key=lambda q: q['strike']), + displayables + ): + strike = record['strike'] + strike_row = _strike_row_cache.setdefault( + strike, StrikeRow(strike)) + strike_row.append_sub_row( + record, + display, + bidasks=self.bidasks, + table=table, + ) + if strike_row.is_populated(): + # We must fill out the the table's symbol2rows manually + # using each contracts "symbol" so that the quote updater + # task can look up the right row to update easily + # See update_quotes() and ``Row`` internals for details. + for contract_type, row in strike_row._sub_rows.items(): + symbol = row._last_record['symbol'] + table.symbols2rows[symbol] = row + + if strike not in self._strikes2rows: + # readding widgets is an error + table.add_widget(strike_row) + self._strikes2rows[strike] = strike_row + + log.debug("Finished rendering rows!") + + async def start_feed( + self, + symbol: str, + expiry: str, + # max QT rate per API customer is approx 4 rps + # and usually 3 rps is allocated to the stock monitor + rate: int = 1, + test: str = None + ): + if self.feed.sub != self.sub: + return await self.feed.open_stream([(symbol, expiry)], rate=rate) + else: + feed = self.feed + return feed.quote_gen, feed.first_quotes + + async def start_updating(self): + if self._update_cs: + self._update_cs.cancel() + await trio.sleep(0) + + self.clear() + + if self._nursery is None: + raise RuntimeError( + "You must call await `start()` first!") + + n = self._nursery + log.debug(f"Waiting on first_quotes for {self.sub}") + quote_gen, first_quotes = await self.start_feed(*self.sub) + log.debug(f"Got first_quotes for {self.sub}") + + # redraw the UI + records, displayables = zip(*[ + self.feed.brokermod.format_option_quote(quote, {}) + for quote in first_quotes.values() + ]) + self.render_rows(records, displayables) + + with trio.open_cancel_scope() as cs: + self._update_cs = cs + # start quote update loop + await n.start( + partial( + update_quotes, + self._nursery, + self.feed.brokermod.format_option_quote, + self.widgets, + quote_gen, + symbol_data={}, + first_quotes=first_quotes, + ) + ) + + def start_displaying(self, symbol, expiry): + self.sub = (symbol, expiry) + self._nursery.start_soon(self.start_updating) + + +async def new_chain_ui( portal: tractor._portal.Portal, + symbol: str, + expiry: str, + contracts, brokermod: types.ModuleType, - rate: int = 4, - test: bool = False + nursery: trio._core._run.Nursery, + rate: int = 1, ) -> None: - '''Launch kivy app + all other related tasks. - - This is started with cli cmd `piker options`. - ''' - # retreive all contracts - all_contracts = await contracts(brokermod, symbol) - first_expiry = next(iter(all_contracts)).expiry - - if test: - # stream from a local test file - quote_gen = await portal.run( - "piker.brokers.data", 'stream_from_file', - filename=test - ) - else: - # start live streaming from broker daemon - quote_gen = await portal.run( - "piker.brokers.data", - 'start_quote_stream', - broker=brokermod.name, - symbols=[(symbol, first_expiry)], - feed_type='option', - ) - - # get first quotes response - log.debug("Waiting on first quote...") - quotes = await quote_gen.__anext__() - records, displayables = zip(*[ - brokermod.format_option_quote(quote, {}) - for quote in quotes.values() - ]) - + """Create and return a new option chain UI. + """ + widgets = {} # define bid-ask "stacked" cells # (TODO: needs some rethinking and renaming for sure) bidasks = brokermod._option_bidasks - # build out UI + feed = DataFeed(portal, brokermod) + chain = OptionChain( + symbol, + expiry, + widgets, + bidasks, + feed, + rate=rate, + ) + + quote_gen, first_quotes = await chain.start_feed(symbol, expiry) + records, displayables = zip(*[ + brokermod.format_option_quote(quote, {}) + for quote in first_quotes.values() + ]) + + # build out root UI title = f"option chain: {symbol}\t(press ? for help)" Window.set_title(title) @@ -163,7 +365,7 @@ async def _async_main( # TODO: figure out how to compact these buttons expiries = { key.expiry: key.expiry[:key.expiry.find('T')] - for key in all_contracts + for key in contracts } expiry_buttons = Row( record=expiries, @@ -182,94 +384,77 @@ async def _async_main( header_record['contract_type'] = 'put' header_row.append_sub_row( header_record, - headers=headers, - bidasks=bidasks, - is_header=True, - size_hint=(1, None), - - ) - header_record['contract_type'] = 'call' - header_row.append_sub_row( header_record, headers=headers, bidasks=bidasks, is_header=True, size_hint=(1, None), - + ) + header_record['contract_type'] = 'call' + header_row.append_sub_row( + header_record, + header_record, + headers=headers, + bidasks=bidasks, + is_header=True, + size_hint=(1, None), ) container.add_widget(header_row) - table = TickerTable( sort_key='strike', cols=1, size_hint=(1, None), ) header_row.table = table + table.bind(minimum_height=table.setter('height')) + pager = PagerView( + container=container, + contained=table, + nursery=nursery + ) + container.add_widget(pager) + widgets.update({ + 'root': container, + 'container': container, + 'table': table, + 'expiry_buttons': expiry_buttons, + 'pager': pager, + }) + return chain - strike_rows = {} - for record, display in zip(sorted( - records, - key=lambda q: q['strike'], - ), displayables): - strike = record['strike'] - strike_row = strike_rows.setdefault( - strike, StrikeRow(strike)) - strike_row.append_sub_row( - record, - bidasks=bidasks, - table=table, - ) - if strike_row.is_populated(): - # We must fill out the the table's symbol2rows manually - # using each contracts "symbol" so that the quote updater - # task can look up the right row to update easily - # See update_quotes() and ``Row`` for details. - for contract_type, row in strike_row._sub_rows.items(): - table.symbols2rows[row._last_record['symbol']] = row - table.append_row(symbol, strike_row) +async def _async_main( + symbol: str, + portal: tractor._portal.Portal, + brokermod: types.ModuleType, + rate: int = 1, + test: bool = False +) -> None: + '''Launch kivy app + all other related tasks. + + This is started with cli cmd `piker options`. + ''' + # retreive all contracts just because we need a default when the + # UI starts up + all_contracts = await contracts(brokermod, symbol) + # start streaming soonest contract by default + first_expiry = next(iter(all_contracts)).expiry async with trio.open_nursery() as nursery: # set up a pager view for large ticker lists - table.bind(minimum_height=table.setter('height')) - pager = PagerView( - container=container, - contained=table, - nursery=nursery + chain = await new_chain_ui( + portal, + symbol, + first_expiry, + all_contracts, + brokermod, + nursery, + rate=rate, ) - container.add_widget(pager) - widgets = { - 'root': container, - 'container': container, - 'table': table, - 'expiry_buttons': expiry_buttons, - 'pager': pager, - } - nursery.start_soon( - partial( - update_quotes, - nursery, - brokermod.format_option_quote, - widgets, - quote_gen, - symbol_data={}, - first_quotes=quotes, - ) - ) - try: - # Trio-kivy entry point. - await async_runTouchApp(widgets['root']) # run kivy - finally: - await quote_gen.aclose() # cancel aysnc gen call - # un-subscribe from symbols stream (cancel if brokerd - # was already torn down - say by SIGINT) - with trio.move_on_after(0.2): - await portal.run( - "piker.brokers.data", 'modify_quote_stream', - broker=brokermod.name, - feed_type='option', - symbols=[] - ) - - # cancel GUI update task - nursery.cancel_scope.cancel() + async with chain.open_scope(): + try: + # Trio-kivy entry point. + await async_runTouchApp(chain.widgets['root']) # run kivy + finally: + # cancel GUI update task + nursery.cancel_scope.cancel()