From 57a35a3c6cb44edbbb41e081768f1e45ed844e47 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jun 2021 10:55:01 -0400 Subject: [PATCH 1/3] Port feed bus endpoint to a `@tractor.context` --- piker/clearing/_ems.py | 2 +- piker/data/_sampling.py | 6 +- piker/data/feed.py | 155 +++++++++++++++++----------------------- 3 files changed, 68 insertions(+), 95 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 2378465d..9b1c3246 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -724,7 +724,7 @@ async def _emsd_main( _router.feeds[(broker, symbol)] = feed # XXX: this should be initial price quote from target provider - first_quote = await feed.receive() + first_quote = feed.first_quote # open a stream with the brokerd backend for order # flow dialogue diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 566f2b07..2aab0ecf 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -229,10 +229,10 @@ async def sample_and_broadcast( # thus other consumers still attached. subs = bus._subscribers[sym.lower()] - for ctx in subs: + for stream in subs: # print(f'sub is {ctx.chan.uid}') try: - await ctx.send_yield({sym: quote}) + await stream.send({sym: quote}) except ( trio.BrokenResourceError, trio.ClosedResourceError @@ -241,4 +241,4 @@ async def sample_and_broadcast( # if it's done in the fee bus code? # so far seems like no since this should all # be single-threaded. 
- log.error(f'{ctx.chan.uid} dropped connection') + log.error(f'{stream._ctx.chan.uid} dropped connection') diff --git a/piker/data/feed.py b/piker/data/feed.py index 4a41500d..689b8f93 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -149,7 +149,6 @@ async def _setup_persistent_brokerd( async def allocate_persistent_feed( - ctx: tractor.Context, bus: _FeedsBus, brokername: str, symbol: str, @@ -240,7 +239,7 @@ async def allocate_persistent_feed( await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) -@tractor.stream +@tractor.context async def attach_feed_bus( ctx: tractor.Context, @@ -260,10 +259,11 @@ async def attach_feed_bus( assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) - sub_only: bool = False entry = bus.feeds.get(symbol) + bus._subscribers.setdefault(symbol, []) + # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery @@ -272,7 +272,7 @@ async def attach_feed_bus( init_msg, first_quote = await bus.nursery.start( partial( allocate_persistent_feed, - ctx=ctx, + bus=bus, brokername=brokername, @@ -284,29 +284,24 @@ async def attach_feed_bus( loglevel=loglevel, ) ) - bus._subscribers.setdefault(symbol, []).append(ctx) assert isinstance(bus.feeds[symbol], tuple) - else: - sub_only = True - # XXX: ``first_quote`` may be outdated here if this is secondary # subscriber cs, init_msg, first_quote = bus.feeds[symbol] # send this even to subscribers to existing feed? 
- await ctx.send_yield(init_msg) + # deliver initial info message a first quote asap + await ctx.started((init_msg, first_quote)) - # deliver a first quote asap - await ctx.send_yield(first_quote) + async with ctx.open_stream() as stream: - if sub_only: - bus._subscribers[symbol].append(ctx) + bus._subscribers[symbol].append(stream) - try: - await trio.sleep_forever() - finally: - bus._subscribers[symbol].remove(ctx) + try: + await trio.sleep_forever() + finally: + bus._subscribers[symbol].remove(stream) @dataclass @@ -322,6 +317,7 @@ class Feed: stream: AsyncIterator[Dict[str, Any]] shm: ShmArray mod: ModuleType + first_quote: dict _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None @@ -357,36 +353,6 @@ class Feed: else: yield self._index_stream - @asynccontextmanager - async def receive_trades_data(self) -> AsyncIterator[dict]: - - if not getattr(self.mod, 'stream_trades', False): - log.warning( - f"{self.mod.name} doesn't have trade data support yet :(") - - if not self._trade_stream: - raise RuntimeError( - f'Can not stream trade data from {self.mod.name}') - - # NOTE: this can be faked by setting a rx chan - # using the ``_.set_fake_trades_stream()`` method - if self._trade_stream is None: - - async with self._brokerd_portal.open_stream_from( - - self.mod.stream_trades, - - # do we need this? -> yes - # the broker side must declare this key - # in messages, though we could probably use - # more then one? 
- topics=['local_trades'], - ) as self._trade_stream: - - yield self._trade_stream - else: - yield self._trade_stream - def sym_to_shm_key( broker: str, @@ -463,63 +429,70 @@ async def open_feed( # no feed for broker exists so maybe spawn a data brokerd - async with maybe_spawn_brokerd( - brokername, - loglevel=loglevel - ) as portal: + async with ( - async with portal.open_stream_from( + maybe_spawn_brokerd( + brokername, + loglevel=loglevel + ) as portal, + + portal.open_context( attach_feed_bus, brokername=brokername, symbol=sym, loglevel=loglevel - ) as stream: + ) as (ctx, (init_msg, first_quote)), - # TODO: can we make this work better with the proposed - # context based bidirectional streaming style api proposed in: - # https://github.com/goodboy/tractor/issues/53 - init_msg = await stream.receive() + ctx.open_stream() as stream, + ): - # we can only read from shm - shm = attach_shm_array( - token=init_msg[sym]['shm_token'], - readonly=True, + # TODO: can we make this work better with the proposed + # context based bidirectional streaming style api proposed in: + # https://github.com/goodboy/tractor/issues/53 + # init_msg = await stream.receive() + + # we can only read from shm + shm = attach_shm_array( + token=init_msg[sym]['shm_token'], + readonly=True, + ) + + feed = Feed( + name=brokername, + stream=stream, + shm=shm, + mod=mod, + first_quote=first_quote, + _brokerd_portal=portal, + ) + ohlc_sample_rates = [] + + for sym, data in init_msg.items(): + + si = data['symbol_info'] + ohlc_sample_rates.append(data['sample_rate']) + + symbol = Symbol( + key=sym, + type_key=si.get('asset_type', 'forex'), + tick_size=si.get('price_tick_size', 0.01), + lot_tick_size=si.get('lot_tick_size', 0.0), ) + symbol.broker_info[brokername] = si - feed = Feed( - name=brokername, - stream=stream, - shm=shm, - mod=mod, - _brokerd_portal=portal, - ) - ohlc_sample_rates = [] + feed.symbols[sym] = symbol - for sym, data in init_msg.items(): + # cast shm dtype to list... 
can't member why we need this + shm_token = data['shm_token'] - si = data['symbol_info'] - ohlc_sample_rates.append(data['sample_rate']) + # XXX: msgspec won't relay through the tuples XD + shm_token['dtype_descr'] = list( + map(tuple, shm_token['dtype_descr'])) - symbol = Symbol( - key=sym, - type_key=si.get('asset_type', 'forex'), - tick_size=si.get('price_tick_size', 0.01), - lot_tick_size=si.get('lot_tick_size', 0.0), - ) - symbol.broker_info[brokername] = si + assert shm_token == shm.token # sanity - feed.symbols[sym] = symbol + feed._max_sample_rate = max(ohlc_sample_rates) - # cast shm dtype to list... can't member why we need this - shm_token = data['shm_token'] - - # XXX: msgspec won't relay through the tuples XD - shm_token['dtype_descr'] = list(map(tuple, shm_token['dtype_descr'])) - - assert shm_token == shm.token # sanity - - feed._max_sample_rate = max(ohlc_sample_rates) - - yield feed + yield feed From ccf81520cb906777df1f8979bd3b0698df170554 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jun 2021 21:43:19 -0400 Subject: [PATCH 2/3] First attempt data feed side quote throttling Adding binance's "hft" ws feeds has resulted in a lot of context switching in our Qt charts, so much so that it's chewing CPU and it's definitely worth throttling to the detected display rate as per the discussion in issue #192. This is a first, very naive attempt at throttling L1 tick feeds on the `brokerd` end (producer side) using a constant and uniform delivery rate by way of a `trio` task + mem chan. The new func is `data._sampling.uniform_rate_send()`. Basically, if a client requests a feed and provides a throttle rate, we just spawn a task and queue up ticks until approximately one display-rate period of time has passed before forwarding. 
It's definitely nothing fancy but does provide fodder and a start point for an up and coming queueing eng to start digging into both #107 and #109 ;) --- piker/data/_sampling.py | 89 +++++++++++++++++++++++++++++++++-------- 1 file changed, 73 insertions(+), 16 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 2aab0ecf..a1d615f5 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -17,6 +17,7 @@ """ Data buffers for fast shared humpy. """ +import time from typing import Dict, List import tractor @@ -152,10 +153,12 @@ async def iter_ohlc_periods( async def sample_and_broadcast( + bus: '_FeedBus', # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, sum_tick_vlm: bool = True, + ) -> None: log.info("Started shared mem bar writer") @@ -177,11 +180,10 @@ async def sample_and_broadcast( # trade data for tick in quote['ticks']: - # if tick['type'] in ('utrade',): - # print(tick) + ticktype = tick['type'] # write trade events to shm last OHLC sample - if tick['type'] in ('trade', 'utrade'): + if ticktype in ('trade', 'utrade'): last = tick['price'] @@ -229,16 +231,71 @@ async def sample_and_broadcast( # thus other consumers still attached. subs = bus._subscribers[sym.lower()] - for stream in subs: - # print(f'sub is {ctx.chan.uid}') - try: - await stream.send({sym: quote}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - # XXX: do we need to deregister here - # if it's done in the fee bus code? - # so far seems like no since this should all - # be single-threaded. - log.error(f'{stream._ctx.chan.uid} dropped connection') + for (stream, tick_throttle) in subs: + + if tick_throttle: + await stream.send(quote) + + else: + try: + await stream.send({sym: quote}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + # XXX: do we need to deregister here + # if it's done in the fee bus code? + # so far seems like no since this should all + # be single-threaded. 
+ log.error(f'{stream._ctx.chan.uid} dropped connection') + + +async def uniform_rate_send( + rate: float, + quote_stream: trio.abc.ReceiveChannel, + stream: tractor.MsgStream, +) -> None: + + sleep_period = 1/rate - 0.000616 + last_send = time.time() + + while True: + + first_quote = await quote_stream.receive() + start = time.time() + + # append quotes since last iteration into the last quote's + # tick array/buffer. + + # TODO: once we decide to get fancy really we should have + # a shared mem tick buffer that is just continually filled and + # the UI just reads from it at its display rate. + # we'll likely head toward this once we get this issue going: + # + while True: + try: + next_quote = quote_stream.receive_nowait() + ticks = next_quote.get('ticks') + + if ticks: + first_quote['ticks'].extend(ticks) + + except trio.WouldBlock: + now = time.time() + rate = 1 / (now - last_send) + last_send = now + + # print(f'{rate} Hz sending quotes\n{first_quote}') + + # TODO: now if only we could sync this to the display + # rate timing exactly lul + await stream.send({first_quote['symbol']: first_quote}) + break + + end = time.time() + diff = end - start + + # throttle to provided transmit rate + period = max(sleep_period - diff, 0) + if period > 0: + await trio.sleep(period) From df2f6487ff62af40740b64f3ba427e2aa538df18 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jun 2021 21:55:03 -0400 Subject: [PATCH 3/3] Apply `brokerd` quote rate throttling when requested in `open_feed()` --- piker/data/feed.py | 51 ++++++++++++++++++++++++++++++++++------------ piker/ui/_chart.py | 5 +++++ 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 689b8f93..477e7bac 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -25,9 +25,9 @@ from contextlib import asynccontextmanager from functools import partial from types import ModuleType from typing import ( - Dict, Any, Sequence, + Any, Sequence, AsyncIterator, 
Optional, - List, Awaitable, Callable, + Awaitable, Callable, ) import trio @@ -54,6 +54,7 @@ from ._sampling import ( increment_ohlc_buffer, iter_ohlc_periods, sample_and_broadcast, + uniform_rate_send, ) @@ -69,7 +70,7 @@ class _FeedsBus(BaseModel): """ brokername: str nursery: trio.Nursery - feeds: Dict[str, trio.CancelScope] = {} + feeds: dict[str, trio.CancelScope] = {} task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() @@ -78,7 +79,10 @@ class _FeedsBus(BaseModel): # vars (namely `._portal` and `._cancel_scope`) at import time. # Reported this bug: # https://github.com/samuelcolvin/pydantic/issues/2816 - _subscribers: Dict[str, List[tractor.Context]] = {} + _subscribers: dict[ + str, + list[tuple[tractor.MsgStream, Optional[float]]] + ] = {} class Config: arbitrary_types_allowed = True @@ -246,6 +250,7 @@ async def attach_feed_bus( brokername: str, symbol: str, loglevel: str, + tick_throttle: Optional[float] = None, ) -> None: @@ -294,14 +299,30 @@ async def attach_feed_bus( # deliver initial info message a first quote asap await ctx.started((init_msg, first_quote)) - async with ctx.open_stream() as stream: + async with ( + ctx.open_stream() as stream, + trio.open_nursery() as n, + ): - bus._subscribers[symbol].append(stream) + if tick_throttle: + send, recv = trio.open_memory_channel(2**10) + n.start_soon( + uniform_rate_send, + tick_throttle, + recv, + stream, + ) + sub = (send, tick_throttle) + + else: + sub = (stream, tick_throttle) + + bus._subscribers[symbol].append(sub) try: await trio.sleep_forever() finally: - bus._subscribers[symbol].remove(stream) + bus._subscribers[symbol].remove(sub) @dataclass @@ -314,21 +335,21 @@ class Feed: memory buffer orchestration. 
""" name: str - stream: AsyncIterator[Dict[str, Any]] + stream: AsyncIterator[dict[str, Any]] shm: ShmArray mod: ModuleType first_quote: dict _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None - _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None + _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None _max_sample_rate: int = 0 search: Callable[..., Awaitable] = None # cache of symbol info messages received as first message when # a stream startsc. - symbols: Dict[str, Symbol] = field(default_factory=dict) + symbols: dict[str, Symbol] = field(default_factory=dict) async def receive(self) -> dict: return await self.stream.__anext__() @@ -377,7 +398,7 @@ async def install_brokerd_search( # cancellable by the user as they see fit. async with ctx.open_stream() as stream: - async def search(text: str) -> Dict[str, Any]: + async def search(text: str) -> dict[str, Any]: await stream.send(text) return await stream.receive() @@ -402,7 +423,9 @@ async def open_feed( symbols: Sequence[str], loglevel: Optional[str] = None, -) -> AsyncIterator[Dict[str, Any]]: + tick_throttle: Optional[float] = None, # Hz + +) -> AsyncIterator[dict[str, Any]]: ''' Open a "data feed" which provides streamed real-time quotes. @@ -441,7 +464,9 @@ async def open_feed( attach_feed_bus, brokername=brokername, symbol=sym, - loglevel=loglevel + loglevel=loglevel, + + tick_throttle=tick_throttle, ) as (ctx, (init_msg, first_quote)), diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 838da304..082dcb44 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -1515,9 +1515,14 @@ async def chart_symbol( brokermod = brokers.get_brokermod(provider) async with data.open_feed( + provider, [sym], loglevel=loglevel, + + # 60 FPS to limit context switches + tick_throttle=_clear_throttle_rate, + ) as feed: ohlcv: ShmArray = feed.shm