Add `Arbiter.delete_sockaddr()` to remove addrs

Since stale addrs can be leaked where the actor transport server task
crashes but doesn't (successfully) unregister from the registrar, we
need a remote way to remove such entries; hence this new (registrar)
method.

To implement this make use of the `bidict` lib for the `._registry`
table thus making it super simple to do reverse uuid lookups from an
input socket-address.
dereg_on_oserror
Tyler Goodlet 2023-08-20 16:22:46 -04:00
parent 22c14e235e
commit 1cf712cfac
2 changed files with 18 additions and 2 deletions

View File

@ -41,6 +41,9 @@ setup(
], ],
install_requires=[ install_requires=[
# discovery subsys
'bidict',
# trio related # trio related
# proper range spec: # proper range spec:
# https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5 # https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5

View File

@ -40,6 +40,7 @@ from contextlib import ExitStack
import warnings import warnings
from async_generator import aclosing from async_generator import aclosing
from bidict import bidict
from exceptiongroup import BaseExceptionGroup from exceptiongroup import BaseExceptionGroup
import trio # type: ignore import trio # type: ignore
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -1774,10 +1775,10 @@ class Arbiter(Actor):
def __init__(self, *args, **kwargs) -> None: def __init__(self, *args, **kwargs) -> None:
self._registry: dict[ self._registry: bidict[
tuple[str, str], tuple[str, str],
tuple[str, int], tuple[str, int],
] = {} ] = bidict({})
self._waiters: dict[ self._waiters: dict[
str, str,
# either an event to sync to receiving an actor uid (which # either an event to sync to receiving an actor uid (which
@ -1871,3 +1872,15 @@ class Arbiter(Actor):
entry: tuple = self._registry.pop(uid, None) entry: tuple = self._registry.pop(uid, None)
if entry is None: if entry is None:
log.warning(f'Request to de-register {uid} failed?') log.warning(f'Request to de-register {uid} failed?')
async def delete_sockaddr(
self,
sockaddr: tuple[str, int],
) -> tuple[str, str]:
uid: tuple = self._registry.inverse.pop(sockaddr)
log.warning(
f'Deleting registry entry for {sockaddr}@{uid}!'
)
return uid