Compare commits
4 Commits
9639c3b455
...
369fca6cc1
Author | SHA1 | Date |
---|---|---|
|
369fca6cc1 | |
|
3af55d9d64 | |
|
093ad02beb | |
|
b4bacbc766 |
|
@ -1,85 +0,0 @@
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from functools import partial
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
|
||||
log = tractor.log.get_logger(
|
||||
name=__name__
|
||||
)
|
||||
|
||||
_lock: trio.Lock|None = None
|
||||
|
||||
|
||||
@acm
|
||||
async def acquire_singleton_lock(
|
||||
) -> None:
|
||||
global _lock
|
||||
if _lock is None:
|
||||
log.info('Allocating LOCK')
|
||||
_lock = trio.Lock()
|
||||
|
||||
log.info('TRYING TO LOCK ACQUIRE')
|
||||
async with _lock:
|
||||
log.info('ACQUIRED')
|
||||
yield _lock
|
||||
|
||||
log.info('RELEASED')
|
||||
|
||||
|
||||
|
||||
async def hold_lock_forever(
|
||||
task_status=trio.TASK_STATUS_IGNORED
|
||||
):
|
||||
async with (
|
||||
tractor.trionics.maybe_raise_from_masking_exc(),
|
||||
acquire_singleton_lock() as lock,
|
||||
):
|
||||
task_status.started(lock)
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
async def main(
|
||||
ignore_special_cases: bool,
|
||||
loglevel: str = 'info',
|
||||
debug_mode: bool = True,
|
||||
):
|
||||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
|
||||
# tractor.trionics.maybe_raise_from_masking_exc()
|
||||
# ^^^ XXX NOTE, interestingly putting the unmasker
|
||||
# here does not exhibit the same behaviour ??
|
||||
):
|
||||
if not ignore_special_cases:
|
||||
from tractor.trionics import _taskc
|
||||
_taskc._mask_cases.clear()
|
||||
|
||||
_lock = await tn.start(
|
||||
hold_lock_forever,
|
||||
)
|
||||
with trio.move_on_after(0.2):
|
||||
await tn.start(
|
||||
hold_lock_forever,
|
||||
)
|
||||
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
# XXX, manual test as script
|
||||
if __name__ == '__main__':
|
||||
tractor.log.get_console_log(level='info')
|
||||
for case in [True, False]:
|
||||
log.info(
|
||||
f'\n'
|
||||
f'------ RUNNING SCRIPT TRIAL ------\n'
|
||||
f'ignore_special_cases: {case!r}\n'
|
||||
)
|
||||
trio.run(partial(
|
||||
main,
|
||||
ignore_special_cases=case,
|
||||
loglevel='info',
|
||||
))
|
|
@ -9,10 +9,8 @@ import tractor
|
|||
import trio
|
||||
|
||||
|
||||
log = tractor.log.get_logger(
|
||||
name=__name__
|
||||
)
|
||||
|
||||
log = tractor.log.get_logger(__name__)
|
||||
tractor.log.get_console_log('info')
|
||||
|
||||
@cm
|
||||
def teardown_on_exc(
|
||||
|
@ -56,7 +54,6 @@ def teardown_on_exc(
|
|||
async def finite_stream_to_rent(
|
||||
tx: trio.abc.SendChannel,
|
||||
child_errors_mid_stream: bool,
|
||||
raise_unmasked: bool,
|
||||
|
||||
task_status: trio.TaskStatus[
|
||||
trio.CancelScope,
|
||||
|
@ -71,41 +68,20 @@ async def finite_stream_to_rent(
|
|||
# inside the child task!
|
||||
#
|
||||
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
||||
tractor.trionics.maybe_raise_from_masking_exc(
|
||||
raise_unmasked=raise_unmasked,
|
||||
),
|
||||
# tractor.trionics.maybe_raise_from_masking_exc(),
|
||||
|
||||
tx as tx, # .aclose() is the guilty masker chkpt!
|
||||
|
||||
# XXX, this ONLY matters in the
|
||||
# `child_errors_mid_stream=False` case oddly!?
|
||||
# THAT IS, if no tn is opened in that case then the
|
||||
# test will not fail; it raises the RTE correctly?
|
||||
#
|
||||
# -> so it seems this new scope somehow affects the form of
|
||||
# eventual in the parent EG?
|
||||
tractor.trionics.maybe_open_nursery(
|
||||
nursery=(
|
||||
None
|
||||
if not child_errors_mid_stream
|
||||
else True
|
||||
),
|
||||
) as _tn,
|
||||
trio.open_nursery() as _tn,
|
||||
):
|
||||
# pass our scope back to parent for supervision\
|
||||
# control.
|
||||
cs: trio.CancelScope|None = (
|
||||
None
|
||||
if _tn is True
|
||||
else _tn.cancel_scope
|
||||
)
|
||||
task_status.started(cs)
|
||||
task_status.started(_tn.cancel_scope)
|
||||
|
||||
with teardown_on_exc(
|
||||
raise_from_handler=not child_errors_mid_stream,
|
||||
):
|
||||
for i in range(100):
|
||||
log.debug(
|
||||
log.info(
|
||||
f'Child tx {i!r}\n'
|
||||
)
|
||||
if (
|
||||
|
@ -131,31 +107,23 @@ async def main(
|
|||
# bug and raises.
|
||||
#
|
||||
child_errors_mid_stream: bool,
|
||||
|
||||
raise_unmasked: bool = False,
|
||||
loglevel: str = 'info',
|
||||
):
|
||||
tractor.log.get_console_log(level=loglevel)
|
||||
|
||||
# the `.aclose()` being checkpoints on these
|
||||
# is the source of the problem..
|
||||
tx, rx = trio.open_memory_channel(1)
|
||||
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
rx as rx,
|
||||
):
|
||||
|
||||
_child_cs = await tn.start(
|
||||
partial(
|
||||
finite_stream_to_rent,
|
||||
child_errors_mid_stream=child_errors_mid_stream,
|
||||
raise_unmasked=raise_unmasked,
|
||||
tx=tx,
|
||||
)
|
||||
)
|
||||
async for msg in rx:
|
||||
log.debug(
|
||||
log.info(
|
||||
f'Rent rx {msg!r}\n'
|
||||
)
|
||||
|
||||
|
@ -171,25 +139,7 @@ async def main(
|
|||
tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
# XXX, manual test as script
|
||||
if __name__ == '__main__':
|
||||
tractor.log.get_console_log(level='info')
|
||||
|
||||
for case in [True, False]:
|
||||
log.info(
|
||||
f'\n'
|
||||
f'------ RUNNING SCRIPT TRIAL ------\n'
|
||||
f'child_errors_midstream: {case!r}\n'
|
||||
)
|
||||
try:
|
||||
trio.run(partial(
|
||||
main,
|
||||
child_errors_mid_stream=case,
|
||||
# raise_unmasked=True,
|
||||
loglevel='info',
|
||||
))
|
||||
except Exception as _exc:
|
||||
exc = _exc
|
||||
log.exception(
|
||||
'Should have raised an RTE or Cancelled?\n'
|
||||
)
|
||||
breakpoint()
|
||||
trio.run(main, case)
|
||||
|
|
|
@ -95,7 +95,6 @@ def run_example_in_subproc(
|
|||
and 'integration' not in p[0]
|
||||
and 'advanced_faults' not in p[0]
|
||||
and 'multihost' not in p[0]
|
||||
and 'trio' not in p[0]
|
||||
)
|
||||
],
|
||||
ids=lambda t: t[1],
|
||||
|
|
|
@ -6,18 +6,11 @@ want to see changed.
|
|||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from types import ModuleType
|
||||
|
||||
from functools import partial
|
||||
|
||||
import pytest
|
||||
from _pytest import pathlib
|
||||
from tractor.trionics import collapse_eg
|
||||
import trio
|
||||
from trio import TaskStatus
|
||||
from tractor._testing import (
|
||||
examples_dir,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -113,9 +106,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Demo how a masking `trio.Cancelled` could be handled by unmasking
|
||||
from the `.__context__` field when a user (by accident) re-raises
|
||||
from a `finally:`.
|
||||
Demo how a masking `trio.Cancelled` could be handled by unmasking from the
|
||||
`.__context__` field when a user (by accident) re-raises from a `finally:`.
|
||||
|
||||
'''
|
||||
import tractor
|
||||
|
@ -166,13 +158,13 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
assert len(assert_eg.exceptions) == 1
|
||||
|
||||
|
||||
|
||||
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Demo how a using an `async with sndchan` inside
|
||||
a `.trionics.gather_contexts()` task will break a strict-eg-tn's
|
||||
multi-cancelled absorption..
|
||||
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
|
||||
will break a strict-eg-tn's multi-cancelled absorption..
|
||||
|
||||
'''
|
||||
from tractor import (
|
||||
|
@ -198,6 +190,7 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
|||
f'Closed {task!r}\n'
|
||||
)
|
||||
|
||||
|
||||
async def main():
|
||||
async with (
|
||||
# XXX should ensure ONLY the KBI
|
||||
|
@ -218,85 +211,3 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
|||
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'raise_unmasked', [
|
||||
True,
|
||||
pytest.param(
|
||||
False,
|
||||
marks=pytest.mark.xfail(
|
||||
reason="see examples/trio/send_chan_aclose_masks.py"
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'child_errors_mid_stream',
|
||||
[True, False],
|
||||
)
|
||||
def test_unmask_aclose_as_checkpoint_on_aexit(
|
||||
raise_unmasked: bool,
|
||||
child_errors_mid_stream: bool,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify that our unmasker util works over the common case where
|
||||
a mem-chan's `.aclose()` is included in an `@acm` stack
|
||||
and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded
|
||||
exception from user code resulting in a silent failure which
|
||||
appears like graceful cancellation.
|
||||
|
||||
This test suite is mostly implemented as an example script so it
|
||||
could more easily be shared with `trio`-core peeps as `tractor`-less
|
||||
minimum reproducing example.
|
||||
|
||||
'''
|
||||
mod: ModuleType = pathlib.import_path(
|
||||
examples_dir()
|
||||
/ 'trio'
|
||||
/ 'send_chan_aclose_masks_beg.py',
|
||||
root=examples_dir(),
|
||||
consider_namespace_packages=False,
|
||||
)
|
||||
with pytest.raises(RuntimeError):
|
||||
trio.run(partial(
|
||||
mod.main,
|
||||
raise_unmasked=raise_unmasked,
|
||||
child_errors_mid_stream=child_errors_mid_stream,
|
||||
))
|
||||
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'ignore_special_cases', [
|
||||
True,
|
||||
pytest.param(
|
||||
False,
|
||||
marks=pytest.mark.xfail(
|
||||
reason="see examples/trio/lockacquire_not_umasked.py"
|
||||
)
|
||||
),
|
||||
]
|
||||
)
|
||||
def test_cancelled_lockacquire_in_ipctx_not_unmasked(
|
||||
ignore_special_cases: bool,
|
||||
loglevel: str,
|
||||
debug_mode: bool,
|
||||
):
|
||||
mod: ModuleType = pathlib.import_path(
|
||||
examples_dir()
|
||||
/ 'trio'
|
||||
/ 'lockacquire_not_unmasked.py',
|
||||
root=examples_dir(),
|
||||
consider_namespace_packages=False,
|
||||
)
|
||||
async def _main():
|
||||
with trio.fail_after(2):
|
||||
await mod.main(
|
||||
ignore_special_cases=ignore_special_cases,
|
||||
loglevel=loglevel,
|
||||
debug_mode=debug_mode,
|
||||
)
|
||||
|
||||
trio.run(_main)
|
||||
|
|
|
@ -72,7 +72,7 @@ _mask_cases: dict[
|
|||
dict[
|
||||
int, # inner-frame index into `inspect.getinnerframes()`
|
||||
# `FrameInfo.function/filename: str`s to match
|
||||
dict[str, str],
|
||||
tuple[str, str],
|
||||
],
|
||||
] = {
|
||||
trio.WouldBlock: {
|
||||
|
@ -163,7 +163,6 @@ async def maybe_raise_from_masking_exc(
|
|||
# ^XXX, special case(s) where we warn-log bc likely
|
||||
# there will be no operational diff since the exc
|
||||
# is always expected to be consumed.
|
||||
|
||||
) -> BoxedMaybeException:
|
||||
'''
|
||||
Maybe un-mask and re-raise exception(s) suppressed by a known
|
||||
|
@ -264,8 +263,6 @@ async def maybe_raise_from_masking_exc(
|
|||
if len(masked) < 2:
|
||||
# don't unmask already known "special" cases..
|
||||
if (
|
||||
_mask_cases
|
||||
and
|
||||
(cases := _mask_cases.get(type(exc_ctx)))
|
||||
and
|
||||
(masker_frame := is_expected_masking_case(
|
||||
|
@ -275,8 +272,7 @@ async def maybe_raise_from_masking_exc(
|
|||
))
|
||||
):
|
||||
log.warning(
|
||||
f'Ignoring already-known, non-ideal-but-valid '
|
||||
f'masker code @\n'
|
||||
f'Ignoring already-known/non-ideally-valid masker code @\n'
|
||||
f'{masker_frame}\n'
|
||||
f'\n'
|
||||
f'NOT raising {exc_ctx} from masker {exc_match!r}\n'
|
||||
|
|
Loading…
Reference in New Issue