From ab11ee4fbea28348ddb30400ed5e74dbece08a5f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Jul 2025 14:42:15 -0400 Subject: [PATCH] Support `chan.started_nowait()` in `.open_channel_from()` target That is the `target` can declare a `chan: LinkedTaskChannel` instead of `to_trio`/`from_aio`. To support it, - change `.started()` -> the more appropriate `.started_nowait()` which can be called sync from the aio child task. - adjust the `provide_channels` assert to accept either fn sig declaration (for now). Still needs test(s) obvi.. --- tractor/to_asyncio.py | 38 +++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index a9096f7f..9c7b12b4 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -209,10 +209,15 @@ class LinkedTaskChannel( async def aclose(self) -> None: await self._from_aio.aclose() - def started( + # ?TODO? async version of this? + def started_nowait( self, val: Any = None, ) -> None: + ''' + Synchronize aio-sde with its trio-parent. + + ''' self._aio_started_val = val return self._to_trio.send_nowait(val) @@ -349,18 +354,6 @@ def _run_asyncio_task( # value otherwise it would just return ;P assert qsize > 1 - if provide_channels: - assert 'to_trio' in args - - # allow target func to accept/stream results manually by name - if 'to_trio' in args: - kwargs['to_trio'] = to_trio - - if 'from_trio' in args: - kwargs['from_trio'] = from_trio - - coro = func(**kwargs) - trio_task: trio.Task = trio.lowlevel.current_task() trio_cs = trio.CancelScope() aio_task_complete = trio.Event() @@ -375,6 +368,25 @@ def _run_asyncio_task( _suppress_graceful_exits=suppress_graceful_exits, ) + # allow target func to accept/stream results manually by name + if 'to_trio' in args: + kwargs['to_trio'] = to_trio + + if 'from_trio' in args: + kwargs['from_trio'] = from_trio + + if 'chan' in args: + kwargs['chan'] = chan + + if provide_channels: + assert ( + 'to_trio' in args + or + 'chan' in args + ) + + coro = func(**kwargs) + async def wait_on_coro_final_result( to_trio: trio.MemorySendChannel, coro: Awaitable,