diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py
index 24f1ace6..d5f78ca8 100644
--- a/tractor/to_asyncio.py
+++ b/tractor/to_asyncio.py
@@ -33,13 +33,19 @@ from typing import (
 )
 
 import tractor
-from tractor._exceptions import AsyncioCancelled
+from tractor._exceptions import (
+    AsyncioCancelled,
+    is_multi_cancelled,
+)
 from tractor._state import (
     debug_mode,
     _runtime_vars,
 )
 from tractor.devx import _debug
-from tractor.log import get_logger
+from tractor.log import (
+    get_logger,
+    StackLevelAdapter,
+)
 from tractor.trionics._broadcast import (
     broadcast_receiver,
     BroadcastReceiver,
@@ -50,7 +56,7 @@ from outcome import (
     Outcome,
 )
 
-log = get_logger(__name__)
+log: StackLevelAdapter = get_logger(__name__)
 
 
 __all__ = [
@@ -70,9 +76,10 @@ class LinkedTaskChannel(trio.abc.Channel):
     _to_aio: asyncio.Queue
     _from_aio: trio.MemoryReceiveChannel
     _to_trio: trio.MemorySendChannel
-
     _trio_cs: trio.CancelScope
     _aio_task_complete: trio.Event
+
+    _trio_err: BaseException|None = None
     _trio_exited: bool = False
 
     # set after ``asyncio.create_task()``
@@ -84,28 +91,40 @@ class LinkedTaskChannel(trio.abc.Channel):
         await self._from_aio.aclose()
 
     async def receive(self) -> Any:
-        async with translate_aio_errors(
-            self,
-
-            # XXX: obviously this will deadlock if an on-going stream is
-            # being procesed.
-            # wait_on_aio_task=False,
-        ):
+        '''
+        Receive a value from the paired `asyncio.Task` with
+        exception/cancel handling to teardown both sides on any
+        unexpected error.
 
+        '''
+        try:
             # TODO: do we need this to guarantee asyncio code get's
             # cancelled in the case where the trio side somehow creates
             # a state where the asyncio cycle-task isn't getting the
             # cancel request sent by (in theory) the last checkpoint
             # cycle on the trio side?
             # await trio.lowlevel.checkpoint()
-
             return await self._from_aio.receive()
+        except BaseException as err:
+            async with translate_aio_errors(
+                self,
+
+                # XXX: obviously this will deadlock if an on-going stream is
+                # being procesed.
+                # wait_on_aio_task=False,
+            ):
+                raise err
 
     async def wait_asyncio_complete(self) -> None:
         await self._aio_task_complete.wait()
 
-    # def cancel_asyncio_task(self) -> None:
-    #     self._aio_task.cancel()
+    def cancel_asyncio_task(
+        self,
+        msg: str = '',
+    ) -> None:
+        self._aio_task.cancel(
+            msg=msg,
+        )
 
     async def send(self, item: Any) -> None:
         '''
@@ -155,7 +174,6 @@ class LinkedTaskChannel(trio.abc.Channel):
 
 
 def _run_asyncio_task(
-
     func: Callable,
     *,
     qsize: int = 1,
@@ -165,8 +183,9 @@ def _run_asyncio_task(
 
 ) -> LinkedTaskChannel:
     '''
-    Run an ``asyncio`` async function or generator in a task, return
-    or stream the result back to the caller `trio.lowleve.Task`.
+    Run an `asyncio`-compat async function or generator in a task,
+    return or stream the result back to the caller
+    `trio.lowleve.Task`.
 
     '''
     __tracebackhide__: bool = hide_tb
@@ -204,23 +223,23 @@ def _run_asyncio_task(
     aio_err: BaseException|None = None
 
     chan = LinkedTaskChannel(
-        aio_q,  # asyncio.Queue
-        from_aio,  # recv chan
-        to_trio,  # send chan
-
-        cancel_scope,
-        aio_task_complete,
+        _to_aio=aio_q,  # asyncio.Queue
+        _from_aio=from_aio,  # recv chan
+        _to_trio=to_trio,  # send chan
+        _trio_cs=cancel_scope,
+        _aio_task_complete=aio_task_complete,
     )
 
     async def wait_on_coro_final_result(
-
         to_trio: trio.MemorySendChannel,
         coro: Awaitable,
         aio_task_complete: trio.Event,
 
     ) -> None:
         '''
-        Await ``coro`` and relay result back to ``trio``.
+        Await `coro` and relay result back to `trio`.
+
+        This can only be run as an `asyncio.Task`!
 
         '''
         nonlocal aio_err
@@ -243,8 +262,10 @@ def _run_asyncio_task(
 
         else:
             if (
-                result != orig and
-                aio_err is None and
+                result != orig
+                and
+                aio_err is None
+                and
 
                 # in the `open_channel_from()` case we don't
                 # relay through the "return value".
@@ -260,12 +281,21 @@ def _run_asyncio_task(
                 # a ``trio.EndOfChannel`` to the trio (consumer) side.
                 to_trio.close()
 
+            # import pdbp; pdbp.set_trace()
             aio_task_complete.set()
-            log.runtime(f'`asyncio` task: {task.get_name()} is complete')
+            # await asyncio.sleep(0.1)
+            log.info(
+                f'`asyncio` task terminated\n'
+                f'x)>\n'
+                f'  |_{task}\n'
+            )
 
     # start the asyncio task we submitted from trio
     if not inspect.isawaitable(coro):
-        raise TypeError(f"No support for invoking {coro}")
+        raise TypeError(
+            f'Pass the async-fn NOT a coroutine\n'
+            f'{coro!r}'
+        )
 
     task: asyncio.Task = asyncio.create_task(
         wait_on_coro_final_result(
@@ -289,6 +319,10 @@ def _run_asyncio_task(
             raise_not_found=False,
         ))
     ):
+        log.info(
+            f'Bestowing `greenback` portal for `asyncio`-task\n'
+            f'{task}\n'
+        )
         greenback.bestow_portal(task)
 
     def cancel_trio(task: asyncio.Task) -> None:
@@ -304,11 +338,22 @@ def _run_asyncio_task(
         # task exceptions
         try:
             res: Any = task.result()
+            log.info(
+                '`trio` received final result from {task}\n'
+                f'|_{res}\n'
+            )
         except BaseException as terr:
             task_err: BaseException = terr
 
+            # read again AFTER the `asyncio` side errors in case
+            # it was cancelled due to an error from `trio` (or
+            # some other out of band exc).
+            aio_err: BaseException|None = chan._aio_err
+
             msg: str = (
-                'Infected `asyncio` task {etype_str}\n'
+                '`trio`-side reports that the `asyncio`-side '
+                '{etype_str}\n'
+                # ^NOTE filled in below
             )
             if isinstance(terr, CancelledError):
                 msg += (
@@ -327,17 +372,18 @@ def _run_asyncio_task(
                     msg.format(etype_str='errored')
                 )
 
-            assert type(terr) is type(aio_err), (
-                '`asyncio` task error mismatch?!?'
-            )
+            assert (
+                type(terr) is type(aio_err)
+            ), '`asyncio` task error mismatch?!?'
 
         if aio_err is not None:
+            # import pdbp; pdbp.set_trace()
             # XXX: uhh is this true?
             # assert task_err, f'Asyncio task {task.get_name()} discrepancy!?'
 
             # NOTE: currently mem chan closure may act as a form
-            # of error relay (at least in the ``asyncio.CancelledError``
-            # case) since we have no way to directly trigger a ``trio``
+            # of error relay (at least in the `asyncio.CancelledError`
+            # case) since we have no way to directly trigger a `trio`
             # task error without creating a nursery to throw one.
             # We might want to change this in the future though.
             from_aio.close()
@@ -359,29 +405,25 @@ def _run_asyncio_task(
             #     )
             #     raise aio_err from task_err
 
-            # XXX: if not already, alway cancel the scope
-            # on a task error in case the trio task is blocking on
+            # XXX: if not already, alway cancel the scope on a task
+            # error in case the trio task is blocking on
             # a checkpoint.
-            cancel_scope.cancel()
-
             if (
-                task_err
-                and
-                aio_err is not task_err
+                not cancel_scope.cancelled_caught
+                or
+                not cancel_scope.cancel_called
             ):
-                raise aio_err from task_err
+                # import pdbp; pdbp.set_trace()
+                cancel_scope.cancel()
 
-            # raise any `asyncio` side error.
-            raise aio_err
-
-        log.info(
-            '`trio` received final result from {task}\n'
-            f'|_{res}\n'
-        )
-        # TODO: do we need this?
-        # if task_err:
-        #     cancel_scope.cancel()
-        #     raise task_err
+            if task_err:
+                # XXX raise any `asyncio` side error IFF it doesn't
+                # match the one we just caught from the task above!
+                # (that would indicate something weird/very-wrong
+                # going on?)
+                if aio_err is not task_err:
+                    # import pdbp; pdbp.set_trace()
+                    raise aio_err from task_err
 
     task.add_done_callback(cancel_trio)
     return chan
@@ -389,13 +431,18 @@ def _run_asyncio_task(
 
 @acm
 async def translate_aio_errors(
-
     chan: LinkedTaskChannel,
     wait_on_aio_task: bool = False,
+    cancel_aio_task_on_trio_exit: bool = True,
 
 ) -> AsyncIterator[None]:
     '''
-    Error handling context around ``asyncio`` task spawns which
+    An error handling to cross-loop propagation context around
+    `asyncio.Task` spawns via one of this module's APIs:
+
+    - `open_channel_from()`
+    - `run_task()`
+
     appropriately translates errors and cancels into ``trio`` land.
 
     '''
@@ -403,88 +450,204 @@ async def translate_aio_errors(
 
     aio_err: BaseException|None = None
 
-    # TODO: make thisi a channel method?
-    def maybe_raise_aio_err(
-        err: Exception|None = None
-    ) -> None:
-        aio_err = chan._aio_err
-        if (
-            aio_err is not None
-            and
-            # not isinstance(aio_err, CancelledError)
-            type(aio_err) != CancelledError
-        ):
-            # always raise from any captured asyncio error
-            if err:
-                raise aio_err from err
-            else:
-                raise aio_err
-
-    task = chan._aio_task
-    assert task
+    aio_task: asyncio.Task = chan._aio_task
+    assert aio_task
+    trio_err: BaseException|None = None
     try:
-        yield
-
+        yield  # back to one of the cross-loop apis
     except (
         trio.Cancelled,
-    ):
-        # relay cancel through to called ``asyncio`` task
+    ) as _trio_err:
+        trio_err = _trio_err
         assert chan._aio_task
-        chan._aio_task.cancel(
-            msg=f'the `trio` caller task was cancelled: {trio_task.name}'
+
+        # import pdbp; pdbp.set_trace()  # lolevel-debug
+
+        # relay cancel through to called ``asyncio`` task
+        chan._aio_err = AsyncioCancelled(
+            f'trio`-side cancelled the `asyncio`-side,\n'
+            f'c)>\n'
+            f'  |_{trio_task}\n\n'
+
+
+            f'{trio_err!r}\n'
         )
-        raise
+
+        # XXX NOTE XXX seems like we can get all sorts of unreliable
+        # behaviour from `asyncio` under various cancellation
+        # conditions (like SIGINT/kbi) when this is used..
+        # SO FOR NOW, try to avoid it at most costs!
+        #
+        # aio_task.cancel(
+        #     msg=f'the `trio` parent task was cancelled: {trio_task.name}'
+        # )
+        # raise
 
     except (
-        # NOTE: see the note in the ``cancel_trio()`` asyncio task
+        # NOTE: also see note in the `cancel_trio()` asyncio task
         # termination callback
         trio.ClosedResourceError,
         # trio.BrokenResourceError,
-    ):
+
+    ) as _trio_err:
+        trio_err = _trio_err
         aio_err = chan._aio_err
+        # import pdbp; pdbp.set_trace()
+
+        # XXX if an underlying `asyncio.CancelledError` triggered
+        # this channel close, raise our (non-`BaseException`) wrapper
+        # exception (`AsyncioCancelled`) from that source error.
         if (
-            task.cancelled()
+            # NOTE, not until it terminates?
+            aio_task.cancelled()
             and
             type(aio_err) is CancelledError
         ):
-            # if an underlying `asyncio.CancelledError` triggered this
-            # channel close, raise our (non-``BaseException``) wrapper
-            # error: ``AsyncioCancelled`` from that source error.
             raise AsyncioCancelled(
-                f'Task cancelled\n'
-                f'|_{task}\n'
+                f'asyncio`-side cancelled the `trio`-side,\n'
+                f'c(>\n'
+                f'  |_{aio_task}\n\n'
+
+                f'{trio_err!r}\n'
             ) from aio_err
 
         else:
             raise
 
-    finally:
+    except BaseException as _trio_err:
+        trio_err = _trio_err
+        log.exception(
+            '`trio`-side task errored?'
+        )
+
+        entered: bool = await _debug._maybe_enter_pm(
+            trio_err,
+            api_frame=inspect.currentframe(),
+        )
         if (
-            # NOTE: always cancel the ``asyncio`` task if we've made it
-            # this far and it's not done.
-            not task.done() and aio_err
+            not entered
+            and
+            not is_multi_cancelled(trio_err)
+        ):
+            log.exception('actor crashed\n')
+
+        aio_taskc = AsyncioCancelled(
+            f'`trio`-side task errored!\n'
+            f'{trio_err}'
+        ) #from trio_err
+
+        try:
+            aio_task.set_exception(aio_taskc)
+        except (
+            asyncio.InvalidStateError,
+            RuntimeError,
+            # ^XXX, uhh bc apparently we can't use `.set_exception()`
+            # any more XD .. ??
+        ):
+            wait_on_aio_task = False
+
+        # import pdbp; pdbp.set_trace()
+        # raise aio_taskc from trio_err
+
+    finally:
+        # record wtv `trio`-side error transpired
+        chan._trio_err = trio_err
+
+        # NOTE! by default always cancel the `asyncio` task if
+        # we've made it this far and it's not done.
+        # TODO, how to detect if there's an out-of-band error that
+        # caused the exit?
+        if (
+            cancel_aio_task_on_trio_exit
+            and
+            not aio_task.done()
+            and
+            aio_err
 
             # or the trio side has exited it's surrounding cancel scope
             # indicating the lifetime of the ``asyncio``-side task
             # should also be terminated.
-            or chan._trio_exited
-        ):
-            log.runtime(
-                f'Cancelling `asyncio`-task: {task.get_name()}'
+            or (
+                chan._trio_exited
+                and
+                not chan._trio_err   # XXX CRITICAL, `asyncio.Task.cancel()` is cucked man..
             )
-            # assert not aio_err, 'WTF how did asyncio do this?!'
-            task.cancel()
+        ):
+            # pass
+            msg: str = (
+                f'MANUALLY Cancelling `asyncio`-task: {aio_task.get_name()}!\n\n'
+                f'**THIS CAN SILENTLY SUPPRESS ERRORS FYI\n\n'
 
-        # Required to sync with the far end ``asyncio``-task to ensure
+                f'trio-side exited silently!'
+            )
+            # TODO XXX, figure out the case where calling this makes the
+            # `test_infected_asyncio.py::test_trio_closes_early_and_channel_exits`
+            # hang and then don't call it in that case!
+            #
+            aio_task.cancel(msg=msg)
+            log.warning(msg)
+            # assert not aio_err, 'WTF how did asyncio do this?!'
+            # import pdbp; pdbp.set_trace()
+
+        # Required to sync with the far end `asyncio`-task to ensure
         # any error is captured (via monkeypatching the
-        # ``channel._aio_err``) before calling ``maybe_raise_aio_err()``
+        # `channel._aio_err`) before calling ``maybe_raise_aio_err()``
         # below!
+        #
+        # XXX NOTE XXX the `task.set_exception(aio_taskc)` call above
+        # MUST NOT EXCEPT or this WILL HANG!!
+        #
+        # so if you get a hang maybe step through and figure out why
+        # it erroed out up there!
+        #
         if wait_on_aio_task:
+            # await chan.wait_asyncio_complete()
             await chan._aio_task_complete.wait()
+            log.info(
+                'asyncio-task is done and unblocked trio-side!\n'
+            )
+
+        # TODO?
+        # -[ ] make this a channel method, OR
+        # -[ ] just put back inline below?
+        #
+        def maybe_raise_aio_side_err(
+            trio_err: Exception,
+        ) -> None:
+            '''
+            Raise any `trio`-side-caused cancellation or legit task
+            error normally propagated from the caller of either,
+              - `open_channel_from()`
+              - `run_task()`
+
+            '''
+            aio_err: BaseException|None = chan._aio_err
+
+            # Check if the asyncio-side is the cause of the trio-side
+            # error.
+            if (
+                aio_err is not None
+                and
+                type(aio_err) is not AsyncioCancelled
+
+                # not isinstance(aio_err, CancelledError)
+                # type(aio_err) is not CancelledError
+            ):
+                # always raise from any captured asyncio error
+                if trio_err:
+                    raise trio_err from aio_err
+
+                raise aio_err
+
+            if trio_err:
+                raise trio_err
 
         # NOTE: if any ``asyncio`` error was caught, raise it here inline
         # here in the ``trio`` task
-        maybe_raise_aio_err()
+        # if trio_err:
+        maybe_raise_aio_side_err(
+            trio_err=trio_err
+        )
 
 
 async def run_task(
@@ -496,8 +659,8 @@ async def run_task(
 
 ) -> Any:
     '''
-    Run an `asyncio` async function or generator in a task, return
-    or stream the result back to `trio`.
+    Run an `asyncio`-compat async function or generator in a task,
+    return or stream the result back to `trio`.
 
     '''
     # simple async func
@@ -537,6 +700,7 @@ async def open_channel_from(
         provide_channels=True,
         **kwargs,
     )
+    # TODO, tuple form here?
     async with chan._from_aio:
         async with translate_aio_errors(
             chan,
@@ -685,18 +849,21 @@ def run_as_asyncio_guest(
     # Uh, oh.
     #
     # :o
-
-    # It looks like your event loop has caught a case of the ``trio``s.
-
-    # :()
-
-    # Don't worry, we've heard you'll barely notice. You might
-    # hallucinate a few more propagating errors and feel like your
-    # digestion has slowed but if anything get's too bad your parents
-    # will know about it.
-
+    #
+    # looks like your stdlib event loop has caught a case of "the trios" !
+    #
+    # :O
+    #
+    # Don't worry, we've heard you'll barely notice.
+    #
     # :)
-
+    #
+    # You might hallucinate a few more propagating errors and feel
+    # like your digestion has slowed, but if anything get's too bad
+    # your parents will know about it.
+    #
+    # B)
+    #
     async def aio_main(trio_main):
         '''
         Main `asyncio.Task` which calls
@@ -713,16 +880,20 @@ def run_as_asyncio_guest(
             '-> built a `trio`-done future\n'
         )
 
-        # TODO: shoudn't this be done in the guest-run trio task?
-        # if debug_mode():
-        #     # XXX make it obvi we know this isn't supported yet!
-        #     log.error(
-        #         'Attempting to enter unsupported `greenback` init '
-        #         'from `asyncio` task..'
-        #     )
-        #     await _debug.maybe_init_greenback(
-        #         force_reload=True,
-        #     )
+        # TODO: is this evern run or needed?
+        # -[ ] pretty sure it never gets run for root-infected-aio
+        #     since this main task is always the parent of any
+        #     eventual `open_root_actor()` call?
+        if debug_mode():
+            log.error(
+                'Attempting to enter non-required `greenback` init '
+                'from `asyncio` task ???'
+            )
+            # XXX make it obvi we know this isn't supported yet!
+            assert 0
+            # await _debug.maybe_init_greenback(
+            #     force_reload=True,
+            # )
 
         def trio_done_callback(main_outcome):
             log.runtime(
@@ -732,6 +903,7 @@ def run_as_asyncio_guest(
             )
 
             if isinstance(main_outcome, Error):
+                # import pdbp; pdbp.set_trace()
                 error: BaseException = main_outcome.error
 
                 # show an dedicated `asyncio`-side tb from the error
@@ -751,7 +923,7 @@ def run_as_asyncio_guest(
                 trio_done_fute.set_result(main_outcome)
 
             log.info(
-                f'`trio` guest-run finished with outcome\n'
+                f'`trio` guest-run finished with,\n'
                 f')>\n'
                 f'|_{trio_done_fute}\n'
             )
@@ -777,9 +949,20 @@ def run_as_asyncio_guest(
             done_callback=trio_done_callback,
         )
         fute_err: BaseException|None = None
-
         try:
             out: Outcome = await asyncio.shield(trio_done_fute)
+            # ^TODO still don't really understand why the `.shield()`
+            # is required ... ??
+            # https://docs.python.org/3/library/asyncio-task.html#asyncio.shield
+            # ^ seems as though in combo with the try/except here
+            # we're BOLDLY INGORING cancel of the trio fute?
+            #
+            # I guess it makes sense bc we don't want `asyncio` to
+            # cancel trio just because they can't handle SIGINT
+            # sanely? XD .. kk
+
+            # XXX, sin-shield causes guest-run abandons on SIGINT..
+            # out: Outcome = await trio_done_fute
 
             # NOTE will raise (via `Error.unwrap()`) from any
             # exception packed into the guest-run's `main_outcome`.
@@ -802,27 +985,32 @@ def run_as_asyncio_guest(
             fute_err = _fute_err
             err_message: str = (
                 'main `asyncio` task '
+                'was cancelled!\n'
             )
-            if isinstance(fute_err, asyncio.CancelledError):
-                err_message += 'was cancelled!\n'
-            else:
-                err_message += f'errored with {out.error!r}\n'
 
+            # TODO, handle possible edge cases with
+            # `open_root_actor()` closing before this is run!
+            #
             actor: tractor.Actor = tractor.current_actor()
+
             log.exception(
                 err_message
                 +
                 'Cancelling `trio`-side `tractor`-runtime..\n'
-                f'c)>\n'
+                f'c(>\n'
                 f'  |_{actor}.cancel_soon()\n'
             )
 
-            # XXX WARNING XXX the next LOCs are super important, since
-            # without them, we can get guest-run abandonment cases
-            # where `asyncio` will not schedule or wait on the `trio`
-            # guest-run task before final shutdown! This is
-            # particularly true if the `trio` side has tasks doing
-            # shielded work when a SIGINT condition occurs.
+            # XXX WARNING XXX the next LOCs are super important!
+            #
+            # SINCE without them, we can get guest-run ABANDONMENT
+            # cases where `asyncio` will not schedule or wait on the
+            # guest-run `trio.Task` nor invoke its registered
+            # `trio_done_callback()` before final shutdown!
+            #
+            # This is particularly true if the `trio` side has tasks
+            # in shielded sections when an OC-cancel (SIGINT)
+            # condition occurs!
             #
             # We now have the
             # `test_infected_asyncio.test_sigint_closes_lifetime_stack()`
@@ -886,7 +1074,10 @@ def run_as_asyncio_guest(
 
             try:
                 return trio_done_fute.result()
-            except asyncio.exceptions.InvalidStateError as state_err:
+            except (
+                asyncio.InvalidStateError,
+                # asyncio.CancelledError,
+            )as state_err:
 
                 # XXX be super dupere noisy about abandonment issues!
                 aio_task: asyncio.Task = asyncio.current_task()