diff --git a/tractor/_context.py b/tractor/_context.py
index 2b539a6f..54e309e1 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -43,12 +43,17 @@ import warnings
 
 import trio
 
+# from .devx import (
+#     maybe_wait_for_debugger,
+#     pause,
+# )
 from ._exceptions import (
     # _raise_from_no_key_in_msg,
     unpack_error,
     pack_error,
     ContextCancelled,
     # MessagingError,
+    RemoteActorError,
     StreamOverrun,
 )
 from .log import get_logger
@@ -64,6 +69,164 @@ if TYPE_CHECKING:
 log = get_logger(__name__)
 
 
+async def _drain_to_final_msg(
+    ctx: Context,
+) -> list[dict]:
+
+# ) -> tuple[
+#     Any|Exception,
+#     list[dict],
+# ]:
+    raise_overrun: bool = not ctx._allow_overruns
+
+    # wait for a final context result by collecting (but
+    # basically ignoring) any bi-dir-stream msgs still in transit
+    # from the far end.
+    pre_result_drained: list[dict] = []
+    while not ctx._remote_error:
+        try:
+            # NOTE: this REPL usage actually works here dawg! Bo
+            # from .devx._debug import pause
+            # await pause()
+            # if re := ctx._remote_error:
+            #     ctx._maybe_raise_remote_err(
+            #         re,
+            #         # NOTE: obvi we don't care if we
+            #         # overran the far end if we're already
+            #         # waiting on a final result (msg).
+            #         raise_overrun_from_self=raise_overrun,
+            #     )
+
+            # TODO: bad idea?
+            # with trio.CancelScope() as res_cs:
+            #     ctx._res_scope = res_cs
+            #     msg: dict = await ctx._recv_chan.receive()
+            # if res_cs.cancelled_caught:
+
+            # from .devx._debug import pause
+            # await pause()
+            msg: dict = await ctx._recv_chan.receive()
+            ctx._result: Any = msg['return']
+            log.runtime(
+                'Context delivered final result msg:\n'
+                f'{pformat(msg)}'
+            )
+            pre_result_drained.append(msg)
+            # NOTE: we don't need to do this right?
+            # XXX: only close the rx mem chan AFTER
+            # a final result is retreived.
+            # if ctx._recv_chan:
+            #     await ctx._recv_chan.aclose()
+            break
+
+        # NOTE: we get here if the far end was
+        # `ContextCancelled` in 2 cases:
+        # 1. we requested the cancellation and thus
+        #    SHOULD NOT raise that far end error,
+        # 2. WE DID NOT REQUEST that cancel and thus
+        #    SHOULD RAISE HERE!
+        except trio.Cancelled:
+
+            # CASE 2: mask the local cancelled-error(s)
+            # only when we are sure the remote error is
+            # the source cause of this local task's
+            # cancellation.
+            if re := ctx._remote_error:
+                ctx._maybe_raise_remote_err(re)
+
+            # CASE 1: we DID request the cancel we simply
+            # continue to bubble up as normal.
+            raise
+
+        except KeyError:
+
+            if 'yield' in msg:
+                # far end task is still streaming to us so discard
+                log.warning(f'Discarding std "yield"\n{msg}')
+                pre_result_drained.append(msg)
+                continue
+
+            # TODO: work out edge cases here where
+            # a stream is open but the task also calls
+            # this?
+            # -[ ] should be a runtime error if a stream is open
+            #   right?
+            elif 'stop' in msg:
+                log.cancel(
+                    'Remote stream terminated due to "stop" msg:\n'
+                    f'{msg}'
+                )
+                pre_result_drained.append(msg)
+                continue
+
+            # internal error should never get here
+            assert msg.get('cid'), (
+                "Received internal error at portal?"
+            )
+
+            # XXX fallthrough to handle expected error XXX
+            re: Exception|None = ctx._remote_error
+            if re:
+                log.critical(
+                    'Remote ctx terminated due to "error" msg:\n'
+                    f'{re}'
+                )
+                assert msg is ctx._cancel_msg
+                # NOTE: this solved a super dupe edge case XD
+                # this was THE super duper edge case of:
+                # - local task opens a remote task,
+                # - requests remote cancellation of far end
+                #   ctx/tasks,
+                # - needs to wait for the cancel ack msg
+                #   (ctxc) or some result in the race case
+                #   where the other side's task returns
+                #   before the cancel request msg is ever
+                #   rxed and processed,
+                # - here this surrounding drain loop (which
+                #   iterates all ipc msgs until the ack or
+                #   an early result arrives) was NOT exiting
+                #   since we are the edge case: local task
+                #   does not re-raise any ctxc it receives
+                #   IFF **it** was the cancellation
+                #   requester..
+                # will raise if necessary, ow break from
+                # loop presuming any error terminates the
+                # context!
+                ctx._maybe_raise_remote_err(
+                    re,
+                    # NOTE: obvi we don't care if we
+                    # overran the far end if we're already
+                    # waiting on a final result (msg).
+                    # raise_overrun_from_self=False,
+                    raise_overrun_from_self=raise_overrun,
+                )
+
+                break  # OOOOOF, yeah obvi we need this..
+
+            # XXX we should never really get here
+            # right! since `._deliver_msg()` should
+            # always have detected an {'error': ..}
+            # msg and already called this right!?!
+            elif error := unpack_error(
+                msg=msg,
+                chan=ctx._portal.channel,
+                hide_tb=False,
+            ):
+                log.critical('SHOULD NEVER GET HERE!?')
+                assert msg is ctx._cancel_msg
+                assert error.msgdata == ctx._remote_error.msgdata
+                from .devx._debug import pause
+                await pause()
+                ctx._maybe_cancel_and_set_remote_error(error)
+                ctx._maybe_raise_remote_err(error)
+
+            else:
+                # bubble the original src key error
+                raise
+
+    return pre_result_drained
+
+
 # TODO: make this a msgspec.Struct!
 @dataclass
 class Context:
@@ -118,6 +281,7 @@ class Context:
     # which is exactly the primitive that allows for
     # cross-actor-task-supervision and thus SC.
     _scope: trio.CancelScope | None = None
+    # _res_scope: trio.CancelScope|None = None
 
     # on a clean exit there should be a final value
     # delivered from the far end "callee" task, so
@@ -205,6 +369,10 @@ class Context:
             )
         )
 
+    # @property
+    # def is_waiting_result(self) -> bool:
+    #     return bool(self._res_scope)
+
     @property
     def side(self) -> str:
         '''
@@ -247,7 +415,11 @@ class Context:
         await self.chan.send({'yield': data, 'cid': self.cid})
 
     async def send_stop(self) -> None:
-        await self.chan.send({'stop': True, 'cid': self.cid})
+        # await pause()
+        await self.chan.send({
+            'stop': True,
+            'cid': self.cid
+        })
 
     def _maybe_cancel_and_set_remote_error(
         self,
@@ -320,27 +492,37 @@ class Context:
         # XXX: set the remote side's error so that after we cancel
         # whatever task is the opener of this context it can raise
         # that error as the reason.
+        # if self._remote_error:
+        #     return
+
+        # breakpoint()
+        log.cancel(
+            'Setting remote error for ctx \n'
+            f'<= remote ctx uid: {self.chan.uid}\n'
+            f'=>\n{error}'
+        )
         self._remote_error: BaseException = error
 
         if (
             isinstance(error, ContextCancelled)
         ):
-            # always record the cancelling actor's uid since its cancellation
-            # state is linked and we want to know which process was
-            # the cause / requester of the cancellation.
-            self._canceller = error.canceller
-
             log.cancel(
                 'Remote task-context was cancelled for '
                 f'actor: {self.chan.uid}\n'
                 f'task: {self.cid}\n'
                 f'canceller: {error.canceller}\n'
             )
+            # always record the cancelling actor's uid since its cancellation
+            # state is linked and we want to know which process was
+            # the cause / requester of the cancellation.
+            # if error.canceller is None:
+            #     import pdbp; pdbp.set_trace()
+
+                # breakpoint()
+            self._canceller = error.canceller
+
 
             if self._cancel_called:
-                # from ._debug import breakpoint
-                # await breakpoint()
-
                 # this is an expected cancel request response message
                 # and we **don't need to raise it** in local cancel
                 # scope since it will potentially override a real error.
@@ -348,10 +530,11 @@ class Context:
 
         else:
             log.error(
-                f'Remote context error,\n'
-                f'remote actor: {self.chan.uid}\n'
-                f'task: {self.cid}\n'
-                f'{error}'
+                f'Remote context error:\n'
+                f'{error}\n'
+                f'{pformat(self)}\n'
+                # f'remote actor: {self.chan.uid}\n'
+                # f'cid: {self.cid}\n'
             )
             self._canceller = self.chan.uid
 
@@ -376,9 +559,11 @@ class Context:
             self._scope.cancel()
 
             # NOTE: this REPL usage actually works here dawg! Bo
-            # from .devx._debug import pause
             # await pause()
 
+        # TODO: maybe we have to use `._res_scope.cancel()` if it
+        # exists?
+
     async def cancel(
         self,
         timeout: float = 0.616,
@@ -395,6 +580,8 @@ class Context:
         log.cancel(
             f'Cancelling {side} side of context to {self.chan.uid}'
         )
+
+        # await pause()
         self._cancel_called: bool = True
 
         # caller side who entered `Portal.open_context()`
@@ -484,13 +671,11 @@ class Context:
         '''
         actor: Actor = current_actor()
 
-        # here we create a mem chan that corresponds to the
-        # far end caller / callee.
-
-        # Likewise if the surrounding context has been cancelled we error here
-        # since it likely means the surrounding block was exited or
-        # killed
-
+        # If the surrounding context has been cancelled by some
+        # task with a handle to THIS, we error here immediately
+        # since it likely means the surrounding lexical-scope has
+        # errored, been `trio.Cancelled` or at the least
+        # `Context.cancel()` was called by some task.
         if self._cancel_called:
 
             # XXX NOTE: ALWAYS RAISE any remote error here even if
@@ -503,6 +688,11 @@ class Context:
             # actually try to stream - a cancel msg was already
             # sent to the other side!
             if self._remote_error:
+                # NOTE: this is diff then calling
+                # `._maybe_raise_from_remote_msg()` specifically
+                # because any task entering this `.open_stream()`
+                # AFTER cancellation has already been requested,
+                # we DO NOT want to absorb any ctxc ACK silently!
                 raise self._remote_error
 
             # XXX NOTE: if no `ContextCancelled` has been responded
@@ -529,7 +719,7 @@ class Context:
         # to send a stop from the caller to the callee in the
         # single-direction-stream case you'll get a lookup error
         # currently.
-        ctx = actor.get_context(
+        ctx: Context = actor.get_context(
             self.chan,
             self.cid,
             msg_buffer_size=msg_buffer_size,
@@ -548,6 +738,19 @@ class Context:
                 'The underlying channel for this stream was already closed!?'
             )
 
+        # NOTE: implicitly this will call `MsgStream.aclose()` on
+        # `.__aexit__()` due to stream's parent `Channel` type!
+        #
+        # XXX NOTE XXX: ensures the stream is "one-shot use",
+        # which specifically means that on exit,
+        # - signal ``trio.EndOfChannel``/``StopAsyncIteration`` to
+        #   the far end indicating that the caller exited
+        #   the streaming context purposefully by letting
+        #   the exit block exec.
+        # - this is diff from the cancel/error case where
+        #   a cancel request from this side or an error
+        #   should be sent to the far end indicating the
+        #   stream WAS NOT just closed normally/gracefully.
         async with MsgStream(
             ctx=self,
             rx_chan=ctx._recv_chan,
@@ -567,11 +770,37 @@ class Context:
                 # await trio.lowlevel.checkpoint()
                 yield stream
 
-                # NOTE: Make the stream "one-shot use".  On exit,
-                # signal
-                # ``trio.EndOfChannel``/``StopAsyncIteration`` to
-                # the far end.
-                await stream.aclose()
+
+                # XXX: (MEGA IMPORTANT) if this is a root opened process we
+                # wait for any immediate child in debug before popping the
+                # context from the runtime msg loop otherwise inside
+                # ``Actor._push_result()`` the msg will be discarded and in
+                # the case where that msg is global debugger unlock (via
+                # a "stop" msg for a stream), this can result in a deadlock
+                # where the root is waiting on the lock to clear but the
+                # child has already cleared it and clobbered IPC.
+                #
+                # await maybe_wait_for_debugger()
+
+                # XXX TODO: pretty sure this isn't needed (see
+                # note above this block) AND will result in
+                # a double `.send_stop()` call. The only reason to
+                # put it here would be to due with "order" in
+                # terms of raising any remote error (as per
+                # directly below) or bc the stream's
+                # `.__aexit__()` block might not get run
+                # (doubtful)? Either way if we did put this back
+                # in we also need a state var to avoid the double
+                # stop-msg send..
+                #
+                # await stream.aclose()
+
+                # if re := ctx._remote_error:
+                #     ctx._maybe_raise_remote_err(
+                #         re,
+                #         raise_ctxc_from_self_call=True,
+                #     )
+                # await trio.lowlevel.checkpoint()
 
             finally:
                 if self._portal:
@@ -587,7 +816,10 @@ class Context:
     def _maybe_raise_remote_err(
         self,
         err: Exception,
-    ) -> None:
+        raise_ctxc_from_self_call: bool = False,
+        raise_overrun_from_self: bool = True,
+
+    ) -> ContextCancelled|None:
         '''
         Maybe raise a remote error depending on who (which task from
         which actor) requested a cancellation (if any).
@@ -603,13 +835,21 @@ class Context:
         # "error"-msg.
         our_uid: tuple[str, str] = current_actor().uid
         if (
-            isinstance(err, ContextCancelled)
-            and (
+            (not raise_ctxc_from_self_call
+             and isinstance(err, ContextCancelled)
+             and (
                 self._cancel_called
                 or self.chan._cancel_called
                 or self.canceller == our_uid
-                or tuple(err.canceller) == our_uid
+                or tuple(err.canceller) == our_uid)
             )
+            or
+            (not raise_overrun_from_self
+             and isinstance(err, RemoteActorError)
+             and err.msgdata['type_str'] == 'StreamOverrun'
+             and tuple(err.msgdata['sender']) == our_uid
+            )
+
         ):
             # NOTE: we set the local scope error to any "self
             # cancellation" error-response thus "absorbing"
@@ -661,77 +901,196 @@ class Context:
         assert self._portal, "Context.result() can not be called from callee!"
         assert self._recv_chan
 
-        if re := self._remote_error:
-            return self._maybe_raise_remote_err(re)
+        raise_overrun: bool = not self._allow_overruns
+        # if re := self._remote_error:
+        #     return self._maybe_raise_remote_err(
+        #         re,
+        #         # NOTE: obvi we don't care if we
+        #         # overran the far end if we're already
+        #         # waiting on a final result (msg).
+        #         raise_overrun_from_self=raise_overrun,
+        #     )
 
+        res_placeholder: int = id(self)
         if (
-            self._result == id(self)
+            self._result == res_placeholder
             and not self._remote_error
             and not self._recv_chan._closed  # type: ignore
         ):
-            # wait for a final context result consuming
-            # and discarding any bi dir stream msgs still
-            # in transit from the far end.
-            while True:
-                try:
-                    msg = await self._recv_chan.receive()
-                    self._result: Any = msg['return']
 
-                    # NOTE: we don't need to do this right?
-                    # XXX: only close the rx mem chan AFTER
-                    # a final result is retreived.
-                    # if self._recv_chan:
-                    #     await self._recv_chan.aclose()
+            # wait for a final context result by collecting (but
+            # basically ignoring) any bi-dir-stream msgs still in transit
+            # from the far end.
+            drained_msgs: list[dict] = await _drain_to_final_msg(ctx=self)
+            log.runtime(
+                'Ctx drained pre-result msgs:\n'
+                f'{drained_msgs}'
+            )
 
-                    break
+            # TODO: implement via helper func ^^^^
+            # pre_result_drained: list[dict] = []
+            # while not self._remote_error:
+            #     try:
+            #         # NOTE: this REPL usage actually works here dawg! Bo
+            #         # from .devx._debug import pause
+            #         # await pause()
+            #         # if re := self._remote_error:
+            #         #     self._maybe_raise_remote_err(
+            #         #         re,
+            #         #         # NOTE: obvi we don't care if we
+            #         #         # overran the far end if we're already
+            #         #         # waiting on a final result (msg).
+            #         #         raise_overrun_from_self=raise_overrun,
+            #         #     )
 
-                # NOTE: we get here if the far end was
-                # `ContextCancelled` in 2 cases:
-                # 1. we requested the cancellation and thus
-                #    SHOULD NOT raise that far end error,
-                # 2. WE DID NOT REQUEST that cancel and thus
-                #    SHOULD RAISE HERE!
-                except trio.Cancelled:
+            #         # TODO: bad idea?
+            #         # with trio.CancelScope() as res_cs:
+            #         #     self._res_scope = res_cs
+            #         #     msg: dict = await self._recv_chan.receive()
+            #         # if res_cs.cancelled_caught:
 
-                    # CASE 2: mask the local cancelled-error(s)
-                    # only when we are sure the remote error is the
-                    # (likely) source cause of this local runtime
-                    # task's cancellation.
-                    if re := self._remote_error:
-                        self._maybe_raise_remote_err(re)
+            #         # from .devx._debug import pause
+            #         # await pause()
+            #         msg: dict = await self._recv_chan.receive()
+            #         self._result: Any = msg['return']
+            #         log.runtime(
+            #             'Context delivered final result msg:\n'
+            #             f'{pformat(msg)}'
+            #         )
+            #         # NOTE: we don't need to do this right?
+            #         # XXX: only close the rx mem chan AFTER
+            #         # a final result is retreived.
+            #         # if self._recv_chan:
+            #         #     await self._recv_chan.aclose()
+            #         break
 
-                    # CASE 1: we DID request the cancel we simply
-                    # continue to bubble up as normal.
-                    raise
+            #     # NOTE: we get here if the far end was
+            #     # `ContextCancelled` in 2 cases:
+            #     # 1. we requested the cancellation and thus
+            #     #    SHOULD NOT raise that far end error,
+            #     # 2. WE DID NOT REQUEST that cancel and thus
+            #     #    SHOULD RAISE HERE!
+            #     except trio.Cancelled:
 
-                except KeyError:  # as msgerr:
+            #         # CASE 2: mask the local cancelled-error(s)
+            #         # only when we are sure the remote error is
+            #         # the source cause of this local task's
+            #         # cancellation.
+            #         if re := self._remote_error:
+            #             self._maybe_raise_remote_err(re)
 
-                    if 'yield' in msg:
-                        # far end task is still streaming to us so discard
-                        log.warning(f'Discarding stream delivered {msg}')
-                        continue
+            #         # CASE 1: we DID request the cancel we simply
+            #         # continue to bubble up as normal.
+            #         raise
 
-                    elif 'stop' in msg:
-                        log.debug('Remote stream terminated')
-                        continue
+            #     except KeyError:
 
-                    # internal error should never get here
-                    assert msg.get('cid'), (
-                        "Received internal error at portal?"
-                    )
+            #         if 'yield' in msg:
+            #             # far end task is still streaming to us so discard
+            #             log.warning(f'Discarding std "yield"\n{msg}')
+            #             pre_result_drained.append(msg)
+            #             continue
 
-                    if err:= unpack_error(
-                        msg,
-                        self._portal.channel
-                    ):  # from msgerr
-                        self._maybe_cancel_and_set_remote_error(err)
-                        self._maybe_raise_remote_err(err)
+            #         # TODO: work out edge cases here where
+            #         # a stream is open but the task also calls
+            #         # this?
+            #         # -[ ] should be a runtime error if a stream is open
+            #         #   right?
+            #         elif 'stop' in msg:
+            #             log.cancel(
+            #                 'Remote stream terminated due to "stop" msg:\n'
+            #                 f'{msg}'
+            #             )
+            #             pre_result_drained.append(msg)
+            #             continue
 
-                    else:
-                        raise
+            #         # internal error should never get here
+            #         assert msg.get('cid'), (
+            #             "Received internal error at portal?"
+            #         )
 
-        if re := self._remote_error:
-            return self._maybe_raise_remote_err(re)
+            #         # XXX fallthrough to handle expected error XXX
+            #         re: Exception|None = self._remote_error
+            #         if re:
+            #             log.critical(
+            #                 'Remote ctx terminated due to "error" msg:\n'
+            #                 f'{re}'
+            #             )
+            #             assert msg is self._cancel_msg
+            #             # NOTE: this solved a super dupe edge case XD
+            #             # this was THE super duper edge case of:
+            #             # - local task opens a remote task,
+            #             # - requests remote cancellation of far end
+            #             #   ctx/tasks,
+            #             # - needs to wait for the cancel ack msg
+            #             #   (ctxc) or some result in the race case
+            #             #   where the other side's task returns
+            #             #   before the cancel request msg is ever
+            #             #   rxed and processed,
+            #             # - here this surrounding drain loop (which
+            #             #   iterates all ipc msgs until the ack or
+            #             #   an early result arrives) was NOT exiting
+            #             #   since we are the edge case: local task
+            #             #   does not re-raise any ctxc it receives
+            #             #   IFF **it** was the cancellation
+            #             #   requester..
+            #             # will raise if necessary, ow break from
+            #             # loop presuming any error terminates the
+            #             # context!
+            #             self._maybe_raise_remote_err(
+            #                 re,
+            #                 # NOTE: obvi we don't care if we
+            #                 # overran the far end if we're already
+            #                 # waiting on a final result (msg).
+            #                 # raise_overrun_from_self=False,
+            #                 raise_overrun_from_self=raise_overrun,
+            #             )
+
+            #             break  # OOOOOF, yeah obvi we need this..
+
+            #         # XXX we should never really get here
+            #         # right! since `._deliver_msg()` should
+            #         # always have detected an {'error': ..}
+            #         # msg and already called this right!?!
+            #         elif error := unpack_error(
+            #             msg=msg,
+            #             chan=self._portal.channel,
+            #             hide_tb=False,
+            #         ):
+            #             log.critical('SHOULD NEVER GET HERE!?')
+            #             assert msg is self._cancel_msg
+            #             assert error.msgdata == self._remote_error.msgdata
+            #             from .devx._debug import pause
+            #             await pause()
+            #             self._maybe_cancel_and_set_remote_error(error)
+            #             self._maybe_raise_remote_err(error)
+
+            #         else:
+            #             # bubble the original src key error
+            #             raise
+
+        if (
+            (re := self._remote_error)
+            and self._result == res_placeholder
+        ):
+            maybe_err: Exception|None = self._maybe_raise_remote_err(
+                re,
+                # NOTE: obvi we don't care if we
+                # overran the far end if we're already
+                # waiting on a final result (msg).
+                # raise_overrun_from_self=False,
+                raise_overrun_from_self=(
+                    raise_overrun
+                    and
+                    # only when we ARE NOT the canceller
+                    # should we raise overruns, bc ow we're
+                    # raising something we know might happen
+                    # during cancellation ;)
+                    (not self._cancel_called)
+                ),
+            )
+            if maybe_err:
+                self._result = maybe_err
 
         return self._result
 
@@ -779,7 +1138,7 @@ class Context:
             while self._overflow_q:
                 # NOTE: these msgs should never be errors since we always do
                 # the check prior to checking if we're in an overrun state
-                # inside ``.deliver_msg()``.
+                # inside ``._deliver_msg()``.
                 msg = self._overflow_q.popleft()
                 try:
                     await self._send_chan.send(msg)
@@ -830,34 +1189,50 @@ class Context:
         messages are eventually sent if possible.
 
         '''
-        cid = self.cid
-        chan = self.chan
-        uid = chan.uid
+        cid: str = self.cid
+        chan: Channel = self.chan
+        from_uid: tuple[str, str]  = chan.uid
         send_chan: trio.MemorySendChannel = self._send_chan
 
-        log.runtime(
-            f"Delivering {msg} from {uid} to caller {cid}"
-        )
-
-        if (
-            msg.get('error')  # check for field
-            and (
-                error := unpack_error(
-                    msg,
-                    self.chan,
-                )
+        if re := unpack_error(
+            msg,
+            self.chan,
+        ):
+            log.error(
+                f'Delivering error-msg from {from_uid} to caller {cid}'
+                f'{re}'
             )
-        ):
             self._cancel_msg = msg
-            self._maybe_cancel_and_set_remote_error(error)
+            self._maybe_cancel_and_set_remote_error(re)
 
-        if (
-            self._in_overrun
-        ):
+            # XXX NEVER do this XXX..!!
+            # bc if the error is a ctxc and there is a task
+            # waiting on `.result()` we need the msg to be sent
+            # over the `send_chan`/`._recv_chan` so that the error
+            # is relayed to that waiter task..
+            # return True
+            #
+            # XXX ALSO NO!! XXX
+            # if self._remote_error:
+            #     self._maybe_raise_remote_err(error)
+
+        if self._in_overrun:
+            log.warning(
+                f'Capturing overrun-msg from {from_uid} to caller {cid}'
+                f'{msg}'
+            )
             self._overflow_q.append(msg)
             return False
 
         try:
+            log.runtime(
+                f'Delivering IPC `Context` msg:\n'
+                f'<= {from_uid}\n'
+                f'=> caller: {cid}\n'
+                f'{msg}'
+            )
+            # from .devx._debug import pause
+            # await pause()
             send_chan.send_nowait(msg)
             return True
             # if an error is deteced we should always
@@ -890,7 +1265,8 @@ class Context:
             lines = [
                 f'OVERRUN on actor-task context {cid}@{local_uid}!\n'
                 # TODO: put remote task name here if possible?
-                f'remote sender actor: {uid}',
+                f'sender: {from_uid}',
+                f'msg: {msg}',
                 # TODO: put task func name here and maybe an arrow
                 # from sender to overrunner?
                 # f'local task {self.func_name}'
@@ -926,11 +1302,19 @@ class Context:
                         # anything different.
                         return False
             else:
+                # raise local overrun and immediately pack as IPC
+                # msg for far end.
                 try:
-                    raise StreamOverrun(text)
+                    raise StreamOverrun(
+                        text,
+                        sender=from_uid,
+                    )
                 except StreamOverrun as err:
-                    err_msg = pack_error(err)
-                    err_msg['cid'] = cid
+                    err_msg: dict[str, dict] = pack_error(
+                        err,
+                        cid=cid,
+                    )
+                    # err_msg['cid']: str = cid
                     try:
                         await chan.send(err_msg)
                     except trio.BrokenResourceError:
diff --git a/tractor/_portal.py b/tractor/_portal.py
index fc094593..97b89b3c 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -39,7 +39,15 @@ import trio
 from async_generator import asynccontextmanager
 
 from .trionics import maybe_open_nursery
-from ._state import current_actor
+from .devx import (
+    # acquire_debug_lock,
+    # pause,
+    maybe_wait_for_debugger,
+)
+from ._state import (
+    current_actor,
+    debug_mode,
+)
 from ._ipc import Channel
 from .log import get_logger
 from .msg import NamespacePath
@@ -48,6 +56,7 @@ from ._exceptions import (
     unpack_error,
     NoResult,
     ContextCancelled,
+    RemoteActorError,
 )
 from ._context import (
     Context,
@@ -468,7 +477,6 @@ class Portal:
             ctx._started_called: bool = True
 
         except KeyError as src_error:
-
             _raise_from_no_key_in_msg(
                 ctx=ctx,
                 msg=msg,
@@ -493,6 +501,33 @@ class Portal:
                 # in enter tuple.
                 yield ctx, first
 
+                # between the caller exiting and arriving here the
+                # far end may have sent a ctxc-msg or other error,
+                # so check for it here immediately and maybe raise
+                # so as to engage the ctxc handling block below!
+                # if re := ctx._remote_error:
+                #     maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
+                #         re,
+
+                #         # TODO: do we want this to always raise?
+                #         # - means that on self-ctxc, if/when the
+                #         #   block is exited before the msg arrives
+                #         #   but then the msg during __exit__
+                #         #   calling we may not activate the
+                #         #   ctxc-handler block below? should we
+                #         #   be?
+                #         # - if there's a remote error that arrives
+                #         #   after the child has exited, we won't
+                #         #   handle until the `finally:` block
+                #         #   where `.result()` is always called,
+                #         #   again in which case we handle it
+                #         #   differently then in the handler block
+                #         #   that would normally engage from THIS
+                #         #   block?
+                #         raise_ctxc_from_self_call=True,
+                #     )
+                #     assert maybe_ctxc
+
                 # when in allow_overruns mode there may be
                 # lingering overflow sender tasks remaining?
                 if nurse.child_tasks:
@@ -538,7 +573,7 @@ class Portal:
         #   `.canceller: tuple[str, str]` to be same value as
         #   caught here in a `ContextCancelled.canceller`.
         #
-        # Again, there are 2 cases:
+        # AGAIN to restate the above, there are 2 cases:
         #
         # 1-some other context opened in this `.open_context()`
         #   block cancelled due to a self or peer cancellation
@@ -554,6 +589,16 @@ class Portal:
         except ContextCancelled as ctxc:
             scope_err = ctxc
 
+            # XXX TODO XXX: FIX THIS debug_mode BUGGGG!!!
+            # using this code and then resuming the REPL will
+            # cause a SIGINT-ignoring HANG!
+            # -> prolly due to a stale debug lock entry..
+            # -[ ] USE `.stackscope` to demonstrate that (possibly
+            #   documenting it as a definittive example of
+            #   debugging the tractor-runtime itself using it's
+            #   own `.devx.` tooling!
+            # await pause()
+
             # CASE 2: context was cancelled by local task calling
             # `.cancel()`, we don't raise and the exit block should
             # exit silently.
@@ -561,18 +606,23 @@ class Portal:
                 ctx._cancel_called
                 and (
                     ctxc is ctx._remote_error
-                    or
-                    ctxc.canceller is self.canceller
+                    # ctxc.msgdata == ctx._remote_error.msgdata
+
+                    # TODO: uhh `Portal.canceller` ain't a thangg
+                    # dawg? (was `self.canceller` before?!?)
+                    and
+                    ctxc.canceller == self.actor.uid
                 )
             ):
-                log.debug(
-                    f'Context {ctx} cancelled gracefully with:\n'
+                log.cancel(
+                    f'Context (cid=[{ctx.cid[-6:]}..] cancelled gracefully with:\n'
                     f'{ctxc}'
                 )
             # CASE 1: this context was never cancelled via a local
             # task (tree) having called `Context.cancel()`, raise
             # the error since it was caused by someone else!
             else:
+                # await pause()
                 raise
 
         # the above `._scope` can be cancelled due to:
@@ -601,8 +651,8 @@ class Portal:
             trio.Cancelled,  # NOTE: NOT from inside the ctx._scope
             KeyboardInterrupt,
 
-        ) as err:
-            scope_err = err
+        ) as caller_err:
+            scope_err = caller_err
 
             # XXX: ALWAYS request the context to CANCEL ON any ERROR.
             # NOTE: `Context.cancel()` is conversely NEVER CALLED in
@@ -610,11 +660,26 @@ class Portal:
             # handled in the block above!
             log.cancel(
                 'Context cancelled for task due to\n'
-                f'{err}\n'
+                f'{caller_err}\n'
                 'Sending cancel request..\n'
                 f'task:{cid}\n'
                 f'actor:{uid}'
             )
+
+            if debug_mode():
+                log.pdb(
+                    'Delaying `ctx.cancel()` until debug lock '
+                    'acquired..'
+                )
+                # async with acquire_debug_lock(self.actor.uid):
+                #     pass
+                # TODO: factor ^ into below for non-root cases?
+                await maybe_wait_for_debugger()
+                log.pdb(
+                    'Acquired debug lock! '
+                    'Calling `ctx.cancel()`!'
+                )
+
             try:
                 await ctx.cancel()
             except trio.BrokenResourceError:
@@ -628,6 +693,33 @@ class Portal:
 
         # no local scope error, the "clean exit with a result" case.
         else:
+            # between the caller exiting and arriving here the
+            # far end may have sent a ctxc-msg or other error,
+            # so check for it here immediately and maybe raise
+            # so as to engage the ctxc handling block below!
+            # if re := ctx._remote_error:
+            #     maybe_ctxc: ContextCancelled|None = ctx._maybe_raise_remote_err(
+            #         re,
+
+            #         # TODO: do we want this to always raise?
+            #         # - means that on self-ctxc, if/when the
+            #         #   block is exited before the msg arrives
+            #         #   but then the msg during __exit__
+            #         #   calling we may not activate the
+            #         #   ctxc-handler block below? should we
+            #         #   be?
+            #         # - if there's a remote error that arrives
+            #         #   after the child has exited, we won't
+            #         #   handle until the `finally:` block
+            #         #   where `.result()` is always called,
+            #         #   again in which case we handle it
+            #         #   differently then in the handler block
+            #         #   that would normally engage from THIS
+            #         #   block?
+            #         raise_ctxc_from_self_call=True,
+            #     )
+            #     assert maybe_ctxc
+
             if ctx.chan.connected():
                 log.info(
                     'Waiting on final context-task result for\n'
@@ -644,13 +736,8 @@ class Portal:
                 # As per `Context._deliver_msg()`, that error IS
                 # ALWAYS SET any time "callee" side fails and causes "caller
                 # side" cancellation via a `ContextCancelled` here.
-                # result = await ctx.result()
                 try:
-                    result = await ctx.result()
-                    log.runtime(
-                        f'Context {fn_name} returned value from callee:\n'
-                        f'`{result}`'
-                    )
+                    result_or_err: Exception|Any = await ctx.result()
                 except BaseException as berr:
                     # on normal teardown, if we get some error
                     # raised in `Context.result()` we still want to
@@ -662,7 +749,48 @@ class Portal:
                     scope_err = berr
                     raise
 
+                # an exception type boxed in a `RemoteActorError`
+                # is returned (meaning it was obvi not raised).
+                msgdata: str|None = getattr(
+                    result_or_err,
+                    'msgdata',
+                    None
+                )
+                # yes! this worx Bp
+                # from .devx import _debug
+                # await _debug.pause()
+                match (msgdata, result_or_err):
+                    case (
+                        {'tb_str': tbstr},
+                        ContextCancelled(),
+                    ):
+                        log.cancel(tbstr)
+
+                    case (
+                        {'tb_str': tbstr},
+                        RemoteActorError(),
+                    ):
+                        log.exception(
+                            f'Context `{fn_name}` remotely errored:\n'
+                            f'`{tbstr}`'
+                        )
+                    case (None, _):
+                        log.runtime(
+                            f'Context {fn_name} returned value from callee:\n'
+                            f'`{result_or_err}`'
+                        )
+
         finally:
+            # XXX: (MEGA IMPORTANT) if this is a root opened process we
+            # wait for any immediate child in debug before popping the
+            # context from the runtime msg loop otherwise inside
+            # ``Actor._push_result()`` the msg will be discarded and in
+            # the case where that msg is global debugger unlock (via
+            # a "stop" msg for a stream), this can result in a deadlock
+            # where the root is waiting on the lock to clear but the
+            # child has already cleared it and clobbered IPC.
+            await maybe_wait_for_debugger()
+
             # though it should be impossible for any tasks
             # operating *in* this scope to have survived
             # we tear down the runtime feeder chan last
@@ -707,6 +835,10 @@ class Portal:
                 # out any exception group or legit (remote) ctx
                 # error that sourced from the remote task or its
                 # runtime.
+                #
+                # NOTE: further, this should be the only place the
+                # underlying feeder channel is
+                # once-and-only-CLOSED!
                 with trio.CancelScope(shield=True):
                     await ctx._recv_chan.aclose()
 
@@ -747,6 +879,9 @@ class Portal:
 
             # FINALLY, remove the context from runtime tracking and
             # exit!
+            log.runtime(
+                f'Exiting context opened with {ctx.chan.uid}'
+            )
             self.actor._contexts.pop(
                 (self.channel.uid, ctx.cid),
                 None,
diff --git a/tractor/_streaming.py b/tractor/_streaming.py
index 4530e144..e8f735ec 100644
--- a/tractor/_streaming.py
+++ b/tractor/_streaming.py
@@ -21,8 +21,9 @@ The machinery and types behind ``Context.open_stream()``
 
 '''
 from __future__ import annotations
-import inspect
 from contextlib import asynccontextmanager as acm
+import inspect
+from pprint import pformat
 from typing import (
     Any,
     Callable,
@@ -35,6 +36,7 @@ import trio
 
 from ._exceptions import (
     _raise_from_no_key_in_msg,
+    ContextCancelled,
 )
 from .log import get_logger
 from .trionics import (
@@ -84,8 +86,8 @@ class MsgStream(trio.abc.Channel):
         self._broadcaster = _broadcaster
 
         # flag to denote end of stream
-        self._eoc: bool = False
-        self._closed: bool = False
+        self._eoc: bool|trio.EndOfChannel = False
+        self._closed: bool|trio.ClosedResourceError = False
 
     # delegate directly to underlying mem channel
     def receive_nowait(self):
@@ -93,6 +95,9 @@ class MsgStream(trio.abc.Channel):
         try:
             return msg['yield']
         except KeyError as kerr:
+            # if 'return' in msg:
+            #     return msg
+
             _raise_from_no_key_in_msg(
                 ctx=self._ctx,
                 msg=msg,
@@ -122,30 +127,43 @@ class MsgStream(trio.abc.Channel):
         # see ``.aclose()`` for notes on the old behaviour prior to
         # introducing this
         if self._eoc:
-            raise trio.EndOfChannel
+            raise self._eoc
+            # raise trio.EndOfChannel
 
         if self._closed:
-            raise trio.ClosedResourceError('This stream was closed')
+            raise self._closed
+            # raise trio.ClosedResourceError(
+            #     'This stream was already closed'
+            # )
 
+        src_err: Exception|None = None
         try:
-            msg = await self._rx_chan.receive()
-            return msg['yield']
+            try:
+                msg = await self._rx_chan.receive()
+                return msg['yield']
 
-        except KeyError as kerr:
-            _raise_from_no_key_in_msg(
-                ctx=self._ctx,
-                msg=msg,
-                src_err=kerr,
-                log=log,
-                expect_key='yield',
-                stream=self,
-            )
+            except KeyError as kerr:
+                src_err = kerr
 
+                # NOTE: may raise any of the below error types
+                # includg EoC when a 'stop' msg is found.
+                _raise_from_no_key_in_msg(
+                    ctx=self._ctx,
+                    msg=msg,
+                    src_err=kerr,
+                    log=log,
+                    expect_key='yield',
+                    stream=self,
+                )
+
+        # XXX: we close the stream on any of these error conditions:
         except (
-            trio.ClosedResourceError,  # by self._rx_chan
+            # trio.ClosedResourceError,  # by self._rx_chan
             trio.EndOfChannel,  # by self._rx_chan or `stop` msg from far end
-        ):
-            # XXX: we close the stream on any of these error conditions:
+        ) as eoc:
+            src_err = eoc
+            self._eoc = eoc
+            # await trio.sleep(1)
 
             # a ``ClosedResourceError`` indicates that the internal
             # feeder memory receive channel was closed likely by the
@@ -168,14 +186,53 @@ class MsgStream(trio.abc.Channel):
             # closing this stream and not flushing a final value to
             # remaining (clone) consumers who may not have been
             # scheduled to receive it yet.
+            # try:
+            #     maybe_err_msg_or_res: dict = self._rx_chan.receive_nowait()
+            #     if maybe_err_msg_or_res:
+            #         log.warning(
+            #             'Discarding un-processed msg:\n'
+            #             f'{maybe_err_msg_or_res}'
+            #         )
+            # except trio.WouldBlock:
+            #     # no queued msgs that might be another remote
+            #     # error, so just raise the original EoC
+            #     pass
 
-            # when the send is closed we assume the stream has
-            # terminated and signal this local iterator to stop
-            await self.aclose()
+            # raise eoc
 
-            raise  # propagate
+        except trio.ClosedResourceError as cre:  # by self._rx_chan
+            src_err = cre
+            log.warning(
+                '`Context._rx_chan` was already closed?'
+            )
+            self._closed = cre
 
-    async def aclose(self):
+        # when the send is closed we assume the stream has
+        # terminated and signal this local iterator to stop
+        drained: list[Exception|dict] = await self.aclose()
+        if drained:
+            log.warning(
+                'Drained context msgs during closure:\n'
+                f'{drained}'
+            )
+        # TODO: pass these to the `._ctx._drained_msgs: deque`
+        # and then iterate them as part of any `.result()` call?
+
+        # NOTE XXX: if the context was cancelled or remote-errored
+        # but we received the stream close msg first, we
+        # probably want to instead raise the remote error
+        # over the end-of-stream connection error since likely
+        # the remote error was the source cause?
+        ctx: Context = self._ctx
+        if re := ctx._remote_error:
+            ctx._maybe_raise_remote_err(
+                re,
+                raise_ctxc_from_self_call=True,
+            )
+
+        raise src_err  # propagate
+
+    async def aclose(self) -> list[Exception|dict]:
         '''
         Cancel associated remote actor task and local memory channel on
         close.
@@ -185,15 +242,55 @@ class MsgStream(trio.abc.Channel):
         # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
         rx_chan = self._rx_chan
 
-        if rx_chan._closed:
-            log.cancel(f"{self} is already closed")
+        if (
+            rx_chan._closed
+            or
+            self._closed
+        ):
+            log.cancel(
+                f'`MsgStream` is already closed\n'
+                f'.cid: {self._ctx.cid}\n'
+                f'._rx_chan`: {rx_chan}\n'
+                f'._eoc: {self._eoc}\n'
+                f'._closed: {self._eoc}\n'
+            )
 
             # this stream has already been closed so silently succeed as
             # per ``trio.AsyncResource`` semantics.
             # https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
-            return
+            return []
 
-        self._eoc = True
+        ctx: Context = self._ctx
+        # caught_eoc: bool = False
+        drained: list[Exception|dict] = []
+        while not drained:
+            try:
+                maybe_final_msg = self.receive_nowait()
+                if maybe_final_msg:
+                    log.cancel(
+                        'Drained un-processed stream msg:\n'
+                        f'{pformat(maybe_final_msg)}'
+                    )
+                    # TODO: inject into parent `Context` buf?
+                    drained.append(maybe_final_msg)
+
+            except trio.WouldBlock as be:
+                drained.append(be)
+                break
+
+            except trio.EndOfChannel as eoc:
+                drained.append(eoc)
+                # caught_eoc = True
+                self._eoc: bool = eoc
+                break
+
+            except ContextCancelled as ctxc:
+                log.cancel(
+                    'Context was cancelled during stream closure:\n'
+                    f'canceller: {ctxc.canceller}\n'
+                    f'{pformat(ctxc.msgdata)}'
+                )
+                break
 
         # NOTE: this is super subtle IPC messaging stuff:
         # Relay stop iteration to far end **iff** we're
@@ -224,26 +321,33 @@ class MsgStream(trio.abc.Channel):
         except (
             trio.BrokenResourceError,
             trio.ClosedResourceError
-        ):
+        ) as re:
             # the underlying channel may already have been pulled
             # in which case our stop message is meaningless since
             # it can't traverse the transport.
-            ctx = self._ctx
             log.warning(
                 f'Stream was already destroyed?\n'
                 f'actor: {ctx.chan.uid}\n'
                 f'ctx id: {ctx.cid}'
             )
+            drained.append(re)
+            self._closed = re
 
-        self._closed = True
+        # if caught_eoc:
+        #     # from .devx import _debug
+        #     # await _debug.pause()
+        #     with trio.CancelScope(shield=True):
+        #         await rx_chan.aclose()
 
-        # Do we close the local mem chan ``self._rx_chan`` ??!?
+        # self._eoc: bool = caught_eoc
 
-        # NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``!
-        # BECAUSE this same core-msg-loop mem recv-chan is used to deliver
-        # the potential final result from the surrounding inter-actor
-        # `Context` so we don't want to close it until that context has
-        # run to completion.
+        # ?XXX WAIT, why do we not close the local mem chan `._rx_chan` XXX?
+        # => NO, DEFINITELY NOT! <=
+        # if we're a bi-dir ``MsgStream`` BECAUSE this same
+        # core-msg-loop mem recv-chan is used to deliver the
+        # potential final result from the surrounding inter-actor
+        # `Context` so we don't want to close it until that
+        # context has run to completion.
 
         # XXX: Notes on old behaviour:
         # await rx_chan.aclose()
@@ -272,6 +376,8 @@ class MsgStream(trio.abc.Channel):
         # runtime's closure of ``rx_chan`` in the case where we may
         # still need to consume msgs that are "in transit" from the far
         # end (eg. for ``Context.result()``).
+        # self._closed = True
+        return drained
 
     @acm
     async def subscribe(
@@ -337,9 +443,13 @@ class MsgStream(trio.abc.Channel):
             raise self._ctx._remote_error  # from None
 
         if self._closed:
-            raise trio.ClosedResourceError('This stream was already closed')
+            raise self._closed
+            # raise trio.ClosedResourceError('This stream was already closed')
 
-        await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
+        await self._ctx.chan.send({
+            'yield': data,
+            'cid': self._ctx.cid,
+        })
 
 
 def stream(func: Callable) -> Callable: