Rename `Arbiter` -> `Registrar`, mv to `discovery._registry`

Move the `Arbiter` class out of `runtime._runtime` into its
logical home at `discovery._registry` as `Registrar(Actor)`.
This completes the long-standing terminology migration from
"arbiter" to "registrar/registry" throughout the codebase.

Deats,
- add new `discovery/_registry.py` mod with `Registrar`
  class + backward-compat `Arbiter = Registrar` alias.
- rename `Actor.is_arbiter` attr -> `.is_registrar`;
  old attr now a `@property` with `DeprecationWarning`.
- `_root.py` imports `Registrar` directly for
  root-actor instantiation.
- export `Registrar` + `Arbiter` from `tractor.__init__`.
- `_runtime.py` re-imports from `discovery._registry`
  for backward compat.

Also,
- update all test files to use `.is_registrar`
  (`test_local`, `test_rpc`, `test_spawning`,
  `test_discovery`, `test_multi_program`).
- update "arbiter" -> "registrar" in comments/docstrings
  across `_discovery.py`, `_server.py`, `_transport.py`,
  `_testing/pytest.py`, and examples.
- drop resolved TODOs from `_runtime.py` and `_root.py`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
multicast_revertable_streams
Gud Boi 2026-03-23 18:56:21 -04:00
parent f3441a6790
commit 9ec2749ab7
14 changed files with 321 additions and 235 deletions

View File

@ -10,7 +10,7 @@ async def main(service_name):
await an.start_actor(service_name)
async with tractor.get_registry() as portal:
print(f"Arbiter is listening on {portal.channel}")
print(f"Registrar is listening on {portal.channel}")
async with tractor.wait_for_actor(service_name) as sockaddr:
print(f"my_service is found at {sockaddr}")

View File

@ -24,7 +24,7 @@ async def test_reg_then_unreg(
reg_addr: tuple,
):
actor = tractor.current_actor()
assert actor.is_arbiter
assert actor.is_registrar
assert len(actor._registry) == 1 # only self is registered
async with tractor.open_nursery(
@ -35,7 +35,7 @@ async def test_reg_then_unreg(
uid = portal.channel.aid.uid
async with tractor.get_registry(reg_addr) as aportal:
# this local actor should be the arbiter
# this local actor should be the registrar
assert actor is aportal.actor
async with tractor.wait_for_actor('actor'):
@ -154,7 +154,7 @@ async def unpack_reg(
actor_or_portal: tractor.Portal|tractor.Actor,
):
'''
Get and unpack a "registry" RPC request from the "arbiter" registry
Get and unpack a "registry" RPC request from the registrar
system.
'''
@ -197,15 +197,15 @@ async def spawn_and_check_registry(
actor = tractor.current_actor()
if remote_arbiter:
assert not actor.is_arbiter
assert not actor.is_registrar
if actor.is_arbiter:
extra = 1 # arbiter is local root actor
if actor.is_registrar:
extra = 1 # registrar is local root actor
get_reg = partial(unpack_reg, actor)
else:
get_reg = partial(unpack_reg, portal)
extra = 2 # local root actor + remote arbiter
extra = 2 # local root actor + remote registrar
# ensure current actor is registered
registry: dict = await get_reg()
@ -285,7 +285,7 @@ def test_subactors_unregister_on_cancel(
):
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with the arbiter.
deregistering themselves with the registrar.
'''
with pytest.raises(KeyboardInterrupt):
@ -314,7 +314,7 @@ def test_subactors_unregister_on_cancel_remote_daemon(
'''
Verify that cancelling a nursery results in all subactors
deregistering themselves with a **remote** (not in the local
process tree) arbiter.
process tree) registrar.
'''
with pytest.raises(KeyboardInterrupt):
@ -387,7 +387,7 @@ async def close_chans_before_nursery(
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# normal registrar channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
@ -420,7 +420,7 @@ def test_close_channel_explicit(
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
results in subactor(s) deregistering from the registrar.
'''
with pytest.raises(KeyboardInterrupt):
@ -444,7 +444,7 @@ def test_close_channel_explicit_remote_registrar(
'''
Verify that closing a stream explicitly and killing the actor's
"root nursery" **before** the containing nursery tears down also
results in subactor(s) deregistering from the arbiter.
results in subactor(s) deregistering from the registrar.
'''
with pytest.raises(KeyboardInterrupt):

View File

@ -1,5 +1,5 @@
"""
Arbiter and "local" actor api
Registrar and "local" actor api
"""
import time
@ -12,11 +12,11 @@ from tractor._testing import tractor_test
@pytest.mark.trio
async def test_no_runtime():
"""An arbitter must be established before any nurseries
"""A registrar must be established before any nurseries
can be created.
(In other words ``tractor.open_root_actor()`` must be engaged at
some point?)
(In other words ``tractor.open_root_actor()`` must be
engaged at some point?)
"""
with pytest.raises(RuntimeError) :
async with tractor.find_actor('doggy'):
@ -25,9 +25,9 @@ async def test_no_runtime():
@tractor_test
async def test_self_is_registered(reg_addr):
"Verify waiting on the arbiter to register itself using the standard api."
"Verify waiting on the registrar to register itself using the standard api."
actor = tractor.current_actor()
assert actor.is_arbiter
assert actor.is_registrar
with trio.fail_after(0.2):
async with tractor.wait_for_actor('root') as portal:
assert portal.channel.uid[0] == 'root'
@ -35,9 +35,9 @@ async def test_self_is_registered(reg_addr):
@tractor_test
async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal."
"Verify waiting on the registrar to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
assert actor.is_registrar
async with tractor.get_registry(reg_addr) as portal:
assert isinstance(portal, tractor.runtime._portal.LocalPortal)
@ -57,8 +57,8 @@ def test_local_actor_async_func(reg_addr):
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
# arbiter is started in-proc if dne
assert tractor.current_actor().is_arbiter
# registrar is started in-proc if dne
assert tractor.current_actor().is_registrar
for i in range(10):
nums.append(i)

View File

@ -53,19 +53,19 @@ def test_abort_on_sigint(
@tractor_test
async def test_cancel_remote_arbiter(
async def test_cancel_remote_registrar(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
assert not current_actor().is_arbiter
assert not current_actor().is_registrar
async with tractor.get_registry(reg_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
# the arbiter channel server is cancelled but not its main task
# the registrar channel server is cancelled but not its main task
assert daemon.returncode is None
# no arbiter socket should exist
# no registrar socket should exist
with pytest.raises(OSError):
async with tractor.get_registry(reg_addr) as portal:
pass
@ -80,7 +80,7 @@ def test_register_duplicate_name(
registry_addrs=[reg_addr],
) as an:
assert not current_actor().is_arbiter
assert not current_actor().is_registrar
p1 = await an.start_actor('doggy')
p2 = await an.start_actor('doggy')

View File

@ -110,7 +110,7 @@ def test_rpc_errors(
) as n:
actor = tractor.current_actor()
assert actor.is_arbiter
assert actor.is_registrar
await n.run_in_actor(
sleep_back_actor,
actor_name=subactor_requests_to,

View File

@ -39,7 +39,7 @@ async def spawn(
):
# now runtime exists
actor: tractor.Actor = tractor.current_actor()
assert actor.is_arbiter == should_be_root
assert actor.is_registrar == should_be_root
# spawns subproc here
portal: tractor.Portal = await an.run_in_actor(
@ -68,7 +68,7 @@ async def spawn(
assert result == 10
return result
else:
assert actor.is_arbiter == should_be_root
assert actor.is_registrar == should_be_root
return 10
@ -181,7 +181,7 @@ def test_loglevel_propagated_to_subactor(
async def main():
async with tractor.open_nursery(
name='arbiter',
name='registrar',
start_method=start_method,
arbiter_addr=reg_addr,

View File

@ -69,4 +69,8 @@ from ._root import (
from .ipc import Channel as Channel
from .runtime._portal import Portal as Portal
from .runtime._runtime import Actor as Actor
from .discovery._registry import (
Registrar as Registrar,
Arbiter as Arbiter,
)
# from . import hilevel as hilevel

View File

@ -38,6 +38,7 @@ import warnings
import trio
from .runtime import _runtime
from .discovery._registry import Registrar
from .devx import (
debug,
_frame_stack,
@ -267,7 +268,6 @@ async def open_root_actor(
if start_method is not None:
_spawn.try_set_start_method(start_method)
# TODO! remove this ASAP!
if arbiter_addr is not None:
warnings.warn(
'`arbiter_addr` is now deprecated\n'
@ -400,7 +400,7 @@ async def open_root_actor(
'registry socket(s) already bound'
)
# we were able to connect to an arbiter
# we were able to connect to a registrar
logger.info(
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
)
@ -453,8 +453,7 @@ async def open_root_actor(
# https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296
# TODO: rename as `RootActor` or is that even necessary?
actor = _runtime.Arbiter(
actor = Registrar(
name=name or 'registrar',
uuid=mk_uuid(),
registry_addrs=registry_addrs,

View File

@ -75,7 +75,7 @@ def tractor_test(
If any of the following fixture are requested by the wrapped test
fn (via normal func-args declaration),
- `reg_addr` (a socket addr tuple where arbiter is listening)
- `reg_addr` (a socket addr tuple where registrar is listening)
- `loglevel` (logging level passed to tractor internals)
- `start_method` (subprocess spawning backend)

View File

@ -72,8 +72,8 @@ async def get_registry(
'''
actor: Actor = current_actor()
if actor.is_registrar:
# we're already the arbiter
# (likely a re-entrant call from the arbiter actor)
# we're already the registrar
# (likely a re-entrant call from the registrar actor)
yield LocalPortal(
actor,
Channel(transport=None)
@ -268,10 +268,10 @@ async def find_actor(
None,
]:
'''
Ask the arbiter to find actor(s) by name.
Ask the registrar to find actor(s) by name.
Returns a connected portal to the last registered matching actor
known to the arbiter.
Returns a connected portal to the last registered
matching actor known to the registrar.
'''
# optimization path, use any pre-existing peer channel

View File

@ -0,0 +1,253 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either
# version 3 of the License, or (at your option) any later
# version.
# This program is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU Affero General Public License for more
# details.
# You should have received a copy of the GNU Affero General
# Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
'''
Actor-registry for process-tree service discovery.
The `Registrar` is a special `Actor` subtype that serves as
the process-tree's name-registry, tracking actor
name-to-address mappings so peers can discover each other.
'''
from __future__ import annotations
from bidict import bidict
import trio
from ..runtime._runtime import Actor
from ._addr import (
UnwrappedAddress,
Address,
wrap_address,
)
from ..devx import debug
from ..log import get_logger
log = get_logger('tractor')
class Registrar(Actor):
'''
A special registrar `Actor` who can contact all other
actors within its immediate process tree and keeps
a registry of others meant to be discoverable in
a distributed application.
Normally the registrar is also the "root actor" and
thus always has access to the top-most-level actor
(process) nursery.
By default, the registrar is always initialized when
and if no other registrar socket addrs have been
specified to runtime init entry-points (such as
`open_root_actor()` or `open_nursery()`). Any time
a new main process is launched (and thus a new root
actor created) and, no existing registrar can be
contacted at the provided `registry_addr`, then
a new one is always created; however, if one can be
reached it is used.
Normally a distributed app requires at least one
registrar per logical host where for that given
"host space" (aka localhost IPC domain of addresses)
it is responsible for making all other host (local
address) bound actors *discoverable* to external
actor trees running on remote hosts.
'''
is_registrar = True
def is_registry(self) -> bool:
return self.is_registrar
def __init__(
self,
*args,
**kwargs,
) -> None:
self._registry: bidict[
tuple[str, str],
UnwrappedAddress,
] = bidict({})
self._waiters: dict[
str,
# either an event to sync to receiving an
# actor uid (which is filled in once the actor
# has sucessfully registered), or that uid
# after registry is complete.
list[trio.Event|tuple[str, str]]
] = {}
super().__init__(*args, **kwargs)
async def find_actor(
self,
name: str,
) -> UnwrappedAddress|None:
for uid, addr in self._registry.items():
if name in uid:
return addr
return None
async def get_registry(
self
) -> dict[str, UnwrappedAddress]:
'''
Return current name registry.
This method is async to allow for cross-actor
invocation.
'''
# NOTE: requires ``strict_map_key=False`` to the
# msgpack unpacker since we have tuples as keys
# (note this makes the registrar suscetible to
# hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return {
'.'.join(key): val
for key, val in self._registry.items()
}
async def wait_for_actor(
self,
name: str,
) -> list[UnwrappedAddress]:
'''
Wait for a particular actor to register.
This is a blocking call if no actor by the
provided name is currently registered.
'''
addrs: list[UnwrappedAddress] = []
addr: UnwrappedAddress
mailbox_info: str = (
'Actor registry contact infos:\n'
)
for uid, addr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
f'|_addr: {addr}\n\n'
)
if name == uid[0]:
addrs.append(addr)
if not addrs:
waiter = trio.Event()
self._waiters.setdefault(
name, []
).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
addrs.append(
self._registry[uid]
)
log.runtime(mailbox_info)
return addrs
async def register_actor(
self,
uid: tuple[str, str],
addr: UnwrappedAddress
) -> None:
uid = name, hash = (
str(uid[0]),
str(uid[1]),
)
waddr: Address = wrap_address(addr)
if not waddr.is_valid:
# should never be 0-dynamic-os-alloc
await debug.pause()
# XXX NOTE, value must also be hashable AND since
# `._registry` is a `bidict` values must be unique;
# use `.forceput()` to replace any prior (stale)
# entries that might map a different uid to the same
# addr (e.g. after an unclean shutdown or
# actor-restart reusing the same address).
self._registry.forceput(uid, tuple(addr))
# pop and signal all waiter events
events = self._waiters.pop(name, [])
self._waiters.setdefault(
name, []
).append(uid)
for event in events:
if isinstance(event, trio.Event):
event.set()
async def unregister_actor(
self,
uid: tuple[str, str]
) -> None:
uid = (str(uid[0]), str(uid[1]))
entry: tuple = self._registry.pop(
uid, None
)
if entry is None:
log.warning(
f'Request to de-register'
f' {uid!r} failed?'
)
async def delete_addr(
self,
addr: tuple[str, int|str]|list[str|int],
) -> tuple[str, str]|None:
# NOTE: `addr` arrives as a `list` over IPC
# (msgpack deserializes tuples -> lists) so
# coerce to `tuple` for the bidict hash lookup.
uid: tuple[str, str]|None = (
self._registry.inverse.pop(
tuple(addr),
None,
)
)
if uid:
report: str = (
'Deleting registry-entry for,\n'
)
else:
report: str = (
'No registry entry for,\n'
)
log.warning(
report
+
f'{addr!r}@{uid!r}'
)
return uid
# Backward compat alias
Arbiter = Registrar

View File

@ -355,7 +355,7 @@ async def handle_stream_from_peer(
# and `MsgpackStream._inter_packets()` on a read from the
# stream particularly when the runtime is first starting up
# inside `open_root_actor()` where there is a check for
# a bound listener on the "arbiter" addr. the reset will be
# a bound listener on the registrar addr. the reset will be
# because the handshake was never meant took place.
log.runtime(
con_status

View File

@ -225,7 +225,7 @@ class MsgpackTransport(MsgTransport):
# not sure entirely why we need this but without it we
# seem to be getting racy failures here on
# arbiter/registry name subs..
# registrar name subs..
trio.BrokenResourceError,
) as trans_err:

View File

@ -68,7 +68,6 @@ import textwrap
from types import ModuleType
import warnings
from bidict import bidict
import trio
from trio._core import _run as trio_runtime
from trio import (
@ -176,13 +175,21 @@ class Actor:
dialog.
'''
# ugh, we need to get rid of this and replace with a "registry" sys
# https://github.com/goodboy/tractor/issues/216
is_arbiter: bool = False
is_registrar: bool = False
@property
def is_registrar(self) -> bool:
return self.is_arbiter
def is_arbiter(self) -> bool:
'''
Deprecated, use `.is_registrar`.
'''
warnings.warn(
'`Actor.is_arbiter` is deprecated.\n'
'Use `.is_registrar` instead.',
DeprecationWarning,
stacklevel=2,
)
return self.is_registrar
@property
def is_root(self) -> bool:
@ -238,7 +245,6 @@ class Actor:
registry_addrs: list[Address]|None = None,
spawn_method: str|None = None,
# TODO: remove!
arbiter_addr: UnwrappedAddress|None = None,
) -> None:
@ -288,8 +294,8 @@ class Actor:
]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
# will be None for the parent most process started
# manually by the user (the "registrar")
self._spawn_method: str = spawn_method
# RPC state
@ -1657,7 +1663,7 @@ async def async_main(
# TODO, just read direct from ipc_server?
accept_addrs: list[UnwrappedAddress] = actor.accept_addrs
# Register with the arbiter if we're told its addr
# Register with the registrar if we're told its addr
log.runtime(
f'Registering `{actor.name}` => {pformat(accept_addrs)}\n'
# ^-TODO-^ we should instead show the maddr here^^
@ -1881,184 +1887,8 @@ async def async_main(
log.runtime(teardown_report)
# TODO: rename to `Registry` and move to `.discovery._registry`!
class Arbiter(Actor):
'''
A special registrar (and for now..) `Actor` who can contact all
other actors within its immediate process tree and possibly keeps
a registry of others meant to be discoverable in a distributed
application. Normally the registrar is also the "root actor" and
thus always has access to the top-most-level actor (process)
nursery.
By default, the registrar is always initialized when and if no
other registrar socket addrs have been specified to runtime
init entry-points (such as `open_root_actor()` or
`open_nursery()`). Any time a new main process is launched (and
thus thus a new root actor created) and, no existing registrar
can be contacted at the provided `registry_addr`, then a new
one is always created; however, if one can be reached it is
used.
Normally a distributed app requires at least registrar per
logical host where for that given "host space" (aka localhost
IPC domain of addresses) it is responsible for making all other
host (local address) bound actors *discoverable* to external
actor trees running on remote hosts.
'''
is_arbiter = True
# TODO, implement this as a read on there existing a `._state` of
# some sort setup by whenever we impl this all as
# a `.discovery._registry.open_registry()` API
def is_registry(self) -> bool:
return self.is_arbiter
def __init__(
self,
*args,
**kwargs,
) -> None:
self._registry: bidict[
tuple[str, str],
UnwrappedAddress,
] = bidict({})
self._waiters: dict[
str,
# either an event to sync to receiving an actor uid (which
# is filled in once the actor has sucessfully registered),
# or that uid after registry is complete.
list[trio.Event | tuple[str, str]]
] = {}
super().__init__(*args, **kwargs)
async def find_actor(
self,
name: str,
) -> UnwrappedAddress|None:
for uid, addr in self._registry.items():
if name in uid:
return addr
return None
async def get_registry(
self
) -> dict[str, UnwrappedAddress]:
'''
Return current name registry.
This method is async to allow for cross-actor invocation.
'''
# NOTE: requires ``strict_map_key=False`` to the msgpack
# unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return {
'.'.join(key): val
for key, val in self._registry.items()
}
async def wait_for_actor(
self,
name: str,
) -> list[UnwrappedAddress]:
'''
Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently
registered.
'''
addrs: list[UnwrappedAddress] = []
addr: UnwrappedAddress
mailbox_info: str = 'Actor registry contact infos:\n'
for uid, addr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
f'|_addr: {addr}\n\n'
)
if name == uid[0]:
addrs.append(addr)
if not addrs:
waiter = trio.Event()
self._waiters.setdefault(name, []).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
addrs.append(self._registry[uid])
log.runtime(mailbox_info)
return addrs
async def register_actor(
self,
uid: tuple[str, str],
addr: UnwrappedAddress
) -> None:
uid = name, hash = (str(uid[0]), str(uid[1]))
waddr: Address = wrap_address(addr)
if not waddr.is_valid:
# should never be 0-dynamic-os-alloc
await debug.pause()
# XXX NOTE, value must also be hashable AND since
# `._registry` is a `bidict` values must be unique; use
# `.forceput()` to replace any prior (stale) entries
# that might map a different uid to the same addr (e.g.
# after an unclean shutdown or actor-restart reusing
# the same address).
self._registry.forceput(uid, tuple(addr))
# pop and signal all waiter events
events = self._waiters.pop(name, [])
self._waiters.setdefault(name, []).append(uid)
for event in events:
if isinstance(event, trio.Event):
event.set()
async def unregister_actor(
self,
uid: tuple[str, str]
) -> None:
uid = (str(uid[0]), str(uid[1]))
entry: tuple = self._registry.pop(uid, None)
if entry is None:
log.warning(
f'Request to de-register {uid!r} failed?'
)
async def delete_addr(
self,
addr: tuple[str, int|str]|list[str|int],
) -> tuple[str, str]|None:
# NOTE: `addr` arrives as a `list` over IPC
# (msgpack deserializes tuples -> lists) so
# coerce to `tuple` for the bidict hash lookup.
uid: tuple[str, str]|None = self._registry.inverse.pop(
tuple(addr),
None,
)
if uid:
report: str = 'Deleting registry-entry for,\n'
else:
report: str = 'No registry entry for,\n'
log.warning(
report
+
f'{addr!r}@{uid!r}'
)
return uid
# Backward compat: class moved to discovery._registry
from ..discovery._registry import (
Registrar,
Registrar as Arbiter,
)