Call start_quote_stream() from monitor main

kivy_mainline_and_py3.8
Tyler Goodlet 2018-11-30 08:17:54 -05:00
parent c2ec4800d6
commit 288ea604af
1 changed files with 19 additions and 4 deletions

View File

@ -19,6 +19,7 @@ from kivy.lang import Builder
from kivy import utils from kivy import utils
from kivy.app import async_runTouchApp from kivy.app import async_runTouchApp
from kivy.core.window import Window from kivy.core.window import Window
from async_generator import aclosing
from ..log import get_logger from ..log import get_logger
from .pager import PagerView from .pager import PagerView
@ -513,13 +514,24 @@ async def _async_main(
tickers: List[str], tickers: List[str],
brokermod: ModuleType, brokermod: ModuleType,
rate: int, rate: int,
# an async generator instance which yields quotes dict packets test: bool = False
quote_gen: AsyncGeneratorType,
) -> None: ) -> None:
'''Launch kivy app + all other related tasks. '''Launch kivy app + all other related tasks.
This is started with cli cmd `piker monitor`. This is started with cli cmd `piker monitor`.
''' '''
if test:
# stream from a local test file
quote_gen = await portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
)
else:
# start live streaming from broker daemon
quote_gen = await portal.run(
"piker.brokers.data", 'start_quote_stream',
broker=brokermod.name, symbols=tickers)
# subscribe for tickers (this performs a possible filtering # subscribe for tickers (this performs a possible filtering
# where invalid symbols are discarded) # where invalid symbols are discarded)
sd = await portal.run( sd = await portal.run(
@ -594,14 +606,17 @@ async def _async_main(
try: try:
# Trio-kivy entry point. # Trio-kivy entry point.
await async_runTouchApp(widgets['root']) # run kivy await async_runTouchApp(widgets['root']) # run kivy
await quote_gen.aclose() # cancel aysnc gen call
finally: finally:
await quote_gen.aclose() # cancel aysnc gen call
# un-subscribe from symbols stream (cancel if brokerd # un-subscribe from symbols stream (cancel if brokerd
# was already torn down - say by SIGINT) # was already torn down - say by SIGINT)
with trio.move_on_after(0.2): with trio.move_on_after(0.2):
await portal.run( await portal.run(
"piker.brokers.data", 'modify_quote_stream', "piker.brokers.data", 'modify_quote_stream',
broker=brokermod.name, tickers=[]) broker=brokermod.name,
feed_type='stock',
tickers=[]
)
# cancel GUI update task # cancel GUI update task
nursery.cancel_scope.cancel() nursery.cancel_scope.cancel()