Compare commits
	
		
			3 Commits 
		
	
	
		
			67f673bf36
			...
			f2ce4a3469
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						f2ce4a3469 | |
| 
							
							
								 | 
						3aa964315a | |
| 
							
							
								 | 
						f3ca8608d5 | 
| 
						 | 
					@ -7,7 +7,6 @@ B~)
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
from typing import (
 | 
					from typing import (
 | 
				
			||||||
    Any,
 | 
					    Any,
 | 
				
			||||||
    _GenericAlias,
 | 
					 | 
				
			||||||
    Type,
 | 
					    Type,
 | 
				
			||||||
    Union,
 | 
					    Union,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
| 
						 | 
					@ -26,20 +25,23 @@ from msgspec import (
 | 
				
			||||||
import pytest
 | 
					import pytest
 | 
				
			||||||
import tractor
 | 
					import tractor
 | 
				
			||||||
from tractor.msg import (
 | 
					from tractor.msg import (
 | 
				
			||||||
    _def_msgspec_codec,
 | 
					    _codec,
 | 
				
			||||||
    _ctxvar_MsgCodec,
 | 
					    _ctxvar_MsgCodec,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    NamespacePath,
 | 
					    NamespacePath,
 | 
				
			||||||
    MsgCodec,
 | 
					    MsgCodec,
 | 
				
			||||||
    mk_codec,
 | 
					    mk_codec,
 | 
				
			||||||
    apply_codec,
 | 
					    apply_codec,
 | 
				
			||||||
    current_msgspec_codec,
 | 
					    current_codec,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
from tractor.msg import types
 | 
					from tractor.msg import (
 | 
				
			||||||
 | 
					    types,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					from tractor import _state
 | 
				
			||||||
from tractor.msg.types import (
 | 
					from tractor.msg.types import (
 | 
				
			||||||
    # PayloadT,
 | 
					    # PayloadT,
 | 
				
			||||||
    Msg,
 | 
					    Msg,
 | 
				
			||||||
    # Started,
 | 
					    Started,
 | 
				
			||||||
    mk_msg_spec,
 | 
					    mk_msg_spec,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
import trio
 | 
					import trio
 | 
				
			||||||
| 
						 | 
					@ -60,56 +62,110 @@ def test_msg_spec_xor_pld_spec():
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# TODO: wrap these into `._codec` such that user can just pass
 | 
					 | 
				
			||||||
# a type table of some sort?
 | 
					 | 
				
			||||||
def enc_hook(obj: Any) -> Any:
 | 
					 | 
				
			||||||
    if isinstance(obj, NamespacePath):
 | 
					 | 
				
			||||||
        return str(obj)
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        raise NotImplementedError(
 | 
					 | 
				
			||||||
            f'Objects of type {type(obj)} are not supported'
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def dec_hook(type: Type, obj: Any) -> Any:
 | 
					 | 
				
			||||||
    print(f'type is: {type}')
 | 
					 | 
				
			||||||
    if type is NamespacePath:
 | 
					 | 
				
			||||||
        return NamespacePath(obj)
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        raise NotImplementedError(
 | 
					 | 
				
			||||||
            f'Objects of type {type(obj)} are not supported'
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def ex_func(*args):
 | 
					def ex_func(*args):
 | 
				
			||||||
    print(f'ex_func({args})')
 | 
					    print(f'ex_func({args})')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def mk_custom_codec(
 | 
					def mk_custom_codec(
 | 
				
			||||||
    ipc_msg_spec: Type[Any] = Any,
 | 
					    pld_spec: Union[Type]|Any,
 | 
				
			||||||
) -> MsgCodec:
 | 
					 | 
				
			||||||
    # apply custom hooks and set a `Decoder` which only
 | 
					 | 
				
			||||||
    # loads `NamespacePath` types.
 | 
					 | 
				
			||||||
    nsp_codec: MsgCodec = mk_codec(
 | 
					 | 
				
			||||||
        ipc_msg_spec=ipc_msg_spec,
 | 
					 | 
				
			||||||
        enc_hook=enc_hook,
 | 
					 | 
				
			||||||
        dec_hook=dec_hook,
 | 
					 | 
				
			||||||
    )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: validate `MsgCodec` interface/semantics?
 | 
					) -> MsgCodec:
 | 
				
			||||||
    # -[ ] simple field tests to ensure caching + reset is workin?
 | 
					    '''
 | 
				
			||||||
    # -[ ] custom / changing `.decoder()` calls?
 | 
					    Create custom `msgpack` enc/dec-hooks and set a `Decoder`
 | 
				
			||||||
    #
 | 
					    which only loads `NamespacePath` types.
 | 
				
			||||||
    # dec = nsp_codec.decoder(
 | 
					
 | 
				
			||||||
    #     types=NamespacePath,
 | 
					    '''
 | 
				
			||||||
    # )
 | 
					    uid: tuple[str, str] = tractor.current_actor().uid
 | 
				
			||||||
    # assert nsp_codec.dec is dec
 | 
					
 | 
				
			||||||
 | 
					    # XXX NOTE XXX: despite defining `NamespacePath` as a type
 | 
				
			||||||
 | 
					    # field on our `Msg.pld`, we still need a enc/dec_hook() pair
 | 
				
			||||||
 | 
					    # to cast to/from that type on the wire. See the docs:
 | 
				
			||||||
 | 
					    # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def enc_nsp(obj: Any) -> Any:
 | 
				
			||||||
 | 
					        match obj:
 | 
				
			||||||
 | 
					            case NamespacePath():
 | 
				
			||||||
 | 
					                print(
 | 
				
			||||||
 | 
					                    f'{uid}: `NamespacePath`-Only ENCODE?\n'
 | 
				
			||||||
 | 
					                    f'type: {type(obj)}\n'
 | 
				
			||||||
 | 
					                    f'obj: {obj}\n'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                return str(obj)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logmsg: str = (
 | 
				
			||||||
 | 
					            f'{uid}: Encoding `{obj}: <{type(obj)}>` not supported'
 | 
				
			||||||
 | 
					            f'type: {type(obj)}\n'
 | 
				
			||||||
 | 
					            f'obj: {obj}\n'
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        print(logmsg)
 | 
				
			||||||
 | 
					        raise NotImplementedError(logmsg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def dec_nsp(
 | 
				
			||||||
 | 
					        type: Type,
 | 
				
			||||||
 | 
					        obj: Any,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ) -> Any:
 | 
				
			||||||
 | 
					        print(
 | 
				
			||||||
 | 
					            f'{uid}: CUSTOM DECODE\n'
 | 
				
			||||||
 | 
					            f'input type: {type}\n'
 | 
				
			||||||
 | 
					            f'obj: {obj}\n'
 | 
				
			||||||
 | 
					            f'type(obj): `{type(obj).__class__}`\n'
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        nsp = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # This never seems to hit?
 | 
				
			||||||
 | 
					        if isinstance(obj, Msg):
 | 
				
			||||||
 | 
					            print(f'Msg type: {obj}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if (
 | 
				
			||||||
 | 
					            type is NamespacePath
 | 
				
			||||||
 | 
					            and isinstance(obj, str)
 | 
				
			||||||
 | 
					            and ':' in obj
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
 | 
					            nsp = NamespacePath(obj)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if nsp:
 | 
				
			||||||
 | 
					            print(f'Returning NSP instance: {nsp}')
 | 
				
			||||||
 | 
					            return nsp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logmsg: str = (
 | 
				
			||||||
 | 
					            f'{uid}: Decoding `{obj}: <{type(obj)}>` not supported'
 | 
				
			||||||
 | 
					            f'input type: {type(obj)}\n'
 | 
				
			||||||
 | 
					            f'obj: {obj}\n'
 | 
				
			||||||
 | 
					            f'type(obj): `{type(obj).__class__}`\n'
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        print(logmsg)
 | 
				
			||||||
 | 
					        raise NotImplementedError(logmsg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    nsp_codec: MsgCodec = mk_codec(
 | 
				
			||||||
 | 
					        ipc_pld_spec=pld_spec,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # NOTE XXX: the encode hook MUST be used no matter what since
 | 
				
			||||||
 | 
					        # our `NamespacePath` is not any of a `Any` native type nor
 | 
				
			||||||
 | 
					        # a `msgspec.Struct` subtype - so `msgspec` has no way to know
 | 
				
			||||||
 | 
					        # how to encode it unless we provide the custom hook.
 | 
				
			||||||
 | 
					        #
 | 
				
			||||||
 | 
					        # AGAIN that is, regardless of whether we spec an
 | 
				
			||||||
 | 
					        # `Any`-decoded-pld the enc has no knowledge (by default)
 | 
				
			||||||
 | 
					        # how to enc `NamespacePath` (nsp), so we add a custom
 | 
				
			||||||
 | 
					        # hook to do that ALWAYS.
 | 
				
			||||||
 | 
					        enc_hook=enc_nsp,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # XXX NOTE: pretty sure this is mutex with the `type=` to
 | 
				
			||||||
 | 
					        # `Decoder`? so it won't work in tandem with the
 | 
				
			||||||
 | 
					        # `ipc_pld_spec` passed above?
 | 
				
			||||||
 | 
					        dec_hook=dec_nsp,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
    return nsp_codec
 | 
					    return nsp_codec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@tractor.context
 | 
					@tractor.context
 | 
				
			||||||
async def send_back_nsp(
 | 
					async def send_back_nsp(
 | 
				
			||||||
    ctx: tractor.Context,
 | 
					    ctx: Context,
 | 
				
			||||||
 | 
					    expect_debug: bool,
 | 
				
			||||||
 | 
					    use_any_spec: bool,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
| 
						 | 
					@ -117,28 +173,65 @@ async def send_back_nsp(
 | 
				
			||||||
    and ensure we can round trip a func ref with our parent.
 | 
					    and ensure we can round trip a func ref with our parent.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    task: trio.Task = trio.lowlevel.current_task()
 | 
					    # debug mode sanity check
 | 
				
			||||||
    task_ctx: Context = task.context
 | 
					    assert expect_debug == _state.debug_mode()
 | 
				
			||||||
    assert _ctxvar_MsgCodec not in task_ctx
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    nsp_codec: MsgCodec = mk_custom_codec()
 | 
					    # task: trio.Task = trio.lowlevel.current_task()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # TreeVar
 | 
				
			||||||
 | 
					    # curr_codec = _ctxvar_MsgCodec.get_in(task)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # ContextVar
 | 
				
			||||||
 | 
					    # task_ctx: Context = task.context
 | 
				
			||||||
 | 
					    # assert _ctxvar_MsgCodec not in task_ctx
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    curr_codec = _ctxvar_MsgCodec.get()
 | 
				
			||||||
 | 
					    assert curr_codec is _codec._def_tractor_codec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    if use_any_spec:
 | 
				
			||||||
 | 
					        pld_spec = Any
 | 
				
			||||||
 | 
					    else:
 | 
				
			||||||
 | 
					        # NOTE: don't need the |None here since
 | 
				
			||||||
 | 
					        # the parent side will never send `None` like
 | 
				
			||||||
 | 
					        # we do here in the implicit return at the end of this
 | 
				
			||||||
 | 
					        # `@context` body.
 | 
				
			||||||
 | 
					        pld_spec = NamespacePath  # |None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    nsp_codec: MsgCodec = mk_custom_codec(
 | 
				
			||||||
 | 
					        pld_spec=pld_spec,
 | 
				
			||||||
 | 
					    )
 | 
				
			||||||
    with apply_codec(nsp_codec) as codec:
 | 
					    with apply_codec(nsp_codec) as codec:
 | 
				
			||||||
        chk_codec_applied(
 | 
					        chk_codec_applied(
 | 
				
			||||||
            custom_codec=nsp_codec,
 | 
					            custom_codec=nsp_codec,
 | 
				
			||||||
            enter_value=codec,
 | 
					            enter_value=codec,
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # ensure roundtripping works locally
 | 
				
			||||||
        nsp = NamespacePath.from_ref(ex_func)
 | 
					        nsp = NamespacePath.from_ref(ex_func)
 | 
				
			||||||
        await ctx.started(nsp)
 | 
					        wire_bytes: bytes = nsp_codec.encode(
 | 
				
			||||||
 | 
					            Started(
 | 
				
			||||||
 | 
					                cid=ctx.cid,
 | 
				
			||||||
 | 
					                pld=nsp
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					        msg: Started = nsp_codec.decode(wire_bytes)
 | 
				
			||||||
 | 
					        pld = msg.pld
 | 
				
			||||||
 | 
					        assert pld == nsp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        await ctx.started(nsp)
 | 
				
			||||||
        async with ctx.open_stream() as ipc:
 | 
					        async with ctx.open_stream() as ipc:
 | 
				
			||||||
            async for msg in ipc:
 | 
					            async for msg in ipc:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                assert msg == f'{__name__}:ex_func'
 | 
					                if use_any_spec:
 | 
				
			||||||
 | 
					                    assert msg == f'{__name__}:ex_func'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # TODO: as per below
 | 
					                    # TODO: as per below
 | 
				
			||||||
                # assert isinstance(msg, NamespacePath)
 | 
					                    # assert isinstance(msg, NamespacePath)
 | 
				
			||||||
                assert isinstance(msg, str)
 | 
					                    assert isinstance(msg, str)
 | 
				
			||||||
 | 
					                else:
 | 
				
			||||||
 | 
					                    assert isinstance(msg, NamespacePath)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                await ipc.send(msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def chk_codec_applied(
 | 
					def chk_codec_applied(
 | 
				
			||||||
| 
						 | 
					@ -146,11 +239,20 @@ def chk_codec_applied(
 | 
				
			||||||
    enter_value: MsgCodec,
 | 
					    enter_value: MsgCodec,
 | 
				
			||||||
) -> MsgCodec:
 | 
					) -> MsgCodec:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    task: trio.Task = trio.lowlevel.current_task()
 | 
					    # task: trio.Task = trio.lowlevel.current_task()
 | 
				
			||||||
    task_ctx: Context = task.context
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    assert _ctxvar_MsgCodec in task_ctx
 | 
					    # TreeVar
 | 
				
			||||||
    curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
 | 
					    # curr_codec = _ctxvar_MsgCodec.get_in(task)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # ContextVar
 | 
				
			||||||
 | 
					    # task_ctx: Context = task.context
 | 
				
			||||||
 | 
					    # assert _ctxvar_MsgCodec in task_ctx
 | 
				
			||||||
 | 
					    # curr_codec: MsgCodec = task.context[_ctxvar_MsgCodec]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # RunVar
 | 
				
			||||||
 | 
					    curr_codec: MsgCodec = _ctxvar_MsgCodec.get()
 | 
				
			||||||
 | 
					    last_read_codec = _ctxvar_MsgCodec.get()
 | 
				
			||||||
 | 
					    assert curr_codec is last_read_codec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    assert (
 | 
					    assert (
 | 
				
			||||||
        # returned from `mk_codec()`
 | 
					        # returned from `mk_codec()`
 | 
				
			||||||
| 
						 | 
					@ -163,14 +265,31 @@ def chk_codec_applied(
 | 
				
			||||||
        curr_codec is
 | 
					        curr_codec is
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # public API for all of the above
 | 
					        # public API for all of the above
 | 
				
			||||||
        current_msgspec_codec()
 | 
					        current_codec()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # the default `msgspec` settings
 | 
					        # the default `msgspec` settings
 | 
				
			||||||
        is not _def_msgspec_codec
 | 
					        is not _codec._def_msgspec_codec
 | 
				
			||||||
 | 
					        is not _codec._def_tractor_codec
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def test_codec_hooks_mod():
 | 
					@pytest.mark.parametrize(
 | 
				
			||||||
 | 
					    'ipc_pld_spec',
 | 
				
			||||||
 | 
					    [
 | 
				
			||||||
 | 
					        # _codec._def_msgspec_codec,
 | 
				
			||||||
 | 
					        Any,
 | 
				
			||||||
 | 
					        # _codec._def_tractor_codec,
 | 
				
			||||||
 | 
					        NamespacePath|None,
 | 
				
			||||||
 | 
					    ],
 | 
				
			||||||
 | 
					    ids=[
 | 
				
			||||||
 | 
					        'any_type',
 | 
				
			||||||
 | 
					        'nsp_type',
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					def test_codec_hooks_mod(
 | 
				
			||||||
 | 
					    debug_mode: bool,
 | 
				
			||||||
 | 
					    ipc_pld_spec: Union[Type]|Any,
 | 
				
			||||||
 | 
					):
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Audit the `.msg.MsgCodec` override apis details given our impl
 | 
					    Audit the `.msg.MsgCodec` override apis details given our impl
 | 
				
			||||||
    uses `contextvars` to accomplish per `trio` task codec
 | 
					    uses `contextvars` to accomplish per `trio` task codec
 | 
				
			||||||
| 
						 | 
					@ -178,11 +297,21 @@ def test_codec_hooks_mod():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    async def main():
 | 
					    async def main():
 | 
				
			||||||
        task: trio.Task = trio.lowlevel.current_task()
 | 
					 | 
				
			||||||
        task_ctx: Context = task.context
 | 
					 | 
				
			||||||
        assert _ctxvar_MsgCodec not in task_ctx
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async with tractor.open_nursery() as an:
 | 
					        # task: trio.Task = trio.lowlevel.current_task()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # ContextVar
 | 
				
			||||||
 | 
					        # task_ctx: Context = task.context
 | 
				
			||||||
 | 
					        # assert _ctxvar_MsgCodec not in task_ctx
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # TreeVar
 | 
				
			||||||
 | 
					        # def_codec: MsgCodec = _ctxvar_MsgCodec.get_in(task)
 | 
				
			||||||
 | 
					        def_codec = _ctxvar_MsgCodec.get()
 | 
				
			||||||
 | 
					        assert def_codec is _codec._def_tractor_codec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        async with tractor.open_nursery(
 | 
				
			||||||
 | 
					            debug_mode=debug_mode,
 | 
				
			||||||
 | 
					        ) as an:
 | 
				
			||||||
            p: tractor.Portal = await an.start_actor(
 | 
					            p: tractor.Portal = await an.start_actor(
 | 
				
			||||||
                'sub',
 | 
					                'sub',
 | 
				
			||||||
                enable_modules=[__name__],
 | 
					                enable_modules=[__name__],
 | 
				
			||||||
| 
						 | 
					@ -192,7 +321,9 @@ def test_codec_hooks_mod():
 | 
				
			||||||
            # - codec not modified -> decode nsp as `str`
 | 
					            # - codec not modified -> decode nsp as `str`
 | 
				
			||||||
            # - codec modified with hooks -> decode nsp as
 | 
					            # - codec modified with hooks -> decode nsp as
 | 
				
			||||||
            #   `NamespacePath`
 | 
					            #   `NamespacePath`
 | 
				
			||||||
            nsp_codec: MsgCodec = mk_custom_codec()
 | 
					            nsp_codec: MsgCodec = mk_custom_codec(
 | 
				
			||||||
 | 
					                pld_spec=ipc_pld_spec,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
            with apply_codec(nsp_codec) as codec:
 | 
					            with apply_codec(nsp_codec) as codec:
 | 
				
			||||||
                chk_codec_applied(
 | 
					                chk_codec_applied(
 | 
				
			||||||
                    custom_codec=nsp_codec,
 | 
					                    custom_codec=nsp_codec,
 | 
				
			||||||
| 
						 | 
					@ -202,9 +333,22 @@ def test_codec_hooks_mod():
 | 
				
			||||||
                async with (
 | 
					                async with (
 | 
				
			||||||
                    p.open_context(
 | 
					                    p.open_context(
 | 
				
			||||||
                        send_back_nsp,
 | 
					                        send_back_nsp,
 | 
				
			||||||
 | 
					                        # TODO: send the original nsp here and
 | 
				
			||||||
 | 
					                        # test with `limit_msg_spec()` above?
 | 
				
			||||||
 | 
					                        expect_debug=debug_mode,
 | 
				
			||||||
 | 
					                        use_any_spec=(ipc_pld_spec==Any),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    ) as (ctx, first),
 | 
					                    ) as (ctx, first),
 | 
				
			||||||
                    ctx.open_stream() as ipc,
 | 
					                    ctx.open_stream() as ipc,
 | 
				
			||||||
                ):
 | 
					                ):
 | 
				
			||||||
 | 
					                    if ipc_pld_spec is NamespacePath:
 | 
				
			||||||
 | 
					                        assert isinstance(first, NamespacePath)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    print(
 | 
				
			||||||
 | 
					                        'root: ENTERING CONTEXT BLOCK\n'
 | 
				
			||||||
 | 
					                        f'type(first): {type(first)}\n'
 | 
				
			||||||
 | 
					                        f'first: {first}\n'
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
                    # ensure codec is still applied across
 | 
					                    # ensure codec is still applied across
 | 
				
			||||||
                    # `tractor.Context` + its embedded nursery.
 | 
					                    # `tractor.Context` + its embedded nursery.
 | 
				
			||||||
                    chk_codec_applied(
 | 
					                    chk_codec_applied(
 | 
				
			||||||
| 
						 | 
					@ -212,23 +356,46 @@ def test_codec_hooks_mod():
 | 
				
			||||||
                        enter_value=codec,
 | 
					                        enter_value=codec,
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    assert first == f'{__name__}:ex_func'
 | 
					                    first_nsp = NamespacePath(first)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # ensure roundtripping works
 | 
				
			||||||
 | 
					                    wire_bytes: bytes = nsp_codec.encode(
 | 
				
			||||||
 | 
					                        Started(
 | 
				
			||||||
 | 
					                            cid=ctx.cid,
 | 
				
			||||||
 | 
					                            pld=first_nsp
 | 
				
			||||||
 | 
					                        )
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					                    msg: Started = nsp_codec.decode(wire_bytes)
 | 
				
			||||||
 | 
					                    pld = msg.pld
 | 
				
			||||||
 | 
					                    assert  pld == first_nsp
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # try a manual decode of the started msg+pld
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # TODO: actually get the decoder loading
 | 
					                    # TODO: actually get the decoder loading
 | 
				
			||||||
                    # to native once we spec our SCIPP msgspec
 | 
					                    # to native once we spec our SCIPP msgspec
 | 
				
			||||||
                    # (structurred-conc-inter-proc-protocol)
 | 
					                    # (structurred-conc-inter-proc-protocol)
 | 
				
			||||||
                    # implemented as per,
 | 
					                    # implemented as per,
 | 
				
			||||||
                    # https://github.com/goodboy/tractor/issues/36
 | 
					                    # https://github.com/goodboy/tractor/issues/36
 | 
				
			||||||
                    #
 | 
					                    #
 | 
				
			||||||
                    # assert isinstance(first, NamespacePath)
 | 
					                    if ipc_pld_spec is NamespacePath:
 | 
				
			||||||
                    assert isinstance(first, str)
 | 
					                        assert isinstance(first, NamespacePath)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # `Any`-payload-spec case
 | 
				
			||||||
 | 
					                    else:
 | 
				
			||||||
 | 
					                        assert isinstance(first, str)
 | 
				
			||||||
 | 
					                        assert first == f'{__name__}:ex_func'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    await ipc.send(first)
 | 
					                    await ipc.send(first)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    with trio.move_on_after(1):
 | 
					                    with trio.move_on_after(.6):
 | 
				
			||||||
                        async for msg in ipc:
 | 
					                        async for msg in ipc:
 | 
				
			||||||
 | 
					                            print(msg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                            # TODO: as per above
 | 
					                            # TODO: as per above
 | 
				
			||||||
                            # assert isinstance(msg, NamespacePath)
 | 
					                            # assert isinstance(msg, NamespacePath)
 | 
				
			||||||
                            assert isinstance(msg, str)
 | 
					                            assert isinstance(msg, str)
 | 
				
			||||||
 | 
					                            await ipc.send(msg)
 | 
				
			||||||
 | 
					                            await trio.sleep(0.1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            await p.cancel_actor()
 | 
					            await p.cancel_actor()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -6,6 +6,7 @@ sync-opening a ``tractor.Context`` beforehand.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
from itertools import count
 | 
					from itertools import count
 | 
				
			||||||
 | 
					import math
 | 
				
			||||||
import platform
 | 
					import platform
 | 
				
			||||||
from pprint import pformat
 | 
					from pprint import pformat
 | 
				
			||||||
from typing import (
 | 
					from typing import (
 | 
				
			||||||
| 
						 | 
					@ -845,7 +846,10 @@ async def keep_sending_from_callee(
 | 
				
			||||||
        ('caller', 1, never_open_stream),
 | 
					        ('caller', 1, never_open_stream),
 | 
				
			||||||
        ('callee', 0, keep_sending_from_callee),
 | 
					        ('callee', 0, keep_sending_from_callee),
 | 
				
			||||||
    ],
 | 
					    ],
 | 
				
			||||||
    ids='overrun_condition={}'.format,
 | 
					    ids=[
 | 
				
			||||||
 | 
					         ('caller_1buf_never_open_stream'),
 | 
				
			||||||
 | 
					         ('callee_0buf_keep_sending_from_callee'),
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
def test_one_end_stream_not_opened(
 | 
					def test_one_end_stream_not_opened(
 | 
				
			||||||
    overrun_by: tuple[str, int, Callable],
 | 
					    overrun_by: tuple[str, int, Callable],
 | 
				
			||||||
| 
						 | 
					@ -869,29 +873,30 @@ def test_one_end_stream_not_opened(
 | 
				
			||||||
                enable_modules=[__name__],
 | 
					                enable_modules=[__name__],
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            async with portal.open_context(
 | 
					            with trio.fail_after(0.8):
 | 
				
			||||||
                entrypoint,
 | 
					                async with portal.open_context(
 | 
				
			||||||
            ) as (ctx, sent):
 | 
					                    entrypoint,
 | 
				
			||||||
                assert sent is None
 | 
					                ) as (ctx, sent):
 | 
				
			||||||
 | 
					                    assert sent is None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if 'caller' in overrunner:
 | 
					                    if 'caller' in overrunner:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    async with ctx.open_stream() as stream:
 | 
					                        async with ctx.open_stream() as stream:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        # itersend +1 msg more then the buffer size
 | 
					                            # itersend +1 msg more then the buffer size
 | 
				
			||||||
                        # to cause the most basic overrun.
 | 
					                            # to cause the most basic overrun.
 | 
				
			||||||
                        for i in range(buf_size):
 | 
					                            for i in range(buf_size):
 | 
				
			||||||
                            print(f'sending {i}')
 | 
					                                print(f'sending {i}')
 | 
				
			||||||
                            await stream.send(i)
 | 
					                                await stream.send(i)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        else:
 | 
					                            else:
 | 
				
			||||||
                            # expect overrun error to be relayed back
 | 
					                                # expect overrun error to be relayed back
 | 
				
			||||||
                            # and this sleep interrupted
 | 
					                                # and this sleep interrupted
 | 
				
			||||||
                            await trio.sleep_forever()
 | 
					                                await trio.sleep_forever()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                else:
 | 
					                    else:
 | 
				
			||||||
                    # callee overruns caller case so we do nothing here
 | 
					                        # callee overruns caller case so we do nothing here
 | 
				
			||||||
                    await trio.sleep_forever()
 | 
					                        await trio.sleep_forever()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            await portal.cancel_actor()
 | 
					            await portal.cancel_actor()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1055,54 +1060,63 @@ def test_maybe_allow_overruns_stream(
 | 
				
			||||||
                loglevel=loglevel,
 | 
					                loglevel=loglevel,
 | 
				
			||||||
                debug_mode=debug_mode,
 | 
					                debug_mode=debug_mode,
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            seq = list(range(10))
 | 
					 | 
				
			||||||
            async with portal.open_context(
 | 
					 | 
				
			||||||
                echo_back_sequence,
 | 
					 | 
				
			||||||
                seq=seq,
 | 
					 | 
				
			||||||
                wait_for_cancel=cancel_ctx,
 | 
					 | 
				
			||||||
                be_slow=(slow_side == 'child'),
 | 
					 | 
				
			||||||
                allow_overruns_side=allow_overruns_side,
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            ) as (ctx, sent):
 | 
					            # stream-sequence batch info with send delay to determine
 | 
				
			||||||
                assert sent is None
 | 
					            # approx timeout determining whether test has hung.
 | 
				
			||||||
 | 
					            total_batches: int = 2
 | 
				
			||||||
 | 
					            num_items: int = 10
 | 
				
			||||||
 | 
					            seq = list(range(num_items))
 | 
				
			||||||
 | 
					            parent_send_delay: float = 0.16
 | 
				
			||||||
 | 
					            timeout: float = math.ceil(
 | 
				
			||||||
 | 
					                total_batches * num_items * parent_send_delay
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            with trio.fail_after(timeout):
 | 
				
			||||||
 | 
					                async with portal.open_context(
 | 
				
			||||||
 | 
					                    echo_back_sequence,
 | 
				
			||||||
 | 
					                    seq=seq,
 | 
				
			||||||
 | 
					                    wait_for_cancel=cancel_ctx,
 | 
				
			||||||
 | 
					                    be_slow=(slow_side == 'child'),
 | 
				
			||||||
 | 
					                    allow_overruns_side=allow_overruns_side,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                async with ctx.open_stream(
 | 
					                ) as (ctx, sent):
 | 
				
			||||||
                    msg_buffer_size=1 if slow_side == 'parent' else None,
 | 
					                    assert sent is None
 | 
				
			||||||
                    allow_overruns=(allow_overruns_side in {'parent', 'both'}),
 | 
					 | 
				
			||||||
                ) as stream:
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    total_batches: int = 2
 | 
					                    async with ctx.open_stream(
 | 
				
			||||||
                    for _ in range(total_batches):
 | 
					                        msg_buffer_size=1 if slow_side == 'parent' else None,
 | 
				
			||||||
                        for msg in seq:
 | 
					                        allow_overruns=(allow_overruns_side in {'parent', 'both'}),
 | 
				
			||||||
                            # print(f'root tx {msg}')
 | 
					                    ) as stream:
 | 
				
			||||||
                            await stream.send(msg)
 | 
					 | 
				
			||||||
                            if slow_side == 'parent':
 | 
					 | 
				
			||||||
                                # NOTE: we make the parent slightly
 | 
					 | 
				
			||||||
                                # slower, when it is slow, to make sure
 | 
					 | 
				
			||||||
                                # that in the overruns everywhere case
 | 
					 | 
				
			||||||
                                await trio.sleep(0.16)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                        batch = []
 | 
					                        for _ in range(total_batches):
 | 
				
			||||||
                        async for msg in stream:
 | 
					                            for msg in seq:
 | 
				
			||||||
                            print(f'root rx {msg}')
 | 
					                                # print(f'root tx {msg}')
 | 
				
			||||||
                            batch.append(msg)
 | 
					                                await stream.send(msg)
 | 
				
			||||||
                            if batch == seq:
 | 
					                                if slow_side == 'parent':
 | 
				
			||||||
                                break
 | 
					                                    # NOTE: we make the parent slightly
 | 
				
			||||||
 | 
					                                    # slower, when it is slow, to make sure
 | 
				
			||||||
 | 
					                                    # that in the overruns everywhere case
 | 
				
			||||||
 | 
					                                    await trio.sleep(parent_send_delay)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                            batch = []
 | 
				
			||||||
 | 
					                            async for msg in stream:
 | 
				
			||||||
 | 
					                                print(f'root rx {msg}')
 | 
				
			||||||
 | 
					                                batch.append(msg)
 | 
				
			||||||
 | 
					                                if batch == seq:
 | 
				
			||||||
 | 
					                                    break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    if cancel_ctx:
 | 
				
			||||||
 | 
					                        # cancel the remote task
 | 
				
			||||||
 | 
					                        print('Requesting `ctx.cancel()` in parent!')
 | 
				
			||||||
 | 
					                        await ctx.cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                res: str|ContextCancelled = await ctx.result()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if cancel_ctx:
 | 
					                if cancel_ctx:
 | 
				
			||||||
                    # cancel the remote task
 | 
					                    assert isinstance(res, ContextCancelled)
 | 
				
			||||||
                    print('Requesting `ctx.cancel()` in parent!')
 | 
					                    assert tuple(res.canceller) == current_actor().uid
 | 
				
			||||||
                    await ctx.cancel()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            res: str|ContextCancelled = await ctx.result()
 | 
					                else:
 | 
				
			||||||
 | 
					                    print(f'RX ROOT SIDE RESULT {res}')
 | 
				
			||||||
            if cancel_ctx:
 | 
					                    assert res == 'yo'
 | 
				
			||||||
                assert isinstance(res, ContextCancelled)
 | 
					 | 
				
			||||||
                assert tuple(res.canceller) == current_actor().uid
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            else:
 | 
					 | 
				
			||||||
                print(f'RX ROOT SIDE RESULT {res}')
 | 
					 | 
				
			||||||
                assert res == 'yo'
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # cancel the daemon
 | 
					            # cancel the daemon
 | 
				
			||||||
            await portal.cancel_actor()
 | 
					            await portal.cancel_actor()
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -31,25 +31,24 @@ from ._codec import (
 | 
				
			||||||
    apply_codec as apply_codec,
 | 
					    apply_codec as apply_codec,
 | 
				
			||||||
    mk_codec as mk_codec,
 | 
					    mk_codec as mk_codec,
 | 
				
			||||||
    MsgCodec as MsgCodec,
 | 
					    MsgCodec as MsgCodec,
 | 
				
			||||||
    current_msgspec_codec as current_msgspec_codec,
 | 
					    current_codec as current_codec,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from .types import (
 | 
					from .types import (
 | 
				
			||||||
    Msg as Msg,
 | 
					    Msg as Msg,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Start as Start,  # with pld
 | 
					    Aid as Aid,
 | 
				
			||||||
    FuncSpec as FuncSpec,
 | 
					    SpawnSpec as SpawnSpec,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    StartAck as StartAck, # with pld
 | 
					    Start as Start,
 | 
				
			||||||
    IpcCtxSpec as IpcCtxSpec,
 | 
					    StartAck as StartAck,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Started as Started,
 | 
					    Started as Started,
 | 
				
			||||||
    Yield as Yield,
 | 
					    Yield as Yield,
 | 
				
			||||||
    Stop as Stop,
 | 
					    Stop as Stop,
 | 
				
			||||||
    Return as Return,
 | 
					    Return as Return,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Error as Error,  # with pld
 | 
					    Error as Error,
 | 
				
			||||||
    ErrorData as ErrorData,
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # full msg spec set
 | 
					    # full msg spec set
 | 
				
			||||||
    __spec__ as __spec__,
 | 
					    __spec__ as __spec__,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -30,13 +30,13 @@ ToDo: backends we prolly should offer:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
'''
 | 
					'''
 | 
				
			||||||
from __future__ import annotations
 | 
					from __future__ import annotations
 | 
				
			||||||
from contextvars import (
 | 
					 | 
				
			||||||
    ContextVar,
 | 
					 | 
				
			||||||
    Token,
 | 
					 | 
				
			||||||
)
 | 
					 | 
				
			||||||
from contextlib import (
 | 
					from contextlib import (
 | 
				
			||||||
    contextmanager as cm,
 | 
					    contextmanager as cm,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					# from contextvars import (
 | 
				
			||||||
 | 
					#     ContextVar,
 | 
				
			||||||
 | 
					#     Token,
 | 
				
			||||||
 | 
					# )
 | 
				
			||||||
from typing import (
 | 
					from typing import (
 | 
				
			||||||
    Any,
 | 
					    Any,
 | 
				
			||||||
    Callable,
 | 
					    Callable,
 | 
				
			||||||
| 
						 | 
					@ -47,6 +47,12 @@ from types import ModuleType
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import msgspec
 | 
					import msgspec
 | 
				
			||||||
from msgspec import msgpack
 | 
					from msgspec import msgpack
 | 
				
			||||||
 | 
					from trio.lowlevel import (
 | 
				
			||||||
 | 
					    RunVar,
 | 
				
			||||||
 | 
					    RunVarToken,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					# TODO: see notes below from @mikenerone..
 | 
				
			||||||
 | 
					# from tricycle import TreeVar
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from tractor.msg.pretty_struct import Struct
 | 
					from tractor.msg.pretty_struct import Struct
 | 
				
			||||||
from tractor.msg.types import (
 | 
					from tractor.msg.types import (
 | 
				
			||||||
| 
						 | 
					@ -72,6 +78,9 @@ class MsgCodec(Struct):
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    A IPC msg interchange format lib's encoder + decoder pair.
 | 
					    A IPC msg interchange format lib's encoder + decoder pair.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Pretty much nothing more then delegation to underlying
 | 
				
			||||||
 | 
					    `msgspec.<interchange-protocol>.Encoder/Decoder`s for now.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    _enc: msgpack.Encoder
 | 
					    _enc: msgpack.Encoder
 | 
				
			||||||
    _dec: msgpack.Decoder
 | 
					    _dec: msgpack.Decoder
 | 
				
			||||||
| 
						 | 
					@ -86,11 +95,6 @@ class MsgCodec(Struct):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    lib: ModuleType = msgspec
 | 
					    lib: ModuleType = msgspec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # ad-hoc type extensions
 | 
					 | 
				
			||||||
    # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
 | 
					 | 
				
			||||||
    enc_hook: Callable[[Any], Any]|None = None  # coder
 | 
					 | 
				
			||||||
    dec_hook: Callable[[type, Any], Any]|None = None # decoder
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    # TODO: a sub-decoder system as well?
 | 
					    # TODO: a sub-decoder system as well?
 | 
				
			||||||
    # payload_msg_specs: Union[Type[Struct]] = Any
 | 
					    # payload_msg_specs: Union[Type[Struct]] = Any
 | 
				
			||||||
    # see related comments in `.msg.types`
 | 
					    # see related comments in `.msg.types`
 | 
				
			||||||
| 
						 | 
					@ -304,7 +308,8 @@ def mk_codec(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    libname: str = 'msgspec',
 | 
					    libname: str = 'msgspec',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # proxy as `Struct(**kwargs)`
 | 
					    # proxy as `Struct(**kwargs)` for ad-hoc type extensions
 | 
				
			||||||
 | 
					    # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types
 | 
				
			||||||
    # ------ - ------
 | 
					    # ------ - ------
 | 
				
			||||||
    dec_hook: Callable|None = None,
 | 
					    dec_hook: Callable|None = None,
 | 
				
			||||||
    enc_hook: Callable|None = None,
 | 
					    enc_hook: Callable|None = None,
 | 
				
			||||||
| 
						 | 
					@ -389,14 +394,52 @@ def mk_codec(
 | 
				
			||||||
# no custom structs, hooks or other special types.
 | 
					# no custom structs, hooks or other special types.
 | 
				
			||||||
_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
 | 
					_def_msgspec_codec: MsgCodec = mk_codec(ipc_msg_spec=Any)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# NOTE: provides for per-`trio.Task` specificity of the
 | 
					# The built-in IPC `Msg` spec.
 | 
				
			||||||
 | 
					# Our composing "shuttle" protocol which allows `tractor`-app code
 | 
				
			||||||
 | 
					# to use any `msgspec` supported type as the `Msg.pld` payload,
 | 
				
			||||||
 | 
					# https://jcristharif.com/msgspec/supported-types.html
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					_def_tractor_codec: MsgCodec = mk_codec(
 | 
				
			||||||
 | 
					    ipc_pld_spec=Any,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					# TODO: IDEALLY provides for per-`trio.Task` specificity of the
 | 
				
			||||||
# IPC msging codec used by the transport layer when doing
 | 
					# IPC msging codec used by the transport layer when doing
 | 
				
			||||||
# `Channel.send()/.recv()` of wire data.
 | 
					# `Channel.send()/.recv()` of wire data.
 | 
				
			||||||
_ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
 | 
					
 | 
				
			||||||
 | 
					# ContextVar-TODO: DIDN'T WORK, kept resetting in every new task to default!?
 | 
				
			||||||
 | 
					# _ctxvar_MsgCodec: ContextVar[MsgCodec] = ContextVar(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# TreeVar-TODO: DIDN'T WORK, kept resetting in every new embedded nursery
 | 
				
			||||||
 | 
					# even though it's supposed to inherit from a parent context ???
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					# _ctxvar_MsgCodec: TreeVar[MsgCodec] = TreeVar(
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					# ^-NOTE-^: for this to work see the mods by @mikenerone from `trio` gitter:
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					# 22:02:54 <mikenerone> even for regular contextvars, all you have to do is:
 | 
				
			||||||
 | 
					#    `task: Task = trio.lowlevel.current_task()`
 | 
				
			||||||
 | 
					#    `task.parent_nursery.parent_task.context.run(my_ctx_var.set, new_value)`
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					# From a comment in his prop code he couldn't share outright:
 | 
				
			||||||
 | 
					# 1. For every TreeVar set in the current task (which covers what
 | 
				
			||||||
 | 
					#    we need from SynchronizerFacade), walk up the tree until the
 | 
				
			||||||
 | 
					#    root or finding one where the TreeVar is already set, setting
 | 
				
			||||||
 | 
					#    it in all of the contexts along the way.
 | 
				
			||||||
 | 
					# 2. For each of those, we also forcibly set the values that are
 | 
				
			||||||
 | 
					#    pending for child nurseries that have not yet accessed the
 | 
				
			||||||
 | 
					#    TreeVar.
 | 
				
			||||||
 | 
					# 3. We similarly set the pending values for the child nurseries
 | 
				
			||||||
 | 
					#    of the *current* task.
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# TODO: STOP USING THIS, since it's basically a global and won't
 | 
				
			||||||
 | 
					# allow sub-IPC-ctxs to limit the msg-spec however desired..
 | 
				
			||||||
 | 
					_ctxvar_MsgCodec: MsgCodec = RunVar(
 | 
				
			||||||
    'msgspec_codec',
 | 
					    'msgspec_codec',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # TODO: move this to our new `Msg`-spec!
 | 
					    # TODO: move this to our new `Msg`-spec!
 | 
				
			||||||
    default=_def_msgspec_codec,
 | 
					    default=_def_msgspec_codec,
 | 
				
			||||||
 | 
					    # default=_def_tractor_codec,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -410,15 +453,36 @@ def apply_codec(
 | 
				
			||||||
    runtime context such that all IPC msgs are processed
 | 
					    runtime context such that all IPC msgs are processed
 | 
				
			||||||
    with it for that task.
 | 
					    with it for that task.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Uses a `tricycle.TreeVar` to ensure the scope of the codec
 | 
				
			||||||
 | 
					    matches the `@cm` block and DOES NOT change to the original
 | 
				
			||||||
 | 
					    (default) value in new tasks (as it does for `ContextVar`).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    See the docs:
 | 
				
			||||||
 | 
					    - https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
 | 
				
			||||||
 | 
					    - https://github.com/oremanj/tricycle/blob/master/tricycle/_tests/test_tree_var.py
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    token: Token = _ctxvar_MsgCodec.set(codec)
 | 
					    orig: MsgCodec = _ctxvar_MsgCodec.get()
 | 
				
			||||||
 | 
					    assert orig is not codec
 | 
				
			||||||
 | 
					    token: RunVarToken = _ctxvar_MsgCodec.set(codec)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # TODO: for TreeVar approach, see docs for @cm `.being()` API:
 | 
				
			||||||
 | 
					    # https://tricycle.readthedocs.io/en/latest/reference.html#tree-variables
 | 
				
			||||||
 | 
					    # try:
 | 
				
			||||||
 | 
					    #     with _ctxvar_MsgCodec.being(codec):
 | 
				
			||||||
 | 
					    #         new = _ctxvar_MsgCodec.get()
 | 
				
			||||||
 | 
					    #         assert new is codec
 | 
				
			||||||
 | 
					    #         yield codec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
        yield _ctxvar_MsgCodec.get()
 | 
					        yield _ctxvar_MsgCodec.get()
 | 
				
			||||||
    finally:
 | 
					    finally:
 | 
				
			||||||
        _ctxvar_MsgCodec.reset(token)
 | 
					        _ctxvar_MsgCodec.reset(token)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    assert _ctxvar_MsgCodec.get() is orig
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def current_msgspec_codec() -> MsgCodec:
 | 
					
 | 
				
			||||||
 | 
					def current_codec() -> MsgCodec:
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Return the current `trio.Task.context`'s value
 | 
					    Return the current `trio.Task.context`'s value
 | 
				
			||||||
    for `msgspec_codec` used by `Channel.send/.recv()`
 | 
					    for `msgspec_codec` used by `Channel.send/.recv()`
 | 
				
			||||||
| 
						 | 
					@ -449,5 +513,6 @@ def limit_msg_spec(
 | 
				
			||||||
        payload_types=payload_types,
 | 
					        payload_types=payload_types,
 | 
				
			||||||
        **codec_kwargs,
 | 
					        **codec_kwargs,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
    with apply_codec(msgspec_codec):
 | 
					    with apply_codec(msgspec_codec) as applied_codec:
 | 
				
			||||||
 | 
					        assert applied_codec is msgspec_codec
 | 
				
			||||||
        yield msgspec_codec
 | 
					        yield msgspec_codec
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -80,6 +80,28 @@ class DiffDump(UserList):
 | 
				
			||||||
        return repstr
 | 
					        return repstr
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def iter_fields(struct: Struct) -> Iterator[
 | 
				
			||||||
 | 
					    tuple[
 | 
				
			||||||
 | 
					        structs.FieldIinfo,
 | 
				
			||||||
 | 
					        str,
 | 
				
			||||||
 | 
					        Any,
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					]:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Iterate over all non-@property fields of this struct.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    fi: structs.FieldInfo
 | 
				
			||||||
 | 
					    for fi in structs.fields(struct):
 | 
				
			||||||
 | 
					        key: str = fi.name
 | 
				
			||||||
 | 
					        val: Any = getattr(struct, key)
 | 
				
			||||||
 | 
					        yield (
 | 
				
			||||||
 | 
					            fi,
 | 
				
			||||||
 | 
					            key,
 | 
				
			||||||
 | 
					            val,
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Struct(
 | 
					class Struct(
 | 
				
			||||||
    _Struct,
 | 
					    _Struct,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -91,23 +113,6 @@ class Struct(
 | 
				
			||||||
    A "human friendlier" (aka repl buddy) struct subtype.
 | 
					    A "human friendlier" (aka repl buddy) struct subtype.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    def _sin_props(self) -> Iterator[
 | 
					 | 
				
			||||||
        tuple[
 | 
					 | 
				
			||||||
            structs.FieldIinfo,
 | 
					 | 
				
			||||||
            str,
 | 
					 | 
				
			||||||
            Any,
 | 
					 | 
				
			||||||
        ]
 | 
					 | 
				
			||||||
    ]:
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        Iterate over all non-@property fields of this struct.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        '''
 | 
					 | 
				
			||||||
        fi: structs.FieldInfo
 | 
					 | 
				
			||||||
        for fi in structs.fields(self):
 | 
					 | 
				
			||||||
            key: str = fi.name
 | 
					 | 
				
			||||||
            val: Any = getattr(self, key)
 | 
					 | 
				
			||||||
            yield fi, key, val
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def to_dict(
 | 
					    def to_dict(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        include_non_members: bool = True,
 | 
					        include_non_members: bool = True,
 | 
				
			||||||
| 
						 | 
					@ -130,7 +135,7 @@ class Struct(
 | 
				
			||||||
        # added as type-defined `@property` methods!
 | 
					        # added as type-defined `@property` methods!
 | 
				
			||||||
        sin_props: dict = {}
 | 
					        sin_props: dict = {}
 | 
				
			||||||
        fi: structs.FieldInfo
 | 
					        fi: structs.FieldInfo
 | 
				
			||||||
        for fi, k, v in self._sin_props():
 | 
					        for fi, k, v in iter_fields(self):
 | 
				
			||||||
            sin_props[k] = asdict[k]
 | 
					            sin_props[k] = asdict[k]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        return sin_props
 | 
					        return sin_props
 | 
				
			||||||
| 
						 | 
					@ -159,7 +164,7 @@ class Struct(
 | 
				
			||||||
        fi: structs.FieldInfo
 | 
					        fi: structs.FieldInfo
 | 
				
			||||||
        k: str
 | 
					        k: str
 | 
				
			||||||
        v: Any
 | 
					        v: Any
 | 
				
			||||||
        for fi, k, v in self._sin_props():
 | 
					        for fi, k, v in iter_fields(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # TODO: how can we prefer `Literal['option1',  'option2,
 | 
					            # TODO: how can we prefer `Literal['option1',  'option2,
 | 
				
			||||||
            # ..]` over .__name__ == `Literal` but still get only the
 | 
					            # ..]` over .__name__ == `Literal` but still get only the
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -26,6 +26,7 @@ from __future__ import annotations
 | 
				
			||||||
import types
 | 
					import types
 | 
				
			||||||
from typing import (
 | 
					from typing import (
 | 
				
			||||||
    Any,
 | 
					    Any,
 | 
				
			||||||
 | 
					    Callable,
 | 
				
			||||||
    Generic,
 | 
					    Generic,
 | 
				
			||||||
    Literal,
 | 
					    Literal,
 | 
				
			||||||
    Type,
 | 
					    Type,
 | 
				
			||||||
| 
						 | 
					@ -37,8 +38,12 @@ from msgspec import (
 | 
				
			||||||
    defstruct,
 | 
					    defstruct,
 | 
				
			||||||
    # field,
 | 
					    # field,
 | 
				
			||||||
    Struct,
 | 
					    Struct,
 | 
				
			||||||
    UNSET,
 | 
					    # UNSET,
 | 
				
			||||||
    UnsetType,
 | 
					    # UnsetType,
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from tractor.msg import (
 | 
				
			||||||
 | 
					    pretty_struct,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# type variable for the boxed payload field `.pld`
 | 
					# type variable for the boxed payload field `.pld`
 | 
				
			||||||
| 
						 | 
					@ -48,11 +53,19 @@ PayloadT = TypeVar('PayloadT')
 | 
				
			||||||
class Msg(
 | 
					class Msg(
 | 
				
			||||||
    Struct,
 | 
					    Struct,
 | 
				
			||||||
    Generic[PayloadT],
 | 
					    Generic[PayloadT],
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # https://jcristharif.com/msgspec/structs.html#tagged-unions
 | 
				
			||||||
    tag=True,
 | 
					    tag=True,
 | 
				
			||||||
    tag_field='msg_type',
 | 
					    tag_field='msg_type',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # eq=True,
 | 
					    # https://jcristharif.com/msgspec/structs.html#field-ordering
 | 
				
			||||||
 | 
					    # kw_only=True,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # https://jcristharif.com/msgspec/structs.html#equality-and-order
 | 
				
			||||||
    # order=True,
 | 
					    # order=True,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # https://jcristharif.com/msgspec/structs.html#encoding-decoding-as-arrays
 | 
				
			||||||
 | 
					    # as_array=True,
 | 
				
			||||||
):
 | 
					):
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    The "god" boxing msg type.
 | 
					    The "god" boxing msg type.
 | 
				
			||||||
| 
						 | 
					@ -90,6 +103,53 @@ class Msg(
 | 
				
			||||||
    pld: PayloadT
 | 
					    pld: PayloadT
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class Aid(
 | 
				
			||||||
 | 
					    Struct,
 | 
				
			||||||
 | 
					    tag=True,
 | 
				
			||||||
 | 
					    tag_field='msg_type',
 | 
				
			||||||
 | 
					):
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Actor-identity msg.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Initial contact exchange enabling an actor "mailbox handshake"
 | 
				
			||||||
 | 
					    delivering the peer identity (and maybe eventually contact)
 | 
				
			||||||
 | 
					    info.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Used by discovery protocol to register actors as well as
 | 
				
			||||||
 | 
					    conduct the initial comms (capability) filtering.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    name: str
 | 
				
			||||||
 | 
					    uuid: str
 | 
				
			||||||
 | 
					    # TODO: use built-in support for UUIDs?
 | 
				
			||||||
 | 
					    # -[ ] `uuid.UUID` which has multi-protocol support
 | 
				
			||||||
 | 
					    #  https://jcristharif.com/msgspec/supported-types.html#uuid
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class SpawnSpec(
 | 
				
			||||||
 | 
					    pretty_struct.Struct,
 | 
				
			||||||
 | 
					    tag=True,
 | 
				
			||||||
 | 
					    tag_field='msg_type',
 | 
				
			||||||
 | 
					):
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Initial runtime spec handed down from a spawning parent to its
 | 
				
			||||||
 | 
					    child subactor immediately following first contact via an
 | 
				
			||||||
 | 
					    `Aid` msg.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    _parent_main_data: dict
 | 
				
			||||||
 | 
					    _runtime_vars: dict[str, Any]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # module import capability
 | 
				
			||||||
 | 
					    enable_modules: dict[str, str]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # TODO: not just sockaddr pairs?
 | 
				
			||||||
 | 
					    # -[ ] abstract into a `TransportAddr` type?
 | 
				
			||||||
 | 
					    reg_addrs: list[tuple[str, int]]
 | 
				
			||||||
 | 
					    bind_addrs: list[tuple[str, int]]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# TODO: caps based RPC support in the payload?
 | 
					# TODO: caps based RPC support in the payload?
 | 
				
			||||||
#
 | 
					#
 | 
				
			||||||
# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
 | 
					# -[ ] integration with our ``enable_modules: list[str]`` caps sys.
 | 
				
			||||||
| 
						 | 
					@ -105,18 +165,31 @@ class Msg(
 | 
				
			||||||
#
 | 
					#
 | 
				
			||||||
# -[ ] can we combine .ns + .func into a native `NamespacePath` field?
 | 
					# -[ ] can we combine .ns + .func into a native `NamespacePath` field?
 | 
				
			||||||
#
 | 
					#
 | 
				
			||||||
# -[ ]better name, like `Call/TaskInput`?
 | 
					# -[ ] better name, like `Call/TaskInput`?
 | 
				
			||||||
#
 | 
					#
 | 
				
			||||||
class FuncSpec(Struct):
 | 
					# -[ ] XXX a debugger lock msg transaction with payloads like,
 | 
				
			||||||
    ns: str
 | 
					#   child -> `.pld: DebugLock` -> root
 | 
				
			||||||
    func: str
 | 
					#   child <- `.pld: DebugLocked` <- root
 | 
				
			||||||
 | 
					#   child -> `.pld: DebugRelease` -> root
 | 
				
			||||||
    kwargs: dict
 | 
					#
 | 
				
			||||||
    uid: str  # (calling) actor-id
 | 
					#   WHY => when a pld spec is provided it might not allow for
 | 
				
			||||||
 | 
					#   debug mode msgs as they currently are (using plain old `pld.
 | 
				
			||||||
 | 
					#   str` payloads) so we only when debug_mode=True we need to
 | 
				
			||||||
 | 
					#   union in this debugger payload set?
 | 
				
			||||||
 | 
					#
 | 
				
			||||||
 | 
					#   mk_msg_spec(
 | 
				
			||||||
 | 
					#       MyPldSpec,
 | 
				
			||||||
 | 
					#       debug_mode=True,
 | 
				
			||||||
 | 
					#   ) -> (
 | 
				
			||||||
 | 
					#       Union[MyPldSpec]
 | 
				
			||||||
 | 
					#      | Union[DebugLock, DebugLocked, DebugRelease]
 | 
				
			||||||
 | 
					#   )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Start(
 | 
					class Start(
 | 
				
			||||||
    Msg,
 | 
					    Struct,
 | 
				
			||||||
 | 
					    tag=True,
 | 
				
			||||||
 | 
					    tag_field='msg_type',
 | 
				
			||||||
):
 | 
					):
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Initial request to remotely schedule an RPC `trio.Task` via
 | 
					    Initial request to remotely schedule an RPC `trio.Task` via
 | 
				
			||||||
| 
						 | 
					@ -134,14 +207,26 @@ class Start(
 | 
				
			||||||
    - `Context.open_context()`
 | 
					    - `Context.open_context()`
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    pld: FuncSpec
 | 
					    cid: str
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ns: str
 | 
				
			||||||
 | 
					    func: str
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    kwargs: dict
 | 
				
			||||||
 | 
					    uid: tuple[str, str]  # (calling) actor-id
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class IpcCtxSpec(Struct):
 | 
					class StartAck(
 | 
				
			||||||
 | 
					    Struct,
 | 
				
			||||||
 | 
					    tag=True,
 | 
				
			||||||
 | 
					    tag_field='msg_type',
 | 
				
			||||||
 | 
					):
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    An inter-actor-`trio.Task`-comms `Context` spec.
 | 
					    Init response to a `Cmd` request indicating the far
 | 
				
			||||||
 | 
					    end's RPC spec, namely its callable "type".
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
 | 
					    cid: str
 | 
				
			||||||
    # TODO: maybe better names for all these?
 | 
					    # TODO: maybe better names for all these?
 | 
				
			||||||
    # -[ ] obvi ^ would need sync with `._rpc`
 | 
					    # -[ ] obvi ^ would need sync with `._rpc`
 | 
				
			||||||
    functype: Literal[
 | 
					    functype: Literal[
 | 
				
			||||||
| 
						 | 
					@ -160,18 +245,6 @@ class IpcCtxSpec(Struct):
 | 
				
			||||||
    # msgspec: MsgSpec
 | 
					    # msgspec: MsgSpec
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class StartAck(
 | 
					 | 
				
			||||||
    Msg,
 | 
					 | 
				
			||||||
    Generic[PayloadT],
 | 
					 | 
				
			||||||
):
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    Init response to a `Cmd` request indicating the far
 | 
					 | 
				
			||||||
    end's RPC callable "type".
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    pld: IpcCtxSpec
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class Started(
 | 
					class Started(
 | 
				
			||||||
    Msg,
 | 
					    Msg,
 | 
				
			||||||
    Generic[PayloadT],
 | 
					    Generic[PayloadT],
 | 
				
			||||||
| 
						 | 
					@ -202,13 +275,19 @@ class Yield(
 | 
				
			||||||
    pld: PayloadT
 | 
					    pld: PayloadT
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Stop(Msg):
 | 
					class Stop(
 | 
				
			||||||
 | 
					    Struct,
 | 
				
			||||||
 | 
					    tag=True,
 | 
				
			||||||
 | 
					    tag_field='msg_type',
 | 
				
			||||||
 | 
					):
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Stream termination signal much like an IPC version 
 | 
					    Stream termination signal much like an IPC version 
 | 
				
			||||||
    of `StopAsyncIteration`.
 | 
					    of `StopAsyncIteration`.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    pld: UnsetType = UNSET
 | 
					    cid: str
 | 
				
			||||||
 | 
					    # TODO: do we want to support a payload on stop?
 | 
				
			||||||
 | 
					    # pld: UnsetType = UNSET
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Return(
 | 
					class Return(
 | 
				
			||||||
| 
						 | 
					@ -223,32 +302,33 @@ class Return(
 | 
				
			||||||
    pld: PayloadT
 | 
					    pld: PayloadT
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class ErrorData(Struct):
 | 
					class Error(
 | 
				
			||||||
 | 
					    Struct,
 | 
				
			||||||
 | 
					    tag=True,
 | 
				
			||||||
 | 
					    tag_field='msg_type',
 | 
				
			||||||
 | 
					):
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    Remote actor error meta-data as needed originally by
 | 
					    A pkt that wraps `RemoteActorError`s for relay and raising.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Fields are 1-to-1 meta-data as needed originally by
 | 
				
			||||||
    `RemoteActorError.msgdata: dict`.
 | 
					    `RemoteActorError.msgdata: dict`.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    src_uid: str
 | 
					    src_uid: tuple[str, str]
 | 
				
			||||||
    src_type_str: str
 | 
					    src_type_str: str
 | 
				
			||||||
    boxed_type_str: str
 | 
					    boxed_type_str: str
 | 
				
			||||||
 | 
					    relay_path: list[tuple[str, str]]
 | 
				
			||||||
    relay_path: list[str]
 | 
					 | 
				
			||||||
    tb_str: str
 | 
					    tb_str: str
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    cid: str|None = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # TODO: use UNSET or don't include them via
 | 
				
			||||||
 | 
					    #
 | 
				
			||||||
    # `ContextCancelled`
 | 
					    # `ContextCancelled`
 | 
				
			||||||
    canceller: str|None = None
 | 
					    canceller: tuple[str, str]|None = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # `StreamOverrun`
 | 
					    # `StreamOverrun`
 | 
				
			||||||
    sender: str|None = None
 | 
					    sender: tuple[str, str]|None = None
 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
class Error(Msg):
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    A pkt that wraps `RemoteActorError`s for relay.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    '''
 | 
					 | 
				
			||||||
    pld: ErrorData
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# TODO: should be make a msg version of `ContextCancelled?`
 | 
					# TODO: should be make a msg version of `ContextCancelled?`
 | 
				
			||||||
| 
						 | 
					@ -265,6 +345,12 @@ class Error(Msg):
 | 
				
			||||||
# approx order of the IPC txn-state spaces.
 | 
					# approx order of the IPC txn-state spaces.
 | 
				
			||||||
__spec__: list[Msg] = [
 | 
					__spec__: list[Msg] = [
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # identity handshake
 | 
				
			||||||
 | 
					    Aid,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # spawn specification from parent
 | 
				
			||||||
 | 
					    SpawnSpec,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # inter-actor RPC initiation
 | 
					    # inter-actor RPC initiation
 | 
				
			||||||
    Start,
 | 
					    Start,
 | 
				
			||||||
    StartAck,
 | 
					    StartAck,
 | 
				
			||||||
| 
						 | 
					@ -280,6 +366,8 @@ __spec__: list[Msg] = [
 | 
				
			||||||
]
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
_runtime_spec_msgs: list[Msg] = [
 | 
					_runtime_spec_msgs: list[Msg] = [
 | 
				
			||||||
 | 
					    Aid,
 | 
				
			||||||
 | 
					    SpawnSpec,
 | 
				
			||||||
    Start,
 | 
					    Start,
 | 
				
			||||||
    StartAck,
 | 
					    StartAck,
 | 
				
			||||||
    Stop,
 | 
					    Stop,
 | 
				
			||||||
| 
						 | 
					@ -443,3 +531,99 @@ def mk_msg_spec(
 | 
				
			||||||
        pld_spec | runtime_spec,
 | 
					        pld_spec | runtime_spec,
 | 
				
			||||||
        msgtypes_table[spec_build_method] + ipc_msg_types,
 | 
					        msgtypes_table[spec_build_method] + ipc_msg_types,
 | 
				
			||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					# TODO: make something similar to this inside `._codec` such that
 | 
				
			||||||
 | 
					# user can just pass a type table of some sort?
 | 
				
			||||||
 | 
					# def mk_dict_msg_codec_hooks() -> tuple[Callable, Callable]:
 | 
				
			||||||
 | 
					#     '''
 | 
				
			||||||
 | 
					#     Deliver a `enc_hook()`/`dec_hook()` pair which does
 | 
				
			||||||
 | 
					#     manual convertion from our above native `Msg` set
 | 
				
			||||||
 | 
					#     to `dict` equivalent (wire msgs) in order to keep legacy compat
 | 
				
			||||||
 | 
					#     with the original runtime implementation.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#     Note: this is is/was primarly used while moving the core
 | 
				
			||||||
 | 
					#     runtime over to using native `Msg`-struct types wherein we
 | 
				
			||||||
 | 
					#     start with the send side emitting without loading
 | 
				
			||||||
 | 
					#     a typed-decoder and then later flipping the switch over to
 | 
				
			||||||
 | 
					#     load to the native struct types once all runtime usage has
 | 
				
			||||||
 | 
					#     been adjusted appropriately.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#     '''
 | 
				
			||||||
 | 
					#     def enc_to_dict(msg: Any) -> Any:
 | 
				
			||||||
 | 
					#         '''
 | 
				
			||||||
 | 
					#         Encode `Msg`-structs to `dict` msgs instead
 | 
				
			||||||
 | 
					#         of using `msgspec.msgpack.Decoder.type`-ed
 | 
				
			||||||
 | 
					#         features.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#         '''
 | 
				
			||||||
 | 
					#         match msg:
 | 
				
			||||||
 | 
					#             case Start():
 | 
				
			||||||
 | 
					#                 dctmsg: dict = pretty_struct.Struct.to_dict(
 | 
				
			||||||
 | 
					#                     msg
 | 
				
			||||||
 | 
					#                 )['pld']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#             case Error():
 | 
				
			||||||
 | 
					#                 dctmsg: dict = pretty_struct.Struct.to_dict(
 | 
				
			||||||
 | 
					#                     msg
 | 
				
			||||||
 | 
					#                 )['pld']
 | 
				
			||||||
 | 
					#                 return {'error': dctmsg}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#     def dec_from_dict(
 | 
				
			||||||
 | 
					#         type: Type,
 | 
				
			||||||
 | 
					#         obj: Any,
 | 
				
			||||||
 | 
					#     ) -> Any:
 | 
				
			||||||
 | 
					#         '''
 | 
				
			||||||
 | 
					#         Decode to `Msg`-structs from `dict` msgs instead
 | 
				
			||||||
 | 
					#         of using `msgspec.msgpack.Decoder.type`-ed
 | 
				
			||||||
 | 
					#         features.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#         '''
 | 
				
			||||||
 | 
					#         cid: str = obj.get('cid')
 | 
				
			||||||
 | 
					#         match obj:
 | 
				
			||||||
 | 
					#             case {'cmd': pld}:
 | 
				
			||||||
 | 
					#                 return Start(
 | 
				
			||||||
 | 
					#                     cid=cid,
 | 
				
			||||||
 | 
					#                     pld=pld,
 | 
				
			||||||
 | 
					#                 )
 | 
				
			||||||
 | 
					#             case {'functype': pld}:
 | 
				
			||||||
 | 
					#                 return StartAck(
 | 
				
			||||||
 | 
					#                     cid=cid,
 | 
				
			||||||
 | 
					#                     functype=pld,
 | 
				
			||||||
 | 
					#                     # pld=IpcCtxSpec(
 | 
				
			||||||
 | 
					#                     #     functype=pld,
 | 
				
			||||||
 | 
					#                     # ),
 | 
				
			||||||
 | 
					#                 )
 | 
				
			||||||
 | 
					#             case {'started': pld}:
 | 
				
			||||||
 | 
					#                 return Started(
 | 
				
			||||||
 | 
					#                     cid=cid,
 | 
				
			||||||
 | 
					#                     pld=pld,
 | 
				
			||||||
 | 
					#                 )
 | 
				
			||||||
 | 
					#             case {'yield': pld}:
 | 
				
			||||||
 | 
					#                 return Yield(
 | 
				
			||||||
 | 
					#                     cid=obj['cid'],
 | 
				
			||||||
 | 
					#                     pld=pld,
 | 
				
			||||||
 | 
					#                 )
 | 
				
			||||||
 | 
					#             case {'stop': pld}:
 | 
				
			||||||
 | 
					#                 return Stop(
 | 
				
			||||||
 | 
					#                     cid=cid,
 | 
				
			||||||
 | 
					#                 )
 | 
				
			||||||
 | 
					#             case {'return': pld}:
 | 
				
			||||||
 | 
					#                 return Return(
 | 
				
			||||||
 | 
					#                     cid=cid,
 | 
				
			||||||
 | 
					#                     pld=pld,
 | 
				
			||||||
 | 
					#                 )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#             case {'error': pld}:
 | 
				
			||||||
 | 
					#                 return Error(
 | 
				
			||||||
 | 
					#                     cid=cid,
 | 
				
			||||||
 | 
					#                     pld=ErrorData(
 | 
				
			||||||
 | 
					#                         **pld
 | 
				
			||||||
 | 
					#                     ),
 | 
				
			||||||
 | 
					#                 )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					#     return (
 | 
				
			||||||
 | 
					#         # enc_to_dict,
 | 
				
			||||||
 | 
					#         dec_from_dict,
 | 
				
			||||||
 | 
					#     )
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue