Port kivy monitor to new tractor stream api

tractor_open_stream_from
Tyler Goodlet 2021-04-29 08:36:55 -04:00
parent a89da98141
commit 3375735914
2 changed files with 147 additions and 153 deletions

View File

@ -20,7 +20,6 @@ Real-time data feed machinery
import time import time
from functools import partial from functools import partial
from dataclasses import dataclass, field from dataclasses import dataclass, field
from itertools import cycle
import socket import socket
import json import json
from types import ModuleType from types import ModuleType
@ -31,7 +30,6 @@ from typing import (
Sequence Sequence
) )
import contextlib import contextlib
from operator import itemgetter
import trio import trio
import tractor import tractor
@ -182,6 +180,8 @@ async def symbol_data(broker: str, tickers: List[str]):
_feeds_cache = {} _feeds_cache = {}
# TODO: use the version of this from .api ?
@asynccontextmanager @asynccontextmanager
async def get_cached_feed( async def get_cached_feed(
brokername: str, brokername: str,
@ -326,6 +326,7 @@ class DataFeed:
self.quote_gen = None self.quote_gen = None
self._symbol_data_cache: Dict[str, Any] = {} self._symbol_data_cache: Dict[str, Any] = {}
@asynccontextmanager
async def open_stream( async def open_stream(
self, self,
symbols: Sequence[str], symbols: Sequence[str],
@ -351,31 +352,22 @@ class DataFeed:
# subscribe for tickers (this performs a possible filtering # subscribe for tickers (this performs a possible filtering
# where invalid symbols are discarded) # where invalid symbols are discarded)
sd = await self.portal.run( sd = await self.portal.run(
"piker.brokers.data", symbol_data,
'symbol_data',
broker=self.brokermod.name, broker=self.brokermod.name,
tickers=symbols tickers=symbols
) )
self._symbol_data_cache.update(sd) self._symbol_data_cache.update(sd)
if test:
# stream from a local test file
quote_gen = await self.portal.run(
"piker.brokers.data",
'stream_from_file',
filename=test,
)
else:
log.info(f"Starting new stream for {symbols}") log.info(f"Starting new stream for {symbols}")
# start live streaming from broker daemon # start live streaming from broker daemon
quote_gen = await self.portal.run( async with self.portal.open_stream_from(
"piker.brokers.data", start_quote_stream,
'start_quote_stream',
broker=self.brokermod.name, broker=self.brokermod.name,
symbols=symbols, symbols=symbols,
feed_type=feed_type, feed_type=feed_type,
rate=rate, rate=rate,
) ) as quote_gen:
# get first quotes response # get first quotes response
log.debug(f"Waiting on first quote for {symbols}...") log.debug(f"Waiting on first quote for {symbols}...")
@ -384,7 +376,8 @@ class DataFeed:
self.quote_gen = quote_gen self.quote_gen = quote_gen
self.first_quotes = quotes self.first_quotes = quotes
return quote_gen, quotes yield quote_gen, quotes
except Exception: except Exception:
if self.quote_gen: if self.quote_gen:
await self.quote_gen.aclose() await self.quote_gen.aclose()
@ -406,8 +399,7 @@ class DataFeed:
"""Call a broker ``Client`` method using RPC and return result. """Call a broker ``Client`` method using RPC and return result.
""" """
return await self.portal.run( return await self.portal.run(
'piker.brokers.data', call_client,
'call_client',
broker=self.brokermod.name, broker=self.brokermod.name,
methname=method, methname=method,
**kwargs **kwargs
@ -425,9 +417,11 @@ async def stream_to_file(
"""Record client side received quotes to file ``filename``. """Record client side received quotes to file ``filename``.
""" """
# an async generator instance # an async generator instance
agen = await portal.run( async with portal.open_stream_from(
"piker.brokers.data", 'start_quote_stream', start_quote_stream,
broker=brokermod.name, symbols=tickers) broker=brokermod.name,
symbols=tickers
) as agen:
fname = filename or f'{watchlist_name}.jsonstream' fname = filename or f'{watchlist_name}.jsonstream'
with open(fname, 'a') as f: with open(fname, 'a') as f:
@ -438,14 +432,14 @@ async def stream_to_file(
return fname return fname
async def stream_from_file( # async def stream_from_file(
filename: str, # filename: str,
): # ):
with open(filename, 'r') as quotes_file: # with open(filename, 'r') as quotes_file:
content = quotes_file.read() # content = quotes_file.read()
pkts = content.split('--')[:-1] # simulate 2 separate quote packets # pkts = content.split('--')[:-1] # simulate 2 separate quote packets
payloads = [json.loads(pkt) for pkt in pkts] # payloads = [json.loads(pkt) for pkt in pkts]
for payload in cycle(payloads): # for payload in cycle(payloads):
yield payload # yield payload
await trio.sleep(0.3) # await trio.sleep(0.3)

View File

@ -179,12 +179,12 @@ async def _async_main(
This is started with cli cmd `piker monitor`. This is started with cli cmd `piker monitor`.
''' '''
feed = DataFeed(portal, brokermod) feed = DataFeed(portal, brokermod)
quote_gen, first_quotes = await feed.open_stream( async with feed.open_stream(
symbols, symbols,
'stock', 'stock',
rate=rate, rate=rate,
test=test, test=test,
) ) as (quote_gen, first_quotes):
first_quotes_list = list(first_quotes.copy().values()) first_quotes_list = list(first_quotes.copy().values())
quotes = list(first_quotes.copy().values()) quotes = list(first_quotes.copy().values())