Compare commits

10 commits: f3285ea870 ... 1c73c0c0ee
		
	
| Author | SHA1 | Date |
|---|---|---|
|  | 1c73c0c0ee |  |
|  | 101cd94e89 |  |
|  | 3f33ba1cc0 |  |
|  | 70f5315506 |  |
|  | 496fac04bb |  |
|  | 02baeb6a8b |  |
|  | d4ab802e14 |  |
|  | fdeaeef9f7 |  |
|  | 41609d1433 |  |
|  | c9068522ed |  |
				
			
```diff
@@ -138,11 +138,19 @@ def tpt_protos(request) -> list[str]:
     yield proto_keys
 
 
-@pytest.fixture(scope='session')
+@pytest.fixture(
+    scope='session',
+    autouse=True,
+)
 def tpt_proto(
     tpt_protos: list[str],
 ) -> str:
-    yield tpt_protos[0]
+    proto_key: str = tpt_protos[0]
+    from tractor import _state
+    if _state._def_tpt_proto != proto_key:
+        _state._def_tpt_proto = proto_key
+    # breakpoint()
+    yield proto_key
 
 
 _ci_env: bool = os.environ.get('CI', False)
```
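The new fixture shape above (session-scoped, `autouse`, and syncing the chosen key into `tractor._state._def_tpt_proto`) can be illustrated with a stand-alone pytest sketch; the `_def_tpt_proto` module global and the hard-coded `['tcp']` list below are stand-ins for tractor's real runtime state and CLI-driven option, not its actual API:

```python
# Minimal sketch: a session-scoped, autouse fixture that mirrors the
# selected transport key into a module-level default.
import pytest

_def_tpt_proto: str = 'tcp'  # stand-in for `tractor._state._def_tpt_proto`


@pytest.fixture(scope='session')
def tpt_protos() -> list[str]:
    # in the real conftest this list presumably comes from a CLI option
    return ['tcp']


@pytest.fixture(
    scope='session',
    autouse=True,
)
def tpt_proto(
    tpt_protos: list[str],
) -> str:
    global _def_tpt_proto
    proto_key: str = tpt_protos[0]
    if _def_tpt_proto != proto_key:
        _def_tpt_proto = proto_key
    yield proto_key


def test_default_matches_fixture(tpt_proto: str):
    # autouse means the default is synced even if a test never requests it
    assert _def_tpt_proto == tpt_proto
```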
				
			
@@ -0,0 +1,4 @@ (new file)
```python
'''
`tractor.ipc` subsystem(s)/unit testing suites.

'''
```
				
			
@@ -0,0 +1,72 @@ (new file)
```python
'''
High-level `.ipc._server` unit tests.

'''
from __future__ import annotations

import pytest
import trio
from tractor import (
    devx,
    ipc,
    log,
)
from tractor._testing.addr import (
    get_rando_addr,
)
# TODO, use/check-roundtripping with some of these wrapper types?
#
# from .._addr import Address
# from ._chan import Channel
# from ._transport import MsgTransport
# from ._uds import UDSAddress
# from ._tcp import TCPAddress


@pytest.mark.parametrize(
    '_tpt_proto',
    ['uds', 'tcp']
)
def test_basic_ipc_server(
    _tpt_proto: str,
    debug_mode: bool,
    loglevel: str,
):

    # so we see the socket-listener reporting on console
    log.get_console_log("INFO")

    rando_addr: tuple = get_rando_addr(
        tpt_proto=_tpt_proto,
    )
    async def main():
        async with ipc._server.open_ipc_server() as server:

            assert (
                server._parent_tn
                and
                server._parent_tn is server._stream_handler_tn
            )
            assert server._no_more_peers.is_set()

            eps: list[ipc.IPCEndpoint] = await server.listen_on(
                accept_addrs=[rando_addr],
                stream_handler_nursery=None,
            )
            assert (
                len(eps) == 1
                and
                (ep := eps[0])._listener
                and
                not ep.peer_tpts
            )

            server._parent_tn.cancel_scope.cancel()

        # !TODO! actually make a bg-task connection from a client
        # using `ipc._chan._connect_chan()`

    with devx.maybe_open_crash_handler(
        pdb=debug_mode,
    ):
        trio.run(main)
```
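The test drives teardown by cancelling the server's parent nursery scope from inside the `async with` block. A minimal plain-`trio` sketch of that shape (no tractor APIs; `serve_forever` is a hypothetical stand-in for the listener task):

```python
# Cancelling a nursery's own cancel scope stops all tasks it supervises
# and lets the `async with` block exit cleanly.
import trio


async def serve_forever(task_status=trio.TASK_STATUS_IGNORED):
    with trio.CancelScope() as cs:
        task_status.started(cs)
        await trio.sleep_forever()


async def main() -> None:
    async with trio.open_nursery() as tn:
        await tn.start(serve_forever)
        # ... assertions against the "server" state would go here ...
        tn.cancel_scope.cancel()  # tears down all supervised tasks

    print('nursery exited cleanly after cancellation')


if __name__ == '__main__':
    trio.run(main)
```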
				
			
```diff
@@ -100,16 +100,29 @@ async def streamer(
 @acm
 async def open_stream() -> Awaitable[tractor.MsgStream]:
 
-    async with tractor.open_nursery() as tn:
-        portal = await tn.start_actor('streamer', enable_modules=[__name__])
-        async with (
-            portal.open_context(streamer) as (ctx, first),
-            ctx.open_stream() as stream,
-        ):
-            yield stream
+    try:
+        async with tractor.open_nursery() as an:
+            portal = await an.start_actor(
+                'streamer',
+                enable_modules=[__name__],
+            )
+            async with (
+                portal.open_context(streamer) as (ctx, first),
+                ctx.open_stream() as stream,
+            ):
+                yield stream
 
-        await portal.cancel_actor()
-    print('CANCELLED STREAMER')
+            print('Cancelling streamer')
+            await portal.cancel_actor()
+            print('Cancelled streamer')
+
+    except Exception as err:
+        print(
+            f'`open_stream()` errored?\n'
+            f'{err!r}\n'
+        )
+        await tractor.pause(shield=True)
+        raise err
 
 
 @acm
```
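The refactor wraps the whole actor-nursery block in `try`/`except` so a failure during stream teardown is printed (and can be held open in the debugger via `tractor.pause(shield=True)`) before re-raising. A stdlib-only sketch of that error-surfacing shape, with a hypothetical `open_resource()` standing in for the portal/stream stack:

```python
from contextlib import asynccontextmanager

import trio


@asynccontextmanager
async def open_resource():
    # stand-in for the nested portal/context/stream acquisition
    yield 'resource'


@asynccontextmanager
async def open_stream_like():
    try:
        async with open_resource() as res:
            yield res
            # explicit, logged teardown steps run *inside* the try
            print('Cancelling resource')
        print('Cancelled resource')

    except Exception as err:
        # report (or drop into a debugger) before re-raising
        print(f'`open_stream_like()` errored?\n{err!r}')
        raise


async def main():
    async with open_stream_like() as res:
        assert res == 'resource'


trio.run(main)
```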
				
			
```diff
@@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str):
             yield stream
 
 
-def test_open_local_sub_to_stream():
+def test_open_local_sub_to_stream(
+    debug_mode: bool,
+):
     '''
     Verify a single inter-actor stream can can be fanned-out shared to
-    N local tasks using ``trionics.maybe_open_context():``.
+    N local tasks using `trionics.maybe_open_context()`.
 
     '''
-    timeout: float = 3.6 if platform.system() != "Windows" else 10
+    timeout: float = 3.6
+    if platform.system() == "Windows":
+        timeout: float = 10
+
+    if debug_mode:
+        timeout = 999
 
     async def main():
 
         full = list(range(1000))
 
         async def get_sub_and_pull(taskname: str):
 
             stream: tractor.MsgStream
             async with (
                 maybe_open_stream(taskname) as stream,
             ):
```
				
			
```diff
@@ -165,17 +187,27 @@ def test_open_local_sub_to_stream():
                 assert set(seq).issubset(set(full))
             print(f'{taskname} finished')
 
-        with trio.fail_after(timeout):
+        with trio.fail_after(timeout) as cs:
             # TODO: turns out this isn't multi-task entrant XD
             # We probably need an indepotent entry semantic?
-            async with tractor.open_root_actor():
+            async with tractor.open_root_actor(
+                debug_mode=debug_mode,
+            ):
                 async with (
-                    trio.open_nursery() as nurse,
+                    trio.open_nursery() as tn,
                 ):
                     for i in range(10):
-                        nurse.start_soon(get_sub_and_pull, f'task_{i}')
+                        tn.start_soon(
+                            get_sub_and_pull,
+                            f'task_{i}',
+                        )
                         await trio.sleep(0.001)
 
                 print('all consumer tasks finished')
 
+        if cs.cancelled_caught:
+            pytest.fail(
+                'Should NOT time out in `open_root_actor()` ?'
+            )
+
     trio.run(main)
```
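Binding the timeout scope (`as cs`) lets the test turn a deadline hit into an explicit `pytest.fail(...)`. A small generic illustration using `trio.move_on_after`, where `cancelled_caught` is the usual timeout signal (the helper and message below are illustrative only):

```python
import pytest
import trio


async def _busy(duration: float) -> None:
    # stand-in for the real fan-out consumer work
    await trio.sleep(duration)


def test_finishes_before_deadline():
    async def main():
        with trio.move_on_after(1.0) as cs:
            await _busy(0.01)

        if cs.cancelled_caught:
            pytest.fail(
                'Should NOT time out waiting on `_busy()`!'
            )

    trio.run(main)
```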
				
			
```diff
@@ -57,7 +57,11 @@ async def spawn(
             )
 
             assert len(an._children) == 1
-            assert portal.channel.uid in tractor.current_actor()._peers
+            assert (
+                portal.channel.uid
+                in
+                tractor.current_actor().ipc_server._peers
+            )
 
             # get result from child subactor
             result = await portal.result()
```
				
			
```diff
@@ -180,7 +180,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
         with tractor.devx.maybe_open_crash_handler(
             pdb=debug_mode,
         ) as bxerr:
-            assert not bxerr.value
+            if bxerr:
+                assert not bxerr.value
 
             async with (
                 wraps_tn_that_always_cancels() as tn,
```
				
			
```diff
@@ -48,6 +48,7 @@ from ._state import (
 
 if TYPE_CHECKING:
     from ._runtime import Actor
+    from .ipc._server import IPCServer
 
 
 log = get_logger(__name__)

@@ -79,7 +80,7 @@ async def get_registry(
         )
     else:
         # TODO: try to look pre-existing connection from
-        # `Actor._peers` and use it instead?
+        # `IPCServer._peers` and use it instead?
         async with (
             _connect_chan(addr) as chan,
             open_portal(chan) as regstr_ptl,

@@ -111,7 +112,7 @@ def get_peer_by_name(
 ) -> list[Channel]|None:  # at least 1
     '''
     Scan for an existing connection (set) to a named actor
-    and return any channels from `Actor._peers`.
+    and return any channels from `IPCServer._peers: dict`.
 
     This is an optimization method over querying the registrar for
     the same info.
```
				
			
```diff
@@ -578,12 +578,11 @@ async def open_portal(
 
         msg_loop_cs: trio.CancelScope|None = None
         if start_msg_loop:
-            from ._runtime import process_messages
+            from . import _rpc
             msg_loop_cs = await tn.start(
                 partial(
-                    process_messages,
-                    actor,
-                    channel,
+                    _rpc.process_messages,
+                    chan=channel,
                     # if the local task is cancelled we want to keep
                     # the msg loop running until our block ends
                     shield=True,
```
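The msg loop is now started through `tn.start(partial(_rpc.process_messages, chan=..., shield=True))`. The `nursery.start()` + `partial(...)` shape itself is plain `trio`; a self-contained sketch (with a stand-in `msg_loop` task, not tractor's) showing how the started task hands back a `CancelScope` through `task_status`:

```python
from functools import partial

import trio


async def msg_loop(
    *,
    chan: str,  # stand-in for the real `Channel`
    shield: bool = False,
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # report our cancel scope so the caller can stop just this task
    with trio.CancelScope(shield=shield) as cs:
        task_status.started(cs)
        await trio.sleep_forever()


async def main() -> None:
    async with trio.open_nursery() as tn:
        loop_cs: trio.CancelScope = await tn.start(
            partial(
                msg_loop,
                chan='parent-chan',
                shield=True,
            )
        )
        # later: explicitly cancel only the msg loop
        loop_cs.cancel()


trio.run(main)
```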
				
			
```diff
@@ -869,7 +869,6 @@ async def try_ship_error_to_remote(
 
 
 async def process_messages(
-    actor: Actor,
     chan: Channel,
     shield: bool = False,
     task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,

@@ -907,6 +906,7 @@ async def process_messages(
       (as utilized inside `Portal.cancel_actor()` ).
 
     '''
+    actor: Actor = _state.current_actor()
     assert actor._service_n  # runtime state sanity
 
     # TODO: once `trio` get's an "obvious way" for req/resp we
```
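`process_messages()` drops its explicit `actor` parameter and instead looks the actor up from runtime state. A generic sketch of that "ambient lookup" pattern using stdlib `contextvars` (tractor's `_state.current_actor()` is its own accessor; the names below are stand-ins):

```python
from contextvars import ContextVar

import trio

_current_actor: ContextVar[str|None] = ContextVar('actor', default=None)


def current_actor() -> str:
    actor = _current_actor.get()
    assert actor is not None, 'runtime not started!'
    return actor


async def process_messages(chan: str) -> None:
    actor = current_actor()  # no `actor` parameter needed
    print(f'{actor!r} processing msgs on {chan!r}')


if __name__ == '__main__':
    _current_actor.set('root-actor')
    trio.run(process_messages, 'chan-0')
```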
				
			
```diff
@@ -40,9 +40,7 @@ from __future__ import annotations
 from contextlib import (
     ExitStack,
 )
-from collections import defaultdict
 from functools import partial
-from itertools import chain
 import importlib
 import importlib.util
 import os

@@ -76,6 +74,7 @@ from tractor.msg import (
 )
 from .ipc import (
     Channel,
     # IPCServer,  # causes cycles atm..
+    _server,
 )
 from ._addr import (

@@ -96,18 +95,13 @@ from ._exceptions import (
     ModuleNotExposed,
     MsgTypeError,
     unpack_error,
-    TransportClosed,
 )
 from .devx import _debug
 from ._discovery import get_registry
 from ._portal import Portal
 from . import _state
 from . import _mp_fixup_main
-from ._rpc import (
-    process_messages,
-    try_ship_error_to_remote,
-)
-
+from . import _rpc
 
 if TYPE_CHECKING:
     from ._supervise import ActorNursery
```
				
			
```diff
@@ -161,7 +155,6 @@ class Actor:
     _root_n: Nursery|None = None
     _service_n: Nursery|None = None
 
     # XXX moving to IPCServer!
     _ipc_server: _server.IPCServer|None = None
 
     @property
```
				
			
```diff
@@ -251,14 +244,6 @@ class Actor:
         # by the user (currently called the "arbiter")
         self._spawn_method: str = spawn_method
 
-        self._peers: defaultdict[
-            str,  # uaid
-            list[Channel],  # IPC conns from peer
-        ] = defaultdict(list)
-        self._peer_connected: dict[tuple[str, str], trio.Event] = {}
-        self._no_more_peers = trio.Event()
-        self._no_more_peers.set()
-
         # RPC state
         self._ongoing_rpc_tasks = trio.Event()
         self._ongoing_rpc_tasks.set()
```
				
			
```diff
@@ -343,7 +328,12 @@ class Actor:
         parent_uid: tuple|None = None
         if rent_chan := self._parent_chan:
             parent_uid = rent_chan.uid
-        peers: list[tuple] = list(self._peer_connected)
+
+        peers: list = []
+        server: _server.IPCServer = self.ipc_server
+        if server:
+            peers: list[tuple] = list(server._peer_connected)
+
         fmtstr: str = (
             f' |_id: {self.aid!r}\n'
             # f"   aid{ds}{self.aid!r}\n"
```
				
			
```diff
@@ -399,25 +389,6 @@ class Actor:
 
         self._reg_addrs = addrs
 
-    async def wait_for_peer(
-        self,
-        uid: tuple[str, str],
-
-    ) -> tuple[trio.Event, Channel]:
-        '''
-        Wait for a connection back from a (spawned sub-)actor with
-        a `uid` using a `trio.Event` for sync.
-
-        '''
-        log.debug(f'Waiting for peer {uid!r} to connect')
-        event = self._peer_connected.setdefault(uid, trio.Event())
-        await event.wait()
-        log.debug(f'{uid!r} successfully connected back to us')
-        return (
-            event,
-            self._peers[uid][-1],
-        )
-
     def load_modules(
         self,
         # debug_mode: bool = False,
```
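`Actor.wait_for_peer()` is removed here; elsewhere in this diff the callers switch to `ipc_server.wait_for_peer()` (see the `_spawn` hunks further down). The underlying pattern is a lazily-created `trio.Event` per peer uid; a stand-alone sketch with a hypothetical `PeerTable` class (not tractor's real `IPCServer`):

```python
import trio


class PeerTable:
    def __init__(self) -> None:
        self._peer_connected: dict[tuple[str, str], trio.Event] = {}
        self._peers: dict[tuple[str, str], list[str]] = {}

    async def wait_for_peer(
        self,
        uid: tuple[str, str],
    ) -> tuple[trio.Event, str]:
        # one event per uid, created lazily by whichever side gets here first
        event = self._peer_connected.setdefault(uid, trio.Event())
        await event.wait()
        return event, self._peers[uid][-1]

    def peer_connected(self, uid: tuple[str, str], chan: str) -> None:
        # called by the connection-handler side; wakes any waiters
        self._peers.setdefault(uid, []).append(chan)
        self._peer_connected.setdefault(uid, trio.Event()).set()


async def main() -> None:
    table = PeerTable()
    uid = ('child', 'uuid-1234')

    async with trio.open_nursery() as tn:
        tn.start_soon(table.wait_for_peer, uid)
        await trio.sleep(0.1)
        table.peer_connected(uid, chan='chan-to-child')


trio.run(main)
```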
				
			
			@ -493,434 +464,6 @@ class Actor:
 | 
			
		|||
 | 
			
		||||
            raise mne
 | 
			
		||||
 | 
			
		||||
    # TODO: maybe change to mod-func and rename for implied
 | 
			
		||||
    # multi-transport semantics?
 | 
			
		||||
    async def _stream_handler(
 | 
			
		||||
        self,
 | 
			
		||||
        stream: trio.SocketStream,
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
        Entry point for new inbound IPC connections on a specific
 | 
			
		||||
        transport server.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        self._no_more_peers = trio.Event()  # unset by making new
 | 
			
		||||
        # with _debug.maybe_open_crash_handler(
 | 
			
		||||
        #     pdb=True,
 | 
			
		||||
        # ) as boxerr:
 | 
			
		||||
        chan = Channel.from_stream(stream)
 | 
			
		||||
        con_status: str = (
 | 
			
		||||
            'New inbound IPC connection <=\n'
 | 
			
		||||
            f'|_{chan}\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # send/receive initial handshake response
 | 
			
		||||
        try:
 | 
			
		||||
            peer_aid: msgtypes.Aid = await chan._do_handshake(
 | 
			
		||||
                aid=self.aid,
 | 
			
		||||
            )
 | 
			
		||||
        except (
 | 
			
		||||
            TransportClosed,
 | 
			
		||||
            # ^XXX NOTE, the above wraps `trio` exc types raised
 | 
			
		||||
            # during various `SocketStream.send/receive_xx()` calls
 | 
			
		||||
            # under different fault conditions such as,
 | 
			
		||||
            #
 | 
			
		||||
            # trio.BrokenResourceError,
 | 
			
		||||
            # trio.ClosedResourceError,
 | 
			
		||||
            #
 | 
			
		||||
            # Inside our `.ipc._transport` layer we absorb and
 | 
			
		||||
            # re-raise our own `TransportClosed` exc such that this
 | 
			
		||||
            # higher level runtime code can only worry one
 | 
			
		||||
            # "kinda-error" that we expect to tolerate during
 | 
			
		||||
            # discovery-sys related pings, queires, DoS etc.
 | 
			
		||||
        ):
 | 
			
		||||
            # XXX: This may propagate up from `Channel._aiter_recv()`
 | 
			
		||||
            # and `MsgpackStream._inter_packets()` on a read from the
 | 
			
		||||
            # stream particularly when the runtime is first starting up
 | 
			
		||||
            # inside `open_root_actor()` where there is a check for
 | 
			
		||||
            # a bound listener on the "arbiter" addr.  the reset will be
 | 
			
		||||
            # because the handshake was never meant took place.
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                con_status
 | 
			
		||||
                +
 | 
			
		||||
                ' -> But failed to handshake? Ignoring..\n'
 | 
			
		||||
            )
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        uid: tuple[str, str] = (
 | 
			
		||||
            peer_aid.name,
 | 
			
		||||
            peer_aid.uuid,
 | 
			
		||||
        )
 | 
			
		||||
        # TODO, can we make this downstream peer tracking use the
 | 
			
		||||
        # `peer_aid` instead?
 | 
			
		||||
        familiar: str = 'new-peer'
 | 
			
		||||
        if _pre_chan := self._peers.get(uid):
 | 
			
		||||
            familiar: str = 'pre-existing-peer'
 | 
			
		||||
        uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
 | 
			
		||||
        con_status += (
 | 
			
		||||
            f' -> Handshake with {familiar} `{uid_short}` complete\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if _pre_chan:
 | 
			
		||||
            # con_status += (
 | 
			
		||||
            # ^TODO^ swap once we minimize conn duplication
 | 
			
		||||
            # -[ ] last thing might be reg/unreg runtime reqs?
 | 
			
		||||
            # log.warning(
 | 
			
		||||
            log.debug(
 | 
			
		||||
                f'?Wait?\n'
 | 
			
		||||
                f'We already have IPC with peer {uid_short!r}\n'
 | 
			
		||||
                f'|_{_pre_chan}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # IPC connection tracking for both peers and new children:
 | 
			
		||||
        # - if this is a new channel to a locally spawned
 | 
			
		||||
        #   sub-actor there will be a spawn wait even registered
 | 
			
		||||
        #   by a call to `.wait_for_peer()`.
 | 
			
		||||
        # - if a peer is connecting no such event will exit.
 | 
			
		||||
        event: trio.Event|None = self._peer_connected.pop(
 | 
			
		||||
            uid,
 | 
			
		||||
            None,
 | 
			
		||||
        )
 | 
			
		||||
        if event:
 | 
			
		||||
            con_status += (
 | 
			
		||||
                ' -> Waking subactor spawn waiters: '
 | 
			
		||||
                f'{event.statistics().tasks_waiting}\n'
 | 
			
		||||
                f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
 | 
			
		||||
                # f'    {event}\n'
 | 
			
		||||
                # f'    |{event.statistics()}\n'
 | 
			
		||||
            )
 | 
			
		||||
            # wake tasks waiting on this IPC-transport "connect-back"
 | 
			
		||||
            event.set()
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            con_status += (
 | 
			
		||||
                f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
 | 
			
		||||
            )  # type: ignore
 | 
			
		||||
 | 
			
		||||
        chans: list[Channel] = self._peers[uid]
 | 
			
		||||
        # if chans:
 | 
			
		||||
        #     # TODO: re-use channels for new connections instead
 | 
			
		||||
        #     # of always new ones?
 | 
			
		||||
        #     # => will require changing all the discovery funcs..
 | 
			
		||||
 | 
			
		||||
        # append new channel
 | 
			
		||||
        # TODO: can we just use list-ref directly?
 | 
			
		||||
        chans.append(chan)
 | 
			
		||||
 | 
			
		||||
        con_status += ' -> Entering RPC msg loop..\n'
 | 
			
		||||
        log.runtime(con_status)
 | 
			
		||||
 | 
			
		||||
        # Begin channel management - respond to remote requests and
 | 
			
		||||
        # process received reponses.
 | 
			
		||||
        disconnected: bool = False
 | 
			
		||||
        last_msg: MsgType
 | 
			
		||||
        try:
 | 
			
		||||
            (
 | 
			
		||||
                disconnected,
 | 
			
		||||
                last_msg,
 | 
			
		||||
            ) = await process_messages(
 | 
			
		||||
                self,
 | 
			
		||||
                chan,
 | 
			
		||||
            )
 | 
			
		||||
        except trio.Cancelled:
 | 
			
		||||
            log.cancel(
 | 
			
		||||
                'IPC transport msg loop was cancelled\n'
 | 
			
		||||
                f'c)>\n'
 | 
			
		||||
                f' |_{chan}\n'
 | 
			
		||||
            )
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
        finally:
 | 
			
		||||
            local_nursery: (
 | 
			
		||||
                ActorNursery|None
 | 
			
		||||
            ) = self._actoruid2nursery.get(uid)
 | 
			
		||||
 | 
			
		||||
            # This is set in ``Portal.cancel_actor()``. So if
 | 
			
		||||
            # the peer was cancelled we try to wait for them
 | 
			
		||||
            # to tear down their side of the connection before
 | 
			
		||||
            # moving on with closing our own side.
 | 
			
		||||
            if (
 | 
			
		||||
                local_nursery
 | 
			
		||||
                and (
 | 
			
		||||
                    self._cancel_called
 | 
			
		||||
                    or
 | 
			
		||||
                    chan._cancel_called
 | 
			
		||||
                )
 | 
			
		||||
                #
 | 
			
		||||
                # ^-TODO-^ along with this is there another condition
 | 
			
		||||
                # that we should filter with to avoid entering this
 | 
			
		||||
                # waiting block needlessly?
 | 
			
		||||
                # -[ ] maybe `and local_nursery.cancelled` and/or
 | 
			
		||||
                #     only if the `._children` table is empty or has
 | 
			
		||||
                #     only `Portal`s with .chan._cancel_called ==
 | 
			
		||||
                #     True` as per what we had below; the MAIN DIFF
 | 
			
		||||
                #     BEING that just bc one `Portal.cancel_actor()`
 | 
			
		||||
                #     was called, doesn't mean the whole actor-nurse
 | 
			
		||||
                #     is gonna exit any time soon right!?
 | 
			
		||||
                #
 | 
			
		||||
                # or
 | 
			
		||||
                # all(chan._cancel_called for chan in chans)
 | 
			
		||||
 | 
			
		||||
            ):
 | 
			
		||||
                log.cancel(
 | 
			
		||||
                    'Waiting on cancel request to peer..\n'
 | 
			
		||||
                    f'c)=>\n'
 | 
			
		||||
                    f'  |_{chan.uid}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
                # XXX: this is a soft wait on the channel (and its
 | 
			
		||||
                # underlying transport protocol) to close from the
 | 
			
		||||
                # remote peer side since we presume that any channel
 | 
			
		||||
                # which is mapped to a sub-actor (i.e. it's managed
 | 
			
		||||
                # by local actor-nursery) has a message that is sent
 | 
			
		||||
                # to the peer likely by this actor (which may be in
 | 
			
		||||
                # a shutdown sequence due to cancellation) when the
 | 
			
		||||
                # local runtime here is now cancelled while
 | 
			
		||||
                # (presumably) in the middle of msg loop processing.
 | 
			
		||||
                chan_info: str = (
 | 
			
		||||
                    f'{chan.uid}\n'
 | 
			
		||||
                    f'|_{chan}\n'
 | 
			
		||||
                    f'  |_{chan.transport}\n\n'
 | 
			
		||||
                )
 | 
			
		||||
                with trio.move_on_after(0.5) as drain_cs:
 | 
			
		||||
                    drain_cs.shield = True
 | 
			
		||||
 | 
			
		||||
                    # attempt to wait for the far end to close the
 | 
			
		||||
                    # channel and bail after timeout (a 2-generals
 | 
			
		||||
                    # problem on closure).
 | 
			
		||||
                    assert chan.transport
 | 
			
		||||
                    async for msg in chan.transport.drain():
 | 
			
		||||
 | 
			
		||||
                        # try to deliver any lingering msgs
 | 
			
		||||
                        # before we destroy the channel.
 | 
			
		||||
                        # This accomplishes deterministic
 | 
			
		||||
                        # ``Portal.cancel_actor()`` cancellation by
 | 
			
		||||
                        # making sure any RPC response to that call is
 | 
			
		||||
                        # delivered the local calling task.
 | 
			
		||||
                        # TODO: factor this into a helper?
 | 
			
		||||
                        log.warning(
 | 
			
		||||
                            'Draining msg from disconnected peer\n'
 | 
			
		||||
                            f'{chan_info}'
 | 
			
		||||
                            f'{pformat(msg)}\n'
 | 
			
		||||
                        )
 | 
			
		||||
                        # cid: str|None = msg.get('cid')
 | 
			
		||||
                        cid: str|None = msg.cid
 | 
			
		||||
                        if cid:
 | 
			
		||||
                            # deliver response to local caller/waiter
 | 
			
		||||
                            await self._deliver_ctx_payload(
 | 
			
		||||
                                chan,
 | 
			
		||||
                                cid,
 | 
			
		||||
                                msg,
 | 
			
		||||
                            )
 | 
			
		||||
                if drain_cs.cancelled_caught:
 | 
			
		||||
                    log.warning(
 | 
			
		||||
                        'Timed out waiting on IPC transport channel to drain?\n'
 | 
			
		||||
                        f'{chan_info}'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                # XXX NOTE XXX when no explicit call to
 | 
			
		||||
                # `open_root_actor()` was made by the application
 | 
			
		||||
                # (normally we implicitly make that call inside
 | 
			
		||||
                # the first `.open_nursery()` in root-actor
 | 
			
		||||
                # user/app code), we can assume that either we
 | 
			
		||||
                # are NOT the root actor or are root but the
 | 
			
		||||
                # runtime was started manually. and thus DO have
 | 
			
		||||
                # to wait for the nursery-enterer to exit before
 | 
			
		||||
                # shutting down the local runtime to avoid
 | 
			
		||||
                # clobbering any ongoing subactor
 | 
			
		||||
                # teardown/debugging/graceful-cancel.
 | 
			
		||||
                #
 | 
			
		||||
                # see matching  note inside `._supervise.open_nursery()`
 | 
			
		||||
                #
 | 
			
		||||
                # TODO: should we have a separate cs + timeout
 | 
			
		||||
                # block here?
 | 
			
		||||
                if (
 | 
			
		||||
                    # XXX SO either,
 | 
			
		||||
                    #  - not root OR,
 | 
			
		||||
                    #  - is root but `open_root_actor()` was
 | 
			
		||||
                    #    entered manually (in which case we do
 | 
			
		||||
                    #    the equiv wait there using the
 | 
			
		||||
                    #    `devx._debug` sub-sys APIs).
 | 
			
		||||
                    not local_nursery._implicit_runtime_started
 | 
			
		||||
                ):
 | 
			
		||||
                    log.runtime(
 | 
			
		||||
                        'Waiting on local actor nursery to exit..\n'
 | 
			
		||||
                        f'|_{local_nursery}\n'
 | 
			
		||||
                    )
 | 
			
		||||
                    with trio.move_on_after(0.5) as an_exit_cs:
 | 
			
		||||
                        an_exit_cs.shield = True
 | 
			
		||||
                        await local_nursery.exited.wait()
 | 
			
		||||
 | 
			
		||||
                    # TODO: currently this is always triggering for every
 | 
			
		||||
                    # sub-daemon spawned from the `piker.services._mngr`?
 | 
			
		||||
                    # -[ ] how do we ensure that the IPC is supposed to
 | 
			
		||||
                    #      be long lived and isn't just a register?
 | 
			
		||||
                    # |_ in the register case how can we signal that the
 | 
			
		||||
                    #    ephemeral msg loop was intentional?
 | 
			
		||||
                    if (
 | 
			
		||||
                        # not local_nursery._implicit_runtime_started
 | 
			
		||||
                        # and
 | 
			
		||||
                        an_exit_cs.cancelled_caught
 | 
			
		||||
                    ):
 | 
			
		||||
                        report: str = (
 | 
			
		||||
                            'Timed out waiting on local actor-nursery to exit?\n'
 | 
			
		||||
                            f'c)>\n'
 | 
			
		||||
                            f' |_{local_nursery}\n'
 | 
			
		||||
                        )
 | 
			
		||||
                        if children := local_nursery._children:
 | 
			
		||||
                            # indent from above local-nurse repr
 | 
			
		||||
                            report += (
 | 
			
		||||
                                f'   |_{pformat(children)}\n'
 | 
			
		||||
                            )
 | 
			
		||||
 | 
			
		||||
                        log.warning(report)
 | 
			
		||||
 | 
			
		||||
                if disconnected:
 | 
			
		||||
                    # if the transport died and this actor is still
 | 
			
		||||
                    # registered within a local nursery, we report
 | 
			
		||||
                    # that the IPC layer may have failed
 | 
			
		||||
                    # unexpectedly since it may be the cause of
 | 
			
		||||
                    # other downstream errors.
 | 
			
		||||
                    entry: tuple|None = local_nursery._children.get(uid)
 | 
			
		||||
                    if entry:
 | 
			
		||||
                        proc: trio.Process
 | 
			
		||||
                        _, proc, _ = entry
 | 
			
		||||
 | 
			
		||||
                        if (
 | 
			
		||||
                            (poll := getattr(proc, 'poll', None))
 | 
			
		||||
                            and
 | 
			
		||||
                            poll() is None  # proc still alive
 | 
			
		||||
                        ):
 | 
			
		||||
                            # TODO: change log level based on
 | 
			
		||||
                            # detecting whether chan was created for
 | 
			
		||||
                            # ephemeral `.register_actor()` request!
 | 
			
		||||
                            # -[ ] also, that should be avoidable by
 | 
			
		||||
                            #   re-using any existing chan from the
 | 
			
		||||
                            #   `._discovery.get_registry()` call as
 | 
			
		||||
                            #   well..
 | 
			
		||||
                            log.runtime(
 | 
			
		||||
                                f'Peer IPC broke but subproc is alive?\n\n'
 | 
			
		||||
 | 
			
		||||
                                f'<=x {chan.uid}@{chan.raddr}\n'
 | 
			
		||||
                                f'   |_{proc}\n'
 | 
			
		||||
                            )
 | 
			
		||||
 | 
			
		||||
            # ``Channel`` teardown and closure sequence
 | 
			
		||||
            # drop ref to channel so it can be gc-ed and disconnected
 | 
			
		||||
            con_teardown_status: str = (
 | 
			
		||||
                f'IPC channel disconnected:\n'
 | 
			
		||||
                f'<=x uid: {chan.uid}\n'
 | 
			
		||||
                f'   |_{pformat(chan)}\n\n'
 | 
			
		||||
            )
 | 
			
		||||
            chans.remove(chan)
 | 
			
		||||
 | 
			
		||||
            # TODO: do we need to be this pedantic?
 | 
			
		||||
            if not chans:
 | 
			
		||||
                con_teardown_status += (
 | 
			
		||||
                    f'-> No more channels with {chan.uid}'
 | 
			
		||||
                )
 | 
			
		||||
                self._peers.pop(uid, None)
 | 
			
		||||
 | 
			
		||||
            peers_str: str = ''
 | 
			
		||||
            for uid, chans in self._peers.items():
 | 
			
		||||
                peers_str += (
 | 
			
		||||
                    f'uid: {uid}\n'
 | 
			
		||||
                )
 | 
			
		||||
                for i, chan in enumerate(chans):
 | 
			
		||||
                    peers_str += (
 | 
			
		||||
                        f' |_[{i}] {pformat(chan)}\n'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
            con_teardown_status += (
 | 
			
		||||
                f'-> Remaining IPC {len(self._peers)} peers: {peers_str}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # No more channels to other actors (at all) registered
 | 
			
		||||
            # as connected.
 | 
			
		||||
            if not self._peers:
 | 
			
		||||
                con_teardown_status += (
 | 
			
		||||
                    'Signalling no more peer channel connections'
 | 
			
		||||
                )
 | 
			
		||||
                self._no_more_peers.set()
 | 
			
		||||
 | 
			
		||||
                # NOTE: block this actor from acquiring the
 | 
			
		||||
                # debugger-TTY-lock since we have no way to know if we
 | 
			
		||||
                # cancelled it and further there is no way to ensure the
 | 
			
		||||
                # lock will be released if acquired due to having no
 | 
			
		||||
                # more active IPC channels.
 | 
			
		||||
                if _state.is_root_process():
 | 
			
		||||
                    pdb_lock = _debug.Lock
 | 
			
		||||
                    pdb_lock._blocked.add(uid)
 | 
			
		||||
 | 
			
		||||
                    # TODO: NEEEDS TO BE TESTED!
 | 
			
		||||
                    # actually, no idea if this ever even enters.. XD
 | 
			
		||||
                    #
 | 
			
		||||
                    # XXX => YES IT DOES, when i was testing ctl-c
 | 
			
		||||
                    # from broken debug TTY locking due to
 | 
			
		||||
                    # msg-spec races on application using RunVar...
 | 
			
		||||
                    if (
 | 
			
		||||
                        (ctx_in_debug := pdb_lock.ctx_in_debug)
 | 
			
		||||
                        and
 | 
			
		||||
                        (pdb_user_uid := ctx_in_debug.chan.uid)
 | 
			
		||||
                        and
 | 
			
		||||
                        local_nursery
 | 
			
		||||
                    ):
 | 
			
		||||
                        entry: tuple|None = local_nursery._children.get(
 | 
			
		||||
                            tuple(pdb_user_uid)
 | 
			
		||||
                        )
 | 
			
		||||
                        if entry:
 | 
			
		||||
                            proc: trio.Process
 | 
			
		||||
                            _, proc, _ = entry
 | 
			
		||||
 | 
			
		||||
                            if (
 | 
			
		||||
                                (poll := getattr(proc, 'poll', None))
 | 
			
		||||
                                and poll() is None
 | 
			
		||||
                            ):
 | 
			
		||||
                                log.cancel(
 | 
			
		||||
                                    'Root actor reports no-more-peers, BUT\n'
 | 
			
		||||
                                    'a DISCONNECTED child still has the debug '
 | 
			
		||||
                                    'lock!\n\n'
 | 
			
		||||
                                    # f'root uid: {self.uid}\n'
 | 
			
		||||
                                    f'last disconnected child uid: {uid}\n'
 | 
			
		||||
                                    f'locking child uid: {pdb_user_uid}\n'
 | 
			
		||||
                                )
 | 
			
		||||
                                await _debug.maybe_wait_for_debugger(
 | 
			
		||||
                                    child_in_debug=True
 | 
			
		||||
                                )
 | 
			
		||||
 | 
			
		||||
                    # TODO: just bc a child's transport dropped
 | 
			
		||||
                    # doesn't mean it's not still using the pdb
 | 
			
		||||
                    # REPL! so,
 | 
			
		||||
                    # -[ ] ideally we can check out child proc
 | 
			
		||||
                    #  tree to ensure that its alive (and
 | 
			
		||||
                    #  actually using the REPL) before we cancel
 | 
			
		||||
                    #  it's lock acquire by doing the below!
 | 
			
		||||
                    # -[ ] create a way to read the tree of each actor's
 | 
			
		||||
                    #  grandchildren such that when an
 | 
			
		||||
                    #  intermediary parent is cancelled but their
 | 
			
		||||
                    #  child has locked the tty, the grandparent
 | 
			
		||||
                    #  will not allow the parent to cancel or
 | 
			
		||||
                    #  zombie reap the child! see open issue:
 | 
			
		||||
                    #  - https://github.com/goodboy/tractor/issues/320
 | 
			
		||||
                    # ------ - ------
 | 
			
		||||
                    # if a now stale local task has the TTY lock still
 | 
			
		||||
                    # we cancel it to allow servicing other requests for
 | 
			
		||||
                    # the lock.
 | 
			
		||||
                    if (
 | 
			
		||||
                        (db_cs := pdb_lock.get_locking_task_cs())
 | 
			
		||||
                        and not db_cs.cancel_called
 | 
			
		||||
                        and uid == pdb_user_uid
 | 
			
		||||
                    ):
 | 
			
		||||
                        log.critical(
 | 
			
		||||
                            f'STALE DEBUG LOCK DETECTED FOR {uid}'
 | 
			
		||||
                        )
 | 
			
		||||
                        # TODO: figure out why this breaks tests..
 | 
			
		||||
                        db_cs.cancel()
 | 
			
		||||
 | 
			
		||||
            log.runtime(con_teardown_status)
 | 
			
		||||
        # finally block closure
 | 
			
		||||
 | 
			
		||||
    # TODO: rename to `._deliver_payload()` since this handles
 | 
			
		||||
    # more then just `result` msgs now obvi XD
 | 
			
		||||
    async def _deliver_ctx_payload(
 | 
			
		||||
| 
						 | 
				
			
			@ -1157,7 +700,7 @@ class Actor:
 | 
			
		|||
            )
 | 
			
		||||
            assert isinstance(chan, Channel)
 | 
			
		||||
 | 
			
		||||
            # Initial handshake: swap names.
 | 
			
		||||
            # init handshake: swap actor-IDs.
 | 
			
		||||
            await chan._do_handshake(aid=self.aid)
 | 
			
		||||
 | 
			
		||||
            accept_addrs: list[UnwrappedAddress]|None = None
 | 
			
		||||
| 
						 | 
				
			
			@ -1719,6 +1262,10 @@ async def async_main(
 | 
			
		|||
    the actor's "runtime" and all thus all ongoing RPC tasks.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # XXX NOTE, `_state._current_actor` **must** be set prior to
 | 
			
		||||
    # calling this core runtime entrypoint!
 | 
			
		||||
    assert actor is _state.current_actor()
 | 
			
		||||
 | 
			
		||||
    actor._task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
 | 
			
		||||
    # attempt to retreive ``trio``'s sigint handler and stash it
 | 
			
		||||
| 
						 | 
				
			
			@ -1778,7 +1325,6 @@ async def async_main(
 | 
			
		|||
                ) as service_nursery,
 | 
			
		||||
 | 
			
		||||
                _server.open_ipc_server(
 | 
			
		||||
                    actor=actor,
 | 
			
		||||
                    parent_tn=service_nursery,
 | 
			
		||||
                    stream_handler_tn=service_nursery,
 | 
			
		||||
                ) as ipc_server,
 | 
			
		||||
| 
						 | 
				
			
			@ -1832,7 +1378,6 @@ async def async_main(
 | 
			
		|||
                        'Booting IPC server'
 | 
			
		||||
                    )
 | 
			
		||||
                    eps: list = await ipc_server.listen_on(
 | 
			
		||||
                        actor=actor,
 | 
			
		||||
                        accept_addrs=accept_addrs,
 | 
			
		||||
                        stream_handler_nursery=service_nursery,
 | 
			
		||||
                    )
 | 
			
		||||
| 
						 | 
				
			
			@ -1916,9 +1461,8 @@ async def async_main(
 | 
			
		|||
                if actor._parent_chan:
 | 
			
		||||
                    await root_nursery.start(
 | 
			
		||||
                        partial(
 | 
			
		||||
                            process_messages,
 | 
			
		||||
                            actor,
 | 
			
		||||
                            actor._parent_chan,
 | 
			
		||||
                            _rpc.process_messages,
 | 
			
		||||
                            chan=actor._parent_chan,
 | 
			
		||||
                            shield=True,
 | 
			
		||||
                        )
 | 
			
		||||
                    )
 | 
			
		||||
| 
						 | 
				
			
			@ -1959,7 +1503,7 @@ async def async_main(
 | 
			
		|||
            log.exception(err_report)
 | 
			
		||||
 | 
			
		||||
        if actor._parent_chan:
 | 
			
		||||
            await try_ship_error_to_remote(
 | 
			
		||||
            await _rpc.try_ship_error_to_remote(
 | 
			
		||||
                actor._parent_chan,
 | 
			
		||||
                internal_err,
 | 
			
		||||
            )
 | 
			
		||||
| 
						 | 
				
			
```diff
@@ -2053,16 +1597,18 @@ async def async_main(
                     )
 
         # Ensure all peers (actors connected to us as clients) are finished
-        if not actor._no_more_peers.is_set():
-            if any(
-                chan.connected() for chan in chain(*actor._peers.values())
-            ):
-                teardown_report += (
-                    f'-> Waiting for remaining peers {actor._peers} to clear..\n'
-                )
-                log.runtime(teardown_report)
-                with CancelScope(shield=True):
-                    await actor._no_more_peers.wait()
+        if (
+            (ipc_server := actor.ipc_server)
+            and
+            ipc_server.has_peers(check_chans=True)
+        ):
+            teardown_report += (
+                f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
+            )
+            log.runtime(teardown_report)
+            await ipc_server.wait_for_no_more_peers(
+                shield=True,
+            )
 
         teardown_report += (
             '-> All peer channels are complete\n'
```
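The inline shielded wait on `Actor._no_more_peers` becomes a call to `ipc_server.wait_for_no_more_peers(shield=True)`. A plain-`trio` sketch of what such a helper can look like (the `Server` class below is a stand-in, not tractor's `IPCServer`):

```python
import trio


class Server:
    def __init__(self) -> None:
        self._no_more_peers = trio.Event()
        self._no_more_peers.set()  # starts "empty"

    async def wait_for_no_more_peers(
        self,
        shield: bool = False,
    ) -> None:
        # equivalent to the removed inline block:
        #   with CancelScope(shield=True):
        #       await actor._no_more_peers.wait()
        with trio.CancelScope(shield=shield):
            await self._no_more_peers.wait()


async def main() -> None:
    server = Server()
    await server.wait_for_no_more_peers(shield=True)
    print('no peers remain, safe to tear down')


trio.run(main)
```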
				
			
			@ -58,9 +58,11 @@ from tractor.msg.types import (
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    from ipc import IPCServer
 | 
			
		||||
    from ._supervise import ActorNursery
 | 
			
		||||
    ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger('tractor')
 | 
			
		||||
 | 
			
		||||
# placeholder for an mp start context if so using that backend
 | 
			
		||||
| 
						 | 
				
			
			@ -481,6 +483,7 @@ async def trio_proc(
 | 
			
		|||
 | 
			
		||||
    cancelled_during_spawn: bool = False
 | 
			
		||||
    proc: trio.Process|None = None
 | 
			
		||||
    ipc_server: IPCServer = actor_nursery._actor.ipc_server
 | 
			
		||||
    try:
 | 
			
		||||
        try:
 | 
			
		||||
            proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
 | 
			
		||||
| 
						 | 
				
			
			@ -492,7 +495,7 @@ async def trio_proc(
 | 
			
		|||
            # wait for actor to spawn and connect back to us
 | 
			
		||||
            # channel should have handshake completed by the
 | 
			
		||||
            # local actor by the time we get a ref to it
 | 
			
		||||
            event, chan = await actor_nursery._actor.wait_for_peer(
 | 
			
		||||
            event, chan = await ipc_server.wait_for_peer(
 | 
			
		||||
                subactor.uid
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -724,11 +727,12 @@ async def mp_proc(
 | 
			
		|||
 | 
			
		||||
    log.runtime(f"Started {proc}")
 | 
			
		||||
 | 
			
		||||
    ipc_server: IPCServer = actor_nursery._actor.ipc_server
 | 
			
		||||
    try:
 | 
			
		||||
        # wait for actor to spawn and connect back to us
 | 
			
		||||
        # channel should have handshake completed by the
 | 
			
		||||
        # local actor by the time we get a ref to it
 | 
			
		||||
        event, chan = await actor_nursery._actor.wait_for_peer(
 | 
			
		||||
        event, chan = await ipc_server.wait_for_peer(
 | 
			
		||||
            subactor.uid,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -53,6 +53,9 @@ from . import _spawn
 | 
			
		|||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    import multiprocessing as mp
 | 
			
		||||
    # from .ipc._server import IPCServer
 | 
			
		||||
    from .ipc import IPCServer
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -315,6 +318,9 @@ class ActorNursery:
 | 
			
		|||
        children: dict = self._children
 | 
			
		||||
        child_count: int = len(children)
 | 
			
		||||
        msg: str = f'Cancelling actor nursery with {child_count} children\n'
 | 
			
		||||
 | 
			
		||||
        server: IPCServer = self._actor.ipc_server
 | 
			
		||||
 | 
			
		||||
        with trio.move_on_after(3) as cs:
 | 
			
		||||
            async with trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
| 
						 | 
				
			
			@ -337,7 +343,7 @@ class ActorNursery:
 | 
			
		|||
 | 
			
		||||
                    else:
 | 
			
		||||
                        if portal is None:  # actor hasn't fully spawned yet
 | 
			
		||||
                            event = self._actor._peer_connected[subactor.uid]
 | 
			
		||||
                            event: trio.Event = server._peer_connected[subactor.uid]
 | 
			
		||||
                            log.warning(
 | 
			
		||||
                                f"{subactor.uid} never 't finished spawning?"
 | 
			
		||||
                            )
 | 
			
		||||
| 
						 | 
				
			
			@ -353,7 +359,7 @@ class ActorNursery:
 | 
			
		|||
                            if portal is None:
 | 
			
		||||
                                # cancelled while waiting on the event
 | 
			
		||||
                                # to arrive
 | 
			
		||||
                                chan = self._actor._peers[subactor.uid][-1]
 | 
			
		||||
                                chan = server._peers[subactor.uid][-1]
 | 
			
		||||
                                if chan:
 | 
			
		||||
                                    portal = Portal(chan)
 | 
			
		||||
                                else:  # there's no other choice left
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -92,7 +92,11 @@ from tractor._state import (
 | 
			
		|||
if TYPE_CHECKING:
 | 
			
		||||
    from trio.lowlevel import Task
 | 
			
		||||
    from threading import Thread
 | 
			
		||||
    from tractor.ipc import Channel
 | 
			
		||||
    from tractor.ipc import (
 | 
			
		||||
        Channel,
 | 
			
		||||
        IPCServer,
 | 
			
		||||
        # _server,  # TODO? export at top level?
 | 
			
		||||
    )
 | 
			
		||||
    from tractor._runtime import (
 | 
			
		||||
        Actor,
 | 
			
		||||
    )
 | 
			
		||||
| 
						 | 
				
			
			@ -1434,6 +1438,7 @@ def any_connected_locker_child() -> bool:
 | 
			
		|||
 | 
			
		||||
    '''
 | 
			
		||||
    actor: Actor = current_actor()
 | 
			
		||||
    server: IPCServer = actor.ipc_server
 | 
			
		||||
 | 
			
		||||
    if not is_root_process():
 | 
			
		||||
        raise InternalError('This is a root-actor only API!')
 | 
			
		||||
| 
						 | 
				
			
			@ -1443,7 +1448,7 @@ def any_connected_locker_child() -> bool:
 | 
			
		|||
        and
 | 
			
		||||
        (uid_in_debug := ctx.chan.uid)
 | 
			
		||||
    ):
 | 
			
		||||
        chans: list[tractor.Channel] = actor._peers.get(
 | 
			
		||||
        chans: list[tractor.Channel] = server._peers.get(
 | 
			
		||||
            tuple(uid_in_debug)
 | 
			
		||||
        )
 | 
			
		||||
        if chans:
 | 
			
		||||
| 
						 | 
				
			
			@ -3003,6 +3008,7 @@ async def _maybe_enter_pm(
 | 
			
		|||
        [BaseException|BaseExceptionGroup],
 | 
			
		||||
        bool,
 | 
			
		||||
    ] = lambda err: not is_multi_cancelled(err),
 | 
			
		||||
    **_pause_kws,
 | 
			
		||||
 | 
			
		||||
):
 | 
			
		||||
    if (
 | 
			
		||||
| 
						 | 
				
			
			@ -3029,6 +3035,7 @@ async def _maybe_enter_pm(
 | 
			
		|||
        await post_mortem(
 | 
			
		||||
            api_frame=api_frame,
 | 
			
		||||
            tb=tb,
 | 
			
		||||
            **_pause_kws,
 | 
			
		||||
        )
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
```diff
@@ -49,6 +49,7 @@ from tractor.log import get_logger
 from tractor._exceptions import (
     MsgTypeError,
     pack_from_raise,
+    TransportClosed,
 )
 from tractor.msg import (
     Aid,

@@ -256,7 +257,7 @@ class Channel:
         self,
         payload: Any,
 
-        hide_tb: bool = False,
+        hide_tb: bool = True,
 
     ) -> None:
         '''

@@ -274,18 +275,27 @@ class Channel:
                 payload,
                 hide_tb=hide_tb,
             )
-        except BaseException as _err:
+        except (
+            BaseException,
+            MsgTypeError,
+            TransportClosed,
+        )as _err:
             err = _err  # bind for introspection
-            if not isinstance(_err, MsgTypeError):
-                # assert err
-                __tracebackhide__: bool = False
-            else:
-                try:
-                    assert err.cid
-
-                except KeyError:
-                    raise err
+            match err:
+                case MsgTypeError():
+                    try:
+                        assert err.cid
+
+                    except KeyError:
+                        raise err
+                case TransportClosed():
+                    log.transport(
+                        f'Transport stream closed due to\n'
+                        f'{err.repr_src_exc()}\n'
+                    )
+
+                case _:
+                    # never suppress non-tpt sources
+                    __tracebackhide__: bool = False
             raise
 
     async def recv(self) -> Any:
```
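`Channel.send()` now catches a tuple of exception types and dispatches on the bound error with `match`/`case`, logging transport-closed cases while never suppressing other sources. A generic, self-contained sketch of that dispatch (the local `TransportClosed` and `log` below are stand-ins, not tractor's):

```python
class TransportClosed(Exception):
    def repr_src_exc(self) -> str:
        return repr(self.__cause__)


class log:
    @staticmethod
    def transport(msg: str) -> None:
        print(msg)


def send(payload: object, *, fail_how: str|None = None) -> None:
    try:
        if fail_how == 'closed':
            raise TransportClosed('peer went away') from ConnectionResetError()
        if fail_how == 'boom':
            raise RuntimeError('unrelated bug')
    except BaseException as err:
        match err:
            case TransportClosed():
                # expected-ish: note it at transport level, then re-raise
                log.transport(
                    f'Transport stream closed due to\n'
                    f'{err.repr_src_exc()}\n'
                )
            case _:
                # never suppress non-transport sources
                pass
        raise


if __name__ == '__main__':
    try:
        send('hi', fail_how='closed')
    except TransportClosed:
        print('caught closed transport')
```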
				
			
@@ -19,11 +19,14 @@ multi-transport-protcol needs!

'''
from __future__ import annotations
from collections import defaultdict
from contextlib import (
    asynccontextmanager as acm,
)
from functools import partial
from itertools import chain
import inspect
from pprint import pformat
from types import (
    ModuleType,
)
@@ -40,24 +43,540 @@ from trio import (
    SocketListener,
)

from ..msg import Struct
# from ..devx import _debug
from .._exceptions import (
    TransportClosed,
)
from .. import _rpc
from ..msg import (
    MsgType,
    Struct,
    types as msgtypes,
)
from ..trionics import maybe_open_nursery
from .. import (
    _state,
    log,
)
from .._addr import Address
from ._chan import Channel
from ._transport import MsgTransport
from ._uds import UDSAddress
from ._tcp import TCPAddress

if TYPE_CHECKING:
    from .._runtime import Actor
    from .._supervise import ActorNursery


log = log.get_logger(__name__)

async def maybe_wait_on_canced_subs(
    uid: tuple[str, str],
    chan: Channel,
    disconnected: bool,

    actor: Actor|None = None,
    chan_drain_timeout: float = 0.5,
    an_exit_timeout: float = 0.5,

) -> ActorNursery|None:
    '''
    When a process-local actor-nursery is found for the given actor
    `uid` (i.e. that peer is **also** a subactor of this parent), we
    attempt to (with timeouts) wait on,

    - all IPC msgs to drain on the (common) `Channel` such that all
      local `Context`-parent-tasks can also gracefully collect
      `ContextCancelled` msgs from their respective remote children
      vs. a `chan_drain_timeout`.

    - the actor-nursery to cancel-n-join all its supervised children
      (processes) *gracefully* vs. a `an_exit_timeout` and thus also
      detect cases where the IPC transport connection broke but
      a sub-process is detected as still alive (a case that happens
      when the subactor is still in an active debugger REPL session).

    If the timeout expires in either case we ofc report with warning.

    '''
    actor = actor or _state.current_actor()

    # XXX running outside actor-runtime usage,
    # - unit testing
    # - possibly manual usage (eventually) ?
    if not actor:
        return None

    local_nursery: (
        ActorNursery|None
    ) = actor._actoruid2nursery.get(uid)

    # This is set in `Portal.cancel_actor()`. So if
    # the peer was cancelled we try to wait for them
    # to tear down their side of the connection before
    # moving on with closing our own side.
    if (
        local_nursery
        and (
            actor._cancel_called
            or
            chan._cancel_called
        )
        #
        # ^-TODO-^ along with this is there another condition
        # that we should filter with to avoid entering this
        # waiting block needlessly?
        # -[ ] maybe `and local_nursery.cancelled` and/or
        #     only if the `._children` table is empty or has
        #     only `Portal`s with .chan._cancel_called ==
        #     True` as per what we had below; the MAIN DIFF
        #     BEING that just bc one `Portal.cancel_actor()`
        #     was called, doesn't mean the whole actor-nurse
        #     is gonna exit any time soon right!?
        #
        # or
        # all(chan._cancel_called for chan in chans)

    ):
        log.cancel(
            'Waiting on cancel request to peer..\n'
            f'c)=>\n'
            f'  |_{chan.uid}\n'
        )

        # XXX: this is a soft wait on the channel (and its
        # underlying transport protocol) to close from the
        # remote peer side since we presume that any channel
        # which is mapped to a sub-actor (i.e. it's managed
        # by local actor-nursery) has a message that is sent
        # to the peer likely by this actor (which may be in
        # a shutdown sequence due to cancellation) when the
        # local runtime here is now cancelled while
        # (presumably) in the middle of msg loop processing.
        chan_info: str = (
            f'{chan.uid}\n'
            f'|_{chan}\n'
            f'  |_{chan.transport}\n\n'
        )
        with trio.move_on_after(chan_drain_timeout) as drain_cs:
            drain_cs.shield = True

            # attempt to wait for the far end to close the
            # channel and bail after timeout (a 2-generals
            # problem on closure).
            assert chan.transport
            async for msg in chan.transport.drain():

                # try to deliver any lingering msgs
                # before we destroy the channel.
                # This accomplishes deterministic
                # ``Portal.cancel_actor()`` cancellation by
                # making sure any RPC response to that call is
                # delivered the local calling task.
                # TODO: factor this into a helper?
                log.warning(
                    'Draining msg from disconnected peer\n'
                    f'{chan_info}'
                    f'{pformat(msg)}\n'
                )
                # cid: str|None = msg.get('cid')
                cid: str|None = msg.cid
                if cid:
                    # deliver response to local caller/waiter
                    await actor._deliver_ctx_payload(
                        chan,
                        cid,
                        msg,
                    )
        if drain_cs.cancelled_caught:
            log.warning(
                'Timed out waiting on IPC transport channel to drain?\n'
                f'{chan_info}'
            )

        # XXX NOTE XXX when no explicit call to
        # `open_root_actor()` was made by the application
        # (normally we implicitly make that call inside
        # the first `.open_nursery()` in root-actor
        # user/app code), we can assume that either we
        # are NOT the root actor or are root but the
        # runtime was started manually. and thus DO have
        # to wait for the nursery-enterer to exit before
        # shutting down the local runtime to avoid
        # clobbering any ongoing subactor
        # teardown/debugging/graceful-cancel.
        #
        # see matching  note inside `._supervise.open_nursery()`
        #
        # TODO: should we have a separate cs + timeout
        # block here?
        if (
            # XXX SO either,
            #  - not root OR,
            #  - is root but `open_root_actor()` was
            #    entered manually (in which case we do
            #    the equiv wait there using the
            #    `devx.debug` sub-sys APIs).
            not local_nursery._implicit_runtime_started
        ):
            log.runtime(
                'Waiting on local actor nursery to exit..\n'
                f'|_{local_nursery}\n'
            )
            with trio.move_on_after(an_exit_timeout) as an_exit_cs:
                an_exit_cs.shield = True
                await local_nursery.exited.wait()

            # TODO: currently this is always triggering for every
            # sub-daemon spawned from the `piker.services._mngr`?
            # -[ ] how do we ensure that the IPC is supposed to
            #      be long lived and isn't just a register?
            # |_ in the register case how can we signal that the
            #    ephemeral msg loop was intentional?
            if (
                # not local_nursery._implicit_runtime_started
                # and
                an_exit_cs.cancelled_caught
            ):
                report: str = (
                    'Timed out waiting on local actor-nursery to exit?\n'
                    f'c)>\n'
                    f' |_{local_nursery}\n'
                )
                if children := local_nursery._children:
                    # indent from above local-nurse repr
                    report += (
                        f'   |_{pformat(children)}\n'
                    )

                log.warning(report)

        if disconnected:
            # if the transport died and this actor is still
            # registered within a local nursery, we report
            # that the IPC layer may have failed
            # unexpectedly since it may be the cause of
            # other downstream errors.
            entry: tuple|None = local_nursery._children.get(uid)
            if entry:
                proc: trio.Process
                _, proc, _ = entry

                if (
                    (poll := getattr(proc, 'poll', None))
                    and
                    poll() is None  # proc still alive
                ):
                    # TODO: change log level based on
                    # detecting whether chan was created for
                    # ephemeral `.register_actor()` request!
                    # -[ ] also, that should be avoidable by
                    #   re-using any existing chan from the
                    #   `._discovery.get_registry()` call as
                    #   well..
                    log.runtime(
                        f'Peer IPC broke but subproc is alive?\n\n'

                        f'<=x {chan.uid}@{chan.raddr}\n'
                        f'   |_{proc}\n'
                    )

    return local_nursery
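
Both timeout blocks above follow the same shielded soft-wait idiom: open a `trio.move_on_after()` scope, shield it so surrounding cancellation cannot cut the grace period short, then inspect `.cancelled_caught` afterwards to decide whether to warn. A minimal runnable distillation of that idiom (generic helper names, not part of the diff):

    import trio

    async def soft_wait(event: trio.Event, timeout: float) -> bool:
        # Shielded, time-bounded wait: surrounding cancellation can't
        # interrupt the grace period, but we give up after `timeout`.
        with trio.move_on_after(timeout) as cs:
            cs.shield = True
            await event.wait()
        return not cs.cancelled_caught  # True iff the event fired in time

    async def main() -> None:
        ev = trio.Event()
        print(await soft_wait(ev, timeout=0.1))  # -> False, nothing set it

    trio.run(main)
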
# TODO multi-tpt support with per-proto peer tracking?
#
# -[x] maybe change to mod-func and rename for implied
#    multi-transport semantics?
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
#     so that we can query per tpt all peer contact infos?
#  |_[ ] possibly provide a global viewing via a
#        `collections.ChainMap`?
#
async def handle_stream_from_peer(
    stream: trio.SocketStream,

    *,
    server: IPCServer,

) -> None:
    '''
    Top-level `trio.abc.Stream` (i.e. normally `trio.SocketStream`)
    handler-callback as spawn-invoked by `trio.serve_listeners()`.

    Note that each call to this handler is as a spawned task inside
    any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
    such that it is invoked as,

      IPCEndpoint.stream_handler_tn.start_soon(
          handle_stream,
          stream,
      )

    '''
    server._no_more_peers = trio.Event()  # unset by making new

    # TODO, debug_mode tooling for when hackin this lower layer?
    # with _debug.maybe_open_crash_handler(
    #     pdb=True,
    # ) as boxerr:

    chan = Channel.from_stream(stream)
    con_status: str = (
        'New inbound IPC connection <=\n'
        f'|_{chan}\n'
    )

    # initial handshake with peer phase
    try:
        if actor := _state.current_actor():
            peer_aid: msgtypes.Aid = await chan._do_handshake(
                aid=actor.aid,
            )
    except (
        TransportClosed,
        # ^XXX NOTE, the above wraps `trio` exc types raised
        # during various `SocketStream.send/receive_xx()` calls
        # under different fault conditions such as,
        #
        # trio.BrokenResourceError,
        # trio.ClosedResourceError,
        #
        # Inside our `.ipc._transport` layer we absorb and
        # re-raise our own `TransportClosed` exc such that this
        # higher level runtime code can only worry one
        # "kinda-error" that we expect to tolerate during
        # discovery-sys related pings, queires, DoS etc.
    ):
        # XXX: This may propagate up from `Channel._aiter_recv()`
        # and `MsgpackStream._inter_packets()` on a read from the
        # stream particularly when the runtime is first starting up
        # inside `open_root_actor()` where there is a check for
        # a bound listener on the "arbiter" addr.  the reset will be
        # because the handshake was never meant took place.
        log.runtime(
            con_status
            +
            ' -> But failed to handshake? Ignoring..\n'
        )
        return

    uid: tuple[str, str] = (
        peer_aid.name,
        peer_aid.uuid,
    )
    # TODO, can we make this downstream peer tracking use the
    # `peer_aid` instead?
    familiar: str = 'new-peer'
    if _pre_chan := server._peers.get(uid):
        familiar: str = 'pre-existing-peer'
    uid_short: str = f'{uid[0]}[{uid[1][-6:]}]'
    con_status += (
        f' -> Handshake with {familiar} `{uid_short}` complete\n'
    )

    if _pre_chan:
        # con_status += (
        # ^TODO^ swap once we minimize conn duplication
        # -[ ] last thing might be reg/unreg runtime reqs?
        # log.warning(
        log.debug(
            f'?Wait?\n'
            f'We already have IPC with peer {uid_short!r}\n'
            f'|_{_pre_chan}\n'
        )

    # IPC connection tracking for both peers and new children:
    # - if this is a new channel to a locally spawned
    #   sub-actor there will be a spawn wait even registered
    #   by a call to `.wait_for_peer()`.
    # - if a peer is connecting no such event will exit.
    event: trio.Event|None = server._peer_connected.pop(
        uid,
        None,
    )
    if event:
        con_status += (
            ' -> Waking subactor spawn waiters: '
            f'{event.statistics().tasks_waiting}\n'
            f' -> Registered IPC chan for child actor {uid}@{chan.raddr}\n'
            # f'    {event}\n'
            # f'    |{event.statistics()}\n'
        )
        # wake tasks waiting on this IPC-transport "connect-back"
        event.set()

    else:
        con_status += (
            f' -> Registered IPC chan for peer actor {uid}@{chan.raddr}\n'
        )  # type: ignore

    chans: list[Channel] = server._peers[uid]
    # if chans:
    #     # TODO: re-use channels for new connections instead
    #     # of always new ones?
    #     # => will require changing all the discovery funcs..

    # append new channel
    # TODO: can we just use list-ref directly?
    chans.append(chan)

    con_status += ' -> Entering RPC msg loop..\n'
    log.runtime(con_status)

    # Begin channel management - respond to remote requests and
    # process received reponses.
    disconnected: bool = False
    last_msg: MsgType
    try:
        (
            disconnected,
            last_msg,
        ) = await _rpc.process_messages(
            chan=chan,
        )
    except trio.Cancelled:
        log.cancel(
            'IPC transport msg loop was cancelled\n'
            f'c)>\n'
            f' |_{chan}\n'
        )
        raise

    finally:

        # check if there are subs which we should gracefully join at
        # both the inter-actor-task and subprocess levels to
        # gracefully remote cancel and later disconnect (particularly
        # for permitting subs engaged in active debug-REPL sessions).
        local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
            uid=uid,
            chan=chan,
            disconnected=disconnected,
        )

        # ``Channel`` teardown and closure sequence
        # drop ref to channel so it can be gc-ed and disconnected
        con_teardown_status: str = (
            f'IPC channel disconnected:\n'
            f'<=x uid: {chan.uid}\n'
            f'   |_{pformat(chan)}\n\n'
        )
        chans.remove(chan)

        # TODO: do we need to be this pedantic?
        if not chans:
            con_teardown_status += (
                f'-> No more channels with {chan.uid}'
            )
            server._peers.pop(uid, None)

        peers_str: str = ''
        for uid, chans in server._peers.items():
            peers_str += (
                f'uid: {uid}\n'
            )
            for i, chan in enumerate(chans):
                peers_str += (
                    f' |_[{i}] {pformat(chan)}\n'
                )

        con_teardown_status += (
            f'-> Remaining IPC {len(server._peers)} peers: {peers_str}\n'
        )

        # No more channels to other actors (at all) registered
        # as connected.
        if not server._peers:
            con_teardown_status += (
                'Signalling no more peer channel connections'
            )
            server._no_more_peers.set()

            # NOTE: block this actor from acquiring the
            # debugger-TTY-lock since we have no way to know if we
            # cancelled it and further there is no way to ensure the
            # lock will be released if acquired due to having no
            # more active IPC channels.
            if (
                _state.is_root_process()
                and
                _state.is_debug_mode()
            ):
                from ..devx import _debug
                pdb_lock = _debug.Lock
                pdb_lock._blocked.add(uid)

                # TODO: NEEEDS TO BE TESTED!
                # actually, no idea if this ever even enters.. XD
                #
                # XXX => YES IT DOES, when i was testing ctl-c
                # from broken debug TTY locking due to
                # msg-spec races on application using RunVar...
                if (
                    local_nursery
                    and
                    (ctx_in_debug := pdb_lock.ctx_in_debug)
                    and
                    (pdb_user_uid := ctx_in_debug.chan.uid)
                ):
                    entry: tuple|None = local_nursery._children.get(
                        tuple(pdb_user_uid)
                    )
                    if entry:
                        proc: trio.Process
                        _, proc, _ = entry

                        if (
                            (poll := getattr(proc, 'poll', None))
                            and poll() is None
                        ):
                            log.cancel(
                                'Root actor reports no-more-peers, BUT\n'
                                'a DISCONNECTED child still has the debug '
                                'lock!\n\n'
                                # f'root uid: {actor.uid}\n'
                                f'last disconnected child uid: {uid}\n'
                                f'locking child uid: {pdb_user_uid}\n'
                            )
                            await _debug.maybe_wait_for_debugger(
                                child_in_debug=True
                            )

                # TODO: just bc a child's transport dropped
                # doesn't mean it's not still using the pdb
                # REPL! so,
                # -[ ] ideally we can check out child proc
                #  tree to ensure that its alive (and
                #  actually using the REPL) before we cancel
                #  it's lock acquire by doing the below!
                # -[ ] create a way to read the tree of each actor's
                #  grandchildren such that when an
                #  intermediary parent is cancelled but their
                #  child has locked the tty, the grandparent
                #  will not allow the parent to cancel or
                #  zombie reap the child! see open issue:
                #  - https://github.com/goodboy/tractor/issues/320
                # ------ - ------
                # if a now stale local task has the TTY lock still
                # we cancel it to allow servicing other requests for
                # the lock.
                if (
                    (db_cs := pdb_lock.get_locking_task_cs())
                    and not db_cs.cancel_called
                    and uid == pdb_user_uid
                ):
                    log.critical(
                        f'STALE DEBUG LOCK DETECTED FOR {uid}'
                    )
                    # TODO: figure out why this breaks tests..
                    db_cs.cancel()

        log.runtime(con_teardown_status)
    # finally block closure
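
The handler above keeps all of its per-peer bookkeeping in `server._peers` (a `defaultdict(list)` of channels keyed by uid) plus the level-triggered `_no_more_peers` event, which is replaced with a fresh unset `trio.Event` whenever a connection arrives and set again only once the table empties. A toy model of just that bookkeeping, with a plain `object` standing in for a real channel (not tractor's actual `IPCServer`):

    from collections import defaultdict
    import trio

    class PeerTable:
        # Toy model of the bookkeeping above: multiple channels per peer
        # uid and an event that is set exactly when the table is empty.
        def __init__(self) -> None:
            self._peers: defaultdict[tuple[str, str], list[object]] = defaultdict(list)
            self._no_more_peers = trio.Event()
            self._no_more_peers.set()  # starts out "empty"

        def add(self, uid: tuple[str, str], chan: object) -> None:
            self._no_more_peers = trio.Event()  # unset by making a new one
            self._peers[uid].append(chan)

        def remove(self, uid: tuple[str, str], chan: object) -> None:
            chans = self._peers[uid]
            chans.remove(chan)
            if not chans:
                self._peers.pop(uid, None)
            if not self._peers:
                self._no_more_peers.set()
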
class IPCEndpoint(Struct):
    '''
    An instance of an IPC "bound" address where the lifetime of the
@@ -120,8 +639,23 @@ class IPCEndpoint(Struct):
class IPCServer(Struct):
    _parent_tn: Nursery
    _stream_handler_tn: Nursery
    # level-triggered sig for whether "no peers are currently
    # connected"; field is **always** set to an instance but
    # initialized with `.is_set() == True`.
    _no_more_peers: trio.Event

    _endpoints: list[IPCEndpoint] = []

    # connection tracking & mgmt
    _peers: defaultdict[
        str,  # uaid
        list[Channel],  # IPC conns from peer
    ] = defaultdict(list)
    _peer_connected: dict[
        tuple[str, str],
        trio.Event,
    ] = {}

    # syncs for setup/teardown sequences
    _shutdown: trio.Event|None = None

@@ -183,6 +717,65 @@ class IPCServer(Struct):
                f'protos: {tpt_protos!r}\n'
            )

    def has_peers(
        self,
        check_chans: bool = False,
    ) -> bool:
        '''
        Predicate for "are there any active peer IPC `Channel`s at the moment?"

        '''
        has_peers: bool = not self._no_more_peers.is_set()
        if (
            has_peers
            and
            check_chans
        ):
            has_peers: bool = (
                any(chan.connected()
                    for chan in chain(
                        *self._peers.values()
                    )
                )
                and
                has_peers
            )

        return has_peers

    async def wait_for_no_more_peers(
        self,
        shield: bool = False,
    ) -> None:
        with trio.CancelScope(shield=shield):
            await self._no_more_peers.wait()

    async def wait_for_peer(
        self,
        uid: tuple[str, str],

    ) -> tuple[trio.Event, Channel]:
        '''
        Wait for a connection back from a (spawned sub-)actor with
        a `uid` using a `trio.Event`.

        Returns a pair of the event and the "last" registered IPC
        `Channel` for the peer with `uid`.

        '''
        log.debug(f'Waiting for peer {uid!r} to connect')
        event: trio.Event = self._peer_connected.setdefault(
            uid,
            trio.Event(),
        )
        await event.wait()
        log.debug(f'{uid!r} successfully connected back to us')
        mru_chan: Channel = self._peers[uid][-1]
        return (
            event,
            mru_chan,
        )

    @property
    def addrs(self) -> list[Address]:
        return [ep.addr for ep in self._endpoints]
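
Based only on the signatures shown above, a hypothetical call-site sketch (the `server` instance and peer `uid` are assumed to already exist): `wait_for_peer()` blocks on the connect-back event and hands back the most-recently registered `Channel`, while `has_peers(check_chans=True)` additionally verifies that at least one tracked channel reports `.connected()`.

    async def wait_then_check(server, uid: tuple[str, str]) -> None:
        # block until the peer identified by `uid` connects back
        event, chan = await server.wait_for_peer(uid)
        assert event.is_set()
        if server.has_peers(check_chans=True):
            print(f'live channel to {uid}: {chan}')
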
@@ -211,17 +804,27 @@ class IPCServer(Struct):
        return ev.is_set()

    def pformat(self) -> str:
        eps: list[IPCEndpoint] = self._endpoints

        fmtstr: str = (
            f' |_endpoints: {self._endpoints}\n'
        state_repr: str = (
            f'{len(eps)!r} IPC-endpoints active'
        )
        fmtstr = (
            f' |_state: {state_repr}\n'
            f'   no_more_peers: {self.has_peers()}\n'
        )
        if self._shutdown is not None:
            shutdown_stats: EventStatistics = self._shutdown.statistics()
            fmtstr += (
                f'\n'
                f' |_shutdown: {shutdown_stats}\n'
                f'   task_waiting_on_shutdown: {shutdown_stats}\n'
            )

        fmtstr += (
            # TODO, use the `ppfmt()` helper from `modden`!
            f' |_endpoints: {pformat(self._endpoints)}\n'
            f' |_peers: {len(self._peers)} connected\n'
        )

        return (
            f'<IPCServer(\n'
            f'{fmtstr}'
@@ -249,7 +852,6 @@ class IPCServer(Struct):
    async def listen_on(
        self,
        *,
        actor: Actor,
        accept_addrs: list[tuple[str, int|str]]|None = None,
        stream_handler_nursery: Nursery|None = None,
    ) -> list[IPCEndpoint]:
@@ -282,20 +884,19 @@ class IPCServer(Struct):
                f'{self}\n'
            )

        log.info(
        log.runtime(
            f'Binding to endpoints for,\n'
            f'{accept_addrs}\n'
        )
        eps: list[IPCEndpoint] = await self._parent_tn.start(
            partial(
                _serve_ipc_eps,
                actor=actor,
                server=self,
                stream_handler_tn=stream_handler_nursery,
                listen_addrs=accept_addrs,
            )
        )
        log.info(
        log.runtime(
            f'Started IPC endpoints\n'
            f'{eps}\n'
        )
@@ -318,7 +919,6 @@ class IPCServer(Struct):

async def _serve_ipc_eps(
    *,
    actor: Actor,
    server: IPCServer,
    stream_handler_tn: Nursery,
    listen_addrs: list[tuple[str, int|str]],
@@ -352,12 +952,13 @@ async def _serve_ipc_eps(
                    stream_handler_tn=stream_handler_tn,
                )
                try:
                    log.info(
                    log.runtime(
                        f'Starting new endpoint listener\n'
                        f'{ep}\n'
                    )
                    listener: trio.abc.Listener = await ep.start_listener()
                    assert listener is ep._listener
                    # actor = _state.current_actor()
                    # if actor.is_registry:
                    #     import pdbp; pdbp.set_trace()

@@ -379,7 +980,10 @@ async def _serve_ipc_eps(
            _listeners: list[SocketListener] = await listen_tn.start(
                partial(
                    trio.serve_listeners,
                    handler=actor._stream_handler,
                    handler=partial(
                        handle_stream_from_peer,
                        server=server,
                    ),
                    listeners=listeners,

                    # NOTE: configured such that new
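
The hunk above swaps the bound-method handler for a `functools.partial` that pre-binds `server=`, since `trio.serve_listeners()` invokes its handler with the accepted stream as the only argument. A generic sketch of that wiring (placeholder handler, not the real `handle_stream_from_peer`):

    from functools import partial
    import trio

    async def handle_stream(stream: trio.SocketStream, *, server) -> None:
        # the extra `server` kwarg is pre-bound below via `partial`
        await stream.aclose()

    async def serve(server, listeners: list[trio.SocketListener]) -> None:
        # `trio.serve_listeners()` only passes the stream to the handler,
        # so any additional context must be closed over or pre-bound.
        await trio.serve_listeners(
            handler=partial(handle_stream, server=server),
            listeners=listeners,
        )
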
@@ -389,13 +993,13 @@ async def _serve_ipc_eps(
                )
            )
            # TODO, wow make this message better! XD
            log.info(
            log.runtime(
                'Started server(s)\n'
                +
                '\n'.join([f'|_{addr}' for addr in listen_addrs])
            )

            log.info(
            log.runtime(
                f'Started IPC endpoints\n'
                f'{eps}\n'
            )
@@ -411,6 +1015,7 @@ async def _serve_ipc_eps(
                ep.close_listener()
                server._endpoints.remove(ep)

        # actor = _state.current_actor()
        # if actor.is_arbiter:
        #     import pdbp; pdbp.set_trace()

@@ -421,7 +1026,6 @@ async def _serve_ipc_eps(

@acm
async def open_ipc_server(
    actor: Actor,
    parent_tn: Nursery|None = None,
    stream_handler_tn: Nursery|None = None,

@@ -430,20 +1034,28 @@ async def open_ipc_server(
    async with maybe_open_nursery(
        nursery=parent_tn,
    ) as rent_tn:
        no_more_peers = trio.Event()
        no_more_peers.set()

        ipc_server = IPCServer(
            _parent_tn=rent_tn,
            _stream_handler_tn=stream_handler_tn or rent_tn,
            _no_more_peers=no_more_peers,
        )
        try:
            yield ipc_server
            log.runtime(
                f'Waiting on server to shutdown or be cancelled..\n'
                f'{ipc_server}'
            )
            # TODO? when if ever would we want/need this?
            # with trio.CancelScope(shield=True):
            #     await ipc_server.wait_for_shutdown()

        # except BaseException as berr:
        #     log.exception(
        #         'IPC server crashed on exit ?'
        #     )
        #     raise berr

        finally:
        except BaseException as berr:
            log.exception(
                'IPC server caller crashed ??'
            )
            # ?TODO, maybe we can ensure the endpoints are torndown
            # (and thus their managed listeners) beforehand to ensure
            # super graceful RPC mechanics?
@@ -451,17 +1063,5 @@ async def open_ipc_server(
            # -[ ] but aren't we doing that already per-`listen_tn`
            #      inside `_serve_ipc_eps()` above?
            #
            # if not ipc_server.is_shutdown():
            #     ipc_server.cancel()
            #     await ipc_server.wait_for_shutdown()
            # assert ipc_server.is_shutdown()
            pass

            # !XXX TODO! lol so classic, the below code is rekt!
            #
            # XXX here is a perfect example of suppressing errors with
            # `trio.Cancelled` as per our demonstrating example,
            # `test_trioisms::test_acm_embedded_nursery_propagates_enter_err
            #
            # with trio.CancelScope(shield=True):
            #     await ipc_server.wait_for_shutdown()
            # ipc_server.cancel()
            raise berr
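
Taken together, these hunks outline the server lifecycle: `open_ipc_server()` builds an `IPCServer` inside a (possibly caller-supplied) nursery with `_no_more_peers` pre-set, and `.listen_on()` starts endpoints via `_serve_ipc_eps()`. A rough usage sketch under the assumption that the `actor` parameters really are being dropped by this change (the hunks are truncated, so the exact post-change signatures are not certain and `addr` is just a placeholder transport address):

    import trio
    from tractor import ipc

    async def main(addr) -> None:
        async with (
            trio.open_nursery() as tn,
            ipc._server.open_ipc_server(parent_tn=tn) as server,
        ):
            eps = await server.listen_on(accept_addrs=[addr])
            print(server.pformat(), eps)
            tn.cancel_scope.cancel()  # tear the listeners back down
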
@@ -127,6 +127,11 @@ async def start_listener(
    Start a TCP socket listener on the given `TCPAddress`.

    '''
    log.info(
        f'Attempting to bind TCP socket\n'
        f'>[\n'
        f'|_{addr}\n'
    )
    # ?TODO, maybe we should just change the lower-level call this is
    # using internall per-listener?
    listeners: list[SocketListener] = await open_tcp_listeners(
@@ -140,6 +145,12 @@ async def start_listener(
    assert len(listeners) == 1
    listener = listeners[0]
    host, port = listener.socket.getsockname()[:2]

    log.info(
        f'Listening on TCP socket\n'
        f'[>\n'
        f' |_{addr}\n'
    )
    return listener
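
`start_listener()` above binds exactly one TCP listener per address and reads the bound host/port back off the socket for the log line. A standalone sketch of the same bind-and-report flow against `trio.open_tcp_listeners()` directly (port `0` asks the OS for an ephemeral port):

    import trio

    async def bind_one(host: str = '127.0.0.1', port: int = 0) -> trio.SocketListener:
        listeners = await trio.open_tcp_listeners(port, host=host)
        assert len(listeners) == 1  # a single concrete host yields one listener
        listener = listeners[0]
        bound_host, bound_port = listener.socket.getsockname()[:2]
        print(f'listening on {bound_host}:{bound_port}')
        return listener

    trio.run(bind_one)
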