Compare commits
10 Commits
1be3f4115d
...
a89799b682
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | a89799b682 | |
Tyler Goodlet | 45a9aaf6e9 | |
Tyler Goodlet | b624ebba21 | |
Tyler Goodlet | e12def51a8 | |
Tyler Goodlet | 64819b2acb | |
Tyler Goodlet | a5e3cf4ecf | |
Tyler Goodlet | 7c42d2510c | |
Tyler Goodlet | 9c336ec064 | |
Tyler Goodlet | 6453195e97 | |
Tyler Goodlet | 54322f2bae |
|
@ -67,7 +67,6 @@ jobs:
|
||||||
]
|
]
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
|
|
||||||
- name: Checkout
|
- name: Checkout
|
||||||
uses: actions/checkout@v2
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
@ -85,6 +84,40 @@ jobs:
|
||||||
- name: Run tests
|
- name: Run tests
|
||||||
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
|
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
|
||||||
|
|
||||||
|
|
||||||
|
testing-macos:
|
||||||
|
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
|
||||||
|
timeout-minutes: 10
|
||||||
|
runs-on: ${{ matrix.os }}
|
||||||
|
|
||||||
|
strategy:
|
||||||
|
fail-fast: false
|
||||||
|
matrix:
|
||||||
|
os: [macos-latest]
|
||||||
|
python: ['3.10']
|
||||||
|
spawn_backend: [
|
||||||
|
'trio',
|
||||||
|
]
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout
|
||||||
|
uses: actions/checkout@v2
|
||||||
|
|
||||||
|
- name: Setup python
|
||||||
|
uses: actions/setup-python@v2
|
||||||
|
with:
|
||||||
|
python-version: '${{ matrix.python }}'
|
||||||
|
|
||||||
|
- name: Install dependencies
|
||||||
|
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
|
||||||
|
|
||||||
|
- name: List dependencies
|
||||||
|
run: pip list
|
||||||
|
|
||||||
|
- name: Run tests
|
||||||
|
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
|
||||||
|
|
||||||
|
|
||||||
# We skip 3.10 on windows for now due to not having any collabs to
|
# We skip 3.10 on windows for now due to not having any collabs to
|
||||||
# debug the CI failures. Anyone wanting to hack and solve them is very
|
# debug the CI failures. Anyone wanting to hack and solve them is very
|
||||||
# welcome, but our primary user base is not using that OS.
|
# welcome, but our primary user base is not using that OS.
|
||||||
|
|
|
@ -12,9 +12,11 @@ async def stream_data(seed):
|
||||||
|
|
||||||
# this is the third actor; the aggregator
|
# this is the third actor; the aggregator
|
||||||
async def aggregate(seed):
|
async def aggregate(seed):
|
||||||
"""Ensure that the two streams we receive match but only stream
|
'''
|
||||||
|
Ensure that the two streams we receive match but only stream
|
||||||
a single set of values to the parent.
|
a single set of values to the parent.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
async with tractor.open_nursery() as nursery:
|
async with tractor.open_nursery() as nursery:
|
||||||
portals = []
|
portals = []
|
||||||
for i in range(1, 3):
|
for i in range(1, 3):
|
||||||
|
@ -69,7 +71,8 @@ async def aggregate(seed):
|
||||||
async def main():
|
async def main():
|
||||||
# a nursery which spawns "actors"
|
# a nursery which spawns "actors"
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
arbiter_addr=('127.0.0.1', 1616)
|
arbiter_addr=('127.0.0.1', 1616),
|
||||||
|
loglevel='cancel',
|
||||||
) as nursery:
|
) as nursery:
|
||||||
|
|
||||||
seed = int(1e3)
|
seed = int(1e3)
|
||||||
|
@ -92,6 +95,9 @@ async def main():
|
||||||
async for value in stream:
|
async for value in stream:
|
||||||
result_stream.append(value)
|
result_stream.append(value)
|
||||||
|
|
||||||
|
print("ROOT STREAM CONSUMER COMPLETE")
|
||||||
|
|
||||||
|
print("ROOT CANCELLING AGGREGATOR CHILD")
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
print(f"STREAM TIME = {time.time() - start}")
|
print(f"STREAM TIME = {time.time() - start}")
|
||||||
|
|
|
@ -6,3 +6,4 @@ mypy
|
||||||
trio_typing
|
trio_typing
|
||||||
pexpect
|
pexpect
|
||||||
towncrier
|
towncrier
|
||||||
|
numpy
|
||||||
|
|
|
@ -36,9 +36,12 @@ from conftest import repodir, _ci_env
|
||||||
# - recurrent root errors
|
# - recurrent root errors
|
||||||
|
|
||||||
|
|
||||||
if platform.system() == 'Windows':
|
if osname := platform.system() in (
|
||||||
|
'Windows',
|
||||||
|
'Darwin',
|
||||||
|
):
|
||||||
pytest.skip(
|
pytest.skip(
|
||||||
'Debugger tests have no windows support (yet)',
|
'Debugger tests have no {osname} support (yet)',
|
||||||
allow_module_level=True,
|
allow_module_level=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -783,8 +786,6 @@ def test_multi_nested_subactors_error_through_nurseries(
|
||||||
|
|
||||||
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
||||||
|
|
||||||
timed_out_early: bool = False
|
|
||||||
|
|
||||||
for send_char in itertools.cycle(['c', 'q']):
|
for send_char in itertools.cycle(['c', 'q']):
|
||||||
try:
|
try:
|
||||||
child.expect(r"\(Pdb\+\+\)")
|
child.expect(r"\(Pdb\+\+\)")
|
||||||
|
|
|
@ -93,14 +93,16 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
|
||||||
ids=lambda t: t[1],
|
ids=lambda t: t[1],
|
||||||
)
|
)
|
||||||
def test_example(run_example_in_subproc, example_script):
|
def test_example(run_example_in_subproc, example_script):
|
||||||
"""Load and run scripts from this repo's ``examples/`` dir as a user
|
'''
|
||||||
|
Load and run scripts from this repo's ``examples/`` dir as a user
|
||||||
would copy and pasing them into their editor.
|
would copy and pasing them into their editor.
|
||||||
|
|
||||||
On windows a little more "finessing" is done to make
|
On windows a little more "finessing" is done to make
|
||||||
``multiprocessing`` play nice: we copy the ``__main__.py`` into the
|
``multiprocessing`` play nice: we copy the ``__main__.py`` into the
|
||||||
test directory and invoke the script as a module with ``python -m
|
test directory and invoke the script as a module with ``python -m
|
||||||
test_example``.
|
test_example``.
|
||||||
"""
|
|
||||||
|
'''
|
||||||
ex_file = os.path.join(*example_script)
|
ex_file = os.path.join(*example_script)
|
||||||
|
|
||||||
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
|
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
|
||||||
|
@ -110,25 +112,32 @@ def test_example(run_example_in_subproc, example_script):
|
||||||
code = ex.read()
|
code = ex.read()
|
||||||
|
|
||||||
with run_example_in_subproc(code) as proc:
|
with run_example_in_subproc(code) as proc:
|
||||||
proc.wait()
|
try:
|
||||||
err, _ = proc.stderr.read(), proc.stdout.read()
|
proc.wait(timeout=5)
|
||||||
# print(f'STDERR: {err}')
|
finally:
|
||||||
# print(f'STDOUT: {out}')
|
err = proc.stderr.read()
|
||||||
|
|
||||||
# if we get some gnarly output let's aggregate and raise
|
|
||||||
if err:
|
|
||||||
errmsg = err.decode()
|
errmsg = err.decode()
|
||||||
errlines = errmsg.splitlines()
|
out = proc.stdout.read()
|
||||||
last_error = errlines[-1]
|
outmsg = out.decode()
|
||||||
if (
|
|
||||||
'Error' in last_error
|
|
||||||
|
|
||||||
# XXX: currently we print this to console, but maybe
|
if out:
|
||||||
# shouldn't eventually once we figure out what's
|
print(f'STDOUT: {out.decode()}')
|
||||||
# a better way to be explicit about aio side
|
|
||||||
# cancels?
|
# if we get some gnarly output let's aggregate and raise
|
||||||
and 'asyncio.exceptions.CancelledError' not in last_error
|
if err:
|
||||||
):
|
print(f'STDERR:\n{errmsg}')
|
||||||
raise Exception(errmsg)
|
errmsg = err.decode()
|
||||||
|
errlines = errmsg.splitlines()
|
||||||
|
last_error = errlines[-1]
|
||||||
|
if (
|
||||||
|
'Error' in last_error
|
||||||
|
|
||||||
|
# XXX: currently we print this to console, but maybe
|
||||||
|
# shouldn't eventually once we figure out what's
|
||||||
|
# a better way to be explicit about aio side
|
||||||
|
# cancels?
|
||||||
|
and 'asyncio.exceptions.CancelledError' not in last_error
|
||||||
|
):
|
||||||
|
raise Exception(errmsg)
|
||||||
|
|
||||||
assert proc.returncode == 0
|
assert proc.returncode == 0
|
||||||
|
|
|
@ -70,7 +70,10 @@ async def child_read_shm_list(
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# attach in child
|
# attach in child
|
||||||
shml = attach_shm_list(key=shm_key)
|
shml = attach_shm_list(
|
||||||
|
key=shm_key,
|
||||||
|
# dtype=str if use_str else float,
|
||||||
|
)
|
||||||
await ctx.started(shml.key)
|
await ctx.started(shml.key)
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
@ -92,7 +95,9 @@ async def child_read_shm_list(
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'use_str', [False, True],
|
'use_str',
|
||||||
|
[False, True],
|
||||||
|
ids=lambda i: f'use_str_values={i}',
|
||||||
)
|
)
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'frame_size',
|
'frame_size',
|
||||||
|
@ -106,7 +111,7 @@ def test_parent_writer_child_reader(
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=True,
|
# debug_mode=True,
|
||||||
) as an:
|
) as an:
|
||||||
|
|
||||||
portal = await an.start_actor(
|
portal = await an.start_actor(
|
||||||
|
@ -121,6 +126,7 @@ def test_parent_writer_child_reader(
|
||||||
shml = open_shm_list(
|
shml = open_shm_list(
|
||||||
key=key,
|
key=key,
|
||||||
size=seq_size,
|
size=seq_size,
|
||||||
|
dtype=str if use_str else float,
|
||||||
readonly=False,
|
readonly=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -143,7 +149,7 @@ def test_parent_writer_child_reader(
|
||||||
if use_str:
|
if use_str:
|
||||||
val = str(val)
|
val = str(val)
|
||||||
|
|
||||||
print(f'(parent): writing {val}')
|
# print(f'(parent): writing {val}')
|
||||||
shml[i] = val
|
shml[i] = val
|
||||||
|
|
||||||
# only on frame fills do we
|
# only on frame fills do we
|
||||||
|
|
|
@ -829,7 +829,12 @@ class Actor:
|
||||||
|
|
||||||
if ctx._backpressure:
|
if ctx._backpressure:
|
||||||
log.warning(text)
|
log.warning(text)
|
||||||
await send_chan.send(msg)
|
try:
|
||||||
|
await send_chan.send(msg)
|
||||||
|
except trio.BrokenResourceError:
|
||||||
|
# XXX: local consumer has closed their side
|
||||||
|
# so cancel the far end streaming task
|
||||||
|
log.warning(f"{chan} is already closed")
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
raise StreamOverrun(text) from None
|
raise StreamOverrun(text) from None
|
||||||
|
@ -1374,8 +1379,9 @@ async def async_main(
|
||||||
actor.lifetime_stack.close()
|
actor.lifetime_stack.close()
|
||||||
|
|
||||||
# Unregister actor from the arbiter
|
# Unregister actor from the arbiter
|
||||||
if registered_with_arbiter and (
|
if (
|
||||||
actor._arb_addr is not None
|
registered_with_arbiter
|
||||||
|
and not actor.is_arbiter
|
||||||
):
|
):
|
||||||
failed = False
|
failed = False
|
||||||
with trio.move_on_after(0.5) as cs:
|
with trio.move_on_after(0.5) as cs:
|
||||||
|
|
|
@ -460,7 +460,6 @@ class ShmArray:
|
||||||
|
|
||||||
|
|
||||||
def open_shm_ndarray(
|
def open_shm_ndarray(
|
||||||
|
|
||||||
key: Optional[str] = None,
|
key: Optional[str] = None,
|
||||||
size: int = int(2 ** 10),
|
size: int = int(2 ** 10),
|
||||||
dtype: np.dtype | None = None,
|
dtype: np.dtype | None = None,
|
||||||
|
@ -719,7 +718,7 @@ class ShmList(ShareableList):
|
||||||
Carbon copy of ``.shared_memory.ShareableList`` with a few
|
Carbon copy of ``.shared_memory.ShareableList`` with a few
|
||||||
enhancements:
|
enhancements:
|
||||||
|
|
||||||
- readonly mode via instance var flag
|
- readonly mode via instance var flag `._readonly: bool`
|
||||||
- ``.__getitem__()`` accepts ``slice`` inputs
|
- ``.__getitem__()`` accepts ``slice`` inputs
|
||||||
- exposes the underlying buffer "name" as a ``.key: str``
|
- exposes the underlying buffer "name" as a ``.key: str``
|
||||||
|
|
||||||
|
@ -743,6 +742,10 @@ class ShmList(ShareableList):
|
||||||
def key(self) -> str:
|
def key(self) -> str:
|
||||||
return self._key
|
return self._key
|
||||||
|
|
||||||
|
@property
|
||||||
|
def readonly(self) -> bool:
|
||||||
|
return self._readonly
|
||||||
|
|
||||||
def __setitem__(
|
def __setitem__(
|
||||||
self,
|
self,
|
||||||
position,
|
position,
|
||||||
|
@ -781,13 +784,20 @@ def open_shm_list(
|
||||||
key: str,
|
key: str,
|
||||||
sequence: list | None = None,
|
sequence: list | None = None,
|
||||||
size: int = int(2 ** 10),
|
size: int = int(2 ** 10),
|
||||||
dtype: np.dtype | None = None,
|
dtype: float | int | bool | str | bytes | None = float,
|
||||||
readonly: bool = True,
|
readonly: bool = True,
|
||||||
|
|
||||||
) -> ShmList:
|
) -> ShmList:
|
||||||
|
|
||||||
if sequence is None:
|
if sequence is None:
|
||||||
sequence = list(map(float, range(size)))
|
default = {
|
||||||
|
float: 0.,
|
||||||
|
int: 0,
|
||||||
|
bool: True,
|
||||||
|
str: 'doggy',
|
||||||
|
None: None,
|
||||||
|
}[dtype]
|
||||||
|
sequence = [default] * size
|
||||||
|
|
||||||
shml = ShmList(
|
shml = ShmList(
|
||||||
sequence=sequence,
|
sequence=sequence,
|
||||||
|
|
|
@ -133,13 +133,13 @@ async def gather_contexts(
|
||||||
# deliver control once all managers have started up
|
# deliver control once all managers have started up
|
||||||
await all_entered.wait()
|
await all_entered.wait()
|
||||||
|
|
||||||
# NOTE: order *should* be preserved in the output values
|
try:
|
||||||
# since ``dict``s are now implicitly ordered.
|
yield tuple(unwrapped.values())
|
||||||
yield tuple(unwrapped.values())
|
finally:
|
||||||
|
# NOTE: this is ABSOLUTELY REQUIRED to avoid
|
||||||
# we don't need a try/finally since cancellation will be triggered
|
# the following wacky bug:
|
||||||
# by the surrounding nursery on error.
|
# <tractorbugurlhere>
|
||||||
parent_exit.set()
|
parent_exit.set()
|
||||||
|
|
||||||
|
|
||||||
# Per actor task caching helpers.
|
# Per actor task caching helpers.
|
||||||
|
|
Loading…
Reference in New Issue