From df2f6487ff62af40740b64f3ba427e2aa538df18 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jun 2021 21:55:03 -0400 Subject: [PATCH] Apply `brokerd` quote rate throttling when requested in `open_feed()` --- piker/data/feed.py | 51 ++++++++++++++++++++++++++++++++++------------ piker/ui/_chart.py | 5 +++++ 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 689b8f93..477e7bac 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -25,9 +25,9 @@ from contextlib import asynccontextmanager from functools import partial from types import ModuleType from typing import ( - Dict, Any, Sequence, + Any, Sequence, AsyncIterator, Optional, - List, Awaitable, Callable, + Awaitable, Callable, ) import trio @@ -54,6 +54,7 @@ from ._sampling import ( increment_ohlc_buffer, iter_ohlc_periods, sample_and_broadcast, + uniform_rate_send, ) @@ -69,7 +70,7 @@ class _FeedsBus(BaseModel): """ brokername: str nursery: trio.Nursery - feeds: Dict[str, trio.CancelScope] = {} + feeds: dict[str, trio.CancelScope] = {} task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() @@ -78,7 +79,10 @@ class _FeedsBus(BaseModel): # vars (namely `._portal` and `._cancel_scope`) at import time. # Reported this bug: # https://github.com/samuelcolvin/pydantic/issues/2816 - _subscribers: Dict[str, List[tractor.Context]] = {} + _subscribers: dict[ + str, + list[tuple[tractor.MsgStream, Optional[float]]] + ] = {} class Config: arbitrary_types_allowed = True @@ -246,6 +250,7 @@ async def attach_feed_bus( brokername: str, symbol: str, loglevel: str, + tick_throttle: Optional[float] = None, ) -> None: @@ -294,14 +299,30 @@ async def attach_feed_bus( # deliver initial info message a first quote asap await ctx.started((init_msg, first_quote)) - async with ctx.open_stream() as stream: + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): - bus._subscribers[symbol].append(stream) + if tick_throttle: + send, recv = trio.open_memory_channel(2**10) + n.start_soon( + uniform_rate_send, + tick_throttle, + recv, + stream, + ) + sub = (send, tick_throttle) + + else: + sub = (stream, tick_throttle) + + bus._subscribers[symbol].append(sub) try: await trio.sleep_forever() finally: - bus._subscribers[symbol].remove(stream) + bus._subscribers[symbol].remove(sub) @dataclass @@ -314,21 +335,21 @@ class Feed: memory buffer orchestration. """ name: str - stream: AsyncIterator[Dict[str, Any]] + stream: AsyncIterator[dict[str, Any]] shm: ShmArray mod: ModuleType first_quote: dict _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None - _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None + _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None _max_sample_rate: int = 0 search: Callable[..., Awaitable] = None # cache of symbol info messages received as first message when # a stream startsc. - symbols: Dict[str, Symbol] = field(default_factory=dict) + symbols: dict[str, Symbol] = field(default_factory=dict) async def receive(self) -> dict: return await self.stream.__anext__() @@ -377,7 +398,7 @@ async def install_brokerd_search( # cancellable by the user as they see fit. async with ctx.open_stream() as stream: - async def search(text: str) -> Dict[str, Any]: + async def search(text: str) -> dict[str, Any]: await stream.send(text) return await stream.receive() @@ -402,7 +423,9 @@ async def open_feed( symbols: Sequence[str], loglevel: Optional[str] = None, -) -> AsyncIterator[Dict[str, Any]]: + tick_throttle: Optional[float] = None, # Hz + +) -> AsyncIterator[dict[str, Any]]: ''' Open a "data feed" which provides streamed real-time quotes. @@ -441,7 +464,9 @@ async def open_feed( attach_feed_bus, brokername=brokername, symbol=sym, - loglevel=loglevel + loglevel=loglevel, + + tick_throttle=tick_throttle, ) as (ctx, (init_msg, first_quote)), diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 838da304..082dcb44 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -1515,9 +1515,14 @@ async def chart_symbol( brokermod = brokers.get_brokermod(provider) async with data.open_feed( + provider, [sym], loglevel=loglevel, + + # 60 FPS to limit context switches + tick_throttle=_clear_throttle_rate, + ) as feed: ohlcv: ShmArray = feed.shm