From 0917b580c9c774480f4871b1a9fdaf22cffbf573 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 14 Apr 2023 01:50:36 -0400 Subject: [PATCH] Flip `.feed` and `._sampling` over to new stuff In `.feed` and `._sampling` move to using the new `tractor.Context.open_stream(allow_overruns: bool)` (cough, A BREAKING CHANGE). Also set `Flume.mkt` during construction in `.feed.open_feed()`. --- piker/data/_sampling.py | 3 --- piker/data/feed.py | 36 ++++++++++++------------------------ 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 3ebdd140..208a686b 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -782,9 +782,6 @@ async def uniform_rate_send( https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 ''' - # try not to error-out on overruns of the subscribed client - stream._ctx._backpressure = True - # TODO: compute the approx overhead latency per cycle left_to_sleep = throttle_period = 1/rate - 0.000616 diff --git a/piker/data/feed.py b/piker/data/feed.py index 927eecd5..9d4e09d9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1116,7 +1116,7 @@ async def allocate_persistent_feed( # TODO: we have to use this for now since currently the # MktPair above doesn't render the correct output key it seems # when we provide the `MktInfo` here?..? - symbol=symbol, + mkt=symbol, first_quote=first_quote, _rt_shm_token=rt_shm.token, @@ -1206,10 +1206,6 @@ async def open_feed_bus( symbol. ''' - # ensure that a quote feed stream which is pushing too fast doesn't - # cause and overrun in the client. - ctx._backpressure = True - if loglevel is None: loglevel = tractor.current_actor().loglevel @@ -1285,8 +1281,7 @@ async def open_feed_bus( # sync feed subscribers with flume handles await ctx.started( {fqsn: flume.to_msg() - for fqsn, flume in flumes.items() - } + for fqsn, flume in flumes.items()} ) if not start_stream: @@ -1295,7 +1290,12 @@ async def open_feed_bus( # real-time stream loop async with ( - ctx.open_stream() as stream, + ctx.open_stream( + # NOTE we allow this since it's common to have the live + # quote feed actor's sampling task push faster then the + # the local UI-graphics code during startup. + allow_overruns=True, + ) as stream, ): local_subs: dict[str, set[tuple]] = {} @@ -1323,7 +1323,6 @@ async def open_feed_bus( # a max ``tick_throttle`` instantaneous rate. send, recv = trio.open_memory_channel(2**10) - ctx._backpressure = False cs = await bus.start_task( uniform_rate_send, tick_throttle, @@ -1455,14 +1454,6 @@ class Feed(Struct): _max_sample_rate: int = 1 - # @property - # def portal(self) -> tractor.Portal: - # return self._portal - - # @property - # def name(self) -> str: - # return self.mod.name - async def pause(self) -> None: for stream in set(self.streams.values()): await stream.send('pause') @@ -1537,7 +1528,7 @@ async def maybe_open_feed( 'tick_throttle': kwargs.get('tick_throttle'), # XXX: super critical to have bool defaults here XD - 'backpressure': kwargs.get('backpressure', True), + 'allow_overruns': kwargs.get('allow_overruns', True), 'start_stream': kwargs.get('start_stream', True), }, key=fqsn, @@ -1569,7 +1560,7 @@ async def open_feed( fqsns: list[str], loglevel: str | None = None, - backpressure: bool = True, + allow_overruns: bool = True, start_stream: bool = True, tick_throttle: float | None = None, # Hz @@ -1659,9 +1650,6 @@ async def open_feed( (brokermod, bfqsns), ) in zip(ctxs, providers.items()): - # NOTE: do it asap to avoid overruns during multi-feed setup? - ctx._backpressure = backpressure - for fqsn, flume_msg in flumes_msg_dict.items(): flume = Flume.from_msg(flume_msg) assert flume.symbol.fqsn == fqsn @@ -1683,11 +1671,11 @@ async def open_feed( stream_ctxs.append( ctx.open_stream( - # XXX: be explicit about stream backpressure + # XXX: be explicit about stream overruns # since we should **never** overrun on feeds # being too fast, which will pretty much # always happen with HFT XD - backpressure=backpressure, + allow_overruns=allow_overruns, ) )