From c2367c1c5e5b8718ac696df40d82e7580fb4163e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Nov 2022 16:10:43 -0500 Subject: [PATCH 01/11] Better `trio`-ize `BroadcastReceiver` internals Driven by a bug found in `piker` where we'd get an inf recursion error due to `BroadcastReceiver.receive()` being called when consumer tasks are awoken but no value is ready to `.nowait_receive()`. This new rework takes an approach closer to the interface and internals of `trio.MemoryReceiveChannel` particularly in terms of, - implementing a `BroadcastReceiver.receive_nowait()` and using it within the async `.receive()`. - failing over to an internal `._receive_from_underlying()` when the `_nowait()` call raises `trio.WouldBlock`. - adding `BroadcastState.statistics()` for debugging and testing dropping recursion from `.receive()`. --- tractor/trionics/_broadcast.py | 333 ++++++++++++++++++++++++--------- 1 file changed, 248 insertions(+), 85 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 6c04895..6b2725d 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -23,7 +23,6 @@ from __future__ import annotations from abc import abstractmethod from collections import deque from contextlib import asynccontextmanager -from dataclasses import dataclass from functools import partial from operator import ne from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol @@ -33,6 +32,7 @@ import trio from trio._core._run import Task from trio.abc import ReceiveChannel from trio.lowlevel import current_task +from msgspec import Struct # A regular invariant generic type @@ -86,8 +86,7 @@ class Lagged(trio.TooSlowError): ''' -@dataclass -class BroadcastState: +class BroadcastState(Struct): ''' Common state to all receivers of a broadcast. @@ -110,7 +109,32 @@ class BroadcastState: eoc: bool = False # If the broadcaster was cancelled, we might as well track it - cancelled: bool = False + cancelled: dict[int, Task] = {} + + def statistics(self) -> dict[str, str | int | float]: + ''' + Return broadcast receiver group "statistics" like many of + ``trio``'s internal task-sync primitives. + + ''' + subs = self.subs + if self.recv_ready is not None: + key, ev = self.recv_ready + else: + key = ev = None + + qlens = {} + for tid, sz in subs.items(): + qlens[tid] = sz if sz != -1 else 0 + + return { + 'open_consumers': len(subs), + 'queued_len_by_task': qlens, + 'max_buffer_size': self.maxlen, + 'tasks_waiting': ev.statistics().tasks_waiting if ev else 0, + 'tasks_cancelled': self.cancelled, + 'next_value_receiver_id': key, + } class BroadcastReceiver(ReceiveChannel): @@ -134,6 +158,12 @@ class BroadcastReceiver(ReceiveChannel): # register the original underlying (clone) self.key = id(self) self._state = state + + # each consumer has an int count which indicates + # which index contains the next value that the task has not yet + # consumed and thus should read. In the "up-to-date" case the + # consumer task must wait for a new value from the underlying + # receiver and we use ``-1`` as the sentinel for this state. 
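+        # E.g. with ``state.queue == [v2, v1, v0]`` (newest first,
+        # since new values are ``appendleft()``-ed) a consumer whose
+        # index is ``2`` still has ``v0``, ``v1`` and ``v2`` pending:
+        # each read returns ``queue[seq]`` (so oldest pending first)
+        # and decrements the index until it reaches ``-1`` again.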
state.subs[self.key] = -1 # underlying for this receiver @@ -141,10 +171,14 @@ class BroadcastReceiver(ReceiveChannel): self._recv = receive_afunc or rx_chan.receive self._closed: bool = False - async def receive(self) -> ReceiveType: + def receive_nowait( + self, + _key: int | None = None, + _state: BroadcastState | None = None, - key = self.key - state = self._state + ) -> ReceiveType: + key = _key or self.key + state = _state or self._state # TODO: ideally we can make some way to "lock out" the # underlying receive channel in some way such that if some task @@ -189,112 +223,241 @@ class BroadcastReceiver(ReceiveChannel): state.subs[key] -= 1 return value - # current task already has the latest value **and** is the - # first task to begin waiting for a new one - if state.recv_ready is None: + raise trio.WouldBlock - if self._closed: - raise trio.ClosedResourceError + async def _receive_from_underlying( + self, + key: int, + state: BroadcastState, - event = trio.Event() - state.recv_ready = key, event + ) -> ReceiveType: + if self._closed: + raise trio.ClosedResourceError + + event = trio.Event() + assert state.recv_ready is None + state.recv_ready = key, event + + try: # if we're cancelled here it should be # fine to bail without affecting any other consumers # right? - try: - value = await self._recv() + value = await self._recv() - # items with lower indices are "newer" - # NOTE: ``collections.deque`` implicitly takes care of - # trucating values outside our ``state.maxlen``. In the - # alt-backend-array-case we'll need to make sure this is - # implemented in similar ringer-buffer-ish style. - state.queue.appendleft(value) + # items with lower indices are "newer" + # NOTE: ``collections.deque`` implicitly takes care of + # trucating values outside our ``state.maxlen``. In the + # alt-backend-array-case we'll need to make sure this is + # implemented in similar ringer-buffer-ish style. + state.queue.appendleft(value) - # broadcast new value to all subscribers by increasing - # all sequence numbers that will point in the queue to - # their latest available value. + # broadcast new value to all subscribers by increasing + # all sequence numbers that will point in the queue to + # their latest available value. - # don't decrement the sequence for this task since we - # already retreived the last value + # don't decrement the sequence for this task since we + # already retreived the last value - # XXX: which of these impls is fastest? + # XXX: which of these impls is fastest? - # subs = state.subs.copy() - # subs.pop(key) + # subs = state.subs.copy() + # subs.pop(key) - for sub_key in filter( - # lambda k: k != key, state.subs, - partial(ne, key), state.subs, - ): - state.subs[sub_key] += 1 - - # NOTE: this should ONLY be set if the above task was *NOT* - # cancelled on the `._recv()` call. - event.set() - return value - - except trio.EndOfChannel: - # if any one consumer gets an EOC from the underlying - # receiver we need to unblock and send that signal to - # all other consumers. - self._state.eoc = True - if event.statistics().tasks_waiting: - event.set() - raise - - except ( - trio.Cancelled, + for sub_key in filter( + # lambda k: k != key, state.subs, + partial(ne, key), state.subs, ): - # handle cancelled specially otherwise sibling - # consumers will be awoken with a sequence of -1 - # and will potentially try to rewait the underlying - # receiver instead of just cancelling immediately. 
- self._state.cancelled = True - if event.statistics().tasks_waiting: - event.set() - raise + state.subs[sub_key] += 1 - finally: + # NOTE: this should ONLY be set if the above task was *NOT* + # cancelled on the `._recv()` call. + event.set() + return value - # Reset receiver waiter task event for next blocking condition. - # this MUST be reset even if the above ``.recv()`` call - # was cancelled to avoid the next consumer from blocking on - # an event that won't be set! - state.recv_ready = None + except trio.EndOfChannel: + # if any one consumer gets an EOC from the underlying + # receiver we need to unblock and send that signal to + # all other consumers. + self._state.eoc = True + if event.statistics().tasks_waiting: + event.set() + raise + + except ( + trio.Cancelled, + ): + # handle cancelled specially otherwise sibling + # consumers will be awoken with a sequence of -1 + # and will potentially try to rewait the underlying + # receiver instead of just cancelling immediately. + self._state.cancelled[key] = current_task() + if event.statistics().tasks_waiting: + event.set() + raise + + finally: + + # Reset receiver waiter task event for next blocking condition. + # this MUST be reset even if the above ``.recv()`` call + # was cancelled to avoid the next consumer from blocking on + # an event that won't be set! + state.recv_ready = None + + async def receive(self) -> ReceiveType: + key = self.key + state = self._state + + try: + return self.receive_nowait( + _key=key, + _state=state, + ) + except trio.WouldBlock: + pass + + # current task already has the latest value **and** is the + # first task to begin waiting for a new one + if state.recv_ready is None: + return await self._receive_from_underlying(key, state) + + # if self._closed: + # raise trio.ClosedResourceError + + # event = trio.Event() + # state.recv_ready = key, event + + # try: + # # if we're cancelled here it should be + # # fine to bail without affecting any other consumers + # # right? + # value = await self._recv() + + # # items with lower indices are "newer" + # # NOTE: ``collections.deque`` implicitly takes care of + # # trucating values outside our ``state.maxlen``. In the + # # alt-backend-array-case we'll need to make sure this is + # # implemented in similar ringer-buffer-ish style. + # state.queue.appendleft(value) + + # # broadcast new value to all subscribers by increasing + # # all sequence numbers that will point in the queue to + # # their latest available value. + + # # don't decrement the sequence for this task since we + # # already retreived the last value + + # # XXX: which of these impls is fastest? + + # # subs = state.subs.copy() + # # subs.pop(key) + + # for sub_key in filter( + # # lambda k: k != key, state.subs, + # partial(ne, key), state.subs, + # ): + # state.subs[sub_key] += 1 + + # # NOTE: this should ONLY be set if the above task was *NOT* + # # cancelled on the `._recv()` call. + # event.set() + # return value + + # except trio.EndOfChannel: + # # if any one consumer gets an EOC from the underlying + # # receiver we need to unblock and send that signal to + # # all other consumers. + # self._state.eoc = True + # if event.statistics().tasks_waiting: + # event.set() + # raise + + # except ( + # trio.Cancelled, + # ): + # # handle cancelled specially otherwise sibling + # # consumers will be awoken with a sequence of -1 + # # and will potentially try to rewait the underlying + # # receiver instead of just cancelling immediately. 
+ # self._state.cancelled[key] = current_task() + # if event.statistics().tasks_waiting: + # event.set() + # raise + + # finally: + + # # Reset receiver waiter task event for next blocking condition. + # # this MUST be reset even if the above ``.recv()`` call + # # was cancelled to avoid the next consumer from blocking on + # # an event that won't be set! + # state.recv_ready = None # This task is all caught up and ready to receive the latest # value, so queue sched it on the internal event. else: - seq = state.subs[key] - assert seq == -1 # sanity - _, ev = state.recv_ready - await ev.wait() + while state.recv_ready is not None: + # seq = state.subs[key] + # assert seq == -1 # sanity + _, ev = state.recv_ready + await ev.wait() + try: + return self.receive_nowait( + _key=key, + _state=state, + ) + except trio.WouldBlock: + if ( + self._closed + ): + raise trio.ClosedResourceError + + subs = state.subs + if ( + len(subs) == 1 + and key in subs + # or cancelled + ): + # XXX: we are the last and only user of this BR so + # likely it makes sense to unwind back to the + # underlying? + import tractor + await tractor.breakpoint() + + + # XXX: In the case where the first task to allocate the + # ``.recv_ready`` event is cancelled we will be woken + # with a non-incremented sequence number (the ``-1`` + # sentinel) and thus will read the oldest value if we + # use that. Instead we need to detect if we have not + # been incremented and then receive again. + # return await self.receive() + + # if state.recv_ready is None: + + print(f'{key}: {state.statistics()}') + return await self._receive_from_underlying(key, state) + + # seq = state.subs[key] # NOTE: if we ever would like the behaviour where if the # first task to recv on the underlying is cancelled but it # still DOES trigger the ``.recv_ready``, event we'll likely need # this logic: - if seq > -1: - # stuff from above.. - seq = state.subs[key] + # if seq > -1: + # # stuff from above.. + # seq = state.subs[key] - value = state.queue[seq] - state.subs[key] -= 1 - return value + # value = state.queue[seq] + # state.subs[key] -= 1 + # return value - elif seq == -1: - # XXX: In the case where the first task to allocate the - # ``.recv_ready`` event is cancelled we will be woken with - # a non-incremented sequence number and thus will read the - # oldest value if we use that. Instead we need to detect if - # we have not been incremented and then receive again. - return await self.receive() + # elif ( + # seq == -1 + # ): - else: - raise ValueError(f'Invalid sequence {seq}!?') + # else: + raise RuntimeError(f'Unable to receive {key}:\n{state.statistics()}') @asynccontextmanager async def subscribe( From 9f9907271bf9c2ef1192ee7b723e8dd70db5b844 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 8 Jan 2023 13:00:36 -0500 Subject: [PATCH 02/11] Merge `ReceiveMsgStream` and `MsgStream` Since one-way streaming can be accomplished by just *not* sending on one side (and/or thus wrapping such usage in a more restrictive API), we just drop the recv-only parent type. The only method different was `MsgStream.send()`, now merged in. Further in usage of `.subscribe()` we monkey patch the underlying stream's `.send()` onto the delivered broadcast receiver so that subscriber tasks can two-way stream as though using the stream directly. This allows us to more definitively drop `tractor.open_stream_from()` in the longer run if we so choose as well; note currently this will potentially create an issue if a caller tries to `.send()` on such a one way stream. 
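A rough sketch of the kind of usage this enables (hypothetical actor and
endpoint names, not an excerpt from the test suite); the subscriber task
gets a broadcast receiver but can still send on the wrapped stream:

```python
import trio
import tractor


@tractor.context
async def echo(
    ctx: tractor.Context,
) -> None:
    # hypothetical remote endpoint: stream back whatever it receives
    await ctx.started()
    async with ctx.open_stream() as stream:
        async for msg in stream:
            await stream.send(msg)


async def main():
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'echoer',
            enable_modules=[__name__],
        )
        async with (
            portal.open_context(echo) as (ctx, _),
            ctx.open_stream() as stream,
        ):
            # the receiver handed out by `.subscribe()` has the
            # stream's `.send()` patched on, so the subscriber can
            # 2-way stream as though using the stream directly.
            async with stream.subscribe() as bstream:
                await bstream.send('ping')
                assert await bstream.receive() == 'ping'

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```

Note the `bstream.send` used above is nothing more than the stream's own
`.send()` patched onto the delivered receiver instance.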
--- tractor/__init__.py | 2 -- tractor/_portal.py | 11 +++++++---- tractor/_streaming.py | 25 +++++++++++++------------ 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index a691df6..731f3e9 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -24,7 +24,6 @@ from ._clustering import open_actor_cluster from ._ipc import Channel from ._streaming import ( Context, - ReceiveMsgStream, MsgStream, stream, context, @@ -64,7 +63,6 @@ __all__ = [ 'MsgStream', 'BaseExceptionGroup', 'Portal', - 'ReceiveMsgStream', 'RemoteActorError', 'breakpoint', 'context', diff --git a/tractor/_portal.py b/tractor/_portal.py index 05504bd..17871aa 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -45,7 +45,10 @@ from ._exceptions import ( NoResult, ContextCancelled, ) -from ._streaming import Context, ReceiveMsgStream +from ._streaming import ( + Context, + MsgStream, +) log = get_logger(__name__) @@ -101,7 +104,7 @@ class Portal: # it is expected that ``result()`` will be awaited at some # point. self._expect_result: Optional[Context] = None - self._streams: set[ReceiveMsgStream] = set() + self._streams: set[MsgStream] = set() self.actor = current_actor() async def _submit_for_result( @@ -316,7 +319,7 @@ class Portal: async_gen_func: Callable, # typing: ignore **kwargs, - ) -> AsyncGenerator[ReceiveMsgStream, None]: + ) -> AsyncGenerator[MsgStream, None]: if not inspect.isasyncgenfunction(async_gen_func): if not ( @@ -341,7 +344,7 @@ class Portal: try: # deliver receive only stream - async with ReceiveMsgStream( + async with MsgStream( ctx, ctx._recv_chan, ) as rchan: self._streams.add(rchan) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 699a906..f24856c 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -50,12 +50,13 @@ log = get_logger(__name__) # - use __slots__ on ``Context``? -class ReceiveMsgStream(trio.abc.ReceiveChannel): +class MsgStream(trio.abc.Channel): ''' - A IPC message stream for receiving logically sequenced values over - an inter-actor ``Channel``. This is the type returned to a local - task which entered either ``Portal.open_stream_from()`` or - ``Context.open_stream()``. + A bidirectional message stream for receiving logically sequenced + values over an inter-actor IPC ``Channel``. + + This is the type returned to a local task which entered either + ``Portal.open_stream_from()`` or ``Context.open_stream()``. Termination rules: @@ -317,15 +318,15 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): async with self._broadcaster.subscribe() as bstream: assert bstream.key != self._broadcaster.key assert bstream._recv == self._broadcaster._recv + + # NOTE: we patch on a `.send()` to the bcaster so that the + # caller can still conduct 2-way streaming using this + # ``bstream`` handle transparently as though it was the msg + # stream instance. + bstream.send = self.send + yield bstream - -class MsgStream(ReceiveMsgStream, trio.abc.Channel): - ''' - Bidirectional message stream for use within an inter-actor actor - ``Context```. 
- - ''' async def send( self, data: Any From c8efcdd0d3f4513c6c7537bfd3bd481b42ddc56d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 8 Jan 2023 13:17:59 -0500 Subject: [PATCH 03/11] Drop `ReceiveMsgStream` from test suite --- tests/test_advanced_streaming.py | 2 +- tests/test_task_broadcasting.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 99414a5..799a089 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -14,7 +14,7 @@ def is_win(): return platform.system() == 'Windows' -_registry: dict[str, set[tractor.ReceiveMsgStream]] = { +_registry: dict[str, set[tractor.MsgStream]] = { 'even': set(), 'odd': set(), } diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 1e2f6b4..636f92b 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -37,7 +37,7 @@ async def echo_sequences( async def ensure_sequence( - stream: tractor.ReceiveMsgStream, + stream: tractor.MsgStream, sequence: list, delay: Optional[float] = None, From 2707a0e97199417e9ad2ca5cc9b37b52f6b6794f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 8 Jan 2023 13:18:51 -0500 Subject: [PATCH 04/11] Add `._raise_on_lag` flag to disable `Lag` raising --- tractor/trionics/_broadcast.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 6b2725d..12ab382 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -33,7 +33,9 @@ from trio._core._run import Task from trio.abc import ReceiveChannel from trio.lowlevel import current_task from msgspec import Struct +from tractor.log import get_logger +log = get_logger(__name__) # A regular invariant generic type T = TypeVar("T") @@ -152,6 +154,7 @@ class BroadcastReceiver(ReceiveChannel): rx_chan: AsyncReceiver, state: BroadcastState, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, + raise_on_lag: bool = False, ) -> None: @@ -170,6 +173,7 @@ class BroadcastReceiver(ReceiveChannel): self._rx = rx_chan self._recv = receive_afunc or rx_chan.receive self._closed: bool = False + self._raise_on_lag = raise_on_lag def receive_nowait( self, @@ -218,7 +222,12 @@ class BroadcastReceiver(ReceiveChannel): # this task was overrun by the producer side task: Task = current_task() - raise Lagged(f'Task {task.name} was overrun') + msg = f'Task {task.name} was overrun' + + if self._raise_on_lag: + raise Lagged(msg) + else: + log.warning(msg) state.subs[key] -= 1 return value From 6ba29f8d565edaf434616079273da7dd1331fa6c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Jan 2023 16:52:19 -0500 Subject: [PATCH 05/11] Recurse and get the last value when in warn mode --- tractor/trionics/_broadcast.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 12ab382..9bec416 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -215,19 +215,23 @@ class BroadcastReceiver(ReceiveChannel): # return this value." 
# https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging + mxln = state.maxlen + lost = seq - mxln + # decrement to the last value and expect # consumer to either handle the ``Lagged`` and come back # or bail out on its own (thus un-subscribing) - state.subs[key] = state.maxlen - 1 + state.subs[key] = mxln - 1 # this task was overrun by the producer side task: Task = current_task() - msg = f'Task {task.name} was overrun' + msg = f'Task `{task.name}` overrun and dropped `{lost}` values' if self._raise_on_lag: raise Lagged(msg) else: log.warning(msg) + return self.receive_nowait(_key, _state) state.subs[key] -= 1 return value From 80f983818fcd4d0f1ca827c6db0dbb522cb39589 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jan 2023 12:04:32 -0500 Subject: [PATCH 06/11] Ignore monkey patched `.send()` type annot --- tractor/_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index f24856c..b112956 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -323,7 +323,7 @@ class MsgStream(trio.abc.Channel): # caller can still conduct 2-way streaming using this # ``bstream`` handle transparently as though it was the msg # stream instance. - bstream.send = self.send + bstream.send = self.send # type: ignore yield bstream From 4ce2dcd12bbcbc85b389c3d57a3252685bcb0cc3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jan 2023 14:35:02 -0500 Subject: [PATCH 07/11] Switch back to raising `Lagged` by default Makes the broadcast test suite not hang xD, and is our expected default behaviour. Also removes a ton of commented legacy cruft from before the refactor to remove the `.receive()` recursion and fixes some typing. Oh right, and in the case where there's only one subscriber left we warn log about it since in theory we could actually entirely unwind the bcaster back to the original underlying, though not sure if that's sane or works for some use cases (like wanting to have some other subscriber get added dynamically later). --- tractor/trionics/_broadcast.py | 133 ++++++--------------------------- 1 file changed, 23 insertions(+), 110 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 9bec416..43af2f0 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -113,19 +113,22 @@ class BroadcastState(Struct): # If the broadcaster was cancelled, we might as well track it cancelled: dict[int, Task] = {} - def statistics(self) -> dict[str, str | int | float]: + def statistics(self) -> dict[str, Any]: ''' Return broadcast receiver group "statistics" like many of ``trio``'s internal task-sync primitives. ''' + key: int | None + ev: trio.Event | None + subs = self.subs if self.recv_ready is not None: key, ev = self.recv_ready else: key = ev = None - qlens = {} + qlens: dict[int, int] = {} for tid, sz in subs.items(): qlens[tid] = sz if sz != -1 else 0 @@ -154,7 +157,7 @@ class BroadcastReceiver(ReceiveChannel): rx_chan: AsyncReceiver, state: BroadcastState, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, - raise_on_lag: bool = False, + raise_on_lag: bool = True, ) -> None: @@ -180,7 +183,12 @@ class BroadcastReceiver(ReceiveChannel): _key: int | None = None, _state: BroadcastState | None = None, - ) -> ReceiveType: + ) -> Any: + ''' + Sync version of `.receive()` which does all the low level work + of receiving from the underlying/wrapped receive channel. 
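+
+        Raises ``trio.WouldBlock`` (mirroring
+        ``trio.MemoryReceiveChannel.receive_nowait()``) when no new
+        value is immediately available for this consumer.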
+ + ''' key = _key or self.key state = _state or self._state @@ -273,7 +281,6 @@ class BroadcastReceiver(ReceiveChannel): # already retreived the last value # XXX: which of these impls is fastest? - # subs = state.subs.copy() # subs.pop(key) @@ -310,7 +317,6 @@ class BroadcastReceiver(ReceiveChannel): raise finally: - # Reset receiver waiter task event for next blocking condition. # this MUST be reset even if the above ``.recv()`` call # was cancelled to avoid the next consumer from blocking on @@ -330,83 +336,14 @@ class BroadcastReceiver(ReceiveChannel): pass # current task already has the latest value **and** is the - # first task to begin waiting for a new one + # first task to begin waiting for a new one so we begin blocking + # until rescheduled with the a new value from the underlying. if state.recv_ready is None: return await self._receive_from_underlying(key, state) - # if self._closed: - # raise trio.ClosedResourceError - - # event = trio.Event() - # state.recv_ready = key, event - - # try: - # # if we're cancelled here it should be - # # fine to bail without affecting any other consumers - # # right? - # value = await self._recv() - - # # items with lower indices are "newer" - # # NOTE: ``collections.deque`` implicitly takes care of - # # trucating values outside our ``state.maxlen``. In the - # # alt-backend-array-case we'll need to make sure this is - # # implemented in similar ringer-buffer-ish style. - # state.queue.appendleft(value) - - # # broadcast new value to all subscribers by increasing - # # all sequence numbers that will point in the queue to - # # their latest available value. - - # # don't decrement the sequence for this task since we - # # already retreived the last value - - # # XXX: which of these impls is fastest? - - # # subs = state.subs.copy() - # # subs.pop(key) - - # for sub_key in filter( - # # lambda k: k != key, state.subs, - # partial(ne, key), state.subs, - # ): - # state.subs[sub_key] += 1 - - # # NOTE: this should ONLY be set if the above task was *NOT* - # # cancelled on the `._recv()` call. - # event.set() - # return value - - # except trio.EndOfChannel: - # # if any one consumer gets an EOC from the underlying - # # receiver we need to unblock and send that signal to - # # all other consumers. - # self._state.eoc = True - # if event.statistics().tasks_waiting: - # event.set() - # raise - - # except ( - # trio.Cancelled, - # ): - # # handle cancelled specially otherwise sibling - # # consumers will be awoken with a sequence of -1 - # # and will potentially try to rewait the underlying - # # receiver instead of just cancelling immediately. - # self._state.cancelled[key] = current_task() - # if event.statistics().tasks_waiting: - # event.set() - # raise - - # finally: - - # # Reset receiver waiter task event for next blocking condition. - # # this MUST be reset even if the above ``.recv()`` call - # # was cancelled to avoid the next consumer from blocking on - # # an event that won't be set! - # state.recv_ready = None - # This task is all caught up and ready to receive the latest - # value, so queue sched it on the internal event. + # value, so queue/schedule it to be woken on the next internal + # event. 
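+        # NOTE: a wakeup does not guarantee a new value was queued
+        # (e.g. the task that allocated ``.recv_ready`` may have been
+        # cancelled before its ``._recv()`` completed) so the woken
+        # task re-checks with ``.receive_nowait()`` below.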
else: while state.recv_ready is not None: # seq = state.subs[key] @@ -419,9 +356,7 @@ class BroadcastReceiver(ReceiveChannel): _state=state, ) except trio.WouldBlock: - if ( - self._closed - ): + if self._closed: raise trio.ClosedResourceError subs = state.subs @@ -433,9 +368,12 @@ class BroadcastReceiver(ReceiveChannel): # XXX: we are the last and only user of this BR so # likely it makes sense to unwind back to the # underlying? - import tractor - await tractor.breakpoint() - + # import tractor + # await tractor.breakpoint() + log.warning( + f'Only one sub left for {self}?\n' + 'We can probably unwind from breceiver?' + ) # XXX: In the case where the first task to allocate the # ``.recv_ready`` event is cancelled we will be woken @@ -445,33 +383,8 @@ class BroadcastReceiver(ReceiveChannel): # been incremented and then receive again. # return await self.receive() - # if state.recv_ready is None: - - print(f'{key}: {state.statistics()}') return await self._receive_from_underlying(key, state) - # seq = state.subs[key] - - # NOTE: if we ever would like the behaviour where if the - # first task to recv on the underlying is cancelled but it - # still DOES trigger the ``.recv_ready``, event we'll likely need - # this logic: - - # if seq > -1: - # # stuff from above.. - # seq = state.subs[key] - - # value = state.queue[seq] - # state.subs[key] -= 1 - # return value - - # elif ( - # seq == -1 - # ): - - # else: - raise RuntimeError(f'Unable to receive {key}:\n{state.statistics()}') - @asynccontextmanager async def subscribe( self, From 47166e45f07bc4790793307cfecb053cd7f019dc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jan 2023 17:31:21 -0500 Subject: [PATCH 08/11] Be explicit with passthrough kwargs (there's so few) --- tractor/trionics/_broadcast.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 43af2f0..96e705a 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -157,7 +157,7 @@ class BroadcastReceiver(ReceiveChannel): rx_chan: AsyncReceiver, state: BroadcastState, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, - raise_on_lag: bool = True, + raise_on_lag: bool = False, ) -> None: @@ -441,7 +441,8 @@ def broadcast_receiver( recv_chan: AsyncReceiver, max_buffer_size: int, - **kwargs, + receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, + raise_on_lag: bool = False, ) -> BroadcastReceiver: @@ -452,5 +453,6 @@ def broadcast_receiver( maxlen=max_buffer_size, subs={}, ), - **kwargs, + receive_afunc=receive_afunc, + raise_on_lag=raise_on_lag, ) From 86377787392c1ecb06c41ba99d49677d1045baec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Jan 2023 12:18:23 -0500 Subject: [PATCH 09/11] Expose `raise_on_lag: bool` flag through factory --- tractor/trionics/_broadcast.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 96e705a..42b1704 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -157,7 +157,7 @@ class BroadcastReceiver(ReceiveChannel): rx_chan: AsyncReceiver, state: BroadcastState, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, - raise_on_lag: bool = False, + raise_on_lag: bool = True, ) -> None: @@ -388,6 +388,8 @@ class BroadcastReceiver(ReceiveChannel): @asynccontextmanager async def subscribe( self, + raise_on_lag: bool = True, + ) -> AsyncIterator[BroadcastReceiver]: ''' 
Subscribe for values from this broadcast receiver. @@ -405,6 +407,7 @@ class BroadcastReceiver(ReceiveChannel): rx_chan=self._rx, state=state, receive_afunc=self._recv, + raise_on_lag=raise_on_lag, ) # assert clone in state.subs assert br.key in state.subs @@ -442,7 +445,7 @@ def broadcast_receiver( recv_chan: AsyncReceiver, max_buffer_size: int, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, - raise_on_lag: bool = False, + raise_on_lag: bool = True, ) -> BroadcastReceiver: From efb8bec8285025c6e6fe600745b3237b6b67d54c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Jan 2023 12:26:07 -0500 Subject: [PATCH 10/11] Add a basic no-raise-on lag test --- tests/test_task_broadcasting.py | 56 +++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 636f92b..9f4a1fe 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -12,7 +12,10 @@ import pytest import trio from trio.lowlevel import current_task import tractor -from tractor.trionics import broadcast_receiver, Lagged +from tractor.trionics import ( + broadcast_receiver, + Lagged, +) @tractor.context @@ -211,7 +214,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower( arb_addr, start_method, ): - '''Ensure that if a faster task consuming from a stream is cancelled + ''' + Ensure that if a faster task consuming from a stream is cancelled the slower task can continue to receive all expected values. ''' @@ -460,3 +464,51 @@ def test_first_recver_is_cancelled(): assert value == 1 trio.run(main) + + +def test_no_raise_on_lag(): + ''' + Run a simple 2-task broadcast where one task is slow but configured + so that it does not raise `Lagged` on overruns using + `raise_on_lasg=False` and verify that the task does not raise. + + ''' + size = 100 + tx, rx = trio.open_memory_channel(size) + brx = broadcast_receiver(rx, size) + + async def slow(): + async with brx.subscribe( + raise_on_lag=False, + ) as br: + async for msg in br: + print(f'slow task got: {msg}') + await trio.sleep(0.1) + + async def fast(): + async with brx.subscribe() as br: + async for msg in br: + print(f'fast task got: {msg}') + + async def main(): + async with ( + tractor.open_root_actor( + # NOTE: so we see the warning msg emitted by the bcaster + # internals when the no raise flag is set. + loglevel='warning', + ), + trio.open_nursery() as n, + ): + n.start_soon(slow) + n.start_soon(fast) + + for i in range(1000): + await tx.send(i) + + # simulate user nailing ctl-c after realizing + # there's a lag in the slow task. + await trio.sleep(1) + raise KeyboardInterrupt + + with pytest.raises(KeyboardInterrupt): + trio.run(main) From 203f95615cc9a16cb6280418e2f339b9a36dae07 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Jan 2023 12:42:26 -0500 Subject: [PATCH 11/11] Add nooz --- nooz/343.trivial.rst | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 nooz/343.trivial.rst diff --git a/nooz/343.trivial.rst b/nooz/343.trivial.rst new file mode 100644 index 0000000..1193f3c --- /dev/null +++ b/nooz/343.trivial.rst @@ -0,0 +1,19 @@ +Rework our ``.trionics.BroadcastReceiver`` internals to avoid method +recursion and approach a design and interface closer to ``trio``'s +``MemoryReceiveChannel``. + +The details of the internal changes include: + +- implementing a ``BroadcastReceiver.receive_nowait()`` and using it + within the async ``.receive()`` thus avoiding recursion from + ``.receive()``. 
+- failing over to an internal ``._receive_from_underlying()`` when the
+  ``_nowait()`` call raises ``trio.WouldBlock``.
+- adding ``BroadcastState.statistics()`` for debugging and testing, both
+  internally and by users.
+- adding an internal ``BroadcastReceiver._raise_on_lag: bool`` which can
+  be set to avoid ``Lagged`` raising for use cases where a user wants to
+  choose between a [cheap or nasty
+  pattern](https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern)
+  for the particular stream (we use this in ``piker``'s dark clearing
+  engine to avoid fast feeds breaking during HFT periods); see the usage sketch below.
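For reference, a condensed usage sketch of the lag opt-out, loosely
adapted from the `test_no_raise_on_lag()` test added above (the buffer
size, send count and sleep values here are arbitrary):

```python
import trio
from tractor.trionics import broadcast_receiver


async def main():
    size = 16
    tx, rx = trio.open_memory_channel(size)
    brx = broadcast_receiver(rx, size)

    async def slow():
        # overruns here are only warn-logged; the receiver then
        # fast-forwards to the oldest value still held in the deque.
        async with brx.subscribe(raise_on_lag=False) as br:
            async for msg in br:
                print(f'slow task got: {msg}')
                await trio.sleep(0.1)

    async def fast():
        # default behaviour: this subscriber would see ``Lagged`` if
        # it fell too far behind the producer.
        async with brx.subscribe() as br:
            async for msg in br:
                print(f'fast task got: {msg}')

    async with trio.open_nursery() as n:
        n.start_soon(slow)
        n.start_soon(fast)

        for i in range(100):
            await tx.send(i)

        # let the slow task drain a bit then tear everything down.
        await trio.sleep(1)
        n.cancel_scope.cancel()


trio.run(main)
```

The slow subscriber only emits a warning on overrun and skips ahead to
the oldest retained value, whereas the default-configured subscriber
would raise `Lagged` in the same situation.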