First attempt at getting `multiprocessing.shared_memory.ShareableList`
working; we wrap the stdlib type with a readonly attr and a `.key` for
cross-actor lookup. Also, rename all `numpy` specific routines to have
a `ndarray` suffix in the func names.
More or less a verbatim copy-paste minus some edgy variable naming and
internal `piker` module imports. There is a bunch of OHLC related
defaults that need to be dropped and we need to adjust to an optional
dependence on `numpy` by supporting shared lists as per the mp docs.
- `Context._cancel_called_remote` -> `._cancelled_remote` since "called"
implies the cancellation was "requested" when it could be due to
another error and the actor uid is the value - only set once the far
end task scope is terminated due to either error or cancel, which has
nothing to do with *what* caused the cancellation.
- `Actor._cancel_called_remote` -> `._cancel_called_by_remote` which
emphasizes that this variable is **only set** IFF some remote actor
**requested that** this actor's runtime be cancelled via
`Actor.cancel()`.
Turns out you can get a case where you might be opening multiple
ctx-streams concurrently and during the context opening phase you block
for all contexts to open, but then when you eventually start opening
streams some slow to start context has caused the others become in an
overrun state.. so we need to let the caller control whether that's an
error ;)
This also needs a test!
Because obviously we probably want to support `allow_overruns` on the
remote callee side as well XD
Only found the bugs fixed in this patch this thanks to writing a much
more exhaustive test set for overrun cases B)
This actually caught further runtime bugs so it's gud i tried..
Add overrun-ignore enabled / disabled cases and error catching for all
of them. More or less this should cover every possible outcome when
it comes to setting `allow_overruns: bool` i hope XD
This adds remote cancellation semantics to our `tractor.Context`
machinery to more closely match that of `trio.CancelScope` but
with operational differences to handle the nature of parallel tasks interoperating
across multiple memory boundaries:
- if an actor task cancels some context it has opened via
`Context.cancel()`, the remote (scope linked) task will be cancelled
using the normal `CancelScope` semantics of `trio` meaning the remote
cancel scope surrounding the far side task is cancelled and
`trio.Cancelled`s are expected to be raised in that scope as per
normal `trio` operation, and in the case where no error is raised
in that remote scope, a `ContextCancelled` error is raised inside the
runtime machinery and relayed back to the opener/caller side of the
context.
- if any actor task cancels a full remote actor runtime using
`Portal.cancel_actor()` the same semantics as above apply except every
other remote actor task which also has an open context with the actor
which was cancelled will also be sent a `ContextCancelled` **but**
with the `.canceller` field set to the uid of the original cancel
requesting actor.
This changeset also includes a more "proper" solution to the issue of
"allowing overruns" during streaming without attempting to implement any
form of IPC streaming backpressure. Implementing task-granularity
backpressure cross-process turns out to be more or less impossible
without augmenting out streaming protocol (likely at the cost of
performance). Further allowing overruns requires special care since
any blocking of the runtime RPC msg loop task effectively can block
control msgs such as cancels and stream terminations.
The implementation details per abstraction layer are as follows.
._streaming.Context:
- add a new contructor factor func `mk_context()` which provides
a strictly private init-er whilst allowing us to not have to define
an `.__init__()` on the type def.
- add public `.cancel_called` and `.cancel_called_remote` properties.
- general rename of what was the internal `._backpressure` var to
`._allow_overruns: bool`.
- move the old contents of `Actor._push_result()` into a new
`._deliver_msg()` allowing for better encapsulation of per-ctx
msg handling.
- always check for received 'error' msgs and process them with the new
`_maybe_cancel_and_set_remote_error()` **before** any msg delivery to
the local task, thus guaranteeing error and cancellation handling
despite any overflow handling.
- add a new `._drain_overflows()` task-method for use with new
`._allow_overruns: bool = True` mode.
- add back a `._scope_nursery: trio.Nursery` (allocated in
`Portal.open_context()`) who's sole purpose is to spawn a single task
which runs the above method; anything else is an error.
- augment `._deliver_msg()` to start a task and run the above method
when operating in no overrun mode; the task queues overflow msgs and
attempts to send them to the underlying mem chan using a blocking
`.send()` call.
- on context exit, any existing "drainer task" will be cancelled and
remaining overflow queued msgs are discarded with a warning.
- rename `._error` -> `_remote_error` and set it in a new method
`_maybe_cancel_and_set_remote_error()` which is called before
processing
- adjust `.result()` to always call `._maybe_raise_remote_err()` at its
start such that whenever a `ContextCancelled` arrives we do logic for
whether or not to immediately raise that error or ignore it due to the
current actor being the one who requested the cancel, by checking the
error's `.canceller` field.
- set the default value of `._result` to be `id(Context()` thus avoiding
conflict with any `.result()` actually being `False`..
._runtime.Actor:
- augment `.cancel()` and `._cancel_task()` and `.cancel_rpc_tasks()` to
take a `requesting_uid: tuple` indicating the source actor of every
cancellation request.
- pass through the new `Context._allow_overruns` through `.get_context()`
- call the new `Context._deliver_msg()` from `._push_result()` (since
the factoring out that method's contents).
._runtime._invoke:
- `TastStatus.started()` back a `Context` (unless an error is raised)
instead of the cancel scope to make it easy to set/get state on that
context for the purposes of cancellation and remote error relay.
- always raise any remote error via `Context._maybe_raise_remote_err()`
before doing any `ContextCancelled` logic.
- assign any `Context._cancel_called_remote` set by the `requesting_uid`
cancel methods (mentioned above) to the `ContextCancelled.canceller`.
._runtime.process_messages:
- always pass a `requesting_uid: tuple` to `Actor.cancel()` and
`._cancel_task` to that any corresponding `ContextCancelled.canceller`
can be set inside `._invoke()`.
To handle both remote cancellation this adds `ContextCanceled.canceller:
tuple` the uid of the cancel requesting actor and is expected to be set
by the runtime when servicing any remote cancel request. This makes it
possible for `ContextCancelled` receivers to know whether "their actor
runtime" is the source of the cancellation.
Also add an explicit `RemoteActor.src_actor_uid` which better formalizes
the notion of "which remote actor" the error originated from.
Both of these new attrs are expected to be packed in the `.msgdata` when
the errors are loaded locally.
Previously we were leaking our (pdb++) override into the Python runtime
which would always result in a runtime error whenever `breakpoint()` is
called outside our runtime; after exit of the root actor . This
explicitly restores any previous hook override (detected during startup)
or deletes the hook and restores the environment if none existed prior.
Also adds a new WIP debugging example script to ensure breakpointing
works as normal after runtime close; this will be added to the test
suite.
Makes the broadcast test suite not hang xD, and is our expected default
behaviour. Also removes a ton of commented legacy cruft from before the
refactor to remove the `.receive()` recursion and fixes some typing.
Oh right, and in the case where there's only one subscriber left we warn
log about it since in theory we could actually entirely unwind the
bcaster back to the original underlying, though not sure if that's sane
or works for some use cases (like wanting to have some other subscriber
get added dynamically later).
Since one-way streaming can be accomplished by just *not* sending on one
side (and/or thus wrapping such usage in a more restrictive API), we
just drop the recv-only parent type. The only method different was
`MsgStream.send()`, now merged in. Further in usage of `.subscribe()`
we monkey patch the underlying stream's `.send()` onto the delivered
broadcast receiver so that subscriber tasks can two-way stream as though
using the stream directly.
This allows us to more definitively drop `tractor.open_stream_from()` in
the longer run if we so choose as well; note currently this will
potentially create an issue if a caller tries to `.send()` on such a one
way stream.
Driven by a bug found in `piker` where we'd get an inf recursion error
due to `BroadcastReceiver.receive()` being called when consumer tasks
are awoken but no value is ready to `.nowait_receive()`.
This new rework takes an approach closer to the interface and internals
of `trio.MemoryReceiveChannel` particularly in terms of,
- implementing a `BroadcastReceiver.receive_nowait()` and using it
within the async `.receive()`.
- failing over to an internal `._receive_from_underlying()` when the
`_nowait()` call raises `trio.WouldBlock`.
- adding `BroadcastState.statistics()` for debugging and testing
dropping recursion from `.receive()`.
We weren't doing this originally I *think* just because of the path
dependent nature of the way the code was developed (originally being
mega pedantic about one-way vs. bidirectional streams) but, it doesn't
seem like there's any issue just calling the stream's `.aclose()`; also
have the benefit of just being less code and logic checks B)