Add subscription support to message streams

Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast
receiver around the stream for use by multiple actor-local consumer
tasks. Entering this context manager idempotently mutates the stream's
receive machinery which for now can not be undone. Move `.clone()` to
the receive stream type.

Resolves #204
tokio_backup
Tyler Goodlet 2021-08-15 17:42:10 -04:00
parent db2f3f787a
commit eaa761b0c7
1 changed files with 53 additions and 11 deletions

View File

@ -2,6 +2,7 @@
Message stream types and APIs.
"""
from __future__ import annotations
import inspect
from contextlib import contextmanager, asynccontextmanager
from dataclasses import dataclass
@ -17,6 +18,7 @@ import trio
from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from ._broadcast import broadcast_receiver, BroadcastReceiver
from .log import get_logger
@ -49,6 +51,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
) -> None:
self._ctx = ctx
self._rx_chan = rx_chan
self._broadcaster: Optional[BroadcastReceiver] = None
# flag to denote end of stream
self._eoc: bool = False
@ -229,6 +232,56 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
# still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``).
def clone(self):
"""Clone this receive channel allowing for multi-task
consumption from the same channel.
"""
return type(self)(
self._ctx,
self._rx_chan.clone(),
)
@asynccontextmanager
async def subscribe(
self,
) -> BroadcastReceiver:
'''Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream.
This allows multiple local tasks to receive each their own copy
of this message stream.
This operation is indempotent and and mutates this stream's
receive machinery to copy and window-length-store each received
value from the far end via the internally created broudcast
receiver wrapper.
'''
if self._broadcaster is None:
self._broadcaster = broadcast_receiver(
self,
self._rx_chan._state.max_buffer_size,
)
# override the original stream instance's receive to
# delegate to the broadcaster receive such that
# new subscribers will be copied received values
# XXX: this operation is indempotent and non-reversible,
# so be sure you can deal with any (theoretical) overhead
# of the the ``BroadcastReceiver`` before calling
# this method for the first time.
# XXX: why does this work without a recursion issue?!
self.receive = self._broadcaster.receive
async with self._broadcaster.subscribe() as bstream:
# a ``MsgStream`` clone is allocated for the
# broadcaster to track this entry's subscription
stream_clone = bstream._rx
assert stream_clone is not self
yield bstream
class MsgStream(ReceiveMsgStream, trio.abc.Channel):
"""
@ -245,17 +298,6 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
'''
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
# TODO: but make it broadcasting to consumers
def clone(self):
"""Clone this receive channel allowing for multi-task
consumption from the same channel.
"""
return MsgStream(
self._ctx,
self._rx_chan.clone(),
)
@dataclass
class Context: