We don't need to any more presuming you get ideal remote cancellation
conditions where the remote actor should teardown and kill the streams
from its end.
On msg loop termination we now check and see if a channel is associated
with a child-actor registered in some local task's nursery. If so, we
attempt to wait on channel closure initiated from the child side (by
draining the underlying msg stream) so as to avoid closing it too early
resulting in the child not relaying its termination status response. This
means we now support the ideal case in 2-general's where we get back the
ack to the closure request instead of just ignoring it and timing out XD
The main implementation detail is that when `Portal.cancel_actor()`
remotely calls `Actor.cancel()` we actually wait for the RPC response
from that request before allowing the channel shutdown sequence to
engage. The new msg stream draining support enables this.
Also, factor child-to-parent error propagation logic into a helper func
and improve some docs (yeah yeah y'all don't like the ''', i don't
care - it makes my eyes not hurt).
Use a `trio.Event` to enable nursery closure detection such that core
runtime tasks can be notified when a local nursery exits and allow
shutdown protocols to operate without close-before-terminate issues
(such as IPC channel closure during remote peer cancellation).
Enables "draining" the last set of messages after a channel/stream has
been terminated mostly for the purposes of receiving a final ACK to
a remote cancel command. Also, add an internal `Channel._cancel_called`
flag which can be set by `Portal.cancel_actor()`.
It's definitely possible to have a nursery spawn task be cancelled
before a `trio.Process` handle is ever returned; we now handle this
case as a cancelled-during-spawn scenario. Zombie collection logic
also is bypassed in this case.
Thanks to @richardsheridan for pointing out the limitations of using
*any* kind of value as the result-cached-flag and how it might cause
problems for anyone returning pickled blob-data. This changes the
`Portal` internal result value tracking to stash the full message from
which the value can be retrieved by any `Portal.result()` caller.
The internal change is that `Portal._return_once()` now returns a tuple
of the message *and* its value.
Fixes the issue where if the main remote task returns `None`,
`Portal.result()` would erroneously wait again on the underlying feeder
mem chan since `None` was being used as the cache flag. Instead set the
flag as the channel uid and consider the result collected when set to
anything else (since it would be odd to return that value from a remote
task when you already can read it as part of portal/channel apis).
The api we've made here is actually closer to `asyncio.gather()` but
with opening async context managers instead of funcs. Use another event
to allow for graceful teardown of children on non-cancellation exits
and add a doc string.
Since it seems we're building out more and more higher level primitives
in order to support certain parallel style actor trees and messaging
patterns (eg. task broadcast channels), we might as well start a new
sub-package for purely `trio` constructions. We hereby dub this
the realm of `trionics` (like electronics but for trios instead of
electrons).
To kick things off, add an `async_enter_all()` concurrent
exit-stack-like context manager API which will concurrently spawn
a sequence of provided async context managers and deliver their ordered
results but with proper support for `trio` cancellation semantics.
The stdlib's `AsyncExitStack` is not compatible with nurseries not
`trio` tasks (which are cancelled) since as task will be suspended on
the stack after push and does not ever hit a checkpoint until the stack
is closed.
This is actually surprisingly easy to grok having gone through a lot of
pain understanding edge cases in the zombie lord dev branch. Basically
we just need to make sure actors are managed in a 2 step reap sequence.
In the "soft" reap phase we wait for the process to terminate on its own
concurrently with (maybe) waiting for its portal's final result (if it's
a `.run_in_actor()`). If this path is cancelled or errors, then we do
a "hard" reap where we timeout and send a signal to the proc to
terminate immediately. The only last remaining trick is to tie in the
root-is-debugger-aware logic to yet again avoid tty clobbers.
As for `Actor.cancel()` requests, do the same for
`Actor._cancel_task()` but use `_invoke()` to ensure
correct msg transactions with caller. Don't cancel task
cancels on a cancel-all-tasks operation in attempt at
more determinism.
Now that we're on our way to a (somewhat) serious beta release I think
it's about time to start de-noising the logging emissions. Since we're
trying out this approach of "stack layer oriented" log levels, I figured
this is a good time to move most of the "warnings" to what they should
be: cancellation monitoring status messages. The level is set to 16
which is just above our "runtime" level but just below the traditional
"info" level. I think this will be a decent approach since usually if
you're confused about why your `tractor` app is behaving unlike you
expect, it's 90% of the time going to be to do with cancellation or
error propagation. This this setup a user can specify the 'cancel' level
and see all the msgs pertaining to both actor and task-in-actor
cancellation mechanics.
The stdlib's `logging.LoggingAdapter` doesn't currently pass through
`stacklevel: int` down to its wrapped logger instance. Hack it here
and get our msgs looking like they would if using a built-in level.
In an effort to have some kind of more formal interface around the
transport layer, add a `MsgTransport` protocol type and use with
the channel composition of message streams. Start a little "key map"
of `(<codec>, <protocol>)` to `MsgTransport` types which can be
dynamically loaded. Add a `Channel.from_stream()` constructor thus
cleaning up the mangled logic that was in the constructor based on
inputs. Drop all the "auto reconnect" channel logic for now since
nothing is using it (internally) and it's likely it will need rework
once we bring in a protocol besides TCP.
`msgspec` sends python lists over the wire
(https://github.com/jcrist/msgspec/issues/30) which is fine and dandy
but we use them as lookup keys so we need to be sure we tuple-cast
first.
This change some super old (and bad) code from the project's very early
days. For some redic reason i must have thought masking `trio`'s
internal stream / transport errors and a TCP EOF as `StopAsyncIteration`
somehow a good idea. The reality is you probably
want to know the difference between an unexpected transport error
and a simple EOF lol. This begins to resolve that by adding our own
special `TransportClosed` error to signal the "graceful" termination of
a channel's underlying transport. Oh, and this builds on the `msgspec`
integration which helped shed light on the core issues here B)
Add a `tractor._ipc.MsgspecStream` type which can be swapped in for
`msgspec` serialization transparently. A small msg-length-prefix framing
is implemented as part of the type and we use
`tricycle.BufferedReceieveStream` to handle buffering logic for the
underlying transport.
Notes:
- had to force cast a few more list -> tuple spots due to no native
`tuple`decode-by-default in `msgspec`: https://github.com/jcrist/msgspec/issues/30
- the framing can be understood by this protobuf walkthrough:
https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers
- `tricycle` becomes a new dependency
Can only really use an encoder currently since there is no streaming api
in `msgspec` as of currently. See jcrist/msgspec#27.
Not sure if any encoding speedups are currently noticeable especially
without any validation going on yet XD.
First experiments toward #196
- drop `shield` input to `MsgStream`
- check for cancel called prior to loading the feeder mem chan
in `Context.open_stream()`
- warn on a timeout when trying to cancel a remote task from
`Context.cancel()`
- drop noop endofchannel handler block
The `collections.deque` takes care of array length truncation of values
for us implicitly but in the future we'll likely want this value exposed
to alternate array implementations. This patch is to provide for that as
well as make `mypy` happy since the `dequeu.maxlen` can also be `None`.
Get rid of all the (requirements for) clones of the underlying
receivable. We can just use a uuid generated key for each instance
(thinking now this can probably just be `id(self)`). I'm fully convinced
now that channel cloning is only a source of confusion and anti-patterns
when we already have nurseries to define resource lifetimes. There is no
benefit in particular when you allocate subscriptions using a context
manager (not sure why `trio.open_memory_channel()` doesn't enforce
this).
Further refinements:
- add a `._closed` state that will error the receiver on reuse
- drop module script section; it's been moved to a real test
- call the "receiver" duck-type stub a new name
This allows for wrapping an existing stream by re-assigning its receive
method to the allocated broadcaster's `.receive()` so as to avoid
expecting any original consumer(s) of the stream to now know about the
broadcaster; this instead mutates the stream to delegate to the new
receive call behind the scenes any time `.subscribe()` is called.
Add a `typing.Protocol` for so called "cloneable channels" until we
decide/figure out a better keying system for each subscription and
mask all undesired typing failures.
Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast
receiver around the stream for use by multiple actor-local consumer
tasks. Entering this context manager idempotently mutates the stream's
receive machinery which for now can not be undone. Move `.clone()` to
the receive stream type.
Resolves#204
For every set of broadcast receivers which pull from the same producer,
we need a singleton state for all of,
- subscriptions
- the sender ready event
- the queue
Add a `BroadcastState` dataclass for this and pass it to all
subscriptions. This makes the design much more like the built-in memory
channels which do something very similar with `MemoryChannelState`.
Use a `filter()` on the subs list in the sequence update step, plus some
other commented approaches we can try for speed.
Using the current task as a subscription key fails horribly as soon as
you hand off new subscription receiver to another task you've spawned..
Instead use the underlying ``trio.abc.ReceiveChannel.clone()`` as a key
(so i guess we're assuming cloning is supported by the underlying?)
which makes this all work just like default mem chans. As a bonus, now
we can just close the underlying rx (which may be a clone) on
`.aclose()` and everything should just work in terms of the underlying
channels lifetime (i think?).
Change `.subscribe()` to be async since the receive channel type
interface only expects `.aclose()` and it actually ends up being
nicer for 3.9+ style `async with` parentheses style anyway.
Buncha improvements:
- pass in the queue via constructor
- tracking over all underlying memory channel closure using cloning
- do it like `tokio` and set lagged consumers to the last sequence
before raising
- copy the subs on first receiver wakeup for iteration instead of
iterating the table directly (and being forced to skip the current
tasks sequence increment)
- implement `.aclose()` to close the underlying clone for this task
- make `broadcast_receiver()` just take the recv chan since it doesn't
need anything on the send side.
We're not actually using this but it's for reference if we do end up
needing it.
The std lib's `pdb` internals override SIGINT handling whenever one
enters the debugger repl. Force a handler that kills the tree if SIGINT
is triggered from the root actor, otherwise ignore it since supervised
children should be managed already. This resolves an issue with guest
mode where `pdb` causes SIGINTs to be swallowed resulting in the host
loop never terminating the process tree.
The whole origin was not having an explicit open/close semantic for
streams. We have that now so this internal mechanic isn't needed and
further our streams become more correct by having `.aclose()` be
independent of cancellation.
Finally this makes a cancelled root actor nursery not clobber child
tasks which request and lock the root's tty for the debugger repl.
Using an edge triggered event which is set after all fifo-lock-queued
tasks are complete, we can be sure that no lingering child tasks are
going to get interrupted during pdb use and tty lock acquisition.
Further, even if new tasks do queue up to get the lock, the root will
incrementally send cancel msgs to each sub-actor only once the tty is
not locked by a (set of) child request task(s). Add shielding around all
the critical sections where the child attempts to allocate the lock from
the root such that it won't be disrupted from cancel messages from the
root after the acquire lock transaction has started.
If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
A context is the natural fit (vs. a receive stream) for locking the root
proc's tty usage via it's `.started()` sync point. Simplify the
`_breakpoin()` routine to be a simple async func instead of all this
"returning a coroutine" stuff from before we decided that
`tractor.breakpoint()` must be async. Use `runtime` level for locking
logging making it easier to trace.