Use a `trio.Event` to guarantee respawning of data feed task
parent
01c0551a7f
commit
66ecb4c0cb
|
@ -108,6 +108,9 @@ async def stream_quotes(
|
||||||
await trio.sleep(delay)
|
await trio.sleep(delay)
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: at this point probably just just make this a class and
|
||||||
|
# a lot of these functions should be methods. It will definitely
|
||||||
|
# make stateful UI apps easier to implement
|
||||||
class DataFeed(typing.NamedTuple):
|
class DataFeed(typing.NamedTuple):
|
||||||
"""A per broker "data feed" container.
|
"""A per broker "data feed" container.
|
||||||
|
|
||||||
|
@ -118,7 +121,7 @@ class DataFeed(typing.NamedTuple):
|
||||||
client: object
|
client: object
|
||||||
exit_stack: contextlib.AsyncExitStack
|
exit_stack: contextlib.AsyncExitStack
|
||||||
quoter_keys: List[str] = ['stock', 'option']
|
quoter_keys: List[str] = ['stock', 'option']
|
||||||
tasks: Dict[str, trio._core._run.Task] = dict.fromkeys(
|
tasks: Dict[str, trio.Event] = dict.fromkeys(
|
||||||
quoter_keys, False)
|
quoter_keys, False)
|
||||||
quoters: Dict[str, typing.Coroutine] = {}
|
quoters: Dict[str, typing.Coroutine] = {}
|
||||||
subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}}
|
subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}}
|
||||||
|
@ -234,13 +237,19 @@ async def smoke_quote(get_quotes, tickers, broker):
|
||||||
###########################################
|
###########################################
|
||||||
|
|
||||||
|
|
||||||
async def modify_quote_stream(broker, feed_type, symbols, chan=None, cid=None):
|
def modify_quote_stream(broker, feed_type, symbols, chan, cid):
|
||||||
"""Absolute symbol subscription list for each quote stream.
|
"""Absolute symbol subscription list for each quote stream.
|
||||||
|
|
||||||
Effectively a symbol subscription api.
|
Effectively a symbol subscription api.
|
||||||
"""
|
"""
|
||||||
log.info(f"{chan} changed symbol subscription to {symbols}")
|
log.info(f"{chan} changed symbol subscription to {symbols}")
|
||||||
feed = await get_cached_feed(broker)
|
ss = tractor.current_actor().statespace
|
||||||
|
feed = ss['feeds'].get(broker)
|
||||||
|
if feed is None:
|
||||||
|
raise RuntimeError(
|
||||||
|
"`get_cached_feed()` must be called before modifying its stream"
|
||||||
|
)
|
||||||
|
|
||||||
symbols2chans = feed.subscriptions[feed_type]
|
symbols2chans = feed.subscriptions[feed_type]
|
||||||
# update map from each symbol to requesting client's chan
|
# update map from each symbol to requesting client's chan
|
||||||
for ticker in symbols:
|
for ticker in symbols:
|
||||||
|
@ -299,6 +308,7 @@ async def start_quote_stream(
|
||||||
diff_cached: bool = True,
|
diff_cached: bool = True,
|
||||||
chan: tractor.Channel = None,
|
chan: tractor.Channel = None,
|
||||||
cid: str = None,
|
cid: str = None,
|
||||||
|
rate: int = 3,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Handle per-broker quote stream subscriptions using a "lazy" pub-sub
|
"""Handle per-broker quote stream subscriptions using a "lazy" pub-sub
|
||||||
pattern.
|
pattern.
|
||||||
|
@ -311,7 +321,6 @@ async def start_quote_stream(
|
||||||
# set log level after fork
|
# set log level after fork
|
||||||
get_console_log(actor.loglevel)
|
get_console_log(actor.loglevel)
|
||||||
# pull global vars from local actor
|
# pull global vars from local actor
|
||||||
ss = actor.statespace
|
|
||||||
symbols = list(symbols)
|
symbols = list(symbols)
|
||||||
log.info(
|
log.info(
|
||||||
f"{chan.uid} subscribed to {broker} for symbols {symbols}")
|
f"{chan.uid} subscribed to {broker} for symbols {symbols}")
|
||||||
|
@ -337,17 +346,36 @@ async def start_quote_stream(
|
||||||
'option',
|
'option',
|
||||||
await feed.mod.option_quoter(feed.client, symbols)
|
await feed.mod.option_quoter(feed.client, symbols)
|
||||||
)
|
)
|
||||||
|
|
||||||
# update map from each symbol to requesting client's chan
|
|
||||||
await modify_quote_stream(broker, feed_type, symbols, chan, cid)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not feed.tasks.get(feed_type):
|
# update map from each symbol to requesting client's chan
|
||||||
|
modify_quote_stream(broker, feed_type, symbols, chan, cid)
|
||||||
|
|
||||||
|
# event indicating that task was started and then killed
|
||||||
|
task_is_dead = feed.tasks.get(feed_type)
|
||||||
|
if task_is_dead is False:
|
||||||
|
task_is_dead = trio.Event()
|
||||||
|
task_is_dead.set()
|
||||||
|
feed.tasks[feed_type] = task_is_dead
|
||||||
|
|
||||||
|
if not task_is_dead.is_set():
|
||||||
|
# block and let existing feed task deliver
|
||||||
|
# stream data until it is cancelled in which case
|
||||||
|
# we'll take over and spawn it again
|
||||||
|
await task_is_dead.wait()
|
||||||
|
# client channel was likely disconnected
|
||||||
|
# but we still want to keep the broker task
|
||||||
|
# alive if there are other consumers (including
|
||||||
|
# ourselves)
|
||||||
|
if any(symbols2chans.values()):
|
||||||
|
log.warn(
|
||||||
|
f"Data feed task for {feed.mod.name} was cancelled but"
|
||||||
|
f" there are still active clients, respawning")
|
||||||
|
|
||||||
# no data feeder task yet; so start one
|
# no data feeder task yet; so start one
|
||||||
respawn = True
|
respawn = True
|
||||||
log.info(f"Spawning data feed task for {feed.mod.name}")
|
|
||||||
while respawn:
|
while respawn:
|
||||||
respawn = False
|
respawn = False
|
||||||
|
log.info(f"Spawning data feed task for {feed.mod.name}")
|
||||||
try:
|
try:
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
nursery.start_soon(
|
nursery.start_soon(
|
||||||
|
@ -355,20 +383,28 @@ async def start_quote_stream(
|
||||||
fan_out_to_chans, feed, get_quotes,
|
fan_out_to_chans, feed, get_quotes,
|
||||||
symbols2chans,
|
symbols2chans,
|
||||||
diff_cached=diff_cached,
|
diff_cached=diff_cached,
|
||||||
cid=cid
|
cid=cid,
|
||||||
|
rate=rate,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
feed.tasks[feed_type] = True
|
# it's alive!
|
||||||
|
task_is_dead.clear()
|
||||||
|
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
log.exception("Respawning failed data feed task")
|
log.exception("Respawning failed data feed task")
|
||||||
respawn = True
|
respawn = True
|
||||||
|
|
||||||
# unblocks when no more symbols subscriptions exist and the
|
# unblocks when no more symbols subscriptions exist and the
|
||||||
# quote streamer task terminates (usually because another call
|
# quote streamer task terminates (usually because another call
|
||||||
# was made to `modify_quoter` to unsubscribe from streaming
|
# was made to `modify_quoter` to unsubscribe from streaming
|
||||||
# symbols)
|
# symbols)
|
||||||
finally:
|
finally:
|
||||||
log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}")
|
log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}")
|
||||||
feed.tasks.pop(feed_type)
|
task_is_dead.set()
|
||||||
|
|
||||||
|
# if we're cancelled externally unsubscribe our quote feed
|
||||||
|
modify_quote_stream(broker, feed_type, [], chan, cid)
|
||||||
|
|
||||||
# if there are truly no more subscriptions with this broker
|
# if there are truly no more subscriptions with this broker
|
||||||
# drop from broker subs dict
|
# drop from broker subs dict
|
||||||
if not any(symbols2chans.values()):
|
if not any(symbols2chans.values()):
|
||||||
|
|
Loading…
Reference in New Issue