diff --git a/tractor/_actor.py b/tractor/_actor.py index deeda73..43ee2f3 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -318,7 +318,7 @@ class Actor: # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = arbiter_addr + self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -780,6 +780,7 @@ class Actor: if self._spawn_method == "trio": # Receive runtime state from our parent + parent_data: dict[str, Any] parent_data = await chan.recv() log.debug( "Received state from parent:\n" @@ -797,12 +798,11 @@ class Actor: for attr, value in parent_data.items(): if attr == '_arb_addr': - # XXX: msgspec doesn't support serializing tuples + # XXX: ``msgspec`` doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - self._arb_addr: Tuple[str, int] = ( - tuple(value) if value else value - ) + value = tuple(value) if value else None + self._arb_addr = value else: setattr(self, attr, value) @@ -1185,12 +1185,13 @@ class Actor: parlance. """ await chan.send(self.uid) - uid: Tuple[str, str] = tuple(await chan.recv()) + value = await chan.recv() + uid: Tuple[str, str] = (str(value[0]), str(value[1])) if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") - chan.uid = uid + chan.uid = str(uid[0]), str(uid[1]) log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid @@ -1208,7 +1209,10 @@ class Arbiter(Actor): def __init__(self, *args, **kwargs): - self._registry = defaultdict(list) + self._registry: Dict[ + Tuple[str, str], + Tuple[str, int], + ] = {} self._waiters = {} super().__init__(*args, **kwargs) @@ -1222,7 +1226,7 @@ class Arbiter(Actor): async def get_registry( self - ) -> Dict[str, Tuple[str, str]]: + ) -> Dict[Tuple[str, str], Tuple[str, int]]: '''Return current name registry. This method is async to allow for cross-actor invocation. @@ -1231,7 +1235,7 @@ class Arbiter(Actor): # unpacker since we have tuples as keys (not this makes the # arbiter suscetible to hashdos): # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 - return dict(self._registry) + return self._registry async def wait_for_actor( self, @@ -1260,11 +1264,11 @@ class Arbiter(Actor): async def register_actor( self, uid: Tuple[str, str], - sockaddr: Tuple[str, str] + sockaddr: Tuple[str, int] ) -> None: - name, uuid = uid = tuple(uid) - self._registry[uid] = tuple(sockaddr) + uid = name, uuid = (str(uid[0]), str(uid[1])) + self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1])) # pop and signal all waiter events events = self._waiters.pop(name, ()) @@ -1277,4 +1281,5 @@ class Arbiter(Actor): self, uid: Tuple[str, str] ) -> None: - self._registry.pop(tuple(uid)) + uid = (str(uid[0]), str(uid[1])) + self._registry.pop(uid) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 6296d94..e420509 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -185,7 +185,7 @@ class MsgspecTCPStream(MsgpackTCPStream): # ignore decoding errors for now and assume they have to # do with a channel drop - hope that receiving from the # channel will raise an expected error and bubble up. - log.error(f'`msgspec` failed to decode!?\n{msg_bytes}') + log.error(f'`msgspec` failed to decode!?') last_decode_failed = True async def send(self, data: Any) -> None: diff --git a/tractor/_root.py b/tractor/_root.py index bbf337a..b153755 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -21,8 +21,8 @@ from ._exceptions import is_multi_cancelled # set at startup and after forks -_default_arbiter_host = '127.0.0.1' -_default_arbiter_port = 1616 +_default_arbiter_host: str = '127.0.0.1' +_default_arbiter_port: int = 1616 logger = log.get_logger('tractor') @@ -32,7 +32,7 @@ logger = log.get_logger('tractor') async def open_root_actor( # defaults are above - arbiter_addr: Tuple[str, int] = ( + arbiter_addr: Optional[Tuple[str, int]] = ( _default_arbiter_host, _default_arbiter_port, ), @@ -95,10 +95,10 @@ async def open_root_actor( "Debug mode is only supported for the `trio` backend!" ) - arbiter_addr = (host, port) = tuple(arbiter_addr or ( + arbiter_addr = (host, port) = arbiter_addr or ( _default_arbiter_host, _default_arbiter_port, - )) + ) loglevel = loglevel or log.get_loglevel() if loglevel is not None: