forked from goodboy/tractor
Pack tuple keys as . delim strs in registry tests
parent
17e195aacf
commit
c5acc3b969
|
@ -116,11 +116,26 @@ async def stream_from(portal):
|
|||
print(value)
|
||||
|
||||
|
||||
async def unpack_reg(actor_or_portal):
|
||||
'''
|
||||
Get and unpack a "registry" RPC request from the "arbiter" registry
|
||||
system.
|
||||
|
||||
'''
|
||||
if getattr(actor_or_portal, 'get_registry', None):
|
||||
msg = await actor_or_portal.get_registry()
|
||||
else:
|
||||
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
|
||||
|
||||
return {tuple(key.split('.')): val for key, val in msg.items()}
|
||||
|
||||
|
||||
async def spawn_and_check_registry(
|
||||
arb_addr: tuple,
|
||||
use_signal: bool,
|
||||
remote_arbiter: bool = False,
|
||||
with_streaming: bool = False,
|
||||
|
||||
) -> None:
|
||||
|
||||
async with tractor.open_root_actor(
|
||||
|
@ -134,13 +149,11 @@ async def spawn_and_check_registry(
|
|||
assert not actor.is_arbiter
|
||||
|
||||
if actor.is_arbiter:
|
||||
|
||||
async def get_reg():
|
||||
return await actor.get_registry()
|
||||
|
||||
extra = 1 # arbiter is local root actor
|
||||
get_reg = partial(unpack_reg, actor)
|
||||
|
||||
else:
|
||||
get_reg = partial(portal.run_from_ns, 'self', 'get_registry')
|
||||
get_reg = partial(unpack_reg, portal)
|
||||
extra = 2 # local root actor + remote arbiter
|
||||
|
||||
# ensure current actor is registered
|
||||
|
@ -266,7 +279,7 @@ async def close_chans_before_nursery(
|
|||
):
|
||||
async with tractor.get_arbiter(*arb_addr) as aportal:
|
||||
try:
|
||||
get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')
|
||||
get_reg = partial(unpack_reg, aportal)
|
||||
|
||||
async with tractor.open_nursery() as tn:
|
||||
portal1 = await tn.start_actor(
|
||||
|
|
|
@ -27,7 +27,7 @@ import importlib.util
|
|||
import inspect
|
||||
import uuid
|
||||
import typing
|
||||
from typing import List, Tuple, Any, Optional, Union
|
||||
from typing import Any, Optional, Union
|
||||
from types import ModuleType
|
||||
import sys
|
||||
import os
|
||||
|
@ -199,7 +199,9 @@ async def _invoke(
|
|||
assert chan.uid
|
||||
ctx = actor._contexts.pop((chan.uid, cid))
|
||||
if ctx:
|
||||
log.runtime(f'Context entrypoint for {func} was terminated:\n{ctx}')
|
||||
log.runtime(
|
||||
f'Context entrypoint for {func} was terminated:\n{ctx}'
|
||||
)
|
||||
|
||||
assert cs
|
||||
if cs.cancelled_caught:
|
||||
|
@ -368,10 +370,10 @@ class Actor:
|
|||
self,
|
||||
name: str,
|
||||
*,
|
||||
enable_modules: List[str] = [],
|
||||
enable_modules: list[str] = [],
|
||||
uid: str = None,
|
||||
loglevel: str = None,
|
||||
arbiter_addr: Optional[Tuple[str, int]] = None,
|
||||
arbiter_addr: Optional[tuple[str, int]] = None,
|
||||
spawn_method: Optional[str] = None
|
||||
) -> None:
|
||||
"""This constructor is called in the parent actor **before** the spawning
|
||||
|
@ -421,25 +423,25 @@ class Actor:
|
|||
|
||||
# (chan, cid) -> (cancel_scope, func)
|
||||
self._rpc_tasks: dict[
|
||||
Tuple[Channel, str],
|
||||
Tuple[trio.CancelScope, typing.Callable, trio.Event]
|
||||
tuple[Channel, str],
|
||||
tuple[trio.CancelScope, typing.Callable, trio.Event]
|
||||
] = {}
|
||||
|
||||
# map {actor uids -> Context}
|
||||
self._contexts: dict[
|
||||
Tuple[Tuple[str, str], str],
|
||||
tuple[tuple[str, str], str],
|
||||
Context
|
||||
] = {}
|
||||
|
||||
self._listeners: List[trio.abc.Listener] = []
|
||||
self._listeners: list[trio.abc.Listener] = []
|
||||
self._parent_chan: Optional[Channel] = None
|
||||
self._forkserver_info: Optional[
|
||||
Tuple[Any, Any, Any, Any, Any]] = None
|
||||
tuple[Any, Any, Any, Any, Any]] = None
|
||||
self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa
|
||||
|
||||
async def wait_for_peer(
|
||||
self, uid: Tuple[str, str]
|
||||
) -> Tuple[trio.Event, Channel]:
|
||||
self, uid: tuple[str, str]
|
||||
) -> tuple[trio.Event, Channel]:
|
||||
"""Wait for a connection back from a spawned actor with a given
|
||||
``uid``.
|
||||
"""
|
||||
|
@ -1010,8 +1012,8 @@ class Actor:
|
|||
|
||||
async def _from_parent(
|
||||
self,
|
||||
parent_addr: Optional[Tuple[str, int]],
|
||||
) -> Tuple[Channel, Optional[Tuple[str, int]]]:
|
||||
parent_addr: Optional[tuple[str, int]],
|
||||
) -> tuple[Channel, Optional[tuple[str, int]]]:
|
||||
try:
|
||||
# Connect back to the parent actor and conduct initial
|
||||
# handshake. From this point on if we error, we
|
||||
|
@ -1024,7 +1026,7 @@ class Actor:
|
|||
# Initial handshake: swap names.
|
||||
await self._do_handshake(chan)
|
||||
|
||||
accept_addr: Optional[Tuple[str, int]] = None
|
||||
accept_addr: Optional[tuple[str, int]] = None
|
||||
|
||||
if self._spawn_method == "trio":
|
||||
# Receive runtime state from our parent
|
||||
|
@ -1066,7 +1068,7 @@ class Actor:
|
|||
|
||||
async def _async_main(
|
||||
self,
|
||||
accept_addr: Optional[Tuple[str, int]] = None,
|
||||
accept_addr: Optional[tuple[str, int]] = None,
|
||||
|
||||
# XXX: currently ``parent_addr`` is only needed for the
|
||||
# ``multiprocessing`` backend (which pickles state sent to
|
||||
|
@ -1075,7 +1077,7 @@ class Actor:
|
|||
# change this to a simple ``is_subactor: bool`` which will
|
||||
# be False when running as root actor and True when as
|
||||
# a subactor.
|
||||
parent_addr: Optional[Tuple[str, int]] = None,
|
||||
parent_addr: Optional[tuple[str, int]] = None,
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
|
@ -1261,7 +1263,7 @@ class Actor:
|
|||
handler_nursery: trio.Nursery,
|
||||
*,
|
||||
# (host, port) to bind for channel server
|
||||
accept_host: Tuple[str, int] = None,
|
||||
accept_host: tuple[str, int] = None,
|
||||
accept_port: int = 0,
|
||||
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
|
||||
) -> None:
|
||||
|
@ -1273,7 +1275,7 @@ class Actor:
|
|||
self._server_down = trio.Event()
|
||||
try:
|
||||
async with trio.open_nursery() as server_n:
|
||||
l: List[trio.abc.Listener] = await server_n.start(
|
||||
l: list[trio.abc.Listener] = await server_n.start(
|
||||
partial(
|
||||
trio.serve_tcp,
|
||||
self._stream_handler,
|
||||
|
@ -1427,7 +1429,7 @@ class Actor:
|
|||
self._server_n.cancel_scope.cancel()
|
||||
|
||||
@property
|
||||
def accept_addr(self) -> Optional[Tuple[str, int]]:
|
||||
def accept_addr(self) -> Optional[tuple[str, int]]:
|
||||
"""Primary address to which the channel server is bound.
|
||||
"""
|
||||
# throws OSError on failure
|
||||
|
@ -1438,7 +1440,7 @@ class Actor:
|
|||
assert self._parent_chan, "No parent channel for this actor?"
|
||||
return Portal(self._parent_chan)
|
||||
|
||||
def get_chans(self, uid: Tuple[str, str]) -> List[Channel]:
|
||||
def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
|
||||
"""Return all channels to the actor with provided uid."""
|
||||
return self._peers[uid]
|
||||
|
||||
|
@ -1446,7 +1448,7 @@ class Actor:
|
|||
self,
|
||||
chan: Channel
|
||||
|
||||
) -> Tuple[str, str]:
|
||||
) -> tuple[str, str]:
|
||||
"""Exchange (name, UUIDs) identifiers as the first communication step.
|
||||
|
||||
These are essentially the "mailbox addresses" found in actor model
|
||||
|
@ -1454,7 +1456,7 @@ class Actor:
|
|||
"""
|
||||
await chan.send(self.uid)
|
||||
value = await chan.recv()
|
||||
uid: Tuple[str, str] = (str(value[0]), str(value[1]))
|
||||
uid: tuple[str, str] = (str(value[0]), str(value[1]))
|
||||
|
||||
if not isinstance(uid, tuple):
|
||||
raise ValueError(f"{uid} is not a valid uid?!")
|
||||
|
@ -1483,14 +1485,14 @@ class Arbiter(Actor):
|
|||
def __init__(self, *args, **kwargs):
|
||||
|
||||
self._registry: dict[
|
||||
Tuple[str, str],
|
||||
Tuple[str, int],
|
||||
tuple[str, str],
|
||||
tuple[str, int],
|
||||
] = {}
|
||||
self._waiters = {}
|
||||
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
async def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
|
||||
async def find_actor(self, name: str) -> Optional[tuple[str, int]]:
|
||||
for uid, sockaddr in self._registry.items():
|
||||
if name in uid:
|
||||
return sockaddr
|
||||
|
@ -1499,25 +1501,31 @@ class Arbiter(Actor):
|
|||
|
||||
async def get_registry(
|
||||
self
|
||||
) -> dict[Tuple[str, str], Tuple[str, int]]:
|
||||
'''Return current name registry.
|
||||
|
||||
) -> dict[tuple[str, str], tuple[str, int]]:
|
||||
'''
|
||||
Return current name registry.
|
||||
|
||||
This method is async to allow for cross-actor invocation.
|
||||
|
||||
'''
|
||||
# NOTE: requires ``strict_map_key=False`` to the msgpack
|
||||
# unpacker since we have tuples as keys (not this makes the
|
||||
# arbiter suscetible to hashdos):
|
||||
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
|
||||
return self._registry
|
||||
return {'.'.join(key): val for key, val in self._registry.items()}
|
||||
|
||||
async def wait_for_actor(
|
||||
self,
|
||||
name: str,
|
||||
) -> List[Tuple[str, int]]:
|
||||
'''Wait for a particular actor to register.
|
||||
|
||||
) -> list[tuple[str, int]]:
|
||||
'''
|
||||
Wait for a particular actor to register.
|
||||
|
||||
This is a blocking call if no actor by the provided name is currently
|
||||
registered.
|
||||
|
||||
'''
|
||||
sockaddrs = []
|
||||
|
||||
|
@ -1536,8 +1544,8 @@ class Arbiter(Actor):
|
|||
|
||||
async def register_actor(
|
||||
self,
|
||||
uid: Tuple[str, str],
|
||||
sockaddr: Tuple[str, int]
|
||||
uid: tuple[str, str],
|
||||
sockaddr: tuple[str, int]
|
||||
|
||||
) -> None:
|
||||
uid = name, uuid = (str(uid[0]), str(uid[1]))
|
||||
|
@ -1552,7 +1560,8 @@ class Arbiter(Actor):
|
|||
|
||||
async def unregister_actor(
|
||||
self,
|
||||
uid: Tuple[str, str]
|
||||
uid: tuple[str, str]
|
||||
|
||||
) -> None:
|
||||
uid = (str(uid[0]), str(uid[1]))
|
||||
self._registry.pop(uid)
|
||||
|
|
Loading…
Reference in New Issue