From 4a7491bda4dc58140361bdf1f7efebbe53bdf348 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 01:06:59 -0400 Subject: [PATCH 1/5] Add "raises-pre-started" `open_channel_from()` test Verifying that if any exc is raised pre `chan.send_nowait()` (our currentlly shite version of a `chan.started()`) then that exc is indeed raised through on the `trio`-parent task side. This case was reproduced from a `piker.brokers.ib` issue with a similar embedded `.trionics.maybe_open_context()` call. Deats, - call the suite `test_aio_side_raises_before_started`. - mk the `@context` simply `maybe_open_context(acm_func=open_channel_from)` with a `target=raise_before_started` which, - simply sleeps then immediately raises a RTE. - expect the RTE from the aio-child-side to propagate all the way up to the root-actor's task right up through the `trio.run()`. --- tests/test_infected_asyncio.py | 109 ++++++++++++++++++++++++++++++--- 1 file changed, 101 insertions(+), 8 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index edd7ee47..10bbd52f 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -571,14 +571,16 @@ def test_basic_interloop_channel_stream( fan_out: bool, ): async def main(): - async with tractor.open_nursery() as an: - portal = await an.run_in_actor( - stream_from_aio, - infect_asyncio=True, - fan_out=fan_out, - ) - # should raise RAE diectly - await portal.result() + # TODO, figure out min timeout here! + with trio.fail_after(6): + async with tractor.open_nursery() as an: + portal = await an.run_in_actor( + stream_from_aio, + infect_asyncio=True, + fan_out=fan_out, + ) + # should raise RAE diectly + await portal.result() trio.run(main) @@ -1086,6 +1088,97 @@ def test_sigint_closes_lifetime_stack( trio.run(main) + +# asyncio.Task fn +async def raise_before_started( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, + +) -> None: + ''' + `asyncio.Task` entry point which RTEs before calling + `to_trio.send_nowait()`. + + ''' + await asyncio.sleep(0.2) + raise RuntimeError('Some shite went wrong before `.send_nowait()`!!') + + to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??') + await asyncio.sleep(float('inf')) + + +@tractor.context +async def caching_ep( + ctx: tractor.Context, +): + + log = tractor.log.get_logger('caching_ep') + log.info('syncing via `ctx.started()`') + await ctx.started() + + # XXX, allocate the `open_channel_from()` inside + # a `.trionics.maybe_open_context()`. + chan: to_asyncio.LinkedTaskChannel + async with ( + tractor.trionics.maybe_open_context( + acm_func=tractor.to_asyncio.open_channel_from, + kwargs={ + 'target': raise_before_started, + # ^XXX, kwarg to `open_channel_from()` + }, + + # lock around current actor task access + key=tractor.current_actor().uid, + + ) as (cache_hit, (clients, chan)), + ): + if cache_hit: + log.error( + 'Re-using cached `.open_from_channel()` call!\n' + ) + + else: + log.info( + 'Allocating SHOULD-FAIL `.open_from_channel()`\n' + ) + + await trio.sleep_forever() + + +# TODO, simulates connection-err from `piker.brokers.ib.api`.. +def test_aio_side_raises_before_started( + reg_addr: tuple[str, int], + debug_mode: bool, + loglevel: str, +): + # delay = 999 if debug_mode else 1 + async def main(): + with trio.fail_after(3): + an: tractor.ActorNursery + async with tractor.open_nursery( + debug_mode=debug_mode, + loglevel=loglevel, + ) as an: + p: tractor.Portal = await an.start_actor( + 'lchan_cacher_that_raises_fast', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + caching_ep, + ) as (ctx, first): + assert not first + + with pytest.raises( + expected_exception=(RemoteActorError), + ) as excinfo: + trio.run(main) + + # ensure `asyncio.Task` exception is bubbled + # allll the way erp!! + rae = excinfo.value + assert rae.boxed_type is RuntimeError + # TODO: debug_mode tests once we get support for `asyncio`! # # -[ ] need tests to wrap both scripts: From bd148300c5b7b33534892088b320ba37b6912371 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 14:30:42 -0400 Subject: [PATCH 2/5] Relay `asyncio` errors via EoC and raise from rent Makes the newly added `test_aio_side_raises_before_started` test pass by ensuring errors raised by any `.to_asyncio.open_channel_from()` spawned child-`asyncio.Task` are relayed by any caught `trio.EndOfChannel` by checking for a new `LinkedTaskChannel._closed_by_aio_task: bool`. Impl deats, - obvi add `LinkedTaskChannel._closed_by_aio_task: bool = False` - in `translate_aio_errors()` always check for the new flag on EOC conditions and in such cases set `chan._trio_to_raise = aio_err` such that the `trio`-parent-task always raises the child's exception directly, OW keep original EoC passthrough in place. - include *very* detailed per-case comments around the extended handler. - adjust re-raising logic with a new `raise_from` where we only give the `aio_err` priority if it's not already set as to `trio_to_raise`. Also, - hide the `_run_asyncio_task()` frame by def. --- tractor/to_asyncio.py | 84 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 68 insertions(+), 16 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 63c20bc1..a9096f7f 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -130,6 +130,7 @@ class LinkedTaskChannel( _trio_task: trio.Task _aio_task_complete: trio.Event + _closed_by_aio_task: bool = False _suppress_graceful_exits: bool = True _trio_err: BaseException|None = None @@ -242,6 +243,7 @@ class LinkedTaskChannel( # cycle on the trio side? # await trio.lowlevel.checkpoint() return await self._from_aio.receive() + except BaseException as err: async with translate_aio_errors( chan=self, @@ -319,7 +321,7 @@ def _run_asyncio_task( qsize: int = 1, provide_channels: bool = False, suppress_graceful_exits: bool = True, - hide_tb: bool = False, + hide_tb: bool = True, **kwargs, ) -> LinkedTaskChannel: @@ -445,9 +447,15 @@ def _run_asyncio_task( f'Task exited with final result: {result!r}\n' ) - # only close the sender side which will relay - # a `trio.EndOfChannel` to the trio (consumer) side. + # only close the aio (child) side which will relay + # a `trio.EndOfChannel` to the trio (parent) side. + # + # XXX NOTE, that trio-side MUST then in such cases + # check for a `chan._aio_err` and raise it!! to_trio.close() + # specially mark the closure as due to the + # asyncio.Task terminating! + chan._closed_by_aio_task = True aio_task_complete.set() log.runtime( @@ -645,8 +653,9 @@ def _run_asyncio_task( not trio_cs.cancel_called ): log.cancel( - f'Cancelling `trio` side due to aio-side src exc\n' - f'{curr_aio_err}\n' + f'Cancelling trio-side due to aio-side src exc\n' + f'\n' + f'{curr_aio_err!r}\n' f'\n' f'(c>\n' f' |_{trio_task}\n' @@ -758,6 +767,7 @@ async def translate_aio_errors( aio_done_before_trio: bool = aio_task.done() assert aio_task trio_err: BaseException|None = None + eoc: trio.EndOfChannel|None = None try: yield # back to one of the cross-loop apis except trio.Cancelled as taskc: @@ -789,12 +799,50 @@ async def translate_aio_errors( # ) # raise - # XXX always passthrough EoC since this translator is often - # called from `LinkedTaskChannel.receive()` which we want - # passthrough and further we have no special meaning for it in - # terms of relaying errors or signals from the aio side! - except trio.EndOfChannel as eoc: - trio_err = chan._trio_err = eoc + # XXX EoC is a special SIGNAL from the aio-side here! + # There are 2 cases to handle: + # 1. the "EoC passthrough" case. + # - the aio-task actually closed the channel "gracefully" and + # the trio-task should unwind any ongoing channel + # iteration/receiving, + # |_this exc-translator wraps calls to `LinkedTaskChannel.receive()` + # in which case we want to relay the actual "end-of-chan" for + # iteration purposes. + # + # 2. relaying the "asyncio.Task termination" case. + # - if the aio-task terminates, maybe with an error, AND the + # `open_channel_from()` API was used, it will always signal + # that termination. + # |_`wait_on_coro_final_result()` always calls + # `to_trio.close()` when `provide_channels=True` so we need to + # always check if there is an aio-side exc which needs to be + # relayed to the parent trio side! + # |_in this case the special `chan._closed_by_aio_task` is + # ALWAYS set. + # + except trio.EndOfChannel as _eoc: + eoc = _eoc + if ( + chan._closed_by_aio_task + and + aio_err + ): + log.cancel( + f'The asyncio-child task terminated due to error\n' + f'{aio_err!r}\n' + ) + chan._trio_to_raise = aio_err + trio_err = chan._trio_err = eoc + # + # await tractor.pause(shield=True) + # + # ?TODO?, raise something like a, + # chan._trio_to_raise = AsyncioErrored() + # BUT, with the tb rewritten to reflect the underlying + # call stack? + else: + trio_err = chan._trio_err = eoc + raise eoc # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio @@ -1047,7 +1095,7 @@ async def translate_aio_errors( # if wait_on_aio_task: await chan._aio_task_complete.wait() - log.info( + log.debug( 'asyncio-task is done and unblocked trio-side!\n' ) @@ -1064,11 +1112,17 @@ async def translate_aio_errors( trio_to_raise: ( AsyncioCancelled| AsyncioTaskExited| + Exception| # relayed from aio-task None ) = chan._trio_to_raise + raise_from: Exception = ( + trio_err if (aio_err is trio_to_raise) + else aio_err + ) + if not suppress_graceful_exits: - raise trio_to_raise from (aio_err or trio_err) + raise trio_to_raise from raise_from if trio_to_raise: match ( @@ -1101,7 +1155,7 @@ async def translate_aio_errors( ) return case _: - raise trio_to_raise from (aio_err or trio_err) + raise trio_to_raise from raise_from # Check if the asyncio-side is the cause of the trio-side # error. @@ -1167,7 +1221,6 @@ async def run_task( @acm async def open_channel_from( - target: Callable[..., Any], suppress_graceful_exits: bool = True, **target_kwargs, @@ -1201,7 +1254,6 @@ async def open_channel_from( # deliver stream handle upward yield first, chan except trio.Cancelled as taskc: - # await tractor.pause(shield=True) # ya it worx ;) if cs.cancel_called: if isinstance(chan._trio_to_raise, AsyncioCancelled): log.cancel( From 961504b657beb537bdc0853c3a3115e24b28c0be Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 14:42:15 -0400 Subject: [PATCH 3/5] Support `chan.started_nowait()` in `.open_channel_from()` target That is the `target` can declare a `chan: LinkedTaskChannel` instead of `to_trio`/`from_aio`. To support it, - change `.started()` -> the more appropriate `.started_nowait()` which can be called sync from the aio child task. - adjust the `provide_channels` assert to accept either fn sig declaration (for now). Still needs test(s) obvi.. --- tractor/to_asyncio.py | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index a9096f7f..9c7b12b4 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -209,10 +209,15 @@ class LinkedTaskChannel( async def aclose(self) -> None: await self._from_aio.aclose() - def started( + # ?TODO? async version of this? + def started_nowait( self, val: Any = None, ) -> None: + ''' + Synchronize aio-sde with its trio-parent. + + ''' self._aio_started_val = val return self._to_trio.send_nowait(val) @@ -349,18 +354,6 @@ def _run_asyncio_task( # value otherwise it would just return ;P assert qsize > 1 - if provide_channels: - assert 'to_trio' in args - - # allow target func to accept/stream results manually by name - if 'to_trio' in args: - kwargs['to_trio'] = to_trio - - if 'from_trio' in args: - kwargs['from_trio'] = from_trio - - coro = func(**kwargs) - trio_task: trio.Task = trio.lowlevel.current_task() trio_cs = trio.CancelScope() aio_task_complete = trio.Event() @@ -375,6 +368,25 @@ def _run_asyncio_task( _suppress_graceful_exits=suppress_graceful_exits, ) + # allow target func to accept/stream results manually by name + if 'to_trio' in args: + kwargs['to_trio'] = to_trio + + if 'from_trio' in args: + kwargs['from_trio'] = from_trio + + if 'chan' in args: + kwargs['chan'] = chan + + if provide_channels: + assert ( + 'to_trio' in args + or + 'chan' in args + ) + + coro = func(**kwargs) + async def wait_on_coro_final_result( to_trio: trio.MemorySendChannel, coro: Awaitable, From b74e93ee555e5534a60212d0d595761c7b4e0e59 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 14:47:24 -0400 Subject: [PATCH 4/5] Change one infected-aio test to use `chan` in fn sig --- tests/test_infected_asyncio.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 10bbd52f..f11a4eed 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -1089,10 +1089,14 @@ def test_sigint_closes_lifetime_stack( -# asyncio.Task fn +# ?TODO asyncio.Task fn-deco? +# -[ ] do sig checkingat import time like @context? +# -[ ] maybe name it @aio_task ?? +# -[ ] chan: to_asyncio.InterloopChannel ?? async def raise_before_started( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, + # from_trio: asyncio.Queue, + # to_trio: trio.abc.SendChannel, + chan: to_asyncio.LinkedTaskChannel, ) -> None: ''' @@ -1103,7 +1107,8 @@ async def raise_before_started( await asyncio.sleep(0.2) raise RuntimeError('Some shite went wrong before `.send_nowait()`!!') - to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??') + # to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??') + chan.started_nowait('Uhh we shouldve RTE-d ^^ ??') await asyncio.sleep(float('inf')) @@ -1145,12 +1150,18 @@ async def caching_ep( await trio.sleep_forever() -# TODO, simulates connection-err from `piker.brokers.ib.api`.. def test_aio_side_raises_before_started( reg_addr: tuple[str, int], debug_mode: bool, loglevel: str, ): + ''' + Simulates connection-err from `piker.brokers.ib.api`.. + + Ensure any error raised by child-`asyncio.Task` BEFORE + `chan.started()` + + ''' # delay = 999 if debug_mode else 1 async def main(): with trio.fail_after(3): From 0fafd25f0d8b1fdd92b2f79aaea69e92feedfbb0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Aug 2025 12:33:47 -0400 Subject: [PATCH 5/5] Comment tweaks per copilot review --- tractor/to_asyncio.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 9c7b12b4..bd46a5ab 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -215,7 +215,7 @@ class LinkedTaskChannel( val: Any = None, ) -> None: ''' - Synchronize aio-sde with its trio-parent. + Synchronize aio-side with its trio-parent. ''' self._aio_started_val = val @@ -459,14 +459,22 @@ def _run_asyncio_task( f'Task exited with final result: {result!r}\n' ) - # only close the aio (child) side which will relay - # a `trio.EndOfChannel` to the trio (parent) side. + # XXX ALWAYS close the child-`asyncio`-task-side's + # `to_trio` handle which will in turn relay + # a `trio.EndOfChannel` to the `trio`-parent. + # Consequently the parent `trio` task MUST ALWAYS + # check for any `chan._aio_err` to be raised when it + # receives an EoC. + # + # NOTE, there are 2 EoC cases, + # - normal/graceful EoC due to the aio-side actually + # terminating its "streaming", but the task did not + # error and is not yet complete. + # + # - the aio-task terminated and we specially mark the + # closure as due to the `asyncio.Task`'s exit. # - # XXX NOTE, that trio-side MUST then in such cases - # check for a `chan._aio_err` and raise it!! to_trio.close() - # specially mark the closure as due to the - # asyncio.Task terminating! chan._closed_by_aio_task = True aio_task_complete.set() @@ -846,8 +854,6 @@ async def translate_aio_errors( chan._trio_to_raise = aio_err trio_err = chan._trio_err = eoc # - # await tractor.pause(shield=True) - # # ?TODO?, raise something like a, # chan._trio_to_raise = AsyncioErrored() # BUT, with the tb rewritten to reflect the underlying