commit
e69f0b286c
|
@ -90,6 +90,7 @@ async def stream_quotes(
|
|||
new_quotes.append(quote)
|
||||
else:
|
||||
new_quotes = quotes
|
||||
log.info(f"Delivering quotes:\n{quotes}")
|
||||
|
||||
yield new_quotes
|
||||
|
||||
|
@ -108,6 +109,9 @@ async def stream_quotes(
|
|||
await trio.sleep(delay)
|
||||
|
||||
|
||||
# TODO: at this point probably just just make this a class and
|
||||
# a lot of these functions should be methods. It will definitely
|
||||
# make stateful UI apps easier to implement
|
||||
class DataFeed(typing.NamedTuple):
|
||||
"""A per broker "data feed" container.
|
||||
|
||||
|
@ -116,8 +120,9 @@ class DataFeed(typing.NamedTuple):
|
|||
"""
|
||||
mod: ModuleType
|
||||
client: object
|
||||
exit_stack: contextlib.AsyncExitStack
|
||||
quoter_keys: List[str] = ['stock', 'option']
|
||||
tasks: Dict[str, trio._core._run.Task] = dict.fromkeys(
|
||||
tasks: Dict[str, trio.Event] = dict.fromkeys(
|
||||
quoter_keys, False)
|
||||
quoters: Dict[str, typing.Coroutine] = {}
|
||||
subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}}
|
||||
|
@ -141,7 +146,9 @@ async def fan_out_to_chans(
|
|||
async def request():
|
||||
"""Get quotes for current symbol subscription set.
|
||||
"""
|
||||
return await get_quotes(list(symbols2chans.keys()))
|
||||
symbols = list(symbols2chans.keys())
|
||||
# subscription can be changed at any time
|
||||
return await get_quotes(symbols) if symbols else ()
|
||||
|
||||
async for quotes in stream_quotes(
|
||||
feed.mod, request, rate,
|
||||
|
@ -149,18 +156,16 @@ async def fan_out_to_chans(
|
|||
):
|
||||
chan_payloads = {}
|
||||
for quote in quotes:
|
||||
# is this too QT specific?
|
||||
symbol = quote['symbol']
|
||||
# set symbol quotes for each subscriber
|
||||
packet = {quote['symbol']: quote}
|
||||
for chan, cid in symbols2chans.get(quote['key'], set()):
|
||||
chan_payloads.setdefault(
|
||||
chan,
|
||||
(chan, cid),
|
||||
{'yield': {}, 'cid': cid}
|
||||
)['yield'][symbol] = quote
|
||||
)['yield'].update(packet)
|
||||
|
||||
# deliver to each subscriber (fan out)
|
||||
if chan_payloads:
|
||||
for chan, payload in chan_payloads.items():
|
||||
for (chan, cid), payload in chan_payloads.items():
|
||||
try:
|
||||
await chan.send(payload)
|
||||
except (
|
||||
|
@ -233,13 +238,19 @@ async def smoke_quote(get_quotes, tickers, broker):
|
|||
###########################################
|
||||
|
||||
|
||||
async def modify_quote_stream(broker, feed_type, symbols, chan=None, cid=None):
|
||||
def modify_quote_stream(broker, feed_type, symbols, chan, cid):
|
||||
"""Absolute symbol subscription list for each quote stream.
|
||||
|
||||
Effectively a symbol subscription api.
|
||||
"""
|
||||
log.info(f"{chan} changed symbol subscription to {symbols}")
|
||||
feed = await get_cached_feed(broker)
|
||||
ss = tractor.current_actor().statespace
|
||||
feed = ss['feeds'].get(broker)
|
||||
if feed is None:
|
||||
raise RuntimeError(
|
||||
"`get_cached_feed()` must be called before modifying its stream"
|
||||
)
|
||||
|
||||
symbols2chans = feed.subscriptions[feed_type]
|
||||
# update map from each symbol to requesting client's chan
|
||||
for ticker in symbols:
|
||||
|
@ -254,7 +265,7 @@ async def modify_quote_stream(broker, feed_type, symbols, chan=None, cid=None):
|
|||
chanset = symbols2chans.get(ticker)
|
||||
# XXX: cid will be different on unsub call
|
||||
for item in chanset.copy():
|
||||
if chan in item:
|
||||
if (chan, cid) == item:
|
||||
chanset.discard(item)
|
||||
|
||||
if not chanset:
|
||||
|
@ -271,8 +282,6 @@ async def get_cached_feed(
|
|||
ss = tractor.current_actor().statespace
|
||||
feeds = ss.setdefault('feeds', {'_lock': trio.Lock()})
|
||||
lock = feeds['_lock']
|
||||
feed_stacks = ss.setdefault('feed_stacks', {})
|
||||
feed_stack = feed_stacks.setdefault(brokername, contextlib.AsyncExitStack())
|
||||
async with lock:
|
||||
try:
|
||||
feed = feeds[brokername]
|
||||
|
@ -281,11 +290,13 @@ async def get_cached_feed(
|
|||
except KeyError:
|
||||
log.info(f"Creating new client for broker {brokername}")
|
||||
brokermod = get_brokermod(brokername)
|
||||
client = await feed_stack.enter_async_context(
|
||||
exit_stack = contextlib.AsyncExitStack()
|
||||
client = await exit_stack.enter_async_context(
|
||||
brokermod.get_client())
|
||||
feed = DataFeed(
|
||||
mod=brokermod,
|
||||
client=client,
|
||||
exit_stack=exit_stack,
|
||||
)
|
||||
feeds[brokername] = feed
|
||||
return feed
|
||||
|
@ -298,6 +309,7 @@ async def start_quote_stream(
|
|||
diff_cached: bool = True,
|
||||
chan: tractor.Channel = None,
|
||||
cid: str = None,
|
||||
rate: int = 3,
|
||||
) -> None:
|
||||
"""Handle per-broker quote stream subscriptions using a "lazy" pub-sub
|
||||
pattern.
|
||||
|
@ -306,12 +318,10 @@ async def start_quote_stream(
|
|||
Since most brokers seems to support batch quote requests we
|
||||
limit to one task per process for now.
|
||||
"""
|
||||
|
||||
actor = tractor.current_actor()
|
||||
# set log level after fork
|
||||
get_console_log(actor.loglevel)
|
||||
# pull global vars from local actor
|
||||
ss = actor.statespace
|
||||
symbols = list(symbols)
|
||||
log.info(
|
||||
f"{chan.uid} subscribed to {broker} for symbols {symbols}")
|
||||
|
@ -337,38 +347,71 @@ async def start_quote_stream(
|
|||
'option',
|
||||
await feed.mod.option_quoter(feed.client, symbols)
|
||||
)
|
||||
|
||||
# update map from each symbol to requesting client's chan
|
||||
await modify_quote_stream(broker, feed_type, symbols, chan, cid)
|
||||
|
||||
payload = {
|
||||
quote['symbol']: quote
|
||||
for quote in await get_quotes(symbols)
|
||||
}
|
||||
# push initial smoke quote response for client initialization
|
||||
await chan.send({'yield': payload, 'cid': cid})
|
||||
try:
|
||||
if not feed.tasks.get(feed_type):
|
||||
# no data feeder task yet; so start one
|
||||
respawn = True
|
||||
# update map from each symbol to requesting client's chan
|
||||
modify_quote_stream(broker, feed_type, symbols, chan, cid)
|
||||
|
||||
# event indicating that task was started and then killed
|
||||
task_is_dead = feed.tasks.get(feed_type)
|
||||
if task_is_dead is False:
|
||||
task_is_dead = trio.Event()
|
||||
task_is_dead.set()
|
||||
feed.tasks[feed_type] = task_is_dead
|
||||
|
||||
if not task_is_dead.is_set():
|
||||
# block and let existing feed task deliver
|
||||
# stream data until it is cancelled in which case
|
||||
# we'll take over and spawn it again
|
||||
await task_is_dead.wait()
|
||||
# client channel was likely disconnected
|
||||
# but we still want to keep the broker task
|
||||
# alive if there are other consumers (including
|
||||
# ourselves)
|
||||
if any(symbols2chans.values()):
|
||||
log.warn(
|
||||
f"Data feed task for {feed.mod.name} was cancelled but"
|
||||
f" there are still active clients, respawning")
|
||||
|
||||
# no data feeder task yet; so start one
|
||||
respawn = True
|
||||
while respawn:
|
||||
respawn = False
|
||||
log.info(f"Spawning data feed task for {feed.mod.name}")
|
||||
while respawn:
|
||||
respawn = False
|
||||
try:
|
||||
async with trio.open_nursery() as nursery:
|
||||
nursery.start_soon(
|
||||
partial(
|
||||
fan_out_to_chans, feed, get_quotes,
|
||||
symbols2chans,
|
||||
diff_cached=diff_cached,
|
||||
cid=cid
|
||||
)
|
||||
try:
|
||||
async with trio.open_nursery() as nursery:
|
||||
nursery.start_soon(
|
||||
partial(
|
||||
fan_out_to_chans, feed, get_quotes,
|
||||
symbols2chans,
|
||||
diff_cached=diff_cached,
|
||||
cid=cid,
|
||||
rate=rate,
|
||||
)
|
||||
feed.tasks[feed_type] = True
|
||||
except trio.BrokenResourceError:
|
||||
log.exception("Respawning failed data feed task")
|
||||
respawn = True
|
||||
# unblocks when no more symbols subscriptions exist and the
|
||||
# quote streamer task terminates (usually because another call
|
||||
# was made to `modify_quoter` to unsubscribe from streaming
|
||||
# symbols)
|
||||
)
|
||||
# it's alive!
|
||||
task_is_dead.clear()
|
||||
|
||||
except trio.BrokenResourceError:
|
||||
log.exception("Respawning failed data feed task")
|
||||
respawn = True
|
||||
|
||||
# unblocks when no more symbols subscriptions exist and the
|
||||
# quote streamer task terminates (usually because another call
|
||||
# was made to `modify_quoter` to unsubscribe from streaming
|
||||
# symbols)
|
||||
finally:
|
||||
log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}")
|
||||
feed.tasks.pop(feed_type)
|
||||
task_is_dead.set()
|
||||
|
||||
# if we're cancelled externally unsubscribe our quote feed
|
||||
modify_quote_stream(broker, feed_type, [], chan, cid)
|
||||
|
||||
# if there are truly no more subscriptions with this broker
|
||||
# drop from broker subs dict
|
||||
if not any(symbols2chans.values()):
|
||||
|
@ -376,7 +419,7 @@ async def start_quote_stream(
|
|||
# broker2symbolsubs.pop(broker, None)
|
||||
|
||||
# destroy the API client
|
||||
await feed_stack.aclose()
|
||||
await feed.exit_stack.aclose()
|
||||
|
||||
|
||||
async def stream_to_file(
|
||||
|
|
|
@ -5,7 +5,6 @@ import time
|
|||
from datetime import datetime
|
||||
from functools import partial
|
||||
import configparser
|
||||
from operator import itemgetter
|
||||
from typing import List, Tuple, Dict, Any, Iterator, NamedTuple
|
||||
|
||||
import trio
|
||||
|
@ -25,7 +24,10 @@ log = get_logger(__name__)
|
|||
|
||||
_refresh_token_ep = 'https://login.questrade.com/oauth2/'
|
||||
_version = 'v1'
|
||||
_rate_limit = 4 # queries/sec
|
||||
|
||||
# stock queries/sec
|
||||
# it seems 4 rps is best we can do total
|
||||
_rate_limit = 4
|
||||
|
||||
|
||||
class QuestradeError(Exception):
|
||||
|
@ -90,7 +92,7 @@ class _API:
|
|||
|
||||
async def option_quotes(
|
||||
self,
|
||||
contracts: Dict[ContractsKey, Dict[int, dict]],
|
||||
contracts: Dict[ContractsKey, Dict[int, dict]] = {},
|
||||
option_ids: List[int] = [], # if you don't want them all
|
||||
) -> dict:
|
||||
"""Retrieve option chain quotes for all option ids or by filter(s).
|
||||
|
@ -105,6 +107,8 @@ class _API:
|
|||
]
|
||||
resp = await self._sess.post(
|
||||
path=f'/markets/quotes/options',
|
||||
# XXX: b'{"code":1024,"message":"The size of the array requested is not valid: optionIds"}'
|
||||
# ^ what I get when trying to use too many ids manually...
|
||||
json={'filters': filters, 'optionIds': option_ids}
|
||||
)
|
||||
return resproc(resp, log)['optionQuotes']
|
||||
|
@ -123,7 +127,8 @@ class Client:
|
|||
self.access_data = {}
|
||||
self._reload_config(config)
|
||||
self._symbol_cache: Dict[str, int] = {}
|
||||
self._contracts2expiries = {}
|
||||
self._optids2contractinfo = {}
|
||||
self._contract2ids = {}
|
||||
|
||||
def _reload_config(self, config=None, **kwargs):
|
||||
log.warn("Reloading access config data")
|
||||
|
@ -312,17 +317,44 @@ class Client:
|
|||
contracts.items(),
|
||||
key=lambda item: item[0].expiry
|
||||
):
|
||||
by_key[
|
||||
ContractsKey(
|
||||
key.symbol,
|
||||
key.id,
|
||||
# converting back - maybe just do this initially?
|
||||
key.expiry.isoformat(timespec='microseconds'),
|
||||
)
|
||||
] = {
|
||||
item['strikePrice']: item for item in
|
||||
byroot['chainPerRoot'][0]['chainPerStrikePrice']
|
||||
}
|
||||
for chain in byroot['chainPerRoot']:
|
||||
optroot = chain['optionRoot']
|
||||
|
||||
# handle QTs "adjusted contracts" (aka adjusted for
|
||||
# the underlying in some way; usually has a '(1)' in
|
||||
# the expiry key in their UI)
|
||||
adjusted_contracts = optroot not in key.symbol
|
||||
tail = optroot[len(key.symbol):]
|
||||
suffix = '-' + tail if adjusted_contracts else ''
|
||||
|
||||
by_key[
|
||||
ContractsKey(
|
||||
key.symbol + suffix,
|
||||
key.id,
|
||||
# converting back - maybe just do this initially?
|
||||
key.expiry.isoformat(timespec='microseconds'),
|
||||
)
|
||||
] = {
|
||||
item['strikePrice']: item for item in
|
||||
chain['chainPerStrikePrice']
|
||||
}
|
||||
|
||||
# fill out contract id to strike expiry map
|
||||
for tup, bystrikes in by_key.items():
|
||||
for strike, ids in bystrikes.items():
|
||||
for key, contract_type in (
|
||||
('callSymbolId', 'call'), ('putSymbolId', 'put')
|
||||
):
|
||||
contract_int_id = ids[key]
|
||||
self._optids2contractinfo[contract_int_id] = {
|
||||
'strike': strike,
|
||||
'expiry': tup.expiry,
|
||||
'contract_type': contract_type,
|
||||
'contract_key': tup,
|
||||
}
|
||||
# store ids per contract
|
||||
self._contract2ids.setdefault(
|
||||
tup, set()).add(contract_int_id)
|
||||
return by_key
|
||||
|
||||
async def option_chains(
|
||||
|
@ -332,16 +364,31 @@ class Client:
|
|||
) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
||||
"""Return option chain snap quote for each ticker in ``symbols``.
|
||||
"""
|
||||
batch = []
|
||||
for key, bystrike in contracts.items():
|
||||
quotes = await self.api.option_quotes({key: bystrike})
|
||||
for quote in quotes:
|
||||
# index by .symbol, .expiry since that's what
|
||||
# a subscriber (currently) sends initially
|
||||
quote['key'] = (key[0], key[2])
|
||||
batch.extend(quotes)
|
||||
quotes = await self.api.option_quotes(contracts=contracts)
|
||||
# XXX the below doesn't work so well due to the symbol count
|
||||
# limit per quote request
|
||||
# quotes = await self.api.option_quotes(option_ids=list(contract_ids))
|
||||
for quote in quotes:
|
||||
id = quote['symbolId']
|
||||
contract_info = self._optids2contractinfo[id].copy()
|
||||
key = contract_info.pop('contract_key')
|
||||
|
||||
return batch
|
||||
# XXX TODO: this currently doesn't handle adjusted contracts
|
||||
# (i.e. ones that we stick a '(1)' after)
|
||||
|
||||
# index by .symbol, .expiry since that's what
|
||||
# a subscriber (currently) sends initially
|
||||
quote['key'] = (key.symbol, key.expiry)
|
||||
|
||||
# update with expiry and strike (Obviously the
|
||||
# QT api designers are using some kind of severely
|
||||
# stupid disparate table system where they keep
|
||||
# contract info in a separate table from the quote format
|
||||
# keys. I'm really not surprised though - windows shop..)
|
||||
# quote.update(self._optids2contractinfo[quote['symbolId']])
|
||||
quote.update(contract_info)
|
||||
|
||||
return quotes
|
||||
|
||||
|
||||
async def token_refresher(client):
|
||||
|
@ -394,7 +441,8 @@ async def get_client() -> Client:
|
|||
try:
|
||||
log.debug("Check time to ensure access token is valid")
|
||||
try:
|
||||
await client.api.time()
|
||||
# await client.api.time()
|
||||
await client.quote(['RY.TO'])
|
||||
except Exception:
|
||||
# access token is likely no good
|
||||
log.warn(f"Access token {client.access_data['access_token']} seems"
|
||||
|
@ -471,18 +519,17 @@ async def option_quoter(client: Client, tickers: List[str]):
|
|||
if isinstance(tickers[0], tuple):
|
||||
datetime.fromisoformat(tickers[0][1])
|
||||
else:
|
||||
log.warn(f"Ignoring option quoter call with {tickers}")
|
||||
# TODO make caller always check that a quoter has been set
|
||||
return
|
||||
raise ValueError(f'Option subscription format is (symbol, expiry)')
|
||||
|
||||
@async_lifo_cache(maxsize=128)
|
||||
async def get_contract_by_date(sym_date_pairs: Tuple[Tuple[str, str]]):
|
||||
async def get_contract_by_date(
|
||||
sym_date_pairs: Tuple[Tuple[str, str]],
|
||||
):
|
||||
"""For each tuple,
|
||||
``(symbol_date_1, symbol_date_2, ... , symbol_date_n)``
|
||||
return a contract dict.
|
||||
"""
|
||||
symbols = map(itemgetter(0), sym_date_pairs)
|
||||
dates = map(itemgetter(1), sym_date_pairs)
|
||||
symbols, dates = zip(*sym_date_pairs)
|
||||
contracts = await client.get_all_contracts(symbols)
|
||||
selected = {}
|
||||
for key, val in contracts.items():
|
||||
|
@ -536,11 +583,11 @@ _qt_stock_keys = {
|
|||
'bidSize': 'bsize',
|
||||
'askSize': 'asize',
|
||||
'VWAP': ('VWAP', partial(round, ndigits=3)),
|
||||
'mktcap': ('mktcap', humanize),
|
||||
'MC': ('MC', humanize),
|
||||
'$ vol': ('$ vol', humanize),
|
||||
'volume': ('vol', humanize),
|
||||
'close': 'close',
|
||||
'openPrice': 'open',
|
||||
# 'close': 'close',
|
||||
# 'openPrice': 'open',
|
||||
'lowPrice': 'low',
|
||||
'highPrice': 'high',
|
||||
# 'low52w': 'low52w', # put in info widget
|
||||
|
@ -556,15 +603,15 @@ _qt_stock_keys = {
|
|||
|
||||
# BidAskLayout columns which will contain three cells the first stacked on top
|
||||
# of the other 2
|
||||
_bidasks = {
|
||||
_stock_bidasks = {
|
||||
'last': ['bid', 'ask'],
|
||||
'size': ['bsize', 'asize'],
|
||||
'VWAP': ['low', 'high'],
|
||||
'mktcap': ['vol', '$ vol'],
|
||||
'vol': ['MC', '$ vol'],
|
||||
}
|
||||
|
||||
|
||||
def format_quote(
|
||||
def format_stock_quote(
|
||||
quote: dict,
|
||||
symbol_data: dict,
|
||||
keymap: dict = _qt_stock_keys,
|
||||
|
@ -586,7 +633,7 @@ def format_quote(
|
|||
computed = {
|
||||
'symbol': quote['symbol'],
|
||||
'%': round(change, 3),
|
||||
'mktcap': mktcap,
|
||||
'MC': mktcap,
|
||||
# why QT do you have to be an asshole shipping null values!!!
|
||||
'$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3),
|
||||
'close': previous,
|
||||
|
@ -609,3 +656,100 @@ def format_quote(
|
|||
displayable[new_key] = display_value
|
||||
|
||||
return new, displayable
|
||||
|
||||
|
||||
_qt_option_keys = {
|
||||
"lastTradePrice": 'last',
|
||||
"askPrice": 'ask',
|
||||
"bidPrice": 'bid',
|
||||
"lastTradeSize": 'size',
|
||||
"bidSize": 'bsize',
|
||||
"askSize": 'asize',
|
||||
'VWAP': ('VWAP', partial(round, ndigits=3)),
|
||||
"lowPrice": 'low',
|
||||
"highPrice": 'high',
|
||||
# "expiry": "expiry",
|
||||
# "delay": 0,
|
||||
"delta": ('delta', partial(round, ndigits=3)),
|
||||
# "gamma": ('gama', partial(round, ndigits=3)),
|
||||
# "rho": ('rho', partial(round, ndigits=3)),
|
||||
# "theta": ('theta', partial(round, ndigits=3)),
|
||||
# "vega": ('vega', partial(round, ndigits=3)),
|
||||
'$ vol': ('$ vol', humanize),
|
||||
'volume': ('vol', humanize),
|
||||
# "2021-01-15T00:00:00.000000-05:00",
|
||||
# "isHalted": false,
|
||||
# "key": [
|
||||
# "APHA.TO",
|
||||
# "2021-01-15T00:00:00.000000-05:00"
|
||||
# ],
|
||||
# "lastTradePriceTrHrs": null,
|
||||
# "lastTradeTick": 'tick',
|
||||
"lastTradeTime": 'time',
|
||||
"openInterest": 'oi',
|
||||
"openPrice": 'open',
|
||||
# "strike": 'strike',
|
||||
# "symbol": "APHA15Jan21P8.00.MX",
|
||||
# "symbolId": 23881868,
|
||||
# "underlying": "APHA.TO",
|
||||
# "underlyingId": 8297492,
|
||||
"symbol": 'symbol',
|
||||
"contract_type": 'contract_type',
|
||||
"volatility": (
|
||||
'IV %',
|
||||
lambda v: '{}'.format(round(v, ndigits=2))
|
||||
),
|
||||
"strike": 'strike',
|
||||
}
|
||||
|
||||
_option_bidasks = {
|
||||
'last': ['bid', 'ask'],
|
||||
'size': ['bsize', 'asize'],
|
||||
'VWAP': ['low', 'high'],
|
||||
'vol': ['oi', '$ vol'],
|
||||
}
|
||||
|
||||
|
||||
def format_option_quote(
|
||||
quote: dict,
|
||||
symbol_data: dict,
|
||||
keymap: dict = _qt_option_keys,
|
||||
) -> Tuple[dict, dict]:
|
||||
"""Remap a list of quote dicts ``quotes`` using the mapping of old keys
|
||||
-> new keys ``keymap`` returning 2 dicts: one with raw data and the other
|
||||
for display.
|
||||
|
||||
Returns 2 dicts: first is the original values mapped by new keys,
|
||||
and the second is the same but with all values converted to a
|
||||
"display-friendly" string format.
|
||||
"""
|
||||
# TODO: need historical data..
|
||||
# (cause why would QT keep their quote structure consistent across
|
||||
# assets..)
|
||||
# previous = symbol_data[symbol]['prevDayClosePrice']
|
||||
# change = percent_change(previous, last)
|
||||
computed = {
|
||||
# why QT do you have to be an asshole shipping null values!!!
|
||||
'$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3),
|
||||
# '%': round(change, 3),
|
||||
# 'close': previous,
|
||||
}
|
||||
new = {}
|
||||
displayable = {}
|
||||
|
||||
# structuring and normalization
|
||||
for key, new_key in keymap.items():
|
||||
display_value = value = computed.get(key) or quote.get(key)
|
||||
|
||||
# API servers can return `None` vals when markets are closed (weekend)
|
||||
value = 0 if value is None else value
|
||||
|
||||
# convert values to a displayble format using available formatting func
|
||||
if isinstance(new_key, tuple):
|
||||
new_key, func = new_key
|
||||
display_value = func(value) if value else value
|
||||
|
||||
new[new_key] = value
|
||||
displayable[new_key] = display_value
|
||||
|
||||
return new, displayable
|
||||
|
|
48
piker/cli.py
48
piker/cli.py
|
@ -22,6 +22,10 @@ DEFAULT_BROKER = 'robinhood'
|
|||
|
||||
_config_dir = click.get_app_dir('piker')
|
||||
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
|
||||
_data_mods = [
|
||||
'piker.brokers.core',
|
||||
'piker.brokers.data',
|
||||
]
|
||||
|
||||
|
||||
@click.command()
|
||||
|
@ -33,7 +37,7 @@ def pikerd(loglevel, host, tl):
|
|||
"""
|
||||
get_console_log(loglevel)
|
||||
tractor.run_daemon(
|
||||
rpc_module_paths=['piker.brokers.data'],
|
||||
rpc_module_paths=_data_mods,
|
||||
name='brokerd',
|
||||
loglevel=loglevel if tl else None,
|
||||
)
|
||||
|
@ -133,7 +137,7 @@ async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None):
|
|||
"No broker daemon could be found, spawning brokerd..")
|
||||
portal = await nursery.start_actor(
|
||||
'brokerd',
|
||||
rpc_module_paths=['piker.brokers.data'],
|
||||
rpc_module_paths=_data_mods,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
yield portal
|
||||
|
@ -144,7 +148,7 @@ async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None):
|
|||
help='Broker backend to use')
|
||||
@click.option('--loglevel', '-l', default='warning', help='Logging level')
|
||||
@click.option('--tl', is_flag=True, help='Enable tractor logging')
|
||||
@click.option('--rate', '-r', default=5, help='Quote rate limit')
|
||||
@click.option('--rate', '-r', default=3, help='Quote rate limit')
|
||||
@click.option('--test', '-t', help='Test quote stream file')
|
||||
@click.option('--dhost', '-dh', default='127.0.0.1',
|
||||
help='Daemon host address to connect to')
|
||||
|
@ -174,8 +178,9 @@ def monitor(loglevel, broker, rate, name, dhost, test, tl):
|
|||
|
||||
tractor.run(
|
||||
partial(main, tries=1),
|
||||
name='kivy-monitor',
|
||||
name='monitor',
|
||||
loglevel=loglevel if tl else None,
|
||||
rpc_module_paths=['piker.ui.monitor'],
|
||||
)
|
||||
|
||||
|
||||
|
@ -358,3 +363,38 @@ def optsquote(loglevel, broker, symbol, df_output, date):
|
|||
click.echo(df)
|
||||
else:
|
||||
click.echo(colorize_json(quotes))
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option('--broker', '-b', default=DEFAULT_BROKER,
|
||||
help='Broker backend to use')
|
||||
@click.option('--loglevel', '-l', default='warning', help='Logging level')
|
||||
@click.option('--tl', is_flag=True, help='Enable tractor logging')
|
||||
@click.option('--date', '-d', help='Contracts expiry date')
|
||||
@click.option('--test', '-t', help='Test quote stream file')
|
||||
@click.option('--rate', '-r', default=1, help='Logging level')
|
||||
@click.argument('symbol', required=True)
|
||||
def optschain(loglevel, broker, symbol, date, tl, rate, test):
|
||||
"""Start the real-time option chain UI.
|
||||
"""
|
||||
from .ui.option_chain import _async_main
|
||||
log = get_console_log(loglevel) # activate console logging
|
||||
brokermod = get_brokermod(broker)
|
||||
|
||||
async def main(tries):
|
||||
async with maybe_spawn_brokerd_as_subactor(
|
||||
tries=tries, loglevel=loglevel
|
||||
) as portal:
|
||||
# run app "main"
|
||||
await _async_main(
|
||||
symbol, portal,
|
||||
brokermod,
|
||||
rate=rate,
|
||||
test=test,
|
||||
)
|
||||
|
||||
tractor.run(
|
||||
partial(main, tries=1),
|
||||
name='kivy-options-chain',
|
||||
loglevel=loglevel if tl else None,
|
||||
)
|
||||
|
|
|
@ -89,14 +89,16 @@ class MouseOverBehavior(object):
|
|||
def __init__(self, **kwargs):
|
||||
self.register_event_type('on_enter')
|
||||
self.register_event_type('on_leave')
|
||||
MouseOverBehavior._widgets.append(self)
|
||||
super().__init__(**kwargs)
|
||||
Window.bind(mouse_pos=self._on_mouse_pos)
|
||||
self._widgets.append(self)
|
||||
|
||||
def __del__(self):
|
||||
MouseOverBehavior.remove(self)
|
||||
|
||||
@classmethod
|
||||
# try throttling to 1ms latency (doesn't seem to work
|
||||
# best I can get is 0.01...)
|
||||
@triggered(timeout=0.001, interval=False)
|
||||
# throttle at 10ms latency
|
||||
@triggered(timeout=0.01, interval=False)
|
||||
def _on_mouse_pos(cls, *args):
|
||||
log.debug(f"{cls} time since last call: {time.time() - cls._last_time}")
|
||||
cls._last_time = time.time()
|
||||
|
@ -107,10 +109,11 @@ class MouseOverBehavior(object):
|
|||
|
||||
pos = args[1]
|
||||
# Next line to_widget allow to compensate for relative layout
|
||||
for widget in cls._widgets.copy():
|
||||
for widget in cls._widgets:
|
||||
w_coords = widget.to_widget(*pos)
|
||||
inside = widget.collide_point(*w_coords)
|
||||
if inside and widget.hovered:
|
||||
log.debug('already hovered')
|
||||
return
|
||||
elif inside:
|
||||
# un-highlight the last highlighted
|
||||
|
|
|
@ -5,419 +5,38 @@ Launch with ``piker monitor <watchlist name>``.
|
|||
|
||||
(Currently there's a bunch of questrade specific stuff in here)
|
||||
"""
|
||||
from itertools import chain
|
||||
from types import ModuleType, AsyncGeneratorType
|
||||
from typing import List
|
||||
from typing import List, Callable
|
||||
|
||||
import trio
|
||||
import tractor
|
||||
from kivy.uix.boxlayout import BoxLayout
|
||||
from kivy.uix.gridlayout import GridLayout
|
||||
from kivy.uix.stacklayout import StackLayout
|
||||
from kivy.uix.button import Button
|
||||
from kivy.lang import Builder
|
||||
from kivy import utils
|
||||
from kivy.app import async_runTouchApp
|
||||
from kivy.core.window import Window
|
||||
from async_generator import aclosing
|
||||
|
||||
from .tabular import (
|
||||
Row, TickerTable, _kv, _black_rgba, colorcode,
|
||||
)
|
||||
from ..log import get_logger
|
||||
from .pager import PagerView
|
||||
from .kivy.mouse_over import new_mouse_over_group
|
||||
|
||||
|
||||
HoverBehavior = new_mouse_over_group()
|
||||
log = get_logger('monitor')
|
||||
|
||||
|
||||
_colors2hexs = {
|
||||
'darkgray': 'a9a9a9',
|
||||
'gray': '808080',
|
||||
'green': '008000',
|
||||
'forestgreen': '228b22',
|
||||
'red2': 'ff3333',
|
||||
'red': 'ff0000',
|
||||
'firebrick': 'b22222',
|
||||
}
|
||||
|
||||
_colors = {key: utils.rgba(val) for key, val in _colors2hexs.items()}
|
||||
|
||||
|
||||
def colorcode(name):
|
||||
return _colors[name if name else 'gray']
|
||||
|
||||
|
||||
_bs = 0.75 # border size
|
||||
|
||||
# medium shade of gray that seems to match the
|
||||
# default i3 window borders
|
||||
_i3_rgba = [0.14]*3 + [1]
|
||||
|
||||
# slightly off black like the jellybean bg from
|
||||
# vim colorscheme
|
||||
_cell_rgba = [0.07]*3 + [1]
|
||||
_black_rgba = [0]*4
|
||||
|
||||
_kv = (f'''
|
||||
#:kivy 1.10.0
|
||||
|
||||
<Cell>
|
||||
font_size: 21
|
||||
# make text wrap to botom
|
||||
text_size: self.size
|
||||
halign: 'center'
|
||||
valign: 'middle'
|
||||
size: self.texture_size
|
||||
# color: {colorcode('gray')}
|
||||
# font_color: {colorcode('gray')}
|
||||
font_name: 'Roboto-Regular'
|
||||
background_color: [0]*4 # by default transparent; use row color
|
||||
# background_color: {_cell_rgba}
|
||||
# spacing: 0, 0
|
||||
# padding: [0]*4
|
||||
|
||||
<HeaderCell>
|
||||
font_size: 21
|
||||
background_color: [0]*4 # by default transparent; use row color
|
||||
# background_color: {_cell_rgba}
|
||||
# canvas.before:
|
||||
# Color:
|
||||
# rgba: [0.13]*4
|
||||
# BorderImage: # use a fixed size border
|
||||
# pos: self.pos
|
||||
# size: [self.size[0] - {_bs}, self.size[1]]
|
||||
# # 0s are because the containing TickerTable already has spacing
|
||||
# # border: [0, {_bs} , 0, {_bs}]
|
||||
# border: [0, {_bs} , 0, 0]
|
||||
|
||||
<TickerTable>
|
||||
spacing: [{_bs}]
|
||||
# row_force_default: True
|
||||
row_default_height: 62
|
||||
cols: 1
|
||||
canvas.before:
|
||||
Color:
|
||||
# i3 style gray as background
|
||||
rgba: {_i3_rgba}
|
||||
# rgba: {_cell_rgba}
|
||||
Rectangle:
|
||||
# scale with container self here refers to the widget i.e BoxLayout
|
||||
pos: self.pos
|
||||
size: self.size
|
||||
|
||||
<BidAskLayout>
|
||||
spacing: [{_bs}, 0]
|
||||
|
||||
<Row>
|
||||
# minimum_height: 200 # should be pulled from Cell text size
|
||||
# minimum_width: 200
|
||||
# row_force_default: True
|
||||
# row_default_height: 61 # determines the header row size
|
||||
padding: [0]*4
|
||||
spacing: [0]
|
||||
canvas.before:
|
||||
Color:
|
||||
# rgba: [0]*4
|
||||
rgba: {_cell_rgba}
|
||||
Rectangle:
|
||||
# self here refers to the widget i.e Row(GridLayout)
|
||||
pos: self.pos
|
||||
size: self.size
|
||||
# row higlighting on mouse over
|
||||
Color:
|
||||
rgba: {_i3_rgba}
|
||||
RoundedRectangle:
|
||||
size: self.width, self.height if self.hovered else 1
|
||||
pos: self.pos
|
||||
radius: (10,)
|
||||
|
||||
|
||||
|
||||
# part of the `PagerView`
|
||||
<SearchBar>
|
||||
size_hint: 1, None
|
||||
# static size of 51 px
|
||||
height: 51
|
||||
font_size: 25
|
||||
background_color: {_i3_rgba}
|
||||
''')
|
||||
|
||||
|
||||
class Cell(Button):
|
||||
"""Data cell: the fundemental widget.
|
||||
|
||||
``key`` is the column name index value.
|
||||
"""
|
||||
def __init__(self, key=None, **kwargs):
|
||||
super(Cell, self).__init__(**kwargs)
|
||||
self.key = key
|
||||
|
||||
|
||||
class HeaderCell(Cell):
|
||||
"""Column header cell label.
|
||||
"""
|
||||
def on_press(self, value=None):
|
||||
"""Clicking on a col header indicates to sort rows by this column
|
||||
in `update_quotes()`.
|
||||
"""
|
||||
table = self.row.table
|
||||
# if this is a row header cell then sort by the clicked field
|
||||
if self.row.is_header:
|
||||
table.sort_key = self.key
|
||||
|
||||
last = table.last_clicked_col_cell
|
||||
if last and last is not self:
|
||||
last.underline = False
|
||||
last.bold = False
|
||||
|
||||
# outline the header text to indicate it's been the last clicked
|
||||
self.underline = True
|
||||
self.bold = True
|
||||
# mark this cell as the last selected
|
||||
table.last_clicked_col_cell = self
|
||||
# sort and render the rows immediately
|
||||
self.row.table.render_rows(table.quote_cache)
|
||||
|
||||
# allow highlighting of row headers for tracking
|
||||
elif self.is_header:
|
||||
if self.background_color == self.color:
|
||||
self.background_color = _black_rgba
|
||||
else:
|
||||
self.background_color = self.color
|
||||
|
||||
|
||||
class BidAskLayout(StackLayout):
|
||||
"""Cell which houses three buttons containing a last, bid, and ask in a
|
||||
single unit oriented with the last 2 under the first.
|
||||
"""
|
||||
def __init__(self, values, header=False, **kwargs):
|
||||
# uncomment to get vertical stacked bid-ask
|
||||
# super(BidAskLayout, self).__init__(orientation='bt-lr', **kwargs)
|
||||
super(BidAskLayout, self).__init__(orientation='lr-tb', **kwargs)
|
||||
assert len(values) == 3, "You can only provide 3 values: last,bid,ask"
|
||||
self._keys2cells = {}
|
||||
cell_type = HeaderCell if header else Cell
|
||||
top_size = cell_type().font_size
|
||||
small_size = top_size - 4
|
||||
top_prop = 0.5 # proportion of size used by top cell
|
||||
bottom_prop = 1 - top_prop
|
||||
for (key, size_hint, font_size), value in zip(
|
||||
[('last', (1, top_prop), top_size),
|
||||
('bid', (0.5, bottom_prop), small_size),
|
||||
('ask', (0.5, bottom_prop), small_size)],
|
||||
# uncomment to get vertical stacked bid-ask
|
||||
# [('last', (top_prop, 1), top_size),
|
||||
# ('bid', (bottom_prop, 0.5), small_size),
|
||||
# ('ask', (bottom_prop, 0.5), small_size)],
|
||||
values
|
||||
):
|
||||
cell = cell_type(
|
||||
text=str(value),
|
||||
size_hint=size_hint,
|
||||
# width=self.width/2 - 3,
|
||||
font_size=font_size
|
||||
)
|
||||
self._keys2cells[key] = cell
|
||||
cell.key = value
|
||||
cell.is_header = header
|
||||
setattr(self, key, cell)
|
||||
self.add_widget(cell)
|
||||
|
||||
# should be assigned by referrer
|
||||
self.row = None
|
||||
|
||||
def get_cell(self, key):
|
||||
return self._keys2cells[key]
|
||||
|
||||
@property
|
||||
def row(self):
|
||||
return self.row
|
||||
|
||||
@row.setter
|
||||
def row(self, row):
|
||||
# so hideous
|
||||
for cell in self.cells:
|
||||
cell.row = row
|
||||
|
||||
@property
|
||||
def cells(self):
|
||||
return [self.last, self.bid, self.ask]
|
||||
|
||||
|
||||
class Row(GridLayout, HoverBehavior):
|
||||
"""A grid for displaying a row of ticker quote data.
|
||||
|
||||
The row fields can be updated using the ``fields`` property which will in
|
||||
turn adjust the text color of the values based on content changes.
|
||||
"""
|
||||
def __init__(
|
||||
self, record, headers=(), bidasks=None, table=None,
|
||||
is_header=False,
|
||||
**kwargs
|
||||
):
|
||||
super(Row, self).__init__(cols=len(record), **kwargs)
|
||||
self._cell_widgets = {}
|
||||
self._last_record = record
|
||||
self.table = table
|
||||
self.is_header = is_header
|
||||
|
||||
# selection state
|
||||
self.mouse_over = False
|
||||
|
||||
# create `BidAskCells` first
|
||||
layouts = {}
|
||||
bidasks = bidasks or {}
|
||||
ba_cells = {}
|
||||
for key, children in bidasks.items():
|
||||
layout = BidAskLayout(
|
||||
[record[key]] + [record[child] for child in children],
|
||||
header=is_header
|
||||
)
|
||||
layout.row = self
|
||||
layouts[key] = layout
|
||||
for i, child in enumerate([key] + children):
|
||||
ba_cells[child] = layout.cells[i]
|
||||
|
||||
children_flat = list(chain.from_iterable(bidasks.values()))
|
||||
self._cell_widgets.update(ba_cells)
|
||||
|
||||
# build out row using Cell labels
|
||||
for (key, val) in record.items():
|
||||
header = key in headers
|
||||
|
||||
# handle bidask cells
|
||||
if key in layouts:
|
||||
self.add_widget(layouts[key])
|
||||
elif key in children_flat:
|
||||
# these cells have already been added to the `BidAskLayout`
|
||||
continue
|
||||
else:
|
||||
cell = self._append_cell(val, key, header=header)
|
||||
cell.key = key
|
||||
self._cell_widgets[key] = cell
|
||||
|
||||
def get_cell(self, key):
|
||||
return self._cell_widgets[key]
|
||||
|
||||
def _append_cell(self, text, key, header=False):
|
||||
if not len(self._cell_widgets) < self.cols:
|
||||
raise ValueError(f"Can not append more then {self.cols} cells")
|
||||
|
||||
# header cells just have a different colour
|
||||
celltype = HeaderCell if header else Cell
|
||||
cell = celltype(text=str(text), key=key)
|
||||
cell.is_header = header
|
||||
cell.row = self
|
||||
self.add_widget(cell)
|
||||
return cell
|
||||
|
||||
def update(self, record, displayable):
|
||||
"""Update this row's cells with new values from a quote ``record``.
|
||||
|
||||
Return all cells that changed in a ``dict``.
|
||||
"""
|
||||
# color changed field values
|
||||
cells = {}
|
||||
gray = colorcode('gray')
|
||||
fgreen = colorcode('forestgreen')
|
||||
red = colorcode('red2')
|
||||
for key, val in record.items():
|
||||
# logic for cell text coloring: up-green, down-red
|
||||
if self._last_record[key] < val:
|
||||
color = fgreen
|
||||
elif self._last_record[key] > val:
|
||||
color = red
|
||||
else:
|
||||
color = gray
|
||||
|
||||
cell = self.get_cell(key)
|
||||
cell.text = str(displayable[key])
|
||||
cell.color = color
|
||||
if color != gray:
|
||||
cells[key] = cell
|
||||
|
||||
self._last_record = record
|
||||
return cells
|
||||
|
||||
# mouse over handlers
|
||||
def on_enter(self):
|
||||
"""Highlight layout on enter.
|
||||
"""
|
||||
log.debug(
|
||||
f"Entered row {type(self)} through {self.border_point}")
|
||||
# don't highlight header row
|
||||
if getattr(self, 'is_header', None):
|
||||
self.hovered = False
|
||||
|
||||
def on_leave(self):
|
||||
"""Un-highlight layout on exit.
|
||||
"""
|
||||
log.debug(
|
||||
f"Left row {type(self)} through {self.border_point}")
|
||||
|
||||
|
||||
class TickerTable(GridLayout):
|
||||
"""A grid for displaying ticker quote records as a table.
|
||||
"""
|
||||
def __init__(self, sort_key='%', quote_cache={}, **kwargs):
|
||||
super(TickerTable, self).__init__(**kwargs)
|
||||
self.symbols2rows = {}
|
||||
self.sort_key = sort_key
|
||||
self.quote_cache = quote_cache
|
||||
self.row_filter = lambda item: item
|
||||
# for tracking last clicked column header cell
|
||||
self.last_clicked_col_cell = None
|
||||
self._last_row_toggle = 0
|
||||
|
||||
def append_row(self, record, bidasks=None):
|
||||
"""Append a `Row` of `Cell` objects to this table.
|
||||
"""
|
||||
row = Row(record, headers=('symbol',), bidasks=bidasks, table=self)
|
||||
# store ref to each row
|
||||
self.symbols2rows[row._last_record['symbol']] = row
|
||||
self.add_widget(row)
|
||||
return row
|
||||
|
||||
def render_rows(
|
||||
self, pairs: {str: (dict, Row)}, sort_key: str = None,
|
||||
row_filter=None,
|
||||
):
|
||||
"""Sort and render all rows on the ticker grid from ``pairs``.
|
||||
"""
|
||||
self.clear_widgets()
|
||||
sort_key = sort_key or self.sort_key
|
||||
for data, row in filter(
|
||||
row_filter or self.row_filter,
|
||||
reversed(
|
||||
sorted(pairs.values(), key=lambda item: item[0][sort_key])
|
||||
)
|
||||
):
|
||||
self.add_widget(row) # row append
|
||||
|
||||
def ticker_search(self, patt):
|
||||
"""Return sequence of matches when pattern ``patt`` is in a
|
||||
symbol name. Most naive algo possible for the moment.
|
||||
"""
|
||||
for symbol, row in self.symbols2rows.items():
|
||||
if patt in symbol:
|
||||
yield symbol, row
|
||||
|
||||
def search(self, patt):
|
||||
"""Search bar api compat.
|
||||
"""
|
||||
return dict(self.ticker_search(patt)) or {}
|
||||
|
||||
|
||||
async def update_quotes(
|
||||
nursery: trio._core._run.Nursery,
|
||||
brokermod: ModuleType,
|
||||
formatter: Callable,
|
||||
widgets: dict,
|
||||
agen: AsyncGeneratorType,
|
||||
symbol_data: dict,
|
||||
first_quotes: dict
|
||||
first_quotes: dict,
|
||||
task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
"""Process live quotes by updating ticker rows.
|
||||
"""
|
||||
log.debug("Initializing UI update loop")
|
||||
table = widgets['table']
|
||||
flash_keys = {'low', 'high'}
|
||||
|
||||
|
@ -426,24 +45,27 @@ async def update_quotes(
|
|||
for cell in cells:
|
||||
cell.background_color = _black_rgba
|
||||
|
||||
def color_row(row, data, cells):
|
||||
def color_row(row, record, cells):
|
||||
hdrcell = row.get_cell('symbol')
|
||||
chngcell = row.get_cell('%')
|
||||
|
||||
# determine daily change color
|
||||
daychange = float(data['%'])
|
||||
if daychange < 0.:
|
||||
color = colorcode('red2')
|
||||
elif daychange > 0.:
|
||||
color = colorcode('forestgreen')
|
||||
else:
|
||||
color = colorcode('gray')
|
||||
color = colorcode('gray')
|
||||
percent_change = record.get('%')
|
||||
if percent_change:
|
||||
daychange = float(record['%'])
|
||||
if daychange < 0.:
|
||||
color = colorcode('red2')
|
||||
elif daychange > 0.:
|
||||
color = colorcode('forestgreen')
|
||||
|
||||
# update row header and '%' cell text color
|
||||
chngcell.color = hdrcell.color = color
|
||||
# if the cell has been "highlighted" make sure to change its color
|
||||
if hdrcell.background_color != [0]*4:
|
||||
hdrcell.background_color = color
|
||||
if chngcell:
|
||||
chngcell.color = color
|
||||
hdrcell.color = color
|
||||
# if the cell has been "highlighted" make sure to change its color
|
||||
if hdrcell.background_color != [0]*4:
|
||||
hdrcell.background_color = color
|
||||
|
||||
# briefly highlight bg of certain cells on each trade execution
|
||||
unflash = set()
|
||||
|
@ -478,36 +100,62 @@ async def update_quotes(
|
|||
# revert flash state momentarily
|
||||
nursery.start_soon(revert_cells_color, unflash)
|
||||
|
||||
cache = {}
|
||||
table.quote_cache = cache
|
||||
|
||||
# initial coloring
|
||||
to_sort = set()
|
||||
for sym, quote in first_quotes.items():
|
||||
row = table.symbols2rows[sym]
|
||||
record, displayable = brokermod.format_quote(
|
||||
row = table.get_row(sym)
|
||||
record, displayable = formatter(
|
||||
quote, symbol_data=symbol_data)
|
||||
row.update(record, displayable)
|
||||
color_row(row, record, {})
|
||||
cache[sym] = (record, row)
|
||||
to_sort.add(row.widget)
|
||||
|
||||
# render all rows once up front
|
||||
table.render_rows(cache)
|
||||
table.render_rows(to_sort)
|
||||
|
||||
log.debug("Finished initializing update loop")
|
||||
task_status.started()
|
||||
# real-time cell update loop
|
||||
async for quotes in agen: # new quotes data only
|
||||
to_sort = set()
|
||||
for symbol, quote in quotes.items():
|
||||
record, displayable = brokermod.format_quote(
|
||||
row = table.get_row(symbol)
|
||||
record, displayable = formatter(
|
||||
quote, symbol_data=symbol_data)
|
||||
row = table.symbols2rows[symbol]
|
||||
cache[symbol] = (record, row)
|
||||
|
||||
# determine if sorting should happen
|
||||
sort_key = table.sort_key
|
||||
new = record[sort_key]
|
||||
last = row.get_field(sort_key)
|
||||
if new != last:
|
||||
to_sort.add(row.widget)
|
||||
|
||||
# update and color
|
||||
cells = row.update(record, displayable)
|
||||
color_row(row, record, cells)
|
||||
|
||||
table.render_rows(cache)
|
||||
if to_sort:
|
||||
table.render_rows(to_sort)
|
||||
|
||||
log.debug("Waiting on quotes")
|
||||
|
||||
log.warn("Data feed connection dropped")
|
||||
nursery.cancel_scope.cancel()
|
||||
# XXX: if we're cancelled this should never get called
|
||||
# nursery.cancel_scope.cancel()
|
||||
|
||||
|
||||
async def stream_symbol_selection():
|
||||
"""An RPC async gen for streaming the symbol corresponding
|
||||
value corresponding to the last clicked row.
|
||||
"""
|
||||
widgets = tractor.current_actor().statespace['widgets']
|
||||
table = widgets['table']
|
||||
q = trio.Queue(1)
|
||||
table._click_queues.append(q)
|
||||
try:
|
||||
async for symbol in q:
|
||||
yield symbol
|
||||
finally:
|
||||
table._click_queues.remove(q)
|
||||
|
||||
|
||||
async def _async_main(
|
||||
|
@ -515,7 +163,7 @@ async def _async_main(
|
|||
portal: tractor._portal.Portal,
|
||||
tickers: List[str],
|
||||
brokermod: ModuleType,
|
||||
rate: int,
|
||||
rate: int = 3,
|
||||
test: bool = False
|
||||
) -> None:
|
||||
'''Launch kivy app + all other related tasks.
|
||||
|
@ -528,11 +176,18 @@ async def _async_main(
|
|||
"piker.brokers.data", 'stream_from_file',
|
||||
filename=test
|
||||
)
|
||||
# TODO: need a set of test packets to make this work
|
||||
# seriously fu QT
|
||||
# sd = {}
|
||||
else:
|
||||
# start live streaming from broker daemon
|
||||
quote_gen = await portal.run(
|
||||
"piker.brokers.data", 'start_quote_stream',
|
||||
broker=brokermod.name, symbols=tickers)
|
||||
"piker.brokers.data",
|
||||
'start_quote_stream',
|
||||
broker=brokermod.name,
|
||||
symbols=tickers,
|
||||
rate=3,
|
||||
)
|
||||
|
||||
# subscribe for tickers (this performs a possible filtering
|
||||
# where invalid symbols are discarded)
|
||||
|
@ -540,85 +195,94 @@ async def _async_main(
|
|||
"piker.brokers.data", 'symbol_data',
|
||||
broker=brokermod.name, tickers=tickers)
|
||||
|
||||
async with trio.open_nursery() as nursery:
|
||||
# get first quotes response
|
||||
log.debug("Waiting on first quote...")
|
||||
quotes = await quote_gen.__anext__()
|
||||
first_quotes = [
|
||||
brokermod.format_quote(quote, symbol_data=sd)[0]
|
||||
for quote in quotes.values()]
|
||||
# get first quotes response
|
||||
log.debug("Waiting on first quote...")
|
||||
quotes = await quote_gen.__anext__()
|
||||
first_quotes = [
|
||||
brokermod.format_stock_quote(quote, symbol_data=sd)[0]
|
||||
for quote in quotes.values()]
|
||||
|
||||
if first_quotes[0].get('last') is None:
|
||||
log.error("Broker API is down temporarily")
|
||||
nursery.cancel_scope.cancel()
|
||||
return
|
||||
if first_quotes[0].get('last') is None:
|
||||
log.error("Broker API is down temporarily")
|
||||
return
|
||||
|
||||
# build out UI
|
||||
Window.set_title(f"monitor: {name}\t(press ? for help)")
|
||||
Builder.load_string(_kv)
|
||||
box = BoxLayout(orientation='vertical', spacing=0)
|
||||
# build out UI
|
||||
Window.set_title(f"monitor: {name}\t(press ? for help)")
|
||||
Builder.load_string(_kv)
|
||||
box = BoxLayout(orientation='vertical', spacing=0)
|
||||
|
||||
# define bid-ask "stacked" cells
|
||||
# (TODO: needs some rethinking and renaming for sure)
|
||||
bidasks = brokermod._bidasks
|
||||
# define bid-ask "stacked" cells
|
||||
# (TODO: needs some rethinking and renaming for sure)
|
||||
bidasks = brokermod._stock_bidasks
|
||||
|
||||
# add header row
|
||||
headers = first_quotes[0].keys()
|
||||
header = Row(
|
||||
{key: key for key in headers},
|
||||
headers=headers,
|
||||
bidasks=bidasks,
|
||||
is_header=True,
|
||||
size_hint=(1, None),
|
||||
# add header row
|
||||
headers = first_quotes[0].keys()
|
||||
header = Row(
|
||||
{key: key for key in headers},
|
||||
headers=headers,
|
||||
bidasks=bidasks,
|
||||
is_header=True,
|
||||
size_hint=(1, None),
|
||||
)
|
||||
box.add_widget(header)
|
||||
|
||||
# build table
|
||||
table = TickerTable(
|
||||
cols=1,
|
||||
size_hint=(1, None),
|
||||
)
|
||||
for ticker_record in first_quotes:
|
||||
table.append_row(
|
||||
ticker_record['symbol'],
|
||||
Row(ticker_record, headers=('symbol',),
|
||||
bidasks=bidasks, table=table)
|
||||
)
|
||||
box.add_widget(header)
|
||||
|
||||
# build table
|
||||
table = TickerTable(
|
||||
cols=1,
|
||||
size_hint=(1, None),
|
||||
)
|
||||
for ticker_record in first_quotes:
|
||||
table.append_row(ticker_record, bidasks=bidasks)
|
||||
# associate the col headers row with the ticker table even though
|
||||
# they're technically wrapped separately in containing BoxLayout
|
||||
header.table = table
|
||||
# associate the col headers row with the ticker table even though
|
||||
# they're technically wrapped separately in containing BoxLayout
|
||||
header.table = table
|
||||
|
||||
# mark the initial sorted column header as bold and underlined
|
||||
sort_cell = header.get_cell(table.sort_key)
|
||||
sort_cell.bold = sort_cell.underline = True
|
||||
table.last_clicked_col_cell = sort_cell
|
||||
# mark the initial sorted column header as bold and underlined
|
||||
sort_cell = header.get_cell(table.sort_key)
|
||||
sort_cell.bold = sort_cell.underline = True
|
||||
table.last_clicked_col_cell = sort_cell
|
||||
|
||||
# set up a pager view for large ticker lists
|
||||
table.bind(minimum_height=table.setter('height'))
|
||||
pager = PagerView(box, table, nursery)
|
||||
box.add_widget(pager)
|
||||
# set up a pager view for large ticker lists
|
||||
table.bind(minimum_height=table.setter('height'))
|
||||
|
||||
widgets = {
|
||||
# 'anchor': anchor,
|
||||
'root': box,
|
||||
'table': table,
|
||||
'box': box,
|
||||
'header': header,
|
||||
'pager': pager,
|
||||
}
|
||||
nursery.start_soon(
|
||||
update_quotes, nursery, brokermod, widgets, quote_gen, sd, quotes)
|
||||
ss = tractor.current_actor().statespace
|
||||
try:
|
||||
async with trio.open_nursery() as nursery:
|
||||
pager = PagerView(
|
||||
container=box,
|
||||
contained=table,
|
||||
nursery=nursery
|
||||
)
|
||||
box.add_widget(pager)
|
||||
|
||||
widgets = {
|
||||
'root': box,
|
||||
'table': table,
|
||||
'box': box,
|
||||
'header': header,
|
||||
'pager': pager,
|
||||
}
|
||||
ss['widgets'] = widgets
|
||||
nursery.start_soon(
|
||||
update_quotes,
|
||||
nursery,
|
||||
brokermod.format_stock_quote,
|
||||
widgets,
|
||||
quote_gen,
|
||||
sd,
|
||||
quotes
|
||||
)
|
||||
|
||||
try:
|
||||
# Trio-kivy entry point.
|
||||
await async_runTouchApp(widgets['root']) # run kivy
|
||||
finally:
|
||||
await quote_gen.aclose() # cancel aysnc gen call
|
||||
# un-subscribe from symbols stream (cancel if brokerd
|
||||
# was already torn down - say by SIGINT)
|
||||
with trio.move_on_after(0.2):
|
||||
await portal.run(
|
||||
"piker.brokers.data", 'modify_quote_stream',
|
||||
broker=brokermod.name,
|
||||
feed_type='stock',
|
||||
symbols=[]
|
||||
)
|
||||
|
||||
# cancel GUI update task
|
||||
nursery.cancel_scope.cancel()
|
||||
finally:
|
||||
with trio.open_cancel_scope(shield=True):
|
||||
# cancel aysnc gen call
|
||||
await quote_gen.aclose()
|
||||
|
|
|
@ -0,0 +1,561 @@
|
|||
"""
|
||||
options: a real-time option chain.
|
||||
|
||||
Launch with ``piker options <symbol>``.
|
||||
"""
|
||||
import types
|
||||
from functools import partial
|
||||
from typing import Dict, List
|
||||
|
||||
import trio
|
||||
from async_generator import asynccontextmanager
|
||||
import tractor
|
||||
from kivy.uix.boxlayout import BoxLayout
|
||||
from kivy.lang import Builder
|
||||
from kivy.app import async_runTouchApp
|
||||
from kivy.core.window import Window
|
||||
from kivy.uix.label import Label
|
||||
|
||||
from ..log import get_logger
|
||||
from ..brokers.core import contracts
|
||||
from .pager import PagerView
|
||||
|
||||
from .tabular import Row, HeaderCell, Cell, TickerTable
|
||||
from .monitor import update_quotes
|
||||
|
||||
|
||||
log = get_logger('option_chain')
|
||||
|
||||
|
||||
async def modify_symbol(symbol):
|
||||
pass
|
||||
|
||||
|
||||
class StrikeCell(HeaderCell):
|
||||
"""Strike cell"""
|
||||
|
||||
|
||||
_no_display = ['symbol', 'contract_type', 'strike', 'time', 'open']
|
||||
_strike_row_cache = {}
|
||||
_strike_cell_cache = {}
|
||||
_no_contracts_msg = "No contracts available for symbol"
|
||||
|
||||
|
||||
class StrikeRow(BoxLayout):
|
||||
"""A 'row' composed of two ``Row``s sandwiching a
|
||||
``StrikeCell`.
|
||||
"""
|
||||
_row_cache = {}
|
||||
|
||||
def __init__(self, strike, **kwargs):
|
||||
super().__init__(orientation='horizontal', **kwargs)
|
||||
self.strike = strike
|
||||
# store 2 rows: 1 for call, 1 for put
|
||||
self._sub_rows = {}
|
||||
self._widgets_added = False
|
||||
|
||||
def append_sub_row(
|
||||
self,
|
||||
record: dict,
|
||||
displayable: dict,
|
||||
bidasks=None,
|
||||
headers=(),
|
||||
table=None,
|
||||
**kwargs,
|
||||
) -> None:
|
||||
# the 'contract_type' determines whether this
|
||||
# is a put or call row
|
||||
contract_type = record['contract_type']
|
||||
|
||||
# We want to only create a few ``Row`` widgets as possible to
|
||||
# speed up rendering; we cache sub rows after creation.
|
||||
row = self._row_cache.get((self.strike, contract_type))
|
||||
if not row:
|
||||
# reverse order of call side cells
|
||||
if contract_type == 'call':
|
||||
record = dict(list(reversed(list(record.items()))))
|
||||
|
||||
row = Row(
|
||||
record,
|
||||
bidasks=bidasks,
|
||||
headers=headers,
|
||||
table=table,
|
||||
no_cell=_no_display,
|
||||
**kwargs
|
||||
)
|
||||
self._row_cache[(self.strike, contract_type)] = row
|
||||
else:
|
||||
# must update the internal cells
|
||||
row.update(record, displayable)
|
||||
|
||||
# reassign widget for when rendered in the update loop
|
||||
row.widget = self
|
||||
self._sub_rows[contract_type] = row
|
||||
|
||||
if self.is_populated() and not self._widgets_added:
|
||||
# calls on the left
|
||||
self.add_widget(self._sub_rows['call'])
|
||||
strike_cell = _strike_cell_cache.setdefault(
|
||||
self.strike, StrikeCell(
|
||||
key=self.strike,
|
||||
text=str(self.strike),
|
||||
# is_header=True,
|
||||
# make centre strike cell nice and small
|
||||
size_hint=(1/10., 1),
|
||||
)
|
||||
)
|
||||
# strikes in the middle
|
||||
self.add_widget(strike_cell)
|
||||
# puts on the right
|
||||
self.add_widget(self._sub_rows['put'])
|
||||
self._widgets_added = True
|
||||
|
||||
def is_populated(self):
|
||||
"""Bool determing if both a put and call subrow have beed appended.
|
||||
"""
|
||||
return len(self._sub_rows) == 2
|
||||
|
||||
def has_widgets(self):
|
||||
return self._widgets_added
|
||||
|
||||
def update(self, record, displayable):
|
||||
self._sub_rows[record['contract_type']].update(
|
||||
record, displayable)
|
||||
|
||||
def get_field(self, key):
|
||||
"""Always sort on the lone field, the strike price.
|
||||
"""
|
||||
return int(self.strike)
|
||||
|
||||
def rowsitems(self):
|
||||
return self._sub_rows.items()
|
||||
|
||||
|
||||
class ExpiryButton(Cell):
|
||||
# must be set to allow 'plain bg colors' since default texture is grey
|
||||
background_normal = ''
|
||||
|
||||
def on_press(self, value=None):
|
||||
last = self.chain._last_expiry
|
||||
if last:
|
||||
last.click_toggle = False
|
||||
self.chain._last_expiry = self
|
||||
|
||||
log.info(f"Clicked {self}")
|
||||
self.click_toggle = True
|
||||
self.chain.start_displaying(self.chain.symbol, self.key)
|
||||
|
||||
|
||||
class DataFeed(object):
|
||||
"""Data feed client for streaming symbol data from a (remote)
|
||||
``brokerd`` data daemon.
|
||||
"""
|
||||
def __init__(self, portal, brokermod):
|
||||
self.portal = portal
|
||||
self.brokermod = brokermod
|
||||
self._symbols = None
|
||||
self.quote_gen = None
|
||||
self._mutex = trio.StrictFIFOLock()
|
||||
|
||||
async def open_stream(self, symbols, rate=1, test=None):
|
||||
async with self._mutex:
|
||||
try:
|
||||
if self.quote_gen is not None and symbols != self._symbols:
|
||||
log.info(
|
||||
f"Stopping existing subscription for {self._symbols}")
|
||||
await self.quote_gen.aclose()
|
||||
self._symbols = symbols
|
||||
|
||||
if test:
|
||||
# stream from a local test file
|
||||
quote_gen = await self.portal.run(
|
||||
"piker.brokers.data", 'stream_from_file',
|
||||
filename=test
|
||||
)
|
||||
else:
|
||||
log.info(f"Starting new stream for {self._symbols}")
|
||||
# start live streaming from broker daemon
|
||||
quote_gen = await self.portal.run(
|
||||
"piker.brokers.data",
|
||||
'start_quote_stream',
|
||||
broker=self.brokermod.name,
|
||||
symbols=symbols,
|
||||
feed_type='option',
|
||||
rate=rate,
|
||||
)
|
||||
|
||||
# get first quotes response
|
||||
log.debug(f"Waiting on first quote for {symbols}...")
|
||||
quotes = {}
|
||||
# with trio.move_on_after(5):
|
||||
quotes = await quote_gen.__anext__()
|
||||
|
||||
self.quote_gen = quote_gen
|
||||
self.first_quotes = quotes
|
||||
return quote_gen, quotes
|
||||
except Exception:
|
||||
if self.quote_gen:
|
||||
await self.quote_gen.aclose()
|
||||
self.quote_gen = None
|
||||
raise
|
||||
|
||||
def format_quotes(self, quotes):
|
||||
records, displayables = zip(*[
|
||||
self.brokermod.format_option_quote(quote, {})
|
||||
for quote in quotes.values()
|
||||
])
|
||||
return records, displayables
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def find_local_monitor():
|
||||
"""Establish a portal to a local monitor for triggering
|
||||
symbol changes.
|
||||
"""
|
||||
async with tractor.find_actor('monitor') as portal:
|
||||
if not portal:
|
||||
log.warn(
|
||||
"No monitor app could be found, no symbol link established..")
|
||||
yield portal
|
||||
|
||||
|
||||
class OptionChain(object):
|
||||
"""A real-time options chain UI.
|
||||
"""
|
||||
_title = "option chain: {symbol}\t(press ? for help)"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
widgets: dict,
|
||||
bidasks: Dict[str, List[str]],
|
||||
feed: DataFeed,
|
||||
rate: int,
|
||||
):
|
||||
self.symbol = None
|
||||
self.expiry = None
|
||||
self.widgets = widgets
|
||||
self.bidasks = bidasks
|
||||
self._strikes2rows = {}
|
||||
self._nursery = None
|
||||
self._update_nursery = None
|
||||
self.feed = feed
|
||||
self._quote_gen = None
|
||||
# TODO: this should be moved down to the data feed layer
|
||||
# right now it's only needed for the UI update loop to cancel itself
|
||||
self._update_cs = None
|
||||
self._first_quotes = None
|
||||
self._last_expiry = None
|
||||
# flag to determine if one-time widgets have been generated
|
||||
self._static_widgets_initialized = False
|
||||
self._no_opts_label = None
|
||||
|
||||
@property
|
||||
def no_opts_label(self):
|
||||
if self._no_opts_label is None:
|
||||
label = self._no_opts_label = Label(text=_no_contracts_msg)
|
||||
label.font_size = 30
|
||||
return self._no_opts_label
|
||||
|
||||
async def _rx_symbols(self):
|
||||
async with find_local_monitor() as portal:
|
||||
if not portal:
|
||||
log.warn("No local monitor could be found")
|
||||
return
|
||||
async for symbol in await portal.run(
|
||||
'piker.ui.monitor',
|
||||
'stream_symbol_selection',
|
||||
):
|
||||
log.info(f"Changing symbol subscriptions to {symbol}")
|
||||
self.start_displaying(symbol, self.expiry)
|
||||
|
||||
@asynccontextmanager
|
||||
async def open_rt_display(self, nursery, symbol, expiry=None):
|
||||
"""Open an internal update task scope required to allow
|
||||
for dynamic real-time operation.
|
||||
"""
|
||||
self._parent_nursery = nursery
|
||||
async with trio.open_nursery() as n:
|
||||
self._nursery = n
|
||||
# fill out and start updatingn strike table
|
||||
n.start_soon(
|
||||
partial(self._start_displaying, symbol, expiry=expiry)
|
||||
)
|
||||
# listen for undlerlying symbol changes from a local monitor app
|
||||
n.start_soon(self._rx_symbols)
|
||||
yield self
|
||||
n.cancel_scope.cancel()
|
||||
|
||||
self._nursery = None
|
||||
# make sure we always tear down our existing data feed
|
||||
await self.feed.quote_gen.aclose()
|
||||
|
||||
def clear_strikes(self):
|
||||
"""Clear the strike rows from the internal table.
|
||||
"""
|
||||
table = self.widgets['table']
|
||||
table.clear()
|
||||
self._strikes2rows.clear()
|
||||
|
||||
def render_rows(self, records, displayables):
|
||||
"""Render all strike rows in the internal table.
|
||||
"""
|
||||
log.debug("Rendering rows")
|
||||
table = self.widgets['table']
|
||||
for record, display in zip(
|
||||
sorted(records, key=lambda q: q['strike']),
|
||||
displayables
|
||||
):
|
||||
strike = record['strike']
|
||||
strike_row = _strike_row_cache.setdefault(
|
||||
strike, StrikeRow(strike))
|
||||
strike_row.append_sub_row(
|
||||
record,
|
||||
display,
|
||||
bidasks=self.bidasks,
|
||||
table=table,
|
||||
)
|
||||
if strike_row.is_populated():
|
||||
# We must fill out the the table's symbol2rows manually
|
||||
# using each contracts "symbol" so that the quote updater
|
||||
# task can look up the right row to update easily
|
||||
# See update_quotes() and ``Row`` internals for details.
|
||||
for contract_type, row in strike_row.rowsitems():
|
||||
symbol = row._last_record['symbol']
|
||||
table.symbols2rows[symbol] = row
|
||||
|
||||
if strike not in self._strikes2rows:
|
||||
# re-adding widgets is an error
|
||||
self._strikes2rows[strike] = strike_row
|
||||
|
||||
log.debug("Finished rendering rows!")
|
||||
|
||||
def _init_static_widgets(self, displayables):
|
||||
assert self._static_widgets_initialized is False
|
||||
container = self.widgets['container']
|
||||
|
||||
# calls / puts header
|
||||
type_header = BoxLayout(
|
||||
orientation='horizontal',
|
||||
size_hint=(1, 1/30.),
|
||||
)
|
||||
calls = Label(text='calls', font_size='20')
|
||||
puts = Label(text='puts', font_size='20')
|
||||
type_header.add_widget(calls)
|
||||
type_header.add_widget(puts)
|
||||
container.add_widget(type_header)
|
||||
|
||||
# figure out header fields for each table based on quote keys
|
||||
headers = displayables[0].keys()
|
||||
header_row = StrikeRow(strike='strike', size_hint=(1, None))
|
||||
header_record = {key: key for key in headers}
|
||||
header_record['contract_type'] = 'put'
|
||||
header_row.append_sub_row(
|
||||
header_record,
|
||||
header_record,
|
||||
headers=headers,
|
||||
bidasks=self.bidasks,
|
||||
is_header=True,
|
||||
size_hint=(1, None),
|
||||
)
|
||||
header_record['contract_type'] = 'call'
|
||||
header_row.append_sub_row(
|
||||
header_record,
|
||||
header_record,
|
||||
headers=headers,
|
||||
bidasks=self.bidasks,
|
||||
is_header=True,
|
||||
size_hint=(1, None),
|
||||
)
|
||||
container.add_widget(header_row)
|
||||
|
||||
# build out chain tables
|
||||
table = TickerTable(
|
||||
sort_key='strike',
|
||||
cols=1,
|
||||
size_hint=(1, None),
|
||||
)
|
||||
header_row.table = table
|
||||
table.bind(minimum_height=table.setter('height'))
|
||||
pager = PagerView(
|
||||
container=container,
|
||||
contained=table,
|
||||
nursery=self._nursery
|
||||
)
|
||||
container.add_widget(pager)
|
||||
|
||||
self.widgets.update({
|
||||
'table': table,
|
||||
'type_header': type_header,
|
||||
'table': table,
|
||||
'pager': pager,
|
||||
})
|
||||
|
||||
async def _start_displaying(self, symbol, expiry=None):
|
||||
"""Main routine to start displaying the real time updated strike
|
||||
table.
|
||||
|
||||
Clear any existing data feed subscription that is no longer needed
|
||||
(eg. when clicking a new expiry button) spin up a new subscription,
|
||||
populate the table and start updating it.
|
||||
"""
|
||||
table = self.widgets.get('table')
|
||||
if table:
|
||||
self.clear_strikes()
|
||||
|
||||
if self._update_cs:
|
||||
log.warn("Cancelling existing update task")
|
||||
self._update_cs.cancel()
|
||||
await trio.sleep(0)
|
||||
|
||||
if self._quote_gen:
|
||||
await self._quote_gen.aclose()
|
||||
|
||||
# redraw any symbol specific UI components
|
||||
if self.symbol != symbol or expiry is None:
|
||||
# set window title
|
||||
self.widgets['window'].set_title(
|
||||
self._title.format(symbol=symbol)
|
||||
)
|
||||
|
||||
# retreive all contracts to populate expiry row
|
||||
all_contracts = await contracts(self.feed.brokermod, symbol)
|
||||
|
||||
if not all_contracts:
|
||||
label = self.no_opts_label
|
||||
label.symbol = symbol
|
||||
if table:
|
||||
table.add_widget(label)
|
||||
return
|
||||
|
||||
# start streaming soonest contract by default if not provided
|
||||
expiry = next(iter(all_contracts)).expiry if not expiry else expiry
|
||||
|
||||
# TODO: figure out how to compact these buttons
|
||||
expiries = {
|
||||
key.expiry: key.expiry[:key.expiry.find('T')]
|
||||
for key in all_contracts
|
||||
}
|
||||
expiry_row = self.widgets['expiry_row']
|
||||
expiry_row.clear_widgets()
|
||||
|
||||
for expiry, justdate in expiries.items():
|
||||
button = ExpiryButton(text=str(justdate), key=expiry)
|
||||
# assign us to each expiry button
|
||||
button.chain = self
|
||||
expiry_row.add_widget(button)
|
||||
|
||||
if self._nursery is None:
|
||||
raise RuntimeError(
|
||||
"You must call open this chain's update scope first!")
|
||||
|
||||
log.debug(f"Waiting on first_quotes for {symbol}:{expiry}")
|
||||
self._quote_gen, first_quotes = await self.feed.open_stream(
|
||||
[(symbol, expiry)]
|
||||
)
|
||||
log.debug(f"Got first_quotes for {symbol}:{expiry}")
|
||||
records, displayables = self.feed.format_quotes(first_quotes)
|
||||
|
||||
# draw static widgets only once
|
||||
if self._static_widgets_initialized is False:
|
||||
self._init_static_widgets(displayables)
|
||||
self._static_widgets_initialized = True
|
||||
|
||||
self.render_rows(records, displayables)
|
||||
|
||||
with trio.open_cancel_scope() as cs:
|
||||
self._update_cs = cs
|
||||
await self._nursery.start(
|
||||
partial(
|
||||
update_quotes,
|
||||
self._nursery,
|
||||
self.feed.brokermod.format_option_quote,
|
||||
self.widgets,
|
||||
self._quote_gen,
|
||||
symbol_data={},
|
||||
first_quotes=first_quotes,
|
||||
)
|
||||
)
|
||||
# always keep track of current subscription
|
||||
self.symbol, self.expiry = symbol, expiry
|
||||
|
||||
def start_displaying(self, symbol, expiry):
|
||||
if self.symbol == symbol and self.expiry == expiry:
|
||||
log.info(f"Clicked {symbol}:{expiry} is already selected")
|
||||
return
|
||||
|
||||
log.info(f"Subscribing for {symbol}:{expiry}")
|
||||
self._nursery.start_soon(
|
||||
partial(self._start_displaying, symbol, expiry=expiry)
|
||||
)
|
||||
|
||||
|
||||
async def new_chain_ui(
|
||||
portal: tractor._portal.Portal,
|
||||
symbol: str,
|
||||
brokermod: types.ModuleType,
|
||||
nursery: trio._core._run.Nursery,
|
||||
rate: int = 1,
|
||||
) -> None:
|
||||
"""Create and return a new option chain UI.
|
||||
"""
|
||||
# use `monitor` styling for now
|
||||
from .monitor import _kv
|
||||
Builder.load_string(_kv)
|
||||
|
||||
# the master container
|
||||
container = BoxLayout(orientation='vertical', spacing=0)
|
||||
|
||||
# expiry buttons row (populated later once contracts are retreived)
|
||||
expiry_row = BoxLayout(
|
||||
orientation='horizontal',
|
||||
size_hint=(1, None),
|
||||
)
|
||||
container.add_widget(expiry_row)
|
||||
|
||||
widgets = {
|
||||
'window': Window,
|
||||
'root': container,
|
||||
'container': container,
|
||||
'expiry_row': expiry_row,
|
||||
}
|
||||
# define bid-ask "stacked" cells
|
||||
# (TODO: needs some rethinking and renaming for sure)
|
||||
bidasks = brokermod._option_bidasks
|
||||
|
||||
feed = DataFeed(portal, brokermod)
|
||||
chain = OptionChain(
|
||||
widgets,
|
||||
bidasks,
|
||||
feed,
|
||||
rate=rate,
|
||||
)
|
||||
return chain
|
||||
|
||||
|
||||
async def _async_main(
|
||||
symbol: str,
|
||||
portal: tractor._portal.Portal,
|
||||
brokermod: types.ModuleType,
|
||||
rate: int = 1,
|
||||
test: bool = False
|
||||
) -> None:
|
||||
'''Launch kivy app + all other related tasks.
|
||||
|
||||
This is started with cli cmd `piker options`.
|
||||
'''
|
||||
async with trio.open_nursery() as nursery:
|
||||
# set up a pager view for large ticker lists
|
||||
chain = await new_chain_ui(
|
||||
portal,
|
||||
symbol,
|
||||
brokermod,
|
||||
nursery,
|
||||
rate=rate,
|
||||
)
|
||||
async with chain.open_rt_display(nursery, symbol):
|
||||
try:
|
||||
# trio-kivy entry point.
|
||||
await async_runTouchApp(chain.widgets['root']) # run kivy
|
||||
finally:
|
||||
# cancel GUI update task
|
||||
nursery.cancel_scope.cancel()
|
|
@ -184,5 +184,5 @@ class PagerView(ScrollView):
|
|||
_, yscale = self.convert_distance_to_scroll(0, pxs)
|
||||
new = self.scroll_y + (yscale * {'u': 1, 'd': -1}[direction])
|
||||
# bound to near [0, 1] to avoid "over-scrolling"
|
||||
limited = max(-0.01, min(new, 1.01))
|
||||
limited = max(-0.001, min(new, 1.001))
|
||||
self.scroll_y = limited
|
||||
|
|
|
@ -0,0 +1,470 @@
|
|||
"""
|
||||
Real-time table components
|
||||
"""
|
||||
from itertools import chain
|
||||
from typing import List
|
||||
from bisect import bisect
|
||||
|
||||
import trio
|
||||
from kivy.uix.gridlayout import GridLayout
|
||||
from kivy.uix.stacklayout import StackLayout
|
||||
from kivy.uix.button import Button
|
||||
from kivy import utils
|
||||
from kivy.properties import BooleanProperty
|
||||
|
||||
from ..log import get_logger
|
||||
from .kivy.mouse_over import new_mouse_over_group
|
||||
|
||||
|
||||
HoverBehavior = new_mouse_over_group()
|
||||
log = get_logger('monitor')
|
||||
|
||||
_colors2hexs = {
|
||||
'darkgray': 'a9a9a9',
|
||||
'gray': '808080',
|
||||
'green': '008000',
|
||||
'forestgreen': '228b22',
|
||||
'red2': 'ff3333',
|
||||
'red': 'ff0000',
|
||||
'firebrick': 'b22222',
|
||||
}
|
||||
|
||||
_colors = {key: utils.rgba(val) for key, val in _colors2hexs.items()}
|
||||
|
||||
|
||||
def colorcode(name):
|
||||
return _colors[name if name else 'gray']
|
||||
|
||||
|
||||
_bs = 0.75 # border size
|
||||
_fs = 20 # font size
|
||||
|
||||
# medium shade of gray that seems to match the
|
||||
# default i3 window borders
|
||||
_i3_rgba = [0.14]*3 + [1]
|
||||
|
||||
# slightly off black like the jellybean bg from
|
||||
# vim colorscheme
|
||||
_cell_rgba = [0.07]*3 + [1]
|
||||
_black_rgba = [0]*4
|
||||
|
||||
_kv = (f'''
|
||||
#:kivy 1.10.0
|
||||
|
||||
<Cell>
|
||||
font_size: {_fs}
|
||||
|
||||
# make text wrap to botom
|
||||
text_size: self.size
|
||||
halign: 'center'
|
||||
valign: 'middle'
|
||||
size: self.texture_size
|
||||
|
||||
# don't set these as the update loop already does it
|
||||
# color: {colorcode('gray')}
|
||||
# font_color: {colorcode('gray')}
|
||||
# font_name: 'Hack-Regular'
|
||||
|
||||
# if `highlight` is set use i3 color by default transparent; use row color
|
||||
# this is currently used for expiry cells on the options chain
|
||||
background_color: {_i3_rgba} if self.click_toggle else {_black_rgba}
|
||||
# must be set to allow 'plain bg colors' since default texture is grey
|
||||
# but right now is only set for option chain expiry buttons
|
||||
# background_normal: ''
|
||||
# spacing: 0, 0
|
||||
# padding: 3, 3
|
||||
|
||||
|
||||
<HeaderCell>
|
||||
font_size: {_fs}
|
||||
# canvas.before:
|
||||
# Color:
|
||||
# rgba: [0.13]*4
|
||||
# BorderImage: # use a fixed size border
|
||||
# pos: self.pos
|
||||
# size: [self.size[0] - {_bs}, self.size[1]]
|
||||
# # 0s are because the containing TickerTable already has spacing
|
||||
# # border: [0, {_bs} , 0, {_bs}]
|
||||
# border: [0, {_bs} , 0, 0]
|
||||
|
||||
|
||||
<TickerTable>
|
||||
spacing: [{_bs}]
|
||||
# row_force_default: True
|
||||
row_default_height: 56
|
||||
cols: 1
|
||||
canvas.before:
|
||||
Color:
|
||||
# i3 style gray as background
|
||||
rgba: {_i3_rgba}
|
||||
Rectangle:
|
||||
# scale with container self here refers to the widget i.e BoxLayout
|
||||
pos: self.pos
|
||||
size: self.size
|
||||
|
||||
|
||||
<BidAskLayout>
|
||||
spacing: [{_bs}, 0]
|
||||
|
||||
|
||||
<Row>
|
||||
# minimum_height: 200 # should be pulled from Cell text size
|
||||
# minimum_width: 200
|
||||
# row_force_default: True
|
||||
# row_default_height: 61 # determines the header row size
|
||||
padding: [0]*4
|
||||
spacing: [0]
|
||||
canvas.before:
|
||||
Color:
|
||||
rgba: {_cell_rgba}
|
||||
Rectangle:
|
||||
# self here refers to the widget i.e Row(GridLayout)
|
||||
pos: self.pos
|
||||
size: self.size
|
||||
# row higlighting on mouse over
|
||||
Color:
|
||||
rgba: {_i3_rgba}
|
||||
# RoundedRectangle:
|
||||
Rectangle:
|
||||
size: self.width, self.height if self.hovered else 1
|
||||
pos: self.pos
|
||||
# radius: (0,)
|
||||
|
||||
|
||||
# part of the `PagerView`
|
||||
<SearchBar>
|
||||
size_hint: 1, None
|
||||
# static size of 51 px
|
||||
height: 51
|
||||
font_size: 25
|
||||
background_color: {_i3_rgba}
|
||||
''')
|
||||
|
||||
|
||||
class Cell(Button):
|
||||
"""Data cell: the fundemental widget.
|
||||
|
||||
``key`` is the column name index value.
|
||||
"""
|
||||
click_toggle = BooleanProperty(False)
|
||||
|
||||
def __init__(self, key=None, is_header=False, **kwargs):
|
||||
super(Cell, self).__init__(**kwargs)
|
||||
self.key = key
|
||||
self.row = None
|
||||
self.is_header = is_header
|
||||
|
||||
def on_press(self, value=None):
|
||||
self.row.on_press()
|
||||
|
||||
|
||||
class HeaderCell(Cell):
|
||||
"""Column header cell label.
|
||||
"""
|
||||
def on_press(self, value=None):
|
||||
"""Clicking on a col header indicates to sort rows by this column
|
||||
in `update_quotes()`.
|
||||
"""
|
||||
table = self.row.table
|
||||
# if this is a row header cell then sort by the clicked field
|
||||
if self.row.is_header:
|
||||
table.sort_key = self.key
|
||||
|
||||
last = table.last_clicked_col_cell
|
||||
if last and last is not self:
|
||||
last.underline = False
|
||||
last.bold = False
|
||||
|
||||
# outline the header text to indicate it's been the last clicked
|
||||
self.underline = True
|
||||
self.bold = True
|
||||
# mark this cell as the last selected
|
||||
table.last_clicked_col_cell = self
|
||||
# sort and render the rows immediately
|
||||
self.row.table.render_rows(table.symbols2rows.values())
|
||||
|
||||
# TODO: make this some kind of small geometry instead
|
||||
# (maybe like how trading view does it).
|
||||
# allow highlighting of row headers for tracking
|
||||
elif self.is_header:
|
||||
if self.background_color == self.color:
|
||||
self.background_color = _black_rgba
|
||||
else:
|
||||
self.background_color = self.color
|
||||
|
||||
|
||||
class BidAskLayout(StackLayout):
|
||||
"""Cell which houses three buttons containing a last, bid, and ask in a
|
||||
single unit oriented with the last 2 under the first.
|
||||
"""
|
||||
def __init__(self, values, header=False, **kwargs):
|
||||
# uncomment to get vertical stacked bid-ask
|
||||
# super(BidAskLayout, self).__init__(orientation='bt-lr', **kwargs)
|
||||
super(BidAskLayout, self).__init__(orientation='lr-tb', **kwargs)
|
||||
assert len(values) == 3, "You can only provide 3 values: last,bid,ask"
|
||||
self._keys2cells = {}
|
||||
cell_type = HeaderCell if header else Cell
|
||||
top_size = cell_type().font_size
|
||||
small_size = top_size - 4
|
||||
top_prop = 0.5 # proportion of size used by top cell
|
||||
bottom_prop = 1 - top_prop
|
||||
for (key, size_hint, font_size), value in zip(
|
||||
[('last', (1, top_prop), top_size),
|
||||
('bid', (0.5, bottom_prop), small_size),
|
||||
('ask', (0.5, bottom_prop), small_size)],
|
||||
# uncomment to get vertical stacked bid-ask
|
||||
# [('last', (top_prop, 1), top_size),
|
||||
# ('bid', (bottom_prop, 0.5), small_size),
|
||||
# ('ask', (bottom_prop, 0.5), small_size)],
|
||||
values
|
||||
):
|
||||
cell = cell_type(
|
||||
text=str(value),
|
||||
size_hint=size_hint,
|
||||
# width=self.width/2 - 3,
|
||||
font_size=font_size
|
||||
)
|
||||
self._keys2cells[key] = cell
|
||||
cell.key = value
|
||||
cell.is_header = header
|
||||
setattr(self, key, cell)
|
||||
self.add_widget(cell)
|
||||
|
||||
# should be assigned by referrer
|
||||
self.row = None
|
||||
|
||||
def get_cell(self, key):
|
||||
return self._keys2cells.get(key)
|
||||
|
||||
@property
|
||||
def row(self):
|
||||
return self.row
|
||||
|
||||
@row.setter
|
||||
def row(self, row):
|
||||
# so hideous
|
||||
for cell in self.cells:
|
||||
cell.row = row
|
||||
|
||||
@property
|
||||
def cells(self):
|
||||
return [self.last, self.bid, self.ask]
|
||||
|
||||
|
||||
class Row(HoverBehavior, GridLayout):
|
||||
"""A grid for displaying a row of ticker quote data.
|
||||
"""
|
||||
def __init__(
|
||||
self,
|
||||
record,
|
||||
headers=(),
|
||||
no_cell=(),
|
||||
bidasks=None,
|
||||
table=None,
|
||||
is_header=False,
|
||||
cell_type=None,
|
||||
**kwargs
|
||||
):
|
||||
super().__init__(cols=len(record), **kwargs)
|
||||
self._cell_widgets = {}
|
||||
self._last_record = record
|
||||
self.table = table
|
||||
self.is_header = is_header
|
||||
self._cell_type = cell_type
|
||||
self.widget = self
|
||||
|
||||
# Create `BidAskCells` first.
|
||||
# bid/ask cells are just 3 cells grouped in a
|
||||
# ``BidAskLayout`` which just stacks the parent cell
|
||||
# on top of 2 children.
|
||||
layouts = {}
|
||||
bidasks = bidasks or {}
|
||||
ba_cells = {}
|
||||
for key, children in bidasks.items():
|
||||
layout = BidAskLayout(
|
||||
[record[key]] + [record[child] for child in children],
|
||||
header=is_header
|
||||
)
|
||||
layout.row = self
|
||||
layouts[key] = layout
|
||||
for i, child in enumerate([key] + children):
|
||||
ba_cells[child] = layout.cells[i]
|
||||
|
||||
children_flat = list(chain.from_iterable(bidasks.values()))
|
||||
self._cell_widgets.update(ba_cells)
|
||||
|
||||
# build out row using Cell labels
|
||||
for (key, val) in record.items():
|
||||
header = key in headers
|
||||
|
||||
# handle bidask cells
|
||||
if key in layouts:
|
||||
self.add_widget(layouts[key])
|
||||
elif key in children_flat:
|
||||
# these cells have already been added to the `BidAskLayout`
|
||||
continue
|
||||
elif key not in no_cell:
|
||||
cell = self._append_cell(val, key, header=header)
|
||||
cell.key = key
|
||||
self._cell_widgets[key] = cell
|
||||
|
||||
def iter_cells(self):
|
||||
return self._cell_widgets.items()
|
||||
|
||||
def get_cell(self, key):
|
||||
return self._cell_widgets.get(key)
|
||||
|
||||
def get_field(self, key):
|
||||
return self._last_record[key]
|
||||
|
||||
def _append_cell(self, text, key, header=False):
|
||||
if not len(self._cell_widgets) < self.cols:
|
||||
raise ValueError(f"Can not append more then {self.cols} cells")
|
||||
|
||||
# header cells just have a different colour
|
||||
celltype = self._cell_type or (HeaderCell if header else Cell)
|
||||
cell = celltype(text=str(text), key=key)
|
||||
cell.is_header = header
|
||||
cell.row = self
|
||||
self.add_widget(cell)
|
||||
return cell
|
||||
|
||||
def update(self, record, displayable):
|
||||
"""Update this row's cells with new values from a quote
|
||||
``record``.
|
||||
|
||||
Return all cells that changed in a ``dict``.
|
||||
"""
|
||||
# color changed field values
|
||||
cells = {}
|
||||
gray = colorcode('gray')
|
||||
fgreen = colorcode('forestgreen')
|
||||
red = colorcode('red2')
|
||||
for key, val in record.items():
|
||||
last = self.get_field(key)
|
||||
color = gray
|
||||
try:
|
||||
# logic for cell text coloring: up-green, down-red
|
||||
if last < val:
|
||||
color = fgreen
|
||||
elif last > val:
|
||||
color = red
|
||||
except TypeError:
|
||||
log.warn(f"wtf QT {val} is not regular?")
|
||||
|
||||
cell = self.get_cell(key)
|
||||
# some displayable fields might have specifically
|
||||
# not had cells created as set in the `no_cell` attr
|
||||
if cell is not None:
|
||||
cell.text = str(displayable[key])
|
||||
cell.color = color
|
||||
if color != gray:
|
||||
cells[key] = cell
|
||||
|
||||
self._last_record = record
|
||||
return cells
|
||||
|
||||
# mouse over handlers
|
||||
def on_enter(self):
|
||||
"""Highlight layout on enter.
|
||||
"""
|
||||
log.debug(
|
||||
f"Entered row {self} through {self.border_point}")
|
||||
# don't highlight header row
|
||||
if self.is_header:
|
||||
self.hovered = False
|
||||
|
||||
def on_leave(self):
|
||||
"""Un-highlight layout on exit.
|
||||
"""
|
||||
log.debug(
|
||||
f"Left row {self} through {self.border_point}")
|
||||
|
||||
def on_press(self, value=None):
|
||||
log.info(f"Pressed row for {self._last_record['symbol']}")
|
||||
if self.table and not self.is_header:
|
||||
for q in self.table._click_queues:
|
||||
q.put_nowait(self._last_record['symbol'])
|
||||
|
||||
|
||||
class TickerTable(GridLayout):
|
||||
"""A grid for displaying ticker quote records as a table.
|
||||
"""
|
||||
def __init__(self, sort_key='%', auto_sort=True, **kwargs):
|
||||
super(TickerTable, self).__init__(**kwargs)
|
||||
self.symbols2rows = {}
|
||||
self.sort_key = sort_key
|
||||
# for tracking last clicked column header cell
|
||||
self.last_clicked_col_cell = None
|
||||
self._auto_sort = auto_sort
|
||||
self._symbols2index = {}
|
||||
self._sorted = []
|
||||
self._click_queues: List[trio.Queue] = []
|
||||
|
||||
def append_row(self, key, row):
|
||||
"""Append a `Row` of `Cell` objects to this table.
|
||||
"""
|
||||
# store ref to each row
|
||||
self.symbols2rows[key] = row
|
||||
self.add_widget(row)
|
||||
self._sorted.append(row)
|
||||
return row
|
||||
|
||||
def clear(self):
|
||||
self.clear_widgets()
|
||||
self._sorted.clear()
|
||||
|
||||
def render_rows(
|
||||
self,
|
||||
changed: set,
|
||||
sort_key: str = None,
|
||||
):
|
||||
"""Sort and render all rows on the ticker grid from ``syms2rows``.
|
||||
"""
|
||||
sort_key = sort_key or self.sort_key
|
||||
key_row_pairs = list(sorted(
|
||||
[(row.get_field(sort_key), row) for row in self._sorted],
|
||||
key=lambda item: item[0],
|
||||
))
|
||||
if key_row_pairs:
|
||||
sorted_keys, sorted_rows = zip(*key_row_pairs)
|
||||
sorted_keys, sorted_rows = list(sorted_keys), list(sorted_rows)
|
||||
else:
|
||||
sorted_keys, sorted_rows = [], []
|
||||
|
||||
# now remove and re-insert any rows that need to be shuffled
|
||||
# due to new a new field change
|
||||
for row in changed:
|
||||
try:
|
||||
old_index = sorted_rows.index(row)
|
||||
except ValueError:
|
||||
# row is not yet added so nothing to remove
|
||||
pass
|
||||
else:
|
||||
del sorted_rows[old_index]
|
||||
del sorted_keys[old_index]
|
||||
self._sorted.remove(row)
|
||||
self.remove_widget(row)
|
||||
|
||||
for row in changed:
|
||||
key = row.get_field(sort_key)
|
||||
index = bisect(sorted_keys, key)
|
||||
sorted_keys.insert(index, key)
|
||||
self._sorted.insert(index, row)
|
||||
self.add_widget(row, index=index)
|
||||
|
||||
def ticker_search(self, patt):
|
||||
"""Return sequence of matches when pattern ``patt`` is in a
|
||||
symbol name. Most naive algo possible for the moment.
|
||||
"""
|
||||
for symbol, row in self.symbols2rows.items():
|
||||
if patt in symbol:
|
||||
yield symbol, row
|
||||
|
||||
def get_row(self, symbol: str) -> Row:
|
||||
return self.symbols2rows[symbol]
|
||||
|
||||
def search(self, patt):
|
||||
"""Search bar api compat.
|
||||
"""
|
||||
return dict(self.ticker_search(patt)) or {}
|
|
@ -51,8 +51,10 @@ _ex_quotes = {
|
|||
'askSize': 0,
|
||||
'bidPrice': None,
|
||||
'bidSize': 0,
|
||||
'contract_type': 'call',
|
||||
'delay': 0,
|
||||
'delta': -0.212857,
|
||||
"expiry": "2021-01-15T00:00:00.000000-05:00",
|
||||
'gamma': 0.003524,
|
||||
'highPrice': 0,
|
||||
'isHalted': False,
|
||||
|
@ -66,6 +68,7 @@ _ex_quotes = {
|
|||
'openInterest': 1,
|
||||
'openPrice': 0,
|
||||
'rho': -0.891868,
|
||||
"strike": 8,
|
||||
'symbol': 'WEED15Jan21P54.00.MX',
|
||||
'symbolId': 22739148,
|
||||
'theta': -0.012911,
|
||||
|
@ -183,7 +186,7 @@ async def stream_option_chain(portal, symbols):
|
|||
|
||||
``symbols`` arg is ignored here.
|
||||
"""
|
||||
symbol = 'APHA.TO' # your fave greenhouse LP
|
||||
symbol = symbols[0]
|
||||
async with qt.get_client() as client:
|
||||
contracts = await client.get_all_contracts([symbol])
|
||||
|
||||
|
@ -198,16 +201,21 @@ async def stream_option_chain(portal, symbols):
|
|||
broker='questrade',
|
||||
symbols=[sub],
|
||||
feed_type='option',
|
||||
rate=4,
|
||||
diff_cached=False,
|
||||
)
|
||||
# latency arithmetic
|
||||
loops = 8
|
||||
rate = 1/3. # 3 rps
|
||||
timeout = loops / rate
|
||||
|
||||
try:
|
||||
# wait on the data streamer to actually start
|
||||
# delivering
|
||||
await agen.__anext__()
|
||||
|
||||
# it'd sure be nice to have an asyncitertools here...
|
||||
with trio.fail_after(2.1):
|
||||
loops = 8
|
||||
with trio.fail_after(timeout):
|
||||
count = 0
|
||||
async for quotes in agen:
|
||||
# print(f'got quotes for {quotes.keys()}')
|
||||
|
@ -221,15 +229,33 @@ async def stream_option_chain(portal, symbols):
|
|||
count += 1
|
||||
if count == loops:
|
||||
break
|
||||
|
||||
# switch the subscription and make sure
|
||||
# stream is still working
|
||||
sub = subs_keys[1]
|
||||
await agen.aclose()
|
||||
agen = await portal.run(
|
||||
'piker.brokers.data',
|
||||
'start_quote_stream',
|
||||
broker='questrade',
|
||||
symbols=[sub],
|
||||
feed_type='option',
|
||||
rate=4,
|
||||
diff_cached=False,
|
||||
)
|
||||
|
||||
await agen.__anext__()
|
||||
with trio.fail_after(timeout):
|
||||
count = 0
|
||||
async for quotes in agen:
|
||||
for symbol, quote in quotes.items():
|
||||
assert quote['key'] == sub
|
||||
count += 1
|
||||
if count == loops:
|
||||
break
|
||||
finally:
|
||||
# unsub
|
||||
await portal.run(
|
||||
'piker.brokers.data',
|
||||
'modify_quote_stream',
|
||||
broker='questrade',
|
||||
feed_type='option',
|
||||
symbols=[],
|
||||
)
|
||||
await agen.aclose()
|
||||
|
||||
|
||||
async def stream_stocks(portal, symbols):
|
||||
|
@ -240,6 +266,7 @@ async def stream_stocks(portal, symbols):
|
|||
'start_quote_stream',
|
||||
broker='questrade',
|
||||
symbols=symbols,
|
||||
diff_cached=False,
|
||||
)
|
||||
try:
|
||||
# it'd sure be nice to have an asyncitertools here...
|
||||
|
@ -250,13 +277,7 @@ async def stream_stocks(portal, symbols):
|
|||
break
|
||||
finally:
|
||||
# unsub
|
||||
await portal.run(
|
||||
'piker.brokers.data',
|
||||
'modify_quote_stream',
|
||||
broker='questrade',
|
||||
feed_type='stock',
|
||||
symbols=[],
|
||||
)
|
||||
await agen.aclose()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -265,8 +286,14 @@ async def stream_stocks(portal, symbols):
|
|||
(stream_stocks,),
|
||||
(stream_option_chain,),
|
||||
(stream_stocks, stream_option_chain),
|
||||
(stream_stocks, stream_stocks),
|
||||
(stream_option_chain, stream_option_chain),
|
||||
],
|
||||
ids=[
|
||||
'stocks', 'options',
|
||||
'stocks_and_options', 'stocks_and_stocks',
|
||||
'options_and_options',
|
||||
],
|
||||
ids=['stocks', 'options', 'stocks_and_options'],
|
||||
)
|
||||
@tractor_test
|
||||
async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
|
||||
|
@ -284,9 +311,16 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
|
|||
'piker.brokers.core'
|
||||
],
|
||||
)
|
||||
async with trio.open_nursery() as n:
|
||||
for func in stream_what:
|
||||
n.start_soon(func, portal, tmx_symbols)
|
||||
if len(stream_what) > 1:
|
||||
# stream disparate symbol sets per task
|
||||
first, *tail = tmx_symbols
|
||||
symbols = ([first], tail)
|
||||
else:
|
||||
symbols = [tmx_symbols]
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
for syms, func in zip(symbols, stream_what):
|
||||
n.start_soon(func, portal, syms)
|
||||
|
||||
# stop all spawned subactors
|
||||
await nursery.cancel()
|
||||
|
|
Loading…
Reference in New Issue