Add "raises-pre-started" `open_channel_from()` test
Verifying that if any exc is raised pre `chan.send_nowait()` (our currentlly shite version of a `chan.started()`) then that exc is indeed raised through on the `trio`-parent task side. This case was reproduced from a `piker.brokers.ib` issue with a similar embedded `.trionics.maybe_open_context()` call. Deats, - call the suite `test_aio_side_raises_before_started`. - mk the `@context` simply `maybe_open_context(acm_func=open_channel_from)` with a `target=raise_before_started` which, - simply sleeps then immediately raises a RTE. - expect the RTE from the aio-child-side to propagate all the way up to the root-actor's task right up through the `trio.run()`.to_asyncio_eoc_signal
parent
aef306465d
commit
808dd9d73c
|
@ -573,14 +573,16 @@ def test_basic_interloop_channel_stream(
|
||||||
fan_out: bool,
|
fan_out: bool,
|
||||||
):
|
):
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery() as an:
|
# TODO, figure out min timeout here!
|
||||||
portal = await an.run_in_actor(
|
with trio.fail_after(6):
|
||||||
stream_from_aio,
|
async with tractor.open_nursery() as an:
|
||||||
infect_asyncio=True,
|
portal = await an.run_in_actor(
|
||||||
fan_out=fan_out,
|
stream_from_aio,
|
||||||
)
|
infect_asyncio=True,
|
||||||
# should raise RAE diectly
|
fan_out=fan_out,
|
||||||
await portal.result()
|
)
|
||||||
|
# should raise RAE diectly
|
||||||
|
await portal.result()
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
@ -1088,6 +1090,97 @@ def test_sigint_closes_lifetime_stack(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# asyncio.Task fn
|
||||||
|
async def raise_before_started(
|
||||||
|
from_trio: asyncio.Queue,
|
||||||
|
to_trio: trio.abc.SendChannel,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
`asyncio.Task` entry point which RTEs before calling
|
||||||
|
`to_trio.send_nowait()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
await asyncio.sleep(0.2)
|
||||||
|
raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
|
||||||
|
|
||||||
|
to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
|
||||||
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def caching_ep(
|
||||||
|
ctx: tractor.Context,
|
||||||
|
):
|
||||||
|
|
||||||
|
log = tractor.log.get_logger('caching_ep')
|
||||||
|
log.info('syncing via `ctx.started()`')
|
||||||
|
await ctx.started()
|
||||||
|
|
||||||
|
# XXX, allocate the `open_channel_from()` inside
|
||||||
|
# a `.trionics.maybe_open_context()`.
|
||||||
|
chan: to_asyncio.LinkedTaskChannel
|
||||||
|
async with (
|
||||||
|
tractor.trionics.maybe_open_context(
|
||||||
|
acm_func=tractor.to_asyncio.open_channel_from,
|
||||||
|
kwargs={
|
||||||
|
'target': raise_before_started,
|
||||||
|
# ^XXX, kwarg to `open_channel_from()`
|
||||||
|
},
|
||||||
|
|
||||||
|
# lock around current actor task access
|
||||||
|
key=tractor.current_actor().uid,
|
||||||
|
|
||||||
|
) as (cache_hit, (clients, chan)),
|
||||||
|
):
|
||||||
|
if cache_hit:
|
||||||
|
log.error(
|
||||||
|
'Re-using cached `.open_from_channel()` call!\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
log.info(
|
||||||
|
'Allocating SHOULD-FAIL `.open_from_channel()`\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
# TODO, simulates connection-err from `piker.brokers.ib.api`..
|
||||||
|
def test_aio_side_raises_before_started(
|
||||||
|
reg_addr: tuple[str, int],
|
||||||
|
debug_mode: bool,
|
||||||
|
loglevel: str,
|
||||||
|
):
|
||||||
|
# delay = 999 if debug_mode else 1
|
||||||
|
async def main():
|
||||||
|
with trio.fail_after(3):
|
||||||
|
an: tractor.ActorNursery
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
loglevel=loglevel,
|
||||||
|
) as an:
|
||||||
|
p: tractor.Portal = await an.start_actor(
|
||||||
|
'lchan_cacher_that_raises_fast',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
infect_asyncio=True,
|
||||||
|
)
|
||||||
|
async with p.open_context(
|
||||||
|
caching_ep,
|
||||||
|
) as (ctx, first):
|
||||||
|
assert not first
|
||||||
|
|
||||||
|
with pytest.raises(
|
||||||
|
expected_exception=(RemoteActorError),
|
||||||
|
) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
# ensure `asyncio.Task` exception is bubbled
|
||||||
|
# allll the way erp!!
|
||||||
|
rae = excinfo.value
|
||||||
|
assert rae.boxed_type is RuntimeError
|
||||||
|
|
||||||
# TODO: debug_mode tests once we get support for `asyncio`!
|
# TODO: debug_mode tests once we get support for `asyncio`!
|
||||||
#
|
#
|
||||||
# -[ ] need tests to wrap both scripts:
|
# -[ ] need tests to wrap both scripts:
|
||||||
|
|
Loading…
Reference in New Issue