forked from goodboy/tractor
Lel, revert `AsyncioCancelled` inherit, module..
Turns out it somehow breaks our `to_asyncio` error relay since obvi `asyncio`'s runtime seems to specially handle it (prolly via `isinstance()` ?) and it caused our `test_aio_cancelled_from_aio_causes_trio_cancelled()` to hang.. Further, obvi `unpack_error()` won't be able to find the type def if not kept inside `._exceptions`.. So given all that, revert the change/move as well as: - tweak the aio-from-aio cancel test to timeout. - do `trio.sleep()` conc with any bg aio task by moving out nursery block. - add a `send_sigint_to: str` parameter to `test_sigint_closes_lifetime_stack()` such that we test the SIGINT being relayed to just the parent or the child.aio_abandons
parent
a870df68c0
commit
4f1db1ff52
|
@ -289,23 +289,35 @@ async def aio_cancel():
|
||||||
|
|
||||||
'''
|
'''
|
||||||
await asyncio.sleep(0.5)
|
await asyncio.sleep(0.5)
|
||||||
task = asyncio.current_task()
|
|
||||||
|
|
||||||
# cancel and enter sleep
|
# cancel and enter sleep
|
||||||
|
task = asyncio.current_task()
|
||||||
task.cancel()
|
task.cancel()
|
||||||
await aio_sleep_forever()
|
await aio_sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||||
|
'''
|
||||||
|
When the `asyncio.Task` cancels itself the `trio` side cshould
|
||||||
|
also cancel and teardown and relay the cancellation cross-process
|
||||||
|
to the caller (parent).
|
||||||
|
|
||||||
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as n:
|
|
||||||
await n.run_in_actor(
|
an: tractor.ActorNursery
|
||||||
|
async with tractor.open_nursery() as an:
|
||||||
|
p: tractor.Portal = await an.run_in_actor(
|
||||||
asyncio_actor,
|
asyncio_actor,
|
||||||
target='aio_cancel',
|
target='aio_cancel',
|
||||||
expect_err='tractor.to_asyncio.AsyncioCancelled',
|
expect_err='tractor.to_asyncio.AsyncioCancelled',
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
)
|
)
|
||||||
|
# NOTE: normally the `an.__aexit__()` waits on the
|
||||||
|
# portal's result but we do it explicitly here
|
||||||
|
# to avoid indent levels.
|
||||||
|
with trio.fail_after(1):
|
||||||
|
await p.wait_for_result()
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(
|
||||||
expected_exception=(RemoteActorError, ExceptionGroup),
|
expected_exception=(RemoteActorError, ExceptionGroup),
|
||||||
|
@ -313,7 +325,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
# might get multiple `trio.Cancelled`s as well inside an inception
|
# might get multiple `trio.Cancelled`s as well inside an inception
|
||||||
err = excinfo.value
|
err: RemoteActorError|ExceptionGroup = excinfo.value
|
||||||
if isinstance(err, ExceptionGroup):
|
if isinstance(err, ExceptionGroup):
|
||||||
err = next(itertools.dropwhile(
|
err = next(itertools.dropwhile(
|
||||||
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
lambda exc: not isinstance(exc, tractor.RemoteActorError),
|
||||||
|
@ -321,7 +333,8 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr):
|
||||||
))
|
))
|
||||||
assert err
|
assert err
|
||||||
|
|
||||||
# ensure boxed error is correct
|
# relayed boxed error should be our `trio`-task's
|
||||||
|
# cancel-signal-proxy-equivalent of `asyncio.CancelledError`.
|
||||||
assert err.boxed_type == to_asyncio.AsyncioCancelled
|
assert err.boxed_type == to_asyncio.AsyncioCancelled
|
||||||
|
|
||||||
|
|
||||||
|
@ -630,6 +643,7 @@ def test_echoserver_detailed_mechanics(
|
||||||
async def manage_file(
|
async def manage_file(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
tmp_path_str: str,
|
tmp_path_str: str,
|
||||||
|
send_sigint_to: str,
|
||||||
bg_aio_task: bool = False,
|
bg_aio_task: bool = False,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -687,14 +701,20 @@ async def manage_file(
|
||||||
# honestly, this REALLY reminds me why i haven't used
|
# honestly, this REALLY reminds me why i haven't used
|
||||||
# `asyncio` by choice in years.. XD
|
# `asyncio` by choice in years.. XD
|
||||||
#
|
#
|
||||||
# await tractor.to_asyncio.run_task(aio_sleep_forever)
|
|
||||||
if bg_aio_task:
|
|
||||||
async with trio.open_nursery() as tn:
|
async with trio.open_nursery() as tn:
|
||||||
|
if bg_aio_task:
|
||||||
tn.start_soon(
|
tn.start_soon(
|
||||||
tractor.to_asyncio.run_task,
|
tractor.to_asyncio.run_task,
|
||||||
aio_sleep_forever,
|
aio_sleep_forever,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# XXX don't-need/doesn't-make-a-diff right
|
||||||
|
# since we're already doing it from parent?
|
||||||
|
# if send_sigint_to == 'child':
|
||||||
|
# os.kill(
|
||||||
|
# os.getpid(),
|
||||||
|
# signal.SIGINT,
|
||||||
|
# )
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
|
||||||
# signalled manually at the OS level (aka KBI) by the parent actor.
|
# signalled manually at the OS level (aka KBI) by the parent actor.
|
||||||
|
@ -702,10 +722,18 @@ async def manage_file(
|
||||||
print('child raised KBI..')
|
print('child raised KBI..')
|
||||||
assert tmp_file.exists()
|
assert tmp_file.exists()
|
||||||
raise
|
raise
|
||||||
else:
|
|
||||||
raise RuntimeError('shoulda received a KBI?')
|
raise RuntimeError('shoulda received a KBI?')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'send_sigint_to',
|
||||||
|
[
|
||||||
|
'child',
|
||||||
|
'parent',
|
||||||
|
],
|
||||||
|
ids='send_SIGINT_to={}'.format,
|
||||||
|
)
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'bg_aio_task',
|
'bg_aio_task',
|
||||||
[
|
[
|
||||||
|
@ -740,6 +768,8 @@ def test_sigint_closes_lifetime_stack(
|
||||||
tmp_path: Path,
|
tmp_path: Path,
|
||||||
wait_for_ctx: bool,
|
wait_for_ctx: bool,
|
||||||
bg_aio_task: bool,
|
bg_aio_task: bool,
|
||||||
|
debug_mode: bool,
|
||||||
|
send_sigint_to: str,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Ensure that an infected child can use the `Actor.lifetime_stack`
|
Ensure that an infected child can use the `Actor.lifetime_stack`
|
||||||
|
@ -749,8 +779,11 @@ def test_sigint_closes_lifetime_stack(
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
try:
|
try:
|
||||||
async with tractor.open_nursery() as n:
|
an: tractor.ActorNursery
|
||||||
p = await n.start_actor(
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
) as an:
|
||||||
|
p: tractor.Portal = await an.start_actor(
|
||||||
'file_mngr',
|
'file_mngr',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
infect_asyncio=True,
|
infect_asyncio=True,
|
||||||
|
@ -758,6 +791,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
async with p.open_context(
|
async with p.open_context(
|
||||||
manage_file,
|
manage_file,
|
||||||
tmp_path_str=str(tmp_path),
|
tmp_path_str=str(tmp_path),
|
||||||
|
send_sigint_to=send_sigint_to,
|
||||||
bg_aio_task=bg_aio_task,
|
bg_aio_task=bg_aio_task,
|
||||||
) as (ctx, first):
|
) as (ctx, first):
|
||||||
|
|
||||||
|
@ -777,10 +811,13 @@ def test_sigint_closes_lifetime_stack(
|
||||||
# shm-buffer leaks in `piker`'s live quote stream
|
# shm-buffer leaks in `piker`'s live quote stream
|
||||||
# susbys!
|
# susbys!
|
||||||
#
|
#
|
||||||
# await trio.sleep(.5)
|
|
||||||
await trio.sleep(.2)
|
await trio.sleep(.2)
|
||||||
|
pid: int = (
|
||||||
|
cpid if send_sigint_to == 'child'
|
||||||
|
else os.getpid()
|
||||||
|
)
|
||||||
os.kill(
|
os.kill(
|
||||||
cpid,
|
pid,
|
||||||
signal.SIGINT,
|
signal.SIGINT,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -790,7 +827,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
if wait_for_ctx:
|
if wait_for_ctx:
|
||||||
print('waiting for ctx outcome in parent..')
|
print('waiting for ctx outcome in parent..')
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(.7):
|
with trio.fail_after(1):
|
||||||
await ctx.wait_for_result()
|
await ctx.wait_for_result()
|
||||||
except tractor.ContextCancelled as ctxc:
|
except tractor.ContextCancelled as ctxc:
|
||||||
assert ctxc.canceller == ctx.chan.uid
|
assert ctxc.canceller == ctx.chan.uid
|
||||||
|
|
|
@ -929,6 +929,17 @@ class MessagingError(Exception):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
class AsyncioCancelled(Exception):
|
||||||
|
'''
|
||||||
|
Asyncio cancelled translation (non-base) error
|
||||||
|
for use with the ``to_asyncio`` module
|
||||||
|
to be raised in the ``trio`` side task
|
||||||
|
|
||||||
|
NOTE: this should NOT inherit from `asyncio.CancelledError` or
|
||||||
|
tests should break!
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
|
||||||
def pack_error(
|
def pack_error(
|
||||||
exc: BaseException|RemoteActorError,
|
exc: BaseException|RemoteActorError,
|
||||||
|
|
Loading…
Reference in New Issue