Drop `OptionChain.start_feed()`

kivy_mainline_and_py3.8
Tyler Goodlet 2018-12-25 12:38:04 -05:00
parent 6cc8b4cc2f
commit fb876f3770
1 changed files with 61 additions and 58 deletions

View File

@ -124,13 +124,15 @@ class StrikeRow(BoxLayout):
""" """
return int(self.strike) return int(self.strike)
def rowsitems(self):
return self._sub_rows.items()
class ExpiryButton(Cell): class ExpiryButton(Cell):
# must be set to allow 'plain bg colors' since default texture is grey # must be set to allow 'plain bg colors' since default texture is grey
background_normal = '' background_normal = ''
def on_press(self, value=None): def on_press(self, value=None):
# import pdb; pdb.set_trace()
last = self.chain._last_expiry last = self.chain._last_expiry
if last: if last:
last.click_toggle = False last.click_toggle = False
@ -146,24 +148,24 @@ class ExpiryButton(Cell):
class DataFeed(object): class DataFeed(object):
"""Data feed client for streaming symbol data from a remote """Data feed client for streaming symbol data from a (remote)
broker data source. ``brokerd`` data daemon.
""" """
def __init__(self, portal, brokermod): def __init__(self, portal, brokermod):
self.portal = portal self.portal = portal
self.brokermod = brokermod self.brokermod = brokermod
self.sub = None self._symbols = None
self.quote_gen = None self.quote_gen = None
self._mutex = trio.StrictFIFOLock() self._mutex = trio.StrictFIFOLock()
async def open_stream(self, symbols, rate=3, test=None): async def open_stream(self, symbols, rate=1, test=None):
async with self._mutex: async with self._mutex:
try: try:
if self.quote_gen is not None and symbols != self.sub: if self.quote_gen is not None and symbols != self._symbols:
log.info( log.info(
f"Stopping pre-existing subscription for {self.sub}") f"Stopping existing subscription for {self._symbols}")
await self.quote_gen.aclose() await self.quote_gen.aclose()
self.sub = symbols self._symbols = symbols
if test: if test:
# stream from a local test file # stream from a local test file
@ -172,7 +174,7 @@ class DataFeed(object):
filename=test filename=test
) )
else: else:
log.info(f"Starting new stream for {self.sub}") log.info(f"Starting new stream for {self._symbols}")
# start live streaming from broker daemon # start live streaming from broker daemon
quote_gen = await self.portal.run( quote_gen = await self.portal.run(
"piker.brokers.data", "piker.brokers.data",
@ -203,6 +205,8 @@ class DataFeed(object):
class OptionChain(object): class OptionChain(object):
"""A real-time options chain UI. """A real-time options chain UI.
""" """
_title = "option chain: {symbol}\t(press ? for help)"
def __init__( def __init__(
self, self,
symbol: str, symbol: str,
@ -210,7 +214,7 @@ class OptionChain(object):
widgets: dict, widgets: dict,
bidasks: Dict[str, List[str]], bidasks: Dict[str, List[str]],
feed: DataFeed, feed: DataFeed,
rate: int = 1, rate: int,
): ):
self.sub = (symbol, expiry) self.sub = (symbol, expiry)
self.widgets = widgets self.widgets = widgets
@ -220,15 +224,16 @@ class OptionChain(object):
self._update_nursery = None self._update_nursery = None
self.feed = feed self.feed = feed
self._update_cs = None self._update_cs = None
self._quote_gen = None
# TODO: this should be moved down to the data feed layer # TODO: this should be moved down to the data feed layer
# right now it's only needed for the UI uupdate loop to cancel itself # right now it's only needed for the UI uupdate loop to cancel itself
self._first_quotes = None self._first_quotes = None
self._last_expiry = None self._last_expiry = None
@asynccontextmanager @asynccontextmanager
async def open_scope(self): async def open_update_scope(self):
"""Open an internal resource and update task scope required """Open an internal update task scope required to allow
to allow for dynamic real-time operation. for dynamic real-time operation.
""" """
# assign us to each expiry button # assign us to each expiry button
for key, button in ( for key, button in (
@ -238,20 +243,24 @@ class OptionChain(object):
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
self._nursery = n self._nursery = n
n.start_soon(self.start_updating) n.start_soon(self._start_displaying, *self.sub)
yield self yield self
n.cancel_scope.cancel() n.cancel_scope.cancel()
self._nursery = None self._nursery = None
# make sure we always tear down our existing data feed
await self.feed.quote_gen.aclose() await self.feed.quote_gen.aclose()
def clear(self): def clear_strikes(self):
"""Clear the strike rows from the internal table. """Clear the strike rows from the internal table.
""" """
table = self.widgets['table'] table = self.widgets['table']
table.clear() table.clear()
self._strikes2rows.clear() self._strikes2rows.clear()
def clear_expiries(self):
pass
def render_rows(self, records, displayables): def render_rows(self, records, displayables):
"""Render all strike rows in the internal table. """Render all strike rows in the internal table.
""" """
@ -275,7 +284,7 @@ class OptionChain(object):
# using each contracts "symbol" so that the quote updater # using each contracts "symbol" so that the quote updater
# task can look up the right row to update easily # task can look up the right row to update easily
# See update_quotes() and ``Row`` internals for details. # See update_quotes() and ``Row`` internals for details.
for contract_type, row in strike_row._sub_rows.items(): for contract_type, row in strike_row.rowsitems():
symbol = row._last_record['symbol'] symbol = row._last_record['symbol']
table.symbols2rows[symbol] = row table.symbols2rows[symbol] = row
@ -285,38 +294,39 @@ class OptionChain(object):
log.debug("Finished rendering rows!") log.debug("Finished rendering rows!")
async def start_feed( async def _start_displaying(self, symbol, expiry):
self, """Main routine to start displaying the real time updated strike
symbol: str, table.
expiry: str,
# max QT rate per API customer is approx 4 rps Clear any existing data feed subscription that is no longer needed
# and usually 3 rps is allocated to the stock monitor (eg. when clicking a new expiry button) spin up a new subscription,
rate: int = 1, populate the table and start updating it.
test: str = None """
): # set window title
if self.feed.sub != self.sub: self.widgets['window'].set_title(
return await self.feed.open_stream([(symbol, expiry)], rate=rate) self._title.format(symbol=symbol)
else: )
feed = self.feed
return feed.quote_gen, feed.first_quotes
async def start_updating(self):
if self._update_cs: if self._update_cs:
log.warn("Cancelling existing update task") log.warn("Cancelling existing update task")
self._update_cs.cancel() self._update_cs.cancel()
await trio.sleep(0) await trio.sleep(0)
# drop all current rows if self._quote_gen:
self.clear() await self._quote_gen.aclose()
self.clear_strikes()
if self._nursery is None: if self._nursery is None:
raise RuntimeError( raise RuntimeError(
"You must call await `start()` first!") "You must call open this chain's update scope first!")
n = self._nursery n = self._nursery
log.debug(f"Waiting on first_quotes for {self.sub}") log.debug(f"Waiting on first_quotes for {symbol}:{expiry}")
quote_gen, first_quotes = await self.start_feed(*self.sub) self._quote_gen, first_quotes = await self.feed.open_stream(
log.debug(f"Got first_quotes for {self.sub}") [(symbol, expiry)]
)
log.debug(f"Got first_quotes for {symbol}:{expiry}")
# redraw the UI # redraw the UI
records, displayables = zip(*[ records, displayables = zip(*[
@ -333,7 +343,7 @@ class OptionChain(object):
n, n,
self.feed.brokermod.format_option_quote, self.feed.brokermod.format_option_quote,
self.widgets, self.widgets,
quote_gen, self._quote_gen,
symbol_data={}, symbol_data={},
first_quotes=first_quotes, first_quotes=first_quotes,
) )
@ -341,21 +351,26 @@ class OptionChain(object):
def start_displaying(self, symbol, expiry): def start_displaying(self, symbol, expiry):
self.sub = (symbol, expiry) self.sub = (symbol, expiry)
self._nursery.start_soon(self.start_updating) self._nursery.start_soon(self._start_displaying, symbol, expiry)
async def new_chain_ui( async def new_chain_ui(
portal: tractor._portal.Portal, portal: tractor._portal.Portal,
symbol: str, symbol: str,
expiry: str,
contracts,
brokermod: types.ModuleType, brokermod: types.ModuleType,
nursery: trio._core._run.Nursery, nursery: trio._core._run.Nursery,
rate: int = 1, rate: int = 2,
) -> None: ) -> None:
"""Create and return a new option chain UI. """Create and return a new option chain UI.
""" """
# retreive all contracts just because we need a default when the
# UI starts up
all_contracts = await contracts(brokermod, symbol)
# start streaming soonest contract by default
expiry = next(iter(all_contracts)).expiry
widgets = {} widgets = {}
# define bid-ask "stacked" cells # define bid-ask "stacked" cells
# (TODO: needs some rethinking and renaming for sure) # (TODO: needs some rethinking and renaming for sure)
bidasks = brokermod._option_bidasks bidasks = brokermod._option_bidasks
@ -369,17 +384,12 @@ async def new_chain_ui(
feed, feed,
rate=rate, rate=rate,
) )
quote_gen, first_quotes = await chain.feed.open_stream([chain.sub])
quote_gen, first_quotes = await chain.start_feed(symbol, expiry)
records, displayables = zip(*[ records, displayables = zip(*[
brokermod.format_option_quote(quote, {}) brokermod.format_option_quote(quote, {})
for quote in first_quotes.values() for quote in first_quotes.values()
]) ])
# build out root UI
title = f"option chain: {symbol}\t(press ? for help)"
Window.set_title(title)
# use `monitor` styling for now # use `monitor` styling for now
from .monitor import _kv from .monitor import _kv
Builder.load_string(_kv) Builder.load_string(_kv)
@ -390,7 +400,7 @@ async def new_chain_ui(
# TODO: figure out how to compact these buttons # TODO: figure out how to compact these buttons
expiries = { expiries = {
key.expiry: key.expiry[:key.expiry.find('T')] key.expiry: key.expiry[:key.expiry.find('T')]
for key in contracts for key in all_contracts
} }
expiry_buttons = Row( expiry_buttons = Row(
record=expiries, record=expiries,
@ -439,6 +449,7 @@ async def new_chain_ui(
) )
container.add_widget(pager) container.add_widget(pager)
widgets.update({ widgets.update({
'window': Window,
'root': container, 'root': container,
'container': container, 'container': container,
'table': table, 'table': table,
@ -459,24 +470,16 @@ async def _async_main(
This is started with cli cmd `piker options`. This is started with cli cmd `piker options`.
''' '''
# retreive all contracts just because we need a default when the
# UI starts up
all_contracts = await contracts(brokermod, symbol)
# start streaming soonest contract by default
first_expiry = next(iter(all_contracts)).expiry
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
# set up a pager view for large ticker lists # set up a pager view for large ticker lists
chain = await new_chain_ui( chain = await new_chain_ui(
portal, portal,
symbol, symbol,
first_expiry,
all_contracts,
brokermod, brokermod,
nursery, nursery,
rate=rate, rate=rate,
) )
async with chain.open_scope(): async with chain.open_update_scope():
try: try:
# Trio-kivy entry point. # Trio-kivy entry point.
await async_runTouchApp(chain.widgets['root']) # run kivy await async_runTouchApp(chain.widgets['root']) # run kivy