Compare commits

...

59 Commits

Author SHA1 Message Date
Gud Boi e89fe03da7 Fix `LinkedTaskChannel` docstrings from GH bot review
Address valid findings from copilot's PR #413 review
(https://github.com/goodboy/tractor/pull/413
 #pullrequestreview-3925876037):

- `.get()` docstring referenced non-existent
  `._from_trio` attr, correct to `._to_aio`.
- `.send()` docstring falsely claimed error-raising
  on missing `from_trio` arg; reword to describe the
  actual `.put_nowait()` enqueue behaviour.
- `.open_channel_from()` return type annotation had
  `tuple[LinkedTaskChannel, Any]` but `yield` order
  is `(first, chan)`; fix annotation + docstring to
  match actual `tuple[Any, LinkedTaskChannel]`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-13 19:28:57 -04:00
Gud Boi 417b796169 Use `chan: LinkedTaskChannel` API in all aio-task fns
Convert every remaining `to_trio`/`from_trio` fn-sig style
to the new unified `chan: LinkedTaskChannel` iface added in
prior commit (c46e9ee8).

Deats,
- `to_trio.send_nowait(val)` (1st call) -> `chan.started_nowait(val)`
- `to_trio.send_nowait(val)` (subsequent) -> `chan.send_nowait(val)`
- `await from_trio.get()` -> `await chan.get()`

Converted fns,
- `sleep_and_err()`, `push_from_aio_task()` in
  `tests/test_infected_asyncio.py`
- `sync_and_err()` in `tests/test_root_infect_asyncio.py`
- `aio_streamer()` in
  `tests/test_child_manages_service_nursery.py`
- `aio_echo_server()` in
  `examples/infected_asyncio_echo_server.py`
- `bp_then_error()` in `examples/debugging/asyncio_bp.py`

Also,
- drop stale comments referencing old param names.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-13 19:28:57 -04:00
Gud Boi 36cbc07602 Tried out an alt approach for `.to_asyncio` crashes
This change is masked out now BUT i'm leaving it in for reference.

I was debugging a multi-actor fault where the primary source actor was
an infected-aio-subactor (`brokerd.ib`) and it seemed like the REPL was only
entering on the `trio` side (at a `.open_channel_from()`) and not
eventually breaking in the `asyncio.Task`. But, since (changing
something?) it seems to be working now, it's just that the `trio` side
seems to sometimes handle before the (source/causing and more
child-ish) `asyncio`-task, which is a bit odd and not expected..
We could likely refine (maybe with an inter-loop-task REPL lock?) this
at some point and ensure a child-`asyncio` task which errors always
grabs the REPL **first**?

Lowlevel deats/further-todos,
- add (masked) `maybe_open_crash_handler()` block around
  `asyncio.Task` execution with notes about weird parent-addr
  delivery bug in `test_sync_pause_from_aio_task`
  * yeah dunno what that's about but made a bug; seems to be IPC
    serialization of the `TCPAddress` struct somewhere??
- add inter-loop lock TODO for avoiding aio-task clobbering
  trio-tasks when both crash in debug-mode

Also,
- change import from `tractor.devx.debug` to `tractor.devx`
- adjust `get_logger()` call to use new implicit mod-name detection
  added to `.log.get_logger()`, i.e. sin `name=__name__`.
- some teensie refinements to `open_channel_from()`:
  * swap return type annotation for  to `tuple[LinkedTaskChannel, Any]`
    (was `Any`).
  * update doc-string to clarify started-value delivery
  * add err-log before `.pause()` in what should be an unreachable path.
  * add todo to swap the `(first, chan)` pair to match that of ctx..

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])

[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-13 19:28:57 -04:00
Tyler Goodlet 1f2fad22ee Extend `.to_asyncio.LinkedTaskChannel` for aio side
With methods to comms similar to those that exist for the `trio` side,
- `.get()` which proxies verbatim to the `._to_aio: asyncio.Queue`,
- `.send_nowait()` which thin-wraps to `._to_trio: trio.MemorySendChannel`.

Obviously the more correct design is to break up the channel type into
a pair of handle types, one for each "side's" task in each event-loop,
that's hopefully coming shortly in a follow up patch B)

Also,
- fill in some missing doc strings, tweak some explanation comments and
  update todos.
- adjust the `test_aio_errors_and_channel_propagates_and_closes()` suite
  to use the new `chan` fn-sig-API with `.open_channel_from()` including
  the new methods for msg comms; ensures everything added here works e2e.
2026-03-13 19:28:57 -04:00
Tyler Goodlet ca5f6f50a8 Explain the `infect_asyncio: bool` param to pass in RTE msg 2026-03-13 19:28:57 -04:00
Bd a7ff1387c7
Merge pull request #414 from goodboy/struct_field_filtering
Hide private fields in `Struct.pformat()` output
2026-03-13 19:22:22 -04:00
Gud Boi abbb4a79c8 Drop unused import noticed by `copilot` 2026-03-13 11:52:18 -04:00
Gud Boi 1529095c32 Add `tests/msg/` sub-pkg, audit `pformat()` filtering
Reorganize existing msg-related test suites under
a new `tests/msg/` subdir (matching `tests/devx/`
and `tests/ipc/` convention) and add unit tests for
the `_`-prefixed field filtering in `pformat()`.

Deats,
- `git mv` `test_ext_types_msgspec` and `test_pldrx_limiting` into
  `tests/msg/`.
- add `__init__.py` + `conftest.py` for the new test sub-pkg.
- add new `test_pretty_struct.py` suite with 8 unit tests:
  - parametrized field visibility (public shown, `_`-private hidden,
    mixed)
  - direct `iter_struct_ppfmt_lines()` assertion
  - nested struct recursion filtering
  - empty struct edge case
  - real `MsgDec` via `mk_dec()` hiding `_dec`
  - `repr()` integration via `Struct.__repr__()`

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-12 18:32:31 -04:00
Gud Boi 8215a7ba34 Hide private fields in `Struct.pformat()` output
Skip fields starting with `_` in pretty-printed struct output
to avoid cluttering displays with internal/private state (and/or accessing
private properties which have errors Bp).

Deats,
- add `if k[0] == '_': continue` check to skip private fields
- change nested `if isinstance(v, Struct)` to `elif` since we
  now have early-continue for private fields
- mv `else:` comment to clarify it handles top-level fields
- fix indentation of `yield` statement to only output
  non-private, non-nested fields

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-10 00:33:32 -04:00
Bd c1c4d85958
Merge pull request #406 from goodboy/macos_support
macOS support (for the fancy peeps with nice furniture)
2026-03-10 00:28:54 -04:00
Gud Boi 88b084802f Merge `testing-macos` into unified `testing` matrix
Drop the separate `testing-macos` job and add
`macos-latest` to the existing OS matrix; bump
timeout to 16 min to accommodate macOS runs.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-09 23:28:58 -04:00
Gud Boi bf1dcea9d1 Announce macOS support in `pyproject` and README
- add `"Operating System :: MacOS"` classifier.
- add macOS bullet to README's TODO/status section.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-09 23:23:58 -04:00
Bd 5c270b89d5
Merge pull request #342 from goodboy/macos_in_ci
Macos in ci
2026-03-09 20:33:38 -04:00
Gud Boi 6ee0149e8d Another cancellation test timeout bump for non-linux 2026-03-09 19:46:42 -04:00
Gud Boi 9c4cd869fb OK-FINE, skip streaming docs example on macos!
It seems something is up with their VM-img or wtv bc i keep increasing
the subproc timeout and nothing is changing. Since i can't try
a `-xlarge` one without paying i'm just muting this test for now.
2026-03-09 19:46:42 -04:00
Gud Boi afd66ce3b7 Final try, drop logging level in streaming example to see if macos can cope.. 2026-03-09 19:46:42 -04:00
Gud Boi f9bdb1b35d Try one more timeout bumps for flaky docs streaming ex.. 2026-03-09 19:46:42 -04:00
Gud Boi d135ce94af Restyle `test_legacy_one_way_streaming` mod
- convert all doc-strings to `'''` multiline style.
- rename `nursery` -> `an`, `n` -> `tn` to match
  project-wide conventions.
- add type annotations to fn params (fixtures, test
  helpers).
- break long lines into multiline style for fn calls,
  assertions, and `parametrize` decorator lists.
- add `ids=` to `@pytest.mark.parametrize`.
- use `'` over `"` for string literals.
- add `from typing import Callable` import.
- drop spurious blank lines inside generators.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-09 19:46:25 -04:00
Gud Boi fb94aa0095 Tidy a typing-typo, add explicit `ids=` for paramed suites 2026-03-09 19:35:47 -04:00
Gud Boi b71e8575e5 Skip a couple more `ctlc` flaking suites 2026-03-09 19:31:16 -04:00
Gud Boi bbc028e84c Increase macos job timeout to 16s 2026-03-09 19:31:16 -04:00
Gud Boi 016306adf5 Allow `ctlcs_bish(<condition-args>)` skipping
Via ensuring `all(mark.args)` on wtv expressions are arg-passed to the
mark decorator; use it to skip the `test_subactor_breakpoint` suite when
`ctlc=True` since it seems too unreliable in CI.
2026-03-09 19:31:15 -04:00
Gud Boi 712c009790 Hike `testdir.spawn()` timeout on non-linux in CI 2026-03-09 19:30:41 -04:00
Gud Boi 79396b4a26 2x the ctl-c loop prompt-timeout for non-linux in CI 2026-03-09 19:30:41 -04:00
Gud Boi 5b2905b702 Xplatform tweaks for `daemon` fixture
There's a very sloppy registrar-actor-bootup syncing approach used in
this fixture (basically just guessing how long to sleep to wait for it
to init and bind the registry socket) using a `global _PROC_SPAWN_WAIT`
that needs to be made more reliable. But, for now i'm just playing along
with what's there to try and make less CI runs flaky by,

- sleeping *another* 1s when run from non-linux CI.
- reporting stdout (if any) alongside stderr on teardown.
- not strictly requiring a `proc.returncode == -2` indicating successful
  graceful cancellation via SIGINT; instead we now error-log and only
  raise the RTE on `< 0` exit code.
  * though i can't think of why this would happen other then an
    underlying crash which should propagate.. but i don't think any test
    suite does this intentionally rn?
  * though i don't think it should ever happen, having a CI run
    "error"-fail bc of this isn't all that illuminating, if there is
    some weird `.returncode == 0` termination case it's likely not
    a failure?

For later, see the new todo list; we should sync to some kind of "ping"
polling of the tpt address if possible which is already easy enough for
TCP reusing an internal closure from `._root.open_root_actor()`.
2026-03-09 19:30:41 -04:00
Gud Boi 776af3fce6 Register our `ctlcs_bish` marker to avoid warnings 2026-03-09 19:30:41 -04:00
Gud Boi 4639685770 Fill out types in `test_discovery` mod 2026-03-09 19:30:41 -04:00
Gud Boi 98a7d69341 Always pre-sleep in `daemon` fixture when in non-linux CI.. 2026-03-09 19:30:41 -04:00
Gud Boi ab6c955949 Lol fine! bump it a bit more XD 2026-03-09 19:30:41 -04:00
Gud Boi a72bb9321e Bleh, bump timeout again for docs-exs suite when in CI 2026-03-09 19:30:41 -04:00
Gud Boi 0e2949ea59 Bump docs-exs subproc timeout, exception log any timeouts 2026-03-09 19:30:41 -04:00
Gud Boi fb73935dbc Add a `test_log` fixture for emitting from *within* test bodies or fixtures 2026-03-09 19:30:41 -04:00
Gud Boi 94dfeb1441 Add delay before root-actor open, macos in CI.. 2026-03-09 19:30:41 -04:00
Gud Boi 9c1bcb23af Skip legacy-one-way suites on non-linux in CI 2026-03-09 19:30:41 -04:00
Gud Boi a1ea373f34 Ok.. try a longer prompt timeout? 2026-03-09 19:30:41 -04:00
Gud Boi e8f3d64e71 Increase prompt timeout for macos in CI 2026-03-09 19:30:41 -04:00
Gud Boi b30faaca82 Adjust debugger test suites for macos
Namely, after trying to get `test_multi_daemon_subactors` to work for
the `ctlc=True` case (for way too long), give up on that (see
todo/comments) and skip it; the normal case works just fine. Also tweak
the `test_ctxep_pauses_n_maybe_ipc_breaks` pattern matching for
non-`'UDS'` per the previous script commit; we can't use UDS alongside
`pytest`'s tmp dir generation, mega lulz.
2026-03-09 19:30:40 -04:00
Gud Boi 51701fc8dc Ok just skip `test_shield_pause` for macos..
Something something the SIGINT handler isn't being swapped correctly?
2026-03-09 19:29:47 -04:00
Gud Boi 7b89204afd Tweak `do_ctlc()`'s `delay` default
To be a null default and set to `0.1` when not passed by the caller so
as to avoid having to pass `0.1` if you wanted the
param-defined-default.

Also,
- in the `spawn()` fixtures's `unset_colors()` closure, add in a masked
  `os.environ['NO_COLOR'] = '1'` since i found it while trying to debug
  debugger tests.
- always return the `child.before` content from `assert_before()`
  helper; again it comes in handy when debugging console matching tests.
2026-03-09 19:29:18 -04:00
Gud Boi 82d02ef404 Lul, never use `'uds'` tpt for macos test-scripts
It's explained in the comment and i really think it's getting more
hilarious the more i learn about the arbitrary limitations of user space
with this tina platform.
2026-03-09 19:29:18 -04:00
Gud Boi b7546fd221 Longer timeout for `test_one_end_stream_not_opened`
On non-linux that is.
2026-03-09 19:29:18 -04:00
Gud Boi 86c95539ca Loosen shml test assert for key shortening on macos 2026-03-09 19:29:18 -04:00
Gud Boi 706a4b761b Add 6sec timeout around `test_simple_rpc` suite for macos 2026-03-09 19:29:18 -04:00
Gud Boi c5af2fa778 Add a `@no_macos` skipif deco 2026-03-09 19:29:18 -04:00
Gud Boi 86489cc453 Use py version in job `name`, consider macos in linux matrix? 2026-03-09 19:29:18 -04:00
Gud Boi 2631fb4ff3 Only run CI on <py3.14 2026-03-09 19:29:18 -04:00
Gud Boi aee86f2544 Run macos job on `uv` and newer `actions@v4` 2026-03-09 19:29:18 -04:00
Tyler Goodlet 83c8a8ad78 Add macos run using only the `trio` spawner 2026-03-09 19:29:18 -04:00
Gud Boi daae196048 Warn if `.ipc._uds.get_peer_pid()` returns null 2026-03-08 19:17:16 -04:00
Gud Boi 70efcb09a0 Slight refinements to `._state.get_rt_dir()`
Per the `copilot` review,
https://github.com/goodboy/tractor/pull/406#pullrequestreview-3893270953

now we also,
- pass `exists_ok=True` to `.mkdir()` to avoid conc races.
- expose `appname: str` param for caller override.
- normalize `subdir` to avoid escaping the base rt-dir location.
2026-03-08 19:17:16 -04:00
Gud Boi a7e74acdff Doc `getsockopt()` args (for macOS)
Per the questionable `copilot` review which is detailed for follow up in
https://github.com/goodboy/tractor/issues/418. These constants are
directly linked from the kernel sources fwiw.
2026-03-08 19:17:16 -04:00
Gud Boi 9c3d3bcec1 Add prompt flush hack for `bash` on macos as well.. 2026-03-08 19:17:16 -04:00
Gud Boi 521fb97fe9 Support UDS on macos (for realz)
Though it was a good (vibed) try by @dnks, the previous "fix" was not
actually adding unix socket support but merely sidestepping a crash due
to `get_peer_info()`'s impl never going to work on MacOS (and it was
never intended to).

This patch instead solves the underlying issue by implementing a new
`get_peer_pid()` helper which does in fact retrieve the peer's PID in
a more generic/cross-platform way (:fingers_crossed:); much thanks to
the linked SO answer for this solution!

Impl deats,
- add `get_peer_pid()` and call it from
  `MsgpackUDSStream.get_stream_addrs()` when we detect a non-'linux'
  platform, OW use the original soln: `get_stream_addrs()`.
- add a new case for the `match (peername, sockname)` with a
  `case (str(), str()):` which seems to at least work on macos.
- drop all the `LOCAL_PEERCRED` dynamic import branching since it was
  never needed and was never going to work.
2026-03-08 19:17:16 -04:00
Gud Boi d8a3969048 Also shorten shm-key for `ShmList` on macos
Same problem as for the `ShmArray` tokens, so tweak and reuse
the `_shorten_key_for_macos()` helper and call it from
`open_shm_list()` similarly.

Some tweaks/updates to the various helpers,
- support `prefix/suffix` inputs and if provided take their lengths and
  subtract them from the known *macOS shm_open() has a 31 char limit
  (PSHMNAMLEN)* when generating and using the `hashlib.sha256()` value
  which overrides (for now..) wtv `key` is passed by the caller.
- pass the appropriate `suffix='_first/_last'` values for the `ShmArray`
  token generators case.
- add a `prefix: str = 'shml_'` param to `open_shm_list()`.
- better log formatting with `!r` to report any key shortening.
2026-03-08 19:17:16 -04:00
Gud Boi 01c0db651a Port macOS shm 31-char name limit hack from `piker`
Adapt the `PSHMNAMLEN` fix from `piker.data._sharedmem` (orig commit
96fb79ec thx @dnks!) to `tractor.ipc._shm` accounting for the
module-local differences:

- Add `hashlib` import for sha256 key hashing
- Add `key: str|None` field to `NDToken` for storing
  the original descriptive key separate from the
  (possibly shortened) OS-level `shm_name`
- Add `__eq__()`/`__hash__()` to `NDToken` excluding
  the `key` field from identity comparison
- Add `_shorten_key_for_macos()` using `t_` prefix
  (vs piker's `p_`) with 16 hex chars of sha256
- Use `platform.system() == 'Darwin'` in `_make_token()`
  (tractor already imports the `platform` module vs
  piker's `sys.platform`)
- Wrap `shm_unlink()` in `ShmArray.destroy()` with
  `try/except FileNotFoundError` for teardown races
  (was already done in `SharedInt.destroy()`)
- Move token creation before `SharedMemory()` alloc in
  `open_shm_ndarray()` so `token.shm_name` is used
  as the OS-level name
- Use `lookup_key` pattern in `attach_shm_ndarray()`
  to decouple `_known_tokens` dict key from OS name

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-08 19:17:16 -04:00
Gud Boi 7bcd7aca2b Reorg `socket` conditional imports a bit
Move the multi-platorm-supporting conditional/dynamic `socket` constant
imports to *after* the main cross-platform ones.
Also add constant typing and reformat comments a bit for the macOS case.
2026-03-08 19:17:16 -04:00
Gud Boi 920d0043b4 Force parent subdirs for macos 2026-03-08 19:17:16 -04:00
wygud 93b9a6cd97 Add macOS compatibility for Unix socket credential passing
Make socket credential imports platform-conditional in `.ipc._uds`.
- Linux: use `SO_PASSCRED`/`SO_PEERCRED` from socket module
- macOS: use `LOCAL_PEERCRED` (0x0001) instead, no need for `SO_PASSCRED`
- Conditionally call `setsockopt(SO_PASSCRED)` only on Linux

Fixes AttributeError on macOS where SO_PASSCRED doesn't exist.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-03-08 19:17:16 -04:00
Tyler Goodlet e7cefba67f Use `platformdirs` for `.config.get_rt_dir()`
Thanks to the `tox`-dev community for such a lovely pkg which seems to
solves all the current cross-platform user-dir problems B)

Also this,
- now passes `platformdirs.user_runtime_dir(appname='tractor')`
  and allows caller to pass an optional `subdir` under `tractor/`
  if desired.
- drops the `.config._rtdir: Path` mod var.
- bumps the lock file with the new dep.
2026-03-08 19:16:49 -04:00
35 changed files with 1296 additions and 309 deletions

View File

@ -74,16 +74,22 @@ jobs:
# run: mypy tractor/ --ignore-missing-imports --show-traceback
testing-linux:
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
timeout-minutes: 10
testing:
name: '${{ matrix.os }} Python${{ matrix.python-version }} - spawn_backend=${{ matrix.spawn_backend }}'
timeout-minutes: 16
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
python-version: ['3.13']
os: [
ubuntu-latest,
macos-latest,
]
python-version: [
'3.13',
# '3.14',
]
spawn_backend: [
'trio',
# 'mp_spawn',
@ -91,7 +97,6 @@ jobs:
]
steps:
- uses: actions/checkout@v4
- name: 'Install uv + py-${{ matrix.python-version }}'

View File

@ -641,13 +641,15 @@ Help us push toward the future of distributed `Python`.
- Typed capability-based (dialog) protocols ( see `#196
<https://github.com/goodboy/tractor/issues/196>`_ with draft work
started in `#311 <https://github.com/goodboy/tractor/pull/311>`_)
- We **recently disabled CI-testing on windows** and need help getting
it running again! (see `#327
<https://github.com/goodboy/tractor/pull/327>`_). **We do have windows
support** (and have for quite a while) but since no active hacker
exists in the user-base to help test on that OS, for now we're not
actively maintaining testing due to the added hassle and general
latency..
- **macOS is now officially supported** and tested in CI
alongside Linux!
- We **recently disabled CI-testing on windows** and need
help getting it running again! (see `#327
<https://github.com/goodboy/tractor/pull/327>`_). **We do
have windows support** (and have for quite a while) but
since no active hacker exists in the user-base to help
test on that OS, for now we're not actively maintaining
testing due to the added hassle and general latency..
Feel like saying hi?

View File

@ -18,15 +18,14 @@ async def aio_sleep_forever():
async def bp_then_error(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
chan: to_asyncio.LinkedTaskChannel,
raise_after_bp: bool = True,
) -> None:
# sync with `trio`-side (caller) task
to_trio.send_nowait('start')
chan.started_nowait('start')
# NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `.debug._set_trace()` but

View File

@ -3,6 +3,7 @@ Verify we can dump a `stackscope` tree on a hang.
'''
import os
import platform
import signal
import trio
@ -31,13 +32,26 @@ async def main(
from_test: bool = False,
) -> None:
if platform.system() != 'Darwin':
tpt = 'uds'
else:
# XXX, precisely we can't use pytest's tmp-path generation
# for tests.. apparently because:
#
# > The OSError: AF_UNIX path too long in macOS Python occurs
# > because the path to the Unix domain socket exceeds the
# > operating system's maximum path length limit (around 104
#
# WHICH IS just, wtf hillarious XD
tpt = 'tcp'
async with (
tractor.open_nursery(
debug_mode=True,
enable_stack_on_sig=True,
# maybe_enable_greenback=False,
loglevel='devx',
enable_transports=['uds'],
enable_transports=[tpt],
) as an,
):
ptl: tractor.Portal = await an.start_actor(

View File

@ -1,3 +1,5 @@
import platform
import tractor
import trio
@ -34,9 +36,22 @@ async def just_bp(
async def main():
if platform.system() != 'Darwin':
tpt = 'uds'
else:
# XXX, precisely we can't use pytest's tmp-path generation
# for tests.. apparently because:
#
# > The OSError: AF_UNIX path too long in macOS Python occurs
# > because the path to the Unix domain socket exceeds the
# > operating system's maximum path length limit (around 104
#
# WHICH IS just, wtf hillarious XD
tpt = 'tcp'
async with tractor.open_nursery(
debug_mode=True,
enable_transports=['uds'],
enable_transports=[tpt],
loglevel='devx',
) as n:
p = await n.start_actor(

View File

@ -90,7 +90,7 @@ async def main() -> list[int]:
# yes, a nursery which spawns `trio`-"actors" B)
an: ActorNursery
async with tractor.open_nursery(
loglevel='cancel',
loglevel='error',
# debug_mode=True,
) as an:
@ -118,8 +118,10 @@ async def main() -> list[int]:
cancelled: bool = await portal.cancel_actor()
assert cancelled
print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
print(
f"STREAM TIME = {time.time() - start}\n"
f"STREAM + SPAWN TIME = {time.time() - pre_start}\n"
)
assert result_stream == list(range(seed))
return result_stream

View File

@ -11,21 +11,17 @@ import tractor
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
chan: tractor.to_asyncio.LinkedTaskChannel,
) -> None:
# a first message must be sent **from** this ``asyncio``
# task or the ``trio`` side will never unblock from
# ``tractor.to_asyncio.open_channel_from():``
to_trio.send_nowait('start')
chan.started_nowait('start')
# XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
# should probably offer something better.
while True:
# echo the msg back
to_trio.send_nowait(await from_trio.get())
chan.send_nowait(await chan.get())
await asyncio.sleep(0)

View File

@ -24,6 +24,7 @@ keywords = [
classifiers = [
"Development Status :: 3 - Alpha",
"Operating System :: POSIX :: Linux",
"Operating System :: MacOS",
"Framework :: Trio",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython",
@ -48,6 +49,7 @@ dependencies = [
"msgspec>=0.19.0",
"cffi>=1.17.1",
"bidict>=0.23.1",
"platformdirs>=4.4.0",
]
# ------ project ------

View File

@ -11,6 +11,7 @@ import platform
import time
import pytest
import tractor
from tractor._testing import (
examples_dir as examples_dir,
tractor_test as tractor_test,
@ -22,6 +23,7 @@ pytest_plugins: list[str] = [
'tractor._testing.pytest',
]
_non_linux: bool = platform.system() != 'Linux'
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows':
@ -44,6 +46,10 @@ no_windows = pytest.mark.skipif(
platform.system() == "Windows",
reason="Test is unsupported on windows",
)
no_macos = pytest.mark.skipif(
platform.system() == "Darwin",
reason="Test is unsupported on MacOS",
)
def pytest_addoption(
@ -61,7 +67,7 @@ def pytest_addoption(
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
def loglevel(request) -> str:
import tractor
orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel
@ -69,11 +75,46 @@ def loglevel(request):
level=level,
name='tractor', # <- enable root logger
)
log.info(f'Test-harness logging level: {level}\n')
log.info(
f'Test-harness set runtime loglevel: {level!r}\n'
)
yield level
tractor.log._default_loglevel = orig
@pytest.fixture(scope='function')
def test_log(
request,
loglevel: str,
) -> tractor.log.StackLevelAdapter:
'''
Deliver a per test-module-fn logger instance for reporting from
within actual test bodies/fixtures.
For example this can be handy to report certain error cases from
exception handlers using `test_log.exception()`.
'''
modname: str = request.function.__module__
log = tractor.log.get_logger(
name=modname, # <- enable root logger
# pkg_name='tests',
)
_log = tractor.log.get_console_log(
level=loglevel,
logger=log,
name=modname,
# pkg_name='tests',
)
_log.debug(
f'In-test-logging requested\n'
f'test_log.name: {log.name!r}\n'
f'level: {loglevel!r}\n'
)
yield _log
_ci_env: bool = os.environ.get('CI', False)
@ -110,6 +151,7 @@ def daemon(
testdir: pytest.Pytester,
reg_addr: tuple[str, int],
tpt_proto: str,
ci_env: bool,
) -> subprocess.Popen:
'''
@ -147,13 +189,25 @@ def daemon(
**kwargs,
)
# TODO! we should poll for the registry socket-bind to take place
# and only once that's done yield to the requester!
# -[ ] TCP: use the `._root.open_root_actor()`::`ping_tpt_socket()`
# closure!
# -[ ] UDS: can we do something similar for 'pinging" the
# file-socket?
#
global _PROC_SPAWN_WAIT
# UDS sockets are **really** fast to bind()/listen()/connect()
# so it's often required that we delay a bit more starting
# the first actor-tree..
if tpt_proto == 'uds':
global _PROC_SPAWN_WAIT
_PROC_SPAWN_WAIT = 0.6
if _non_linux and ci_env:
_PROC_SPAWN_WAIT += 1
# XXX, allow time for the sub-py-proc to boot up.
# !TODO, see ping-polling ideas above!
time.sleep(_PROC_SPAWN_WAIT)
assert not proc.returncode
@ -163,18 +217,30 @@ def daemon(
# XXX! yeah.. just be reaaal careful with this bc sometimes it
# can lock up on the `_io.BufferedReader` and hang..
stderr: str = proc.stderr.read().decode()
if stderr:
stdout: str = proc.stdout.read().decode()
if (
stderr
or
stdout
):
print(
f'Daemon actor tree produced STDERR:\n'
f'Daemon actor tree produced output:\n'
f'{proc.args}\n'
f'\n'
f'{stderr}\n'
f'stderr: {stderr!r}\n'
f'stdout: {stdout!r}\n'
)
if proc.returncode != -2:
raise RuntimeError(
'Daemon actor tree failed !?\n'
f'{proc.args}\n'
if (rc := proc.returncode) != -2:
msg: str = (
f'Daemon actor tree was not cancelled !?\n'
f'proc.args: {proc.args!r}\n'
f'proc.returncode: {rc!r}\n'
)
if rc < 0:
raise RuntimeError(msg)
log.error(msg)
# @pytest.fixture(autouse=True)

View File

@ -3,8 +3,9 @@
'''
from __future__ import annotations
import time
import platform
import signal
import time
from typing import (
Callable,
TYPE_CHECKING,
@ -33,6 +34,17 @@ if TYPE_CHECKING:
from pexpect import pty_spawn
_non_linux: bool = platform.system() != 'Linux'
def pytest_configure(config):
# register custom marks to avoid warnings see,
# https://docs.pytest.org/en/stable/how-to/writing_plugins.html#registering-custom-markers
config.addinivalue_line(
'markers',
'ctlcs_bish: test will (likely) not behave under SIGINT..'
)
# a fn that sub-instantiates a `pexpect.spawn()`
# and returns it.
type PexpectSpawner = Callable[
@ -68,7 +80,10 @@ def spawn(
'''
import os
# disable colored tbs
os.environ['PYTHON_COLORS'] = '0'
# disable all ANSI color output
# os.environ['NO_COLOR'] = '1'
spawned: PexpectSpawner|None = None
@ -83,7 +98,10 @@ def spawn(
cmd,
**mkcmd_kwargs,
),
expect_timeout=3,
expect_timeout=(
10 if _non_linux and _ci_env
else 3
),
# preexec_fn=unset_colors,
# ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff?
@ -146,6 +164,8 @@ def ctlc(
mark.name == 'ctlcs_bish'
and
use_ctlc
and
all(mark.args)
):
pytest.skip(
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
@ -251,12 +271,13 @@ def assert_before(
err_on_false=True,
**kwargs
)
return str(child.before.decode())
def do_ctlc(
child,
count: int = 3,
delay: float = 0.1,
delay: float|None = None,
patt: str|None = None,
# expect repl UX to reprint the prompt after every
@ -268,6 +289,7 @@ def do_ctlc(
) -> str|None:
before: str|None = None
delay = delay or 0.1
# make sure ctl-c sends don't do anything but repeat output
for _ in range(count):
@ -278,7 +300,10 @@ def do_ctlc(
# if you run this test manually it works just fine..
if expect_prompt:
time.sleep(delay)
child.expect(PROMPT)
child.expect(
PROMPT,
timeout=(child.timeout * 2) if _ci_env else child.timeout,
)
before = str(child.before.decode())
time.sleep(delay)

View File

@ -37,6 +37,9 @@ from .conftest import (
in_prompt_msg,
assert_before,
)
from ..conftest import (
_ci_env,
)
if TYPE_CHECKING:
from ..conftest import PexpectSpawner
@ -51,13 +54,14 @@ if TYPE_CHECKING:
# - recurrent root errors
_non_linux: bool = platform.system() != 'Linux'
if platform.system() == 'Windows':
pytest.skip(
'Debugger tests have no windows support (yet)',
allow_module_level=True,
)
# TODO: was trying to this xfail style but some weird bug i see in CI
# that's happening at collect time.. pretty soon gonna dump actions i'm
# thinkin...
@ -193,6 +197,11 @@ def test_root_actor_bp_forever(
child.expect(EOF)
# skip on non-Linux CI
@pytest.mark.ctlcs_bish(
_non_linux,
_ci_env,
)
@pytest.mark.parametrize(
'do_next',
(True, False),
@ -258,6 +267,11 @@ def test_subactor_error(
child.expect(EOF)
# skip on non-Linux CI
@pytest.mark.ctlcs_bish(
_non_linux,
_ci_env,
)
def test_subactor_breakpoint(
spawn,
ctlc: bool,
@ -480,8 +494,24 @@ def test_multi_daemon_subactors(
stream.
'''
child = spawn('multi_daemon_subactors')
non_linux = _non_linux
if non_linux and ctlc:
pytest.skip(
'Ctl-c + MacOS is too unreliable/racy for this test..\n'
)
# !TODO, if someone with more patience then i wants to muck
# with the timings on this please feel free to see all the
# `non_linux` branching logic i added on my first attempt
# below!
#
# my conclusion was that if i were to run the script
# manually, and thus as slowly as a human would, the test
# would and should pass as described in this test fn, however
# after fighting with it for >= 1hr. i decided more then
# likely the more extensive `linux` testing should cover most
# regressions.
child = spawn('multi_daemon_subactors')
child.expect(PROMPT)
# there can be a race for which subactor will acquire
@ -511,8 +541,19 @@ def test_multi_daemon_subactors(
else:
raise ValueError('Neither log msg was found !?')
non_linux_delay: float = 0.3
if ctlc:
do_ctlc(child)
do_ctlc(
child,
delay=(
non_linux_delay
if non_linux
else None
),
)
if non_linux:
time.sleep(1)
# NOTE: previously since we did not have clobber prevention
# in the root actor this final resume could result in the debugger
@ -543,33 +584,66 @@ def test_multi_daemon_subactors(
# assert "in use by child ('bp_forever'," in before
if ctlc:
do_ctlc(child)
do_ctlc(
child,
delay=(
non_linux_delay
if non_linux
else None
),
)
if non_linux:
time.sleep(1)
# expect another breakpoint actor entry
child.sendline('c')
child.expect(PROMPT)
try:
assert_before(
before: str = assert_before(
child,
bp_forev_parts,
)
except AssertionError:
assert_before(
before: str = assert_before(
child,
name_error_parts,
)
else:
if ctlc:
do_ctlc(child)
before: str = do_ctlc(
child,
delay=(
non_linux_delay
if non_linux
else None
),
)
if non_linux:
time.sleep(1)
# should crash with the 2nd name error (simulates
# a retry) and then the root eventually (boxed) errors
# after 1 or more further bp actor entries.
child.sendline('c')
child.expect(PROMPT)
try:
child.expect(
PROMPT,
timeout=3,
)
except EOF:
before: str = child.before.decode()
print(
f'\n'
f'??? NEVER RXED `pdb` PROMPT ???\n'
f'\n'
f'{before}\n'
)
raise
assert_before(
child,
name_error_parts,
@ -689,7 +763,8 @@ def test_multi_subactors_root_errors(
@has_nested_actors
def test_multi_nested_subactors_error_through_nurseries(
spawn,
ci_env: bool,
spawn: PexpectSpawner,
# TODO: address debugger issue for nested tree:
# https://github.com/goodboy/tractor/issues/320
@ -712,7 +787,16 @@ def test_multi_nested_subactors_error_through_nurseries(
for send_char in itertools.cycle(['c', 'q']):
try:
child.expect(PROMPT)
child.expect(
PROMPT,
timeout=(
6 if (
_non_linux
and
ci_env
) else -1
),
)
child.sendline(send_char)
time.sleep(0.01)
@ -889,6 +973,11 @@ def test_different_debug_mode_per_actor(
)
# skip on non-Linux CI
@pytest.mark.ctlcs_bish(
_non_linux,
_ci_env,
)
def test_post_mortem_api(
spawn,
ctlc: bool,
@ -1133,14 +1222,20 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
# closed so verify we see error reporting as well as
# a failed crash-REPL request msg and can CTL-c our way
# out.
# ?TODO, match depending on `tpt_proto(s)`?
# - [ ] how can we pass it into the script tho?
tpt: str = 'UDS'
if _non_linux:
tpt: str = 'TCP'
assert_before(
child,
['peer IPC channel closed abruptly?',
'another task closed this fd',
'Debug lock request was CANCELLED?',
"'MsgpackUDSStream' was already closed locally?",
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
f"'Msgpack{tpt}Stream' was already closed locally?",
f"TransportClosed: 'Msgpack{tpt}Stream' was already closed 'by peer'?",
]
# XXX races on whether these show/hit?

View File

@ -31,6 +31,9 @@ from .conftest import (
PROMPT,
_pause_msg,
)
from ..conftest import (
no_macos,
)
import pytest
from pexpect.exceptions import (
@ -42,6 +45,7 @@ if TYPE_CHECKING:
from ..conftest import PexpectSpawner
@no_macos
def test_shield_pause(
spawn: PexpectSpawner,
):
@ -57,6 +61,7 @@ def test_shield_pause(
expect(
child,
'Yo my child hanging..?',
timeout=3,
)
assert_before(
child,

View File

@ -0,0 +1,4 @@
'''
`tractor.msg.*` sub-sys test suite.
'''

View File

@ -0,0 +1,4 @@
'''
`tractor.msg.*` test sub-pkg conf.
'''

View File

@ -0,0 +1,240 @@
'''
Unit tests for `tractor.msg.pretty_struct`
private-field filtering in `pformat()`.
'''
import pytest
from tractor.msg.pretty_struct import (
Struct,
pformat,
iter_struct_ppfmt_lines,
)
from tractor.msg._codec import (
MsgDec,
mk_dec,
)
# ------ test struct definitions ------ #
class PublicOnly(Struct):
'''
All-public fields for baseline testing.
'''
name: str = 'alice'
age: int = 30
class PrivateOnly(Struct):
'''
Only underscore-prefixed (private) fields.
'''
_secret: str = 'hidden'
_internal: int = 99
class MixedFields(Struct):
'''
Mix of public and private fields.
'''
name: str = 'bob'
_hidden: int = 42
value: float = 3.14
_meta: str = 'internal'
class Inner(
Struct,
frozen=True,
):
'''
Frozen inner struct with a private field,
for nesting tests.
'''
x: int = 1
_secret: str = 'nope'
class Outer(Struct):
'''
Outer struct nesting an `Inner`.
'''
label: str = 'outer'
inner: Inner = Inner()
class EmptyStruct(Struct):
'''
Struct with zero fields.
'''
pass
# ------ tests ------ #
@pytest.mark.parametrize(
'struct_and_expected',
[
(
PublicOnly(),
{
'shown': ['name', 'age'],
'hidden': [],
},
),
(
MixedFields(),
{
'shown': ['name', 'value'],
'hidden': ['_hidden', '_meta'],
},
),
(
PrivateOnly(),
{
'shown': [],
'hidden': ['_secret', '_internal'],
},
),
],
ids=[
'all-public',
'mixed-pub-priv',
'all-private',
],
)
def test_field_visibility_in_pformat(
struct_and_expected: tuple[
Struct,
dict[str, list[str]],
],
):
'''
Verify `pformat()` shows public fields
and hides `_`-prefixed private fields.
'''
(
struct,
expected,
) = struct_and_expected
output: str = pformat(struct)
for field_name in expected['shown']:
assert field_name in output, (
f'{field_name!r} should appear in:\n'
f'{output}'
)
for field_name in expected['hidden']:
assert field_name not in output, (
f'{field_name!r} should NOT appear in:\n'
f'{output}'
)
def test_iter_ppfmt_lines_skips_private():
'''
Directly verify `iter_struct_ppfmt_lines()`
never yields tuples with `_`-prefixed field
names.
'''
struct = MixedFields()
lines: list[tuple[str, str]] = list(
iter_struct_ppfmt_lines(
struct,
field_indent=2,
)
)
# should have lines for public fields only
assert len(lines) == 2
for _prefix, line_content in lines:
field_name: str = (
line_content.split(':')[0].strip()
)
assert not field_name.startswith('_'), (
f'private field leaked: {field_name!r}'
)
def test_nested_struct_filters_inner_private():
'''
Verify that nested struct's private fields
are also filtered out during recursion.
'''
outer = Outer()
output: str = pformat(outer)
# outer's public field
assert 'label' in output
# inner's public field (recursed into)
assert 'x' in output
# inner's private field must be hidden
assert '_secret' not in output
def test_empty_struct_pformat():
'''
An empty struct should produce a valid
`pformat()` result with no field lines.
'''
output: str = pformat(EmptyStruct())
assert 'EmptyStruct(' in output
assert output.rstrip().endswith(')')
# no field lines => only struct header+footer
lines: list[tuple[str, str]] = list(
iter_struct_ppfmt_lines(
EmptyStruct(),
field_indent=2,
)
)
assert lines == []
def test_real_msgdec_pformat_hides_private():
'''
Verify `pformat()` on a real `MsgDec`
hides the `_dec` internal field.
NOTE: `MsgDec.__repr__` is custom and does
NOT call `pformat()`, so we call it directly.
'''
dec: MsgDec = mk_dec(spec=int)
output: str = pformat(dec)
# the private `_dec` field should be filtered
assert '_dec' not in output
# but the struct type name should be present
assert 'MsgDec(' in output
def test_pformat_repr_integration():
'''
Verify that `Struct.__repr__()` (which calls
`pformat()`) also hides private fields for
custom structs that do NOT override `__repr__`.
'''
mixed = MixedFields()
output: str = repr(mixed)
assert 'name' in output
assert 'value' in output
assert '_hidden' not in output
assert '_meta' not in output

View File

@ -1,7 +1,12 @@
"""
Bidirectional streaming.
'''
Audit the simplest inter-actor bidirectional (streaming)
msg patterns.
"""
'''
from __future__ import annotations
from typing import (
Callable,
)
import pytest
import trio
import tractor
@ -9,10 +14,8 @@ import tractor
@tractor.context
async def simple_rpc(
ctx: tractor.Context,
data: int,
) -> None:
'''
Test a small ping-pong server.
@ -39,15 +42,13 @@ async def simple_rpc(
@tractor.context
async def simple_rpc_with_forloop(
ctx: tractor.Context,
data: int,
) -> None:
"""Same as previous test but using ``async for`` syntax/api.
"""
'''
Same as previous test but using `async for` syntax/api.
'''
# signal to parent that we're up
await ctx.started(data + 1)
@ -68,62 +69,78 @@ async def simple_rpc_with_forloop(
@pytest.mark.parametrize(
'use_async_for',
[True, False],
[
True,
False,
],
ids='use_async_for={}'.format,
)
@pytest.mark.parametrize(
'server_func',
[simple_rpc, simple_rpc_with_forloop],
[
simple_rpc,
simple_rpc_with_forloop,
],
ids='server_func={}'.format,
)
def test_simple_rpc(server_func, use_async_for):
def test_simple_rpc(
server_func: Callable,
use_async_for: bool,
loglevel: str,
debug_mode: bool,
):
'''
The simplest request response pattern.
'''
async def main():
async with tractor.open_nursery() as n:
with trio.fail_after(6):
async with tractor.open_nursery(
loglevel=loglevel,
debug_mode=debug_mode,
) as an:
portal: tractor.Portal = await an.start_actor(
'rpc_server',
enable_modules=[__name__],
)
portal = await n.start_actor(
'rpc_server',
enable_modules=[__name__],
)
async with portal.open_context(
server_func, # taken from pytest parameterization
data=10,
) as (ctx, sent):
async with portal.open_context(
server_func, # taken from pytest parameterization
data=10,
) as (ctx, sent):
assert sent == 11
assert sent == 11
async with ctx.open_stream() as stream:
async with ctx.open_stream() as stream:
if use_async_for:
if use_async_for:
count = 0
# receive msgs using async for style
print('ping')
await stream.send('ping')
async for msg in stream:
assert msg == 'pong'
count = 0
# receive msgs using async for style
print('ping')
await stream.send('ping')
count += 1
if count >= 9:
break
async for msg in stream:
assert msg == 'pong'
print('ping')
await stream.send('ping')
count += 1
else:
# classic send/receive style
for _ in range(10):
if count >= 9:
break
print('ping')
await stream.send('ping')
assert await stream.receive() == 'pong'
else:
# classic send/receive style
for _ in range(10):
# stream should terminate here
print('ping')
await stream.send('ping')
assert await stream.receive() == 'pong'
# final context result(s) should be consumed here in __aexit__()
# stream should terminate here
await portal.cancel_actor()
# final context result(s) should be consumed here in __aexit__()
await portal.cancel_actor()
trio.run(main)

View File

@ -17,8 +17,8 @@ from tractor._testing import (
from .conftest import no_windows
def is_win():
return platform.system() == 'Windows'
_non_linux: bool = platform.system() != 'Linux'
_friggin_windows: bool = platform.system() == 'Windows'
async def assert_err(delay=0):
@ -431,7 +431,7 @@ async def test_nested_multierrors(loglevel, start_method):
for subexc in err.exceptions:
# verify first level actor errors are wrapped as remote
if is_win():
if _friggin_windows:
# windows is often too slow and cancellation seems
# to happen before an actor is spawned
@ -464,7 +464,7 @@ async def test_nested_multierrors(loglevel, start_method):
# XXX not sure what's up with this..
# on windows sometimes spawning is just too slow and
# we get back the (sent) cancel signal instead
if is_win():
if _friggin_windows:
if isinstance(subexc, tractor.RemoteActorError):
assert subexc.boxed_type in (
BaseExceptionGroup,
@ -507,17 +507,22 @@ def test_cancel_via_SIGINT(
@no_windows
def test_cancel_via_SIGINT_other_task(
loglevel,
start_method,
spawn_backend,
loglevel: str,
start_method: str,
spawn_backend: str,
):
"""Ensure that a control-C (SIGINT) signal cancels both the parent
and child processes in trionic fashion even a subprocess is started
from a seperate ``trio`` child task.
"""
pid = os.getpid()
timeout: float = 2
if is_win(): # smh
'''
Ensure that a control-C (SIGINT) signal cancels both the parent
and child processes in trionic fashion even a subprocess is
started from a seperate ``trio`` child task.
'''
pid: int = os.getpid()
timeout: float = (
4 if _non_linux
else 2
)
if _friggin_windows: # smh
timeout += 1
async def spawn_and_sleep_forever(
@ -696,7 +701,7 @@ def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon(
kbi_delay = 0.5
timeout: float = 2.9
if is_win(): # smh
if _friggin_windows: # smh
timeout += 1
async def main():

View File

@ -18,16 +18,15 @@ from tractor import RemoteActorError
async def aio_streamer(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
chan: tractor.to_asyncio.LinkedTaskChannel,
) -> trio.abc.ReceiveChannel:
# required first msg to sync caller
to_trio.send_nowait(None)
chan.started_nowait(None)
from itertools import cycle
for i in cycle(range(10)):
to_trio.send_nowait(i)
chan.send_nowait(i)
await asyncio.sleep(0.01)

View File

@ -9,6 +9,7 @@ from itertools import count
import math
import platform
from pprint import pformat
import sys
from typing import (
Callable,
)
@ -941,6 +942,11 @@ def test_one_end_stream_not_opened(
from tractor._runtime import Actor
buf_size = buf_size_increase + Actor.msg_buffer_size
timeout: float = (
1 if sys.platform == 'linux'
else 3
)
async def main():
async with tractor.open_nursery(
debug_mode=debug_mode,
@ -950,7 +956,7 @@ def test_one_end_stream_not_opened(
enable_modules=[__name__],
)
with trio.fail_after(1):
with trio.fail_after(timeout):
async with portal.open_context(
entrypoint,
) as (ctx, sent):

View File

@ -1,11 +1,13 @@
"""
Actor "discovery" testing
Discovery subsys.
"""
import os
import signal
import platform
from functools import partial
import itertools
from typing import Callable
import psutil
import pytest
@ -17,7 +19,9 @@ import trio
@tractor_test
async def test_reg_then_unreg(reg_addr):
async def test_reg_then_unreg(
reg_addr: tuple,
):
actor = tractor.current_actor()
assert actor.is_arbiter
assert len(actor._registry) == 1 # only self is registered
@ -82,11 +86,15 @@ async def say_hello_use_wait(
@tractor_test
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
@pytest.mark.parametrize(
'func',
[say_hello,
say_hello_use_wait]
)
async def test_trynamic_trio(
func,
start_method,
reg_addr,
func: Callable,
start_method: str,
reg_addr: tuple,
):
'''
Root actor acting as the "director" and running one-shot-task-actors
@ -119,7 +127,10 @@ async def stream_forever():
await trio.sleep(0.01)
async def cancel(use_signal, delay=0):
async def cancel(
use_signal: bool,
delay: float = 0,
):
# hold on there sally
await trio.sleep(delay)
@ -132,13 +143,15 @@ async def cancel(use_signal, delay=0):
raise KeyboardInterrupt
async def stream_from(portal):
async def stream_from(portal: tractor.Portal):
async with portal.open_stream_from(stream_forever) as stream:
async for value in stream:
print(value)
async def unpack_reg(actor_or_portal):
async def unpack_reg(
actor_or_portal: tractor.Portal|tractor.Actor,
):
'''
Get and unpack a "registry" RPC request from the "arbiter" registry
system.
@ -173,7 +186,9 @@ async def spawn_and_check_registry(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
):
async with tractor.get_registry(reg_addr) as portal:
async with tractor.get_registry(
addr=reg_addr,
) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
@ -246,10 +261,10 @@ async def spawn_and_check_registry(
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel(
debug_mode: bool,
start_method,
use_signal,
reg_addr,
with_streaming,
start_method: str,
use_signal: bool,
reg_addr: tuple,
with_streaming: bool,
):
'''
Verify that cancelling a nursery results in all subactors
@ -274,15 +289,17 @@ def test_subactors_unregister_on_cancel(
def test_subactors_unregister_on_cancel_remote_daemon(
daemon: subprocess.Popen,
debug_mode: bool,
start_method,
use_signal,
reg_addr,
with_streaming,
start_method: str,
use_signal: bool,
reg_addr: tuple,
with_streaming: bool,
):
"""Verify that cancelling a nursery results in all subactors
deregistering themselves with a **remote** (not in the local process
tree) arbiter.
"""
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with a **remote** (not in the local
process tree) arbiter.
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
@ -374,14 +391,16 @@ async def close_chans_before_nursery(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit(
start_method,
use_signal,
reg_addr,
start_method: str,
use_signal: bool,
reg_addr: tuple,
):
"""Verify that closing a stream explicitly and killing the actor's
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(
@ -396,14 +415,16 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
daemon: subprocess.Popen,
start_method,
use_signal,
reg_addr,
start_method: str,
use_signal: bool,
reg_addr: tuple,
):
"""Verify that closing a stream explicitly and killing the actor's
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt):
trio.run(
partial(

View File

@ -9,12 +9,17 @@ import sys
import subprocess
import platform
import shutil
from typing import Callable
import pytest
import tractor
from tractor._testing import (
examples_dir,
)
_non_linux: bool = platform.system() != 'Linux'
_friggin_macos: bool = platform.system() == 'Darwin'
@pytest.fixture
def run_example_in_subproc(
@ -101,8 +106,10 @@ def run_example_in_subproc(
ids=lambda t: t[1],
)
def test_example(
run_example_in_subproc,
example_script,
run_example_in_subproc: Callable,
example_script: str,
test_log: tractor.log.StackLevelAdapter,
ci_env: bool,
):
'''
Load and run scripts from this repo's ``examples/`` dir as a user
@ -116,9 +123,32 @@ def test_example(
'''
ex_file: str = os.path.join(*example_script)
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
if (
'rpc_bidir_streaming' in ex_file
and
sys.version_info < (3, 9)
):
pytest.skip("2-way streaming example requires py3.9 async with syntax")
if (
'full_fledged_streaming_service' in ex_file
and
_friggin_macos
and
ci_env
):
pytest.skip(
'Streaming example is too flaky in CI\n'
'AND their competitor runs this CI service..\n'
'This test does run just fine "in person" however..'
)
timeout: float = (
60
if ci_env and _non_linux
else 16
)
with open(ex_file, 'r') as ex:
code = ex.read()
@ -126,9 +156,12 @@ def test_example(
err = None
try:
if not proc.poll():
_, err = proc.communicate(timeout=15)
_, err = proc.communicate(timeout=timeout)
except subprocess.TimeoutExpired as e:
test_log.exception(
f'Example failed to finish within {timeout}s ??\n'
)
proc.kill()
err = e.stderr

View File

@ -47,12 +47,11 @@ async def sleep_and_err(
# just signature placeholders for compat with
# ``to_asyncio.open_channel_from()``
to_trio: trio.MemorySendChannel|None = None,
from_trio: asyncio.Queue|None = None,
chan: to_asyncio.LinkedTaskChannel|None = None,
):
if to_trio:
to_trio.send_nowait('start')
if chan:
chan.started_nowait('start')
await asyncio.sleep(sleep_for)
assert 0
@ -399,7 +398,7 @@ async def no_to_trio_in_args():
async def push_from_aio_task(
sequence: Iterable,
to_trio: trio.abc.SendChannel,
chan: to_asyncio.LinkedTaskChannel,
expect_cancel: False,
fail_early: bool,
exit_early: bool,
@ -407,15 +406,12 @@ async def push_from_aio_task(
) -> None:
try:
# print('trying breakpoint')
# breakpoint()
# sync caller ctx manager
to_trio.send_nowait(True)
chan.started_nowait(True)
for i in sequence:
print(f'asyncio sending {i}')
to_trio.send_nowait(i)
chan.send_nowait(i)
await asyncio.sleep(0.001)
if (
@ -732,15 +728,21 @@ def test_aio_errors_and_channel_propagates_and_closes(
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
chan: to_asyncio.LinkedTaskChannel,
) -> None:
'''
An IPC-msg "echo server" with msgs received and relayed by
a parent `trio.Task` into a child `asyncio.Task`
and then repeated back to that local parent (`trio.Task`)
and sent again back to the original calling remote actor.
to_trio.send_nowait('start')
'''
# same semantics as `trio.TaskStatus.started()`
chan.started_nowait('start')
while True:
try:
msg = await from_trio.get()
msg = await chan.get()
except to_asyncio.TrioTaskExited:
print(
'breaking aio echo loop due to `trio` exit!'
@ -748,7 +750,7 @@ async def aio_echo_server(
break
# echo the msg back
to_trio.send_nowait(msg)
chan.send_nowait(msg)
# if we get the terminate sentinel
# break the echo loop
@ -765,7 +767,10 @@ async def trio_to_aio_echo_server(
):
async with to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):
) as (
first, # value from `chan.started_nowait()` above
chan,
):
assert first == 'start'
await ctx.started(first)
@ -776,7 +781,8 @@ async def trio_to_aio_echo_server(
await chan.send(msg)
out = await chan.receive()
# echo back to parent actor-task
# echo back to parent-actor's remote parent-ctx-task!
await stream.send(out)
if out is None:
@ -1090,24 +1096,21 @@ def test_sigint_closes_lifetime_stack(
# ?TODO asyncio.Task fn-deco?
# -[ ] do sig checkingat import time like @context?
# -[ ] maybe name it @aio_task ??
# -[ ] chan: to_asyncio.InterloopChannel ??
# -[ ] do fn-sig checking at import time like @context?
# |_[ ] maybe name it @a(sync)io_task ??
# @asyncio_task <- not bad ??
async def raise_before_started(
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
chan: to_asyncio.LinkedTaskChannel,
) -> None:
'''
`asyncio.Task` entry point which RTEs before calling
`to_trio.send_nowait()`.
`chan.started_nowait()`.
'''
await asyncio.sleep(0.2)
raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
# to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
await asyncio.sleep(float('inf'))

View File

@ -1,9 +1,11 @@
"""
Streaming via async gen api
Streaming via the, now legacy, "async-gen API".
"""
import time
from functools import partial
import platform
from typing import Callable
import trio
import tractor
@ -19,7 +21,11 @@ def test_must_define_ctx():
async def no_ctx():
pass
assert "no_ctx must be `ctx: tractor.Context" in str(err.value)
assert (
"no_ctx must be `ctx: tractor.Context"
in
str(err.value)
)
@tractor.stream
async def has_ctx(ctx):
@ -69,14 +75,14 @@ async def stream_from_single_subactor(
async with tractor.open_nursery(
registry_addrs=[reg_addr],
start_method=start_method,
) as nursery:
) as an:
async with tractor.find_actor('streamerd') as portals:
if not portals:
# no brokerd actor found
portal = await nursery.start_actor(
portal = await an.start_actor(
'streamerd',
enable_modules=[__name__],
)
@ -116,11 +122,22 @@ async def stream_from_single_subactor(
@pytest.mark.parametrize(
'stream_func', [async_gen_stream, context_stream]
'stream_func',
[
async_gen_stream,
context_stream,
],
ids='stream_func={}'.format
)
def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
"""Verify streaming from a spawned async generator.
"""
def test_stream_from_single_subactor(
reg_addr: tuple,
start_method: str,
stream_func: Callable,
):
'''
Verify streaming from a spawned async generator.
'''
trio.run(
partial(
stream_from_single_subactor,
@ -132,10 +149,9 @@ def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
# this is the first 2 actors, streamer_1 and streamer_2
async def stream_data(seed):
async def stream_data(seed: int):
for i in range(seed):
yield i
# trigger scheduler to simulate practical usage
@ -143,15 +159,17 @@ async def stream_data(seed):
# this is the third actor; the aggregator
async def aggregate(seed):
"""Ensure that the two streams we receive match but only stream
async def aggregate(seed: int):
'''
Ensure that the two streams we receive match but only stream
a single set of values to the parent.
"""
async with tractor.open_nursery() as nursery:
'''
async with tractor.open_nursery() as an:
portals = []
for i in range(1, 3):
# fork point
portal = await nursery.start_actor(
portal = await an.start_actor(
name=f'streamer_{i}',
enable_modules=[__name__],
)
@ -164,7 +182,8 @@ async def aggregate(seed):
async with send_chan:
async with portal.open_stream_from(
stream_data, seed=seed,
stream_data,
seed=seed,
) as stream:
async for value in stream:
@ -174,10 +193,14 @@ async def aggregate(seed):
print(f"FINISHED ITERATING {portal.channel.uid}")
# spawn 2 trio tasks to collect streams and push to a local queue
async with trio.open_nursery() as n:
async with trio.open_nursery() as tn:
for portal in portals:
n.start_soon(push_to_chan, portal, send_chan.clone())
tn.start_soon(
push_to_chan,
portal,
send_chan.clone(),
)
# close this local task's reference to send side
await send_chan.aclose()
@ -194,20 +217,21 @@ async def aggregate(seed):
print("FINISHED ITERATING in aggregator")
await nursery.cancel()
await an.cancel()
print("WAITING on `ActorNursery` to finish")
print("AGGREGATOR COMPLETE!")
# this is the main actor and *arbiter*
async def a_quadruple_example():
# a nursery which spawns "actors"
async with tractor.open_nursery() as nursery:
async def a_quadruple_example() -> list[int]:
'''
Open the root-actor which is also a "registrar".
'''
async with tractor.open_nursery() as an:
seed = int(1e3)
pre_start = time.time()
portal = await nursery.start_actor(
portal = await an.start_actor(
name='aggregator',
enable_modules=[__name__],
)
@ -228,8 +252,14 @@ async def a_quadruple_example():
return result_stream
async def cancel_after(wait, reg_addr):
async with tractor.open_root_actor(registry_addrs=[reg_addr]):
async def cancel_after(
wait: float,
reg_addr: tuple,
) -> list[int]:
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
with trio.move_on_after(wait):
return await a_quadruple_example()
@ -240,6 +270,10 @@ def time_quad_ex(
ci_env: bool,
spawn_backend: str,
):
non_linux: bool = (_sys := platform.system()) != 'Linux'
if ci_env and non_linux:
pytest.skip(f'Test is too flaky on {_sys!r} in CI')
if spawn_backend == 'mp':
'''
no idea but the mp *nix runs are flaking out here often...
@ -247,16 +281,20 @@ def time_quad_ex(
'''
pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
timeout = 7 if non_linux else 4
start = time.time()
results = trio.run(cancel_after, timeout, reg_addr)
diff = time.time() - start
results: list[int] = trio.run(
cancel_after,
timeout,
reg_addr,
)
diff: float = time.time() - start
assert results
return results, diff
def test_a_quadruple_example(
time_quad_ex: tuple,
time_quad_ex: tuple[list[int], float],
ci_env: bool,
spawn_backend: str,
):
@ -264,13 +302,12 @@ def test_a_quadruple_example(
This also serves as a kind of "we'd like to be this fast test".
'''
non_linux: bool = (_sys := platform.system()) != 'Linux'
results, diff = time_quad_ex
assert results
this_fast = (
6 if platform.system() in (
'Windows',
'Darwin',
)
6 if non_linux
else 3
)
assert diff < this_fast
@ -281,19 +318,33 @@ def test_a_quadruple_example(
list(map(lambda i: i/10, range(3, 9)))
)
def test_not_fast_enough_quad(
reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend
reg_addr: tuple,
time_quad_ex: tuple[list[int], float],
cancel_delay: float,
ci_env: bool,
spawn_backend: str,
):
"""Verify we can cancel midway through the quad example and all actors
cancel gracefully.
"""
'''
Verify we can cancel midway through the quad example and all
actors cancel gracefully.
'''
results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0)
results = trio.run(cancel_after, delay, reg_addr)
system = platform.system()
if system in ('Windows', 'Darwin') and results is not None:
results = trio.run(
cancel_after,
delay,
reg_addr,
)
system: str = platform.system()
if (
system in ('Windows', 'Darwin')
and
results is not None
):
# In CI envoirments it seems later runs are quicker then the first
# so just ignore these
print(f"Woa there {system} caught your breath eh?")
print(f'Woa there {system} caught your breath eh?')
else:
# should be cancelled mid-streaming
assert results is None
@ -301,23 +352,24 @@ def test_not_fast_enough_quad(
@tractor_test
async def test_respawn_consumer_task(
reg_addr,
spawn_backend,
loglevel,
reg_addr: tuple,
spawn_backend: str,
loglevel: str,
):
"""Verify that ``._portal.ReceiveStream.shield()``
'''
Verify that ``._portal.ReceiveStream.shield()``
sucessfully protects the underlying IPC channel from being closed
when cancelling and respawning a consumer task.
This also serves to verify that all values from the stream can be
received despite the respawns.
"""
'''
stream = None
async with tractor.open_nursery() as n:
async with tractor.open_nursery() as an:
portal = await n.start_actor(
portal = await an.start_actor(
name='streamer',
enable_modules=[__name__]
)

View File

@ -35,6 +35,9 @@ if TYPE_CHECKING:
)
_non_linux: bool = platform.system() != 'Linux'
def test_abort_on_sigint(
daemon: subprocess.Popen,
):
@ -137,6 +140,7 @@ def test_non_registrar_spawns_child(
reg_addr: UnwrappedAddress,
loglevel: str,
debug_mode: bool,
ci_env: bool,
):
'''
Ensure a non-regristar (serving) root actor can spawn a sub and
@ -148,6 +152,12 @@ def test_non_registrar_spawns_child(
'''
async def main():
# XXX, since apparently on macos in GH's CI it can be a race
# with the `daemon` registrar on grabbing the socket-addr..
if ci_env and _non_linux:
await trio.sleep(.5)
async with tractor.open_nursery(
registry_addrs=[reg_addr],
loglevel=loglevel,

View File

@ -91,13 +91,12 @@ def test_infected_root_actor(
async def sync_and_err(
# just signature placeholders for compat with
# ``to_asyncio.open_channel_from()``
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
chan: tractor.to_asyncio.LinkedTaskChannel,
ev: asyncio.Event,
):
if to_trio:
to_trio.send_nowait('start')
if chan:
chan.started_nowait('start')
await ev.wait()
raise RuntimeError('asyncio-side')

View File

@ -2,6 +2,7 @@
Shared mem primitives and APIs.
"""
import platform
import uuid
# import numpy
@ -53,7 +54,18 @@ def test_child_attaches_alot():
shm_key=shml.key,
) as (ctx, start_val),
):
assert start_val == key
assert (_key := shml.key) == start_val
if platform.system() != 'Darwin':
# XXX, macOS has a char limit..
# see `ipc._shm._shorten_key_for_macos`
assert (
start_val
==
key
==
_key
)
await ctx.result()
await portal.cancel_actor()

View File

@ -22,7 +22,6 @@ from __future__ import annotations
from contextvars import (
ContextVar,
)
import os
from pathlib import Path
from typing import (
Any,
@ -30,6 +29,7 @@ from typing import (
TYPE_CHECKING,
)
import platformdirs
from trio.lowlevel import current_task
if TYPE_CHECKING:
@ -172,23 +172,56 @@ def current_ipc_ctx(
return ctx
# std ODE (mutable) app state location
_rtdir: Path = Path(os.environ['XDG_RUNTIME_DIR'])
def get_rt_dir(
subdir: str = 'tractor'
subdir: str|Path|None = None,
appname: str = 'tractor',
) -> Path:
'''
Return the user "runtime dir" where most userspace apps stick
their IPC and cache related system util-files; we take hold
of a `'XDG_RUNTIME_DIR'/tractor/` subdir by default.
Return the user "runtime dir", the file-sys location where most
userspace apps stick their IPC and cache related system
util-files.
On linux we use a `${XDG_RUNTIME_DIR}/tractor/` subdir by
default, but equivalents are mapped for each platform using
the lovely `platformdirs` lib.
'''
rtdir: Path = _rtdir / subdir
if not rtdir.is_dir():
rtdir.mkdir()
return rtdir
rt_dir: Path = Path(
platformdirs.user_runtime_dir(
appname=appname,
),
)
# Normalize and validate that `subdir` is a relative path
# without any parent-directory ("..") components, to prevent
# escaping the runtime directory.
if subdir:
subdir_path = (
subdir
if isinstance(subdir, Path)
else Path(subdir)
)
if subdir_path.is_absolute():
raise ValueError(
f'`subdir` must be a relative path!\n'
f'{subdir!r}\n'
)
if any(part == '..' for part in subdir_path.parts):
raise ValueError(
"`subdir` must not contain '..' components!\n"
f'{subdir!r}\n'
)
rt_dir: Path = rt_dir / subdir_path
if not rt_dir.is_dir():
rt_dir.mkdir(
parents=True,
exist_ok=True, # avoid `FileExistsError` from conc calls
)
return rt_dir
def current_ipc_protos() -> list[str]:

View File

@ -21,6 +21,7 @@ cancellation during REPL interaction.
'''
from __future__ import annotations
import platform
from typing import (
TYPE_CHECKING,
)
@ -49,6 +50,7 @@ if TYPE_CHECKING:
log = get_logger()
_is_macos: bool = platform.system() == 'Darwin'
_ctlc_ignore_header: str = (
'Ignoring SIGINT while debug REPL in use'
)
@ -300,6 +302,11 @@ def sigint_shield(
# XXX: yah, mega hack, but how else do we catch this madness XD
if (
repl.shname == 'xonsh'
or (
repl.shname == 'bash'
and
_is_macos
)
):
flush_status += (
'-> ALSO re-flushing due to `xonsh`..\n'

View File

@ -23,6 +23,7 @@ considered optional within the context of this runtime-library.
"""
from __future__ import annotations
import hashlib
from multiprocessing import shared_memory as shm
from multiprocessing.shared_memory import (
# SharedMemory,
@ -106,11 +107,12 @@ class NDToken(Struct, frozen=True):
This type is msg safe.
'''
shm_name: str # this servers as a "key" value
shm_name: str # actual OS-level name (may be shortened on macOS)
shm_first_index_name: str
shm_last_index_name: str
dtype_descr: tuple
size: int # in struct-array index / row terms
key: str|None = None # original descriptive key (for lookup)
# TODO: use nptyping here on dtypes
@property
@ -124,6 +126,41 @@ class NDToken(Struct, frozen=True):
def as_msg(self):
return to_builtins(self)
def __eq__(self, other) -> bool:
'''
Compare tokens based on shm names and dtype,
ignoring the `key` field.
The `key` field is only used for lookups,
not for token identity.
'''
if not isinstance(other, NDToken):
return False
return (
self.shm_name == other.shm_name
and self.shm_first_index_name
== other.shm_first_index_name
and self.shm_last_index_name
== other.shm_last_index_name
and self.dtype_descr == other.dtype_descr
and self.size == other.size
)
def __hash__(self) -> int:
'''
Hash based on the same fields used
in `.__eq__()`.
'''
return hash((
self.shm_name,
self.shm_first_index_name,
self.shm_last_index_name,
self.dtype_descr,
self.size,
))
@classmethod
def from_msg(cls, msg: dict) -> NDToken:
if isinstance(msg, NDToken):
@ -160,6 +197,50 @@ def get_shm_token(key: str) -> NDToken | None:
return _known_tokens.get(key)
def _shorten_key_for_macos(
key: str,
prefix: str = '',
suffix: str = '',
) -> str:
'''
MacOS has a (hillarious) 31 character limit for POSIX shared
memory names. Hash long keys to fit within this limit while
maintaining uniqueness.
'''
# macOS shm_open() has a 31 char limit (PSHMNAMLEN)
# format: /t_<hash16> = 19 chars, well under limit
max_len: int = 31
if len(key) <= max_len:
return key
_hash: str = hashlib.sha256(
key.encode()
).hexdigest()
hash_len: int = (
(max_len - 1)
- len(prefix)
- len(suffix)
)
key_hash: str = _hash[:hash_len]
short_key = (
prefix
+
f'{key_hash}'
+
suffix
)
log.debug(
f'Shortened shm key for macOS:\n'
f' original: {key!r} ({len(key)!r} chars)\n'
f' shortened: {short_key!r}'
f' ({len(short_key)!r} chars)'
)
return short_key
def _make_token(
key: str,
size: int,
@ -171,12 +252,32 @@ def _make_token(
to access a shared array.
'''
# On macOS, shorten keys that exceed the
# 31 character limit
if platform.system() == 'Darwin':
shm_name = _shorten_key_for_macos(
key=key,
)
shm_first = _shorten_key_for_macos(
key=key,
suffix='_first',
)
shm_last = _shorten_key_for_macos(
key=key,
suffix='_last',
)
else:
shm_name = key
shm_first = key + '_first'
shm_last = key + '_last'
return NDToken(
shm_name=key,
shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last",
shm_name=shm_name,
shm_first_index_name=shm_first,
shm_last_index_name=shm_last,
dtype_descr=tuple(np.dtype(dtype).descr),
size=size,
key=key, # store original key for lookup
)
@ -431,9 +532,17 @@ class ShmArray:
def destroy(self) -> None:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
# We manually unlink to bypass all the
# "resource tracker" nonsense meant for
# non-SC systems.
name = self._shm.name
try:
shm_unlink(name)
except FileNotFoundError:
# might be a teardown race here?
log.warning(
f'Shm for {name} already unlinked?'
)
self._first.destroy()
self._last.destroy()
@ -463,8 +572,16 @@ def open_shm_ndarray(
a = np.zeros(size, dtype=dtype)
a['index'] = np.arange(len(a))
# Create token first to get the (possibly
# shortened) shm name
token = _make_token(
key=key,
size=size,
dtype=dtype,
)
shm = SharedMemory(
name=key,
name=token.shm_name,
create=True,
size=a.nbytes
)
@ -476,12 +593,6 @@ def open_shm_ndarray(
array[:] = a[:]
array.setflags(write=int(not readonly))
token = _make_token(
key=key,
size=size,
dtype=dtype,
)
# create single entry arrays for storing an first and last indices
first = SharedInt(
shm=SharedMemory(
@ -554,13 +665,23 @@ def attach_shm_ndarray(
'''
token = NDToken.from_msg(token)
key = token.shm_name
# Use original key for _known_tokens lookup,
# shm_name for OS calls
lookup_key = (
token.key if token.key
else token.shm_name
)
if key in _known_tokens:
assert NDToken.from_msg(_known_tokens[key]) == token, "WTF"
if lookup_key in _known_tokens:
assert (
NDToken.from_msg(
_known_tokens[lookup_key]
) == token
), 'WTF'
# XXX: ugh, looks like due to the ``shm_open()`` C api we can't
# actually place files in a subdir, see discussion here:
# XXX: ugh, looks like due to the ``shm_open()``
# C api we can't actually place files in a subdir,
# see discussion here:
# https://stackoverflow.com/a/11103289
# attach to array buffer and view as per dtype
@ -568,7 +689,7 @@ def attach_shm_ndarray(
for _ in range(3):
try:
shm = SharedMemory(
name=key,
name=token.shm_name,
create=False,
)
break
@ -614,10 +735,10 @@ def attach_shm_ndarray(
sha.array
# Stash key -> token knowledge for future queries
# via `maybe_opepn_shm_array()` but only after we know
# we can attach.
if key not in _known_tokens:
_known_tokens[key] = token
# via `maybe_open_shm_ndarray()` but only after
# we know we can attach.
if lookup_key not in _known_tokens:
_known_tokens[lookup_key] = token
# "close" attached shm on actor teardown
tractor.current_actor().lifetime_stack.callback(sha.close)
@ -661,7 +782,10 @@ def maybe_open_shm_ndarray(
False, # not newly opened
)
except KeyError:
log.warning(f"Could not find {key} in shms cache")
log.warning(
f'Could not find key in shms cache,\n'
f'key: {key!r}\n'
)
if dtype:
token = _make_token(
key,
@ -771,6 +895,7 @@ def open_shm_list(
size: int = int(2 ** 10),
dtype: float | int | bool | str | bytes | None = float,
readonly: bool = True,
prefix: str = 'shml_',
) -> ShmList:
@ -784,6 +909,12 @@ def open_shm_list(
}[dtype]
sequence = [default] * size
if platform.system() == 'Darwin':
key: str = _shorten_key_for_macos(
key=key,
prefix=prefix,
)
shml = ShmList(
sequence=sequence,
name=key,

View File

@ -23,12 +23,12 @@ from contextlib import (
)
from pathlib import Path
import os
import sys
from socket import (
AF_UNIX,
SOCK_STREAM,
SO_PASSCRED,
SO_PEERCRED,
SOL_SOCKET,
error as socket_error,
)
import struct
from typing import (
@ -53,7 +53,7 @@ from tractor.log import get_logger
from tractor.ipc._transport import (
MsgpackTransport,
)
from .._state import (
from tractor._state import (
get_rt_dir,
current_actor,
is_root_process,
@ -63,6 +63,28 @@ if TYPE_CHECKING:
from ._runtime import Actor
# Platform-specific credential passing constants
# See: https://stackoverflow.com/a/7982749
if sys.platform == 'linux':
from socket import (
SO_PASSCRED,
SO_PEERCRED,
)
else:
# Other (Unix) platforms - though further testing is required and
# others may need additional special handling?
SO_PASSCRED = None
SO_PEERCRED = None
# NOTE, macOS uses `LOCAL_PEERCRED` instead of `SO_PEERCRED` and
# doesn't need `SO_PASSCRED` (credential passing is always enabled).
# See code in <sys/un.h>: `#define LOCAL_PEERCRED 0x001`
#
# XXX INSTEAD we use the (hopefully) more generic
# `get_peer_pid()` below for other OSes.
log = get_logger()
@ -165,7 +187,11 @@ class UDSAddress(
err_on_no_runtime=False,
)
if actor:
sockname: str = '::'.join(actor.uid) + f'@{pid}'
sockname: str = f'{actor.aid.name}@{pid}'
# XXX, orig version which broke both macOS (file-name
# length) and `multiaddrs` ('::' invalid separator).
# sockname: str = '::'.join(actor.uid) + f'@{pid}'
#
# ?^TODO, for `multiaddr`'s parser we can't use the `::`
# above^, SO maybe a `.` or something else here?
# sockname: str = '.'.join(actor.uid) + f'@{pid}'
@ -292,7 +318,12 @@ def close_listener(
async def open_unix_socket_w_passcred(
filename: str|bytes|os.PathLike[str]|os.PathLike[bytes],
filename: (
str
|bytes
|os.PathLike[str]
|os.PathLike[bytes]
),
) -> trio.SocketStream:
'''
Literally the exact same as `trio.open_unix_socket()` except we set the additiona
@ -310,21 +341,66 @@ async def open_unix_socket_w_passcred(
# much more simplified logic vs tcp sockets - one socket type and only one
# possible location to connect to
sock = trio.socket.socket(AF_UNIX, SOCK_STREAM)
sock.setsockopt(SOL_SOCKET, SO_PASSCRED, 1)
# Only set SO_PASSCRED on Linux (not needed/available on macOS)
if SO_PASSCRED is not None:
sock.setsockopt(SOL_SOCKET, SO_PASSCRED, 1)
with close_on_error(sock):
await sock.connect(os.fspath(filename))
return trio.SocketStream(sock)
def get_peer_info(sock: trio.socket.socket) -> tuple[
def get_peer_pid(sock) -> int|None:
'''
Gets the PID of the process connected to the other end of a Unix
domain socket on macOS, or `None` if that fails.
NOTE, should work on MacOS (and others?).
'''
# try to get the peer PID using a naive soln found from,
# https://stackoverflow.com/a/67971484
#
# NOTE, a more correct soln is likely needed here according to
# the complaints of `copilot` which led to digging into the
# underlying `go`lang issue linked from the above SO answer,
# XXX, darwin-xnu kernel srces defining these constants,
# - SOL_LOCAL
# |_https://github.com/apple/darwin-xnu/blob/main/bsd/sys/un.h#L85
# - LOCAL_PEERPID
# |_https://github.com/apple/darwin-xnu/blob/main/bsd/sys/un.h#L89
#
SOL_LOCAL: int = 0
LOCAL_PEERPID: int = 0x002
try:
pid: int = sock.getsockopt(
SOL_LOCAL,
LOCAL_PEERPID,
)
return pid
except socket_error as e:
log.exception(
f"Failed to get peer PID: {e}"
)
return None
def get_peer_info(
sock: trio.socket.socket,
) -> tuple[
int, # pid
int, # uid
int, # guid
]:
'''
Deliver the connecting peer's "credentials"-info as defined in
a very Linux specific way..
a platform-specific way.
Linux-ONLY, uses SO_PEERCRED.
For more deats see,
- `man accept`,
@ -337,6 +413,11 @@ def get_peer_info(sock: trio.socket.socket) -> tuple[
- https://stackoverflow.com/a/7982749
'''
if SO_PEERCRED is None:
raise RuntimeError(
f'Peer credential retrieval not supported on {sys.platform}!'
)
creds: bytes = sock.getsockopt(
SOL_SOCKET,
SO_PEERCRED,
@ -440,13 +521,37 @@ class MsgpackUDSStream(MsgpackTransport):
match (peername, sockname):
case (str(), bytes()):
sock_path: Path = Path(peername)
case (bytes(), str()):
sock_path: Path = Path(sockname)
(
peer_pid,
_,
_,
) = get_peer_info(sock)
case (str(), str()): # XXX, likely macOS
sock_path: Path = Path(peername)
case _:
raise TypeError(
f'Failed to match (peername, sockname) types?\n'
f'peername: {peername!r}\n'
f'sockname: {sockname!r}\n'
)
if sys.platform == 'linux':
(
peer_pid,
_,
_,
) = get_peer_info(sock)
# NOTE known to at least works on,
# - macos
else:
peer_pid: int|None = get_peer_pid(sock)
if peer_pid is None:
log.warning(
f'Unable to get peer PID?\n'
f'sock: {sock!r}\n'
f'peer_pid: {peer_pid!r}\n'
)
filedir, filename = unwrap_sockpath(sock_path)
laddr = UDSAddress(

View File

@ -126,13 +126,17 @@ def iter_struct_ppfmt_lines(
str(ft)
).replace(' ', '')
if k[0] == '_':
continue
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
elif isinstance(v, Struct):
yield from iter_struct_ppfmt_lines(
struct=v,
field_indent=field_indent+field_indent,
)
else:
else: # top-level field
val_str: str = repr(v)
# XXX LOL, below just seems to be f#$%in causing
@ -149,10 +153,10 @@ def iter_struct_ppfmt_lines(
# raise
# return _Struct.__repr__(struct)
yield (
' '*field_indent, # indented ws prefix
f'{k}: {typ_name} = {val_str},', # field's repr line content
)
yield (
' '*field_indent, # indented ws prefix
f'{k}: {typ_name} = {val_str},', # field's repr line content
)
def pformat(

View File

@ -48,7 +48,7 @@ from tractor._state import (
_runtime_vars,
)
from tractor._context import Unresolved
from tractor.devx import debug
from tractor import devx
from tractor.log import (
get_logger,
StackLevelAdapter,
@ -94,10 +94,14 @@ else:
QueueShutDown = False
# TODO, generally speaking we can generalize this abstraction, a "SC linked
# parent->child task pair", as the same "supervision scope primitive"
# **that is** our `._context.Context` with the only difference being
# in how the tasks conduct msg-passing comms.
# TODO, generally speaking we can generalize this abstraction as,
#
# > A "SC linked, inter-event-loop" channel for comms between
# > a `parent: trio.Task` -> `child: asyncio.Task` pair.
#
# It is **very similar** in terms of its operation as a "supervision
# scope primitive" to that of our `._context.Context` with the only
# difference being in how the tasks conduct msg-passing comms.
#
# For `LinkedTaskChannel` we are passing the equivalent of (once you
# include all the recently added `._trio/aio_to_raise`
@ -122,6 +126,7 @@ class LinkedTaskChannel(
task scheduled in the host loop.
'''
# ?TODO, rename as `._aio_q` since it's 2-way?
_to_aio: asyncio.Queue
_from_aio: trio.MemoryReceiveChannel
@ -235,9 +240,11 @@ class LinkedTaskChannel(
#
async def receive(self) -> Any:
'''
Receive a value from the paired `asyncio.Task` with
Receive a value `trio.Task` <- `asyncio.Task`.
Note the tasks in each loop are "SC linked" as a pair with
exception/cancel handling to teardown both sides on any
unexpected error.
unexpected error or cancellation.
'''
try:
@ -261,15 +268,40 @@ class LinkedTaskChannel(
):
raise err
async def get(self) -> Any:
'''
Receive a value `asyncio.Task` <- `trio.Task`.
This is equiv to `await self._to_aio.get()`.
'''
return await self._to_aio.get()
async def send(self, item: Any) -> None:
'''
Send a value through to the asyncio task presuming
it defines a ``from_trio`` argument, if it does not
this method will raise an error.
Send a value `trio.Task` -> `asyncio.Task`
by enqueuing `item` onto the internal
`asyncio.Queue` via `put_nowait()`.
'''
self._to_aio.put_nowait(item)
# TODO? could we only compile-in this method on an instance
# handed to the `asyncio`-side, i.e. the fn invoked with
# `.open_channel_from()`.
def send_nowait(
self,
item: Any,
) -> None:
'''
Send a value through FROM the `asyncio.Task` to
the `trio.Task` NON-BLOCKING.
This is equiv to `self._to_trio.send_nowait()`.
'''
self._to_trio.send_nowait(item)
# TODO? needed?
# async def wait_aio_complete(self) -> None:
# await self._aio_task_complete.wait()
@ -337,9 +369,12 @@ def _run_asyncio_task(
'''
__tracebackhide__: bool = hide_tb
if not tractor.current_actor().is_infected_aio():
if not (actor := tractor.current_actor()).is_infected_aio():
raise RuntimeError(
"`infect_asyncio` mode is not enabled!?"
f'`infect_asyncio: bool` mode is not enabled ??\n'
f'Ensure you pass `ActorNursery.start_actor(infect_asyncio=True)`\n'
f'\n'
f'{actor}\n'
)
# ITC (inter task comms), these channel/queue names are mostly from
@ -402,7 +437,23 @@ def _run_asyncio_task(
orig = result = id(coro)
try:
# XXX TODO UGH!
# this seems to break a `test_sync_pause_from_aio_task`
# in a REALLY weird way where a `dict` value for
# `_runtime_vars['_root_addrs']` is delivered from the
# parent actor??
#
# XXX => see masked `.set_trace()` block in
# `Actor.from_parent()`..
#
# with devx.maybe_open_crash_handler(
# # XXX, if trio-side exits (intentionally) we
# # shouldn't care bc it should have its own crash
# # handling logic.
# ignore={TrioTaskExited,},
# ) as _bxerr:
result: Any = await coro
chan._aio_result = result
except BaseException as aio_err:
chan._aio_err = aio_err
@ -509,7 +560,7 @@ def _run_asyncio_task(
if (
debug_mode()
and
(greenback := debug.maybe_import_greenback(
(greenback := devx.debug.maybe_import_greenback(
force_reload=True,
raise_not_found=False,
))
@ -909,7 +960,11 @@ async def translate_aio_errors(
except BaseException as _trio_err:
trio_err = chan._trio_err = _trio_err
# await tractor.pause(shield=True) # workx!
entered: bool = await debug._maybe_enter_pm(
# !TODO! we need an inter-loop lock here to avoid aio-tasks
# clobbering trio ones when both crash in debug-mode!
#
entered: bool = await devx.debug._maybe_enter_pm(
trio_err,
api_frame=inspect.currentframe(),
)
@ -1243,10 +1298,18 @@ async def open_channel_from(
suppress_graceful_exits: bool = True,
**target_kwargs,
) -> AsyncIterator[Any]:
) -> AsyncIterator[
tuple[Any, LinkedTaskChannel]
]:
'''
Open an inter-loop linked task channel for streaming between a target
spawned ``asyncio`` task and ``trio``.
Start an `asyncio.Task` as `target()` and open an
inter-loop (linked) channel for streaming between
it and the current `trio.Task`.
A pair `(Any, chan: LinkedTaskChannel)` is delivered
to the caller where the 1st element is the value
provided by the `asyncio.Task`'s unblocking call
to `chan.started_nowait()`.
'''
chan: LinkedTaskChannel = _run_asyncio_task(
@ -1271,6 +1334,7 @@ async def open_channel_from(
# deliver stream handle upward
yield first, chan
# ^TODO! swap these!!
except trio.Cancelled as taskc:
if cs.cancel_called:
if isinstance(chan._trio_to_raise, AsyncioCancelled):
@ -1301,7 +1365,8 @@ async def open_channel_from(
)
else:
# XXX SHOULD NEVER HAPPEN!
await tractor.pause()
log.error("SHOULD NEVER GET HERE !?!?")
await tractor.pause(shield=True)
else:
chan._to_trio.close()

11
uv.lock
View File

@ -208,6 +208,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/9e/c3/059298687310d527a58bb01f3b1965787ee3b40dce76752eda8b44e9a2c5/pexpect-4.9.0-py2.py3-none-any.whl", hash = "sha256:7236d1e080e4936be2dc3e326cec0af72acf9212a7e1d060210e70a47e253523", size = 63772, upload-time = "2023-11-25T06:56:14.81Z" },
]
[[package]]
name = "platformdirs"
version = "4.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/23/e8/21db9c9987b0e728855bd57bff6984f67952bea55d6f75e055c46b5383e8/platformdirs-4.4.0.tar.gz", hash = "sha256:ca753cf4d81dc309bc67b0ea38fd15dc97bc30ce419a7f58d13eb3bf14c4febf", size = 21634, upload-time = "2025-08-26T14:32:04.268Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/40/4b/2028861e724d3bd36227adfa20d3fd24c3fc6d52032f4a93c133be5d17ce/platformdirs-4.4.0-py3-none-any.whl", hash = "sha256:abd01743f24e5287cd7a5db3752faf1a2d65353f38ec26d98e25a6db65958c85", size = 18654, upload-time = "2025-08-26T14:32:02.735Z" },
]
[[package]]
name = "pluggy"
version = "1.5.0"
@ -376,6 +385,7 @@ dependencies = [
{ name = "colorlog" },
{ name = "msgspec" },
{ name = "pdbp" },
{ name = "platformdirs" },
{ name = "tricycle" },
{ name = "trio" },
{ name = "wrapt" },
@ -419,6 +429,7 @@ requires-dist = [
{ name = "colorlog", specifier = ">=6.8.2,<7" },
{ name = "msgspec", specifier = ">=0.19.0" },
{ name = "pdbp", specifier = ">=1.8.2,<2" },
{ name = "platformdirs", specifier = ">=4.4.0" },
{ name = "tricycle", specifier = ">=0.4.1,<0.5" },
{ name = "trio", specifier = ">0.27" },
{ name = "wrapt", specifier = ">=1.16.0,<2" },