Switch back to raising `Lagged` by default

Makes the broadcast test suite not hang xD, and is our expected default
behaviour. Also removes a ton of commented legacy cruft from before the
refactor to remove the `.receive()` recursion and fixes some typing.

Oh right, and in the case where there's only one subscriber left we warn
log about it since in theory we could actually entirely unwind the
bcaster back to the original underlying, though not sure if that's sane
or works for some use cases (like wanting to have some other subscriber
get added dynamically later).
breceiver_internals
Tyler Goodlet 2023-01-29 14:35:02 -05:00
parent 80f983818f
commit 4ce2dcd12b
1 changed files with 23 additions and 110 deletions
tractor/trionics

View File

@ -113,19 +113,22 @@ class BroadcastState(Struct):
# If the broadcaster was cancelled, we might as well track it # If the broadcaster was cancelled, we might as well track it
cancelled: dict[int, Task] = {} cancelled: dict[int, Task] = {}
def statistics(self) -> dict[str, str | int | float]: def statistics(self) -> dict[str, Any]:
''' '''
Return broadcast receiver group "statistics" like many of Return broadcast receiver group "statistics" like many of
``trio``'s internal task-sync primitives. ``trio``'s internal task-sync primitives.
''' '''
key: int | None
ev: trio.Event | None
subs = self.subs subs = self.subs
if self.recv_ready is not None: if self.recv_ready is not None:
key, ev = self.recv_ready key, ev = self.recv_ready
else: else:
key = ev = None key = ev = None
qlens = {} qlens: dict[int, int] = {}
for tid, sz in subs.items(): for tid, sz in subs.items():
qlens[tid] = sz if sz != -1 else 0 qlens[tid] = sz if sz != -1 else 0
@ -154,7 +157,7 @@ class BroadcastReceiver(ReceiveChannel):
rx_chan: AsyncReceiver, rx_chan: AsyncReceiver,
state: BroadcastState, state: BroadcastState,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = False, raise_on_lag: bool = True,
) -> None: ) -> None:
@ -180,7 +183,12 @@ class BroadcastReceiver(ReceiveChannel):
_key: int | None = None, _key: int | None = None,
_state: BroadcastState | None = None, _state: BroadcastState | None = None,
) -> ReceiveType: ) -> Any:
'''
Sync version of `.receive()` which does all the low level work
of receiving from the underlying/wrapped receive channel.
'''
key = _key or self.key key = _key or self.key
state = _state or self._state state = _state or self._state
@ -273,7 +281,6 @@ class BroadcastReceiver(ReceiveChannel):
# already retreived the last value # already retreived the last value
# XXX: which of these impls is fastest? # XXX: which of these impls is fastest?
# subs = state.subs.copy() # subs = state.subs.copy()
# subs.pop(key) # subs.pop(key)
@ -310,7 +317,6 @@ class BroadcastReceiver(ReceiveChannel):
raise raise
finally: finally:
# Reset receiver waiter task event for next blocking condition. # Reset receiver waiter task event for next blocking condition.
# this MUST be reset even if the above ``.recv()`` call # this MUST be reset even if the above ``.recv()`` call
# was cancelled to avoid the next consumer from blocking on # was cancelled to avoid the next consumer from blocking on
@ -330,83 +336,14 @@ class BroadcastReceiver(ReceiveChannel):
pass pass
# current task already has the latest value **and** is the # current task already has the latest value **and** is the
# first task to begin waiting for a new one # first task to begin waiting for a new one so we begin blocking
# until rescheduled with the a new value from the underlying.
if state.recv_ready is None: if state.recv_ready is None:
return await self._receive_from_underlying(key, state) return await self._receive_from_underlying(key, state)
# if self._closed:
# raise trio.ClosedResourceError
# event = trio.Event()
# state.recv_ready = key, event
# try:
# # if we're cancelled here it should be
# # fine to bail without affecting any other consumers
# # right?
# value = await self._recv()
# # items with lower indices are "newer"
# # NOTE: ``collections.deque`` implicitly takes care of
# # trucating values outside our ``state.maxlen``. In the
# # alt-backend-array-case we'll need to make sure this is
# # implemented in similar ringer-buffer-ish style.
# state.queue.appendleft(value)
# # broadcast new value to all subscribers by increasing
# # all sequence numbers that will point in the queue to
# # their latest available value.
# # don't decrement the sequence for this task since we
# # already retreived the last value
# # XXX: which of these impls is fastest?
# # subs = state.subs.copy()
# # subs.pop(key)
# for sub_key in filter(
# # lambda k: k != key, state.subs,
# partial(ne, key), state.subs,
# ):
# state.subs[sub_key] += 1
# # NOTE: this should ONLY be set if the above task was *NOT*
# # cancelled on the `._recv()` call.
# event.set()
# return value
# except trio.EndOfChannel:
# # if any one consumer gets an EOC from the underlying
# # receiver we need to unblock and send that signal to
# # all other consumers.
# self._state.eoc = True
# if event.statistics().tasks_waiting:
# event.set()
# raise
# except (
# trio.Cancelled,
# ):
# # handle cancelled specially otherwise sibling
# # consumers will be awoken with a sequence of -1
# # and will potentially try to rewait the underlying
# # receiver instead of just cancelling immediately.
# self._state.cancelled[key] = current_task()
# if event.statistics().tasks_waiting:
# event.set()
# raise
# finally:
# # Reset receiver waiter task event for next blocking condition.
# # this MUST be reset even if the above ``.recv()`` call
# # was cancelled to avoid the next consumer from blocking on
# # an event that won't be set!
# state.recv_ready = None
# This task is all caught up and ready to receive the latest # This task is all caught up and ready to receive the latest
# value, so queue sched it on the internal event. # value, so queue/schedule it to be woken on the next internal
# event.
else: else:
while state.recv_ready is not None: while state.recv_ready is not None:
# seq = state.subs[key] # seq = state.subs[key]
@ -419,9 +356,7 @@ class BroadcastReceiver(ReceiveChannel):
_state=state, _state=state,
) )
except trio.WouldBlock: except trio.WouldBlock:
if ( if self._closed:
self._closed
):
raise trio.ClosedResourceError raise trio.ClosedResourceError
subs = state.subs subs = state.subs
@ -433,9 +368,12 @@ class BroadcastReceiver(ReceiveChannel):
# XXX: we are the last and only user of this BR so # XXX: we are the last and only user of this BR so
# likely it makes sense to unwind back to the # likely it makes sense to unwind back to the
# underlying? # underlying?
import tractor # import tractor
await tractor.breakpoint() # await tractor.breakpoint()
log.warning(
f'Only one sub left for {self}?\n'
'We can probably unwind from breceiver?'
)
# XXX: In the case where the first task to allocate the # XXX: In the case where the first task to allocate the
# ``.recv_ready`` event is cancelled we will be woken # ``.recv_ready`` event is cancelled we will be woken
@ -445,33 +383,8 @@ class BroadcastReceiver(ReceiveChannel):
# been incremented and then receive again. # been incremented and then receive again.
# return await self.receive() # return await self.receive()
# if state.recv_ready is None:
print(f'{key}: {state.statistics()}')
return await self._receive_from_underlying(key, state) return await self._receive_from_underlying(key, state)
# seq = state.subs[key]
# NOTE: if we ever would like the behaviour where if the
# first task to recv on the underlying is cancelled but it
# still DOES trigger the ``.recv_ready``, event we'll likely need
# this logic:
# if seq > -1:
# # stuff from above..
# seq = state.subs[key]
# value = state.queue[seq]
# state.subs[key] -= 1
# return value
# elif (
# seq == -1
# ):
# else:
raise RuntimeError(f'Unable to receive {key}:\n{state.statistics()}')
@asynccontextmanager @asynccontextmanager
async def subscribe( async def subscribe(
self, self,