From af464b45ffc744a011c426bd291ea3dd50a289d1 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 25 Nov 2018 14:55:55 -0500
Subject: [PATCH 01/17] Add an async function cache with a LIFO policy

Relates to #59
---
 piker/_async_utils.py | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)
 create mode 100644 piker/_async_utils.py

diff --git a/piker/_async_utils.py b/piker/_async_utils.py
new file mode 100644
index 00000000..ca3f5b4d
--- /dev/null
+++ b/piker/_async_utils.py
@@ -0,0 +1,33 @@
+"""
+Async utils no one seems to have built into a core lib (yet).
+"""
+from collections import OrderedDict
+
+
+def alifo_cache(maxsize=128):
+    """Async ``cache`` with a LIFO policy.
+
+    Implemented my own since no one else seems to have
+    a standard. I'll wait for the smarter people to come
+    up with one, but until then...
+    """
+    cache = OrderedDict()
+
+    def decorator(fn):
+
+        async def wrapper(*args):
+            key = args
+            try:
+                return cache[key]
+            except KeyError:
+                if len(cache) >= maxsize:
+                    # discard the last added entry
+                    cache.popitem()
+
+                # do it
+                cache[key] = await fn(*args)
+                return cache[key]
+
+        return wrapper
+
+    return decorator

From 714c203c3ec3561c039bbb1e512aa3ff599af9d8 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 25 Nov 2018 15:00:08 -0500
Subject: [PATCH 02/17] Cache symbol ids where possible

Cache both in the client and at the function call level inside the
quoter context using a `@alifo_cache`.
---
 piker/brokers/questrade.py | 55 +++++++++++++++++++++-----------------
 1 file changed, 31 insertions(+), 24 deletions(-)

diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py
index 0f8738a5..464b293a 100644
--- a/piker/brokers/questrade.py
+++ b/piker/brokers/questrade.py
@@ -14,6 +14,7 @@ from ..calc import humanize, percent_change
 from . import config
 from ._util import resproc, BrokerError
 from ..log import get_logger, colorize_json
+from .._async_utils import alifo_cache
 
 # TODO: move to urllib3/requests once supported
 import asks
@@ -112,6 +113,7 @@ class Client:
         self.access_data = {}
         self.user_data = {}
         self._reload_config(config)
+        self._symbol_cache = {}
 
     def _reload_config(self, config=None, **kwargs):
         log.warn("Reloading access config data")
@@ -212,12 +214,25 @@ class Client:
 
     async def tickers2ids(self, tickers):
         """Helper routine that take a sequence of ticker symbols and returns
-        their corresponding QT symbol ids.
+        their corresponding QT numeric symbol ids.
+
+        Cache any symbol to id lookups for later use.
         """
-        data = await self.api.symbols(names=','.join(tickers))
+        cache = self._symbol_cache
         symbols2ids = {}
-        for ticker, symbol in zip(tickers, data['symbols']):
-            symbols2ids[symbol['symbol']] = str(symbol['symbolId'])
+        for symbol in tickers:
+            id = cache.get(symbol)
+            if id is not None:
+                symbols2ids[symbol] = id
+
+        # still missing uncached values - hit the server
+        to_lookup = list(set(tickers) - set(symbols2ids))
+        if to_lookup:
+            data = await self.api.symbols(names=','.join(to_lookup))
+            for ticker, symbol in zip(to_lookup, data['symbols']):
+                name = symbol['symbol']
+                assert name == ticker
+                cache[name] = symbols2ids[name] = str(symbol['symbolId'])
 
         return symbols2ids
 
@@ -384,30 +399,21 @@ async def quoter(client: Client, tickers: List[str]):
     """Stock Quoter context.
 
     Yeah so fun times..QT has this symbol to ``int`` id lookup system that
    you have to use to get any quotes. That means we try to be smart and maintain
    a cache of this map lazily as requests from in for new tickers/symbols.
    Most of the closure variables here are to deal with that.
""" - t2ids = {} - ids = '' - def filter_symbols(quotes_dict: dict): - nonlocal t2ids - for symbol, quote in quotes_dict.items(): - if quote['low52w'] is None: - log.warn( - f"{symbol} seems to be defunct discarding from tickers") - t2ids.pop(symbol) + @alifo_cache(maxsize=128) + async def get_symbol_id_seq(symbols: Tuple[str]): + """For each tuple ``(symbol_1, symbol_2, ... , symbol_n)`` + return a symbol id sequence string ``'id_1,id_2, ... , id_n'``. + """ + return ','.join(map(str, (await client.tickers2ids(symbols)).values())) async def get_quote(tickers): """Query for quotes using cached symbol ids. """ if not tickers: return {} - nonlocal ids, t2ids - new, current = set(tickers), set(t2ids.keys()) - if new != current: - # update ticker ids cache - log.debug(f"Tickers set changed {new - current}") - t2ids = await client.tickers2ids(tickers) - # re-save symbol -> ids cache - ids = ','.join(map(str, t2ids.values())) + + ids = await get_symbol_id_seq(tuple(tickers)) try: quotes_resp = await client.api.quotes(ids=ids) @@ -443,9 +449,10 @@ async def quoter(client: Client, tickers: List[str]): # strip out unknown/invalid symbols first_quotes_dict = await get_quote(tickers) - filter_symbols(first_quotes_dict) - # re-save symbol -> ids cache - ids = ','.join(map(str, t2ids.values())) + for symbol, quote in first_quotes_dict.items(): + if quote['low52w'] is None: + log.warn( + f"{symbol} seems to be defunct") return get_quote From 75d22c60580ec4f08bd74e6851722d7607a5cba9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 25 Nov 2018 19:23:07 -0500 Subject: [PATCH 03/17] An explicit name is prolly better --- piker/_async_utils.py | 2 +- piker/brokers/questrade.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/_async_utils.py b/piker/_async_utils.py index ca3f5b4d..7069d597 100644 --- a/piker/_async_utils.py +++ b/piker/_async_utils.py @@ -4,7 +4,7 @@ Async utils no one seems to have built into a core lib (yet). from collections import OrderedDict -def alifo_cache(maxsize=128): +def async_lifo_cache(maxsize=128): """Async ``cache`` with a LIFO policy. Implemented my own since no one else seems to have diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 464b293a..4192c2de 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -14,7 +14,7 @@ from ..calc import humanize, percent_change from . import config from ._util import resproc, BrokerError from ..log import get_logger, colorize_json -from .._async_utils import alifo_cache +from .._async_utils import async_lifo_cache # TODO: move to urllib3/requests once supported import asks @@ -400,7 +400,7 @@ async def quoter(client: Client, tickers: List[str]): Most of the closure variables here are to deal with that. """ - @alifo_cache(maxsize=128) + @async_lifo_cache(maxsize=128) async def get_symbol_id_seq(symbols: Tuple[str]): """For each tuple ``(symbol_1, symbol_2, ... , symbol_n)`` return a symbol id sequence string ``'id_1,id_2, ... , id_n'``. From cd7d8d024d5a327a6ebcfef949eaf54be60d853f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Nov 2018 00:33:40 -0500 Subject: [PATCH 04/17] Add option quoter support for streaming So much changed to get this working for both stocks and options: - Index contracts by a new `ContractsKey` named tuple - Move to pushing lists of quotes instead of dicts since option subscriptions are often not identified by their "symbol" key and this makes it difficult at fan out time to know how a quote should be indexed and delivered. 
Instead add a special `key` entry to each quote dict which is the quote's subscription key. --- piker/brokers/questrade.py | 203 +++++++++++++++++++++++++------------ 1 file changed, 136 insertions(+), 67 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 4192c2de..6c0427ed 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -5,7 +5,8 @@ import time from datetime import datetime from functools import partial import configparser -from typing import List, Tuple, Dict, Any +from operator import itemgetter +from typing import List, Tuple, Dict, Any, Iterator, NamedTuple import trio from async_generator import asynccontextmanager @@ -24,13 +25,19 @@ log = get_logger(__name__) _refresh_token_ep = 'https://login.questrade.com/oauth2/' _version = 'v1' -_rate_limit = 3 # queries/sec +_rate_limit = 4 # queries/sec class QuestradeError(Exception): "Non-200 OK response code" +class ContractsKey(NamedTuple): + symbol: str + id: int + expiry: datetime + + class _API: """Questrade API endpoints exposed as methods and wrapped with an http session. @@ -61,7 +68,11 @@ class _API: 'symbols', params={'ids': ids, 'names': names}) async def quotes(self, ids: str) -> dict: - return await self._request('markets/quotes', params={'ids': ids}) + quotes = (await self._request( + 'markets/quotes', params={'ids': ids}))['quotes'] + for quote in quotes: + quote['key'] = quote['symbol'] + return quotes async def candles(self, id: str, start: str, end, interval) -> dict: return await self._request(f'markets/candles/{id}', params={}) @@ -79,20 +90,19 @@ class _API: async def option_quotes( self, - contracts: Dict[int, Dict[str, dict]], + contracts: Dict[ContractsKey, Dict[int, dict]], option_ids: List[int] = [], # if you don't want them all ) -> dict: - "Retrieve option chain quotes for all option ids or by filter(s)." + """Retrieve option chain quotes for all option ids or by filter(s). + """ filters = [ { "underlyingId": int(symbol_id), "expiryDate": str(expiry), } # every expiry per symbol id - for symbol_id, expiries in contracts.items() - for expiry in expiries + for (symbol, symbol_id, expiry), bystrike in contracts.items() ] - resp = await self._sess.post( path=f'/markets/quotes/options', json={'filters': filters, 'optionIds': option_ids} @@ -111,9 +121,9 @@ class Client: self.api = _API(self._sess) self._conf = config self.access_data = {} - self.user_data = {} self._reload_config(config) - self._symbol_cache = {} + self._symbol_cache: Dict[str, int] = {} + self._contracts2expiries = {} def _reload_config(self, config=None, **kwargs): log.warn("Reloading access config data") @@ -252,8 +262,7 @@ class Client: """ t2ids = await self.tickers2ids(tickers) ids = ','.join(t2ids.values()) - results = (await self.api.quotes(ids=ids))['quotes'] - quotes = {quote['symbol']: quote for quote in results} + quotes = (await self.api.quotes(ids=ids)) # set None for all symbols not found if len(t2ids) < len(tickers): @@ -266,7 +275,7 @@ class Client: async def symbol2contracts( self, symbol: str - ) -> Tuple[int, Dict[datetime, dict]]: + ) -> Dict[Tuple[str, int, datetime], dict]: """Return option contract for the given symbol. 
The most useful part is the expiries which can be passed to the option @@ -274,15 +283,18 @@ class Client: """ id = int((await self.tickers2ids([symbol]))[symbol]) contracts = await self.api.option_contracts(id) - return id, { - # convert to native datetime objs for sorting - datetime.fromisoformat(item['expiryDate']): - item for item in contracts + return { + ContractsKey( + symbol=symbol, + id=id, + # convert to native datetime objs for sorting + expiry=datetime.fromisoformat(item['expiryDate'])): + item for item in contracts } async def get_all_contracts( self, - symbols: List[str], + symbols: Iterator[str], # {symbol_id: {dt_iso_contract: {strike_price: {contract_id: id}}}} ) -> Dict[int, Dict[str, Dict[int, Any]]]: """Look up all contracts for each symbol in ``symbols`` and return the @@ -293,21 +305,29 @@ class Client: per symbol) and thus the return values should be cached for use with ``option_chains()``. """ - by_id = {} + by_key = {} for symbol in symbols: - id, contracts = await self.symbol2contracts(symbol) - by_id[id] = { - dt.isoformat(timespec='microseconds'): { + contracts = await self.symbol2contracts(symbol) + # FIXME: chainPerRoot here is probably why in some UIs + # you see a second chain with a (1) suffixed; should + # probably handle this eventually. + for key, byroot in sorted( + # sort by datetime + contracts.items(), + key=lambda item: item[0].expiry + ): + by_key[ + ContractsKey( + key.symbol, + key.id, + # converting back - maybe just do this initially? + key.expiry.isoformat(timespec='microseconds'), + ) + ] = { item['strikePrice']: item for item in byroot['chainPerRoot'][0]['chainPerStrikePrice'] } - for dt, byroot in sorted( - # sort by datetime - contracts.items(), - key=lambda item: item[0] - ) - } - return by_id + return by_key async def option_chains( self, @@ -316,12 +336,14 @@ class Client: ) -> Dict[str, Dict[str, Dict[str, Any]]]: """Return option chain snap quote for each ticker in ``symbols``. """ - quotes = await self.api.option_quotes(contracts) - batch = {} - for quote in quotes: - batch.setdefault( - quote['underlying'], {} - )[quote['symbol']] = quote + batch = [] + for key, bystrike in contracts.items(): + quotes = await self.api.option_quotes({key: bystrike}) + for quote in quotes: + # index by .symbol, .expiry since that's what + # a subscriber (currently) sends initially + quote['key'] = (key[0], key[2]) + batch.extend(quotes) return batch @@ -391,15 +413,14 @@ async def get_client() -> Client: write_conf(client) -async def quoter(client: Client, tickers: List[str]): - """Stock Quoter context. +async def stock_quoter(client: Client, tickers: List[str]): + """Stock quoter context. Yeah so fun times..QT has this symbol to ``int`` id lookup system that you have to use to get any quotes. That means we try to be smart and maintain a cache of this map lazily as requests from in for new tickers/symbols. Most of the closure variables here are to deal with that. """ - @async_lifo_cache(maxsize=128) async def get_symbol_id_seq(symbols: Tuple[str]): """For each tuple ``(symbol_1, symbol_2, ... , symbol_n)`` @@ -411,6 +432,7 @@ async def quoter(client: Client, tickers: List[str]): """Query for quotes using cached symbol ids. 
""" if not tickers: + # don't hit the network return {} ids = await get_symbol_id_seq(tuple(tickers)) @@ -418,41 +440,88 @@ async def quoter(client: Client, tickers: List[str]): try: quotes_resp = await client.api.quotes(ids=ids) except (QuestradeError, BrokerError) as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # out-of-process piker actor may have - # renewed already.. - client._reload_config() - try: - quotes_resp = await client.api.quotes(ids=ids) - except BrokerError as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # TODO: this will crash when run from a sub-actor since - # STDIN can't be acquired. The right way to handle this - # is to make a request to the parent actor (i.e. - # spawner of this) to call this - # `client.ensure_access()` locally thus blocking until - # the user provides an API key on the "client side" - await client.ensure_access(force_refresh=True) - quotes_resp = await client.api.quotes(ids=ids) - else: + if "Access token is invalid" not in str(qterr.args[0]): raise + # out-of-process piker actor may have + # renewed already.. + client._reload_config() + try: + quotes_resp = await client.api.quotes(ids=ids) + except BrokerError as qterr: + if "Access token is invalid" in str(qterr.args[0]): + # TODO: this will crash when run from a sub-actor since + # STDIN can't be acquired. The right way to handle this + # is to make a request to the parent actor (i.e. + # spawner of this) to call this + # `client.ensure_access()` locally thus blocking until + # the user provides an API key on the "client side" + await client.ensure_access(force_refresh=True) + quotes_resp = await client.api.quotes(ids=ids) - # dict packing and post-processing - quotes = {} - for quote in quotes_resp['quotes']: - quotes[quote['symbol']] = quote - + # post-processing + for quote in quotes_resp: if quote.get('delay', 0) > 0: log.warn(f"Delayed quote:\n{quote}") - return quotes + return quotes_resp - # strip out unknown/invalid symbols - first_quotes_dict = await get_quote(tickers) - for symbol, quote in first_quotes_dict.items(): - if quote['low52w'] is None: - log.warn( - f"{symbol} seems to be defunct") + return get_quote + + +async def option_quoter(client: Client, tickers: List[str]): + """Option quoter context. + """ + # sanity + if isinstance(tickers[0], tuple): + datetime.fromisoformat(tickers[0][1]) + else: + log.warn(f"Ignoring option quoter call with {tickers}") + # TODO make caller always check that a quoter has been set + return + + @async_lifo_cache(maxsize=128) + async def get_contract_by_date(sym_date_pairs: Tuple[Tuple[str, str]]): + """For each tuple, + ``(symbol_date_1, symbol_date_2, ... , symbol_date_n)`` + return a contract dict. + """ + symbols = map(itemgetter(0), sym_date_pairs) + dates = map(itemgetter(1), sym_date_pairs) + contracts = await client.get_all_contracts(symbols) + selected = {} + for key, val in contracts.items(): + if key.expiry in dates: + selected[key] = val + + return selected + + async def get_quote(symbol_date_pairs): + """Query for quotes using cached symbol ids. + """ + contracts = await get_contract_by_date( + tuple(symbol_date_pairs)) + try: + quotes = await client.option_chains(contracts) + except (QuestradeError, BrokerError) as qterr: + if "Access token is invalid" not in str(qterr.args[0]): + raise + # out-of-process piker actor may have + # renewed already.. 
+ client._reload_config() + try: + quotes = await client.option_chains(contracts) + except BrokerError as qterr: + if "Access token is invalid" in str(qterr.args[0]): + # TODO: this will crash when run from a sub-actor since + # STDIN can't be acquired. The right way to handle this + # is to make a request to the parent actor (i.e. + # spawner of this) to call this + # `client.ensure_access()` locally thus blocking until + # the user provides an API key on the "client side" + await client.ensure_access(force_refresh=True) + quotes = await client.option_chains(contracts) + + return quotes return get_quote From c7cf0cde9cafdf8e7b2819a6f4b05dc2c02b3f2b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Nov 2018 01:34:32 -0500 Subject: [PATCH 05/17] Add options streaming Well that was a doozy; had to rejig pretty much all of it. The deats: - Track broker components in a new `DataFeed` namedtuple - port to new list based batch quotes (not dicts any more) - lock access to cached broker-client / data-feed instantiation - respawn tasks that fail due to the network --- piker/brokers/data.py | 259 ++++++++++++++++++++++++++---------------- 1 file changed, 162 insertions(+), 97 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index b1427cab..bbbe5678 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -7,7 +7,10 @@ from itertools import cycle import socket import json from types import ModuleType -from typing import Coroutine, Callable, Dict, List +import typing +from typing import Coroutine, Callable, Dict, List, Any +import contextlib +from operator import itemgetter import trio import tractor @@ -56,12 +59,14 @@ async def stream_quotes( _cache = {} # ticker to quote caching while True: # use an event here to trigger exit? + prequote_start = time.time() - # tickers = list(tickers2chans.keys()) with trio.move_on_after(3) as cancel_scope: quotes = await request_quotes() + postquote_start = time.time() + cancelled = cancel_scope.cancelled_caught if cancelled: log.warn("Quote query timed out after 3 seconds, retrying...") @@ -69,18 +74,20 @@ async def stream_quotes( # quotes = await wait_for_network(partial(get_quotes, tickers)) quotes = await wait_for_network(request_quotes) - postquote_start = time.time() - new_quotes = {} + new_quotes = [] if diff_cached: - # if cache is enabled then only deliver "new" changes - for symbol, quote in quotes.items(): - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - new_quotes[symbol] = quote + # If cache is enabled then only deliver "new" changes. + # Useful for polling setups but obviously should be + # disabled if you're rx-ing event data. + for quote in quotes: + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + new_quotes.append(quote) else: new_quotes = quotes @@ -101,31 +108,51 @@ async def stream_quotes( await trio.sleep(delay) +class DataFeed(typing.NamedTuple): + """A per broker "data feed" container. + + A structure to keep track of components used by + real-time data daemons. 
+ """ + mod: ModuleType + client: object + quoter_keys: List[str] = ['stock', 'option'] + tasks: Dict[str, trio._core._run.Task] = dict.fromkeys( + quoter_keys, False) + quoters: Dict[str, typing.Coroutine] = {} + subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}} + + async def fan_out_to_chans( - brokermod: ModuleType, + feed: DataFeed, get_quotes: Coroutine, - tickers2chans: Dict[str, tractor.Channel], + symbols2chans: Dict[str, tractor.Channel], rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue cid: str = None, ) -> None: """Request and fan out quotes to each subscribed actor channel. """ - broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) if broker_limit < rate: rate = broker_limit - log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") async def request(): """Get quotes for current symbol subscription set. """ - return await get_quotes(list(tickers2chans.keys())) + return await get_quotes(list(symbols2chans.keys())) - async for quotes in stream_quotes(brokermod, request, rate): + async for quotes in stream_quotes( + feed.mod, request, rate, + diff_cached=diff_cached, + ): chan_payloads = {} - for symbol, quote in quotes.items(): + for quote in quotes: + # is this too QT specific? + symbol = quote['symbol'] # set symbol quotes for each subscriber - for chan, cid in tickers2chans.get(symbol, set()): + for chan, cid in symbols2chans.get(quote['key'], set()): chan_payloads.setdefault( chan, {'yield': {}, 'cid': cid} @@ -142,41 +169,21 @@ async def fan_out_to_chans( ConnectionRefusedError, ): log.warn(f"{chan} went down?") - for chanset in tickers2chans.values(): + for chanset in symbols2chans.values(): chanset.discard((chan, cid)) - if not any(tickers2chans.values()): - log.warn(f"No subs left for broker {brokermod.name}, exiting task") + if not any(symbols2chans.values()): + log.warn(f"No subs left for broker {feed.mod.name}, exiting task") break - log.info(f"Terminating stream quoter task for {brokermod.name}") + log.info(f"Terminating stream quoter task for {feed.mod.name}") -async def get_cached_client(broker, tickers): - """Get or create the current actor's cached broker client. - """ - # check if a cached client is in the local actor's statespace - clients = tractor.current_actor().statespace.setdefault('clients', {}) - try: - return clients[broker] - except KeyError: - log.info(f"Creating new client for broker {broker}") - brokermod = get_brokermod(broker) - # TODO: move to AsyncExitStack in 3.7 - client_cntxmng = brokermod.get_client() - client = await client_cntxmng.__aenter__() - get_quotes = await brokermod.quoter(client, tickers) - clients[broker] = ( - brokermod, client, client_cntxmng, get_quotes) - - return brokermod, client, client_cntxmng, get_quotes - - -async def symbol_data(broker, tickers): +async def symbol_data(broker: str, tickers: List[str]): """Retrieve baseline symbol info from broker. 
""" - _, client, _, get_quotes = await get_cached_client(broker, tickers) - return await client.symbol_data(tickers) + feed = await get_cached_feed(broker) + return await feed.client.symbol_data(tickers) async def smoke_quote(get_quotes, tickers, broker): @@ -192,8 +199,9 @@ async def smoke_quote(get_quotes, tickers, broker): # since the new client needs to know what symbols are accepted log.warn(f"Retrieving smoke quote for symbols {tickers}") quotes = await get_quotes(tickers) + # report any tickers that aren't returned in the first quote - invalid_tickers = set(tickers) - set(quotes) + invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes)) for symbol in invalid_tickers: tickers.remove(symbol) log.warn( @@ -202,7 +210,8 @@ async def smoke_quote(get_quotes, tickers, broker): # pop any tickers that return "empty" quotes payload = {} - for symbol, quote in quotes.items(): + for quote in quotes: + symbol = quote['symbol'] if quote is None: log.warn( f"Symbol `{symbol}` not found by broker" @@ -210,6 +219,12 @@ async def smoke_quote(get_quotes, tickers, broker): # XXX: not this mutates the input list (for now) tickers.remove(symbol) continue + + # report any unknown/invalid symbols (QT specific) + if quote.get('low52w', False) is None: + log.warn( + f"{symbol} seems to be defunct") + payload[symbol] = quote return payload @@ -218,23 +233,25 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -def modify_quote_stream(broker, tickers, chan=None, cid=None): +async def modify_quote_stream(broker, feed_type, tickers, chan=None, cid=None): """Absolute symbol subscription list for each quote stream. - Effectively a consumer subscription api. + Effectively a symbol subscription api. """ log.info(f"{chan} changed symbol subscription to {tickers}") - ss = tractor.current_actor().statespace - broker2tickersubs = ss['broker2tickersubs'] - tickers2chans = broker2tickersubs.get(broker) + feed = await get_cached_feed(broker) + symbols2chans = feed.subscriptions[feed_type] # update map from each symbol to requesting client's chan for ticker in tickers: - tickers2chans.setdefault(ticker, set()).add((chan, cid)) + symbols2chans.setdefault(ticker, set()).add((chan, cid)) + # remove any existing symbol subscriptions if symbol is not + # found in ``tickers`` + # TODO: this can likely be factored out into the pub-sub api for ticker in filter( - lambda ticker: ticker not in tickers, tickers2chans.copy() + lambda ticker: ticker not in tickers, symbols2chans.copy() ): - chanset = tickers2chans.get(ticker) + chanset = symbols2chans.get(ticker) # XXX: cid will be different on unsub call for item in chanset.copy(): if chan in item: @@ -242,12 +259,42 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None): if not chanset: # pop empty sets which will trigger bg quoter task termination - tickers2chans.pop(ticker) + symbols2chans.pop(ticker) + + +async def get_cached_feed( + brokername: str, +) -> DataFeed: + """Get/create a ``DataFeed`` from/in the current actor. 
+ """ + # check if a cached client is in the local actor's statespace + ss = tractor.current_actor().statespace + feeds = ss['feeds'] + lock = feeds['_lock'] + feed_stack = ss['feed_stacks'][brokername] + async with lock: + try: + feed = feeds[brokername] + log.info(f"Subscribing with existing `{brokername}` daemon") + return feed + except KeyError: + log.info(f"Creating new client for broker {brokername}") + brokermod = get_brokermod(brokername) + client = await feed_stack.enter_async_context( + brokermod.get_client()) + feed = DataFeed( + mod=brokermod, + client=client, + ) + feeds[brokername] = feed + return feed async def start_quote_stream( broker: str, - tickers: List[str], + symbols: List[Any], + feed_type: str = 'stock', + diff_cached: bool = True, chan: tractor.Channel = None, cid: str = None, ) -> None: @@ -263,57 +310,75 @@ async def start_quote_stream( get_console_log(actor.loglevel) # pull global vars from local actor ss = actor.statespace - broker2tickersubs = ss['broker2tickersubs'] - clients = ss['clients'] - dtasks = ss['dtasks'] - tickers = list(tickers) + # broker2symbolsubs = ss.setdefault('broker2symbolsubs', {}) + ss.setdefault('feeds', {'_lock': trio.Lock()}) + feed_stacks = ss.setdefault('feed_stacks', {}) + symbols = list(symbols) log.info( - f"{chan.uid} subscribed to {broker} for tickers {tickers}") + f"{chan.uid} subscribed to {broker} for symbols {symbols}") + feed_stack = feed_stacks.setdefault(broker, contextlib.AsyncExitStack()) + # another actor task may have already created it + feed = await get_cached_feed(broker) + symbols2chans = feed.subscriptions[feed_type] - brokermod, client, _, get_quotes = await get_cached_client(broker, tickers) - if broker not in broker2tickersubs: - tickers2chans = broker2tickersubs.setdefault(broker, {}) - else: - log.info(f"Subscribing with existing `{broker}` daemon") - tickers2chans = broker2tickersubs[broker] - - # do a smoke quote (note this mutates the input list and filters - # out bad symbols for now) - payload = await smoke_quote(get_quotes, tickers, broker) - # push initial smoke quote response for client initialization - await chan.send({'yield': payload, 'cid': cid}) + if feed_type == 'stock': + get_quotes = feed.quoters.setdefault( + 'stock', + await feed.mod.stock_quoter(feed.client, symbols) + ) + # do a smoke quote (note this mutates the input list and filters + # out bad symbols for now) + payload = await smoke_quote(get_quotes, symbols, broker) + # push initial smoke quote response for client initialization + await chan.send({'yield': payload, 'cid': cid}) + elif feed_type == 'option': + # FIXME: yeah we need maybe a more general way to specify + # the arg signature for the option feed beasides a symbol + # + expiry date. 
+ get_quotes = feed.quoters.setdefault( + 'option', + await feed.mod.option_quoter(feed.client, symbols) + ) # update map from each symbol to requesting client's chan - modify_quote_stream(broker, tickers, chan=chan, cid=cid) + await modify_quote_stream(broker, feed_type, symbols, chan, cid) try: - if broker not in dtasks: - # no quoter task yet so start a daemon task - log.info(f"Spawning quoter task for {brokermod.name}") - async with trio.open_nursery() as nursery: - nursery.start_soon(partial( - fan_out_to_chans, brokermod, get_quotes, tickers2chans, - cid=cid) - ) - dtasks.add(broker) - + if not feed.tasks.get(feed_type): + # no data feeder task yet; so start one + respawn = True + log.info(f"Spawning data feed task for {feed.mod.name}") + while respawn: + respawn = False + try: + async with trio.open_nursery() as nursery: + nursery.start_soon( + partial( + fan_out_to_chans, feed, get_quotes, + symbols2chans, + diff_cached=diff_cached, + cid=cid + ) + ) + feed.tasks[feed_type] = True + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True # unblocks when no more symbols subscriptions exist and the # quote streamer task terminates (usually because another call # was made to `modify_quoter` to unsubscribe from streaming # symbols) - log.info(f"Terminated quoter task for {brokermod.name}") - - # TODO: move to AsyncExitStack in 3.7 - for _, _, cntxmng, _ in clients.values(): - # FIXME: yes I know there's no error handling.. - await cntxmng.__aexit__(None, None, None) finally: + log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}") + feed.tasks.pop(feed_type) # if there are truly no more subscriptions with this broker # drop from broker subs dict - if not any(tickers2chans.values()): + if not any(symbols2chans.values()): log.info(f"No more subscriptions for {broker}") - broker2tickersubs.pop(broker, None) - dtasks.discard(broker) + # broker2symbolsubs.pop(broker, None) + + # destroy the API client + await feed_stack.aclose() async def stream_to_file( From cabc616b85ed391faf5413cf6d7f0f2f4657038d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Nov 2018 08:14:36 -0500 Subject: [PATCH 06/17] Port option api to new backend broker api --- piker/brokers/core.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 60f4292c..e6fed8a1 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -6,6 +6,7 @@ from types import ModuleType from typing import List, Dict, Any, Optional from ..log import get_logger +from .data import DataFeed log = get_logger('broker.core') @@ -19,7 +20,7 @@ async def api(brokermod: ModuleType, methname: str, **kwargs) -> dict: meth = getattr(client.api, methname, None) if meth is None: log.warning( - "Couldn't find API method {methname} looking up on client") + f"Couldn't find API method {methname} looking up on client") meth = getattr(client, methname, None) if meth is None: @@ -46,13 +47,14 @@ async def stocks_quote( """ async with brokermod.get_client() as client: results = await client.quote(tickers) - for key, val in results.items(): + for val in results: if val is None: brokermod.log.warn(f"Could not find symbol {key}?") return results +# TODO: these need tests async def option_chain( brokermod: ModuleType, symbol: str, @@ -67,7 +69,8 @@ async def option_chain( if date: id = int((await client.tickers2ids([symbol]))[symbol]) # build contracts dict for single expiry - return await client.option_chains({id: {date: 
None}}) + return await client.option_chains( + {(symbol, id, date): {}}) else: # get all contract expiries # (takes a long-ass time on QT fwiw) @@ -83,4 +86,5 @@ async def contracts( """Return option contracts (all expiries) for ``symbol``. """ async with brokermod.get_client() as client: + # return await client.get_all_contracts([symbol]) return await client.get_all_contracts([symbol]) From c2ec4800d6bf0ad2d869e969953f57a258e32282 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Nov 2018 08:16:31 -0500 Subject: [PATCH 07/17] Port cli to new options api --- piker/cli.py | 46 ++++++++++++---------------------------------- 1 file changed, 12 insertions(+), 34 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index 3ea0eaac..d0ab22fc 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -4,6 +4,7 @@ Console interface to broker client/daemons. from functools import partial import json import os +from operator import attrgetter import click import pandas as pd @@ -32,11 +33,6 @@ def pikerd(loglevel, host, tl): get_console_log(loglevel) tractor.run_daemon( rpc_module_paths=['piker.brokers.data'], - statespace={ - 'broker2tickersubs': {}, - 'clients': {}, - 'dtasks': set(), - }, name='brokerd', loglevel=loglevel if tl else None, ) @@ -130,11 +126,6 @@ async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): "No broker daemon could be found, spawning brokerd..") portal = await nursery.start_actor( 'brokerd', - statespace={ - 'broker2tickersubs': {}, - 'clients': {}, - 'dtasks': set(), - }, rpc_module_paths=['piker.brokers.data'], loglevel=loglevel, ) @@ -168,23 +159,10 @@ def monitor(loglevel, broker, rate, name, dhost, test, tl): async with maybe_spawn_brokerd_as_subactor( tries=tries, loglevel=loglevel ) as portal: - if test: - # stream from a local test file - agen = await portal.run( - "piker.brokers.data", 'stream_from_file', - filename=test - ) - # agen = data.stream_from_file(test) - else: - # start live streaming from broker daemon - agen = await portal.run( - "piker.brokers.data", 'start_quote_stream', - broker=brokermod.name, tickers=tickers) - # run app "main" await _async_main( name, portal, tickers, - brokermod, rate, agen, + brokermod, rate, test=test, ) tractor.run( @@ -330,14 +308,16 @@ def dump(ctx, name): def contracts(loglevel, broker, symbol, ids): brokermod = get_brokermod(broker) get_console_log(loglevel) - quotes = trio.run(partial(core.contracts, brokermod, symbol)) + contracts = trio.run(partial(core.contracts, brokermod, symbol)) if not ids: # just print out expiry dates which can be used with # the option_chain_quote cmd - id, contracts = next(iter(quotes.items())) - quotes = list(contracts) + output = tuple(map(attrgetter('expiry'), contracts)) + else: + output = tuple(contracts.items()) - click.echo(colorize_json(quotes)) + # TODO: need a cli test to verify + click.echo(colorize_json(output)) @cli.command() @@ -358,17 +338,15 @@ def optsquote(loglevel, broker, symbol, df_output, date): partial( core.option_chain, brokermod, symbol, date ) - )[symbol] + ) if not quotes: - log.error(f"No quotes could be found for {symbol}?") + log.error(f"No option quotes could be found for {symbol}?") return if df_output: - cols = next(filter(bool, quotes.values())).copy() df = pd.DataFrame( - (quote.values() for contract, quote in quotes.items()), - index=quotes.keys(), - columns=cols.keys(), + (quote.values() for quote in quotes), + columns=quotes[0].keys(), ) click.echo(df) else: From 288ea604af37cf448e962a99793f7bdd7b924170 Mon Sep 17 00:00:00 2001 
From: Tyler Goodlet Date: Fri, 30 Nov 2018 08:17:54 -0500 Subject: [PATCH 08/17] Call start_quote_stream() from monitor main --- piker/ui/monitor.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/piker/ui/monitor.py b/piker/ui/monitor.py index 5f159ace..97da2b32 100644 --- a/piker/ui/monitor.py +++ b/piker/ui/monitor.py @@ -19,6 +19,7 @@ from kivy.lang import Builder from kivy import utils from kivy.app import async_runTouchApp from kivy.core.window import Window +from async_generator import aclosing from ..log import get_logger from .pager import PagerView @@ -513,13 +514,24 @@ async def _async_main( tickers: List[str], brokermod: ModuleType, rate: int, - # an async generator instance which yields quotes dict packets - quote_gen: AsyncGeneratorType, + test: bool = False ) -> None: '''Launch kivy app + all other related tasks. This is started with cli cmd `piker monitor`. ''' + if test: + # stream from a local test file + quote_gen = await portal.run( + "piker.brokers.data", 'stream_from_file', + filename=test + ) + else: + # start live streaming from broker daemon + quote_gen = await portal.run( + "piker.brokers.data", 'start_quote_stream', + broker=brokermod.name, symbols=tickers) + # subscribe for tickers (this performs a possible filtering # where invalid symbols are discarded) sd = await portal.run( @@ -594,14 +606,17 @@ async def _async_main( try: # Trio-kivy entry point. await async_runTouchApp(widgets['root']) # run kivy - await quote_gen.aclose() # cancel aysnc gen call finally: + await quote_gen.aclose() # cancel aysnc gen call # un-subscribe from symbols stream (cancel if brokerd # was already torn down - say by SIGINT) with trio.move_on_after(0.2): await portal.run( "piker.brokers.data", 'modify_quote_stream', - broker=brokermod.name, tickers=[]) + broker=brokermod.name, + feed_type='stock', + tickers=[] + ) # cancel GUI update task nursery.cancel_scope.cancel() From 48a9c389c58ba2bf90b3b075416b4d878466b77f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Nov 2018 08:18:13 -0500 Subject: [PATCH 09/17] Add loglevel support to tests --- tests/conftest.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index f7688a9d..76a8ddee 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,20 @@ import pytest +import tractor +from piker import log + + +def pytest_addoption(parser): + parser.addoption("--ll", action="store", dest='loglevel', + default=None, help="logging level to set when testing") + + +@pytest.fixture(scope='session', autouse=True) +def loglevel(request): + orig = tractor.log._default_loglevel + level = tractor.log._default_loglevel = request.config.option.loglevel + log.get_console_log(level) + yield level + tractor.log._default_loglevel = orig @pytest.fixture From 15dec65ba1e676e7124b472393b114d0342b6738 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Nov 2018 08:18:54 -0500 Subject: [PATCH 10/17] Add an options streaming test --- tests/test_questrade.py | 225 ++++++++++++++++++++++++++-------------- tests/test_tractor.py | 5 - 2 files changed, 149 insertions(+), 81 deletions(-) diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 897ba531..7a5170b4 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -4,7 +4,9 @@ Questrade broker testing import time import trio +import tractor from trio.testing import trio_test +from tractor.testing import tractor_test from piker.brokers import questrade as qt import 
pytest @@ -18,72 +20,75 @@ def check_qt_conf_section(brokerconf): # stock quote -_ex_quote = { - "VWAP": 7.383792, - "askPrice": 7.56, - "askSize": 2, - "bidPrice": 6.1, - "bidSize": 2, - "delay": 0, - "high52w": 9.68, - "highPrice": 8, - "isHalted": 'false', - "lastTradePrice": 6.96, - "lastTradePriceTrHrs": 6.97, - "lastTradeSize": 2000, - "lastTradeTick": "Down", - "lastTradeTime": "2018-02-07T15:59:59.259000-05:00", - "low52w": 1.03, - "lowPrice": 6.88, - "openPrice": 7.64, - "symbol": "EMH.VN", - "symbolId": 10164524, - "tier": "", - "volume": 5357805 +_ex_quotes = { + 'stock': { + "VWAP": 7.383792, + "askPrice": 7.56, + "askSize": 2, + "bidPrice": 6.1, + "bidSize": 2, + "delay": 0, + "high52w": 9.68, + "highPrice": 8, + "key": "EMH.VN", + "isHalted": 'false', + "lastTradePrice": 6.96, + "lastTradePriceTrHrs": 6.97, + "lastTradeSize": 2000, + "lastTradeTick": "Down", + "lastTradeTime": "2018-02-07T15:59:59.259000-05:00", + "low52w": 1.03, + "lowPrice": 6.88, + "openPrice": 7.64, + "symbol": "EMH.VN", + "symbolId": 10164524, + "tier": "", + "volume": 5357805 + }, + 'option': { + 'VWAP': 0, + 'askPrice': None, + 'askSize': 0, + 'bidPrice': None, + 'bidSize': 0, + 'delay': 0, + 'delta': -0.212857, + 'gamma': 0.003524, + 'highPrice': 0, + 'isHalted': False, + "key": ["WEED.TO", '2018-10-23T00:00:00.000000-04:00'], + 'lastTradePrice': 22, + 'lastTradePriceTrHrs': None, + 'lastTradeSize': 0, + 'lastTradeTick': 'Equal', + 'lastTradeTime': '2018-10-23T00:00:00.000000-04:00', + 'lowPrice': 0, + 'openInterest': 1, + 'openPrice': 0, + 'rho': -0.891868, + 'symbol': 'WEED15Jan21P54.00.MX', + 'symbolId': 22739148, + 'theta': -0.012911, + 'underlying': 'WEED.TO', + 'underlyingId': 16529510, + 'vega': 0.220885, + 'volatility': 75.514171, + 'volume': 0 + } } -# option quote -_ex_contract = { - 'VWAP': 0, - 'askPrice': None, - 'askSize': 0, - 'bidPrice': None, - 'bidSize': 0, - 'delay': 0, - 'delta': -0.212857, - 'gamma': 0.003524, - 'highPrice': 0, - 'isHalted': False, - 'lastTradePrice': 22, - 'lastTradePriceTrHrs': None, - 'lastTradeSize': 0, - 'lastTradeTick': 'Equal', - 'lastTradeTime': '2018-10-23T00:00:00.000000-04:00', - 'lowPrice': 0, - 'openInterest': 1, - 'openPrice': 0, - 'rho': -0.891868, - 'symbol': 'WEED15Jan21P54.00.MX', - 'symbolId': 22739148, - 'theta': -0.012911, - 'underlying': 'WEED.TO', - 'underlyingId': 16529510, - 'vega': 0.220885, - 'volatility': 75.514171, - 'volume': 0 -} - - -def match_packet(symbols, quotes): +def match_packet(symbols, quotes, feed_type='stock'): """Verify target ``symbols`` match keys in ``quotes`` packet. """ assert len(quotes) == len(symbols) - for ticker in symbols: - quote = quotes.pop(ticker) + # for ticker in symbols: + for quote in quotes.copy(): + assert quote['key'] in symbols + quotes.remove(quote) # verify the quote packet format hasn't changed - for key in _ex_quote: + for key in _ex_quotes[feed_type]: quote.pop(key) # no additional fields either @@ -104,11 +109,11 @@ async def test_batched_stock_quote(us_symbols): @trio_test -async def test_quoter_context(us_symbols): +async def test_stock_quoter_context(us_symbols): """Test that a quoter "context" used by the data feed daemon. 
""" async with qt.get_client() as client: - quoter = await qt.quoter(client, us_symbols) + quoter = await qt.stock_quoter(client, us_symbols) quotes = await quoter(us_symbols) match_packet(us_symbols, quotes) @@ -119,12 +124,14 @@ async def test_option_contracts(tmx_symbols): """ async with qt.get_client() as client: for symbol in tmx_symbols: - id, contracts = await client.symbol2contracts(symbol) - assert isinstance(id, int) - assert isinstance(contracts, dict) - for dt in contracts: - assert dt.isoformat( - timespec='microseconds') == contracts[dt]['expiryDate'] + contracts = await client.symbol2contracts(symbol) + key, byroot = next(iter(contracts.items())) + assert isinstance(key.id, int) + assert isinstance(byroot, dict) + for key in contracts: + # check that datetime is same as reported in contract + assert key.expiry.isoformat( + timespec='microseconds') == contracts[key]['expiryDate'] @trio_test @@ -133,17 +140,15 @@ async def test_option_chain(tmx_symbols): """ async with qt.get_client() as client: # contract lookup - should be cached - contracts = await client.get_all_contracts(tmx_symbols) + contracts = await client.get_all_contracts([tmx_symbols[0]]) # chains quote for all symbols quotes = await client.option_chains(contracts) - for key in tmx_symbols: - contracts = quotes.pop(key) - for key, quote in contracts.items(): - for key in _ex_contract: - quote.pop(key) - assert not quote - # chains for each symbol were retreived - assert not quotes + # verify contents match what we expect + for quote in quotes: + assert quote['underlying'] in tmx_symbols + for key in _ex_quotes['option']: + quote.pop(key) + assert not quote @trio_test @@ -163,7 +168,7 @@ async def test_option_quote_latency(tmx_symbols): # NOTE: request latency is usually 2x faster that these (5, contracts), (0.5, single) ]: - for _ in range(10): + for _ in range(3): # chains quote for all symbols start = time.time() await client.option_chains(contract) @@ -171,3 +176,71 @@ async def test_option_quote_latency(tmx_symbols): print(f"Request took {took}") assert took <= expected_latency await trio.sleep(0.1) + + +@tractor_test +async def test_option_streaming(tmx_symbols, loglevel): + """Set up option streaming using the broker daemon. + """ + async with tractor.find_actor('brokerd') as portal: + async with tractor.open_nursery() as nursery: + # only one per host address, spawns an actor if None + if not portal: + # no brokerd actor found + portal = await nursery.start_actor( + 'data_feed', + rpc_module_paths=[ + 'piker.brokers.data', + 'piker.brokers.core' + ], + ) + + symbol = 'APHA.TO' # your fave greenhouse LP + async with qt.get_client() as client: + contracts = await client.get_all_contracts([symbol]) + + contractkey = next(iter(contracts)) + subs_keys = list( + map(lambda item: (item.symbol, item.expiry), contracts)) + sub = subs_keys[0] + + agen = await portal.run( + 'piker.brokers.data', + 'start_quote_stream', + broker='questrade', + symbols=[sub], + feed_type='option', + diff_cached=False, + ) + try: + # wait on the data streamer to actually start + # delivering + await agen.__anext__() + + # it'd sure be nice to have an asyncitertools here... 
+ with trio.fail_after(2.1): + loops = 8 + count = 0 + async for quotes in agen: + # print(f'got quotes for {quotes.keys()}') + # we should receive all calls and puts + assert len(quotes) == len(contracts[contractkey]) * 2 + for symbol, quote in quotes.items(): + assert quote['key'] == sub + for key in _ex_quotes['option']: + quote.pop(key) + assert not quote + count += 1 + if count == loops: + break + finally: + # unsub + await portal.run( + 'piker.brokers.data', + 'modify_quote_stream', + broker='questrade', + feed_type='option', + tickers=[], + ) + # stop all spawned subactors + await nursery.cancel() diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 7e63e3a7..7afc5d77 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -16,11 +16,6 @@ async def rx_price_quotes_from_brokerd(us_symbols): portal = await nursery.start_actor( 'brokerd', rpc_module_paths=['piker.brokers.data'], - statespace={ - 'broker2tickersubs': {}, - 'clients': {}, - 'dtasks': set() - }, ) # gotta expose in a broker agnostic way... From f35671cc888492e94ed7f363eca44e384179b0a1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Dec 2018 16:08:03 -0500 Subject: [PATCH 11/17] Handle bad symbol names --- piker/brokers/questrade.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 6c0427ed..330580af 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -261,14 +261,10 @@ class Client: """Return stock quotes for each ticker in ``tickers``. """ t2ids = await self.tickers2ids(tickers) - ids = ','.join(t2ids.values()) - quotes = (await self.api.quotes(ids=ids)) - - # set None for all symbols not found - if len(t2ids) < len(tickers): - for ticker in tickers: - if ticker not in quotes: - quotes[ticker] = None + quotes = [] + if t2ids: + ids = ','.join(t2ids.values()) + quotes = (await self.api.quotes(ids=ids)) return quotes From 61294c6c44ab773054595d0bb0b9d5d3c5fa5827 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Dec 2018 16:09:41 -0500 Subject: [PATCH 12/17] Adhere to the same non-found-symbol behaviour as QT --- piker/brokers/robinhood.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index 63cd8af7..ff1e22d4 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -2,6 +2,7 @@ Robinhood API backend. """ from functools import partial +from typing import List from async_generator import asynccontextmanager # TODO: move to urllib3/requests once supported @@ -44,7 +45,7 @@ class Client: self._sess.base_location = _service_ep self.api = _API(self._sess) - def _zip_in_order(self, symbols: [str], results_dict: dict): + def _zip_in_order(self, symbols: [str], quotes: List[dict]): return {quote.get('symbol', sym) if quote else sym: quote for sym, quote in zip(symbols, results_dict)} @@ -52,11 +53,16 @@ class Client: """Retrieve quotes for a list of ``symbols``. 
""" try: - resp = await self.api.quotes(','.join(symbols)) + quotes = (await self.api.quotes(','.join(symbols)))['results'] except BrokerError: - resp = {'results': [None] * len(symbols)} + quotes = [None] * len(symbols) - return self._zip_in_order(symbols, resp['results']) + for quote in quotes: + # insert our subscription key field + if quote is not None: + quote['key'] = quote['symbol'] + + return list(filter(bool, quotes)) async def symbol_data(self, symbols: [str]): """Retrieve symbol data via the ``fundamentals`` endpoint. From 2915e83324c156c64f0198269a8b82bb39c792c9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Dec 2018 16:11:38 -0500 Subject: [PATCH 13/17] Warn about missing symbols at CLI level --- piker/brokers/core.py | 7 +------ piker/cli.py | 7 +++++++ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index e6fed8a1..b493a9e7 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -46,12 +46,7 @@ async def stocks_quote( """Return quotes dict for ``tickers``. """ async with brokermod.get_client() as client: - results = await client.quote(tickers) - for val in results: - if val is None: - brokermod.log.warn(f"Could not find symbol {key}?") - - return results + return await client.quote(tickers) # TODO: these need tests diff --git a/piker/cli.py b/piker/cli.py index d0ab22fc..6dd3b327 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -5,6 +5,7 @@ from functools import partial import json import os from operator import attrgetter +from operator import itemgetter import click import pandas as pd @@ -101,6 +102,12 @@ def quote(loglevel, broker, tickers, df_output): log.error(f"No quotes could be found for {tickers}?") return + if len(quotes) < len(tickers): + syms = tuple(map(itemgetter('symbol'), quotes)) + for ticker in tickers: + if ticker not in syms: + brokermod.log.warn(f"Could not find symbol {ticker}?") + if df_output: cols = next(filter(bool, quotes.values())).copy() cols.pop('symbol') From 7378a16b9045bdc9862c9f831392143b94d9fba7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Dec 2018 16:12:03 -0500 Subject: [PATCH 14/17] s/tickers/symbols --- piker/brokers/data.py | 10 +++++----- piker/ui/monitor.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index bbbe5678..56e99fac 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -233,23 +233,23 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -async def modify_quote_stream(broker, feed_type, tickers, chan=None, cid=None): +async def modify_quote_stream(broker, feed_type, symbols, chan=None, cid=None): """Absolute symbol subscription list for each quote stream. Effectively a symbol subscription api. 
""" - log.info(f"{chan} changed symbol subscription to {tickers}") + log.info(f"{chan} changed symbol subscription to {symbols}") feed = await get_cached_feed(broker) symbols2chans = feed.subscriptions[feed_type] # update map from each symbol to requesting client's chan - for ticker in tickers: + for ticker in symbols: symbols2chans.setdefault(ticker, set()).add((chan, cid)) # remove any existing symbol subscriptions if symbol is not - # found in ``tickers`` + # found in ``symbols`` # TODO: this can likely be factored out into the pub-sub api for ticker in filter( - lambda ticker: ticker not in tickers, symbols2chans.copy() + lambda ticker: ticker not in symbols, symbols2chans.copy() ): chanset = symbols2chans.get(ticker) # XXX: cid will be different on unsub call diff --git a/piker/ui/monitor.py b/piker/ui/monitor.py index 97da2b32..44480a61 100644 --- a/piker/ui/monitor.py +++ b/piker/ui/monitor.py @@ -615,7 +615,7 @@ async def _async_main( "piker.brokers.data", 'modify_quote_stream', broker=brokermod.name, feed_type='stock', - tickers=[] + symbols=[] ) # cancel GUI update task From 2df5c7682862b14928be38db95e6baccedde3e24 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Dec 2018 16:13:15 -0500 Subject: [PATCH 15/17] Adjust cli tests for new quotes list output --- tests/test_cli.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 2273410a..ec46c004 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -18,11 +18,11 @@ def run(cmd, *args): return cp -def verify_keys(tickers, quotes_dict): +def verify_keys(tickers, quotes): """Verify all ticker names are keys in ``quotes_dict``. """ - for key, quote in quotes_dict.items(): - assert key in tickers + for quote in quotes: + assert quote['key'] in tickers @pytest.fixture @@ -39,8 +39,8 @@ def test_known_quotes(capfd, nyse_tickers): # verify output can be parsed as json out, err = capfd.readouterr() - quotes_dict = json.loads(out) - verify_keys(nyse_tickers, quotes_dict) + quotes = json.loads(out) + verify_keys(nyse_tickers, quotes) @pytest.mark.parametrize( @@ -61,8 +61,8 @@ def test_quotes_ticker_not_found( out, err = capfd.readouterr() if out: # verify output can be parsed as json - quotes_dict = json.loads(out) - verify_keys(tickers, quotes_dict) + quotes = json.loads(out) + verify_keys(tickers, quotes) # check for warning log message when some quotes are found warnmsg = f'Could not find symbol {bad_ticker[0]}' assert warnmsg in err From 12d56278602090526aa139d702e3da9d02a62ad2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Dec 2018 16:14:33 -0500 Subject: [PATCH 16/17] Aggregate streaming tests and test stocks + options together --- tests/test_questrade.py | 146 ++++++++++++++++++++++++++-------------- tests/test_tractor.py | 56 --------------- 2 files changed, 96 insertions(+), 106 deletions(-) delete mode 100644 tests/test_tractor.py diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 7a5170b4..482c1e37 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -178,8 +178,98 @@ async def test_option_quote_latency(tmx_symbols): await trio.sleep(0.1) +async def stream_option_chain(portal, symbols): + """Start up an option quote stream. + + ``symbols`` arg is ignored here. 
+ """ + symbol = 'APHA.TO' # your fave greenhouse LP + async with qt.get_client() as client: + contracts = await client.get_all_contracts([symbol]) + + contractkey = next(iter(contracts)) + subs_keys = list( + map(lambda item: (item.symbol, item.expiry), contracts)) + sub = subs_keys[0] + + agen = await portal.run( + 'piker.brokers.data', + 'start_quote_stream', + broker='questrade', + symbols=[sub], + feed_type='option', + diff_cached=False, + ) + try: + # wait on the data streamer to actually start + # delivering + await agen.__anext__() + + # it'd sure be nice to have an asyncitertools here... + with trio.fail_after(2.1): + loops = 8 + count = 0 + async for quotes in agen: + # print(f'got quotes for {quotes.keys()}') + # we should receive all calls and puts + assert len(quotes) == len(contracts[contractkey]) * 2 + for symbol, quote in quotes.items(): + assert quote['key'] == sub + for key in _ex_quotes['option']: + quote.pop(key) + assert not quote + count += 1 + if count == loops: + break + finally: + # unsub + await portal.run( + 'piker.brokers.data', + 'modify_quote_stream', + broker='questrade', + feed_type='option', + symbols=[], + ) + + +async def stream_stocks(portal, symbols): + """Start up a stock quote stream. + """ + agen = await portal.run( + 'piker.brokers.data', + 'start_quote_stream', + broker='questrade', + symbols=symbols, + ) + try: + # it'd sure be nice to have an asyncitertools here... + async for quotes in agen: + assert quotes + for key in quotes: + assert key in symbols + break + finally: + # unsub + await portal.run( + 'piker.brokers.data', + 'modify_quote_stream', + broker='questrade', + feed_type='stock', + symbols=[], + ) + + +@pytest.mark.parametrize( + 'stream_what', + [ + (stream_stocks,), + (stream_option_chain,), + (stream_stocks, stream_option_chain), + ], + ids=['stocks', 'options', 'stocks_and_options'], +) @tractor_test -async def test_option_streaming(tmx_symbols, loglevel): +async def test_quote_streaming(tmx_symbols, loglevel, stream_what): """Set up option streaming using the broker daemon. """ async with tractor.find_actor('brokerd') as portal: @@ -194,53 +284,9 @@ async def test_option_streaming(tmx_symbols, loglevel): 'piker.brokers.core' ], ) + async with trio.open_nursery() as n: + for func in stream_what: + n.start_soon(func, portal, tmx_symbols) - symbol = 'APHA.TO' # your fave greenhouse LP - async with qt.get_client() as client: - contracts = await client.get_all_contracts([symbol]) - - contractkey = next(iter(contracts)) - subs_keys = list( - map(lambda item: (item.symbol, item.expiry), contracts)) - sub = subs_keys[0] - - agen = await portal.run( - 'piker.brokers.data', - 'start_quote_stream', - broker='questrade', - symbols=[sub], - feed_type='option', - diff_cached=False, - ) - try: - # wait on the data streamer to actually start - # delivering - await agen.__anext__() - - # it'd sure be nice to have an asyncitertools here... 
- with trio.fail_after(2.1): - loops = 8 - count = 0 - async for quotes in agen: - # print(f'got quotes for {quotes.keys()}') - # we should receive all calls and puts - assert len(quotes) == len(contracts[contractkey]) * 2 - for symbol, quote in quotes.items(): - assert quote['key'] == sub - for key in _ex_quotes['option']: - quote.pop(key) - assert not quote - count += 1 - if count == loops: - break - finally: - # unsub - await portal.run( - 'piker.brokers.data', - 'modify_quote_stream', - broker='questrade', - feed_type='option', - tickers=[], - ) - # stop all spawned subactors - await nursery.cancel() + # stop all spawned subactors + await nursery.cancel() diff --git a/tests/test_tractor.py b/tests/test_tractor.py deleted file mode 100644 index 7afc5d77..00000000 --- a/tests/test_tractor.py +++ /dev/null @@ -1,56 +0,0 @@ -""" -Actor model API testing -""" -import pytest -import tractor - - -async def rx_price_quotes_from_brokerd(us_symbols): - """Verify we can spawn a daemon actor and retrieve streamed price data. - """ - async with tractor.find_actor('brokerd') as portals: - if not portals: - # only one per host address, spawns an actor if None - async with tractor.open_nursery() as nursery: - # no brokerd actor found - portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=['piker.brokers.data'], - ) - - # gotta expose in a broker agnostic way... - # retrieve initial symbol data - # sd = await portal.run( - # 'piker.brokers.data', 'symbol_data', symbols=us_symbols) - # assert list(sd.keys()) == us_symbols - - gen = await portal.run( - 'piker.brokers.data', - 'start_quote_stream', - broker='robinhood', - tickers=us_symbols, - ) - # it'd sure be nice to have an asyncitertools here... - async for quotes in gen: - assert quotes - for key in quotes: - assert key in us_symbols - break - # terminate far-end async-gen - # await gen.asend(None) - # break - - # stop all spawned subactors - await nursery.cancel() - - # arbitter is cancelled here due to `find_actors()` internals - # (which internally uses `get_arbiter` which kills its channel - # server scope on exit) - - -def test_rx_price_quotes_from_brokerd(us_symbols): - tractor.run( - rx_price_quotes_from_brokerd, - us_symbols, - name='arbiter', - ) From 42f7a1092b95fd4b2ebdc75f4335d76bd8aacbe9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Dec 2018 17:10:59 -0500 Subject: [PATCH 17/17] Update deps --- Pipfile.lock | 204 ++++++++++++++++++++++++--------------------------- 1 file changed, 94 insertions(+), 110 deletions(-) diff --git a/Pipfile.lock b/Pipfile.lock index 19afd44b..7a26f213 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -50,37 +50,37 @@ }, "cython": { "hashes": [ - "sha256:019008a69e6b7c102f2ed3d733a288d1784363802b437dd2b91e6256b12746da", - "sha256:1441fe19c56c90b8c2159d7b861c31a134d543ef7886fd82a5d267f9f11f35ac", - "sha256:1d1a5e9d6ed415e75a676b72200ad67082242ec4d2d76eb7446da255ae72d3f7", - "sha256:339f5b985de3662b1d6c69991ab46fdbdc736feb4ac903ef6b8c00e14d87f4d8", - "sha256:35bdf3f48535891fee2eaade70e91d5b2cc1ee9fc2a551847c7ec18bce55a92c", - "sha256:3d0afba0aec878639608f013045697fb0969ff60b3aea2daec771ea8d01ad112", - "sha256:42c53786806e24569571a7a24ebe78ec6b364fe53e79a3f27eddd573cacd398f", - "sha256:48b919da89614d201e72fbd8247b5ae8881e296cf968feb5595a015a14c67f1f", - "sha256:49906e008eeb91912654a36c200566392bd448b87a529086694053a280f8af2d", - "sha256:49fc01a7c9c4e3c1784e9a15d162c2cac3990fcc28728227a6f8f0837aabda7c", - "sha256:501b671b639b9ca17ad303f8807deb1d0ff754d1dab106f2607d14b53cb0ff0b", - 
"sha256:5574574142364804423ab4428bd331a05c65f7ecfd31ac97c936f0c720fe6a53", - "sha256:6092239a772b3c6604be9e94b9ab4f0dacb7452e8ad299fd97eae0611355b679", - "sha256:71ff5c7632501c4f60edb8a24fd0a772e04c5bdca2856d978d04271b63666ef7", - "sha256:7dcf2ad14e25b05eda8bdd104f8c03a642a384aeefd25a5b51deac0826e646fa", - "sha256:8ca3a99f5a7443a6a8f83a5d8fcc11854b44e6907e92ba8640d8a8f7b9085e21", - "sha256:927da3b5710fb705aab173ad630b45a4a04c78e63dcd89411a065b2fe60e4770", - "sha256:94916d1ede67682638d3cc0feb10648ff14dc51fb7a7f147f4fedce78eaaea97", - "sha256:a3e5e5ca325527d312cdb12a4dab8b0459c458cad1c738c6f019d0d8d147081c", - "sha256:a7716a98f0b9b8f61ddb2bae7997daf546ac8fc594be6ba397f4bde7d76bfc62", - "sha256:acf10d1054de92af8d5bfc6620bb79b85f04c98214b4da7db77525bfa9fc2a89", - "sha256:de46ffb67e723975f5acab101c5235747af1e84fbbc89bf3533e2ea93fb26947", - "sha256:df428969154a9a4cd9748c7e6efd18432111fbea3d700f7376046c38c5e27081", - "sha256:f5ebf24b599caf466f9da8c4115398d663b2567b89e92f58a835e9da4f74669f", - "sha256:f79e45d5c122c4fb1fd54029bf1d475cecc05f4ed5b68136b0d6ec268bae68b6", - "sha256:f7a43097d143bd7846ffba6d2d8cd1cc97f233318dbd0f50a235ea01297a096b", - "sha256:fceb8271bc2fd3477094ca157c824e8ea840a7b393e89e766eea9a3b9ce7e0c6", - "sha256:ff919ceb40259f5332db43803aa6c22ff487e86036ce3921ae04b9185efc99a4" + "sha256:0202f753b0a69dd87095b698df00010daf452ab61279747248a042a24892a2a9", + "sha256:0fbe9514ffe35aad337db27b11f7ee1bf27d01059b2e27f112315b185d69de79", + "sha256:18ab7646985a97e02cee72e1ddba2e732d4931d4e1732494ff30c5aa084bfb97", + "sha256:18bb95daa41fd2ff0102844172bc068150bf031186249fc70c6f57fc75c9c0a9", + "sha256:222c65c7022ff52faf3ac6c706e4e8a726ddaa29dabf2173b2a0fdfc1a2f1586", + "sha256:2387c5a2a436669de9157d117fd426dfc2b46ffdc49e43f0a2267380896c04ea", + "sha256:31bad130b701587ab7e74c3c304bb3d63d9f0d365e3f81880203e8e476d914b1", + "sha256:3895014b1a653726a9da5aca852d9e6d0e2c2667bf315d6a2cd632bf7463130b", + "sha256:3d38967ef9c1c0ffabe80827f56817609153e2da83e3dce84476d0928c72972c", + "sha256:5478efd92291084adc9b679666aeaeaafca69d6bf3e95fe3efce82814e3ab782", + "sha256:5c2a6121e4e1e65690b60c270012218e38201bcf700314b1926d5dbeae78a499", + "sha256:5f66f7f76fc870500fe6db0c02d5fc4187062d29e582431f5a986881c5aef4e3", + "sha256:6572d74990b16480608441b941c1cefd60bf742416bc3668cf311980f740768d", + "sha256:6990b9965f31762ac71340869c064f39fb6776beca396d0558d3b5b1ebb7f027", + "sha256:87c82803f9c51c275b16c729aade952ca93c74a8aec963b9b8871df9bbb3120a", + "sha256:8fd32974024052b2260d08b94f970c4c1d92c327ed3570a2b4708070fa53a879", + "sha256:9a81bba33c7fbdb76e6fe8d15b6e793a1916afd4d2463f07d762c69efaaea466", + "sha256:9c31cb9bfaa1004a2a50115a37e1fcb79d664917968399dae3e04610356afe8c", + "sha256:a0b28235c28a088e052f90a0b5fefaa503e5378046a29d0af045e2ec9d5d6555", + "sha256:a3f5022d818b6c91a8bbc466211e6fd708f234909cbb10bc4dbccb2a04884ef6", + "sha256:a7252ca498f510404185e3c1bdda3224e80b1be1a5fbc2b174aab83a477ea0cb", + "sha256:aa8d7136cad8b2a7bf3596e1bc053476edeee567271f197449b2d30ea0c37175", + "sha256:b50a8de6f2820286129fe7d71d76c9e0c0f53a8c83cf39bbe6375b827994e4f1", + "sha256:b528a9c152c569062375d5c2260b59f8243bb4136fc38420854ac1bd4aa0d02f", + "sha256:b72db7201a4aa0445f27af9954d48ed7d2c119ce3b8b253e4dcd514fc72e5dc6", + "sha256:d3444e10ccb5b16e4c1bed3cb3c565ec676b20a21eb43430e70ec4168c631dcc", + "sha256:e16d6f06f4d2161347e51c4bc1f7a8feedeee444d26efa92243f18441a6fa742", + "sha256:f5774bef92d33a62a584f6e7552a9a8653241ecc036e259bfb03d33091599537" ], "index": "pypi", - "version": "==0.29" + "version": "==0.29.1" }, "e1839a8": { "editable": 
true, @@ -112,24 +112,16 @@ }, "msgpack": { "hashes": [ - "sha256:0b3b1773d2693c70598585a34ca2715873ba899565f0a7c9a1545baef7e7fbdc", - "sha256:0bae5d1538c5c6a75642f75a1781f3ac2275d744a92af1a453c150da3446138b", - "sha256:0ee8c8c85aa651be3aa0cd005b5931769eaa658c948ce79428766f1bd46ae2c3", - "sha256:1369f9edba9500c7a6489b70fdfac773e925342f4531f1e3d4c20ac3173b1ae0", - "sha256:22d9c929d1d539f37da3d1b0e16270fa9d46107beab8c0d4d2bddffffe895cee", - "sha256:2ff43e3247a1e11d544017bb26f580a68306cec7a6257d8818893c1fda665f42", - "sha256:31a98047355d34d047fcdb55b09cb19f633cf214c705a765bd745456c142130c", - "sha256:8767eb0032732c3a0da92cbec5ac186ef89a3258c6edca09161472ca0206c45f", - "sha256:8acc8910218555044e23826980b950e96685dc48124a290c86f6f41a296ea172", - "sha256:ab189a6365be1860a5ecf8159c248f12d33f79ea799ae9695fa6a29896dcf1d4", - "sha256:cfd6535feb0f1cf1c7cdb25773e965cc9f92928244a8c3ef6f8f8a8e1f7ae5c4", - "sha256:e274cd4480d8c76ec467a85a9c6635bbf2258f0649040560382ab58cabb44bcf", - "sha256:f86642d60dca13e93260187d56c2bef2487aa4d574a669e8ceefcf9f4c26fd00", - "sha256:f8a57cbda46a94ed0db55b73e6ab0c15e78b4ede8690fa491a0e55128d552bb0", - "sha256:fcea97a352416afcbccd7af9625159d80704a25c519c251c734527329bb20d0e" + "sha256:102802a9433dcf36f939b632cce9dea87310b2f163bb37ffc8bc343677726e88", + "sha256:64abc6bf3a2ac301702f5760f4e6e227d0fd4d84d9014ef9a40faa9d43365259", + "sha256:72259661a83f8b08ef6ee83927ce4937f841226735824af5b10a536d886eeb36", + "sha256:85f1342b9d7549dd3daf494100d47a3dc7daae703cdbfc2c9ee7bbdc8a492cba", + "sha256:8ce9f88b6cb75d74eda2a5522e5c2e5ec0f17fd78605d6502abb61f46b306865", + "sha256:9936ce3a530ca78db60b6631003b5f4ba383cfb1d9830a27d1b5c61857226e2f", + "sha256:cb4e228f3d93779a1d77a1e9d72759b79dfa2975c1a5bd2a090eaa98239fa4b1" ], "index": "pypi", - "version": "==0.5.6" + "version": "==0.6.0" }, "multio": { "hashes": [ @@ -204,17 +196,17 @@ }, "pdbpp": { "hashes": [ - "sha256:dde77326e4ea41439c243ed065826d53539530eeabd1b6615aae15cfbb9fda05" + "sha256:535085916fcfb768690ba0aeab2967c2a2163a0a60e5b703776846873e171399" ], "index": "pypi", - "version": "==0.9.2" + "version": "==0.9.3" }, "pygments": { "hashes": [ - "sha256:78f3f434bcc5d6ee09020f92ba487f95ba50f1e3ef83ae96b9d5ffa1bab25c5d", - "sha256:dbae1046def0efb574852fab9e90209b23f556367b5a320c0bcb871c77c3e8cc" + "sha256:6301ecb0997a52d2d31385e62d0a4a4cf18d2f2da7054a5ddad5c366cd39cee7", + "sha256:82666aac15622bd7bb685a4ee7f6625dd716da3ef7473620c192c0168aae64fc" ], - "version": "==2.2.0" + "version": "==2.3.0" }, "python-dateutil": { "hashes": [ @@ -246,14 +238,14 @@ }, "sortedcontainers": { "hashes": [ - "sha256:220bb2e3e1886297fd7cdd6d164cb5cf237be1cfae1a3a3e526d149c52816682", - "sha256:b74f2756fb5e23512572cc76f0fe0832fd86310f77dfee54335a35fb33f6b950" + "sha256:974e9a32f56b17c1bac2aebd9dcf197f3eb9cd30553c5852a3187ad162e1a03a", + "sha256:d9e96492dd51fae31e60837736b38fe42a187b5404c16606ff7ee7cd582d4c60" ], - "version": "==2.0.5" + "version": "==2.1.0" }, "tractor": { "git": "git://github.com/tgoodlet/tractor.git", - "ref": "71bb87aa3a249af37ec68d00b0a5853f58923f1e" + "ref": "c0cdb3945a9a9538b65bd76038f263e859fbbfe7" }, "trio": { "hashes": [ @@ -314,37 +306,37 @@ }, "cython": { "hashes": [ - "sha256:019008a69e6b7c102f2ed3d733a288d1784363802b437dd2b91e6256b12746da", - "sha256:1441fe19c56c90b8c2159d7b861c31a134d543ef7886fd82a5d267f9f11f35ac", - "sha256:1d1a5e9d6ed415e75a676b72200ad67082242ec4d2d76eb7446da255ae72d3f7", - "sha256:339f5b985de3662b1d6c69991ab46fdbdc736feb4ac903ef6b8c00e14d87f4d8", - 
"sha256:35bdf3f48535891fee2eaade70e91d5b2cc1ee9fc2a551847c7ec18bce55a92c", - "sha256:3d0afba0aec878639608f013045697fb0969ff60b3aea2daec771ea8d01ad112", - "sha256:42c53786806e24569571a7a24ebe78ec6b364fe53e79a3f27eddd573cacd398f", - "sha256:48b919da89614d201e72fbd8247b5ae8881e296cf968feb5595a015a14c67f1f", - "sha256:49906e008eeb91912654a36c200566392bd448b87a529086694053a280f8af2d", - "sha256:49fc01a7c9c4e3c1784e9a15d162c2cac3990fcc28728227a6f8f0837aabda7c", - "sha256:501b671b639b9ca17ad303f8807deb1d0ff754d1dab106f2607d14b53cb0ff0b", - "sha256:5574574142364804423ab4428bd331a05c65f7ecfd31ac97c936f0c720fe6a53", - "sha256:6092239a772b3c6604be9e94b9ab4f0dacb7452e8ad299fd97eae0611355b679", - "sha256:71ff5c7632501c4f60edb8a24fd0a772e04c5bdca2856d978d04271b63666ef7", - "sha256:7dcf2ad14e25b05eda8bdd104f8c03a642a384aeefd25a5b51deac0826e646fa", - "sha256:8ca3a99f5a7443a6a8f83a5d8fcc11854b44e6907e92ba8640d8a8f7b9085e21", - "sha256:927da3b5710fb705aab173ad630b45a4a04c78e63dcd89411a065b2fe60e4770", - "sha256:94916d1ede67682638d3cc0feb10648ff14dc51fb7a7f147f4fedce78eaaea97", - "sha256:a3e5e5ca325527d312cdb12a4dab8b0459c458cad1c738c6f019d0d8d147081c", - "sha256:a7716a98f0b9b8f61ddb2bae7997daf546ac8fc594be6ba397f4bde7d76bfc62", - "sha256:acf10d1054de92af8d5bfc6620bb79b85f04c98214b4da7db77525bfa9fc2a89", - "sha256:de46ffb67e723975f5acab101c5235747af1e84fbbc89bf3533e2ea93fb26947", - "sha256:df428969154a9a4cd9748c7e6efd18432111fbea3d700f7376046c38c5e27081", - "sha256:f5ebf24b599caf466f9da8c4115398d663b2567b89e92f58a835e9da4f74669f", - "sha256:f79e45d5c122c4fb1fd54029bf1d475cecc05f4ed5b68136b0d6ec268bae68b6", - "sha256:f7a43097d143bd7846ffba6d2d8cd1cc97f233318dbd0f50a235ea01297a096b", - "sha256:fceb8271bc2fd3477094ca157c824e8ea840a7b393e89e766eea9a3b9ce7e0c6", - "sha256:ff919ceb40259f5332db43803aa6c22ff487e86036ce3921ae04b9185efc99a4" + "sha256:0202f753b0a69dd87095b698df00010daf452ab61279747248a042a24892a2a9", + "sha256:0fbe9514ffe35aad337db27b11f7ee1bf27d01059b2e27f112315b185d69de79", + "sha256:18ab7646985a97e02cee72e1ddba2e732d4931d4e1732494ff30c5aa084bfb97", + "sha256:18bb95daa41fd2ff0102844172bc068150bf031186249fc70c6f57fc75c9c0a9", + "sha256:222c65c7022ff52faf3ac6c706e4e8a726ddaa29dabf2173b2a0fdfc1a2f1586", + "sha256:2387c5a2a436669de9157d117fd426dfc2b46ffdc49e43f0a2267380896c04ea", + "sha256:31bad130b701587ab7e74c3c304bb3d63d9f0d365e3f81880203e8e476d914b1", + "sha256:3895014b1a653726a9da5aca852d9e6d0e2c2667bf315d6a2cd632bf7463130b", + "sha256:3d38967ef9c1c0ffabe80827f56817609153e2da83e3dce84476d0928c72972c", + "sha256:5478efd92291084adc9b679666aeaeaafca69d6bf3e95fe3efce82814e3ab782", + "sha256:5c2a6121e4e1e65690b60c270012218e38201bcf700314b1926d5dbeae78a499", + "sha256:5f66f7f76fc870500fe6db0c02d5fc4187062d29e582431f5a986881c5aef4e3", + "sha256:6572d74990b16480608441b941c1cefd60bf742416bc3668cf311980f740768d", + "sha256:6990b9965f31762ac71340869c064f39fb6776beca396d0558d3b5b1ebb7f027", + "sha256:87c82803f9c51c275b16c729aade952ca93c74a8aec963b9b8871df9bbb3120a", + "sha256:8fd32974024052b2260d08b94f970c4c1d92c327ed3570a2b4708070fa53a879", + "sha256:9a81bba33c7fbdb76e6fe8d15b6e793a1916afd4d2463f07d762c69efaaea466", + "sha256:9c31cb9bfaa1004a2a50115a37e1fcb79d664917968399dae3e04610356afe8c", + "sha256:a0b28235c28a088e052f90a0b5fefaa503e5378046a29d0af045e2ec9d5d6555", + "sha256:a3f5022d818b6c91a8bbc466211e6fd708f234909cbb10bc4dbccb2a04884ef6", + "sha256:a7252ca498f510404185e3c1bdda3224e80b1be1a5fbc2b174aab83a477ea0cb", + "sha256:aa8d7136cad8b2a7bf3596e1bc053476edeee567271f197449b2d30ea0c37175", + 
"sha256:b50a8de6f2820286129fe7d71d76c9e0c0f53a8c83cf39bbe6375b827994e4f1", + "sha256:b528a9c152c569062375d5c2260b59f8243bb4136fc38420854ac1bd4aa0d02f", + "sha256:b72db7201a4aa0445f27af9954d48ed7d2c119ce3b8b253e4dcd514fc72e5dc6", + "sha256:d3444e10ccb5b16e4c1bed3cb3c565ec676b20a21eb43430e70ec4168c631dcc", + "sha256:e16d6f06f4d2161347e51c4bc1f7a8feedeee444d26efa92243f18441a6fa742", + "sha256:f5774bef92d33a62a584f6e7552a9a8653241ecc036e259bfb03d33091599537" ], "index": "pypi", - "version": "==0.29" + "version": "==0.29.1" }, "fancycompleter": { "hashes": [ @@ -376,24 +368,16 @@ }, "msgpack": { "hashes": [ - "sha256:0b3b1773d2693c70598585a34ca2715873ba899565f0a7c9a1545baef7e7fbdc", - "sha256:0bae5d1538c5c6a75642f75a1781f3ac2275d744a92af1a453c150da3446138b", - "sha256:0ee8c8c85aa651be3aa0cd005b5931769eaa658c948ce79428766f1bd46ae2c3", - "sha256:1369f9edba9500c7a6489b70fdfac773e925342f4531f1e3d4c20ac3173b1ae0", - "sha256:22d9c929d1d539f37da3d1b0e16270fa9d46107beab8c0d4d2bddffffe895cee", - "sha256:2ff43e3247a1e11d544017bb26f580a68306cec7a6257d8818893c1fda665f42", - "sha256:31a98047355d34d047fcdb55b09cb19f633cf214c705a765bd745456c142130c", - "sha256:8767eb0032732c3a0da92cbec5ac186ef89a3258c6edca09161472ca0206c45f", - "sha256:8acc8910218555044e23826980b950e96685dc48124a290c86f6f41a296ea172", - "sha256:ab189a6365be1860a5ecf8159c248f12d33f79ea799ae9695fa6a29896dcf1d4", - "sha256:cfd6535feb0f1cf1c7cdb25773e965cc9f92928244a8c3ef6f8f8a8e1f7ae5c4", - "sha256:e274cd4480d8c76ec467a85a9c6635bbf2258f0649040560382ab58cabb44bcf", - "sha256:f86642d60dca13e93260187d56c2bef2487aa4d574a669e8ceefcf9f4c26fd00", - "sha256:f8a57cbda46a94ed0db55b73e6ab0c15e78b4ede8690fa491a0e55128d552bb0", - "sha256:fcea97a352416afcbccd7af9625159d80704a25c519c251c734527329bb20d0e" + "sha256:102802a9433dcf36f939b632cce9dea87310b2f163bb37ffc8bc343677726e88", + "sha256:64abc6bf3a2ac301702f5760f4e6e227d0fd4d84d9014ef9a40faa9d43365259", + "sha256:72259661a83f8b08ef6ee83927ce4937f841226735824af5b10a536d886eeb36", + "sha256:85f1342b9d7549dd3daf494100d47a3dc7daae703cdbfc2c9ee7bbdc8a492cba", + "sha256:8ce9f88b6cb75d74eda2a5522e5c2e5ec0f17fd78605d6502abb61f46b306865", + "sha256:9936ce3a530ca78db60b6631003b5f4ba383cfb1d9830a27d1b5c61857226e2f", + "sha256:cb4e228f3d93779a1d77a1e9d72759b79dfa2975c1a5bd2a090eaa98239fa4b1" ], "index": "pypi", - "version": "==0.5.6" + "version": "==0.6.0" }, "multio": { "hashes": [ @@ -468,10 +452,10 @@ }, "pdbpp": { "hashes": [ - "sha256:dde77326e4ea41439c243ed065826d53539530eeabd1b6615aae15cfbb9fda05" + "sha256:535085916fcfb768690ba0aeab2967c2a2163a0a60e5b703776846873e171399" ], "index": "pypi", - "version": "==0.9.2" + "version": "==0.9.3" }, "piker": { "editable": true, @@ -493,18 +477,18 @@ }, "pygments": { "hashes": [ - "sha256:78f3f434bcc5d6ee09020f92ba487f95ba50f1e3ef83ae96b9d5ffa1bab25c5d", - "sha256:dbae1046def0efb574852fab9e90209b23f556367b5a320c0bcb871c77c3e8cc" + "sha256:6301ecb0997a52d2d31385e62d0a4a4cf18d2f2da7054a5ddad5c366cd39cee7", + "sha256:82666aac15622bd7bb685a4ee7f6625dd716da3ef7473620c192c0168aae64fc" ], - "version": "==2.2.0" + "version": "==2.3.0" }, "pytest": { "hashes": [ - "sha256:3f193df1cfe1d1609d4c583838bea3d532b18d6160fd3f55c9447fdca30848ec", - "sha256:e246cf173c01169b9617fc07264b7b1316e78d7a650055235d6d897bc80d9660" + "sha256:1d131cc532be0023ef8ae265e2a779938d0619bb6c2510f52987ffcba7fa1ee4", + "sha256:ca4761407f1acc85ffd1609f464ca20bb71a767803505bd4127d0e45c5a50e23" ], "index": "pypi", - "version": "==3.10.1" + "version": "==4.0.1" }, "python-dateutil": { "hashes": [ @@ -536,10 +520,10 
@@ }, "sortedcontainers": { "hashes": [ - "sha256:220bb2e3e1886297fd7cdd6d164cb5cf237be1cfae1a3a3e526d149c52816682", - "sha256:b74f2756fb5e23512572cc76f0fe0832fd86310f77dfee54335a35fb33f6b950" + "sha256:974e9a32f56b17c1bac2aebd9dcf197f3eb9cd30553c5852a3187ad162e1a03a", + "sha256:d9e96492dd51fae31e60837736b38fe42a187b5404c16606ff7ee7cd582d4c60" ], - "version": "==2.0.5" + "version": "==2.1.0" }, "trio": { "hashes": [