Compare commits
	
		
			5 Commits 
		
	
	
		
			c0058024c2
			...
			29db08b370
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						29db08b370 | |
| 
							
							
								 | 
						fe5e6e2ab0 | |
| 
							
							
								 | 
						ae91310b32 | |
| 
							
							
								 | 
						f86f4ae48d | |
| 
							
							
								 | 
						b244cf844d | 
| 
						 | 
				
			
			@ -743,6 +743,8 @@ class Context:
 | 
			
		|||
            # cancelled, NOT their reported canceller. IOW in the
 | 
			
		||||
            # latter case we're cancelled by someone else getting
 | 
			
		||||
            # cancelled.
 | 
			
		||||
            #
 | 
			
		||||
            # !TODO, switching to `Actor.aid` here!
 | 
			
		||||
            if (canc := error.canceller) == self._actor.uid:
 | 
			
		||||
                whom: str = 'us'
 | 
			
		||||
                self._canceller = canc
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										104
									
								
								tractor/_root.py
								
								
								
								
							
							
						
						
									
										104
									
								
								tractor/_root.py
								
								
								
								
							| 
						 | 
				
			
			@ -202,7 +202,9 @@ async def open_root_actor(
 | 
			
		|||
 | 
			
		||||
    '''
 | 
			
		||||
    # XXX NEVER allow nested actor-trees!
 | 
			
		||||
    if already_actor := _state.current_actor(err_on_no_runtime=False):
 | 
			
		||||
    if already_actor := _state.current_actor(
 | 
			
		||||
        err_on_no_runtime=False,
 | 
			
		||||
    ):
 | 
			
		||||
        rtvs: dict[str, Any] = _state._runtime_vars
 | 
			
		||||
        root_mailbox: list[str, int] = rtvs['_root_mailbox']
 | 
			
		||||
        registry_addrs: list[list[str, int]] = rtvs['_registry_addrs']
 | 
			
		||||
| 
						 | 
				
			
			@ -272,14 +274,20 @@ async def open_root_actor(
 | 
			
		|||
                DeprecationWarning,
 | 
			
		||||
                stacklevel=2,
 | 
			
		||||
            )
 | 
			
		||||
            registry_addrs = [arbiter_addr]
 | 
			
		||||
            uw_reg_addrs = [arbiter_addr]
 | 
			
		||||
 | 
			
		||||
        if not registry_addrs:
 | 
			
		||||
            registry_addrs: list[UnwrappedAddress] = default_lo_addrs(
 | 
			
		||||
        uw_reg_addrs = registry_addrs
 | 
			
		||||
        if not uw_reg_addrs:
 | 
			
		||||
            uw_reg_addrs: list[UnwrappedAddress] = default_lo_addrs(
 | 
			
		||||
                enable_transports
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        assert registry_addrs
 | 
			
		||||
        # must exist by now since all below code is dependent
 | 
			
		||||
        assert uw_reg_addrs
 | 
			
		||||
        registry_addrs: list[Address] = [
 | 
			
		||||
            wrap_address(uw_addr)
 | 
			
		||||
            for uw_addr in uw_reg_addrs
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        loglevel = (
 | 
			
		||||
            loglevel
 | 
			
		||||
| 
						 | 
				
			
			@ -328,10 +336,10 @@ async def open_root_actor(
 | 
			
		|||
            enable_stack_on_sig()
 | 
			
		||||
 | 
			
		||||
        # closed into below ping task-func
 | 
			
		||||
        ponged_addrs: list[UnwrappedAddress] = []
 | 
			
		||||
        ponged_addrs: list[Address] = []
 | 
			
		||||
 | 
			
		||||
        async def ping_tpt_socket(
 | 
			
		||||
            addr: UnwrappedAddress,
 | 
			
		||||
            addr: Address,
 | 
			
		||||
            timeout: float = 1,
 | 
			
		||||
        ) -> None:
 | 
			
		||||
            '''
 | 
			
		||||
| 
						 | 
				
			
			@ -351,17 +359,22 @@ async def open_root_actor(
 | 
			
		|||
                # be better to eventually have a "discovery" protocol
 | 
			
		||||
                # with basic handshake instead?
 | 
			
		||||
                with trio.move_on_after(timeout):
 | 
			
		||||
                    async with _connect_chan(addr):
 | 
			
		||||
                    async with _connect_chan(addr.unwrap()):
 | 
			
		||||
                        ponged_addrs.append(addr)
 | 
			
		||||
 | 
			
		||||
            except OSError:
 | 
			
		||||
                # TODO: make this a "discovery" log level?
 | 
			
		||||
                # ?TODO, make this a "discovery" log level?
 | 
			
		||||
                logger.info(
 | 
			
		||||
                    f'No actor registry found @ {addr}\n'
 | 
			
		||||
                    f'No root-actor registry found @ {addr!r}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        # !TODO, this is basically just another (abstract)
 | 
			
		||||
        # happy-eyeballs, so we should try for formalize it somewhere
 | 
			
		||||
        # in a `.[_]discovery` ya?
 | 
			
		||||
        #
 | 
			
		||||
        async with trio.open_nursery() as tn:
 | 
			
		||||
            for addr in registry_addrs:
 | 
			
		||||
            for uw_addr in uw_reg_addrs:
 | 
			
		||||
                addr: Address = wrap_address(uw_addr)
 | 
			
		||||
                tn.start_soon(
 | 
			
		||||
                    ping_tpt_socket,
 | 
			
		||||
                    addr,
 | 
			
		||||
| 
						 | 
				
			
			@ -390,24 +403,28 @@ async def open_root_actor(
 | 
			
		|||
                loglevel=loglevel,
 | 
			
		||||
                enable_modules=enable_modules,
 | 
			
		||||
            )
 | 
			
		||||
            # DO NOT use the registry_addrs as the transport server
 | 
			
		||||
            # addrs for this new non-registar, root-actor.
 | 
			
		||||
            # **DO NOT** use the registry_addrs as the
 | 
			
		||||
            # ipc-transport-server's bind-addrs as this is
 | 
			
		||||
            # a new NON-registrar, ROOT-actor.
 | 
			
		||||
            #
 | 
			
		||||
            # XXX INSTEAD, bind random addrs using the same tpt
 | 
			
		||||
            # proto.
 | 
			
		||||
            for addr in ponged_addrs:
 | 
			
		||||
                waddr: Address = wrap_address(addr)
 | 
			
		||||
                trans_bind_addrs.append(
 | 
			
		||||
                    waddr.get_random(bindspace=waddr.bindspace)
 | 
			
		||||
                    addr.get_random(
 | 
			
		||||
                        bindspace=addr.bindspace,
 | 
			
		||||
                    )
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        # Start this local actor as the "registrar", aka a regular
 | 
			
		||||
        # actor who manages the local registry of "mailboxes" of
 | 
			
		||||
        # other process-tree-local sub-actors.
 | 
			
		||||
        else:
 | 
			
		||||
 | 
			
		||||
            # NOTE that if the current actor IS THE REGISTAR, the
 | 
			
		||||
            # following init steps are taken:
 | 
			
		||||
            # - the tranport layer server is bound to each addr
 | 
			
		||||
            #   pair defined in provided registry_addrs, or the default.
 | 
			
		||||
            trans_bind_addrs = registry_addrs
 | 
			
		||||
            trans_bind_addrs = uw_reg_addrs
 | 
			
		||||
 | 
			
		||||
            # - it is normally desirable for any registrar to stay up
 | 
			
		||||
            #   indefinitely until either all registered (child/sub)
 | 
			
		||||
| 
						 | 
				
			
			@ -431,6 +448,16 @@ async def open_root_actor(
 | 
			
		|||
            # `.trio.run()`.
 | 
			
		||||
            actor._infected_aio = _state._runtime_vars['_is_infected_aio']
 | 
			
		||||
 | 
			
		||||
        # NOTE, only set the loopback addr for the
 | 
			
		||||
        # process-tree-global "root" mailbox since all sub-actors
 | 
			
		||||
        # should be able to speak to their root actor over that
 | 
			
		||||
        # channel.
 | 
			
		||||
        raddrs: list[Address] = _state._runtime_vars['_root_addrs']
 | 
			
		||||
        raddrs.extend(trans_bind_addrs)
 | 
			
		||||
        # TODO, remove once we have also removed all usage;
 | 
			
		||||
        # eventually all (root-)registry apis should expect > 1 addr.
 | 
			
		||||
        _state._runtime_vars['_root_mailbox'] = raddrs[0]
 | 
			
		||||
 | 
			
		||||
        # Start up main task set via core actor-runtime nurseries.
 | 
			
		||||
        try:
 | 
			
		||||
            # assign process-local actor
 | 
			
		||||
| 
						 | 
				
			
			@ -438,20 +465,26 @@ async def open_root_actor(
 | 
			
		|||
 | 
			
		||||
            # start local channel-server and fake the portal API
 | 
			
		||||
            # NOTE: this won't block since we provide the nursery
 | 
			
		||||
            ml_addrs_str: str = '\n'.join(
 | 
			
		||||
                f'@{addr}' for addr in trans_bind_addrs
 | 
			
		||||
            )
 | 
			
		||||
            logger.info(
 | 
			
		||||
                f'Starting local {actor.uid} on the following transport addrs:\n'
 | 
			
		||||
                f'{ml_addrs_str}'
 | 
			
		||||
            )
 | 
			
		||||
            report: str = f'Starting actor-runtime for {actor.aid.reprol()!r}\n'
 | 
			
		||||
            if reg_addrs := actor.registry_addrs:
 | 
			
		||||
                report += (
 | 
			
		||||
                    '-> Opening new registry @ '
 | 
			
		||||
                    +
 | 
			
		||||
                    '\n'.join(
 | 
			
		||||
                        f'@{addr}' for addr in reg_addrs
 | 
			
		||||
                    )
 | 
			
		||||
                )
 | 
			
		||||
            logger.info(f'{report}\n')
 | 
			
		||||
 | 
			
		||||
            # start runtime in a bg sub-task, yield to caller.
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as root_tn,
 | 
			
		||||
            ):
 | 
			
		||||
 | 
			
		||||
                # XXX, finally-footgun below?
 | 
			
		||||
                # -> see note on why shielding.
 | 
			
		||||
                # maybe_raise_from_masking_exc(),
 | 
			
		||||
            ):
 | 
			
		||||
                # `_runtime.async_main()` creates an internal nursery
 | 
			
		||||
                # and blocks here until any underlying actor(-process)
 | 
			
		||||
                # tree has terminated thereby conducting so called
 | 
			
		||||
| 
						 | 
				
			
			@ -526,9 +559,14 @@ async def open_root_actor(
 | 
			
		|||
                    )
 | 
			
		||||
                    logger.info(
 | 
			
		||||
                        f'Closing down root actor\n'
 | 
			
		||||
                        f'{op_nested_actor_repr}\n'
 | 
			
		||||
                        f'{op_nested_actor_repr}'
 | 
			
		||||
                    )
 | 
			
		||||
                    await actor.cancel(None)  # self cancel
 | 
			
		||||
                    # XXX, THIS IS A *finally-footgun*!
 | 
			
		||||
                    # -> though already shields iternally it can
 | 
			
		||||
                    # taskc here and mask underlying errors raised in
 | 
			
		||||
                    # the try-block above?
 | 
			
		||||
                    with trio.CancelScope(shield=True):
 | 
			
		||||
                        await actor.cancel(None)  # self cancel
 | 
			
		||||
        finally:
 | 
			
		||||
            # revert all process-global runtime state
 | 
			
		||||
            if (
 | 
			
		||||
| 
						 | 
				
			
			@ -541,10 +579,16 @@ async def open_root_actor(
 | 
			
		|||
            _state._current_actor = None
 | 
			
		||||
            _state._last_actor_terminated = actor
 | 
			
		||||
 | 
			
		||||
            logger.runtime(
 | 
			
		||||
            sclang_repr: str = _pformat.nest_from_op(
 | 
			
		||||
                input_op=')>',
 | 
			
		||||
                text=actor.pformat(),
 | 
			
		||||
                nest_prefix='|_',
 | 
			
		||||
                nest_indent=1,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            logger.info(
 | 
			
		||||
                f'Root actor terminated\n'
 | 
			
		||||
                f')>\n'
 | 
			
		||||
                f' |_{actor}\n'
 | 
			
		||||
                f'{sclang_repr}'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -672,7 +672,8 @@ async def _invoke(
 | 
			
		|||
                ctx._result = res
 | 
			
		||||
                log.runtime(
 | 
			
		||||
                    f'Sending result msg and exiting {ctx.side!r}\n'
 | 
			
		||||
                    f'{return_msg}\n'
 | 
			
		||||
                    f'\n'
 | 
			
		||||
                    f'{pretty_struct.pformat(return_msg)}\n'
 | 
			
		||||
                )
 | 
			
		||||
                await chan.send(return_msg)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -839,12 +840,12 @@ async def _invoke(
 | 
			
		|||
                else:
 | 
			
		||||
                    descr_str += f'\n{merr!r}\n'
 | 
			
		||||
            else:
 | 
			
		||||
                descr_str += f'\nand final result {ctx.outcome!r}\n'
 | 
			
		||||
                descr_str += f'\nwith final result {ctx.outcome!r}\n'
 | 
			
		||||
 | 
			
		||||
            logmeth(
 | 
			
		||||
                message
 | 
			
		||||
                +
 | 
			
		||||
                descr_str
 | 
			
		||||
                f'{message}\n'
 | 
			
		||||
                f'\n'
 | 
			
		||||
                f'{descr_str}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1011,8 +1012,6 @@ async def process_messages(
 | 
			
		|||
                        cid=cid,
 | 
			
		||||
                        kwargs=kwargs,
 | 
			
		||||
                    ):
 | 
			
		||||
                        kwargs |= {'req_chan': chan}
 | 
			
		||||
 | 
			
		||||
                        # XXX NOTE XXX don't start entire actor
 | 
			
		||||
                        # runtime cancellation if this actor is
 | 
			
		||||
                        # currently in debug mode!
 | 
			
		||||
| 
						 | 
				
			
			@ -1031,14 +1030,14 @@ async def process_messages(
 | 
			
		|||
                                cid,
 | 
			
		||||
                                chan,
 | 
			
		||||
                                actor.cancel,
 | 
			
		||||
                                kwargs,
 | 
			
		||||
                                kwargs | {'req_chan': chan},
 | 
			
		||||
                                is_rpc=False,
 | 
			
		||||
                                return_msg_type=CancelAck,
 | 
			
		||||
                            )
 | 
			
		||||
 | 
			
		||||
                        log.runtime(
 | 
			
		||||
                            'Cancelling IPC transport msg-loop with peer:\n'
 | 
			
		||||
                            f'|_{chan}\n'
 | 
			
		||||
                            'Cancelling RPC-msg-loop with peer\n'
 | 
			
		||||
                            f'->c}} {chan.aid.reprol()}@[{chan.maddr}]\n'
 | 
			
		||||
                        )
 | 
			
		||||
                        loop_cs.cancel()
 | 
			
		||||
                        break
 | 
			
		||||
| 
						 | 
				
			
			@ -1234,9 +1233,21 @@ async def process_messages(
 | 
			
		|||
            # END-OF `async for`:
 | 
			
		||||
            # IPC disconnected via `trio.EndOfChannel`, likely
 | 
			
		||||
            # due to a (graceful) `Channel.aclose()`.
 | 
			
		||||
 | 
			
		||||
            chan_op_repr: str = '<=x] '
 | 
			
		||||
            chan_repr: str = _pformat.nest_from_op(
 | 
			
		||||
                input_op=chan_op_repr,
 | 
			
		||||
                op_suffix='',
 | 
			
		||||
                nest_prefix='',
 | 
			
		||||
                text=chan.pformat(),
 | 
			
		||||
                nest_indent=len(chan_op_repr)-1,
 | 
			
		||||
                rm_from_first_ln='<',
 | 
			
		||||
            )
 | 
			
		||||
            log.runtime(
 | 
			
		||||
                f'channel for {chan.uid} disconnected, cancelling RPC tasks\n'
 | 
			
		||||
                f'|_{chan}\n'
 | 
			
		||||
                f'IPC channel disconnected\n'
 | 
			
		||||
                f'{chan_repr}\n'
 | 
			
		||||
                f'\n'
 | 
			
		||||
                f'->c) cancelling RPC tasks.\n'
 | 
			
		||||
            )
 | 
			
		||||
            await actor.cancel_rpc_tasks(
 | 
			
		||||
                req_aid=actor.aid,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -213,7 +213,7 @@ class Actor:
 | 
			
		|||
        *,
 | 
			
		||||
        enable_modules: list[str] = [],
 | 
			
		||||
        loglevel: str|None = None,
 | 
			
		||||
        registry_addrs: list[UnwrappedAddress]|None = None,
 | 
			
		||||
        registry_addrs: list[Address]|None = None,
 | 
			
		||||
        spawn_method: str|None = None,
 | 
			
		||||
 | 
			
		||||
        # TODO: remove!
 | 
			
		||||
| 
						 | 
				
			
			@ -256,11 +256,12 @@ class Actor:
 | 
			
		|||
        if arbiter_addr is not None:
 | 
			
		||||
            warnings.warn(
 | 
			
		||||
                '`Actor(arbiter_addr=<blah>)` is now deprecated.\n'
 | 
			
		||||
                'Use `registry_addrs: list[tuple]` instead.',
 | 
			
		||||
                'Use `registry_addrs: list[Address]` instead.',
 | 
			
		||||
                DeprecationWarning,
 | 
			
		||||
                stacklevel=2,
 | 
			
		||||
            )
 | 
			
		||||
            registry_addrs: list[UnwrappedAddress] = [arbiter_addr]
 | 
			
		||||
 | 
			
		||||
            registry_addrs: list[Address] = [wrap_address(arbiter_addr)]
 | 
			
		||||
 | 
			
		||||
        # marked by the process spawning backend at startup
 | 
			
		||||
        # will be None for the parent most process started manually
 | 
			
		||||
| 
						 | 
				
			
			@ -299,8 +300,10 @@ class Actor:
 | 
			
		|||
        # input via the validator.
 | 
			
		||||
        self._reg_addrs: list[UnwrappedAddress] = []
 | 
			
		||||
        if registry_addrs:
 | 
			
		||||
            self.reg_addrs: list[UnwrappedAddress] = registry_addrs
 | 
			
		||||
            _state._runtime_vars['_registry_addrs'] = registry_addrs
 | 
			
		||||
            _state._runtime_vars['_registry_addrs'] = self.reg_addrs = [
 | 
			
		||||
                addr.unwrap()
 | 
			
		||||
                for addr in registry_addrs
 | 
			
		||||
            ]
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def aid(self) -> msgtypes.Aid:
 | 
			
		||||
| 
						 | 
				
			
			@ -472,7 +475,11 @@ class Actor:
 | 
			
		|||
    def reg_addrs(self) -> list[UnwrappedAddress]:
 | 
			
		||||
        '''
 | 
			
		||||
        List of (socket) addresses for all known (and contactable)
 | 
			
		||||
        registry actors.
 | 
			
		||||
        registry-service actors in "unwrapped" (i.e. IPC interchange
 | 
			
		||||
        wire-compat) form.
 | 
			
		||||
 | 
			
		||||
        If you are looking for the "wrapped" address form, use
 | 
			
		||||
        `.registry_addrs` instead.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self._reg_addrs
 | 
			
		||||
| 
						 | 
				
			
			@ -491,8 +498,14 @@ class Actor:
 | 
			
		|||
 | 
			
		||||
        self._reg_addrs = addrs
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def registry_addrs(self) -> list[Address]:
 | 
			
		||||
        return [wrap_address(uw_addr)
 | 
			
		||||
                for uw_addr in self.reg_addrs]
 | 
			
		||||
 | 
			
		||||
    def load_modules(
 | 
			
		||||
        self,
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
        Load explicitly enabled python modules from local fs after
 | 
			
		||||
| 
						 | 
				
			
			@ -1365,7 +1378,7 @@ class Actor:
 | 
			
		|||
        Return all IPC channels to the actor with provided `uid`.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self._peers[uid]
 | 
			
		||||
        return self._ipc_server._peers[uid]
 | 
			
		||||
 | 
			
		||||
    def is_infected_aio(self) -> bool:
 | 
			
		||||
        '''
 | 
			
		||||
| 
						 | 
				
			
			@ -1420,6 +1433,8 @@ async def async_main(
 | 
			
		|||
        # establish primary connection with immediate parent
 | 
			
		||||
        actor._parent_chan: Channel|None = None
 | 
			
		||||
 | 
			
		||||
        # is this a sub-actor?
 | 
			
		||||
        # get runtime info from parent.
 | 
			
		||||
        if parent_addr is not None:
 | 
			
		||||
            (
 | 
			
		||||
                actor._parent_chan,
 | 
			
		||||
| 
						 | 
				
			
			@ -1464,10 +1479,8 @@ async def async_main(
 | 
			
		|||
 | 
			
		||||
            ipc_server: _server.IPCServer
 | 
			
		||||
            async with (
 | 
			
		||||
                trio.open_nursery(
 | 
			
		||||
                    strict_exception_groups=False,
 | 
			
		||||
                ) as service_nursery,
 | 
			
		||||
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as service_nursery,
 | 
			
		||||
                _server.open_ipc_server(
 | 
			
		||||
                    parent_tn=service_nursery,
 | 
			
		||||
                    stream_handler_tn=service_nursery,
 | 
			
		||||
| 
						 | 
				
			
			@ -1518,9 +1531,6 @@ async def async_main(
 | 
			
		|||
 | 
			
		||||
                # TODO: why is this not with the root nursery?
 | 
			
		||||
                try:
 | 
			
		||||
                    log.runtime(
 | 
			
		||||
                        'Booting IPC server'
 | 
			
		||||
                    )
 | 
			
		||||
                    eps: list = await ipc_server.listen_on(
 | 
			
		||||
                        accept_addrs=accept_addrs,
 | 
			
		||||
                        stream_handler_nursery=service_nursery,
 | 
			
		||||
| 
						 | 
				
			
			@ -1552,18 +1562,6 @@ async def async_main(
 | 
			
		|||
                # TODO, just read direct from ipc_server?
 | 
			
		||||
                accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
 | 
			
		||||
 | 
			
		||||
                # NOTE: only set the loopback addr for the 
 | 
			
		||||
                # process-tree-global "root" mailbox since
 | 
			
		||||
                # all sub-actors should be able to speak to
 | 
			
		||||
                # their root actor over that channel.
 | 
			
		||||
                if _state._runtime_vars['_is_root']:
 | 
			
		||||
                    raddrs: list[Address] = _state._runtime_vars['_root_addrs']
 | 
			
		||||
                    for addr in accept_addrs:
 | 
			
		||||
                        waddr: Address = wrap_address(addr)
 | 
			
		||||
                        raddrs.append(addr)
 | 
			
		||||
                    else:
 | 
			
		||||
                        _state._runtime_vars['_root_mailbox'] = raddrs[0]
 | 
			
		||||
 | 
			
		||||
                # Register with the arbiter if we're told its addr
 | 
			
		||||
                log.runtime(
 | 
			
		||||
                    f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
 | 
			
		||||
| 
						 | 
				
			
			@ -1581,6 +1579,7 @@ async def async_main(
 | 
			
		|||
                    except AssertionError:
 | 
			
		||||
                        await debug.pause()
 | 
			
		||||
 | 
			
		||||
                    # !TODO, get rid of the local-portal crap XD
 | 
			
		||||
                    async with get_registry(addr) as reg_portal:
 | 
			
		||||
                        for accept_addr in accept_addrs:
 | 
			
		||||
                            accept_addr = wrap_address(accept_addr)
 | 
			
		||||
| 
						 | 
				
			
			@ -1619,7 +1618,7 @@ async def async_main(
 | 
			
		|||
            log.runtime(
 | 
			
		||||
                'Service nursery complete\n'
 | 
			
		||||
                '\n'
 | 
			
		||||
                '-> Waiting on root nursery to complete'
 | 
			
		||||
                '->} waiting on root nursery to complete..\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Blocks here as expected until the root nursery is
 | 
			
		||||
| 
						 | 
				
			
			@ -1674,6 +1673,7 @@ async def async_main(
 | 
			
		|||
    finally:
 | 
			
		||||
        teardown_report: str = (
 | 
			
		||||
            'Main actor-runtime task completed\n'
 | 
			
		||||
            '\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # ?TODO? should this be in `._entry`/`._root` mods instead?
 | 
			
		||||
| 
						 | 
				
			
			@ -1715,7 +1715,8 @@ async def async_main(
 | 
			
		|||
        # Unregister actor from the registry-sys / registrar.
 | 
			
		||||
        if (
 | 
			
		||||
            is_registered
 | 
			
		||||
            and not actor.is_registrar
 | 
			
		||||
            and
 | 
			
		||||
            not actor.is_registrar
 | 
			
		||||
        ):
 | 
			
		||||
            failed: bool = False
 | 
			
		||||
            for addr in actor.reg_addrs:
 | 
			
		||||
| 
						 | 
				
			
			@ -1750,7 +1751,8 @@ async def async_main(
 | 
			
		|||
            ipc_server.has_peers(check_chans=True)
 | 
			
		||||
        ):
 | 
			
		||||
            teardown_report += (
 | 
			
		||||
                f'-> Waiting for remaining peers {ipc_server._peers} to clear..\n'
 | 
			
		||||
                f'-> Waiting for remaining peers to clear..\n'
 | 
			
		||||
                f'   {pformat(ipc_server._peers)}'
 | 
			
		||||
            )
 | 
			
		||||
            log.runtime(teardown_report)
 | 
			
		||||
            await ipc_server.wait_for_no_more_peers(
 | 
			
		||||
| 
						 | 
				
			
			@ -1758,20 +1760,23 @@ async def async_main(
 | 
			
		|||
            )
 | 
			
		||||
 | 
			
		||||
        teardown_report += (
 | 
			
		||||
            '-> All peer channels are complete\n'
 | 
			
		||||
            '-]> all peer channels are complete.\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    op_nested_actor_repr: str = _pformat.nest_from_op(
 | 
			
		||||
        input_op=')>',
 | 
			
		||||
        text=actor.pformat(),
 | 
			
		||||
        nest_prefix='|_',
 | 
			
		||||
        nest_indent=1,  # under >
 | 
			
		||||
    )
 | 
			
		||||
    # op_nested_actor_repr: str = _pformat.nest_from_op(
 | 
			
		||||
    #     input_op=')>',
 | 
			
		||||
    #     text=actor.pformat(),
 | 
			
		||||
    #     nest_prefix='|_',
 | 
			
		||||
    #     nest_indent=1,  # under >
 | 
			
		||||
    # )
 | 
			
		||||
    teardown_report += (
 | 
			
		||||
        'Actor runtime exited\n'
 | 
			
		||||
        f'{op_nested_actor_repr}'
 | 
			
		||||
        '-)> actor runtime main task exit.\n'
 | 
			
		||||
        # f'{op_nested_actor_repr}'
 | 
			
		||||
    )
 | 
			
		||||
    log.info(teardown_report)
 | 
			
		||||
    # if _state._runtime_vars['_is_root']:
 | 
			
		||||
    #     log.info(teardown_report)
 | 
			
		||||
    # else:
 | 
			
		||||
    log.runtime(teardown_report)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# TODO: rename to `Registry` and move to `.discovery._registry`!
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,7 +21,6 @@
 | 
			
		|||
from contextlib import asynccontextmanager as acm
 | 
			
		||||
from functools import partial
 | 
			
		||||
import inspect
 | 
			
		||||
from pprint import pformat
 | 
			
		||||
from typing import (
 | 
			
		||||
    TYPE_CHECKING,
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -31,7 +30,10 @@ import warnings
 | 
			
		|||
import trio
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
from .devx.debug import maybe_wait_for_debugger
 | 
			
		||||
from .devx import (
 | 
			
		||||
    debug,
 | 
			
		||||
    pformat as _pformat,
 | 
			
		||||
)
 | 
			
		||||
from ._addr import (
 | 
			
		||||
    UnwrappedAddress,
 | 
			
		||||
    mk_uuid,
 | 
			
		||||
| 
						 | 
				
			
			@ -200,7 +202,7 @@ class ActorNursery:
 | 
			
		|||
            loglevel=loglevel,
 | 
			
		||||
 | 
			
		||||
            # verbatim relay this actor's registrar addresses
 | 
			
		||||
            registry_addrs=current_actor().reg_addrs,
 | 
			
		||||
            registry_addrs=current_actor().registry_addrs,
 | 
			
		||||
        )
 | 
			
		||||
        parent_addr: UnwrappedAddress = self._actor.accept_addr
 | 
			
		||||
        assert parent_addr
 | 
			
		||||
| 
						 | 
				
			
			@ -454,7 +456,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
                    # the "hard join phase".
 | 
			
		||||
                    log.runtime(
 | 
			
		||||
                        'Waiting on subactors to complete:\n'
 | 
			
		||||
                        f'{pformat(an._children)}\n'
 | 
			
		||||
                        f'>}} {len(an._children)}\n'
 | 
			
		||||
                    )
 | 
			
		||||
                    an._join_procs.set()
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -468,7 +470,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
                    # will make the pdb repl unusable.
 | 
			
		||||
                    # Instead try to wait for pdb to be released before
 | 
			
		||||
                    # tearing down.
 | 
			
		||||
                    await maybe_wait_for_debugger(
 | 
			
		||||
                    await debug.maybe_wait_for_debugger(
 | 
			
		||||
                        child_in_debug=an._at_least_one_child_in_debug
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -544,7 +546,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
 | 
			
		||||
            # XXX: yet another guard before allowing the cancel
 | 
			
		||||
            # sequence in case a (single) child is in debug.
 | 
			
		||||
            await maybe_wait_for_debugger(
 | 
			
		||||
            await debug.maybe_wait_for_debugger(
 | 
			
		||||
                child_in_debug=an._at_least_one_child_in_debug
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -593,6 +595,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
    # final exit
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
_shutdown_msg: str = (
 | 
			
		||||
    'Actor-runtime-shutdown'
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
# @api_frame
 | 
			
		||||
@acm
 | 
			
		||||
async def open_nursery(
 | 
			
		||||
| 
						 | 
				
			
			@ -681,17 +687,26 @@ async def open_nursery(
 | 
			
		|||
        ):
 | 
			
		||||
            __tracebackhide__: bool = False
 | 
			
		||||
 | 
			
		||||
        msg: str = (
 | 
			
		||||
            'Actor-nursery exited\n'
 | 
			
		||||
            f'|_{an}\n'
 | 
			
		||||
 | 
			
		||||
        op_nested_an_repr: str = _pformat.nest_from_op(
 | 
			
		||||
            input_op=')>',
 | 
			
		||||
            text=f'{an}',
 | 
			
		||||
            # nest_prefix='|_',
 | 
			
		||||
            nest_indent=1,  # under >
 | 
			
		||||
        )
 | 
			
		||||
        an_msg: str = (
 | 
			
		||||
            f'Actor-nursery exited\n'
 | 
			
		||||
            f'{op_nested_an_repr}\n'
 | 
			
		||||
        )
 | 
			
		||||
        # keep noise low during std operation.
 | 
			
		||||
        log.runtime(an_msg)
 | 
			
		||||
 | 
			
		||||
        if implicit_runtime:
 | 
			
		||||
            # shutdown runtime if it was started and report noisly
 | 
			
		||||
            # that we're did so.
 | 
			
		||||
            msg += '=> Shutting down actor runtime <=\n'
 | 
			
		||||
            msg: str = (
 | 
			
		||||
                '\n'
 | 
			
		||||
                '\n'
 | 
			
		||||
                f'{_shutdown_msg} )>\n'
 | 
			
		||||
            )
 | 
			
		||||
            log.info(msg)
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            # keep noise low during std operation.
 | 
			
		||||
            log.runtime(msg)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -31,7 +31,7 @@ import trio
 | 
			
		|||
 | 
			
		||||
def maybe_collapse_eg(
 | 
			
		||||
    beg: BaseExceptionGroup,
 | 
			
		||||
) -> BaseException:
 | 
			
		||||
) -> BaseException|bool:
 | 
			
		||||
    '''
 | 
			
		||||
    If the input beg can collapse to a single non-eg sub-exception,
 | 
			
		||||
    return it instead.
 | 
			
		||||
| 
						 | 
				
			
			@ -40,13 +40,12 @@ def maybe_collapse_eg(
 | 
			
		|||
    if len(excs := beg.exceptions) == 1:
 | 
			
		||||
        return excs[0]
 | 
			
		||||
 | 
			
		||||
    return beg
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def collapse_eg(
 | 
			
		||||
    hide_tb: bool = True,
 | 
			
		||||
    raise_from_src: bool = False,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    If `BaseExceptionGroup` raised in the body scope is
 | 
			
		||||
| 
						 | 
				
			
			@ -61,9 +60,11 @@ async def collapse_eg(
 | 
			
		|||
    except* BaseException as beg:
 | 
			
		||||
        if (
 | 
			
		||||
            exc := maybe_collapse_eg(beg)
 | 
			
		||||
        ) is not beg:
 | 
			
		||||
            from_exc = beg if raise_from_src else None
 | 
			
		||||
            raise exc from from_exc
 | 
			
		||||
        ):
 | 
			
		||||
            if cause := exc.__cause__:
 | 
			
		||||
                raise exc from cause
 | 
			
		||||
 | 
			
		||||
            raise exc
 | 
			
		||||
 | 
			
		||||
        raise beg
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue