Merge pull request #413 from goodboy/to_asyncio_channel_iface

Extend the `to_asyncio` inter-loop-task channel iface
ns_aware
Bd 2026-03-13 21:09:13 -04:00 committed by GitHub
commit c9b415475f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 136 additions and 77 deletions

View File

@ -420,20 +420,17 @@ Check out our experimental system for `guest`_-mode controlled
async def aio_echo_server( async def aio_echo_server(
to_trio: trio.MemorySendChannel, chan: tractor.to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
) -> None: ) -> None:
# a first message must be sent **from** this ``asyncio`` # a first message must be sent **from** this ``asyncio``
# task or the ``trio`` side will never unblock from # task or the ``trio`` side will never unblock from
# ``tractor.to_asyncio.open_channel_from():`` # ``tractor.to_asyncio.open_channel_from():``
to_trio.send_nowait('start') chan.started_nowait('start')
# XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
# should probably offer something better.
while True: while True:
# echo the msg back # echo the msg back
to_trio.send_nowait(await from_trio.get()) chan.send_nowait(await chan.get())
await asyncio.sleep(0) await asyncio.sleep(0)
@ -445,7 +442,7 @@ Check out our experimental system for `guest`_-mode controlled
# message. # message.
async with tractor.to_asyncio.open_channel_from( async with tractor.to_asyncio.open_channel_from(
aio_echo_server, aio_echo_server,
) as (first, chan): ) as (chan, first):
assert first == 'start' assert first == 'start'
await ctx.started(first) await ctx.started(first)
@ -504,8 +501,10 @@ Yes, we spawn a python process, run ``asyncio``, start ``trio`` on the
``asyncio`` loop, then send commands to the ``trio`` scheduled tasks to ``asyncio`` loop, then send commands to the ``trio`` scheduled tasks to
tell ``asyncio`` tasks what to do XD tell ``asyncio`` tasks what to do XD
We need help refining the `asyncio`-side channel API to be more The ``asyncio``-side task receives a single
`trio`-like. Feel free to sling your opinion in `#273`_! ``chan: LinkedTaskChannel`` handle providing a ``trio``-like
API: ``.started_nowait()``, ``.send_nowait()``, ``.get()``
and more. Feel free to sling your opinion in `#273`_!
.. _#273: https://github.com/goodboy/tractor/issues/273 .. _#273: https://github.com/goodboy/tractor/issues/273

View File

@ -18,15 +18,14 @@ async def aio_sleep_forever():
async def bp_then_error( async def bp_then_error(
to_trio: trio.MemorySendChannel, chan: to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
raise_after_bp: bool = True, raise_after_bp: bool = True,
) -> None: ) -> None:
# sync with `trio`-side (caller) task # sync with `trio`-side (caller) task
to_trio.send_nowait('start') chan.started_nowait('start')
# NOTE: what happens here inside the hook needs some refinement.. # NOTE: what happens here inside the hook needs some refinement..
# => seems like it's still `.debug._set_trace()` but # => seems like it's still `.debug._set_trace()` but
@ -60,7 +59,7 @@ async def trio_ctx(
to_asyncio.open_channel_from( to_asyncio.open_channel_from(
bp_then_error, bp_then_error,
# raise_after_bp=not bp_before_started, # raise_after_bp=not bp_before_started,
) as (first, chan), ) as (chan, first),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):

View File

@ -11,21 +11,17 @@ import tractor
async def aio_echo_server( async def aio_echo_server(
to_trio: trio.MemorySendChannel, chan: tractor.to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
) -> None: ) -> None:
# a first message must be sent **from** this ``asyncio`` # a first message must be sent **from** this ``asyncio``
# task or the ``trio`` side will never unblock from # task or the ``trio`` side will never unblock from
# ``tractor.to_asyncio.open_channel_from():`` # ``tractor.to_asyncio.open_channel_from():``
to_trio.send_nowait('start') chan.started_nowait('start')
# XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
# should probably offer something better.
while True: while True:
# echo the msg back # echo the msg back
to_trio.send_nowait(await from_trio.get()) chan.send_nowait(await chan.get())
await asyncio.sleep(0) await asyncio.sleep(0)
@ -37,7 +33,7 @@ async def trio_to_aio_echo_server(
# message. # message.
async with tractor.to_asyncio.open_channel_from( async with tractor.to_asyncio.open_channel_from(
aio_echo_server, aio_echo_server,
) as (first, chan): ) as (chan, first):
assert first == 'start' assert first == 'start'
await ctx.started(first) await ctx.started(first)

View File

@ -18,16 +18,15 @@ from tractor import RemoteActorError
async def aio_streamer( async def aio_streamer(
from_trio: asyncio.Queue, chan: tractor.to_asyncio.LinkedTaskChannel,
to_trio: trio.abc.SendChannel,
) -> trio.abc.ReceiveChannel: ) -> trio.abc.ReceiveChannel:
# required first msg to sync caller # required first msg to sync caller
to_trio.send_nowait(None) chan.started_nowait(None)
from itertools import cycle from itertools import cycle
for i in cycle(range(10)): for i in cycle(range(10)):
to_trio.send_nowait(i) chan.send_nowait(i)
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
@ -69,7 +68,7 @@ async def wrapper_mngr(
else: else:
async with tractor.to_asyncio.open_channel_from( async with tractor.to_asyncio.open_channel_from(
aio_streamer, aio_streamer,
) as (first, from_aio): ) as (from_aio, first):
assert not first assert not first
# cache it so next task uses broadcast receiver # cache it so next task uses broadcast receiver

View File

@ -47,12 +47,11 @@ async def sleep_and_err(
# just signature placeholders for compat with # just signature placeholders for compat with
# ``to_asyncio.open_channel_from()`` # ``to_asyncio.open_channel_from()``
to_trio: trio.MemorySendChannel|None = None, chan: to_asyncio.LinkedTaskChannel|None = None,
from_trio: asyncio.Queue|None = None,
): ):
if to_trio: if chan:
to_trio.send_nowait('start') chan.started_nowait('start')
await asyncio.sleep(sleep_for) await asyncio.sleep(sleep_for)
assert 0 assert 0
@ -238,7 +237,7 @@ async def trio_ctx(
trio.open_nursery() as tn, trio.open_nursery() as tn,
tractor.to_asyncio.open_channel_from( tractor.to_asyncio.open_channel_from(
sleep_and_err, sleep_and_err,
) as (first, chan), ) as (chan, first),
): ):
assert first == 'start' assert first == 'start'
@ -399,7 +398,7 @@ async def no_to_trio_in_args():
async def push_from_aio_task( async def push_from_aio_task(
sequence: Iterable, sequence: Iterable,
to_trio: trio.abc.SendChannel, chan: to_asyncio.LinkedTaskChannel,
expect_cancel: False, expect_cancel: False,
fail_early: bool, fail_early: bool,
exit_early: bool, exit_early: bool,
@ -407,15 +406,12 @@ async def push_from_aio_task(
) -> None: ) -> None:
try: try:
# print('trying breakpoint')
# breakpoint()
# sync caller ctx manager # sync caller ctx manager
to_trio.send_nowait(True) chan.started_nowait(True)
for i in sequence: for i in sequence:
print(f'asyncio sending {i}') print(f'asyncio sending {i}')
to_trio.send_nowait(i) chan.send_nowait(i)
await asyncio.sleep(0.001) await asyncio.sleep(0.001)
if ( if (
@ -478,7 +474,7 @@ async def stream_from_aio(
trio_exit_early trio_exit_early
)) ))
) as (first, chan): ) as (chan, first):
assert first is True assert first is True
@ -732,15 +728,21 @@ def test_aio_errors_and_channel_propagates_and_closes(
async def aio_echo_server( async def aio_echo_server(
to_trio: trio.MemorySendChannel, chan: to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
) -> None: ) -> None:
'''
An IPC-msg "echo server" with msgs received and relayed by
a parent `trio.Task` into a child `asyncio.Task`
and then repeated back to that local parent (`trio.Task`)
and sent again back to the original calling remote actor.
to_trio.send_nowait('start') '''
# same semantics as `trio.TaskStatus.started()`
chan.started_nowait('start')
while True: while True:
try: try:
msg = await from_trio.get() msg = await chan.get()
except to_asyncio.TrioTaskExited: except to_asyncio.TrioTaskExited:
print( print(
'breaking aio echo loop due to `trio` exit!' 'breaking aio echo loop due to `trio` exit!'
@ -748,7 +750,7 @@ async def aio_echo_server(
break break
# echo the msg back # echo the msg back
to_trio.send_nowait(msg) chan.send_nowait(msg)
# if we get the terminate sentinel # if we get the terminate sentinel
# break the echo loop # break the echo loop
@ -765,7 +767,10 @@ async def trio_to_aio_echo_server(
): ):
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
aio_echo_server, aio_echo_server,
) as (first, chan): ) as (
chan,
first, # value from `chan.started_nowait()` above
):
assert first == 'start' assert first == 'start'
await ctx.started(first) await ctx.started(first)
@ -776,7 +781,8 @@ async def trio_to_aio_echo_server(
await chan.send(msg) await chan.send(msg)
out = await chan.receive() out = await chan.receive()
# echo back to parent actor-task
# echo back to parent-actor's remote parent-ctx-task!
await stream.send(out) await stream.send(out)
if out is None: if out is None:
@ -1090,24 +1096,21 @@ def test_sigint_closes_lifetime_stack(
# ?TODO asyncio.Task fn-deco? # ?TODO asyncio.Task fn-deco?
# -[ ] do sig checkingat import time like @context?
# -[ ] maybe name it @aio_task ??
# -[ ] chan: to_asyncio.InterloopChannel ?? # -[ ] chan: to_asyncio.InterloopChannel ??
# -[ ] do fn-sig checking at import time like @context?
# |_[ ] maybe name it @a(sync)io_task ??
# @asyncio_task <- not bad ??
async def raise_before_started( async def raise_before_started(
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
chan: to_asyncio.LinkedTaskChannel, chan: to_asyncio.LinkedTaskChannel,
) -> None: ) -> None:
''' '''
`asyncio.Task` entry point which RTEs before calling `asyncio.Task` entry point which RTEs before calling
`to_trio.send_nowait()`. `chan.started_nowait()`.
''' '''
await asyncio.sleep(0.2) await asyncio.sleep(0.2)
raise RuntimeError('Some shite went wrong before `.send_nowait()`!!') raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
# to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
chan.started_nowait('Uhh we shouldve RTE-d ^^ ??') chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))

View File

@ -49,7 +49,7 @@ def test_infected_root_actor(
), ),
to_asyncio.open_channel_from( to_asyncio.open_channel_from(
aio_echo_server, aio_echo_server,
) as (first, chan), ) as (chan, first),
): ):
assert first == 'start' assert first == 'start'
@ -91,13 +91,12 @@ def test_infected_root_actor(
async def sync_and_err( async def sync_and_err(
# just signature placeholders for compat with # just signature placeholders for compat with
# ``to_asyncio.open_channel_from()`` # ``to_asyncio.open_channel_from()``
to_trio: trio.MemorySendChannel, chan: tractor.to_asyncio.LinkedTaskChannel,
from_trio: asyncio.Queue,
ev: asyncio.Event, ev: asyncio.Event,
): ):
if to_trio: if chan:
to_trio.send_nowait('start') chan.started_nowait('start')
await ev.wait() await ev.wait()
raise RuntimeError('asyncio-side') raise RuntimeError('asyncio-side')
@ -174,7 +173,7 @@ def test_trio_prestarted_task_bubbles(
sync_and_err, sync_and_err,
ev=aio_ev, ev=aio_ev,
) )
) as (first, chan), ) as (chan, first),
): ):
for i in range(5): for i in range(5):

View File

@ -48,7 +48,7 @@ from tractor._state import (
_runtime_vars, _runtime_vars,
) )
from tractor._context import Unresolved from tractor._context import Unresolved
from tractor.devx import debug from tractor import devx
from tractor.log import ( from tractor.log import (
get_logger, get_logger,
StackLevelAdapter, StackLevelAdapter,
@ -94,10 +94,14 @@ else:
QueueShutDown = False QueueShutDown = False
# TODO, generally speaking we can generalize this abstraction, a "SC linked # TODO, generally speaking we can generalize this abstraction as,
# parent->child task pair", as the same "supervision scope primitive" #
# **that is** our `._context.Context` with the only difference being # > A "SC linked, inter-event-loop" channel for comms between
# in how the tasks conduct msg-passing comms. # > a `parent: trio.Task` -> `child: asyncio.Task` pair.
#
# It is **very similar** in terms of its operation as a "supervision
# scope primitive" to that of our `._context.Context` with the only
# difference being in how the tasks conduct msg-passing comms.
# #
# For `LinkedTaskChannel` we are passing the equivalent of (once you # For `LinkedTaskChannel` we are passing the equivalent of (once you
# include all the recently added `._trio/aio_to_raise` # include all the recently added `._trio/aio_to_raise`
@ -122,6 +126,7 @@ class LinkedTaskChannel(
task scheduled in the host loop. task scheduled in the host loop.
''' '''
# ?TODO, rename as `._aio_q` since it's 2-way?
_to_aio: asyncio.Queue _to_aio: asyncio.Queue
_from_aio: trio.MemoryReceiveChannel _from_aio: trio.MemoryReceiveChannel
@ -235,9 +240,11 @@ class LinkedTaskChannel(
# #
async def receive(self) -> Any: async def receive(self) -> Any:
''' '''
Receive a value from the paired `asyncio.Task` with Receive a value `trio.Task` <- `asyncio.Task`.
Note the tasks in each loop are "SC linked" as a pair with
exception/cancel handling to teardown both sides on any exception/cancel handling to teardown both sides on any
unexpected error. unexpected error or cancellation.
''' '''
try: try:
@ -261,15 +268,40 @@ class LinkedTaskChannel(
): ):
raise err raise err
async def get(self) -> Any:
'''
Receive a value `asyncio.Task` <- `trio.Task`.
This is equiv to `await self._to_aio.get()`.
'''
return await self._to_aio.get()
async def send(self, item: Any) -> None: async def send(self, item: Any) -> None:
''' '''
Send a value through to the asyncio task presuming Send a value `trio.Task` -> `asyncio.Task`
it defines a ``from_trio`` argument, if it does not by enqueuing `item` onto the internal
this method will raise an error. `asyncio.Queue` via `put_nowait()`.
''' '''
self._to_aio.put_nowait(item) self._to_aio.put_nowait(item)
# TODO? could we only compile-in this method on an instance
# handed to the `asyncio`-side, i.e. the fn invoked with
# `.open_channel_from()`.
def send_nowait(
self,
item: Any,
) -> None:
'''
Send a value through FROM the `asyncio.Task` to
the `trio.Task` NON-BLOCKING.
This is equiv to `self._to_trio.send_nowait()`.
'''
self._to_trio.send_nowait(item)
# TODO? needed? # TODO? needed?
# async def wait_aio_complete(self) -> None: # async def wait_aio_complete(self) -> None:
# await self._aio_task_complete.wait() # await self._aio_task_complete.wait()
@ -337,9 +369,12 @@ def _run_asyncio_task(
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
if not tractor.current_actor().is_infected_aio(): if not (actor := tractor.current_actor()).is_infected_aio():
raise RuntimeError( raise RuntimeError(
"`infect_asyncio` mode is not enabled!?" f'`infect_asyncio: bool` mode is not enabled ??\n'
f'Ensure you pass `ActorNursery.start_actor(infect_asyncio=True)`\n'
f'\n'
f'{actor}\n'
) )
# ITC (inter task comms), these channel/queue names are mostly from # ITC (inter task comms), these channel/queue names are mostly from
@ -402,7 +437,23 @@ def _run_asyncio_task(
orig = result = id(coro) orig = result = id(coro)
try: try:
# XXX TODO UGH!
# this seems to break a `test_sync_pause_from_aio_task`
# in a REALLY weird way where a `dict` value for
# `_runtime_vars['_root_addrs']` is delivered from the
# parent actor??
#
# XXX => see masked `.set_trace()` block in
# `Actor.from_parent()`..
#
# with devx.maybe_open_crash_handler(
# # XXX, if trio-side exits (intentionally) we
# # shouldn't care bc it should have its own crash
# # handling logic.
# ignore={TrioTaskExited,},
# ) as _bxerr:
result: Any = await coro result: Any = await coro
chan._aio_result = result chan._aio_result = result
except BaseException as aio_err: except BaseException as aio_err:
chan._aio_err = aio_err chan._aio_err = aio_err
@ -509,7 +560,7 @@ def _run_asyncio_task(
if ( if (
debug_mode() debug_mode()
and and
(greenback := debug.maybe_import_greenback( (greenback := devx.debug.maybe_import_greenback(
force_reload=True, force_reload=True,
raise_not_found=False, raise_not_found=False,
)) ))
@ -909,7 +960,11 @@ async def translate_aio_errors(
except BaseException as _trio_err: except BaseException as _trio_err:
trio_err = chan._trio_err = _trio_err trio_err = chan._trio_err = _trio_err
# await tractor.pause(shield=True) # workx! # await tractor.pause(shield=True) # workx!
entered: bool = await debug._maybe_enter_pm(
# !TODO! we need an inter-loop lock here to avoid aio-tasks
# clobbering trio ones when both crash in debug-mode!
#
entered: bool = await devx.debug._maybe_enter_pm(
trio_err, trio_err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
) )
@ -1243,10 +1298,18 @@ async def open_channel_from(
suppress_graceful_exits: bool = True, suppress_graceful_exits: bool = True,
**target_kwargs, **target_kwargs,
) -> AsyncIterator[Any]: ) -> AsyncIterator[
tuple[LinkedTaskChannel, Any]
]:
''' '''
Open an inter-loop linked task channel for streaming between a target Start an `asyncio.Task` as `target()` and open an
spawned ``asyncio`` task and ``trio``. inter-loop (linked) channel for streaming between
it and the current `trio.Task`.
A pair `(chan: LinkedTaskChannel, Any)` is delivered
to the caller where the 2nd element is the value
provided by the `asyncio.Task`'s unblocking call
to `chan.started_nowait()`.
''' '''
chan: LinkedTaskChannel = _run_asyncio_task( chan: LinkedTaskChannel = _run_asyncio_task(
@ -1270,7 +1333,7 @@ async def open_channel_from(
first = await chan.receive() first = await chan.receive()
# deliver stream handle upward # deliver stream handle upward
yield first, chan yield chan, first
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
if cs.cancel_called: if cs.cancel_called:
if isinstance(chan._trio_to_raise, AsyncioCancelled): if isinstance(chan._trio_to_raise, AsyncioCancelled):
@ -1301,7 +1364,8 @@ async def open_channel_from(
) )
else: else:
# XXX SHOULD NEVER HAPPEN! # XXX SHOULD NEVER HAPPEN!
await tractor.pause() log.error("SHOULD NEVER GET HERE !?!?")
await tractor.pause(shield=True)
else: else:
chan._to_trio.close() chan._to_trio.close()