diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index edd7ee47..f11a4eed 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -571,14 +571,16 @@ def test_basic_interloop_channel_stream( fan_out: bool, ): async def main(): - async with tractor.open_nursery() as an: - portal = await an.run_in_actor( - stream_from_aio, - infect_asyncio=True, - fan_out=fan_out, - ) - # should raise RAE diectly - await portal.result() + # TODO, figure out min timeout here! + with trio.fail_after(6): + async with tractor.open_nursery() as an: + portal = await an.run_in_actor( + stream_from_aio, + infect_asyncio=True, + fan_out=fan_out, + ) + # should raise RAE diectly + await portal.result() trio.run(main) @@ -1086,6 +1088,108 @@ def test_sigint_closes_lifetime_stack( trio.run(main) + +# ?TODO asyncio.Task fn-deco? +# -[ ] do sig checkingat import time like @context? +# -[ ] maybe name it @aio_task ?? +# -[ ] chan: to_asyncio.InterloopChannel ?? +async def raise_before_started( + # from_trio: asyncio.Queue, + # to_trio: trio.abc.SendChannel, + chan: to_asyncio.LinkedTaskChannel, + +) -> None: + ''' + `asyncio.Task` entry point which RTEs before calling + `to_trio.send_nowait()`. + + ''' + await asyncio.sleep(0.2) + raise RuntimeError('Some shite went wrong before `.send_nowait()`!!') + + # to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??') + chan.started_nowait('Uhh we shouldve RTE-d ^^ ??') + await asyncio.sleep(float('inf')) + + +@tractor.context +async def caching_ep( + ctx: tractor.Context, +): + + log = tractor.log.get_logger('caching_ep') + log.info('syncing via `ctx.started()`') + await ctx.started() + + # XXX, allocate the `open_channel_from()` inside + # a `.trionics.maybe_open_context()`. + chan: to_asyncio.LinkedTaskChannel + async with ( + tractor.trionics.maybe_open_context( + acm_func=tractor.to_asyncio.open_channel_from, + kwargs={ + 'target': raise_before_started, + # ^XXX, kwarg to `open_channel_from()` + }, + + # lock around current actor task access + key=tractor.current_actor().uid, + + ) as (cache_hit, (clients, chan)), + ): + if cache_hit: + log.error( + 'Re-using cached `.open_from_channel()` call!\n' + ) + + else: + log.info( + 'Allocating SHOULD-FAIL `.open_from_channel()`\n' + ) + + await trio.sleep_forever() + + +def test_aio_side_raises_before_started( + reg_addr: tuple[str, int], + debug_mode: bool, + loglevel: str, +): + ''' + Simulates connection-err from `piker.brokers.ib.api`.. + + Ensure any error raised by child-`asyncio.Task` BEFORE + `chan.started()` + + ''' + # delay = 999 if debug_mode else 1 + async def main(): + with trio.fail_after(3): + an: tractor.ActorNursery + async with tractor.open_nursery( + debug_mode=debug_mode, + loglevel=loglevel, + ) as an: + p: tractor.Portal = await an.start_actor( + 'lchan_cacher_that_raises_fast', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + caching_ep, + ) as (ctx, first): + assert not first + + with pytest.raises( + expected_exception=(RemoteActorError), + ) as excinfo: + trio.run(main) + + # ensure `asyncio.Task` exception is bubbled + # allll the way erp!! + rae = excinfo.value + assert rae.boxed_type is RuntimeError + # TODO: debug_mode tests once we get support for `asyncio`! # # -[ ] need tests to wrap both scripts: diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 63c20bc1..bd46a5ab 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -130,6 +130,7 @@ class LinkedTaskChannel( _trio_task: trio.Task _aio_task_complete: trio.Event + _closed_by_aio_task: bool = False _suppress_graceful_exits: bool = True _trio_err: BaseException|None = None @@ -208,10 +209,15 @@ class LinkedTaskChannel( async def aclose(self) -> None: await self._from_aio.aclose() - def started( + # ?TODO? async version of this? + def started_nowait( self, val: Any = None, ) -> None: + ''' + Synchronize aio-side with its trio-parent. + + ''' self._aio_started_val = val return self._to_trio.send_nowait(val) @@ -242,6 +248,7 @@ class LinkedTaskChannel( # cycle on the trio side? # await trio.lowlevel.checkpoint() return await self._from_aio.receive() + except BaseException as err: async with translate_aio_errors( chan=self, @@ -319,7 +326,7 @@ def _run_asyncio_task( qsize: int = 1, provide_channels: bool = False, suppress_graceful_exits: bool = True, - hide_tb: bool = False, + hide_tb: bool = True, **kwargs, ) -> LinkedTaskChannel: @@ -347,18 +354,6 @@ def _run_asyncio_task( # value otherwise it would just return ;P assert qsize > 1 - if provide_channels: - assert 'to_trio' in args - - # allow target func to accept/stream results manually by name - if 'to_trio' in args: - kwargs['to_trio'] = to_trio - - if 'from_trio' in args: - kwargs['from_trio'] = from_trio - - coro = func(**kwargs) - trio_task: trio.Task = trio.lowlevel.current_task() trio_cs = trio.CancelScope() aio_task_complete = trio.Event() @@ -373,6 +368,25 @@ def _run_asyncio_task( _suppress_graceful_exits=suppress_graceful_exits, ) + # allow target func to accept/stream results manually by name + if 'to_trio' in args: + kwargs['to_trio'] = to_trio + + if 'from_trio' in args: + kwargs['from_trio'] = from_trio + + if 'chan' in args: + kwargs['chan'] = chan + + if provide_channels: + assert ( + 'to_trio' in args + or + 'chan' in args + ) + + coro = func(**kwargs) + async def wait_on_coro_final_result( to_trio: trio.MemorySendChannel, coro: Awaitable, @@ -445,9 +459,23 @@ def _run_asyncio_task( f'Task exited with final result: {result!r}\n' ) - # only close the sender side which will relay - # a `trio.EndOfChannel` to the trio (consumer) side. + # XXX ALWAYS close the child-`asyncio`-task-side's + # `to_trio` handle which will in turn relay + # a `trio.EndOfChannel` to the `trio`-parent. + # Consequently the parent `trio` task MUST ALWAYS + # check for any `chan._aio_err` to be raised when it + # receives an EoC. + # + # NOTE, there are 2 EoC cases, + # - normal/graceful EoC due to the aio-side actually + # terminating its "streaming", but the task did not + # error and is not yet complete. + # + # - the aio-task terminated and we specially mark the + # closure as due to the `asyncio.Task`'s exit. + # to_trio.close() + chan._closed_by_aio_task = True aio_task_complete.set() log.runtime( @@ -645,8 +673,9 @@ def _run_asyncio_task( not trio_cs.cancel_called ): log.cancel( - f'Cancelling `trio` side due to aio-side src exc\n' - f'{curr_aio_err}\n' + f'Cancelling trio-side due to aio-side src exc\n' + f'\n' + f'{curr_aio_err!r}\n' f'\n' f'(c>\n' f' |_{trio_task}\n' @@ -758,6 +787,7 @@ async def translate_aio_errors( aio_done_before_trio: bool = aio_task.done() assert aio_task trio_err: BaseException|None = None + eoc: trio.EndOfChannel|None = None try: yield # back to one of the cross-loop apis except trio.Cancelled as taskc: @@ -789,12 +819,48 @@ async def translate_aio_errors( # ) # raise - # XXX always passthrough EoC since this translator is often - # called from `LinkedTaskChannel.receive()` which we want - # passthrough and further we have no special meaning for it in - # terms of relaying errors or signals from the aio side! - except trio.EndOfChannel as eoc: - trio_err = chan._trio_err = eoc + # XXX EoC is a special SIGNAL from the aio-side here! + # There are 2 cases to handle: + # 1. the "EoC passthrough" case. + # - the aio-task actually closed the channel "gracefully" and + # the trio-task should unwind any ongoing channel + # iteration/receiving, + # |_this exc-translator wraps calls to `LinkedTaskChannel.receive()` + # in which case we want to relay the actual "end-of-chan" for + # iteration purposes. + # + # 2. relaying the "asyncio.Task termination" case. + # - if the aio-task terminates, maybe with an error, AND the + # `open_channel_from()` API was used, it will always signal + # that termination. + # |_`wait_on_coro_final_result()` always calls + # `to_trio.close()` when `provide_channels=True` so we need to + # always check if there is an aio-side exc which needs to be + # relayed to the parent trio side! + # |_in this case the special `chan._closed_by_aio_task` is + # ALWAYS set. + # + except trio.EndOfChannel as _eoc: + eoc = _eoc + if ( + chan._closed_by_aio_task + and + aio_err + ): + log.cancel( + f'The asyncio-child task terminated due to error\n' + f'{aio_err!r}\n' + ) + chan._trio_to_raise = aio_err + trio_err = chan._trio_err = eoc + # + # ?TODO?, raise something like a, + # chan._trio_to_raise = AsyncioErrored() + # BUT, with the tb rewritten to reflect the underlying + # call stack? + else: + trio_err = chan._trio_err = eoc + raise eoc # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio @@ -1047,7 +1113,7 @@ async def translate_aio_errors( # if wait_on_aio_task: await chan._aio_task_complete.wait() - log.info( + log.debug( 'asyncio-task is done and unblocked trio-side!\n' ) @@ -1064,11 +1130,17 @@ async def translate_aio_errors( trio_to_raise: ( AsyncioCancelled| AsyncioTaskExited| + Exception| # relayed from aio-task None ) = chan._trio_to_raise + raise_from: Exception = ( + trio_err if (aio_err is trio_to_raise) + else aio_err + ) + if not suppress_graceful_exits: - raise trio_to_raise from (aio_err or trio_err) + raise trio_to_raise from raise_from if trio_to_raise: match ( @@ -1101,7 +1173,7 @@ async def translate_aio_errors( ) return case _: - raise trio_to_raise from (aio_err or trio_err) + raise trio_to_raise from raise_from # Check if the asyncio-side is the cause of the trio-side # error. @@ -1167,7 +1239,6 @@ async def run_task( @acm async def open_channel_from( - target: Callable[..., Any], suppress_graceful_exits: bool = True, **target_kwargs, @@ -1201,7 +1272,6 @@ async def open_channel_from( # deliver stream handle upward yield first, chan except trio.Cancelled as taskc: - # await tractor.pause(shield=True) # ya it worx ;) if cs.cancel_called: if isinstance(chan._trio_to_raise, AsyncioCancelled): log.cancel(