This change some super old (and bad) code from the project's very early
days. For some redic reason i must have thought masking `trio`'s
internal stream / transport errors and a TCP EOF as `StopAsyncIteration`
somehow a good idea. The reality is you probably
want to know the difference between an unexpected transport error
and a simple EOF lol. This begins to resolve that by adding our own
special `TransportClosed` error to signal the "graceful" termination of
a channel's underlying transport. Oh, and this builds on the `msgspec`
integration which helped shed light on the core issues here B)
Add a `tractor._ipc.MsgspecStream` type which can be swapped in for
`msgspec` serialization transparently. A small msg-length-prefix framing
is implemented as part of the type and we use
`tricycle.BufferedReceieveStream` to handle buffering logic for the
underlying transport.
Notes:
- had to force cast a few more list -> tuple spots due to no native
`tuple`decode-by-default in `msgspec`: https://github.com/jcrist/msgspec/issues/30
- the framing can be understood by this protobuf walkthrough:
https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers
- `tricycle` becomes a new dependency
Can only really use an encoder currently since there is no streaming api
in `msgspec` as of currently. See jcrist/msgspec#27.
Not sure if any encoding speedups are currently noticeable especially
without any validation going on yet XD.
First experiments toward #196
- drop `shield` input to `MsgStream`
- check for cancel called prior to loading the feeder mem chan
in `Context.open_stream()`
- warn on a timeout when trying to cancel a remote task from
`Context.cancel()`
- drop noop endofchannel handler block
Follow up to previous commit: extend our simple context test set to
include cancellation via kbi in the parent as well as timeout logic and
testing of the parent opening a stream even though the target actor does
not.
Thanks again to https://github.com/adder46/wrath for discovering this
bug.
Not sure we even have a test for this yet. The main issue discovered by
a user project (https://github.com/adder46/wrath) was that a kbi raised
inside a block like this (with both recv-only and send-recv streams)
would not cancel on the first ctrl-c sent from console and instead
SIGiNT had to be repeatedly sent as many times as there are subactors in
the first level tree. This test catches that as well as just verifies
the basic side-by-side functionality.
The `collections.deque` takes care of array length truncation of values
for us implicitly but in the future we'll likely want this value exposed
to alternate array implementations. This patch is to provide for that as
well as make `mypy` happy since the `dequeu.maxlen` can also be `None`.
Add a couple more tests to check that a parent and sub-task stream can
be lagged and recovered (depending on who's slower). Factor some of the
test machinery into a new ctx mngr to make it all happen.
Get rid of all the (requirements for) clones of the underlying
receivable. We can just use a uuid generated key for each instance
(thinking now this can probably just be `id(self)`). I'm fully convinced
now that channel cloning is only a source of confusion and anti-patterns
when we already have nurseries to define resource lifetimes. There is no
benefit in particular when you allocate subscriptions using a context
manager (not sure why `trio.open_memory_channel()` doesn't enforce
this).
Further refinements:
- add a `._closed` state that will error the receiver on reuse
- drop module script section; it's been moved to a real test
- call the "receiver" duck-type stub a new name
This allows for wrapping an existing stream by re-assigning its receive
method to the allocated broadcaster's `.receive()` so as to avoid
expecting any original consumer(s) of the stream to now know about the
broadcaster; this instead mutates the stream to delegate to the new
receive call behind the scenes any time `.subscribe()` is called.
Add a `typing.Protocol` for so called "cloneable channels" until we
decide/figure out a better keying system for each subscription and
mask all undesired typing failures.