forked from goodboy/tractor
				
			Drop run and rpc_module_paths from discovery tests
							parent
							
								
									2efd8ed167
								
							
						
					
					
						commit
						1584c547cd
					
				| 
						 | 
				
			
			@ -20,8 +20,11 @@ async def test_reg_then_unreg(arb_addr):
 | 
			
		|||
    assert actor.is_arbiter
 | 
			
		||||
    assert len(actor._registry) == 1  # only self is registered
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_nursery() as n:
 | 
			
		||||
        portal = await n.start_actor('actor', rpc_module_paths=[__name__])
 | 
			
		||||
    async with tractor.open_nursery(
 | 
			
		||||
        arbiter_addr=arb_addr,
 | 
			
		||||
    ) as n:
 | 
			
		||||
 | 
			
		||||
        portal = await n.start_actor('actor', enable_modules=[__name__])
 | 
			
		||||
        uid = portal.channel.uid
 | 
			
		||||
 | 
			
		||||
        async with tractor.get_arbiter(*arb_addr) as aportal:
 | 
			
		||||
| 
						 | 
				
			
			@ -66,7 +69,7 @@ async def say_hello_use_wait(other_actor):
 | 
			
		|||
 | 
			
		||||
@tractor_test
 | 
			
		||||
@pytest.mark.parametrize('func', [say_hello, say_hello_use_wait])
 | 
			
		||||
async def test_trynamic_trio(func, start_method):
 | 
			
		||||
async def test_trynamic_trio(func, start_method, arb_addr):
 | 
			
		||||
    """Main tractor entry point, the "master" process (for now
 | 
			
		||||
    acts as the "director").
 | 
			
		||||
    """
 | 
			
		||||
| 
						 | 
				
			
			@ -119,74 +122,78 @@ async def spawn_and_check_registry(
 | 
			
		|||
    remote_arbiter: bool = False,
 | 
			
		||||
    with_streaming: bool = False,
 | 
			
		||||
) -> None:
 | 
			
		||||
    actor = tractor.current_actor()
 | 
			
		||||
 | 
			
		||||
    if remote_arbiter:
 | 
			
		||||
        assert not actor.is_arbiter
 | 
			
		||||
    async with tractor.open_root_actor(
 | 
			
		||||
        arbiter_addr=arb_addr,
 | 
			
		||||
    ):
 | 
			
		||||
        async with tractor.get_arbiter(*arb_addr) as portal:
 | 
			
		||||
            # runtime needs to be up to call this
 | 
			
		||||
            actor = tractor.current_actor()
 | 
			
		||||
 | 
			
		||||
    async with tractor.get_arbiter(*arb_addr) as portal:
 | 
			
		||||
            if remote_arbiter:
 | 
			
		||||
                assert not actor.is_arbiter
 | 
			
		||||
 | 
			
		||||
        if actor.is_arbiter:
 | 
			
		||||
            if actor.is_arbiter:
 | 
			
		||||
 | 
			
		||||
            async def get_reg():
 | 
			
		||||
                return actor._registry
 | 
			
		||||
                async def get_reg():
 | 
			
		||||
                    return actor._registry
 | 
			
		||||
 | 
			
		||||
            extra = 1  # arbiter is local root actor
 | 
			
		||||
        else:
 | 
			
		||||
            get_reg = partial(portal.run_from_ns, 'self', 'get_registry')
 | 
			
		||||
            extra = 2  # local root actor + remote arbiter
 | 
			
		||||
                extra = 1  # arbiter is local root actor
 | 
			
		||||
            else:
 | 
			
		||||
                get_reg = partial(portal.run_from_ns, 'self', 'get_registry')
 | 
			
		||||
                extra = 2  # local root actor + remote arbiter
 | 
			
		||||
 | 
			
		||||
        # ensure current actor is registered
 | 
			
		||||
        registry = await get_reg()
 | 
			
		||||
        assert actor.uid in registry
 | 
			
		||||
            # ensure current actor is registered
 | 
			
		||||
            registry = await get_reg()
 | 
			
		||||
            assert actor.uid in registry
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            async with tractor.open_nursery() as n:
 | 
			
		||||
                async with trio.open_nursery() as trion:
 | 
			
		||||
            try:
 | 
			
		||||
                async with tractor.open_nursery() as n:
 | 
			
		||||
                    async with trio.open_nursery() as trion:
 | 
			
		||||
 | 
			
		||||
                        portals = {}
 | 
			
		||||
                        for i in range(3):
 | 
			
		||||
                            name = f'a{i}'
 | 
			
		||||
                            if with_streaming:
 | 
			
		||||
                                portals[name] = await n.start_actor(
 | 
			
		||||
                                    name=name, enable_modules=[__name__])
 | 
			
		||||
 | 
			
		||||
                            else:  # no streaming
 | 
			
		||||
                                portals[name] = await n.run_in_actor(
 | 
			
		||||
                                    trio.sleep_forever, name=name)
 | 
			
		||||
 | 
			
		||||
                        # wait on last actor to come up
 | 
			
		||||
                        async with tractor.wait_for_actor(name):
 | 
			
		||||
                            registry = await get_reg()
 | 
			
		||||
                            for uid in n._children:
 | 
			
		||||
                                assert uid in registry
 | 
			
		||||
 | 
			
		||||
                        assert len(portals) + extra == len(registry)
 | 
			
		||||
 | 
			
		||||
                    portals = {}
 | 
			
		||||
                    for i in range(3):
 | 
			
		||||
                        name = f'a{i}'
 | 
			
		||||
                        if with_streaming:
 | 
			
		||||
                            portals[name] = await n.start_actor(
 | 
			
		||||
                                name=name, enable_modules=[__name__])
 | 
			
		||||
                            await trio.sleep(0.1)
 | 
			
		||||
 | 
			
		||||
                        else:  # no streaming
 | 
			
		||||
                            portals[name] = await n.run_in_actor(
 | 
			
		||||
                                trio.sleep_forever, name=name)
 | 
			
		||||
                            pts = list(portals.values())
 | 
			
		||||
                            for p in pts[:-1]:
 | 
			
		||||
                                trion.start_soon(stream_from, p)
 | 
			
		||||
 | 
			
		||||
                    # wait on last actor to come up
 | 
			
		||||
                    async with tractor.wait_for_actor(name):
 | 
			
		||||
                        registry = await get_reg()
 | 
			
		||||
                        for uid in n._children:
 | 
			
		||||
                            assert uid in registry
 | 
			
		||||
                            # stream for 1 sec
 | 
			
		||||
                            trion.start_soon(cancel, use_signal, 1)
 | 
			
		||||
 | 
			
		||||
                    assert len(portals) + extra == len(registry)
 | 
			
		||||
                            last_p = pts[-1]
 | 
			
		||||
                            await stream_from(last_p)
 | 
			
		||||
 | 
			
		||||
                    if with_streaming:
 | 
			
		||||
                        await trio.sleep(0.1)
 | 
			
		||||
                        else:
 | 
			
		||||
                            await cancel(use_signal)
 | 
			
		||||
 | 
			
		||||
                        pts = list(portals.values())
 | 
			
		||||
                        for p in pts[:-1]:
 | 
			
		||||
                            trion.start_soon(stream_from, p)
 | 
			
		||||
            finally:
 | 
			
		||||
                with trio.CancelScope(shield=True):
 | 
			
		||||
                    await trio.sleep(0.5)
 | 
			
		||||
 | 
			
		||||
                        # stream for 1 sec
 | 
			
		||||
                        trion.start_soon(cancel, use_signal, 1)
 | 
			
		||||
 | 
			
		||||
                        last_p = pts[-1]
 | 
			
		||||
                        await stream_from(last_p)
 | 
			
		||||
 | 
			
		||||
                    else:
 | 
			
		||||
                        await cancel(use_signal)
 | 
			
		||||
 | 
			
		||||
        finally:
 | 
			
		||||
            with trio.CancelScope(shield=True):
 | 
			
		||||
                await trio.sleep(0.5)
 | 
			
		||||
 | 
			
		||||
                # all subactors should have de-registered
 | 
			
		||||
                registry = await get_reg()
 | 
			
		||||
                assert len(registry) == extra
 | 
			
		||||
                assert actor.uid in registry
 | 
			
		||||
                    # all subactors should have de-registered
 | 
			
		||||
                    registry = await get_reg()
 | 
			
		||||
                    assert len(registry) == extra
 | 
			
		||||
                    assert actor.uid in registry
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize('use_signal', [False, True])
 | 
			
		||||
| 
						 | 
				
			
			@ -201,7 +208,7 @@ def test_subactors_unregister_on_cancel(
 | 
			
		|||
    deregistering themselves with the arbiter.
 | 
			
		||||
    """
 | 
			
		||||
    with pytest.raises(KeyboardInterrupt):
 | 
			
		||||
        tractor.run(
 | 
			
		||||
        trio.run(
 | 
			
		||||
            partial(
 | 
			
		||||
                spawn_and_check_registry,
 | 
			
		||||
                arb_addr,
 | 
			
		||||
| 
						 | 
				
			
			@ -209,7 +216,6 @@ def test_subactors_unregister_on_cancel(
 | 
			
		|||
                remote_arbiter=False,
 | 
			
		||||
                with_streaming=with_streaming,
 | 
			
		||||
            ),
 | 
			
		||||
            arbiter_addr=arb_addr
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -227,7 +233,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
 | 
			
		|||
    tree) arbiter.
 | 
			
		||||
    """
 | 
			
		||||
    with pytest.raises(KeyboardInterrupt):
 | 
			
		||||
        tractor.run(
 | 
			
		||||
        trio.run(
 | 
			
		||||
            partial(
 | 
			
		||||
                spawn_and_check_registry,
 | 
			
		||||
                arb_addr,
 | 
			
		||||
| 
						 | 
				
			
			@ -235,8 +241,6 @@ def test_subactors_unregister_on_cancel_remote_daemon(
 | 
			
		|||
                remote_arbiter=True,
 | 
			
		||||
                with_streaming=with_streaming,
 | 
			
		||||
            ),
 | 
			
		||||
            # XXX: required to use remote daemon!
 | 
			
		||||
            arbiter_addr=arb_addr
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -258,49 +262,52 @@ async def close_chans_before_nursery(
 | 
			
		|||
    else:
 | 
			
		||||
        entries_at_end = 1
 | 
			
		||||
 | 
			
		||||
    async with tractor.get_arbiter(*arb_addr) as aportal:
 | 
			
		||||
        try:
 | 
			
		||||
            get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')
 | 
			
		||||
    async with tractor.open_root_actor(
 | 
			
		||||
        arbiter_addr=arb_addr,
 | 
			
		||||
    ):
 | 
			
		||||
        async with tractor.get_arbiter(*arb_addr) as aportal:
 | 
			
		||||
            try:
 | 
			
		||||
                get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')
 | 
			
		||||
 | 
			
		||||
            async with tractor.open_nursery() as tn:
 | 
			
		||||
                portal1 = await tn.start_actor(
 | 
			
		||||
                    name='consumer1', enable_modules=[__name__])
 | 
			
		||||
                portal2 = await tn.start_actor(
 | 
			
		||||
                    'consumer2', enable_modules=[__name__])
 | 
			
		||||
                async with tractor.open_nursery() as tn:
 | 
			
		||||
                    portal1 = await tn.start_actor(
 | 
			
		||||
                        name='consumer1', enable_modules=[__name__])
 | 
			
		||||
                    portal2 = await tn.start_actor(
 | 
			
		||||
                        'consumer2', enable_modules=[__name__])
 | 
			
		||||
 | 
			
		||||
                # TODO: compact this back as was in last commit once
 | 
			
		||||
                # 3.9+, see https://github.com/goodboy/tractor/issues/207
 | 
			
		||||
                async with portal1.open_stream_from(stream_forever) as agen1:
 | 
			
		||||
                    async with portal2.open_stream_from(
 | 
			
		||||
                        stream_forever
 | 
			
		||||
                    ) as agen2:
 | 
			
		||||
                        async with trio.open_nursery() as n:
 | 
			
		||||
                            n.start_soon(streamer, agen1)
 | 
			
		||||
                            n.start_soon(cancel, use_signal, .5)
 | 
			
		||||
                            try:
 | 
			
		||||
                                await streamer(agen2)
 | 
			
		||||
                            finally:
 | 
			
		||||
                                # Kill the root nursery thus resulting in
 | 
			
		||||
                                # normal arbiter channel ops to fail during
 | 
			
		||||
                                # teardown. It doesn't seem like this is
 | 
			
		||||
                                # reliably triggered by an external SIGINT.
 | 
			
		||||
                                # tractor.current_actor()._root_nursery.cancel_scope.cancel()
 | 
			
		||||
                    # TODO: compact this back as was in last commit once
 | 
			
		||||
                    # 3.9+, see https://github.com/goodboy/tractor/issues/207
 | 
			
		||||
                    async with portal1.open_stream_from(stream_forever) as agen1:
 | 
			
		||||
                        async with portal2.open_stream_from(
 | 
			
		||||
                            stream_forever
 | 
			
		||||
                        ) as agen2:
 | 
			
		||||
                            async with trio.open_nursery() as n:
 | 
			
		||||
                                n.start_soon(streamer, agen1)
 | 
			
		||||
                                n.start_soon(cancel, use_signal, .5)
 | 
			
		||||
                                try:
 | 
			
		||||
                                    await streamer(agen2)
 | 
			
		||||
                                finally:
 | 
			
		||||
                                    # Kill the root nursery thus resulting in
 | 
			
		||||
                                    # normal arbiter channel ops to fail during
 | 
			
		||||
                                    # teardown. It doesn't seem like this is
 | 
			
		||||
                                    # reliably triggered by an external SIGINT.
 | 
			
		||||
                                    # tractor.current_actor()._root_nursery.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
                                # XXX: THIS IS THE KEY THING that happens
 | 
			
		||||
                                # **before** exiting the actor nursery block
 | 
			
		||||
                                    # XXX: THIS IS THE KEY THING that happens
 | 
			
		||||
                                    # **before** exiting the actor nursery block
 | 
			
		||||
 | 
			
		||||
                                # also kill off channels cuz why not
 | 
			
		||||
                                await agen1.aclose()
 | 
			
		||||
                                await agen2.aclose()
 | 
			
		||||
        finally:
 | 
			
		||||
            with trio.CancelScope(shield=True):
 | 
			
		||||
                await trio.sleep(1)
 | 
			
		||||
                                    # also kill off channels cuz why not
 | 
			
		||||
                                    await agen1.aclose()
 | 
			
		||||
                                    await agen2.aclose()
 | 
			
		||||
            finally:
 | 
			
		||||
                with trio.CancelScope(shield=True):
 | 
			
		||||
                    await trio.sleep(1)
 | 
			
		||||
 | 
			
		||||
                # all subactors should have de-registered
 | 
			
		||||
                registry = await get_reg()
 | 
			
		||||
                assert portal1.channel.uid not in registry
 | 
			
		||||
                assert portal2.channel.uid not in registry
 | 
			
		||||
                assert len(registry) == entries_at_end
 | 
			
		||||
                    # all subactors should have de-registered
 | 
			
		||||
                    registry = await get_reg()
 | 
			
		||||
                    assert portal1.channel.uid not in registry
 | 
			
		||||
                    assert portal2.channel.uid not in registry
 | 
			
		||||
                    assert len(registry) == entries_at_end
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize('use_signal', [False, True])
 | 
			
		||||
| 
						 | 
				
			
			@ -314,15 +321,13 @@ def test_close_channel_explicit(
 | 
			
		|||
    results in subactor(s) deregistering from the arbiter.
 | 
			
		||||
    """
 | 
			
		||||
    with pytest.raises(KeyboardInterrupt):
 | 
			
		||||
        tractor.run(
 | 
			
		||||
        trio.run(
 | 
			
		||||
            partial(
 | 
			
		||||
                close_chans_before_nursery,
 | 
			
		||||
                arb_addr,
 | 
			
		||||
                use_signal,
 | 
			
		||||
                remote_arbiter=False,
 | 
			
		||||
            ),
 | 
			
		||||
            # XXX: required to use remote daemon!
 | 
			
		||||
            arbiter_addr=arb_addr
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -338,13 +343,11 @@ def test_close_channel_explicit_remote_arbiter(
 | 
			
		|||
    results in subactor(s) deregistering from the arbiter.
 | 
			
		||||
    """
 | 
			
		||||
    with pytest.raises(KeyboardInterrupt):
 | 
			
		||||
        tractor.run(
 | 
			
		||||
        trio.run(
 | 
			
		||||
            partial(
 | 
			
		||||
                close_chans_before_nursery,
 | 
			
		||||
                arb_addr,
 | 
			
		||||
                use_signal,
 | 
			
		||||
                remote_arbiter=True,
 | 
			
		||||
            ),
 | 
			
		||||
            # XXX: required to use remote daemon!
 | 
			
		||||
            arbiter_addr=arb_addr
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -113,6 +113,7 @@ class ActorNursery:
 | 
			
		|||
        name: Optional[str] = None,
 | 
			
		||||
        bind_addr: Tuple[str, int] = _default_bind_addr,
 | 
			
		||||
        rpc_module_paths: Optional[List[str]] = None,
 | 
			
		||||
        enable_modules: List[str] = None,
 | 
			
		||||
        loglevel: str = None,  # set log level per subactor
 | 
			
		||||
        **kwargs,  # explicit args to ``fn``
 | 
			
		||||
    ) -> Portal:
 | 
			
		||||
| 
						 | 
				
			
			@ -131,7 +132,7 @@ class ActorNursery:
 | 
			
		|||
 | 
			
		||||
        portal = await self.start_actor(
 | 
			
		||||
            name,
 | 
			
		||||
            rpc_module_paths=[mod_path] + (rpc_module_paths or []),
 | 
			
		||||
            enable_modules=[mod_path] + (enable_modules or rpc_module_paths or []),
 | 
			
		||||
            bind_addr=bind_addr,
 | 
			
		||||
            loglevel=loglevel,
 | 
			
		||||
            # use the run_in_actor nursery
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue