Merge pull request #64 from pikers/data_feed_reorg

Data feed reorg
kivy_mainline_and_py3.8
goodboy 2019-01-12 11:47:21 -05:00 committed by GitHub
commit 78dced3091
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 103 additions and 100 deletions

View File

@ -112,11 +112,14 @@ async def stream_quotes(
# TODO: at this point probably just just make this a class and # TODO: at this point probably just just make this a class and
# a lot of these functions should be methods. It will definitely # a lot of these functions should be methods. It will definitely
# make stateful UI apps easier to implement # make stateful UI apps easier to implement
class DataFeed(typing.NamedTuple): class BrokerFeed(typing.NamedTuple):
"""A per broker "data feed" container. """A per broker "client feed" container.
A structure to keep track of components used by A structure to keep track of components used by
real-time data daemons. real-time data daemons. This is a backend "client" which pulls
data from broker specific data lakes:
``DataFeed`` <- tractor -> ``BrokerFeed`` <- broker IPC -> broker API
""" """
mod: ModuleType mod: ModuleType
client: object client: object
@ -129,7 +132,7 @@ class DataFeed(typing.NamedTuple):
async def fan_out_to_chans( async def fan_out_to_chans(
feed: DataFeed, feed: BrokerFeed,
get_quotes: Coroutine, get_quotes: Coroutine,
symbols2chans: Dict[str, tractor.Channel], symbols2chans: Dict[str, tractor.Channel],
rate: int = 5, # delay between quote requests rate: int = 5, # delay between quote requests
@ -275,8 +278,8 @@ def modify_quote_stream(broker, feed_type, symbols, chan, cid):
async def get_cached_feed( async def get_cached_feed(
brokername: str, brokername: str,
) -> DataFeed: ) -> BrokerFeed:
"""Get/create a ``DataFeed`` from/in the current actor. """Get/create a ``BrokerFeed`` from/in the current actor.
""" """
# check if a cached client is in the local actor's statespace # check if a cached client is in the local actor's statespace
ss = tractor.current_actor().statespace ss = tractor.current_actor().statespace
@ -293,7 +296,7 @@ async def get_cached_feed(
exit_stack = contextlib.AsyncExitStack() exit_stack = contextlib.AsyncExitStack()
client = await exit_stack.enter_async_context( client = await exit_stack.enter_async_context(
brokermod.get_client()) brokermod.get_client())
feed = DataFeed( feed = BrokerFeed(
mod=brokermod, mod=brokermod,
client=client, client=client,
exit_stack=exit_stack, exit_stack=exit_stack,
@ -422,6 +425,89 @@ async def start_quote_stream(
await feed.exit_stack.aclose() await feed.exit_stack.aclose()
class DataFeed(object):
"""Data feed client for streaming symbol data from a (remote)
``brokerd`` data daemon.
"""
_allowed = ('stock', 'option')
def __init__(self, portal, brokermod):
self.portal = portal
self.brokermod = brokermod
self._quote_type = None
self._symbols = None
self.quote_gen = None
self._mutex = trio.StrictFIFOLock()
self._symbol_data_cache: Dict[str, Any] = {}
async def open_stream(self, symbols, feed_type, rate=1, test=None):
if feed_type not in self._allowed:
raise ValueError(f"Only feed types {self._allowed} are supported")
self._quote_type = feed_type
async with self._mutex:
try:
if self.quote_gen is not None and symbols != self._symbols:
log.info(
f"Stopping existing subscription for {self._symbols}")
await self.quote_gen.aclose()
self._symbols = symbols
if feed_type == 'stock' and not (
all(symbol in self._symbol_data_cache
for symbol in symbols)
):
# subscribe for tickers (this performs a possible filtering
# where invalid symbols are discarded)
sd = await self.portal.run(
"piker.brokers.data", 'symbol_data',
broker=self.brokermod.name, tickers=symbols)
self._symbol_data_cache.update(sd)
if test:
# stream from a local test file
quote_gen = await self.portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
)
else:
log.info(f"Starting new stream for {self._symbols}")
# start live streaming from broker daemon
quote_gen = await self.portal.run(
"piker.brokers.data",
'start_quote_stream',
broker=self.brokermod.name,
symbols=symbols,
feed_type=feed_type,
rate=rate,
)
# get first quotes response
log.debug(f"Waiting on first quote for {symbols}...")
quotes = {}
# with trio.move_on_after(5):
quotes = await quote_gen.__anext__()
self.quote_gen = quote_gen
self.first_quotes = quotes
return quote_gen, quotes
except Exception:
if self.quote_gen:
await self.quote_gen.aclose()
self.quote_gen = None
raise
def format_quotes(self, quotes, symbol_data={}):
self._symbol_data_cache.update(symbol_data)
formatter = getattr(self.brokermod, f'format_{self._quote_type}_quote')
records, displayables = zip(*[
formatter(quote, self._symbol_data_cache)
for quote in quotes.values()
])
return records, displayables
async def stream_to_file( async def stream_to_file(
watchlist_name: str, watchlist_name: str,
filename: str, filename: str,

View File

@ -15,6 +15,7 @@ from kivy.lang import Builder
from kivy.app import async_runTouchApp from kivy.app import async_runTouchApp
from kivy.core.window import Window from kivy.core.window import Window
from ..brokers.data import DataFeed
from .tabular import ( from .tabular import (
Row, TickerTable, _kv, _black_rgba, colorcode, Row, TickerTable, _kv, _black_rgba, colorcode,
) )
@ -170,37 +171,12 @@ async def _async_main(
This is started with cli cmd `piker monitor`. This is started with cli cmd `piker monitor`.
''' '''
if test: feed = DataFeed(portal, brokermod)
# stream from a local test file
quote_gen = await portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
)
# TODO: need a set of test packets to make this work
# seriously fu QT
# sd = {}
else:
# start live streaming from broker daemon
quote_gen = await portal.run(
"piker.brokers.data",
'start_quote_stream',
broker=brokermod.name,
symbols=tickers,
rate=3,
)
# subscribe for tickers (this performs a possible filtering quote_gen, quotes = await feed.open_stream(
# where invalid symbols are discarded) tickers, 'stock', rate=rate)
sd = await portal.run(
"piker.brokers.data", 'symbol_data',
broker=brokermod.name, tickers=tickers)
# get first quotes response first_quotes, _ = feed.format_quotes(quotes)
log.debug("Waiting on first quote...")
quotes = await quote_gen.__anext__()
first_quotes = [
brokermod.format_stock_quote(quote, symbol_data=sd)[0]
for quote in quotes.values()]
if first_quotes[0].get('last') is None: if first_quotes[0].get('last') is None:
log.error("Broker API is down temporarily") log.error("Broker API is down temporarily")
@ -274,7 +250,7 @@ async def _async_main(
brokermod.format_stock_quote, brokermod.format_stock_quote,
widgets, widgets,
quote_gen, quote_gen,
sd, feed._symbol_data_cache,
quotes quotes
) )

View File

@ -18,6 +18,7 @@ from kivy.uix.label import Label
from ..log import get_logger from ..log import get_logger
from ..brokers.core import contracts from ..brokers.core import contracts
from ..brokers.data import DataFeed
from .pager import PagerView from .pager import PagerView
from .tabular import Row, HeaderCell, Cell, TickerTable from .tabular import Row, HeaderCell, Cell, TickerTable
@ -146,67 +147,6 @@ class ExpiryButton(Cell):
self.chain.start_displaying(self.chain.symbol, self.key) self.chain.start_displaying(self.chain.symbol, self.key)
class DataFeed(object):
"""Data feed client for streaming symbol data from a (remote)
``brokerd`` data daemon.
"""
def __init__(self, portal, brokermod):
self.portal = portal
self.brokermod = brokermod
self._symbols = None
self.quote_gen = None
self._mutex = trio.StrictFIFOLock()
async def open_stream(self, symbols, rate=1, test=None):
async with self._mutex:
try:
if self.quote_gen is not None and symbols != self._symbols:
log.info(
f"Stopping existing subscription for {self._symbols}")
await self.quote_gen.aclose()
self._symbols = symbols
if test:
# stream from a local test file
quote_gen = await self.portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
)
else:
log.info(f"Starting new stream for {self._symbols}")
# start live streaming from broker daemon
quote_gen = await self.portal.run(
"piker.brokers.data",
'start_quote_stream',
broker=self.brokermod.name,
symbols=symbols,
feed_type='option',
rate=rate,
)
# get first quotes response
log.debug(f"Waiting on first quote for {symbols}...")
quotes = {}
# with trio.move_on_after(5):
quotes = await quote_gen.__anext__()
self.quote_gen = quote_gen
self.first_quotes = quotes
return quote_gen, quotes
except Exception:
if self.quote_gen:
await self.quote_gen.aclose()
self.quote_gen = None
raise
def format_quotes(self, quotes):
records, displayables = zip(*[
self.brokermod.format_option_quote(quote, {})
for quote in quotes.values()
])
return records, displayables
@asynccontextmanager @asynccontextmanager
async def find_local_monitor(): async def find_local_monitor():
"""Establish a portal to a local monitor for triggering """Establish a portal to a local monitor for triggering
@ -276,7 +216,7 @@ class OptionChain(object):
self._parent_nursery = nursery self._parent_nursery = nursery
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
self._nursery = n self._nursery = n
# fill out and start updatingn strike table # fill out and start updating strike table
n.start_soon( n.start_soon(
partial(self._start_displaying, symbol, expiry=expiry) partial(self._start_displaying, symbol, expiry=expiry)
) )
@ -450,7 +390,8 @@ class OptionChain(object):
log.debug(f"Waiting on first_quotes for {symbol}:{expiry}") log.debug(f"Waiting on first_quotes for {symbol}:{expiry}")
self._quote_gen, first_quotes = await self.feed.open_stream( self._quote_gen, first_quotes = await self.feed.open_stream(
[(symbol, expiry)] [(symbol, expiry)],
'option',
) )
log.debug(f"Got first_quotes for {symbol}:{expiry}") log.debug(f"Got first_quotes for {symbol}:{expiry}")
records, displayables = self.feed.format_quotes(first_quotes) records, displayables = self.feed.format_quotes(first_quotes)