forked from goodboy/tractor
1
0
Fork 0

Mypy fixes to enforce uid tuple

msgspec_infect_asyncio
Tyler Goodlet 2021-07-01 14:52:52 -04:00
parent 7888de6070
commit 7d0541d864
2 changed files with 24 additions and 17 deletions

View File

@ -287,7 +287,7 @@ class Actor:
enable_modules: List[str] = [], enable_modules: List[str] = [],
uid: str = None, uid: str = None,
loglevel: str = None, loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = (None, None), arbiter_addr: Optional[Tuple[str, int]] = None,
spawn_method: Optional[str] = None spawn_method: Optional[str] = None
) -> None: ) -> None:
"""This constructor is called in the parent actor **before** the spawning """This constructor is called in the parent actor **before** the spawning
@ -317,7 +317,8 @@ class Actor:
# TODO: consider making this a dynamically defined # TODO: consider making this a dynamically defined
# @dataclass once we get py3.7 # @dataclass once we get py3.7
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = tuple(arbiter_addr)
self._arb_addr = arbiter_addr or (None, None)
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -796,7 +797,8 @@ class Actor:
# XXX: msgspec doesn't support serializing tuples # XXX: msgspec doesn't support serializing tuples
# so just cash manually here since it's what our # so just cash manually here since it's what our
# internals expect. # internals expect.
self._arb_addr = tuple(value) address: Tuple[str, int] = value
self._arb_addr = value
else: else:
setattr(self, attr, value) setattr(self, attr, value)
@ -1171,6 +1173,7 @@ class Actor:
async def _do_handshake( async def _do_handshake(
self, self,
chan: Channel chan: Channel
) -> Tuple[str, str]: ) -> Tuple[str, str]:
"""Exchange (name, UUIDs) identifiers as the first communication step. """Exchange (name, UUIDs) identifiers as the first communication step.
@ -1178,10 +1181,10 @@ class Actor:
parlance. parlance.
""" """
await chan.send(self.uid) await chan.send(self.uid)
uid: Tuple[str, str] = tuple(await chan.recv()) uid: Tuple[str, str] = await chan.recv()
# if not isinstance(uid, tuple): if not isinstance(uid, tuple):
# raise ValueError(f"{uid} is not a valid uid?!") raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid chan.uid = uid
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
@ -1246,10 +1249,12 @@ class Arbiter(Actor):
return sockaddrs return sockaddrs
async def register_actor( async def register_actor(
self, uid: Tuple[str, str], sockaddr: Tuple[str, int] self,
uid: Tuple[str, str],
sockaddr: Tuple[str, str]
) -> None: ) -> None:
uid = tuple(uid) name, uuid = tuple(uid)
name, uuid = uid
self._registry[uid] = tuple(sockaddr) self._registry[uid] = tuple(sockaddr)
# pop and signal all waiter events # pop and signal all waiter events

View File

@ -160,7 +160,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
size, = struct.unpack("<I", header) size, = struct.unpack("<I", header)
log.trace(f'received header {size}') log.trace(f'received header {size}') # type: ignore
msg_bytes = await self.recv_stream.receive_exactly(size) msg_bytes = await self.recv_stream.receive_exactly(size)
@ -170,11 +170,11 @@ class MsgspecTCPStream(MsgpackTCPStream):
async def send(self, data: Any) -> None: async def send(self, data: Any) -> None:
async with self._send_lock: async with self._send_lock:
bytes_data = self.encode(data) bytes_data: bytes = self.encode(data)
# supposedly the fastest says, # supposedly the fastest says,
# https://stackoverflow.com/a/54027962 # https://stackoverflow.com/a/54027962
size: int = struct.pack("<I", len(bytes_data)) size: bytes = struct.pack("<I", len(bytes_data))
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
@ -197,17 +197,17 @@ class Channel:
self._recon_seq = on_reconnect self._recon_seq = on_reconnect
self._autorecon = auto_reconnect self._autorecon = auto_reconnect
stream_serializer_type = MsgpackTCPStream
try: try:
# if installed load the msgspec transport since it's faster # if installed load the msgspec transport since it's faster
import msgspec # noqa import msgspec # noqa
stream_serializer_type: type = MsgspecTCPStream stream_serializer_type = MsgspecTCPStream
except ImportError: except ImportError:
stream_serializer_type: type = MsgpackTCPStream pass
self.stream_serializer_type = stream_serializer_type self.stream_serializer_type = stream_serializer_type
self.msgstream: Optional[type] = stream_serializer_type( self.msgstream = stream_serializer_type(stream) if stream else None
stream) if stream else None
if self.msgstream and destaddr: if self.msgstream and destaddr:
raise ValueError( raise ValueError(
@ -215,8 +215,10 @@ class Channel:
) )
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid: Optional[Tuple[str, str]] = None self.uid: Optional[Tuple[str, str]] = None
# set if far end actor errors internally # set if far end actor errors internally
self._exc: Optional[Exception] = None self._exc: Optional[Exception] = None
self._agen = self._aiter_recv() self._agen = self._aiter_recv()