Port monitor app to `DataFeed` api
							parent
							
								
									a4501bb0e0
								
							
						
					
					
						commit
						36d0c2ed68
					
				| 
						 | 
					@ -15,6 +15,7 @@ from kivy.lang import Builder
 | 
				
			||||||
from kivy.app import async_runTouchApp
 | 
					from kivy.app import async_runTouchApp
 | 
				
			||||||
from kivy.core.window import Window
 | 
					from kivy.core.window import Window
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from ..brokers.data import DataFeed
 | 
				
			||||||
from .tabular import (
 | 
					from .tabular import (
 | 
				
			||||||
    Row, TickerTable, _kv, _black_rgba, colorcode,
 | 
					    Row, TickerTable, _kv, _black_rgba, colorcode,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
| 
						 | 
					@ -170,37 +171,12 @@ async def _async_main(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    This is started with cli cmd `piker monitor`.
 | 
					    This is started with cli cmd `piker monitor`.
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    if test:
 | 
					    feed = DataFeed(portal, brokermod)
 | 
				
			||||||
        # stream from a local test file
 | 
					 | 
				
			||||||
        quote_gen = await portal.run(
 | 
					 | 
				
			||||||
            "piker.brokers.data", 'stream_from_file',
 | 
					 | 
				
			||||||
            filename=test
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        # TODO: need a set of test packets to make this work
 | 
					 | 
				
			||||||
        # seriously fu QT
 | 
					 | 
				
			||||||
        # sd = {}
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        # start live streaming from broker daemon
 | 
					 | 
				
			||||||
        quote_gen = await portal.run(
 | 
					 | 
				
			||||||
            "piker.brokers.data",
 | 
					 | 
				
			||||||
            'start_quote_stream',
 | 
					 | 
				
			||||||
            broker=brokermod.name,
 | 
					 | 
				
			||||||
            symbols=tickers,
 | 
					 | 
				
			||||||
            rate=3,
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # subscribe for tickers (this performs a possible filtering
 | 
					    quote_gen, quotes = await feed.open_stream(
 | 
				
			||||||
    # where invalid symbols are discarded)
 | 
					        tickers, 'stock', rate=rate)
 | 
				
			||||||
    sd = await portal.run(
 | 
					 | 
				
			||||||
        "piker.brokers.data", 'symbol_data',
 | 
					 | 
				
			||||||
        broker=brokermod.name, tickers=tickers)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # get first quotes response
 | 
					    first_quotes, _ = feed.format_quotes(quotes)
 | 
				
			||||||
    log.debug("Waiting on first quote...")
 | 
					 | 
				
			||||||
    quotes = await quote_gen.__anext__()
 | 
					 | 
				
			||||||
    first_quotes = [
 | 
					 | 
				
			||||||
        brokermod.format_stock_quote(quote, symbol_data=sd)[0]
 | 
					 | 
				
			||||||
        for quote in quotes.values()]
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if first_quotes[0].get('last') is None:
 | 
					    if first_quotes[0].get('last') is None:
 | 
				
			||||||
        log.error("Broker API is down temporarily")
 | 
					        log.error("Broker API is down temporarily")
 | 
				
			||||||
| 
						 | 
					@ -274,7 +250,7 @@ async def _async_main(
 | 
				
			||||||
                brokermod.format_stock_quote,
 | 
					                brokermod.format_stock_quote,
 | 
				
			||||||
                widgets,
 | 
					                widgets,
 | 
				
			||||||
                quote_gen,
 | 
					                quote_gen,
 | 
				
			||||||
                sd,
 | 
					                feed._symbol_data_cache,
 | 
				
			||||||
                quotes
 | 
					                quotes
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue