diff --git a/tractor/_context.py b/tractor/_context.py
index 51b23302..02dcac39 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -53,7 +53,14 @@ from ._exceptions import (
     _raise_from_no_key_in_msg,
 )
 from .log import get_logger
-from .msg import NamespacePath
+from .msg import (
+    NamespacePath,
+    Msg,
+    Return,
+    Started,
+    Stop,
+    Yield,
+)
 from ._ipc import Channel
 from ._streaming import MsgStream
 from ._state import (
@@ -96,7 +103,8 @@ async def _drain_to_final_msg(
     # wait for a final context result by collecting (but
     # basically ignoring) any bi-dir-stream msgs still in transit
     # from the far end.
-    pre_result_drained: list[dict] = []
+    # pre_result_drained: list[dict] = []
+    pre_result_drained: list[Msg] = []
     while not (
         ctx.maybe_error
         and not ctx._final_result_is_set()
@@ -155,7 +163,10 @@ async def _drain_to_final_msg(
             #     await pause()
 
             # pray to the `trio` gawds that we're corrent with this
-            msg: dict = await ctx._recv_chan.receive()
+            # msg: dict = await ctx._recv_chan.receive()
+            msg: Msg = await ctx._recv_chan.receive()
+            # always capture unexpected/non-result msgs
+            pre_result_drained.append(msg)
 
         # NOTE: we get here if the far end was
         # `ContextCancelled` in 2 cases:
@@ -175,24 +186,31 @@ async def _drain_to_final_msg(
             # continue to bubble up as normal.
             raise
 
-        try:
-            ctx._result: Any = msg['return']
-            log.runtime(
-                'Context delivered final draining msg:\n'
-                f'{pformat(msg)}'
-            )
-            # XXX: only close the rx mem chan AFTER
-            # a final result is retreived.
-            # if ctx._recv_chan:
-            #     await ctx._recv_chan.aclose()
-            # TODO: ^ we don't need it right?
-            break
+        match msg:
+            case Return(
+                cid=cid,
+                pld=res,
+            ):
+        # try:
+            # ctx._result: Any = msg['return']
+            # ctx._result: Any = msg.pld
+                ctx._result: Any = res
+                log.runtime(
+                    'Context delivered final draining msg:\n'
+                    f'{pformat(msg)}'
+                )
+                # XXX: only close the rx mem chan AFTER
+                # a final result is retreived.
+                # if ctx._recv_chan:
+                #     await ctx._recv_chan.aclose()
+                # TODO: ^ we don't need it right?
+                break
 
-        except KeyError:
-            # always capture unexpected/non-result msgs
-            pre_result_drained.append(msg)
+        # except KeyError:
+        # except AttributeError:
+            case Yield():
+            # if 'yield' in msg:
 
-            if 'yield' in msg:
                 # far end task is still streaming to us so discard
                 # and report per local context state.
                 if (
@@ -238,9 +256,10 @@ async def _drain_to_final_msg(
             # TODO: work out edge cases here where
             # a stream is open but the task also calls
             # this?
-            # -[ ] should be a runtime error if a stream is open
-            #   right?
-            elif 'stop' in msg:
+            # -[ ] should be a runtime error if a stream is open right?
+            # Stop()
+            case Stop():
+            # elif 'stop' in msg:
                 log.cancel(
                     'Remote stream terminated due to "stop" msg:\n\n'
                     f'{pformat(msg)}\n'
@@ -249,78 +268,80 @@ async def _drain_to_final_msg(
 
             # It's an internal error if any other msg type without
             # a`'cid'` field arrives here!
-            if not msg.get('cid'):
-                raise InternalError(
-                    'Unexpected cid-missing msg?\n\n'
-                    f'{msg}\n'
-                )
+            case _:
+            # if not msg.get('cid'):
+                if not msg.cid:
+                    raise InternalError(
+                        'Unexpected cid-missing msg?\n\n'
+                        f'{msg}\n'
+                    )
 
-            # XXX fallthrough to handle expected error XXX
-            # TODO: replace this with `ctx.maybe_raise()`
-            #
-            # TODO: would this be handier for this case maybe?
-            # async with maybe_raise_on_exit() as raises:
-            #     if raises:
-            #         log.error('some msg about raising..')
+                # XXX fallthrough to handle expected error XXX
+                # TODO: replace this with `ctx.maybe_raise()`
+                #
+                # TODO: would this be handier for this case maybe?
+                # async with maybe_raise_on_exit() as raises:
+                #     if raises:
+                #         log.error('some msg about raising..')
 
-            re: Exception|None = ctx._remote_error
-            if re:
-                log.critical(
-                    'Remote ctx terminated due to "error" msg:\n'
-                    f'{re}'
-                )
-                assert msg is ctx._cancel_msg
-                # NOTE: this solved a super dupe edge case XD
-                # this was THE super duper edge case of:
-                # - local task opens a remote task,
-                # - requests remote cancellation of far end
-                #   ctx/tasks,
-                # - needs to wait for the cancel ack msg
-                #   (ctxc) or some result in the race case
-                #   where the other side's task returns
-                #   before the cancel request msg is ever
-                #   rxed and processed,
-                # - here this surrounding drain loop (which
-                #   iterates all ipc msgs until the ack or
-                #   an early result arrives) was NOT exiting
-                #   since we are the edge case: local task
-                #   does not re-raise any ctxc it receives
-                #   IFF **it** was the cancellation
-                #   requester..
-                # will raise if necessary, ow break from
-                # loop presuming any error terminates the
-                # context!
-                ctx._maybe_raise_remote_err(
-                    re,
-                    # NOTE: obvi we don't care if we
-                    # overran the far end if we're already
-                    # waiting on a final result (msg).
-                    # raise_overrun_from_self=False,
-                    raise_overrun_from_self=raise_overrun,
-                )
+                re: Exception|None = ctx._remote_error
+                if re:
+                    log.critical(
+                        'Remote ctx terminated due to "error" msg:\n'
+                        f'{re}'
+                    )
+                    assert msg is ctx._cancel_msg
+                    # NOTE: this solved a super dupe edge case XD
+                    # this was THE super duper edge case of:
+                    # - local task opens a remote task,
+                    # - requests remote cancellation of far end
+                    #   ctx/tasks,
+                    # - needs to wait for the cancel ack msg
+                    #   (ctxc) or some result in the race case
+                    #   where the other side's task returns
+                    #   before the cancel request msg is ever
+                    #   rxed and processed,
+                    # - here this surrounding drain loop (which
+                    #   iterates all ipc msgs until the ack or
+                    #   an early result arrives) was NOT exiting
+                    #   since we are the edge case: local task
+                    #   does not re-raise any ctxc it receives
+                    #   IFF **it** was the cancellation
+                    #   requester..
+                    # will raise if necessary, ow break from
+                    # loop presuming any error terminates the
+                    # context!
+                    ctx._maybe_raise_remote_err(
+                        re,
+                        # NOTE: obvi we don't care if we
+                        # overran the far end if we're already
+                        # waiting on a final result (msg).
+                        # raise_overrun_from_self=False,
+                        raise_overrun_from_self=raise_overrun,
+                    )
 
-                break  # OOOOOF, yeah obvi we need this..
+                    break  # OOOOOF, yeah obvi we need this..
 
-            # XXX we should never really get here
-            # right! since `._deliver_msg()` should
-            # always have detected an {'error': ..}
-            # msg and already called this right!?!
-            elif error := unpack_error(
-                msg=msg,
-                chan=ctx._portal.channel,
-                hide_tb=False,
-            ):
-                log.critical('SHOULD NEVER GET HERE!?')
-                assert msg is ctx._cancel_msg
-                assert error.msgdata == ctx._remote_error.msgdata
-                from .devx._debug import pause
-                await pause()
-                ctx._maybe_cancel_and_set_remote_error(error)
-                ctx._maybe_raise_remote_err(error)
+                # XXX we should never really get here
+                # right! since `._deliver_msg()` should
+                # always have detected an {'error': ..}
+                # msg and already called this right!?!
+                elif error := unpack_error(
+                    msg=msg,
+                    chan=ctx._portal.channel,
+                    hide_tb=False,
+                ):
+                    log.critical('SHOULD NEVER GET HERE!?')
+                    assert msg is ctx._cancel_msg
+                    assert error.msgdata == ctx._remote_error.msgdata
+                    from .devx._debug import pause
+                    await pause()
+                    ctx._maybe_cancel_and_set_remote_error(error)
+                    ctx._maybe_raise_remote_err(error)
 
-            else:
-                # bubble the original src key error
-                raise
+                else:
+                    # bubble the original src key error
+                    raise
     else:
         log.cancel(
             'Skipping `MsgStream` drain since final outcome is set\n\n'
@@ -710,10 +731,14 @@ class Context:
 
     async def send_stop(self) -> None:
         # await pause()
-        await self.chan.send({
-            'stop': True,
-            'cid': self.cid
-        })
+        # await self.chan.send({
+        #     # Stop(
+        #     'stop': True,
+        #     'cid': self.cid
+        # })
+        await self.chan.send(
+            Stop(cid=self.cid)
+        )
 
     def _maybe_cancel_and_set_remote_error(
         self,
@@ -1398,17 +1423,19 @@ class Context:
             for msg in drained_msgs:
 
                 # TODO: mask this by default..
-                if 'return' in msg:
+                # if 'return' in msg:
+                if isinstance(msg, Return):
                     # from .devx import pause
                     # await pause()
-                    raise InternalError(
+                    # raise InternalError(
+                    log.warning(
                         'Final `return` msg should never be drained !?!?\n\n'
                         f'{msg}\n'
                     )
 
             log.cancel(
                 'Ctx drained pre-result msgs:\n'
-                f'{drained_msgs}'
+                f'{pformat(drained_msgs)}'
             )
 
         self.maybe_raise(
@@ -1616,7 +1643,18 @@ class Context:
                 f'called `.started()` twice on context with {self.chan.uid}'
             )
 
-        await self.chan.send({'started': value, 'cid': self.cid})
+        # await self.chan.send(
+        #     {
+        #         'started': value,
+        #          'cid': self.cid,
+        #     }
+        # )
+        await self.chan.send(
+            Started(
+                cid=self.cid,
+                pld=value,
+            )
+        )
         self._started_called = True
 
     async def _drain_overflows(
@@ -1671,7 +1709,8 @@ class Context:
 
     async def _deliver_msg(
         self,
-        msg: dict,
+        # msg: dict,
+        msg: Msg,
 
     ) -> bool:
         '''
@@ -1855,7 +1894,7 @@ class Context:
                         # anything different.
                         return False
             else:
-                txt += f'\n{msg}\n'
+                # txt += f'\n{msg}\n'
                 # raise local overrun and immediately pack as IPC
                 # msg for far end.
                 try:
@@ -1986,15 +2025,17 @@ async def open_context_from_portal(
     )
 
     assert ctx._remote_func_type == 'context'
-    msg: dict = await ctx._recv_chan.receive()
+    msg: Started = await ctx._recv_chan.receive()
 
     try:
         # the "first" value here is delivered by the callee's
         # ``Context.started()`` call.
-        first: Any = msg['started']
+        # first: Any = msg['started']
+        first: Any = msg.pld
         ctx._started_called: bool = True
 
-    except KeyError as src_error:
+    # except KeyError as src_error:
+    except AttributeError as src_error:
         _raise_from_no_key_in_msg(
             ctx=ctx,
             msg=msg,
diff --git a/tractor/_entry.py b/tractor/_entry.py
index 21c9ae48..bf719abb 100644
--- a/tractor/_entry.py
+++ b/tractor/_entry.py
@@ -136,6 +136,7 @@ def _trio_main(
             run_as_asyncio_guest(trio_main)
         else:
             trio.run(trio_main)
+
     except KeyboardInterrupt:
         log.cancel(
             'Actor received KBI\n'
diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py
index b1a8ee63..7deda9d2 100644
--- a/tractor/_exceptions.py
+++ b/tractor/_exceptions.py
@@ -31,9 +31,16 @@ import textwrap
 import traceback
 
 import trio
+from msgspec import structs
 
 from tractor._state import current_actor
 from tractor.log import get_logger
+from tractor.msg import (
+    Error,
+    Msg,
+    Stop,
+    Yield,
+)
 
 if TYPE_CHECKING:
     from ._context import Context
@@ -135,6 +142,8 @@ class RemoteActorError(Exception):
         # and instead render if from `.boxed_type_str`?
         self._boxed_type: BaseException = boxed_type
         self._src_type: BaseException|None = None
+
+        # TODO: make this a `.errmsg: Error` throughout?
         self.msgdata: dict[str, Any] = msgdata
 
         # TODO: mask out eventually or place in `pack_error()`
@@ -464,7 +473,23 @@ class AsyncioCancelled(Exception):
     '''
 
 class MessagingError(Exception):
-    'Some kind of unexpected SC messaging dialog issue'
+    '''
+    IPC related msg (typing), transaction (ordering) or dialog
+    handling error.
+
+    '''
+
+
+class MsgTypeError(MessagingError):
+    '''
+    Equivalent of a `TypeError` for an IPC wire-message
+    due to an invalid field value (type).
+
+    Normally this is re-raised from some `.msg._codec`
+    decode error raised by a backend interchange lib
+    like `msgspec` or `pycapnproto`.
+
+    '''
 
 
 def pack_error(
@@ -473,7 +498,7 @@ def pack_error(
     tb: str|None = None,
     cid: str|None = None,
 
-) -> dict[str, dict]:
+) -> Error|dict[str, dict]:
     '''
     Create an "error message" which boxes a locally caught
     exception's meta-data and encodes it for wire transport via an
@@ -536,17 +561,23 @@ def pack_error(
     # content's `.msgdata`).
     error_msg['tb_str'] = tb_str
 
-    pkt: dict = {
-        'error': error_msg,
-    }
-    if cid:
-        pkt['cid'] = cid
+    # Error()
+    # pkt: dict = {
+    #     'error': error_msg,
+    # }
+    pkt: Error = Error(
+        cid=cid,
+        **error_msg,
+        # TODO: just get rid of `.pld` on this msg?
+    )
+    # if cid:
+    #     pkt['cid'] = cid
 
     return pkt
 
 
 def unpack_error(
-    msg: dict[str, Any],
+    msg: dict[str, Any]|Error,
 
     chan: Channel|None = None,
     box_type: RemoteActorError = RemoteActorError,
@@ -564,15 +595,17 @@ def unpack_error(
     '''
     __tracebackhide__: bool = hide_tb
 
-    error_dict: dict[str, dict] | None
-    if (
-        error_dict := msg.get('error')
-    ) is None:
+    error_dict: dict[str, dict]|None
+    if not isinstance(msg, Error):
+    # if (
+    #     error_dict := msg.get('error')
+    # ) is None:
         # no error field, nothing to unpack.
         return None
 
     # retrieve the remote error's msg encoded details
-    tb_str: str = error_dict.get('tb_str', '')
+    # tb_str: str = error_dict.get('tb_str', '')
+    tb_str: str = msg.tb_str
     message: str = (
         f'{chan.uid}\n'
         +
@@ -581,7 +614,8 @@ def unpack_error(
 
     # try to lookup a suitable error type from the local runtime
     # env then use it to construct a local instance.
-    boxed_type_str: str = error_dict['boxed_type_str']
+    # boxed_type_str: str = error_dict['boxed_type_str']
+    boxed_type_str: str = msg.boxed_type_str
     boxed_type: Type[BaseException] = get_err_type(boxed_type_str)
 
     if boxed_type_str == 'ContextCancelled':
@@ -595,7 +629,11 @@ def unpack_error(
     # original source error.
     elif boxed_type_str == 'RemoteActorError':
         assert boxed_type is RemoteActorError
-        assert len(error_dict['relay_path']) >= 1
+        # assert len(error_dict['relay_path']) >= 1
+        assert len(msg.relay_path) >= 1
+
+    # TODO: mk RAE just take the `Error` instance directly?
+    error_dict: dict = structs.asdict(msg)
 
     exc = box_type(
         message,
@@ -623,11 +661,12 @@ def is_multi_cancelled(exc: BaseException) -> bool:
 
 def _raise_from_no_key_in_msg(
     ctx: Context,
-    msg: dict,
+    msg: Msg,
     src_err: KeyError,
     log: StackLevelAdapter,  # caller specific `log` obj
 
     expect_key: str = 'yield',
+    expect_msg: str = Yield,
     stream: MsgStream | None = None,
 
     # allow "deeper" tbs when debugging B^o
@@ -660,8 +699,10 @@ def _raise_from_no_key_in_msg(
 
     # an internal error should never get here
     try:
-        cid: str = msg['cid']
-    except KeyError as src_err:
+        cid: str = msg.cid
+        # cid: str = msg['cid']
+    # except KeyError as src_err:
+    except AttributeError as src_err:
         raise MessagingError(
             f'IPC `Context` rx-ed msg without a ctx-id (cid)!?\n'
             f'cid: {cid}\n\n'
@@ -672,7 +713,10 @@ def _raise_from_no_key_in_msg(
     # TODO: test that shows stream raising an expected error!!!
 
     # raise the error message in a boxed exception type!
-    if msg.get('error'):
+    # if msg.get('error'):
+    if isinstance(msg, Error):
+    # match msg:
+    #     case Error():
         raise unpack_error(
             msg,
             ctx.chan,
@@ -683,8 +727,10 @@ def _raise_from_no_key_in_msg(
     # `MsgStream` termination msg.
     # TODO: does it make more sense to pack 
     # the stream._eoc outside this in the calleer always?
+        # case Stop():
     elif (
-        msg.get('stop')
+        # msg.get('stop')
+        isinstance(msg, Stop)
         or (
             stream
             and stream._eoc
@@ -725,14 +771,16 @@ def _raise_from_no_key_in_msg(
         stream
         and stream._closed
     ):
-        raise trio.ClosedResourceError('This stream was closed')
-
+        # TODO: our own error subtype?
+        raise trio.ClosedResourceError(
+            'This stream was closed'
+        )
 
     # always re-raise the source error if no translation error case
     # is activated above.
     _type: str = 'Stream' if stream else 'Context'
     raise MessagingError(
-        f"{_type} was expecting a '{expect_key}' message"
+        f"{_type} was expecting a '{expect_key.upper()}' message"
         " BUT received a non-error msg:\n"
         f'{pformat(msg)}'
     ) from src_err
diff --git a/tractor/_ipc.py b/tractor/_ipc.py
index 5f71c38c..6168c77c 100644
--- a/tractor/_ipc.py
+++ b/tractor/_ipc.py
@@ -38,17 +38,23 @@ from typing import (
     Protocol,
     Type,
     TypeVar,
+    Union,
 )
 
+import msgspec
 from tricycle import BufferedReceiveStream
 import trio
 
 from tractor.log import get_logger
-from tractor._exceptions import TransportClosed
+from tractor._exceptions import (
+    TransportClosed,
+    MsgTypeError,
+)
 from tractor.msg import (
     _ctxvar_MsgCodec,
+    _codec,
     MsgCodec,
-    mk_codec,
+    types,
 )
 
 log = get_logger(__name__)
@@ -163,7 +169,16 @@ class MsgpackTCPStream(MsgTransport):
 
         # allow for custom IPC msg interchange format
         # dynamic override Bo
-        self.codec: MsgCodec = codec or mk_codec()
+        self._task = trio.lowlevel.current_task()
+        self._codec: MsgCodec = (
+            codec
+            or
+            _codec._ctxvar_MsgCodec.get()
+        )
+        log.critical(
+            '!?!: USING STD `tractor` CODEC !?!?\n'
+            f'{self._codec}\n'
+        )
 
     async def _iter_packets(self) -> AsyncGenerator[dict, None]:
         '''
@@ -171,7 +186,6 @@ class MsgpackTCPStream(MsgTransport):
         stream using the current task's `MsgCodec`.
 
         '''
-        import msgspec  # noqa
         decodes_failed: int = 0
 
         while True:
@@ -206,7 +220,19 @@ class MsgpackTCPStream(MsgTransport):
             try:
                 # NOTE: lookup the `trio.Task.context`'s var for
                 # the current `MsgCodec`.
-                yield  _ctxvar_MsgCodec.get().decode(msg_bytes)
+                codec: MsgCodec = _ctxvar_MsgCodec.get()
+                if self._codec.pld_spec != codec.pld_spec:
+                    # assert (
+                    #     task := trio.lowlevel.current_task()
+                    # ) is not self._task
+                    # self._task = task
+                    self._codec = codec
+                    log.critical(
+                        '.recv() USING NEW CODEC !?!?\n'
+                        f'{self._codec}\n\n'
+                        f'msg_bytes -> {msg_bytes}\n'
+                    )
+                yield codec.decode(msg_bytes)
 
                 # TODO: remove, was only for orig draft impl
                 # testing.
@@ -221,6 +247,41 @@ class MsgpackTCPStream(MsgTransport):
                 #
                 # yield obj
 
+            # XXX NOTE: since the below error derives from
+            # `DecodeError` we need to catch is specially
+            # and always raise such that spec violations
+            # are never allowed to be caught silently!
+            except msgspec.ValidationError as verr:
+
+                # decode the msg-bytes using the std msgpack
+                # interchange-prot (i.e. without any
+                # `msgspec.Struct` handling) so that we can
+                # determine what `.msg.types.Msg` is the culprit
+                # by reporting the received value.
+                msg_dict: dict = msgspec.msgpack.decode(msg_bytes)
+                msg_type_name: str = msg_dict['msg_type']
+                msg_type = getattr(types, msg_type_name)
+                errmsg: str = (
+                    f'Received invalid IPC `{msg_type_name}` msg\n\n'
+                )
+
+                # XXX see if we can determine the exact invalid field
+                # such that we can comprehensively report the
+                # specific field's type problem
+                msgspec_msg: str = verr.args[0].rstrip('`')
+                msg, _, maybe_field = msgspec_msg.rpartition('$.')
+                if field_val := msg_dict.get(maybe_field):
+                    field_type: Union[Type] = msg_type.__signature__.parameters[
+                        maybe_field
+                    ].annotation
+                    errmsg += (
+                        f'{msg.rstrip("`")}\n\n'
+                        f'{msg_type}\n'
+                        f' |_.{maybe_field}: {field_type} = {field_val}\n'
+                    )
+
+                raise MsgTypeError(errmsg) from verr
+
             except (
                 msgspec.DecodeError,
                 UnicodeDecodeError,
@@ -230,14 +291,15 @@ class MsgpackTCPStream(MsgTransport):
                     # do with a channel drop - hope that receiving from the
                     # channel will raise an expected error and bubble up.
                     try:
-                        msg_str: str | bytes = msg_bytes.decode()
+                        msg_str: str|bytes = msg_bytes.decode()
                     except UnicodeDecodeError:
                         msg_str = msg_bytes
 
-                    log.error(
-                        '`msgspec` failed to decode!?\n'
-                        'dumping bytes:\n'
-                        f'{msg_str!r}'
+                    log.exception(
+                        'Failed to decode msg?\n'
+                        f'{codec}\n\n'
+                        'Rxed bytes from wire:\n\n'
+                        f'{msg_str!r}\n'
                     )
                     decodes_failed += 1
                 else:
@@ -258,8 +320,21 @@ class MsgpackTCPStream(MsgTransport):
 
             # NOTE: lookup the `trio.Task.context`'s var for
             # the current `MsgCodec`.
-            bytes_data: bytes = _ctxvar_MsgCodec.get().encode(msg)
-            # bytes_data: bytes = self.codec.encode(msg)
+            codec: MsgCodec = _ctxvar_MsgCodec.get()
+            # if self._codec != codec:
+            if self._codec.pld_spec != codec.pld_spec:
+                self._codec = codec
+                log.critical(
+                    '.send() using NEW CODEC !?!?\n'
+                    f'{self._codec}\n\n'
+                    f'OBJ -> {msg}\n'
+                )
+            if type(msg) not in types.__spec__:
+                log.warning(
+                    'Sending non-`Msg`-spec msg?\n\n'
+                    f'{msg}\n'
+                )
+            bytes_data: bytes = codec.encode(msg)
 
             # supposedly the fastest says,
             # https://stackoverflow.com/a/54027962
diff --git a/tractor/_portal.py b/tractor/_portal.py
index ac602dd5..cc9052ba 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -45,7 +45,10 @@ from ._state import (
 )
 from ._ipc import Channel
 from .log import get_logger
-from .msg import NamespacePath
+from .msg import (
+    NamespacePath,
+    Return,
+)
 from ._exceptions import (
     unpack_error,
     NoResult,
@@ -66,7 +69,8 @@ log = get_logger(__name__)
 # `._raise_from_no_key_in_msg()` (after tweak to
 # accept a `chan: Channel` arg) in key block!
 def _unwrap_msg(
-    msg: dict[str, Any],
+    # msg: dict[str, Any],
+    msg: Return,
     channel: Channel,
 
     hide_tb: bool = True,
@@ -79,18 +83,21 @@ def _unwrap_msg(
     __tracebackhide__: bool = hide_tb
 
     try:
-        return msg['return']
-    except KeyError as ke:
+        return msg.pld
+        # return msg['return']
+    # except KeyError as ke:
+    except AttributeError as err:
 
         # internal error should never get here
-        assert msg.get('cid'), (
+        # assert msg.get('cid'), (
+        assert msg.cid, (
             "Received internal error at portal?"
         )
 
         raise unpack_error(
             msg,
             channel
-        ) from ke
+        ) from err
 
 
 class Portal:
diff --git a/tractor/_rpc.py b/tractor/_rpc.py
index ef6cbe00..9b179524 100644
--- a/tractor/_rpc.py
+++ b/tractor/_rpc.py
@@ -57,6 +57,15 @@ from ._exceptions import (
 from .devx import _debug
 from . import _state
 from .log import get_logger
+from tractor.msg.types import (
+    Start,
+    StartAck,
+    Started,
+    Stop,
+    Yield,
+    Return,
+    Error,
+)
 
 if TYPE_CHECKING:
     from ._runtime import Actor
@@ -84,10 +93,13 @@ async def _invoke_non_context(
 
     # TODO: can we unify this with the `context=True` impl below?
     if inspect.isasyncgen(coro):
-        await chan.send({
-            'cid': cid,
-            'functype': 'asyncgen',
-        })
+        # await chan.send({
+        await chan.send(
+            StartAck(
+                cid=cid,
+                functype='asyncgen',
+            )
+        )
         # XXX: massive gotcha! If the containing scope
         # is cancelled and we execute the below line,
         # any ``ActorNursery.__aexit__()`` WON'T be
@@ -107,27 +119,45 @@ async def _invoke_non_context(
                     # to_send = await chan.recv_nowait()
                     # if to_send is not None:
                     #     to_yield = await coro.asend(to_send)
-                    await chan.send({
-                        'yield': item,
-                        'cid': cid,
-                    })
+                    # await chan.send({
+                    #     # Yield()
+                    #     'cid': cid,
+                    #     'yield': item,
+                    # })
+                    await chan.send(
+                        Yield(
+                            cid=cid,
+                            pld=item,
+                        )
+                    )
 
         log.runtime(f"Finished iterating {coro}")
         # TODO: we should really support a proper
         # `StopAsyncIteration` system here for returning a final
         # value if desired
-        await chan.send({
-            'stop': True,
-            'cid': cid,
-        })
+        await chan.send(
+            Stop(cid=cid)
+        )
+        # await chan.send({
+        #     # Stop(
+        #     'cid': cid,
+        #     'stop': True,
+        # })
 
     # one way @stream func that gets treated like an async gen
     # TODO: can we unify this with the `context=True` impl below?
     elif treat_as_gen:
-        await chan.send({
-            'cid': cid,
-            'functype': 'asyncgen',
-        })
+        await chan.send(
+            StartAck(
+                cid=cid,
+                functype='asyncgen',
+            )
+        )
+        # await chan.send({
+        #     # StartAck()
+        #     'cid': cid,
+        #     'functype': 'asyncgen',
+        # })
         # XXX: the async-func may spawn further tasks which push
         # back values like an async-generator would but must
         # manualy construct the response dict-packet-responses as
@@ -140,10 +170,14 @@ async def _invoke_non_context(
         if not cs.cancelled_caught:
             # task was not cancelled so we can instruct the
             # far end async gen to tear down
-            await chan.send({
-                'stop': True,
-                'cid': cid
-            })
+            await chan.send(
+                Stop(cid=cid)
+            )
+            # await chan.send({
+            #     # Stop(
+            #     'cid': cid,
+            #     'stop': True,
+            # })
     else:
         # regular async function/method
         # XXX: possibly just a scheduled `Actor._cancel_task()`
@@ -155,10 +189,17 @@ async def _invoke_non_context(
         # way: using the linked IPC context machinery.
         failed_resp: bool = False
         try:
-            await chan.send({
-                'functype': 'asyncfunc',
-                'cid': cid
-            })
+            await chan.send(
+                StartAck(
+                    cid=cid,
+                    functype='asyncfunc',
+                )
+            )
+            # await chan.send({
+            #     # StartAck()
+            #     'cid': cid,
+            #     'functype': 'asyncfunc',
+            # })
         except (
             trio.ClosedResourceError,
             trio.BrokenResourceError,
@@ -192,10 +233,17 @@ async def _invoke_non_context(
                 and chan.connected()
             ):
                 try:
-                    await chan.send({
-                        'return': result,
-                        'cid': cid,
-                    })
+                    # await chan.send({
+                    #     # Return()
+                    #     'cid': cid,
+                    #     'return': result,
+                    # })
+                    await chan.send(
+                        Return(
+                            cid=cid,
+                            pld=result,
+                        )
+                    )
                 except (
                     BrokenPipeError,
                     trio.BrokenResourceError,
@@ -376,6 +424,8 @@ async def _invoke(
         # XXX for .pause_from_sync()` usage we need to make sure
         # `greenback` is boostrapped in the subactor!
         await _debug.maybe_init_greenback()
+    # else:
+    #     await pause()
 
     # TODO: possibly a specially formatted traceback
     # (not sure what typing is for this..)?
@@ -488,10 +538,18 @@ async def _invoke(
         # a "context" endpoint type is the most general and
         # "least sugary" type of RPC ep with support for
         # bi-dir streaming B)
-        await chan.send({
-            'cid': cid,
-            'functype': 'context',
-        })
+        # StartAck
+        await chan.send(
+            StartAck(
+                cid=cid,
+                functype='context',
+            )
+        )
+        # await chan.send({
+        #     # StartAck()
+        #     'cid': cid,
+        #     'functype': 'context',
+        # })
 
         # TODO: should we also use an `.open_context()` equiv
         # for this callee side by factoring the impl from
@@ -515,10 +573,17 @@ async def _invoke(
                 ctx._result = res
 
                 # deliver final result to caller side.
-                await chan.send({
-                    'return': res,
-                    'cid': cid
-                })
+                await chan.send(
+                    Return(
+                        cid=cid,
+                        pld=res,
+                    )
+                )
+                # await chan.send({
+                #     # Return()
+                #     'cid': cid,
+                #     'return': res,
+                # })
 
             # NOTE: this happens IFF `ctx._scope.cancel()` is
             # called by any of,
@@ -691,7 +756,8 @@ async def try_ship_error_to_remote(
         try:
             # NOTE: normally only used for internal runtime errors
             # so ship to peer actor without a cid.
-            msg: dict = pack_error(
+            # msg: dict = pack_error(
+            msg: Error = pack_error(
                 err,
                 cid=cid,
 
@@ -707,12 +773,13 @@ async def try_ship_error_to_remote(
             trio.BrokenResourceError,
             BrokenPipeError,
         ):
-            err_msg: dict = msg['error']['tb_str']
+            # err_msg: dict = msg['error']['tb_str']
             log.critical(
                 'IPC transport failure -> '
                 f'failed to ship error to {remote_descr}!\n\n'
                 f'X=> {channel.uid}\n\n'
-                f'{err_msg}\n'
+                # f'{err_msg}\n'
+                f'{msg}\n'
             )
 
 
@@ -772,31 +839,6 @@ async def process_messages(
         with CancelScope(shield=shield) as loop_cs:
             task_status.started(loop_cs)
             async for msg in chan:
-
-                # dedicated loop terminate sentinel
-                if msg is None:
-
-                    tasks: dict[
-                        tuple[Channel, str],
-                        tuple[Context, Callable, trio.Event]
-                    ] = actor._rpc_tasks.copy()
-                    log.cancel(
-                        f'Peer IPC channel terminated via `None` setinel msg?\n'
-                        f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
-                        f'peer: {chan.uid}\n'
-                        f'|_{chan}\n'
-                    )
-                    for (channel, cid) in tasks:
-                        if channel is chan:
-                            await actor._cancel_task(
-                                cid,
-                                channel,
-                                requesting_uid=channel.uid,
-
-                                ipc_msg=msg,
-                            )
-                    break
-
                 log.transport(   # type: ignore
                     f'<= IPC msg from peer: {chan.uid}\n\n'
 
@@ -806,216 +848,294 @@ async def process_messages(
                     f'{pformat(msg)}\n'
                 )
 
-                cid = msg.get('cid')
-                if cid:
-                    # deliver response to local caller/waiter
-                    # via its per-remote-context memory channel.
-                    await actor._push_result(
-                        chan,
-                        cid,
-                        msg,
-                    )
+                match msg:
 
-                    log.runtime(
-                        'Waiting on next IPC msg from\n'
-                        f'peer: {chan.uid}:\n'
-                        f'|_{chan}\n'
+                # if msg is None:
+                # dedicated loop terminate sentinel
+                    case None:
 
-                        # f'last msg: {msg}\n'
-                    )
-                    continue
-
-                # process a 'cmd' request-msg upack
-                # TODO: impl with native `msgspec.Struct` support !!
-                # -[ ] implement with ``match:`` syntax?
-                # -[ ] discard un-authed msgs as per,
-                # <TODO put issue for typed msging structs>
-                try:
-                    (
-                        ns,
-                        funcname,
-                        kwargs,
-                        actorid,
-                        cid,
-                    ) = msg['cmd']
-
-                except KeyError:
-                    # This is the non-rpc error case, that is, an
-                    # error **not** raised inside a call to ``_invoke()``
-                    # (i.e. no cid was provided in the msg - see above).
-                    # Push this error to all local channel consumers
-                    # (normally portals) by marking the channel as errored
-                    assert chan.uid
-                    exc = unpack_error(msg, chan=chan)
-                    chan._exc = exc
-                    raise exc
-
-                log.runtime(
-                    'Handling RPC cmd from\n'
-                    f'peer: {actorid}\n'
-                    '\n'
-                    f'=> {ns}.{funcname}({kwargs})\n'
-                )
-                if ns == 'self':
-                    if funcname == 'cancel':
-                        func: Callable = actor.cancel
-                        kwargs |= {
-                            'req_chan': chan,
-                        }
-
-                        # don't start entire actor runtime cancellation
-                        # if this actor is currently in debug mode!
-                        pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
-                        if pdb_complete:
-                            await pdb_complete.wait()
-
-                        # Either of  `Actor.cancel()`/`.cancel_soon()`
-                        # was called, so terminate this IPC msg
-                        # loop, exit back out into `async_main()`,
-                        # and immediately start the core runtime
-                        # machinery shutdown!
-                        with CancelScope(shield=True):
-                            await _invoke(
-                                actor,
-                                cid,
-                                chan,
-                                func,
-                                kwargs,
-                                is_rpc=False,
-                            )
-
-                        log.runtime(
-                            'Cancelling IPC transport msg-loop with peer:\n'
+                        tasks: dict[
+                            tuple[Channel, str],
+                            tuple[Context, Callable, trio.Event]
+                        ] = actor._rpc_tasks.copy()
+                        log.cancel(
+                            f'Peer IPC channel terminated via `None` setinel msg?\n'
+                            f'=> Cancelling all {len(tasks)} local RPC tasks..\n'
+                            f'peer: {chan.uid}\n'
                             f'|_{chan}\n'
                         )
-                        loop_cs.cancel()
+                        for (channel, cid) in tasks:
+                            if channel is chan:
+                                await actor._cancel_task(
+                                    cid,
+                                    channel,
+                                    requesting_uid=channel.uid,
+
+                                    ipc_msg=msg,
+                                )
                         break
 
-                    if funcname == '_cancel_task':
-                        func: Callable = actor._cancel_task
-
-                        # we immediately start the runtime machinery
-                        # shutdown
-                        # with CancelScope(shield=True):
-                        target_cid: str = kwargs['cid']
-                        kwargs |= {
-                            # NOTE: ONLY the rpc-task-owning
-                            # parent IPC channel should be able to
-                            # cancel it!
-                            'parent_chan': chan,
-                            'requesting_uid': chan.uid,
-                            'ipc_msg': msg,
-                        }
-                        # TODO: remove? already have emit in meth.
-                        # log.runtime(
-                        #     f'Rx RPC task cancel request\n'
-                        #     f'<= canceller: {chan.uid}\n'
-                        #     f'  |_{chan}\n\n'
-                        #     f'=> {actor}\n'
-                        #     f'  |_cid: {target_cid}\n'
-                        # )
-                        try:
-                            await _invoke(
-                                actor,
-                                cid,
-                                chan,
-                                func,
-                                kwargs,
-                                is_rpc=False,
-                            )
-                        except BaseException:
-                            log.exception(
-                                'Failed to cancel task?\n'
-                                f'<= canceller: {chan.uid}\n'
-                                f'  |_{chan}\n\n'
-                                f'=> {actor}\n'
-                                f'  |_cid: {target_cid}\n'
-                            )
-                        continue
-                    else:
-                        # normally registry methods, eg.
-                        # ``.register_actor()`` etc.
-                        func: Callable = getattr(actor, funcname)
-
-                else:
-                    # complain to client about restricted modules
-                    try:
-                        func = actor._get_rpc_func(ns, funcname)
-                    except (
-                        ModuleNotExposed,
-                        AttributeError,
-                    ) as err:
-                        err_msg: dict[str, dict] = pack_error(
-                            err,
-                            cid=cid,
-                        )
-                        await chan.send(err_msg)
-                        continue
-
-                # schedule a task for the requested RPC function
-                # in the actor's main "service nursery".
-                # TODO: possibly a service-tn per IPC channel for
-                # supervision isolation? would avoid having to
-                # manage RPC tasks individually in `._rpc_tasks`
-                # table?
-                log.runtime(
-                    f'Spawning task for RPC request\n'
-                    f'<= caller: {chan.uid}\n'
-                    f'  |_{chan}\n\n'
-                    # TODO: maddr style repr?
-                    # f'  |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
-                    # f'cid="{cid[-16:]} .."\n\n'
-
-                    f'=> {actor}\n'
-                    f'  |_cid: {cid}\n'
-                    f'   |>> {func}()\n'
-                )
-                assert actor._service_n  # wait why? do it at top?
-                try:
-                    ctx: Context = await actor._service_n.start(
-                        partial(
-                            _invoke,
-                            actor,
-                            cid,
+                # cid = msg.get('cid')
+                # if cid:
+                    case (
+                        StartAck(cid=cid)
+                        | Started(cid=cid)
+                        | Yield(cid=cid)
+                        | Stop(cid=cid)
+                        | Return(cid=cid)
+                        | Error(cid=cid)
+                    ):
+                        # deliver response to local caller/waiter
+                        # via its per-remote-context memory channel.
+                        await actor._push_result(
                             chan,
-                            func,
-                            kwargs,
-                        ),
-                        name=funcname,
-                    )
+                            cid,
+                            msg,
+                        )
 
-                except (
-                    RuntimeError,
-                    BaseExceptionGroup,
-                ):
-                    # avoid reporting a benign race condition
-                    # during actor runtime teardown.
-                    nursery_cancelled_before_task: bool = True
-                    break
+                        log.runtime(
+                            'Waiting on next IPC msg from\n'
+                            f'peer: {chan.uid}:\n'
+                            f'|_{chan}\n'
 
-                # in the lone case where a ``Context`` is not
-                # delivered, it's likely going to be a locally
-                # scoped exception from ``_invoke()`` itself.
-                if isinstance(err := ctx, Exception):
-                    log.warning(
-                        'Task for RPC failed?'
-                        f'|_ {func}()\n\n'
+                            # f'last msg: {msg}\n'
+                        )
+                        continue
 
-                        f'{err}'
-                    )
-                    continue
+                    # process a 'cmd' request-msg upack
+                    # TODO: impl with native `msgspec.Struct` support !!
+                    # -[ ] implement with ``match:`` syntax?
+                    # -[ ] discard un-authed msgs as per,
+                    # <TODO put issue for typed msging structs>
+                    case Start(
+                        cid=cid,
+                        ns=ns,
+                        func=funcname,
+                        kwargs=kwargs,
+                        uid=actorid,
+                    ):
+                        # try:
+                        #     (
+                        #         ns,
+                        #         funcname,
+                        #         kwargs,
+                        #         actorid,
+                        #         cid,
+                        #     ) = msg['cmd']
 
-                else:
-                    # mark that we have ongoing rpc tasks
-                    actor._ongoing_rpc_tasks = trio.Event()
+                        # # TODO: put in `case Error():` right?
+                        # except KeyError:
+                        #     # This is the non-rpc error case, that is, an
+                        #     # error **not** raised inside a call to ``_invoke()``
+                        #     # (i.e. no cid was provided in the msg - see above).
+                        #     # Push this error to all local channel consumers
+                        #     # (normally portals) by marking the channel as errored
+                        #     assert chan.uid
+                        #     exc = unpack_error(msg, chan=chan)
+                        #     chan._exc = exc
+                        #     raise exc
 
-                    # store cancel scope such that the rpc task can be
-                    # cancelled gracefully if requested
-                    actor._rpc_tasks[(chan, cid)] = (
-                        ctx,
-                        func,
-                        trio.Event(),
-                    )
+                        log.runtime(
+                            'Handling RPC `Start` request from\n'
+                            f'peer: {actorid}\n'
+                            '\n'
+                            f'=> {ns}.{funcname}({kwargs})\n'
+                        )
+                        # case Start(
+                        #     ns='self',
+                        #     funcname='cancel',
+                        # ):
+                        if ns == 'self':
+                            if funcname == 'cancel':
+                                func: Callable = actor.cancel
+                                kwargs |= {
+                                    'req_chan': chan,
+                                }
+
+                                # don't start entire actor runtime cancellation
+                                # if this actor is currently in debug mode!
+                                pdb_complete: trio.Event|None = _debug.Lock.local_pdb_complete
+                                if pdb_complete:
+                                    await pdb_complete.wait()
+
+                                # Either of  `Actor.cancel()`/`.cancel_soon()`
+                                # was called, so terminate this IPC msg
+                                # loop, exit back out into `async_main()`,
+                                # and immediately start the core runtime
+                                # machinery shutdown!
+                                with CancelScope(shield=True):
+                                    await _invoke(
+                                        actor,
+                                        cid,
+                                        chan,
+                                        func,
+                                        kwargs,
+                                        is_rpc=False,
+                                    )
+
+                                log.runtime(
+                                    'Cancelling IPC transport msg-loop with peer:\n'
+                                    f'|_{chan}\n'
+                                )
+                                loop_cs.cancel()
+                                break
+
+                        # case Start(
+                        #     ns='self',
+                        #     funcname='_cancel_task',
+                        # ):
+                            if funcname == '_cancel_task':
+                                func: Callable = actor._cancel_task
+
+                                # we immediately start the runtime machinery
+                                # shutdown
+                                # with CancelScope(shield=True):
+                                target_cid: str = kwargs['cid']
+                                kwargs |= {
+                                    # NOTE: ONLY the rpc-task-owning
+                                    # parent IPC channel should be able to
+                                    # cancel it!
+                                    'parent_chan': chan,
+                                    'requesting_uid': chan.uid,
+                                    'ipc_msg': msg,
+                                }
+                                # TODO: remove? already have emit in meth.
+                                # log.runtime(
+                                #     f'Rx RPC task cancel request\n'
+                                #     f'<= canceller: {chan.uid}\n'
+                                #     f'  |_{chan}\n\n'
+                                #     f'=> {actor}\n'
+                                #     f'  |_cid: {target_cid}\n'
+                                # )
+                                try:
+                                    await _invoke(
+                                        actor,
+                                        cid,
+                                        chan,
+                                        func,
+                                        kwargs,
+                                        is_rpc=False,
+                                    )
+                                except BaseException:
+                                    log.exception(
+                                        'Failed to cancel task?\n'
+                                        f'<= canceller: {chan.uid}\n'
+                                        f'  |_{chan}\n\n'
+                                        f'=> {actor}\n'
+                                        f'  |_cid: {target_cid}\n'
+                                    )
+                                continue
+
+                            # case Start(
+                            #     ns='self',
+                            #     funcname='register_actor',
+                            # ):
+                            else:
+                                # normally registry methods, eg.
+                                # ``.register_actor()`` etc.
+                                func: Callable = getattr(actor, funcname)
+
+                        # case Start(
+                        #     ns=str(),
+                        #     funcname=funcname,
+                        # ):
+                        else:
+                            # complain to client about restricted modules
+                            try:
+                                func = actor._get_rpc_func(ns, funcname)
+                            except (
+                                ModuleNotExposed,
+                                AttributeError,
+                            ) as err:
+                                err_msg: dict[str, dict] = pack_error(
+                                    err,
+                                    cid=cid,
+                                )
+                                await chan.send(err_msg)
+                                continue
+
+                        # schedule a task for the requested RPC function
+                        # in the actor's main "service nursery".
+                        # TODO: possibly a service-tn per IPC channel for
+                        # supervision isolation? would avoid having to
+                        # manage RPC tasks individually in `._rpc_tasks`
+                        # table?
+                        log.runtime(
+                            f'Spawning task for RPC request\n'
+                            f'<= caller: {chan.uid}\n'
+                            f'  |_{chan}\n\n'
+                            # TODO: maddr style repr?
+                            # f'  |_@ /ipv4/{chan.raddr}/tcp/{chan.rport}/'
+                            # f'cid="{cid[-16:]} .."\n\n'
+
+                            f'=> {actor}\n'
+                            f'  |_cid: {cid}\n'
+                            f'   |>> {func}()\n'
+                        )
+                        assert actor._service_n  # wait why? do it at top?
+                        try:
+                            ctx: Context = await actor._service_n.start(
+                                partial(
+                                    _invoke,
+                                    actor,
+                                    cid,
+                                    chan,
+                                    func,
+                                    kwargs,
+                                ),
+                                name=funcname,
+                            )
+
+                        except (
+                            RuntimeError,
+                            BaseExceptionGroup,
+                        ):
+                            # avoid reporting a benign race condition
+                            # during actor runtime teardown.
+                            nursery_cancelled_before_task: bool = True
+                            break
+
+                        # in the lone case where a ``Context`` is not
+                        # delivered, it's likely going to be a locally
+                        # scoped exception from ``_invoke()`` itself.
+                        if isinstance(err := ctx, Exception):
+                            log.warning(
+                                'Task for RPC failed?'
+                                f'|_ {func}()\n\n'
+
+                                f'{err}'
+                            )
+                            continue
+
+                        else:
+                            # mark that we have ongoing rpc tasks
+                            actor._ongoing_rpc_tasks = trio.Event()
+
+                            # store cancel scope such that the rpc task can be
+                            # cancelled gracefully if requested
+                            actor._rpc_tasks[(chan, cid)] = (
+                                ctx,
+                                func,
+                                trio.Event(),
+                            )
+
+                    case Error()|_:
+                        # This is the non-rpc error case, that is, an
+                        # error **not** raised inside a call to ``_invoke()``
+                        # (i.e. no cid was provided in the msg - see above).
+                        # Push this error to all local channel consumers
+                        # (normally portals) by marking the channel as errored
+                        log.exception(
+                            f'Unhandled IPC msg:\n\n'
+                            f'{msg}\n'
+                        )
+                        assert chan.uid
+                        exc = unpack_error(
+                            msg,
+                            chan=chan,
+                        )
+                        chan._exc = exc
+                        raise exc
 
                 log.runtime(
                     'Waiting on next IPC msg from\n'
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index ed7b4503..eee78973 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -87,6 +87,23 @@ from ._rpc import (
     process_messages,
     try_ship_error_to_remote,
 )
+from tractor.msg import (
+    types as msgtypes,
+    pretty_struct,
+)
+# from tractor.msg.types import (
+#     Aid,
+#     SpawnSpec,
+#     Start,
+#     StartAck,
+#     Started,
+#     Yield,
+#     Stop,
+#     Return,
+#     Error,
+# )
+
+
 
 
 if TYPE_CHECKING:
@@ -143,6 +160,7 @@ class Actor:
     # Information about `__main__` from parent
     _parent_main_data: dict[str, str]
     _parent_chan_cs: CancelScope|None = None
+    _spawn_spec: SpawnSpec|None = None
 
     # syncs for setup/teardown sequences
     _server_down: trio.Event|None = None
@@ -539,7 +557,8 @@ class Actor:
 
                             f'{pformat(msg)}\n'
                         )
-                        cid = msg.get('cid')
+                        # cid: str|None = msg.get('cid')
+                        cid: str|None = msg.cid
                         if cid:
                             # deliver response to local caller/waiter
                             await self._push_result(
@@ -891,29 +910,44 @@ class Actor:
             f'=> {ns}.{func}({kwargs})\n'
         )
         await chan.send(
-            {'cmd': (
-                ns,
-                func,
-                kwargs,
-                self.uid,
-                cid,
-            )}
+            msgtypes.Start(
+                ns=ns,
+                func=func,
+                kwargs=kwargs,
+                uid=self.uid,
+                cid=cid,
+            )
         )
+            # {'cmd': (
+            #     ns,
+            #     func,
+            #     kwargs,
+            #     self.uid,
+            #     cid,
+            # )}
+        # )
 
         # Wait on first response msg and validate; this should be
         # immediate.
-        first_msg: dict = await ctx._recv_chan.receive()
-        functype: str = first_msg.get('functype')
+        # first_msg: dict = await ctx._recv_chan.receive()
+        # functype: str = first_msg.get('functype')
 
-        if 'error' in first_msg:
+        first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
+        try:
+            functype: str = first_msg.functype
+        except AttributeError:
             raise unpack_error(first_msg, chan)
+            # if 'error' in first_msg:
+            #     raise unpack_error(first_msg, chan)
 
-        elif functype not in (
+        if functype not in (
             'asyncfunc',
             'asyncgen',
             'context',
         ):
-            raise ValueError(f"{first_msg} is an invalid response packet?")
+            raise ValueError(
+                f'{first_msg} is an invalid response packet?'
+            )
 
         ctx._remote_func_type = functype
         return ctx
@@ -946,24 +980,36 @@ class Actor:
             await self._do_handshake(chan)
 
             accept_addrs: list[tuple[str, int]]|None = None
-            if self._spawn_method == "trio":
-                # Receive runtime state from our parent
-                parent_data: dict[str, Any]
-                parent_data = await chan.recv()
-                log.runtime(
-                    'Received state from parent:\n\n'
-                    # TODO: eventually all these msgs as
-                    # `msgspec.Struct` with a special mode that
-                    # pformats them in multi-line mode, BUT only
-                    # if "trace"/"util" mode is enabled?
-                    f'{pformat(parent_data)}\n'
-                )
-                accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
-                rvs = parent_data.pop('_runtime_vars')
 
+            if self._spawn_method == "trio":
+
+                # Receive runtime state from our parent
+                # parent_data: dict[str, Any]
+                # parent_data = await chan.recv()
+
+                # TODO: maybe we should just wrap this directly
+                # in a `Actor.spawn_info: SpawnInfo` struct?
+                spawnspec: msgtypes.SpawnSpec = await chan.recv()
+                self._spawn_spec = spawnspec
+
+                # TODO: eventually all these msgs as
+                # `msgspec.Struct` with a special mode that
+                # pformats them in multi-line mode, BUT only
+                # if "trace"/"util" mode is enabled?
+                log.runtime(
+                    'Received runtime spec from parent:\n\n'
+                    f'{pformat(spawnspec)}\n'
+                )
+                # accept_addrs: list[tuple[str, int]] = parent_data.pop('bind_addrs')
+                accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
+
+                # rvs = parent_data.pop('_runtime_vars')
+                rvs = spawnspec._runtime_vars
                 if rvs['_debug_mode']:
                     try:
-                        log.info('Enabling `stackscope` traces on SIGUSR1')
+                        log.info(
+                            'Enabling `stackscope` traces on SIGUSR1'
+                        )
                         from .devx import enable_stack_on_sig
                         enable_stack_on_sig()
                     except ImportError:
@@ -971,28 +1017,40 @@ class Actor:
                             '`stackscope` not installed for use in debug mode!'
                         )
 
-                log.runtime(f"Runtime vars are: {rvs}")
+                log.runtime(f'Runtime vars are: {rvs}')
                 rvs['_is_root'] = False
                 _state._runtime_vars.update(rvs)
 
-                for attr, value in parent_data.items():
-                    if (
-                        attr == 'reg_addrs'
-                        and value
-                    ):
-                        # XXX: ``msgspec`` doesn't support serializing tuples
-                        # so just cash manually here since it's what our
-                        # internals expect.
-                        # TODO: we don't really NEED these as
-                        # tuples so we can probably drop this
-                        # casting since apparently in python lists
-                        # are "more efficient"?
-                        self.reg_addrs = [tuple(val) for val in value]
+                # XXX: ``msgspec`` doesn't support serializing tuples
+                # so just cash manually here since it's what our
+                # internals expect.
+                #
+                self.reg_addrs = [
+                    # TODO: we don't really NEED these as tuples?
+                    # so we can probably drop this casting since
+                    # apparently in python lists are "more
+                    # efficient"?
+                    tuple(val)
+                    for val in spawnspec.reg_addrs
+                ]
 
-                    else:
-                        setattr(self, attr, value)
+                # for attr, value in parent_data.items():
+                for _, attr, value in pretty_struct.iter_fields(
+                    spawnspec,
+                ):
+                    setattr(self, attr, value)
+                    # if (
+                    #     attr == 'reg_addrs'
+                    #     and value
+                    # ):
+                    #     self.reg_addrs = [tuple(val) for val in value]
+                    # else:
+                    #     setattr(self, attr, value)
 
-            return chan, accept_addrs
+            return (
+                chan,
+                accept_addrs,
+            )
 
         except OSError:  # failed to connect
             log.warning(
@@ -1434,7 +1492,7 @@ class Actor:
         self,
         chan: Channel
 
-    ) -> tuple[str, str]:
+    ) -> msgtypes.Aid:
         '''
         Exchange `(name, UUIDs)` identifiers as the first
         communication step with any (peer) remote `Actor`.
@@ -1443,14 +1501,27 @@ class Actor:
         "actor model" parlance.
 
         '''
-        await chan.send(self.uid)
-        value: tuple = await chan.recv()
-        uid: tuple[str, str] = (str(value[0]), str(value[1]))
+        name, uuid = self.uid
+        await chan.send(
+            msgtypes.Aid(
+                name=name,
+                uuid=uuid,
+            )
+        )
+        aid: msgtypes.Aid = await chan.recv()
+        chan.aid = aid
+
+        uid: tuple[str, str] = (
+            # str(value[0]),
+            # str(value[1])
+            aid.name,
+            aid.uuid,
+        )
 
         if not isinstance(uid, tuple):
             raise ValueError(f"{uid} is not a valid uid?!")
 
-        chan.uid = str(uid[0]), str(uid[1])
+        chan.uid = uid
         return uid
 
     def is_infected_aio(self) -> bool:
@@ -1510,7 +1581,8 @@ async def async_main(
             # because we're running in mp mode
             if (
                 set_accept_addr_says_rent
-                and set_accept_addr_says_rent is not None
+                and
+                set_accept_addr_says_rent is not None
             ):
                 accept_addrs = set_accept_addr_says_rent
 
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 48135cc9..824f41f3 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -49,6 +49,9 @@ from tractor._portal import Portal
 from tractor._runtime import Actor
 from tractor._entry import _mp_main
 from tractor._exceptions import ActorFailure
+from tractor.msg.types import (
+    SpawnSpec,
+)
 
 
 if TYPE_CHECKING:
@@ -493,14 +496,25 @@ async def trio_proc(
             portal,
         )
 
-        # send additional init params
-        await chan.send({
-            '_parent_main_data': subactor._parent_main_data,
-            'enable_modules': subactor.enable_modules,
-            'reg_addrs': subactor.reg_addrs,
-            'bind_addrs': bind_addrs,
-            '_runtime_vars': _runtime_vars,
-        })
+        # send a "spawning specification" which configures the
+        # initial runtime state of the child.
+        await chan.send(
+            SpawnSpec(
+                _parent_main_data=subactor._parent_main_data,
+                enable_modules=subactor.enable_modules,
+                reg_addrs=subactor.reg_addrs,
+                bind_addrs=bind_addrs,
+                _runtime_vars=_runtime_vars,
+            )
+        )
+
+        # await chan.send({
+        #     '_parent_main_data': subactor._parent_main_data,
+        #     'enable_modules': subactor.enable_modules,
+        #     'reg_addrs': subactor.reg_addrs,
+        #     'bind_addrs': bind_addrs,
+        #     '_runtime_vars': _runtime_vars,
+        # })
 
         # track subactor in current nursery
         curr_actor: Actor = current_actor()
diff --git a/tractor/_streaming.py b/tractor/_streaming.py
index 90c33d31..941cfe8d 100644
--- a/tractor/_streaming.py
+++ b/tractor/_streaming.py
@@ -43,6 +43,11 @@ from .trionics import (
     broadcast_receiver,
     BroadcastReceiver,
 )
+from tractor.msg import (
+    Stop,
+    Yield,
+    Error,
+)
 
 if TYPE_CHECKING:
     from ._context import Context
@@ -94,21 +99,25 @@ class MsgStream(trio.abc.Channel):
         self,
         allow_msg_keys: list[str] = ['yield'],
     ):
-        msg: dict = self._rx_chan.receive_nowait()
+        # msg: dict = self._rx_chan.receive_nowait()
+        msg: Yield|Stop = self._rx_chan.receive_nowait()
         for (
             i,
             key,
         ) in enumerate(allow_msg_keys):
             try:
-                return msg[key]
-            except KeyError as kerr:
+                # return msg[key]
+                return msg.pld
+            # except KeyError as kerr:
+            except AttributeError as attrerr:
                 if i < (len(allow_msg_keys) - 1):
                     continue
 
                 _raise_from_no_key_in_msg(
                     ctx=self._ctx,
                     msg=msg,
-                    src_err=kerr,
+                    # src_err=kerr,
+                    src_err=attrerr,
                     log=log,
                     expect_key=key,
                     stream=self,
@@ -148,18 +157,22 @@ class MsgStream(trio.abc.Channel):
         src_err: Exception|None = None  # orig tb
         try:
             try:
-                msg = await self._rx_chan.receive()
-                return msg['yield']
+                msg: Yield = await self._rx_chan.receive()
+                # return msg['yield']
+                return msg.pld
 
-            except KeyError as kerr:
-                src_err = kerr
+            # except KeyError as kerr:
+            except AttributeError as attrerr:
+                # src_err = kerr
+                src_err = attrerr
 
                 # NOTE: may raise any of the below error types
                 # includg EoC when a 'stop' msg is found.
                 _raise_from_no_key_in_msg(
                     ctx=self._ctx,
                     msg=msg,
-                    src_err=kerr,
+                    # src_err=kerr,
+                    src_err=attrerr,
                     log=log,
                     expect_key='yield',
                     stream=self,
@@ -514,11 +527,18 @@ class MsgStream(trio.abc.Channel):
             raise self._closed
 
         try:
+            # await self._ctx.chan.send(
+            #     payload={
+            #         'yield': data,
+            #         'cid': self._ctx.cid,
+            #     },
+            #     # hide_tb=hide_tb,
+            # )
             await self._ctx.chan.send(
-                payload={
-                    'yield': data,
-                    'cid': self._ctx.cid,
-                },
+                payload=Yield(
+                    cid=self._ctx.cid,
+                    pld=data,
+                ),
                 # hide_tb=hide_tb,
             )
         except (
diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py
index 255b1dbd..26155b22 100644
--- a/tractor/devx/_debug.py
+++ b/tractor/devx/_debug.py
@@ -935,6 +935,9 @@ async def _pause(
             # ``breakpoint()`` was awaited and begin handling stdio.
             log.debug('Entering sync world of the `pdb` REPL..')
             try:
+                # log.critical(
+                #     f'stack len: {len(pdb.stack)}\n'
+                # )
                 debug_func(
                     actor,
                     pdb,