forked from goodboy/tractor
				
			Facepalm: use single `_subs` per clone set
							parent
							
								
									4ad75a3287
								
							
						
					
					
						commit
						6e78bcf898
					
				| 
						 | 
				
			
			@ -29,18 +29,26 @@ class BroadcastReceiver(ReceiveChannel):
 | 
			
		|||
    with ``.subscribe()`` and receiving from thew new instance it delivers.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # map of underlying clones to receiver wrappers
 | 
			
		||||
    _subs: dict[trio.ReceiveChannel, BroadcastReceiver] = {}
 | 
			
		||||
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
 | 
			
		||||
        rx_chan: ReceiveChannel,
 | 
			
		||||
        queue: deque,
 | 
			
		||||
        _subs: dict[trio.ReceiveChannel, BroadcastReceiver],
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
 | 
			
		||||
        # map of underlying clones to receiver wrappers
 | 
			
		||||
        # which must be provided as a singleton per broadcaster
 | 
			
		||||
        # clone-subscription set.
 | 
			
		||||
        self._subs = _subs
 | 
			
		||||
 | 
			
		||||
        # underlying for this receiver
 | 
			
		||||
        self._rx = rx_chan
 | 
			
		||||
 | 
			
		||||
        # register the original underlying (clone)
 | 
			
		||||
        self._subs[rx_chan] = -1
 | 
			
		||||
 | 
			
		||||
        self._queue = queue
 | 
			
		||||
        self._value_received: Optional[trio.Event] = None
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -137,18 +145,21 @@ class BroadcastReceiver(ReceiveChannel):
 | 
			
		|||
 | 
			
		||||
        '''
 | 
			
		||||
        clone = self._rx.clone()
 | 
			
		||||
        self._subs[clone] = -1
 | 
			
		||||
        br = BroadcastReceiver(
 | 
			
		||||
            clone,
 | 
			
		||||
            self._queue,
 | 
			
		||||
            _subs=self._subs,
 | 
			
		||||
        )
 | 
			
		||||
        assert clone in self._subs
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            yield BroadcastReceiver(
 | 
			
		||||
                clone,
 | 
			
		||||
                self._queue,
 | 
			
		||||
            )
 | 
			
		||||
            yield br
 | 
			
		||||
        finally:
 | 
			
		||||
            # drop from subscribers and close
 | 
			
		||||
            self._subs.pop(clone)
 | 
			
		||||
            # XXX: this is the reason this function is async: the
 | 
			
		||||
            # ``AsyncResource`` api.
 | 
			
		||||
            await clone.aclose()
 | 
			
		||||
            # drop from subscribers and close
 | 
			
		||||
            self._subs.pop(clone)
 | 
			
		||||
 | 
			
		||||
    # TODO:
 | 
			
		||||
    # - should there be some ._closed flag that causes
 | 
			
		||||
| 
						 | 
				
			
			@ -176,6 +187,7 @@ def broadcast_receiver(
 | 
			
		|||
    return BroadcastReceiver(
 | 
			
		||||
        recv_chan,
 | 
			
		||||
        queue=deque(maxlen=max_buffer_size),
 | 
			
		||||
        _subs={},  # this is singleton over all subscriptions
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue