From ca5f6f50a89264787f3001caccc26818c5645897 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Sep 2025 12:33:43 -0400 Subject: [PATCH 1/7] Explain the `infect_asyncio: bool` param to pass in RTE msg --- tractor/to_asyncio.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 21566beb..3f059f92 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -337,9 +337,12 @@ def _run_asyncio_task( ''' __tracebackhide__: bool = hide_tb - if not tractor.current_actor().is_infected_aio(): + if not (actor := tractor.current_actor()).is_infected_aio(): raise RuntimeError( - "`infect_asyncio` mode is not enabled!?" + f'`infect_asyncio: bool` mode is not enabled ??\n' + f'Ensure you pass `ActorNursery.start_actor(infect_asyncio=True)`\n' + f'\n' + f'{actor}\n' ) # ITC (inter task comms), these channel/queue names are mostly from From 1f2fad22eef720ef3502f0e16ad36cc1c82c53da Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 21 Sep 2025 15:53:45 -0400 Subject: [PATCH 2/7] Extend `.to_asyncio.LinkedTaskChannel` for aio side With methods to comms similar to those that exist for the `trio` side, - `.get()` which proxies verbatim to the `._to_aio: asyncio.Queue`, - `.send_nowait()` which thin-wraps to `._to_trio: trio.MemorySendChannel`. Obviously the more correct design is to break up the channel type into a pair of handle types, one for each "side's" task in each event-loop, that's hopefully coming shortly in a follow up patch B) Also, - fill in some missing doc strings, tweak some explanation comments and update todos. - adjust the `test_aio_errors_and_channel_propagates_and_closes()` suite to use the new `chan` fn-sig-API with `.open_channel_from()` including the new methods for msg comms; ensures everything added here works e2e. --- tests/test_infected_asyncio.py | 32 ++++++++++++++-------- tractor/to_asyncio.py | 50 ++++++++++++++++++++++++++++------ 2 files changed, 62 insertions(+), 20 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index f11a4eed..51ca6115 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -732,15 +732,21 @@ def test_aio_errors_and_channel_propagates_and_closes( async def aio_echo_server( - to_trio: trio.MemorySendChannel, - from_trio: asyncio.Queue, + chan: to_asyncio.LinkedTaskChannel, ) -> None: + ''' + An IPC-msg "echo server" with msgs received and relayed by + a parent `trio.Task` into a child `asyncio.Task` + and then repeated back to that local parent (`trio.Task`) + and sent again back to the original calling remote actor. - to_trio.send_nowait('start') + ''' + # same semantics as `trio.TaskStatus.started()` + chan.started_nowait('start') while True: try: - msg = await from_trio.get() + msg = await chan.get() except to_asyncio.TrioTaskExited: print( 'breaking aio echo loop due to `trio` exit!' @@ -748,7 +754,7 @@ async def aio_echo_server( break # echo the msg back - to_trio.send_nowait(msg) + chan.send_nowait(msg) # if we get the terminate sentinel # break the echo loop @@ -765,7 +771,10 @@ async def trio_to_aio_echo_server( ): async with to_asyncio.open_channel_from( aio_echo_server, - ) as (first, chan): + ) as ( + first, # value from `chan.started_nowait()` above + chan, + ): assert first == 'start' await ctx.started(first) @@ -776,7 +785,8 @@ async def trio_to_aio_echo_server( await chan.send(msg) out = await chan.receive() - # echo back to parent actor-task + + # echo back to parent-actor's remote parent-ctx-task! await stream.send(out) if out is None: @@ -1090,14 +1100,12 @@ def test_sigint_closes_lifetime_stack( # ?TODO asyncio.Task fn-deco? -# -[ ] do sig checkingat import time like @context? -# -[ ] maybe name it @aio_task ?? # -[ ] chan: to_asyncio.InterloopChannel ?? +# -[ ] do fn-sig checking at import time like @context? +# |_[ ] maybe name it @a(sync)io_task ?? +# @asyncio_task <- not bad ?? async def raise_before_started( - # from_trio: asyncio.Queue, - # to_trio: trio.abc.SendChannel, chan: to_asyncio.LinkedTaskChannel, - ) -> None: ''' `asyncio.Task` entry point which RTEs before calling diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 3f059f92..258ba515 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -94,10 +94,14 @@ else: QueueShutDown = False -# TODO, generally speaking we can generalize this abstraction, a "SC linked -# parent->child task pair", as the same "supervision scope primitive" -# **that is** our `._context.Context` with the only difference being -# in how the tasks conduct msg-passing comms. +# TODO, generally speaking we can generalize this abstraction as, +# +# > A "SC linked, inter-event-loop" channel for comms between +# > a `parent: trio.Task` -> `child: asyncio.Task` pair. +# +# It is **very similar** in terms of its operation as a "supervision +# scope primitive" to that of our `._context.Context` with the only +# difference being in how the tasks conduct msg-passing comms. # # For `LinkedTaskChannel` we are passing the equivalent of (once you # include all the recently added `._trio/aio_to_raise` @@ -122,6 +126,7 @@ class LinkedTaskChannel( task scheduled in the host loop. ''' + # ?TODO, rename as `._aio_q` since it's 2-way? _to_aio: asyncio.Queue _from_aio: trio.MemoryReceiveChannel @@ -235,9 +240,11 @@ class LinkedTaskChannel( # async def receive(self) -> Any: ''' - Receive a value from the paired `asyncio.Task` with + Receive a value `trio.Task` <- `asyncio.Task`. + + Note the tasks in each loop are "SC linked" as a pair with exception/cancel handling to teardown both sides on any - unexpected error. + unexpected error or cancellation. ''' try: @@ -261,15 +268,42 @@ class LinkedTaskChannel( ): raise err + async def get(self) -> Any: + ''' + Receive a value `asyncio.Task` <- `trio.Task`. + + This is equiv to `await self._from_trio.get()`. + + ''' + return await self._to_aio.get() + async def send(self, item: Any) -> None: ''' - Send a value through to the asyncio task presuming - it defines a ``from_trio`` argument, if it does not + Send a value through `trio.Task` -> `asyncio.Task` + presuming + it defines a `from_trio` argument or makes calls + to `chan.get()` , if it does not this method will raise an error. ''' self._to_aio.put_nowait(item) + # TODO? could we only compile-in this method on an instance + # handed to the `asyncio`-side, i.e. the fn invoked with + # `.open_channel_from()`. + def send_nowait( + self, + item: Any, + ) -> None: + ''' + Send a value through FROM the `asyncio.Task` to + the `trio.Task` NON-BLOCKING. + + This is equiv to `self._to_trio.send_nowait()`. + + ''' + self._to_trio.send_nowait(item) + # TODO? needed? # async def wait_aio_complete(self) -> None: # await self._aio_task_complete.wait() From 36cbc076022e69fff80716c5ef7f8d02ea6ce834 Mon Sep 17 00:00:00 2001 From: goodboy Date: Sun, 8 Feb 2026 15:09:54 -0500 Subject: [PATCH 3/7] Tried out an alt approach for `.to_asyncio` crashes This change is masked out now BUT i'm leaving it in for reference. I was debugging a multi-actor fault where the primary source actor was an infected-aio-subactor (`brokerd.ib`) and it seemed like the REPL was only entering on the `trio` side (at a `.open_channel_from()`) and not eventually breaking in the `asyncio.Task`. But, since (changing something?) it seems to be working now, it's just that the `trio` side seems to sometimes handle before the (source/causing and more child-ish) `asyncio`-task, which is a bit odd and not expected.. We could likely refine (maybe with an inter-loop-task REPL lock?) this at some point and ensure a child-`asyncio` task which errors always grabs the REPL **first**? Lowlevel deats/further-todos, - add (masked) `maybe_open_crash_handler()` block around `asyncio.Task` execution with notes about weird parent-addr delivery bug in `test_sync_pause_from_aio_task` * yeah dunno what that's about but made a bug; seems to be IPC serialization of the `TCPAddress` struct somewhere?? - add inter-loop lock TODO for avoiding aio-task clobbering trio-tasks when both crash in debug-mode Also, - change import from `tractor.devx.debug` to `tractor.devx` - adjust `get_logger()` call to use new implicit mod-name detection added to `.log.get_logger()`, i.e. sin `name=__name__`. - some teensie refinements to `open_channel_from()`: * swap return type annotation for to `tuple[LinkedTaskChannel, Any]` (was `Any`). * update doc-string to clarify started-value delivery * add err-log before `.pause()` in what should be an unreachable path. * add todo to swap the `(first, chan)` pair to match that of ctx.. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/to_asyncio.py | 43 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 258ba515..09f0781c 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -48,7 +48,7 @@ from tractor._state import ( _runtime_vars, ) from tractor._context import Unresolved -from tractor.devx import debug +from tractor import devx from tractor.log import ( get_logger, StackLevelAdapter, @@ -439,7 +439,23 @@ def _run_asyncio_task( orig = result = id(coro) try: + # XXX TODO UGH! + # this seems to break a `test_sync_pause_from_aio_task` + # in a REALLY weird way where a `dict` value for + # `_runtime_vars['_root_addrs']` is delivered from the + # parent actor?? + # + # XXX => see masked `.set_trace()` block in + # `Actor.from_parent()`.. + # + # with devx.maybe_open_crash_handler( + # # XXX, if trio-side exits (intentionally) we + # # shouldn't care bc it should have its own crash + # # handling logic. + # ignore={TrioTaskExited,}, + # ) as _bxerr: result: Any = await coro + chan._aio_result = result except BaseException as aio_err: chan._aio_err = aio_err @@ -546,7 +562,7 @@ def _run_asyncio_task( if ( debug_mode() and - (greenback := debug.maybe_import_greenback( + (greenback := devx.debug.maybe_import_greenback( force_reload=True, raise_not_found=False, )) @@ -946,7 +962,11 @@ async def translate_aio_errors( except BaseException as _trio_err: trio_err = chan._trio_err = _trio_err # await tractor.pause(shield=True) # workx! - entered: bool = await debug._maybe_enter_pm( + + # !TODO! we need an inter-loop lock here to avoid aio-tasks + # clobbering trio ones when both crash in debug-mode! + # + entered: bool = await devx.debug._maybe_enter_pm( trio_err, api_frame=inspect.currentframe(), ) @@ -1280,10 +1300,17 @@ async def open_channel_from( suppress_graceful_exits: bool = True, **target_kwargs, -) -> AsyncIterator[Any]: +) -> AsyncIterator[ + tuple[LinkedTaskChannel, Any] +]: ''' - Open an inter-loop linked task channel for streaming between a target - spawned ``asyncio`` task and ``trio``. + Start an `asyncio.Task` as `target()` and open an inter-loop + (linked) channel for streaming between it and the current + `trio.Task`. + + A pair `(chan: LinkedTaskChannel, Any)` is delivered to the caller + where the 2nd element is the value provided by the + `asyncio.Task`'s unblocking call to `chan.started_nowait()`. ''' chan: LinkedTaskChannel = _run_asyncio_task( @@ -1308,6 +1335,7 @@ async def open_channel_from( # deliver stream handle upward yield first, chan + # ^TODO! swap these!! except trio.Cancelled as taskc: if cs.cancel_called: if isinstance(chan._trio_to_raise, AsyncioCancelled): @@ -1338,7 +1366,8 @@ async def open_channel_from( ) else: # XXX SHOULD NEVER HAPPEN! - await tractor.pause() + log.error("SHOULD NEVER GET HERE !?!?") + await tractor.pause(shield=True) else: chan._to_trio.close() From 417b796169b524cb7f06e4794b29e20d182beed2 Mon Sep 17 00:00:00 2001 From: goodboy Date: Tue, 10 Mar 2026 18:28:50 -0400 Subject: [PATCH 4/7] Use `chan: LinkedTaskChannel` API in all aio-task fns Convert every remaining `to_trio`/`from_trio` fn-sig style to the new unified `chan: LinkedTaskChannel` iface added in prior commit (c46e9ee8). Deats, - `to_trio.send_nowait(val)` (1st call) -> `chan.started_nowait(val)` - `to_trio.send_nowait(val)` (subsequent) -> `chan.send_nowait(val)` - `await from_trio.get()` -> `await chan.get()` Converted fns, - `sleep_and_err()`, `push_from_aio_task()` in `tests/test_infected_asyncio.py` - `sync_and_err()` in `tests/test_root_infect_asyncio.py` - `aio_streamer()` in `tests/test_child_manages_service_nursery.py` - `aio_echo_server()` in `examples/infected_asyncio_echo_server.py` - `bp_then_error()` in `examples/debugging/asyncio_bp.py` Also, - drop stale comments referencing old param names. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- examples/debugging/asyncio_bp.py | 5 ++--- examples/infected_asyncio_echo_server.py | 10 +++------- tests/test_child_manages_service_nursery.py | 7 +++---- tests/test_infected_asyncio.py | 19 +++++++------------ tests/test_root_infect_asyncio.py | 7 +++---- 5 files changed, 18 insertions(+), 30 deletions(-) diff --git a/examples/debugging/asyncio_bp.py b/examples/debugging/asyncio_bp.py index fc3b222a..348e990f 100644 --- a/examples/debugging/asyncio_bp.py +++ b/examples/debugging/asyncio_bp.py @@ -18,15 +18,14 @@ async def aio_sleep_forever(): async def bp_then_error( - to_trio: trio.MemorySendChannel, - from_trio: asyncio.Queue, + chan: to_asyncio.LinkedTaskChannel, raise_after_bp: bool = True, ) -> None: # sync with `trio`-side (caller) task - to_trio.send_nowait('start') + chan.started_nowait('start') # NOTE: what happens here inside the hook needs some refinement.. # => seems like it's still `.debug._set_trace()` but diff --git a/examples/infected_asyncio_echo_server.py b/examples/infected_asyncio_echo_server.py index 02508351..20914089 100644 --- a/examples/infected_asyncio_echo_server.py +++ b/examples/infected_asyncio_echo_server.py @@ -11,21 +11,17 @@ import tractor async def aio_echo_server( - to_trio: trio.MemorySendChannel, - from_trio: asyncio.Queue, - + chan: tractor.to_asyncio.LinkedTaskChannel, ) -> None: # a first message must be sent **from** this ``asyncio`` # task or the ``trio`` side will never unblock from # ``tractor.to_asyncio.open_channel_from():`` - to_trio.send_nowait('start') + chan.started_nowait('start') - # XXX: this uses an ``from_trio: asyncio.Queue`` currently but we - # should probably offer something better. while True: # echo the msg back - to_trio.send_nowait(await from_trio.get()) + chan.send_nowait(await chan.get()) await asyncio.sleep(0) diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py index 6379afc6..0c42dd93 100644 --- a/tests/test_child_manages_service_nursery.py +++ b/tests/test_child_manages_service_nursery.py @@ -18,16 +18,15 @@ from tractor import RemoteActorError async def aio_streamer( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, + chan: tractor.to_asyncio.LinkedTaskChannel, ) -> trio.abc.ReceiveChannel: # required first msg to sync caller - to_trio.send_nowait(None) + chan.started_nowait(None) from itertools import cycle for i in cycle(range(10)): - to_trio.send_nowait(i) + chan.send_nowait(i) await asyncio.sleep(0.01) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 51ca6115..2ba932b1 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -47,12 +47,11 @@ async def sleep_and_err( # just signature placeholders for compat with # ``to_asyncio.open_channel_from()`` - to_trio: trio.MemorySendChannel|None = None, - from_trio: asyncio.Queue|None = None, + chan: to_asyncio.LinkedTaskChannel|None = None, ): - if to_trio: - to_trio.send_nowait('start') + if chan: + chan.started_nowait('start') await asyncio.sleep(sleep_for) assert 0 @@ -399,7 +398,7 @@ async def no_to_trio_in_args(): async def push_from_aio_task( sequence: Iterable, - to_trio: trio.abc.SendChannel, + chan: to_asyncio.LinkedTaskChannel, expect_cancel: False, fail_early: bool, exit_early: bool, @@ -407,15 +406,12 @@ async def push_from_aio_task( ) -> None: try: - # print('trying breakpoint') - # breakpoint() - # sync caller ctx manager - to_trio.send_nowait(True) + chan.started_nowait(True) for i in sequence: print(f'asyncio sending {i}') - to_trio.send_nowait(i) + chan.send_nowait(i) await asyncio.sleep(0.001) if ( @@ -1109,13 +1105,12 @@ async def raise_before_started( ) -> None: ''' `asyncio.Task` entry point which RTEs before calling - `to_trio.send_nowait()`. + `chan.started_nowait()`. ''' await asyncio.sleep(0.2) raise RuntimeError('Some shite went wrong before `.send_nowait()`!!') - # to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??') chan.started_nowait('Uhh we shouldve RTE-d ^^ ??') await asyncio.sleep(float('inf')) diff --git a/tests/test_root_infect_asyncio.py b/tests/test_root_infect_asyncio.py index 78f9b2b4..159aa3d5 100644 --- a/tests/test_root_infect_asyncio.py +++ b/tests/test_root_infect_asyncio.py @@ -91,13 +91,12 @@ def test_infected_root_actor( async def sync_and_err( # just signature placeholders for compat with # ``to_asyncio.open_channel_from()`` - to_trio: trio.MemorySendChannel, - from_trio: asyncio.Queue, + chan: tractor.to_asyncio.LinkedTaskChannel, ev: asyncio.Event, ): - if to_trio: - to_trio.send_nowait('start') + if chan: + chan.started_nowait('start') await ev.wait() raise RuntimeError('asyncio-side') From e89fe03da7ae129acadae23b8d28b4b33b920ddb Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 11 Mar 2026 23:11:30 -0400 Subject: [PATCH 5/7] Fix `LinkedTaskChannel` docstrings from GH bot review Address valid findings from copilot's PR #413 review (https://github.com/goodboy/tractor/pull/413 #pullrequestreview-3925876037): - `.get()` docstring referenced non-existent `._from_trio` attr, correct to `._to_aio`. - `.send()` docstring falsely claimed error-raising on missing `from_trio` arg; reword to describe the actual `.put_nowait()` enqueue behaviour. - `.open_channel_from()` return type annotation had `tuple[LinkedTaskChannel, Any]` but `yield` order is `(first, chan)`; fix annotation + docstring to match actual `tuple[Any, LinkedTaskChannel]`. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/to_asyncio.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 09f0781c..3769bdd0 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -272,18 +272,16 @@ class LinkedTaskChannel( ''' Receive a value `asyncio.Task` <- `trio.Task`. - This is equiv to `await self._from_trio.get()`. + This is equiv to `await self._to_aio.get()`. ''' return await self._to_aio.get() async def send(self, item: Any) -> None: ''' - Send a value through `trio.Task` -> `asyncio.Task` - presuming - it defines a `from_trio` argument or makes calls - to `chan.get()` , if it does not - this method will raise an error. + Send a value `trio.Task` -> `asyncio.Task` + by enqueuing `item` onto the internal + `asyncio.Queue` via `put_nowait()`. ''' self._to_aio.put_nowait(item) @@ -1301,16 +1299,17 @@ async def open_channel_from( **target_kwargs, ) -> AsyncIterator[ - tuple[LinkedTaskChannel, Any] + tuple[Any, LinkedTaskChannel] ]: ''' - Start an `asyncio.Task` as `target()` and open an inter-loop - (linked) channel for streaming between it and the current - `trio.Task`. + Start an `asyncio.Task` as `target()` and open an + inter-loop (linked) channel for streaming between + it and the current `trio.Task`. - A pair `(chan: LinkedTaskChannel, Any)` is delivered to the caller - where the 2nd element is the value provided by the - `asyncio.Task`'s unblocking call to `chan.started_nowait()`. + A pair `(Any, chan: LinkedTaskChannel)` is delivered + to the caller where the 1st element is the value + provided by the `asyncio.Task`'s unblocking call + to `chan.started_nowait()`. ''' chan: LinkedTaskChannel = _run_asyncio_task( From b3ce5ab4f649c6a6e63f73d0ec9a6844744572bc Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 12 Mar 2026 16:32:44 -0400 Subject: [PATCH 6/7] Swap `open_channel_from()` to yield `(chan, first)` Deliver `(LinkedTaskChannel, Any)` instead of the prior `(first, chan)` order from `open_channel_from()` to match the type annotation and be consistent with `trio.open_*_channel()` style where the channel obj comes first. - flip `yield first, chan` -> `yield chan, first` - update type annotation + docstring to match - swap all unpack sites in tests and examples (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- examples/debugging/asyncio_bp.py | 2 +- examples/infected_asyncio_echo_server.py | 2 +- tests/test_child_manages_service_nursery.py | 2 +- tests/test_infected_asyncio.py | 6 +++--- tests/test_root_infect_asyncio.py | 4 ++-- tractor/to_asyncio.py | 9 ++++----- 6 files changed, 12 insertions(+), 13 deletions(-) diff --git a/examples/debugging/asyncio_bp.py b/examples/debugging/asyncio_bp.py index 348e990f..3e3247bb 100644 --- a/examples/debugging/asyncio_bp.py +++ b/examples/debugging/asyncio_bp.py @@ -59,7 +59,7 @@ async def trio_ctx( to_asyncio.open_channel_from( bp_then_error, # raise_after_bp=not bp_before_started, - ) as (first, chan), + ) as (chan, first), trio.open_nursery() as tn, ): diff --git a/examples/infected_asyncio_echo_server.py b/examples/infected_asyncio_echo_server.py index 20914089..e8d29dc3 100644 --- a/examples/infected_asyncio_echo_server.py +++ b/examples/infected_asyncio_echo_server.py @@ -33,7 +33,7 @@ async def trio_to_aio_echo_server( # message. async with tractor.to_asyncio.open_channel_from( aio_echo_server, - ) as (first, chan): + ) as (chan, first): assert first == 'start' await ctx.started(first) diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py index 0c42dd93..820d9ca0 100644 --- a/tests/test_child_manages_service_nursery.py +++ b/tests/test_child_manages_service_nursery.py @@ -68,7 +68,7 @@ async def wrapper_mngr( else: async with tractor.to_asyncio.open_channel_from( aio_streamer, - ) as (first, from_aio): + ) as (from_aio, first): assert not first # cache it so next task uses broadcast receiver diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 2ba932b1..7b1e952c 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -237,7 +237,7 @@ async def trio_ctx( trio.open_nursery() as tn, tractor.to_asyncio.open_channel_from( sleep_and_err, - ) as (first, chan), + ) as (chan, first), ): assert first == 'start' @@ -474,7 +474,7 @@ async def stream_from_aio( trio_exit_early )) - ) as (first, chan): + ) as (chan, first): assert first is True @@ -768,8 +768,8 @@ async def trio_to_aio_echo_server( async with to_asyncio.open_channel_from( aio_echo_server, ) as ( - first, # value from `chan.started_nowait()` above chan, + first, # value from `chan.started_nowait()` above ): assert first == 'start' diff --git a/tests/test_root_infect_asyncio.py b/tests/test_root_infect_asyncio.py index 159aa3d5..e7a307bc 100644 --- a/tests/test_root_infect_asyncio.py +++ b/tests/test_root_infect_asyncio.py @@ -49,7 +49,7 @@ def test_infected_root_actor( ), to_asyncio.open_channel_from( aio_echo_server, - ) as (first, chan), + ) as (chan, first), ): assert first == 'start' @@ -173,7 +173,7 @@ def test_trio_prestarted_task_bubbles( sync_and_err, ev=aio_ev, ) - ) as (first, chan), + ) as (chan, first), ): for i in range(5): diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 3769bdd0..0da47475 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -1299,15 +1299,15 @@ async def open_channel_from( **target_kwargs, ) -> AsyncIterator[ - tuple[Any, LinkedTaskChannel] + tuple[LinkedTaskChannel, Any] ]: ''' Start an `asyncio.Task` as `target()` and open an inter-loop (linked) channel for streaming between it and the current `trio.Task`. - A pair `(Any, chan: LinkedTaskChannel)` is delivered - to the caller where the 1st element is the value + A pair `(chan: LinkedTaskChannel, Any)` is delivered + to the caller where the 2nd element is the value provided by the `asyncio.Task`'s unblocking call to `chan.started_nowait()`. @@ -1333,8 +1333,7 @@ async def open_channel_from( first = await chan.receive() # deliver stream handle upward - yield first, chan - # ^TODO! swap these!! + yield chan, first except trio.Cancelled as taskc: if cs.cancel_called: if isinstance(chan._trio_to_raise, AsyncioCancelled): From 359bcf691fd90a76968112a410d480e2f34a8652 Mon Sep 17 00:00:00 2001 From: goodboy Date: Fri, 13 Mar 2026 20:54:49 -0400 Subject: [PATCH 7/7] Update `docs/README.rst` to use `chan` API style Sync the inline "infected asyncio" echo-server example with the new `LinkedTaskChannel` iface from prior commits. - `to_trio`/`from_trio` params -> `chan: LinkedTaskChannel` - use `chan.started_nowait()`, `.send_nowait()`, `.get()` - swap yield order to `(chan, first)` - update blurb to describe the new unified channel API (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- docs/README.rst | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/docs/README.rst b/docs/README.rst index 1d8bbb9f..f68ebd9a 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -420,20 +420,17 @@ Check out our experimental system for `guest`_-mode controlled async def aio_echo_server( - to_trio: trio.MemorySendChannel, - from_trio: asyncio.Queue, + chan: tractor.to_asyncio.LinkedTaskChannel, ) -> None: # a first message must be sent **from** this ``asyncio`` # task or the ``trio`` side will never unblock from # ``tractor.to_asyncio.open_channel_from():`` - to_trio.send_nowait('start') + chan.started_nowait('start') - # XXX: this uses an ``from_trio: asyncio.Queue`` currently but we - # should probably offer something better. while True: # echo the msg back - to_trio.send_nowait(await from_trio.get()) + chan.send_nowait(await chan.get()) await asyncio.sleep(0) @@ -445,7 +442,7 @@ Check out our experimental system for `guest`_-mode controlled # message. async with tractor.to_asyncio.open_channel_from( aio_echo_server, - ) as (first, chan): + ) as (chan, first): assert first == 'start' await ctx.started(first) @@ -504,8 +501,10 @@ Yes, we spawn a python process, run ``asyncio``, start ``trio`` on the ``asyncio`` loop, then send commands to the ``trio`` scheduled tasks to tell ``asyncio`` tasks what to do XD -We need help refining the `asyncio`-side channel API to be more -`trio`-like. Feel free to sling your opinion in `#273`_! +The ``asyncio``-side task receives a single +``chan: LinkedTaskChannel`` handle providing a ``trio``-like +API: ``.started_nowait()``, ``.send_nowait()``, ``.get()`` +and more. Feel free to sling your opinion in `#273`_! .. _#273: https://github.com/goodboy/tractor/issues/273