forked from goodboy/tractor
1
0
Fork 0

Add `._raise_on_lag` flag to disable `Lag` raising

breceiver_internals
Tyler Goodlet 2023-01-08 13:18:51 -05:00
parent c8efcdd0d3
commit 2707a0e971
1 changed files with 10 additions and 1 deletions

View File

@ -33,7 +33,9 @@ from trio._core._run import Task
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio.lowlevel import current_task from trio.lowlevel import current_task
from msgspec import Struct from msgspec import Struct
from tractor.log import get_logger
log = get_logger(__name__)
# A regular invariant generic type # A regular invariant generic type
T = TypeVar("T") T = TypeVar("T")
@ -152,6 +154,7 @@ class BroadcastReceiver(ReceiveChannel):
rx_chan: AsyncReceiver, rx_chan: AsyncReceiver,
state: BroadcastState, state: BroadcastState,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
raise_on_lag: bool = False,
) -> None: ) -> None:
@ -170,6 +173,7 @@ class BroadcastReceiver(ReceiveChannel):
self._rx = rx_chan self._rx = rx_chan
self._recv = receive_afunc or rx_chan.receive self._recv = receive_afunc or rx_chan.receive
self._closed: bool = False self._closed: bool = False
self._raise_on_lag = raise_on_lag
def receive_nowait( def receive_nowait(
self, self,
@ -218,7 +222,12 @@ class BroadcastReceiver(ReceiveChannel):
# this task was overrun by the producer side # this task was overrun by the producer side
task: Task = current_task() task: Task = current_task()
raise Lagged(f'Task {task.name} was overrun') msg = f'Task {task.name} was overrun'
if self._raise_on_lag:
raise Lagged(msg)
else:
log.warning(msg)
state.subs[key] -= 1 state.subs[key] -= 1
return value return value