Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet a89799b682 Handle broken mem chan on `Actor._push_result()`
When backpressure is used and a feeder mem chan breaks during msg
delivery (usually because the IPC allocating task already terminated)
instead of raising we simply warn as we do for the non-backpressure
case.

Also, add a proper `Actor.is_arbiter` test inside `._invoke()` to avoid
doing an arbiter-registry lookup if the current actor **is** the
registrar.
2022-12-12 15:04:40 -05:00
Tyler Goodlet 45a9aaf6e9 Always set the `parent_exit: trio.Event` on exit 2022-12-12 15:04:40 -05:00
Tyler Goodlet b624ebba21 Add logging and teardown prints to full-fledged-streaming example 2022-12-12 15:04:40 -05:00
Tyler Goodlet e12def51a8 Always print any std streams to console in docs examples tests 2022-12-12 15:04:40 -05:00
Tyler Goodlet 64819b2acb Skip debugger tests on OS X for now 2022-12-12 15:04:40 -05:00
Tyler Goodlet a5e3cf4ecf Add macos run using only the `trio` spawner 2022-12-12 15:04:40 -05:00
Tyler Goodlet 7c42d2510c Add back `pytest` full trace flag to debug CI hangs 2022-12-12 15:04:40 -05:00
Tyler Goodlet 9c336ec064 Add `numpy` for testing optional integrated shm API layer 2022-12-12 15:04:11 -05:00
Tyler Goodlet 6453195e97 Pass `str` dtype for `use_str` case 2022-12-12 15:04:11 -05:00
Tyler Goodlet 54322f2bae Allocate size-specced "empty" sequence from default values by type 2022-12-12 15:04:11 -05:00
9 changed files with 118 additions and 46 deletions

View File

@ -67,7 +67,6 @@ jobs:
] ]
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v2
@ -85,6 +84,40 @@ jobs:
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
testing-macos:
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}'
timeout-minutes: 10
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [macos-latest]
python: ['3.10']
spawn_backend: [
'trio',
]
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '${{ matrix.python }}'
- name: Install dependencies
run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: List dependencies
run: pip list
- name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rsx
# We skip 3.10 on windows for now due to not having any collabs to # We skip 3.10 on windows for now due to not having any collabs to
# debug the CI failures. Anyone wanting to hack and solve them is very # debug the CI failures. Anyone wanting to hack and solve them is very
# welcome, but our primary user base is not using that OS. # welcome, but our primary user base is not using that OS.

View File

@ -12,9 +12,11 @@ async def stream_data(seed):
# this is the third actor; the aggregator # this is the third actor; the aggregator
async def aggregate(seed): async def aggregate(seed):
"""Ensure that the two streams we receive match but only stream '''
Ensure that the two streams we receive match but only stream
a single set of values to the parent. a single set of values to the parent.
"""
'''
async with tractor.open_nursery() as nursery: async with tractor.open_nursery() as nursery:
portals = [] portals = []
for i in range(1, 3): for i in range(1, 3):
@ -69,7 +71,8 @@ async def aggregate(seed):
async def main(): async def main():
# a nursery which spawns "actors" # a nursery which spawns "actors"
async with tractor.open_nursery( async with tractor.open_nursery(
arbiter_addr=('127.0.0.1', 1616) arbiter_addr=('127.0.0.1', 1616),
loglevel='cancel',
) as nursery: ) as nursery:
seed = int(1e3) seed = int(1e3)
@ -92,6 +95,9 @@ async def main():
async for value in stream: async for value in stream:
result_stream.append(value) result_stream.append(value)
print("ROOT STREAM CONSUMER COMPLETE")
print("ROOT CANCELLING AGGREGATOR CHILD")
await portal.cancel_actor() await portal.cancel_actor()
print(f"STREAM TIME = {time.time() - start}") print(f"STREAM TIME = {time.time() - start}")

View File

@ -6,3 +6,4 @@ mypy
trio_typing trio_typing
pexpect pexpect
towncrier towncrier
numpy

View File

@ -36,9 +36,12 @@ from conftest import repodir, _ci_env
# - recurrent root errors # - recurrent root errors
if platform.system() == 'Windows': if osname := platform.system() in (
'Windows',
'Darwin',
):
pytest.skip( pytest.skip(
'Debugger tests have no windows support (yet)', 'Debugger tests have no {osname} support (yet)',
allow_module_level=True, allow_module_level=True,
) )
@ -783,8 +786,6 @@ def test_multi_nested_subactors_error_through_nurseries(
child = spawn('multi_nested_subactors_error_up_through_nurseries') child = spawn('multi_nested_subactors_error_up_through_nurseries')
timed_out_early: bool = False
for send_char in itertools.cycle(['c', 'q']): for send_char in itertools.cycle(['c', 'q']):
try: try:
child.expect(r"\(Pdb\+\+\)") child.expect(r"\(Pdb\+\+\)")

View File

@ -93,14 +93,16 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
ids=lambda t: t[1], ids=lambda t: t[1],
) )
def test_example(run_example_in_subproc, example_script): def test_example(run_example_in_subproc, example_script):
"""Load and run scripts from this repo's ``examples/`` dir as a user '''
Load and run scripts from this repo's ``examples/`` dir as a user
would copy and pasing them into their editor. would copy and pasing them into their editor.
On windows a little more "finessing" is done to make On windows a little more "finessing" is done to make
``multiprocessing`` play nice: we copy the ``__main__.py`` into the ``multiprocessing`` play nice: we copy the ``__main__.py`` into the
test directory and invoke the script as a module with ``python -m test directory and invoke the script as a module with ``python -m
test_example``. test_example``.
"""
'''
ex_file = os.path.join(*example_script) ex_file = os.path.join(*example_script)
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9): if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
@ -110,25 +112,32 @@ def test_example(run_example_in_subproc, example_script):
code = ex.read() code = ex.read()
with run_example_in_subproc(code) as proc: with run_example_in_subproc(code) as proc:
proc.wait() try:
err, _ = proc.stderr.read(), proc.stdout.read() proc.wait(timeout=5)
# print(f'STDERR: {err}') finally:
# print(f'STDOUT: {out}') err = proc.stderr.read()
# if we get some gnarly output let's aggregate and raise
if err:
errmsg = err.decode() errmsg = err.decode()
errlines = errmsg.splitlines() out = proc.stdout.read()
last_error = errlines[-1] outmsg = out.decode()
if (
'Error' in last_error
# XXX: currently we print this to console, but maybe if out:
# shouldn't eventually once we figure out what's print(f'STDOUT: {out.decode()}')
# a better way to be explicit about aio side
# cancels? # if we get some gnarly output let's aggregate and raise
and 'asyncio.exceptions.CancelledError' not in last_error if err:
): print(f'STDERR:\n{errmsg}')
raise Exception(errmsg) errmsg = err.decode()
errlines = errmsg.splitlines()
last_error = errlines[-1]
if (
'Error' in last_error
# XXX: currently we print this to console, but maybe
# shouldn't eventually once we figure out what's
# a better way to be explicit about aio side
# cancels?
and 'asyncio.exceptions.CancelledError' not in last_error
):
raise Exception(errmsg)
assert proc.returncode == 0 assert proc.returncode == 0

View File

@ -70,7 +70,10 @@ async def child_read_shm_list(
) -> None: ) -> None:
# attach in child # attach in child
shml = attach_shm_list(key=shm_key) shml = attach_shm_list(
key=shm_key,
# dtype=str if use_str else float,
)
await ctx.started(shml.key) await ctx.started(shml.key)
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
@ -92,7 +95,9 @@ async def child_read_shm_list(
@pytest.mark.parametrize( @pytest.mark.parametrize(
'use_str', [False, True], 'use_str',
[False, True],
ids=lambda i: f'use_str_values={i}',
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'frame_size', 'frame_size',
@ -106,7 +111,7 @@ def test_parent_writer_child_reader(
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, # debug_mode=True,
) as an: ) as an:
portal = await an.start_actor( portal = await an.start_actor(
@ -121,6 +126,7 @@ def test_parent_writer_child_reader(
shml = open_shm_list( shml = open_shm_list(
key=key, key=key,
size=seq_size, size=seq_size,
dtype=str if use_str else float,
readonly=False, readonly=False,
) )
@ -143,7 +149,7 @@ def test_parent_writer_child_reader(
if use_str: if use_str:
val = str(val) val = str(val)
print(f'(parent): writing {val}') # print(f'(parent): writing {val}')
shml[i] = val shml[i] = val
# only on frame fills do we # only on frame fills do we

View File

@ -829,7 +829,12 @@ class Actor:
if ctx._backpressure: if ctx._backpressure:
log.warning(text) log.warning(text)
await send_chan.send(msg) try:
await send_chan.send(msg)
except trio.BrokenResourceError:
# XXX: local consumer has closed their side
# so cancel the far end streaming task
log.warning(f"{chan} is already closed")
else: else:
try: try:
raise StreamOverrun(text) from None raise StreamOverrun(text) from None
@ -1374,8 +1379,9 @@ async def async_main(
actor.lifetime_stack.close() actor.lifetime_stack.close()
# Unregister actor from the arbiter # Unregister actor from the arbiter
if registered_with_arbiter and ( if (
actor._arb_addr is not None registered_with_arbiter
and not actor.is_arbiter
): ):
failed = False failed = False
with trio.move_on_after(0.5) as cs: with trio.move_on_after(0.5) as cs:

View File

@ -460,7 +460,6 @@ class ShmArray:
def open_shm_ndarray( def open_shm_ndarray(
key: Optional[str] = None, key: Optional[str] = None,
size: int = int(2 ** 10), size: int = int(2 ** 10),
dtype: np.dtype | None = None, dtype: np.dtype | None = None,
@ -719,7 +718,7 @@ class ShmList(ShareableList):
Carbon copy of ``.shared_memory.ShareableList`` with a few Carbon copy of ``.shared_memory.ShareableList`` with a few
enhancements: enhancements:
- readonly mode via instance var flag - readonly mode via instance var flag `._readonly: bool`
- ``.__getitem__()`` accepts ``slice`` inputs - ``.__getitem__()`` accepts ``slice`` inputs
- exposes the underlying buffer "name" as a ``.key: str`` - exposes the underlying buffer "name" as a ``.key: str``
@ -743,6 +742,10 @@ class ShmList(ShareableList):
def key(self) -> str: def key(self) -> str:
return self._key return self._key
@property
def readonly(self) -> bool:
return self._readonly
def __setitem__( def __setitem__(
self, self,
position, position,
@ -781,13 +784,20 @@ def open_shm_list(
key: str, key: str,
sequence: list | None = None, sequence: list | None = None,
size: int = int(2 ** 10), size: int = int(2 ** 10),
dtype: np.dtype | None = None, dtype: float | int | bool | str | bytes | None = float,
readonly: bool = True, readonly: bool = True,
) -> ShmList: ) -> ShmList:
if sequence is None: if sequence is None:
sequence = list(map(float, range(size))) default = {
float: 0.,
int: 0,
bool: True,
str: 'doggy',
None: None,
}[dtype]
sequence = [default] * size
shml = ShmList( shml = ShmList(
sequence=sequence, sequence=sequence,

View File

@ -133,13 +133,13 @@ async def gather_contexts(
# deliver control once all managers have started up # deliver control once all managers have started up
await all_entered.wait() await all_entered.wait()
# NOTE: order *should* be preserved in the output values try:
# since ``dict``s are now implicitly ordered. yield tuple(unwrapped.values())
yield tuple(unwrapped.values()) finally:
# NOTE: this is ABSOLUTELY REQUIRED to avoid
# we don't need a try/finally since cancellation will be triggered # the following wacky bug:
# by the surrounding nursery on error. # <tractorbugurlhere>
parent_exit.set() parent_exit.set()
# Per actor task caching helpers. # Per actor task caching helpers.