Clearly this wasn't developed against a task that spawned just an async
func in `asyncio`.. Fix all that and remove a bunch of unnecessary func
layers. Add provisional support for the target receiving the `to_trio`
and `from_trio` channels and for the @tractor.stream marker.
This should mostly maintain top level SC principles for any task spawned
using `tractor.to_asyncio.run()`. When the `asyncio` task completes make
sure to cancel the pertaining `trio` cancel scope and raise any error
that may have resulted. This interface uses `trio`'s "guest-mode" to run
`asyncio` loop using a special entrypoint which is handed to Python
during process spawn.
If the one side of an inter-actor context cancels the other then that
side should always expect back a `ContextCancelled` message. However we
should not set this error in this case (where the cancel request was
sent and a `ContextCancelled` msg was received back) since it may
override some other error that caused the cancellation request to be
sent out in the first place. As an example when a context opens another
context to a peer and some error happens which causes the second peer
context to be cancelled but we want to propagate the original error.
Fixes the issue found in https://github.com/pikers/piker/issues/244
The underlying issue is actually that a nested `Context` which was
cancelled was overriding the local error that triggered that secondary's
context's cancellation in the first place XD. This test catches that
case.
Relates to https://github.com/pikers/piker/issues/244
After more extensive testing I realized that keying on the context
manager *instance id* isn't going to work since each entering task is
going to create a unique key XD
Instead pass the manager function as `acm_func` and optionally allow
keying the resource on the passed `kwargs` (if hashable) or the
`key:str`. Further, pass the key to the enterer task and avoid
a separate keying scheme for the manager versus the value it delivers.
Don't bother with checking and releasing the lock in `finally:` block,
it should be an error if it's still locked.
This actually catches a lot of bugs to do with stream termination and
``MsgStream.subscribe()`` usage where the underlying stream closes from
the producer side. When this passes the broadcaster logic will have to
ensure non-lossy fan out semantics and closure tracking.
Without this wakeup you can have tasks which re-enter `.receive()`
and get stuck waiting on the wakeup event indefinitely. Whenever
a ``trio.EndOfChannel`` arrives we want to make sure all consumers
at least know about it and don't block. This previous behaviour was
basically a bug.
Add some state flags for tracking if the broadcaster was either
cancelled or terminated via EOC mostly for testing and debugging
purposes though this info might be useful if we decide to offer
a `.statistics()` like API in the future.