Register each actor using its unique ID tuple
This allows for registering more then one actor with the same "name" when you have multiple actors fulfilling the same role. Eventually we'll need support for looking up all actors registered under a given "service name" (or whatever we decide to call it). Also, a fix to the arbiter such that each new instance refers to a separate `_registry` dict (found an issue with duplicate names during testing). Resolves #7reg_with_uid
parent
758fbc6790
commit
1bd5582d8a
|
@ -434,7 +434,7 @@ class Actor:
|
||||||
async with get_arbiter(*arbiter_addr) as arb_portal:
|
async with get_arbiter(*arbiter_addr) as arb_portal:
|
||||||
await arb_portal.run(
|
await arb_portal.run(
|
||||||
'self', 'register_actor',
|
'self', 'register_actor',
|
||||||
name=self.name, sockaddr=self.accept_addr)
|
uid=self.uid, sockaddr=self.accept_addr)
|
||||||
registered_with_arbiter = True
|
registered_with_arbiter = True
|
||||||
|
|
||||||
task_status.started()
|
task_status.started()
|
||||||
|
@ -522,7 +522,7 @@ class Actor:
|
||||||
if arbiter_addr is not None:
|
if arbiter_addr is not None:
|
||||||
async with get_arbiter(*arbiter_addr) as arb_portal:
|
async with get_arbiter(*arbiter_addr) as arb_portal:
|
||||||
await arb_portal.run(
|
await arb_portal.run(
|
||||||
'self', 'unregister_actor', name=self.name)
|
'self', 'unregister_actor', uid=self.uid)
|
||||||
except OSError:
|
except OSError:
|
||||||
log.warn(f"Unable to unregister {self.name} from arbiter")
|
log.warn(f"Unable to unregister {self.name} from arbiter")
|
||||||
|
|
||||||
|
@ -580,24 +580,30 @@ class Actor:
|
||||||
|
|
||||||
class Arbiter(Actor):
|
class Arbiter(Actor):
|
||||||
"""A special actor who knows all the other actors and always has
|
"""A special actor who knows all the other actors and always has
|
||||||
access to the top level nursery.
|
access to a top level nursery.
|
||||||
|
|
||||||
The arbiter is by default the first actor spawned on each host
|
The arbiter is by default the first actor spawned on each host
|
||||||
and is responsible for keeping track of all other actors for
|
and is responsible for keeping track of all other actors for
|
||||||
coordination purposes. If a new main process is launched and an
|
coordination purposes. If a new main process is launched and an
|
||||||
arbiter is already running that arbiter will be used.
|
arbiter is already running that arbiter will be used.
|
||||||
"""
|
"""
|
||||||
_registry = defaultdict(list)
|
|
||||||
is_arbiter = True
|
is_arbiter = True
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self._registry = defaultdict(list)
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
def find_actor(self, name):
|
def find_actor(self, name):
|
||||||
return self._registry[name]
|
for uid, actor in self._registry.items():
|
||||||
|
if name in uid:
|
||||||
|
print('found it!')
|
||||||
|
return actor
|
||||||
|
|
||||||
def register_actor(self, name, sockaddr):
|
def register_actor(self, uid, sockaddr):
|
||||||
self._registry[name].append(sockaddr)
|
self._registry[uid].append(sockaddr)
|
||||||
|
|
||||||
def unregister_actor(self, name):
|
def unregister_actor(self, uid):
|
||||||
self._registry.pop(name, None)
|
self._registry.pop(uid, None)
|
||||||
|
|
||||||
|
|
||||||
async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
|
async def _start_actor(actor, main, host, port, arbiter_addr, nursery=None):
|
||||||
|
|
Loading…
Reference in New Issue