forked from goodboy/tractor
1
0
Fork 0

Facepalm: use single `_subs` per clone set

tokio_backup
Tyler Goodlet 2021-08-10 12:38:26 -04:00
parent 3f9b860210
commit 9d12cc80dd
1 changed files with 22 additions and 10 deletions

View File

@ -29,18 +29,26 @@ class BroadcastReceiver(ReceiveChannel):
with ``.subscribe()`` and receiving from thew new instance it delivers. with ``.subscribe()`` and receiving from thew new instance it delivers.
''' '''
# map of underlying clones to receiver wrappers
_subs: dict[trio.ReceiveChannel, BroadcastReceiver] = {}
def __init__( def __init__(
self, self,
rx_chan: ReceiveChannel, rx_chan: ReceiveChannel,
queue: deque, queue: deque,
_subs: dict[trio.ReceiveChannel, BroadcastReceiver],
) -> None: ) -> None:
# map of underlying clones to receiver wrappers
# which must be provided as a singleton per broadcaster
# clone-subscription set.
self._subs = _subs
# underlying for this receiver
self._rx = rx_chan self._rx = rx_chan
# register the original underlying (clone)
self._subs[rx_chan] = -1
self._queue = queue self._queue = queue
self._value_received: Optional[trio.Event] = None self._value_received: Optional[trio.Event] = None
@ -137,18 +145,21 @@ class BroadcastReceiver(ReceiveChannel):
''' '''
clone = self._rx.clone() clone = self._rx.clone()
self._subs[clone] = -1 br = BroadcastReceiver(
clone,
self._queue,
_subs=self._subs,
)
assert clone in self._subs
try: try:
yield BroadcastReceiver( yield br
clone,
self._queue,
)
finally: finally:
# drop from subscribers and close
self._subs.pop(clone)
# XXX: this is the reason this function is async: the # XXX: this is the reason this function is async: the
# ``AsyncResource`` api. # ``AsyncResource`` api.
await clone.aclose() await clone.aclose()
# drop from subscribers and close
self._subs.pop(clone)
# TODO: # TODO:
# - should there be some ._closed flag that causes # - should there be some ._closed flag that causes
@ -176,6 +187,7 @@ def broadcast_receiver(
return BroadcastReceiver( return BroadcastReceiver(
recv_chan, recv_chan,
queue=deque(maxlen=max_buffer_size), queue=deque(maxlen=max_buffer_size),
_subs={}, # this is singleton over all subscriptions
) )