Factor `DataFeed` client API into `brokers.data`

kivy_mainline_and_py3.8
Tyler Goodlet 2019-01-05 19:05:39 -05:00
parent e69f0b286c
commit a4501bb0e0
2 changed files with 97 additions and 70 deletions

View File

@ -112,11 +112,14 @@ async def stream_quotes(
# TODO: at this point probably just just make this a class and # TODO: at this point probably just just make this a class and
# a lot of these functions should be methods. It will definitely # a lot of these functions should be methods. It will definitely
# make stateful UI apps easier to implement # make stateful UI apps easier to implement
class DataFeed(typing.NamedTuple): class BrokerFeed(typing.NamedTuple):
"""A per broker "data feed" container. """A per broker "client feed" container.
A structure to keep track of components used by A structure to keep track of components used by
real-time data daemons. real-time data daemons. This is a backend "client" which pulls
data from broker specific data lakes:
``DataFeed`` <- tractor -> ``BrokerFeed`` <- broker IPC -> broker API
""" """
mod: ModuleType mod: ModuleType
client: object client: object
@ -129,7 +132,7 @@ class DataFeed(typing.NamedTuple):
async def fan_out_to_chans( async def fan_out_to_chans(
feed: DataFeed, feed: BrokerFeed,
get_quotes: Coroutine, get_quotes: Coroutine,
symbols2chans: Dict[str, tractor.Channel], symbols2chans: Dict[str, tractor.Channel],
rate: int = 5, # delay between quote requests rate: int = 5, # delay between quote requests
@ -275,8 +278,8 @@ def modify_quote_stream(broker, feed_type, symbols, chan, cid):
async def get_cached_feed( async def get_cached_feed(
brokername: str, brokername: str,
) -> DataFeed: ) -> BrokerFeed:
"""Get/create a ``DataFeed`` from/in the current actor. """Get/create a ``BrokerFeed`` from/in the current actor.
""" """
# check if a cached client is in the local actor's statespace # check if a cached client is in the local actor's statespace
ss = tractor.current_actor().statespace ss = tractor.current_actor().statespace
@ -293,7 +296,7 @@ async def get_cached_feed(
exit_stack = contextlib.AsyncExitStack() exit_stack = contextlib.AsyncExitStack()
client = await exit_stack.enter_async_context( client = await exit_stack.enter_async_context(
brokermod.get_client()) brokermod.get_client())
feed = DataFeed( feed = BrokerFeed(
mod=brokermod, mod=brokermod,
client=client, client=client,
exit_stack=exit_stack, exit_stack=exit_stack,
@ -422,6 +425,89 @@ async def start_quote_stream(
await feed.exit_stack.aclose() await feed.exit_stack.aclose()
class DataFeed(object):
"""Data feed client for streaming symbol data from a (remote)
``brokerd`` data daemon.
"""
_allowed = ('stock', 'option')
def __init__(self, portal, brokermod):
self.portal = portal
self.brokermod = brokermod
self._quote_type = None
self._symbols = None
self.quote_gen = None
self._mutex = trio.StrictFIFOLock()
self._symbol_data_cache: Dict[str, Any] = {}
async def open_stream(self, symbols, feed_type, rate=1, test=None):
if feed_type not in self._allowed:
raise ValueError(f"Only feed types {self._allowed} are supported")
self._quote_type = feed_type
async with self._mutex:
try:
if self.quote_gen is not None and symbols != self._symbols:
log.info(
f"Stopping existing subscription for {self._symbols}")
await self.quote_gen.aclose()
self._symbols = symbols
if feed_type == 'stock' and not (
all(symbol in self._symbol_data_cache
for symbol in symbols)
):
# subscribe for tickers (this performs a possible filtering
# where invalid symbols are discarded)
sd = await self.portal.run(
"piker.brokers.data", 'symbol_data',
broker=self.brokermod.name, tickers=symbols)
self._symbol_data_cache.update(sd)
if test:
# stream from a local test file
quote_gen = await self.portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
)
else:
log.info(f"Starting new stream for {self._symbols}")
# start live streaming from broker daemon
quote_gen = await self.portal.run(
"piker.brokers.data",
'start_quote_stream',
broker=self.brokermod.name,
symbols=symbols,
feed_type=feed_type,
rate=rate,
)
# get first quotes response
log.debug(f"Waiting on first quote for {symbols}...")
quotes = {}
# with trio.move_on_after(5):
quotes = await quote_gen.__anext__()
self.quote_gen = quote_gen
self.first_quotes = quotes
return quote_gen, quotes
except Exception:
if self.quote_gen:
await self.quote_gen.aclose()
self.quote_gen = None
raise
def format_quotes(self, quotes, symbol_data={}):
self._symbol_data_cache.update(symbol_data)
formatter = getattr(self.brokermod, f'format_{self._quote_type}_quote')
records, displayables = zip(*[
formatter(quote, self._symbol_data_cache)
for quote in quotes.values()
])
return records, displayables
async def stream_to_file( async def stream_to_file(
watchlist_name: str, watchlist_name: str,
filename: str, filename: str,

View File

@ -18,6 +18,7 @@ from kivy.uix.label import Label
from ..log import get_logger from ..log import get_logger
from ..brokers.core import contracts from ..brokers.core import contracts
from ..brokers.data import DataFeed
from .pager import PagerView from .pager import PagerView
from .tabular import Row, HeaderCell, Cell, TickerTable from .tabular import Row, HeaderCell, Cell, TickerTable
@ -146,67 +147,6 @@ class ExpiryButton(Cell):
self.chain.start_displaying(self.chain.symbol, self.key) self.chain.start_displaying(self.chain.symbol, self.key)
class DataFeed(object):
"""Data feed client for streaming symbol data from a (remote)
``brokerd`` data daemon.
"""
def __init__(self, portal, brokermod):
self.portal = portal
self.brokermod = brokermod
self._symbols = None
self.quote_gen = None
self._mutex = trio.StrictFIFOLock()
async def open_stream(self, symbols, rate=1, test=None):
async with self._mutex:
try:
if self.quote_gen is not None and symbols != self._symbols:
log.info(
f"Stopping existing subscription for {self._symbols}")
await self.quote_gen.aclose()
self._symbols = symbols
if test:
# stream from a local test file
quote_gen = await self.portal.run(
"piker.brokers.data", 'stream_from_file',
filename=test
)
else:
log.info(f"Starting new stream for {self._symbols}")
# start live streaming from broker daemon
quote_gen = await self.portal.run(
"piker.brokers.data",
'start_quote_stream',
broker=self.brokermod.name,
symbols=symbols,
feed_type='option',
rate=rate,
)
# get first quotes response
log.debug(f"Waiting on first quote for {symbols}...")
quotes = {}
# with trio.move_on_after(5):
quotes = await quote_gen.__anext__()
self.quote_gen = quote_gen
self.first_quotes = quotes
return quote_gen, quotes
except Exception:
if self.quote_gen:
await self.quote_gen.aclose()
self.quote_gen = None
raise
def format_quotes(self, quotes):
records, displayables = zip(*[
self.brokermod.format_option_quote(quote, {})
for quote in quotes.values()
])
return records, displayables
@asynccontextmanager @asynccontextmanager
async def find_local_monitor(): async def find_local_monitor():
"""Establish a portal to a local monitor for triggering """Establish a portal to a local monitor for triggering
@ -276,7 +216,7 @@ class OptionChain(object):
self._parent_nursery = nursery self._parent_nursery = nursery
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
self._nursery = n self._nursery = n
# fill out and start updatingn strike table # fill out and start updating strike table
n.start_soon( n.start_soon(
partial(self._start_displaying, symbol, expiry=expiry) partial(self._start_displaying, symbol, expiry=expiry)
) )
@ -450,7 +390,8 @@ class OptionChain(object):
log.debug(f"Waiting on first_quotes for {symbol}:{expiry}") log.debug(f"Waiting on first_quotes for {symbol}:{expiry}")
self._quote_gen, first_quotes = await self.feed.open_stream( self._quote_gen, first_quotes = await self.feed.open_stream(
[(symbol, expiry)] [(symbol, expiry)],
'option',
) )
log.debug(f"Got first_quotes for {symbol}:{expiry}") log.debug(f"Got first_quotes for {symbol}:{expiry}")
records, displayables = self.feed.format_quotes(first_quotes) records, displayables = self.feed.format_quotes(first_quotes)