Only warn on mismatched `open_registry()` input addrs

When a new (actor) caller opens the registry there are 2 possible cases:
1. - some task already opened the registry during init and set the global
  superset of registrar addrs that are expected to be used,
2. - some task after the init task opens with a subset of addrs.
3. - some task after init opens with a disjoint set - should be an error?

In the 2nd case we don't want to error since the may just not need to
know about other registrar (multi-homed) addrs and thus only needs
specific access - so only warn about the diff in that case. If the
caller is requesting some disjoint set then we still runtime raise.

Adjust `find_service()` to allow a null `registry_addrs` input in which
case we fail over to using whatever pre-set the `Registry.addrs` has;
makes it simple for actors that don't want/need to know about the global
registrar set for their actor tree. Also, always set pass
`tractor.find_actor(only_first=True)` (for now).
distribute_dis
Tyler Goodlet 2023-10-01 15:36:17 -04:00
parent 5d081a40d5
commit 7258d57c69
1 changed files with 35 additions and 15 deletions

View File

@ -66,7 +66,13 @@ async def open_registry(
ensure_exists: bool = True,
) -> list[tuple[str, int]]:
'''
Open the service-actor-discovery registry by returning a set of
tranport socket-addrs to registrar actors which may be
contacted and queried for similar addresses for other
non-registrar actors.
'''
global _tractor_kwargs
actor = tractor.current_actor()
uid = actor.uid
@ -76,11 +82,19 @@ async def open_registry(
and addrs
):
if preset_reg_addrs != addrs:
raise RuntimeError(
f'`{uid}` has non-matching registrar addresses?\n'
f'request: {addrs}\n'
f'already set: {preset_reg_addrs}'
)
# if any(addr in preset_reg_addrs for addr in addrs):
diff: set[tuple[str, int]] = set(preset_reg_addrs) - set(addrs)
if diff:
log.warning(
f'`{uid}` requested only subset of registrars: {addrs}\n'
f'However there are more @{diff}'
)
else:
raise RuntimeError(
f'`{uid}` has non-matching registrar addresses?\n'
f'request: {addrs}\n'
f'already set: {preset_reg_addrs}'
)
was_set: bool = False
@ -122,7 +136,7 @@ async def open_registry(
@acm
async def find_service(
service_name: str,
registry_addrs: list[tuple[str, int]],
registry_addrs: list[tuple[str, int]] | None = None,
first_only: bool = True,
@ -130,22 +144,26 @@ async def find_service(
reg_addrs: list[tuple[str, int]]
async with open_registry(
addrs=registry_addrs,
addrs=(
registry_addrs
# NOTE: if no addr set is passed assume the registry has
# already been opened and use the previously applied
# startup set.
or Registry.addrs
),
) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,
registry_addrs=reg_addrs,
only_first=first_only, # if set only returns single ref
) as maybe_portals:
if not maybe_portals:
yield None
return
if first_only:
yield maybe_portals[0]
else:
yield maybe_portals[0]
yield maybe_portals
async def check_for_service(
@ -156,9 +174,11 @@ async def check_for_service(
Service daemon "liveness" predicate.
'''
async with open_registry(ensure_exists=False) as reg_addr:
async with tractor.query_actor(
async with (
open_registry(ensure_exists=False) as reg_addr,
tractor.query_actor(
service_name,
arbiter_sockaddr=reg_addr,
) as sockaddr:
return sockaddr
) as sockaddr,
):
return sockaddr