forked from goodboy/tractor
Mypy fixes to enforce uid tuple
parent
1e49066b16
commit
76f07898d9
|
@ -203,7 +203,7 @@ class Actor:
|
||||||
enable_modules: List[str] = [],
|
enable_modules: List[str] = [],
|
||||||
uid: str = None,
|
uid: str = None,
|
||||||
loglevel: str = None,
|
loglevel: str = None,
|
||||||
arbiter_addr: Optional[Tuple[str, int]] = (None, None),
|
arbiter_addr: Optional[Tuple[str, int]] = None,
|
||||||
spawn_method: Optional[str] = None
|
spawn_method: Optional[str] = None
|
||||||
) -> None:
|
) -> None:
|
||||||
"""This constructor is called in the parent actor **before** the spawning
|
"""This constructor is called in the parent actor **before** the spawning
|
||||||
|
@ -233,7 +233,8 @@ class Actor:
|
||||||
# TODO: consider making this a dynamically defined
|
# TODO: consider making this a dynamically defined
|
||||||
# @dataclass once we get py3.7
|
# @dataclass once we get py3.7
|
||||||
self.loglevel = loglevel
|
self.loglevel = loglevel
|
||||||
self._arb_addr = tuple(arbiter_addr)
|
|
||||||
|
self._arb_addr = arbiter_addr or (None, None)
|
||||||
|
|
||||||
# marked by the process spawning backend at startup
|
# marked by the process spawning backend at startup
|
||||||
# will be None for the parent most process started manually
|
# will be None for the parent most process started manually
|
||||||
|
@ -263,7 +264,7 @@ class Actor:
|
||||||
self._parent_chan: Optional[Channel] = None
|
self._parent_chan: Optional[Channel] = None
|
||||||
self._forkserver_info: Optional[
|
self._forkserver_info: Optional[
|
||||||
Tuple[Any, Any, Any, Any, Any]] = None
|
Tuple[Any, Any, Any, Any, Any]] = None
|
||||||
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore
|
self._actoruid2nursery: Dict[str, 'ActorNursery'] = {} # type: ignore # noqa
|
||||||
|
|
||||||
async def wait_for_peer(
|
async def wait_for_peer(
|
||||||
self, uid: Tuple[str, str]
|
self, uid: Tuple[str, str]
|
||||||
|
@ -668,7 +669,8 @@ class Actor:
|
||||||
# XXX: msgspec doesn't support serializing tuples
|
# XXX: msgspec doesn't support serializing tuples
|
||||||
# so just cash manually here since it's what our
|
# so just cash manually here since it's what our
|
||||||
# internals expect.
|
# internals expect.
|
||||||
self._arb_addr = tuple(value)
|
address: Tuple[str, int] = value
|
||||||
|
self._arb_addr = value
|
||||||
|
|
||||||
else:
|
else:
|
||||||
setattr(self, attr, value)
|
setattr(self, attr, value)
|
||||||
|
@ -1043,6 +1045,7 @@ class Actor:
|
||||||
async def _do_handshake(
|
async def _do_handshake(
|
||||||
self,
|
self,
|
||||||
chan: Channel
|
chan: Channel
|
||||||
|
|
||||||
) -> Tuple[str, str]:
|
) -> Tuple[str, str]:
|
||||||
"""Exchange (name, UUIDs) identifiers as the first communication step.
|
"""Exchange (name, UUIDs) identifiers as the first communication step.
|
||||||
|
|
||||||
|
@ -1050,10 +1053,10 @@ class Actor:
|
||||||
parlance.
|
parlance.
|
||||||
"""
|
"""
|
||||||
await chan.send(self.uid)
|
await chan.send(self.uid)
|
||||||
uid: Tuple[str, str] = tuple(await chan.recv())
|
uid: Tuple[str, str] = await chan.recv()
|
||||||
|
|
||||||
# if not isinstance(uid, tuple):
|
if not isinstance(uid, tuple):
|
||||||
# raise ValueError(f"{uid} is not a valid uid?!")
|
raise ValueError(f"{uid} is not a valid uid?!")
|
||||||
|
|
||||||
chan.uid = uid
|
chan.uid = uid
|
||||||
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
log.info(f"Handshake with actor {uid}@{chan.raddr} complete")
|
||||||
|
@ -1118,10 +1121,12 @@ class Arbiter(Actor):
|
||||||
return sockaddrs
|
return sockaddrs
|
||||||
|
|
||||||
async def register_actor(
|
async def register_actor(
|
||||||
self, uid: Tuple[str, str], sockaddr: Tuple[str, int]
|
self,
|
||||||
|
uid: Tuple[str, str],
|
||||||
|
sockaddr: Tuple[str, str]
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
uid = tuple(uid)
|
name, uuid = tuple(uid)
|
||||||
name, uuid = uid
|
|
||||||
self._registry[uid] = tuple(sockaddr)
|
self._registry[uid] = tuple(sockaddr)
|
||||||
|
|
||||||
# pop and signal all waiter events
|
# pop and signal all waiter events
|
||||||
|
|
|
@ -138,7 +138,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
|
||||||
|
|
||||||
size, = struct.unpack("<I", header)
|
size, = struct.unpack("<I", header)
|
||||||
|
|
||||||
log.trace(f'received header {size}')
|
log.trace(f'received header {size}') # type: ignore
|
||||||
|
|
||||||
msg_bytes = await self.recv_stream.receive_exactly(size)
|
msg_bytes = await self.recv_stream.receive_exactly(size)
|
||||||
|
|
||||||
|
@ -148,11 +148,11 @@ class MsgspecTCPStream(MsgpackTCPStream):
|
||||||
async def send(self, data: Any) -> None:
|
async def send(self, data: Any) -> None:
|
||||||
async with self._send_lock:
|
async with self._send_lock:
|
||||||
|
|
||||||
bytes_data = self.encode(data)
|
bytes_data: bytes = self.encode(data)
|
||||||
|
|
||||||
# supposedly the fastest says,
|
# supposedly the fastest says,
|
||||||
# https://stackoverflow.com/a/54027962
|
# https://stackoverflow.com/a/54027962
|
||||||
size: int = struct.pack("<I", len(bytes_data))
|
size: bytes = struct.pack("<I", len(bytes_data))
|
||||||
|
|
||||||
return await self.stream.send_all(size + bytes_data)
|
return await self.stream.send_all(size + bytes_data)
|
||||||
|
|
||||||
|
@ -175,17 +175,17 @@ class Channel:
|
||||||
self._recon_seq = on_reconnect
|
self._recon_seq = on_reconnect
|
||||||
self._autorecon = auto_reconnect
|
self._autorecon = auto_reconnect
|
||||||
|
|
||||||
|
stream_serializer_type = MsgpackTCPStream
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# if installed load the msgspec transport since it's faster
|
# if installed load the msgspec transport since it's faster
|
||||||
import msgspec # noqa
|
import msgspec # noqa
|
||||||
stream_serializer_type: type = MsgspecTCPStream
|
stream_serializer_type = MsgspecTCPStream
|
||||||
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
stream_serializer_type: type = MsgpackTCPStream
|
pass
|
||||||
|
|
||||||
self.stream_serializer_type = stream_serializer_type
|
self.stream_serializer_type = stream_serializer_type
|
||||||
self.msgstream: Optional[type] = stream_serializer_type(
|
self.msgstream = stream_serializer_type(stream) if stream else None
|
||||||
stream) if stream else None
|
|
||||||
|
|
||||||
if self.msgstream and destaddr:
|
if self.msgstream and destaddr:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
|
@ -193,8 +193,10 @@ class Channel:
|
||||||
)
|
)
|
||||||
|
|
||||||
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
|
self._destaddr = self.msgstream.raddr if self.msgstream else destaddr
|
||||||
|
|
||||||
# set after handshake - always uid of far end
|
# set after handshake - always uid of far end
|
||||||
self.uid: Optional[Tuple[str, str]] = None
|
self.uid: Optional[Tuple[str, str]] = None
|
||||||
|
|
||||||
# set if far end actor errors internally
|
# set if far end actor errors internally
|
||||||
self._exc: Optional[Exception] = None
|
self._exc: Optional[Exception] = None
|
||||||
self._agen = self._aiter_recv()
|
self._agen = self._aiter_recv()
|
||||||
|
|
Loading…
Reference in New Issue