diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index b0b145ee..eff9a731 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -1,8 +1,13 @@ """ Multiple python programs invoking the runtime. """ +from __future__ import annotations import platform +import subprocess import time +from typing import ( + TYPE_CHECKING, +) import pytest import trio @@ -10,14 +15,29 @@ import tractor from tractor._testing import ( tractor_test, ) +from tractor import ( + current_actor, + _state, + Actor, + Context, + Portal, +) from .conftest import ( sig_prog, _INT_SIGNAL, _INT_RETURN_CODE, ) +if TYPE_CHECKING: + from tractor.msg import Aid + from tractor._addr import ( + UnwrappedAddress, + ) -def test_abort_on_sigint(daemon): + +def test_abort_on_sigint( + daemon: subprocess.Popen, +): assert daemon.returncode is None time.sleep(0.1) sig_prog(daemon, _INT_SIGNAL) @@ -30,8 +50,11 @@ def test_abort_on_sigint(daemon): @tractor_test -async def test_cancel_remote_arbiter(daemon, reg_addr): - assert not tractor.current_actor().is_arbiter +async def test_cancel_remote_arbiter( + daemon: subprocess.Popen, + reg_addr: UnwrappedAddress, +): + assert not current_actor().is_arbiter async with tractor.get_registry(reg_addr) as portal: await portal.cancel_actor() @@ -45,24 +68,106 @@ async def test_cancel_remote_arbiter(daemon, reg_addr): pass -def test_register_duplicate_name(daemon, reg_addr): - +def test_register_duplicate_name( + daemon: subprocess.Popen, + reg_addr: UnwrappedAddress, +): async def main(): - async with tractor.open_nursery( registry_addrs=[reg_addr], - ) as n: + ) as an: - assert not tractor.current_actor().is_arbiter + assert not current_actor().is_arbiter - p1 = await n.start_actor('doggy') - p2 = await n.start_actor('doggy') + p1 = await an.start_actor('doggy') + p2 = await an.start_actor('doggy') async with tractor.wait_for_actor('doggy') as portal: assert portal.channel.uid in (p2.channel.uid, p1.channel.uid) - await n.cancel() + await an.cancel() - # run it manually since we want to start **after** - # the other "daemon" program + # XXX, run manually since we want to start this root **after** + # the other "daemon" program with it's own root. + trio.run(main) + + +@tractor.context +async def get_root_portal( + ctx: Context, +): + ''' + Connect back to the root actor manually (using `._discovery` API) + and ensure it's contact info is the same as our immediate parent. + + ''' + sub: Actor = current_actor() + rtvs: dict = _state._runtime_vars + raddrs: list[UnwrappedAddress] = rtvs['_root_addrs'] + + # await tractor.pause() + # XXX, in case the sub->root discovery breaks you might need + # this (i know i did Xp)!! + # from tractor.devx import mk_pdb + # mk_pdb().set_trace() + + assert ( + len(raddrs) == 1 + and + list(sub._parent_chan.raddr.unwrap()) in raddrs + ) + + # connect back to our immediate parent which should also + # be the actor-tree's root. + from tractor._discovery import get_root + ptl: Portal + async with get_root() as ptl: + root_aid: Aid = ptl.chan.aid + parent_ptl: Portal = current_actor().get_parent() + assert ( + root_aid.name == 'root' + and + parent_ptl.chan.aid == root_aid + ) + await ctx.started() + + +def test_non_registrar_spawns_child( + daemon: subprocess.Popen, + reg_addr: UnwrappedAddress, + loglevel: str, + debug_mode: bool, +): + ''' + Ensure a non-regristar (serving) root actor can spawn a sub and + that sub can connect back (manually) to it's rent that is the + root without issue. + + More or less this audits the global contact info in + `._state._runtime_vars`. + + ''' + async def main(): + async with tractor.open_nursery( + registry_addrs=[reg_addr], + loglevel=loglevel, + debug_mode=debug_mode, + ) as an: + + actor: Actor = tractor.current_actor() + assert not actor.is_registrar + sub_ptl: Portal = await an.start_actor( + name='sub', + enable_modules=[__name__], + ) + + async with sub_ptl.open_context( + get_root_portal, + ) as (ctx, first): + print('Waiting for `sub` to connect back to us..') + + await an.cancel() + + # XXX, run manually since we want to start this root **after** + # the other "daemon" program with it's own root. trio.run(main) diff --git a/tractor/_root.py b/tractor/_root.py index 2fb7755d..f480a619 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -88,7 +88,8 @@ async def maybe_block_bp( bp_blocked: bool if ( debug_mode - and maybe_enable_greenback + and + maybe_enable_greenback and ( maybe_mod := await debug.maybe_init_greenback( raise_not_found=False, @@ -385,10 +386,13 @@ async def open_root_actor( addr, ) - trans_bind_addrs: list[UnwrappedAddress] = [] + tpt_bind_addrs: list[ + Address # `Address.get_random()` case + |UnwrappedAddress # registrar case `= uw_reg_addrs` + ] = [] - # Create a new local root-actor instance which IS NOT THE - # REGISTRAR + # ------ NON-REGISTRAR ------ + # create a new root-actor instance. if ponged_addrs: if ensure_registry: raise RuntimeError( @@ -415,12 +419,21 @@ async def open_root_actor( # XXX INSTEAD, bind random addrs using the same tpt # proto. for addr in ponged_addrs: - trans_bind_addrs.append( + tpt_bind_addrs.append( + # XXX, these are `Address` NOT `UnwrappedAddress`. + # + # NOTE, in the case of posix/berkley socket + # protos we allocate port=0 such that the system + # allocates a random value at bind time; this + # happens in the `.ipc.*` stack's backend. addr.get_random( bindspace=addr.bindspace, ) ) + # ------ REGISTRAR ------ + # create a new "registry providing" root-actor instance. + # # Start this local actor as the "registrar", aka a regular # actor who manages the local registry of "mailboxes" of # other process-tree-local sub-actors. @@ -429,7 +442,7 @@ async def open_root_actor( # following init steps are taken: # - the tranport layer server is bound to each addr # pair defined in provided registry_addrs, or the default. - trans_bind_addrs = uw_reg_addrs + tpt_bind_addrs = uw_reg_addrs # - it is normally desirable for any registrar to stay up # indefinitely until either all registered (child/sub) @@ -449,20 +462,10 @@ async def open_root_actor( enable_modules=enable_modules, ) # XXX, in case the root actor runtime was actually run from - # `tractor.to_asyncio.run_as_asyncio_guest()` and NOt + # `tractor.to_asyncio.run_as_asyncio_guest()` and NOT # `.trio.run()`. actor._infected_aio = _state._runtime_vars['_is_infected_aio'] - # NOTE, only set the loopback addr for the - # process-tree-global "root" mailbox since all sub-actors - # should be able to speak to their root actor over that - # channel. - raddrs: list[Address] = _state._runtime_vars['_root_addrs'] - raddrs.extend(trans_bind_addrs) - # TODO, remove once we have also removed all usage; - # eventually all (root-)registry apis should expect > 1 addr. - _state._runtime_vars['_root_mailbox'] = raddrs[0] - # Start up main task set via core actor-runtime nurseries. try: # assign process-local actor @@ -499,14 +502,39 @@ async def open_root_actor( # "actor runtime" primitives are SC-compat and thus all # transitively spawned actors/processes must be as # well. - await root_tn.start( + accept_addrs: list[UnwrappedAddress] + reg_addrs: list[UnwrappedAddress] + ( + accept_addrs, + reg_addrs, + ) = await root_tn.start( partial( _runtime.async_main, actor, - accept_addrs=trans_bind_addrs, + accept_addrs=tpt_bind_addrs, parent_addr=None ) ) + # NOTE, only set a local-host addr (i.e. like + # `lo`-loopback for TCP) for the process-tree-global + # "root"-process (its tree-wide "mailbox") since all + # sub-actors should be able to speak to their root + # actor over that channel. + # + # ?TODO, per-OS non-network-proto alt options? + # -[ ] on linux we should be able to always use UDS? + # + raddrs: list[Address] = _state._runtime_vars['_root_addrs'] + raddrs.extend( + accept_addrs, + ) + # TODO, remove once we have also removed all usage; + # eventually all (root-)registry apis should expect > 1 addr. + _state._runtime_vars['_root_mailbox'] = raddrs[0] + # if 'chart' in actor.aid.name: + # from tractor.devx import mk_pdb + # mk_pdb().set_trace() + try: yield actor except ( @@ -588,6 +616,13 @@ async def open_root_actor( ): _state._runtime_vars['_debug_mode'] = False + # !XXX, clear ALL prior contact info state, this is MEGA + # important if you are opening the runtime multiple times + # from the same parent process (like in our test + # harness)! + _state._runtime_vars['_root_addrs'].clear() + _state._runtime_vars['_root_mailbox'] = None + _state._current_actor = None _state._last_actor_terminated = actor diff --git a/tractor/_runtime.py b/tractor/_runtime.py index f18e0d61..c6ff9e4c 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -147,6 +147,8 @@ def get_mod_nsps2fps(mod_ns_paths: list[str]) -> dict[str, str]: return nsp2fp +_bp = False + class Actor: ''' The fundamental "runtime" concurrency primitive. @@ -272,7 +274,9 @@ class Actor: stacklevel=2, ) - registry_addrs: list[Address] = [wrap_address(arbiter_addr)] + registry_addrs: list[Address] = [ + wrap_address(arbiter_addr) + ] # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -959,6 +963,21 @@ class Actor: rvs['_is_root'] = False # obvi XD + # TODO, remove! left in just while protoing init fix! + # global _bp + # if ( + # 'chart' in self.aid.name + # and + # isinstance( + # rvs['_root_addrs'][0], + # dict, + # ) + # and + # not _bp + # ): + # _bp = True + # breakpoint() + _state._runtime_vars.update(rvs) # `SpawnSpec.reg_addrs` @@ -1455,7 +1474,12 @@ async def async_main( # be False when running as root actor and True when as # a subactor. parent_addr: UnwrappedAddress|None = None, - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[ + tuple[ + list[UnwrappedAddress], # accept_addrs + list[UnwrappedAddress], # reg_addrs + ] + ] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -1634,6 +1658,7 @@ async def async_main( # if addresses point to the same actor.. # So we need a way to detect that? maybe iterate # only on unique actor uids? + addr: UnwrappedAddress for addr in actor.reg_addrs: try: waddr = wrap_address(addr) @@ -1642,7 +1667,9 @@ async def async_main( await debug.pause() # !TODO, get rid of the local-portal crap XD + reg_portal: Portal async with get_registry(addr) as reg_portal: + accept_addr: UnwrappedAddress for accept_addr in accept_addrs: accept_addr = wrap_address(accept_addr) @@ -1658,8 +1685,12 @@ async def async_main( is_registered: bool = True - # init steps complete - task_status.started() + # init steps complete, deliver IPC-server and + # registrar addrs back to caller. + task_status.started(( + accept_addrs, + actor.reg_addrs, + )) # Begin handling our new connection back to our # parent. This is done last since we don't want to