Compare commits
	
		
			No commits in common. "b209990d045c9069b11279916dc529bbe3152b0b" and "eec240a70a6fea5412f970a22c6b91fc3472410c" have entirely different histories. 
		
	
	
		
			b209990d04
			...
			eec240a70a
		
	
		| 
						 | 
					@ -38,7 +38,6 @@ async def main():
 | 
				
			||||||
    """
 | 
					    """
 | 
				
			||||||
    async with tractor.open_nursery(
 | 
					    async with tractor.open_nursery(
 | 
				
			||||||
        debug_mode=True,
 | 
					        debug_mode=True,
 | 
				
			||||||
        # loglevel='runtime',
 | 
					 | 
				
			||||||
    ) as n:
 | 
					    ) as n:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Spawn both actors, don't bother with collecting results
 | 
					        # Spawn both actors, don't bother with collecting results
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -3,20 +3,17 @@ import tractor
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def breakpoint_forever():
 | 
					async def breakpoint_forever():
 | 
				
			||||||
    '''
 | 
					    """Indefinitely re-enter debugger in child actor.
 | 
				
			||||||
    Indefinitely re-enter debugger in child actor.
 | 
					    """
 | 
				
			||||||
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    while True:
 | 
					    while True:
 | 
				
			||||||
        await trio.sleep(0.1)
 | 
					        await trio.sleep(0.1)
 | 
				
			||||||
        await tractor.pause()
 | 
					        await tractor.breakpoint()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def main():
 | 
					async def main():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async with tractor.open_nursery(
 | 
					    async with tractor.open_nursery(
 | 
				
			||||||
        debug_mode=True,
 | 
					        debug_mode=True,
 | 
				
			||||||
        loglevel='cancel',
 | 
					 | 
				
			||||||
    ) as n:
 | 
					    ) as n:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        portal = await n.run_in_actor(
 | 
					        portal = await n.run_in_actor(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -246,10 +246,10 @@ def test_simple_context(
 | 
				
			||||||
            trio.run(main)
 | 
					            trio.run(main)
 | 
				
			||||||
        except error_parent:
 | 
					        except error_parent:
 | 
				
			||||||
            pass
 | 
					            pass
 | 
				
			||||||
        except BaseExceptionGroup as beg:
 | 
					        except trio.MultiError as me:
 | 
				
			||||||
            # XXX: on windows it seems we may have to expect the group error
 | 
					            # XXX: on windows it seems we may have to expect the group error
 | 
				
			||||||
            from tractor._exceptions import is_multi_cancelled
 | 
					            from tractor._exceptions import is_multi_cancelled
 | 
				
			||||||
            assert is_multi_cancelled(beg)
 | 
					            assert is_multi_cancelled(me)
 | 
				
			||||||
    else:
 | 
					    else:
 | 
				
			||||||
        trio.run(main)
 | 
					        trio.run(main)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -38,13 +38,10 @@ async def async_gen_stream(sequence):
 | 
				
			||||||
    assert cs.cancelled_caught
 | 
					    assert cs.cancelled_caught
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# TODO: deprecated either remove entirely
 | 
					 | 
				
			||||||
# or re-impl in terms of `MsgStream` one-sides
 | 
					 | 
				
			||||||
# wrapper, but at least remove `Portal.open_stream_from()`
 | 
					 | 
				
			||||||
@tractor.stream
 | 
					@tractor.stream
 | 
				
			||||||
async def context_stream(
 | 
					async def context_stream(
 | 
				
			||||||
    ctx: tractor.Context,
 | 
					    ctx: tractor.Context,
 | 
				
			||||||
    sequence: list[int],
 | 
					    sequence
 | 
				
			||||||
):
 | 
					):
 | 
				
			||||||
    for i in sequence:
 | 
					    for i in sequence:
 | 
				
			||||||
        await ctx.send_yield(i)
 | 
					        await ctx.send_yield(i)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -36,7 +36,6 @@ def parse_ipaddr(arg):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
if __name__ == "__main__":
 | 
					if __name__ == "__main__":
 | 
				
			||||||
    __tracebackhide__: bool = True
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    parser = argparse.ArgumentParser()
 | 
					    parser = argparse.ArgumentParser()
 | 
				
			||||||
    parser.add_argument("--uid", type=parse_uid)
 | 
					    parser.add_argument("--uid", type=parse_uid)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -47,7 +47,6 @@ import trio
 | 
				
			||||||
from ._exceptions import (
 | 
					from ._exceptions import (
 | 
				
			||||||
    ContextCancelled,
 | 
					    ContextCancelled,
 | 
				
			||||||
    InternalError,
 | 
					    InternalError,
 | 
				
			||||||
    MsgTypeError,
 | 
					 | 
				
			||||||
    RemoteActorError,
 | 
					    RemoteActorError,
 | 
				
			||||||
    StreamOverrun,
 | 
					    StreamOverrun,
 | 
				
			||||||
    pack_from_raise,
 | 
					    pack_from_raise,
 | 
				
			||||||
| 
						 | 
					@ -60,14 +59,12 @@ from .msg import (
 | 
				
			||||||
    MsgType,
 | 
					    MsgType,
 | 
				
			||||||
    MsgCodec,
 | 
					    MsgCodec,
 | 
				
			||||||
    NamespacePath,
 | 
					    NamespacePath,
 | 
				
			||||||
    PayloadT,
 | 
					 | 
				
			||||||
    Return,
 | 
					    Return,
 | 
				
			||||||
    Started,
 | 
					    Started,
 | 
				
			||||||
    Stop,
 | 
					    Stop,
 | 
				
			||||||
    Yield,
 | 
					    Yield,
 | 
				
			||||||
    current_codec,
 | 
					    current_codec,
 | 
				
			||||||
    pretty_struct,
 | 
					    pretty_struct,
 | 
				
			||||||
    types as msgtypes,
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from ._ipc import Channel
 | 
					from ._ipc import Channel
 | 
				
			||||||
from ._streaming import MsgStream
 | 
					from ._streaming import MsgStream
 | 
				
			||||||
| 
						 | 
					@ -91,10 +88,7 @@ async def _drain_to_final_msg(
 | 
				
			||||||
    hide_tb: bool = True,
 | 
					    hide_tb: bool = True,
 | 
				
			||||||
    msg_limit: int = 6,
 | 
					    msg_limit: int = 6,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> tuple[
 | 
					) -> list[dict]:
 | 
				
			||||||
    Return|None,
 | 
					 | 
				
			||||||
    list[MsgType]
 | 
					 | 
				
			||||||
]:
 | 
					 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Drain IPC msgs delivered to the underlying rx-mem-chan
 | 
					    Drain IPC msgs delivered to the underlying rx-mem-chan
 | 
				
			||||||
    `Context._recv_chan` from the runtime in search for a final
 | 
					    `Context._recv_chan` from the runtime in search for a final
 | 
				
			||||||
| 
						 | 
					@ -115,7 +109,6 @@ async def _drain_to_final_msg(
 | 
				
			||||||
    # basically ignoring) any bi-dir-stream msgs still in transit
 | 
					    # basically ignoring) any bi-dir-stream msgs still in transit
 | 
				
			||||||
    # from the far end.
 | 
					    # from the far end.
 | 
				
			||||||
    pre_result_drained: list[MsgType] = []
 | 
					    pre_result_drained: list[MsgType] = []
 | 
				
			||||||
    return_msg: Return|None = None
 | 
					 | 
				
			||||||
    while not (
 | 
					    while not (
 | 
				
			||||||
        ctx.maybe_error
 | 
					        ctx.maybe_error
 | 
				
			||||||
        and not ctx._final_result_is_set()
 | 
					        and not ctx._final_result_is_set()
 | 
				
			||||||
| 
						 | 
					@ -176,6 +169,8 @@ async def _drain_to_final_msg(
 | 
				
			||||||
            # pray to the `trio` gawds that we're corrent with this
 | 
					            # pray to the `trio` gawds that we're corrent with this
 | 
				
			||||||
            # msg: dict = await ctx._recv_chan.receive()
 | 
					            # msg: dict = await ctx._recv_chan.receive()
 | 
				
			||||||
            msg: MsgType = await ctx._recv_chan.receive()
 | 
					            msg: MsgType = await ctx._recv_chan.receive()
 | 
				
			||||||
 | 
					            # always capture unexpected/non-result msgs
 | 
				
			||||||
 | 
					            pre_result_drained.append(msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # NOTE: we get here if the far end was
 | 
					        # NOTE: we get here if the far end was
 | 
				
			||||||
        # `ContextCancelled` in 2 cases:
 | 
					        # `ContextCancelled` in 2 cases:
 | 
				
			||||||
| 
						 | 
					@ -212,13 +207,11 @@ async def _drain_to_final_msg(
 | 
				
			||||||
                # if ctx._recv_chan:
 | 
					                # if ctx._recv_chan:
 | 
				
			||||||
                #     await ctx._recv_chan.aclose()
 | 
					                #     await ctx._recv_chan.aclose()
 | 
				
			||||||
                # TODO: ^ we don't need it right?
 | 
					                # TODO: ^ we don't need it right?
 | 
				
			||||||
                return_msg = msg
 | 
					 | 
				
			||||||
                break
 | 
					                break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # far end task is still streaming to us so discard
 | 
					            # far end task is still streaming to us so discard
 | 
				
			||||||
            # and report depending on local ctx state.
 | 
					            # and report depending on local ctx state.
 | 
				
			||||||
            case Yield():
 | 
					            case Yield():
 | 
				
			||||||
                pre_result_drained.append(msg)
 | 
					 | 
				
			||||||
                if (
 | 
					                if (
 | 
				
			||||||
                    (ctx._stream.closed
 | 
					                    (ctx._stream.closed
 | 
				
			||||||
                     and (reason := 'stream was already closed')
 | 
					                     and (reason := 'stream was already closed')
 | 
				
			||||||
| 
						 | 
					@ -243,10 +236,7 @@ async def _drain_to_final_msg(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        f'{pformat(msg)}\n'
 | 
					                        f'{pformat(msg)}\n'
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
                    return (
 | 
					                    return pre_result_drained
 | 
				
			||||||
                        return_msg,
 | 
					 | 
				
			||||||
                        pre_result_drained,
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # drain up to the `msg_limit` hoping to get
 | 
					                # drain up to the `msg_limit` hoping to get
 | 
				
			||||||
                # a final result or error/ctxc.
 | 
					                # a final result or error/ctxc.
 | 
				
			||||||
| 
						 | 
					@ -270,7 +260,6 @@ async def _drain_to_final_msg(
 | 
				
			||||||
            # -[ ] should be a runtime error if a stream is open right?
 | 
					            # -[ ] should be a runtime error if a stream is open right?
 | 
				
			||||||
            # Stop()
 | 
					            # Stop()
 | 
				
			||||||
            case Stop():
 | 
					            case Stop():
 | 
				
			||||||
                pre_result_drained.append(msg)
 | 
					 | 
				
			||||||
                log.cancel(
 | 
					                log.cancel(
 | 
				
			||||||
                    'Remote stream terminated due to "stop" msg:\n\n'
 | 
					                    'Remote stream terminated due to "stop" msg:\n\n'
 | 
				
			||||||
                    f'{pformat(msg)}\n'
 | 
					                    f'{pformat(msg)}\n'
 | 
				
			||||||
| 
						 | 
					@ -280,6 +269,7 @@ async def _drain_to_final_msg(
 | 
				
			||||||
            # remote error msg, likely already handled inside
 | 
					            # remote error msg, likely already handled inside
 | 
				
			||||||
            # `Context._deliver_msg()`
 | 
					            # `Context._deliver_msg()`
 | 
				
			||||||
            case Error():
 | 
					            case Error():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # TODO: can we replace this with `ctx.maybe_raise()`?
 | 
					                # TODO: can we replace this with `ctx.maybe_raise()`?
 | 
				
			||||||
                # -[ ]  would this be handier for this case maybe?
 | 
					                # -[ ]  would this be handier for this case maybe?
 | 
				
			||||||
                #     async with maybe_raise_on_exit() as raises:
 | 
					                #     async with maybe_raise_on_exit() as raises:
 | 
				
			||||||
| 
						 | 
					@ -346,7 +336,6 @@ async def _drain_to_final_msg(
 | 
				
			||||||
            # XXX should pretty much never get here unless someone
 | 
					            # XXX should pretty much never get here unless someone
 | 
				
			||||||
            # overrides the default `MsgType` spec.
 | 
					            # overrides the default `MsgType` spec.
 | 
				
			||||||
            case _:
 | 
					            case _:
 | 
				
			||||||
                pre_result_drained.append(msg)
 | 
					 | 
				
			||||||
                # It's definitely an internal error if any other
 | 
					                # It's definitely an internal error if any other
 | 
				
			||||||
                # msg type without a`'cid'` field arrives here!
 | 
					                # msg type without a`'cid'` field arrives here!
 | 
				
			||||||
                if not msg.cid:
 | 
					                if not msg.cid:
 | 
				
			||||||
| 
						 | 
					@ -363,10 +352,7 @@ async def _drain_to_final_msg(
 | 
				
			||||||
            f'{ctx.outcome}\n'
 | 
					            f'{ctx.outcome}\n'
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return (
 | 
					    return pre_result_drained
 | 
				
			||||||
        return_msg,
 | 
					 | 
				
			||||||
        pre_result_drained,
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Unresolved:
 | 
					class Unresolved:
 | 
				
			||||||
| 
						 | 
					@ -733,36 +719,21 @@ class Context:
 | 
				
			||||||
        Return string indicating which task this instance is wrapping.
 | 
					        Return string indicating which task this instance is wrapping.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        return 'parent' if self._portal else 'child'
 | 
					        return 'caller' if self._portal else 'callee'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @staticmethod
 | 
					 | 
				
			||||||
    def peer_side(side: str) -> str:
 | 
					 | 
				
			||||||
        match side:
 | 
					 | 
				
			||||||
            case 'child':
 | 
					 | 
				
			||||||
                return 'parent'
 | 
					 | 
				
			||||||
            case 'parent':
 | 
					 | 
				
			||||||
                return 'child'
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # TODO: remove stat!
 | 
					 | 
				
			||||||
    # -[ ] re-implement the `.experiemental._pubsub` stuff
 | 
					 | 
				
			||||||
    #     with `MsgStream` and that should be last usage?
 | 
					 | 
				
			||||||
    # -[ ] remove from `tests/legacy_one_way_streaming.py`!
 | 
					 | 
				
			||||||
    async def send_yield(
 | 
					    async def send_yield(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        data: Any,
 | 
					        data: Any,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        Deprecated method for what now is implemented in `MsgStream`.
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        We need to rework / remove some stuff tho, see above.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        warnings.warn(
 | 
					        warnings.warn(
 | 
				
			||||||
            "`Context.send_yield()` is now deprecated. "
 | 
					            "`Context.send_yield()` is now deprecated. "
 | 
				
			||||||
            "Use ``MessageStream.send()``. ",
 | 
					            "Use ``MessageStream.send()``. ",
 | 
				
			||||||
            DeprecationWarning,
 | 
					            DeprecationWarning,
 | 
				
			||||||
            stacklevel=2,
 | 
					            stacklevel=2,
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					        # await self.chan.send({'yield': data, 'cid': self.cid})
 | 
				
			||||||
        await self.chan.send(
 | 
					        await self.chan.send(
 | 
				
			||||||
            Yield(
 | 
					            Yield(
 | 
				
			||||||
                cid=self.cid,
 | 
					                cid=self.cid,
 | 
				
			||||||
| 
						 | 
					@ -771,11 +742,12 @@ class Context:
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def send_stop(self) -> None:
 | 
					    async def send_stop(self) -> None:
 | 
				
			||||||
        '''
 | 
					        # await pause()
 | 
				
			||||||
        Terminate a `MsgStream` dialog-phase by sending the IPC
 | 
					        # await self.chan.send({
 | 
				
			||||||
        equiv of a `StopIteration`.
 | 
					        #     # Stop(
 | 
				
			||||||
 | 
					        #     'stop': True,
 | 
				
			||||||
        '''
 | 
					        #     'cid': self.cid
 | 
				
			||||||
 | 
					        # })
 | 
				
			||||||
        await self.chan.send(
 | 
					        await self.chan.send(
 | 
				
			||||||
            Stop(cid=self.cid)
 | 
					            Stop(cid=self.cid)
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
| 
						 | 
					@ -871,7 +843,6 @@ class Context:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # self-cancel (ack) or,
 | 
					        # self-cancel (ack) or,
 | 
				
			||||||
        # peer propagated remote cancellation.
 | 
					        # peer propagated remote cancellation.
 | 
				
			||||||
        msgtyperr: bool = False
 | 
					 | 
				
			||||||
        if isinstance(error, ContextCancelled):
 | 
					        if isinstance(error, ContextCancelled):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            whom: str = (
 | 
					            whom: str = (
 | 
				
			||||||
| 
						 | 
					@ -883,16 +854,6 @@ class Context:
 | 
				
			||||||
                f'{error}'
 | 
					                f'{error}'
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        elif isinstance(error, MsgTypeError):
 | 
					 | 
				
			||||||
            msgtyperr = True
 | 
					 | 
				
			||||||
            peer_side: str = self.peer_side(self.side)
 | 
					 | 
				
			||||||
            log.error(
 | 
					 | 
				
			||||||
                f'IPC dialog error due to msg-type caused by {peer_side!r} side\n\n'
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                f'{error}\n'
 | 
					 | 
				
			||||||
                f'{pformat(self)}\n'
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            log.error(
 | 
					            log.error(
 | 
				
			||||||
                f'Remote context error:\n\n'
 | 
					                f'Remote context error:\n\n'
 | 
				
			||||||
| 
						 | 
					@ -933,9 +894,9 @@ class Context:
 | 
				
			||||||
            # if `._cancel_called` then `.cancel_acked and .cancel_called`
 | 
					            # if `._cancel_called` then `.cancel_acked and .cancel_called`
 | 
				
			||||||
            # always should be set.
 | 
					            # always should be set.
 | 
				
			||||||
            and not self._is_self_cancelled()
 | 
					            and not self._is_self_cancelled()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            and not cs.cancel_called
 | 
					            and not cs.cancel_called
 | 
				
			||||||
            and not cs.cancelled_caught
 | 
					            and not cs.cancelled_caught
 | 
				
			||||||
            and not msgtyperr
 | 
					 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            # TODO: it'd sure be handy to inject our own
 | 
					            # TODO: it'd sure be handy to inject our own
 | 
				
			||||||
            # `trio.Cancelled` subtype here ;)
 | 
					            # `trio.Cancelled` subtype here ;)
 | 
				
			||||||
| 
						 | 
					@ -1040,7 +1001,7 @@ class Context:
 | 
				
			||||||
        # when the runtime finally receives it during teardown
 | 
					        # when the runtime finally receives it during teardown
 | 
				
			||||||
        # (normally in `.result()` called from
 | 
					        # (normally in `.result()` called from
 | 
				
			||||||
        # `Portal.open_context().__aexit__()`)
 | 
					        # `Portal.open_context().__aexit__()`)
 | 
				
			||||||
        if side == 'parent':
 | 
					        if side == 'caller':
 | 
				
			||||||
            if not self._portal:
 | 
					            if not self._portal:
 | 
				
			||||||
                raise InternalError(
 | 
					                raise InternalError(
 | 
				
			||||||
                    'No portal found!?\n'
 | 
					                    'No portal found!?\n'
 | 
				
			||||||
| 
						 | 
					@ -1462,10 +1423,7 @@ class Context:
 | 
				
			||||||
            # wait for a final context result/error by "draining"
 | 
					            # wait for a final context result/error by "draining"
 | 
				
			||||||
            # (by more or less ignoring) any bi-dir-stream "yield"
 | 
					            # (by more or less ignoring) any bi-dir-stream "yield"
 | 
				
			||||||
            # msgs still in transit from the far end.
 | 
					            # msgs still in transit from the far end.
 | 
				
			||||||
            (
 | 
					            drained_msgs: list[dict] = await _drain_to_final_msg(
 | 
				
			||||||
                return_msg,
 | 
					 | 
				
			||||||
                drained_msgs,
 | 
					 | 
				
			||||||
            ) = await _drain_to_final_msg(
 | 
					 | 
				
			||||||
                ctx=self,
 | 
					                ctx=self,
 | 
				
			||||||
                hide_tb=hide_tb,
 | 
					                hide_tb=hide_tb,
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
| 
						 | 
					@ -1483,10 +1441,7 @@ class Context:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            log.cancel(
 | 
					            log.cancel(
 | 
				
			||||||
                'Ctx drained pre-result msgs:\n'
 | 
					                'Ctx drained pre-result msgs:\n'
 | 
				
			||||||
                f'{pformat(drained_msgs)}\n\n'
 | 
					                f'{pformat(drained_msgs)}'
 | 
				
			||||||
 | 
					 | 
				
			||||||
                f'Final return msg:\n'
 | 
					 | 
				
			||||||
                f'{return_msg}\n'
 | 
					 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.maybe_raise(
 | 
					        self.maybe_raise(
 | 
				
			||||||
| 
						 | 
					@ -1653,13 +1608,7 @@ class Context:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def started(
 | 
					    async def started(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
 | 
					        value: Any | None = None
 | 
				
			||||||
        # TODO: how to type this so that it's the
 | 
					 | 
				
			||||||
        # same as the payload type? Is this enough?
 | 
					 | 
				
			||||||
        value: PayloadT|None = None,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        strict_parity: bool = False,
 | 
					 | 
				
			||||||
        complain_no_parity: bool = True,
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
| 
						 | 
					@ -1680,7 +1629,7 @@ class Context:
 | 
				
			||||||
                f'called `.started()` twice on context with {self.chan.uid}'
 | 
					                f'called `.started()` twice on context with {self.chan.uid}'
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        started_msg = Started(
 | 
					        started = Started(
 | 
				
			||||||
            cid=self.cid,
 | 
					            cid=self.cid,
 | 
				
			||||||
            pld=value,
 | 
					            pld=value,
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
| 
						 | 
					@ -1701,54 +1650,28 @@ class Context:
 | 
				
			||||||
        # https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
 | 
					        # https://zguide.zeromq.org/docs/chapter7/#The-Cheap-or-Nasty-Pattern
 | 
				
			||||||
        #
 | 
					        #
 | 
				
			||||||
        codec: MsgCodec = current_codec()
 | 
					        codec: MsgCodec = current_codec()
 | 
				
			||||||
        msg_bytes: bytes = codec.encode(started_msg)
 | 
					        msg_bytes: bytes = codec.encode(started)
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            # be a "cheap" dialog (see above!)
 | 
					            # be a "cheap" dialog (see above!)
 | 
				
			||||||
            if (
 | 
					            rt_started = codec.decode(msg_bytes)
 | 
				
			||||||
                strict_parity
 | 
					            if rt_started != started:
 | 
				
			||||||
                or
 | 
					 | 
				
			||||||
                complain_no_parity
 | 
					 | 
				
			||||||
            ):
 | 
					 | 
				
			||||||
                rt_started: Started = codec.decode(msg_bytes)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # XXX something is prolly totes cucked with the
 | 
					                # TODO: break these methods out from the struct subtype?
 | 
				
			||||||
                # codec state!
 | 
					                diff = pretty_struct.Struct.__sub__(rt_started, started)
 | 
				
			||||||
                if isinstance(rt_started, dict):
 | 
					 | 
				
			||||||
                    rt_started = msgtypes.from_dict_msg(
 | 
					 | 
				
			||||||
                        dict_msg=rt_started,
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    raise RuntimeError(
 | 
					 | 
				
			||||||
                        'Failed to roundtrip `Started` msg?\n'
 | 
					 | 
				
			||||||
                        f'{pformat(rt_started)}\n'
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if rt_started != started_msg:
 | 
					                complaint: str = (
 | 
				
			||||||
                    # TODO: break these methods out from the struct subtype?
 | 
					                    'Started value does not match after codec rountrip?\n\n'
 | 
				
			||||||
 | 
					                    f'{diff}'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                # TODO: rn this will pretty much always fail with
 | 
				
			||||||
 | 
					                # any other sequence type embeded in the
 | 
				
			||||||
 | 
					                # payload...
 | 
				
			||||||
 | 
					                if self._strict_started:
 | 
				
			||||||
 | 
					                    raise ValueError(complaint)
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    log.warning(complaint)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    diff = pretty_struct.Struct.__sub__(
 | 
					            await self.chan.send(rt_started)
 | 
				
			||||||
                        rt_started,
 | 
					 | 
				
			||||||
                        started_msg,
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    complaint: str = (
 | 
					 | 
				
			||||||
                        'Started value does not match after codec rountrip?\n\n'
 | 
					 | 
				
			||||||
                        f'{diff}'
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # TODO: rn this will pretty much always fail with
 | 
					 | 
				
			||||||
                    # any other sequence type embeded in the
 | 
					 | 
				
			||||||
                    # payload...
 | 
					 | 
				
			||||||
                    if (
 | 
					 | 
				
			||||||
                        self._strict_started
 | 
					 | 
				
			||||||
                        or
 | 
					 | 
				
			||||||
                        strict_parity
 | 
					 | 
				
			||||||
                    ):
 | 
					 | 
				
			||||||
                        raise ValueError(complaint)
 | 
					 | 
				
			||||||
                    else:
 | 
					 | 
				
			||||||
                        log.warning(complaint)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                # started_msg = rt_started
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            await self.chan.send(started_msg)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # raise any msg type error NO MATTER WHAT!
 | 
					        # raise any msg type error NO MATTER WHAT!
 | 
				
			||||||
        except msgspec.ValidationError as verr:
 | 
					        except msgspec.ValidationError as verr:
 | 
				
			||||||
| 
						 | 
					@ -1759,7 +1682,7 @@ class Context:
 | 
				
			||||||
                src_validation_error=verr,
 | 
					                src_validation_error=verr,
 | 
				
			||||||
                verb_header='Trying to send payload'
 | 
					                verb_header='Trying to send payload'
 | 
				
			||||||
                # > 'invalid `Started IPC msgs\n'
 | 
					                # > 'invalid `Started IPC msgs\n'
 | 
				
			||||||
            ) from verr
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self._started_called = True
 | 
					        self._started_called = True
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1860,17 +1783,13 @@ class Context:
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                log_meth = log.runtime
 | 
					                log_meth = log.runtime
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            side: str = self.side
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            peer_side: str = self.peer_side(side)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            log_meth(
 | 
					            log_meth(
 | 
				
			||||||
                f'Delivering IPC ctx error from {peer_side!r} to {side!r} task\n\n'
 | 
					                f'Delivering error-msg to caller\n\n'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                f'<= peer {peer_side!r}: {from_uid}\n'
 | 
					                f'<= peer: {from_uid}\n'
 | 
				
			||||||
                f'  |_ {nsf}()\n\n'
 | 
					                f'  |_ {nsf}()\n\n'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                f'=> {side!r} cid: {cid}\n'
 | 
					                f'=> cid: {cid}\n'
 | 
				
			||||||
                f'  |_{self._task}\n\n'
 | 
					                f'  |_{self._task}\n\n'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                f'{pformat(re)}\n'
 | 
					                f'{pformat(re)}\n'
 | 
				
			||||||
| 
						 | 
					@ -1885,7 +1804,6 @@ class Context:
 | 
				
			||||||
            self._maybe_cancel_and_set_remote_error(re)
 | 
					            self._maybe_cancel_and_set_remote_error(re)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # XXX only case where returning early is fine!
 | 
					        # XXX only case where returning early is fine!
 | 
				
			||||||
        structfmt = pretty_struct.Struct.pformat
 | 
					 | 
				
			||||||
        if self._in_overrun:
 | 
					        if self._in_overrun:
 | 
				
			||||||
            log.warning(
 | 
					            log.warning(
 | 
				
			||||||
                f'Queueing OVERRUN msg on caller task:\n'
 | 
					                f'Queueing OVERRUN msg on caller task:\n'
 | 
				
			||||||
| 
						 | 
					@ -1895,7 +1813,7 @@ class Context:
 | 
				
			||||||
                f'=> cid: {cid}\n'
 | 
					                f'=> cid: {cid}\n'
 | 
				
			||||||
                f'  |_{self._task}\n\n'
 | 
					                f'  |_{self._task}\n\n'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                f'{structfmt(msg)}\n'
 | 
					                f'{pformat(msg)}\n'
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            self._overflow_q.append(msg)
 | 
					            self._overflow_q.append(msg)
 | 
				
			||||||
            return False
 | 
					            return False
 | 
				
			||||||
| 
						 | 
					@ -1909,7 +1827,7 @@ class Context:
 | 
				
			||||||
                f'=> {self._task}\n'
 | 
					                f'=> {self._task}\n'
 | 
				
			||||||
                f'  |_cid={self.cid}\n\n'
 | 
					                f'  |_cid={self.cid}\n\n'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                f'{structfmt(msg)}\n'
 | 
					                f'{pformat(msg)}\n'
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # NOTE: if an error is deteced we should always still
 | 
					            # NOTE: if an error is deteced we should always still
 | 
				
			||||||
| 
						 | 
					@ -2129,9 +2047,6 @@ async def open_context_from_portal(
 | 
				
			||||||
        # place..
 | 
					        # place..
 | 
				
			||||||
        allow_overruns=allow_overruns,
 | 
					        allow_overruns=allow_overruns,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    # ASAP, so that `Context.side: str` can be determined for
 | 
					 | 
				
			||||||
    # logging / tracing / debug!
 | 
					 | 
				
			||||||
    ctx._portal: Portal = portal
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    assert ctx._remote_func_type == 'context'
 | 
					    assert ctx._remote_func_type == 'context'
 | 
				
			||||||
    msg: Started = await ctx._recv_chan.receive()
 | 
					    msg: Started = await ctx._recv_chan.receive()
 | 
				
			||||||
| 
						 | 
					@ -2150,9 +2065,10 @@ async def open_context_from_portal(
 | 
				
			||||||
            msg=msg,
 | 
					            msg=msg,
 | 
				
			||||||
            src_err=src_error,
 | 
					            src_err=src_error,
 | 
				
			||||||
            log=log,
 | 
					            log=log,
 | 
				
			||||||
            expect_msg=Started,
 | 
					            expect_key='started',
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ctx._portal: Portal = portal
 | 
				
			||||||
    uid: tuple = portal.channel.uid
 | 
					    uid: tuple = portal.channel.uid
 | 
				
			||||||
    cid: str = ctx.cid
 | 
					    cid: str = ctx.cid
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -106,7 +106,6 @@ def _trio_main(
 | 
				
			||||||
    Entry point for a `trio_run_in_process` subactor.
 | 
					    Entry point for a `trio_run_in_process` subactor.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    __tracebackhide__: bool = True
 | 
					 | 
				
			||||||
    _state._current_actor = actor
 | 
					    _state._current_actor = actor
 | 
				
			||||||
    trio_main = partial(
 | 
					    trio_main = partial(
 | 
				
			||||||
        async_main,
 | 
					        async_main,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -43,12 +43,9 @@ from tractor.msg import (
 | 
				
			||||||
    MsgType,
 | 
					    MsgType,
 | 
				
			||||||
    Stop,
 | 
					    Stop,
 | 
				
			||||||
    Yield,
 | 
					    Yield,
 | 
				
			||||||
 | 
					    pretty_struct,
 | 
				
			||||||
    types as msgtypes,
 | 
					    types as msgtypes,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from tractor.msg.pretty_struct import (
 | 
					 | 
				
			||||||
    iter_fields,
 | 
					 | 
				
			||||||
    Struct,
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
if TYPE_CHECKING:
 | 
					if TYPE_CHECKING:
 | 
				
			||||||
    from ._context import Context
 | 
					    from ._context import Context
 | 
				
			||||||
| 
						 | 
					@ -85,7 +82,7 @@ class InternalError(RuntimeError):
 | 
				
			||||||
_ipcmsg_keys: list[str] = [
 | 
					_ipcmsg_keys: list[str] = [
 | 
				
			||||||
    fi.name
 | 
					    fi.name
 | 
				
			||||||
    for fi, k, v
 | 
					    for fi, k, v
 | 
				
			||||||
    in iter_fields(Error)
 | 
					    in pretty_struct.iter_fields(Error)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
]
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -324,7 +321,7 @@ class RemoteActorError(Exception):
 | 
				
			||||||
            assert self.boxed_type is boxed_type
 | 
					            assert self.boxed_type is boxed_type
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @property
 | 
					    @property
 | 
				
			||||||
    def ipc_msg(self) -> Struct:
 | 
					    def ipc_msg(self) -> pretty_struct.Struct:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        Re-render the underlying `._ipc_msg: Msg` as
 | 
					        Re-render the underlying `._ipc_msg: Msg` as
 | 
				
			||||||
        a `pretty_struct.Struct` for introspection such that the
 | 
					        a `pretty_struct.Struct` for introspection such that the
 | 
				
			||||||
| 
						 | 
					@ -337,12 +334,12 @@ class RemoteActorError(Exception):
 | 
				
			||||||
        msg_type: MsgType = type(self._ipc_msg)
 | 
					        msg_type: MsgType = type(self._ipc_msg)
 | 
				
			||||||
        fields: dict[str, Any] = {
 | 
					        fields: dict[str, Any] = {
 | 
				
			||||||
            k: v for _, k, v in
 | 
					            k: v for _, k, v in
 | 
				
			||||||
            iter_fields(self._ipc_msg)
 | 
					            pretty_struct.iter_fields(self._ipc_msg)
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        return defstruct(
 | 
					        return defstruct(
 | 
				
			||||||
            msg_type.__name__,
 | 
					            msg_type.__name__,
 | 
				
			||||||
            fields=fields.keys(),
 | 
					            fields=fields.keys(),
 | 
				
			||||||
            bases=(msg_type, Struct),
 | 
					            bases=(msg_type, pretty_struct.Struct),
 | 
				
			||||||
        )(**fields)
 | 
					        )(**fields)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @property
 | 
					    @property
 | 
				
			||||||
| 
						 | 
					@ -644,11 +641,11 @@ class MsgTypeError(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    reprol_fields: list[str] = [
 | 
					    reprol_fields: list[str] = [
 | 
				
			||||||
        'expected_msg_type',
 | 
					        'payload_msg',
 | 
				
			||||||
    ]
 | 
					    ]
 | 
				
			||||||
    extra_body_fields: list[str] = [
 | 
					    extra_body_fields: list[str] = [
 | 
				
			||||||
        'cid',
 | 
					        'cid',
 | 
				
			||||||
        'expected_msg',
 | 
					        'payload_msg',
 | 
				
			||||||
    ]
 | 
					    ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @property
 | 
					    @property
 | 
				
			||||||
| 
						 | 
					@ -664,7 +661,9 @@ class MsgTypeError(
 | 
				
			||||||
        return self.msgdata.get('_msg_dict')
 | 
					        return self.msgdata.get('_msg_dict')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @property
 | 
					    @property
 | 
				
			||||||
    def expected_msg(self) -> MsgType|None:
 | 
					    def payload_msg(
 | 
				
			||||||
 | 
					        self,
 | 
				
			||||||
 | 
					    ) -> MsgType|None:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        Attempt to construct what would have been the original
 | 
					        Attempt to construct what would have been the original
 | 
				
			||||||
        `MsgType`-with-payload subtype (i.e. an instance from the set
 | 
					        `MsgType`-with-payload subtype (i.e. an instance from the set
 | 
				
			||||||
| 
						 | 
					@ -675,17 +674,9 @@ class MsgTypeError(
 | 
				
			||||||
        if msg_dict := self.msg_dict.copy():
 | 
					        if msg_dict := self.msg_dict.copy():
 | 
				
			||||||
            return msgtypes.from_dict_msg(
 | 
					            return msgtypes.from_dict_msg(
 | 
				
			||||||
                dict_msg=msg_dict,
 | 
					                dict_msg=msg_dict,
 | 
				
			||||||
                # use_pretty=True,
 | 
					 | 
				
			||||||
                # ^-TODO-^ would luv to use this BUT then the
 | 
					 | 
				
			||||||
                # `field_prefix` in `pformat_boxed_tb()` cucks it
 | 
					 | 
				
			||||||
                # all up.. XD
 | 
					 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
        return None
 | 
					        return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @property
 | 
					 | 
				
			||||||
    def expected_msg_type(self) -> Type[MsgType]|None:
 | 
					 | 
				
			||||||
        return type(self.expected_msg)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    @property
 | 
					    @property
 | 
				
			||||||
    def cid(self) -> str:
 | 
					    def cid(self) -> str:
 | 
				
			||||||
        # pre-packed using `.from_decode()` constructor
 | 
					        # pre-packed using `.from_decode()` constructor
 | 
				
			||||||
| 
						 | 
					@ -938,6 +929,7 @@ def _raise_from_no_key_in_msg(
 | 
				
			||||||
    src_err: KeyError,
 | 
					    src_err: KeyError,
 | 
				
			||||||
    log: StackLevelAdapter,  # caller specific `log` obj
 | 
					    log: StackLevelAdapter,  # caller specific `log` obj
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    expect_key: str = 'yield',
 | 
				
			||||||
    expect_msg: str = Yield,
 | 
					    expect_msg: str = Yield,
 | 
				
			||||||
    stream: MsgStream | None = None,
 | 
					    stream: MsgStream | None = None,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1052,7 +1044,7 @@ def _raise_from_no_key_in_msg(
 | 
				
			||||||
    # is activated above.
 | 
					    # is activated above.
 | 
				
			||||||
    _type: str = 'Stream' if stream else 'Context'
 | 
					    _type: str = 'Stream' if stream else 'Context'
 | 
				
			||||||
    raise MessagingError(
 | 
					    raise MessagingError(
 | 
				
			||||||
        f"{_type} was expecting a {expect_msg} message"
 | 
					        f"{_type} was expecting a '{expect_key.upper()}' message"
 | 
				
			||||||
        " BUT received a non-error msg:\n"
 | 
					        " BUT received a non-error msg:\n"
 | 
				
			||||||
        f'{pformat(msg)}'
 | 
					        f'{pformat(msg)}'
 | 
				
			||||||
    ) from src_err
 | 
					    ) from src_err
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -130,8 +130,6 @@ def _mk_msg_type_err(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> MsgTypeError:
 | 
					) -> MsgTypeError:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    import textwrap
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # `Channel.send()` case
 | 
					    # `Channel.send()` case
 | 
				
			||||||
    if src_validation_error is None:  # send-side
 | 
					    if src_validation_error is None:  # send-side
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -211,24 +209,10 @@ def _mk_msg_type_err(
 | 
				
			||||||
        msg, _, maybe_field = msgspec_msg.rpartition('$.')
 | 
					        msg, _, maybe_field = msgspec_msg.rpartition('$.')
 | 
				
			||||||
        obj = object()
 | 
					        obj = object()
 | 
				
			||||||
        if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
 | 
					        if (field_val := msg_dict.get(maybe_field, obj)) is not obj:
 | 
				
			||||||
            field_name_expr: str = (
 | 
					 | 
				
			||||||
                f' |_{maybe_field}: {codec.pld_spec_str} = '
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            fmt_val_lines: list[str] = pformat(field_val).splitlines()
 | 
					 | 
				
			||||||
            fmt_val: str = (
 | 
					 | 
				
			||||||
                f'{fmt_val_lines[0]}\n'
 | 
					 | 
				
			||||||
                +
 | 
					 | 
				
			||||||
                textwrap.indent(
 | 
					 | 
				
			||||||
                    '\n'.join(fmt_val_lines[1:]),
 | 
					 | 
				
			||||||
                    prefix=' '*len(field_name_expr),
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
            )
 | 
					 | 
				
			||||||
            message += (
 | 
					            message += (
 | 
				
			||||||
                f'{msg.rstrip("`")}\n\n'
 | 
					                f'{msg.rstrip("`")}\n\n'
 | 
				
			||||||
                f'<{msg_type.__qualname__}(\n'
 | 
					                f'{msg_type}\n'
 | 
				
			||||||
                # f'{".".join([msg_type.__module__, msg_type.__qualname__])}\n'
 | 
					                f' |_.{maybe_field}: {codec.pld_spec_str} = {field_val!r}\n'
 | 
				
			||||||
                f'{field_name_expr}{fmt_val}\n'
 | 
					 | 
				
			||||||
                f')>'
 | 
					 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        msgtyperr = MsgTypeError.from_decode(
 | 
					        msgtyperr = MsgTypeError.from_decode(
 | 
				
			||||||
| 
						 | 
					@ -354,7 +338,7 @@ class MsgpackTCPStream(MsgTransport):
 | 
				
			||||||
                    # self._task = task
 | 
					                    # self._task = task
 | 
				
			||||||
                    self._codec = codec
 | 
					                    self._codec = codec
 | 
				
			||||||
                    log.runtime(
 | 
					                    log.runtime(
 | 
				
			||||||
                        f'Using new codec in {self}.recv()\n'
 | 
					                        'Using new codec in {self}.recv()\n'
 | 
				
			||||||
                        f'codec: {self._codec}\n\n'
 | 
					                        f'codec: {self._codec}\n\n'
 | 
				
			||||||
                        f'msg_bytes: {msg_bytes}\n'
 | 
					                        f'msg_bytes: {msg_bytes}\n'
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
| 
						 | 
					@ -436,7 +420,7 @@ class MsgpackTCPStream(MsgTransport):
 | 
				
			||||||
            if self._codec.pld_spec != codec.pld_spec:
 | 
					            if self._codec.pld_spec != codec.pld_spec:
 | 
				
			||||||
                self._codec = codec
 | 
					                self._codec = codec
 | 
				
			||||||
                log.runtime(
 | 
					                log.runtime(
 | 
				
			||||||
                    f'Using new codec in {self}.send()\n'
 | 
					                    'Using new codec in {self}.send()\n'
 | 
				
			||||||
                    f'codec: {self._codec}\n\n'
 | 
					                    f'codec: {self._codec}\n\n'
 | 
				
			||||||
                    f'msg: {msg}\n'
 | 
					                    f'msg: {msg}\n'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -79,7 +79,6 @@ async def open_root_actor(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # enables the multi-process debugger support
 | 
					    # enables the multi-process debugger support
 | 
				
			||||||
    debug_mode: bool = False,
 | 
					    debug_mode: bool = False,
 | 
				
			||||||
    maybe_enable_greenback: bool = False,  # `.pause_from_sync()/breakpoint()` support
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # internal logging
 | 
					    # internal logging
 | 
				
			||||||
    loglevel: str|None = None,
 | 
					    loglevel: str|None = None,
 | 
				
			||||||
| 
						 | 
					@ -108,16 +107,14 @@ async def open_root_actor(
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    if (
 | 
					    if (
 | 
				
			||||||
        debug_mode
 | 
					        debug_mode
 | 
				
			||||||
        and maybe_enable_greenback
 | 
					        and
 | 
				
			||||||
        and await _debug.maybe_init_greenback(
 | 
					        await _debug.maybe_init_greenback(
 | 
				
			||||||
            raise_not_found=False,
 | 
					            raise_not_found=False,
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
        os.environ['PYTHONBREAKPOINT'] = (
 | 
					        os.environ['PYTHONBREAKPOINT'] = (
 | 
				
			||||||
            'tractor.devx._debug.pause_from_sync'
 | 
					            'tractor.devx._debug.pause_from_sync'
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
        _state._runtime_vars['use_greenback'] = True
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    else:
 | 
					    else:
 | 
				
			||||||
        # TODO: disable `breakpoint()` by default (without
 | 
					        # TODO: disable `breakpoint()` by default (without
 | 
				
			||||||
        # `greenback`) since it will break any multi-actor
 | 
					        # `greenback`) since it will break any multi-actor
 | 
				
			||||||
| 
						 | 
					@ -388,20 +385,14 @@ async def open_root_actor(
 | 
				
			||||||
        _state._last_actor_terminated = actor
 | 
					        _state._last_actor_terminated = actor
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # restore built-in `breakpoint()` hook state
 | 
					        # restore built-in `breakpoint()` hook state
 | 
				
			||||||
        if (
 | 
					        if debug_mode:
 | 
				
			||||||
            debug_mode
 | 
					 | 
				
			||||||
            and
 | 
					 | 
				
			||||||
            maybe_enable_greenback
 | 
					 | 
				
			||||||
        ):
 | 
					 | 
				
			||||||
            if builtin_bp_handler is not None:
 | 
					            if builtin_bp_handler is not None:
 | 
				
			||||||
                sys.breakpointhook = builtin_bp_handler
 | 
					                sys.breakpointhook = builtin_bp_handler
 | 
				
			||||||
 | 
					 | 
				
			||||||
            if orig_bp_path is not None:
 | 
					            if orig_bp_path is not None:
 | 
				
			||||||
                os.environ['PYTHONBREAKPOINT'] = orig_bp_path
 | 
					                os.environ['PYTHONBREAKPOINT'] = orig_bp_path
 | 
				
			||||||
 | 
					 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
                # clear env back to having no entry
 | 
					                # clear env back to having no entry
 | 
				
			||||||
                os.environ.pop('PYTHONBREAKPOINT', None)
 | 
					                os.environ.pop('PYTHONBREAKPOINT')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        logger.runtime("Root actor terminated")
 | 
					        logger.runtime("Root actor terminated")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -41,6 +41,7 @@ from trio import (
 | 
				
			||||||
    TaskStatus,
 | 
					    TaskStatus,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from .msg import NamespacePath
 | 
				
			||||||
from ._ipc import Channel
 | 
					from ._ipc import Channel
 | 
				
			||||||
from ._context import (
 | 
					from ._context import (
 | 
				
			||||||
    Context,
 | 
					    Context,
 | 
				
			||||||
| 
						 | 
					@ -60,11 +61,6 @@ from .devx import (
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from . import _state
 | 
					from . import _state
 | 
				
			||||||
from .log import get_logger
 | 
					from .log import get_logger
 | 
				
			||||||
from .msg import (
 | 
					 | 
				
			||||||
    current_codec,
 | 
					 | 
				
			||||||
    MsgCodec,
 | 
					 | 
				
			||||||
    NamespacePath,
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
from tractor.msg.types import (
 | 
					from tractor.msg.types import (
 | 
				
			||||||
    CancelAck,
 | 
					    CancelAck,
 | 
				
			||||||
    Error,
 | 
					    Error,
 | 
				
			||||||
| 
						 | 
					@ -102,7 +98,6 @@ async def _invoke_non_context(
 | 
				
			||||||
        Context | BaseException
 | 
					        Context | BaseException
 | 
				
			||||||
    ] = trio.TASK_STATUS_IGNORED,
 | 
					    ] = trio.TASK_STATUS_IGNORED,
 | 
				
			||||||
):
 | 
					):
 | 
				
			||||||
    __tracebackhide__: bool = True
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: can we unify this with the `context=True` impl below?
 | 
					    # TODO: can we unify this with the `context=True` impl below?
 | 
				
			||||||
    if inspect.isasyncgen(coro):
 | 
					    if inspect.isasyncgen(coro):
 | 
				
			||||||
| 
						 | 
					@ -403,11 +398,7 @@ async def _invoke(
 | 
				
			||||||
    __tracebackhide__: bool = hide_tb
 | 
					    __tracebackhide__: bool = hide_tb
 | 
				
			||||||
    treat_as_gen: bool = False
 | 
					    treat_as_gen: bool = False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if (
 | 
					    if _state.debug_mode():
 | 
				
			||||||
        _state.debug_mode()
 | 
					 | 
				
			||||||
        and
 | 
					 | 
				
			||||||
        _state._runtime_vars['use_greenback']
 | 
					 | 
				
			||||||
    ):
 | 
					 | 
				
			||||||
        # XXX for .pause_from_sync()` usage we need to make sure
 | 
					        # XXX for .pause_from_sync()` usage we need to make sure
 | 
				
			||||||
        # `greenback` is boostrapped in the subactor!
 | 
					        # `greenback` is boostrapped in the subactor!
 | 
				
			||||||
        await _debug.maybe_init_greenback()
 | 
					        await _debug.maybe_init_greenback()
 | 
				
			||||||
| 
						 | 
					@ -521,22 +512,10 @@ async def _invoke(
 | 
				
			||||||
        #     wrapper that calls `Context.started()` and then does
 | 
					        #     wrapper that calls `Context.started()` and then does
 | 
				
			||||||
        #     the `await coro()`?
 | 
					        #     the `await coro()`?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # ------ - ------
 | 
					        # a "context" endpoint type is the most general and
 | 
				
			||||||
        # a "context" endpoint is the most general and
 | 
					        # "least sugary" type of RPC ep with support for
 | 
				
			||||||
        # "least sugary" type of RPC with support for
 | 
					 | 
				
			||||||
        # bi-dir streaming B)
 | 
					        # bi-dir streaming B)
 | 
				
			||||||
        #
 | 
					        # StartAck
 | 
				
			||||||
        # the concurrency relation is simlar to a task nursery
 | 
					 | 
				
			||||||
        # wherein a "parent" task (the one that enters
 | 
					 | 
				
			||||||
        # `trio.open_nursery()` in some actor "opens" (via
 | 
					 | 
				
			||||||
        # `Portal.open_context()`) an IPC ctx to another peer
 | 
					 | 
				
			||||||
        # (which is maybe a sub-) actor who then schedules (aka
 | 
					 | 
				
			||||||
        # `trio.Nursery.start()`s) a new "child" task to execute
 | 
					 | 
				
			||||||
        # the `@context` annotated func; that is this func we're
 | 
					 | 
				
			||||||
        # running directly below!
 | 
					 | 
				
			||||||
        # ------ - ------
 | 
					 | 
				
			||||||
        #
 | 
					 | 
				
			||||||
        # StartAck: respond immediately with endpoint info
 | 
					 | 
				
			||||||
        await chan.send(
 | 
					        await chan.send(
 | 
				
			||||||
            StartAck(
 | 
					            StartAck(
 | 
				
			||||||
                cid=cid,
 | 
					                cid=cid,
 | 
				
			||||||
| 
						 | 
					@ -545,11 +524,11 @@ async def _invoke(
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # TODO: should we also use an `.open_context()` equiv
 | 
					        # TODO: should we also use an `.open_context()` equiv
 | 
				
			||||||
        # for this child side by factoring the impl from
 | 
					        # for this callee side by factoring the impl from
 | 
				
			||||||
        # `Portal.open_context()` into a common helper?
 | 
					        # `Portal.open_context()` into a common helper?
 | 
				
			||||||
        #
 | 
					        #
 | 
				
			||||||
        # NOTE: there are many different ctx state details
 | 
					        # NOTE: there are many different ctx state details
 | 
				
			||||||
        # in a child side instance according to current impl:
 | 
					        # in a callee side instance according to current impl:
 | 
				
			||||||
        # - `.cancelled_caught` can never be `True`.
 | 
					        # - `.cancelled_caught` can never be `True`.
 | 
				
			||||||
        #  -> the below scope is never exposed to the
 | 
					        #  -> the below scope is never exposed to the
 | 
				
			||||||
        #     `@context` marked RPC function.
 | 
					        #     `@context` marked RPC function.
 | 
				
			||||||
| 
						 | 
					@ -575,7 +554,7 @@ async def _invoke(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # NOTE: this happens IFF `ctx._scope.cancel()` is
 | 
					            # NOTE: this happens IFF `ctx._scope.cancel()` is
 | 
				
			||||||
            # called by any of,
 | 
					            # called by any of,
 | 
				
			||||||
            # - *this* child task manually calling `ctx.cancel()`.
 | 
					            # - *this* callee task manually calling `ctx.cancel()`.
 | 
				
			||||||
            # - the runtime calling `ctx._deliver_msg()` which
 | 
					            # - the runtime calling `ctx._deliver_msg()` which
 | 
				
			||||||
            #   itself calls `ctx._maybe_cancel_and_set_remote_error()`
 | 
					            #   itself calls `ctx._maybe_cancel_and_set_remote_error()`
 | 
				
			||||||
            #   which cancels the scope presuming the input error
 | 
					            #   which cancels the scope presuming the input error
 | 
				
			||||||
| 
						 | 
					@ -652,11 +631,10 @@ async def _invoke(
 | 
				
			||||||
                        # f'  |_{ctx}'
 | 
					                        # f'  |_{ctx}'
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # task-contex was either cancelled by request
 | 
					                    # task-contex was either cancelled by request using
 | 
				
			||||||
                    # using ``Portal.cancel_actor()`` or
 | 
					                    # ``Portal.cancel_actor()`` or ``Context.cancel()``
 | 
				
			||||||
                    # ``Context.cancel()`` on the far end, or it
 | 
					                    # on the far end, or it was cancelled by the local
 | 
				
			||||||
                    # was cancelled by the local child (or callee)
 | 
					                    # (callee) task, so relay this cancel signal to the
 | 
				
			||||||
                    # task, so relay this cancel signal to the
 | 
					 | 
				
			||||||
                    # other side.
 | 
					                    # other side.
 | 
				
			||||||
                    ctxc = ContextCancelled(
 | 
					                    ctxc = ContextCancelled(
 | 
				
			||||||
                        message=msg,
 | 
					                        message=msg,
 | 
				
			||||||
| 
						 | 
					@ -677,7 +655,7 @@ async def _invoke(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        ) as scope_error:
 | 
					        ) as scope_error:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # always set this (child) side's exception as the
 | 
					            # always set this (callee) side's exception as the
 | 
				
			||||||
            # local error on the context
 | 
					            # local error on the context
 | 
				
			||||||
            ctx._local_error: BaseException = scope_error
 | 
					            ctx._local_error: BaseException = scope_error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1046,8 +1024,9 @@ async def process_messages(
 | 
				
			||||||
                                trio.Event(),
 | 
					                                trio.Event(),
 | 
				
			||||||
                            )
 | 
					                            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # runtime-scoped remote error (since no `.cid`)
 | 
					                    # XXX remote (runtime scoped) error or uknown
 | 
				
			||||||
                    case Error():
 | 
					                    # msg (type).
 | 
				
			||||||
 | 
					                    case Error() | _:
 | 
				
			||||||
                        # NOTE: this is the non-rpc error case,
 | 
					                        # NOTE: this is the non-rpc error case,
 | 
				
			||||||
                        # that is, an error **not** raised inside
 | 
					                        # that is, an error **not** raised inside
 | 
				
			||||||
                        # a call to ``_invoke()`` (i.e. no cid was
 | 
					                        # a call to ``_invoke()`` (i.e. no cid was
 | 
				
			||||||
| 
						 | 
					@ -1055,6 +1034,10 @@ async def process_messages(
 | 
				
			||||||
                        # this error to all local channel
 | 
					                        # this error to all local channel
 | 
				
			||||||
                        # consumers (normally portals) by marking
 | 
					                        # consumers (normally portals) by marking
 | 
				
			||||||
                        # the channel as errored
 | 
					                        # the channel as errored
 | 
				
			||||||
 | 
					                        log.exception(
 | 
				
			||||||
 | 
					                            f'Unhandled IPC msg:\n\n'
 | 
				
			||||||
 | 
					                            f'{msg}\n'
 | 
				
			||||||
 | 
					                        )
 | 
				
			||||||
                        # assert chan.uid
 | 
					                        # assert chan.uid
 | 
				
			||||||
                        chan._exc: Exception = unpack_error(
 | 
					                        chan._exc: Exception = unpack_error(
 | 
				
			||||||
                            msg,
 | 
					                            msg,
 | 
				
			||||||
| 
						 | 
					@ -1062,17 +1045,6 @@ async def process_messages(
 | 
				
			||||||
                        )
 | 
					                        )
 | 
				
			||||||
                        raise chan._exc
 | 
					                        raise chan._exc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # unknown/invalid msg type?
 | 
					 | 
				
			||||||
                    case _:
 | 
					 | 
				
			||||||
                        codec: MsgCodec = current_codec()
 | 
					 | 
				
			||||||
                        message: str = (
 | 
					 | 
				
			||||||
                            f'Unhandled IPC msg for codec?\n\n'
 | 
					 | 
				
			||||||
                            f'|_{codec}\n\n'
 | 
					 | 
				
			||||||
                            f'{msg}\n'
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
                        log.exception(message)
 | 
					 | 
				
			||||||
                        raise RuntimeError(message)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                log.runtime(
 | 
					                log.runtime(
 | 
				
			||||||
                    'Waiting on next IPC msg from\n'
 | 
					                    'Waiting on next IPC msg from\n'
 | 
				
			||||||
                    f'peer: {chan.uid}\n'
 | 
					                    f'peer: {chan.uid}\n'
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -513,7 +513,7 @@ async def trio_proc(
 | 
				
			||||||
        # })
 | 
					        # })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # track subactor in current nursery
 | 
					        # track subactor in current nursery
 | 
				
			||||||
        curr_actor: Actor = current_actor()
 | 
					        curr_actor = current_actor()
 | 
				
			||||||
        curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
 | 
					        curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # resume caller at next checkpoint now that child is up
 | 
					        # resume caller at next checkpoint now that child is up
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -44,7 +44,6 @@ from .trionics import (
 | 
				
			||||||
    BroadcastReceiver,
 | 
					    BroadcastReceiver,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from tractor.msg import (
 | 
					from tractor.msg import (
 | 
				
			||||||
    Return,
 | 
					 | 
				
			||||||
    Stop,
 | 
					    Stop,
 | 
				
			||||||
    Yield,
 | 
					    Yield,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
| 
						 | 
					@ -83,7 +82,7 @@ class MsgStream(trio.abc.Channel):
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        ctx: Context,  # typing: ignore # noqa
 | 
					        ctx: Context,  # typing: ignore # noqa
 | 
				
			||||||
        rx_chan: trio.MemoryReceiveChannel,
 | 
					        rx_chan: trio.MemoryReceiveChannel,
 | 
				
			||||||
        _broadcaster: BroadcastReceiver|None = None,
 | 
					        _broadcaster: BroadcastReceiver | None = None,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
        self._ctx = ctx
 | 
					        self._ctx = ctx
 | 
				
			||||||
| 
						 | 
					@ -97,26 +96,36 @@ class MsgStream(trio.abc.Channel):
 | 
				
			||||||
    # delegate directly to underlying mem channel
 | 
					    # delegate directly to underlying mem channel
 | 
				
			||||||
    def receive_nowait(
 | 
					    def receive_nowait(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        allow_msgs: list[str] = Yield,
 | 
					        allow_msg_keys: list[str] = ['yield'],
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
 | 
					        # msg: dict = self._rx_chan.receive_nowait()
 | 
				
			||||||
        msg: Yield|Stop = self._rx_chan.receive_nowait()
 | 
					        msg: Yield|Stop = self._rx_chan.receive_nowait()
 | 
				
			||||||
        # TODO: replace msg equiv of this or does the `.pld`
 | 
					        for (
 | 
				
			||||||
        # interface read already satisfy it? I think so, yes?
 | 
					            i,
 | 
				
			||||||
        try:
 | 
					            key,
 | 
				
			||||||
            return msg.pld
 | 
					        ) in enumerate(allow_msg_keys):
 | 
				
			||||||
        except AttributeError as attrerr:
 | 
					            try:
 | 
				
			||||||
            _raise_from_no_key_in_msg(
 | 
					                # return msg[key]
 | 
				
			||||||
                ctx=self._ctx,
 | 
					                return msg.pld
 | 
				
			||||||
                msg=msg,
 | 
					            # except KeyError as kerr:
 | 
				
			||||||
                src_err=attrerr,
 | 
					            except AttributeError as attrerr:
 | 
				
			||||||
                log=log,
 | 
					                if i < (len(allow_msg_keys) - 1):
 | 
				
			||||||
                stream=self,
 | 
					                    continue
 | 
				
			||||||
            )
 | 
					
 | 
				
			||||||
 | 
					                _raise_from_no_key_in_msg(
 | 
				
			||||||
 | 
					                    ctx=self._ctx,
 | 
				
			||||||
 | 
					                    msg=msg,
 | 
				
			||||||
 | 
					                    # src_err=kerr,
 | 
				
			||||||
 | 
					                    src_err=attrerr,
 | 
				
			||||||
 | 
					                    log=log,
 | 
				
			||||||
 | 
					                    expect_key=key,
 | 
				
			||||||
 | 
					                    stream=self,
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def receive(
 | 
					    async def receive(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        hide_tb: bool = False,
 | 
					        hide_tb: bool = True,
 | 
				
			||||||
    ):
 | 
					    ):
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        Receive a single msg from the IPC transport, the next in
 | 
					        Receive a single msg from the IPC transport, the next in
 | 
				
			||||||
| 
						 | 
					@ -148,9 +157,10 @@ class MsgStream(trio.abc.Channel):
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                msg: Yield = await self._rx_chan.receive()
 | 
					                msg: Yield = await self._rx_chan.receive()
 | 
				
			||||||
 | 
					                # return msg['yield']
 | 
				
			||||||
                return msg.pld
 | 
					                return msg.pld
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # TODO: implement with match: instead?
 | 
					            # except KeyError as kerr:
 | 
				
			||||||
            except AttributeError as attrerr:
 | 
					            except AttributeError as attrerr:
 | 
				
			||||||
                # src_err = kerr
 | 
					                # src_err = kerr
 | 
				
			||||||
                src_err = attrerr
 | 
					                src_err = attrerr
 | 
				
			||||||
| 
						 | 
					@ -160,8 +170,10 @@ class MsgStream(trio.abc.Channel):
 | 
				
			||||||
                _raise_from_no_key_in_msg(
 | 
					                _raise_from_no_key_in_msg(
 | 
				
			||||||
                    ctx=self._ctx,
 | 
					                    ctx=self._ctx,
 | 
				
			||||||
                    msg=msg,
 | 
					                    msg=msg,
 | 
				
			||||||
 | 
					                    # src_err=kerr,
 | 
				
			||||||
                    src_err=attrerr,
 | 
					                    src_err=attrerr,
 | 
				
			||||||
                    log=log,
 | 
					                    log=log,
 | 
				
			||||||
 | 
					                    expect_key='yield',
 | 
				
			||||||
                    stream=self,
 | 
					                    stream=self,
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -292,7 +304,7 @@ class MsgStream(trio.abc.Channel):
 | 
				
			||||||
        while not drained:
 | 
					        while not drained:
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                maybe_final_msg = self.receive_nowait(
 | 
					                maybe_final_msg = self.receive_nowait(
 | 
				
			||||||
                    allow_msgs=[Yield, Return],
 | 
					                    allow_msg_keys=['yield', 'return'],
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
                if maybe_final_msg:
 | 
					                if maybe_final_msg:
 | 
				
			||||||
                    log.debug(
 | 
					                    log.debug(
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -119,11 +119,11 @@ class ActorNursery:
 | 
				
			||||||
        name: str,
 | 
					        name: str,
 | 
				
			||||||
        *,
 | 
					        *,
 | 
				
			||||||
        bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
 | 
					        bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
 | 
				
			||||||
        rpc_module_paths: list[str]|None = None,
 | 
					        rpc_module_paths: list[str] | None = None,
 | 
				
			||||||
        enable_modules: list[str]|None = None,
 | 
					        enable_modules: list[str] | None = None,
 | 
				
			||||||
        loglevel: str|None = None,  # set log level per subactor
 | 
					        loglevel: str | None = None,  # set log level per subactor
 | 
				
			||||||
        nursery: trio.Nursery|None = None,
 | 
					        nursery: trio.Nursery | None = None,
 | 
				
			||||||
        debug_mode: bool|None = None,
 | 
					        debug_mode: bool | None = None,
 | 
				
			||||||
        infect_asyncio: bool = False,
 | 
					        infect_asyncio: bool = False,
 | 
				
			||||||
    ) -> Portal:
 | 
					    ) -> Portal:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -420,7 +420,7 @@ def mk_codec(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# instance of the default `msgspec.msgpack` codec settings, i.e.
 | 
					# instance of the default `msgspec.msgpack` codec settings, i.e.
 | 
				
			||||||
# no custom structs, hooks or other special types.
 | 
					# no custom structs, hooks or other special types.
 | 
				
			||||||
_def_msgspec_codec: MsgCodec = mk_codec(ipc_pld_spec=Any)
 | 
					_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# The built-in IPC `Msg` spec.
 | 
					# The built-in IPC `Msg` spec.
 | 
				
			||||||
# Our composing "shuttle" protocol which allows `tractor`-app code
 | 
					# Our composing "shuttle" protocol which allows `tractor`-app code
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -451,8 +451,7 @@ def from_dict_msg(
 | 
				
			||||||
    dict_msg: dict,
 | 
					    dict_msg: dict,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    msgT: MsgType|None = None,
 | 
					    msgT: MsgType|None = None,
 | 
				
			||||||
    tag_field: str = 'msg_type',
 | 
					    tag_field: str = 'msg_type'
 | 
				
			||||||
    use_pretty: bool = False,
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> MsgType:
 | 
					) -> MsgType:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
| 
						 | 
					@ -469,19 +468,6 @@ def from_dict_msg(
 | 
				
			||||||
    # XXX ensure tag field is removed
 | 
					    # XXX ensure tag field is removed
 | 
				
			||||||
    msgT_name: str = dict_msg.pop(msg_type_tag_field)
 | 
					    msgT_name: str = dict_msg.pop(msg_type_tag_field)
 | 
				
			||||||
    msgT: MsgType = _msg_table[msgT_name]
 | 
					    msgT: MsgType = _msg_table[msgT_name]
 | 
				
			||||||
    if use_pretty:
 | 
					 | 
				
			||||||
        msgT = defstruct(
 | 
					 | 
				
			||||||
            name=msgT_name,
 | 
					 | 
				
			||||||
            fields=[
 | 
					 | 
				
			||||||
                (key, fi.type)
 | 
					 | 
				
			||||||
                for fi, key, _
 | 
					 | 
				
			||||||
                in pretty_struct.iter_fields(msgT)
 | 
					 | 
				
			||||||
            ],
 | 
					 | 
				
			||||||
            bases=(
 | 
					 | 
				
			||||||
                pretty_struct.Struct,
 | 
					 | 
				
			||||||
                msgT,
 | 
					 | 
				
			||||||
            ),
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
    return msgT(**dict_msg)
 | 
					    return msgT(**dict_msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# TODO: should be make a msg version of `ContextCancelled?`
 | 
					# TODO: should be make a msg version of `ContextCancelled?`
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue