diff --git a/tests/test_discovery.py b/tests/test_discovery.py index af03ce6..383608e 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -20,8 +20,11 @@ async def test_reg_then_unreg(arb_addr): assert actor.is_arbiter assert len(actor._registry) == 1 # only self is registered - async with tractor.open_nursery() as n: - portal = await n.start_actor('actor', rpc_module_paths=[__name__]) + async with tractor.open_nursery( + arbiter_addr=arb_addr, + ) as n: + + portal = await n.start_actor('actor', enable_modules=[__name__]) uid = portal.channel.uid async with tractor.get_arbiter(*arb_addr) as aportal: @@ -66,7 +69,7 @@ async def say_hello_use_wait(other_actor): @tractor_test @pytest.mark.parametrize('func', [say_hello, say_hello_use_wait]) -async def test_trynamic_trio(func, start_method): +async def test_trynamic_trio(func, start_method, arb_addr): """Main tractor entry point, the "master" process (for now acts as the "director"). """ @@ -119,74 +122,78 @@ async def spawn_and_check_registry( remote_arbiter: bool = False, with_streaming: bool = False, ) -> None: - actor = tractor.current_actor() - if remote_arbiter: - assert not actor.is_arbiter + async with tractor.open_root_actor( + arbiter_addr=arb_addr, + ): + async with tractor.get_arbiter(*arb_addr) as portal: + # runtime needs to be up to call this + actor = tractor.current_actor() - async with tractor.get_arbiter(*arb_addr) as portal: + if remote_arbiter: + assert not actor.is_arbiter - if actor.is_arbiter: + if actor.is_arbiter: - async def get_reg(): - return actor._registry + async def get_reg(): + return actor._registry - extra = 1 # arbiter is local root actor - else: - get_reg = partial(portal.run_from_ns, 'self', 'get_registry') - extra = 2 # local root actor + remote arbiter + extra = 1 # arbiter is local root actor + else: + get_reg = partial(portal.run_from_ns, 'self', 'get_registry') + extra = 2 # local root actor + remote arbiter - # ensure current actor is registered - registry = await get_reg() - assert actor.uid in registry + # ensure current actor is registered + registry = await get_reg() + assert actor.uid in registry - try: - async with tractor.open_nursery() as n: - async with trio.open_nursery() as trion: + try: + async with tractor.open_nursery() as n: + async with trio.open_nursery() as trion: + + portals = {} + for i in range(3): + name = f'a{i}' + if with_streaming: + portals[name] = await n.start_actor( + name=name, enable_modules=[__name__]) + + else: # no streaming + portals[name] = await n.run_in_actor( + trio.sleep_forever, name=name) + + # wait on last actor to come up + async with tractor.wait_for_actor(name): + registry = await get_reg() + for uid in n._children: + assert uid in registry + + assert len(portals) + extra == len(registry) - portals = {} - for i in range(3): - name = f'a{i}' if with_streaming: - portals[name] = await n.start_actor( - name=name, enable_modules=[__name__]) + await trio.sleep(0.1) - else: # no streaming - portals[name] = await n.run_in_actor( - trio.sleep_forever, name=name) + pts = list(portals.values()) + for p in pts[:-1]: + trion.start_soon(stream_from, p) - # wait on last actor to come up - async with tractor.wait_for_actor(name): - registry = await get_reg() - for uid in n._children: - assert uid in registry + # stream for 1 sec + trion.start_soon(cancel, use_signal, 1) - assert len(portals) + extra == len(registry) + last_p = pts[-1] + await stream_from(last_p) - if with_streaming: - await trio.sleep(0.1) + else: + await cancel(use_signal) - pts = list(portals.values()) - for p in pts[:-1]: - trion.start_soon(stream_from, p) + finally: + with trio.CancelScope(shield=True): + await trio.sleep(0.5) - # stream for 1 sec - trion.start_soon(cancel, use_signal, 1) - - last_p = pts[-1] - await stream_from(last_p) - - else: - await cancel(use_signal) - - finally: - with trio.CancelScope(shield=True): - await trio.sleep(0.5) - - # all subactors should have de-registered - registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + # all subactors should have de-registered + registry = await get_reg() + assert len(registry) == extra + assert actor.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) @@ -201,7 +208,7 @@ def test_subactors_unregister_on_cancel( deregistering themselves with the arbiter. """ with pytest.raises(KeyboardInterrupt): - tractor.run( + trio.run( partial( spawn_and_check_registry, arb_addr, @@ -209,7 +216,6 @@ def test_subactors_unregister_on_cancel( remote_arbiter=False, with_streaming=with_streaming, ), - arbiter_addr=arb_addr ) @@ -227,7 +233,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( tree) arbiter. """ with pytest.raises(KeyboardInterrupt): - tractor.run( + trio.run( partial( spawn_and_check_registry, arb_addr, @@ -235,8 +241,6 @@ def test_subactors_unregister_on_cancel_remote_daemon( remote_arbiter=True, with_streaming=with_streaming, ), - # XXX: required to use remote daemon! - arbiter_addr=arb_addr ) @@ -258,49 +262,52 @@ async def close_chans_before_nursery( else: entries_at_end = 1 - async with tractor.get_arbiter(*arb_addr) as aportal: - try: - get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') + async with tractor.open_root_actor( + arbiter_addr=arb_addr, + ): + async with tractor.get_arbiter(*arb_addr) as aportal: + try: + get_reg = partial(aportal.run_from_ns, 'self', 'get_registry') - async with tractor.open_nursery() as tn: - portal1 = await tn.start_actor( - name='consumer1', enable_modules=[__name__]) - portal2 = await tn.start_actor( - 'consumer2', enable_modules=[__name__]) + async with tractor.open_nursery() as tn: + portal1 = await tn.start_actor( + name='consumer1', enable_modules=[__name__]) + portal2 = await tn.start_actor( + 'consumer2', enable_modules=[__name__]) - # TODO: compact this back as was in last commit once - # 3.9+, see https://github.com/goodboy/tractor/issues/207 - async with portal1.open_stream_from(stream_forever) as agen1: - async with portal2.open_stream_from( - stream_forever - ) as agen2: - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. - # tractor.current_actor()._root_nursery.cancel_scope.cancel() + # TODO: compact this back as was in last commit once + # 3.9+, see https://github.com/goodboy/tractor/issues/207 + async with portal1.open_stream_from(stream_forever) as agen1: + async with portal2.open_stream_from( + stream_forever + ) as agen2: + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() - finally: - with trio.CancelScope(shield=True): - await trio.sleep(1) + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() + finally: + with trio.CancelScope(shield=True): + await trio.sleep(1) - # all subactors should have de-registered - registry = await get_reg() - assert portal1.channel.uid not in registry - assert portal2.channel.uid not in registry - assert len(registry) == entries_at_end + # all subactors should have de-registered + registry = await get_reg() + assert portal1.channel.uid not in registry + assert portal2.channel.uid not in registry + assert len(registry) == entries_at_end @pytest.mark.parametrize('use_signal', [False, True]) @@ -314,15 +321,13 @@ def test_close_channel_explicit( results in subactor(s) deregistering from the arbiter. """ with pytest.raises(KeyboardInterrupt): - tractor.run( + trio.run( partial( close_chans_before_nursery, arb_addr, use_signal, remote_arbiter=False, ), - # XXX: required to use remote daemon! - arbiter_addr=arb_addr ) @@ -338,13 +343,11 @@ def test_close_channel_explicit_remote_arbiter( results in subactor(s) deregistering from the arbiter. """ with pytest.raises(KeyboardInterrupt): - tractor.run( + trio.run( partial( close_chans_before_nursery, arb_addr, use_signal, remote_arbiter=True, ), - # XXX: required to use remote daemon! - arbiter_addr=arb_addr ) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 4674bd9..61fa0c5 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -113,6 +113,7 @@ class ActorNursery: name: Optional[str] = None, bind_addr: Tuple[str, int] = _default_bind_addr, rpc_module_paths: Optional[List[str]] = None, + enable_modules: List[str] = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` ) -> Portal: @@ -131,7 +132,7 @@ class ActorNursery: portal = await self.start_actor( name, - rpc_module_paths=[mod_path] + (rpc_module_paths or []), + enable_modules=[mod_path] + (enable_modules or rpc_module_paths or []), bind_addr=bind_addr, loglevel=loglevel, # use the run_in_actor nursery