diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index f11a4eed..51ca6115 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -732,15 +732,21 @@ def test_aio_errors_and_channel_propagates_and_closes( async def aio_echo_server( - to_trio: trio.MemorySendChannel, - from_trio: asyncio.Queue, + chan: to_asyncio.LinkedTaskChannel, ) -> None: + ''' + An IPC-msg "echo server" with msgs received and relayed by + a parent `trio.Task` into a child `asyncio.Task` + and then repeated back to that local parent (`trio.Task`) + and sent again back to the original calling remote actor. - to_trio.send_nowait('start') + ''' + # same semantics as `trio.TaskStatus.started()` + chan.started_nowait('start') while True: try: - msg = await from_trio.get() + msg = await chan.get() except to_asyncio.TrioTaskExited: print( 'breaking aio echo loop due to `trio` exit!' @@ -748,7 +754,7 @@ async def aio_echo_server( break # echo the msg back - to_trio.send_nowait(msg) + chan.send_nowait(msg) # if we get the terminate sentinel # break the echo loop @@ -765,7 +771,10 @@ async def trio_to_aio_echo_server( ): async with to_asyncio.open_channel_from( aio_echo_server, - ) as (first, chan): + ) as ( + first, # value from `chan.started_nowait()` above + chan, + ): assert first == 'start' await ctx.started(first) @@ -776,7 +785,8 @@ async def trio_to_aio_echo_server( await chan.send(msg) out = await chan.receive() - # echo back to parent actor-task + + # echo back to parent-actor's remote parent-ctx-task! await stream.send(out) if out is None: @@ -1090,14 +1100,12 @@ def test_sigint_closes_lifetime_stack( # ?TODO asyncio.Task fn-deco? -# -[ ] do sig checkingat import time like @context? -# -[ ] maybe name it @aio_task ?? # -[ ] chan: to_asyncio.InterloopChannel ?? +# -[ ] do fn-sig checking at import time like @context? +# |_[ ] maybe name it @a(sync)io_task ?? +# @asyncio_task <- not bad ?? async def raise_before_started( - # from_trio: asyncio.Queue, - # to_trio: trio.abc.SendChannel, chan: to_asyncio.LinkedTaskChannel, - ) -> None: ''' `asyncio.Task` entry point which RTEs before calling diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py index 8c22f84d..9393b452 100644 --- a/tractor/to_asyncio.py +++ b/tractor/to_asyncio.py @@ -94,10 +94,14 @@ else: QueueShutDown = False -# TODO, generally speaking we can generalize this abstraction, a "SC linked -# parent->child task pair", as the same "supervision scope primitive" -# **that is** our `._context.Context` with the only difference being -# in how the tasks conduct msg-passing comms. +# TODO, generally speaking we can generalize this abstraction as, +# +# > A "SC linked, inter-event-loop" channel for comms between +# > a `parent: trio.Task` -> `child: asyncio.Task` pair. +# +# It is **very similar** in terms of its operation as a "supervision +# scope primitive" to that of our `._context.Context` with the only +# difference being in how the tasks conduct msg-passing comms. # # For `LinkedTaskChannel` we are passing the equivalent of (once you # include all the recently added `._trio/aio_to_raise` @@ -122,6 +126,7 @@ class LinkedTaskChannel( task scheduled in the host loop. ''' + # ?TODO, rename as `._aio_q` since it's 2-way? _to_aio: asyncio.Queue _from_aio: trio.MemoryReceiveChannel @@ -235,9 +240,11 @@ class LinkedTaskChannel( # async def receive(self) -> Any: ''' - Receive a value from the paired `asyncio.Task` with + Receive a value `trio.Task` <- `asyncio.Task`. + + Note the tasks in each loop are "SC linked" as a pair with exception/cancel handling to teardown both sides on any - unexpected error. + unexpected error or cancellation. ''' try: @@ -261,15 +268,42 @@ class LinkedTaskChannel( ): raise err + async def get(self) -> Any: + ''' + Receive a value `asyncio.Task` <- `trio.Task`. + + This is equiv to `await self._from_trio.get()`. + + ''' + return await self._to_aio.get() + async def send(self, item: Any) -> None: ''' - Send a value through to the asyncio task presuming - it defines a ``from_trio`` argument, if it does not + Send a value through `trio.Task` -> `asyncio.Task` + presuming + it defines a `from_trio` argument or makes calls + to `chan.get()` , if it does not this method will raise an error. ''' self._to_aio.put_nowait(item) + # TODO? could we only compile-in this method on an instance + # handed to the `asyncio`-side, i.e. the fn invoked with + # `.open_channel_from()`. + def send_nowait( + self, + item: Any, + ) -> None: + ''' + Send a value through FROM the `asyncio.Task` to + the `trio.Task` NON-BLOCKING. + + This is equiv to `self._to_trio.send_nowait()`. + + ''' + self._to_trio.send_nowait(item) + # TODO? needed? # async def wait_aio_complete(self) -> None: # await self._aio_task_complete.wait()