Commit Graph

2063 Commits (8040ae6994f2d88cf207928419137454cab0c7fd)

Author SHA1 Message Date
Tyler Goodlet 8040ae6994 Adjust lowlevel-tb hiding logic for `MsgStream`
Such that whenev the `self._ctx.chan._exc is trans_err` we suppress.
I.e. when the `Channel._exc: Exception|None` error **is the same as**
set by the `._rpc.process_messages()` loop (that is, set to the
underlying transport layer error), we suppress the lowlevel tb,
otherwise we deliver the full tb since likely something at the lowlevel
that we aren't detecting changed/signalled/is-relevant!
2025-03-30 22:45:44 -04:00
Tyler Goodlet 08ac89b807 Slight typing and multi-line styling tweaks in `.ipc` sugpkg 2025-03-30 22:42:51 -04:00
Tyler Goodlet e904af679b Add a big boi `Channel.pformat()/__repr__()`
Much like how `Context` has been implemented, try to give tons of high
level details on all the lower level encapsulated primitives, namely the
`.msgstream/.transport` and any useful runtime state.

B)

Impl deats,
- adjust `.from_addr()` to only call `._addr.wrap_address()` when we
  detect `addr` is unwrapped.
- add another `log.runtime()` using the new `.__repr__()` in
  `Channel.from_addr()`.
- change to `UnwrappedAddress` as in prior commits.
2025-03-30 22:35:35 -04:00
Tyler Goodlet 6a5ccc2425 Allocate bind-addrs in subactors
Previously whenever an `ActorNursery.start_actor()` call did not receive
a `bind_addrs` arg we would allocate the default `(localhost, 0)` pairs
in the parent, for UDS this obviously won't work nor is it ideal bc it's
nicer to have the actor to be a socket server (who calls
`Address.open_listener()`) define the socket-file-name containing their
unique ID info such as pid, actor-uuid etc.

As such this moves "random" generation of server addresses to the
child-side of a subactor's spawn-sequence when it's sin-`bind_addrs`;
i.e. we do the allocation of the `Address.get_random()` addrs inside
`._runtime.async_main()` instead of `Portal.start_actor()` and **only
when** `accept_addrs`/`bind_addrs` was **not provided by the spawning
parent**.

Further this patch get's way more rigorous about the `SpawnSpec`
processing in the child inside `Actor._from_parent()` such that we
handle any invalid msgs **very loudly and pedantically!**

Impl deats,
- do the "random addr generation" in an explicit `for` loop (instead of
  prior comprehension) to allow for more detailed typing of the layered
  calls to the new `._addr` mod.
- use a `match:/case:` for process any invalid `SpawnSpec` payload case
  where we can instead receive a `MsgTypeError` from the `chan.recv()`
  call in `Actor._from_parent()` to raise it immediately instead of
  triggering downstream type-errors XD
  |_ as per the big `#TODO` we prolly want to take from other callers
     of `Channel.recv()` (like in the `._rpc.process_messages()` loop).
  |_ always raise `InternalError` on non-match/fall-through case!
  |_ add a note about not being able to use `breakpoint()` in this
     section due to causality of `SpawnSpec._runtime_vars` not having
     been processed yet..
  |_ always return a third element from `._from_rent()` eventually to be
     the `preferred_transports: list[str]` from the spawning rent.
- use new `._addr.mk_uuid()` and pass to new `Actor.__init__(uuid: str)`
  for all actor creation (including in all the mods tweaked here).
- Move to new type-alias-name `UnwrappedAddress` throughout.
2025-03-30 22:01:43 -04:00
Tyler Goodlet 23acd0f4cb Adjust imports to use new `UnwrappedAddress`
For those mods where it's just a type-alias (name) import change.
2025-03-30 21:24:48 -04:00
Tyler Goodlet 2c11d1d44a Implement peer-info tracking for UDS streams
Such that any UDS socket pair is represented (and with the recent
updates to) a `USDAddress` via a similar pair-`tuple[str, int]` as TCP
sockets, a pair of the `.filepath: Path` & the peer proc's `.pid: int`
which we read from the underlying `socket.socket` using
`.set/getsockopt()` calls

Impl deats,
- using the Linux specific APIs, we add a `get_peer_info()` which reads
  the `(pid, uid, gid)` using the `SOL_SOCKET` and `SOL_PEECRED` opts to
  `sock.getsockopt()`.
  |_ this presumes the client has been correspondingly configured to
     deliver the creds via a `sock.setsockopt(SOL_SOCKET, SO_PASSCRED,
     1)` call - this required us to override `trio.open_unix_socket()`.
- override `trio.open_unix_socket()` as per the above bullet to ensure
  connecting peers always transmit "credentials" options info to the
  listener.
- update `.get_stream_addrs()` to always call `get_peer_info()` and
  extract the peer's pid for the `raddr` and use `os.getpid()` for
  `laddr` (obvi).
  |_ as part of the new impl also `log.info()` the creds-info deats and
    socket-file path.
  |_ handle the oddity where it depends which of `.getpeername()` or
    `.getsockname()` will return the file-path; i think it's to do with
    who is client vs. server?

Related refinements,
- set `.layer_key: int = 4` for the "transport layer" ;)
- tweak some typing and multi-line unpacking in `.ipc/_tcp`.
2025-03-30 21:14:12 -04:00
Tyler Goodlet 9de192390a Rework/simplify transport addressing
A few things that can fundamentally change,

- UDS addresses now always encapsulate the local and remote pid such
  that it denotes each side's process much like a TCP *port*.
  |_ `.__init__()` takes a new `maybe_pid: int`.
  |_ this required changes to the `.ipc._uds` backend which will come in
     an subsequent commit!
  |_ `UDSAddress.address_type` becomes a `tuple[str, int]` just like the
      TCP case.
  |_ adjust `wrap_address()` to match.
- use a new `_state.get_rt_dir() -> Path` as the default location for
  UDS socket file: now under `XDG_RUNTIME_DIR'/tractor/` subdir by
  default.
- re-implement `USDAddress.get_random()` to use both the local
  `Actor.uid` (if available) and at least the pid for its socket file
  name.

Removals,
- drop the loop generated `_default_addrs`, simplify to just
  `_default_lo_addrs` for per-transport default registry addresses.
  |_ change to `_address_types: dict[str, Type[Address]]` instead of
     separate types `list`.
  |_ adjust `is_wrapped_addr()` to just check `in _addr_types.values()`.
- comment out `Address.open_stream()` it's unused and i think the wrong
  place for this API.

Renames,
- from `AddressTypes` -> `UnwrappedAddress`, since it's a simple type
  union and all this type set is, is the simple python data-structures
  we encode to for the wire.
  |_ see note about possibly implementing the `.[un]wrap()` stuff as
     `msgspec` codec `enc/dec_hook()`s instead!

Additions,
- add a `mk_uuid()` to be used throughout the runtime including for
  generating the `Aid.uuid` part.
- tons of notes around follow up refinements!
2025-03-30 19:35:02 -04:00
Guillermo Rodriguez efd11f7d74
Trying to make full suite pass with uds 2025-03-27 20:37:52 -03:00
Guillermo Rodriguez 76cee99fc2
Finally switch to using address protocol in all runtime 2025-03-27 20:37:52 -03:00
Guillermo Rodriguez 5f50206d84
Add root and random addr getters on MsgTransport type 2025-03-27 20:37:52 -03:00
Guillermo Rodriguez a47a7a39b1
Starting to make tractor.ipc.Channel work with multiple MsgTransports 2025-03-27 20:37:52 -03:00
Guillermo Rodriguez bab265b2d8
Important RingBuffBytesSender fix on non batched mode! & downgrade nix-shell python to lowest supported 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez 010874bed5
Catch trio cancellation on RingBuffReceiver bg eof listener task, add batched mode to RingBuffBytesSender 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez ea010ab46a
Add direct read method on EventFD
Type hint all ctx managers in _ringbuf.py
Remove unnecesary send lock on ring chan sender
Handle EOF on ring chan receiver
Rename ringbuf tests to make it less redundant
2025-03-27 20:36:46 -03:00
Guillermo Rodriguez be7fc89ae9
Add direct ctx managers for RB channels 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez 2a9a78651b
Improve test_ringbuf test, drop MsgTransport ring buf impl for now in favour of a trio.abc.Channel[bytes] impl, add docstrings 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez be818a720a
Switch `tractor.ipc.MsgTransport.stream` type to `trio.abc.Stream`
Add EOF signaling mechanism
Support proper `receive_some` end of stream semantics
Add StapledStream non-ipc test
Create MsgpackRBStream similar to MsgpackTCPStream for buffered whole-msg reads
Add EventFD.read cancellation on EventFD.close mechanism using cancel scope
Add test for eventfd cancellation
Improve and add docstrings
2025-03-27 20:36:46 -03:00
Guillermo Rodriguez ba353bf46f
Better encapsulate RingBuff ctx managment methods and support non ipc usage
Add trio.StrictFIFOLock on sender.send_all
Support max_bytes argument on receive_some, keep track of write_ptr on receiver
Add max_bytes receive test test_ringbuf_max_bytes
Add docstrings to all ringbuf tests
Remove EFD_NONBLOCK support, not necesary anymore since we can use abandon_on_cancel=True on trio.to_thread.run_sync
Close eventfd's after usage on open_ringbuf
2025-03-27 20:36:46 -03:00
Guillermo Rodriguez 9b2161506f
Break out transport protocol and tcp specifics into their own submodules under tractor.ipc 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez 6b155849b7
Add buf_size to RBToken and add sender cancel test, move disable_mantracker to its own _mp_bs module 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez 59c8c7bfe3
Make ring buf api use pickle-able RBToken 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez 6ac6fd56c0
Address some of fomo\'s comments 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez f799e9ac51
Handle cancelation on EventFD.read 2025-03-27 20:36:46 -03:00
Guillermo Rodriguez 9980bb2bd0
Add module headers and fix spacing on tractor._ipc._linux 2025-03-27 20:36:45 -03:00
Guillermo Rodriguez 8de9ab291e
Move RingBuffSender|Receiver to its own tractor.ipc._ringbuf module 2025-03-27 20:36:45 -03:00
Guillermo Rodriguez 1a83626f26
Move linux specifics from tractor.ipc._shm into tractor.ipc._linux 2025-03-27 20:36:45 -03:00
Guillermo Rodriguez 6b4d08d030
Move tractor._shm to tractor.ipc._shm 2025-03-27 20:36:45 -03:00
Guillermo Rodriguez 7b8b9d6805
move tractor._ipc.py into tractor.ipc._chan.py 2025-03-27 20:36:45 -03:00
Guillermo Rodriguez 5afe0a0264
General improvements
EventFD class now expects the fd to already be init with open_eventfd
RingBuff Sender and Receiver fully manage SharedMemory and EventFD lifecycles, no aditional ctx mngrs needed
Separate ring buf tests into its own test bed
Add parametrization to test and cancellation
Add docstrings
Add simple testing data gen module .samples
2025-03-27 20:36:45 -03:00
Guillermo Rodriguez eeb9a7d61b
IPC ring bug impl with async read 2025-03-27 20:36:45 -03:00
Tyler Goodlet 5cee222353 Updates from latest `piker.data._sharedmem` changes 2025-03-27 17:54:04 -04:00
Tyler Goodlet 8ebb1f09de Pass `str` dtype for `use_str` case 2025-03-27 17:54:04 -04:00
Tyler Goodlet 2683a7f33a Allocate size-specced "empty" sequence from default values by type 2025-03-27 17:54:04 -04:00
Tyler Goodlet 255209f881 Mod define `_USE_POSIX`, add a of of todos 2025-03-27 17:54:04 -04:00
Tyler Goodlet 9a0d529b18 Parametrize rw test with variable frame sizes
Demonstrates fixed size frame-oriented reads by the child where the
parent only transmits a "read" stream msg on "frame fill events" such
that the child incrementally reads the shm list data (much like in
a real-time-buffered streaming system).
2025-03-27 17:54:04 -04:00
Tyler Goodlet 1c441b0986 Add `ShmList` slice support in `.__getitem__()` 2025-03-27 17:54:04 -04:00
Tyler Goodlet afbdb50a30 Rename token type to `NDToken` in the style of `nptyping` 2025-03-27 17:54:04 -04:00
Tyler Goodlet e46033cbe7 Don't require runtime (for now), type annot fixing 2025-03-27 17:54:04 -04:00
Tyler Goodlet c932bb5911 Add repetitive attach to existing segment test 2025-03-27 17:54:04 -04:00
Tyler Goodlet 33482d8f41 Add initial readers-writer shm list tests 2025-03-27 17:54:04 -04:00
Tyler Goodlet 7ae194baed Add `ShmList` wrapping the stdlib's `ShareableList`
First attempt at getting `multiprocessing.shared_memory.ShareableList`
working; we wrap the stdlib type with a readonly attr and a `.key` for
cross-actor lookup. Also, rename all `numpy` specific routines to have
a `ndarray` suffix in the func names.
2025-03-27 17:54:04 -04:00
Tyler Goodlet ef7ca49e9b Initial module import from `piker.data._sharemem`
More or less a verbatim copy-paste minus some edgy variable naming and
internal `piker` module imports. There is a bunch of OHLC related
defaults that need to be dropped and we need to adjust to an optional
dependence on `numpy` by supporting shared lists as per the mp docs.
2025-03-27 17:54:04 -04:00
Tyler Goodlet fde681fa19 Merge pull request 'Extension types support via msgspec.Encoder/Decoder hooks' (#19) from ext_type_plds into main
Reviewed(-ish)-on: #19
2025-03-27 17:43:43 -04:00
Tyler Goodlet efcf81bcad Add `.runtime()`-emit to `._invoke()` to report final result msg in the child 2025-03-27 15:58:03 -04:00
Tyler Goodlet 3988ea69f5 Add `MsgStream._stop_msg` use new `PldRx` API
In particular ensuring we use `ctx._pld_rx.recv_msg_nowait()` from
`.receive_nowait()` (which is called from `.aclose()`) such that we
ALWAYS (can) set the surrounding `Context._result/._outcome_msg` attrs
on reception of a final `Return`!!

This fixes a final stream-teardown-race-condition-bug where prior we
normally didn't set the `Context._result/._outcome_msg` in such cases.
This is **precisely because**  `.receive_nowait()` only returns the
`pld` and when called from `.aclose()` this value is discarded, meaning
so is its boxing `Return` despite consuming it from the underlying
`._rx_chan`..

Longer term this should be solved differently by ensuring such races
cases are handled at a higher scope like inside `Context._deliver_msg()`
or the `Portal.open_context()` enter/exit blocks? Add a detailed warning
note and todos for all this around the special case block!
2025-03-27 15:58:03 -04:00
Tyler Goodlet 8bd4490cad Add `Context._outcome_msg` use new `PldRx` API
Such that any `Return` is always capture for each ctx instance and set
in `._deliver_msg()` normally; ensures we can at least introspect for it
when missing (like in a recently discovered stream teardown race bug).
Yes this augments the already existing `._result` which is dedicated for
the `._outcome_msg.pld` in the non-error case; we might want to see if
there's a nicer way to directly proxy ref to that without getting the
pre-pld-decoded `Raw` form with `msgspec`?

Also use the new `ctx._pld_rx.recv_msg()` and drop assigning
`pld_rx._ctx`.
2025-03-27 15:58:03 -04:00
Tyler Goodlet 622f840dfd Slight `PldRx` rework to simplify
Namely renaming and tweaking the `MsgType` receiving methods,
- `.recv_msg()` from what was `.recv_msg_w_pld()` which both receives
  the IPC msg from the underlying `._rx_chan` and then decodes its
  payload with `.decode_pld()`; it now also log reports on the different
  "stage of SC dialog protocol" msg types via a `match/case`.
- a new `.recv_msg_nowait()` sync equivalent of ^ (*was*
  `.recv_pld_nowait()`) who's use was the source of a recently
  discovered bug where any final `Return.pld` is being
  consumed-n-discarded by by `MsgStream.aclose()` depending on
  ctx/stream teardown race conditions..

Also,
- remove all the "instance persistent" ipc-ctx attrs, specifically the
  optional `_ipc`, `_ctx` and the `.wraps_ipc()` cm, since none of them
  were ever really needed/used; all methods which require
  a `Context/MsgStream` are explicitly always passed.
- update a buncha typing namely to use the more generic-styled
  `PayloadT` over `Any` and obviously `MsgType[PayloadT]`.
2025-03-27 15:58:03 -04:00
Tyler Goodlet 8ba315e60c Rename ext-types with `msgspec` suite module 2025-03-27 15:58:03 -04:00
Tyler Goodlet 80f20b35b1 Complete rename to parent->child IPC ctx peers
Now changed in all comments docs **and** test-code content such that we
aren't using the "caller"->"callee" semantics anymore.
2025-03-27 15:58:02 -04:00
Tyler Goodlet 9ec37dd13f Fix msg-draining on `parent_never_opened_stream`!
Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case
block we weren't guarding against the `ctx._stream is None` edge case
which should be treated a `continue`-draining (not a `break` or
attr-error!!) situation since the peer task maybe be continuing to send
`Yield` but has not yet sent an outcome msg (one of
`Return/Error/ContextCancelled`) to terminate the loop. Ensure we
explicitly warn about this case as well as `.cancel()` emit on a taskc.

Thanks again to @guille for discovering this!

Also add temporary `.info()`s around rxed `Return` msgs as part of
trying to debug a different bug discovered while updating the
context-semantics test suite (in a prior commit).
2025-03-27 15:58:02 -04:00