Demonstrates fixed size frame-oriented reads by the child where the
parent only transmits a "read" stream msg on "frame fill events" such
that the child incrementally reads the shm list data (much like in
a real-time-buffered streaming system).
First attempt at getting `multiprocessing.shared_memory.ShareableList`
working; we wrap the stdlib type with a readonly attr and a `.key` for
cross-actor lookup. Also, rename all `numpy` specific routines to have
a `ndarray` suffix in the func names.
More or less a verbatim copy-paste minus some edgy variable naming and
internal `piker` module imports. There is a bunch of OHLC related
defaults that need to be dropped and we need to adjust to an optional
dependence on `numpy` by supporting shared lists as per the mp docs.
- `Context._cancel_called_remote` -> `._cancelled_remote` since "called"
implies the cancellation was "requested" when it could be due to
another error and the actor uid is the value - only set once the far
end task scope is terminated due to either error or cancel, which has
nothing to do with *what* caused the cancellation.
- `Actor._cancel_called_remote` -> `._cancel_called_by_remote` which
emphasizes that this variable is **only set** IFF some remote actor
**requested that** this actor's runtime be cancelled via
`Actor.cancel()`.
Turns out you can get a case where you might be opening multiple
ctx-streams concurrently and during the context opening phase you block
for all contexts to open, but then when you eventually start opening
streams some slow to start context has caused the others become in an
overrun state.. so we need to let the caller control whether that's an
error ;)
This also needs a test!
Because obviously we probably want to support `allow_overruns` on the
remote callee side as well XD
Only found the bugs fixed in this patch this thanks to writing a much
more exhaustive test set for overrun cases B)
This actually caught further runtime bugs so it's gud i tried..
Add overrun-ignore enabled / disabled cases and error catching for all
of them. More or less this should cover every possible outcome when
it comes to setting `allow_overruns: bool` i hope XD
This adds remote cancellation semantics to our `tractor.Context`
machinery to more closely match that of `trio.CancelScope` but
with operational differences to handle the nature of parallel tasks interoperating
across multiple memory boundaries:
- if an actor task cancels some context it has opened via
`Context.cancel()`, the remote (scope linked) task will be cancelled
using the normal `CancelScope` semantics of `trio` meaning the remote
cancel scope surrounding the far side task is cancelled and
`trio.Cancelled`s are expected to be raised in that scope as per
normal `trio` operation, and in the case where no error is raised
in that remote scope, a `ContextCancelled` error is raised inside the
runtime machinery and relayed back to the opener/caller side of the
context.
- if any actor task cancels a full remote actor runtime using
`Portal.cancel_actor()` the same semantics as above apply except every
other remote actor task which also has an open context with the actor
which was cancelled will also be sent a `ContextCancelled` **but**
with the `.canceller` field set to the uid of the original cancel
requesting actor.
This changeset also includes a more "proper" solution to the issue of
"allowing overruns" during streaming without attempting to implement any
form of IPC streaming backpressure. Implementing task-granularity
backpressure cross-process turns out to be more or less impossible
without augmenting out streaming protocol (likely at the cost of
performance). Further allowing overruns requires special care since
any blocking of the runtime RPC msg loop task effectively can block
control msgs such as cancels and stream terminations.
The implementation details per abstraction layer are as follows.
._streaming.Context:
- add a new contructor factor func `mk_context()` which provides
a strictly private init-er whilst allowing us to not have to define
an `.__init__()` on the type def.
- add public `.cancel_called` and `.cancel_called_remote` properties.
- general rename of what was the internal `._backpressure` var to
`._allow_overruns: bool`.
- move the old contents of `Actor._push_result()` into a new
`._deliver_msg()` allowing for better encapsulation of per-ctx
msg handling.
- always check for received 'error' msgs and process them with the new
`_maybe_cancel_and_set_remote_error()` **before** any msg delivery to
the local task, thus guaranteeing error and cancellation handling
despite any overflow handling.
- add a new `._drain_overflows()` task-method for use with new
`._allow_overruns: bool = True` mode.
- add back a `._scope_nursery: trio.Nursery` (allocated in
`Portal.open_context()`) who's sole purpose is to spawn a single task
which runs the above method; anything else is an error.
- augment `._deliver_msg()` to start a task and run the above method
when operating in no overrun mode; the task queues overflow msgs and
attempts to send them to the underlying mem chan using a blocking
`.send()` call.
- on context exit, any existing "drainer task" will be cancelled and
remaining overflow queued msgs are discarded with a warning.
- rename `._error` -> `_remote_error` and set it in a new method
`_maybe_cancel_and_set_remote_error()` which is called before
processing
- adjust `.result()` to always call `._maybe_raise_remote_err()` at its
start such that whenever a `ContextCancelled` arrives we do logic for
whether or not to immediately raise that error or ignore it due to the
current actor being the one who requested the cancel, by checking the
error's `.canceller` field.
- set the default value of `._result` to be `id(Context()` thus avoiding
conflict with any `.result()` actually being `False`..
._runtime.Actor:
- augment `.cancel()` and `._cancel_task()` and `.cancel_rpc_tasks()` to
take a `requesting_uid: tuple` indicating the source actor of every
cancellation request.
- pass through the new `Context._allow_overruns` through `.get_context()`
- call the new `Context._deliver_msg()` from `._push_result()` (since
the factoring out that method's contents).
._runtime._invoke:
- `TastStatus.started()` back a `Context` (unless an error is raised)
instead of the cancel scope to make it easy to set/get state on that
context for the purposes of cancellation and remote error relay.
- always raise any remote error via `Context._maybe_raise_remote_err()`
before doing any `ContextCancelled` logic.
- assign any `Context._cancel_called_remote` set by the `requesting_uid`
cancel methods (mentioned above) to the `ContextCancelled.canceller`.
._runtime.process_messages:
- always pass a `requesting_uid: tuple` to `Actor.cancel()` and
`._cancel_task` to that any corresponding `ContextCancelled.canceller`
can be set inside `._invoke()`.
Turns out stuff was totally broken in these cases because we're either
closing the underlying mem chan too early or not handling the
"allow_overruns" mode's cancellation correctly..
To handle both remote cancellation this adds `ContextCanceled.canceller:
tuple` the uid of the cancel requesting actor and is expected to be set
by the runtime when servicing any remote cancel request. This makes it
possible for `ContextCancelled` receivers to know whether "their actor
runtime" is the source of the cancellation.
Also add an explicit `RemoteActor.src_actor_uid` which better formalizes
the notion of "which remote actor" the error originated from.
Both of these new attrs are expected to be packed in the `.msgdata` when
the errors are loaded locally.
These will verify new changes to the runtime/messaging core which allows
us to adopt an "ignore cancel if requested by us" style handling of
`ContextCancelled` more like how `trio` does with
`trio.Nursery.cancel_scope.cancel()`. We now expect
a `ContextCancelled.canceller: tuple` which is set to the actor uid of
the actor which requested the cancellation which eventually resulted in
the remote error-msg.
Also adds some experimental tweaks to the "backpressure" test which it
turns out is very problematic in coordination with context cancellation
since blocking on the feed mem chan to some task will block the ipc msg
loop and thus handling of cancellation.. More to come to both the test
and core to address this hopefully since right now this test is failing.
Previously we were leaking our (pdb++) override into the Python runtime
which would always result in a runtime error whenever `breakpoint()` is
called outside our runtime; after exit of the root actor . This
explicitly restores any previous hook override (detected during startup)
or deletes the hook and restores the environment if none existed prior.
Also adds a new WIP debugging example script to ensure breakpointing
works as normal after runtime close; this will be added to the test
suite.