Merge pull request #199 from pikers/naive_feed_throttling

Naive feed throttling
wait_on_daemon_portals
goodboy 2021-06-20 11:56:56 -04:00 committed by GitHub
commit 835ea7f046
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 178 additions and 118 deletions

View File

@ -724,7 +724,7 @@ async def _emsd_main(
_router.feeds[(broker, symbol)] = feed _router.feeds[(broker, symbol)] = feed
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
first_quote = await feed.receive() first_quote = feed.first_quote
# open a stream with the brokerd backend for order # open a stream with the brokerd backend for order
# flow dialogue # flow dialogue

View File

@ -17,6 +17,7 @@
""" """
Data buffers for fast shared humpy. Data buffers for fast shared humpy.
""" """
import time
from typing import Dict, List from typing import Dict, List
import tractor import tractor
@ -152,10 +153,12 @@ async def iter_ohlc_periods(
async def sample_and_broadcast( async def sample_and_broadcast(
bus: '_FeedBus', # noqa bus: '_FeedBus', # noqa
shm: ShmArray, shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
sum_tick_vlm: bool = True, sum_tick_vlm: bool = True,
) -> None: ) -> None:
log.info("Started shared mem bar writer") log.info("Started shared mem bar writer")
@ -177,11 +180,10 @@ async def sample_and_broadcast(
# trade data # trade data
for tick in quote['ticks']: for tick in quote['ticks']:
# if tick['type'] in ('utrade',): ticktype = tick['type']
# print(tick)
# write trade events to shm last OHLC sample # write trade events to shm last OHLC sample
if tick['type'] in ('trade', 'utrade'): if ticktype in ('trade', 'utrade'):
last = tick['price'] last = tick['price']
@ -229,16 +231,71 @@ async def sample_and_broadcast(
# thus other consumers still attached. # thus other consumers still attached.
subs = bus._subscribers[sym.lower()] subs = bus._subscribers[sym.lower()]
for ctx in subs: for (stream, tick_throttle) in subs:
# print(f'sub is {ctx.chan.uid}')
try: if tick_throttle:
await ctx.send_yield({sym: quote}) await stream.send(quote)
except (
trio.BrokenResourceError, else:
trio.ClosedResourceError try:
): await stream.send({sym: quote})
# XXX: do we need to deregister here except (
# if it's done in the fee bus code? trio.BrokenResourceError,
# so far seems like no since this should all trio.ClosedResourceError
# be single-threaded. ):
log.error(f'{ctx.chan.uid} dropped connection') # XXX: do we need to deregister here
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection')
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream,
) -> None:
sleep_period = 1/rate - 0.000616
last_send = time.time()
while True:
first_quote = await quote_stream.receive()
start = time.time()
# append quotes since last iteration into the last quote's
# tick array/buffer.
# TODO: once we decide to get fancy really we should have
# a shared mem tick buffer that is just continually filled and
# the UI just ready from it at it's display rate.
# we'll likely head toward this once we get this issue going:
#
while True:
try:
next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks')
if ticks:
first_quote['ticks'].extend(ticks)
except trio.WouldBlock:
now = time.time()
rate = 1 / (now - last_send)
last_send = now
# print(f'{rate} Hz sending quotes\n{first_quote}')
# TODO: now if only we could sync this to the display
# rate timing exactly lul
await stream.send({first_quote['symbol']: first_quote})
break
end = time.time()
diff = end - start
# throttle to provided transmit rate
period = max(sleep_period - diff, 0)
if period > 0:
await trio.sleep(period)

View File

@ -25,9 +25,9 @@ from contextlib import asynccontextmanager
from functools import partial from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Dict, Any, Sequence, Any, Sequence,
AsyncIterator, Optional, AsyncIterator, Optional,
List, Awaitable, Callable, Awaitable, Callable,
) )
import trio import trio
@ -54,6 +54,7 @@ from ._sampling import (
increment_ohlc_buffer, increment_ohlc_buffer,
iter_ohlc_periods, iter_ohlc_periods,
sample_and_broadcast, sample_and_broadcast,
uniform_rate_send,
) )
@ -69,7 +70,7 @@ class _FeedsBus(BaseModel):
""" """
brokername: str brokername: str
nursery: trio.Nursery nursery: trio.Nursery
feeds: Dict[str, trio.CancelScope] = {} feeds: dict[str, trio.CancelScope] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
@ -78,7 +79,10 @@ class _FeedsBus(BaseModel):
# vars (namely `._portal` and `._cancel_scope`) at import time. # vars (namely `._portal` and `._cancel_scope`) at import time.
# Reported this bug: # Reported this bug:
# https://github.com/samuelcolvin/pydantic/issues/2816 # https://github.com/samuelcolvin/pydantic/issues/2816
_subscribers: Dict[str, List[tractor.Context]] = {} _subscribers: dict[
str,
list[tuple[tractor.MsgStream, Optional[float]]]
] = {}
class Config: class Config:
arbitrary_types_allowed = True arbitrary_types_allowed = True
@ -149,7 +153,6 @@ async def _setup_persistent_brokerd(
async def allocate_persistent_feed( async def allocate_persistent_feed(
ctx: tractor.Context,
bus: _FeedsBus, bus: _FeedsBus,
brokername: str, brokername: str,
symbol: str, symbol: str,
@ -240,13 +243,14 @@ async def allocate_persistent_feed(
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm)
@tractor.stream @tractor.context
async def attach_feed_bus( async def attach_feed_bus(
ctx: tractor.Context, ctx: tractor.Context,
brokername: str, brokername: str,
symbol: str, symbol: str,
loglevel: str, loglevel: str,
tick_throttle: Optional[float] = None,
) -> None: ) -> None:
@ -260,10 +264,11 @@ async def attach_feed_bus(
assert 'brokerd' in tractor.current_actor().name assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername) bus = get_feed_bus(brokername)
sub_only: bool = False
entry = bus.feeds.get(symbol) entry = bus.feeds.get(symbol)
bus._subscribers.setdefault(symbol, [])
# if no cached feed for this symbol has been created for this # if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in # brokerd yet, start persistent stream and shm writer task in
# service nursery # service nursery
@ -272,7 +277,7 @@ async def attach_feed_bus(
init_msg, first_quote = await bus.nursery.start( init_msg, first_quote = await bus.nursery.start(
partial( partial(
allocate_persistent_feed, allocate_persistent_feed,
ctx=ctx,
bus=bus, bus=bus,
brokername=brokername, brokername=brokername,
@ -284,29 +289,40 @@ async def attach_feed_bus(
loglevel=loglevel, loglevel=loglevel,
) )
) )
bus._subscribers.setdefault(symbol, []).append(ctx)
assert isinstance(bus.feeds[symbol], tuple) assert isinstance(bus.feeds[symbol], tuple)
else:
sub_only = True
# XXX: ``first_quote`` may be outdated here if this is secondary # XXX: ``first_quote`` may be outdated here if this is secondary
# subscriber # subscriber
cs, init_msg, first_quote = bus.feeds[symbol] cs, init_msg, first_quote = bus.feeds[symbol]
# send this even to subscribers to existing feed? # send this even to subscribers to existing feed?
await ctx.send_yield(init_msg) # deliver initial info message a first quote asap
await ctx.started((init_msg, first_quote))
# deliver a first quote asap async with (
await ctx.send_yield(first_quote) ctx.open_stream() as stream,
trio.open_nursery() as n,
):
if sub_only: if tick_throttle:
bus._subscribers[symbol].append(ctx) send, recv = trio.open_memory_channel(2**10)
n.start_soon(
uniform_rate_send,
tick_throttle,
recv,
stream,
)
sub = (send, tick_throttle)
try: else:
await trio.sleep_forever() sub = (stream, tick_throttle)
finally:
bus._subscribers[symbol].remove(ctx) bus._subscribers[symbol].append(sub)
try:
await trio.sleep_forever()
finally:
bus._subscribers[symbol].remove(sub)
@dataclass @dataclass
@ -319,20 +335,21 @@ class Feed:
memory buffer orchestration. memory buffer orchestration.
""" """
name: str name: str
stream: AsyncIterator[Dict[str, Any]] stream: AsyncIterator[dict[str, Any]]
shm: ShmArray shm: ShmArray
mod: ModuleType mod: ModuleType
first_quote: dict
_brokerd_portal: tractor._portal.Portal _brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None _index_stream: Optional[AsyncIterator[int]] = None
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
_max_sample_rate: int = 0 _max_sample_rate: int = 0
search: Callable[..., Awaitable] = None search: Callable[..., Awaitable] = None
# cache of symbol info messages received as first message when # cache of symbol info messages received as first message when
# a stream startsc. # a stream startsc.
symbols: Dict[str, Symbol] = field(default_factory=dict) symbols: dict[str, Symbol] = field(default_factory=dict)
async def receive(self) -> dict: async def receive(self) -> dict:
return await self.stream.__anext__() return await self.stream.__anext__()
@ -357,36 +374,6 @@ class Feed:
else: else:
yield self._index_stream yield self._index_stream
@asynccontextmanager
async def receive_trades_data(self) -> AsyncIterator[dict]:
if not getattr(self.mod, 'stream_trades', False):
log.warning(
f"{self.mod.name} doesn't have trade data support yet :(")
if not self._trade_stream:
raise RuntimeError(
f'Can not stream trade data from {self.mod.name}')
# NOTE: this can be faked by setting a rx chan
# using the ``_.set_fake_trades_stream()`` method
if self._trade_stream is None:
async with self._brokerd_portal.open_stream_from(
self.mod.stream_trades,
# do we need this? -> yes
# the broker side must declare this key
# in messages, though we could probably use
# more then one?
topics=['local_trades'],
) as self._trade_stream:
yield self._trade_stream
else:
yield self._trade_stream
def sym_to_shm_key( def sym_to_shm_key(
broker: str, broker: str,
@ -411,7 +398,7 @@ async def install_brokerd_search(
# cancellable by the user as they see fit. # cancellable by the user as they see fit.
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async def search(text: str) -> Dict[str, Any]: async def search(text: str) -> dict[str, Any]:
await stream.send(text) await stream.send(text)
return await stream.receive() return await stream.receive()
@ -436,7 +423,9 @@ async def open_feed(
symbols: Sequence[str], symbols: Sequence[str],
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]: tick_throttle: Optional[float] = None, # Hz
) -> AsyncIterator[dict[str, Any]]:
''' '''
Open a "data feed" which provides streamed real-time quotes. Open a "data feed" which provides streamed real-time quotes.
@ -463,63 +452,72 @@ async def open_feed(
# no feed for broker exists so maybe spawn a data brokerd # no feed for broker exists so maybe spawn a data brokerd
async with maybe_spawn_brokerd( async with (
brokername,
loglevel=loglevel
) as portal:
async with portal.open_stream_from( maybe_spawn_brokerd(
brokername,
loglevel=loglevel
) as portal,
portal.open_context(
attach_feed_bus, attach_feed_bus,
brokername=brokername, brokername=brokername,
symbol=sym, symbol=sym,
loglevel=loglevel loglevel=loglevel,
) as stream: tick_throttle=tick_throttle,
# TODO: can we make this work better with the proposed ) as (ctx, (init_msg, first_quote)),
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
init_msg = await stream.receive()
# we can only read from shm ctx.open_stream() as stream,
shm = attach_shm_array( ):
token=init_msg[sym]['shm_token'],
readonly=True, # TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
# init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
readonly=True,
)
feed = Feed(
name=brokername,
stream=stream,
shm=shm,
mod=mod,
first_quote=first_quote,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
for sym, data in init_msg.items():
si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
symbol = Symbol(
key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
) )
symbol.broker_info[brokername] = si
feed = Feed( feed.symbols[sym] = symbol
name=brokername,
stream=stream,
shm=shm,
mod=mod,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
for sym, data in init_msg.items(): # cast shm dtype to list... can't member why we need this
shm_token = data['shm_token']
si = data['symbol_info'] # XXX: msgspec won't relay through the tuples XD
ohlc_sample_rates.append(data['sample_rate']) shm_token['dtype_descr'] = list(
map(tuple, shm_token['dtype_descr']))
symbol = Symbol( assert shm_token == shm.token # sanity
key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
)
symbol.broker_info[brokername] = si
feed.symbols[sym] = symbol feed._max_sample_rate = max(ohlc_sample_rates)
# cast shm dtype to list... can't member why we need this yield feed
shm_token = data['shm_token']
# XXX: msgspec won't relay through the tuples XD
shm_token['dtype_descr'] = list(map(tuple, shm_token['dtype_descr']))
assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates)
yield feed

View File

@ -1515,9 +1515,14 @@ async def chart_symbol(
brokermod = brokers.get_brokermod(provider) brokermod = brokers.get_brokermod(provider)
async with data.open_feed( async with data.open_feed(
provider, provider,
[sym], [sym],
loglevel=loglevel, loglevel=loglevel,
# 60 FPS to limit context switches
tick_throttle=_clear_throttle_rate,
) as feed: ) as feed:
ohlcv: ShmArray = feed.shm ohlcv: ShmArray = feed.shm