Compare commits

3 Commits

6445f1cde4 ... fbc9325184

| Author | SHA1 | Date |
|---|---|---|
| | fbc9325184 | |
| | 3cd222959a | |
| | 2ea703cc75 | |
```
@@ -0,0 +1,4 @@
'''
`tractor.ipc` subsystem(s)/unit testing suites.

'''
```
```
@@ -0,0 +1,72 @@
'''
High-level `.ipc._server` unit tests.

'''
from __future__ import annotations

import pytest
import trio
from tractor import (
    devx,
    ipc,
    log,
)
from tractor._testing.addr import (
    get_rando_addr,
)
# TODO, use/check-roundtripping with some of these wrapper types?
#
# from .._addr import Address
# from ._chan import Channel
# from ._transport import MsgTransport
# from ._uds import UDSAddress
# from ._tcp import TCPAddress


@pytest.mark.parametrize(
    '_tpt_proto',
    ['uds', 'tcp']
)
def test_basic_ipc_server(
    _tpt_proto: str,
    debug_mode: bool,
    loglevel: str,
):

    # so we see the socket-listener reporting on console
    log.get_console_log("INFO")

    rando_addr: tuple = get_rando_addr(
        tpt_proto=_tpt_proto,
    )
    async def main():
        async with ipc._server.open_ipc_server() as server:

            assert (
                server._parent_tn
                and
                server._parent_tn is server._stream_handler_tn
            )
            assert server._no_more_peers.is_set()

            eps: list[ipc.IPCEndpoint] = await server.listen_on(
                accept_addrs=[rando_addr],
                stream_handler_nursery=None,
            )
            assert (
                len(eps) == 1
                and
                (ep := eps[0])._listener
                and
                not ep.peer_tpts
            )

            server._parent_tn.cancel_scope.cancel()

        # !TODO! actually make a bg-task connection from a client
        # using `ipc._chan._connect_chan()`

    with devx.maybe_open_crash_handler(
        pdb=debug_mode,
    ):
        trio.run(main)
```
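A hypothetical way to run just this new suite in-process (the `-k` selection and flags here are illustrative, not taken from the diff; `pytest.main()` is the standard programmatic entrypoint):

```python
# equivalent to `pytest -k test_basic_ipc_server` on the CLI
import pytest

if __name__ == '__main__':
    raise SystemExit(
        pytest.main(['-k', 'test_basic_ipc_server', '--tb=short'])
    )
```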
				
			
```
@@ -582,8 +582,7 @@ async def open_portal(
            msg_loop_cs = await tn.start(
                partial(
                    _rpc.process_messages,
                    actor,
                    channel,
                    chan=channel,
                    # if the local task is cancelled we want to keep
                    # the msg loop running until our block ends
                    shield=True,
```
```
@@ -869,7 +869,6 @@ async def try_ship_error_to_remote(


async def process_messages(
    actor: Actor,
    chan: Channel,
    shield: bool = False,
    task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED,
```
```
@@ -907,6 +906,7 @@ async def process_messages(
      (as utilized inside `Portal.cancel_actor()` ).

    '''
    actor: Actor = _state.current_actor()
    assert actor._service_n  # runtime state sanity

    # TODO: once `trio` get's an "obvious way" for req/resp we
```
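The hunks above drop the explicit `actor: Actor` parameter from `process_messages()` and its call sites, resolving the running actor from process-global runtime state instead. A minimal, self-contained sketch of that pattern (all names here are hypothetical stand-ins, not the actual `tractor._state` implementation):

```python
from __future__ import annotations


class Actor:
    '''Stand-in for the real runtime actor type.'''


_current_actor: Actor | None = None  # set once by the runtime entrypoint


def current_actor() -> Actor:
    # mirrors the `_state.current_actor()` lookup used in the diff
    if _current_actor is None:
        raise RuntimeError('No actor runtime started in this process!')
    return _current_actor


async def process_messages(
    chan,  # the IPC channel; no longer paired with an explicit actor arg
    shield: bool = False,
):
    actor: Actor = current_actor()  # was previously a positional param
    ...
```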
				
			
```
@@ -1262,6 +1262,10 @@ async def async_main(
    the actor's "runtime" and all thus all ongoing RPC tasks.

    '''
    # XXX NOTE, `_state._current_actor` **must** be set prior to
    # calling this core runtime entrypoint!
    assert actor is _state.current_actor()

    actor._task: trio.Task = trio.lowlevel.current_task()

    # attempt to retreive ``trio``'s sigint handler and stash it
```
```
@@ -1321,7 +1325,6 @@ async def async_main(
                ) as service_nursery,

                _server.open_ipc_server(
                    actor=actor,
                    parent_tn=service_nursery,
                    stream_handler_tn=service_nursery,
                ) as ipc_server,
```
```
@@ -1375,7 +1378,6 @@ async def async_main(
                        'Booting IPC server'
                    )
                    eps: list = await ipc_server.listen_on(
                        actor=actor,
                        accept_addrs=accept_addrs,
                        stream_handler_nursery=service_nursery,
                    )
```
```
@@ -1460,8 +1462,7 @@ async def async_main(
                    await root_nursery.start(
                        partial(
                            _rpc.process_messages,
                            actor,
                            actor._parent_chan,
                            chan=actor._parent_chan,
                            shield=True,
                        )
                    )
```
```
@@ -72,11 +72,223 @@ if TYPE_CHECKING:
log = log.get_logger(__name__)


async def maybe_wait_on_canced_subs(
    uid: tuple[str, str],
    chan: Channel,
    disconnected: bool,

    actor: Actor|None = None,
    chan_drain_timeout: float = 0.5,
    an_exit_timeout: float = 0.5,

) -> ActorNursery|None:
    '''
    When a process-local actor-nursery is found for the given actor
    `uid` (i.e. that peer is **also** a subactor of this parent), we
    attempt to (with timeouts) wait on,

    - all IPC msgs to drain on the (common) `Channel` such that all
      local `Context`-parent-tasks can also gracefully collect
      `ContextCancelled` msgs from their respective remote children
      vs. a `chan_drain_timeout`.

    - the actor-nursery to cancel-n-join all its supervised children
      (processes) *gracefully* vs. a `an_exit_timeout` and thus also
      detect cases where the IPC transport connection broke but
      a sub-process is detected as still alive (a case that happens
      when the subactor is still in an active debugger REPL session).

    If the timeout expires in either case we ofc report with warning.

    '''
    actor = actor or _state.current_actor()

    # XXX running outside actor-runtime usage,
    # - unit testing
    # - possibly manual usage (eventually) ?
    if not actor:
        return None

    local_nursery: (
        ActorNursery|None
    ) = actor._actoruid2nursery.get(uid)

    # This is set in `Portal.cancel_actor()`. So if
    # the peer was cancelled we try to wait for them
    # to tear down their side of the connection before
    # moving on with closing our own side.
    if (
        local_nursery
        and (
            actor._cancel_called
            or
            chan._cancel_called
        )
        #
        # ^-TODO-^ along with this is there another condition
        # that we should filter with to avoid entering this
        # waiting block needlessly?
        # -[ ] maybe `and local_nursery.cancelled` and/or
        #     only if the `._children` table is empty or has
        #     only `Portal`s with .chan._cancel_called ==
        #     True` as per what we had below; the MAIN DIFF
        #     BEING that just bc one `Portal.cancel_actor()`
        #     was called, doesn't mean the whole actor-nurse
        #     is gonna exit any time soon right!?
        #
        # or
        # all(chan._cancel_called for chan in chans)

    ):
        log.cancel(
            'Waiting on cancel request to peer..\n'
            f'c)=>\n'
            f'  |_{chan.uid}\n'
        )

        # XXX: this is a soft wait on the channel (and its
        # underlying transport protocol) to close from the
        # remote peer side since we presume that any channel
        # which is mapped to a sub-actor (i.e. it's managed
        # by local actor-nursery) has a message that is sent
        # to the peer likely by this actor (which may be in
        # a shutdown sequence due to cancellation) when the
        # local runtime here is now cancelled while
        # (presumably) in the middle of msg loop processing.
        chan_info: str = (
            f'{chan.uid}\n'
            f'|_{chan}\n'
            f'  |_{chan.transport}\n\n'
        )
        with trio.move_on_after(chan_drain_timeout) as drain_cs:
            drain_cs.shield = True

            # attempt to wait for the far end to close the
            # channel and bail after timeout (a 2-generals
            # problem on closure).
            assert chan.transport
            async for msg in chan.transport.drain():

                # try to deliver any lingering msgs
                # before we destroy the channel.
                # This accomplishes deterministic
                # ``Portal.cancel_actor()`` cancellation by
                # making sure any RPC response to that call is
                # delivered the local calling task.
                # TODO: factor this into a helper?
                log.warning(
                    'Draining msg from disconnected peer\n'
                    f'{chan_info}'
                    f'{pformat(msg)}\n'
                )
                # cid: str|None = msg.get('cid')
                cid: str|None = msg.cid
                if cid:
                    # deliver response to local caller/waiter
                    await actor._deliver_ctx_payload(
                        chan,
                        cid,
                        msg,
                    )
        if drain_cs.cancelled_caught:
            log.warning(
                'Timed out waiting on IPC transport channel to drain?\n'
                f'{chan_info}'
            )

        # XXX NOTE XXX when no explicit call to
        # `open_root_actor()` was made by the application
        # (normally we implicitly make that call inside
        # the first `.open_nursery()` in root-actor
        # user/app code), we can assume that either we
        # are NOT the root actor or are root but the
        # runtime was started manually. and thus DO have
        # to wait for the nursery-enterer to exit before
        # shutting down the local runtime to avoid
        # clobbering any ongoing subactor
        # teardown/debugging/graceful-cancel.
        #
        # see matching  note inside `._supervise.open_nursery()`
        #
        # TODO: should we have a separate cs + timeout
        # block here?
        if (
            # XXX SO either,
            #  - not root OR,
            #  - is root but `open_root_actor()` was
            #    entered manually (in which case we do
            #    the equiv wait there using the
            #    `devx.debug` sub-sys APIs).
            not local_nursery._implicit_runtime_started
        ):
            log.runtime(
                'Waiting on local actor nursery to exit..\n'
                f'|_{local_nursery}\n'
            )
            with trio.move_on_after(an_exit_timeout) as an_exit_cs:
                an_exit_cs.shield = True
                await local_nursery.exited.wait()

            # TODO: currently this is always triggering for every
            # sub-daemon spawned from the `piker.services._mngr`?
            # -[ ] how do we ensure that the IPC is supposed to
            #      be long lived and isn't just a register?
            # |_ in the register case how can we signal that the
            #    ephemeral msg loop was intentional?
            if (
                # not local_nursery._implicit_runtime_started
                # and
                an_exit_cs.cancelled_caught
            ):
                report: str = (
                    'Timed out waiting on local actor-nursery to exit?\n'
                    f'c)>\n'
                    f' |_{local_nursery}\n'
                )
                if children := local_nursery._children:
                    # indent from above local-nurse repr
                    report += (
                        f'   |_{pformat(children)}\n'
                    )

                log.warning(report)

        if disconnected:
            # if the transport died and this actor is still
            # registered within a local nursery, we report
            # that the IPC layer may have failed
            # unexpectedly since it may be the cause of
            # other downstream errors.
            entry: tuple|None = local_nursery._children.get(uid)
            if entry:
                proc: trio.Process
                _, proc, _ = entry

                if (
                    (poll := getattr(proc, 'poll', None))
                    and
                    poll() is None  # proc still alive
                ):
                    # TODO: change log level based on
                    # detecting whether chan was created for
                    # ephemeral `.register_actor()` request!
                    # -[ ] also, that should be avoidable by
                    #   re-using any existing chan from the
                    #   `._discovery.get_registry()` call as
                    #   well..
                    log.runtime(
                        f'Peer IPC broke but subproc is alive?\n\n'

                        f'<=x {chan.uid}@{chan.raddr}\n'
                        f'   |_{proc}\n'
                    )

    return local_nursery

# TODO multi-tpt support with per-proto peer tracking?
#
# -[x] maybe change to mod-func and rename for implied
#    multi-transport semantics?
#
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
#     so that we can query per tpt all peer contact infos?
#  |_[ ] possibly provide a global viewing via a
```
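The docstring above describes two timeout-bounded waits (channel drain, then nursery exit) that must keep running even while the surrounding task is being cancelled. A minimal standalone sketch of that `trio` idiom, with a plain `trio.Event` standing in for `chan.transport.drain()` / `local_nursery.exited` (the helper name here is illustrative, not part of the diff):

```python
import trio


async def wait_with_shielded_timeout(
    evt: trio.Event,
    timeout: float,
) -> bool:
    '''
    Wait on `evt` for at most `timeout` seconds, even if the caller
    is itself being cancelled; return `True` if we timed out.

    '''
    with trio.move_on_after(timeout) as cs:
        cs.shield = True  # keep waiting despite outer cancellation
        await evt.wait()

    return cs.cancelled_caught


async def _demo():
    # no one ever sets the event, so we expect the timeout to hit
    timed_out: bool = await wait_with_shielded_timeout(
        trio.Event(),
        timeout=0.1,
    )
    assert timed_out


trio.run(_demo)
```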
				
			
```
@@ -87,7 +299,6 @@ async def handle_stream_from_peer(

    *,
    server: IPCServer,
    actor: Actor,

) -> None:
    '''
```
```
@@ -119,9 +330,10 @@ async def handle_stream_from_peer(

    # initial handshake with peer phase
    try:
        peer_aid: msgtypes.Aid = await chan._do_handshake(
            aid=actor.aid,
        )
        if actor := _state.current_actor():
            peer_aid: msgtypes.Aid = await chan._do_handshake(
                aid=actor.aid,
            )
    except (
        TransportClosed,
        # ^XXX NOTE, the above wraps `trio` exc types raised
```
```
@@ -222,8 +434,7 @@ async def handle_stream_from_peer(
            disconnected,
            last_msg,
        ) = await _rpc.process_messages(
            actor,
            chan,
            chan=chan,
        )
    except trio.Cancelled:
        log.cancel(
```
```
@@ -234,179 +445,16 @@ async def handle_stream_from_peer(
        raise

    finally:
        local_nursery: (
            ActorNursery|None
        ) = actor._actoruid2nursery.get(uid)

        # This is set in ``Portal.cancel_actor()``. So if
        # the peer was cancelled we try to wait for them
        # to tear down their side of the connection before
        # moving on with closing our own side.
        if (
            local_nursery
            and (
                actor._cancel_called
                or
                chan._cancel_called
            )
            #
            # ^-TODO-^ along with this is there another condition
            # that we should filter with to avoid entering this
            # waiting block needlessly?
            # -[ ] maybe `and local_nursery.cancelled` and/or
            #     only if the `._children` table is empty or has
            #     only `Portal`s with .chan._cancel_called ==
            #     True` as per what we had below; the MAIN DIFF
            #     BEING that just bc one `Portal.cancel_actor()`
            #     was called, doesn't mean the whole actor-nurse
            #     is gonna exit any time soon right!?
            #
            # or
            # all(chan._cancel_called for chan in chans)

        ):
            log.cancel(
                'Waiting on cancel request to peer..\n'
                f'c)=>\n'
                f'  |_{chan.uid}\n'
            )

            # XXX: this is a soft wait on the channel (and its
            # underlying transport protocol) to close from the
            # remote peer side since we presume that any channel
            # which is mapped to a sub-actor (i.e. it's managed
            # by local actor-nursery) has a message that is sent
            # to the peer likely by this actor (which may be in
            # a shutdown sequence due to cancellation) when the
            # local runtime here is now cancelled while
            # (presumably) in the middle of msg loop processing.
            chan_info: str = (
                f'{chan.uid}\n'
                f'|_{chan}\n'
                f'  |_{chan.transport}\n\n'
            )
            with trio.move_on_after(0.5) as drain_cs:
                drain_cs.shield = True

                # attempt to wait for the far end to close the
                # channel and bail after timeout (a 2-generals
                # problem on closure).
                assert chan.transport
                async for msg in chan.transport.drain():

                    # try to deliver any lingering msgs
                    # before we destroy the channel.
                    # This accomplishes deterministic
                    # ``Portal.cancel_actor()`` cancellation by
                    # making sure any RPC response to that call is
                    # delivered the local calling task.
                    # TODO: factor this into a helper?
                    log.warning(
                        'Draining msg from disconnected peer\n'
                        f'{chan_info}'
                        f'{pformat(msg)}\n'
                    )
                    # cid: str|None = msg.get('cid')
                    cid: str|None = msg.cid
                    if cid:
                        # deliver response to local caller/waiter
                        await actor._deliver_ctx_payload(
                            chan,
                            cid,
                            msg,
                        )
            if drain_cs.cancelled_caught:
                log.warning(
                    'Timed out waiting on IPC transport channel to drain?\n'
                    f'{chan_info}'
                )

            # XXX NOTE XXX when no explicit call to
            # `open_root_actor()` was made by the application
            # (normally we implicitly make that call inside
            # the first `.open_nursery()` in root-actor
            # user/app code), we can assume that either we
            # are NOT the root actor or are root but the
            # runtime was started manually. and thus DO have
            # to wait for the nursery-enterer to exit before
            # shutting down the local runtime to avoid
            # clobbering any ongoing subactor
            # teardown/debugging/graceful-cancel.
            #
            # see matching  note inside `._supervise.open_nursery()`
            #
            # TODO: should we have a separate cs + timeout
            # block here?
            if (
                # XXX SO either,
                #  - not root OR,
                #  - is root but `open_root_actor()` was
                #    entered manually (in which case we do
                #    the equiv wait there using the
                #    `devx._debug` sub-sys APIs).
                not local_nursery._implicit_runtime_started
            ):
                log.runtime(
                    'Waiting on local actor nursery to exit..\n'
                    f'|_{local_nursery}\n'
                )
                with trio.move_on_after(0.5) as an_exit_cs:
                    an_exit_cs.shield = True
                    await local_nursery.exited.wait()

                # TODO: currently this is always triggering for every
                # sub-daemon spawned from the `piker.services._mngr`?
                # -[ ] how do we ensure that the IPC is supposed to
                #      be long lived and isn't just a register?
                # |_ in the register case how can we signal that the
                #    ephemeral msg loop was intentional?
                if (
                    # not local_nursery._implicit_runtime_started
                    # and
                    an_exit_cs.cancelled_caught
                ):
                    report: str = (
                        'Timed out waiting on local actor-nursery to exit?\n'
                        f'c)>\n'
                        f' |_{local_nursery}\n'
                    )
                    if children := local_nursery._children:
                        # indent from above local-nurse repr
                        report += (
                            f'   |_{pformat(children)}\n'
                        )

                    log.warning(report)

            if disconnected:
                # if the transport died and this actor is still
                # registered within a local nursery, we report
                # that the IPC layer may have failed
                # unexpectedly since it may be the cause of
                # other downstream errors.
                entry: tuple|None = local_nursery._children.get(uid)
                if entry:
                    proc: trio.Process
                    _, proc, _ = entry

                    if (
                        (poll := getattr(proc, 'poll', None))
                        and
                        poll() is None  # proc still alive
                    ):
                        # TODO: change log level based on
                        # detecting whether chan was created for
                        # ephemeral `.register_actor()` request!
                        # -[ ] also, that should be avoidable by
                        #   re-using any existing chan from the
                        #   `._discovery.get_registry()` call as
                        #   well..
                        log.runtime(
                            f'Peer IPC broke but subproc is alive?\n\n'

                            f'<=x {chan.uid}@{chan.raddr}\n'
                            f'   |_{proc}\n'
                        )
        # check if there are subs which we should gracefully join at
        # both the inter-actor-task and subprocess levels to
        # gracefully remote cancel and later disconnect (particularly
        # for permitting subs engaged in active debug-REPL sessions).
        local_nursery: ActorNursery|None = await maybe_wait_on_canced_subs(
            uid=uid,
            chan=chan,
            disconnected=disconnected,
        )

        # ``Channel`` teardown and closure sequence
        # drop ref to channel so it can be gc-ed and disconnected
```
```
@@ -467,11 +515,11 @@ async def handle_stream_from_peer(
                # from broken debug TTY locking due to
                # msg-spec races on application using RunVar...
                if (
                    local_nursery
                    and
                    (ctx_in_debug := pdb_lock.ctx_in_debug)
                    and
                    (pdb_user_uid := ctx_in_debug.chan.uid)
                    and
                    local_nursery
                ):
                    entry: tuple|None = local_nursery._children.get(
                        tuple(pdb_user_uid)
```
```
@@ -804,7 +852,6 @@ class IPCServer(Struct):
    async def listen_on(
        self,
        *,
        actor: Actor,
        accept_addrs: list[tuple[str, int|str]]|None = None,
        stream_handler_nursery: Nursery|None = None,
    ) -> list[IPCEndpoint]:
```
```
@@ -837,20 +884,19 @@ class IPCServer(Struct):
                f'{self}\n'
            )

        log.info(
        log.runtime(
            f'Binding to endpoints for,\n'
            f'{accept_addrs}\n'
        )
        eps: list[IPCEndpoint] = await self._parent_tn.start(
            partial(
                _serve_ipc_eps,
                actor=actor,
                server=self,
                stream_handler_tn=stream_handler_nursery,
                listen_addrs=accept_addrs,
            )
        )
        log.info(
        log.runtime(
            f'Started IPC endpoints\n'
            f'{eps}\n'
        )
```
```
@@ -873,7 +919,6 @@ class IPCServer(Struct):

async def _serve_ipc_eps(
    *,
    actor: Actor,
    server: IPCServer,
    stream_handler_tn: Nursery,
    listen_addrs: list[tuple[str, int|str]],
```
```
@@ -907,12 +952,13 @@ async def _serve_ipc_eps(
                    stream_handler_tn=stream_handler_tn,
                )
                try:
                    log.info(
                    log.runtime(
                        f'Starting new endpoint listener\n'
                        f'{ep}\n'
                    )
                    listener: trio.abc.Listener = await ep.start_listener()
                    assert listener is ep._listener
                    # actor = _state.current_actor()
                    # if actor.is_registry:
                    #     import pdbp; pdbp.set_trace()

```
```
@@ -937,7 +983,6 @@ async def _serve_ipc_eps(
                    handler=partial(
                        handle_stream_from_peer,
                        server=server,
                        actor=actor,
                    ),
                    listeners=listeners,

```
```
@@ -948,13 +993,13 @@ async def _serve_ipc_eps(
                )
            )
            # TODO, wow make this message better! XD
            log.info(
            log.runtime(
                'Started server(s)\n'
                +
                '\n'.join([f'|_{addr}' for addr in listen_addrs])
            )

            log.info(
            log.runtime(
                f'Started IPC endpoints\n'
                f'{eps}\n'
            )
```
```
@@ -970,6 +1015,7 @@ async def _serve_ipc_eps(
                ep.close_listener()
                server._endpoints.remove(ep)

        # actor = _state.current_actor()
        # if actor.is_arbiter:
        #     import pdbp; pdbp.set_trace()

```
```
@@ -980,7 +1026,6 @@ async def _serve_ipc_eps(

@acm
async def open_ipc_server(
    actor: Actor,
    parent_tn: Nursery|None = None,
    stream_handler_tn: Nursery|None = None,

```
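With the `actor` parameter dropped from `open_ipc_server()`, the manager can be entered standalone, exactly as the new unit test above does. A minimal usage sketch (assuming the same `tractor.ipc._server` API shown in that test):

```python
import trio
from tractor import ipc


async def main() -> None:
    # open the (empty) IPC server; no actor runtime argument needed
    async with ipc._server.open_ipc_server() as server:
        assert server._no_more_peers.is_set()

        # no listeners bound yet; just tear down the parent nursery
        server._parent_tn.cancel_scope.cancel()


trio.run(main)
```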
				
			
```
@@ -127,6 +127,11 @@ async def start_listener(
    Start a TCP socket listener on the given `TCPAddress`.

    '''
    log.info(
        f'Attempting to bind TCP socket\n'
        f'>[\n'
        f'|_{addr}\n'
    )
    # ?TODO, maybe we should just change the lower-level call this is
    # using internall per-listener?
    listeners: list[SocketListener] = await open_tcp_listeners(
```
```
@@ -140,6 +145,12 @@ async def start_listener(
    assert len(listeners) == 1
    listener = listeners[0]
    host, port = listener.socket.getsockname()[:2]

    log.info(
        f'Listening on TCP socket\n'
        f'[>\n'
        f' |_{addr}\n'
    )
    return listener

```
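The hunks above add log reporting around the bind/listen steps built on `open_tcp_listeners()`. The same bind-then-report pattern can be reproduced standalone with `trio`'s public API; a sketch, not the tractor helper itself:

```python
import trio


async def bind_ephemeral_tcp(host: str = '127.0.0.1') -> None:
    # port 0 => let the OS pick an ephemeral port
    listeners: list[trio.SocketListener] = await trio.open_tcp_listeners(
        0,
        host=host,
    )
    listener = listeners[0]

    # read back the actually-bound address, as the diff does via
    # `listener.socket.getsockname()`
    bound_host, bound_port = listener.socket.getsockname()[:2]
    print(f'listening on {bound_host}:{bound_port}')

    await listener.aclose()


trio.run(bind_ephemeral_tcp)
```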