diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 9bec416..43af2f0 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -113,19 +113,22 @@ class BroadcastState(Struct): # If the broadcaster was cancelled, we might as well track it cancelled: dict[int, Task] = {} - def statistics(self) -> dict[str, str | int | float]: + def statistics(self) -> dict[str, Any]: ''' Return broadcast receiver group "statistics" like many of ``trio``'s internal task-sync primitives. ''' + key: int | None + ev: trio.Event | None + subs = self.subs if self.recv_ready is not None: key, ev = self.recv_ready else: key = ev = None - qlens = {} + qlens: dict[int, int] = {} for tid, sz in subs.items(): qlens[tid] = sz if sz != -1 else 0 @@ -154,7 +157,7 @@ class BroadcastReceiver(ReceiveChannel): rx_chan: AsyncReceiver, state: BroadcastState, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, - raise_on_lag: bool = False, + raise_on_lag: bool = True, ) -> None: @@ -180,7 +183,12 @@ class BroadcastReceiver(ReceiveChannel): _key: int | None = None, _state: BroadcastState | None = None, - ) -> ReceiveType: + ) -> Any: + ''' + Sync version of `.receive()` which does all the low level work + of receiving from the underlying/wrapped receive channel. + + ''' key = _key or self.key state = _state or self._state @@ -273,7 +281,6 @@ class BroadcastReceiver(ReceiveChannel): # already retreived the last value # XXX: which of these impls is fastest? - # subs = state.subs.copy() # subs.pop(key) @@ -310,7 +317,6 @@ class BroadcastReceiver(ReceiveChannel): raise finally: - # Reset receiver waiter task event for next blocking condition. # this MUST be reset even if the above ``.recv()`` call # was cancelled to avoid the next consumer from blocking on @@ -330,83 +336,14 @@ class BroadcastReceiver(ReceiveChannel): pass # current task already has the latest value **and** is the - # first task to begin waiting for a new one + # first task to begin waiting for a new one so we begin blocking + # until rescheduled with the a new value from the underlying. if state.recv_ready is None: return await self._receive_from_underlying(key, state) - # if self._closed: - # raise trio.ClosedResourceError - - # event = trio.Event() - # state.recv_ready = key, event - - # try: - # # if we're cancelled here it should be - # # fine to bail without affecting any other consumers - # # right? - # value = await self._recv() - - # # items with lower indices are "newer" - # # NOTE: ``collections.deque`` implicitly takes care of - # # trucating values outside our ``state.maxlen``. In the - # # alt-backend-array-case we'll need to make sure this is - # # implemented in similar ringer-buffer-ish style. - # state.queue.appendleft(value) - - # # broadcast new value to all subscribers by increasing - # # all sequence numbers that will point in the queue to - # # their latest available value. - - # # don't decrement the sequence for this task since we - # # already retreived the last value - - # # XXX: which of these impls is fastest? - - # # subs = state.subs.copy() - # # subs.pop(key) - - # for sub_key in filter( - # # lambda k: k != key, state.subs, - # partial(ne, key), state.subs, - # ): - # state.subs[sub_key] += 1 - - # # NOTE: this should ONLY be set if the above task was *NOT* - # # cancelled on the `._recv()` call. - # event.set() - # return value - - # except trio.EndOfChannel: - # # if any one consumer gets an EOC from the underlying - # # receiver we need to unblock and send that signal to - # # all other consumers. - # self._state.eoc = True - # if event.statistics().tasks_waiting: - # event.set() - # raise - - # except ( - # trio.Cancelled, - # ): - # # handle cancelled specially otherwise sibling - # # consumers will be awoken with a sequence of -1 - # # and will potentially try to rewait the underlying - # # receiver instead of just cancelling immediately. - # self._state.cancelled[key] = current_task() - # if event.statistics().tasks_waiting: - # event.set() - # raise - - # finally: - - # # Reset receiver waiter task event for next blocking condition. - # # this MUST be reset even if the above ``.recv()`` call - # # was cancelled to avoid the next consumer from blocking on - # # an event that won't be set! - # state.recv_ready = None - # This task is all caught up and ready to receive the latest - # value, so queue sched it on the internal event. + # value, so queue/schedule it to be woken on the next internal + # event. else: while state.recv_ready is not None: # seq = state.subs[key] @@ -419,9 +356,7 @@ class BroadcastReceiver(ReceiveChannel): _state=state, ) except trio.WouldBlock: - if ( - self._closed - ): + if self._closed: raise trio.ClosedResourceError subs = state.subs @@ -433,9 +368,12 @@ class BroadcastReceiver(ReceiveChannel): # XXX: we are the last and only user of this BR so # likely it makes sense to unwind back to the # underlying? - import tractor - await tractor.breakpoint() - + # import tractor + # await tractor.breakpoint() + log.warning( + f'Only one sub left for {self}?\n' + 'We can probably unwind from breceiver?' + ) # XXX: In the case where the first task to allocate the # ``.recv_ready`` event is cancelled we will be woken @@ -445,33 +383,8 @@ class BroadcastReceiver(ReceiveChannel): # been incremented and then receive again. # return await self.receive() - # if state.recv_ready is None: - - print(f'{key}: {state.statistics()}') return await self._receive_from_underlying(key, state) - # seq = state.subs[key] - - # NOTE: if we ever would like the behaviour where if the - # first task to recv on the underlying is cancelled but it - # still DOES trigger the ``.recv_ready``, event we'll likely need - # this logic: - - # if seq > -1: - # # stuff from above.. - # seq = state.subs[key] - - # value = state.queue[seq] - # state.subs[key] -= 1 - # return value - - # elif ( - # seq == -1 - # ): - - # else: - raise RuntimeError(f'Unable to receive {key}:\n{state.statistics()}') - @asynccontextmanager async def subscribe( self,