diff --git a/examples/service_discovery.py b/examples/service_discovery.py index 1219f0c1..574ba019 100644 --- a/examples/service_discovery.py +++ b/examples/service_discovery.py @@ -10,7 +10,7 @@ async def main(service_name): await an.start_actor(service_name) async with tractor.get_registry() as portal: - print(f"Arbiter is listening on {portal.channel}") + print(f"Registrar is listening on {portal.channel}") async with tractor.wait_for_actor(service_name) as sockaddr: print(f"my_service is found at {sockaddr}") diff --git a/tests/test_discovery.py b/tests/test_discovery.py index fcf73156..0fbac8be 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -24,7 +24,7 @@ async def test_reg_then_unreg( reg_addr: tuple, ): actor = tractor.current_actor() - assert actor.is_arbiter + assert actor.is_registrar assert len(actor._registry) == 1 # only self is registered async with tractor.open_nursery( @@ -35,7 +35,7 @@ async def test_reg_then_unreg( uid = portal.channel.aid.uid async with tractor.get_registry(reg_addr) as aportal: - # this local actor should be the arbiter + # this local actor should be the registrar assert actor is aportal.actor async with tractor.wait_for_actor('actor'): @@ -154,7 +154,7 @@ async def unpack_reg( actor_or_portal: tractor.Portal|tractor.Actor, ): ''' - Get and unpack a "registry" RPC request from the "arbiter" registry + Get and unpack a "registry" RPC request from the registrar system. 
''' @@ -197,15 +197,15 @@ async def spawn_and_check_registry( actor = tractor.current_actor() if remote_arbiter: - assert not actor.is_arbiter + assert not actor.is_registrar - if actor.is_arbiter: - extra = 1 # arbiter is local root actor + if actor.is_registrar: + extra = 1 # registrar is local root actor get_reg = partial(unpack_reg, actor) else: get_reg = partial(unpack_reg, portal) - extra = 2 # local root actor + remote arbiter + extra = 2 # local root actor + remote registrar # ensure current actor is registered registry: dict = await get_reg() @@ -285,7 +285,7 @@ def test_subactors_unregister_on_cancel( ): ''' Verify that cancelling a nursery results in all subactors - deregistering themselves with the arbiter. + deregistering themselves with the registrar. ''' with pytest.raises(KeyboardInterrupt): @@ -314,7 +314,7 @@ def test_subactors_unregister_on_cancel_remote_daemon( ''' Verify that cancelling a nursery results in all subactors deregistering themselves with a **remote** (not in the local - process tree) arbiter. + process tree) registrar. ''' with pytest.raises(KeyboardInterrupt): @@ -387,7 +387,7 @@ async def close_chans_before_nursery( await streamer(agen2) finally: # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during + # normal registrar channel ops to fail during # teardown. It doesn't seem like this is # reliably triggered by an external SIGINT. # tractor.current_actor()._root_nursery.cancel_scope.cancel() @@ -420,7 +420,7 @@ def test_close_channel_explicit( ''' Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also - results in subactor(s) deregistering from the arbiter. + results in subactor(s) deregistering from the registrar. 
''' with pytest.raises(KeyboardInterrupt): @@ -444,7 +444,7 @@ def test_close_channel_explicit_remote_registrar( ''' Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also - results in subactor(s) deregistering from the arbiter. + results in subactor(s) deregistering from the registrar. ''' with pytest.raises(KeyboardInterrupt): diff --git a/tests/test_local.py b/tests/test_local.py index b535cb33..4e1e983b 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -1,5 +1,5 @@ """ -Arbiter and "local" actor api +Registrar and "local" actor api """ import time @@ -12,11 +12,11 @@ from tractor._testing import tractor_test @pytest.mark.trio async def test_no_runtime(): - """An arbitter must be established before any nurseries + """A registrar must be established before any nurseries can be created. - (In other words ``tractor.open_root_actor()`` must be engaged at - some point?) + (In other words ``tractor.open_root_actor()`` must be + engaged at some point?) """ with pytest.raises(RuntimeError) : async with tractor.find_actor('doggy'): @@ -25,9 +25,9 @@ async def test_no_runtime(): @tractor_test async def test_self_is_registered(reg_addr): - "Verify waiting on the arbiter to register itself using the standard api." + "Verify waiting on the registrar to register itself using the standard api." actor = tractor.current_actor() - assert actor.is_arbiter + assert actor.is_registrar with trio.fail_after(0.2): async with tractor.wait_for_actor('root') as portal: assert portal.channel.uid[0] == 'root' @@ -35,9 +35,9 @@ async def test_self_is_registered(reg_addr): @tractor_test async def test_self_is_registered_localportal(reg_addr): - "Verify waiting on the arbiter to register itself using a local portal." + "Verify waiting on the registrar to register itself using a local portal." 
actor = tractor.current_actor() - assert actor.is_arbiter + assert actor.is_registrar async with tractor.get_registry(reg_addr) as portal: assert isinstance(portal, tractor.runtime._portal.LocalPortal) @@ -57,8 +57,8 @@ def test_local_actor_async_func(reg_addr): async with tractor.open_root_actor( registry_addrs=[reg_addr], ): - # arbiter is started in-proc if dne - assert tractor.current_actor().is_arbiter + # registrar is started in-proc if dne + assert tractor.current_actor().is_registrar for i in range(10): nums.append(i) diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index f44d036a..100f01c6 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -53,19 +53,19 @@ def test_abort_on_sigint( @tractor_test -async def test_cancel_remote_arbiter( +async def test_cancel_remote_registrar( daemon: subprocess.Popen, reg_addr: UnwrappedAddress, ): - assert not current_actor().is_arbiter + assert not current_actor().is_registrar async with tractor.get_registry(reg_addr) as portal: await portal.cancel_actor() time.sleep(0.1) - # the arbiter channel server is cancelled but not its main task + # the registrar channel server is cancelled but not its main task assert daemon.returncode is None - # no arbiter socket should exist + # no registrar socket should exist with pytest.raises(OSError): async with tractor.get_registry(reg_addr) as portal: pass @@ -80,7 +80,7 @@ def test_register_duplicate_name( registry_addrs=[reg_addr], ) as an: - assert not current_actor().is_arbiter + assert not current_actor().is_registrar p1 = await an.start_actor('doggy') p2 = await an.start_actor('doggy') diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 9581708f..6e2b414c 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -110,7 +110,7 @@ def test_rpc_errors( ) as n: actor = tractor.current_actor() - assert actor.is_arbiter + assert actor.is_registrar await n.run_in_actor( sleep_back_actor, actor_name=subactor_requests_to, diff --git 
a/tests/test_spawning.py b/tests/test_spawning.py index 30e084d5..283d1785 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -39,7 +39,7 @@ async def spawn( ): # now runtime exists actor: tractor.Actor = tractor.current_actor() - assert actor.is_arbiter == should_be_root + assert actor.is_registrar == should_be_root # spawns subproc here portal: tractor.Portal = await an.run_in_actor( @@ -68,7 +68,7 @@ async def spawn( assert result == 10 return result else: - assert actor.is_arbiter == should_be_root + assert actor.is_registrar == should_be_root return 10 @@ -181,7 +181,7 @@ def test_loglevel_propagated_to_subactor( async def main(): async with tractor.open_nursery( - name='arbiter', + name='registrar', start_method=start_method, arbiter_addr=reg_addr, diff --git a/tractor/__init__.py b/tractor/__init__.py index 0c7055b5..1aafe98e 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -69,4 +69,8 @@ from ._root import ( from .ipc import Channel as Channel from .runtime._portal import Portal as Portal from .runtime._runtime import Actor as Actor +from .discovery._registry import ( + Registrar as Registrar, + Arbiter as Arbiter, +) # from . import hilevel as hilevel diff --git a/tractor/_root.py b/tractor/_root.py index d9a74b45..39a7880c 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -38,6 +38,7 @@ import warnings import trio from .runtime import _runtime +from .discovery._registry import Registrar from .devx import ( debug, _frame_stack, @@ -267,7 +268,6 @@ async def open_root_actor( if start_method is not None: _spawn.try_set_start_method(start_method) - # TODO! remove this ASAP! 
if arbiter_addr is not None: warnings.warn( '`arbiter_addr` is now deprecated\n' @@ -400,7 +400,7 @@ async def open_root_actor( 'registry socket(s) already bound' ) - # we were able to connect to an arbiter + # we were able to connect to a registrar logger.info( f'Registry(s) seem(s) to exist @ {ponged_addrs}' ) @@ -453,8 +453,7 @@ async def open_root_actor( # https://github.com/goodboy/tractor/pull/348 # https://github.com/goodboy/tractor/issues/296 - # TODO: rename as `RootActor` or is that even necessary? - actor = _runtime.Arbiter( + actor = Registrar( name=name or 'registrar', uuid=mk_uuid(), registry_addrs=registry_addrs, diff --git a/tractor/_testing/pytest.py b/tractor/_testing/pytest.py index 55842bf9..ec65abc4 100644 --- a/tractor/_testing/pytest.py +++ b/tractor/_testing/pytest.py @@ -75,7 +75,7 @@ def tractor_test( If any of the following fixture are requested by the wrapped test fn (via normal func-args declaration), - - `reg_addr` (a socket addr tuple where arbiter is listening) + - `reg_addr` (a socket addr tuple where registrar is listening) - `loglevel` (logging level passed to tractor internals) - `start_method` (subprocess spawning backend) diff --git a/tractor/discovery/_discovery.py b/tractor/discovery/_discovery.py index 494802c5..c3f4a98f 100644 --- a/tractor/discovery/_discovery.py +++ b/tractor/discovery/_discovery.py @@ -72,8 +72,8 @@ async def get_registry( ''' actor: Actor = current_actor() if actor.is_registrar: - # we're already the arbiter - # (likely a re-entrant call from the arbiter actor) + # we're already the registrar + # (likely a re-entrant call from the registrar actor) yield LocalPortal( actor, Channel(transport=None) @@ -268,10 +268,10 @@ async def find_actor( None, ]: ''' - Ask the arbiter to find actor(s) by name. + Ask the registrar to find actor(s) by name. - Returns a connected portal to the last registered matching actor - known to the arbiter. 
+ Returns a connected portal to the last registered + matching actor known to the registrar. ''' # optimization path, use any pre-existing peer channel diff --git a/tractor/discovery/_registry.py b/tractor/discovery/_registry.py new file mode 100644 index 00000000..cb8fc157 --- /dev/null +++ b/tractor/discovery/_registry.py @@ -0,0 +1,253 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later +# version. + +# This program is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +# PURPOSE. See the GNU Affero General Public License for more +# details. + +# You should have received a copy of the GNU Affero General +# Public License along with this program. If not, see +# <https://www.gnu.org/licenses/>. + +''' +Actor-registry for process-tree service discovery. + +The `Registrar` is a special `Actor` subtype that serves as +the process-tree's name-registry, tracking actor +name-to-address mappings so peers can discover each other. + +''' +from __future__ import annotations + +from bidict import bidict +import trio + +from ..runtime._runtime import Actor +from ._addr import ( + UnwrappedAddress, + Address, + wrap_address, +) +from ..devx import debug +from ..log import get_logger + + +log = get_logger('tractor') + + +class Registrar(Actor): + ''' + A special registrar `Actor` who can contact all other + actors within its immediate process tree and keeps + a registry of others meant to be discoverable in + a distributed application. + + Normally the registrar is also the "root actor" and + thus always has access to the top-most-level actor + (process) nursery. 
+ + By default, the registrar is always initialized when + and if no other registrar socket addrs have been + specified to runtime init entry-points (such as + `open_root_actor()` or `open_nursery()`). Any time + a new main process is launched (and thus a new root + actor created) and, no existing registrar can be + contacted at the provided `registry_addr`, then + a new one is always created; however, if one can be + reached it is used. + + Normally a distributed app requires at least one + registrar per logical host where for that given + "host space" (aka localhost IPC domain of addresses) + it is responsible for making all other host (local + address) bound actors *discoverable* to external + actor trees running on remote hosts. + + ''' + is_registrar = True + + def is_registry(self) -> bool: + return self.is_registrar + + def __init__( + self, + *args, + **kwargs, + ) -> None: + + self._registry: bidict[ + tuple[str, str], + UnwrappedAddress, + ] = bidict({}) + self._waiters: dict[ + str, + # either an event to sync to receiving an + # actor uid (which is filled in once the actor + # has successfully registered), or that uid + # after registry is complete. + list[trio.Event|tuple[str, str]] + ] = {} + + super().__init__(*args, **kwargs) + + async def find_actor( + self, + name: str, + + ) -> UnwrappedAddress|None: + + for uid, addr in self._registry.items(): + if name in uid: + return addr + + return None + + async def get_registry( + self + + ) -> dict[str, UnwrappedAddress]: + ''' + Return current name registry. + + This method is async to allow for cross-actor + invocation. 
+ + ''' + # NOTE: requires ``strict_map_key=False`` to the + # msgpack unpacker since we have tuples as keys + # (note this makes the registrar susceptible to + # hashdos): + # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 + return { + '.'.join(key): val + for key, val in self._registry.items() + } + + async def wait_for_actor( + self, + name: str, + + ) -> list[UnwrappedAddress]: + ''' + Wait for a particular actor to register. + + This is a blocking call if no actor by the + provided name is currently registered. + + ''' + addrs: list[UnwrappedAddress] = [] + addr: UnwrappedAddress + + mailbox_info: str = ( + 'Actor registry contact infos:\n' + ) + for uid, addr in self._registry.items(): + mailbox_info += ( + f'|_uid: {uid}\n' + f'|_addr: {addr}\n\n' + ) + if name == uid[0]: + addrs.append(addr) + + if not addrs: + waiter = trio.Event() + self._waiters.setdefault( + name, [] + ).append(waiter) + await waiter.wait() + + for uid in self._waiters[name]: + if not isinstance(uid, trio.Event): + addrs.append( + self._registry[uid] + ) + + log.runtime(mailbox_info) + return addrs + + async def register_actor( + self, + uid: tuple[str, str], + addr: UnwrappedAddress + ) -> None: + uid = name, hash = ( + str(uid[0]), + str(uid[1]), + ) + waddr: Address = wrap_address(addr) + if not waddr.is_valid: + # should never be 0-dynamic-os-alloc + await debug.pause() + + # XXX NOTE, value must also be hashable AND since + # `._registry` is a `bidict` values must be unique; + # use `.forceput()` to replace any prior (stale) + # entries that might map a different uid to the same + # addr (e.g. after an unclean shutdown or + # actor-restart reusing the same address). 
+ self._registry.forceput(uid, tuple(addr)) + + # pop and signal all waiter events + events = self._waiters.pop(name, []) + self._waiters.setdefault( + name, [] + ).append(uid) + for event in events: + if isinstance(event, trio.Event): + event.set() + + async def unregister_actor( + self, + uid: tuple[str, str] + + ) -> None: + uid = (str(uid[0]), str(uid[1])) + entry: tuple = self._registry.pop( + uid, None + ) + if entry is None: + log.warning( + f'Request to de-register' + f' {uid!r} failed?' + ) + + async def delete_addr( + self, + addr: tuple[str, int|str]|list[str|int], + ) -> tuple[str, str]|None: + # NOTE: `addr` arrives as a `list` over IPC + # (msgpack deserializes tuples -> lists) so + # coerce to `tuple` for the bidict hash lookup. + uid: tuple[str, str]|None = ( + self._registry.inverse.pop( + tuple(addr), + None, + ) + ) + if uid: + report: str = ( + 'Deleting registry-entry for,\n' + ) + else: + report: str = ( + 'No registry entry for,\n' + ) + + log.warning( + report + + + f'{addr!r}@{uid!r}' + ) + return uid + + +# Backward compat alias +Arbiter = Registrar diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py index 78cd469f..6cfbf474 100644 --- a/tractor/ipc/_server.py +++ b/tractor/ipc/_server.py @@ -355,7 +355,7 @@ async def handle_stream_from_peer( # and `MsgpackStream._inter_packets()` on a read from the # stream particularly when the runtime is first starting up # inside `open_root_actor()` where there is a check for - # a bound listener on the "arbiter" addr. the reset will be + # a bound listener on the registrar addr. the reset will be # because the handshake was never meant took place. 
log.runtime( con_status diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 0c141d26..a3f87293 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -225,7 +225,7 @@ class MsgpackTransport(MsgTransport): # not sure entirely why we need this but without it we # seem to be getting racy failures here on - # arbiter/registry name subs.. + # registrar name subs.. trio.BrokenResourceError, ) as trans_err: diff --git a/tractor/runtime/_runtime.py b/tractor/runtime/_runtime.py index 8d03600d..f6050840 100644 --- a/tractor/runtime/_runtime.py +++ b/tractor/runtime/_runtime.py @@ -68,7 +68,6 @@ import textwrap from types import ModuleType import warnings -from bidict import bidict import trio from trio._core import _run as trio_runtime from trio import ( @@ -176,13 +175,21 @@ class Actor: dialog. ''' - # ugh, we need to get rid of this and replace with a "registry" sys - # https://github.com/goodboy/tractor/issues/216 - is_arbiter: bool = False + is_registrar: bool = False @property - def is_registrar(self) -> bool: - return self.is_arbiter + def is_arbiter(self) -> bool: + ''' + Deprecated, use `.is_registrar`. + + ''' + warnings.warn( + '`Actor.is_arbiter` is deprecated.\n' + 'Use `.is_registrar` instead.', + DeprecationWarning, + stacklevel=2, + ) + return self.is_registrar @property def is_root(self) -> bool: @@ -238,7 +245,6 @@ class Actor: registry_addrs: list[Address]|None = None, spawn_method: str|None = None, - # TODO: remove! arbiter_addr: UnwrappedAddress|None = None, ) -> None: @@ -288,8 +294,8 @@ class Actor: ] # marked by the process spawning backend at startup - # will be None for the parent most process started manually - # by the user (currently called the "arbiter") + # will be None for the parent most process started + # manually by the user (the "registrar") self._spawn_method: str = spawn_method # RPC state @@ -1657,7 +1663,7 @@ async def async_main( # TODO, just read direct from ipc_server? 
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs - # Register with the arbiter if we're told its addr + # Register with the registrar if we're told its addr log.runtime( f'Registering `{actor.name}` => {pformat(accept_addrs)}\n' # ^-TODO-^ we should instead show the maddr here^^ @@ -1881,184 +1887,8 @@ async def async_main( log.runtime(teardown_report) -# TODO: rename to `Registry` and move to `.discovery._registry`! -class Arbiter(Actor): - ''' - A special registrar (and for now..) `Actor` who can contact all - other actors within its immediate process tree and possibly keeps - a registry of others meant to be discoverable in a distributed - application. Normally the registrar is also the "root actor" and - thus always has access to the top-most-level actor (process) - nursery. - - By default, the registrar is always initialized when and if no - other registrar socket addrs have been specified to runtime - init entry-points (such as `open_root_actor()` or - `open_nursery()`). Any time a new main process is launched (and - thus thus a new root actor created) and, no existing registrar - can be contacted at the provided `registry_addr`, then a new - one is always created; however, if one can be reached it is - used. - - Normally a distributed app requires at least registrar per - logical host where for that given "host space" (aka localhost - IPC domain of addresses) it is responsible for making all other - host (local address) bound actors *discoverable* to external - actor trees running on remote hosts. 
- - ''' - is_arbiter = True - - # TODO, implement this as a read on there existing a `._state` of - # some sort setup by whenever we impl this all as - # a `.discovery._registry.open_registry()` API - def is_registry(self) -> bool: - return self.is_arbiter - - def __init__( - self, - *args, - **kwargs, - ) -> None: - - self._registry: bidict[ - tuple[str, str], - UnwrappedAddress, - ] = bidict({}) - self._waiters: dict[ - str, - # either an event to sync to receiving an actor uid (which - # is filled in once the actor has sucessfully registered), - # or that uid after registry is complete. - list[trio.Event | tuple[str, str]] - ] = {} - - super().__init__(*args, **kwargs) - - async def find_actor( - self, - name: str, - - ) -> UnwrappedAddress|None: - - for uid, addr in self._registry.items(): - if name in uid: - return addr - - return None - - async def get_registry( - self - - ) -> dict[str, UnwrappedAddress]: - ''' - Return current name registry. - - This method is async to allow for cross-actor invocation. - - ''' - # NOTE: requires ``strict_map_key=False`` to the msgpack - # unpacker since we have tuples as keys (not this makes the - # arbiter suscetible to hashdos): - # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 - return { - '.'.join(key): val - for key, val in self._registry.items() - } - - async def wait_for_actor( - self, - name: str, - - ) -> list[UnwrappedAddress]: - ''' - Wait for a particular actor to register. - - This is a blocking call if no actor by the provided name is currently - registered. 
- - ''' - addrs: list[UnwrappedAddress] = [] - addr: UnwrappedAddress - - mailbox_info: str = 'Actor registry contact infos:\n' - for uid, addr in self._registry.items(): - mailbox_info += ( - f'|_uid: {uid}\n' - f'|_addr: {addr}\n\n' - ) - if name == uid[0]: - addrs.append(addr) - - if not addrs: - waiter = trio.Event() - self._waiters.setdefault(name, []).append(waiter) - await waiter.wait() - - for uid in self._waiters[name]: - if not isinstance(uid, trio.Event): - addrs.append(self._registry[uid]) - - log.runtime(mailbox_info) - return addrs - - async def register_actor( - self, - uid: tuple[str, str], - addr: UnwrappedAddress - ) -> None: - uid = name, hash = (str(uid[0]), str(uid[1])) - waddr: Address = wrap_address(addr) - if not waddr.is_valid: - # should never be 0-dynamic-os-alloc - await debug.pause() - - # XXX NOTE, value must also be hashable AND since - # `._registry` is a `bidict` values must be unique; use - # `.forceput()` to replace any prior (stale) entries - # that might map a different uid to the same addr (e.g. - # after an unclean shutdown or actor-restart reusing - # the same address). - self._registry.forceput(uid, tuple(addr)) - - # pop and signal all waiter events - events = self._waiters.pop(name, []) - self._waiters.setdefault(name, []).append(uid) - for event in events: - if isinstance(event, trio.Event): - event.set() - - async def unregister_actor( - self, - uid: tuple[str, str] - - ) -> None: - uid = (str(uid[0]), str(uid[1])) - entry: tuple = self._registry.pop(uid, None) - if entry is None: - log.warning( - f'Request to de-register {uid!r} failed?' - ) - - async def delete_addr( - self, - addr: tuple[str, int|str]|list[str|int], - ) -> tuple[str, str]|None: - # NOTE: `addr` arrives as a `list` over IPC - # (msgpack deserializes tuples -> lists) so - # coerce to `tuple` for the bidict hash lookup. 
- uid: tuple[str, str]|None = self._registry.inverse.pop( - tuple(addr), - None, - ) - if uid: - report: str = 'Deleting registry-entry for,\n' - else: - report: str = 'No registry entry for,\n' - - log.warning( - report - + - f'{addr!r}@{uid!r}' - ) - return uid +# Backward compat: class moved to discovery._registry +from ..discovery._registry import ( + Registrar, + Registrar as Arbiter, +)