Compare commits
	
		
			10 Commits 
		
	
	
		
			49c61e40c7
			...
			c0058024c2
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						c0058024c2 | |
| 
							
							
								 | 
						065104401c | |
| 
							
							
								 | 
						3201437f4e | |
| 
							
							
								 | 
						a9da16892d | |
| 
							
							
								 | 
						1b609113c3 | |
| 
							
							
								 | 
						4a80cda841 | |
| 
							
							
								 | 
						131e2ee0a4 | |
| 
							
							
								 | 
						79ef973058 | |
| 
							
							
								 | 
						c738492879 | |
| 
							
							
								 | 
						a931274da6 | 
| 
						 | 
				
			
			@ -21,7 +21,7 @@ Sub-process entry points.
 | 
			
		|||
from __future__ import annotations
 | 
			
		||||
from functools import partial
 | 
			
		||||
import multiprocessing as mp
 | 
			
		||||
import os
 | 
			
		||||
# import os
 | 
			
		||||
from typing import (
 | 
			
		||||
    Any,
 | 
			
		||||
    TYPE_CHECKING,
 | 
			
		||||
| 
						 | 
				
			
			@ -38,6 +38,7 @@ from .devx import (
 | 
			
		|||
    _frame_stack,
 | 
			
		||||
    pformat,
 | 
			
		||||
)
 | 
			
		||||
# from .msg import pretty_struct
 | 
			
		||||
from .to_asyncio import run_as_asyncio_guest
 | 
			
		||||
from ._addr import UnwrappedAddress
 | 
			
		||||
from ._runtime import (
 | 
			
		||||
| 
						 | 
				
			
			@ -127,20 +128,13 @@ def _trio_main(
 | 
			
		|||
 | 
			
		||||
    if actor.loglevel is not None:
 | 
			
		||||
        get_console_log(actor.loglevel)
 | 
			
		||||
        actor_info: str = (
 | 
			
		||||
            f'|_{actor}\n'
 | 
			
		||||
            f'  uid: {actor.uid}\n'
 | 
			
		||||
            f'  pid: {os.getpid()}\n'
 | 
			
		||||
            f'  parent_addr: {parent_addr}\n'
 | 
			
		||||
            f'  loglevel: {actor.loglevel}\n'
 | 
			
		||||
        )
 | 
			
		||||
        log.info(
 | 
			
		||||
            'Starting new `trio` subactor\n'
 | 
			
		||||
            f'Starting `trio` subactor from parent @ '
 | 
			
		||||
            f'{parent_addr}\n'
 | 
			
		||||
            +
 | 
			
		||||
            pformat.nest_from_op(
 | 
			
		||||
                input_op='>(',  # see syntax ideas above
 | 
			
		||||
                text=actor_info,
 | 
			
		||||
                nest_indent=2,  # since "complete"
 | 
			
		||||
                text=f'{actor}',
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
    logmeth = log.info
 | 
			
		||||
| 
						 | 
				
			
			@ -149,7 +143,7 @@ def _trio_main(
 | 
			
		|||
        +
 | 
			
		||||
        pformat.nest_from_op(
 | 
			
		||||
            input_op=')>',  # like a "closed-to-play"-icon from super perspective
 | 
			
		||||
            text=actor_info,
 | 
			
		||||
            text=f'{actor}',
 | 
			
		||||
            nest_indent=1,
 | 
			
		||||
        )
 | 
			
		||||
    )
 | 
			
		||||
| 
						 | 
				
			
			@ -167,7 +161,7 @@ def _trio_main(
 | 
			
		|||
            +
 | 
			
		||||
            pformat.nest_from_op(
 | 
			
		||||
                input_op='c)>',  # closed due to cancel (see above)
 | 
			
		||||
                text=actor_info,
 | 
			
		||||
                text=f'{actor}',
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
    except BaseException as err:
 | 
			
		||||
| 
						 | 
				
			
			@ -177,7 +171,7 @@ def _trio_main(
 | 
			
		|||
            +
 | 
			
		||||
            pformat.nest_from_op(
 | 
			
		||||
                input_op='x)>',  # closed by error
 | 
			
		||||
                text=actor_info,
 | 
			
		||||
                text=f'{actor}',
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
        # NOTE since we raise a tb will already be shown on the
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										101
									
								
								tractor/_rpc.py
								
								
								
								
							
							
						
						
									
										101
									
								
								tractor/_rpc.py
								
								
								
								
							| 
						 | 
				
			
			@ -64,6 +64,7 @@ from .trionics import (
 | 
			
		|||
from .devx import (
 | 
			
		||||
    debug,
 | 
			
		||||
    add_div,
 | 
			
		||||
    pformat as _pformat,
 | 
			
		||||
)
 | 
			
		||||
from . import _state
 | 
			
		||||
from .log import get_logger
 | 
			
		||||
| 
						 | 
				
			
			@ -72,7 +73,7 @@ from .msg import (
 | 
			
		|||
    MsgCodec,
 | 
			
		||||
    PayloadT,
 | 
			
		||||
    NamespacePath,
 | 
			
		||||
    # pretty_struct,
 | 
			
		||||
    pretty_struct,
 | 
			
		||||
    _ops as msgops,
 | 
			
		||||
)
 | 
			
		||||
from tractor.msg.types import (
 | 
			
		||||
| 
						 | 
				
			
			@ -220,11 +221,18 @@ async def _invoke_non_context(
 | 
			
		|||
            task_status.started(ctx)
 | 
			
		||||
            result = await coro
 | 
			
		||||
            fname: str = func.__name__
 | 
			
		||||
 | 
			
		||||
            op_nested_task: str = _pformat.nest_from_op(
 | 
			
		||||
                input_op=f')> cid: {ctx.cid!r}',
 | 
			
		||||
                text=f'{ctx._task}',
 | 
			
		||||
                nest_indent=1,  # under >
 | 
			
		||||
            )
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                'RPC complete:\n'
 | 
			
		||||
                f'task: {ctx._task}\n'
 | 
			
		||||
                f'|_cid={ctx.cid}\n'
 | 
			
		||||
                f'|_{fname}() -> {pformat(result)}\n'
 | 
			
		||||
                f'RPC task complete\n'
 | 
			
		||||
                f'\n'
 | 
			
		||||
                f'{op_nested_task}\n'
 | 
			
		||||
                f'\n'
 | 
			
		||||
                f')> {fname}() -> {pformat(result)}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # NOTE: only send result if we know IPC isn't down
 | 
			
		||||
| 
						 | 
				
			
			@ -1043,7 +1051,7 @@ async def process_messages(
 | 
			
		|||
                    ):
 | 
			
		||||
                        target_cid: str = kwargs['cid']
 | 
			
		||||
                        kwargs |= {
 | 
			
		||||
                            'requesting_uid': chan.uid,
 | 
			
		||||
                            'requesting_aid': chan.aid,
 | 
			
		||||
                            'ipc_msg': msg,
 | 
			
		||||
 | 
			
		||||
                            # XXX NOTE! ONLY the rpc-task-owning
 | 
			
		||||
| 
						 | 
				
			
			@ -1079,21 +1087,34 @@ async def process_messages(
 | 
			
		|||
                        ns=ns,
 | 
			
		||||
                        func=funcname,
 | 
			
		||||
                        kwargs=kwargs,  # type-spec this? see `msg.types`
 | 
			
		||||
                        uid=actorid,
 | 
			
		||||
                        uid=actor_uuid,
 | 
			
		||||
                    ):
 | 
			
		||||
                        if actor_uuid != chan.aid.uid:
 | 
			
		||||
                            raise RuntimeError(
 | 
			
		||||
                                f'IPC <Start> msg <-> chan.aid mismatch!?\n'
 | 
			
		||||
                                f'Channel.aid = {chan.aid!r}\n'
 | 
			
		||||
                                f'Start.uid = {actor_uuid!r}\n'
 | 
			
		||||
                            )
 | 
			
		||||
                        # await debug.pause()
 | 
			
		||||
                        op_repr: str = 'Start <=) '
 | 
			
		||||
                        req_repr: str = _pformat.nest_from_op(
 | 
			
		||||
                            input_op=op_repr,
 | 
			
		||||
                            op_suffix='',
 | 
			
		||||
                            nest_prefix='',
 | 
			
		||||
                            text=f'{chan}',
 | 
			
		||||
 | 
			
		||||
                            nest_indent=len(op_repr)-1,
 | 
			
		||||
                            rm_from_first_ln='<',
 | 
			
		||||
                            # ^XXX, subtract -1 to account for
 | 
			
		||||
                            # <Channel
 | 
			
		||||
                            # ^_chevron to be stripped
 | 
			
		||||
                        )
 | 
			
		||||
                        start_status: str = (
 | 
			
		||||
                            'Handling RPC `Start` request\n'
 | 
			
		||||
                            f'<= peer: {actorid}\n\n'
 | 
			
		||||
                            f'  |_{chan}\n'
 | 
			
		||||
                            f'  |_cid: {cid}\n\n'
 | 
			
		||||
                            # f'  |_{ns}.{funcname}({kwargs})\n'
 | 
			
		||||
                            f'>> {actor.uid}\n'
 | 
			
		||||
                            f'  |_{actor}\n'
 | 
			
		||||
                            f'   -> nsp: `{ns}.{funcname}({kwargs})`\n'
 | 
			
		||||
 | 
			
		||||
                            # f'  |_{ns}.{funcname}({kwargs})\n\n'
 | 
			
		||||
 | 
			
		||||
                            # f'{pretty_struct.pformat(msg)}\n'
 | 
			
		||||
                            'Handling RPC request\n'
 | 
			
		||||
                            f'{req_repr}\n'
 | 
			
		||||
                            f'\n'
 | 
			
		||||
                            f'->{{ ipc-context-id: {cid!r}\n'
 | 
			
		||||
                            f'->{{ nsp for fn: `{ns}.{funcname}({kwargs})`\n'
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
                        # runtime-internal endpoint: `Actor.<funcname>`
 | 
			
		||||
| 
						 | 
				
			
			@ -1122,10 +1143,6 @@ async def process_messages(
 | 
			
		|||
                                await chan.send(err_msg)
 | 
			
		||||
                                continue
 | 
			
		||||
 | 
			
		||||
                        start_status += (
 | 
			
		||||
                            f'   -> func: {func}\n'
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
                        # schedule a task for the requested RPC function
 | 
			
		||||
                        # in the actor's main "service nursery".
 | 
			
		||||
                        #
 | 
			
		||||
| 
						 | 
				
			
			@ -1133,7 +1150,7 @@ async def process_messages(
 | 
			
		|||
                        # supervision isolation? would avoid having to
 | 
			
		||||
                        # manage RPC tasks individually in `._rpc_tasks`
 | 
			
		||||
                        # table?
 | 
			
		||||
                        start_status += '   -> scheduling new task..\n'
 | 
			
		||||
                        start_status += '->( scheduling new task..\n'
 | 
			
		||||
                        log.runtime(start_status)
 | 
			
		||||
                        try:
 | 
			
		||||
                            ctx: Context = await actor._service_n.start(
 | 
			
		||||
| 
						 | 
				
			
			@ -1222,7 +1239,7 @@ async def process_messages(
 | 
			
		|||
                f'|_{chan}\n'
 | 
			
		||||
            )
 | 
			
		||||
            await actor.cancel_rpc_tasks(
 | 
			
		||||
                req_uid=actor.uid,
 | 
			
		||||
                req_aid=actor.aid,
 | 
			
		||||
                # a "self cancel" in terms of the lifetime of the
 | 
			
		||||
                # IPC connection which is presumed to be the
 | 
			
		||||
                # source of any requests for spawned tasks.
 | 
			
		||||
| 
						 | 
				
			
			@ -1294,13 +1311,37 @@ async def process_messages(
 | 
			
		|||
    finally:
 | 
			
		||||
        # msg debugging for when he machinery is brokey
 | 
			
		||||
        if msg is None:
 | 
			
		||||
            message: str = 'Exiting IPC msg loop without receiving a msg?'
 | 
			
		||||
            message: str = 'Exiting RPC-loop without receiving a msg?'
 | 
			
		||||
        else:
 | 
			
		||||
            task_op_repr: str = ')>'
 | 
			
		||||
            task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
 | 
			
		||||
            # maybe add cancelled opt prefix
 | 
			
		||||
            if task._cancel_status.effectively_cancelled:
 | 
			
		||||
                task_op_repr = 'c' + task_op_repr
 | 
			
		||||
 | 
			
		||||
            task_repr: str = _pformat.nest_from_op(
 | 
			
		||||
                input_op=task_op_repr,
 | 
			
		||||
                text=f'{task!r}',
 | 
			
		||||
                nest_indent=1,
 | 
			
		||||
            )
 | 
			
		||||
            # chan_op_repr: str = '<=} '
 | 
			
		||||
            # chan_repr: str = _pformat.nest_from_op(
 | 
			
		||||
            #     input_op=chan_op_repr,
 | 
			
		||||
            #     op_suffix='',
 | 
			
		||||
            #     nest_prefix='',
 | 
			
		||||
            #     text=chan.pformat(),
 | 
			
		||||
            #     nest_indent=len(chan_op_repr)-1,
 | 
			
		||||
            #     rm_from_first_ln='<',
 | 
			
		||||
            # )
 | 
			
		||||
            message: str = (
 | 
			
		||||
                'Exiting IPC msg loop with final msg\n\n'
 | 
			
		||||
                f'<= peer: {chan.uid}\n'
 | 
			
		||||
                f'  |_{chan}\n\n'
 | 
			
		||||
                # f'{pretty_struct.pformat(msg)}'
 | 
			
		||||
                f'Exiting RPC-loop with final msg\n'
 | 
			
		||||
                f'\n'
 | 
			
		||||
                # f'{chan_repr}\n'
 | 
			
		||||
                f'{task_repr}\n'
 | 
			
		||||
                f'\n'
 | 
			
		||||
                f'{pretty_struct.pformat(msg)}'
 | 
			
		||||
                f'\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        log.runtime(message)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -262,6 +262,7 @@ def nest_from_op(
 | 
			
		|||
    nest_indent: int|None = None,
 | 
			
		||||
    # XXX indent `next_prefix` "to-the-right-of" `input_op`
 | 
			
		||||
    # by this count of whitespaces (' ').
 | 
			
		||||
    rm_from_first_ln: str|None = None,
 | 
			
		||||
 | 
			
		||||
) -> str:
 | 
			
		||||
    '''
 | 
			
		||||
| 
						 | 
				
			
			@ -346,20 +347,35 @@ def nest_from_op(
 | 
			
		|||
    if (
 | 
			
		||||
        nest_prefix
 | 
			
		||||
        and
 | 
			
		||||
        nest_indent
 | 
			
		||||
        nest_indent != 0
 | 
			
		||||
    ):
 | 
			
		||||
        nest_prefix: str = textwrap.indent(
 | 
			
		||||
            nest_prefix,
 | 
			
		||||
            prefix=nest_indent*' ',
 | 
			
		||||
        )
 | 
			
		||||
        if nest_indent is not None:
 | 
			
		||||
            nest_prefix: str = textwrap.indent(
 | 
			
		||||
                nest_prefix,
 | 
			
		||||
                prefix=nest_indent*' ',
 | 
			
		||||
            )
 | 
			
		||||
        nest_indent: int = len(nest_prefix)
 | 
			
		||||
 | 
			
		||||
    # determine body-text indent either by,
 | 
			
		||||
    # - using wtv explicit indent value is provided,
 | 
			
		||||
    # OR
 | 
			
		||||
    # - auto-calcing the indent to embed `text` under
 | 
			
		||||
    #   the `nest_prefix` if provided, **IFF** `nest_indent=None`.
 | 
			
		||||
    tree_str_indent: int = 0
 | 
			
		||||
    if nest_indent not in {0, None}:
 | 
			
		||||
        tree_str_indent = nest_indent
 | 
			
		||||
    elif (
 | 
			
		||||
        nest_prefix
 | 
			
		||||
        and
 | 
			
		||||
        nest_indent != 0
 | 
			
		||||
    ):
 | 
			
		||||
        tree_str_indent = len(nest_prefix)
 | 
			
		||||
 | 
			
		||||
    indented_tree_str: str = text
 | 
			
		||||
    tree_str_indent: int = 0
 | 
			
		||||
    if nest_indent != 0:
 | 
			
		||||
        tree_str_indent: int = len(nest_prefix)
 | 
			
		||||
    if tree_str_indent:
 | 
			
		||||
        indented_tree_str: str = textwrap.indent(
 | 
			
		||||
            text,
 | 
			
		||||
            prefix=' '*tree_str_indent
 | 
			
		||||
            prefix=' '*tree_str_indent,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # inject any provided nesting-prefix chars
 | 
			
		||||
| 
						 | 
				
			
			@ -369,18 +385,35 @@ def nest_from_op(
 | 
			
		|||
            f'{nest_prefix}{indented_tree_str[tree_str_indent:]}'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    if (
 | 
			
		||||
        not prefix_op
 | 
			
		||||
        or
 | 
			
		||||
        rm_from_first_ln
 | 
			
		||||
    ):
 | 
			
		||||
        tree_lns: list[str] = indented_tree_str.splitlines()
 | 
			
		||||
        first: str = tree_lns[0]
 | 
			
		||||
        if rm_from_first_ln:
 | 
			
		||||
            first = first.strip().replace(
 | 
			
		||||
                rm_from_first_ln,
 | 
			
		||||
                '',
 | 
			
		||||
            )
 | 
			
		||||
        indented_tree_str: str = '\n'.join(tree_lns[1:])
 | 
			
		||||
 | 
			
		||||
        if prefix_op:
 | 
			
		||||
            indented_tree_str = (
 | 
			
		||||
                f'{first}\n'
 | 
			
		||||
                f'{indented_tree_str}'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    if prefix_op:
 | 
			
		||||
        return (
 | 
			
		||||
            f'{input_op}{op_suffix}'
 | 
			
		||||
            f'{indented_tree_str}'
 | 
			
		||||
        )
 | 
			
		||||
    else:
 | 
			
		||||
        tree_lns: list[str] = indented_tree_str.splitlines()
 | 
			
		||||
        first: str = tree_lns[0]
 | 
			
		||||
        rest: str = '\n'.join(tree_lns[1:])
 | 
			
		||||
        return (
 | 
			
		||||
            f'{first}{input_op}{op_suffix}'
 | 
			
		||||
            f'{rest}'
 | 
			
		||||
            f'{indented_tree_str}'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -171,11 +171,23 @@ class Channel:
 | 
			
		|||
        )
 | 
			
		||||
        assert transport.raddr == addr
 | 
			
		||||
        chan = Channel(transport=transport)
 | 
			
		||||
        log.runtime(
 | 
			
		||||
            f'Connected channel IPC transport\n'
 | 
			
		||||
            f'[>\n'
 | 
			
		||||
            f' |_{chan}\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # ?TODO, compact this into adapter level-methods?
 | 
			
		||||
        # -[ ] would avoid extra repr-calcs if level not active?
 | 
			
		||||
        #   |_ how would the `calc_if_level` look though? func?
 | 
			
		||||
        if log.at_least_level('runtime'):
 | 
			
		||||
            from tractor.devx import (
 | 
			
		||||
                pformat as _pformat,
 | 
			
		||||
            )
 | 
			
		||||
            chan_repr: str = _pformat.nest_from_op(
 | 
			
		||||
                input_op='[>',
 | 
			
		||||
                text=chan.pformat(),
 | 
			
		||||
                nest_indent=1,
 | 
			
		||||
            )
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                f'Connected channel IPC transport\n'
 | 
			
		||||
                f'{chan_repr}'
 | 
			
		||||
            )
 | 
			
		||||
        return chan
 | 
			
		||||
 | 
			
		||||
    @cm
 | 
			
		||||
| 
						 | 
				
			
			@ -218,17 +230,19 @@ class Channel:
 | 
			
		|||
            if privates else ''
 | 
			
		||||
        ) + (  # peer-actor (processs) section
 | 
			
		||||
            f' |_peer: {self.aid.reprol()!r}\n'
 | 
			
		||||
           if self.aid else '<unknown>'
 | 
			
		||||
            if self.aid else ' |_peer: <unknown>\n'
 | 
			
		||||
        ) + (
 | 
			
		||||
            f' |_msgstream: {tpt_name}\n'
 | 
			
		||||
            f'   proto={tpt.laddr.proto_key!r}\n'
 | 
			
		||||
            f'   layer={tpt.layer_key!r}\n'
 | 
			
		||||
            f'   laddr={tpt.laddr}\n'
 | 
			
		||||
            f'   raddr={tpt.raddr}\n'
 | 
			
		||||
            f'   codec={tpt.codec_key!r}\n'
 | 
			
		||||
            f'   stream={tpt.stream}\n'
 | 
			
		||||
            f'   maddr={tpt.maddr!r}\n'
 | 
			
		||||
            f'   drained={tpt.drained}\n'
 | 
			
		||||
            f'   maddr: {tpt.maddr!r}\n'
 | 
			
		||||
            f'   proto: {tpt.laddr.proto_key!r}\n'
 | 
			
		||||
            f'   layer: {tpt.layer_key!r}\n'
 | 
			
		||||
            f'   codec: {tpt.codec_key!r}\n'
 | 
			
		||||
            f'   .laddr={tpt.laddr}\n'
 | 
			
		||||
            f'   .raddr={tpt.raddr}\n'
 | 
			
		||||
        ) + (
 | 
			
		||||
            f'   ._transport.stream={tpt.stream}\n'
 | 
			
		||||
            f'   ._transport.drained={tpt.drained}\n'
 | 
			
		||||
            if privates else ''
 | 
			
		||||
        ) + (
 | 
			
		||||
            f'   _send_lock={tpt._send_lock.statistics()}\n'
 | 
			
		||||
            if privates else ''
 | 
			
		||||
| 
						 | 
				
			
			@ -257,6 +271,10 @@ class Channel:
 | 
			
		|||
    def raddr(self) -> Address|None:
 | 
			
		||||
        return self._transport.raddr if self._transport else None
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def maddr(self) -> str:
 | 
			
		||||
        return self._transport.maddr if self._transport else '<no-tpt>'
 | 
			
		||||
 | 
			
		||||
    # TODO: something like,
 | 
			
		||||
    # `pdbp.hideframe_on(errors=[MsgTypeError])`
 | 
			
		||||
    # instead of the `try/except` hack we have rn..
 | 
			
		||||
| 
						 | 
				
			
			@ -444,8 +462,8 @@ class Channel:
 | 
			
		|||
        await self.send(aid)
 | 
			
		||||
        peer_aid: Aid = await self.recv()
 | 
			
		||||
        log.runtime(
 | 
			
		||||
            f'Received hanshake with peer actor,\n'
 | 
			
		||||
            f'{peer_aid}\n'
 | 
			
		||||
            f'Received hanshake with peer\n'
 | 
			
		||||
            f'<= {peer_aid.reprol(sin_uuid=False)}\n'
 | 
			
		||||
        )
 | 
			
		||||
        # NOTE, we always are referencing the remote peer!
 | 
			
		||||
        self.aid = peer_aid
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,9 +17,16 @@
 | 
			
		|||
Utils to tame mp non-SC madeness
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
 | 
			
		||||
# !TODO! in 3.13 this can be disabled (the-same/similarly) using
 | 
			
		||||
# a flag,
 | 
			
		||||
# - [ ] soo if it works like this, drop this module entirely for
 | 
			
		||||
#   3.13+ B)
 | 
			
		||||
#  |_https://docs.python.org/3/library/multiprocessing.shared_memory.html
 | 
			
		||||
#
 | 
			
		||||
def disable_mantracker():
 | 
			
		||||
    '''
 | 
			
		||||
    Disable all ``multiprocessing``` "resource tracking" machinery since
 | 
			
		||||
    Disable all `multiprocessing` "resource tracking" machinery since
 | 
			
		||||
    it's an absolute multi-threaded mess of non-SC madness.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -26,7 +26,7 @@ from contextlib import (
 | 
			
		|||
from functools import partial
 | 
			
		||||
from itertools import chain
 | 
			
		||||
import inspect
 | 
			
		||||
from pprint import pformat
 | 
			
		||||
import textwrap
 | 
			
		||||
from types import (
 | 
			
		||||
    ModuleType,
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -43,7 +43,10 @@ from trio import (
 | 
			
		|||
    SocketListener,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
# from ..devx import debug
 | 
			
		||||
from ..devx.pformat import (
 | 
			
		||||
    ppfmt,
 | 
			
		||||
    nest_from_op,
 | 
			
		||||
)
 | 
			
		||||
from .._exceptions import (
 | 
			
		||||
    TransportClosed,
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -141,9 +144,8 @@ async def maybe_wait_on_canced_subs(
 | 
			
		|||
 | 
			
		||||
    ):
 | 
			
		||||
        log.cancel(
 | 
			
		||||
            'Waiting on cancel request to peer..\n'
 | 
			
		||||
            f'c)=>\n'
 | 
			
		||||
            f'  |_{chan.aid}\n'
 | 
			
		||||
            'Waiting on cancel request to peer\n'
 | 
			
		||||
            f'c)=> {chan.aid.reprol()}@[{chan.maddr}]\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # XXX: this is a soft wait on the channel (and its
 | 
			
		||||
| 
						 | 
				
			
			@ -179,7 +181,7 @@ async def maybe_wait_on_canced_subs(
 | 
			
		|||
                log.warning(
 | 
			
		||||
                    'Draining msg from disconnected peer\n'
 | 
			
		||||
                    f'{chan_info}'
 | 
			
		||||
                    f'{pformat(msg)}\n'
 | 
			
		||||
                    f'{ppfmt(msg)}\n'
 | 
			
		||||
                )
 | 
			
		||||
                # cid: str|None = msg.get('cid')
 | 
			
		||||
                cid: str|None = msg.cid
 | 
			
		||||
| 
						 | 
				
			
			@ -248,7 +250,7 @@ async def maybe_wait_on_canced_subs(
 | 
			
		|||
                if children := local_nursery._children:
 | 
			
		||||
                    # indent from above local-nurse repr
 | 
			
		||||
                    report += (
 | 
			
		||||
                        f'   |_{pformat(children)}\n'
 | 
			
		||||
                        f'   |_{ppfmt(children)}\n'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                log.warning(report)
 | 
			
		||||
| 
						 | 
				
			
			@ -279,8 +281,9 @@ async def maybe_wait_on_canced_subs(
 | 
			
		|||
                    log.runtime(
 | 
			
		||||
                        f'Peer IPC broke but subproc is alive?\n\n'
 | 
			
		||||
 | 
			
		||||
                        f'<=x {chan.aid}@{chan.raddr}\n'
 | 
			
		||||
                        f'   |_{proc}\n'
 | 
			
		||||
                        f'<=x {chan.aid.reprol()}@[{chan.maddr}]\n'
 | 
			
		||||
                        f'\n'
 | 
			
		||||
                        f'{proc}\n'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
    return local_nursery
 | 
			
		||||
| 
						 | 
				
			
			@ -324,9 +327,10 @@ async def handle_stream_from_peer(
 | 
			
		|||
 | 
			
		||||
    chan = Channel.from_stream(stream)
 | 
			
		||||
    con_status: str = (
 | 
			
		||||
        'New inbound IPC connection <=\n'
 | 
			
		||||
        f'|_{chan}\n'
 | 
			
		||||
        f'New inbound IPC transport connection\n'
 | 
			
		||||
        f'<=( {stream!r}\n'
 | 
			
		||||
    )
 | 
			
		||||
    con_status_steps: str = ''
 | 
			
		||||
 | 
			
		||||
    # initial handshake with peer phase
 | 
			
		||||
    try:
 | 
			
		||||
| 
						 | 
				
			
			@ -372,7 +376,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
    if _pre_chan := server._peers.get(uid):
 | 
			
		||||
        familiar: str = 'pre-existing-peer'
 | 
			
		||||
    uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
 | 
			
		||||
    con_status += (
 | 
			
		||||
    con_status_steps += (
 | 
			
		||||
        f' -> Handshake with {familiar} `{uid_short}` complete\n'
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -397,7 +401,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
        None,
 | 
			
		||||
    )
 | 
			
		||||
    if event:
 | 
			
		||||
        con_status += (
 | 
			
		||||
        con_status_steps += (
 | 
			
		||||
            ' -> Waking subactor spawn waiters: '
 | 
			
		||||
            f'{event.statistics().tasks_waiting}\n'
 | 
			
		||||
            f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
 | 
			
		||||
| 
						 | 
				
			
			@ -408,7 +412,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
        event.set()
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        con_status += (
 | 
			
		||||
        con_status_steps += (
 | 
			
		||||
            f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
 | 
			
		||||
        )  # type: ignore
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -422,8 +426,15 @@ async def handle_stream_from_peer(
 | 
			
		|||
    # TODO: can we just use list-ref directly?
 | 
			
		||||
    chans.append(chan)
 | 
			
		||||
 | 
			
		||||
    con_status += ' -> Entering RPC msg loop..\n'
 | 
			
		||||
    log.runtime(con_status)
 | 
			
		||||
    con_status_steps += ' -> Entering RPC msg loop..\n'
 | 
			
		||||
    log.runtime(
 | 
			
		||||
        con_status
 | 
			
		||||
        +
 | 
			
		||||
        textwrap.indent(
 | 
			
		||||
            con_status_steps,
 | 
			
		||||
            prefix=' '*3,  # align to first-ln
 | 
			
		||||
        )
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # Begin channel management - respond to remote requests and
 | 
			
		||||
    # process received reponses.
 | 
			
		||||
| 
						 | 
				
			
			@ -456,41 +467,67 @@ async def handle_stream_from_peer(
 | 
			
		|||
            disconnected=disconnected,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # ``Channel`` teardown and closure sequence
 | 
			
		||||
        # `Channel` teardown and closure sequence
 | 
			
		||||
        # drop ref to channel so it can be gc-ed and disconnected
 | 
			
		||||
        con_teardown_status: str = (
 | 
			
		||||
            f'IPC channel disconnected:\n'
 | 
			
		||||
            f'<=x uid: {chan.aid}\n'
 | 
			
		||||
            f'   |_{pformat(chan)}\n\n'
 | 
			
		||||
        #
 | 
			
		||||
        # -[x]TODO mk this be like
 | 
			
		||||
        # <=x Channel(
 | 
			
		||||
        #     |_field: blah
 | 
			
		||||
        # )>
 | 
			
		||||
        op_repr: str = '<=x '
 | 
			
		||||
        chan_repr: str = nest_from_op(
 | 
			
		||||
            input_op=op_repr,
 | 
			
		||||
            op_suffix='',
 | 
			
		||||
            nest_prefix='',
 | 
			
		||||
            text=chan.pformat(),
 | 
			
		||||
            nest_indent=len(op_repr)-1,
 | 
			
		||||
            rm_from_first_ln='<',
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        con_teardown_status: str = (
 | 
			
		||||
            f'IPC channel disconnect\n'
 | 
			
		||||
            f'\n'
 | 
			
		||||
            f'{chan_repr}\n'
 | 
			
		||||
            f'\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        chans.remove(chan)
 | 
			
		||||
 | 
			
		||||
        # TODO: do we need to be this pedantic?
 | 
			
		||||
        if not chans:
 | 
			
		||||
            con_teardown_status += (
 | 
			
		||||
                f'-> No more channels with {chan.aid}'
 | 
			
		||||
                f'-> No more channels with {chan.aid.reprol()!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            server._peers.pop(uid, None)
 | 
			
		||||
 | 
			
		||||
        peers_str: str = ''
 | 
			
		||||
        for uid, chans in server._peers.items():
 | 
			
		||||
            peers_str += (
 | 
			
		||||
                f'uid: {uid}\n'
 | 
			
		||||
            )
 | 
			
		||||
            for i, chan in enumerate(chans):
 | 
			
		||||
                peers_str += (
 | 
			
		||||
                    f' |_[{i}] {pformat(chan)}\n'
 | 
			
		||||
        if peers := list(server._peers.values()):
 | 
			
		||||
            peer_cnt: int = len(peers)
 | 
			
		||||
            if (
 | 
			
		||||
                (first := peers[0][0]) is not chan
 | 
			
		||||
                and
 | 
			
		||||
                not disconnected
 | 
			
		||||
                and
 | 
			
		||||
                peer_cnt > 1
 | 
			
		||||
            ):
 | 
			
		||||
                con_teardown_status += (
 | 
			
		||||
                    f'-> Remaining IPC {peer_cnt-1!r} peers:\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        con_teardown_status += (
 | 
			
		||||
            f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
 | 
			
		||||
        )
 | 
			
		||||
                for chans in server._peers.values():
 | 
			
		||||
                    first: Channel = chans[0]
 | 
			
		||||
                    if not (
 | 
			
		||||
                        first is chan
 | 
			
		||||
                        and
 | 
			
		||||
                        disconnected
 | 
			
		||||
                    ):
 | 
			
		||||
                        con_teardown_status += (
 | 
			
		||||
                            f'  |_{first.aid.reprol()!r} -> {len(chans)!r} chans\n'
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
        # No more channels to other actors (at all) registered
 | 
			
		||||
        # as connected.
 | 
			
		||||
        if not server._peers:
 | 
			
		||||
            con_teardown_status += (
 | 
			
		||||
                'Signalling no more peer channel connections'
 | 
			
		||||
                '-> Signalling no more peer connections!\n'
 | 
			
		||||
            )
 | 
			
		||||
            server._no_more_peers.set()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -579,10 +616,10 @@ async def handle_stream_from_peer(
 | 
			
		|||
 | 
			
		||||
class Endpoint(Struct):
 | 
			
		||||
    '''
 | 
			
		||||
    An instance of an IPC "bound" address where the lifetime of the
 | 
			
		||||
    "ability to accept connections" (from clients) and then handle
 | 
			
		||||
    those inbound sessions or sequences-of-packets is determined by
 | 
			
		||||
    a (maybe pair of) nurser(y/ies).
 | 
			
		||||
    An instance of an IPC "bound" address where the lifetime of an
 | 
			
		||||
    "ability to accept connections" and handle the subsequent
 | 
			
		||||
    sequence-of-packets (maybe oriented as sessions) is determined by
 | 
			
		||||
    the underlying nursery scope(s).
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    addr: Address
 | 
			
		||||
| 
						 | 
				
			
			@ -600,6 +637,24 @@ class Endpoint(Struct):
 | 
			
		|||
        MsgTransport,  # handle to encoded-msg transport stream
 | 
			
		||||
    ] = {}
 | 
			
		||||
 | 
			
		||||
    def pformat(
 | 
			
		||||
        self,
 | 
			
		||||
        indent: int = 0,
 | 
			
		||||
        privates: bool = False,
 | 
			
		||||
    ) -> str:
 | 
			
		||||
        type_repr: str = type(self).__name__
 | 
			
		||||
        fmtstr: str = (
 | 
			
		||||
            # !TODO, always be ns aware!
 | 
			
		||||
            # f'|_netns: {netns}\n'
 | 
			
		||||
            f' |.addr: {self.addr!r}\n'
 | 
			
		||||
            f' |_peers: {len(self.peer_tpts)}\n'
 | 
			
		||||
        )
 | 
			
		||||
        return (
 | 
			
		||||
            f'<{type_repr}(\n'
 | 
			
		||||
            f'{fmtstr}'
 | 
			
		||||
            f')>'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    async def start_listener(self) -> SocketListener:
 | 
			
		||||
        tpt_mod: ModuleType = inspect.getmodule(self.addr)
 | 
			
		||||
        lstnr: SocketListener = await tpt_mod.start_listener(
 | 
			
		||||
| 
						 | 
				
			
			@ -639,11 +694,13 @@ class Endpoint(Struct):
 | 
			
		|||
class Server(Struct):
 | 
			
		||||
    _parent_tn: Nursery
 | 
			
		||||
    _stream_handler_tn: Nursery
 | 
			
		||||
 | 
			
		||||
    # level-triggered sig for whether "no peers are currently
 | 
			
		||||
    # connected"; field is **always** set to an instance but
 | 
			
		||||
    # initialized with `.is_set() == True`.
 | 
			
		||||
    _no_more_peers: trio.Event
 | 
			
		||||
 | 
			
		||||
    # active eps as allocated by `.listen_on()`
 | 
			
		||||
    _endpoints: list[Endpoint] = []
 | 
			
		||||
 | 
			
		||||
    # connection tracking & mgmt
 | 
			
		||||
| 
						 | 
				
			
			@ -651,12 +708,19 @@ class Server(Struct):
 | 
			
		|||
        str,  # uaid
 | 
			
		||||
        list[Channel],  # IPC conns from peer
 | 
			
		||||
    ] = defaultdict(list)
 | 
			
		||||
 | 
			
		||||
    # events-table with entries registered unset while the local
 | 
			
		||||
    # actor is waiting on a new actor to inbound connect, often
 | 
			
		||||
    # a parent waiting on its child just after spawn.
 | 
			
		||||
    _peer_connected: dict[
 | 
			
		||||
        tuple[str, str],
 | 
			
		||||
        trio.Event,
 | 
			
		||||
    ] = {}
 | 
			
		||||
 | 
			
		||||
    # syncs for setup/teardown sequences
 | 
			
		||||
    # - null when not yet booted,
 | 
			
		||||
    # - unset when active,
 | 
			
		||||
    # - set when fully shutdown with 0 eps active.
 | 
			
		||||
    _shutdown: trio.Event|None = None
 | 
			
		||||
 | 
			
		||||
    # TODO, maybe just make `._endpoints: list[Endpoint]` and
 | 
			
		||||
| 
						 | 
				
			
			@ -664,7 +728,6 @@ class Server(Struct):
 | 
			
		|||
    # @property
 | 
			
		||||
    # def addrs2eps(self) -> dict[Address, Endpoint]:
 | 
			
		||||
    #     ...
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def proto_keys(self) -> list[str]:
 | 
			
		||||
        return [
 | 
			
		||||
| 
						 | 
				
			
			@ -690,7 +753,7 @@ class Server(Struct):
 | 
			
		|||
            # TODO: obvi a different server type when we eventually
 | 
			
		||||
            # support some others XD
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                f'Cancelling server(s) for\n'
 | 
			
		||||
                f'Cancelling server(s) for tpt-protos\n'
 | 
			
		||||
                f'{self.proto_keys!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            self._parent_tn.cancel_scope.cancel()
 | 
			
		||||
| 
						 | 
				
			
			@ -717,6 +780,14 @@ class Server(Struct):
 | 
			
		|||
                f'protos: {tpt_protos!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def len_peers(
 | 
			
		||||
        self,
 | 
			
		||||
    ) -> int:
 | 
			
		||||
        return len([
 | 
			
		||||
            chan.connected()
 | 
			
		||||
            for chan in chain(*self._peers.values())
 | 
			
		||||
        ])
 | 
			
		||||
 | 
			
		||||
    def has_peers(
 | 
			
		||||
        self,
 | 
			
		||||
        check_chans: bool = False,
 | 
			
		||||
| 
						 | 
				
			
			@ -730,13 +801,11 @@ class Server(Struct):
 | 
			
		|||
            has_peers
 | 
			
		||||
            and
 | 
			
		||||
            check_chans
 | 
			
		||||
            and
 | 
			
		||||
            (peer_cnt := self.len_peers())
 | 
			
		||||
        ):
 | 
			
		||||
            has_peers: bool = (
 | 
			
		||||
                any(chan.connected()
 | 
			
		||||
                    for chan in chain(
 | 
			
		||||
                        *self._peers.values()
 | 
			
		||||
                    )
 | 
			
		||||
                )
 | 
			
		||||
                peer_cnt > 0
 | 
			
		||||
                and
 | 
			
		||||
                has_peers
 | 
			
		||||
            )
 | 
			
		||||
| 
						 | 
				
			
			@ -803,30 +872,66 @@ class Server(Struct):
 | 
			
		|||
 | 
			
		||||
        return ev.is_set()
 | 
			
		||||
 | 
			
		||||
    def pformat(self) -> str:
 | 
			
		||||
    @property
 | 
			
		||||
    def repr_state(self) -> str:
 | 
			
		||||
        '''
 | 
			
		||||
        A `str`-status describing the current state of this
 | 
			
		||||
        IPC server in terms of the current operating "phase".
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        status = 'server is active'
 | 
			
		||||
        if self.has_peers():
 | 
			
		||||
            peer_cnt: int = self.len_peers()
 | 
			
		||||
            status: str = (
 | 
			
		||||
                f'{peer_cnt!r} peer chans'
 | 
			
		||||
            )
 | 
			
		||||
        else:
 | 
			
		||||
            status: str = 'No peer chans'
 | 
			
		||||
 | 
			
		||||
        if self.is_shutdown():
 | 
			
		||||
            status: str = 'server-shutdown'
 | 
			
		||||
 | 
			
		||||
        return status
 | 
			
		||||
 | 
			
		||||
    def pformat(
 | 
			
		||||
        self,
 | 
			
		||||
        privates: bool = False,
 | 
			
		||||
    ) -> str:
 | 
			
		||||
        eps: list[Endpoint] = self._endpoints
 | 
			
		||||
 | 
			
		||||
        state_repr: str = (
 | 
			
		||||
            f'{len(eps)!r} IPC-endpoints active'
 | 
			
		||||
        )
 | 
			
		||||
        # state_repr: str = (
 | 
			
		||||
        #     f'{len(eps)!r} endpoints active'
 | 
			
		||||
        # )
 | 
			
		||||
        fmtstr = (
 | 
			
		||||
            f' |_state: {state_repr}\n'
 | 
			
		||||
            f'   no_more_peers: {self.has_peers()}\n'
 | 
			
		||||
            f' |_state: {self.repr_state!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
        if self._shutdown is not None:
 | 
			
		||||
            shutdown_stats: EventStatistics = self._shutdown.statistics()
 | 
			
		||||
        if privates:
 | 
			
		||||
            fmtstr += f'   no_more_peers: {self.has_peers()}\n'
 | 
			
		||||
 | 
			
		||||
            if self._shutdown is not None:
 | 
			
		||||
                shutdown_stats: EventStatistics = self._shutdown.statistics()
 | 
			
		||||
                fmtstr += (
 | 
			
		||||
                    f'   task_waiting_on_shutdown: {shutdown_stats}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        if eps := self._endpoints:
 | 
			
		||||
            addrs: list[tuple] = [
 | 
			
		||||
                ep.addr for ep in eps
 | 
			
		||||
            ]
 | 
			
		||||
            repr_eps: str = ppfmt(addrs)
 | 
			
		||||
 | 
			
		||||
            fmtstr += (
 | 
			
		||||
                f'   task_waiting_on_shutdown: {shutdown_stats}\n'
 | 
			
		||||
                f' |_endpoints: {repr_eps}\n'
 | 
			
		||||
                # ^TODO? how to indent closing ']'..
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        fmtstr += (
 | 
			
		||||
            # TODO, use the `ppfmt()` helper from `modden`!
 | 
			
		||||
            f' |_endpoints: {pformat(self._endpoints)}\n'
 | 
			
		||||
            f' |_peers: {len(self._peers)} connected\n'
 | 
			
		||||
        )
 | 
			
		||||
        if peers := self._peers:
 | 
			
		||||
            fmtstr += (
 | 
			
		||||
                f' |_peers: {len(peers)} connected\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        return (
 | 
			
		||||
            f'<IPCServer(\n'
 | 
			
		||||
            f'<Server(\n'
 | 
			
		||||
            f'{fmtstr}'
 | 
			
		||||
            f')>\n'
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			@ -885,8 +990,8 @@ class Server(Struct):
 | 
			
		|||
            )
 | 
			
		||||
 | 
			
		||||
        log.runtime(
 | 
			
		||||
            f'Binding to endpoints for,\n'
 | 
			
		||||
            f'{accept_addrs}\n'
 | 
			
		||||
            f'Binding endpoints\n'
 | 
			
		||||
            f'{ppfmt(accept_addrs)}\n'
 | 
			
		||||
        )
 | 
			
		||||
        eps: list[Endpoint] = await self._parent_tn.start(
 | 
			
		||||
            partial(
 | 
			
		||||
| 
						 | 
				
			
			@ -896,13 +1001,19 @@ class Server(Struct):
 | 
			
		|||
                listen_addrs=accept_addrs,
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
        self._endpoints.extend(eps)
 | 
			
		||||
 | 
			
		||||
        serv_repr: str = nest_from_op(
 | 
			
		||||
            input_op='(>',
 | 
			
		||||
            text=self.pformat(),
 | 
			
		||||
            nest_indent=1,
 | 
			
		||||
        )
 | 
			
		||||
        log.runtime(
 | 
			
		||||
            f'Started IPC endpoints\n'
 | 
			
		||||
            f'{eps}\n'
 | 
			
		||||
            f'Started IPC server\n'
 | 
			
		||||
            f'{serv_repr}'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._endpoints.extend(eps)
 | 
			
		||||
        # XXX, just a little bit of sanity
 | 
			
		||||
        # XXX, a little sanity on new ep allocations
 | 
			
		||||
        group_tn: Nursery|None = None
 | 
			
		||||
        ep: Endpoint
 | 
			
		||||
        for ep in eps:
 | 
			
		||||
| 
						 | 
				
			
			@ -956,9 +1067,13 @@ async def _serve_ipc_eps(
 | 
			
		|||
                    stream_handler_tn=stream_handler_tn,
 | 
			
		||||
                )
 | 
			
		||||
                try:
 | 
			
		||||
                    ep_sclang: str = nest_from_op(
 | 
			
		||||
                        input_op='>[',
 | 
			
		||||
                        text=f'{ep.pformat()}',
 | 
			
		||||
                    )
 | 
			
		||||
                    log.runtime(
 | 
			
		||||
                        f'Starting new endpoint listener\n'
 | 
			
		||||
                        f'{ep}\n'
 | 
			
		||||
                        f'{ep_sclang}\n'
 | 
			
		||||
                    )
 | 
			
		||||
                    listener: trio.abc.Listener = await ep.start_listener()
 | 
			
		||||
                    assert listener is ep._listener
 | 
			
		||||
| 
						 | 
				
			
			@ -996,17 +1111,6 @@ async def _serve_ipc_eps(
 | 
			
		|||
                    handler_nursery=stream_handler_tn
 | 
			
		||||
                )
 | 
			
		||||
            )
 | 
			
		||||
            # TODO, wow make this message better! XD
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                'Started server(s)\n'
 | 
			
		||||
                +
 | 
			
		||||
                '\n'.join([f'|_{addr}' for addr in listen_addrs])
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                f'Started IPC endpoints\n'
 | 
			
		||||
                f'{eps}\n'
 | 
			
		||||
            )
 | 
			
		||||
            task_status.started(
 | 
			
		||||
                eps,
 | 
			
		||||
            )
 | 
			
		||||
| 
						 | 
				
			
			@ -1049,8 +1153,7 @@ async def open_ipc_server(
 | 
			
		|||
        try:
 | 
			
		||||
            yield ipc_server
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                f'Waiting on server to shutdown or be cancelled..\n'
 | 
			
		||||
                f'{ipc_server}'
 | 
			
		||||
                'Server-tn running until terminated\n'
 | 
			
		||||
            )
 | 
			
		||||
            # TODO? when if ever would we want/need this?
 | 
			
		||||
            # with trio.CancelScope(shield=True):
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -127,10 +127,9 @@ async def start_listener(
 | 
			
		|||
    Start a TCP socket listener on the given `TCPAddress`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    log.info(
 | 
			
		||||
        f'Attempting to bind TCP socket\n'
 | 
			
		||||
        f'>[\n'
 | 
			
		||||
        f'|_{addr}\n'
 | 
			
		||||
    log.runtime(
 | 
			
		||||
        f'Trying socket bind\n'
 | 
			
		||||
        f'>[ {addr}\n'
 | 
			
		||||
    )
 | 
			
		||||
    # ?TODO, maybe we should just change the lower-level call this is
 | 
			
		||||
    # using internall per-listener?
 | 
			
		||||
| 
						 | 
				
			
			@ -145,11 +144,10 @@ async def start_listener(
 | 
			
		|||
    assert len(listeners) == 1
 | 
			
		||||
    listener = listeners[0]
 | 
			
		||||
    host, port = listener.socket.getsockname()[:2]
 | 
			
		||||
 | 
			
		||||
    bound_addr: TCPAddress = type(addr).from_addr((host, port))
 | 
			
		||||
    log.info(
 | 
			
		||||
        f'Listening on TCP socket\n'
 | 
			
		||||
        f'[>\n'
 | 
			
		||||
        f' |_{addr}\n'
 | 
			
		||||
        f'[> {bound_addr}\n'
 | 
			
		||||
    )
 | 
			
		||||
    return listener
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -81,10 +81,35 @@ BOLD_PALETTE = {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def at_least_level(
 | 
			
		||||
    log: Logger|LoggerAdapter,
 | 
			
		||||
    level: int|str,
 | 
			
		||||
) -> bool:
 | 
			
		||||
    '''
 | 
			
		||||
    Predicate to test if a given level is active.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    if isinstance(level, str):
 | 
			
		||||
        level: int = CUSTOM_LEVELS[level.upper()]
 | 
			
		||||
 | 
			
		||||
    if log.getEffectiveLevel() <= level:
 | 
			
		||||
        return True
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: this isn't showing the correct '{filename}'
 | 
			
		||||
# as it did before..
 | 
			
		||||
class StackLevelAdapter(LoggerAdapter):
 | 
			
		||||
 | 
			
		||||
    def at_least_level(
 | 
			
		||||
        self,
 | 
			
		||||
        level: str,
 | 
			
		||||
    ) -> bool:
 | 
			
		||||
        return at_least_level(
 | 
			
		||||
            log=self,
 | 
			
		||||
            level=level,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def transport(
 | 
			
		||||
        self,
 | 
			
		||||
        msg: str,
 | 
			
		||||
| 
						 | 
				
			
			@ -401,19 +426,3 @@ def get_loglevel() -> str:
 | 
			
		|||
 | 
			
		||||
# global module logger for tractor itself
 | 
			
		||||
log: StackLevelAdapter = get_logger('tractor')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def at_least_level(
 | 
			
		||||
    log: Logger|LoggerAdapter,
 | 
			
		||||
    level: int|str,
 | 
			
		||||
) -> bool:
 | 
			
		||||
    '''
 | 
			
		||||
    Predicate to test if a given level is active.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    if isinstance(level, str):
 | 
			
		||||
        level: int = CUSTOM_LEVELS[level.upper()]
 | 
			
		||||
 | 
			
		||||
    if log.getEffectiveLevel() <= level:
 | 
			
		||||
        return True
 | 
			
		||||
    return False
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -177,6 +177,16 @@ class Aid(
 | 
			
		|||
            f'{self.name}@{self.pid!r}'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # mk hashable via `.uuid`
 | 
			
		||||
    def __hash__(self) -> int:
 | 
			
		||||
        return hash(self.uuid)
 | 
			
		||||
 | 
			
		||||
    def __eq__(self, other: Aid) -> bool:
 | 
			
		||||
        return self.uuid == other.uuid
 | 
			
		||||
 | 
			
		||||
    # use pretty fmt since often repr-ed for console/log
 | 
			
		||||
    __repr__ = pretty_struct.Struct.__repr__
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SpawnSpec(
 | 
			
		||||
    pretty_struct.Struct,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -60,6 +60,9 @@ def find_masked_excs(
 | 
			
		|||
    return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# XXX, relevant ish discussion @ `trio`-core,
 | 
			
		||||
# https://github.com/python-trio/trio/issues/455#issuecomment-2785122216
 | 
			
		||||
#
 | 
			
		||||
@acm
 | 
			
		||||
async def maybe_raise_from_masking_exc(
 | 
			
		||||
    tn: trio.Nursery|None = None,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue