diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index c5f517c..bfd70ce 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -29,18 +29,26 @@ class BroadcastReceiver(ReceiveChannel): with ``.subscribe()`` and receiving from thew new instance it delivers. ''' - # map of underlying clones to receiver wrappers - _subs: dict[trio.ReceiveChannel, BroadcastReceiver] = {} - def __init__( self, rx_chan: ReceiveChannel, queue: deque, + _subs: dict[trio.ReceiveChannel, BroadcastReceiver], ) -> None: + # map of underlying clones to receiver wrappers + # which must be provided as a singleton per broadcaster + # clone-subscription set. + self._subs = _subs + + # underlying for this receiver self._rx = rx_chan + + # register the original underlying (clone) + self._subs[rx_chan] = -1 + self._queue = queue self._value_received: Optional[trio.Event] = None @@ -137,18 +145,21 @@ class BroadcastReceiver(ReceiveChannel): ''' clone = self._rx.clone() - self._subs[clone] = -1 + br = BroadcastReceiver( + clone, + self._queue, + _subs=self._subs, + ) + assert clone in self._subs + try: - yield BroadcastReceiver( - clone, - self._queue, - ) + yield br finally: - # drop from subscribers and close - self._subs.pop(clone) # XXX: this is the reason this function is async: the # ``AsyncResource`` api. await clone.aclose() + # drop from subscribers and close + self._subs.pop(clone) # TODO: # - should there be some ._closed flag that causes @@ -176,6 +187,7 @@ def broadcast_receiver( return BroadcastReceiver( recv_chan, queue=deque(maxlen=max_buffer_size), + _subs={}, # this is singleton over all subscriptions )