Merge pull request #342 from goodboy/macos_in_ci

Macos in ci
ns_aware
Bd 2026-03-09 20:33:38 -04:00 committed by GitHub
commit 5c270b89d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 603 additions and 184 deletions

View File

@ -75,15 +75,21 @@ jobs:
testing-linux: testing-linux:
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}' name: '${{ matrix.os }} Python${{ matrix.python-version }} - spawn_backend=${{ matrix.spawn_backend }}'
timeout-minutes: 10 timeout-minutes: 10
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
strategy: strategy:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest] os: [
python-version: ['3.13'] ubuntu-latest,
# macos-latest, # ?TODO, better?
]
python-version: [
'3.13',
# '3.14',
]
spawn_backend: [ spawn_backend: [
'trio', 'trio',
# 'mp_spawn', # 'mp_spawn',
@ -91,7 +97,6 @@ jobs:
] ]
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: 'Install uv + py-${{ matrix.python-version }}' - name: 'Install uv + py-${{ matrix.python-version }}'
@ -120,6 +125,42 @@ jobs:
- name: Run tests - name: Run tests
run: uv run pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx run: uv run pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
testing-macos:
name: '${{ matrix.os }} Python${{ matrix.python-version }} - spawn_backend=${{ matrix.spawn_backend }}'
timeout-minutes: 16
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [
macos-latest,
]
python-version: [
'3.13',
# '3.14',
]
spawn_backend: [
'trio',
]
steps:
- uses: actions/checkout@v4
- name: 'Install uv + py-${{ matrix.python-version }}'
uses: astral-sh/setup-uv@v6
with:
python-version: ${{ matrix.python-version }}
- name: Install the project w uv
run: uv sync --all-extras --dev
- name: List deps tree
run: uv tree
- name: Run tests w uv
run: uv run pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
# XXX legacy NOTE XXX # XXX legacy NOTE XXX
# #
# We skip 3.10 on windows for now due to not having any collabs to # We skip 3.10 on windows for now due to not having any collabs to

View File

@ -3,6 +3,7 @@ Verify we can dump a `stackscope` tree on a hang.
''' '''
import os import os
import platform
import signal import signal
import trio import trio
@ -31,13 +32,26 @@ async def main(
from_test: bool = False, from_test: bool = False,
) -> None: ) -> None:
if platform.system() != 'Darwin':
tpt = 'uds'
else:
# XXX, precisely we can't use pytest's tmp-path generation
# for tests.. apparently because:
#
# > The OSError: AF_UNIX path too long in macOS Python occurs
# > because the path to the Unix domain socket exceeds the
# > operating system's maximum path length limit (around 104
#
# WHICH IS just, wtf hillarious XD
tpt = 'tcp'
async with ( async with (
tractor.open_nursery( tractor.open_nursery(
debug_mode=True, debug_mode=True,
enable_stack_on_sig=True, enable_stack_on_sig=True,
# maybe_enable_greenback=False, # maybe_enable_greenback=False,
loglevel='devx', loglevel='devx',
enable_transports=['uds'], enable_transports=[tpt],
) as an, ) as an,
): ):
ptl: tractor.Portal = await an.start_actor( ptl: tractor.Portal = await an.start_actor(

View File

@ -1,3 +1,5 @@
import platform
import tractor import tractor
import trio import trio
@ -34,9 +36,22 @@ async def just_bp(
async def main(): async def main():
if platform.system() != 'Darwin':
tpt = 'uds'
else:
# XXX, precisely we can't use pytest's tmp-path generation
# for tests.. apparently because:
#
# > The OSError: AF_UNIX path too long in macOS Python occurs
# > because the path to the Unix domain socket exceeds the
# > operating system's maximum path length limit (around 104
#
# WHICH IS just, wtf hillarious XD
tpt = 'tcp'
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
enable_transports=['uds'], enable_transports=[tpt],
loglevel='devx', loglevel='devx',
) as n: ) as n:
p = await n.start_actor( p = await n.start_actor(

View File

@ -90,7 +90,7 @@ async def main() -> list[int]:
# yes, a nursery which spawns `trio`-"actors" B) # yes, a nursery which spawns `trio`-"actors" B)
an: ActorNursery an: ActorNursery
async with tractor.open_nursery( async with tractor.open_nursery(
loglevel='cancel', loglevel='error',
# debug_mode=True, # debug_mode=True,
) as an: ) as an:
@ -118,8 +118,10 @@ async def main() -> list[int]:
cancelled: bool = await portal.cancel_actor() cancelled: bool = await portal.cancel_actor()
assert cancelled assert cancelled
print(f"STREAM TIME = {time.time() - start}") print(
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") f"STREAM TIME = {time.time() - start}\n"
f"STREAM + SPAWN TIME = {time.time() - pre_start}\n"
)
assert result_stream == list(range(seed)) assert result_stream == list(range(seed))
return result_stream return result_stream

View File

@ -11,6 +11,7 @@ import platform
import time import time
import pytest import pytest
import tractor
from tractor._testing import ( from tractor._testing import (
examples_dir as examples_dir, examples_dir as examples_dir,
tractor_test as tractor_test, tractor_test as tractor_test,
@ -22,6 +23,7 @@ pytest_plugins: list[str] = [
'tractor._testing.pytest', 'tractor._testing.pytest',
] ]
_non_linux: bool = platform.system() != 'Linux'
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives # Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows': if platform.system() == 'Windows':
@ -44,6 +46,10 @@ no_windows = pytest.mark.skipif(
platform.system() == "Windows", platform.system() == "Windows",
reason="Test is unsupported on windows", reason="Test is unsupported on windows",
) )
no_macos = pytest.mark.skipif(
platform.system() == "Darwin",
reason="Test is unsupported on MacOS",
)
def pytest_addoption( def pytest_addoption(
@ -61,7 +67,7 @@ def pytest_addoption(
@pytest.fixture(scope='session', autouse=True) @pytest.fixture(scope='session', autouse=True)
def loglevel(request): def loglevel(request) -> str:
import tractor import tractor
orig = tractor.log._default_loglevel orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel level = tractor.log._default_loglevel = request.config.option.loglevel
@ -69,11 +75,46 @@ def loglevel(request):
level=level, level=level,
name='tractor', # <- enable root logger name='tractor', # <- enable root logger
) )
log.info(f'Test-harness logging level: {level}\n') log.info(
f'Test-harness set runtime loglevel: {level!r}\n'
)
yield level yield level
tractor.log._default_loglevel = orig tractor.log._default_loglevel = orig
@pytest.fixture(scope='function')
def test_log(
request,
loglevel: str,
) -> tractor.log.StackLevelAdapter:
'''
Deliver a per test-module-fn logger instance for reporting from
within actual test bodies/fixtures.
For example this can be handy to report certain error cases from
exception handlers using `test_log.exception()`.
'''
modname: str = request.function.__module__
log = tractor.log.get_logger(
name=modname, # <- enable root logger
# pkg_name='tests',
)
_log = tractor.log.get_console_log(
level=loglevel,
logger=log,
name=modname,
# pkg_name='tests',
)
_log.debug(
f'In-test-logging requested\n'
f'test_log.name: {log.name!r}\n'
f'level: {loglevel!r}\n'
)
yield _log
_ci_env: bool = os.environ.get('CI', False) _ci_env: bool = os.environ.get('CI', False)
@ -110,6 +151,7 @@ def daemon(
testdir: pytest.Pytester, testdir: pytest.Pytester,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
tpt_proto: str, tpt_proto: str,
ci_env: bool,
) -> subprocess.Popen: ) -> subprocess.Popen:
''' '''
@ -147,13 +189,25 @@ def daemon(
**kwargs, **kwargs,
) )
# TODO! we should poll for the registry socket-bind to take place
# and only once that's done yield to the requester!
# -[ ] TCP: use the `._root.open_root_actor()`::`ping_tpt_socket()`
# closure!
# -[ ] UDS: can we do something similar for 'pinging" the
# file-socket?
#
global _PROC_SPAWN_WAIT
# UDS sockets are **really** fast to bind()/listen()/connect() # UDS sockets are **really** fast to bind()/listen()/connect()
# so it's often required that we delay a bit more starting # so it's often required that we delay a bit more starting
# the first actor-tree.. # the first actor-tree..
if tpt_proto == 'uds': if tpt_proto == 'uds':
global _PROC_SPAWN_WAIT
_PROC_SPAWN_WAIT = 0.6 _PROC_SPAWN_WAIT = 0.6
if _non_linux and ci_env:
_PROC_SPAWN_WAIT += 1
# XXX, allow time for the sub-py-proc to boot up.
# !TODO, see ping-polling ideas above!
time.sleep(_PROC_SPAWN_WAIT) time.sleep(_PROC_SPAWN_WAIT)
assert not proc.returncode assert not proc.returncode
@ -163,18 +217,30 @@ def daemon(
# XXX! yeah.. just be reaaal careful with this bc sometimes it # XXX! yeah.. just be reaaal careful with this bc sometimes it
# can lock up on the `_io.BufferedReader` and hang.. # can lock up on the `_io.BufferedReader` and hang..
stderr: str = proc.stderr.read().decode() stderr: str = proc.stderr.read().decode()
if stderr: stdout: str = proc.stdout.read().decode()
if (
stderr
or
stdout
):
print( print(
f'Daemon actor tree produced STDERR:\n' f'Daemon actor tree produced output:\n'
f'{proc.args}\n' f'{proc.args}\n'
f'\n' f'\n'
f'{stderr}\n' f'stderr: {stderr!r}\n'
f'stdout: {stdout!r}\n'
) )
if proc.returncode != -2:
raise RuntimeError( if (rc := proc.returncode) != -2:
'Daemon actor tree failed !?\n' msg: str = (
f'{proc.args}\n' f'Daemon actor tree was not cancelled !?\n'
f'proc.args: {proc.args!r}\n'
f'proc.returncode: {rc!r}\n'
) )
if rc < 0:
raise RuntimeError(msg)
log.error(msg)
# @pytest.fixture(autouse=True) # @pytest.fixture(autouse=True)

View File

@ -3,8 +3,9 @@
''' '''
from __future__ import annotations from __future__ import annotations
import time import platform
import signal import signal
import time
from typing import ( from typing import (
Callable, Callable,
TYPE_CHECKING, TYPE_CHECKING,
@ -33,6 +34,17 @@ if TYPE_CHECKING:
from pexpect import pty_spawn from pexpect import pty_spawn
_non_linux: bool = platform.system() != 'Linux'
def pytest_configure(config):
# register custom marks to avoid warnings see,
# https://docs.pytest.org/en/stable/how-to/writing_plugins.html#registering-custom-markers
config.addinivalue_line(
'markers',
'ctlcs_bish: test will (likely) not behave under SIGINT..'
)
# a fn that sub-instantiates a `pexpect.spawn()` # a fn that sub-instantiates a `pexpect.spawn()`
# and returns it. # and returns it.
type PexpectSpawner = Callable[ type PexpectSpawner = Callable[
@ -68,7 +80,10 @@ def spawn(
''' '''
import os import os
# disable colored tbs
os.environ['PYTHON_COLORS'] = '0' os.environ['PYTHON_COLORS'] = '0'
# disable all ANSI color output
# os.environ['NO_COLOR'] = '1'
spawned: PexpectSpawner|None = None spawned: PexpectSpawner|None = None
@ -83,7 +98,10 @@ def spawn(
cmd, cmd,
**mkcmd_kwargs, **mkcmd_kwargs,
), ),
expect_timeout=3, expect_timeout=(
10 if _non_linux and _ci_env
else 3
),
# preexec_fn=unset_colors, # preexec_fn=unset_colors,
# ^TODO? get `pytest` core to expose underlying # ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff? # `pexpect.spawn()` stuff?
@ -146,6 +164,8 @@ def ctlc(
mark.name == 'ctlcs_bish' mark.name == 'ctlcs_bish'
and and
use_ctlc use_ctlc
and
all(mark.args)
): ):
pytest.skip( pytest.skip(
f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n' f'Test {node} prolly uses something from the stdlib (namely `asyncio`..)\n'
@ -251,12 +271,13 @@ def assert_before(
err_on_false=True, err_on_false=True,
**kwargs **kwargs
) )
return str(child.before.decode())
def do_ctlc( def do_ctlc(
child, child,
count: int = 3, count: int = 3,
delay: float = 0.1, delay: float|None = None,
patt: str|None = None, patt: str|None = None,
# expect repl UX to reprint the prompt after every # expect repl UX to reprint the prompt after every
@ -268,6 +289,7 @@ def do_ctlc(
) -> str|None: ) -> str|None:
before: str|None = None before: str|None = None
delay = delay or 0.1
# make sure ctl-c sends don't do anything but repeat output # make sure ctl-c sends don't do anything but repeat output
for _ in range(count): for _ in range(count):
@ -278,7 +300,10 @@ def do_ctlc(
# if you run this test manually it works just fine.. # if you run this test manually it works just fine..
if expect_prompt: if expect_prompt:
time.sleep(delay) time.sleep(delay)
child.expect(PROMPT) child.expect(
PROMPT,
timeout=(child.timeout * 2) if _ci_env else child.timeout,
)
before = str(child.before.decode()) before = str(child.before.decode())
time.sleep(delay) time.sleep(delay)

View File

@ -37,6 +37,9 @@ from .conftest import (
in_prompt_msg, in_prompt_msg,
assert_before, assert_before,
) )
from ..conftest import (
_ci_env,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ..conftest import PexpectSpawner from ..conftest import PexpectSpawner
@ -51,13 +54,14 @@ if TYPE_CHECKING:
# - recurrent root errors # - recurrent root errors
_non_linux: bool = platform.system() != 'Linux'
if platform.system() == 'Windows': if platform.system() == 'Windows':
pytest.skip( pytest.skip(
'Debugger tests have no windows support (yet)', 'Debugger tests have no windows support (yet)',
allow_module_level=True, allow_module_level=True,
) )
# TODO: was trying to this xfail style but some weird bug i see in CI # TODO: was trying to this xfail style but some weird bug i see in CI
# that's happening at collect time.. pretty soon gonna dump actions i'm # that's happening at collect time.. pretty soon gonna dump actions i'm
# thinkin... # thinkin...
@ -193,6 +197,11 @@ def test_root_actor_bp_forever(
child.expect(EOF) child.expect(EOF)
# skip on non-Linux CI
@pytest.mark.ctlcs_bish(
_non_linux,
_ci_env,
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'do_next', 'do_next',
(True, False), (True, False),
@ -258,6 +267,11 @@ def test_subactor_error(
child.expect(EOF) child.expect(EOF)
# skip on non-Linux CI
@pytest.mark.ctlcs_bish(
_non_linux,
_ci_env,
)
def test_subactor_breakpoint( def test_subactor_breakpoint(
spawn, spawn,
ctlc: bool, ctlc: bool,
@ -480,8 +494,24 @@ def test_multi_daemon_subactors(
stream. stream.
''' '''
child = spawn('multi_daemon_subactors') non_linux = _non_linux
if non_linux and ctlc:
pytest.skip(
'Ctl-c + MacOS is too unreliable/racy for this test..\n'
)
# !TODO, if someone with more patience then i wants to muck
# with the timings on this please feel free to see all the
# `non_linux` branching logic i added on my first attempt
# below!
#
# my conclusion was that if i were to run the script
# manually, and thus as slowly as a human would, the test
# would and should pass as described in this test fn, however
# after fighting with it for >= 1hr. i decided more then
# likely the more extensive `linux` testing should cover most
# regressions.
child = spawn('multi_daemon_subactors')
child.expect(PROMPT) child.expect(PROMPT)
# there can be a race for which subactor will acquire # there can be a race for which subactor will acquire
@ -511,8 +541,19 @@ def test_multi_daemon_subactors(
else: else:
raise ValueError('Neither log msg was found !?') raise ValueError('Neither log msg was found !?')
non_linux_delay: float = 0.3
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(
child,
delay=(
non_linux_delay
if non_linux
else None
),
)
if non_linux:
time.sleep(1)
# NOTE: previously since we did not have clobber prevention # NOTE: previously since we did not have clobber prevention
# in the root actor this final resume could result in the debugger # in the root actor this final resume could result in the debugger
@ -543,33 +584,66 @@ def test_multi_daemon_subactors(
# assert "in use by child ('bp_forever'," in before # assert "in use by child ('bp_forever'," in before
if ctlc: if ctlc:
do_ctlc(child) do_ctlc(
child,
delay=(
non_linux_delay
if non_linux
else None
),
)
if non_linux:
time.sleep(1)
# expect another breakpoint actor entry # expect another breakpoint actor entry
child.sendline('c') child.sendline('c')
child.expect(PROMPT) child.expect(PROMPT)
try: try:
assert_before( before: str = assert_before(
child, child,
bp_forev_parts, bp_forev_parts,
) )
except AssertionError: except AssertionError:
assert_before( before: str = assert_before(
child, child,
name_error_parts, name_error_parts,
) )
else: else:
if ctlc: if ctlc:
do_ctlc(child) before: str = do_ctlc(
child,
delay=(
non_linux_delay
if non_linux
else None
),
)
if non_linux:
time.sleep(1)
# should crash with the 2nd name error (simulates # should crash with the 2nd name error (simulates
# a retry) and then the root eventually (boxed) errors # a retry) and then the root eventually (boxed) errors
# after 1 or more further bp actor entries. # after 1 or more further bp actor entries.
child.sendline('c') child.sendline('c')
child.expect(PROMPT) try:
child.expect(
PROMPT,
timeout=3,
)
except EOF:
before: str = child.before.decode()
print(
f'\n'
f'??? NEVER RXED `pdb` PROMPT ???\n'
f'\n'
f'{before}\n'
)
raise
assert_before( assert_before(
child, child,
name_error_parts, name_error_parts,
@ -689,7 +763,8 @@ def test_multi_subactors_root_errors(
@has_nested_actors @has_nested_actors
def test_multi_nested_subactors_error_through_nurseries( def test_multi_nested_subactors_error_through_nurseries(
spawn, ci_env: bool,
spawn: PexpectSpawner,
# TODO: address debugger issue for nested tree: # TODO: address debugger issue for nested tree:
# https://github.com/goodboy/tractor/issues/320 # https://github.com/goodboy/tractor/issues/320
@ -712,7 +787,16 @@ def test_multi_nested_subactors_error_through_nurseries(
for send_char in itertools.cycle(['c', 'q']): for send_char in itertools.cycle(['c', 'q']):
try: try:
child.expect(PROMPT) child.expect(
PROMPT,
timeout=(
6 if (
_non_linux
and
ci_env
) else -1
),
)
child.sendline(send_char) child.sendline(send_char)
time.sleep(0.01) time.sleep(0.01)
@ -889,6 +973,11 @@ def test_different_debug_mode_per_actor(
) )
# skip on non-Linux CI
@pytest.mark.ctlcs_bish(
_non_linux,
_ci_env,
)
def test_post_mortem_api( def test_post_mortem_api(
spawn, spawn,
ctlc: bool, ctlc: bool,
@ -1133,14 +1222,20 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
# closed so verify we see error reporting as well as # closed so verify we see error reporting as well as
# a failed crash-REPL request msg and can CTL-c our way # a failed crash-REPL request msg and can CTL-c our way
# out. # out.
# ?TODO, match depending on `tpt_proto(s)`?
# - [ ] how can we pass it into the script tho?
tpt: str = 'UDS'
if _non_linux:
tpt: str = 'TCP'
assert_before( assert_before(
child, child,
['peer IPC channel closed abruptly?', ['peer IPC channel closed abruptly?',
'another task closed this fd', 'another task closed this fd',
'Debug lock request was CANCELLED?', 'Debug lock request was CANCELLED?',
"'MsgpackUDSStream' was already closed locally?", f"'Msgpack{tpt}Stream' was already closed locally?",
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?", f"TransportClosed: 'Msgpack{tpt}Stream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
] ]
# XXX races on whether these show/hit? # XXX races on whether these show/hit?

View File

@ -31,6 +31,9 @@ from .conftest import (
PROMPT, PROMPT,
_pause_msg, _pause_msg,
) )
from ..conftest import (
no_macos,
)
import pytest import pytest
from pexpect.exceptions import ( from pexpect.exceptions import (
@ -42,6 +45,7 @@ if TYPE_CHECKING:
from ..conftest import PexpectSpawner from ..conftest import PexpectSpawner
@no_macos
def test_shield_pause( def test_shield_pause(
spawn: PexpectSpawner, spawn: PexpectSpawner,
): ):
@ -57,6 +61,7 @@ def test_shield_pause(
expect( expect(
child, child,
'Yo my child hanging..?', 'Yo my child hanging..?',
timeout=3,
) )
assert_before( assert_before(
child, child,

View File

@ -1,7 +1,12 @@
""" '''
Bidirectional streaming. Audit the simplest inter-actor bidirectional (streaming)
msg patterns.
""" '''
from __future__ import annotations
from typing import (
Callable,
)
import pytest import pytest
import trio import trio
import tractor import tractor
@ -9,10 +14,8 @@ import tractor
@tractor.context @tractor.context
async def simple_rpc( async def simple_rpc(
ctx: tractor.Context, ctx: tractor.Context,
data: int, data: int,
) -> None: ) -> None:
''' '''
Test a small ping-pong server. Test a small ping-pong server.
@ -39,15 +42,13 @@ async def simple_rpc(
@tractor.context @tractor.context
async def simple_rpc_with_forloop( async def simple_rpc_with_forloop(
ctx: tractor.Context, ctx: tractor.Context,
data: int, data: int,
) -> None: ) -> None:
"""Same as previous test but using ``async for`` syntax/api. '''
Same as previous test but using `async for` syntax/api.
"""
'''
# signal to parent that we're up # signal to parent that we're up
await ctx.started(data + 1) await ctx.started(data + 1)
@ -68,62 +69,78 @@ async def simple_rpc_with_forloop(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'use_async_for', 'use_async_for',
[True, False], [
True,
False,
],
ids='use_async_for={}'.format,
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'server_func', 'server_func',
[simple_rpc, simple_rpc_with_forloop], [
simple_rpc,
simple_rpc_with_forloop,
],
ids='server_func={}'.format,
) )
def test_simple_rpc(server_func, use_async_for): def test_simple_rpc(
server_func: Callable,
use_async_for: bool,
loglevel: str,
debug_mode: bool,
):
''' '''
The simplest request response pattern. The simplest request response pattern.
''' '''
async def main(): async def main():
async with tractor.open_nursery() as n: with trio.fail_after(6):
async with tractor.open_nursery(
loglevel=loglevel,
debug_mode=debug_mode,
) as an:
portal: tractor.Portal = await an.start_actor(
'rpc_server',
enable_modules=[__name__],
)
portal = await n.start_actor( async with portal.open_context(
'rpc_server', server_func, # taken from pytest parameterization
enable_modules=[__name__], data=10,
) ) as (ctx, sent):
async with portal.open_context( assert sent == 11
server_func, # taken from pytest parameterization
data=10,
) as (ctx, sent):
assert sent == 11 async with ctx.open_stream() as stream:
async with ctx.open_stream() as stream: if use_async_for:
if use_async_for: count = 0
# receive msgs using async for style
count = 0
# receive msgs using async for style
print('ping')
await stream.send('ping')
async for msg in stream:
assert msg == 'pong'
print('ping') print('ping')
await stream.send('ping') await stream.send('ping')
count += 1
if count >= 9: async for msg in stream:
break assert msg == 'pong'
print('ping')
await stream.send('ping')
count += 1
else: if count >= 9:
# classic send/receive style break
for _ in range(10):
print('ping') else:
await stream.send('ping') # classic send/receive style
assert await stream.receive() == 'pong' for _ in range(10):
# stream should terminate here print('ping')
await stream.send('ping')
assert await stream.receive() == 'pong'
# final context result(s) should be consumed here in __aexit__() # stream should terminate here
await portal.cancel_actor() # final context result(s) should be consumed here in __aexit__()
await portal.cancel_actor()
trio.run(main) trio.run(main)

View File

@ -17,8 +17,8 @@ from tractor._testing import (
from .conftest import no_windows from .conftest import no_windows
def is_win(): _non_linux: bool = platform.system() != 'Linux'
return platform.system() == 'Windows' _friggin_windows: bool = platform.system() == 'Windows'
async def assert_err(delay=0): async def assert_err(delay=0):
@ -431,7 +431,7 @@ async def test_nested_multierrors(loglevel, start_method):
for subexc in err.exceptions: for subexc in err.exceptions:
# verify first level actor errors are wrapped as remote # verify first level actor errors are wrapped as remote
if is_win(): if _friggin_windows:
# windows is often too slow and cancellation seems # windows is often too slow and cancellation seems
# to happen before an actor is spawned # to happen before an actor is spawned
@ -464,7 +464,7 @@ async def test_nested_multierrors(loglevel, start_method):
# XXX not sure what's up with this.. # XXX not sure what's up with this..
# on windows sometimes spawning is just too slow and # on windows sometimes spawning is just too slow and
# we get back the (sent) cancel signal instead # we get back the (sent) cancel signal instead
if is_win(): if _friggin_windows:
if isinstance(subexc, tractor.RemoteActorError): if isinstance(subexc, tractor.RemoteActorError):
assert subexc.boxed_type in ( assert subexc.boxed_type in (
BaseExceptionGroup, BaseExceptionGroup,
@ -507,17 +507,22 @@ def test_cancel_via_SIGINT(
@no_windows @no_windows
def test_cancel_via_SIGINT_other_task( def test_cancel_via_SIGINT_other_task(
loglevel, loglevel: str,
start_method, start_method: str,
spawn_backend, spawn_backend: str,
): ):
"""Ensure that a control-C (SIGINT) signal cancels both the parent '''
and child processes in trionic fashion even a subprocess is started Ensure that a control-C (SIGINT) signal cancels both the parent
from a seperate ``trio`` child task. and child processes in trionic fashion even a subprocess is
""" started from a seperate ``trio`` child task.
pid = os.getpid()
timeout: float = 2 '''
if is_win(): # smh pid: int = os.getpid()
timeout: float = (
4 if _non_linux
else 2
)
if _friggin_windows: # smh
timeout += 1 timeout += 1
async def spawn_and_sleep_forever( async def spawn_and_sleep_forever(
@ -696,7 +701,7 @@ def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon(
kbi_delay = 0.5 kbi_delay = 0.5
timeout: float = 2.9 timeout: float = 2.9
if is_win(): # smh if _friggin_windows: # smh
timeout += 1 timeout += 1
async def main(): async def main():

View File

@ -9,6 +9,7 @@ from itertools import count
import math import math
import platform import platform
from pprint import pformat from pprint import pformat
import sys
from typing import ( from typing import (
Callable, Callable,
) )
@ -941,6 +942,11 @@ def test_one_end_stream_not_opened(
from tractor._runtime import Actor from tractor._runtime import Actor
buf_size = buf_size_increase + Actor.msg_buffer_size buf_size = buf_size_increase + Actor.msg_buffer_size
timeout: float = (
1 if sys.platform == 'linux'
else 3
)
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
@ -950,7 +956,7 @@ def test_one_end_stream_not_opened(
enable_modules=[__name__], enable_modules=[__name__],
) )
with trio.fail_after(1): with trio.fail_after(timeout):
async with portal.open_context( async with portal.open_context(
entrypoint, entrypoint,
) as (ctx, sent): ) as (ctx, sent):

View File

@ -1,11 +1,13 @@
""" """
Actor "discovery" testing Discovery subsys.
""" """
import os import os
import signal import signal
import platform import platform
from functools import partial from functools import partial
import itertools import itertools
from typing import Callable
import psutil import psutil
import pytest import pytest
@ -17,7 +19,9 @@ import trio
@tractor_test @tractor_test
async def test_reg_then_unreg(reg_addr): async def test_reg_then_unreg(
reg_addr: tuple,
):
actor = tractor.current_actor() actor = tractor.current_actor()
assert actor.is_arbiter assert actor.is_arbiter
assert len(actor._registry) == 1 # only self is registered assert len(actor._registry) == 1 # only self is registered
@ -82,11 +86,15 @@ async def say_hello_use_wait(
@tractor_test @tractor_test
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) @pytest.mark.parametrize(
'func',
[say_hello,
say_hello_use_wait]
)
async def test_trynamic_trio( async def test_trynamic_trio(
func, func: Callable,
start_method, start_method: str,
reg_addr, reg_addr: tuple,
): ):
''' '''
Root actor acting as the "director" and running one-shot-task-actors Root actor acting as the "director" and running one-shot-task-actors
@ -119,7 +127,10 @@ async def stream_forever():
await trio.sleep(0.01) await trio.sleep(0.01)
async def cancel(use_signal, delay=0): async def cancel(
use_signal: bool,
delay: float = 0,
):
# hold on there sally # hold on there sally
await trio.sleep(delay) await trio.sleep(delay)
@ -132,13 +143,15 @@ async def cancel(use_signal, delay=0):
raise KeyboardInterrupt raise KeyboardInterrupt
async def stream_from(portal): async def stream_from(portal: tractor.Portal):
async with portal.open_stream_from(stream_forever) as stream: async with portal.open_stream_from(stream_forever) as stream:
async for value in stream: async for value in stream:
print(value) print(value)
async def unpack_reg(actor_or_portal): async def unpack_reg(
actor_or_portal: tractor.Portal|tractor.Actor,
):
''' '''
Get and unpack a "registry" RPC request from the "arbiter" registry Get and unpack a "registry" RPC request from the "arbiter" registry
system. system.
@ -173,7 +186,9 @@ async def spawn_and_check_registry(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
debug_mode=debug_mode, debug_mode=debug_mode,
): ):
async with tractor.get_registry(reg_addr) as portal: async with tractor.get_registry(
addr=reg_addr,
) as portal:
# runtime needs to be up to call this # runtime needs to be up to call this
actor = tractor.current_actor() actor = tractor.current_actor()
@ -246,10 +261,10 @@ async def spawn_and_check_registry(
@pytest.mark.parametrize('with_streaming', [False, True]) @pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel( def test_subactors_unregister_on_cancel(
debug_mode: bool, debug_mode: bool,
start_method, start_method: str,
use_signal, use_signal: bool,
reg_addr, reg_addr: tuple,
with_streaming, with_streaming: bool,
): ):
''' '''
Verify that cancelling a nursery results in all subactors Verify that cancelling a nursery results in all subactors
@ -274,15 +289,17 @@ def test_subactors_unregister_on_cancel(
def test_subactors_unregister_on_cancel_remote_daemon( def test_subactors_unregister_on_cancel_remote_daemon(
daemon: subprocess.Popen, daemon: subprocess.Popen,
debug_mode: bool, debug_mode: bool,
start_method, start_method: str,
use_signal, use_signal: bool,
reg_addr, reg_addr: tuple,
with_streaming, with_streaming: bool,
): ):
"""Verify that cancelling a nursery results in all subactors '''
deregistering themselves with a **remote** (not in the local process Verify that cancelling a nursery results in all subactors
tree) arbiter. deregistering themselves with a **remote** (not in the local
""" process tree) arbiter.
'''
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
trio.run( trio.run(
partial( partial(
@ -374,14 +391,16 @@ async def close_chans_before_nursery(
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit( def test_close_channel_explicit(
start_method, start_method: str,
use_signal, use_signal: bool,
reg_addr, reg_addr: tuple,
): ):
"""Verify that closing a stream explicitly and killing the actor's '''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also "root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter. results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
trio.run( trio.run(
partial( partial(
@ -396,14 +415,16 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter( def test_close_channel_explicit_remote_arbiter(
daemon: subprocess.Popen, daemon: subprocess.Popen,
start_method, start_method: str,
use_signal, use_signal: bool,
reg_addr, reg_addr: tuple,
): ):
"""Verify that closing a stream explicitly and killing the actor's '''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also "root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter. results in subactor(s) deregistering from the arbiter.
"""
'''
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):
trio.run( trio.run(
partial( partial(

View File

@ -9,12 +9,17 @@ import sys
import subprocess import subprocess
import platform import platform
import shutil import shutil
from typing import Callable
import pytest import pytest
import tractor
from tractor._testing import ( from tractor._testing import (
examples_dir, examples_dir,
) )
_non_linux: bool = platform.system() != 'Linux'
_friggin_macos: bool = platform.system() == 'Darwin'
@pytest.fixture @pytest.fixture
def run_example_in_subproc( def run_example_in_subproc(
@ -101,8 +106,10 @@ def run_example_in_subproc(
ids=lambda t: t[1], ids=lambda t: t[1],
) )
def test_example( def test_example(
run_example_in_subproc, run_example_in_subproc: Callable,
example_script, example_script: str,
test_log: tractor.log.StackLevelAdapter,
ci_env: bool,
): ):
''' '''
Load and run scripts from this repo's ``examples/`` dir as a user Load and run scripts from this repo's ``examples/`` dir as a user
@ -116,9 +123,32 @@ def test_example(
''' '''
ex_file: str = os.path.join(*example_script) ex_file: str = os.path.join(*example_script)
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9): if (
'rpc_bidir_streaming' in ex_file
and
sys.version_info < (3, 9)
):
pytest.skip("2-way streaming example requires py3.9 async with syntax") pytest.skip("2-way streaming example requires py3.9 async with syntax")
if (
'full_fledged_streaming_service' in ex_file
and
_friggin_macos
and
ci_env
):
pytest.skip(
'Streaming example is too flaky in CI\n'
'AND their competitor runs this CI service..\n'
'This test does run just fine "in person" however..'
)
timeout: float = (
60
if ci_env and _non_linux
else 16
)
with open(ex_file, 'r') as ex: with open(ex_file, 'r') as ex:
code = ex.read() code = ex.read()
@ -126,9 +156,12 @@ def test_example(
err = None err = None
try: try:
if not proc.poll(): if not proc.poll():
_, err = proc.communicate(timeout=15) _, err = proc.communicate(timeout=timeout)
except subprocess.TimeoutExpired as e: except subprocess.TimeoutExpired as e:
test_log.exception(
f'Example failed to finish within {timeout}s ??\n'
)
proc.kill() proc.kill()
err = e.stderr err = e.stderr

View File

@ -1,9 +1,11 @@
""" """
Streaming via async gen api Streaming via the, now legacy, "async-gen API".
""" """
import time import time
from functools import partial from functools import partial
import platform import platform
from typing import Callable
import trio import trio
import tractor import tractor
@ -19,7 +21,11 @@ def test_must_define_ctx():
async def no_ctx(): async def no_ctx():
pass pass
assert "no_ctx must be `ctx: tractor.Context" in str(err.value) assert (
"no_ctx must be `ctx: tractor.Context"
in
str(err.value)
)
@tractor.stream @tractor.stream
async def has_ctx(ctx): async def has_ctx(ctx):
@ -69,14 +75,14 @@ async def stream_from_single_subactor(
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
start_method=start_method, start_method=start_method,
) as nursery: ) as an:
async with tractor.find_actor('streamerd') as portals: async with tractor.find_actor('streamerd') as portals:
if not portals: if not portals:
# no brokerd actor found # no brokerd actor found
portal = await nursery.start_actor( portal = await an.start_actor(
'streamerd', 'streamerd',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -116,11 +122,22 @@ async def stream_from_single_subactor(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'stream_func', [async_gen_stream, context_stream] 'stream_func',
[
async_gen_stream,
context_stream,
],
ids='stream_func={}'.format
) )
def test_stream_from_single_subactor(reg_addr, start_method, stream_func): def test_stream_from_single_subactor(
"""Verify streaming from a spawned async generator. reg_addr: tuple,
""" start_method: str,
stream_func: Callable,
):
'''
Verify streaming from a spawned async generator.
'''
trio.run( trio.run(
partial( partial(
stream_from_single_subactor, stream_from_single_subactor,
@ -132,10 +149,9 @@ def test_stream_from_single_subactor(reg_addr, start_method, stream_func):
# this is the first 2 actors, streamer_1 and streamer_2 # this is the first 2 actors, streamer_1 and streamer_2
async def stream_data(seed): async def stream_data(seed: int):
for i in range(seed): for i in range(seed):
yield i yield i
# trigger scheduler to simulate practical usage # trigger scheduler to simulate practical usage
@ -143,15 +159,17 @@ async def stream_data(seed):
# this is the third actor; the aggregator # this is the third actor; the aggregator
async def aggregate(seed): async def aggregate(seed: int):
"""Ensure that the two streams we receive match but only stream '''
Ensure that the two streams we receive match but only stream
a single set of values to the parent. a single set of values to the parent.
"""
async with tractor.open_nursery() as nursery: '''
async with tractor.open_nursery() as an:
portals = [] portals = []
for i in range(1, 3): for i in range(1, 3):
# fork point # fork point
portal = await nursery.start_actor( portal = await an.start_actor(
name=f'streamer_{i}', name=f'streamer_{i}',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -164,7 +182,8 @@ async def aggregate(seed):
async with send_chan: async with send_chan:
async with portal.open_stream_from( async with portal.open_stream_from(
stream_data, seed=seed, stream_data,
seed=seed,
) as stream: ) as stream:
async for value in stream: async for value in stream:
@ -174,10 +193,14 @@ async def aggregate(seed):
print(f"FINISHED ITERATING {portal.channel.uid}") print(f"FINISHED ITERATING {portal.channel.uid}")
# spawn 2 trio tasks to collect streams and push to a local queue # spawn 2 trio tasks to collect streams and push to a local queue
async with trio.open_nursery() as n: async with trio.open_nursery() as tn:
for portal in portals: for portal in portals:
n.start_soon(push_to_chan, portal, send_chan.clone()) tn.start_soon(
push_to_chan,
portal,
send_chan.clone(),
)
# close this local task's reference to send side # close this local task's reference to send side
await send_chan.aclose() await send_chan.aclose()
@ -194,20 +217,21 @@ async def aggregate(seed):
print("FINISHED ITERATING in aggregator") print("FINISHED ITERATING in aggregator")
await nursery.cancel() await an.cancel()
print("WAITING on `ActorNursery` to finish") print("WAITING on `ActorNursery` to finish")
print("AGGREGATOR COMPLETE!") print("AGGREGATOR COMPLETE!")
# this is the main actor and *arbiter* async def a_quadruple_example() -> list[int]:
async def a_quadruple_example(): '''
# a nursery which spawns "actors" Open the root-actor which is also a "registrar".
async with tractor.open_nursery() as nursery:
'''
async with tractor.open_nursery() as an:
seed = int(1e3) seed = int(1e3)
pre_start = time.time() pre_start = time.time()
portal = await nursery.start_actor( portal = await an.start_actor(
name='aggregator', name='aggregator',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -228,8 +252,14 @@ async def a_quadruple_example():
return result_stream return result_stream
async def cancel_after(wait, reg_addr): async def cancel_after(
async with tractor.open_root_actor(registry_addrs=[reg_addr]): wait: float,
reg_addr: tuple,
) -> list[int]:
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
with trio.move_on_after(wait): with trio.move_on_after(wait):
return await a_quadruple_example() return await a_quadruple_example()
@ -240,6 +270,10 @@ def time_quad_ex(
ci_env: bool, ci_env: bool,
spawn_backend: str, spawn_backend: str,
): ):
non_linux: bool = (_sys := platform.system()) != 'Linux'
if ci_env and non_linux:
pytest.skip(f'Test is too flaky on {_sys!r} in CI')
if spawn_backend == 'mp': if spawn_backend == 'mp':
''' '''
no idea but the mp *nix runs are flaking out here often... no idea but the mp *nix runs are flaking out here often...
@ -247,16 +281,20 @@ def time_quad_ex(
''' '''
pytest.skip("Test is too flaky on mp in CI") pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 timeout = 7 if non_linux else 4
start = time.time() start = time.time()
results = trio.run(cancel_after, timeout, reg_addr) results: list[int] = trio.run(
diff = time.time() - start cancel_after,
timeout,
reg_addr,
)
diff: float = time.time() - start
assert results assert results
return results, diff return results, diff
def test_a_quadruple_example( def test_a_quadruple_example(
time_quad_ex: tuple, time_quad_ex: tuple[list[int], float],
ci_env: bool, ci_env: bool,
spawn_backend: str, spawn_backend: str,
): ):
@ -264,13 +302,12 @@ def test_a_quadruple_example(
This also serves as a kind of "we'd like to be this fast test". This also serves as a kind of "we'd like to be this fast test".
''' '''
non_linux: bool = (_sys := platform.system()) != 'Linux'
results, diff = time_quad_ex results, diff = time_quad_ex
assert results assert results
this_fast = ( this_fast = (
6 if platform.system() in ( 6 if non_linux
'Windows',
'Darwin',
)
else 3 else 3
) )
assert diff < this_fast assert diff < this_fast
@ -281,19 +318,33 @@ def test_a_quadruple_example(
list(map(lambda i: i/10, range(3, 9))) list(map(lambda i: i/10, range(3, 9)))
) )
def test_not_fast_enough_quad( def test_not_fast_enough_quad(
reg_addr, time_quad_ex, cancel_delay, ci_env, spawn_backend reg_addr: tuple,
time_quad_ex: tuple[list[int], float],
cancel_delay: float,
ci_env: bool,
spawn_backend: str,
): ):
"""Verify we can cancel midway through the quad example and all actors '''
cancel gracefully. Verify we can cancel midway through the quad example and all
""" actors cancel gracefully.
'''
results, diff = time_quad_ex results, diff = time_quad_ex
delay = max(diff - cancel_delay, 0) delay = max(diff - cancel_delay, 0)
results = trio.run(cancel_after, delay, reg_addr) results = trio.run(
system = platform.system() cancel_after,
if system in ('Windows', 'Darwin') and results is not None: delay,
reg_addr,
)
system: str = platform.system()
if (
system in ('Windows', 'Darwin')
and
results is not None
):
# In CI envoirments it seems later runs are quicker then the first # In CI envoirments it seems later runs are quicker then the first
# so just ignore these # so just ignore these
print(f"Woa there {system} caught your breath eh?") print(f'Woa there {system} caught your breath eh?')
else: else:
# should be cancelled mid-streaming # should be cancelled mid-streaming
assert results is None assert results is None
@ -301,23 +352,24 @@ def test_not_fast_enough_quad(
@tractor_test @tractor_test
async def test_respawn_consumer_task( async def test_respawn_consumer_task(
reg_addr, reg_addr: tuple,
spawn_backend, spawn_backend: str,
loglevel, loglevel: str,
): ):
"""Verify that ``._portal.ReceiveStream.shield()`` '''
Verify that ``._portal.ReceiveStream.shield()``
sucessfully protects the underlying IPC channel from being closed sucessfully protects the underlying IPC channel from being closed
when cancelling and respawning a consumer task. when cancelling and respawning a consumer task.
This also serves to verify that all values from the stream can be This also serves to verify that all values from the stream can be
received despite the respawns. received despite the respawns.
""" '''
stream = None stream = None
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
portal = await n.start_actor( portal = await an.start_actor(
name='streamer', name='streamer',
enable_modules=[__name__] enable_modules=[__name__]
) )

View File

@ -35,6 +35,9 @@ if TYPE_CHECKING:
) )
_non_linux: bool = platform.system() != 'Linux'
def test_abort_on_sigint( def test_abort_on_sigint(
daemon: subprocess.Popen, daemon: subprocess.Popen,
): ):
@ -137,6 +140,7 @@ def test_non_registrar_spawns_child(
reg_addr: UnwrappedAddress, reg_addr: UnwrappedAddress,
loglevel: str, loglevel: str,
debug_mode: bool, debug_mode: bool,
ci_env: bool,
): ):
''' '''
Ensure a non-regristar (serving) root actor can spawn a sub and Ensure a non-regristar (serving) root actor can spawn a sub and
@ -148,6 +152,12 @@ def test_non_registrar_spawns_child(
''' '''
async def main(): async def main():
# XXX, since apparently on macos in GH's CI it can be a race
# with the `daemon` registrar on grabbing the socket-addr..
if ci_env and _non_linux:
await trio.sleep(.5)
async with tractor.open_nursery( async with tractor.open_nursery(
registry_addrs=[reg_addr], registry_addrs=[reg_addr],
loglevel=loglevel, loglevel=loglevel,

View File

@ -2,6 +2,7 @@
Shared mem primitives and APIs. Shared mem primitives and APIs.
""" """
import platform
import uuid import uuid
# import numpy # import numpy
@ -53,7 +54,18 @@ def test_child_attaches_alot():
shm_key=shml.key, shm_key=shml.key,
) as (ctx, start_val), ) as (ctx, start_val),
): ):
assert start_val == key assert (_key := shml.key) == start_val
if platform.system() != 'Darwin':
# XXX, macOS has a char limit..
# see `ipc._shm._shorten_key_for_macos`
assert (
start_val
==
key
==
_key
)
await ctx.result() await ctx.result()
await portal.cancel_actor() await portal.cancel_actor()