diff --git a/tractor/trionics/_broadcast.py b/tractor/trionics/_broadcast.py index 6b2725d..12ab382 100644 --- a/tractor/trionics/_broadcast.py +++ b/tractor/trionics/_broadcast.py @@ -33,7 +33,9 @@ from trio._core._run import Task from trio.abc import ReceiveChannel from trio.lowlevel import current_task from msgspec import Struct +from tractor.log import get_logger +log = get_logger(__name__) # A regular invariant generic type T = TypeVar("T") @@ -152,6 +154,7 @@ class BroadcastReceiver(ReceiveChannel): rx_chan: AsyncReceiver, state: BroadcastState, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, + raise_on_lag: bool = False, ) -> None: @@ -170,6 +173,7 @@ class BroadcastReceiver(ReceiveChannel): self._rx = rx_chan self._recv = receive_afunc or rx_chan.receive self._closed: bool = False + self._raise_on_lag = raise_on_lag def receive_nowait( self, @@ -218,7 +222,12 @@ class BroadcastReceiver(ReceiveChannel): # this task was overrun by the producer side task: Task = current_task() - raise Lagged(f'Task {task.name} was overrun') + msg = f'Task {task.name} was overrun' + + if self._raise_on_lag: + raise Lagged(msg) + else: + log.warning(msg) state.subs[key] -= 1 return value