Port to the new `@tractor.msg.pub` decorator API

The pub-sub data feed system was factored into `tractor` as an
experimental api / subsystem. Move to using that which greatly
simplifies the data feed architecture.
kivy_mainline_and_py3.8
Tyler Goodlet 2019-01-27 14:50:04 -05:00
parent 22670afe58
commit 2514843fc1
1 changed files with 61 additions and 155 deletions

View File

@ -44,10 +44,38 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict:
await trio.sleep(sleep) await trio.sleep(sleep)
# TODO: at this point probably just just make this a class and
# a lot of these functions should be methods. It will definitely
# make stateful UI apps easier to implement
@dataclass
class BrokerFeed:
"""A per broker "client feed" container.
A structure to keep track of components used by
real-time data daemons. This is a backend "client" which pulls
data from broker specific data lakes:
``DataFeed`` <- tractor -> ``BrokerFeed`` <- broker IPC -> broker API
"""
mod: ModuleType
client: object
exit_stack: contextlib.AsyncExitStack
quoter_keys: Tuple[str] = ('stock', 'option')
locks: Dict[str, trio.StrictFIFOLock] = field(
default_factory=lambda:
{'stock': trio.StrictFIFOLock(), 'option': trio.StrictFIFOLock()}
)
quoters: Dict[str, typing.Coroutine] = field(default_factory=dict)
subscriptions: Dict[str, Dict[str, set]] = field(
default_factory=partial(dict, **{'option': {}, 'stock': {}})
)
@tractor.msg.pub(tasks=['stock', 'option'])
async def stream_quotes( async def stream_quotes(
get_topics: typing.Callable, get_topics: typing.Callable,
get_quotes: Coroutine, get_quotes: Coroutine,
brokermod: ModuleType, feed: BrokerFeed,
rate: int = 5, # delay between quote requests rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None: ) -> None:
@ -57,6 +85,11 @@ async def stream_quotes(
A stock-broker client ``get_quotes()`` async context manager must be A stock-broker client ``get_quotes()`` async context manager must be
provided which returns an async quote retrieval function. provided which returns an async quote retrieval function.
""" """
broker_limit = getattr(feed.mod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec")
sleeptime = round(1. / rate, 3) sleeptime = round(1. / rate, 3)
_cache = {} # ticker to quote caching _cache = {} # ticker to quote caching
@ -94,11 +127,11 @@ async def stream_quotes(
log.info( log.info(
f"New quote {quote['symbol']}:\n{new}") f"New quote {quote['symbol']}:\n{new}")
_cache[symbol] = quote _cache[symbol] = quote
new_quotes[symbol] = quote new_quotes[quote['key']] = quote
else: else:
log.info(f"Delivering quotes:\n{quotes}") log.info(f"Delivering quotes:\n{quotes}")
for quote in quotes: for quote in quotes:
newquotes[quote['symbol']] = quote new_quotes[quote['symbol']] = quote
yield new_quotes yield new_quotes
@ -117,90 +150,6 @@ async def stream_quotes(
await trio.sleep(delay) await trio.sleep(delay)
# TODO: at this point probably just just make this a class and
# a lot of these functions should be methods. It will definitely
# make stateful UI apps easier to implement
@dataclass
class BrokerFeed:
"""A per broker "client feed" container.
A structure to keep track of components used by
real-time data daemons. This is a backend "client" which pulls
data from broker specific data lakes:
``DataFeed`` <- tractor -> ``BrokerFeed`` <- broker IPC -> broker API
"""
mod: ModuleType
client: object
exit_stack: contextlib.AsyncExitStack
quoter_keys: Tuple[str] = ('stock', 'option')
locks: Dict[str, trio.StrictFIFOLock] = field(
default_factory=lambda:
{'stock': trio.StrictFIFOLock(), 'option': trio.StrictFIFOLock()}
)
quoters: Dict[str, typing.Coroutine] = field(default_factory=dict)
subscriptions: Dict[str, Dict[str, set]] = field(
default_factory=partial(dict, **{'option': {}, 'stock': {}})
)
async def fan_out_to_ctxs(
pub_gen: typing.AsyncGenerator,
feed: BrokerFeed,
get_quotes: Coroutine,
topics2ctxs: Dict[str, tractor.Context],
topic_key: str = 'key',
packet_key: str = 'symbol',
rate: int = 5, # delay between quote requests
diff_cached: bool = True, # only deliver "new" quotes to the queue
) -> None:
"""Request and fan out quotes to each subscribed actor channel.
"""
broker_limit = getattr(feed.mod, '_rate_limit', float('inf'))
if broker_limit < rate:
rate = broker_limit
log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec")
def get_topics():
return tuple(topics2ctxs.keys())
async for published in pub_gen(
get_topics,
get_quotes,
feed.mod,
rate,
diff_cached=diff_cached,
):
ctx_payloads = {}
for packet_key, data in published.items():
# grab each suscription topic using provided key for lookup
topic = data[topic_key]
# build a new dict packet for passing to multiple underlying channels
packet = {packet_key: data}
for ctx in topics2ctxs.get(topic, set()):
ctx_payloads.setdefault(ctx, {}).update(packet),
# deliver to each subscriber (fan out)
if ctx_payloads:
for ctx, payload in ctx_payloads.items():
try:
await ctx.send_yield(payload)
except (
# That's right, anything you can think of...
trio.ClosedStreamError, ConnectionResetError,
ConnectionRefusedError,
):
log.warn(f"{ctx.chan} went down?")
for ctx_set in topics2ctxs.values():
ctx_set.discard(ctx)
if not any(topics2ctxs.values()):
log.warn(f"No subs left for broker {feed.mod.name}, exiting task")
break
log.info(f"Terminating stream quoter task for {pub_gen.__name__}")
async def symbol_data(broker: str, tickers: List[str]): async def symbol_data(broker: str, tickers: List[str]):
"""Retrieve baseline symbol info from broker. """Retrieve baseline symbol info from broker.
""" """
@ -255,38 +204,6 @@ async def smoke_quote(get_quotes, tickers, broker):
########################################### ###########################################
def modify_quote_stream(broker, feed_type, symbols, ctx):
"""Absolute symbol subscription list for each quote stream.
Effectively a symbol subscription api.
"""
log.info(f"{ctx.chan} changed symbol subscription to {symbols}")
ss = tractor.current_actor().statespace
feed = ss['feeds'].get(broker)
if feed is None:
raise RuntimeError(
"`get_cached_feed()` must be called before modifying its stream"
)
symbols2ctxs = feed.subscriptions[feed_type]
# update map from each symbol to requesting client's chan
for ticker in symbols:
symbols2ctxs.setdefault(ticker, set()).add(ctx)
# remove any existing symbol subscriptions if symbol is not
# found in ``symbols``
# TODO: this can likely be factored out into the pub-sub api
for ticker in filter(
lambda ticker: ticker not in symbols, symbols2ctxs.copy()
):
ctx_set = symbols2ctxs.get(ticker)
ctx_set.discard(ctx)
if not ctx_set:
# pop empty sets which will trigger bg quoter task termination
symbols2ctxs.pop(ticker)
async def get_cached_feed( async def get_cached_feed(
brokername: str, brokername: str,
) -> BrokerFeed: ) -> BrokerFeed:
@ -317,11 +234,11 @@ async def get_cached_feed(
async def start_quote_stream( async def start_quote_stream(
ctx: tractor.Context, # marks this as a streaming func
broker: str, broker: str,
symbols: List[Any], symbols: List[Any],
feed_type: str = 'stock', feed_type: str = 'stock',
diff_cached: bool = True, diff_cached: bool = True,
ctx: tractor.Context = None,
rate: int = 3, rate: int = 3,
) -> None: ) -> None:
"""Handle per-broker quote stream subscriptions using a "lazy" pub-sub """Handle per-broker quote stream subscriptions using a "lazy" pub-sub
@ -341,7 +258,7 @@ async def start_quote_stream(
# another actor task may have already created it # another actor task may have already created it
feed = await get_cached_feed(broker) feed = await get_cached_feed(broker)
symbols2ctxs = feed.subscriptions[feed_type] symbols2ctxs = feed.subscriptions[feed_type]
task_is_dead = None packetizer = None
if feed_type == 'stock': if feed_type == 'stock':
get_quotes = feed.quoters.setdefault( get_quotes = feed.quoters.setdefault(
@ -351,8 +268,7 @@ async def start_quote_stream(
# do a smoke quote (note this mutates the input list and filters # do a smoke quote (note this mutates the input list and filters
# out bad symbols for now) # out bad symbols for now)
payload = await smoke_quote(get_quotes, symbols, broker) payload = await smoke_quote(get_quotes, symbols, broker)
# push initial smoke quote response for client initialization
await ctx.send_yield(payload)
elif feed_type == 'option': elif feed_type == 'option':
# FIXME: yeah we need maybe a more general way to specify # FIXME: yeah we need maybe a more general way to specify
# the arg signature for the option feed beasides a symbol # the arg signature for the option feed beasides a symbol
@ -361,46 +277,36 @@ async def start_quote_stream(
'option', 'option',
await feed.mod.option_quoter(feed.client, symbols) await feed.mod.option_quoter(feed.client, symbols)
) )
# packetize
payload = { payload = {
quote['symbol']: quote quote['symbol']: quote
for quote in await get_quotes(symbols) for quote in await get_quotes(symbols)
} }
def packetizer(topic, quote):
return {quote['symbol']: quote}
# push initial smoke quote response for client initialization # push initial smoke quote response for client initialization
await ctx.send_yield(payload) await ctx.send_yield(payload)
try:
# update map from each symbol to requesting client's chan
modify_quote_stream(broker, feed_type, symbols, ctx)
# prevents more then one broker feed task from spawning
lock = feed.locks.get(feed_type)
# block and let existing feed task deliver
# stream data until it is cancelled in which case
# we'll take over and spawn it again
async with lock:
# no data feeder task yet; so start one
respawn = True
while respawn:
respawn = False
log.info(f"Spawning data feed task for {feed.mod.name}")
try: try:
# unblocks when no more symbols subscriptions exist and the await stream_quotes(
# quote streamer task terminates
await fan_out_to_ctxs( # pub required kwargs
stream_quotes, task_name=feed_type,
feed, ctx=ctx,
get_quotes, topics=symbols,
symbols2ctxs, packetizer=packetizer,
# actual func args
feed=feed,
get_quotes=get_quotes,
diff_cached=diff_cached, diff_cached=diff_cached,
rate=rate, rate=rate,
) )
except trio.BrokenResourceError: log.info(
log.exception("Respawning failed data feed task") f"Terminating stream quoter task for {feed.mod.name}")
respawn = True
finally: finally:
# if we're cancelled externally unsubscribe our quote feed
modify_quote_stream(broker, feed_type, [], ctx)
# if there are truly no more subscriptions with this broker # if there are truly no more subscriptions with this broker
# drop from broker subs dict # drop from broker subs dict
if not any(symbols2ctxs.values()): if not any(symbols2ctxs.values()):