WIP, "revertible" or "dynamic" multicast streams

TODO, write up the deats, prolly by distilling (todo) notes from
`tests/test_resource_cache.py::test_open_local_sub_to_stream` comments!
multicast_revertable_streams
Tyler Goodlet 2025-07-15 22:15:19 -04:00
parent 7a075494f1
commit 7e49ac678b
3 changed files with 211 additions and 97 deletions

View File

@ -67,7 +67,6 @@ async def ensure_sequence(
@acm @acm
async def open_sequence_streamer( async def open_sequence_streamer(
sequence: list[int], sequence: list[int],
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
start_method: str, start_method: str,
@ -96,39 +95,43 @@ async def open_sequence_streamer(
def test_stream_fan_out_to_local_subscriptions( def test_stream_fan_out_to_local_subscriptions(
reg_addr, debug_mode: bool,
reg_addr: tuple,
start_method, start_method,
): ):
sequence = list(range(1000)) sequence = list(range(1000))
async def main(): async def main():
with trio.fail_after(9):
async with open_sequence_streamer(
sequence,
reg_addr,
start_method,
) as stream:
async with open_sequence_streamer( async with (
sequence, collapse_eg(),
reg_addr, trio.open_nursery() as tn,
start_method, ):
) as stream: for i in range(10):
tn.start_soon(
ensure_sequence,
stream,
sequence.copy(),
name=f'consumer_{i}',
)
async with trio.open_nursery() as n: await stream.send(tuple(sequence))
for i in range(10):
n.start_soon(
ensure_sequence,
stream,
sequence.copy(),
name=f'consumer_{i}',
)
await stream.send(tuple(sequence)) async for value in stream:
print(f'source stream rx: {value}')
assert value == sequence[0]
sequence.remove(value)
async for value in stream: if not sequence:
print(f'source stream rx: {value}') # fully consumed
assert value == sequence[0] break
sequence.remove(value)
if not sequence:
# fully consumed
break
trio.run(main) trio.run(main)
@ -151,67 +154,69 @@ def test_consumer_and_parent_maybe_lag(
sequence = list(range(300)) sequence = list(range(300))
parent_delay, sub_delay = task_delays parent_delay, sub_delay = task_delays
async with open_sequence_streamer( # TODO, maybe mak a cm-deco for main()s?
sequence, with trio.fail_after(3):
reg_addr, async with open_sequence_streamer(
start_method, sequence,
) as stream: reg_addr,
start_method,
) as stream:
try: try:
async with ( async with (
collapse_eg(), collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
tn.start_soon( tn.start_soon(
ensure_sequence, ensure_sequence,
stream, stream,
sequence.copy(), sequence.copy(),
sub_delay, sub_delay,
name='consumer_task', name='consumer_task',
) )
await stream.send(tuple(sequence)) await stream.send(tuple(sequence))
# async for value in stream: # async for value in stream:
lagged = False lagged = False
lag_count = 0 lag_count = 0
while True: while True:
try: try:
value = await stream.receive() value = await stream.receive()
print(f'source stream rx: {value}') print(f'source stream rx: {value}')
if lagged: if lagged:
# re set the sequence starting at our last # re set the sequence starting at our last
# value # value
sequence = sequence[sequence.index(value) + 1:] sequence = sequence[sequence.index(value) + 1:]
else: else:
assert value == sequence[0] assert value == sequence[0]
sequence.remove(value) sequence.remove(value)
lagged = False lagged = False
except Lagged: except Lagged:
lagged = True lagged = True
print(f'source stream lagged after {value}') print(f'source stream lagged after {value}')
lag_count += 1 lag_count += 1
continue continue
# lag the parent # lag the parent
await trio.sleep(parent_delay) await trio.sleep(parent_delay)
if not sequence: if not sequence:
# fully consumed # fully consumed
break break
print(f'parent + source stream lagged: {lag_count}') print(f'parent + source stream lagged: {lag_count}')
if parent_delay > sub_delay: if parent_delay > sub_delay:
assert lag_count > 0 assert lag_count > 0
except Lagged: except Lagged:
# child was lagged # child was lagged
assert parent_delay < sub_delay assert parent_delay < sub_delay
trio.run(main) trio.run(main)
@ -285,7 +290,11 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
def test_subscribe_errors_after_close(): def test_subscribe_errors_after_close():
'''
Verify after calling `BroadcastReceiver.aclose()` you can't
"re-open" it via `.subscribe()`.
'''
async def main(): async def main():
size = 1 size = 1
@ -293,6 +302,8 @@ def test_subscribe_errors_after_close():
async with broadcast_receiver(rx, size) as brx: async with broadcast_receiver(rx, size) as brx:
pass pass
assert brx.key not in brx._state.subs
try: try:
# open and close # open and close
async with brx.subscribe(): async with brx.subscribe():
@ -302,7 +313,7 @@ def test_subscribe_errors_after_close():
assert brx.key not in brx._state.subs assert brx.key not in brx._state.subs
else: else:
assert 0 pytest.fail('brx.subscribe() never raised!?')
trio.run(main) trio.run(main)

View File

@ -102,6 +102,9 @@ class MsgStream(trio.abc.Channel):
self._eoc: bool|trio.EndOfChannel = False self._eoc: bool|trio.EndOfChannel = False
self._closed: bool|trio.ClosedResourceError = False self._closed: bool|trio.ClosedResourceError = False
def is_eoc(self) -> bool|trio.EndOfChannel:
return self._eoc
@property @property
def ctx(self) -> Context: def ctx(self) -> Context:
''' '''
@ -188,7 +191,14 @@ class MsgStream(trio.abc.Channel):
return pld return pld
async def receive( # XXX NOTE, this is left private because in `.subscribe()` usage
# we rebind the public `.recieve()` to a `BroadcastReceiver` but
# on `.subscribe().__aexit__()`, for the first task which enters,
# we want to revert to this msg-stream-instance's method since
# mult-task-tracking provided by the b-caster is then no longer
# necessary.
#
async def _receive(
self, self,
hide_tb: bool = False, hide_tb: bool = False,
): ):
@ -313,6 +323,8 @@ class MsgStream(trio.abc.Channel):
raise src_err raise src_err
receive = _receive
async def aclose(self) -> list[Exception|dict]: async def aclose(self) -> list[Exception|dict]:
''' '''
Cancel associated remote actor task and local memory channel on Cancel associated remote actor task and local memory channel on
@ -528,10 +540,15 @@ class MsgStream(trio.abc.Channel):
receiver wrapper. receiver wrapper.
''' '''
# NOTE: This operation is indempotent and non-reversible, so be # XXX NOTE, This operation was originally implemented as
# sure you can deal with any (theoretical) overhead of the the # indempotent and non-reversible, so you had to be **VERY**
# allocated ``BroadcastReceiver`` before calling this method for # aware of any (theoretical) overhead from the allocated
# the first time. # `BroadcastReceiver.receive()`.
#
# HOWEVER, NOw we do revert and de-alloc the ._broadcaster
# when the final caller (task) exits.
#
bcast: BroadcastReceiver|None = None
if self._broadcaster is None: if self._broadcaster is None:
bcast = self._broadcaster = broadcast_receiver( bcast = self._broadcaster = broadcast_receiver(
@ -541,29 +558,60 @@ class MsgStream(trio.abc.Channel):
# TODO: can remove this kwarg right since # TODO: can remove this kwarg right since
# by default behaviour is to do this anyway? # by default behaviour is to do this anyway?
receive_afunc=self.receive, receive_afunc=self._receive,
) )
# NOTE: we override the original stream instance's receive # XXX NOTE, we override the original stream instance's
# method to now delegate to the broadcaster's ``.receive()`` # receive method to instead delegate to the broadcaster's
# such that new subscribers will be copied received values # `.receive()` such that new subscribers (multiple
# and this stream doesn't have to expect it's original # `trio.Task`s) will be copied received values and the
# consumer(s) to get a new broadcast rx handle. # *first* task to enter here doesn't have to expect its original consumer(s)
# to get a new broadcast rx handle; everything happens
# underneath this iface seemlessly.
#
self.receive = bcast.receive # type: ignore self.receive = bcast.receive # type: ignore
# seems there's no graceful way to type this with ``mypy``? # seems there's no graceful way to type this with `mypy`?
# https://github.com/python/mypy/issues/708 # https://github.com/python/mypy/issues/708
async with self._broadcaster.subscribe() as bstream: # TODO, prevent re-entrant sub scope?
assert bstream.key != self._broadcaster.key # if self._broadcaster._closed:
assert bstream._recv == self._broadcaster._recv # raise RuntimeError(
# 'This stream
# NOTE: we patch on a `.send()` to the bcaster so that the try:
# caller can still conduct 2-way streaming using this aenter = self._broadcaster.subscribe()
# ``bstream`` handle transparently as though it was the msg async with aenter as bstream:
# stream instance. # ?TODO, move into test suite?
bstream.send = self.send # type: ignore assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
yield bstream # NOTE: we patch on a `.send()` to the bcaster so that the
# caller can still conduct 2-way streaming using this
# ``bstream`` handle transparently as though it was the msg
# stream instance.
bstream.send = self.send # type: ignore
# newly-allocated instance
yield bstream
finally:
# XXX, the first-enterer task should, like all other
# subs, close the first allocated bcrx, which adjusts the
# common `bcrx.state`
with trio.CancelScope(shield=True):
if bcast is not None:
await bcast.aclose()
# XXX, when the bcrx.state reports there are no more subs
# we can revert to this obj's method, removing any
# delegation overhead!
if (
(orig_bcast := self._broadcaster)
and
not orig_bcast.state.subs
):
self.receive = self._receive
# self._broadcaster = None
async def send( async def send(
self, self,

View File

@ -100,6 +100,32 @@ class Lagged(trio.TooSlowError):
''' '''
def wrap_rx_for_eoc(
rx: AsyncReceiver,
) -> AsyncReceiver:
match rx:
case trio.MemoryReceiveChannel():
# XXX, taken verbatim from .receive_nowait()
def is_eoc() -> bool:
if not rx._state.open_send_channels:
return trio.EndOfChannel
return False
rx.is_eoc = is_eoc
case _:
# XXX, ensure we define a private field!
# case tractor.MsgStream:
assert (
getattr(rx, '_eoc', False) is not None
)
return rx
class BroadcastState(Struct): class BroadcastState(Struct):
''' '''
Common state to all receivers of a broadcast. Common state to all receivers of a broadcast.
@ -186,11 +212,23 @@ class BroadcastReceiver(ReceiveChannel):
state.subs[self.key] = -1 state.subs[self.key] = -1
# underlying for this receiver # underlying for this receiver
self._rx = rx_chan self._rx = wrap_rx_for_eoc(rx_chan)
self._recv = receive_afunc or rx_chan.receive self._recv = receive_afunc or rx_chan.receive
self._closed: bool = False self._closed: bool = False
self._raise_on_lag = raise_on_lag self._raise_on_lag = raise_on_lag
@property
def state(self) -> BroadcastState:
'''
Read-only access to this receivers internal `._state`
instance ref.
If you just want to read the high-level state metrics,
use `.state.statistics()`.
'''
return self._state
def receive_nowait( def receive_nowait(
self, self,
_key: int | None = None, _key: int | None = None,
@ -215,7 +253,23 @@ class BroadcastReceiver(ReceiveChannel):
try: try:
seq = state.subs[key] seq = state.subs[key]
except KeyError: except KeyError:
# from tractor import pause_from_sync
# pause_from_sync(shield=True)
if (
(rx_eoc := self._rx.is_eoc())
or
self.state.eoc
):
raise trio.EndOfChannel(
'Broadcast-Rx underlying already ended!'
) from rx_eoc
if self._closed: if self._closed:
# if (rx_eoc := self._rx._eoc):
# raise trio.EndOfChannel(
# 'Broadcast-Rx underlying already ended!'
# ) from rx_eoc
raise trio.ClosedResourceError raise trio.ClosedResourceError
raise RuntimeError( raise RuntimeError(
@ -453,8 +507,9 @@ class BroadcastReceiver(ReceiveChannel):
self._closed = True self._closed = True
# NOTE, this can we use as an `@acm` since `BroadcastReceiver`
# derives from `ReceiveChannel`.
def broadcast_receiver( def broadcast_receiver(
recv_chan: AsyncReceiver, recv_chan: AsyncReceiver,
max_buffer_size: int, max_buffer_size: int,
receive_afunc: Callable[[], Awaitable[Any]]|None = None, receive_afunc: Callable[[], Awaitable[Any]]|None = None,