Follow up to previous commit: extend our simple context test set to
include cancellation via kbi in the parent as well as timeout logic and
testing of the parent opening a stream even though the target actor does
not.
Thanks again to https://github.com/adder46/wrath for discovering this
bug.
Not sure we even have a test for this yet. The main issue discovered by
a user project (https://github.com/adder46/wrath) was that a kbi raised
inside a block like this (with both recv-only and send-recv streams)
would not cancel on the first ctrl-c sent from console and instead
SIGiNT had to be repeatedly sent as many times as there are subactors in
the first level tree. This test catches that as well as just verifies
the basic side-by-side functionality.
The `collections.deque` takes care of array length truncation of values
for us implicitly but in the future we'll likely want this value exposed
to alternate array implementations. This patch is to provide for that as
well as make `mypy` happy since the `dequeu.maxlen` can also be `None`.
Add a couple more tests to check that a parent and sub-task stream can
be lagged and recovered (depending on who's slower). Factor some of the
test machinery into a new ctx mngr to make it all happen.
Get rid of all the (requirements for) clones of the underlying
receivable. We can just use a uuid generated key for each instance
(thinking now this can probably just be `id(self)`). I'm fully convinced
now that channel cloning is only a source of confusion and anti-patterns
when we already have nurseries to define resource lifetimes. There is no
benefit in particular when you allocate subscriptions using a context
manager (not sure why `trio.open_memory_channel()` doesn't enforce
this).
Further refinements:
- add a `._closed` state that will error the receiver on reuse
- drop module script section; it's been moved to a real test
- call the "receiver" duck-type stub a new name
This allows for wrapping an existing stream by re-assigning its receive
method to the allocated broadcaster's `.receive()` so as to avoid
expecting any original consumer(s) of the stream to now know about the
broadcaster; this instead mutates the stream to delegate to the new
receive call behind the scenes any time `.subscribe()` is called.
Add a `typing.Protocol` for so called "cloneable channels" until we
decide/figure out a better keying system for each subscription and
mask all undesired typing failures.
Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast
receiver around the stream for use by multiple actor-local consumer
tasks. Entering this context manager idempotently mutates the stream's
receive machinery which for now can not be undone. Move `.clone()` to
the receive stream type.
Resolves#204
For every set of broadcast receivers which pull from the same producer,
we need a singleton state for all of,
- subscriptions
- the sender ready event
- the queue
Add a `BroadcastState` dataclass for this and pass it to all
subscriptions. This makes the design much more like the built-in memory
channels which do something very similar with `MemoryChannelState`.
Use a `filter()` on the subs list in the sequence update step, plus some
other commented approaches we can try for speed.
Using the current task as a subscription key fails horribly as soon as
you hand off new subscription receiver to another task you've spawned..
Instead use the underlying ``trio.abc.ReceiveChannel.clone()`` as a key
(so i guess we're assuming cloning is supported by the underlying?)
which makes this all work just like default mem chans. As a bonus, now
we can just close the underlying rx (which may be a clone) on
`.aclose()` and everything should just work in terms of the underlying
channels lifetime (i think?).
Change `.subscribe()` to be async since the receive channel type
interface only expects `.aclose()` and it actually ends up being
nicer for 3.9+ style `async with` parentheses style anyway.
Buncha improvements:
- pass in the queue via constructor
- tracking over all underlying memory channel closure using cloning
- do it like `tokio` and set lagged consumers to the last sequence
before raising
- copy the subs on first receiver wakeup for iteration instead of
iterating the table directly (and being forced to skip the current
tasks sequence increment)
- implement `.aclose()` to close the underlying clone for this task
- make `broadcast_receiver()` just take the recv chan since it doesn't
need anything on the send side.