Address Copilot review suggestions on PR #366
- Use `bidict.forceput()` in `register_actor()` to handle duplicate addr values from stale entries or actor restarts. - Fix `uid` annotation to `tuple[str, str]|None` in `maybe_open_portal()` and handle the `None` return from `delete_addr()` in log output. - Pass explicit `registry_addrs=[reg_addr]` to `open_nursery()` and `find_actor()` in `test_stale_entry_is_deleted` to ensure the test uses the remote registrar. - Update `query_actor()` docstring to document the new `(addr, reg_portal)` yield shape. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>multicast_revertable_streams
parent
850219f60c
commit
85457cb839
|
|
@ -491,6 +491,7 @@ def test_stale_entry_is_deleted(
|
||||||
async with (
|
async with (
|
||||||
tractor.open_nursery(
|
tractor.open_nursery(
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
|
registry_addrs=[reg_addr],
|
||||||
) as an,
|
) as an,
|
||||||
tractor.get_registry(reg_addr) as _reg_ptl,
|
tractor.get_registry(reg_addr) as _reg_ptl,
|
||||||
):
|
):
|
||||||
|
|
@ -501,7 +502,10 @@ def test_stale_entry_is_deleted(
|
||||||
async with ptl.open_context(
|
async with ptl.open_context(
|
||||||
kill_transport,
|
kill_transport,
|
||||||
) as (first, ctx):
|
) as (first, ctx):
|
||||||
async with tractor.find_actor(name) as maybe_portal:
|
async with tractor.find_actor(
|
||||||
|
name,
|
||||||
|
registry_addrs=[reg_addr],
|
||||||
|
) as maybe_portal:
|
||||||
# because the transitive
|
# because the transitive
|
||||||
# `._discovery.maybe_open_portal()` call should
|
# `._discovery.maybe_open_portal()` call should
|
||||||
# fail and implicitly call `.delete_addr()`
|
# fail and implicitly call `.delete_addr()`
|
||||||
|
|
|
||||||
|
|
@ -160,8 +160,12 @@ async def query_actor(
|
||||||
Lookup a transport address (by actor name) via querying a registrar
|
Lookup a transport address (by actor name) via querying a registrar
|
||||||
listening @ `regaddr`.
|
listening @ `regaddr`.
|
||||||
|
|
||||||
Returns the transport protocol (socket) address or `None` if no
|
Yields a `tuple` of `(addr, reg_portal)` where,
|
||||||
entry under that name exists.
|
- `addr` is the transport protocol (socket) address or `None` if
|
||||||
|
no entry under that name exists,
|
||||||
|
- `reg_portal` is the `Portal` to the registrar used for the
|
||||||
|
lookup (or `None` when the peer was found locally via
|
||||||
|
`get_peer_by_name()`).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
actor: Actor = current_actor()
|
actor: Actor = current_actor()
|
||||||
|
|
@ -225,16 +229,21 @@ async def maybe_open_portal(
|
||||||
# NOTE: ensure we delete the stale entry
|
# NOTE: ensure we delete the stale entry
|
||||||
# from the registrar actor when available.
|
# from the registrar actor when available.
|
||||||
if reg_portal is not None:
|
if reg_portal is not None:
|
||||||
uid: tuple[str, str] = await reg_portal.run_from_ns(
|
uid: tuple[str, str]|None = await reg_portal.run_from_ns(
|
||||||
'self',
|
'self',
|
||||||
'delete_addr',
|
'delete_addr',
|
||||||
addr=addr,
|
addr=addr,
|
||||||
)
|
)
|
||||||
|
if uid:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Deleted stale registry entry !\n'
|
f'Deleted stale registry entry !\n'
|
||||||
f'addr: {addr!r}\n'
|
f'addr: {addr!r}\n'
|
||||||
f'uid: {uid!r}\n'
|
f'uid: {uid!r}\n'
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
f'No registry entry found for addr: {addr!r}'
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Connection to {addr!r} failed'
|
f'Connection to {addr!r} failed'
|
||||||
|
|
|
||||||
|
|
@ -2013,8 +2013,13 @@ class Arbiter(Actor):
|
||||||
# should never be 0-dynamic-os-alloc
|
# should never be 0-dynamic-os-alloc
|
||||||
await debug.pause()
|
await debug.pause()
|
||||||
|
|
||||||
# XXX NOTE, value must also be hashable.
|
# XXX NOTE, value must also be hashable AND since
|
||||||
self._registry[uid] = tuple(addr)
|
# `._registry` is a `bidict` values must be unique; use
|
||||||
|
# `.forceput()` to replace any prior (stale) entries
|
||||||
|
# that might map a different uid to the same addr (e.g.
|
||||||
|
# after an unclean shutdown or actor-restart reusing
|
||||||
|
# the same address).
|
||||||
|
self._registry.forceput(uid, tuple(addr))
|
||||||
|
|
||||||
# pop and signal all waiter events
|
# pop and signal all waiter events
|
||||||
events = self._waiters.pop(name, [])
|
events = self._waiters.pop(name, [])
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue