forked from goodboy/tractor
1
0
Fork 0

Drop uuid4 keys, raise closed error on subscription after close

live_on_air_from_tokio
Tyler Goodlet 2021-08-20 13:04:17 -04:00
parent 2bad2bac50
commit bec3f5999d
1 changed files with 21 additions and 27 deletions

View File

@ -12,7 +12,6 @@ from functools import partial
from operator import ne
from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol
from typing import Generic, TypeVar
from uuid import uuid4
import trio
from trio._core._run import Task
@ -23,9 +22,8 @@ from trio.lowlevel import current_task
# A regular invariant generic type
T = TypeVar("T")
# The type of object produced by a ReceiveChannel (covariant because
# ReceiveChannel[Derived] can be passed to someone expecting
# ReceiveChannel[Base])
# covariant because AsyncReceiver[Derived] can be passed to someone
# expecting AsyncReceiver[Base])
ReceiveType = TypeVar("ReceiveType", covariant=True)
@ -33,9 +31,13 @@ class AsyncReceiver(
Protocol,
Generic[ReceiveType],
):
'''An async receivable duck-type that quacks much like trio's
``trio.abc.ReceieveChannel``.
'''
@abstractmethod
async def receive(self) -> ReceiveType:
'''Same as in ``trio``.'''
...
@abstractmethod
def __aiter__(self) -> AsyncIterator[ReceiveType]:
@ -60,7 +62,10 @@ class AsyncReceiver(
class Lagged(trio.TooSlowError):
'''Subscribed consumer task was too slow'''
'''Subscribed consumer task was too slow and was overrun
by the fastest consumer-producer pair.
'''
@dataclass
@ -70,11 +75,11 @@ class BroadcastState:
'''
queue: deque
# map of underlying uuid keys to receiver instances which must be
# provided as a singleton per broadcaster set.
# map of underlying instance id keys to receiver instances which
# must be provided as a singleton per broadcaster set.
subs: dict[str, int]
# broadcast event to wakeup all sleeping consumer tasks
# broadcast event to wake up all sleeping consumer tasks
# on a newly produced value from the sender.
recv_ready: Optional[tuple[str, trio.Event]] = None
@ -84,13 +89,12 @@ class BroadcastReceiver(ReceiveChannel):
fastest consumer.
Additional consumer tasks can receive all produced values by registering
with ``.subscribe()`` and receiving from thew new instance it delivers.
with ``.subscribe()`` and receiving from the new instance it delivers.
'''
def __init__(
self,
key: str,
rx_chan: AsyncReceiver,
state: BroadcastState,
receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None,
@ -98,9 +102,9 @@ class BroadcastReceiver(ReceiveChannel):
) -> None:
# register the original underlying (clone)
self.key = key
self.key = id(self)
self._state = state
state.subs[key] = -1
state.subs[self.key] = -1
# underlying for this receiver
self._rx = rx_chan
@ -216,29 +220,23 @@ class BroadcastReceiver(ReceiveChannel):
provided at creation.
'''
# use a uuid4 for a tee-instance token
key = str(uuid4())
if self._closed:
raise trio.ClosedResourceError
state = self._state
br = BroadcastReceiver(
key=key,
rx_chan=self._rx,
state=state,
receive_afunc=self._recv,
)
# assert clone in state.subs
assert key in state.subs
assert br.key in state.subs
try:
yield br
finally:
await br.aclose()
# TODO:
# - should there be some ._closed flag that causes
# consumers to die **before** they read all queued values?
# - if subs only open and close clones then the underlying
# will never be killed until the last instance closes?
# This is correct right?
async def aclose(
self,
) -> None:
@ -248,10 +246,7 @@ class BroadcastReceiver(ReceiveChannel):
# XXX: leaving it like this consumers can still get values
# up to the last received that still reside in the queue.
# Is this what we want?
self._state.subs.pop(self.key)
# if not self._state.subs:
# await self._rx.aclose()
self._closed = True
@ -265,7 +260,6 @@ def broadcast_receiver(
) -> BroadcastReceiver:
return BroadcastReceiver(
str(uuid4()),
recv_chan,
state=BroadcastState(
queue=deque(maxlen=max_buffer_size),