From 3817b4fb5e57d4120189998d9f9458b11107a634 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 8 Aug 2021 17:23:48 -0400
Subject: [PATCH 01/32] Ultra naive broadcast channel prototype

---
 tractor/_live_from_tokio.py | 194 ++++++++++++++++++++++++++++++++++++
 1 file changed, 194 insertions(+)
 create mode 100644 tractor/_live_from_tokio.py

diff --git a/tractor/_live_from_tokio.py b/tractor/_live_from_tokio.py
new file mode 100644
index 0000000..88e3652
--- /dev/null
+++ b/tractor/_live_from_tokio.py
@@ -0,0 +1,194 @@
+'''
+``tokio`` style broadcast channels.
+
+'''
+from __future__ import annotations
+# from math import inf
+from itertools import cycle
+from collections import deque
+from contextlib import contextmanager  # , asynccontextmanager
+from functools import partial
+from typing import Optional
+
+import trio
+import tractor
+from trio.lowlevel import current_task
+from trio.abc import ReceiveChannel  # , SendChannel
+# from trio._core import enable_ki_protection
+from trio._core._run import Task
+from trio._channel import (
+    MemorySendChannel,
+    MemoryReceiveChannel,
+    # MemoryChannelState,
+)
+
+
+class Lagged(trio.TooSlowError):
+    '''Subscribed consumer task was too slow'''
+
+
+class BroadcastReceiver(ReceiveChannel):
+    '''This isn't Paris, not Berlin, nor Honk Kong..
+
+    '''
+    def __init__(
+        self,
+        rx_chan: MemoryReceiveChannel,
+        buffer_size: int = 100,
+
+    ) -> None:
+
+        self._rx = rx_chan
+        self._len = buffer_size
+        self._queue = deque(maxlen=buffer_size)
+        self._subs = {id(current_task()): -1}
+        self._value_received: Optional[trio.Event] = None
+
+    async def receive(self):
+
+        task: Task
+        task = current_task()
+
+        # check that task does not already have a value it can receive
+        # immediately and/or that it has lagged.
+        key = id(task)
+        # print(key)
+        try:
+            seq = self._subs[key]
+        except KeyError:
+            self._subs.pop(key)
+            raise RuntimeError(
+                f'Task {task.name} is not registerd as subscriber')
+
+        if seq > -1:
+            # get the oldest value we haven't received immediately
+
+            try:
+                value = self._queue[seq]
+            except IndexError:
+                raise Lagged(f'Task {task.name} was overrun')
+
+            self._subs[key] -= 1
+            return value
+
+        if self._value_received is None:
+            # we already have the latest value **and** are the first
+            # task to begin waiting for a new one
+
+            # sanity checks with underlying chan ?
+            # assert not self._rx._state.data
+
+            event = self._value_received = trio.Event()
+            value = await self._rx.receive()
+
+            # items with lower indices are "newer"
+            self._queue.appendleft(value)
+
+            # broadcast new value to all subscribers by increasing
+            # all sequence numbers that will point in the queue to
+            # their latest available value.
+            for sub_key, seq in self._subs.items():
+
+                if key == sub_key:
+                    # we don't need to increase **this** task's
+                    # sequence number since we just consumed the latest
+                    # value
+                    continue
+
+                # # except TypeError:
+                # #     # already lagged
+                # #     seq = Lagged
+
+                self._subs[sub_key] += 1
+
+            self._value_received = None
+            event.set()
+            return value
+
+        else:
+            await self._value_received.wait()
+
+            seq = self._subs[key]
+            assert seq > -1, 'Uhhhh'
+
+            self._subs[key] -= 1
+            return self._queue[0]
+
+    # @asynccontextmanager
+    @contextmanager
+    def subscribe(
+        self,
+
+    ) -> BroadcastReceiver:
+        key = id(current_task())
+        self._subs[key] = -1
+        try:
+            yield self
+        finally:
+            self._subs.pop(key)
+
+    async def aclose(self) -> None:
+        # TODO: wtf should we do here?
+        # if we're the last sub to close then close
+        # the underlying rx channel
+        pass
+
+
+def broadcast_channel(
+
+    max_buffer_size: int,
+
+) -> (MemorySendChannel, BroadcastReceiver):
+
+    tx, rx = trio.open_memory_channel(max_buffer_size)
+    return tx, BroadcastReceiver(rx)
+
+
+if __name__ == '__main__':
+
+    async def main():
+
+        async with tractor.open_root_actor(
+            debug_mode=True,
+            # loglevel='info',
+        ):
+
+            tx, rx = broadcast_channel(100)
+
+            async def sub_and_print(
+                delay: float,
+            ) -> None:
+
+                task = current_task()
+                count = 0
+
+                while True:
+                    with rx.subscribe():
+                        try:
+                            async for value in rx:
+                                print(f'{task.name}: {value}')
+                                await trio.sleep(delay)
+                                count += 1
+
+                        except Lagged:
+                            print(
+                                f'restarting slow ass {task.name}'
+                                f'that bailed out on {count}:{value}')
+                            continue
+
+            async with trio.open_nursery() as n:
+                for i in range(1, 10):
+                    n.start_soon(
+                        partial(
+                            sub_and_print,
+                            delay=i*0.01,
+                        ),
+                        name=f'sub_{i}',
+                    )
+
+            async with tx:
+                for i in cycle(range(1000)):
+                    print(f'sending: {i}')
+                    await tx.send(i)
+
+    trio.run(main)
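The bookkeeping scheme the prototype above relies on is easy to lose in the
diff noise: the newest value always sits at index 0 of the deque, every
subscriber holds an index into that deque, ``-1`` means "caught up, will block
for the next value", and a slow subscriber's index simply grows until it runs
off the ring buffer and raises ``Lagged``. Below is a rough standalone sketch
of just that invariant; it is an illustration only (plain Python, no ``trio``,
and the ``produce``/``consume`` names are not part of the patch — note that in
the patch the producing task is itself a subscriber and skips its own
sequence increment).

    from collections import deque

    class Lagged(Exception):
        ...

    queue = deque(maxlen=3)           # newest value lives at queue[0]
    subs = {'fast': -1, 'slow': -1}   # -1 == caught up, waiting for a new value

    def produce(value):
        queue.appendleft(value)
        for key in subs:
            subs[key] += 1            # every subscriber has one more pending item

    def consume(key):
        seq = subs[key]
        if seq == -1:
            raise RuntimeError('caught up; the real channel would block here')
        if seq > len(queue) - 1:
            # index ran off the ring buffer: the ``Lagged`` case
            raise Lagged(f'{key} lagged')
        value = queue[seq]
        subs[key] -= 1                # step toward the newest item at index 0
        return value

    for i in range(5):
        produce(i)
        print(consume('fast'))        # keeps up: always reads the newest value

    try:
        consume('slow')               # never read; its index ran past maxlen=3
    except Lagged as err:
        print(err)                    # -> 'slow lagged'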
From 6a2c3da1bb59ea84f7b5e9c237af63acb77045b3 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 8 Aug 2021 19:48:02 -0400
Subject: [PATCH 02/32] Simplify api around receive channel

Buncha improvements:
- pass in the queue via constructor
- tracking over all underlying memory channel closure using cloning
- do it like `tokio` and set lagged consumers to the last sequence
  before raising
- copy the subs on first receiver wakeup for iteration instead of
  iterating the table directly (and being forced to skip the current
  tasks sequence increment)
- implement `.aclose()` to close the underlying clone for this task
- make `broadcast_receiver()` just take the recv chan since it doesn't
  need anything on the send side.
---
 tractor/_live_from_tokio.py | 88 +++++++++++++++++++++----------------
 1 file changed, 49 insertions(+), 39 deletions(-)

diff --git a/tractor/_live_from_tokio.py b/tractor/_live_from_tokio.py
index 88e3652..5aab368 100644
--- a/tractor/_live_from_tokio.py
+++ b/tractor/_live_from_tokio.py
@@ -1,25 +1,22 @@
 '''
-``tokio`` style broadcast channels.
+``tokio`` style broadcast channel.
+https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html

 '''
 from __future__ import annotations
-# from math import inf
 from itertools import cycle
 from collections import deque
-from contextlib import contextmanager  # , asynccontextmanager
+from contextlib import contextmanager
 from functools import partial
 from typing import Optional

 import trio
 import tractor
 from trio.lowlevel import current_task
-from trio.abc import ReceiveChannel  # , SendChannel
-# from trio._core import enable_ki_protection
+from trio.abc import ReceiveChannel
 from trio._core._run import Task
 from trio._channel import (
-    MemorySendChannel,
     MemoryReceiveChannel,
-    # MemoryChannelState,
 )


@@ -28,20 +25,25 @@ class Lagged(trio.TooSlowError):


 class BroadcastReceiver(ReceiveChannel):
-    '''This isn't Paris, not Berlin, nor Honk Kong..
+    '''A memory receive channel broadcaster which is non-lossy for the
+    fastest consumer.
+
+    Additional consumer tasks can receive all produced values by registering
+    with ``.subscribe()``.
''' def __init__( self, + rx_chan: MemoryReceiveChannel, - buffer_size: int = 100, + queue: deque, ) -> None: self._rx = rx_chan - self._len = buffer_size - self._queue = deque(maxlen=buffer_size) - self._subs = {id(current_task()): -1} + self._queue = queue + self._subs: dict[Task, int] = {} # {id(current_task()): -1} + self._clones: dict[Task, MemoryReceiveChannel] = {} self._value_received: Optional[trio.Event] = None async def receive(self): @@ -56,26 +58,30 @@ class BroadcastReceiver(ReceiveChannel): try: seq = self._subs[key] except KeyError: - self._subs.pop(key) raise RuntimeError( f'Task {task.name} is not registerd as subscriber') if seq > -1: # get the oldest value we haven't received immediately - try: value = self._queue[seq] except IndexError: + # decrement to the last value and expect + # consumer to either handle the ``Lagged`` and come back + # or bail out on it's own (thus un-subscribing) + self._subs[key] = self._queue.maxlen - 1 + + # this task was overrun by the producer side raise Lagged(f'Task {task.name} was overrun') self._subs[key] -= 1 return value if self._value_received is None: - # we already have the latest value **and** are the first - # task to begin waiting for a new one + # current task already has the latest value **and** is the + # first task to begin waiting for a new one - # sanity checks with underlying chan ? + # what sanity checks might we use for the underlying chan ? # assert not self._rx._state.data event = self._value_received = trio.Event() @@ -87,20 +93,15 @@ class BroadcastReceiver(ReceiveChannel): # broadcast new value to all subscribers by increasing # all sequence numbers that will point in the queue to # their latest available value. - for sub_key, seq in self._subs.items(): - - if key == sub_key: - # we don't need to increase **this** task's - # sequence number since we just consumed the latest - # value - continue - - # # except TypeError: - # # # already lagged - # # seq = Lagged + subs = self._subs.copy() + # don't decerement the sequence # for this task since we + # already retreived the last value + subs.pop(key) + for sub_key, seq in subs.items(): self._subs[sub_key] += 1 + # reset receiver waiter task event for next blocking condition self._value_received = None event.set() return value @@ -109,7 +110,7 @@ class BroadcastReceiver(ReceiveChannel): await self._value_received.wait() seq = self._subs[key] - assert seq > -1, 'Uhhhh' + assert seq > -1, 'Internal error?' self._subs[key] -= 1 return self._queue[0] @@ -118,30 +119,37 @@ class BroadcastReceiver(ReceiveChannel): @contextmanager def subscribe( self, - ) -> BroadcastReceiver: key = id(current_task()) self._subs[key] = -1 + # XXX: we only use this clone for closure tracking + clone = self._clones[key] = self._rx.clone() try: yield self finally: self._subs.pop(key) + clone.close() + # TODO: do we need anything here? + # if we're the last sub to close then close + # the underlying rx channel, but couldn't we just + # use ``.clone()``s trackign then? async def aclose(self) -> None: - # TODO: wtf should we do here? 
- # if we're the last sub to close then close - # the underlying rx channel - pass + key = id(current_task()) + await self._clones[key].aclose() -def broadcast_channel( +def broadcast_receiver( + recv_chan: MemoryReceiveChannel, max_buffer_size: int, -) -> (MemorySendChannel, BroadcastReceiver): +) -> BroadcastReceiver: - tx, rx = trio.open_memory_channel(max_buffer_size) - return tx, BroadcastReceiver(rx) + return BroadcastReceiver( + recv_chan, + queue=deque(maxlen=max_buffer_size), + ) if __name__ == '__main__': @@ -153,7 +161,9 @@ if __name__ == '__main__': # loglevel='info', ): - tx, rx = broadcast_channel(100) + size = 100 + tx, rx = trio.open_memory_channel(size) + rx = broadcast_receiver(rx, size) async def sub_and_print( delay: float, From 1af7dbb732256eb081b86c926f7b72793d4638d9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 8 Aug 2021 19:58:12 -0400 Subject: [PATCH 03/32] `Task` is hashable, so key on it --- tractor/_live_from_tokio.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/tractor/_live_from_tokio.py b/tractor/_live_from_tokio.py index 5aab368..16f6aaf 100644 --- a/tractor/_live_from_tokio.py +++ b/tractor/_live_from_tokio.py @@ -48,15 +48,12 @@ class BroadcastReceiver(ReceiveChannel): async def receive(self): - task: Task - task = current_task() + task: Task = current_task() # check that task does not already have a value it can receive # immediately and/or that it has lagged. - key = id(task) - # print(key) try: - seq = self._subs[key] + seq = self._subs[task] except KeyError: raise RuntimeError( f'Task {task.name} is not registerd as subscriber') @@ -69,12 +66,12 @@ class BroadcastReceiver(ReceiveChannel): # decrement to the last value and expect # consumer to either handle the ``Lagged`` and come back # or bail out on it's own (thus un-subscribing) - self._subs[key] = self._queue.maxlen - 1 + self._subs[task] = self._queue.maxlen - 1 # this task was overrun by the producer side raise Lagged(f'Task {task.name} was overrun') - self._subs[key] -= 1 + self._subs[task] -= 1 return value if self._value_received is None: @@ -97,7 +94,7 @@ class BroadcastReceiver(ReceiveChannel): subs = self._subs.copy() # don't decerement the sequence # for this task since we # already retreived the last value - subs.pop(key) + subs.pop(task) for sub_key, seq in subs.items(): self._subs[sub_key] += 1 @@ -109,10 +106,10 @@ class BroadcastReceiver(ReceiveChannel): else: await self._value_received.wait() - seq = self._subs[key] + seq = self._subs[task] assert seq > -1, 'Internal error?' - self._subs[key] -= 1 + self._subs[task] -= 1 return self._queue[0] # @asynccontextmanager @@ -120,14 +117,14 @@ class BroadcastReceiver(ReceiveChannel): def subscribe( self, ) -> BroadcastReceiver: - key = id(current_task()) - self._subs[key] = -1 + task: task = current_task() + self._subs[task] = -1 # XXX: we only use this clone for closure tracking - clone = self._clones[key] = self._rx.clone() + clone = self._clones[task] = self._rx.clone() try: yield self finally: - self._subs.pop(key) + self._subs.pop(task) clone.close() # TODO: do we need anything here? @@ -135,8 +132,8 @@ class BroadcastReceiver(ReceiveChannel): # the underlying rx channel, but couldn't we just # use ``.clone()``s trackign then? 
async def aclose(self) -> None: - key = id(current_task()) - await self._clones[key].aclose() + task: Task = current_task() + await self._clones[task].aclose() def broadcast_receiver( From 64358f6525e75d1035730422ade681895afff7fd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Aug 2021 07:35:42 -0400 Subject: [PATCH 04/32] Rename to broadcast mod, don't expect mem chan specifically --- tractor/{_live_from_tokio.py => _broadcast.py} | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) rename tractor/{_live_from_tokio.py => _broadcast.py} (97%) diff --git a/tractor/_live_from_tokio.py b/tractor/_broadcast.py similarity index 97% rename from tractor/_live_from_tokio.py rename to tractor/_broadcast.py index 16f6aaf..3834116 100644 --- a/tractor/_live_from_tokio.py +++ b/tractor/_broadcast.py @@ -15,9 +15,9 @@ import tractor from trio.lowlevel import current_task from trio.abc import ReceiveChannel from trio._core._run import Task -from trio._channel import ( - MemoryReceiveChannel, -) +# from trio._channel import ( +# MemoryReceiveChannel, +# ) class Lagged(trio.TooSlowError): @@ -43,7 +43,7 @@ class BroadcastReceiver(ReceiveChannel): self._rx = rx_chan self._queue = queue self._subs: dict[Task, int] = {} # {id(current_task()): -1} - self._clones: dict[Task, MemoryReceiveChannel] = {} + self._clones: dict[Task, ReceiveChannel] = {} self._value_received: Optional[trio.Event] = None async def receive(self): @@ -138,7 +138,7 @@ class BroadcastReceiver(ReceiveChannel): def broadcast_receiver( - recv_chan: MemoryReceiveChannel, + recv_chan: ReceiveChannel, max_buffer_size: int, ) -> BroadcastReceiver: From 4ad75a3287beb4ca86a146323b75e7cb4ab9f016 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 9 Aug 2021 16:40:02 -0400 Subject: [PATCH 05/32] Obviously keying on tasks isn't going to work Using the current task as a subscription key fails horribly as soon as you hand off new subscription receiver to another task you've spawned.. Instead use the underlying ``trio.abc.ReceiveChannel.clone()`` as a key (so i guess we're assuming cloning is supported by the underlying?) which makes this all work just like default mem chans. As a bonus, now we can just close the underlying rx (which may be a clone) on `.aclose()` and everything should just work in terms of the underlying channels lifetime (i think?). Change `.subscribe()` to be async since the receive channel type interface only expects `.aclose()` and it actually ends up being nicer for 3.9+ style `async with` parentheses style anyway. --- tractor/_broadcast.py | 139 ++++++++++++++++++++++++++---------------- 1 file changed, 88 insertions(+), 51 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index 3834116..c5f517c 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -6,7 +6,7 @@ https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html from __future__ import annotations from itertools import cycle from collections import deque -from contextlib import contextmanager +from contextlib import asynccontextmanager from functools import partial from typing import Optional @@ -15,9 +15,6 @@ import tractor from trio.lowlevel import current_task from trio.abc import ReceiveChannel from trio._core._run import Task -# from trio._channel import ( -# MemoryReceiveChannel, -# ) class Lagged(trio.TooSlowError): @@ -29,57 +26,71 @@ class BroadcastReceiver(ReceiveChannel): fastest consumer. Additional consumer tasks can receive all produced values by registering - with ``.subscribe()``. 
+ with ``.subscribe()`` and receiving from thew new instance it delivers. ''' + # map of underlying clones to receiver wrappers + _subs: dict[trio.ReceiveChannel, BroadcastReceiver] = {} + def __init__( self, - rx_chan: MemoryReceiveChannel, + rx_chan: ReceiveChannel, queue: deque, ) -> None: self._rx = rx_chan self._queue = queue - self._subs: dict[Task, int] = {} # {id(current_task()): -1} - self._clones: dict[Task, ReceiveChannel] = {} self._value_received: Optional[trio.Event] = None async def receive(self): - task: Task = current_task() + key = self._rx + + # TODO: ideally we can make some way to "lock out" the + # underlying receive channel in some way such that if some task + # tries to pull from it directly (i.e. one we're unaware of) + # then it errors out. + + # only tasks which have entered ``.subscribe()`` can + # receive on this broadcaster. + try: + seq = self._subs[key] + except KeyError: + raise RuntimeError( + f'{self} is not registerd as subscriber') # check that task does not already have a value it can receive # immediately and/or that it has lagged. - try: - seq = self._subs[task] - except KeyError: - raise RuntimeError( - f'Task {task.name} is not registerd as subscriber') - if seq > -1: # get the oldest value we haven't received immediately try: value = self._queue[seq] except IndexError: + + # adhere to ``tokio`` style "lagging": + # "Once RecvError::Lagged is returned, the lagging + # receiver's position is updated to the oldest value + # contained by the channel. The next call to recv will + # return this value." + # https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html#lagging + # decrement to the last value and expect # consumer to either handle the ``Lagged`` and come back - # or bail out on it's own (thus un-subscribing) - self._subs[task] = self._queue.maxlen - 1 + # or bail out on its own (thus un-subscribing) + self._subs[key] = self._queue.maxlen - 1 # this task was overrun by the producer side + task: Task = current_task() raise Lagged(f'Task {task.name} was overrun') - self._subs[task] -= 1 + self._subs[key] -= 1 return value + # current task already has the latest value **and** is the + # first task to begin waiting for a new one if self._value_received is None: - # current task already has the latest value **and** is the - # first task to begin waiting for a new one - - # what sanity checks might we use for the underlying chan ? - # assert not self._rx._state.data event = self._value_received = trio.Event() value = await self._rx.receive() @@ -92,9 +103,9 @@ class BroadcastReceiver(ReceiveChannel): # their latest available value. subs = self._subs.copy() - # don't decerement the sequence # for this task since we + # don't decrement the sequence # for this task since we # already retreived the last value - subs.pop(task) + subs.pop(key) for sub_key, seq in subs.items(): self._subs[sub_key] += 1 @@ -103,37 +114,56 @@ class BroadcastReceiver(ReceiveChannel): event.set() return value + # This task is all caught up and ready to receive the latest + # value, so queue sched it on the internal event. else: await self._value_received.wait() - seq = self._subs[task] + seq = self._subs[key] assert seq > -1, 'Internal error?' 
- self._subs[task] -= 1 + self._subs[key] -= 1 return self._queue[0] - # @asynccontextmanager - @contextmanager - def subscribe( + @asynccontextmanager + async def subscribe( self, ) -> BroadcastReceiver: - task: task = current_task() - self._subs[task] = -1 - # XXX: we only use this clone for closure tracking - clone = self._clones[task] = self._rx.clone() - try: - yield self - finally: - self._subs.pop(task) - clone.close() + '''Subscribe for values from this broadcast receiver. - # TODO: do we need anything here? - # if we're the last sub to close then close - # the underlying rx channel, but couldn't we just - # use ``.clone()``s trackign then? - async def aclose(self) -> None: - task: Task = current_task() - await self._clones[task].aclose() + Returns a new ``BroadCastReceiver`` which is registered for and + pulls data from a clone of the original ``trio.abc.ReceiveChannel`` + provided at creation. + + ''' + clone = self._rx.clone() + self._subs[clone] = -1 + try: + yield BroadcastReceiver( + clone, + self._queue, + ) + finally: + # drop from subscribers and close + self._subs.pop(clone) + # XXX: this is the reason this function is async: the + # ``AsyncResource`` api. + await clone.aclose() + + # TODO: + # - should there be some ._closed flag that causes + # consumers to die **before** they read all queued values? + # - if subs only open and close clones then the underlying + # will never be killed until the last instance closes? + # This is correct right? + async def aclose( + self, + ) -> None: + # XXX: leaving it like this consumers can still get values + # up to the last received that still reside in the queue. + # Is this what we want? + await self._rx.aclose() + self._subs.pop(self._rx) def broadcast_receiver( @@ -158,6 +188,7 @@ if __name__ == '__main__': # loglevel='info', ): + retries = 3 size = 100 tx, rx = trio.open_memory_channel(size) rx = broadcast_receiver(rx, size) @@ -170,9 +201,9 @@ if __name__ == '__main__': count = 0 while True: - with rx.subscribe(): + async with rx.subscribe() as brx: try: - async for value in rx: + async for value in brx: print(f'{task.name}: {value}') await trio.sleep(delay) count += 1 @@ -181,10 +212,16 @@ if __name__ == '__main__': print( f'restarting slow ass {task.name}' f'that bailed out on {count}:{value}') - continue + if count <= retries: + continue + else: + print( + f'{task.name} was too slow and terminated ' + f'on {count}:{value}') + return async with trio.open_nursery() as n: - for i in range(1, 10): + for i in range(1, size): n.start_soon( partial( sub_and_print, @@ -194,7 +231,7 @@ if __name__ == '__main__': ) async with tx: - for i in cycle(range(1000)): + for i in cycle(range(size)): print(f'sending: {i}') await tx.send(i) From 6e78bcf898d69b110cef770d9a722abbdc1776ac Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Aug 2021 12:38:26 -0400 Subject: [PATCH 06/32] Facepalm: use single `_subs` per clone set --- tractor/_broadcast.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index c5f517c..bfd70ce 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -29,18 +29,26 @@ class BroadcastReceiver(ReceiveChannel): with ``.subscribe()`` and receiving from thew new instance it delivers. 
''' - # map of underlying clones to receiver wrappers - _subs: dict[trio.ReceiveChannel, BroadcastReceiver] = {} - def __init__( self, rx_chan: ReceiveChannel, queue: deque, + _subs: dict[trio.ReceiveChannel, BroadcastReceiver], ) -> None: + # map of underlying clones to receiver wrappers + # which must be provided as a singleton per broadcaster + # clone-subscription set. + self._subs = _subs + + # underlying for this receiver self._rx = rx_chan + + # register the original underlying (clone) + self._subs[rx_chan] = -1 + self._queue = queue self._value_received: Optional[trio.Event] = None @@ -137,18 +145,21 @@ class BroadcastReceiver(ReceiveChannel): ''' clone = self._rx.clone() - self._subs[clone] = -1 + br = BroadcastReceiver( + clone, + self._queue, + _subs=self._subs, + ) + assert clone in self._subs + try: - yield BroadcastReceiver( - clone, - self._queue, - ) + yield br finally: - # drop from subscribers and close - self._subs.pop(clone) # XXX: this is the reason this function is async: the # ``AsyncResource`` api. await clone.aclose() + # drop from subscribers and close + self._subs.pop(clone) # TODO: # - should there be some ._closed flag that causes @@ -176,6 +187,7 @@ def broadcast_receiver( return BroadcastReceiver( recv_chan, queue=deque(maxlen=max_buffer_size), + _subs={}, # this is singleton over all subscriptions ) From ceed96aa3ffc0402a26f2ba52ea82971b1818a8c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Aug 2021 15:32:53 -0400 Subject: [PATCH 07/32] Add common state delegate type for all consumers For every set of broadcast receivers which pull from the same producer, we need a singleton state for all of, - subscriptions - the sender ready event - the queue Add a `BroadcastState` dataclass for this and pass it to all subscriptions. This makes the design much more like the built-in memory channels which do something very similar with `MemoryChannelState`. Use a `filter()` on the subs list in the sequence update step, plus some other commented approaches we can try for speed. --- tractor/_broadcast.py | 118 +++++++++++++++++++++++++----------------- 1 file changed, 71 insertions(+), 47 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index bfd70ce..984aae9 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -4,23 +4,42 @@ https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html ''' from __future__ import annotations -from itertools import cycle from collections import deque from contextlib import asynccontextmanager +from dataclasses import dataclass from functools import partial +from itertools import cycle +from operator import ne from typing import Optional import trio -import tractor -from trio.lowlevel import current_task -from trio.abc import ReceiveChannel from trio._core._run import Task +from trio.abc import ReceiveChannel +from trio.lowlevel import current_task +import tractor class Lagged(trio.TooSlowError): '''Subscribed consumer task was too slow''' +@dataclass +class BroadcastState: + '''Common state to all receivers of a broadcast. + + ''' + queue: deque + + # map of underlying clones to receiver wrappers + # which must be provided as a singleton per broadcaster + # clone-subscription set. + subs: dict[trio.ReceiveChannel, BroadcastReceiver] + + # broadcast event to wakeup all sleeping consumer tasks + # on a newly produced value from the sender. 
+ sender_ready: Optional[trio.Event] = None + + class BroadcastReceiver(ReceiveChannel): '''A memory receive channel broadcaster which is non-lossy for the fastest consumer. @@ -33,28 +52,21 @@ class BroadcastReceiver(ReceiveChannel): self, rx_chan: ReceiveChannel, - queue: deque, - _subs: dict[trio.ReceiveChannel, BroadcastReceiver], + state: BroadcastState, ) -> None: - # map of underlying clones to receiver wrappers - # which must be provided as a singleton per broadcaster - # clone-subscription set. - self._subs = _subs + # register the original underlying (clone) + self._state = state + state.subs[rx_chan] = -1 # underlying for this receiver self._rx = rx_chan - # register the original underlying (clone) - self._subs[rx_chan] = -1 - - self._queue = queue - self._value_received: Optional[trio.Event] = None - async def receive(self): key = self._rx + state = self._state # TODO: ideally we can make some way to "lock out" the # underlying receive channel in some way such that if some task @@ -64,7 +76,7 @@ class BroadcastReceiver(ReceiveChannel): # only tasks which have entered ``.subscribe()`` can # receive on this broadcaster. try: - seq = self._subs[key] + seq = state.subs[key] except KeyError: raise RuntimeError( f'{self} is not registerd as subscriber') @@ -74,7 +86,7 @@ class BroadcastReceiver(ReceiveChannel): if seq > -1: # get the oldest value we haven't received immediately try: - value = self._queue[seq] + value = state.queue[seq] except IndexError: # adhere to ``tokio`` style "lagging": @@ -87,51 +99,61 @@ class BroadcastReceiver(ReceiveChannel): # decrement to the last value and expect # consumer to either handle the ``Lagged`` and come back # or bail out on its own (thus un-subscribing) - self._subs[key] = self._queue.maxlen - 1 + state.subs[key] = state.queue.maxlen - 1 # this task was overrun by the producer side task: Task = current_task() raise Lagged(f'Task {task.name} was overrun') - self._subs[key] -= 1 + state.subs[key] -= 1 return value # current task already has the latest value **and** is the # first task to begin waiting for a new one - if self._value_received is None: + if state.sender_ready is None: - event = self._value_received = trio.Event() + event = state.sender_ready = trio.Event() value = await self._rx.receive() # items with lower indices are "newer" - self._queue.appendleft(value) + state.queue.appendleft(value) # broadcast new value to all subscribers by increasing # all sequence numbers that will point in the queue to # their latest available value. - subs = self._subs.copy() - # don't decrement the sequence # for this task since we + # don't decrement the sequence for this task since we # already retreived the last value - subs.pop(key) - for sub_key, seq in subs.items(): - self._subs[sub_key] += 1 + + # XXX: which of these impls is fastest? + + # subs = state.subs.copy() + # subs.pop(key) + + for sub_key in filter( + # lambda k: k != key, state.subs, + partial(ne, key), state.subs, + ): + state.subs[sub_key] += 1 # reset receiver waiter task event for next blocking condition - self._value_received = None event.set() + state.sender_ready = None return value # This task is all caught up and ready to receive the latest # value, so queue sched it on the internal event. else: - await self._value_received.wait() + await state.sender_ready.wait() - seq = self._subs[key] - assert seq > -1, 'Internal error?' 
+ # TODO: optimization: if this is always true can't we just + # skip iterating these sequence numbers on the fastest + # task's wakeup and always read from state.queue[0]? + seq = state.subs[key] + assert seq == 0, 'Internal error?' - self._subs[key] -= 1 - return self._queue[0] + state.subs[key] -= 1 + return state.queue[seq] @asynccontextmanager async def subscribe( @@ -145,12 +167,12 @@ class BroadcastReceiver(ReceiveChannel): ''' clone = self._rx.clone() + state = self._state br = BroadcastReceiver( - clone, - self._queue, - _subs=self._subs, + rx_chan=clone, + state=state, ) - assert clone in self._subs + assert clone in state.subs try: yield br @@ -159,7 +181,7 @@ class BroadcastReceiver(ReceiveChannel): # ``AsyncResource`` api. await clone.aclose() # drop from subscribers and close - self._subs.pop(clone) + state.subs.pop(clone) # TODO: # - should there be some ._closed flag that causes @@ -186,8 +208,10 @@ def broadcast_receiver( return BroadcastReceiver( recv_chan, - queue=deque(maxlen=max_buffer_size), - _subs={}, # this is singleton over all subscriptions + state=BroadcastState( + queue=deque(maxlen=max_buffer_size), + subs={}, + ), ) @@ -210,7 +234,7 @@ if __name__ == '__main__': ) -> None: task = current_task() - count = 0 + lags = 0 while True: async with rx.subscribe() as brx: @@ -218,22 +242,22 @@ if __name__ == '__main__': async for value in brx: print(f'{task.name}: {value}') await trio.sleep(delay) - count += 1 except Lagged: print( f'restarting slow ass {task.name}' - f'that bailed out on {count}:{value}') - if count <= retries: + f'that bailed out on {lags}:{value}') + if lags <= retries: + lags += 1 continue else: print( f'{task.name} was too slow and terminated ' - f'on {count}:{value}') + f'on {lags}:{value}') return async with trio.open_nursery() as n: - for i in range(1, size): + for i in range(1, 10): n.start_soon( partial( sub_and_print, From a12b1fc6311a422d7f0600fda720c4ced169e286 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Aug 2021 19:09:34 -0400 Subject: [PATCH 08/32] Drop optimization check, binance made its point --- tractor/_broadcast.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index 984aae9..7d56430 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -145,13 +145,7 @@ class BroadcastReceiver(ReceiveChannel): # value, so queue sched it on the internal event. else: await state.sender_ready.wait() - - # TODO: optimization: if this is always true can't we just - # skip iterating these sequence numbers on the fastest - # task's wakeup and always read from state.queue[0]? seq = state.subs[key] - assert seq == 0, 'Internal error?' - state.subs[key] -= 1 return state.queue[seq] From 2d1c24112bc830136a47d6df6ae7ac5363eded7f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 15 Aug 2021 17:42:10 -0400 Subject: [PATCH 09/32] Add subscription support to message streams Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast receiver around the stream for use by multiple actor-local consumer tasks. Entering this context manager idempotently mutates the stream's receive machinery which for now can not be undone. Move `.clone()` to the receive stream type. 
Resolves #204 --- tractor/_streaming.py | 64 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 5f04554..35642d0 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -2,6 +2,7 @@ Message stream types and APIs. """ +from __future__ import annotations import inspect from contextlib import contextmanager, asynccontextmanager from dataclasses import dataclass @@ -17,6 +18,7 @@ import trio from ._ipc import Channel from ._exceptions import unpack_error, ContextCancelled from ._state import current_actor +from ._broadcast import broadcast_receiver, BroadcastReceiver from .log import get_logger @@ -52,6 +54,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): ) -> None: self._ctx = ctx self._rx_chan = rx_chan + self._broadcaster: Optional[BroadcastReceiver] = None # flag to denote end of stream self._eoc: bool = False @@ -231,6 +234,56 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). + def clone(self): + """Clone this receive channel allowing for multi-task + consumption from the same channel. + + """ + return type(self)( + self._ctx, + self._rx_chan.clone(), + ) + + @asynccontextmanager + async def subscribe( + self, + + ) -> BroadcastReceiver: + '''Allocate and return a ``BroadcastReceiver`` which delegates + to this message stream. + + This allows multiple local tasks to receive each their own copy + of this message stream. + + This operation is indempotent and and mutates this stream's + receive machinery to copy and window-length-store each received + value from the far end via the internally created broudcast + receiver wrapper. + + ''' + if self._broadcaster is None: + self._broadcaster = broadcast_receiver( + self, + self._rx_chan._state.max_buffer_size, + ) + # override the original stream instance's receive to + # delegate to the broadcaster receive such that + # new subscribers will be copied received values + # XXX: this operation is indempotent and non-reversible, + # so be sure you can deal with any (theoretical) overhead + # of the the ``BroadcastReceiver`` before calling + # this method for the first time. + + # XXX: why does this work without a recursion issue?! + self.receive = self._broadcaster.receive + + async with self._broadcaster.subscribe() as bstream: + # a ``MsgStream`` clone is allocated for the + # broadcaster to track this entry's subscription + stream_clone = bstream._rx + assert stream_clone is not self + yield bstream + class MsgStream(ReceiveMsgStream, trio.abc.Channel): """ @@ -247,17 +300,6 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): ''' await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) - # TODO: but make it broadcasting to consumers - def clone(self): - """Clone this receive channel allowing for multi-task - consumption from the same channel. 
- - """ - return MsgStream( - self._ctx, - self._rx_chan.clone(), - ) - @dataclass class Context: From 6c17c7367a7e5c7a0dad6c79dd5467460f8051bd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Aug 2021 12:47:49 -0400 Subject: [PATCH 10/32] Store handle to underlying channel's `.receive()` This allows for wrapping an existing stream by re-assigning its receive method to the allocated broadcaster's `.receive()` so as to avoid expecting any original consumer(s) of the stream to now know about the broadcaster; this instead mutates the stream to delegate to the new receive call behind the scenes any time `.subscribe()` is called. Add a `typing.Protocol` for so called "cloneable channels" until we decide/figure out a better keying system for each subscription and mask all undesired typing failures. --- tractor/_broadcast.py | 65 ++++++++++++++++++++++++++++++++++++++----- tractor/_streaming.py | 29 ++++++++++--------- 2 files changed, 74 insertions(+), 20 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index 7d56430..bba8021 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -4,13 +4,15 @@ https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html ''' from __future__ import annotations +from abc import abstractmethod from collections import deque from contextlib import asynccontextmanager from dataclasses import dataclass from functools import partial from itertools import cycle from operator import ne -from typing import Optional +from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol +from typing import Generic, TypeVar import trio from trio._core._run import Task @@ -19,6 +21,49 @@ from trio.lowlevel import current_task import tractor +# A regular invariant generic type +T = TypeVar("T") + +# The type of object produced by a ReceiveChannel (covariant because +# ReceiveChannel[Derived] can be passed to someone expecting +# ReceiveChannel[Base]) +ReceiveType = TypeVar("ReceiveType", covariant=True) + + +class CloneableReceiveChannel( + Protocol, + Generic[ReceiveType], +): + @abstractmethod + def clone(self) -> CloneableReceiveChannel[ReceiveType]: + '''Clone this receiver usually by making a copy.''' + + @abstractmethod + async def receive(self) -> ReceiveType: + '''Same as in ``trio``.''' + + @abstractmethod + def __aiter__(self) -> AsyncIterator[ReceiveType]: + ... + + @abstractmethod + async def __anext__(self) -> ReceiveType: + ... + + # ``trio.abc.AsyncResource`` methods + @abstractmethod + async def aclose(self): + ... + + @abstractmethod + async def __aenter__(self) -> CloneableReceiveChannel[ReceiveType]: + ... + + @abstractmethod + async def __aexit__(self, *args) -> None: + ... + + class Lagged(trio.TooSlowError): '''Subscribed consumer task was too slow''' @@ -33,7 +78,7 @@ class BroadcastState: # map of underlying clones to receiver wrappers # which must be provided as a singleton per broadcaster # clone-subscription set. - subs: dict[trio.ReceiveChannel, BroadcastReceiver] + subs: dict[CloneableReceiveChannel, int] # broadcast event to wakeup all sleeping consumer tasks # on a newly produced value from the sender. 
@@ -51,8 +96,9 @@ class BroadcastReceiver(ReceiveChannel): def __init__( self, - rx_chan: ReceiveChannel, + rx_chan: CloneableReceiveChannel, state: BroadcastState, + receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, ) -> None: @@ -62,6 +108,7 @@ class BroadcastReceiver(ReceiveChannel): # underlying for this receiver self._rx = rx_chan + self._recv = receive_afunc or rx_chan.receive async def receive(self): @@ -113,7 +160,7 @@ class BroadcastReceiver(ReceiveChannel): if state.sender_ready is None: event = state.sender_ready = trio.Event() - value = await self._rx.receive() + value = await self._recv() # items with lower indices are "newer" state.queue.appendleft(value) @@ -152,7 +199,7 @@ class BroadcastReceiver(ReceiveChannel): @asynccontextmanager async def subscribe( self, - ) -> BroadcastReceiver: + ) -> AsyncIterator[BroadcastReceiver]: '''Subscribe for values from this broadcast receiver. Returns a new ``BroadCastReceiver`` which is registered for and @@ -160,6 +207,8 @@ class BroadcastReceiver(ReceiveChannel): provided at creation. ''' + # if we didn't want to enforce "clone-ability" how would + # we key arbitrary subscriptions? Use a token system? clone = self._rx.clone() state = self._state br = BroadcastReceiver( @@ -190,13 +239,14 @@ class BroadcastReceiver(ReceiveChannel): # up to the last received that still reside in the queue. # Is this what we want? await self._rx.aclose() - self._subs.pop(self._rx) + self._state.subs.pop(self._rx) def broadcast_receiver( - recv_chan: ReceiveChannel, + recv_chan: CloneableReceiveChannel, max_buffer_size: int, + **kwargs, ) -> BroadcastReceiver: @@ -206,6 +256,7 @@ def broadcast_receiver( queue=deque(maxlen=max_buffer_size), subs={}, ), + **kwargs, ) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 35642d0..1bb1b81 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -9,6 +9,7 @@ from dataclasses import dataclass from typing import ( Any, Iterator, Optional, Callable, AsyncGenerator, Dict, + AsyncIterator, Awaitable ) import warnings @@ -49,8 +50,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): def __init__( self, ctx: 'Context', # typing: ignore # noqa - rx_chan: trio.abc.ReceiveChannel, - + rx_chan: trio.MemoryReceiveChannel, ) -> None: self._ctx = ctx self._rx_chan = rx_chan @@ -248,7 +248,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): async def subscribe( self, - ) -> BroadcastReceiver: + ) -> AsyncIterator[BroadcastReceiver]: '''Allocate and return a ``BroadcastReceiver`` which delegates to this message stream. @@ -261,21 +261,24 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): receiver wrapper. ''' + # NOTE: This operation is indempotent and non-reversible, so be + # sure you can deal with any (theoretical) overhead of the the + # allocated ``BroadcastReceiver`` before calling this method for + # the first time. if self._broadcaster is None: self._broadcaster = broadcast_receiver( self, - self._rx_chan._state.max_buffer_size, + self._rx_chan._state.max_buffer_size, # type: ignore ) - # override the original stream instance's receive to - # delegate to the broadcaster receive such that - # new subscribers will be copied received values - # XXX: this operation is indempotent and non-reversible, - # so be sure you can deal with any (theoretical) overhead - # of the the ``BroadcastReceiver`` before calling - # this method for the first time. - # XXX: why does this work without a recursion issue?! 
- self.receive = self._broadcaster.receive + # NOTE: we override the original stream instance's receive + # method to now delegate to the broadcaster's ``.receive()`` + # such that new subscribers will be copied received values + # and this stream doesn't have to expect it's original + # consumer(s) to get a new broadcast rx handle. + self.receive = self._broadcaster.receive # type: ignore + # seems there's no graceful way to type this with ``mypy``? + # https://github.com/python/mypy/issues/708 async with self._broadcaster.subscribe() as bstream: # a ``MsgStream`` clone is allocated for the From 346b5d2eda3e14dabcbda885241994e67e81cfed Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Aug 2021 11:14:47 -0400 Subject: [PATCH 11/32] Blade runner it Get rid of all the (requirements for) clones of the underlying receivable. We can just use a uuid generated key for each instance (thinking now this can probably just be `id(self)`). I'm fully convinced now that channel cloning is only a source of confusion and anti-patterns when we already have nurseries to define resource lifetimes. There is no benefit in particular when you allocate subscriptions using a context manager (not sure why `trio.open_memory_channel()` doesn't enforce this). Further refinements: - add a `._closed` state that will error the receiver on reuse - drop module script section; it's been moved to a real test - call the "receiver" duck-type stub a new name --- tractor/_broadcast.py | 178 ++++++++++++++++-------------------------- 1 file changed, 67 insertions(+), 111 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index bba8021..2b326f9 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -9,16 +9,15 @@ from collections import deque from contextlib import asynccontextmanager from dataclasses import dataclass from functools import partial -from itertools import cycle from operator import ne from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol from typing import Generic, TypeVar +from uuid import uuid4 import trio from trio._core._run import Task from trio.abc import ReceiveChannel from trio.lowlevel import current_task -import tractor # A regular invariant generic type @@ -30,14 +29,10 @@ T = TypeVar("T") ReceiveType = TypeVar("ReceiveType", covariant=True) -class CloneableReceiveChannel( +class AsyncReceiver( Protocol, Generic[ReceiveType], ): - @abstractmethod - def clone(self) -> CloneableReceiveChannel[ReceiveType]: - '''Clone this receiver usually by making a copy.''' - @abstractmethod async def receive(self) -> ReceiveType: '''Same as in ``trio``.''' @@ -56,7 +51,7 @@ class CloneableReceiveChannel( ... @abstractmethod - async def __aenter__(self) -> CloneableReceiveChannel[ReceiveType]: + async def __aenter__(self) -> AsyncReceiver[ReceiveType]: ... @abstractmethod @@ -75,14 +70,13 @@ class BroadcastState: ''' queue: deque - # map of underlying clones to receiver wrappers - # which must be provided as a singleton per broadcaster - # clone-subscription set. - subs: dict[CloneableReceiveChannel, int] + # map of underlying uuid keys to receiver instances which must be + # provided as a singleton per broadcaster set. + subs: dict[str, int] # broadcast event to wakeup all sleeping consumer tasks # on a newly produced value from the sender. 
- sender_ready: Optional[trio.Event] = None + recv_ready: Optional[tuple[str, trio.Event]] = None class BroadcastReceiver(ReceiveChannel): @@ -96,23 +90,26 @@ class BroadcastReceiver(ReceiveChannel): def __init__( self, - rx_chan: CloneableReceiveChannel, + key: str, + rx_chan: AsyncReceiver, state: BroadcastState, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, ) -> None: # register the original underlying (clone) + self.key = key self._state = state - state.subs[rx_chan] = -1 + state.subs[key] = -1 # underlying for this receiver self._rx = rx_chan self._recv = receive_afunc or rx_chan.receive + self._closed: bool = False async def receive(self): - key = self._rx + key = self.key state = self._state # TODO: ideally we can make some way to "lock out" the @@ -125,6 +122,9 @@ class BroadcastReceiver(ReceiveChannel): try: seq = state.subs[key] except KeyError: + if self._closed: + raise trio.ClosedResourceError + raise RuntimeError( f'{self} is not registerd as subscriber') @@ -157,41 +157,50 @@ class BroadcastReceiver(ReceiveChannel): # current task already has the latest value **and** is the # first task to begin waiting for a new one - if state.sender_ready is None: + if state.recv_ready is None: - event = state.sender_ready = trio.Event() - value = await self._recv() + if self._closed: + raise trio.ClosedResourceError - # items with lower indices are "newer" - state.queue.appendleft(value) + event = trio.Event() + state.recv_ready = key, event - # broadcast new value to all subscribers by increasing - # all sequence numbers that will point in the queue to - # their latest available value. + try: + value = await self._recv() - # don't decrement the sequence for this task since we - # already retreived the last value + # items with lower indices are "newer" + state.queue.appendleft(value) - # XXX: which of these impls is fastest? + # broadcast new value to all subscribers by increasing + # all sequence numbers that will point in the queue to + # their latest available value. - # subs = state.subs.copy() - # subs.pop(key) + # don't decrement the sequence for this task since we + # already retreived the last value - for sub_key in filter( - # lambda k: k != key, state.subs, - partial(ne, key), state.subs, - ): - state.subs[sub_key] += 1 + # XXX: which of these impls is fastest? - # reset receiver waiter task event for next blocking condition - event.set() - state.sender_ready = None - return value + # subs = state.subs.copy() + # subs.pop(key) + + for sub_key in filter( + # lambda k: k != key, state.subs, + partial(ne, key), state.subs, + ): + state.subs[sub_key] += 1 + + return value + + finally: + # reset receiver waiter task event for next blocking condition + event.set() + state.recv_ready = None # This task is all caught up and ready to receive the latest # value, so queue sched it on the internal event. else: - await state.sender_ready.wait() + _, ev = state.recv_ready + await ev.wait() seq = state.subs[key] state.subs[key] -= 1 return state.queue[seq] @@ -207,24 +216,22 @@ class BroadcastReceiver(ReceiveChannel): provided at creation. ''' - # if we didn't want to enforce "clone-ability" how would - # we key arbitrary subscriptions? Use a token system? 
- clone = self._rx.clone() + # use a uuid4 for a tee-instance token + key = str(uuid4()) state = self._state br = BroadcastReceiver( - rx_chan=clone, + key=key, + rx_chan=self._rx, state=state, + receive_afunc=self._recv, ) - assert clone in state.subs + # assert clone in state.subs + assert key in state.subs try: yield br finally: - # XXX: this is the reason this function is async: the - # ``AsyncResource`` api. - await clone.aclose() - # drop from subscribers and close - state.subs.pop(clone) + await br.aclose() # TODO: # - should there be some ._closed flag that causes @@ -235,22 +242,30 @@ class BroadcastReceiver(ReceiveChannel): async def aclose( self, ) -> None: + + if self._closed: + return + # XXX: leaving it like this consumers can still get values # up to the last received that still reside in the queue. # Is this what we want? - await self._rx.aclose() - self._state.subs.pop(self._rx) + self._state.subs.pop(self.key) + # if not self._state.subs: + # await self._rx.aclose() + + self._closed = True def broadcast_receiver( - recv_chan: CloneableReceiveChannel, + recv_chan: AsyncReceiver, max_buffer_size: int, **kwargs, ) -> BroadcastReceiver: return BroadcastReceiver( + str(uuid4()), recv_chan, state=BroadcastState( queue=deque(maxlen=max_buffer_size), @@ -258,62 +273,3 @@ def broadcast_receiver( ), **kwargs, ) - - -if __name__ == '__main__': - - async def main(): - - async with tractor.open_root_actor( - debug_mode=True, - # loglevel='info', - ): - - retries = 3 - size = 100 - tx, rx = trio.open_memory_channel(size) - rx = broadcast_receiver(rx, size) - - async def sub_and_print( - delay: float, - ) -> None: - - task = current_task() - lags = 0 - - while True: - async with rx.subscribe() as brx: - try: - async for value in brx: - print(f'{task.name}: {value}') - await trio.sleep(delay) - - except Lagged: - print( - f'restarting slow ass {task.name}' - f'that bailed out on {lags}:{value}') - if lags <= retries: - lags += 1 - continue - else: - print( - f'{task.name} was too slow and terminated ' - f'on {lags}:{value}') - return - - async with trio.open_nursery() as n: - for i in range(1, 10): - n.start_soon( - partial( - sub_and_print, - delay=i*0.01, - ), - name=f'sub_{i}', - ) - - async with tx: - for i in cycle(range(size)): - print(f'sending: {i}') - await tx.send(i) - - trio.run(main) From 236ed0b0dd79494a0995119bfd7d10a0e944db6e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Aug 2021 12:35:18 -0400 Subject: [PATCH 12/32] Initial broadcaster tests including one to test our `MsgStream.subscribe()` api --- tests/test_local_task_broadcast.py | 221 +++++++++++++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 tests/test_local_task_broadcast.py diff --git a/tests/test_local_task_broadcast.py b/tests/test_local_task_broadcast.py new file mode 100644 index 0000000..4daca09 --- /dev/null +++ b/tests/test_local_task_broadcast.py @@ -0,0 +1,221 @@ +""" +Broadcast channels for fan-out to local tasks. +""" +from functools import partial +from itertools import cycle +import time + +import trio +from trio.lowlevel import current_task +import tractor +from tractor._broadcast import broadcast_receiver, Lagged + + +@tractor.context +async def echo_sequences( + ctx: tractor.Context, +) -> None: + '''Bidir streaming endpoint which will stream + back any sequence it is sent item-wise. 
+ + ''' + await ctx.started() + + async with ctx.open_stream() as stream: + async for sequence in stream: + seq = list(sequence) + for value in seq: + print(f'sending {value}') + await stream.send(value) + + +async def ensure_sequence( + stream: tractor.ReceiveMsgStream, + sequence: list, +) -> None: + + name = current_task().name + async with stream.subscribe() as bcaster: + assert not isinstance(bcaster, type(stream)) + async for value in bcaster: + print(f'{name} rx: {value}') + assert value == sequence[0] + sequence.remove(value) + + if not sequence: + # fully consumed + break + + +def test_stream_fan_out_to_local_subscriptions( + arb_addr, + start_method, +): + + sequence = list(range(1000)) + + async def main(): + + async with tractor.open_nursery( + arbiter_addr=arb_addr, + start_method=start_method, + ) as tn: + + portal = await tn.start_actor( + 'sequence_echoer', + enable_modules=[__name__], + ) + + async with portal.open_context( + echo_sequences, + ) as (ctx, first): + + assert first is None + async with ctx.open_stream() as stream: + + async with trio.open_nursery() as n: + for i in range(10): + n.start_soon( + ensure_sequence, + stream, + sequence.copy(), + name=f'consumer_{i}', + ) + + await stream.send(tuple(sequence)) + + async for value in stream: + print(f'source stream rx: {value}') + assert value == sequence[0] + sequence.remove(value) + + if not sequence: + # fully consumed + break + + await portal.cancel_actor() + + trio.run(main) + + +def test_ensure_slow_consumers_lag_out( + arb_addr, + start_method, +): + '''This is a pure local task test; no tractor + machinery is really required. + + ''' + async def main(): + + async with tractor.open_root_actor( + debug_mode=True, + ): + + num_laggers = 4 + laggers: dict[str, int] = {} + retries = 3 + size = 100 + tx, rx = trio.open_memory_channel(size) + brx = broadcast_receiver(rx, size) + + async def sub_and_print( + delay: float, + ) -> None: + + task = current_task() + start = time.time() + + async with brx.subscribe() as lbrx: + while True: + # await tractor.breakpoint() + print(f'{task.name}: starting consume loop') + try: + async for value in lbrx: + print(f'{task.name}: {value}') + await trio.sleep(delay) + + if task.name == 'sub_1': + # the non-lagger got + # a ``trio.EndOfChannel`` + # because the ``tx`` below was closed + assert len(lbrx._state.subs) == 1 + + await lbrx.aclose() + + assert len(lbrx._state.subs) == 0 + + except trio.ClosedResourceError: + # only the fast sub will try to re-enter + # iteration on the now closed bcaster + assert task.name == 'sub_1' + return + + except Lagged: + lag_time = time.time() - start + lags = laggers[task.name] + print( + f'restarting slow ass {task.name} ' + f'that bailed out on {lags}:{value} ' + f'after {lag_time:.3f}') + if lags <= retries: + laggers[task.name] += 1 + continue + else: + print( + f'{task.name} was too slow and terminated ' + f'on {lags}:{value}') + return + + async with trio.open_nursery() as nursery: + + for i in range(1, num_laggers): + + task_name = f'sub_{i}' + laggers[task_name] = 0 + nursery.start_soon( + partial( + sub_and_print, + delay=i*0.001, + ), + name=task_name, + ) + + # allow subs to sched + await trio.sleep(0.1) + + async with tx: + for i in cycle(range(size)): + await tx.send(i) + if len(brx._state.subs) == 2: + # only one, the non lagger, sub is left + break + + # the non-lagger + assert laggers.pop('sub_1') == 0 + + for n, v in laggers.items(): + assert v == 4 + + assert tx._closed + assert not tx._state.open_send_channels + + # check 
that "first" bcaster that we created + # above, never wass iterated and is thus overrun + try: + await brx.receive() + except Lagged: + # expect tokio style index truncation + assert brx._state.subs[brx.key] == len(brx._state.queue) - 1 + + # all backpressured entries in the underlying + # channel should have been copied into the caster + # queue trailing-window + async for i in rx: + print(f'bped: {i}') + assert i in brx._state.queue + + # should be noop + await brx.aclose() + + trio.run(main) From a4cb0ef21f9c4343b4a8dd7f6f3bf06712533075 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Aug 2021 12:36:05 -0400 Subject: [PATCH 13/32] Fix `.receive()` re-assignment, drop `.clone()` --- tractor/_streaming.py | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 1bb1b81..3dcc24b 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -51,10 +51,13 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, + shield: bool = False, + _broadcaster: Optional[BroadcastReceiver] = None, + ) -> None: self._ctx = ctx self._rx_chan = rx_chan - self._broadcaster: Optional[BroadcastReceiver] = None + self._broadcaster = _broadcaster # flag to denote end of stream self._eoc: bool = False @@ -234,16 +237,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # still need to consume msgs that are "in transit" from the far # end (eg. for ``Context.result()``). - def clone(self): - """Clone this receive channel allowing for multi-task - consumption from the same channel. - - """ - return type(self)( - self._ctx, - self._rx_chan.clone(), - ) - @asynccontextmanager async def subscribe( self, @@ -266,9 +259,12 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # allocated ``BroadcastReceiver`` before calling this method for # the first time. if self._broadcaster is None: - self._broadcaster = broadcast_receiver( + + bcast = self._broadcaster = broadcast_receiver( self, + # use memory channel size by default self._rx_chan._state.max_buffer_size, # type: ignore + receive_afunc=self.receive, ) # NOTE: we override the original stream instance's receive @@ -276,15 +272,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # such that new subscribers will be copied received values # and this stream doesn't have to expect it's original # consumer(s) to get a new broadcast rx handle. - self.receive = self._broadcaster.receive # type: ignore + self.receive = bcast.receive # type: ignore # seems there's no graceful way to type this with ``mypy``? 
# https://github.com/python/mypy/issues/708 async with self._broadcaster.subscribe() as bstream: - # a ``MsgStream`` clone is allocated for the - # broadcaster to track this entry's subscription - stream_clone = bstream._rx - assert stream_clone is not self yield bstream From 2bad2bac50371750a360683f68472bb484734a70 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Aug 2021 13:55:16 -0400 Subject: [PATCH 14/32] Don't enable debug mode..it borks CI --- tests/test_local_task_broadcast.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_local_task_broadcast.py b/tests/test_local_task_broadcast.py index 4daca09..3967016 100644 --- a/tests/test_local_task_broadcast.py +++ b/tests/test_local_task_broadcast.py @@ -13,7 +13,9 @@ from tractor._broadcast import broadcast_receiver, Lagged @tractor.context async def echo_sequences( + ctx: tractor.Context, + ) -> None: '''Bidir streaming endpoint which will stream back any sequence it is sent item-wise. @@ -108,9 +110,8 @@ def test_ensure_slow_consumers_lag_out( ''' async def main(): - async with tractor.open_root_actor( - debug_mode=True, - ): + # make sure it all works within the runtime + async with tractor.open_root_actor(): num_laggers = 4 laggers: dict[str, int] = {} From bec3f5999d8ad593556f99d2fe11e3207ad12173 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Aug 2021 13:04:17 -0400 Subject: [PATCH 15/32] Drop uuid4 keys, raise closed error on subscription after close --- tractor/_broadcast.py | 48 +++++++++++++++++++------------------------ 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index 2b326f9..9c29902 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -12,7 +12,6 @@ from functools import partial from operator import ne from typing import Optional, Callable, Awaitable, Any, AsyncIterator, Protocol from typing import Generic, TypeVar -from uuid import uuid4 import trio from trio._core._run import Task @@ -23,9 +22,8 @@ from trio.lowlevel import current_task # A regular invariant generic type T = TypeVar("T") -# The type of object produced by a ReceiveChannel (covariant because -# ReceiveChannel[Derived] can be passed to someone expecting -# ReceiveChannel[Base]) +# covariant because AsyncReceiver[Derived] can be passed to someone +# expecting AsyncReceiver[Base]) ReceiveType = TypeVar("ReceiveType", covariant=True) @@ -33,9 +31,13 @@ class AsyncReceiver( Protocol, Generic[ReceiveType], ): + '''An async receivable duck-type that quacks much like trio's + ``trio.abc.ReceieveChannel``. + + ''' @abstractmethod async def receive(self) -> ReceiveType: - '''Same as in ``trio``.''' + ... @abstractmethod def __aiter__(self) -> AsyncIterator[ReceiveType]: @@ -60,7 +62,10 @@ class AsyncReceiver( class Lagged(trio.TooSlowError): - '''Subscribed consumer task was too slow''' + '''Subscribed consumer task was too slow and was overrun + by the fastest consumer-producer pair. + + ''' @dataclass @@ -70,11 +75,11 @@ class BroadcastState: ''' queue: deque - # map of underlying uuid keys to receiver instances which must be - # provided as a singleton per broadcaster set. + # map of underlying instance id keys to receiver instances which + # must be provided as a singleton per broadcaster set. subs: dict[str, int] - # broadcast event to wakeup all sleeping consumer tasks + # broadcast event to wake up all sleeping consumer tasks # on a newly produced value from the sender. 
recv_ready: Optional[tuple[str, trio.Event]] = None @@ -84,13 +89,12 @@ class BroadcastReceiver(ReceiveChannel): fastest consumer. Additional consumer tasks can receive all produced values by registering - with ``.subscribe()`` and receiving from thew new instance it delivers. + with ``.subscribe()`` and receiving from the new instance it delivers. ''' def __init__( self, - key: str, rx_chan: AsyncReceiver, state: BroadcastState, receive_afunc: Optional[Callable[[], Awaitable[Any]]] = None, @@ -98,9 +102,9 @@ class BroadcastReceiver(ReceiveChannel): ) -> None: # register the original underlying (clone) - self.key = key + self.key = id(self) self._state = state - state.subs[key] = -1 + state.subs[self.key] = -1 # underlying for this receiver self._rx = rx_chan @@ -216,29 +220,23 @@ class BroadcastReceiver(ReceiveChannel): provided at creation. ''' - # use a uuid4 for a tee-instance token - key = str(uuid4()) + if self._closed: + raise trio.ClosedResourceError + state = self._state br = BroadcastReceiver( - key=key, rx_chan=self._rx, state=state, receive_afunc=self._recv, ) # assert clone in state.subs - assert key in state.subs + assert br.key in state.subs try: yield br finally: await br.aclose() - # TODO: - # - should there be some ._closed flag that causes - # consumers to die **before** they read all queued values? - # - if subs only open and close clones then the underlying - # will never be killed until the last instance closes? - # This is correct right? async def aclose( self, ) -> None: @@ -248,10 +246,7 @@ class BroadcastReceiver(ReceiveChannel): # XXX: leaving it like this consumers can still get values # up to the last received that still reside in the queue. - # Is this what we want? self._state.subs.pop(self.key) - # if not self._state.subs: - # await self._rx.aclose() self._closed = True @@ -265,7 +260,6 @@ def broadcast_receiver( ) -> BroadcastReceiver: return BroadcastReceiver( - str(uuid4()), recv_chan, state=BroadcastState( queue=deque(maxlen=max_buffer_size), From d7ad8982ffd03d169eb43e6a7e50ea1991991db9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Aug 2021 13:04:51 -0400 Subject: [PATCH 16/32] Add subscribe after close test --- tests/test_local_task_broadcast.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/tests/test_local_task_broadcast.py b/tests/test_local_task_broadcast.py index 3967016..fe30e33 100644 --- a/tests/test_local_task_broadcast.py +++ b/tests/test_local_task_broadcast.py @@ -100,6 +100,29 @@ def test_stream_fan_out_to_local_subscriptions( trio.run(main) +def test_subscribe_errors_after_close(): + + async def main(): + + size = 1 + tx, rx = trio.open_memory_channel(size) + async with broadcast_receiver(rx, size) as brx: + pass + + try: + # open and close + async with brx.subscribe(): + pass + + except trio.ClosedResourceError: + assert brx.key not in brx._state.subs + + else: + assert 0 + + trio.run(main) + + def test_ensure_slow_consumers_lag_out( arb_addr, start_method, @@ -129,7 +152,6 @@ def test_ensure_slow_consumers_lag_out( async with brx.subscribe() as lbrx: while True: - # await tractor.breakpoint() print(f'{task.name}: starting consume loop') try: async for value in lbrx: @@ -156,7 +178,7 @@ def test_ensure_slow_consumers_lag_out( lag_time = time.time() - start lags = laggers[task.name] print( - f'restarting slow ass {task.name} ' + f'restarting slow task {task.name} ' f'that bailed out on {lags}:{value} ' f'after {lag_time:.3f}') if lags <= retries: @@ -207,7 +229,8 @@ def 
test_ensure_slow_consumers_lag_out( await brx.receive() except Lagged: # expect tokio style index truncation - assert brx._state.subs[brx.key] == len(brx._state.queue) - 1 + seq = brx._state.subs[brx.key] + assert seq == len(brx._state.queue) - 1 # all backpressured entries in the underlying # channel should have been copied into the caster From 093e7d921c07a12140cfe637e05bd54ace1ebd09 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Aug 2021 14:46:52 -0400 Subject: [PATCH 17/32] Instance ids are ints --- tractor/_broadcast.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index 9c29902..907859c 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -77,7 +77,7 @@ class BroadcastState: # map of underlying instance id keys to receiver instances which # must be provided as a singleton per broadcaster set. - subs: dict[str, int] + subs: dict[int, int] # broadcast event to wake up all sleeping consumer tasks # on a newly produced value from the sender. From 0d70e3081aa2330eb62774182d441f04ca53bf72 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 13:03:20 -0400 Subject: [PATCH 18/32] Add laggy parent stream tests Add a couple more tests to check that a parent and sub-task stream can be lagged and recovered (depending on who's slower). Factor some of the test machinery into a new ctx mngr to make it all happen. --- tests/test_local_task_broadcast.py | 179 +++++++++++++++++++++++------ 1 file changed, 145 insertions(+), 34 deletions(-) diff --git a/tests/test_local_task_broadcast.py b/tests/test_local_task_broadcast.py index fe30e33..c2200b3 100644 --- a/tests/test_local_task_broadcast.py +++ b/tests/test_local_task_broadcast.py @@ -1,10 +1,13 @@ """ Broadcast channels for fan-out to local tasks. 
""" +from contextlib import asynccontextmanager from functools import partial from itertools import cycle import time +from typing import Optional +import pytest import trio from trio.lowlevel import current_task import tractor @@ -32,8 +35,11 @@ async def echo_sequences( async def ensure_sequence( + stream: tractor.ReceiveMsgStream, sequence: list, + delay: Optional[float] = None, + ) -> None: name = current_task().name @@ -44,11 +50,44 @@ async def ensure_sequence( assert value == sequence[0] sequence.remove(value) + if delay: + await trio.sleep(delay) + if not sequence: # fully consumed break +@asynccontextmanager +async def open_sequence_streamer( + + sequence: list[int], + arb_addr: tuple[str, int], + start_method: str, + +) -> tractor.MsgStream: + + async with tractor.open_nursery( + arbiter_addr=arb_addr, + start_method=start_method, + ) as tn: + + portal = await tn.start_actor( + 'sequence_echoer', + enable_modules=[__name__], + ) + + async with portal.open_context( + echo_sequences, + ) as (ctx, first): + + assert first is None + async with ctx.open_stream() as stream: + yield stream + + await portal.cancel_actor() + + def test_stream_fan_out_to_local_subscriptions( arb_addr, start_method, @@ -58,48 +97,120 @@ def test_stream_fan_out_to_local_subscriptions( async def main(): - async with tractor.open_nursery( - arbiter_addr=arb_addr, - start_method=start_method, - ) as tn: + async with open_sequence_streamer( + sequence, + arb_addr, + start_method, + ) as stream: - portal = await tn.start_actor( - 'sequence_echoer', - enable_modules=[__name__], - ) + async with trio.open_nursery() as n: + for i in range(10): + n.start_soon( + ensure_sequence, + stream, + sequence.copy(), + name=f'consumer_{i}', + ) - async with portal.open_context( - echo_sequences, - ) as (ctx, first): + await stream.send(tuple(sequence)) - assert first is None - async with ctx.open_stream() as stream: + async for value in stream: + print(f'source stream rx: {value}') + assert value == sequence[0] + sequence.remove(value) - async with trio.open_nursery() as n: - for i in range(10): - n.start_soon( - ensure_sequence, - stream, - sequence.copy(), - name=f'consumer_{i}', - ) - - await stream.send(tuple(sequence)) - - async for value in stream: - print(f'source stream rx: {value}') - assert value == sequence[0] - sequence.remove(value) - - if not sequence: - # fully consumed - break - - await portal.cancel_actor() + if not sequence: + # fully consumed + break trio.run(main) +@pytest.mark.parametrize( + 'task_delays', + [ + (0.01, 0.001), + (0.001, 0.01), + ] +) +def test_consumer_and_parent_maybe_lag( + arb_addr, + start_method, + task_delays, +): + + async def main(): + + sequence = list(range(1000)) + parent_delay, sub_delay = task_delays + + async with open_sequence_streamer( + sequence, + arb_addr, + start_method, + ) as stream: + + try: + async with trio.open_nursery() as n: + + n.start_soon( + ensure_sequence, + stream, + sequence.copy(), + sub_delay, + name='consumer_task', + ) + + await stream.send(tuple(sequence)) + + # async for value in stream: + lagged = False + lag_count = 0 + + while True: + try: + value = await stream.receive() + print(f'source stream rx: {value}') + + if lagged: + # re set the sequence starting at our last + # value + sequence = sequence[sequence.index(value) + 1:] + else: + assert value == sequence[0] + sequence.remove(value) + + lagged = False + + except Lagged: + lagged = True + print(f'source stream lagged after {value}') + lag_count += 1 + continue + + # lag the parent + await 
trio.sleep(parent_delay) + + if not sequence: + # fully consumed + break + print(f'parent + source stream lagged: {lag_count}') + + if parent_delay > sub_delay: + assert lag_count > 0 + + except Lagged: + # child was lagged + assert parent_delay < sub_delay + + trio.run(main) + + +# TODO: +# def test_first_task_to_recv_is_cancelled(): +# ... + + def test_subscribe_errors_after_close(): async def main(): From 63ec740e27dd33581cda9b2ba34cf27855f2136d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 13:06:17 -0400 Subject: [PATCH 19/32] Add some bcaster ref sanity asserts around subscriptions --- tractor/_streaming.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 3dcc24b..9d832b2 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -9,7 +9,7 @@ from dataclasses import dataclass from typing import ( Any, Iterator, Optional, Callable, AsyncGenerator, Dict, - AsyncIterator, Awaitable + AsyncIterator ) import warnings @@ -264,7 +264,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, # use memory channel size by default self._rx_chan._state.max_buffer_size, # type: ignore - receive_afunc=self.receive, + receive_afunc=self.receive, ) # NOTE: we override the original stream instance's receive @@ -277,6 +277,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): # https://github.com/python/mypy/issues/708 async with self._broadcaster.subscribe() as bstream: + assert bstream.key != self._broadcaster.key + assert bstream._recv == self._broadcaster._recv yield bstream From 39cf9af9fc143739eaf149e77111a8571620a0d0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 14:02:29 -0400 Subject: [PATCH 20/32] Rename test module --- tests/{test_local_task_broadcast.py => test_task_broadcasting.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/{test_local_task_broadcast.py => test_task_broadcasting.py} (100%) diff --git a/tests/test_local_task_broadcast.py b/tests/test_task_broadcasting.py similarity index 100% rename from tests/test_local_task_broadcast.py rename to tests/test_task_broadcasting.py From 5182ee7782d02f8d436fe86fde3334635a74345e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 17:42:48 -0400 Subject: [PATCH 21/32] Add a "faster task is cancelled" test --- tests/test_task_broadcasting.py | 81 +++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 5 deletions(-) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index c2200b3..e9e5500 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -30,8 +30,8 @@ async def echo_sequences( async for sequence in stream: seq = list(sequence) for value in seq: - print(f'sending {value}') await stream.send(value) + print(f'producer sent {value}') async def ensure_sequence( @@ -64,6 +64,7 @@ async def open_sequence_streamer( sequence: list[int], arb_addr: tuple[str, int], start_method: str, + shield: bool = False, ) -> tractor.MsgStream: @@ -82,7 +83,7 @@ async def open_sequence_streamer( ) as (ctx, first): assert first is None - async with ctx.open_stream() as stream: + async with ctx.open_stream(shield=shield) as stream: yield stream await portal.cancel_actor() @@ -206,9 +207,79 @@ def test_consumer_and_parent_maybe_lag( trio.run(main) -# TODO: -# def test_first_task_to_recv_is_cancelled(): -# ... 
+def test_faster_task_to_recv_is_cancelled_by_slower( + arb_addr, + start_method, +): + '''Ensure that if a faster task consuming from a stream is cancelled + the slower task can continue to receive all expected values. + + ''' + async def main(): + + sequence = list(range(1000)) + + async with open_sequence_streamer( + sequence, + arb_addr, + start_method, + + # NOTE: this MUST be set to avoid the stream terminating + # early when the faster subtask is cancelled by the slower + # parent task. + shield=True, + + ) as stream: + + # alt to passing kwarg above. + # with stream.shield(): + + async with trio.open_nursery() as n: + n.start_soon( + ensure_sequence, + stream, + sequence.copy(), + 0, + name='consumer_task', + ) + + await stream.send(tuple(sequence)) + + # pull 3 values, cancel the subtask, then + # expect to be able to pull all values still + for i in range(20): + try: + value = await stream.receive() + print(f'source stream rx: {value}') + await trio.sleep(0.01) + except Lagged: + print(f'parent overrun after {value}') + continue + + print('cancelling faster subtask') + n.cancel_scope.cancel() + + try: + value = await stream.receive() + print(f'source stream after cancel: {value}') + except Lagged: + print(f'parent overrun after {value}') + + # expect to see all remaining values + with trio.fail_after(0.5): + async for value in stream: + assert stream._broadcaster._state.recv_ready is None + print(f'source stream rx: {value}') + if value == 999: + # fully consumed and we missed no values once + # the faster subtask was cancelled + break + + # await tractor.breakpoint() + # await stream.receive() + print(f'final value: {value}') + + trio.run(main) def test_subscribe_errors_after_close(): From 7857a9ac6dd41707eb1bcf7257ddeeb4c93d1d04 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 17:43:59 -0400 Subject: [PATCH 22/32] Add `shield: bool` kwarg to `Portal.open_stream_from()` --- tractor/_portal.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 44e8630..63c59ed 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -294,6 +294,7 @@ class Portal: async def open_stream_from( self, async_gen_func: Callable, # typing: ignore + shield: bool = False, **kwargs, ) -> AsyncGenerator[ReceiveMsgStream, None]: @@ -320,7 +321,9 @@ class Portal: ctx = Context(self.channel, cid, _portal=self) try: # deliver receive only stream - async with ReceiveMsgStream(ctx, recv_chan) as rchan: + async with ReceiveMsgStream( + ctx, recv_chan, shield=shield + ) as rchan: self._streams.add(rchan) yield rchan From d9e793d4ba913c3dfd5dad10372d383c577b8be6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 17:50:26 -0400 Subject: [PATCH 23/32] Can't use built-in generics till 3.9... 
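
The sequence-streamer helper annotated its inputs with subscripted
builtins (``list[int]``, ``tuple[str, int]``) which are only valid on
3.9+; on 3.8 (without ``from __future__ import annotations``) the
annotations are evaluated when the ``def`` runs and blow up with
``TypeError: 'type' object is not subscriptable``. Fall back to the
``typing`` aliases instead, e.g. (illustrative sketch only, not part of
the diff below):

    from typing import List, Tuple

    async def open_sequence_streamer(
        sequence: List[int],          # was ``list[int]`` -> breaks on 3.8
        arb_addr: Tuple[str, int],    # was ``tuple[str, int]``
        start_method: str,
    ) -> tractor.MsgStream:
        ...
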
--- tests/test_task_broadcasting.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index e9e5500..f9b00e5 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -5,7 +5,7 @@ from contextlib import asynccontextmanager from functools import partial from itertools import cycle import time -from typing import Optional +from typing import Optional, List, Tuple import pytest import trio @@ -61,8 +61,8 @@ async def ensure_sequence( @asynccontextmanager async def open_sequence_streamer( - sequence: list[int], - arb_addr: tuple[str, int], + sequence: List[int], + arb_addr: Tuple[str, int], start_method: str, shield: bool = False, From 44ef26bb186514ae066a4a10473e259343b9cf50 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 18:19:43 -0400 Subject: [PATCH 24/32] Shorten default feeder mem chan size to 64 --- tractor/_actor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 0dbaede..f84a597 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -567,7 +567,7 @@ class Actor: try: send_chan, recv_chan = self._cids2qs[(actorid, cid)] except KeyError: - send_chan, recv_chan = trio.open_memory_channel(1000) + send_chan, recv_chan = trio.open_memory_channel(2**6) send_chan.cid = cid # type: ignore recv_chan.cid = cid # type: ignore self._cids2qs[(actorid, cid)] = send_chan, recv_chan From 5c6355062c0f2110631fcd360fce58ec8dac2a0b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 18:20:04 -0400 Subject: [PATCH 25/32] Shorten sequence length for test speedup --- tests/test_task_broadcasting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index f9b00e5..34a761a 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -142,7 +142,7 @@ def test_consumer_and_parent_maybe_lag( async def main(): - sequence = list(range(1000)) + sequence = list(range(300)) parent_delay, sub_delay = task_delays async with open_sequence_streamer( From 9258f79510899d5ad38af662fc38661744b96a44 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 18:30:06 -0400 Subject: [PATCH 26/32] Don't wake sibling bcast consumers on a cancelled call --- tractor/_broadcast.py | 42 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index 907859c..e375ef2 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -111,7 +111,7 @@ class BroadcastReceiver(ReceiveChannel): self._recv = receive_afunc or rx_chan.receive self._closed: bool = False - async def receive(self): + async def receive(self) -> ReceiveType: key = self.key state = self._state @@ -169,9 +169,11 @@ class BroadcastReceiver(ReceiveChannel): event = trio.Event() state.recv_ready = key, event + # if we're cancelled here it should be + # fine to bail without affecting any other consumers + # right? 
try: value = await self._recv() - # items with lower indices are "newer" state.queue.appendleft(value) @@ -193,21 +195,51 @@ class BroadcastReceiver(ReceiveChannel): ): state.subs[sub_key] += 1 + # NOTE: this should ONLY be set if the above task was *NOT* + # cancelled on the `._recv()` call otherwise sibling + # consumers will be awoken with a sequence of -1 + event.set() + return value finally: - # reset receiver waiter task event for next blocking condition - event.set() + # Reset receiver waiter task event for next blocking condition. + # this MUST be reset even if the above ``.recv()`` call + # was cancelled to avoid the next consumer from blocking on + # an event that won't be set! state.recv_ready = None # This task is all caught up and ready to receive the latest # value, so queue sched it on the internal event. else: + seq = state.subs[key] + assert seq == -1 # sanity _, ev = state.recv_ready await ev.wait() + seq = state.subs[key] + assert seq > -1, f'Invalid sequence {seq}!?' + + value = state.queue[seq] state.subs[key] -= 1 - return state.queue[seq] + return value + + # NOTE: if we ever would like the behaviour where if the + # first task to recv on the underlying is cancelled but it + # still DOES trigger the ``.recv_ready``, event we'll likely need + # this logic: + + # if seq > -1: + # # stuff from above.. + # elif seq == -1: + # # XXX: In the case where the first task to allocate the + # # ``.recv_ready`` event is cancelled we will be woken with + # # a non-incremented sequence number and thus will read the + # # oldest value if we use that. Instead we need to detect if + # # we have not been incremented and then receive again. + # return await self.receive() + # else: + # raise ValueError(f'Invalid sequence {seq}!?') @asynccontextmanager async def subscribe( From d9bb52fe7b230c722815c65444fb8f00f0748604 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 Aug 2021 21:02:48 -0400 Subject: [PATCH 27/32] Store array `maxlen` in state singleton The `collections.deque` takes care of array length truncation of values for us implicitly but in the future we'll likely want this value exposed to alternate array implementations. This patch is to provide for that as well as make `mypy` happy since the `dequeu.maxlen` can also be `None`. --- tractor/_broadcast.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index e375ef2..b60f45c 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -74,6 +74,7 @@ class BroadcastState: ''' queue: deque + maxlen: int # map of underlying instance id keys to receiver instances which # must be provided as a singleton per broadcaster set. @@ -81,7 +82,7 @@ class BroadcastState: # broadcast event to wake up all sleeping consumer tasks # on a newly produced value from the sender. - recv_ready: Optional[tuple[str, trio.Event]] = None + recv_ready: Optional[tuple[int, trio.Event]] = None class BroadcastReceiver(ReceiveChannel): @@ -150,7 +151,7 @@ class BroadcastReceiver(ReceiveChannel): # decrement to the last value and expect # consumer to either handle the ``Lagged`` and come back # or bail out on its own (thus un-subscribing) - state.subs[key] = state.queue.maxlen - 1 + state.subs[key] = state.maxlen - 1 # this task was overrun by the producer side task: Task = current_task() @@ -174,7 +175,12 @@ class BroadcastReceiver(ReceiveChannel): # right? 
try: value = await self._recv() + # items with lower indices are "newer" + # NOTE: ``collections.deque`` implicitly takes care of + # trucating values outside our ``state.maxlen``. In the + # alt-backend-array-case we'll need to make sure this is + # implemented in similar ringer-buffer-ish style. state.queue.appendleft(value) # broadcast new value to all subscribers by increasing @@ -295,6 +301,7 @@ def broadcast_receiver( recv_chan, state=BroadcastState( queue=deque(maxlen=max_buffer_size), + maxlen=max_buffer_size, subs={}, ), **kwargs, From b7b489dd0754338fcc46915cfe3c2b44856a5ec3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Sep 2021 10:38:31 -0400 Subject: [PATCH 28/32] Drop shielded stream api usage --- tests/test_task_broadcasting.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 34a761a..55f6d3f 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -64,7 +64,6 @@ async def open_sequence_streamer( sequence: List[int], arb_addr: Tuple[str, int], start_method: str, - shield: bool = False, ) -> tractor.MsgStream: @@ -83,7 +82,7 @@ async def open_sequence_streamer( ) as (ctx, first): assert first is None - async with ctx.open_stream(shield=shield) as stream: + async with ctx.open_stream() as stream: yield stream await portal.cancel_actor() @@ -224,16 +223,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower( arb_addr, start_method, - # NOTE: this MUST be set to avoid the stream terminating - # early when the faster subtask is cancelled by the slower - # parent task. - shield=True, - ) as stream: - # alt to passing kwarg above. - # with stream.shield(): - async with trio.open_nursery() as n: n.start_soon( ensure_sequence, From 5881a82d2afe99b6e23b59f0e08cb9c17818f982 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Sep 2021 11:27:52 -0400 Subject: [PATCH 29/32] Add a first receiver is cancelled test --- tests/test_task_broadcasting.py | 41 +++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 55f6d3f..8265197 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -416,3 +416,44 @@ def test_ensure_slow_consumers_lag_out( await brx.aclose() trio.run(main) + + +def test_first_recver_is_cancelled(): + + async def main(): + + # make sure it all works within the runtime + async with tractor.open_root_actor(): + + tx, rx = trio.open_memory_channel(1) + brx = broadcast_receiver(rx, 1) + cs = trio.CancelScope() + sequence = list(range(3)) + + async def sub_and_recv(): + with cs: + async with brx.subscribe() as bc: + async for value in bc: + print(value) + + async def cancel_and_send(): + await trio.sleep(0.2) + cs.cancel() + await tx.send(1) + + async with trio.open_nursery() as n: + + n.start_soon(sub_and_recv) + await trio.sleep(0.1) + assert brx._state.recv_ready + + n.start_soon(cancel_and_send) + + # ensure that we don't hang because no-task is now + # waiting on the underlying receive.. 
+ with trio.fail_after(0.5): + value = await brx.receive() + print(f'parent: {value}') + assert value == 1 + + trio.run(main) From 2745a2b1dca96f97b41420a2ce83f8183dc00d65 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Sep 2021 11:39:56 -0400 Subject: [PATCH 30/32] Solve first-recv-cancelled by recursive `.receive()` on wake --- tractor/_broadcast.py | 49 ++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 21 deletions(-) diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py index b60f45c..e26c637 100644 --- a/tractor/_broadcast.py +++ b/tractor/_broadcast.py @@ -202,13 +202,20 @@ class BroadcastReceiver(ReceiveChannel): state.subs[sub_key] += 1 # NOTE: this should ONLY be set if the above task was *NOT* - # cancelled on the `._recv()` call otherwise sibling - # consumers will be awoken with a sequence of -1 + # cancelled on the `._recv()` call. event.set() - return value + except trio.Cancelled: + # handle cancelled specially otherwise sibling + # consumers will be awoken with a sequence of -1 + # state.recv_ready = trio.Cancelled + if event.statistics().tasks_waiting: + event.set() + raise + finally: + # Reset receiver waiter task event for next blocking condition. # this MUST be reset even if the above ``.recv()`` call # was cancelled to avoid the next consumer from blocking on @@ -223,29 +230,29 @@ class BroadcastReceiver(ReceiveChannel): _, ev = state.recv_ready await ev.wait() - seq = state.subs[key] - assert seq > -1, f'Invalid sequence {seq}!?' - - value = state.queue[seq] - state.subs[key] -= 1 - return value - # NOTE: if we ever would like the behaviour where if the # first task to recv on the underlying is cancelled but it # still DOES trigger the ``.recv_ready``, event we'll likely need # this logic: - # if seq > -1: - # # stuff from above.. - # elif seq == -1: - # # XXX: In the case where the first task to allocate the - # # ``.recv_ready`` event is cancelled we will be woken with - # # a non-incremented sequence number and thus will read the - # # oldest value if we use that. Instead we need to detect if - # # we have not been incremented and then receive again. - # return await self.receive() - # else: - # raise ValueError(f'Invalid sequence {seq}!?') + if seq > -1: + # stuff from above.. + seq = state.subs[key] + + value = state.queue[seq] + state.subs[key] -= 1 + return value + + elif seq == -1: + # XXX: In the case where the first task to allocate the + # ``.recv_ready`` event is cancelled we will be woken with + # a non-incremented sequence number and thus will read the + # oldest value if we use that. Instead we need to detect if + # we have not been incremented and then receive again. + return await self.receive() + + else: + raise ValueError(f'Invalid sequence {seq}!?') @asynccontextmanager async def subscribe( From bcf5b9fd1869f72818b74635b385d8c2b90d6e48 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Sep 2021 13:16:21 -0400 Subject: [PATCH 31/32] Add news fragment --- newsfragments/229.feature.rst | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 newsfragments/229.feature.rst diff --git a/newsfragments/229.feature.rst b/newsfragments/229.feature.rst new file mode 100644 index 0000000..bda005c --- /dev/null +++ b/newsfragments/229.feature.rst @@ -0,0 +1,12 @@ +Add `tokio-style broadcast channels +`_ as +a solution for `#204 `_ and +discussed thoroughly in `trio/#987 +`_. 
+
+This gives us local task broadcast functionality using a new
+``BroadcastReceiver`` type which can wrap ``trio.ReceiveChannel`` and
+provide fan-out copies of a stream of data to every subscribed consumer.
+We use this new machinery to provide a ``ReceiveMsgStream.subscribe()``
+async context manager which can be used by actor-local consumer tasks
+to easily pull from a shared and dynamic IPC stream.

From 1137a9e7ac949602bdc358dbce965196fea91bb7 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 2 Sep 2021 16:08:41 -0400
Subject: [PATCH 32/32] Fix 404ed tokio urls

---
 tractor/_broadcast.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tractor/_broadcast.py b/tractor/_broadcast.py
index e26c637..51a9be8 100644
--- a/tractor/_broadcast.py
+++ b/tractor/_broadcast.py
@@ -1,6 +1,6 @@
 '''
 ``tokio`` style broadcast channel.
-https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html
+https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html
 
 '''
 from __future__ import annotations
@@ -146,7 +146,7 @@ class BroadcastReceiver(ReceiveChannel):
         # receiver's position is updated to the oldest value
         # contained by the channel. The next call to recv will
         # return this value."
-        # https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html#lagging
+        # https://docs.rs/tokio/1.11.0/tokio/sync/broadcast/index.html#lagging
 
         # decrement to the last value and expect
         # consumer to either handle the ``Lagged`` and come back
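
Taken together, the series yields a plain ``trio`` fan-out primitive (no
``tractor`` runtime required). The sketch below is distilled from the
tests above and is illustrative only: the ``producer``/``consumer`` names,
buffer size and delays are not from any patch, but it exercises the
``broadcast_receiver()``, ``.subscribe()`` and ``Lagged`` APIs added in
this series:

    import trio
    from tractor._broadcast import broadcast_receiver, Lagged


    async def main():

        size = 32
        tx, rx = trio.open_memory_channel(size)
        brx = broadcast_receiver(rx, size)

        async def consumer(name: str, delay: float) -> None:
            # each subscriber sees every value for as long as it keeps up;
            # the scheme is only guaranteed non-lossy for the fastest sub.
            async with brx.subscribe() as sub:
                try:
                    async for value in sub:
                        print(f'{name} rx: {value}')
                        await trio.sleep(delay)

                except Lagged:
                    # overrun by the fastest consumer-producer pair; the
                    # sequence is reset tokio-style to the oldest queued
                    # value should iteration be resumed.
                    print(f'{name} lagged')

        async with trio.open_nursery() as n:
            n.start_soon(consumer, 'fast', 0.001)
            n.start_soon(consumer, 'slow', 0.1)

            # let subscribers attach before producing
            await trio.sleep(0.1)

            async with tx:
                for i in range(100):
                    await tx.send(i)


    trio.run(main)

The same pattern carries over to IPC streams: ``ReceiveMsgStream.subscribe()``
wraps the stream in exactly this ``BroadcastReceiver`` so multiple
actor-local tasks can consume a single inter-actor stream.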