diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py
index d5f78ca8..3f8d20d3 100644
--- a/tractor/to_asyncio.py
+++ b/tractor/to_asyncio.py
@@ -342,20 +342,29 @@ def _run_asyncio_task(
                 '`trio` received final result from {task}\n'
                 f'|_{res}\n'
             )
-        except BaseException as terr:
-            task_err: BaseException = terr
+        except BaseException as _aio_err:
+            task_err: BaseException = _aio_err
 
             # read again AFTER the `asyncio` side errors in case
             # it was cancelled due to an error from `trio` (or
             # some other out of band exc).
             aio_err: BaseException|None = chan._aio_err
 
+            # always true right?
+            assert (
+                type(_aio_err) is type(aio_err)
+            ), (
+                f'`asyncio`-side task errors mismatch?!?\n\n'
+                f'caught: {_aio_err}\n'
+                f'chan._aio_err: {aio_err}\n'
+            )
+
             msg: str = (
                 '`trio`-side reports that the `asyncio`-side '
                 '{etype_str}\n'
                 # ^NOTE filled in below
             )
-            if isinstance(terr, CancelledError):
+            if isinstance(_aio_err, CancelledError):
                 msg += (
                     f'c)>\n'
                     f' |_{task}\n'
@@ -372,9 +381,6 @@ def _run_asyncio_task(
                     msg.format(etype_str='errored')
                 )
 
-            assert (
-                type(terr) is type(aio_err)
-            ), '`asyncio` task error mismatch?!?'
 
         if aio_err is not None:
             # import pdbp; pdbp.set_trace()
@@ -394,7 +400,7 @@ def _run_asyncio_task(
                 # aio_err.with_traceback(aio_err.__traceback__)
 
             # TODO: show when cancellation originated
-            # from each side more pedantically?
+            # from each side more pedantically in log-msg?
             # elif (
             #     type(aio_err) is CancelledError
             #     and  # trio was the cause?
@@ -429,6 +435,19 @@ def _run_asyncio_task(
     return chan
 
 
+class TrioTaskExited(AsyncioCancelled):
+    '''
+    The `trio`-side task exited without explicitly cancelling the
+    `asyncio.Task` peer.
+
+    This is very similar to how `trio.ClosedResource` acts as
+    a "clean shutdown" signal to the consumer side of a mem-chan,
+
+    https://trio.readthedocs.io/en/stable/reference-core.html#clean-shutdown-with-channels
+
+    '''
+
+
 @acm
 async def translate_aio_errors(
     chan: LinkedTaskChannel,
@@ -455,10 +474,11 @@ async def translate_aio_errors(
     trio_err: BaseException|None = None
     try:
         yield  # back to one of the cross-loop apis
-    except (
-        trio.Cancelled,
-    ) as _trio_err:
-        trio_err = _trio_err
+    except trio.Cancelled as taskc:
+        trio_err = taskc
+
+        # should NEVER be the case that `trio` is cancel-handling
+        # BEFORE the other side's task-ref was set!?
         assert chan._aio_task
 
         # import pdbp; pdbp.set_trace()  # lolevel-debug
@@ -483,14 +503,13 @@ async def translate_aio_errors(
         # )
         # raise
 
+    # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
+    # task-done-callback.
     except (
-        # NOTE: also see note in the `cancel_trio()` asyncio task
-        # termination callback
         trio.ClosedResourceError,
         # trio.BrokenResourceError,
-
-    ) as _trio_err:
-        trio_err = _trio_err
+    ) as cre:
+        trio_err = cre
         aio_err = chan._aio_err
         # import pdbp; pdbp.set_trace()
 
@@ -498,10 +517,21 @@ async def translate_aio_errors(
         # this channel close, raise our (non-`BaseException`) wrapper
         # exception (`AsyncioCancelled`) from that source error.
         if (
-            # NOTE, not until it terminates?
-            aio_task.cancelled()
+            # aio-side is cancelled?
+            aio_task.cancelled()  # not set until it terminates??
             and
             type(aio_err) is CancelledError
+
+            # TODO, if we want suppression of the
+            # silent-exit-by-`trio` case?
+            # -[ ] the parent task can also just catch it though?
+            # -[ ] OR, offer a `signal_aio_side_on_exit=True` ??
+            #
+            # or
+            # aio_err is None
+            # and
+            # chan._trio_exited
+
         ):
             raise AsyncioCancelled(
                 f'asyncio`-side cancelled the `trio`-side,\n'
@@ -511,6 +541,7 @@ async def translate_aio_errors(
                 f'{trio_err!r}\n'
             ) from aio_err
 
+        # maybe the chan-closure is due to something else?
         else:
             raise
 
@@ -552,6 +583,7 @@ async def translate_aio_errors(
     finally:
         # record wtv `trio`-side error transpired
         chan._trio_err = trio_err
+        ya_trio_exited: bool = chan._trio_exited
 
         # NOTE! by default always cancel the `asyncio` task if
         # we've made it this far and it's not done.
@@ -568,26 +600,56 @@ async def translate_aio_errors(
             # indicating the lifetime of the ``asyncio``-side task
             # should also be terminated.
             or (
-                chan._trio_exited
+                ya_trio_exited
                 and
                 not chan._trio_err   # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man..
             )
         ):
-            # pass
-            msg: str = (
-                f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n'
-                f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n'
-
-                f'trio-side exited silently!'
+            report: str = (
+                'trio-side exited silently!'
             )
-            # TODO XXX, figure out the case where calling this makes the
-            # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits`
-            # hang and then don't call it in that case!
-            #
-            aio_task.cancel(msg=msg)
-            log.warning(msg)
-            # assert not aio_err, 'WTF how did asyncio do this?!'
-            # import pdbp; pdbp.set_trace()
+            assert not aio_err, 'WTF how did asyncio do this?!'
+
+            # if the `trio.Task` already exited the `open_channel_from()`
+            # block we ensure the asyncio-side gets signalled via an
+            # explicit exception and its `Queue` is shutdown.
+            if ya_trio_exited:
+                chan._to_aio.shutdown()
+
+                # pump the other side's task? needed?
+                await trio.lowlevel.checkpoint()
+
+                if (
+                    not chan._trio_err
+                    and
+                    (fut := aio_task._fut_waiter)
+                ):
+                    fut.set_exception(
+                        TrioTaskExited(
+                            f'The peer `asyncio` task is still blocking/running?\n'
+                            f'>>\n'
+                            f'|_{aio_task!r}\n'
+                        )
+                    )
+                else:
+                    # from tractor._state import is_root_process
+                    # if is_root_process():
+                    #     breakpoint()
+                    #     import pdbp; pdbp.set_trace()
+
+                    aio_taskc_warn: str = (
+                        f'\n'
+                        f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n'
+                        f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n'
+                    )
+                    report += aio_taskc_warn
+                    # TODO XXX, figure out the case where calling this makes the
+                    # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits`
+                    # hang and then don't call it in that case!
+                    #
+                    aio_task.cancel(msg=aio_taskc_warn)
+
+            log.warning(report)
 
         # Required to sync with the far end `asyncio`-task to ensure
         # any error is captured (via monkeypatching the
@@ -1077,6 +1139,8 @@ def run_as_asyncio_guest(
             except (
                 asyncio.InvalidStateError,
                 # asyncio.CancelledError,
+                # ^^XXX `.shield()` call above prevents this??
+
             )as state_err:
 
                 # XXX be super dupere noisy about abandonment issues!