Compare commits
	
		
			3 Commits 
		
	
	
		
			67f673bf36
			...
			f2ce4a3469
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						f2ce4a3469 | |
| 
							
							
								 | 
						3aa964315a | |
| 
							
							
								 | 
						f3ca8608d5 | 
| 
						 | 
				
			
			@ -7,7 +7,6 @@ B~)
 | 
			
		|||
'''
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any,
 | 
			
		||||
    _GenericAlias,
 | 
			
		||||
    Type,
 | 
			
		||||
    Union,
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -26,20 +25,23 @@ from msgspec import (
 | 
			
		|||
import pytest
 | 
			
		||||
import tractor
 | 
			
		||||
from tractor.msg import (
 | 
			
		||||
    _def_msgspec_codec,
 | 
			
		||||
    _codec,
 | 
			
		||||
    _ctxvar_MsgCodec,
 | 
			
		||||
 | 
			
		||||
    NamespacePath,
 | 
			
		||||
    MsgCodec,
 | 
			
		||||
    mk_codec,
 | 
			
		||||
    apply_codec,
 | 
			
		||||
    current_msgspec_codec,
 | 
			
		||||
    current_codec,
 | 
			
		||||
)
 | 
			
		||||
from tractor.msg import types
 | 
			
		||||
from tractor.msg import (
 | 
			
		||||
    types,
 | 
			
		||||
)
 | 
			
		||||
from tractor import _state
 | 
			
		||||
from tractor.msg.types import (
 | 
			
		||||
    # PayloadT,
 | 
			
		||||
    Msg,
 | 
			
		||||
    # Started,
 | 
			
		||||
    Started,
 | 
			
		||||
    mk_msg_spec,
 | 
			
		||||
)
 | 
			
		||||
import trio
 | 
			
		||||
| 
						 | 
				
			
			@ -60,56 +62,110 @@ def test_msg_spec_xor_pld_spec():
 | 
			
		|||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: wrap these into `._codec` such that user can just pass
 | 
			
		||||
# a type table of some sort?
 | 
			
		||||
def enc_hook(obj: Any) -> Any:
 | 
			
		||||
    if isinstance(obj, NamespacePath):
 | 
			
		||||
        return str(obj)
 | 
			
		||||
    else:
 | 
			
		||||
        raise NotImplementedError(
 | 
			
		||||
            f'Objects of type {type(obj)} are not supported'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def dec_hook(type: Type, obj: Any) -> Any:
 | 
			
		||||
    print(f'type is: {type}')
 | 
			
		||||
    if type is NamespacePath:
 | 
			
		||||
        return NamespacePath(obj)
 | 
			
		||||
    else:
 | 
			
		||||
        raise NotImplementedError(
 | 
			
		||||
            f'Objects of type {type(obj)} are not supported'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def ex_func(*args):
 | 
			
		||||
    print(f'ex_func({args})')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def mk_custom_codec(
 | 
			
		||||
    ipc_msg_spec: Type[Any] = Any,
 | 
			
		||||
) -> MsgCodec:
 | 
			
		||||
    # apply custom hooks and set a `Decoder` which only
 | 
			
		||||
    # loads `NamespacePath` types.
 | 
			
		||||
    nsp_codec: MsgCodec = mk_codec(
 | 
			
		||||
        ipc_msg_spec=ipc_msg_spec,
 | 
			
		||||
        enc_hook=enc_hook,
 | 
			
		||||
        dec_hook=dec_hook,
 | 
			
		||||
    )
 | 
			
		||||
    pld_spec: Union[Type]|Any,
 | 
			
		||||
 | 
			
		||||
    # TODO: validate `MsgCodec` interface/semantics?
 | 
			
		||||
    # -[ ] simple field tests to ensure caching + reset is workin?
 | 
			
		||||
    # -[ ] custom / changing `.decoder()` calls?
 | 
			
		||||
    #
 | 
			
		||||
    # dec = nsp_codec.decoder(
 | 
			
		||||
    #     types=NamespacePath,
 | 
			
		||||
    # )
 | 
			
		||||
    # assert nsp_codec.dec is dec
 | 
			
		||||
) -> MsgCodec:
 | 
			
		||||
    '''
 | 
			
		||||
    Create custom `msgpack` enc/dec-hooks and set a `Decoder`
 | 
			
		||||
    which only loads `NamespacePath` types.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    uid: tuple[str, str] = tractor.current_actor().uid
 | 
			
		||||
 | 
			
		||||
    # XXX NOTE XXX: despite defining `NamespacePath` as a type
 | 
			
		||||
    # field on our `Msg.pld`, we still need a enc/dec_hook() pair
 | 
			
		||||
    # to cast to/from that type on the wire. See the docs:
 | 
			
		||||
    # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
 | 
			
		||||
 | 
			
		||||
    def enc_nsp(obj: Any) -> Any:
 | 
			
		||||
        match obj:
 | 
			
		||||
            case NamespacePath():
 | 
			
		||||
                print(
 | 
			
		||||
                    f'{uid}: `NamespacePath`-Only ENCODE?\n'
 | 
			
		||||
                    f'type: {type(obj)}\n'
 | 
			
		||||
                    f'obj: {obj}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                return str(obj)
 | 
			
		||||
 | 
			
		||||
        logmsg: str = (
 | 
			
		||||
            f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported'
 | 
			
		||||
            f'type: {type(obj)}\n'
 | 
			
		||||
            f'obj: {obj}\n'
 | 
			
		||||
        )
 | 
			
		||||
        print(logmsg)
 | 
			
		||||
        raise NotImplementedError(logmsg)
 | 
			
		||||
 | 
			
		||||
    def dec_nsp(
 | 
			
		||||
        type: Type,
 | 
			
		||||
        obj: Any,
 | 
			
		||||
 | 
			
		||||
    ) -> Any:
 | 
			
		||||
        print(
 | 
			
		||||
            f'{uid}: CUSTOM DECODE\n'
 | 
			
		||||
            f'input type: {type}\n'
 | 
			
		||||
            f'obj: {obj}\n'
 | 
			
		||||
            f'type(obj): `{type(obj).__class__}`\n'
 | 
			
		||||
        )
 | 
			
		||||
        nsp = None
 | 
			
		||||
 | 
			
		||||
        # This never seems to hit?
 | 
			
		||||
        if isinstance(obj, Msg):
 | 
			
		||||
            print(f'Msg type: {obj}')
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
            type is NamespacePath
 | 
			
		||||
            and isinstance(obj, str)
 | 
			
		||||
            and ':' in obj
 | 
			
		||||
        ):
 | 
			
		||||
            nsp = NamespacePath(obj)
 | 
			
		||||
 | 
			
		||||
        if nsp:
 | 
			
		||||
            print(f'Returning NSP instance: {nsp}')
 | 
			
		||||
            return nsp
 | 
			
		||||
 | 
			
		||||
        logmsg: str = (
 | 
			
		||||
            f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported'
 | 
			
		||||
            f'input type: {type(obj)}\n'
 | 
			
		||||
            f'obj: {obj}\n'
 | 
			
		||||
            f'type(obj): `{type(obj).__class__}`\n'
 | 
			
		||||
        )
 | 
			
		||||
        print(logmsg)
 | 
			
		||||
        raise NotImplementedError(logmsg)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    nsp_codec: MsgCodec = mk_codec(
 | 
			
		||||
        ipc_pld_spec=pld_spec,
 | 
			
		||||
 | 
			
		||||
        # NOTE XXX: the encode hook MUST be used no matter what since
 | 
			
		||||
        # our `NamespacePath` is not any of a `Any` native type nor
 | 
			
		||||
        # a `msgspec.Struct` subtype - so `msgspec` has no way to know
 | 
			
		||||
        # how to encode it unless we provide the custom hook.
 | 
			
		||||
        #
 | 
			
		||||
        # AGAIN that is, regardless of whether we spec an
 | 
			
		||||
        # `Any`-decoded-pld the enc has no knowledge (by default)
 | 
			
		||||
        # how to enc `NamespacePath` (nsp), so we add a custom
 | 
			
		||||
        # hook to do that ALWAYS.
 | 
			
		||||
        enc_hook=enc_nsp,
 | 
			
		||||
 | 
			
		||||
        # XXX NOTE: pretty sure this is mutex with the `type=` to
 | 
			
		||||
        # `Decoder`? so it won't work in tandem with the
 | 
			
		||||
        # `ipc_pld_spec` passed above?
 | 
			
		||||
        dec_hook=dec_nsp,
 | 
			
		||||
    )
 | 
			
		||||
    return nsp_codec
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def send_back_nsp(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
    ctx: Context,
 | 
			
		||||
    expect_debug: bool,
 | 
			
		||||
    use_any_spec: bool,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
| 
						 | 
				
			
			@ -117,28 +173,65 @@ async def send_back_nsp(
 | 
			
		|||
    and ensure we can round trip a func ref with our parent.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
    task_ctx: Context = task.context
 | 
			
		||||
    assert _ctxvar_MsgCodec not in task_ctx
 | 
			
		||||
    # debug mode sanity check
 | 
			
		||||
    assert expect_debug == _state.debug_mode()
 | 
			
		||||
 | 
			
		||||
    nsp_codec: MsgCodec = mk_custom_codec()
 | 
			
		||||
    # task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
 | 
			
		||||
    # TreeVar
 | 
			
		||||
    # curr_codec = _ctxvar_MsgCodec.get_in(task)
 | 
			
		||||
 | 
			
		||||
    # ContextVar
 | 
			
		||||
    # task_ctx: Context = task.context
 | 
			
		||||
    # assert _ctxvar_MsgCodec not in task_ctx
 | 
			
		||||
 | 
			
		||||
    curr_codec = _ctxvar_MsgCodec.get()
 | 
			
		||||
    assert curr_codec is _codec._def_tractor_codec
 | 
			
		||||
 | 
			
		||||
    if use_any_spec:
 | 
			
		||||
        pld_spec = Any
 | 
			
		||||
    else:
 | 
			
		||||
        # NOTE: don't need the |None here since
 | 
			
		||||
        # the parent side will never send `None` like
 | 
			
		||||
        # we do here in the implicit return at the end of this
 | 
			
		||||
        # `@context` body.
 | 
			
		||||
        pld_spec = NamespacePath  # |None
 | 
			
		||||
 | 
			
		||||
    nsp_codec: MsgCodec = mk_custom_codec(
 | 
			
		||||
        pld_spec=pld_spec,
 | 
			
		||||
    )
 | 
			
		||||
    with apply_codec(nsp_codec) as codec:
 | 
			
		||||
        chk_codec_applied(
 | 
			
		||||
            custom_codec=nsp_codec,
 | 
			
		||||
            enter_value=codec,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # ensure roundtripping works locally
 | 
			
		||||
        nsp = NamespacePath.from_ref(ex_func)
 | 
			
		||||
        await ctx.started(nsp)
 | 
			
		||||
        wire_bytes: bytes = nsp_codec.encode(
 | 
			
		||||
            Started(
 | 
			
		||||
                cid=ctx.cid,
 | 
			
		||||
                pld=nsp
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
        msg: Started = nsp_codec.decode(wire_bytes)
 | 
			
		||||
        pld = msg.pld
 | 
			
		||||
        assert pld == nsp
 | 
			
		||||
 | 
			
		||||
        await ctx.started(nsp)
 | 
			
		||||
        async with ctx.open_stream() as ipc:
 | 
			
		||||
            async for msg in ipc:
 | 
			
		||||
 | 
			
		||||
                assert msg == f'{__name__}:ex_func'
 | 
			
		||||
                if use_any_spec:
 | 
			
		||||
                    assert msg == f'{__name__}:ex_func'
 | 
			
		||||
 | 
			
		||||
                # TODO: as per below
 | 
			
		||||
                # assert isinstance(msg, NamespacePath)
 | 
			
		||||
                assert isinstance(msg, str)
 | 
			
		||||
                    # TODO: as per below
 | 
			
		||||
                    # assert isinstance(msg, NamespacePath)
 | 
			
		||||
                    assert isinstance(msg, str)
 | 
			
		||||
                else:
 | 
			
		||||
                    assert isinstance(msg, NamespacePath)
 | 
			
		||||
 | 
			
		||||
                await ipc.send(msg)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def chk_codec_applied(
 | 
			
		||||
| 
						 | 
				
			
			@ -146,11 +239,20 @@ def chk_codec_applied(
 | 
			
		|||
    enter_value: MsgCodec,
 | 
			
		||||
) -> MsgCodec:
 | 
			
		||||
 | 
			
		||||
    task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
    task_ctx: Context = task.context
 | 
			
		||||
    # task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
 | 
			
		||||
    assert _ctxvar_MsgCodec in task_ctx
 | 
			
		||||
    curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
 | 
			
		||||
    # TreeVar
 | 
			
		||||
    # curr_codec = _ctxvar_MsgCodec.get_in(task)
 | 
			
		||||
 | 
			
		||||
    # ContextVar
 | 
			
		||||
    # task_ctx: Context = task.context
 | 
			
		||||
    # assert _ctxvar_MsgCodec in task_ctx
 | 
			
		||||
    # curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
 | 
			
		||||
 | 
			
		||||
    # RunVar
 | 
			
		||||
    curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
 | 
			
		||||
    last_read_codec = _ctxvar_MsgCodec.get()
 | 
			
		||||
    assert curr_codec is last_read_codec
 | 
			
		||||
 | 
			
		||||
    assert (
 | 
			
		||||
        # returned from `mk_codec()`
 | 
			
		||||
| 
						 | 
				
			
			@ -163,14 +265,31 @@ def chk_codec_applied(
 | 
			
		|||
        curr_codec is
 | 
			
		||||
 | 
			
		||||
        # public API for all of the above
 | 
			
		||||
        current_msgspec_codec()
 | 
			
		||||
        current_codec()
 | 
			
		||||
 | 
			
		||||
        # the default `msgspec` settings
 | 
			
		||||
        is not _def_msgspec_codec
 | 
			
		||||
        is not _codec._def_msgspec_codec
 | 
			
		||||
        is not _codec._def_tractor_codec
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_codec_hooks_mod():
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'ipc_pld_spec',
 | 
			
		||||
    [
 | 
			
		||||
        # _codec._def_msgspec_codec,
 | 
			
		||||
        Any,
 | 
			
		||||
        # _codec._def_tractor_codec,
 | 
			
		||||
        NamespacePath|None,
 | 
			
		||||
    ],
 | 
			
		||||
    ids=[
 | 
			
		||||
        'any_type',
 | 
			
		||||
        'nsp_type',
 | 
			
		||||
    ]
 | 
			
		||||
)
 | 
			
		||||
def test_codec_hooks_mod(
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    ipc_pld_spec: Union[Type]|Any,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Audit the `.msg.MsgCodec` override apis details given our impl
 | 
			
		||||
    uses `contextvars` to accomplish per `trio` task codec
 | 
			
		||||
| 
						 | 
				
			
			@ -178,11 +297,21 @@ def test_codec_hooks_mod():
 | 
			
		|||
 | 
			
		||||
    '''
 | 
			
		||||
    async def main():
 | 
			
		||||
        task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
        task_ctx: Context = task.context
 | 
			
		||||
        assert _ctxvar_MsgCodec not in task_ctx
 | 
			
		||||
 | 
			
		||||
        async with tractor.open_nursery() as an:
 | 
			
		||||
        # task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
 | 
			
		||||
        # ContextVar
 | 
			
		||||
        # task_ctx: Context = task.context
 | 
			
		||||
        # assert _ctxvar_MsgCodec not in task_ctx
 | 
			
		||||
 | 
			
		||||
        # TreeVar
 | 
			
		||||
        # def_codec: MsgCodec = _ctxvar_MsgCodec.get_in(task)
 | 
			
		||||
        def_codec = _ctxvar_MsgCodec.get()
 | 
			
		||||
        assert def_codec is _codec._def_tractor_codec
 | 
			
		||||
 | 
			
		||||
        async with tractor.open_nursery(
 | 
			
		||||
            debug_mode=debug_mode,
 | 
			
		||||
        ) as an:
 | 
			
		||||
            p: tractor.Portal = await an.start_actor(
 | 
			
		||||
                'sub',
 | 
			
		||||
                enable_modules=[__name__],
 | 
			
		||||
| 
						 | 
				
			
			@ -192,7 +321,9 @@ def test_codec_hooks_mod():
 | 
			
		|||
            # - codec not modified -> decode nsp as `str`
 | 
			
		||||
            # - codec modified with hooks -> decode nsp as
 | 
			
		||||
            #   `NamespacePath`
 | 
			
		||||
            nsp_codec: MsgCodec = mk_custom_codec()
 | 
			
		||||
            nsp_codec: MsgCodec = mk_custom_codec(
 | 
			
		||||
                pld_spec=ipc_pld_spec,
 | 
			
		||||
            )
 | 
			
		||||
            with apply_codec(nsp_codec) as codec:
 | 
			
		||||
                chk_codec_applied(
 | 
			
		||||
                    custom_codec=nsp_codec,
 | 
			
		||||
| 
						 | 
				
			
			@ -202,9 +333,22 @@ def test_codec_hooks_mod():
 | 
			
		|||
                async with (
 | 
			
		||||
                    p.open_context(
 | 
			
		||||
                        send_back_nsp,
 | 
			
		||||
                        # TODO: send the original nsp here and
 | 
			
		||||
                        # test with `limit_msg_spec()` above?
 | 
			
		||||
                        expect_debug=debug_mode,
 | 
			
		||||
                        use_any_spec=(ipc_pld_spec==Any),
 | 
			
		||||
 | 
			
		||||
                    ) as (ctx, first),
 | 
			
		||||
                    ctx.open_stream() as ipc,
 | 
			
		||||
                ):
 | 
			
		||||
                    if ipc_pld_spec is NamespacePath:
 | 
			
		||||
                        assert isinstance(first, NamespacePath)
 | 
			
		||||
 | 
			
		||||
                    print(
 | 
			
		||||
                        'root: ENTERING CONTEXT BLOCK\n'
 | 
			
		||||
                        f'type(first): {type(first)}\n'
 | 
			
		||||
                        f'first: {first}\n'
 | 
			
		||||
                    )
 | 
			
		||||
                    # ensure codec is still applied across
 | 
			
		||||
                    # `tractor.Context` + its embedded nursery.
 | 
			
		||||
                    chk_codec_applied(
 | 
			
		||||
| 
						 | 
				
			
			@ -212,23 +356,46 @@ def test_codec_hooks_mod():
 | 
			
		|||
                        enter_value=codec,
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                    assert first == f'{__name__}:ex_func'
 | 
			
		||||
                    first_nsp = NamespacePath(first)
 | 
			
		||||
 | 
			
		||||
                    # ensure roundtripping works
 | 
			
		||||
                    wire_bytes: bytes = nsp_codec.encode(
 | 
			
		||||
                        Started(
 | 
			
		||||
                            cid=ctx.cid,
 | 
			
		||||
                            pld=first_nsp
 | 
			
		||||
                        )
 | 
			
		||||
                    )
 | 
			
		||||
                    msg: Started = nsp_codec.decode(wire_bytes)
 | 
			
		||||
                    pld = msg.pld
 | 
			
		||||
                    assert  pld == first_nsp
 | 
			
		||||
 | 
			
		||||
                    # try a manual decode of the started msg+pld
 | 
			
		||||
 | 
			
		||||
                    # TODO: actually get the decoder loading
 | 
			
		||||
                    # to native once we spec our SCIPP msgspec
 | 
			
		||||
                    # (structurred-conc-inter-proc-protocol)
 | 
			
		||||
                    # implemented as per,
 | 
			
		||||
                    # https://github.com/goodboy/tractor/issues/36
 | 
			
		||||
                    #
 | 
			
		||||
                    # assert isinstance(first, NamespacePath)
 | 
			
		||||
                    assert isinstance(first, str)
 | 
			
		||||
                    if ipc_pld_spec is NamespacePath:
 | 
			
		||||
                        assert isinstance(first, NamespacePath)
 | 
			
		||||
 | 
			
		||||
                    # `Any`-payload-spec case
 | 
			
		||||
                    else:
 | 
			
		||||
                        assert isinstance(first, str)
 | 
			
		||||
                        assert first == f'{__name__}:ex_func'
 | 
			
		||||
 | 
			
		||||
                    await ipc.send(first)
 | 
			
		||||
 | 
			
		||||
                    with trio.move_on_after(1):
 | 
			
		||||
                    with trio.move_on_after(.6):
 | 
			
		||||
                        async for msg in ipc:
 | 
			
		||||
                            print(msg)
 | 
			
		||||
 | 
			
		||||
                            # TODO: as per above
 | 
			
		||||
                            # assert isinstance(msg, NamespacePath)
 | 
			
		||||
                            assert isinstance(msg, str)
 | 
			
		||||
                            await ipc.send(msg)
 | 
			
		||||
                            await trio.sleep(0.1)
 | 
			
		||||
 | 
			
		||||
            await p.cancel_actor()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -6,6 +6,7 @@ sync-opening a ``tractor.Context`` beforehand.
 | 
			
		|||
 | 
			
		||||
'''
 | 
			
		||||
from itertools import count
 | 
			
		||||
import math
 | 
			
		||||
import platform
 | 
			
		||||
from pprint import pformat
 | 
			
		||||
from typing import (
 | 
			
		||||
| 
						 | 
				
			
			@ -845,7 +846,10 @@ async def keep_sending_from_callee(
 | 
			
		|||
        ('caller', 1, never_open_stream),
 | 
			
		||||
        ('callee', 0, keep_sending_from_callee),
 | 
			
		||||
    ],
 | 
			
		||||
    ids='overrun_condition={}'.format,
 | 
			
		||||
    ids=[
 | 
			
		||||
         ('caller_1buf_never_open_stream'),
 | 
			
		||||
         ('callee_0buf_keep_sending_from_callee'),
 | 
			
		||||
    ]
 | 
			
		||||
)
 | 
			
		||||
def test_one_end_stream_not_opened(
 | 
			
		||||
    overrun_by: tuple[str, int, Callable],
 | 
			
		||||
| 
						 | 
				
			
			@ -869,29 +873,30 @@ def test_one_end_stream_not_opened(
 | 
			
		|||
                enable_modules=[__name__],
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            async with portal.open_context(
 | 
			
		||||
                entrypoint,
 | 
			
		||||
            ) as (ctx, sent):
 | 
			
		||||
                assert sent is None
 | 
			
		||||
            with trio.fail_after(0.8):
 | 
			
		||||
                async with portal.open_context(
 | 
			
		||||
                    entrypoint,
 | 
			
		||||
                ) as (ctx, sent):
 | 
			
		||||
                    assert sent is None
 | 
			
		||||
 | 
			
		||||
                if 'caller' in overrunner:
 | 
			
		||||
                    if 'caller' in overrunner:
 | 
			
		||||
 | 
			
		||||
                    async with ctx.open_stream() as stream:
 | 
			
		||||
                        async with ctx.open_stream() as stream:
 | 
			
		||||
 | 
			
		||||
                        # itersend +1 msg more then the buffer size
 | 
			
		||||
                        # to cause the most basic overrun.
 | 
			
		||||
                        for i in range(buf_size):
 | 
			
		||||
                            print(f'sending {i}')
 | 
			
		||||
                            await stream.send(i)
 | 
			
		||||
                            # itersend +1 msg more then the buffer size
 | 
			
		||||
                            # to cause the most basic overrun.
 | 
			
		||||
                            for i in range(buf_size):
 | 
			
		||||
                                print(f'sending {i}')
 | 
			
		||||
                                await stream.send(i)
 | 
			
		||||
 | 
			
		||||
                        else:
 | 
			
		||||
                            # expect overrun error to be relayed back
 | 
			
		||||
                            # and this sleep interrupted
 | 
			
		||||
                            await trio.sleep_forever()
 | 
			
		||||
                            else:
 | 
			
		||||
                                # expect overrun error to be relayed back
 | 
			
		||||
                                # and this sleep interrupted
 | 
			
		||||
                                await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
                else:
 | 
			
		||||
                    # callee overruns caller case so we do nothing here
 | 
			
		||||
                    await trio.sleep_forever()
 | 
			
		||||
                    else:
 | 
			
		||||
                        # callee overruns caller case so we do nothing here
 | 
			
		||||
                        await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1055,54 +1060,63 @@ def test_maybe_allow_overruns_stream(
 | 
			
		|||
                loglevel=loglevel,
 | 
			
		||||
                debug_mode=debug_mode,
 | 
			
		||||
            )
 | 
			
		||||
            seq = list(range(10))
 | 
			
		||||
            async with portal.open_context(
 | 
			
		||||
                echo_back_sequence,
 | 
			
		||||
                seq=seq,
 | 
			
		||||
                wait_for_cancel=cancel_ctx,
 | 
			
		||||
                be_slow=(slow_side == 'child'),
 | 
			
		||||
                allow_overruns_side=allow_overruns_side,
 | 
			
		||||
 | 
			
		||||
            ) as (ctx, sent):
 | 
			
		||||
                assert sent is None
 | 
			
		||||
            # stream-sequence batch info with send delay to determine
 | 
			
		||||
            # approx timeout determining whether test has hung.
 | 
			
		||||
            total_batches: int = 2
 | 
			
		||||
            num_items: int = 10
 | 
			
		||||
            seq = list(range(num_items))
 | 
			
		||||
            parent_send_delay: float = 0.16
 | 
			
		||||
            timeout: float = math.ceil(
 | 
			
		||||
                total_batches * num_items * parent_send_delay
 | 
			
		||||
            )
 | 
			
		||||
            with trio.fail_after(timeout):
 | 
			
		||||
                async with portal.open_context(
 | 
			
		||||
                    echo_back_sequence,
 | 
			
		||||
                    seq=seq,
 | 
			
		||||
                    wait_for_cancel=cancel_ctx,
 | 
			
		||||
                    be_slow=(slow_side == 'child'),
 | 
			
		||||
                    allow_overruns_side=allow_overruns_side,
 | 
			
		||||
 | 
			
		||||
                async with ctx.open_stream(
 | 
			
		||||
                    msg_buffer_size=1 if slow_side == 'parent' else None,
 | 
			
		||||
                    allow_overruns=(allow_overruns_side in {'parent', 'both'}),
 | 
			
		||||
                ) as stream:
 | 
			
		||||
                ) as (ctx, sent):
 | 
			
		||||
                    assert sent is None
 | 
			
		||||
 | 
			
		||||
                    total_batches: int = 2
 | 
			
		||||
                    for _ in range(total_batches):
 | 
			
		||||
                        for msg in seq:
 | 
			
		||||
                            # print(f'root tx {msg}')
 | 
			
		||||
                            await stream.send(msg)
 | 
			
		||||
                            if slow_side == 'parent':
 | 
			
		||||
                                # NOTE: we make the parent slightly
 | 
			
		||||
                                # slower, when it is slow, to make sure
 | 
			
		||||
                                # that in the overruns everywhere case
 | 
			
		||||
                                await trio.sleep(0.16)
 | 
			
		||||
                    async with ctx.open_stream(
 | 
			
		||||
                        msg_buffer_size=1 if slow_side == 'parent' else None,
 | 
			
		||||
                        allow_overruns=(allow_overruns_side in {'parent', 'both'}),
 | 
			
		||||
                    ) as stream:
 | 
			
		||||
 | 
			
		||||
                        batch = []
 | 
			
		||||
                        async for msg in stream:
 | 
			
		||||
                            print(f'root rx {msg}')
 | 
			
		||||
                            batch.append(msg)
 | 
			
		||||
                            if batch == seq:
 | 
			
		||||
                                break
 | 
			
		||||
                        for _ in range(total_batches):
 | 
			
		||||
                            for msg in seq:
 | 
			
		||||
                                # print(f'root tx {msg}')
 | 
			
		||||
                                await stream.send(msg)
 | 
			
		||||
                                if slow_side == 'parent':
 | 
			
		||||
                                    # NOTE: we make the parent slightly
 | 
			
		||||
                                    # slower, when it is slow, to make sure
 | 
			
		||||
                                    # that in the overruns everywhere case
 | 
			
		||||
                                    await trio.sleep(parent_send_delay)
 | 
			
		||||
 | 
			
		||||
                            batch = []
 | 
			
		||||
                            async for msg in stream:
 | 
			
		||||
                                print(f'root rx {msg}')
 | 
			
		||||
                                batch.append(msg)
 | 
			
		||||
                                if batch == seq:
 | 
			
		||||
                                    break
 | 
			
		||||
 | 
			
		||||
                    if cancel_ctx:
 | 
			
		||||
                        # cancel the remote task
 | 
			
		||||
                        print('Requesting `ctx.cancel()` in parent!')
 | 
			
		||||
                        await ctx.cancel()
 | 
			
		||||
 | 
			
		||||
                res: str|ContextCancelled = await ctx.result()
 | 
			
		||||
 | 
			
		||||
                if cancel_ctx:
 | 
			
		||||
                    # cancel the remote task
 | 
			
		||||
                    print('Requesting `ctx.cancel()` in parent!')
 | 
			
		||||
                    await ctx.cancel()
 | 
			
		||||
                    assert isinstance(res, ContextCancelled)
 | 
			
		||||
                    assert tuple(res.canceller) == current_actor().uid
 | 
			
		||||
 | 
			
		||||
            res: str|ContextCancelled = await ctx.result()
 | 
			
		||||
 | 
			
		||||
            if cancel_ctx:
 | 
			
		||||
                assert isinstance(res, ContextCancelled)
 | 
			
		||||
                assert tuple(res.canceller) == current_actor().uid
 | 
			
		||||
 | 
			
		||||
            else:
 | 
			
		||||
                print(f'RX ROOT SIDE RESULT {res}')
 | 
			
		||||
                assert res == 'yo'
 | 
			
		||||
                else:
 | 
			
		||||
                    print(f'RX ROOT SIDE RESULT {res}')
 | 
			
		||||
                    assert res == 'yo'
 | 
			
		||||
 | 
			
		||||
            # cancel the daemon
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -31,25 +31,24 @@ from ._codec import (
 | 
			
		|||
    apply_codec as apply_codec,
 | 
			
		||||
    mk_codec as mk_codec,
 | 
			
		||||
    MsgCodec as MsgCodec,
 | 
			
		||||
    current_msgspec_codec as current_msgspec_codec,
 | 
			
		||||
    current_codec as current_codec,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from .types import (
 | 
			
		||||
    Msg as Msg,
 | 
			
		||||
 | 
			
		||||
    Start as Start,  # with pld
 | 
			
		||||
    FuncSpec as FuncSpec,
 | 
			
		||||
    Aid as Aid,
 | 
			
		||||
    SpawnSpec as SpawnSpec,
 | 
			
		||||
 | 
			
		||||
    StartAck as StartAck, # with pld
 | 
			
		||||
    IpcCtxSpec as IpcCtxSpec,
 | 
			
		||||
    Start as Start,
 | 
			
		||||
    StartAck as StartAck,
 | 
			
		||||
 | 
			
		||||
    Started as Started,
 | 
			
		||||
    Yield as Yield,
 | 
			
		||||
    Stop as Stop,
 | 
			
		||||
    Return as Return,
 | 
			
		||||
 | 
			
		||||
    Error as Error,  # with pld
 | 
			
		||||
    ErrorData as ErrorData,
 | 
			
		||||
    Error as Error,
 | 
			
		||||
 | 
			
		||||
    # full msg spec set
 | 
			
		||||
    __spec__ as __spec__,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -30,13 +30,13 @@ ToDo: backends we prolly should offer:
 | 
			
		|||
 | 
			
		||||
'''
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
from contextvars import (
 | 
			
		||||
    ContextVar,
 | 
			
		||||
    Token,
 | 
			
		||||
)
 | 
			
		||||
from contextlib import (
 | 
			
		||||
    contextmanager as cm,
 | 
			
		||||
)
 | 
			
		||||
# from contextvars import (
 | 
			
		||||
#     ContextVar,
 | 
			
		||||
#     Token,
 | 
			
		||||
# )
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any,
 | 
			
		||||
    Callable,
 | 
			
		||||
| 
						 | 
				
			
			@ -47,6 +47,12 @@ from types import ModuleType
 | 
			
		|||
 | 
			
		||||
import msgspec
 | 
			
		||||
from msgspec import msgpack
 | 
			
		||||
from trio.lowlevel import (
 | 
			
		||||
    RunVar,
 | 
			
		||||
    RunVarToken,
 | 
			
		||||
)
 | 
			
		||||
# TODO: see notes below from @mikenerone..
 | 
			
		||||
# from tricycle import TreeVar
 | 
			
		||||
 | 
			
		||||
from tractor.msg.pretty_struct import Struct
 | 
			
		||||
from tractor.msg.types import (
 | 
			
		||||
| 
						 | 
				
			
			@ -72,6 +78,9 @@ class MsgCodec(Struct):
 | 
			
		|||
    '''
 | 
			
		||||
    A IPC msg interchange format lib's encoder + decoder pair.
 | 
			
		||||
 | 
			
		||||
    Pretty much nothing more then delegation to underlying
 | 
			
		||||
    `msgspec.<interchange-protocol>.Encoder/Decoder`s for now.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    _enc: msgpack.Encoder
 | 
			
		||||
    _dec: msgpack.Decoder
 | 
			
		||||
| 
						 | 
				
			
			@ -86,11 +95,6 @@ class MsgCodec(Struct):
 | 
			
		|||
 | 
			
		||||
    lib: ModuleType = msgspec
 | 
			
		||||
 | 
			
		||||
    # ad-hoc type extensions
 | 
			
		||||
    # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
 | 
			
		||||
    enc_hook: Callable[[Any], Any]|None = None  # coder
 | 
			
		||||
    dec_hook: Callable[[type, Any], Any]|None = None # decoder
 | 
			
		||||
 | 
			
		||||
    # TODO: a sub-decoder system as well?
 | 
			
		||||
    # payload_msg_specs: Union[Type[Struct]] = Any
 | 
			
		||||
    # see related comments in `.msg.types`
 | 
			
		||||
| 
						 | 
				
			
			@ -304,7 +308,8 @@ def mk_codec(
 | 
			
		|||
 | 
			
		||||
    libname: str = 'msgspec',
 | 
			
		||||
 | 
			
		||||
    # proxy as `Struct(**kwargs)`
 | 
			
		||||
    # proxy as `Struct(**kwargs)` for ad-hoc type extensions
 | 
			
		||||
    # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
 | 
			
		||||
    # ------ - ------
 | 
			
		||||
    dec_hook: Callable|None = None,
 | 
			
		||||
    enc_hook: Callable|None = None,
 | 
			
		||||
| 
						 | 
				
			
			@ -389,14 +394,52 @@ def mk_codec(
 | 
			
		|||
# no custom structs, hooks or other special types.
 | 
			
		||||
_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
 | 
			
		||||
 | 
			
		||||
# NOTE: provides for per-`trio.Task` specificity of the
 | 
			
		||||
# The built-in IPC `Msg` spec.
 | 
			
		||||
# Our composing "shuttle" protocol which allows `tractor`-app code
 | 
			
		||||
# to use any `msgspec` supported type as the `Msg.pld` payload,
 | 
			
		||||
# https://jcristharif.com/msgspec/supported-types.html
 | 
			
		||||
#
 | 
			
		||||
_def_tractor_codec: MsgCodec = mk_codec(
 | 
			
		||||
    ipc_pld_spec=Any,
 | 
			
		||||
)
 | 
			
		||||
# TODO: IDEALLY provides for per-`trio.Task` specificity of the
 | 
			
		||||
# IPC msging codec used by the transport layer when doing
 | 
			
		||||
# `Channel.send()/.recv()` of wire data.
 | 
			
		||||
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
 | 
			
		||||
 | 
			
		||||
# ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!?
 | 
			
		||||
# _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
 | 
			
		||||
 | 
			
		||||
# TreeVar-TODO: DIDN'T WORK, kept resetting in every new embedded nursery
 | 
			
		||||
# even though it's supposed to inherit from a parent context ???
 | 
			
		||||
#
 | 
			
		||||
# _ctxvar_MsgCodec: TreeVar[MsgCodec] = TreeVar(
 | 
			
		||||
#
 | 
			
		||||
# ^-NOTE-^: for this to work see the mods by @mikenerone from `trio` gitter:
 | 
			
		||||
#
 | 
			
		||||
# 22:02:54 <mikenerone> even for regular contextvars, all you have to do is:
 | 
			
		||||
#    `task: Task = trio.lowlevel.current_task()`
 | 
			
		||||
#    `task.parent_nursery.parent_task.context.run(my_ctx_var.set, new_value)`
 | 
			
		||||
#
 | 
			
		||||
# From a comment in his prop code he couldn't share outright:
 | 
			
		||||
# 1. For every TreeVar set in the current task (which covers what
 | 
			
		||||
#    we need from SynchronizerFacade), walk up the tree until the
 | 
			
		||||
#    root or finding one where the TreeVar is already set, setting
 | 
			
		||||
#    it in all of the contexts along the way.
 | 
			
		||||
# 2. For each of those, we also forcibly set the values that are
 | 
			
		||||
#    pending for child nurseries that have not yet accessed the
 | 
			
		||||
#    TreeVar.
 | 
			
		||||
# 3. We similarly set the pending values for the child nurseries
 | 
			
		||||
#    of the *current* task.
 | 
			
		||||
#
 | 
			
		||||
 | 
			
		||||
# TODO: STOP USING THIS, since it's basically a global and won't
 | 
			
		||||
# allow sub-IPC-ctxs to limit the msg-spec however desired..
 | 
			
		||||
_ctxvar_MsgCodec: MsgCodec = RunVar(
 | 
			
		||||
    'msgspec_codec',
 | 
			
		||||
 | 
			
		||||
    # TODO: move this to our new `Msg`-spec!
 | 
			
		||||
    default=_def_msgspec_codec,
 | 
			
		||||
    # default=_def_tractor_codec,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -410,15 +453,36 @@ def apply_codec(
 | 
			
		|||
    runtime context such that all IPC msgs are processed
 | 
			
		||||
    with it for that task.
 | 
			
		||||
 | 
			
		||||
    Uses a `tricycle.TreeVar` to ensure the scope of the codec
 | 
			
		||||
    matches the `@cm` block and DOES NOT change to the original
 | 
			
		||||
    (default) value in new tasks (as it does for `ContextVar`).
 | 
			
		||||
 | 
			
		||||
    See the docs:
 | 
			
		||||
    - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
 | 
			
		||||
    - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    token: Token = _ctxvar_MsgCodec.set(codec)
 | 
			
		||||
    orig: MsgCodec = _ctxvar_MsgCodec.get()
 | 
			
		||||
    assert orig is not codec
 | 
			
		||||
    token: RunVarToken = _ctxvar_MsgCodec.set(codec)
 | 
			
		||||
 | 
			
		||||
    # TODO: for TreeVar approach, see docs for @cm `.being()` API:
 | 
			
		||||
    # https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
 | 
			
		||||
    # try:
 | 
			
		||||
    #     with _ctxvar_MsgCodec.being(codec):
 | 
			
		||||
    #         new = _ctxvar_MsgCodec.get()
 | 
			
		||||
    #         assert new is codec
 | 
			
		||||
    #         yield codec
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        yield _ctxvar_MsgCodec.get()
 | 
			
		||||
    finally:
 | 
			
		||||
        _ctxvar_MsgCodec.reset(token)
 | 
			
		||||
 | 
			
		||||
    assert _ctxvar_MsgCodec.get() is orig
 | 
			
		||||
 | 
			
		||||
def current_msgspec_codec() -> MsgCodec:
 | 
			
		||||
 | 
			
		||||
def current_codec() -> MsgCodec:
 | 
			
		||||
    '''
 | 
			
		||||
    Return the current `trio.Task.context`'s value
 | 
			
		||||
    for `msgspec_codec` used by `Channel.send/.recv()`
 | 
			
		||||
| 
						 | 
				
			
			@ -449,5 +513,6 @@ def limit_msg_spec(
 | 
			
		|||
        payload_types=payload_types,
 | 
			
		||||
        **codec_kwargs,
 | 
			
		||||
    )
 | 
			
		||||
    with apply_codec(msgspec_codec):
 | 
			
		||||
    with apply_codec(msgspec_codec) as applied_codec:
 | 
			
		||||
        assert applied_codec is msgspec_codec
 | 
			
		||||
        yield msgspec_codec
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -80,6 +80,28 @@ class DiffDump(UserList):
 | 
			
		|||
        return repstr
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def iter_fields(struct: Struct) -> Iterator[
 | 
			
		||||
    tuple[
 | 
			
		||||
        structs.FieldIinfo,
 | 
			
		||||
        str,
 | 
			
		||||
        Any,
 | 
			
		||||
    ]
 | 
			
		||||
]:
 | 
			
		||||
    '''
 | 
			
		||||
    Iterate over all non-@property fields of this struct.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    fi: structs.FieldInfo
 | 
			
		||||
    for fi in structs.fields(struct):
 | 
			
		||||
        key: str = fi.name
 | 
			
		||||
        val: Any = getattr(struct, key)
 | 
			
		||||
        yield (
 | 
			
		||||
            fi,
 | 
			
		||||
            key,
 | 
			
		||||
            val,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Struct(
 | 
			
		||||
    _Struct,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -91,23 +113,6 @@ class Struct(
 | 
			
		|||
    A "human friendlier" (aka repl buddy) struct subtype.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    def _sin_props(self) -> Iterator[
 | 
			
		||||
        tuple[
 | 
			
		||||
            structs.FieldIinfo,
 | 
			
		||||
            str,
 | 
			
		||||
            Any,
 | 
			
		||||
        ]
 | 
			
		||||
    ]:
 | 
			
		||||
        '''
 | 
			
		||||
        Iterate over all non-@property fields of this struct.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        fi: structs.FieldInfo
 | 
			
		||||
        for fi in structs.fields(self):
 | 
			
		||||
            key: str = fi.name
 | 
			
		||||
            val: Any = getattr(self, key)
 | 
			
		||||
            yield fi, key, val
 | 
			
		||||
 | 
			
		||||
    def to_dict(
 | 
			
		||||
        self,
 | 
			
		||||
        include_non_members: bool = True,
 | 
			
		||||
| 
						 | 
				
			
			@ -130,7 +135,7 @@ class Struct(
 | 
			
		|||
        # added as type-defined `@property` methods!
 | 
			
		||||
        sin_props: dict = {}
 | 
			
		||||
        fi: structs.FieldInfo
 | 
			
		||||
        for fi, k, v in self._sin_props():
 | 
			
		||||
        for fi, k, v in iter_fields(self):
 | 
			
		||||
            sin_props[k] = asdict[k]
 | 
			
		||||
 | 
			
		||||
        return sin_props
 | 
			
		||||
| 
						 | 
				
			
			@ -159,7 +164,7 @@ class Struct(
 | 
			
		|||
        fi: structs.FieldInfo
 | 
			
		||||
        k: str
 | 
			
		||||
        v: Any
 | 
			
		||||
        for fi, k, v in self._sin_props():
 | 
			
		||||
        for fi, k, v in iter_fields(self):
 | 
			
		||||
 | 
			
		||||
            # TODO: how can we prefer `Literal['option1',  'option2,
 | 
			
		||||
            # ..]` over .__name__ == `Literal` but still get only the
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -26,6 +26,7 @@ from __future__ import annotations
 | 
			
		|||
import types
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any,
 | 
			
		||||
    Callable,
 | 
			
		||||
    Generic,
 | 
			
		||||
    Literal,
 | 
			
		||||
    Type,
 | 
			
		||||
| 
						 | 
				
			
			@ -37,8 +38,12 @@ from msgspec import (
 | 
			
		|||
    defstruct,
 | 
			
		||||
    # field,
 | 
			
		||||
    Struct,
 | 
			
		||||
    UNSET,
 | 
			
		||||
    UnsetType,
 | 
			
		||||
    # UNSET,
 | 
			
		||||
    # UnsetType,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from tractor.msg import (
 | 
			
		||||
    pretty_struct,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
# type variable for the boxed payload field `.pld`
 | 
			
		||||
| 
						 | 
				
			
			@ -48,11 +53,19 @@ PayloadT = TypeVar('PayloadT')
 | 
			
		|||
class Msg(
 | 
			
		||||
    Struct,
 | 
			
		||||
    Generic[PayloadT],
 | 
			
		||||
 | 
			
		||||
    # https://jcristharif.com/msgspec/structs.html#tagged-unions
 | 
			
		||||
    tag=True,
 | 
			
		||||
    tag_field='msg_type',
 | 
			
		||||
 | 
			
		||||
    # eq=True,
 | 
			
		||||
    # https://jcristharif.com/msgspec/structs.html#field-ordering
 | 
			
		||||
    # kw_only=True,
 | 
			
		||||
 | 
			
		||||
    # https://jcristharif.com/msgspec/structs.html#equality-and-order
 | 
			
		||||
    # order=True,
 | 
			
		||||
 | 
			
		||||
    # https://jcristharif.com/msgspec/structs.html#encoding-decoding-as-arrays
 | 
			
		||||
    # as_array=True,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    The "god" boxing msg type.
 | 
			
		||||
| 
						 | 
				
			
			@ -90,6 +103,53 @@ class Msg(
 | 
			
		|||
    pld: PayloadT
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Aid(
 | 
			
		||||
    Struct,
 | 
			
		||||
    tag=True,
 | 
			
		||||
    tag_field='msg_type',
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Actor-identity msg.
 | 
			
		||||
 | 
			
		||||
    Initial contact exchange enabling an actor "mailbox handshake"
 | 
			
		||||
    delivering the peer identity (and maybe eventually contact)
 | 
			
		||||
    info.
 | 
			
		||||
 | 
			
		||||
    Used by discovery protocol to register actors as well as
 | 
			
		||||
    conduct the initial comms (capability) filtering.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    name: str
 | 
			
		||||
    uuid: str
 | 
			
		||||
    # TODO: use built-in support for UUIDs?
 | 
			
		||||
    # -[ ] `uuid.UUID` which has multi-protocol support
 | 
			
		||||
    #  https://jcristharif.com/msgspec/supported-types.html#uuid
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SpawnSpec(
 | 
			
		||||
    pretty_struct.Struct,
 | 
			
		||||
    tag=True,
 | 
			
		||||
    tag_field='msg_type',
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Initial runtime spec handed down from a spawning parent to its
 | 
			
		||||
    child subactor immediately following first contact via an
 | 
			
		||||
    `Aid` msg.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    _parent_main_data: dict
 | 
			
		||||
    _runtime_vars: dict[str, Any]
 | 
			
		||||
 | 
			
		||||
    # module import capability
 | 
			
		||||
    enable_modules: dict[str, str]
 | 
			
		||||
 | 
			
		||||
    # TODO: not just sockaddr pairs?
 | 
			
		||||
    # -[ ] abstract into a `TransportAddr` type?
 | 
			
		||||
    reg_addrs: list[tuple[str, int]]
 | 
			
		||||
    bind_addrs: list[tuple[str, int]]
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: caps based RPC support in the payload?
 | 
			
		||||
#
 | 
			
		||||
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
 | 
			
		||||
| 
						 | 
				
			
			@ -105,18 +165,31 @@ class Msg(
 | 
			
		|||
#
 | 
			
		||||
# -[ ] can we combine .ns + .func into a native `NamespacePath` field?
 | 
			
		||||
#
 | 
			
		||||
# -[ ]better name, like `Call/TaskInput`?
 | 
			
		||||
# -[ ] better name, like `Call/TaskInput`?
 | 
			
		||||
#
 | 
			
		||||
class FuncSpec(Struct):
 | 
			
		||||
    ns: str
 | 
			
		||||
    func: str
 | 
			
		||||
 | 
			
		||||
    kwargs: dict
 | 
			
		||||
    uid: str  # (calling) actor-id
 | 
			
		||||
# -[ ] XXX a debugger lock msg transaction with payloads like,
 | 
			
		||||
#   child -> `.pld: DebugLock` -> root
 | 
			
		||||
#   child <- `.pld: DebugLocked` <- root
 | 
			
		||||
#   child -> `.pld: DebugRelease` -> root
 | 
			
		||||
#
 | 
			
		||||
#   WHY => when a pld spec is provided it might not allow for
 | 
			
		||||
#   debug mode msgs as they currently are (using plain old `pld.
 | 
			
		||||
#   str` payloads) so we only when debug_mode=True we need to
 | 
			
		||||
#   union in this debugger payload set?
 | 
			
		||||
#
 | 
			
		||||
#   mk_msg_spec(
 | 
			
		||||
#       MyPldSpec,
 | 
			
		||||
#       debug_mode=True,
 | 
			
		||||
#   ) -> (
 | 
			
		||||
#       Union[MyPldSpec]
 | 
			
		||||
#      | Union[DebugLock, DebugLocked, DebugRelease]
 | 
			
		||||
#   )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Start(
 | 
			
		||||
    Msg,
 | 
			
		||||
    Struct,
 | 
			
		||||
    tag=True,
 | 
			
		||||
    tag_field='msg_type',
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Initial request to remotely schedule an RPC `trio.Task` via
 | 
			
		||||
| 
						 | 
				
			
			@ -134,14 +207,26 @@ class Start(
 | 
			
		|||
    - `Context.open_context()`
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    pld: FuncSpec
 | 
			
		||||
    cid: str
 | 
			
		||||
 | 
			
		||||
    ns: str
 | 
			
		||||
    func: str
 | 
			
		||||
 | 
			
		||||
    kwargs: dict
 | 
			
		||||
    uid: tuple[str, str]  # (calling) actor-id
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IpcCtxSpec(Struct):
 | 
			
		||||
class StartAck(
 | 
			
		||||
    Struct,
 | 
			
		||||
    tag=True,
 | 
			
		||||
    tag_field='msg_type',
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    An inter-actor-`trio.Task`-comms `Context` spec.
 | 
			
		||||
    Init response to a `Cmd` request indicating the far
 | 
			
		||||
    end's RPC spec, namely its callable "type".
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    cid: str
 | 
			
		||||
    # TODO: maybe better names for all these?
 | 
			
		||||
    # -[ ] obvi ^ would need sync with `._rpc`
 | 
			
		||||
    functype: Literal[
 | 
			
		||||
| 
						 | 
				
			
			@ -160,18 +245,6 @@ class IpcCtxSpec(Struct):
 | 
			
		|||
    # msgspec: MsgSpec
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class StartAck(
 | 
			
		||||
    Msg,
 | 
			
		||||
    Generic[PayloadT],
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Init response to a `Cmd` request indicating the far
 | 
			
		||||
    end's RPC callable "type".
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    pld: IpcCtxSpec
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Started(
 | 
			
		||||
    Msg,
 | 
			
		||||
    Generic[PayloadT],
 | 
			
		||||
| 
						 | 
				
			
			@ -202,13 +275,19 @@ class Yield(
 | 
			
		|||
    pld: PayloadT
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Stop(Msg):
 | 
			
		||||
class Stop(
 | 
			
		||||
    Struct,
 | 
			
		||||
    tag=True,
 | 
			
		||||
    tag_field='msg_type',
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Stream termination signal much like an IPC version 
 | 
			
		||||
    of `StopAsyncIteration`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    pld: UnsetType = UNSET
 | 
			
		||||
    cid: str
 | 
			
		||||
    # TODO: do we want to support a payload on stop?
 | 
			
		||||
    # pld: UnsetType = UNSET
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Return(
 | 
			
		||||
| 
						 | 
				
			
			@ -223,32 +302,33 @@ class Return(
 | 
			
		|||
    pld: PayloadT
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ErrorData(Struct):
 | 
			
		||||
class Error(
 | 
			
		||||
    Struct,
 | 
			
		||||
    tag=True,
 | 
			
		||||
    tag_field='msg_type',
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Remote actor error meta-data as needed originally by
 | 
			
		||||
    A pkt that wraps `RemoteActorError`s for relay and raising.
 | 
			
		||||
 | 
			
		||||
    Fields are 1-to-1 meta-data as needed originally by
 | 
			
		||||
    `RemoteActorError.msgdata: dict`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    src_uid: str
 | 
			
		||||
    src_uid: tuple[str, str]
 | 
			
		||||
    src_type_str: str
 | 
			
		||||
    boxed_type_str: str
 | 
			
		||||
 | 
			
		||||
    relay_path: list[str]
 | 
			
		||||
    relay_path: list[tuple[str, str]]
 | 
			
		||||
    tb_str: str
 | 
			
		||||
 | 
			
		||||
    cid: str|None = None
 | 
			
		||||
 | 
			
		||||
    # TODO: use UNSET or don't include them via
 | 
			
		||||
    #
 | 
			
		||||
    # `ContextCancelled`
 | 
			
		||||
    canceller: str|None = None
 | 
			
		||||
    canceller: tuple[str, str]|None = None
 | 
			
		||||
 | 
			
		||||
    # `StreamOverrun`
 | 
			
		||||
    sender: str|None = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Error(Msg):
 | 
			
		||||
    '''
 | 
			
		||||
    A pkt that wraps `RemoteActorError`s for relay.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    pld: ErrorData
 | 
			
		||||
    sender: tuple[str, str]|None = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: should be make a msg version of `ContextCancelled?`
 | 
			
		||||
| 
						 | 
				
			
			@ -265,6 +345,12 @@ class Error(Msg):
 | 
			
		|||
# approx order of the IPC txn-state spaces.
 | 
			
		||||
__spec__: list[Msg] = [
 | 
			
		||||
 | 
			
		||||
    # identity handshake
 | 
			
		||||
    Aid,
 | 
			
		||||
 | 
			
		||||
    # spawn specification from parent
 | 
			
		||||
    SpawnSpec,
 | 
			
		||||
 | 
			
		||||
    # inter-actor RPC initiation
 | 
			
		||||
    Start,
 | 
			
		||||
    StartAck,
 | 
			
		||||
| 
						 | 
				
			
			@ -280,6 +366,8 @@ __spec__: list[Msg] = [
 | 
			
		|||
]
 | 
			
		||||
 | 
			
		||||
_runtime_spec_msgs: list[Msg] = [
 | 
			
		||||
    Aid,
 | 
			
		||||
    SpawnSpec,
 | 
			
		||||
    Start,
 | 
			
		||||
    StartAck,
 | 
			
		||||
    Stop,
 | 
			
		||||
| 
						 | 
				
			
			@ -443,3 +531,99 @@ def mk_msg_spec(
 | 
			
		|||
        pld_spec | runtime_spec,
 | 
			
		||||
        msgtypes_table[spec_build_method] + ipc_msg_types,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: make something similar to this inside `._codec` such that
 | 
			
		||||
# user can just pass a type table of some sort?
 | 
			
		||||
# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
 | 
			
		||||
#     '''
 | 
			
		||||
#     Deliver a `enc_hook()`/`dec_hook()` pair which does
 | 
			
		||||
#     manual convertion from our above native `Msg` set
 | 
			
		||||
#     to `dict` equivalent (wire msgs) in order to keep legacy compat
 | 
			
		||||
#     with the original runtime implementation.
 | 
			
		||||
 | 
			
		||||
#     Note: this is is/was primarly used while moving the core
 | 
			
		||||
#     runtime over to using native `Msg`-struct types wherein we
 | 
			
		||||
#     start with the send side emitting without loading
 | 
			
		||||
#     a typed-decoder and then later flipping the switch over to
 | 
			
		||||
#     load to the native struct types once all runtime usage has
 | 
			
		||||
#     been adjusted appropriately.
 | 
			
		||||
 | 
			
		||||
#     '''
 | 
			
		||||
#     def enc_to_dict(msg: Any) -> Any:
 | 
			
		||||
#         '''
 | 
			
		||||
#         Encode `Msg`-structs to `dict` msgs instead
 | 
			
		||||
#         of using `msgspec.msgpack.Decoder.type`-ed
 | 
			
		||||
#         features.
 | 
			
		||||
 | 
			
		||||
#         '''
 | 
			
		||||
#         match msg:
 | 
			
		||||
#             case Start():
 | 
			
		||||
#                 dctmsg: dict = pretty_struct.Struct.to_dict(
 | 
			
		||||
#                     msg
 | 
			
		||||
#                 )['pld']
 | 
			
		||||
 | 
			
		||||
#             case Error():
 | 
			
		||||
#                 dctmsg: dict = pretty_struct.Struct.to_dict(
 | 
			
		||||
#                     msg
 | 
			
		||||
#                 )['pld']
 | 
			
		||||
#                 return {'error': dctmsg}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#     def dec_from_dict(
 | 
			
		||||
#         type: Type,
 | 
			
		||||
#         obj: Any,
 | 
			
		||||
#     ) -> Any:
 | 
			
		||||
#         '''
 | 
			
		||||
#         Decode to `Msg`-structs from `dict` msgs instead
 | 
			
		||||
#         of using `msgspec.msgpack.Decoder.type`-ed
 | 
			
		||||
#         features.
 | 
			
		||||
 | 
			
		||||
#         '''
 | 
			
		||||
#         cid: str = obj.get('cid')
 | 
			
		||||
#         match obj:
 | 
			
		||||
#             case {'cmd': pld}:
 | 
			
		||||
#                 return Start(
 | 
			
		||||
#                     cid=cid,
 | 
			
		||||
#                     pld=pld,
 | 
			
		||||
#                 )
 | 
			
		||||
#             case {'functype': pld}:
 | 
			
		||||
#                 return StartAck(
 | 
			
		||||
#                     cid=cid,
 | 
			
		||||
#                     functype=pld,
 | 
			
		||||
#                     # pld=IpcCtxSpec(
 | 
			
		||||
#                     #     functype=pld,
 | 
			
		||||
#                     # ),
 | 
			
		||||
#                 )
 | 
			
		||||
#             case {'started': pld}:
 | 
			
		||||
#                 return Started(
 | 
			
		||||
#                     cid=cid,
 | 
			
		||||
#                     pld=pld,
 | 
			
		||||
#                 )
 | 
			
		||||
#             case {'yield': pld}:
 | 
			
		||||
#                 return Yield(
 | 
			
		||||
#                     cid=obj['cid'],
 | 
			
		||||
#                     pld=pld,
 | 
			
		||||
#                 )
 | 
			
		||||
#             case {'stop': pld}:
 | 
			
		||||
#                 return Stop(
 | 
			
		||||
#                     cid=cid,
 | 
			
		||||
#                 )
 | 
			
		||||
#             case {'return': pld}:
 | 
			
		||||
#                 return Return(
 | 
			
		||||
#                     cid=cid,
 | 
			
		||||
#                     pld=pld,
 | 
			
		||||
#                 )
 | 
			
		||||
 | 
			
		||||
#             case {'error': pld}:
 | 
			
		||||
#                 return Error(
 | 
			
		||||
#                     cid=cid,
 | 
			
		||||
#                     pld=ErrorData(
 | 
			
		||||
#                         **pld
 | 
			
		||||
#                     ),
 | 
			
		||||
#                 )
 | 
			
		||||
 | 
			
		||||
#     return (
 | 
			
		||||
#         # enc_to_dict,
 | 
			
		||||
#         dec_from_dict,
 | 
			
		||||
#     )
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue