From 4ce2dcd12bbcbc85b389c3d57a3252685bcb0cc3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 29 Jan 2023 14:35:02 -0500 Subject: [PATCH] Switch back to raising `Lagged` by default Makes the broadcast test suite not hang xD, and is our expected default behaviour. Also removes a ton of commented legacy cruft from before the refactor to remove the `.receive()` recursion and fixes some typing. Oh right, and in the case where there's only one subscriber left we warn log about it since in theory we could actually entirely unwind the bcaster back to the original underlying, though not sure if that's sane or works for some use cases (like wanting to have some other subscriber get added dynamically later). --- tractor/trionics/_broadcast.py | 133 ++++++--------------------------- 1 file changed, 23 insertions(+), 110 deletions(-) diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 9bec416..43af2f0 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -113,19 +113,22 @@ class BroadcastState(Struct): # If the broadcaster was cancelled, we might as well track it cancelled: dict[int, Task] = {} - def statistics(self) -> dict[str, str | int | float]: + def statistics(self) -> dict[str, Any]: ''' Return broadcast receiver group "statistics" like many of ``trio``'s internal task-sync primitives. ''' + key: int | None + ev: trio.Event | None + subs = self.subs if self.recv_ready is not None: key, ev = self.recv_ready else: key = ev = None - qlens = {} + qlens: dict[int, int] = {} for tid, sz in subs.items(): qlens[tid] = sz if sz != -1 else 0 @@ -154,7 +157,7 @@ class BroadcastReceiver(ReceiveChannel): rx_chan: AsyncReceiver, state: BroadcastState, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, - raise_on_lag: bool = False, + raise_on_lag: bool = True, ) -> None: @@ -180,7 +183,12 @@ class BroadcastReceiver(ReceiveChannel): _key: int | None = None, _state: BroadcastState | None = None, - ) -> ReceiveType: + ) -> Any: + ''' + Sync version of `.receive()` which does all the low level work + of receiving from the underlying/wrapped receive channel. + + ''' key = _key or self.key state = _state or self._state @@ -273,7 +281,6 @@ class BroadcastReceiver(ReceiveChannel): # already retreived the last value # XXX: which of these impls is fastest? - # subs = state.subs.copy() # subs.pop(key) @@ -310,7 +317,6 @@ class BroadcastReceiver(ReceiveChannel): raise finally: - # Reset receiver waiter task event for next blocking condition. # this MUST be reset even if the above ``.recv()`` call # was cancelled to avoid the next consumer from blocking on @@ -330,83 +336,14 @@ class BroadcastReceiver(ReceiveChannel): pass # current task already has the latest value **and** is the - # first task to begin waiting for a new one + # first task to begin waiting for a new one so we begin blocking + # until rescheduled with the a new value from the underlying. if state.recv_ready is None: return await self._receive_from_underlying(key, state) - # if self._closed: - # raise trio.ClosedResourceError - - # event = trio.Event() - # state.recv_ready = key, event - - # try: - # # if we're cancelled here it should be - # # fine to bail without affecting any other consumers - # # right? - # value = await self._recv() - - # # items with lower indices are "newer" - # # NOTE: ``collections.deque`` implicitly takes care of - # # trucating values outside our ``state.maxlen``. In the - # # alt-backend-array-case we'll need to make sure this is - # # implemented in similar ringer-buffer-ish style. - # state.queue.appendleft(value) - - # # broadcast new value to all subscribers by increasing - # # all sequence numbers that will point in the queue to - # # their latest available value. - - # # don't decrement the sequence for this task since we - # # already retreived the last value - - # # XXX: which of these impls is fastest? - - # # subs = state.subs.copy() - # # subs.pop(key) - - # for sub_key in filter( - # # lambda k: k != key, state.subs, - # partial(ne, key), state.subs, - # ): - # state.subs[sub_key] += 1 - - # # NOTE: this should ONLY be set if the above task was *NOT* - # # cancelled on the `._recv()` call. - # event.set() - # return value - - # except trio.EndOfChannel: - # # if any one consumer gets an EOC from the underlying - # # receiver we need to unblock and send that signal to - # # all other consumers. - # self._state.eoc = True - # if event.statistics().tasks_waiting: - # event.set() - # raise - - # except ( - # trio.Cancelled, - # ): - # # handle cancelled specially otherwise sibling - # # consumers will be awoken with a sequence of -1 - # # and will potentially try to rewait the underlying - # # receiver instead of just cancelling immediately. - # self._state.cancelled[key] = current_task() - # if event.statistics().tasks_waiting: - # event.set() - # raise - - # finally: - - # # Reset receiver waiter task event for next blocking condition. - # # this MUST be reset even if the above ``.recv()`` call - # # was cancelled to avoid the next consumer from blocking on - # # an event that won't be set! - # state.recv_ready = None - # This task is all caught up and ready to receive the latest - # value, so queue sched it on the internal event. + # value, so queue/schedule it to be woken on the next internal + # event. else: while state.recv_ready is not None: # seq = state.subs[key] @@ -419,9 +356,7 @@ class BroadcastReceiver(ReceiveChannel): _state=state, ) except trio.WouldBlock: - if ( - self._closed - ): + if self._closed: raise trio.ClosedResourceError subs = state.subs @@ -433,9 +368,12 @@ class BroadcastReceiver(ReceiveChannel): # XXX: we are the last and only user of this BR so # likely it makes sense to unwind back to the # underlying? - import tractor - await tractor.breakpoint() - + # import tractor + # await tractor.breakpoint() + log.warning( + f'Only one sub left for {self}?\n' + 'We can probably unwind from breceiver?' + ) # XXX: In the case where the first task to allocate the # ``.recv_ready`` event is cancelled we will be woken @@ -445,33 +383,8 @@ class BroadcastReceiver(ReceiveChannel): # been incremented and then receive again. # return await self.receive() - # if state.recv_ready is None: - - print(f'{key}: {state.statistics()}') return await self._receive_from_underlying(key, state) - # seq = state.subs[key] - - # NOTE: if we ever would like the behaviour where if the - # first task to recv on the underlying is cancelled but it - # still DOES trigger the ``.recv_ready``, event we'll likely need - # this logic: - - # if seq > -1: - # # stuff from above.. - # seq = state.subs[key] - - # value = state.queue[seq] - # state.subs[key] -= 1 - # return value - - # elif ( - # seq == -1 - # ): - - # else: - raise RuntimeError(f'Unable to receive {key}:\n{state.statistics()}') - @asynccontextmanager async def subscribe( self,