From 4753dc2db8d641f02c8b6c41cb8a452ccb9f8dcb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jan 2019 21:12:35 -0500 Subject: [PATCH 01/16] Alway teardown quote gen on exit --- piker/ui/monitor.py | 60 +++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/piker/ui/monitor.py b/piker/ui/monitor.py index 18773da6..fafab986 100644 --- a/piker/ui/monitor.py +++ b/piker/ui/monitor.py @@ -140,8 +140,6 @@ async def update_quotes( log.debug("Waiting on quotes") log.warn("Data feed connection dropped") - # XXX: if we're cancelled this should never get called - # nursery.cancel_scope.cancel() async def stream_symbol_selection(): @@ -227,38 +225,36 @@ async def _async_main( table.bind(minimum_height=table.setter('height')) ss = tractor.current_actor().statespace - try: - async with trio.open_nursery() as nursery: - pager = PagerView( - container=box, - contained=table, - nursery=nursery - ) - box.add_widget(pager) - - widgets = { - 'root': box, - 'table': table, - 'box': box, - 'header': header, - 'pager': pager, - } - ss['widgets'] = widgets - nursery.start_soon( - update_quotes, - nursery, - brokermod.format_stock_quote, - widgets, - quote_gen, - feed._symbol_data_cache, - quotes - ) + async with trio.open_nursery() as nursery: + pager = PagerView( + container=box, + contained=table, + nursery=nursery + ) + box.add_widget(pager) + widgets = { + 'root': box, + 'table': table, + 'box': box, + 'header': header, + 'pager': pager, + } + ss['widgets'] = widgets + nursery.start_soon( + update_quotes, + nursery, + brokermod.format_stock_quote, + widgets, + quote_gen, + feed._symbol_data_cache, + quotes + ) + try: # Trio-kivy entry point. await async_runTouchApp(widgets['root']) # run kivy + finally: + # cancel remote data feed task + await quote_gen.aclose() # cancel GUI update task nursery.cancel_scope.cancel() - finally: - with trio.open_cancel_scope(shield=True): - # cancel aysnc gen call - await quote_gen.aclose() From c94ce47aa64102763c2d59b94a6af1edbf744068 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jan 2019 21:13:22 -0500 Subject: [PATCH 02/16] Always set contract sub state --- piker/ui/option_chain.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/piker/ui/option_chain.py b/piker/ui/option_chain.py index d68808a9..276ff343 100644 --- a/piker/ui/option_chain.py +++ b/piker/ui/option_chain.py @@ -365,6 +365,8 @@ class OptionChain(object): label.symbol = symbol if table: table.add_widget(label) + # always keep track of current subscription + self.symbol, self.expiry = symbol, expiry return # start streaming soonest contract by default if not provided @@ -498,5 +500,6 @@ async def _async_main( # trio-kivy entry point. await async_runTouchApp(chain.widgets['root']) # run kivy finally: + await chain._quote_gen.aclose() # cancel GUI update task nursery.cancel_scope.cancel() From 22670afe58cacc29926c2bcd49179d0cefca4e76 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jan 2019 21:23:49 -0500 Subject: [PATCH 03/16] Generalize the publisher/fan-out system Start working toward a more general (on-demand) pub-sub system which can be brought into ``tractor``. Right now this just means making the code in the `fan_out_to_ctxs()` less specific but, eventually I think this function should be coupled with a decorator and shipped as a standard "message pattern". Additionally, - try out making `BrokerFeed` a `@dataclass` - strip out all the `trio.Event` / uneeded nursery / extra task crap from `start_quote_stream()` --- piker/brokers/data.py | 221 ++++++++++++++++++++---------------------- 1 file changed, 103 insertions(+), 118 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index ff959ec0..bc3ff369 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -3,12 +3,13 @@ Live data feed machinery """ import time from functools import partial +from dataclasses import dataclass, field from itertools import cycle import socket import json from types import ModuleType import typing -from typing import Coroutine, Callable, Dict, List, Any +from typing import Coroutine, Callable, Dict, List, Any, Tuple import contextlib from operator import itemgetter @@ -44,8 +45,9 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: async def stream_quotes( + get_topics: typing.Callable, + get_quotes: Coroutine, brokermod: ModuleType, - request_quotes: Coroutine, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -58,8 +60,14 @@ async def stream_quotes( sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching - while True: # use an event here to trigger exit? + async def request_quotes(): + """Get quotes for current symbol subscription set. + """ + symbols = get_topics() + # subscription can be changed at any time + return await get_quotes(symbols) if symbols else () + while True: # use an event here to trigger exit? prequote_start = time.time() with trio.move_on_after(3) as cancel_scope: @@ -71,14 +79,13 @@ async def stream_quotes( if cancelled: log.warn("Quote query timed out after 3 seconds, retrying...") # handle network outages by idling until response is received - # quotes = await wait_for_network(partial(get_quotes, tickers)) quotes = await wait_for_network(request_quotes) - new_quotes = [] + new_quotes = {} if diff_cached: # If cache is enabled then only deliver "new" changes. # Useful for polling setups but obviously should be - # disabled if you're rx-ing event data. + # disabled if you're rx-ing per-tick data. for quote in quotes: symbol = quote['symbol'] last = _cache.setdefault(symbol, {}) @@ -87,10 +94,11 @@ async def stream_quotes( log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - new_quotes.append(quote) + new_quotes[symbol] = quote else: - new_quotes = quotes log.info(f"Delivering quotes:\n{quotes}") + for quote in quotes: + newquotes[quote['symbol']] = quote yield new_quotes @@ -112,7 +120,8 @@ async def stream_quotes( # TODO: at this point probably just just make this a class and # a lot of these functions should be methods. It will definitely # make stateful UI apps easier to implement -class BrokerFeed(typing.NamedTuple): +@dataclass +class BrokerFeed: """A per broker "client feed" container. A structure to keep track of components used by @@ -124,20 +133,26 @@ class BrokerFeed(typing.NamedTuple): mod: ModuleType client: object exit_stack: contextlib.AsyncExitStack - quoter_keys: List[str] = ['stock', 'option'] - tasks: Dict[str, trio.Event] = dict.fromkeys( - quoter_keys, False) - quoters: Dict[str, typing.Coroutine] = {} - subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}} + quoter_keys: Tuple[str] = ('stock', 'option') + locks: Dict[str, trio.StrictFIFOLock] = field( + default_factory=lambda: + {'stock': trio.StrictFIFOLock(), 'option': trio.StrictFIFOLock()} + ) + quoters: Dict[str, typing.Coroutine] = field(default_factory=dict) + subscriptions: Dict[str, Dict[str, set]] = field( + default_factory=partial(dict, **{'option': {}, 'stock': {}}) + ) -async def fan_out_to_chans( +async def fan_out_to_ctxs( + pub_gen: typing.AsyncGenerator, feed: BrokerFeed, get_quotes: Coroutine, - symbols2chans: Dict[str, tractor.Channel], + topics2ctxs: Dict[str, tractor.Context], + topic_key: str = 'key', + packet_key: str = 'symbol', rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue - cid: str = None, ) -> None: """Request and fan out quotes to each subscribed actor channel. """ @@ -146,45 +161,44 @@ async def fan_out_to_chans( rate = broker_limit log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") - async def request(): - """Get quotes for current symbol subscription set. - """ - symbols = list(symbols2chans.keys()) - # subscription can be changed at any time - return await get_quotes(symbols) if symbols else () + def get_topics(): + return tuple(topics2ctxs.keys()) - async for quotes in stream_quotes( - feed.mod, request, rate, + async for published in pub_gen( + get_topics, + get_quotes, + feed.mod, + rate, diff_cached=diff_cached, ): - chan_payloads = {} - for quote in quotes: - packet = {quote['symbol']: quote} - for chan, cid in symbols2chans.get(quote['key'], set()): - chan_payloads.setdefault( - (chan, cid), - {'yield': {}, 'cid': cid} - )['yield'].update(packet) + ctx_payloads = {} + for packet_key, data in published.items(): + # grab each suscription topic using provided key for lookup + topic = data[topic_key] + # build a new dict packet for passing to multiple underlying channels + packet = {packet_key: data} + for ctx in topics2ctxs.get(topic, set()): + ctx_payloads.setdefault(ctx, {}).update(packet), # deliver to each subscriber (fan out) - if chan_payloads: - for (chan, cid), payload in chan_payloads.items(): + if ctx_payloads: + for ctx, payload in ctx_payloads.items(): try: - await chan.send(payload) + await ctx.send_yield(payload) except ( # That's right, anything you can think of... trio.ClosedStreamError, ConnectionResetError, ConnectionRefusedError, ): - log.warn(f"{chan} went down?") - for chanset in symbols2chans.values(): - chanset.discard((chan, cid)) + log.warn(f"{ctx.chan} went down?") + for ctx_set in topics2ctxs.values(): + ctx_set.discard(ctx) - if not any(symbols2chans.values()): + if not any(topics2ctxs.values()): log.warn(f"No subs left for broker {feed.mod.name}, exiting task") break - log.info(f"Terminating stream quoter task for {feed.mod.name}") + log.info(f"Terminating stream quoter task for {pub_gen.__name__}") async def symbol_data(broker: str, tickers: List[str]): @@ -241,12 +255,12 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -def modify_quote_stream(broker, feed_type, symbols, chan, cid): +def modify_quote_stream(broker, feed_type, symbols, ctx): """Absolute symbol subscription list for each quote stream. Effectively a symbol subscription api. """ - log.info(f"{chan} changed symbol subscription to {symbols}") + log.info(f"{ctx.chan} changed symbol subscription to {symbols}") ss = tractor.current_actor().statespace feed = ss['feeds'].get(broker) if feed is None: @@ -254,26 +268,23 @@ def modify_quote_stream(broker, feed_type, symbols, chan, cid): "`get_cached_feed()` must be called before modifying its stream" ) - symbols2chans = feed.subscriptions[feed_type] + symbols2ctxs = feed.subscriptions[feed_type] # update map from each symbol to requesting client's chan for ticker in symbols: - symbols2chans.setdefault(ticker, set()).add((chan, cid)) + symbols2ctxs.setdefault(ticker, set()).add(ctx) # remove any existing symbol subscriptions if symbol is not # found in ``symbols`` # TODO: this can likely be factored out into the pub-sub api for ticker in filter( - lambda ticker: ticker not in symbols, symbols2chans.copy() + lambda ticker: ticker not in symbols, symbols2ctxs.copy() ): - chanset = symbols2chans.get(ticker) - # XXX: cid will be different on unsub call - for item in chanset.copy(): - if (chan, cid) == item: - chanset.discard(item) + ctx_set = symbols2ctxs.get(ticker) + ctx_set.discard(ctx) - if not chanset: + if not ctx_set: # pop empty sets which will trigger bg quoter task termination - symbols2chans.pop(ticker) + symbols2ctxs.pop(ticker) async def get_cached_feed( @@ -310,8 +321,7 @@ async def start_quote_stream( symbols: List[Any], feed_type: str = 'stock', diff_cached: bool = True, - chan: tractor.Channel = None, - cid: str = None, + ctx: tractor.Context = None, rate: int = 3, ) -> None: """Handle per-broker quote stream subscriptions using a "lazy" pub-sub @@ -321,16 +331,17 @@ async def start_quote_stream( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ - actor = tractor.current_actor() - # set log level after fork - get_console_log(actor.loglevel) + # XXX: why do we need this again? + get_console_log(tractor.current_actor().loglevel) + # pull global vars from local actor symbols = list(symbols) log.info( - f"{chan.uid} subscribed to {broker} for symbols {symbols}") + f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") # another actor task may have already created it feed = await get_cached_feed(broker) - symbols2chans = feed.subscriptions[feed_type] + symbols2ctxs = feed.subscriptions[feed_type] + task_is_dead = None if feed_type == 'stock': get_quotes = feed.quoters.setdefault( @@ -341,7 +352,7 @@ async def start_quote_stream( # out bad symbols for now) payload = await smoke_quote(get_quotes, symbols, broker) # push initial smoke quote response for client initialization - await chan.send({'yield': payload, 'cid': cid}) + await ctx.send_yield(payload) elif feed_type == 'option': # FIXME: yeah we need maybe a more general way to specify # the arg signature for the option feed beasides a symbol @@ -355,77 +366,51 @@ async def start_quote_stream( for quote in await get_quotes(symbols) } # push initial smoke quote response for client initialization - await chan.send({'yield': payload, 'cid': cid}) + await ctx.send_yield(payload) try: # update map from each symbol to requesting client's chan - modify_quote_stream(broker, feed_type, symbols, chan, cid) + modify_quote_stream(broker, feed_type, symbols, ctx) - # event indicating that task was started and then killed - task_is_dead = feed.tasks.get(feed_type) - if task_is_dead is False: - task_is_dead = trio.Event() - task_is_dead.set() - feed.tasks[feed_type] = task_is_dead + # prevents more then one broker feed task from spawning + lock = feed.locks.get(feed_type) - if not task_is_dead.is_set(): - # block and let existing feed task deliver - # stream data until it is cancelled in which case - # we'll take over and spawn it again - await task_is_dead.wait() - # client channel was likely disconnected - # but we still want to keep the broker task - # alive if there are other consumers (including - # ourselves) - if any(symbols2chans.values()): - log.warn( - f"Data feed task for {feed.mod.name} was cancelled but" - f" there are still active clients, respawning") - - # no data feeder task yet; so start one - respawn = True - while respawn: - respawn = False - log.info(f"Spawning data feed task for {feed.mod.name}") - try: - async with trio.open_nursery() as nursery: - nursery.start_soon( - partial( - fan_out_to_chans, feed, get_quotes, - symbols2chans, - diff_cached=diff_cached, - cid=cid, - rate=rate, - ) + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # we'll take over and spawn it again + async with lock: + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False + log.info(f"Spawning data feed task for {feed.mod.name}") + try: + # unblocks when no more symbols subscriptions exist and the + # quote streamer task terminates + await fan_out_to_ctxs( + stream_quotes, + feed, + get_quotes, + symbols2ctxs, + diff_cached=diff_cached, + rate=rate, ) - # it's alive! - task_is_dead.clear() - - except trio.BrokenResourceError: - log.exception("Respawning failed data feed task") - respawn = True - - # unblocks when no more symbols subscriptions exist and the - # quote streamer task terminates (usually because another call - # was made to `modify_quoter` to unsubscribe from streaming - # symbols) + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True finally: - log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}") - task_is_dead.set() - # if we're cancelled externally unsubscribe our quote feed - modify_quote_stream(broker, feed_type, [], chan, cid) + modify_quote_stream(broker, feed_type, [], ctx) # if there are truly no more subscriptions with this broker # drop from broker subs dict - if not any(symbols2chans.values()): - log.info(f"No more subscriptions for {broker}") - # broker2symbolsubs.pop(broker, None) + if not any(symbols2ctxs.values()): + log.info(f"No more subscriptions for broker {broker}") # destroy the API client await feed.exit_stack.aclose() -class DataFeed(object): +class DataFeed: """Data feed client for streaming symbol data from a (remote) ``brokerd`` data daemon. """ @@ -472,7 +457,7 @@ class DataFeed(object): filename=test ) else: - log.info(f"Starting new stream for {self._symbols}") + log.info(f"Starting new stream for {symbols}") # start live streaming from broker daemon quote_gen = await self.portal.run( "piker.brokers.data", From 2514843fc1f80e130b7a13a52ce6f5baa03b040f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 27 Jan 2019 14:50:04 -0500 Subject: [PATCH 04/16] Port to the new `@tractor.msg.pub` decorator API The pub-sub data feed system was factored into `tractor` as an experimental api / subsystem. Move to using that which greatly simplifies the data feed architecture. --- piker/brokers/data.py | 216 ++++++++++++------------------------------ 1 file changed, 61 insertions(+), 155 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index bc3ff369..09847615 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -44,10 +44,38 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: await trio.sleep(sleep) +# TODO: at this point probably just just make this a class and +# a lot of these functions should be methods. It will definitely +# make stateful UI apps easier to implement +@dataclass +class BrokerFeed: + """A per broker "client feed" container. + + A structure to keep track of components used by + real-time data daemons. This is a backend "client" which pulls + data from broker specific data lakes: + + ``DataFeed`` <- tractor -> ``BrokerFeed`` <- broker IPC -> broker API + """ + mod: ModuleType + client: object + exit_stack: contextlib.AsyncExitStack + quoter_keys: Tuple[str] = ('stock', 'option') + locks: Dict[str, trio.StrictFIFOLock] = field( + default_factory=lambda: + {'stock': trio.StrictFIFOLock(), 'option': trio.StrictFIFOLock()} + ) + quoters: Dict[str, typing.Coroutine] = field(default_factory=dict) + subscriptions: Dict[str, Dict[str, set]] = field( + default_factory=partial(dict, **{'option': {}, 'stock': {}}) + ) + + +@tractor.msg.pub(tasks=['stock', 'option']) async def stream_quotes( get_topics: typing.Callable, get_quotes: Coroutine, - brokermod: ModuleType, + feed: BrokerFeed, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -57,6 +85,11 @@ async def stream_quotes( A stock-broker client ``get_quotes()`` async context manager must be provided which returns an async quote retrieval function. """ + broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) + if broker_limit < rate: + rate = broker_limit + log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") + sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching @@ -94,11 +127,11 @@ async def stream_quotes( log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - new_quotes[symbol] = quote + new_quotes[quote['key']] = quote else: log.info(f"Delivering quotes:\n{quotes}") for quote in quotes: - newquotes[quote['symbol']] = quote + new_quotes[quote['symbol']] = quote yield new_quotes @@ -117,90 +150,6 @@ async def stream_quotes( await trio.sleep(delay) -# TODO: at this point probably just just make this a class and -# a lot of these functions should be methods. It will definitely -# make stateful UI apps easier to implement -@dataclass -class BrokerFeed: - """A per broker "client feed" container. - - A structure to keep track of components used by - real-time data daemons. This is a backend "client" which pulls - data from broker specific data lakes: - - ``DataFeed`` <- tractor -> ``BrokerFeed`` <- broker IPC -> broker API - """ - mod: ModuleType - client: object - exit_stack: contextlib.AsyncExitStack - quoter_keys: Tuple[str] = ('stock', 'option') - locks: Dict[str, trio.StrictFIFOLock] = field( - default_factory=lambda: - {'stock': trio.StrictFIFOLock(), 'option': trio.StrictFIFOLock()} - ) - quoters: Dict[str, typing.Coroutine] = field(default_factory=dict) - subscriptions: Dict[str, Dict[str, set]] = field( - default_factory=partial(dict, **{'option': {}, 'stock': {}}) - ) - - -async def fan_out_to_ctxs( - pub_gen: typing.AsyncGenerator, - feed: BrokerFeed, - get_quotes: Coroutine, - topics2ctxs: Dict[str, tractor.Context], - topic_key: str = 'key', - packet_key: str = 'symbol', - rate: int = 5, # delay between quote requests - diff_cached: bool = True, # only deliver "new" quotes to the queue -) -> None: - """Request and fan out quotes to each subscribed actor channel. - """ - broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) - if broker_limit < rate: - rate = broker_limit - log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") - - def get_topics(): - return tuple(topics2ctxs.keys()) - - async for published in pub_gen( - get_topics, - get_quotes, - feed.mod, - rate, - diff_cached=diff_cached, - ): - ctx_payloads = {} - for packet_key, data in published.items(): - # grab each suscription topic using provided key for lookup - topic = data[topic_key] - # build a new dict packet for passing to multiple underlying channels - packet = {packet_key: data} - for ctx in topics2ctxs.get(topic, set()): - ctx_payloads.setdefault(ctx, {}).update(packet), - - # deliver to each subscriber (fan out) - if ctx_payloads: - for ctx, payload in ctx_payloads.items(): - try: - await ctx.send_yield(payload) - except ( - # That's right, anything you can think of... - trio.ClosedStreamError, ConnectionResetError, - ConnectionRefusedError, - ): - log.warn(f"{ctx.chan} went down?") - for ctx_set in topics2ctxs.values(): - ctx_set.discard(ctx) - - if not any(topics2ctxs.values()): - log.warn(f"No subs left for broker {feed.mod.name}, exiting task") - break - - log.info(f"Terminating stream quoter task for {pub_gen.__name__}") - - async def symbol_data(broker: str, tickers: List[str]): """Retrieve baseline symbol info from broker. """ @@ -255,38 +204,6 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -def modify_quote_stream(broker, feed_type, symbols, ctx): - """Absolute symbol subscription list for each quote stream. - - Effectively a symbol subscription api. - """ - log.info(f"{ctx.chan} changed symbol subscription to {symbols}") - ss = tractor.current_actor().statespace - feed = ss['feeds'].get(broker) - if feed is None: - raise RuntimeError( - "`get_cached_feed()` must be called before modifying its stream" - ) - - symbols2ctxs = feed.subscriptions[feed_type] - # update map from each symbol to requesting client's chan - for ticker in symbols: - symbols2ctxs.setdefault(ticker, set()).add(ctx) - - # remove any existing symbol subscriptions if symbol is not - # found in ``symbols`` - # TODO: this can likely be factored out into the pub-sub api - for ticker in filter( - lambda ticker: ticker not in symbols, symbols2ctxs.copy() - ): - ctx_set = symbols2ctxs.get(ticker) - ctx_set.discard(ctx) - - if not ctx_set: - # pop empty sets which will trigger bg quoter task termination - symbols2ctxs.pop(ticker) - - async def get_cached_feed( brokername: str, ) -> BrokerFeed: @@ -317,11 +234,11 @@ async def get_cached_feed( async def start_quote_stream( + ctx: tractor.Context, # marks this as a streaming func broker: str, symbols: List[Any], feed_type: str = 'stock', diff_cached: bool = True, - ctx: tractor.Context = None, rate: int = 3, ) -> None: """Handle per-broker quote stream subscriptions using a "lazy" pub-sub @@ -341,7 +258,7 @@ async def start_quote_stream( # another actor task may have already created it feed = await get_cached_feed(broker) symbols2ctxs = feed.subscriptions[feed_type] - task_is_dead = None + packetizer = None if feed_type == 'stock': get_quotes = feed.quoters.setdefault( @@ -351,8 +268,7 @@ async def start_quote_stream( # do a smoke quote (note this mutates the input list and filters # out bad symbols for now) payload = await smoke_quote(get_quotes, symbols, broker) - # push initial smoke quote response for client initialization - await ctx.send_yield(payload) + elif feed_type == 'option': # FIXME: yeah we need maybe a more general way to specify # the arg signature for the option feed beasides a symbol @@ -361,46 +277,36 @@ async def start_quote_stream( 'option', await feed.mod.option_quoter(feed.client, symbols) ) + # packetize payload = { quote['symbol']: quote for quote in await get_quotes(symbols) } - # push initial smoke quote response for client initialization - await ctx.send_yield(payload) + + def packetizer(topic, quote): + return {quote['symbol']: quote} + + # push initial smoke quote response for client initialization + await ctx.send_yield(payload) + try: - # update map from each symbol to requesting client's chan - modify_quote_stream(broker, feed_type, symbols, ctx) + await stream_quotes( - # prevents more then one broker feed task from spawning - lock = feed.locks.get(feed_type) + # pub required kwargs + task_name=feed_type, + ctx=ctx, + topics=symbols, + packetizer=packetizer, - # block and let existing feed task deliver - # stream data until it is cancelled in which case - # we'll take over and spawn it again - async with lock: - # no data feeder task yet; so start one - respawn = True - while respawn: - respawn = False - log.info(f"Spawning data feed task for {feed.mod.name}") - try: - # unblocks when no more symbols subscriptions exist and the - # quote streamer task terminates - await fan_out_to_ctxs( - stream_quotes, - feed, - get_quotes, - symbols2ctxs, - diff_cached=diff_cached, - rate=rate, - ) - except trio.BrokenResourceError: - log.exception("Respawning failed data feed task") - respawn = True + # actual func args + feed=feed, + get_quotes=get_quotes, + diff_cached=diff_cached, + rate=rate, + ) + log.info( + f"Terminating stream quoter task for {feed.mod.name}") finally: - # if we're cancelled externally unsubscribe our quote feed - modify_quote_stream(broker, feed_type, [], ctx) - # if there are truly no more subscriptions with this broker # drop from broker subs dict if not any(symbols2ctxs.values()): From 9b37607b049eebb51e3e03dcc09a516ad1740a50 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 27 Jan 2019 22:10:49 -0500 Subject: [PATCH 05/16] Deps bump --- Pipfile.lock | 450 ++++++++++++++++++++++++++------------------------- 1 file changed, 229 insertions(+), 221 deletions(-) diff --git a/Pipfile.lock b/Pipfile.lock index 7a26f213..72cecc49 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -43,44 +43,44 @@ }, "colorlog": { "hashes": [ - "sha256:418db638c9577f37f0fae4914074f395847a728158a011be2a193ac491b9779d", - "sha256:8b234ebae1ba1237bc79c0d5f1f47b31a3f3e90c0b4c2b0ebdde63a174d3b97b" + "sha256:3cf31b25cbc8f86ec01fef582ef3b840950dea414084ed19ab922c8b493f9b42", + "sha256:450f52ea2a2b6ebb308f034ea9a9b15cea51e65650593dca1da3eb792e4e4981" ], - "version": "==3.1.4" + "version": "==4.0.2" }, "cython": { "hashes": [ - "sha256:0202f753b0a69dd87095b698df00010daf452ab61279747248a042a24892a2a9", - "sha256:0fbe9514ffe35aad337db27b11f7ee1bf27d01059b2e27f112315b185d69de79", - "sha256:18ab7646985a97e02cee72e1ddba2e732d4931d4e1732494ff30c5aa084bfb97", - "sha256:18bb95daa41fd2ff0102844172bc068150bf031186249fc70c6f57fc75c9c0a9", - "sha256:222c65c7022ff52faf3ac6c706e4e8a726ddaa29dabf2173b2a0fdfc1a2f1586", - "sha256:2387c5a2a436669de9157d117fd426dfc2b46ffdc49e43f0a2267380896c04ea", - "sha256:31bad130b701587ab7e74c3c304bb3d63d9f0d365e3f81880203e8e476d914b1", - "sha256:3895014b1a653726a9da5aca852d9e6d0e2c2667bf315d6a2cd632bf7463130b", - "sha256:3d38967ef9c1c0ffabe80827f56817609153e2da83e3dce84476d0928c72972c", - "sha256:5478efd92291084adc9b679666aeaeaafca69d6bf3e95fe3efce82814e3ab782", - "sha256:5c2a6121e4e1e65690b60c270012218e38201bcf700314b1926d5dbeae78a499", - "sha256:5f66f7f76fc870500fe6db0c02d5fc4187062d29e582431f5a986881c5aef4e3", - "sha256:6572d74990b16480608441b941c1cefd60bf742416bc3668cf311980f740768d", - "sha256:6990b9965f31762ac71340869c064f39fb6776beca396d0558d3b5b1ebb7f027", - "sha256:87c82803f9c51c275b16c729aade952ca93c74a8aec963b9b8871df9bbb3120a", - "sha256:8fd32974024052b2260d08b94f970c4c1d92c327ed3570a2b4708070fa53a879", - "sha256:9a81bba33c7fbdb76e6fe8d15b6e793a1916afd4d2463f07d762c69efaaea466", - "sha256:9c31cb9bfaa1004a2a50115a37e1fcb79d664917968399dae3e04610356afe8c", - "sha256:a0b28235c28a088e052f90a0b5fefaa503e5378046a29d0af045e2ec9d5d6555", - "sha256:a3f5022d818b6c91a8bbc466211e6fd708f234909cbb10bc4dbccb2a04884ef6", - "sha256:a7252ca498f510404185e3c1bdda3224e80b1be1a5fbc2b174aab83a477ea0cb", - "sha256:aa8d7136cad8b2a7bf3596e1bc053476edeee567271f197449b2d30ea0c37175", - "sha256:b50a8de6f2820286129fe7d71d76c9e0c0f53a8c83cf39bbe6375b827994e4f1", - "sha256:b528a9c152c569062375d5c2260b59f8243bb4136fc38420854ac1bd4aa0d02f", - "sha256:b72db7201a4aa0445f27af9954d48ed7d2c119ce3b8b253e4dcd514fc72e5dc6", - "sha256:d3444e10ccb5b16e4c1bed3cb3c565ec676b20a21eb43430e70ec4168c631dcc", - "sha256:e16d6f06f4d2161347e51c4bc1f7a8feedeee444d26efa92243f18441a6fa742", - "sha256:f5774bef92d33a62a584f6e7552a9a8653241ecc036e259bfb03d33091599537" + "sha256:1327655db47beb665961d3dc0365e20c9e8e80c234513ab2c7d06ec0dd9d63eb", + "sha256:142400f13102403f43576bb92d808a668e29deda5625388cfa39fe0bcf37b3d1", + "sha256:1b4204715141281a631337378f0c15fe660b35e1b6888ca05f1f3f49df3b97d5", + "sha256:23aabaaf8887e6db99df2145de6742f8c92830134735778bf2ae26338f2b406f", + "sha256:2a724c6f21fdf4e3c1e8c5c862ff87f5420fdaecf53a5a0417915e483d90217f", + "sha256:2c9c8c1c6e8bd3587e5f5db6f865a42195ff2dedcaf5cdb63fdea10c98bd6246", + "sha256:3a1be38b774423605189d60652b3d8a324fc81d213f96569720c8093784245ab", + "sha256:46be5297a76513e4d5d6e746737d4866a762cfe457e57d7c54baa7ef8fea7e9a", + "sha256:48dc2ea4c4d3f34ddcad5bc71b1f1cf49830f868832d3e5df803c811e7395b6e", + "sha256:53f33e04d2ed078ac02841741bcd536b546e1f416608084468ab30a87638a466", + "sha256:57b10588618ca19a4cc870f381aa8805bc5fe0c62d19d7f940232ff8a373887c", + "sha256:6001038341b52301450bb9c62e5d5da825788944572679277e137ffb3596e718", + "sha256:70bef52e735607060f327d729be35c820d9018d260a875e4f98b20ba8c4fff96", + "sha256:7d0f76b251699be8f1f1064dcb12d4b3b2b676ce15ff30c104e0c2091a015142", + "sha256:9440b64c1569c26a184b7c778bb221cf9987c5c8486d32cda02302c66ea78980", + "sha256:956cc97eac6f9d3b16e3b2d2a94c5586af3403ba97945e9d88a4a0f029899646", + "sha256:ae430ad8cce937e07ea566d1d7899eef1fedc8ec512b4d5fa37ebf2c1f879936", + "sha256:bdb575149881978d62167dd8427402a5872a79bd83e9d51219680670e9f80b40", + "sha256:c0ffcddd3dbdf22aae3980931112cc8b2732315a6273988f3205cf5dacf36f45", + "sha256:c133e2efc57426974366ac74f2ef0f1171b860301ac27f72316deacff4ccdc17", + "sha256:c6e9521d0b77eb1da89e8264eb98c8f5cda7c49a49b8128acfd35f0ca50e56d0", + "sha256:c7cac0220ecb733024e8acfcfb6b593a007185690f2ea470d2392b72510b7187", + "sha256:d53483820ac28f2be2ff13eedd56c0f36a4c583727b551d3d468023556e2336a", + "sha256:d60210784186d61e0ec808d5dbee5d661c7457a57f93cb5fdc456394607ce98c", + "sha256:d687fb1cd9df28c1515666174c62e54bd894a6a6d0862f89705063cd47739f83", + "sha256:d926764d9c768a48b0a16a91696aaa25498057e060934f968fa4c5629b942d85", + "sha256:d94a2f4ad74732f58d1c771fc5d90a62c4fe4c98d0adfecbc76cd0d8d14bf044", + "sha256:def76a546eeec059666f5f4117dfdf9c78e50fa1f95bdd23b04618c7adf845cd" ], "index": "pypi", - "version": "==0.29.1" + "version": "==0.29.3" }, "e1839a8": { "editable": true, @@ -101,10 +101,10 @@ }, "idna": { "hashes": [ - "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e", - "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16" + "sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407", + "sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c" ], - "version": "==2.7" + "version": "==2.8" }, "kivy": { "git": "git://github.com/matham/kivy.git", @@ -112,16 +112,26 @@ }, "msgpack": { "hashes": [ - "sha256:102802a9433dcf36f939b632cce9dea87310b2f163bb37ffc8bc343677726e88", - "sha256:64abc6bf3a2ac301702f5760f4e6e227d0fd4d84d9014ef9a40faa9d43365259", - "sha256:72259661a83f8b08ef6ee83927ce4937f841226735824af5b10a536d886eeb36", - "sha256:85f1342b9d7549dd3daf494100d47a3dc7daae703cdbfc2c9ee7bbdc8a492cba", - "sha256:8ce9f88b6cb75d74eda2a5522e5c2e5ec0f17fd78605d6502abb61f46b306865", - "sha256:9936ce3a530ca78db60b6631003b5f4ba383cfb1d9830a27d1b5c61857226e2f", - "sha256:cb4e228f3d93779a1d77a1e9d72759b79dfa2975c1a5bd2a090eaa98239fa4b1" + "sha256:26cb40116111c232bc235ce131cc3b4e76549088cb154e66a2eb8ff6fcc907ec", + "sha256:300fd3f2c664a3bf473d6a952f843b4a71454f4c592ed7e74a36b205c1782d28", + "sha256:3129c355342853007de4a2a86e75eab966119733eb15748819b6554363d4e85c", + "sha256:31f6d645ee5a97d59d3263fab9e6be76f69fa131cddc0d94091a3c8aca30d67a", + "sha256:3ce7ef7ee2546c3903ca8c934d09250531b80c6127e6478781ae31ed835aac4c", + "sha256:4008c72f5ef2b7936447dcb83db41d97e9791c83221be13d5e19db0796df1972", + "sha256:62bd8e43d204580308d477a157b78d3fee2fb4c15d32578108dc5d89866036c8", + "sha256:70cebfe08fb32f83051971264466eadf183101e335d8107b80002e632f425511", + "sha256:72cb7cf85e9df5251abd7b61a1af1fb77add15f40fa7328e924a9c0b6bc7a533", + "sha256:7c55649965c35eb32c499d17dadfb8f53358b961582846e1bc06f66b9bccc556", + "sha256:86b963a5de11336ec26bc4f839327673c9796b398b9f1fe6bb6150c2a5d00f0f", + "sha256:8c73c9bcdfb526247c5e4f4f6cf581b9bb86b388df82cfcaffde0a6e7bf3b43a", + "sha256:8e68c76c6aff4849089962d25346d6784d38e02baa23ffa513cf46be72e3a540", + "sha256:97ac6b867a8f63debc64f44efdc695109d541ecc361ee2dce2c8884ab37360a1", + "sha256:9d4f546af72aa001241d74a79caec278bcc007b4bcde4099994732e98012c858", + "sha256:a28e69fe5468c9f5251c7e4e7232286d71b7dfadc74f312006ebe984433e9746", + "sha256:fd509d4aa95404ce8d86b4e32ce66d5d706fd6646c205e1c2a715d87078683a2" ], "index": "pypi", - "version": "==0.6.0" + "version": "==0.6.1" }, "multio": { "hashes": [ @@ -131,36 +141,31 @@ }, "numpy": { "hashes": [ - "sha256:0df89ca13c25eaa1621a3f09af4c8ba20da849692dcae184cb55e80952c453fb", - "sha256:154c35f195fd3e1fad2569930ca51907057ae35e03938f89a8aedae91dd1b7c7", - "sha256:18e84323cdb8de3325e741a7a8dd4a82db74fde363dce32b625324c7b32aa6d7", - "sha256:1e8956c37fc138d65ded2d96ab3949bd49038cc6e8a4494b1515b0ba88c91565", - "sha256:23557bdbca3ccbde3abaa12a6e82299bc92d2b9139011f8c16ca1bb8c75d1e95", - "sha256:24fd645a5e5d224aa6e39d93e4a722fafa9160154f296fd5ef9580191c755053", - "sha256:36e36b6868e4440760d4b9b44587ea1dc1f06532858d10abba98e851e154ca70", - "sha256:3d734559db35aa3697dadcea492a423118c5c55d176da2f3be9c98d4803fc2a7", - "sha256:416a2070acf3a2b5d586f9a6507bb97e33574df5bd7508ea970bbf4fc563fa52", - "sha256:4a22dc3f5221a644dfe4a63bf990052cc674ef12a157b1056969079985c92816", - "sha256:4d8d3e5aa6087490912c14a3c10fbdd380b40b421c13920ff468163bc50e016f", - "sha256:4f41fd159fba1245e1958a99d349df49c616b133636e0cf668f169bce2aeac2d", - "sha256:561ef098c50f91fbac2cc9305b68c915e9eb915a74d9038ecf8af274d748f76f", - "sha256:56994e14b386b5c0a9b875a76d22d707b315fa037affc7819cda08b6d0489756", - "sha256:73a1f2a529604c50c262179fcca59c87a05ff4614fe8a15c186934d84d09d9a5", - "sha256:7da99445fd890206bfcc7419f79871ba8e73d9d9e6b82fe09980bc5bb4efc35f", - "sha256:99d59e0bcadac4aa3280616591fb7bcd560e2218f5e31d5223a2e12a1425d495", - "sha256:a4cc09489843c70b22e8373ca3dfa52b3fab778b57cf81462f1203b0852e95e3", - "sha256:a61dc29cfca9831a03442a21d4b5fd77e3067beca4b5f81f1a89a04a71cf93fa", - "sha256:b1853df739b32fa913cc59ad9137caa9cc3d97ff871e2bbd89c2a2a1d4a69451", - "sha256:b1f44c335532c0581b77491b7715a871d0dd72e97487ac0f57337ccf3ab3469b", - "sha256:b261e0cb0d6faa8fd6863af26d30351fd2ffdb15b82e51e81e96b9e9e2e7ba16", - "sha256:c857ae5dba375ea26a6228f98c195fec0898a0fd91bcf0e8a0cae6d9faf3eca7", - "sha256:cf5bb4a7d53a71bb6a0144d31df784a973b36d8687d615ef6a7e9b1809917a9b", - "sha256:db9814ff0457b46f2e1d494c1efa4111ca089e08c8b983635ebffb9c1573361f", - "sha256:df04f4bad8a359daa2ff74f8108ea051670cafbca533bb2636c58b16e962989e", - "sha256:ecf81720934a0e18526177e645cbd6a8a21bb0ddc887ff9738de07a1df5c6b61", - "sha256:edfa6fba9157e0e3be0f40168eb142511012683ac3dc82420bee4a3f3981b30e" + "sha256:00a458d6821b1e87be873f2126d5646b901047a7480e8ae9773ecf214f0e19f3", + "sha256:0470c5dc32212a08ebc2405f32e8ceb9a5b1c8ac61a2daf9835ec0856a220495", + "sha256:24a9c287a4a1c427c2d45bf7c4fc6180c52a08fa0990d4c94e4c86a9b1e23ba5", + "sha256:25600e8901012180a1b7cd1ac3e27e7793586ecd432383191929ac2edf37ff5d", + "sha256:2d279bd99329e72c30937bdef82b6dc7779c7607c5a379bab1bf76be1f4c1422", + "sha256:32af2bcf4bb7631dac19736a6e092ec9715e770dcaa1f85fcd99dec5040b2a4d", + "sha256:3e90a9fce378114b6c2fc01fff7423300515c7b54b7cc71b02a22bc0bd7dfdd8", + "sha256:5774d49516c37fd3fc1f232e033d2b152f3323ca4c7bfefd7277e4c67f3c08b4", + "sha256:64ff21aac30d40c20ba994c94a08d439b8ced3b9c704af897e9e4ba09d10e62c", + "sha256:803b2af862dcad6c11231ea3cd1015d1293efd6c87088be33d713a9b23e9e419", + "sha256:95c830b09626508f7808ce7f1344fb98068e63143e6050e5dc3063142fc60007", + "sha256:96e49a0c82b4e3130093002f625545104037c2d25866fa2e0c90d6e54f5a1fbc", + "sha256:a1dd8221f0e69038748f47b8bb3248d0b9ecdf13fe837440951c3d5ff72639bb", + "sha256:a80ecac5664f420556a725a5646f2d1c60a7c0489d68a38b5056393e949e27ac", + "sha256:b19a47ff1bd2fca0cacdfa830c967746764c32dca6a0c0328d9c893f4bfe2f6b", + "sha256:be43df2c563e264b38e3318574d80fc8f365df3fb745270934d2dbe54e006f41", + "sha256:c40cb17188f6ae3c5b6efc6f0fd43a7ddd219b7807fe179e71027849a9b91afc", + "sha256:c6251e0f0ecac53ba2b99d9f0cc16fa9021914a78869c38213c436ba343641f0", + "sha256:cb189bd98b2e7ac02df389b6212846ab20661f4bafe16b5a70a6f1728c1cc7cb", + "sha256:ef4ae41add536cb825d8aa029c15ef510aead06ea5b68daea64f0b9ecbff17db", + "sha256:f00a2c21f60284e024bba351875f3501c6d5817d64997a0afe4f4355161a8889", + "sha256:f1232f98a6bbd6d1678249f94028bccc541bbc306aa5c4e1471a881b0e5a3409", + "sha256:fea682f6ddc09517df0e6d5caad9613c6d91a42232aeb082df67e4d205de19cc" ], - "version": "==1.15.4" + "version": "==1.16.0" }, "outcome": { "hashes": [ @@ -171,28 +176,28 @@ }, "pandas": { "hashes": [ - "sha256:11975fad9edbdb55f1a560d96f91830e83e29bed6ad5ebf506abda09818eaf60", - "sha256:12e13d127ca1b585dd6f6840d3fe3fa6e46c36a6afe2dbc5cb0b57032c902e31", - "sha256:1c87fcb201e1e06f66e23a61a5fea9eeebfe7204a66d99df24600e3f05168051", - "sha256:242e9900de758e137304ad4b5663c2eff0d798c2c3b891250bd0bd97144579da", - "sha256:26c903d0ae1542890cb9abadb4adcb18f356b14c2df46e4ff657ae640e3ac9e7", - "sha256:2e1e88f9d3e5f107b65b59cd29f141995597b035d17cc5537e58142038942e1a", - "sha256:31b7a48b344c14691a8e92765d4023f88902ba3e96e2e4d0364d3453cdfd50db", - "sha256:4fd07a932b4352f8a8973761ab4e84f965bf81cc750fb38e04f01088ab901cb8", - "sha256:5b24ca47acf69222e82530e89111dd9d14f9b970ab2cd3a1c2c78f0c4fbba4f4", - "sha256:647b3b916cc8f6aeba240c8171be3ab799c3c1b2ea179a3be0bd2712c4237553", - "sha256:66b060946046ca27c0e03e9bec9bba3e0b918bafff84c425ca2cc2e157ce121e", - "sha256:6efa9fa6e1434141df8872d0fa4226fc301b17aacf37429193f9d70b426ea28f", - "sha256:be4715c9d8367e51dbe6bc6d05e205b1ae234f0dc5465931014aa1c4af44c1ba", - "sha256:bea90da782d8e945fccfc958585210d23de374fa9294a9481ed2abcef637ebfc", - "sha256:d318d77ab96f66a59e792a481e2701fba879e1a453aefeebdb17444fe204d1ed", - "sha256:d785fc08d6f4207437e900ffead930a61e634c5e4f980ba6d3dc03c9581748c7", - "sha256:de9559287c4fe8da56e8c3878d2374abc19d1ba2b807bfa7553e912a8e5ba87c", - "sha256:f4f98b190bb918ac0bc0e3dd2ab74ff3573da9f43106f6dba6385406912ec00f", - "sha256:f71f1a7e2d03758f6e957896ed696254e2bc83110ddbc6942018f1a232dd9dad", - "sha256:fb944c8f0b0ab5c1f7846c686bc4cdf8cde7224655c12edcd59d5212cd57bec0" + "sha256:02d34a55e85819a7eab096f391f8dcc237876e8b3cdaf1fba964f5fb59af9acf", + "sha256:0dbcf78e68f619840184ce661c68c1760de403b0f69d81905d6b9a699d1861d6", + "sha256:174c3974da26fd778ac8537d74efb17d4cef59e6b3e81e3c59690f39a6f6b73d", + "sha256:3a8ab5c350131ba273d3f8eb430343304d6c2138a61d34e4a11ebd75f8bf3e7e", + "sha256:560074ce9ff95409b233c0a8d143a2546a2d71d636d583172252dc0021fdb11b", + "sha256:5bded8cb431705609dbd9048114f1d6d59bef2f1ca95a8c58bd649442c9dc16c", + "sha256:8a8748684787792f3a643a7e0530c3024301f3e5799a199a5c2c526c07f712ba", + "sha256:8c7e43c4b7920fc02ce7743b976aca15bd45293ed298d84793307bc9799df3f6", + "sha256:9bd9ef3e183b7b1ce90b7ab5e8672907cd73dc36f036fc6714f0e7a5f9852da0", + "sha256:d3f27e276c8557c15c19c5c9a414e77b893d39fce6e6e40e5c46fcf5eeffe028", + "sha256:d40b82a4aee4ca968348e41bf6588ed9cadd171c7da8b671ed31d3fd967de703", + "sha256:d8cf054a099ff694a0e75386471bdde098efe7c350548ec6b899f169bef1a859", + "sha256:dd9f4843aa59f09698679b64064f11f51d60e45358ab45299de4dcff90524be3", + "sha256:e6f9f5ad4e73f5eecaa66e9c9d30ff8661c400190a6079ee170e37a466457e31", + "sha256:e9989e17f203900b2c7add53fa17d6686e66282598359b43fb12260ae8bf7eba", + "sha256:eadc9d19b25420e1ae77f0a11b779d4e71f47c3aa1953c218e8fe812d1f5341e", + "sha256:ecb630a99b0ab6c178b5c2988ca8c5b98f6ec2fd9e172c2873a5df44b261310f", + "sha256:f8eb9308bd64abf71dda77b823913696cd85c4f36c026acee0a64d8834a09b43", + "sha256:fe71a037ce866d9fb717fd3a792d46c744433179bf3f25da48af8f46cee20c3e", + "sha256:ff0d83306bfda4639fac2a4f8df2c51eb2bbdda540a74490703e8a6b413a37eb" ], - "version": "==0.23.4" + "version": "==0.24.0" }, "pdbpp": { "hashes": [ @@ -203,10 +208,10 @@ }, "pygments": { "hashes": [ - "sha256:6301ecb0997a52d2d31385e62d0a4a4cf18d2f2da7054a5ddad5c366cd39cee7", - "sha256:82666aac15622bd7bb685a4ee7f6625dd716da3ef7473620c192c0168aae64fc" + "sha256:5ffada19f6203563680669ee7f53b64dabbeb100eb51b61996085e99c03b284a", + "sha256:e8218dd399a61674745138520d0d4cf2621d7e032439341bc3f647bff125818d" ], - "version": "==2.3.0" + "version": "==2.3.1" }, "python-dateutil": { "hashes": [ @@ -217,17 +222,17 @@ }, "pytz": { "hashes": [ - "sha256:31cb35c89bd7d333cd32c5f278fca91b523b0834369e757f4c5641ea252236ca", - "sha256:8e0f8568c118d3077b46be7d654cc8167fa916092e28320cde048e54bfc9f1e6" + "sha256:32b0891edff07e28efe91284ed9c31e123d84bea3fd98e1f72be2508f43ef8d9", + "sha256:d5f05e487007e29e03409f9398d074e158d920d36eb82eaf66fb1136b0c5374c" ], - "version": "==2018.7" + "version": "==2018.9" }, "six": { "hashes": [ - "sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9", - "sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb" + "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", + "sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73" ], - "version": "==1.11.0" + "version": "==1.12.0" }, "sniffio": { "hashes": [ @@ -245,15 +250,14 @@ }, "tractor": { "git": "git://github.com/tgoodlet/tractor.git", - "ref": "c0cdb3945a9a9538b65bd76038f263e859fbbfe7" + "ref": "977eaedb0bd4b235a5ac07da318f4c1d3be3749a" }, "trio": { "hashes": [ - "sha256:65cf596eccad597f46fce1d53220e5aca9a143e52cc99e11f33e429b0c4de33f", - "sha256:6d905d950dfa1db3fad6b5ef5637c221947123fd2b0e112033fecfc582318c3b" + "sha256:d323cc15f6406d15954af91e5e34af2001cc24163fdde29e3f88a227a1b53ab0" ], "index": "pypi", - "version": "==0.9.0" + "version": "==0.10.0" }, "wmctrl": { "hashes": [ @@ -299,44 +303,44 @@ }, "colorlog": { "hashes": [ - "sha256:418db638c9577f37f0fae4914074f395847a728158a011be2a193ac491b9779d", - "sha256:8b234ebae1ba1237bc79c0d5f1f47b31a3f3e90c0b4c2b0ebdde63a174d3b97b" + "sha256:3cf31b25cbc8f86ec01fef582ef3b840950dea414084ed19ab922c8b493f9b42", + "sha256:450f52ea2a2b6ebb308f034ea9a9b15cea51e65650593dca1da3eb792e4e4981" ], - "version": "==3.1.4" + "version": "==4.0.2" }, "cython": { "hashes": [ - "sha256:0202f753b0a69dd87095b698df00010daf452ab61279747248a042a24892a2a9", - "sha256:0fbe9514ffe35aad337db27b11f7ee1bf27d01059b2e27f112315b185d69de79", - "sha256:18ab7646985a97e02cee72e1ddba2e732d4931d4e1732494ff30c5aa084bfb97", - "sha256:18bb95daa41fd2ff0102844172bc068150bf031186249fc70c6f57fc75c9c0a9", - "sha256:222c65c7022ff52faf3ac6c706e4e8a726ddaa29dabf2173b2a0fdfc1a2f1586", - "sha256:2387c5a2a436669de9157d117fd426dfc2b46ffdc49e43f0a2267380896c04ea", - "sha256:31bad130b701587ab7e74c3c304bb3d63d9f0d365e3f81880203e8e476d914b1", - "sha256:3895014b1a653726a9da5aca852d9e6d0e2c2667bf315d6a2cd632bf7463130b", - "sha256:3d38967ef9c1c0ffabe80827f56817609153e2da83e3dce84476d0928c72972c", - "sha256:5478efd92291084adc9b679666aeaeaafca69d6bf3e95fe3efce82814e3ab782", - "sha256:5c2a6121e4e1e65690b60c270012218e38201bcf700314b1926d5dbeae78a499", - "sha256:5f66f7f76fc870500fe6db0c02d5fc4187062d29e582431f5a986881c5aef4e3", - "sha256:6572d74990b16480608441b941c1cefd60bf742416bc3668cf311980f740768d", - "sha256:6990b9965f31762ac71340869c064f39fb6776beca396d0558d3b5b1ebb7f027", - "sha256:87c82803f9c51c275b16c729aade952ca93c74a8aec963b9b8871df9bbb3120a", - "sha256:8fd32974024052b2260d08b94f970c4c1d92c327ed3570a2b4708070fa53a879", - "sha256:9a81bba33c7fbdb76e6fe8d15b6e793a1916afd4d2463f07d762c69efaaea466", - "sha256:9c31cb9bfaa1004a2a50115a37e1fcb79d664917968399dae3e04610356afe8c", - "sha256:a0b28235c28a088e052f90a0b5fefaa503e5378046a29d0af045e2ec9d5d6555", - "sha256:a3f5022d818b6c91a8bbc466211e6fd708f234909cbb10bc4dbccb2a04884ef6", - "sha256:a7252ca498f510404185e3c1bdda3224e80b1be1a5fbc2b174aab83a477ea0cb", - "sha256:aa8d7136cad8b2a7bf3596e1bc053476edeee567271f197449b2d30ea0c37175", - "sha256:b50a8de6f2820286129fe7d71d76c9e0c0f53a8c83cf39bbe6375b827994e4f1", - "sha256:b528a9c152c569062375d5c2260b59f8243bb4136fc38420854ac1bd4aa0d02f", - "sha256:b72db7201a4aa0445f27af9954d48ed7d2c119ce3b8b253e4dcd514fc72e5dc6", - "sha256:d3444e10ccb5b16e4c1bed3cb3c565ec676b20a21eb43430e70ec4168c631dcc", - "sha256:e16d6f06f4d2161347e51c4bc1f7a8feedeee444d26efa92243f18441a6fa742", - "sha256:f5774bef92d33a62a584f6e7552a9a8653241ecc036e259bfb03d33091599537" + "sha256:1327655db47beb665961d3dc0365e20c9e8e80c234513ab2c7d06ec0dd9d63eb", + "sha256:142400f13102403f43576bb92d808a668e29deda5625388cfa39fe0bcf37b3d1", + "sha256:1b4204715141281a631337378f0c15fe660b35e1b6888ca05f1f3f49df3b97d5", + "sha256:23aabaaf8887e6db99df2145de6742f8c92830134735778bf2ae26338f2b406f", + "sha256:2a724c6f21fdf4e3c1e8c5c862ff87f5420fdaecf53a5a0417915e483d90217f", + "sha256:2c9c8c1c6e8bd3587e5f5db6f865a42195ff2dedcaf5cdb63fdea10c98bd6246", + "sha256:3a1be38b774423605189d60652b3d8a324fc81d213f96569720c8093784245ab", + "sha256:46be5297a76513e4d5d6e746737d4866a762cfe457e57d7c54baa7ef8fea7e9a", + "sha256:48dc2ea4c4d3f34ddcad5bc71b1f1cf49830f868832d3e5df803c811e7395b6e", + "sha256:53f33e04d2ed078ac02841741bcd536b546e1f416608084468ab30a87638a466", + "sha256:57b10588618ca19a4cc870f381aa8805bc5fe0c62d19d7f940232ff8a373887c", + "sha256:6001038341b52301450bb9c62e5d5da825788944572679277e137ffb3596e718", + "sha256:70bef52e735607060f327d729be35c820d9018d260a875e4f98b20ba8c4fff96", + "sha256:7d0f76b251699be8f1f1064dcb12d4b3b2b676ce15ff30c104e0c2091a015142", + "sha256:9440b64c1569c26a184b7c778bb221cf9987c5c8486d32cda02302c66ea78980", + "sha256:956cc97eac6f9d3b16e3b2d2a94c5586af3403ba97945e9d88a4a0f029899646", + "sha256:ae430ad8cce937e07ea566d1d7899eef1fedc8ec512b4d5fa37ebf2c1f879936", + "sha256:bdb575149881978d62167dd8427402a5872a79bd83e9d51219680670e9f80b40", + "sha256:c0ffcddd3dbdf22aae3980931112cc8b2732315a6273988f3205cf5dacf36f45", + "sha256:c133e2efc57426974366ac74f2ef0f1171b860301ac27f72316deacff4ccdc17", + "sha256:c6e9521d0b77eb1da89e8264eb98c8f5cda7c49a49b8128acfd35f0ca50e56d0", + "sha256:c7cac0220ecb733024e8acfcfb6b593a007185690f2ea470d2392b72510b7187", + "sha256:d53483820ac28f2be2ff13eedd56c0f36a4c583727b551d3d468023556e2336a", + "sha256:d60210784186d61e0ec808d5dbee5d661c7457a57f93cb5fdc456394607ce98c", + "sha256:d687fb1cd9df28c1515666174c62e54bd894a6a6d0862f89705063cd47739f83", + "sha256:d926764d9c768a48b0a16a91696aaa25498057e060934f968fa4c5629b942d85", + "sha256:d94a2f4ad74732f58d1c771fc5d90a62c4fe4c98d0adfecbc76cd0d8d14bf044", + "sha256:def76a546eeec059666f5f4117dfdf9c78e50fa1f95bdd23b04618c7adf845cd" ], "index": "pypi", - "version": "==0.29.1" + "version": "==0.29.3" }, "fancycompleter": { "hashes": [ @@ -353,31 +357,41 @@ }, "idna": { "hashes": [ - "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e", - "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16" + "sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407", + "sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c" ], - "version": "==2.7" + "version": "==2.8" }, "more-itertools": { "hashes": [ - "sha256:c187a73da93e7a8acc0001572aebc7e3c69daf7bf6881a2cea10650bd4420092", - "sha256:c476b5d3a34e12d40130bc2f935028b5f636df8f372dc2c1c01dc19681b2039e", - "sha256:fcbfeaea0be121980e15bc97b3817b5202ca73d0eae185b4550cbfce2a3ebb3d" + "sha256:38a936c0a6d98a38bcc2d03fdaaedaba9f412879461dd2ceff8d37564d6522e4", + "sha256:c0a5785b1109a6bd7fac76d6837fd1feca158e54e521ccd2ae8bfe393cc9d4fc", + "sha256:fe7a7cae1ccb57d33952113ff4fa1bc5f879963600ed74918f1236e212ee50b9" ], - "version": "==4.3.0" + "version": "==5.0.0" }, "msgpack": { "hashes": [ - "sha256:102802a9433dcf36f939b632cce9dea87310b2f163bb37ffc8bc343677726e88", - "sha256:64abc6bf3a2ac301702f5760f4e6e227d0fd4d84d9014ef9a40faa9d43365259", - "sha256:72259661a83f8b08ef6ee83927ce4937f841226735824af5b10a536d886eeb36", - "sha256:85f1342b9d7549dd3daf494100d47a3dc7daae703cdbfc2c9ee7bbdc8a492cba", - "sha256:8ce9f88b6cb75d74eda2a5522e5c2e5ec0f17fd78605d6502abb61f46b306865", - "sha256:9936ce3a530ca78db60b6631003b5f4ba383cfb1d9830a27d1b5c61857226e2f", - "sha256:cb4e228f3d93779a1d77a1e9d72759b79dfa2975c1a5bd2a090eaa98239fa4b1" + "sha256:26cb40116111c232bc235ce131cc3b4e76549088cb154e66a2eb8ff6fcc907ec", + "sha256:300fd3f2c664a3bf473d6a952f843b4a71454f4c592ed7e74a36b205c1782d28", + "sha256:3129c355342853007de4a2a86e75eab966119733eb15748819b6554363d4e85c", + "sha256:31f6d645ee5a97d59d3263fab9e6be76f69fa131cddc0d94091a3c8aca30d67a", + "sha256:3ce7ef7ee2546c3903ca8c934d09250531b80c6127e6478781ae31ed835aac4c", + "sha256:4008c72f5ef2b7936447dcb83db41d97e9791c83221be13d5e19db0796df1972", + "sha256:62bd8e43d204580308d477a157b78d3fee2fb4c15d32578108dc5d89866036c8", + "sha256:70cebfe08fb32f83051971264466eadf183101e335d8107b80002e632f425511", + "sha256:72cb7cf85e9df5251abd7b61a1af1fb77add15f40fa7328e924a9c0b6bc7a533", + "sha256:7c55649965c35eb32c499d17dadfb8f53358b961582846e1bc06f66b9bccc556", + "sha256:86b963a5de11336ec26bc4f839327673c9796b398b9f1fe6bb6150c2a5d00f0f", + "sha256:8c73c9bcdfb526247c5e4f4f6cf581b9bb86b388df82cfcaffde0a6e7bf3b43a", + "sha256:8e68c76c6aff4849089962d25346d6784d38e02baa23ffa513cf46be72e3a540", + "sha256:97ac6b867a8f63debc64f44efdc695109d541ecc361ee2dce2c8884ab37360a1", + "sha256:9d4f546af72aa001241d74a79caec278bcc007b4bcde4099994732e98012c858", + "sha256:a28e69fe5468c9f5251c7e4e7232286d71b7dfadc74f312006ebe984433e9746", + "sha256:fd509d4aa95404ce8d86b4e32ce66d5d706fd6646c205e1c2a715d87078683a2" ], "index": "pypi", - "version": "==0.6.0" + "version": "==0.6.1" }, "multio": { "hashes": [ @@ -387,36 +401,31 @@ }, "numpy": { "hashes": [ - "sha256:0df89ca13c25eaa1621a3f09af4c8ba20da849692dcae184cb55e80952c453fb", - "sha256:154c35f195fd3e1fad2569930ca51907057ae35e03938f89a8aedae91dd1b7c7", - "sha256:18e84323cdb8de3325e741a7a8dd4a82db74fde363dce32b625324c7b32aa6d7", - "sha256:1e8956c37fc138d65ded2d96ab3949bd49038cc6e8a4494b1515b0ba88c91565", - "sha256:23557bdbca3ccbde3abaa12a6e82299bc92d2b9139011f8c16ca1bb8c75d1e95", - "sha256:24fd645a5e5d224aa6e39d93e4a722fafa9160154f296fd5ef9580191c755053", - "sha256:36e36b6868e4440760d4b9b44587ea1dc1f06532858d10abba98e851e154ca70", - "sha256:3d734559db35aa3697dadcea492a423118c5c55d176da2f3be9c98d4803fc2a7", - "sha256:416a2070acf3a2b5d586f9a6507bb97e33574df5bd7508ea970bbf4fc563fa52", - "sha256:4a22dc3f5221a644dfe4a63bf990052cc674ef12a157b1056969079985c92816", - "sha256:4d8d3e5aa6087490912c14a3c10fbdd380b40b421c13920ff468163bc50e016f", - "sha256:4f41fd159fba1245e1958a99d349df49c616b133636e0cf668f169bce2aeac2d", - "sha256:561ef098c50f91fbac2cc9305b68c915e9eb915a74d9038ecf8af274d748f76f", - "sha256:56994e14b386b5c0a9b875a76d22d707b315fa037affc7819cda08b6d0489756", - "sha256:73a1f2a529604c50c262179fcca59c87a05ff4614fe8a15c186934d84d09d9a5", - "sha256:7da99445fd890206bfcc7419f79871ba8e73d9d9e6b82fe09980bc5bb4efc35f", - "sha256:99d59e0bcadac4aa3280616591fb7bcd560e2218f5e31d5223a2e12a1425d495", - "sha256:a4cc09489843c70b22e8373ca3dfa52b3fab778b57cf81462f1203b0852e95e3", - "sha256:a61dc29cfca9831a03442a21d4b5fd77e3067beca4b5f81f1a89a04a71cf93fa", - "sha256:b1853df739b32fa913cc59ad9137caa9cc3d97ff871e2bbd89c2a2a1d4a69451", - "sha256:b1f44c335532c0581b77491b7715a871d0dd72e97487ac0f57337ccf3ab3469b", - "sha256:b261e0cb0d6faa8fd6863af26d30351fd2ffdb15b82e51e81e96b9e9e2e7ba16", - "sha256:c857ae5dba375ea26a6228f98c195fec0898a0fd91bcf0e8a0cae6d9faf3eca7", - "sha256:cf5bb4a7d53a71bb6a0144d31df784a973b36d8687d615ef6a7e9b1809917a9b", - "sha256:db9814ff0457b46f2e1d494c1efa4111ca089e08c8b983635ebffb9c1573361f", - "sha256:df04f4bad8a359daa2ff74f8108ea051670cafbca533bb2636c58b16e962989e", - "sha256:ecf81720934a0e18526177e645cbd6a8a21bb0ddc887ff9738de07a1df5c6b61", - "sha256:edfa6fba9157e0e3be0f40168eb142511012683ac3dc82420bee4a3f3981b30e" + "sha256:00a458d6821b1e87be873f2126d5646b901047a7480e8ae9773ecf214f0e19f3", + "sha256:0470c5dc32212a08ebc2405f32e8ceb9a5b1c8ac61a2daf9835ec0856a220495", + "sha256:24a9c287a4a1c427c2d45bf7c4fc6180c52a08fa0990d4c94e4c86a9b1e23ba5", + "sha256:25600e8901012180a1b7cd1ac3e27e7793586ecd432383191929ac2edf37ff5d", + "sha256:2d279bd99329e72c30937bdef82b6dc7779c7607c5a379bab1bf76be1f4c1422", + "sha256:32af2bcf4bb7631dac19736a6e092ec9715e770dcaa1f85fcd99dec5040b2a4d", + "sha256:3e90a9fce378114b6c2fc01fff7423300515c7b54b7cc71b02a22bc0bd7dfdd8", + "sha256:5774d49516c37fd3fc1f232e033d2b152f3323ca4c7bfefd7277e4c67f3c08b4", + "sha256:64ff21aac30d40c20ba994c94a08d439b8ced3b9c704af897e9e4ba09d10e62c", + "sha256:803b2af862dcad6c11231ea3cd1015d1293efd6c87088be33d713a9b23e9e419", + "sha256:95c830b09626508f7808ce7f1344fb98068e63143e6050e5dc3063142fc60007", + "sha256:96e49a0c82b4e3130093002f625545104037c2d25866fa2e0c90d6e54f5a1fbc", + "sha256:a1dd8221f0e69038748f47b8bb3248d0b9ecdf13fe837440951c3d5ff72639bb", + "sha256:a80ecac5664f420556a725a5646f2d1c60a7c0489d68a38b5056393e949e27ac", + "sha256:b19a47ff1bd2fca0cacdfa830c967746764c32dca6a0c0328d9c893f4bfe2f6b", + "sha256:be43df2c563e264b38e3318574d80fc8f365df3fb745270934d2dbe54e006f41", + "sha256:c40cb17188f6ae3c5b6efc6f0fd43a7ddd219b7807fe179e71027849a9b91afc", + "sha256:c6251e0f0ecac53ba2b99d9f0cc16fa9021914a78869c38213c436ba343641f0", + "sha256:cb189bd98b2e7ac02df389b6212846ab20661f4bafe16b5a70a6f1728c1cc7cb", + "sha256:ef4ae41add536cb825d8aa029c15ef510aead06ea5b68daea64f0b9ecbff17db", + "sha256:f00a2c21f60284e024bba351875f3501c6d5817d64997a0afe4f4355161a8889", + "sha256:f1232f98a6bbd6d1678249f94028bccc541bbc306aa5c4e1471a881b0e5a3409", + "sha256:fea682f6ddc09517df0e6d5caad9613c6d91a42232aeb082df67e4d205de19cc" ], - "version": "==1.15.4" + "version": "==1.16.0" }, "outcome": { "hashes": [ @@ -427,28 +436,28 @@ }, "pandas": { "hashes": [ - "sha256:11975fad9edbdb55f1a560d96f91830e83e29bed6ad5ebf506abda09818eaf60", - "sha256:12e13d127ca1b585dd6f6840d3fe3fa6e46c36a6afe2dbc5cb0b57032c902e31", - "sha256:1c87fcb201e1e06f66e23a61a5fea9eeebfe7204a66d99df24600e3f05168051", - "sha256:242e9900de758e137304ad4b5663c2eff0d798c2c3b891250bd0bd97144579da", - "sha256:26c903d0ae1542890cb9abadb4adcb18f356b14c2df46e4ff657ae640e3ac9e7", - "sha256:2e1e88f9d3e5f107b65b59cd29f141995597b035d17cc5537e58142038942e1a", - "sha256:31b7a48b344c14691a8e92765d4023f88902ba3e96e2e4d0364d3453cdfd50db", - "sha256:4fd07a932b4352f8a8973761ab4e84f965bf81cc750fb38e04f01088ab901cb8", - "sha256:5b24ca47acf69222e82530e89111dd9d14f9b970ab2cd3a1c2c78f0c4fbba4f4", - "sha256:647b3b916cc8f6aeba240c8171be3ab799c3c1b2ea179a3be0bd2712c4237553", - "sha256:66b060946046ca27c0e03e9bec9bba3e0b918bafff84c425ca2cc2e157ce121e", - "sha256:6efa9fa6e1434141df8872d0fa4226fc301b17aacf37429193f9d70b426ea28f", - "sha256:be4715c9d8367e51dbe6bc6d05e205b1ae234f0dc5465931014aa1c4af44c1ba", - "sha256:bea90da782d8e945fccfc958585210d23de374fa9294a9481ed2abcef637ebfc", - "sha256:d318d77ab96f66a59e792a481e2701fba879e1a453aefeebdb17444fe204d1ed", - "sha256:d785fc08d6f4207437e900ffead930a61e634c5e4f980ba6d3dc03c9581748c7", - "sha256:de9559287c4fe8da56e8c3878d2374abc19d1ba2b807bfa7553e912a8e5ba87c", - "sha256:f4f98b190bb918ac0bc0e3dd2ab74ff3573da9f43106f6dba6385406912ec00f", - "sha256:f71f1a7e2d03758f6e957896ed696254e2bc83110ddbc6942018f1a232dd9dad", - "sha256:fb944c8f0b0ab5c1f7846c686bc4cdf8cde7224655c12edcd59d5212cd57bec0" + "sha256:02d34a55e85819a7eab096f391f8dcc237876e8b3cdaf1fba964f5fb59af9acf", + "sha256:0dbcf78e68f619840184ce661c68c1760de403b0f69d81905d6b9a699d1861d6", + "sha256:174c3974da26fd778ac8537d74efb17d4cef59e6b3e81e3c59690f39a6f6b73d", + "sha256:3a8ab5c350131ba273d3f8eb430343304d6c2138a61d34e4a11ebd75f8bf3e7e", + "sha256:560074ce9ff95409b233c0a8d143a2546a2d71d636d583172252dc0021fdb11b", + "sha256:5bded8cb431705609dbd9048114f1d6d59bef2f1ca95a8c58bd649442c9dc16c", + "sha256:8a8748684787792f3a643a7e0530c3024301f3e5799a199a5c2c526c07f712ba", + "sha256:8c7e43c4b7920fc02ce7743b976aca15bd45293ed298d84793307bc9799df3f6", + "sha256:9bd9ef3e183b7b1ce90b7ab5e8672907cd73dc36f036fc6714f0e7a5f9852da0", + "sha256:d3f27e276c8557c15c19c5c9a414e77b893d39fce6e6e40e5c46fcf5eeffe028", + "sha256:d40b82a4aee4ca968348e41bf6588ed9cadd171c7da8b671ed31d3fd967de703", + "sha256:d8cf054a099ff694a0e75386471bdde098efe7c350548ec6b899f169bef1a859", + "sha256:dd9f4843aa59f09698679b64064f11f51d60e45358ab45299de4dcff90524be3", + "sha256:e6f9f5ad4e73f5eecaa66e9c9d30ff8661c400190a6079ee170e37a466457e31", + "sha256:e9989e17f203900b2c7add53fa17d6686e66282598359b43fb12260ae8bf7eba", + "sha256:eadc9d19b25420e1ae77f0a11b779d4e71f47c3aa1953c218e8fe812d1f5341e", + "sha256:ecb630a99b0ab6c178b5c2988ca8c5b98f6ec2fd9e172c2873a5df44b261310f", + "sha256:f8eb9308bd64abf71dda77b823913696cd85c4f36c026acee0a64d8834a09b43", + "sha256:fe71a037ce866d9fb717fd3a792d46c744433179bf3f25da48af8f46cee20c3e", + "sha256:ff0d83306bfda4639fac2a4f8df2c51eb2bbdda540a74490703e8a6b413a37eb" ], - "version": "==0.23.4" + "version": "==0.24.0" }, "pdbpp": { "hashes": [ @@ -463,10 +472,10 @@ }, "pluggy": { "hashes": [ - "sha256:447ba94990e8014ee25ec853339faf7b0fc8050cdc3289d4d71f7f410fb90095", - "sha256:bde19360a8ec4dfd8a20dcb811780a30998101f078fc7ded6162f0076f50508f" + "sha256:8ddc32f03971bfdf900a81961a48ccf2fb677cf7715108f85295c67405798616", + "sha256:980710797ff6a041e9a73a5787804f848996ecaa6f8a1b1e08224a5894f2074a" ], - "version": "==0.8.0" + "version": "==0.8.1" }, "py": { "hashes": [ @@ -477,18 +486,18 @@ }, "pygments": { "hashes": [ - "sha256:6301ecb0997a52d2d31385e62d0a4a4cf18d2f2da7054a5ddad5c366cd39cee7", - "sha256:82666aac15622bd7bb685a4ee7f6625dd716da3ef7473620c192c0168aae64fc" + "sha256:5ffada19f6203563680669ee7f53b64dabbeb100eb51b61996085e99c03b284a", + "sha256:e8218dd399a61674745138520d0d4cf2621d7e032439341bc3f647bff125818d" ], - "version": "==2.3.0" + "version": "==2.3.1" }, "pytest": { "hashes": [ - "sha256:1d131cc532be0023ef8ae265e2a779938d0619bb6c2510f52987ffcba7fa1ee4", - "sha256:ca4761407f1acc85ffd1609f464ca20bb71a767803505bd4127d0e45c5a50e23" + "sha256:41568ea7ecb4a68d7f63837cf65b92ce8d0105e43196ff2b26622995bb3dc4b2", + "sha256:c3c573a29d7c9547fb90217ece8a8843aa0c1328a797e200290dc3d0b4b823be" ], "index": "pypi", - "version": "==4.0.1" + "version": "==4.1.1" }, "python-dateutil": { "hashes": [ @@ -499,17 +508,17 @@ }, "pytz": { "hashes": [ - "sha256:31cb35c89bd7d333cd32c5f278fca91b523b0834369e757f4c5641ea252236ca", - "sha256:8e0f8568c118d3077b46be7d654cc8167fa916092e28320cde048e54bfc9f1e6" + "sha256:32b0891edff07e28efe91284ed9c31e123d84bea3fd98e1f72be2508f43ef8d9", + "sha256:d5f05e487007e29e03409f9398d074e158d920d36eb82eaf66fb1136b0c5374c" ], - "version": "==2018.7" + "version": "==2018.9" }, "six": { "hashes": [ - "sha256:70e8a77beed4562e7f14fe23a786b54f6296e34344c23bc42f07b15018ff98e9", - "sha256:832dc0e10feb1aa2c68dcc57dbb658f1c7e65b9b61af69048abc87a2db00a0eb" + "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", + "sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73" ], - "version": "==1.11.0" + "version": "==1.12.0" }, "sniffio": { "hashes": [ @@ -527,11 +536,10 @@ }, "trio": { "hashes": [ - "sha256:65cf596eccad597f46fce1d53220e5aca9a143e52cc99e11f33e429b0c4de33f", - "sha256:6d905d950dfa1db3fad6b5ef5637c221947123fd2b0e112033fecfc582318c3b" + "sha256:d323cc15f6406d15954af91e5e34af2001cc24163fdde29e3f88a227a1b53ab0" ], "index": "pypi", - "version": "==0.9.0" + "version": "==0.10.0" }, "wmctrl": { "hashes": [ From e91a50a1bad4f715c942887d021e7313c52ff6b4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 3 Feb 2019 23:40:51 -0500 Subject: [PATCH 06/16] Make `get_cached_feed()` an asynccontextmanager Adjust feed locking around internal manager `yields` to make this work. Also, change quote publisher to deliver a list of quotes for each retrieved batch. This was actually broken for option streaming since each quote was being overwritten due to a common `key` value for all expiries. Asjust the `packetizer` function accordingly to work for both options and stocks. --- piker/brokers/data.py | 120 +++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 59 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 09847615..3c4cb944 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -15,6 +15,7 @@ from operator import itemgetter import trio import tractor +from async_generator import asynccontextmanager from ..log import get_logger, get_console_log from . import get_brokermod @@ -127,11 +128,16 @@ async def stream_quotes( log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - new_quotes[quote['key']] = quote + # XXX: we append to a list for the options case where the + # subscription topic (key) is the same for all + # expiries even though this is uncessary for the + # stock case (different topic [i.e. symbol] for each + # quote). + new_quotes.setdefault(quote['key'], []).append(quote) else: log.info(f"Delivering quotes:\n{quotes}") for quote in quotes: - new_quotes[quote['symbol']] = quote + new_quotes.setdefault(quote['key'], []).append(quote) yield new_quotes @@ -153,8 +159,8 @@ async def stream_quotes( async def symbol_data(broker: str, tickers: List[str]): """Retrieve baseline symbol info from broker. """ - feed = await get_cached_feed(broker) - return await feed.client.symbol_data(tickers) + async with get_cached_feed(broker) as feed: + return await feed.client.symbol_data(tickers) async def smoke_quote(get_quotes, tickers, broker): @@ -193,7 +199,7 @@ async def smoke_quote(get_quotes, tickers, broker): # report any unknown/invalid symbols (QT specific) if quote.get('low52w', False) is None: - log.warn( + log.error( f"{symbol} seems to be defunct") payload[symbol] = quote @@ -204,6 +210,7 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### +@asynccontextmanager async def get_cached_feed( brokername: str, ) -> BrokerFeed: @@ -213,24 +220,29 @@ async def get_cached_feed( ss = tractor.current_actor().statespace feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) lock = feeds['_lock'] - async with lock: + try: try: - feed = feeds[brokername] - log.info(f"Subscribing with existing `{brokername}` daemon") - return feed + async with lock: + feed = feeds[brokername] + log.info(f"Subscribing with existing `{brokername}` daemon") + yield feed except KeyError: - log.info(f"Creating new client for broker {brokername}") - brokermod = get_brokermod(brokername) - exit_stack = contextlib.AsyncExitStack() - client = await exit_stack.enter_async_context( - brokermod.get_client()) - feed = BrokerFeed( - mod=brokermod, - client=client, - exit_stack=exit_stack, - ) - feeds[brokername] = feed - return feed + async with lock: + log.info(f"Creating new client for broker {brokername}") + brokermod = get_brokermod(brokername) + exit_stack = contextlib.AsyncExitStack() + client = await exit_stack.enter_async_context( + brokermod.get_client()) + feed = BrokerFeed( + mod=brokermod, + client=client, + exit_stack=exit_stack, + ) + feeds[brokername] = feed + yield feed + finally: + # destroy the API client + await feed.exit_stack.aclose() async def start_quote_stream( @@ -256,40 +268,39 @@ async def start_quote_stream( log.info( f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") # another actor task may have already created it - feed = await get_cached_feed(broker) - symbols2ctxs = feed.subscriptions[feed_type] - packetizer = None + async with get_cached_feed(broker) as feed: + # function to format packets delivered to subscribers + packetizer = None - if feed_type == 'stock': - get_quotes = feed.quoters.setdefault( - 'stock', - await feed.mod.stock_quoter(feed.client, symbols) - ) - # do a smoke quote (note this mutates the input list and filters - # out bad symbols for now) - payload = await smoke_quote(get_quotes, symbols, broker) + if feed_type == 'stock': + get_quotes = feed.quoters.setdefault( + 'stock', + await feed.mod.stock_quoter(feed.client, symbols) + ) + # do a smoke quote (note this mutates the input list and filters + # out bad symbols for now) + payload = await smoke_quote(get_quotes, symbols, broker) - elif feed_type == 'option': - # FIXME: yeah we need maybe a more general way to specify - # the arg signature for the option feed beasides a symbol - # + expiry date. - get_quotes = feed.quoters.setdefault( - 'option', - await feed.mod.option_quoter(feed.client, symbols) - ) - # packetize - payload = { - quote['symbol']: quote - for quote in await get_quotes(symbols) - } + elif feed_type == 'option': + # FIXME: yeah we need maybe a more general way to specify + # the arg signature for the option feed beasides a symbol + # + expiry date. + get_quotes = feed.quoters.setdefault( + 'option', + await feed.mod.option_quoter(feed.client, symbols) + ) + # packetize + payload = { + quote['symbol']: quote + for quote in await get_quotes(symbols) + } - def packetizer(topic, quote): - return {quote['symbol']: quote} + def packetizer(topic, quotes): + return {quote['symbol']: quote for quote in quotes} - # push initial smoke quote response for client initialization - await ctx.send_yield(payload) + # push initial smoke quote response for client initialization + await ctx.send_yield(payload) - try: await stream_quotes( # pub required kwargs @@ -306,14 +317,6 @@ async def start_quote_stream( ) log.info( f"Terminating stream quoter task for {feed.mod.name}") - finally: - # if there are truly no more subscriptions with this broker - # drop from broker subs dict - if not any(symbols2ctxs.values()): - log.info(f"No more subscriptions for broker {broker}") - - # destroy the API client - await feed.exit_stack.aclose() class DataFeed: @@ -377,7 +380,6 @@ class DataFeed: # get first quotes response log.debug(f"Waiting on first quote for {symbols}...") quotes = {} - # with trio.move_on_after(5): quotes = await quote_gen.__anext__() self.quote_gen = quote_gen From 5dac8fa44dff8084f3691e3a2bdf6d1465e44bac Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Feb 2019 00:15:10 -0500 Subject: [PATCH 07/16] Note the RH auth/account requirements for usage --- piker/brokers/robinhood.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index ff1e22d4..4c17fff1 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -1,5 +1,9 @@ """ Robinhood API backend. + +WARNING: robinhood now requires authenticated access to use the quote +endpoints (it didn't originally). We need someone with a valid US +account to test this code. """ from functools import partial from typing import List From 5339f754a1021e8f58887c17bedf27f5d4011d72 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Feb 2019 00:16:16 -0500 Subject: [PATCH 08/16] Add a token refresh test that exhibits an API race issue --- tests/test_questrade.py | 55 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 4 deletions(-) diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 6730884d..2e34aada 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -2,15 +2,19 @@ Questrade broker testing """ import time +import logging import trio -import tractor from trio.testing import trio_test +import tractor from tractor.testing import tractor_test from piker.brokers import questrade as qt import pytest +log = tractor.get_logger('tests') + + @pytest.fixture(autouse=True) def check_qt_conf_section(brokerconf): """Skip this module's tests if we have not quetrade API creds. @@ -101,6 +105,49 @@ def match_packet(symbols, quotes, feed_type='stock'): assert not quotes +async def intermittently_refresh_tokens(): + async with qt.get_client() as client: + try: + while True: + try: + log.info("REFRESHING TOKENS!") + await client.ensure_access(force_refresh=True) + await trio.sleep(0.3) + except Exception: + log.exception("Token refresh failed") + finally: + with trio.open_cancel_scope(shield=True): + async with qt.get_client() as client: + await client.ensure_access(force_refresh=True) + + +# XXX: demonstrates the shoddy API QT serves +@pytest.mark.skip +@tractor_test +async def test_concurrent_tokens_refresh(us_symbols, loglevel): + async with qt.get_client() as client: + + # async with tractor.open_nursery() as n: + # await n.run_in_actor('other', intermittently_refresh_tokens) + + async with trio.open_nursery() as n: + n.start_soon(intermittently_refresh_tokens) + + quoter = await qt.stock_quoter(client, us_symbols) + + async def get_quotes(): + for tries in range(10): + log.info(f"{tries}: GETTING QUOTES!") + quotes = await quoter(us_symbols) + await trio.sleep(0.1) + + await get_quotes() + + # shutdown + # await n.cancel() + n.cancel_scope.cancel() + + @trio_test async def test_batched_stock_quote(us_symbols): """Use the client stock quote api and verify quote response format. @@ -201,13 +248,13 @@ async def stream_option_chain(portal, symbols): broker='questrade', symbols=[sub], feed_type='option', - rate=4, + rate=3, diff_cached=False, ) # latency arithmetic loops = 8 - rate = 1/3. # 3 rps - timeout = loops / rate + period = 1/3. # 3 rps + timeout = loops / period try: # wait on the data streamer to actually start From 026b015627453fe5960915215fb3807e1386c2cb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Feb 2019 00:17:11 -0500 Subject: [PATCH 09/16] Allow passing a config path for broker testing in CI --- piker/brokers/config.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/piker/brokers/config.py b/piker/brokers/config.py index 5b1f9e0f..22cbfe9c 100644 --- a/piker/brokers/config.py +++ b/piker/brokers/config.py @@ -12,13 +12,14 @@ _config_dir = click.get_app_dir('piker') _broker_conf_path = path.join(_config_dir, 'brokers.ini') -def load() -> (configparser.ConfigParser, str): +def load(path: str = None) -> (configparser.ConfigParser, str): """Load broker config. """ + path = path or _broker_conf_path config = configparser.ConfigParser() - read = config.read(_broker_conf_path) - log.debug(f"Read config file {_broker_conf_path}") - return config, _broker_conf_path + read = config.read(path) + log.debug(f"Read config file {path}") + return config, path def write(config: configparser.ConfigParser) -> None: From f6230dd6df442bef245c79099bdfdeb1a7ee444b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 9 Feb 2019 21:38:00 -0500 Subject: [PATCH 10/16] Add a `DataFeed.call_client()` method Allows for calling an actor local broker client's methods from a remote actor. --- piker/brokers/data.py | 69 ++++++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 24 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 3c4cb944..340ef73a 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -25,7 +25,7 @@ log = get_logger('broker.data') async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: - """Wait until the network comes back up. + """Wait until the network (DNS) comes back up. """ down = False while True: @@ -220,29 +220,30 @@ async def get_cached_feed( ss = tractor.current_actor().statespace feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) lock = feeds['_lock'] + feed = None try: - try: - async with lock: - feed = feeds[brokername] - log.info(f"Subscribing with existing `{brokername}` daemon") - yield feed - except KeyError: - async with lock: - log.info(f"Creating new client for broker {brokername}") - brokermod = get_brokermod(brokername) - exit_stack = contextlib.AsyncExitStack() - client = await exit_stack.enter_async_context( - brokermod.get_client()) - feed = BrokerFeed( - mod=brokermod, - client=client, - exit_stack=exit_stack, - ) - feeds[brokername] = feed - yield feed + async with lock: + feed = feeds[brokername] + log.info(f"Subscribing with existing `{brokername}` daemon") + yield feed + except KeyError: + async with lock: + log.info(f"Creating new client for broker {brokername}") + brokermod = get_brokermod(brokername) + exit_stack = contextlib.AsyncExitStack() + client = await exit_stack.enter_async_context( + brokermod.get_client()) + feed = BrokerFeed( + mod=brokermod, + client=client, + exit_stack=exit_stack, + ) + feeds[brokername] = feed + yield feed finally: - # destroy the API client - await feed.exit_stack.aclose() + if feed is not None: + # destroy the API client + await feed.exit_stack.aclose() async def start_quote_stream( @@ -319,9 +320,18 @@ async def start_quote_stream( f"Terminating stream quoter task for {feed.mod.name}") +async def call_client( + broker: str, + methname: str, + **kwargs, +): + async with get_cached_feed(broker) as feed: + return await getattr(feed.client, methname)(**kwargs) + + class DataFeed: - """Data feed client for streaming symbol data from a (remote) - ``brokerd`` data daemon. + """Data feed client for streaming symbol data from and making API client calls + to a (remote) ``brokerd`` daemon. """ _allowed = ('stock', 'option') @@ -400,6 +410,17 @@ class DataFeed: ]) return records, displayables + async def call_client(self, method, **kwargs): + """Call a broker ``Client`` method using RPC and return result. + """ + return await self.portal.run( + 'piker.brokers.data', + 'call_client', + broker=self.brokermod.name, + methname=method, + **kwargs + ) + async def stream_to_file( watchlist_name: str, From 395f0c8e4aabef0e7df65f71629af593f39d101b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 9 Feb 2019 21:39:22 -0500 Subject: [PATCH 11/16] Synchronize Questrade token refreshing per client Questrade's API is half baked and can't handle concurrency. It allows multiple concurrent requests to most endpoints *except* for the auth endpoint used to refresh tokens: https://www.questrade.com/api/documentation/security I've gone through extensive dialogue with their API team and despite making what I think are very good arguments for doing the request serialization on the server side, they decided that I should instead do the "locking" on the client side. Frankly it doesn't seem like they have that competent an engineering department as it took me a long time to explain the issue even though it's rather trivial and probably not that hard to fix; maybe it's better this way. This adds a few things to ensure more reliable token refreshes on expiry: - add a `@refresh_token_on_err` decorator which can be used on `_API` methods that should refresh tokens on failure - decorate most endpoints with this *except* for the auth ep - add locking logic for the troublesome scenario as follows: * every time a request is sent out set a "request in progress" event variable that can be used to determine when no requests are currently outstanding * every time the auth end point is hit in order to refresh tokens set an event that locks out other tasks from making requests * only allow hitting the auth endpoint when there are no "requests in progress" using the first event * mutex all auth endpoint requests; there can only be one outstanding - don't hit the accounts endpoint at client startup; we want to eventually support keys from multiple accounts and you can disable account info per key and just share the market data function --- piker/brokers/questrade.py | 362 +++++++++++++++++++++---------------- 1 file changed, 207 insertions(+), 155 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index c3fb7586..2ac9d1ed 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -1,6 +1,8 @@ """ Questrade API backend. """ +from __future__ import annotations +import inspect import time from datetime import datetime from functools import partial @@ -9,6 +11,7 @@ from typing import List, Tuple, Dict, Any, Iterator, NamedTuple import trio from async_generator import asynccontextmanager +import wrapt from ..calc import humanize, percent_change from . import config @@ -40,56 +43,121 @@ class ContractsKey(NamedTuple): expiry: datetime +def refresh_token_on_err(tries=3): + """`_API` method decorator which locks the client and refreshes tokens + before unlocking access to the API again. + + QT's service end can't handle concurrent requests to multiple + endpoints reliably without choking up and confusing their interal + servers. + """ + + @wrapt.decorator + async def wrapper(wrapped, api, args, kwargs): + assert inspect.iscoroutinefunction(wrapped) + client = api.client + + if not client._has_access.is_set(): + log.warning("WAITING ON ACCESS LOCK") + await client._has_access.wait() + + for i in range(1, tries): + try: + try: + client._request_not_in_progress.clear() + return await wrapped(*args, **kwargs) + finally: + client._request_not_in_progress.set() + except (QuestradeError, BrokerError) as qterr: + if "Access token is invalid" not in str(qterr.args[0]): + raise + # TODO: this will crash when run from a sub-actor since + # STDIN can't be acquired. The right way to handle this + # is to make a request to the parent actor (i.e. + # spawner of this) to call this + # `client.ensure_access()` locally thus blocking until + # the user provides an API key on the "client side" + log.warning(f"Tokens are invalid refreshing try {i}..") + await client.ensure_access(force_refresh=True) + if i == tries - 1: + raise + return wrapper + + class _API: """Questrade API endpoints exposed as methods and wrapped with an http session. """ - def __init__(self, session: asks.Session): - self._sess = session + def __init__( + self, + client: Client, + ): + self.client = client + self._sess: asks.Session = client._sess - async def _request(self, path: str, params=None) -> dict: + @refresh_token_on_err() + async def _get(self, path: str, params=None) -> dict: + """Get an endpoint "reliably" by ensuring access on failure. + """ resp = await self._sess.get(path=f'/{path}', params=params) return resproc(resp, log) + async def _new_auth_token(self, refresh_token: str) -> dict: + """Request a new api authorization ``refresh_token``. + + Gain api access using either a user provided or existing token. + See the instructions:: + + http://www.questrade.com/api/documentation/getting-started + http://www.questrade.com/api/documentation/security + """ + resp = await self._sess.get( + _refresh_token_ep + 'token', + params={'grant_type': 'refresh_token', + 'refresh_token': refresh_token} + ) + return resproc(resp, log) + async def accounts(self) -> dict: - return await self._request('accounts') + return await self._get('accounts') async def time(self) -> dict: - return await self._request('time') + return await self._get('time') async def markets(self) -> dict: - return await self._request('markets') + return await self._get('markets') async def search(self, prefix: str) -> dict: - return await self._request( + return await self._get( 'symbols/search', params={'prefix': prefix}) async def symbols(self, ids: str = '', names: str = '') -> dict: log.debug(f"Symbol lookup for {ids or names}") - return await self._request( + return await self._get( 'symbols', params={'ids': ids, 'names': names}) async def quotes(self, ids: str) -> dict: - quotes = (await self._request( + quotes = (await self._get( 'markets/quotes', params={'ids': ids}))['quotes'] for quote in quotes: quote['key'] = quote['symbol'] return quotes async def candles(self, id: str, start: str, end, interval) -> dict: - return await self._request(f'markets/candles/{id}', params={}) + return await self._get(f'markets/candles/{id}', params={}) async def balances(self, id: str) -> dict: - return await self._request(f'accounts/{id}/balances') + return await self._get(f'accounts/{id}/balances') async def postions(self, id: str) -> dict: - return await self._request(f'accounts/{id}/positions') + return await self._get(f'accounts/{id}/positions') async def option_contracts(self, symbol_id: str) -> dict: "Retrieve all option contract API ids with expiry -> strike prices." - contracts = await self._request(f'symbols/{symbol_id}/options') + contracts = await self._get(f'symbols/{symbol_id}/options') return contracts['optionChain'] + @refresh_token_on_err() async def option_quotes( self, contracts: Dict[ContractsKey, Dict[int, dict]] = {}, @@ -107,7 +175,8 @@ class _API: ] resp = await self._sess.post( path=f'/markets/quotes/options', - # XXX: b'{"code":1024,"message":"The size of the array requested is not valid: optionIds"}' + # XXX: b'{"code":1024,"message":"The size of the array requested + # is not valid: optionIds"}' # ^ what I get when trying to use too many ids manually... json={'filters': filters, 'optionIds': option_ids} ) @@ -122,48 +191,24 @@ class Client: """ def __init__(self, config: configparser.ConfigParser): self._sess = asks.Session() - self.api = _API(self._sess) + self.api = _API(self) self._conf = config self.access_data = {} self._reload_config(config) self._symbol_cache: Dict[str, int] = {} self._optids2contractinfo = {} self._contract2ids = {} + # for blocking during token refresh + self._has_access = trio.Event() + self._has_access.set() + self._request_not_in_progress = trio.Event() + self._request_not_in_progress.set() + self._mutex = trio.StrictFIFOLock() def _reload_config(self, config=None, **kwargs): - log.warn("Reloading access config data") self._conf = config or get_config(**kwargs) self.access_data = dict(self._conf['questrade']) - async def _new_auth_token(self) -> dict: - """Request a new api authorization ``refresh_token``. - - Gain api access using either a user provided or existing token. - See the instructions:: - - http://www.questrade.com/api/documentation/getting-started - http://www.questrade.com/api/documentation/security - """ - resp = await self._sess.get( - _refresh_token_ep + 'token', - params={'grant_type': 'refresh_token', - 'refresh_token': self.access_data['refresh_token']} - ) - data = resproc(resp, log) - self.access_data.update(data) - - return data - - def _prep_sess(self) -> None: - """Fill http session with auth headers and a base url. - """ - data = self.access_data - # set access token header for the session - self._sess.headers.update({ - 'Authorization': (f"{data['token_type']} {data['access_token']}")}) - # set base API url (asks shorthand) - self._sess.base_location = self.access_data['api_server'] + _version - async def _revoke_auth_token(self) -> None: """Revoke api access for the current token. """ @@ -175,8 +220,14 @@ class Client: ) return resp + def write_config(self): + """Save access creds to config file. + """ + self._conf['questrade'] = self.access_data + config.write(self._conf) + async def ensure_access(self, force_refresh: bool = False) -> dict: - """Acquire new ``access_token`` and/or ``refresh_token`` if necessary. + """Acquire a new token set (``access_token`` and ``refresh_token``). Checks if the locally cached (file system) ``access_token`` has expired (based on a ``expires_at`` time stamp stored in the brokers.ini config) @@ -185,47 +236,90 @@ class Client: ``refresh_token`` has expired a new one needs to be provided by the user. """ - access_token = self.access_data.get('access_token') - expires = float(self.access_data.get('expires_at', 0)) - expires_stamp = datetime.fromtimestamp( - expires).strftime('%Y-%m-%d %H:%M:%S') - if not access_token or (expires < time.time()) or force_refresh: - log.debug( - f"Refreshing access token {access_token} which expired at" - f" {expires_stamp}") - try: - data = await self._new_auth_token() - except BrokerError as qterr: - if "We're making some changes" in str(qterr.args[0]): - # API service is down - raise QuestradeError("API is down for maintenance") - elif qterr.args[0].decode() == 'Bad Request': - # likely config ``refresh_token`` is expired but may - # be updated in the config file via another piker process - self._reload_config() + # wait for ongoing requests to clear (API can't handle + # concurrent endpoint requests alongside a token refresh) + await self._request_not_in_progress.wait() + + # block api access to tall other tasks + # XXX: this is limitation of the API when using a single + # token whereby their service can't handle concurrent requests + # to differnet end points (particularly the auth ep) which + # causes hangs and premature token invalidation issues. + self._has_access.clear() + try: + # don't allow simultaneous token refresh requests + async with self._mutex: + access_token = self.access_data.get('access_token') + expires = float(self.access_data.get('expires_at', 0)) + expires_stamp = datetime.fromtimestamp( + expires).strftime('%Y-%m-%d %H:%M:%S') + if not access_token or ( + expires < time.time() + ) or force_refresh: + log.info("REFRESHING TOKENS!") + log.debug( + f"Refreshing access token {access_token} which expired" + f" at {expires_stamp}") try: - data = await self._new_auth_token() + data = await self.api._new_auth_token( + self.access_data['refresh_token']) except BrokerError as qterr: - if qterr.args[0].decode() == 'Bad Request': - # actually expired; get new from user - self._reload_config(force_from_user=True) - data = await self._new_auth_token() + + def get_err_msg(err): + # handle str and bytes... + msg = err.args[0] + return msg.decode() if msg.isascii() else msg + + msg = get_err_msg(qterr) + + if "We're making some changes" in msg: + # API service is down + raise QuestradeError("API is down for maintenance") + + elif msg == 'Bad Request': + # likely config ``refresh_token`` is expired but + # may be updated in the config file via another + # piker process + self._reload_config() + try: + data = await self.api._new_auth_token( + self.access_data['refresh_token']) + except BrokerError as qterr: + if get_err_msg(qterr) == 'Bad Request': + # actually expired; get new from user + self._reload_config(force_from_user=True) + data = await self.api._new_auth_token( + self.access_data['refresh_token']) + else: + raise QuestradeError(qterr) else: - raise QuestradeError(qterr) + raise qterr + + self.access_data.update(data) + log.debug(f"Updated tokens:\n{data}") + # store an absolute access token expiry time + self.access_data['expires_at'] = time.time() + float( + data['expires_in']) + + # write to config to disk + self.write_config() else: - raise qterr + log.info( + f"\nCurrent access token {access_token} expires at" + f" {expires_stamp}\n") - # store absolute token expiry time - self.access_data['expires_at'] = time.time() + float( - data['expires_in']) - # write to config on disk - write_conf(self) - else: - log.debug(f"\nCurrent access token {access_token} expires at" - f" {expires_stamp}\n") + # set access token header for the session + data = self.access_data + self._sess.headers.update({ + 'Authorization': + (f"{data['token_type']} {data['access_token']}")} + ) + # set base API url (asks shorthand) + self._sess.base_location = data['api_server'] + _version + finally: + self._has_access.set() - self._prep_sess() - return self.access_data + return data async def tickers2ids(self, tickers): """Helper routine that take a sequence of ticker symbols and returns @@ -407,54 +501,53 @@ def _token_from_user(conf: 'configparser.ConfigParser') -> None: conf['questrade'] = {'refresh_token': refresh_token} -def get_config(force_from_user=False) -> "configparser.ConfigParser": - conf, path = config.load() - if not conf.has_section('questrade') or ( - not conf['questrade'].get('refresh_token') or ( - force_from_user) - ): +def get_config( + force_from_user: bool = False, + config_path: str = None, +) -> "configparser.ConfigParser": + """Load the broker config from disk. + + By default this is the file: + + ~/.config/piker/brokers.ini + + though may be different depending on your OS. + """ + log.debug("Reloading access config data") + conf, path = config.load(config_path) + if not conf.has_section('questrade'): log.warn( f"No valid refresh token could be found in {path}") + elif force_from_user: + log.warn(f"Forcing manual token auth from user") _token_from_user(conf) return conf -def write_conf(client): - """Save access creds to config file. - """ - client._conf['questrade'] = client.access_data - config.write(client._conf) - - @asynccontextmanager async def get_client() -> Client: - """Spawn a broker client. - - A client must adhere to the method calls in ``piker.broker.core``. + """Spawn a broker client for making requests to the API service. """ conf = get_config() log.debug(f"Loaded config:\n{colorize_json(dict(conf['questrade']))}") client = Client(conf) await client.ensure_access() - try: log.debug("Check time to ensure access token is valid") - try: - # await client.api.time() - await client.quote(['RY.TO']) - except Exception: - # access token is likely no good - log.warn(f"Access token {client.access_data['access_token']} seems" - f" expired, forcing refresh") - await client.ensure_access(force_refresh=True) - await client.api.time() - - accounts = await client.api.accounts() - log.info(f"Available accounts:\n{colorize_json(accounts)}") + await client.api.time() + except Exception: + # access token is likely no good + log.warn(f"Access tokens {client.access_data} seem" + f" expired, forcing refresh") + await client.ensure_access(force_refresh=True) + await client.api.time() + try: yield client - finally: - write_conf(client) + except trio.Cancelled: + # only write config if we didn't bail out + client.write_config() + raise async def stock_quoter(client: Client, tickers: List[str]): @@ -480,27 +573,7 @@ async def stock_quoter(client: Client, tickers: List[str]): return {} ids = await get_symbol_id_seq(tuple(tickers)) - - try: - quotes_resp = await client.api.quotes(ids=ids) - except (QuestradeError, BrokerError) as qterr: - if "Access token is invalid" not in str(qterr.args[0]): - raise - # out-of-process piker actor may have - # renewed already.. - client._reload_config() - try: - quotes_resp = await client.api.quotes(ids=ids) - except BrokerError as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # TODO: this will crash when run from a sub-actor since - # STDIN can't be acquired. The right way to handle this - # is to make a request to the parent actor (i.e. - # spawner of this) to call this - # `client.ensure_access()` locally thus blocking until - # the user provides an API key on the "client side" - await client.ensure_access(force_refresh=True) - quotes_resp = await client.api.quotes(ids=ids) + quotes_resp = await client.api.quotes(ids=ids) # post-processing for quote in quotes_resp: @@ -543,28 +616,7 @@ async def option_quoter(client: Client, tickers: List[str]): """ contracts = await get_contract_by_date( tuple(symbol_date_pairs)) - try: - quotes = await client.option_chains(contracts) - except (QuestradeError, BrokerError) as qterr: - if "Access token is invalid" not in str(qterr.args[0]): - raise - # out-of-process piker actor may have - # renewed already.. - client._reload_config() - try: - quotes = await client.option_chains(contracts) - except BrokerError as qterr: - if "Access token is invalid" in str(qterr.args[0]): - # TODO: this will crash when run from a sub-actor since - # STDIN can't be acquired. The right way to handle this - # is to make a request to the parent actor (i.e. - # spawner of this) to call this - # `client.ensure_access()` locally thus blocking until - # the user provides an API key on the "client side" - await client.ensure_access(force_refresh=True) - quotes = await client.option_chains(contracts) - - return quotes + return await client.option_chains(contracts) return get_quote From 3ab9e28ddb89312a9f320a09c4e1f8ef1f3b3e06 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 9 Feb 2019 21:58:27 -0500 Subject: [PATCH 12/16] Use brokerd's client to get all contracts --- piker/ui/option_chain.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/piker/ui/option_chain.py b/piker/ui/option_chain.py index 276ff343..da981f54 100644 --- a/piker/ui/option_chain.py +++ b/piker/ui/option_chain.py @@ -1,6 +1,5 @@ """ options: a real-time option chain. - Launch with ``piker options ``. """ import types @@ -358,7 +357,9 @@ class OptionChain(object): ) # retreive all contracts to populate expiry row - all_contracts = await contracts(self.feed.brokermod, symbol) + all_contracts = await self.feed.call_client( + 'get_all_contracts', symbols=[symbol]) + # all_contracts = await contracts(self.feed.brokermod, symbol) if not all_contracts: label = self.no_opts_label @@ -369,12 +370,18 @@ class OptionChain(object): self.symbol, self.expiry = symbol, expiry return + # XXX: Unfortunately we can't serialize named tuples over + # msgpack... The expiry index is 2, see the ``ContractsKey`` named + # tuple in the questrade broker mod. It would normally look + # something like: + # expiry = next(iter(all_contracts)).expiry if not expiry else expiry + ei = 2 # start streaming soonest contract by default if not provided - expiry = next(iter(all_contracts)).expiry if not expiry else expiry + expiry = next(iter(all_contracts))[ei] if not expiry else expiry # TODO: figure out how to compact these buttons expiries = { - key.expiry: key.expiry[:key.expiry.find('T')] + key[ei]: key[ei][:key[ei].find('T')] for key in all_contracts } expiry_row = self.widgets['expiry_row'] @@ -500,6 +507,7 @@ async def _async_main( # trio-kivy entry point. await async_runTouchApp(chain.widgets['root']) # run kivy finally: - await chain._quote_gen.aclose() + if chain._quote_gen: + await chain._quote_gen.aclose() # cancel GUI update task nursery.cancel_scope.cancel() From ea289540b3bfb24c2542675bee2190e7f46543c2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 9 Feb 2019 21:58:49 -0500 Subject: [PATCH 13/16] Add a test to verify auth endpoint "locking" --- tests/test_questrade.py | 42 +++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 2e34aada..1c081935 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -105,42 +105,44 @@ def match_packet(symbols, quotes, feed_type='stock'): assert not quotes -async def intermittently_refresh_tokens(): - async with qt.get_client() as client: - try: - while True: - try: - log.info("REFRESHING TOKENS!") - await client.ensure_access(force_refresh=True) - await trio.sleep(0.3) - except Exception: - log.exception("Token refresh failed") - finally: - with trio.open_cancel_scope(shield=True): - async with qt.get_client() as client: - await client.ensure_access(force_refresh=True) - - -# XXX: demonstrates the shoddy API QT serves -@pytest.mark.skip @tractor_test async def test_concurrent_tokens_refresh(us_symbols, loglevel): + """Verify that concurrent requests from mulitple tasks work alongside + random token refreshing which simulates an access token expiry + refresh + scenario. + + The API does not support concurrent requests when refreshing tokens + (i.e. when hitting the auth endpoint). This tests ensures that when + multiple tasks use the same client concurrency works and access + token expiry will result in a reliable token set update. + """ async with qt.get_client() as client: # async with tractor.open_nursery() as n: # await n.run_in_actor('other', intermittently_refresh_tokens) async with trio.open_nursery() as n: - n.start_soon(intermittently_refresh_tokens) quoter = await qt.stock_quoter(client, us_symbols) async def get_quotes(): - for tries in range(10): + for tries in range(30): log.info(f"{tries}: GETTING QUOTES!") quotes = await quoter(us_symbols) await trio.sleep(0.1) + async def intermittently_refresh_tokens(client): + while True: + try: + await client.ensure_access(force_refresh=True) + log.info(f"last token data is {client.access_data}") + await trio.sleep(1) + except Exception: + log.exception("Token refresh failed") + + n.start_soon(intermittently_refresh_tokens, client) + # run 2 quote polling tasks + n.start_soon(get_quotes) await get_quotes() # shutdown From 308ceb1772ad092eee5d9ceb8ab03327108f6315 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Feb 2019 17:28:43 -0500 Subject: [PATCH 14/16] Use search method for `piker api` test --- tests/test_cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index ec46c004..26065245 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -75,7 +75,7 @@ def test_quotes_ticker_not_found( def test_api_method(nyse_tickers, capfd): """Ensure a low level api method can be called via CLI. """ - run(f"piker api quotes symbols={','.join(nyse_tickers)}") + run(f"piker api search prefix='WEED'") out, err = capfd.readouterr() quotes_dict = json.loads(out) assert isinstance(quotes_dict, dict) From 57bef52438db5feffd876949ff835e053312a16a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Feb 2019 17:29:08 -0500 Subject: [PATCH 15/16] Capture the right logger --- tests/test_watchlists.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_watchlists.py b/tests/test_watchlists.py index f35ee9d4..8d90536b 100644 --- a/tests/test_watchlists.py +++ b/tests/test_watchlists.py @@ -37,7 +37,7 @@ def test_watchlist_is_sorted_no_dups_and_saved_to_file(piker_dir): def test_watchlists_config_dir_created(caplog, temp_dir): """Ensure that a config directory is created. """ - with caplog.at_level(logging.DEBUG): + with caplog.at_level(logging.DEBUG, logger='piker'): wl.make_config_dir(temp_dir) assert len(caplog.records) == 1 record = caplog.records[0] From 3a6efd451d45f5a1a819e303fdb265f1ecf6fb2a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Feb 2019 19:09:54 -0500 Subject: [PATCH 16/16] Don't bother ensuring all symbols in data --- piker/brokers/questrade.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 2ac9d1ed..12d87de8 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -338,9 +338,8 @@ class Client: to_lookup = list(set(tickers) - set(symbols2ids)) if to_lookup: data = await self.api.symbols(names=','.join(to_lookup)) - for ticker, symbol in zip(to_lookup, data['symbols']): + for symbol in data['symbols']: name = symbol['symbol'] - assert name == ticker cache[name] = symbols2ids[name] = str(symbol['symbolId']) return symbols2ids