Make `get_cached_feed()` an asynccontextmanager

Adjust feed locking around internal manager `yields` to make this work.

Also, change quote publisher to deliver a list of quotes for each
retrieved batch. This was actually broken for option streaming since
each quote was being overwritten due to a common `key` value for all
expiries. Asjust the `packetizer` function accordingly to work for
both options and stocks.
kivy_mainline_and_py3.8
Tyler Goodlet 2019-02-03 23:40:51 -05:00
parent 9b37607b04
commit e91a50a1ba
1 changed files with 61 additions and 59 deletions

View File

@ -15,6 +15,7 @@ from operator import itemgetter
import trio import trio
import tractor import tractor
from async_generator import asynccontextmanager
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from . import get_brokermod from . import get_brokermod
@ -127,11 +128,16 @@ async def stream_quotes(
log.info( log.info(
f"New quote {quote['symbol']}:\n{new}") f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote _cache[symbol] = quote
new_quotes[quote['key']] = quote # XXX: we append to a list for the options case where the
# subscription topic (key) is the same for all
# expiries even though this is uncessary for the
# stock case (different topic [i.e. symbol] for each
# quote).
new_quotes.setdefault(quote['key'], []).append(quote)
else: else:
log.info(f"Delivering quotes:\n{quotes}") log.info(f"Delivering quotes:\n{quotes}")
for quote in quotes: for quote in quotes:
new_quotes[quote['symbol']] = quote new_quotes.setdefault(quote['key'], []).append(quote)
yield new_quotes yield new_quotes
@ -153,7 +159,7 @@ async def stream_quotes(
async def symbol_data(broker: str, tickers: List[str]): async def symbol_data(broker: str, tickers: List[str]):
"""Retrieve baseline symbol info from broker. """Retrieve baseline symbol info from broker.
""" """
feed = await get_cached_feed(broker) async with get_cached_feed(broker) as feed:
return await feed.client.symbol_data(tickers) return await feed.client.symbol_data(tickers)
@ -193,7 +199,7 @@ async def smoke_quote(get_quotes, tickers, broker):
# report any unknown/invalid symbols (QT specific) # report any unknown/invalid symbols (QT specific)
if quote.get('low52w', False) is None: if quote.get('low52w', False) is None:
log.warn( log.error(
f"{symbol} seems to be defunct") f"{symbol} seems to be defunct")
payload[symbol] = quote payload[symbol] = quote
@ -204,6 +210,7 @@ async def smoke_quote(get_quotes, tickers, broker):
########################################### ###########################################
@asynccontextmanager
async def get_cached_feed( async def get_cached_feed(
brokername: str, brokername: str,
) -> BrokerFeed: ) -> BrokerFeed:
@ -213,12 +220,14 @@ async def get_cached_feed(
ss = tractor.current_actor().statespace ss = tractor.current_actor().statespace
feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) feeds = ss.setdefault('feeds', {'_lock': trio.Lock()})
lock = feeds['_lock'] lock = feeds['_lock']
async with lock:
try: try:
try:
async with lock:
feed = feeds[brokername] feed = feeds[brokername]
log.info(f"Subscribing with existing `{brokername}` daemon") log.info(f"Subscribing with existing `{brokername}` daemon")
return feed yield feed
except KeyError: except KeyError:
async with lock:
log.info(f"Creating new client for broker {brokername}") log.info(f"Creating new client for broker {brokername}")
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
exit_stack = contextlib.AsyncExitStack() exit_stack = contextlib.AsyncExitStack()
@ -230,7 +239,10 @@ async def get_cached_feed(
exit_stack=exit_stack, exit_stack=exit_stack,
) )
feeds[brokername] = feed feeds[brokername] = feed
return feed yield feed
finally:
# destroy the API client
await feed.exit_stack.aclose()
async def start_quote_stream( async def start_quote_stream(
@ -256,8 +268,8 @@ async def start_quote_stream(
log.info( log.info(
f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}") f"{ctx.chan.uid} subscribed to {broker} for symbols {symbols}")
# another actor task may have already created it # another actor task may have already created it
feed = await get_cached_feed(broker) async with get_cached_feed(broker) as feed:
symbols2ctxs = feed.subscriptions[feed_type] # function to format packets delivered to subscribers
packetizer = None packetizer = None
if feed_type == 'stock': if feed_type == 'stock':
@ -283,13 +295,12 @@ async def start_quote_stream(
for quote in await get_quotes(symbols) for quote in await get_quotes(symbols)
} }
def packetizer(topic, quote): def packetizer(topic, quotes):
return {quote['symbol']: quote} return {quote['symbol']: quote for quote in quotes}
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
await ctx.send_yield(payload) await ctx.send_yield(payload)
try:
await stream_quotes( await stream_quotes(
# pub required kwargs # pub required kwargs
@ -306,14 +317,6 @@ async def start_quote_stream(
) )
log.info( log.info(
f"Terminating stream quoter task for {feed.mod.name}") f"Terminating stream quoter task for {feed.mod.name}")
finally:
# if there are truly no more subscriptions with this broker
# drop from broker subs dict
if not any(symbols2ctxs.values()):
log.info(f"No more subscriptions for broker {broker}")
# destroy the API client
await feed.exit_stack.aclose()
class DataFeed: class DataFeed:
@ -377,7 +380,6 @@ class DataFeed:
# get first quotes response # get first quotes response
log.debug(f"Waiting on first quote for {symbols}...") log.debug(f"Waiting on first quote for {symbols}...")
quotes = {} quotes = {}
# with trio.move_on_after(5):
quotes = await quote_gen.__anext__() quotes = await quote_gen.__anext__()
self.quote_gen = quote_gen self.quote_gen = quote_gen