Ugh, appease mypy yet again

msgspec_infect_asyncio
Tyler Goodlet 2021-09-07 20:24:02 -04:00
parent 593fd24a9e
commit c188008844
3 changed files with 25 additions and 20 deletions

View File

@ -318,7 +318,7 @@ class Actor:
# @dataclass once we get py3.7 # @dataclass once we get py3.7
self.loglevel = loglevel self.loglevel = loglevel
self._arb_addr = arbiter_addr self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None
# marked by the process spawning backend at startup # marked by the process spawning backend at startup
# will be None for the parent most process started manually # will be None for the parent most process started manually
@ -780,6 +780,7 @@ class Actor:
if self._spawn_method == "trio": if self._spawn_method == "trio":
# Receive runtime state from our parent # Receive runtime state from our parent
parent_data: dict[str, Any]
parent_data = await chan.recv() parent_data = await chan.recv()
log.debug( log.debug(
"Received state from parent:\n" "Received state from parent:\n"
@ -797,12 +798,11 @@ class Actor:
for attr, value in parent_data.items(): for attr, value in parent_data.items():
if attr == '_arb_addr': if attr == '_arb_addr':
# XXX: msgspec doesn't support serializing tuples # XXX: ``msgspec`` doesn't support serializing tuples
# so just cash manually here since it's what our # so just cash manually here since it's what our
# internals expect. # internals expect.
self._arb_addr: Tuple[str, int] = ( value = tuple(value) if value else None
tuple(value) if value else value self._arb_addr = value
)
else: else:
setattr(self, attr, value) setattr(self, attr, value)
@ -1185,12 +1185,13 @@ class Actor:
parlance. parlance.
""" """
await chan.send(self.uid) await chan.send(self.uid)
uid: Tuple[str, str] = tuple(await chan.recv()) value = await chan.recv()
uid: Tuple[str, str] = (str(value[0]), str(value[1]))
if not isinstance(uid, tuple): if not isinstance(uid, tuple):
raise ValueError(f"{uid} is not a valid uid?!") raise ValueError(f"{uid} is not a valid uid?!")
chan.uid = uid chan.uid = str(uid[0]), str(uid[1])
log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
return uid return uid
@ -1208,7 +1209,10 @@ class Arbiter(Actor):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._registry = defaultdict(list) self._registry: Dict[
Tuple[str, str],
Tuple[str, int],
] = {}
self._waiters = {} self._waiters = {}
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -1222,7 +1226,7 @@ class Arbiter(Actor):
async def get_registry( async def get_registry(
self self
) -> Dict[str, Tuple[str, str]]: ) -> Dict[Tuple[str, str], Tuple[str, int]]:
'''Return current name registry. '''Return current name registry.
This method is async to allow for cross-actor invocation. This method is async to allow for cross-actor invocation.
@ -1231,7 +1235,7 @@ class Arbiter(Actor):
# unpacker since we have tuples as keys (not this makes the # unpacker since we have tuples as keys (not this makes the
# arbiter suscetible to hashdos): # arbiter suscetible to hashdos):
# https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
return dict(self._registry) return self._registry
async def wait_for_actor( async def wait_for_actor(
self, self,
@ -1260,11 +1264,11 @@ class Arbiter(Actor):
async def register_actor( async def register_actor(
self, self,
uid: Tuple[str, str], uid: Tuple[str, str],
sockaddr: Tuple[str, str] sockaddr: Tuple[str, int]
) -> None: ) -> None:
name, uuid = uid = tuple(uid) uid = name, uuid = (str(uid[0]), str(uid[1]))
self._registry[uid] = tuple(sockaddr) self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
# pop and signal all waiter events # pop and signal all waiter events
events = self._waiters.pop(name, ()) events = self._waiters.pop(name, ())
@ -1277,4 +1281,5 @@ class Arbiter(Actor):
self, self,
uid: Tuple[str, str] uid: Tuple[str, str]
) -> None: ) -> None:
self._registry.pop(tuple(uid)) uid = (str(uid[0]), str(uid[1]))
self._registry.pop(uid)

View File

@ -185,7 +185,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
# ignore decoding errors for now and assume they have to # ignore decoding errors for now and assume they have to
# do with a channel drop - hope that receiving from the # do with a channel drop - hope that receiving from the
# channel will raise an expected error and bubble up. # channel will raise an expected error and bubble up.
log.error(f'`msgspec` failed to decode!?\n{msg_bytes}') log.error(f'`msgspec` failed to decode!?')
last_decode_failed = True last_decode_failed = True
async def send(self, data: Any) -> None: async def send(self, data: Any) -> None:

View File

@ -21,8 +21,8 @@ from ._exceptions import is_multi_cancelled
# set at startup and after forks # set at startup and after forks
_default_arbiter_host = '127.0.0.1' _default_arbiter_host: str = '127.0.0.1'
_default_arbiter_port = 1616 _default_arbiter_port: int = 1616
logger = log.get_logger('tractor') logger = log.get_logger('tractor')
@ -32,7 +32,7 @@ logger = log.get_logger('tractor')
async def open_root_actor( async def open_root_actor(
# defaults are above # defaults are above
arbiter_addr: Tuple[str, int] = ( arbiter_addr: Optional[Tuple[str, int]] = (
_default_arbiter_host, _default_arbiter_host,
_default_arbiter_port, _default_arbiter_port,
), ),
@ -95,10 +95,10 @@ async def open_root_actor(
"Debug mode is only supported for the `trio` backend!" "Debug mode is only supported for the `trio` backend!"
) )
arbiter_addr = (host, port) = tuple(arbiter_addr or ( arbiter_addr = (host, port) = arbiter_addr or (
_default_arbiter_host, _default_arbiter_host,
_default_arbiter_port, _default_arbiter_port,
)) )
loglevel = loglevel or log.get_loglevel() loglevel = loglevel or log.get_loglevel()
if loglevel is not None: if loglevel is not None: