diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 82e08507..9369a73b 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -20,7 +20,6 @@ Real-time data feed machinery import time from functools import partial from dataclasses import dataclass, field -from itertools import cycle import socket import json from types import ModuleType @@ -31,7 +30,6 @@ from typing import ( Sequence ) import contextlib -from operator import itemgetter import trio import tractor @@ -182,6 +180,8 @@ async def symbol_data(broker: str, tickers: List[str]): _feeds_cache = {} + +# TODO: use the version of this from .api ? @asynccontextmanager async def get_cached_feed( brokername: str, @@ -326,6 +326,7 @@ class DataFeed: self.quote_gen = None self._symbol_data_cache: Dict[str, Any] = {} + @asynccontextmanager async def open_stream( self, symbols: Sequence[str], @@ -351,40 +352,32 @@ class DataFeed: # subscribe for tickers (this performs a possible filtering # where invalid symbols are discarded) sd = await self.portal.run( - "piker.brokers.data", - 'symbol_data', + symbol_data, broker=self.brokermod.name, tickers=symbols ) self._symbol_data_cache.update(sd) - if test: - # stream from a local test file - quote_gen = await self.portal.run( - "piker.brokers.data", - 'stream_from_file', - filename=test, - ) - else: - log.info(f"Starting new stream for {symbols}") - # start live streaming from broker daemon - quote_gen = await self.portal.run( - "piker.brokers.data", - 'start_quote_stream', - broker=self.brokermod.name, - symbols=symbols, - feed_type=feed_type, - rate=rate, - ) + log.info(f"Starting new stream for {symbols}") - # get first quotes response - log.debug(f"Waiting on first quote for {symbols}...") - quotes = {} - quotes = await quote_gen.__anext__() + # start live streaming from broker daemon + async with self.portal.open_stream_from( + start_quote_stream, + broker=self.brokermod.name, + symbols=symbols, + feed_type=feed_type, + rate=rate, + ) as quote_gen: + + # get first quotes response + log.debug(f"Waiting on first quote for {symbols}...") + quotes = {} + quotes = await quote_gen.__anext__() + + self.quote_gen = quote_gen + self.first_quotes = quotes + yield quote_gen, quotes - self.quote_gen = quote_gen - self.first_quotes = quotes - return quote_gen, quotes except Exception: if self.quote_gen: await self.quote_gen.aclose() @@ -406,8 +399,7 @@ class DataFeed: """Call a broker ``Client`` method using RPC and return result. """ return await self.portal.run( - 'piker.brokers.data', - 'call_client', + call_client, broker=self.brokermod.name, methname=method, **kwargs @@ -425,27 +417,29 @@ async def stream_to_file( """Record client side received quotes to file ``filename``. """ # an async generator instance - agen = await portal.run( - "piker.brokers.data", 'start_quote_stream', - broker=brokermod.name, symbols=tickers) + async with portal.open_stream_from( + start_quote_stream, + broker=brokermod.name, + symbols=tickers + ) as agen: - fname = filename or f'{watchlist_name}.jsonstream' - with open(fname, 'a') as f: - async for quotes in agen: - f.write(json.dumps(quotes)) - f.write('\n--\n') + fname = filename or f'{watchlist_name}.jsonstream' + with open(fname, 'a') as f: + async for quotes in agen: + f.write(json.dumps(quotes)) + f.write('\n--\n') - return fname + return fname -async def stream_from_file( - filename: str, -): - with open(filename, 'r') as quotes_file: - content = quotes_file.read() +# async def stream_from_file( +# filename: str, +# ): +# with open(filename, 'r') as quotes_file: +# content = quotes_file.read() - pkts = content.split('--')[:-1] # simulate 2 separate quote packets - payloads = [json.loads(pkt) for pkt in pkts] - for payload in cycle(payloads): - yield payload - await trio.sleep(0.3) +# pkts = content.split('--')[:-1] # simulate 2 separate quote packets +# payloads = [json.loads(pkt) for pkt in pkts] +# for payload in cycle(payloads): +# yield payload +# await trio.sleep(0.3) diff --git a/piker/ui/kivy/monitor.py b/piker/ui/kivy/monitor.py index 6c0a7736..6bb45bfd 100644 --- a/piker/ui/kivy/monitor.py +++ b/piker/ui/kivy/monitor.py @@ -179,121 +179,121 @@ async def _async_main( This is started with cli cmd `piker monitor`. ''' feed = DataFeed(portal, brokermod) - quote_gen, first_quotes = await feed.open_stream( + async with feed.open_stream( symbols, 'stock', rate=rate, test=test, - ) - first_quotes_list = list(first_quotes.copy().values()) - quotes = list(first_quotes.copy().values()) + ) as (quote_gen, first_quotes): + first_quotes_list = list(first_quotes.copy().values()) + quotes = list(first_quotes.copy().values()) - # build out UI - Window.set_title(f"monitor: {name}\t(press ? for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', spacing=0) + # build out UI + Window.set_title(f"monitor: {name}\t(press ? for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', spacing=0) - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._stock_bidasks + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._stock_bidasks - # add header row - headers = list(first_quotes_list[0].keys()) - headers.remove('displayable') + # add header row + headers = list(first_quotes_list[0].keys()) + headers.remove('displayable') - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header=True, - size_hint=(1, None), - ) - box.add_widget(header) - - # build table - table = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes_list: - symbol = ticker_record['symbol'] - table.append_row( - symbol, - Row( - ticker_record, - headers=('symbol',), - bidasks=bidasks, - no_cell=('displayable',), - table=table - ) + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header=True, + size_hint=(1, None), ) - table.last_clicked_row = next(iter(table.symbols2rows.values())) + box.add_widget(header) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = table - - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(table.sort_key) - sort_cell.bold = sort_cell.underline = True - table.last_clicked_col_cell = sort_cell - - # set up a pager view for large ticker lists - table.bind(minimum_height=table.setter('height')) - - async def spawn_opts_chain(): - """Spawn an options chain UI in a new subactor. - """ - from .option_chain import _async_main - - try: - async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor( - 'optschain', - _async_main, - symbol=table.last_clicked_row._last_record['symbol'], - brokername=brokermod.name, - loglevel=tractor.log.get_loglevel(), + # build table + table = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes_list: + symbol = ticker_record['symbol'] + table.append_row( + symbol, + Row( + ticker_record, + headers=('symbol',), + bidasks=bidasks, + no_cell=('displayable',), + table=table ) - except tractor.RemoteActorError: - # don't allow option chain errors to crash this monitor - # this is, like, the most basic of resliency policies - log.exception(f"{portal.actor.name} crashed:") + ) + table.last_clicked_row = next(iter(table.symbols2rows.values())) - async with trio.open_nursery() as nursery: - pager = PagerView( - container=box, - contained=table, - nursery=nursery, - # spawn an option chain on 'o' keybinding - kbctls={('o',): spawn_opts_chain}, - ) - box.add_widget(pager) + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = table - widgets = { - 'root': box, - 'table': table, - 'box': box, - 'header': header, - 'pager': pager, - } + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(table.sort_key) + sort_cell.bold = sort_cell.underline = True + table.last_clicked_col_cell = sort_cell - global _widgets - _widgets = widgets + # set up a pager view for large ticker lists + table.bind(minimum_height=table.setter('height')) - nursery.start_soon( - update_quotes, - nursery, - brokermod.format_stock_quote, - widgets, - quote_gen, - feed._symbol_data_cache, - quotes - ) - try: - await async_runTouchApp(widgets['root']) - finally: - # cancel remote data feed task - await quote_gen.aclose() - # cancel GUI update task - nursery.cancel_scope.cancel() + async def spawn_opts_chain(): + """Spawn an options chain UI in a new subactor. + """ + from .option_chain import _async_main + + try: + async with tractor.open_nursery() as tn: + portal = await tn.run_in_actor( + 'optschain', + _async_main, + symbol=table.last_clicked_row._last_record['symbol'], + brokername=brokermod.name, + loglevel=tractor.log.get_loglevel(), + ) + except tractor.RemoteActorError: + # don't allow option chain errors to crash this monitor + # this is, like, the most basic of resliency policies + log.exception(f"{portal.actor.name} crashed:") + + async with trio.open_nursery() as nursery: + pager = PagerView( + container=box, + contained=table, + nursery=nursery, + # spawn an option chain on 'o' keybinding + kbctls={('o',): spawn_opts_chain}, + ) + box.add_widget(pager) + + widgets = { + 'root': box, + 'table': table, + 'box': box, + 'header': header, + 'pager': pager, + } + + global _widgets + _widgets = widgets + + nursery.start_soon( + update_quotes, + nursery, + brokermod.format_stock_quote, + widgets, + quote_gen, + feed._symbol_data_cache, + quotes + ) + try: + await async_runTouchApp(widgets['root']) + finally: + # cancel remote data feed task + await quote_gen.aclose() + # cancel GUI update task + nursery.cancel_scope.cancel()