Highlight current expiry; mutex data feed access

kivy_mainline_and_py3.8
Tyler Goodlet 2018-12-15 19:40:54 -05:00
parent 7ed409501d
commit a13b13e144
1 changed files with 67 additions and 42 deletions

View File

@ -6,7 +6,6 @@ Launch with ``piker options <symbol>``.
import types import types
from functools import partial from functools import partial
from typing import Dict, List from typing import Dict, List
# import typing
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
@ -30,7 +29,7 @@ async def modify_symbol(symbol):
pass pass
class StrikeCell(Cell): class StrikeCell(HeaderCell):
"""Strike cell""" """Strike cell"""
@ -61,9 +60,6 @@ class StrikeRow(BoxLayout):
table=None, table=None,
**kwargs, **kwargs,
) -> None: ) -> None:
# if self.is_populated():
# raise TypeError(f"{self} can only append two sub-rows?")
# the 'contract_type' determines whether this # the 'contract_type' determines whether this
# is a put or call row # is a put or call row
contract_type = record['contract_type'] contract_type = record['contract_type']
@ -75,6 +71,7 @@ class StrikeRow(BoxLayout):
# reverse order of call side cells # reverse order of call side cells
if contract_type == 'call': if contract_type == 'call':
record = dict(list(reversed(list(record.items())))) record = dict(list(reversed(list(record.items()))))
row = Row( row = Row(
record, record,
bidasks=bidasks, bidasks=bidasks,
@ -99,7 +96,7 @@ class StrikeRow(BoxLayout):
self.strike, StrikeCell( self.strike, StrikeCell(
key=self.strike, key=self.strike,
text=str(self.strike), text=str(self.strike),
is_header=True, # is_header=True,
# make centre strike cell nice and small # make centre strike cell nice and small
size_hint=(1/10., 1), size_hint=(1/10., 1),
) )
@ -122,10 +119,25 @@ class StrikeRow(BoxLayout):
self._sub_rows[record['contract_type']].update( self._sub_rows[record['contract_type']].update(
record, displayable) record, displayable)
def get_field(self, key):
"""Always sort on the lone field, the strike price.
"""
return int(self.strike)
class ExpiryButton(Cell):
# must be set to allow 'plain bg colors' since default texture is grey
background_normal = ''
class ExpiryButton(HeaderCell):
def on_press(self, value=None): def on_press(self, value=None):
# import pdb; pdb.set_trace()
last = self.chain._last_expiry
if last:
last.click_toggle = False
self.chain._last_expiry = self
log.info(f"Clicked {self}") log.info(f"Clicked {self}")
self.click_toggle = True
if self.chain.sub[1] == self.key: if self.chain.sub[1] == self.key:
log.info(f"Clicked {self} is already selected") log.info(f"Clicked {self} is already selected")
return return
@ -142,39 +154,50 @@ class DataFeed(object):
self.brokermod = brokermod self.brokermod = brokermod
self.sub = None self.sub = None
self.quote_gen = None self.quote_gen = None
self._mutex = trio.StrictFIFOLock()
async def open_stream(self, symbols, rate=3, test=None): async def open_stream(self, symbols, rate=3, test=None):
if self.quote_gen is not None and symbols != self.sub: async with self._mutex:
log.info(f"Stopping existing subscription for {self.sub}") try:
await self.quote_gen.aclose() if self.quote_gen is not None and symbols != self.sub:
self.sub = symbols log.info(
f"Stopping pre-existing subscription for {self.sub}")
await self.quote_gen.aclose()
self.sub = symbols
if test: if test:
# stream from a local test file # stream from a local test file
quote_gen = await self.portal.run( quote_gen = await self.portal.run(
"piker.brokers.data", 'stream_from_file', "piker.brokers.data", 'stream_from_file',
filename=test filename=test
) )
else: else:
# start live streaming from broker daemon log.info(f"Starting new stream for {self.sub}")
quote_gen = await self.portal.run( # start live streaming from broker daemon
"piker.brokers.data", quote_gen = await self.portal.run(
'start_quote_stream', "piker.brokers.data",
broker=self.brokermod.name, 'start_quote_stream',
symbols=symbols, broker=self.brokermod.name,
feed_type='option', symbols=symbols,
rate=rate, feed_type='option',
) rate=rate,
)
# get first quotes response # get first quotes response
log.debug(f"Waiting on first quote for {symbols}...") log.debug(f"Waiting on first quote for {symbols}...")
quotes = await quote_gen.__anext__() quotes = {}
with trio.move_on_after(5):
quotes = await quote_gen.__anext__()
self.quote_gen = quote_gen self.quote_gen = quote_gen
self.first_quotes = quotes self.first_quotes = quotes
# self.records = records # self.records = records
# self.displayables = displayables # self.displayables = displayables
return quote_gen, quotes return quote_gen, quotes
except Exception:
if self.quote_gen:
await self.quote_gen.aclose()
raise
class OptionChain(object): class OptionChain(object):
@ -194,11 +217,13 @@ class OptionChain(object):
self.bidasks = bidasks self.bidasks = bidasks
self._strikes2rows = {} self._strikes2rows = {}
self._nursery = None self._nursery = None
self._update_nursery = None
self.feed = feed self.feed = feed
self._update_cs = None self._update_cs = None
# TODO: this should be moved down to the data feed layer # TODO: this should be moved down to the data feed layer
# right now it's only needed for the UI uupdate loop to cancel itself # right now it's only needed for the UI uupdate loop to cancel itself
self._first_quotes = None self._first_quotes = None
self._last_expiry = None
@asynccontextmanager @asynccontextmanager
async def open_scope(self): async def open_scope(self):
@ -215,6 +240,7 @@ class OptionChain(object):
self._nursery = n self._nursery = n
n.start_soon(self.start_updating) n.start_soon(self.start_updating)
yield self yield self
n.cancel_scope.cancel()
self._nursery = None self._nursery = None
await self.feed.quote_gen.aclose() await self.feed.quote_gen.aclose()
@ -223,9 +249,8 @@ class OptionChain(object):
"""Clear the strike rows from the internal table. """Clear the strike rows from the internal table.
""" """
table = self.widgets['table'] table = self.widgets['table']
table.clear_widgets() table.clear()
for strike in self._strikes2rows.copy(): self._strikes2rows.clear()
self._strikes2rows.pop(strike)
def render_rows(self, records, displayables): def render_rows(self, records, displayables):
"""Render all strike rows in the internal table. """Render all strike rows in the internal table.
@ -255,8 +280,7 @@ class OptionChain(object):
table.symbols2rows[symbol] = row table.symbols2rows[symbol] = row
if strike not in self._strikes2rows: if strike not in self._strikes2rows:
# readding widgets is an error # re-adding widgets is an error
table.add_widget(strike_row)
self._strikes2rows[strike] = strike_row self._strikes2rows[strike] = strike_row
log.debug("Finished rendering rows!") log.debug("Finished rendering rows!")
@ -278,9 +302,11 @@ class OptionChain(object):
async def start_updating(self): async def start_updating(self):
if self._update_cs: if self._update_cs:
log.warn("Cancelling existing update task")
self._update_cs.cancel() self._update_cs.cancel()
await trio.sleep(0) await trio.sleep(0)
# drop all current rows
self.clear() self.clear()
if self._nursery is None: if self._nursery is None:
@ -301,11 +327,10 @@ class OptionChain(object):
with trio.open_cancel_scope() as cs: with trio.open_cancel_scope() as cs:
self._update_cs = cs self._update_cs = cs
# start quote update loop
await n.start( await n.start(
partial( partial(
update_quotes, update_quotes,
self._nursery, n,
self.feed.brokermod.format_option_quote, self.feed.brokermod.format_option_quote,
self.widgets, self.widgets,
quote_gen, quote_gen,