Since we'd like to eventually allow a diverse set of transport
(protocol) methods and stacks, and a multi-peer discovery system for
distributed actor-tree applications, this reworks all runtime internals
to support multi-homing for any given tree on a logical host. In other
words any actor can now bind its transport server (currently only
unsecured TCP + `msgspec`) to more then one address available in its
(linux) network namespace. Further, registry actors (now dubbed
"registars" instead of "arbiters") can also similarly bind to multiple
network addresses and provide discovery services to remote actors via
multiple addresses which can now be provided at runtime startup.
Deats:
- adjust `._runtime` internals to use a `list[tuple[str, int]]` (and
thus pluralized) socket address sequence where applicable for transport
server socket binds, now exposed via `Actor.accept_addrs`:
- `Actor.__init__()` now takes a `registry_addrs: list`.
- `Actor.is_arbiter` -> `.is_registrar`.
- `._arb_addr` -> `._reg_addrs: list[tuple]`.
- always reg and de-reg from all registrars in `async_main()`.
- only set the global runtime var `'_root_mailbox'` to the loopback
address since normally all in-tree processes should have access to
it, right?
- `._serve_forever()` task now takes `listen_sockaddrs: list[tuple]`
- make `open_root_actor()` take a `registry_addrs: list[tuple[str, int]]`
and defaults when not passed.
- change `ActorNursery.start_..()` methods take `bind_addrs: list` and
pass down through the spawning layer(s) via the parent-seed-msg.
- generalize all `._discovery()` APIs to accept `registry_addrs`-like
inputs and move all relevant subsystems to adopt the "registry" style
naming instead of "arbiter":
- make `find_actor()` support batched concurrent portal queries over
all provided input addresses using `.trionics.gather_contexts()` Bo
- syntax: move to using `async with <tuples>` 3.9+ style chained
@acms.
- a general modernization of the code to a python 3.9+ style.
- start deprecation and change to "registry" naming / semantics:
- `._discovery.get_arbiter()` -> `.get_registry()`
We were using a `all(<yielded values>)` condition which obviously won't
work if the batched managers yield any non-truthy value. So instead see
the `unwrapped: dict` with the `id(mngrs)` and only unblock once all
values have been filled in to be something that is not that value.
Detect if the input ref is a non-func (like an `object` instance) in
which case grab its type name using `type()`. Wrap all the name-getting
into a new `_mk_fqpn()` static meth: gets the "fully qualified path
name" and returns path and name in tuple; port other methds to use it.
Refine and update the docs B)
For whatever reason pdb(p), and in general, will show the frame of the
*next* python instruction/LOC on initial entry (at least using
`.set_trace()`), as such remove the `try/finally` block in the sync
code entrypoint `.pause_from_sync()`, and also since doesn't seem like
we really need it anyway.
Further, and to this end:
- enable hidden frames support in our default config.
- fix/drop/mask all the frame ref-ing/mangling we had prior since it's no
longer needed as well as manual `Lock` releasing which seems to work
already by having the `greenback` spawned task do it's normal thing?
- move to no `Union` type annots.
- hide all frames that can add "this is the runtime confusion" to
traces.
This works now for supporting a new `tractor.pause_from_sync()`
`tractor`-aware-replacement for `Pdb.set_trace()` from sync functions
which are also scheduled from our runtime. Uses `greenback` to do all
the magic of scheduling the bg `tractor._debug._pause()` task and
engaging the normal TTY locking machinery triggered by `await
tractor.breakpoint()`
Further this starts some public API renaming, making a switch to
`tractor.pause()` from `.breakpoint()` which IMO much better expresses
the semantics of the runtime intervention required to suffice
multi-process "breakpointing"; it also is an alternate name for the same
in computer science more generally: https://en.wikipedia.org/wiki/Breakpoint
It also avoids using the same name as the `breakpoint()` built-in which
is important since there **is alot more going on** when you call our
equivalent API.
Deats of that:
- add deprecation warning for `tractor.breakpoint()`
- add `tractor.pause()` and a shorthand, easier-to-type, alias `.pp()`
for "pause-point" B)
- add `pause_from_sync()` as the new `breakpoint()`-from-sync-function
hack which does all the `greenback` stuff for the user.
Still TODO:
- figure out where in the runtime and when to call
`greenback.ensure_portal()`.
- fix the frame selection issue where
`trio._core._ki._ki_protection_decorator:wrapper` seems to be always
shown on REPL start as the selected frame..
Only found this by luck more or less (while working on something in
a client project) and it turns out we can actually get to (yet another)
hang state where SIGINT will be ignored by the root actor on teardown..
I've added all the necessary logic flags to reproduce. We obviously need
a follow up bug issue and a test suite to replicate!
It appears as though the following are required based on very light
tinkering:
- infected asyncio mode active
- debug mode active
- the `trio` context must breakpoint *before* `.started()`-ing
- the `asyncio` must **not** error
Demonstrates fixed size frame-oriented reads by the child where the
parent only transmits a "read" stream msg on "frame fill events" such
that the child incrementally reads the shm list data (much like in
a real-time-buffered streaming system).
First attempt at getting `multiprocessing.shared_memory.ShareableList`
working; we wrap the stdlib type with a readonly attr and a `.key` for
cross-actor lookup. Also, rename all `numpy` specific routines to have
a `ndarray` suffix in the func names.
More or less a verbatim copy-paste minus some edgy variable naming and
internal `piker` module imports. There is a bunch of OHLC related
defaults that need to be dropped and we need to adjust to an optional
dependence on `numpy` by supporting shared lists as per the mp docs.
- `Context._cancel_called_remote` -> `._cancelled_remote` since "called"
implies the cancellation was "requested" when it could be due to
another error and the actor uid is the value - only set once the far
end task scope is terminated due to either error or cancel, which has
nothing to do with *what* caused the cancellation.
- `Actor._cancel_called_remote` -> `._cancel_called_by_remote` which
emphasizes that this variable is **only set** IFF some remote actor
**requested that** this actor's runtime be cancelled via
`Actor.cancel()`.
Turns out you can get a case where you might be opening multiple
ctx-streams concurrently and during the context opening phase you block
for all contexts to open, but then when you eventually start opening
streams some slow to start context has caused the others become in an
overrun state.. so we need to let the caller control whether that's an
error ;)
This also needs a test!
Because obviously we probably want to support `allow_overruns` on the
remote callee side as well XD
Only found the bugs fixed in this patch this thanks to writing a much
more exhaustive test set for overrun cases B)
This actually caught further runtime bugs so it's gud i tried..
Add overrun-ignore enabled / disabled cases and error catching for all
of them. More or less this should cover every possible outcome when
it comes to setting `allow_overruns: bool` i hope XD
This adds remote cancellation semantics to our `tractor.Context`
machinery to more closely match that of `trio.CancelScope` but
with operational differences to handle the nature of parallel tasks interoperating
across multiple memory boundaries:
- if an actor task cancels some context it has opened via
`Context.cancel()`, the remote (scope linked) task will be cancelled
using the normal `CancelScope` semantics of `trio` meaning the remote
cancel scope surrounding the far side task is cancelled and
`trio.Cancelled`s are expected to be raised in that scope as per
normal `trio` operation, and in the case where no error is raised
in that remote scope, a `ContextCancelled` error is raised inside the
runtime machinery and relayed back to the opener/caller side of the
context.
- if any actor task cancels a full remote actor runtime using
`Portal.cancel_actor()` the same semantics as above apply except every
other remote actor task which also has an open context with the actor
which was cancelled will also be sent a `ContextCancelled` **but**
with the `.canceller` field set to the uid of the original cancel
requesting actor.
This changeset also includes a more "proper" solution to the issue of
"allowing overruns" during streaming without attempting to implement any
form of IPC streaming backpressure. Implementing task-granularity
backpressure cross-process turns out to be more or less impossible
without augmenting out streaming protocol (likely at the cost of
performance). Further allowing overruns requires special care since
any blocking of the runtime RPC msg loop task effectively can block
control msgs such as cancels and stream terminations.
The implementation details per abstraction layer are as follows.
._streaming.Context:
- add a new contructor factor func `mk_context()` which provides
a strictly private init-er whilst allowing us to not have to define
an `.__init__()` on the type def.
- add public `.cancel_called` and `.cancel_called_remote` properties.
- general rename of what was the internal `._backpressure` var to
`._allow_overruns: bool`.
- move the old contents of `Actor._push_result()` into a new
`._deliver_msg()` allowing for better encapsulation of per-ctx
msg handling.
- always check for received 'error' msgs and process them with the new
`_maybe_cancel_and_set_remote_error()` **before** any msg delivery to
the local task, thus guaranteeing error and cancellation handling
despite any overflow handling.
- add a new `._drain_overflows()` task-method for use with new
`._allow_overruns: bool = True` mode.
- add back a `._scope_nursery: trio.Nursery` (allocated in
`Portal.open_context()`) who's sole purpose is to spawn a single task
which runs the above method; anything else is an error.
- augment `._deliver_msg()` to start a task and run the above method
when operating in no overrun mode; the task queues overflow msgs and
attempts to send them to the underlying mem chan using a blocking
`.send()` call.
- on context exit, any existing "drainer task" will be cancelled and
remaining overflow queued msgs are discarded with a warning.
- rename `._error` -> `_remote_error` and set it in a new method
`_maybe_cancel_and_set_remote_error()` which is called before
processing
- adjust `.result()` to always call `._maybe_raise_remote_err()` at its
start such that whenever a `ContextCancelled` arrives we do logic for
whether or not to immediately raise that error or ignore it due to the
current actor being the one who requested the cancel, by checking the
error's `.canceller` field.
- set the default value of `._result` to be `id(Context()` thus avoiding
conflict with any `.result()` actually being `False`..
._runtime.Actor:
- augment `.cancel()` and `._cancel_task()` and `.cancel_rpc_tasks()` to
take a `requesting_uid: tuple` indicating the source actor of every
cancellation request.
- pass through the new `Context._allow_overruns` through `.get_context()`
- call the new `Context._deliver_msg()` from `._push_result()` (since
the factoring out that method's contents).
._runtime._invoke:
- `TastStatus.started()` back a `Context` (unless an error is raised)
instead of the cancel scope to make it easy to set/get state on that
context for the purposes of cancellation and remote error relay.
- always raise any remote error via `Context._maybe_raise_remote_err()`
before doing any `ContextCancelled` logic.
- assign any `Context._cancel_called_remote` set by the `requesting_uid`
cancel methods (mentioned above) to the `ContextCancelled.canceller`.
._runtime.process_messages:
- always pass a `requesting_uid: tuple` to `Actor.cancel()` and
`._cancel_task` to that any corresponding `ContextCancelled.canceller`
can be set inside `._invoke()`.
Turns out stuff was totally broken in these cases because we're either
closing the underlying mem chan too early or not handling the
"allow_overruns" mode's cancellation correctly..
To handle both remote cancellation this adds `ContextCanceled.canceller:
tuple` the uid of the cancel requesting actor and is expected to be set
by the runtime when servicing any remote cancel request. This makes it
possible for `ContextCancelled` receivers to know whether "their actor
runtime" is the source of the cancellation.
Also add an explicit `RemoteActor.src_actor_uid` which better formalizes
the notion of "which remote actor" the error originated from.
Both of these new attrs are expected to be packed in the `.msgdata` when
the errors are loaded locally.
These will verify new changes to the runtime/messaging core which allows
us to adopt an "ignore cancel if requested by us" style handling of
`ContextCancelled` more like how `trio` does with
`trio.Nursery.cancel_scope.cancel()`. We now expect
a `ContextCancelled.canceller: tuple` which is set to the actor uid of
the actor which requested the cancellation which eventually resulted in
the remote error-msg.
Also adds some experimental tweaks to the "backpressure" test which it
turns out is very problematic in coordination with context cancellation
since blocking on the feed mem chan to some task will block the ipc msg
loop and thus handling of cancellation.. More to come to both the test
and core to address this hopefully since right now this test is failing.