Apply `brokerd` quote rate throttling when requested in `open_feed()`

naive_feed_throttling
Tyler Goodlet 2021-06-14 21:55:03 -04:00
parent ccf81520cb
commit df2f6487ff
2 changed files with 43 additions and 13 deletions

View File

@ -25,9 +25,9 @@ from contextlib import asynccontextmanager
from functools import partial
from types import ModuleType
from typing import (
Dict, Any, Sequence,
Any, Sequence,
AsyncIterator, Optional,
List, Awaitable, Callable,
Awaitable, Callable,
)
import trio
@ -54,6 +54,7 @@ from ._sampling import (
increment_ohlc_buffer,
iter_ohlc_periods,
sample_and_broadcast,
uniform_rate_send,
)
@ -69,7 +70,7 @@ class _FeedsBus(BaseModel):
"""
brokername: str
nursery: trio.Nursery
feeds: Dict[str, trio.CancelScope] = {}
feeds: dict[str, trio.CancelScope] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
@ -78,7 +79,10 @@ class _FeedsBus(BaseModel):
# vars (namely `._portal` and `._cancel_scope`) at import time.
# Reported this bug:
# https://github.com/samuelcolvin/pydantic/issues/2816
_subscribers: Dict[str, List[tractor.Context]] = {}
_subscribers: dict[
str,
list[tuple[tractor.MsgStream, Optional[float]]]
] = {}
class Config:
arbitrary_types_allowed = True
@ -246,6 +250,7 @@ async def attach_feed_bus(
brokername: str,
symbol: str,
loglevel: str,
tick_throttle: Optional[float] = None,
) -> None:
@ -294,14 +299,30 @@ async def attach_feed_bus(
# deliver initial info message a first quote asap
await ctx.started((init_msg, first_quote))
async with ctx.open_stream() as stream:
async with (
ctx.open_stream() as stream,
trio.open_nursery() as n,
):
bus._subscribers[symbol].append(stream)
if tick_throttle:
send, recv = trio.open_memory_channel(2**10)
n.start_soon(
uniform_rate_send,
tick_throttle,
recv,
stream,
)
sub = (send, tick_throttle)
else:
sub = (stream, tick_throttle)
bus._subscribers[symbol].append(sub)
try:
await trio.sleep_forever()
finally:
bus._subscribers[symbol].remove(stream)
bus._subscribers[symbol].remove(sub)
@dataclass
@ -314,21 +335,21 @@ class Feed:
memory buffer orchestration.
"""
name: str
stream: AsyncIterator[Dict[str, Any]]
stream: AsyncIterator[dict[str, Any]]
shm: ShmArray
mod: ModuleType
first_quote: dict
_brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
_trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
_max_sample_rate: int = 0
search: Callable[..., Awaitable] = None
# cache of symbol info messages received as first message when
# a stream startsc.
symbols: Dict[str, Symbol] = field(default_factory=dict)
symbols: dict[str, Symbol] = field(default_factory=dict)
async def receive(self) -> dict:
return await self.stream.__anext__()
@ -377,7 +398,7 @@ async def install_brokerd_search(
# cancellable by the user as they see fit.
async with ctx.open_stream() as stream:
async def search(text: str) -> Dict[str, Any]:
async def search(text: str) -> dict[str, Any]:
await stream.send(text)
return await stream.receive()
@ -402,7 +423,9 @@ async def open_feed(
symbols: Sequence[str],
loglevel: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]:
tick_throttle: Optional[float] = None, # Hz
) -> AsyncIterator[dict[str, Any]]:
'''
Open a "data feed" which provides streamed real-time quotes.
@ -441,7 +464,9 @@ async def open_feed(
attach_feed_bus,
brokername=brokername,
symbol=sym,
loglevel=loglevel
loglevel=loglevel,
tick_throttle=tick_throttle,
) as (ctx, (init_msg, first_quote)),

View File

@ -1515,9 +1515,14 @@ async def chart_symbol(
brokermod = brokers.get_brokermod(provider)
async with data.open_feed(
provider,
[sym],
loglevel=loglevel,
# 60 FPS to limit context switches
tick_throttle=_clear_throttle_rate,
) as feed:
ohlcv: ShmArray = feed.shm