Compare commits

9 Commits

29db08b370 ... 4a9e8731f1
| Author | SHA1 | Date |
|---|---|---|
|  | 4a9e8731f1 |  |
|  | b6ffaea62f |  |
|  | 63bf967598 |  |
|  | 011d033a12 |  |
|  | 76fb80fda6 |  |
|  | d50f7ba9ca |  |
|  | 65ae2dc67c |  |
|  | 4be499fb1f |  |
|  | 7317bb269c |  |
```diff
@@ -116,9 +116,11 @@ def test_shield_pause(
         child.pid,
         signal.SIGINT,
     )
+    from tractor._supervise import _shutdown_msg
     expect(
         child,
-        'Shutting down actor runtime',
+        # 'Shutting down actor runtime',
+        _shutdown_msg,
         timeout=6,
     )
     assert_before(
```
```diff
@@ -0,0 +1,95 @@
+'''
+Verify the `enable_transports` param drives various
+per-root/sub-actor IPC endpoint/server settings.
+
+'''
+from __future__ import annotations
+
+import pytest
+import trio
+import tractor
+from tractor import (
+    Actor,
+    Portal,
+    ipc,
+    msg,
+    _state,
+    _addr,
+)
+
+@tractor.context
+async def chk_tpts(
+    ctx: tractor.Context,
+    tpt_proto_key: str,
+):
+    rtvars = _state._runtime_vars
+    assert (
+        tpt_proto_key
+        in
+        rtvars['_enable_tpts']
+    )
+    actor: Actor = tractor.current_actor()
+    spec: msg.types.SpawnSpec = actor._spawn_spec
+    assert spec._runtime_vars == rtvars
+
+    # ensure individual IPC ep-addr types
+    serv: ipc._server.Server = actor.ipc_server
+    addr: ipc._types.Address
+    for addr in serv.addrs:
+        assert addr.proto_key == tpt_proto_key
+
+    # Actor delegate-props enforcement
+    assert (
+        actor.accept_addrs
+        ==
+        serv.accept_addrs
+    )
+
+    await ctx.started(serv.accept_addrs)
+
+
+# TODO, parametrize over mis-matched-proto-typed `registry_addrs`
+# since it seems to work in `piker` but not exactly sure if both tcp
+# & uds are being deployed then?
+#
+@pytest.mark.parametrize(
+    'tpt_proto_key',
+    ['tcp', 'uds'],
+    ids=lambda item: f'ipc_tpt={item!r}'
+)
+def test_root_passes_tpt_to_sub(
+    tpt_proto_key: str,
+    reg_addr: tuple,
+    debug_mode: bool,
+):
+    async def main():
+        async with tractor.open_nursery(
+            enable_transports=[tpt_proto_key],
+            registry_addrs=[reg_addr],
+            debug_mode=debug_mode,
+        ) as an:
+
+            assert (
+                tpt_proto_key
+                in
+                _state._runtime_vars['_enable_tpts']
+            )
+
+            ptl: Portal = await an.start_actor(
+                name='sub',
+                enable_modules=[__name__],
+            )
+            async with ptl.open_context(
+                chk_tpts,
+                tpt_proto_key=tpt_proto_key,
+            ) as (ctx, accept_addrs):
+
+                uw_addr: tuple
+                for uw_addr in accept_addrs:
+                    addr = _addr.wrap_address(uw_addr)
+                    assert addr.is_valid
+
+            # shutdown sub-actor(s)
+            await an.cancel()
+
+    trio.run(main)
```
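For context, a trimmed-down sketch of the `enable_transports` usage the new test exercises; everything here (the param name, the `_enable_tpts` runtime-var key) is taken from the diff above, with the `'uds'` value per the test's parametrization:

```python
import trio
import tractor
from tractor import _state

async def main():
    async with tractor.open_nursery(
        enable_transports=['uds'],  # or ['tcp'], per the parametrization
    ) as an:
        # the chosen proto key lands in the runtime-vars table, which is
        # also forwarded to every sub-actor via its `SpawnSpec`
        assert 'uds' in _state._runtime_vars['_enable_tpts']
        await an.cancel()

trio.run(main)
```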
```diff
@@ -118,6 +118,10 @@ class Portal:
 
     @property
     def chan(self) -> Channel:
+        '''
+        Ref to this ctx's underlying `tractor.ipc.Channel`.
+
+        '''
         return self._chan
 
     @property
```
```diff
@@ -177,10 +181,17 @@ class Portal:
 
         # not expecting a "main" result
         if self._expect_result_ctx is None:
+            peer_id: str = f'{self.channel.aid.reprol()!r}'
             log.warning(
-                f"Portal for {self.channel.aid} not expecting a final"
-                " result?\nresult() should only be called if subactor"
-                " was spawned with `ActorNursery.run_in_actor()`")
+                f'Portal to peer {peer_id} will not deliver a final result?\n'
+                f'\n'
+                f'Context.result() can only be called by the parent of '
+                f'a sub-actor when it was spawned with '
+                f'`ActorNursery.run_in_actor()`'
+                f'\n'
+                f'Further this `ActorNursery`-method-API will be deprecated in the '
+                f'near future!\n'
+            )
             return NoResult
 
         # expecting a "main" result
```
```diff
@@ -213,6 +224,7 @@ class Portal:
         typname: str = type(self).__name__
         log.warning(
             f'`{typname}.result()` is DEPRECATED!\n'
+            f'\n'
             f'Use `{typname}.wait_for_result()` instead!\n'
         )
         return await self.wait_for_result(
```
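A migration sketch for the deprecation above, assuming `wait_for_result()` is a drop-in rename as the warning text implies (the `add` target fn is illustrative):

```python
import trio
import tractor

async def add(x: int, y: int) -> int:
    return x + y

async def main():
    async with tractor.open_nursery() as an:
        portal = await an.run_in_actor(add, x=1, y=2)
        # new API per the deprecation warning above:
        res = await portal.wait_for_result()
        # res = await portal.result()  # deprecated; now logs a warning
        assert res == 3

trio.run(main)
```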
```diff
@@ -224,8 +236,10 @@ class Portal:
         # terminate all locally running async generator
         # IPC calls
         if self._streams:
-            log.cancel(
-                f"Cancelling all streams with {self.channel.aid}")
+            peer_id: str = f'{self.channel.aid.reprol()!r}'
+            report: str = (
+                f'Cancelling all msg-streams with {peer_id}\n'
+            )
             for stream in self._streams.copy():
                 try:
                     await stream.aclose()
```
```diff
@@ -234,10 +248,18 @@ class Portal:
                     # (unless of course at some point down the road we
                     # won't expect this to always be the case or need to
                     # detect it for respawning purposes?)
-                    log.debug(f"{stream} was already closed.")
+                    report += (
+                        f'->) {stream!r} already closed\n'
+                    )
+
+            log.cancel(report)
 
     async def aclose(self):
-        log.debug(f"Closing {self}")
+        log.debug(
+            f'Closing portal\n'
+            f'>}}\n'
+            f'|_{self}\n'
+        )
         # TODO: once we move to implementing our own `ReceiveChannel`
         # (including remote task cancellation inside its `.aclose()`)
         # we'll need to .aclose all those channels here
```
```diff
@@ -263,19 +285,18 @@ class Portal:
         __runtimeframe__: int = 1  # noqa
 
         chan: Channel = self.channel
+        peer_id: str = f'{self.channel.aid.reprol()!r}'
         if not chan.connected():
             log.runtime(
-                'This channel is already closed, skipping cancel request..'
+                f'Peer {peer_id} is already disconnected\n'
+                '-> skipping cancel request..\n'
             )
             return False
 
-        reminfo: str = (
-            f'c)=> {self.channel.aid}\n'
-            f'  |_{chan}\n'
-        )
         log.cancel(
-            f'Requesting actor-runtime cancel for peer\n\n'
-            f'{reminfo}'
+            f'Sending actor-runtime-cancel-req to peer\n'
+            f'\n'
+            f'c)=> {peer_id}\n'
         )
 
         # XXX the one spot we set it?
```
```diff
@@ -300,8 +321,9 @@ class Portal:
                 # may timeout and we never get an ack (obvi racy)
                 # but that doesn't mean it wasn't cancelled.
                 log.debug(
-                    'May have failed to cancel peer?\n'
-                    f'{reminfo}'
+                    f'May have failed to cancel peer?\n'
+                    f'\n'
+                    f'c)=?> {peer_id}\n'
                 )
 
             # if we get here some weird cancellation case happened
```
```diff
@@ -319,22 +341,22 @@ class Portal:
 
             TransportClosed,
         ) as tpt_err:
-            report: str = (
-                f'IPC chan for actor already closed or broken?\n\n'
-                f'{self.channel.aid}\n'
-                f' |_{self.channel}\n'
+            ipc_borked_report: str = (
+                f'IPC for actor already closed/broken?\n\n'
+                f'\n'
+                f'c)=x> {peer_id}\n'
             )
             match tpt_err:
                 case TransportClosed():
-                    log.debug(report)
+                    log.debug(ipc_borked_report)
                 case _:
-                    report += (
+                    ipc_borked_report += (
                         f'\n'
                         f'Unhandled low-level transport-closed/error during\n'
                         f'`Portal.cancel_actor()` request?\n'
                         f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
                     )
-                    log.warning(report)
+                    log.warning(ipc_borked_report)
 
             return False
 
```
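The hunk above dispatches on the caught exception's type via structural pattern matching; a self-contained sketch of that idiom, where `TransportClosed` is a stand-in class rather than tractor's real exception type:

```python
# branch on the *type* of a caught transport error, escalating the log
# level only for unexpected cases.
class TransportClosed(Exception):
    pass

def render_report(tpt_err: Exception, peer_id: str) -> tuple[str, str]:
    report = (
        f'IPC for actor already closed/broken?\n'
        f'c)=x> {peer_id}\n'
    )
    match tpt_err:
        case TransportClosed():
            return ('debug', report)  # expected teardown race
        case _:
            report += f'<{type(tpt_err).__name__}( {tpt_err} )>\n'
            return ('warning', report)

print(render_report(TransportClosed(), "'sub@uuid'"))
print(render_report(RuntimeError('boom'), "'sub@uuid'"))
```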
```diff
@@ -491,10 +513,13 @@ class Portal:
                 with trio.CancelScope(shield=True):
                     await ctx.cancel()
 
-            except trio.ClosedResourceError:
+            except trio.ClosedResourceError as cre:
                 # if the far end terminates before we send a cancel the
                 # underlying transport-channel may already be closed.
-                log.cancel(f'Context {ctx} was already closed?')
+                log.cancel(
+                    f'Context.cancel() -> {cre!r}\n'
+                    f'cid: {ctx.cid!r} already closed?\n'
+                )
 
             # XXX: should this always be done?
             # await recv_chan.aclose()
```
```diff
@@ -97,7 +97,7 @@ async def maybe_block_bp(
     ):
         logger.info(
             f'Found `greenback` installed @ {maybe_mod}\n'
-            'Enabling `tractor.pause_from_sync()` support!\n'
+            f'Enabling `tractor.pause_from_sync()` support!\n'
         )
         os.environ['PYTHONBREAKPOINT'] = (
             'tractor.devx.debug._sync_pause_from_builtin'
```
```diff
@@ -471,7 +471,7 @@ async def open_root_actor(
                     '-> Opening new registry @ '
                     +
                     '\n'.join(
-                        f'@{addr}' for addr in reg_addrs
+                        f'{addr}' for addr in reg_addrs
                     )
                 )
             logger.info(f'{report}\n')
```
```diff
@@ -543,7 +543,7 @@ async def open_root_actor(
                     raise
 
                 finally:
-                    # NOTE: not sure if we'll ever need this but it's
+                    # NOTE/TODO?, not sure if we'll ever need this but it's
                     # possibly better for even more determinism?
                     # logger.cancel(
                     #     f'Waiting on {len(nurseries)} nurseries in root..')
```
```diff
@@ -552,6 +552,14 @@ class Actor:
             )
             raise
 
+    # ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
+    # - _get_rpc_func(),
+    # - _deliver_ctx_payload(),
+    # - get_context(),
+    # - start_remote_task(),
+    # - cancel_rpc_tasks(),
+    # - _cancel_task(),
+    #
     def _get_rpc_func(self, ns, funcname):
         '''
         Try to lookup and return a target RPC func from the
```
```diff
@@ -1119,14 +1127,6 @@ class Actor:
         self._cancel_complete.set()
         return True
 
-    # XXX: hard kill logic if needed?
-    # def _hard_mofo_kill(self):
-    #     # If we're the root actor or zombied kill everything
-    #     if self._parent_chan is None:  # TODO: more robust check
-    #         root = trio.lowlevel.current_root_task()
-    #         for n in root.child_nurseries:
-    #             n.cancel_scope.cancel()
-
     async def _cancel_task(
         self,
         cid: str,
```
```diff
@@ -1361,25 +1361,13 @@ class Actor:
         '''
         return self.accept_addrs[0]
 
-    def get_parent(self) -> Portal:
-        '''
-        Return a `Portal` to our parent.
-
-        '''
-        assert self._parent_chan, "No parent channel for this actor?"
-        return Portal(self._parent_chan)
-
-    def get_chans(
-        self,
-        uid: tuple[str, str],
-
-    ) -> list[Channel]:
-        '''
-        Return all IPC channels to the actor with provided `uid`.
-
-        '''
-        return self._ipc_server._peers[uid]
-
+    # TODO, this should delegate ONLY to the
+    # `._spawn_spec._runtime_vars: dict` / `._state` APIs?
+    #
+    # XXX, AH RIGHT that's why..
+    #   it's bc we pass this as a CLI flag to the child.py precisely
+    #   bc we need the bootstrapping pre `async_main()`.. but maybe
+    #   keep this as an impl detail and not part of the pub iface impl?
     def is_infected_aio(self) -> bool:
         '''
         If `True`, this actor is running `trio` in guest mode on
```
```diff
@@ -1390,6 +1378,23 @@ class Actor:
         '''
         return self._infected_aio
 
+    # ?TODO, is this the right type for this method?
+    def get_parent(self) -> Portal:
+        '''
+        Return a `Portal` to our parent.
+
+        '''
+        assert self._parent_chan, "No parent channel for this actor?"
+        return Portal(self._parent_chan)
+
+    # XXX: hard kill logic if needed?
+    # def _hard_mofo_kill(self):
+    #     # If we're the root actor or zombied kill everything
+    #     if self._parent_chan is None:  # TODO: more robust check
+    #         root = trio.lowlevel.current_root_task()
+    #         for n in root.child_nurseries:
+    #             n.cancel_scope.cancel()
+
 
 async def async_main(
     actor: Actor,
```
```diff
@@ -34,9 +34,9 @@ from typing import (
 import trio
 from trio import TaskStatus
 
-from .devx.debug import (
-    maybe_wait_for_debugger,
-    acquire_debug_lock,
+from .devx import (
+    debug,
+    pformat as _pformat
 )
 from tractor._state import (
     current_actor,
```
```diff
@@ -51,14 +51,17 @@ from tractor._portal import Portal
 from tractor._runtime import Actor
 from tractor._entry import _mp_main
 from tractor._exceptions import ActorFailure
-from tractor.msg.types import (
-    Aid,
-    SpawnSpec,
+from tractor.msg import (
+    types as msgtypes,
+    pretty_struct,
 )
 
 
 if TYPE_CHECKING:
-    from ipc import IPCServer
+    from ipc import (
+        _server,
+        Channel,
+    )
     from ._supervise import ActorNursery
     ProcessType = TypeVar('ProcessType', mp.Process, trio.Process)
 
```
```diff
@@ -328,20 +331,21 @@ async def soft_kill(
     see `.hard_kill()`).
 
     '''
-    peer_aid: Aid = portal.channel.aid
+    chan: Channel = portal.channel
+    peer_aid: msgtypes.Aid = chan.aid
     try:
         log.cancel(
             f'Soft killing sub-actor via portal request\n'
             f'\n'
-            f'(c=> {peer_aid}\n'
-            f'  |_{proc}\n'
+            f'c)=> {peer_aid.reprol()}@[{chan.maddr}]\n'
+            f'   |_{proc}\n'
         )
         # wait on sub-proc to signal termination
         await wait_func(proc)
 
     except trio.Cancelled:
         with trio.CancelScope(shield=True):
-            await maybe_wait_for_debugger(
+            await debug.maybe_wait_for_debugger(
                 child_in_debug=_runtime_vars.get(
                     '_debug_mode', False
                 ),
```
```diff
@@ -465,7 +469,7 @@ async def trio_proc(
         "--uid",
         # TODO, how to pass this over "wire" encodings like
         # cmdline args?
-        # -[ ] maybe we can add an `Aid.min_tuple()` ?
+        # -[ ] maybe we can add an `msgtypes.Aid.min_tuple()` ?
         str(subactor.uid),
         # Address the child must connect to on startup
         "--parent_addr",
```
```diff
@@ -483,13 +487,14 @@ async def trio_proc(
 
     cancelled_during_spawn: bool = False
     proc: trio.Process|None = None
-    ipc_server: IPCServer = actor_nursery._actor.ipc_server
+    ipc_server: _server.Server = actor_nursery._actor.ipc_server
     try:
         try:
             proc: trio.Process = await trio.lowlevel.open_process(spawn_cmd, **proc_kwargs)
             log.runtime(
-                'Started new child\n'
-                f'|_{proc}\n'
+                f'Started new child subproc\n'
+                f'(>\n'
+                f' |_{proc}\n'
             )
 
             # wait for actor to spawn and connect back to us
```
```diff
@@ -507,10 +512,10 @@ async def trio_proc(
                 with trio.CancelScope(shield=True):
                     # don't clobber an ongoing pdb
                     if is_root_process():
-                        await maybe_wait_for_debugger()
+                        await debug.maybe_wait_for_debugger()
 
                     elif proc is not None:
-                        async with acquire_debug_lock(subactor.uid):
+                        async with debug.acquire_debug_lock(subactor.uid):
                             # soft wait on the proc to terminate
                             with trio.move_on_after(0.5):
                                 await proc.wait()
```
```diff
@@ -528,14 +533,19 @@ async def trio_proc(
 
         # send a "spawning specification" which configures the
         # initial runtime state of the child.
-        sspec = SpawnSpec(
+        sspec = msgtypes.SpawnSpec(
             _parent_main_data=subactor._parent_main_data,
             enable_modules=subactor.enable_modules,
             reg_addrs=subactor.reg_addrs,
             bind_addrs=bind_addrs,
             _runtime_vars=_runtime_vars,
         )
-        log.runtime(f'Sending spawn spec: {str(sspec)}')
+        log.runtime(
+            f'Sending spawn spec to child\n'
+            f'{{}}=> {chan.aid.reprol()!r}\n'
+            f'\n'
+            f'{pretty_struct.pformat(sspec)}\n'
+        )
         await chan.send(sspec)
 
         # track subactor in current nursery
```
```diff
@@ -563,7 +573,7 @@ async def trio_proc(
             # condition.
             await soft_kill(
                 proc,
-                trio.Process.wait,
+                trio.Process.wait,  # XXX, uses `pidfd_open()` below.
                 portal
             )
 
```
```diff
@@ -571,8 +581,7 @@ async def trio_proc(
             # tandem if not done already
             log.cancel(
                 'Cancelling portal result reaper task\n'
-                f'>c)\n'
-                f' |_{subactor.uid}\n'
+                f'c)> {subactor.aid.reprol()!r}\n'
             )
             nursery.cancel_scope.cancel()
 
```
```diff
@@ -581,21 +590,24 @@ async def trio_proc(
         # allowed! Do this **after** cancellation/teardown to avoid
         # killing the process too early.
         if proc:
+            reap_repr: str = _pformat.nest_from_op(
+                input_op='>x)',
+                text=subactor.pformat(),
+            )
             log.cancel(
                 f'Hard reap sequence starting for subactor\n'
-                f'>x)\n'
-                f' |_{subactor}@{subactor.uid}\n'
+                f'{reap_repr}'
             )
 
             with trio.CancelScope(shield=True):
                 # don't clobber an ongoing pdb
                 if cancelled_during_spawn:
                     # Try again to avoid TTY clobbering.
-                    async with acquire_debug_lock(subactor.uid):
+                    async with debug.acquire_debug_lock(subactor.uid):
                         with trio.move_on_after(0.5):
                             await proc.wait()
 
-                await maybe_wait_for_debugger(
+                await debug.maybe_wait_for_debugger(
                     child_in_debug=_runtime_vars.get(
                         '_debug_mode', False
                     ),
```
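For a sense of what the `_pformat.nest_from_op()` call above produces, here is a hypothetical re-implementation of such an "op-prefixed, nested" repr helper; the real one lives in `tractor.devx.pformat` and its output shape may differ:

```python
# render a multi-line repr indented under an operation "glyph",
# tree-style, as used for the hard-reap log report above.
def nest_from_op(input_op: str, text: str) -> str:
    first, *rest = text.splitlines()
    lines = [f'{input_op}\n', f' |_{first}\n']
    lines += [f'   {ln}\n' for ln in rest]
    return ''.join(lines)

print(nest_from_op('>x)', 'Actor(\n name=sub,\n)'))
# >x)
#  |_Actor(
#     name=sub,
#    )
```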
```diff
@@ -624,7 +636,7 @@ async def trio_proc(
                 #     acquire the lock and get notified of who has it,
                 #     check that uid against our known children?
                 # this_uid: tuple[str, str] = current_actor().uid
-                # await acquire_debug_lock(this_uid)
+                # await debug.acquire_debug_lock(this_uid)
 
                 if proc.poll() is None:
                     log.cancel(f"Attempting to hard kill {proc}")
```
```diff
@@ -727,7 +739,7 @@ async def mp_proc(
 
     log.runtime(f"Started {proc}")
 
-    ipc_server: IPCServer = actor_nursery._actor.ipc_server
+    ipc_server: _server.Server = actor_nursery._actor.ipc_server
     try:
         # wait for actor to spawn and connect back to us
         # channel should have handshake completed by the
```
```diff
@@ -789,6 +789,11 @@ def open_shm_list(
         readonly=readonly,
     )
 
+    # TODO, factor into a @actor_fixture acm-API?
+    # -[ ] also `@maybe_actor_fixture()` which includes
+    #     the .current_actor() convenience check?
+    #   |_ orr can that just be in the sin-maybe-version?
+    #
     # "close" attached shm on actor teardown
     try:
         actor = tractor.current_actor()
```
```diff
@@ -18,6 +18,7 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol
 
 '''
 from __future__ import annotations
+import ipaddress
 from typing import (
     ClassVar,
 )
```
```diff
@@ -50,13 +51,45 @@ class TCPAddress(
     _host: str
     _port: int
 
+    def __post_init__(self):
+        try:
+            ipaddress.ip_address(self._host)
+        except ValueError as valerr:
+            raise ValueError(
+                f'Invalid {type(self).__name__}._host = {self._host!r}\n'
+            ) from valerr
+
     proto_key: ClassVar[str] = 'tcp'
     unwrapped_type: ClassVar[type] = tuple[str, int]
     def_bindspace: ClassVar[str] = '127.0.0.1'
 
     # ?TODO, actually validate ipv4/6 with stdlib's `ipaddress`
     @property
     def is_valid(self) -> bool:
-        return self._port != 0
+        '''
+        Predicate to ensure a valid socket-address pair.
+
+        '''
+        return (
+            self._port != 0
+            and
+            (ipaddr := ipaddress.ip_address(self._host))
+            and not (
+                ipaddr.is_reserved
+                or
+                ipaddr.is_unspecified
+                or
+                ipaddr.is_link_local
+                or
+                ipaddr.is_multicast
+                or
+                ipaddr.is_global
+            )
+        )
+        # ^XXX^ see various properties of invalid addrs here,
+        # https://docs.python.org/3/library/ipaddress.html#ipaddress.IPv4Address
 
     @property
     def bindspace(self) -> str:
```
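The rewritten `is_valid` leans on stdlib `ipaddress` predicates; note it also rejects `is_global` hosts, leaving private-range addrs like the `127.0.0.1` default bindspace as the "valid" set. A quick standalone demo of the predicates it checks (host values are illustrative):

```python
import ipaddress

for host in ('127.0.0.1', '0.0.0.0', '169.254.0.1', '224.0.0.1', '240.0.0.1'):
    ip = ipaddress.ip_address(host)
    print(
        f'{host:>12}: '
        f'unspecified={ip.is_unspecified} '  # True for 0.0.0.0
        f'link_local={ip.is_link_local} '    # True for 169.254/16
        f'multicast={ip.is_multicast} '      # True for 224/4
        f'reserved={ip.is_reserved}'         # True for 240/4
    )
```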
```diff
@@ -210,12 +210,14 @@ class PldRx(Struct):
         match msg:
             case Return()|Error():
                 log.runtime(
-                    f'Rxed final outcome msg\n'
+                    f'Rxed final-outcome msg\n'
+                    f'\n'
                     f'{msg}\n'
                 )
             case Stop():
                 log.runtime(
                     f'Rxed stream stopped msg\n'
+                    f'\n'
                     f'{msg}\n'
                 )
                 if passthrough_non_pld_msgs:
```
```diff
@@ -261,8 +263,9 @@ class PldRx(Struct):
         if (
             type(msg) is Return
         ):
-            log.info(
+            log.runtime(
                 f'Rxed final result msg\n'
+                f'\n'
                 f'{msg}\n'
             )
         return self.decode_pld(
```
```diff
@@ -304,10 +307,13 @@ class PldRx(Struct):
                 try:
                     pld: PayloadT = self._pld_dec.decode(pld)
                     log.runtime(
-                        'Decoded msg payload\n\n'
+                        f'Decoded payload for\n'
+                        # f'\n'
                         f'{msg}\n'
-                        f'where payload decoded as\n'
-                        f'|_pld={pld!r}\n'
+                        # ^TODO?, ideally just render with `,
+                        # pld={decode}` in the `msg.pformat()`??
+                        f'where, '
+                        f'{type(msg).__name__}.pld={pld!r}\n'
                     )
                     return pld
                 except TypeError as typerr:
```
```diff
@@ -494,7 +500,8 @@ def limit_plds(
 
     finally:
         log.runtime(
-            'Reverted to previous payload-decoder\n\n'
+            f'Reverted to previous payload-decoder\n'
+            f'\n'
             f'{orig_pldec}\n'
         )
         # sanity on orig settings
```
```diff
@@ -629,7 +636,8 @@ async def drain_to_final_msg(
                     (local_cs := rent_n.cancel_scope).cancel_called
                 ):
                     log.cancel(
-                        'RPC-ctx cancelled by local-parent scope during drain!\n\n'
+                        f'RPC-ctx cancelled by local-parent scope during drain!\n'
+                        f'\n'
                         f'c}}>\n'
                         f' |_{rent_n}\n'
                         f'   |_.cancel_scope = {local_cs}\n'
```
```diff
@@ -663,7 +671,8 @@ async def drain_to_final_msg(
             # final result arrived!
             case Return():
                 log.runtime(
-                    'Context delivered final draining msg:\n'
+                    f'Context delivered final draining msg\n'
+                    f'\n'
                     f'{pretty_struct.pformat(msg)}'
                 )
                 ctx._result: Any = pld
```
```diff
@@ -697,12 +706,14 @@ async def drain_to_final_msg(
                 ):
                     log.cancel(
                         'Cancelling `MsgStream` drain since '
-                        f'{reason}\n\n'
+                        f'{reason}\n'
+                        f'\n'
                         f'<= {ctx.chan.uid}\n'
-                        f'  |_{ctx._nsf}()\n\n'
+                        f'  |_{ctx._nsf}()\n'
+                        f'\n'
                         f'=> {ctx._task}\n'
-                        f'  |_{ctx._stream}\n\n'
-
+                        f'  |_{ctx._stream}\n'
+                        f'\n'
                         f'{pretty_struct.pformat(msg)}\n'
                     )
                     break
```
```diff
@@ -739,7 +750,8 @@ async def drain_to_final_msg(
             case Stop():
                 pre_result_drained.append(msg)
                 log.runtime(  # normal/expected shutdown transaction
-                    'Remote stream terminated due to "stop" msg:\n\n'
+                    f'Remote stream terminated due to "stop" msg\n'
+                    f'\n'
                     f'{pretty_struct.pformat(msg)}\n'
                 )
                 continue
```
```diff
@@ -814,7 +826,8 @@ async def drain_to_final_msg(
 
     else:
         log.cancel(
-            'Skipping `MsgStream` drain since final outcome is set\n\n'
+            f'Skipping `MsgStream` drain since final outcome is set\n'
+            f'\n'
            f'{ctx.outcome}\n'
        )
 
```