forked from goodboy/tractor
1
0
Fork 0

Merge pull request #300 from goodboy/msgpack_lists_by_default

Use lists by default like `msgspec`, update to latest `msgspec`  and `msgpack` releases
sort_subs_results_infected_aio
goodboy 2022-02-15 09:08:20 -05:00 committed by GitHub
commit 6e5590dad6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 76 additions and 50 deletions

View File

@ -0,0 +1,3 @@
Update to and pin latest `msgpack` (1.0.3) and `msgspec` (0.4.0)
both of which required adjustments for backwards imcompatible API
tweaks.

View File

@ -56,13 +56,13 @@ setup(
'pdbpp',
# serialization
'msgpack',
'msgpack>=1.0.3',
],
extras_require={
# serialization
'msgspec': ["msgspec >= 0.3.2'; python_version >= '3.9'"],
'msgspec': ['msgspec >= "0.4.0"'],
},
tests_require=['pytest'],

View File

@ -116,11 +116,26 @@ async def stream_from(portal):
print(value)
async def unpack_reg(actor_or_portal):
'''
Get and unpack a "registry" RPC request from the "arbiter" registry
system.
'''
if getattr(actor_or_portal, 'get_registry', None):
msg = await actor_or_portal.get_registry()
else:
msg = await actor_or_portal.run_from_ns('self', 'get_registry')
return {tuple(key.split('.')): val for key, val in msg.items()}
async def spawn_and_check_registry(
arb_addr: tuple,
use_signal: bool,
remote_arbiter: bool = False,
with_streaming: bool = False,
) -> None:
async with tractor.open_root_actor(
@ -134,13 +149,11 @@ async def spawn_and_check_registry(
assert not actor.is_arbiter
if actor.is_arbiter:
async def get_reg():
return await actor.get_registry()
extra = 1 # arbiter is local root actor
get_reg = partial(unpack_reg, actor)
else:
get_reg = partial(portal.run_from_ns, 'self', 'get_registry')
get_reg = partial(unpack_reg, portal)
extra = 2 # local root actor + remote arbiter
# ensure current actor is registered
@ -266,7 +279,7 @@ async def close_chans_before_nursery(
):
async with tractor.get_arbiter(*arb_addr) as aportal:
try:
get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')
get_reg = partial(unpack_reg, aportal)
async with tractor.open_nursery() as tn:
portal1 = await tn.start_actor(

View File

@ -27,7 +27,7 @@ import importlib.util
import inspect
import uuid
import typing
from typing import List, Tuple, Any, Optional, Union
from typing import Any, Optional, Union
from types import ModuleType
import sys
import os
@ -199,7 +199,9 @@ async def _invoke(
assert chan.uid
ctx = actor._contexts.pop((chan.uid, cid))
if ctx:
log.runtime(f'Context entrypoint for {func} was terminated:\n{ctx}')
log.runtime(
f'Context entrypoint for {func} was terminated:\n{ctx}'
)
assert cs
if cs.cancelled_caught:
@ -368,10 +370,10 @@ class Actor:
self,
name: str,
*,
enable_modules: List[str] = [],
enable_modules: list[str] = [],
uid: str = None,
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
arbiter_addr: Optional[tuple[str, int]] = None,
spawn_method: Optional[str] = None
) -> None:
"""This constructor is called in the parent actor **before** the spawning
@ -421,25 +423,25 @@ class Actor:
# (chan, cid) -> (cancel_scope, func)
self._rpc_tasks: dict[
Tuple[Channel, str],
Tuple[trio.CancelScope, typing.Callable, trio.Event]
tuple[Channel, str],
tuple[trio.CancelScope, typing.Callable, trio.Event]
] = {}
# map {actor uids -> Context}
self._contexts: dict[
Tuple[Tuple[str, str], str],
tuple[tuple[str, str], str],
Context
] = {}
self._listeners: List[trio.abc.Listener] = []
self._listeners: list[trio.abc.Listener] = []
self._parent_chan: Optional[Channel] = None
self._forkserver_info: Optional[
Tuple[Any, Any, Any, Any, Any]] = None
tuple[Any, Any, Any, Any, Any]] = None
self._actoruid2nursery: dict[Optional[tuple[str, str]], 'ActorNursery'] = {} # type: ignore # noqa
async def wait_for_peer(
self, uid: Tuple[str, str]
) -> Tuple[trio.Event, Channel]:
self, uid: tuple[str, str]
) -> tuple[trio.Event, Channel]:
"""Wait for a connection back from a spawned actor with a given
``uid``.
"""
@ -1010,8 +1012,8 @@ class Actor:
async def _from_parent(
self,
parent_addr: Optional[Tuple[str, int]],
) -> Tuple[Channel, Optional[Tuple[str, int]]]:
parent_addr: Optional[tuple[str, int]],
) -> tuple[Channel, Optional[tuple[str, int]]]:
try:
# Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we
@ -1024,7 +1026,7 @@ class Actor:
# Initial handshake: swap names.
await self._do_handshake(chan)
accept_addr: Optional[Tuple[str, int]] = None
accept_addr: Optional[tuple[str, int]] = None
if self._spawn_method == "trio":
# Receive runtime state from our parent
@ -1066,7 +1068,7 @@ class Actor:
async def _async_main(
self,
accept_addr: Optional[Tuple[str, int]] = None,
accept_addr: Optional[tuple[str, int]] = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@ -1075,7 +1077,7 @@ class Actor:
# change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as
# a subactor.
parent_addr: Optional[Tuple[str, int]] = None,
parent_addr: Optional[tuple[str, int]] = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -1261,7 +1263,7 @@ class Actor:
handler_nursery: trio.Nursery,
*,
# (host, port) to bind for channel server
accept_host: Tuple[str, int] = None,
accept_host: tuple[str, int] = None,
accept_port: int = 0,
task_status: TaskStatus[trio.Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -1273,7 +1275,7 @@ class Actor:
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
l: List[trio.abc.Listener] = await server_n.start(
l: list[trio.abc.Listener] = await server_n.start(
partial(
trio.serve_tcp,
self._stream_handler,
@ -1427,7 +1429,7 @@ class Actor:
self._server_n.cancel_scope.cancel()
@property
def accept_addr(self) -> Optional[Tuple[str, int]]:
def accept_addr(self) -> Optional[tuple[str, int]]:
"""Primary address to which the channel server is bound.
"""
# throws OSError on failure
@ -1438,7 +1440,7 @@ class Actor:
assert self._parent_chan, "No parent channel for this actor?"
return Portal(self._parent_chan)
def get_chans(self, uid: Tuple[str, str]) -> List[Channel]:
def get_chans(self, uid: tuple[str, str]) -> list[Channel]:
"""Return all channels to the actor with provided uid."""
return self._peers[uid]
@ -1446,7 +1448,7 @@ class Actor:
self,
chan: Channel
) -> Tuple[str, str]:
) -> tuple[str, str]:
"""Exchange (name, UUIDs) identifiers as the first communication step.
These are essentially the "mailbox addresses" found in actor model
@ -1454,7 +1456,7 @@ class Actor:
"""
await chan.send(self.uid)
value = await chan.recv()
uid: Tuple[str, str] = (str(value[0]), str(value[1]))
uid: tuple[str, str] = (str(value[0]), str(value[1]))
if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!")
@ -1483,14 +1485,14 @@ class Arbiter(Actor):
def __init__(self, *args, **kwargs):
self._registry: dict[
Tuple[str, str],
Tuple[str, int],
tuple[str, str],
tuple[str, int],
] = {}
self._waiters = {}
super().__init__(*args, **kwargs)
async def find_actor(self, name: str) -> Optional[Tuple[str, int]]:
async def find_actor(self, name: str) -> Optional[tuple[str, int]]:
for uid, sockaddr in self._registry.items():
if name in uid:
return sockaddr
@ -1499,25 +1501,31 @@ class Arbiter(Actor):
async def get_registry(
self
) -> dict[Tuple[str, str], Tuple[str, int]]:
'''Return current name registry.
) -> dict[str, tuple[str, int]]:
'''
Return current name registry.
This method is async to allow for cross-actor invocation.
'''
# NOTE: requires ``strict_map_key=False`` to the msgpack
# unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return self._registry
return {'.'.join(key): val for key, val in self._registry.items()}
async def wait_for_actor(
self,
name: str,
) -> List[Tuple[str, int]]:
'''Wait for a particular actor to register.
) -> list[tuple[str, int]]:
'''
Wait for a particular actor to register.
This is a blocking call if no actor by the provided name is currently
registered.
'''
sockaddrs = []
@ -1536,8 +1544,8 @@ class Arbiter(Actor):
async def register_actor(
self,
uid: Tuple[str, str],
sockaddr: Tuple[str, int]
uid: tuple[str, str],
sockaddr: tuple[str, int]
) -> None:
uid = name, uuid = (str(uid[0]), str(uid[1]))
@ -1552,7 +1560,8 @@ class Arbiter(Actor):
async def unregister_actor(
self,
uid: Tuple[str, str]
uid: tuple[str, str]
) -> None:
uid = (str(uid[0]), str(uid[1]))
self._registry.pop(uid)

View File

@ -96,7 +96,8 @@ class MsgTransport(Protocol[MsgType]):
class MsgpackTCPStream:
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
'''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgpack-python``.
'''
@ -120,12 +121,12 @@ class MsgpackTCPStream:
self.drained: list[dict] = []
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream.
"""
'''
Yield packets from the underlying stream.
'''
unpacker = msgpack.Unpacker(
raw=False,
use_list=False,
strict_map_key=False
)
while True:
try:
@ -222,8 +223,8 @@ class MsgspecTCPStream(MsgpackTCPStream):
self.prefix_size = prefix_size
# TODO: struct aware messaging coders
self.encode = msgspec.Encoder().encode
self.decode = msgspec.Decoder().decode # dict[str, Any])
self.encode = msgspec.msgpack.Encoder().encode
self.decode = msgspec.msgpack.Decoder().decode # dict[str, Any])
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
'''Yield packets from the underlying stream.

View File

@ -71,7 +71,7 @@ async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[T, ...], None]:
) -> AsyncGenerator[tuple[Optional[T], ...], None]:
'''
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
@ -84,7 +84,7 @@ async def gather_contexts(
entered and exited cancellation just works.
'''
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event()
parent_exit = trio.Event()