Use `chan: LinkedTaskChannel` API in all aio-task fns
Convert every remaining `to_trio`/`from_trio` fn-sig style to the new unified `chan: LinkedTaskChannel` iface added in prior commit (c46e9ee8). Deats, - `to_trio.send_nowait(val)` (1st call) -> `chan.started_nowait(val)` - `to_trio.send_nowait(val)` (subsequent) -> `chan.send_nowait(val)` - `await from_trio.get()` -> `await chan.get()` Converted fns, - `sleep_and_err()`, `push_from_aio_task()` in `tests/test_infected_asyncio.py` - `sync_and_err()` in `tests/test_root_infect_asyncio.py` - `aio_streamer()` in `tests/test_child_manages_service_nursery.py` - `aio_echo_server()` in `examples/infected_asyncio_echo_server.py` - `bp_then_error()` in `examples/debugging/asyncio_bp.py` Also, - drop stale comments referencing old param names. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codepiker_pin_macmini
parent
36cbc07602
commit
417b796169
|
|
@ -18,15 +18,14 @@ async def aio_sleep_forever():
|
||||||
|
|
||||||
|
|
||||||
async def bp_then_error(
|
async def bp_then_error(
|
||||||
to_trio: trio.MemorySendChannel,
|
chan: to_asyncio.LinkedTaskChannel,
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
|
|
||||||
raise_after_bp: bool = True,
|
raise_after_bp: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# sync with `trio`-side (caller) task
|
# sync with `trio`-side (caller) task
|
||||||
to_trio.send_nowait('start')
|
chan.started_nowait('start')
|
||||||
|
|
||||||
# NOTE: what happens here inside the hook needs some refinement..
|
# NOTE: what happens here inside the hook needs some refinement..
|
||||||
# => seems like it's still `.debug._set_trace()` but
|
# => seems like it's still `.debug._set_trace()` but
|
||||||
|
|
|
||||||
|
|
@ -11,21 +11,17 @@ import tractor
|
||||||
|
|
||||||
|
|
||||||
async def aio_echo_server(
|
async def aio_echo_server(
|
||||||
to_trio: trio.MemorySendChannel,
|
chan: tractor.to_asyncio.LinkedTaskChannel,
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
# a first message must be sent **from** this ``asyncio``
|
# a first message must be sent **from** this ``asyncio``
|
||||||
# task or the ``trio`` side will never unblock from
|
# task or the ``trio`` side will never unblock from
|
||||||
# ``tractor.to_asyncio.open_channel_from():``
|
# ``tractor.to_asyncio.open_channel_from():``
|
||||||
to_trio.send_nowait('start')
|
chan.started_nowait('start')
|
||||||
|
|
||||||
# XXX: this uses an ``from_trio: asyncio.Queue`` currently but we
|
|
||||||
# should probably offer something better.
|
|
||||||
while True:
|
while True:
|
||||||
# echo the msg back
|
# echo the msg back
|
||||||
to_trio.send_nowait(await from_trio.get())
|
chan.send_nowait(await chan.get())
|
||||||
await asyncio.sleep(0)
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,16 +18,15 @@ from tractor import RemoteActorError
|
||||||
|
|
||||||
|
|
||||||
async def aio_streamer(
|
async def aio_streamer(
|
||||||
from_trio: asyncio.Queue,
|
chan: tractor.to_asyncio.LinkedTaskChannel,
|
||||||
to_trio: trio.abc.SendChannel,
|
|
||||||
) -> trio.abc.ReceiveChannel:
|
) -> trio.abc.ReceiveChannel:
|
||||||
|
|
||||||
# required first msg to sync caller
|
# required first msg to sync caller
|
||||||
to_trio.send_nowait(None)
|
chan.started_nowait(None)
|
||||||
|
|
||||||
from itertools import cycle
|
from itertools import cycle
|
||||||
for i in cycle(range(10)):
|
for i in cycle(range(10)):
|
||||||
to_trio.send_nowait(i)
|
chan.send_nowait(i)
|
||||||
await asyncio.sleep(0.01)
|
await asyncio.sleep(0.01)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -47,12 +47,11 @@ async def sleep_and_err(
|
||||||
|
|
||||||
# just signature placeholders for compat with
|
# just signature placeholders for compat with
|
||||||
# ``to_asyncio.open_channel_from()``
|
# ``to_asyncio.open_channel_from()``
|
||||||
to_trio: trio.MemorySendChannel|None = None,
|
chan: to_asyncio.LinkedTaskChannel|None = None,
|
||||||
from_trio: asyncio.Queue|None = None,
|
|
||||||
|
|
||||||
):
|
):
|
||||||
if to_trio:
|
if chan:
|
||||||
to_trio.send_nowait('start')
|
chan.started_nowait('start')
|
||||||
|
|
||||||
await asyncio.sleep(sleep_for)
|
await asyncio.sleep(sleep_for)
|
||||||
assert 0
|
assert 0
|
||||||
|
|
@ -399,7 +398,7 @@ async def no_to_trio_in_args():
|
||||||
|
|
||||||
async def push_from_aio_task(
|
async def push_from_aio_task(
|
||||||
sequence: Iterable,
|
sequence: Iterable,
|
||||||
to_trio: trio.abc.SendChannel,
|
chan: to_asyncio.LinkedTaskChannel,
|
||||||
expect_cancel: False,
|
expect_cancel: False,
|
||||||
fail_early: bool,
|
fail_early: bool,
|
||||||
exit_early: bool,
|
exit_early: bool,
|
||||||
|
|
@ -407,15 +406,12 @@ async def push_from_aio_task(
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# print('trying breakpoint')
|
|
||||||
# breakpoint()
|
|
||||||
|
|
||||||
# sync caller ctx manager
|
# sync caller ctx manager
|
||||||
to_trio.send_nowait(True)
|
chan.started_nowait(True)
|
||||||
|
|
||||||
for i in sequence:
|
for i in sequence:
|
||||||
print(f'asyncio sending {i}')
|
print(f'asyncio sending {i}')
|
||||||
to_trio.send_nowait(i)
|
chan.send_nowait(i)
|
||||||
await asyncio.sleep(0.001)
|
await asyncio.sleep(0.001)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
|
|
@ -1109,13 +1105,12 @@ async def raise_before_started(
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
`asyncio.Task` entry point which RTEs before calling
|
`asyncio.Task` entry point which RTEs before calling
|
||||||
`to_trio.send_nowait()`.
|
`chan.started_nowait()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
await asyncio.sleep(0.2)
|
await asyncio.sleep(0.2)
|
||||||
raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
|
raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
|
||||||
|
|
||||||
# to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
|
|
||||||
chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
|
chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -91,13 +91,12 @@ def test_infected_root_actor(
|
||||||
async def sync_and_err(
|
async def sync_and_err(
|
||||||
# just signature placeholders for compat with
|
# just signature placeholders for compat with
|
||||||
# ``to_asyncio.open_channel_from()``
|
# ``to_asyncio.open_channel_from()``
|
||||||
to_trio: trio.MemorySendChannel,
|
chan: tractor.to_asyncio.LinkedTaskChannel,
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
ev: asyncio.Event,
|
ev: asyncio.Event,
|
||||||
|
|
||||||
):
|
):
|
||||||
if to_trio:
|
if chan:
|
||||||
to_trio.send_nowait('start')
|
chan.started_nowait('start')
|
||||||
|
|
||||||
await ev.wait()
|
await ev.wait()
|
||||||
raise RuntimeError('asyncio-side')
|
raise RuntimeError('asyncio-side')
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue