c72026091e
This adds remote cancellation semantics to our `tractor.Context` machinery to more closely match that of `trio.CancelScope` but with operational differences to handle the nature of parallel tasks interoperating across multiple memory boundaries: - if an actor task cancels some context it has opened via `Context.cancel()`, the remote (scope linked) task will be cancelled using the normal `CancelScope` semantics of `trio` meaning the remote cancel scope surrounding the far side task is cancelled and `trio.Cancelled`s are expected to be raised in that scope as per normal `trio` operation, and in the case where no error is raised in that remote scope, a `ContextCancelled` error is raised inside the runtime machinery and relayed back to the opener/caller side of the context. - if any actor task cancels a full remote actor runtime using `Portal.cancel_actor()` the same semantics as above apply except every other remote actor task which also has an open context with the actor which was cancelled will also be sent a `ContextCancelled` **but** with the `.canceller` field set to the uid of the original cancel requesting actor. This changeset also includes a more "proper" solution to the issue of "allowing overruns" during streaming without attempting to implement any form of IPC streaming backpressure. Implementing task-granularity backpressure cross-process turns out to be more or less impossible without augmenting out streaming protocol (likely at the cost of performance). Further allowing overruns requires special care since any blocking of the runtime RPC msg loop task effectively can block control msgs such as cancels and stream terminations. The implementation details per abstraction layer are as follows. ._streaming.Context: - add a new contructor factor func `mk_context()` which provides a strictly private init-er whilst allowing us to not have to define an `.__init__()` on the type def. - add public `.cancel_called` and `.cancel_called_remote` properties. - general rename of what was the internal `._backpressure` var to `._allow_overruns: bool`. - move the old contents of `Actor._push_result()` into a new `._deliver_msg()` allowing for better encapsulation of per-ctx msg handling. - always check for received 'error' msgs and process them with the new `_maybe_cancel_and_set_remote_error()` **before** any msg delivery to the local task, thus guaranteeing error and cancellation handling despite any overflow handling. - add a new `._drain_overflows()` task-method for use with new `._allow_overruns: bool = True` mode. - add back a `._scope_nursery: trio.Nursery` (allocated in `Portal.open_context()`) who's sole purpose is to spawn a single task which runs the above method; anything else is an error. - augment `._deliver_msg()` to start a task and run the above method when operating in no overrun mode; the task queues overflow msgs and attempts to send them to the underlying mem chan using a blocking `.send()` call. - on context exit, any existing "drainer task" will be cancelled and remaining overflow queued msgs are discarded with a warning. - rename `._error` -> `_remote_error` and set it in a new method `_maybe_cancel_and_set_remote_error()` which is called before processing - adjust `.result()` to always call `._maybe_raise_remote_err()` at its start such that whenever a `ContextCancelled` arrives we do logic for whether or not to immediately raise that error or ignore it due to the current actor being the one who requested the cancel, by checking the error's `.canceller` field. - set the default value of `._result` to be `id(Context()` thus avoiding conflict with any `.result()` actually being `False`.. ._runtime.Actor: - augment `.cancel()` and `._cancel_task()` and `.cancel_rpc_tasks()` to take a `requesting_uid: tuple` indicating the source actor of every cancellation request. - pass through the new `Context._allow_overruns` through `.get_context()` - call the new `Context._deliver_msg()` from `._push_result()` (since the factoring out that method's contents). ._runtime._invoke: - `TastStatus.started()` back a `Context` (unless an error is raised) instead of the cancel scope to make it easy to set/get state on that context for the purposes of cancellation and remote error relay. - always raise any remote error via `Context._maybe_raise_remote_err()` before doing any `ContextCancelled` logic. - assign any `Context._cancel_called_remote` set by the `requesting_uid` cancel methods (mentioned above) to the `ContextCancelled.canceller`. ._runtime.process_messages: - always pass a `requesting_uid: tuple` to `Actor.cancel()` and `._cancel_task` to that any corresponding `ContextCancelled.canceller` can be set inside `._invoke()`. |
||
---|---|---|
.. | ||
experimental | ||
trionics | ||
__init__.py | ||
_child.py | ||
_clustering.py | ||
_debug.py | ||
_discovery.py | ||
_entry.py | ||
_exceptions.py | ||
_forkserver_override.py | ||
_ipc.py | ||
_mp_fixup_main.py | ||
_portal.py | ||
_root.py | ||
_runtime.py | ||
_spawn.py | ||
_state.py | ||
_streaming.py | ||
_supervise.py | ||
log.py | ||
msg.py | ||
to_asyncio.py |