Support monitor linked symbol selection

This allows for using a monitor to select the current option chain
symbol!

The deats:
- start a bg task which streams the monitor selected symbol
- dynamically repopulate expiry buttons on a newly published symbol
- move static widget creation into a chain method to avoid multiple
  quotes requests at startup
- rename a bunch of methods
kivy_mainline_and_py3.8
Tyler Goodlet 2018-12-30 15:00:46 -05:00
parent 152062ba8a
commit 72f417b9c2
1 changed files with 171 additions and 131 deletions

View File

@ -141,11 +141,7 @@ class ExpiryButton(Cell):
log.info(f"Clicked {self}") log.info(f"Clicked {self}")
self.click_toggle = True self.click_toggle = True
if self.chain.sub[1] == self.key: self.chain.start_displaying(self.chain.symbol, self.key)
log.info(f"Clicked {self} is already selected")
return
log.info(f"Subscribing for {self.chain.sub}")
self.chain.start_displaying(self.chain.sub[0], self.key)
class DataFeed(object): class DataFeed(object):
@ -194,14 +190,32 @@ class DataFeed(object):
self.quote_gen = quote_gen self.quote_gen = quote_gen
self.first_quotes = quotes self.first_quotes = quotes
# self.records = records
# self.displayables = displayables
return quote_gen, quotes return quote_gen, quotes
except Exception: except Exception:
if self.quote_gen: if self.quote_gen:
await self.quote_gen.aclose() await self.quote_gen.aclose()
self.quote_gen = None
raise raise
def format_quotes(self, quotes):
records, displayables = zip(*[
self.brokermod.format_option_quote(quote, {})
for quote in quotes.values()
])
return records, displayables
@asynccontextmanager
async def find_local_monitor():
"""Establish a portal to a local monitor for triggering
symbol changes.
"""
async with tractor.find_actor('monitor') as portal:
if not portal:
log.warn(
"No monitor app could be found, no symbol link established..")
yield portal
class OptionChain(object): class OptionChain(object):
"""A real-time options chain UI. """A real-time options chain UI.
@ -210,14 +224,13 @@ class OptionChain(object):
def __init__( def __init__(
self, self,
symbol: str,
expiry: str,
widgets: dict, widgets: dict,
bidasks: Dict[str, List[str]], bidasks: Dict[str, List[str]],
feed: DataFeed, feed: DataFeed,
rate: int, rate: int,
): ):
self.sub = (symbol, expiry) self.symbol = None
self.expiry = None
self.widgets = widgets self.widgets = widgets
self.bidasks = bidasks self.bidasks = bidasks
self._strikes2rows = {} self._strikes2rows = {}
@ -230,21 +243,35 @@ class OptionChain(object):
# right now it's only needed for the UI uupdate loop to cancel itself # right now it's only needed for the UI uupdate loop to cancel itself
self._first_quotes = None self._first_quotes = None
self._last_expiry = None self._last_expiry = None
# flag to determine if one-time widgets have been generated
self._static_widgets_initialized = False
async def _rx_symbols(self):
async with find_local_monitor() as portal:
if not portal:
log.warn("No local monitor could be found")
return
async for symbol in await portal.run(
'piker.ui.monitor',
'stream_symbol_selection',
):
log.info(f"Changing symbol subscriptions to {symbol}")
self.start_displaying(symbol, self.expiry)
@asynccontextmanager @asynccontextmanager
async def open_update_scope(self): async def open_rt_display(self, nursery, symbol, expiry=None):
"""Open an internal update task scope required to allow """Open an internal update task scope required to allow
for dynamic real-time operation. for dynamic real-time operation.
""" """
# assign us to each expiry button self._parent_nursery = nursery
for key, button in (
self.widgets['expiry_buttons']._cell_widgets.items()
):
button.chain = self
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
self._nursery = n self._nursery = n
n.start_soon(self._start_displaying, *self.sub) # fill out and start updatingn strike table
n.start_soon(
partial(self._start_displaying, symbol, expiry=expiry)
)
# listen for undlerlying symbol changes from a local monitor app
n.start_soon(self._rx_symbols)
yield self yield self
n.cancel_scope.cancel() n.cancel_scope.cancel()
@ -259,9 +286,6 @@ class OptionChain(object):
table.clear() table.clear()
self._strikes2rows.clear() self._strikes2rows.clear()
def clear_expiries(self):
pass
def render_rows(self, records, displayables): def render_rows(self, records, displayables):
"""Render all strike rows in the internal table. """Render all strike rows in the internal table.
""" """
@ -295,7 +319,7 @@ class OptionChain(object):
log.debug("Finished rendering rows!") log.debug("Finished rendering rows!")
async def _start_displaying(self, symbol, expiry): async def _start_displaying(self, symbol, expiry=None):
"""Main routine to start displaying the real time updated strike """Main routine to start displaying the real time updated strike
table. table.
@ -303,10 +327,34 @@ class OptionChain(object):
(eg. when clicking a new expiry button) spin up a new subscription, (eg. when clicking a new expiry button) spin up a new subscription,
populate the table and start updating it. populate the table and start updating it.
""" """
# set window title # redraw any symbol specific UI components
self.widgets['window'].set_title( if self.symbol != symbol or expiry is None:
self._title.format(symbol=symbol) # set window title
) self.widgets['window'].set_title(
self._title.format(symbol=symbol)
)
# retreive all contracts to populate expiry row
all_contracts = await contracts(self.feed.brokermod, symbol)
# start streaming soonest contract by default if not provided
expiry = next(iter(all_contracts)).expiry if not expiry else expiry
# TODO: figure out how to compact these buttons
expiries = {
key.expiry: key.expiry[:key.expiry.find('T')]
for key in all_contracts
}
expiry_row = self.widgets['expiry_row']
expiry_row.clear_widgets()
for expiry, justdate in expiries.items():
button = ExpiryButton(text=str(justdate), key=expiry)
# assign us to each expiry button
button.chain = self
expiry_row.add_widget(button)
if self.widgets.get('table'):
self.clear_strikes()
if self._update_cs: if self._update_cs:
log.warn("Cancelling existing update task") log.warn("Cancelling existing update task")
@ -316,24 +364,23 @@ class OptionChain(object):
if self._quote_gen: if self._quote_gen:
await self._quote_gen.aclose() await self._quote_gen.aclose()
self.clear_strikes()
if self._nursery is None: if self._nursery is None:
raise RuntimeError( raise RuntimeError(
"You must call open this chain's update scope first!") "You must call open this chain's update scope first!")
n = self._nursery
log.debug(f"Waiting on first_quotes for {symbol}:{expiry}") log.debug(f"Waiting on first_quotes for {symbol}:{expiry}")
self._quote_gen, first_quotes = await self.feed.open_stream( self._quote_gen, first_quotes = await self.feed.open_stream(
[(symbol, expiry)] [(symbol, expiry)]
) )
log.debug(f"Got first_quotes for {symbol}:{expiry}") log.debug(f"Got first_quotes for {symbol}:{expiry}")
records, displayables = self.feed.format_quotes(first_quotes)
# redraw the UI # draw static widgets only once
records, displayables = zip(*[ if self._static_widgets_initialized is False:
self.feed.brokermod.format_option_quote(quote, {}) self._init_static_widgets(displayables)
for quote in first_quotes.values() self._static_widgets_initialized = True
])
n = self._nursery
self.render_rows(records, displayables) self.render_rows(records, displayables)
with trio.open_cancel_scope() as cs: with trio.open_cancel_scope() as cs:
@ -349,10 +396,78 @@ class OptionChain(object):
first_quotes=first_quotes, first_quotes=first_quotes,
) )
) )
self.symbol, self.expiry = symbol, expiry
def start_displaying(self, symbol, expiry): def start_displaying(self, symbol, expiry):
self.sub = (symbol, expiry) if self.symbol == symbol and self.expiry == expiry:
self._nursery.start_soon(self._start_displaying, symbol, expiry) log.info(f"Clicked {symbol}:{expiry} is already selected")
return
log.info(f"Subscribing for {symbol}:{expiry}")
self._nursery.start_soon(
partial(self._start_displaying, symbol, expiry=expiry)
)
def _init_static_widgets(self, displayables):
assert self._static_widgets_initialized is False
container = self.widgets['container']
# calls / puts header
type_header = BoxLayout(
orientation='horizontal',
size_hint=(1, 1/30.),
)
calls = Label(text='calls', font_size='20')
puts = Label(text='puts', font_size='20')
type_header.add_widget(calls)
type_header.add_widget(puts)
container.add_widget(type_header)
# figure out header fields for each table based on quote keys
headers = displayables[0].keys()
header_row = StrikeRow(strike='strike', size_hint=(1, None))
header_record = {key: key for key in headers}
header_record['contract_type'] = 'put'
header_row.append_sub_row(
header_record,
header_record,
headers=headers,
bidasks=self.bidasks,
is_header=True,
size_hint=(1, None),
)
header_record['contract_type'] = 'call'
header_row.append_sub_row(
header_record,
header_record,
headers=headers,
bidasks=self.bidasks,
is_header=True,
size_hint=(1, None),
)
container.add_widget(header_row)
# build out chain tables
table = TickerTable(
sort_key='strike',
cols=1,
size_hint=(1, None),
)
header_row.table = table
table.bind(minimum_height=table.setter('height'))
pager = PagerView(
container=container,
contained=table,
nursery=self._nursery
)
container.add_widget(pager)
self.widgets.update({
'table': table,
'type_header': type_header,
'table': table,
'pager': pager,
})
async def new_chain_ui( async def new_chain_ui(
@ -364,33 +479,6 @@ async def new_chain_ui(
) -> None: ) -> None:
"""Create and return a new option chain UI. """Create and return a new option chain UI.
""" """
# retreive all contracts just because we need a default when the
# UI starts up
all_contracts = await contracts(brokermod, symbol)
# start streaming soonest contract by default
expiry = next(iter(all_contracts)).expiry
widgets = {}
# define bid-ask "stacked" cells
# (TODO: needs some rethinking and renaming for sure)
bidasks = brokermod._option_bidasks
feed = DataFeed(portal, brokermod)
chain = OptionChain(
symbol,
expiry,
widgets,
bidasks,
feed,
rate=rate,
)
quote_gen, first_quotes = await chain.feed.open_stream([chain.sub])
records, displayables = zip(*[
brokermod.format_option_quote(quote, {})
for quote in first_quotes.values()
])
# use `monitor` styling for now # use `monitor` styling for now
from .monitor import _kv from .monitor import _kv
Builder.load_string(_kv) Builder.load_string(_kv)
@ -398,78 +486,30 @@ async def new_chain_ui(
# the master container # the master container
container = BoxLayout(orientation='vertical', spacing=0) container = BoxLayout(orientation='vertical', spacing=0)
# TODO: figure out how to compact these buttons # expiry buttons row (populated later once contracts are retreived)
expiries = { expiry_row = BoxLayout(
key.expiry: key.expiry[:key.expiry.find('T')]
for key in all_contracts
}
expiry_buttons = Row(
record=expiries,
headers=expiries,
is_header=True,
size_hint=(1, None),
cell_type=ExpiryButton,
)
# top row of expiry buttons
container.add_widget(expiry_buttons)
# denote calls vs. puts side of table
type_header = BoxLayout(
orientation='horizontal', orientation='horizontal',
size_hint=(1, 1/28.), size_hint=(1, None),
) )
calls = Label(text='calls', font_size='20') container.add_widget(expiry_row)
puts = Label(text='puts', font_size='20')
type_header.add_widget(calls)
type_header.add_widget(puts)
container.add_widget(type_header)
# figure out header fields for each table based on quote keys widgets = {
headers = displayables[0].keys()
header_row = StrikeRow(strike='strike', size_hint=(1, None))
header_record = {key: key for key in headers}
header_record['contract_type'] = 'put'
header_row.append_sub_row(
header_record,
header_record,
headers=headers,
bidasks=bidasks,
is_header=True,
size_hint=(1, None),
)
header_record['contract_type'] = 'call'
header_row.append_sub_row(
header_record,
header_record,
headers=headers,
bidasks=bidasks,
is_header=True,
size_hint=(1, None),
)
container.add_widget(header_row)
# build out chain tables
table = TickerTable(
sort_key='strike',
cols=1,
size_hint=(1, None),
)
header_row.table = table
table.bind(minimum_height=table.setter('height'))
pager = PagerView(
container=container,
contained=table,
nursery=nursery
)
container.add_widget(pager)
widgets.update({
'window': Window, 'window': Window,
'root': container, 'root': container,
'container': container, 'container': container,
'table': table, 'expiry_row': expiry_row,
'expiry_buttons': expiry_buttons, }
'pager': pager, # define bid-ask "stacked" cells
}) # (TODO: needs some rethinking and renaming for sure)
bidasks = brokermod._option_bidasks
feed = DataFeed(portal, brokermod)
chain = OptionChain(
widgets,
bidasks,
feed,
rate=rate,
)
return chain return chain
@ -493,9 +533,9 @@ async def _async_main(
nursery, nursery,
rate=rate, rate=rate,
) )
async with chain.open_update_scope(): async with chain.open_rt_display(nursery, symbol):
try: try:
# Trio-kivy entry point. # trio-kivy entry point.
await async_runTouchApp(chain.widgets['root']) # run kivy await async_runTouchApp(chain.widgets['root']) # run kivy
finally: finally:
# cancel GUI update task # cancel GUI update task