diff --git a/piker/brokers/data.py b/piker/brokers/data.py index b6e5ab6a..b9431654 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -108,6 +108,9 @@ async def stream_quotes( await trio.sleep(delay) +# TODO: at this point probably just just make this a class and +# a lot of these functions should be methods. It will definitely +# make stateful UI apps easier to implement class DataFeed(typing.NamedTuple): """A per broker "data feed" container. @@ -118,7 +121,7 @@ class DataFeed(typing.NamedTuple): client: object exit_stack: contextlib.AsyncExitStack quoter_keys: List[str] = ['stock', 'option'] - tasks: Dict[str, trio._core._run.Task] = dict.fromkeys( + tasks: Dict[str, trio.Event] = dict.fromkeys( quoter_keys, False) quoters: Dict[str, typing.Coroutine] = {} subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}} @@ -234,13 +237,19 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -async def modify_quote_stream(broker, feed_type, symbols, chan=None, cid=None): +def modify_quote_stream(broker, feed_type, symbols, chan, cid): """Absolute symbol subscription list for each quote stream. Effectively a symbol subscription api. """ log.info(f"{chan} changed symbol subscription to {symbols}") - feed = await get_cached_feed(broker) + ss = tractor.current_actor().statespace + feed = ss['feeds'].get(broker) + if feed is None: + raise RuntimeError( + "`get_cached_feed()` must be called before modifying its stream" + ) + symbols2chans = feed.subscriptions[feed_type] # update map from each symbol to requesting client's chan for ticker in symbols: @@ -299,6 +308,7 @@ async def start_quote_stream( diff_cached: bool = True, chan: tractor.Channel = None, cid: str = None, + rate: int = 3, ) -> None: """Handle per-broker quote stream subscriptions using a "lazy" pub-sub pattern. @@ -311,7 +321,6 @@ async def start_quote_stream( # set log level after fork get_console_log(actor.loglevel) # pull global vars from local actor - ss = actor.statespace symbols = list(symbols) log.info( f"{chan.uid} subscribed to {broker} for symbols {symbols}") @@ -337,38 +346,65 @@ async def start_quote_stream( 'option', await feed.mod.option_quoter(feed.client, symbols) ) - - # update map from each symbol to requesting client's chan - await modify_quote_stream(broker, feed_type, symbols, chan, cid) - try: - if not feed.tasks.get(feed_type): - # no data feeder task yet; so start one - respawn = True + # update map from each symbol to requesting client's chan + modify_quote_stream(broker, feed_type, symbols, chan, cid) + + # event indicating that task was started and then killed + task_is_dead = feed.tasks.get(feed_type) + if task_is_dead is False: + task_is_dead = trio.Event() + task_is_dead.set() + feed.tasks[feed_type] = task_is_dead + + if not task_is_dead.is_set(): + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # we'll take over and spawn it again + await task_is_dead.wait() + # client channel was likely disconnected + # but we still want to keep the broker task + # alive if there are other consumers (including + # ourselves) + if any(symbols2chans.values()): + log.warn( + f"Data feed task for {feed.mod.name} was cancelled but" + f" there are still active clients, respawning") + + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False log.info(f"Spawning data feed task for {feed.mod.name}") - while respawn: - respawn = False - try: - async with trio.open_nursery() as nursery: - nursery.start_soon( - partial( - fan_out_to_chans, feed, get_quotes, - symbols2chans, - diff_cached=diff_cached, - cid=cid - ) + try: + async with trio.open_nursery() as nursery: + nursery.start_soon( + partial( + fan_out_to_chans, feed, get_quotes, + symbols2chans, + diff_cached=diff_cached, + cid=cid, + rate=rate, ) - feed.tasks[feed_type] = True - except trio.BrokenResourceError: - log.exception("Respawning failed data feed task") - respawn = True - # unblocks when no more symbols subscriptions exist and the - # quote streamer task terminates (usually because another call - # was made to `modify_quoter` to unsubscribe from streaming - # symbols) + ) + # it's alive! + task_is_dead.clear() + + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True + + # unblocks when no more symbols subscriptions exist and the + # quote streamer task terminates (usually because another call + # was made to `modify_quoter` to unsubscribe from streaming + # symbols) finally: log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}") - feed.tasks.pop(feed_type) + task_is_dead.set() + + # if we're cancelled externally unsubscribe our quote feed + modify_quote_stream(broker, feed_type, [], chan, cid) + # if there are truly no more subscriptions with this broker # drop from broker subs dict if not any(symbols2chans.values()):