forked from goodboy/tractor
				
			Ugh, appease mypy yet again
							parent
							
								
									076f37c589
								
							
						
					
					
						commit
						1382ad653d
					
				| 
						 | 
				
			
			@ -318,7 +318,7 @@ class Actor:
 | 
			
		|||
        # @dataclass once we get py3.7
 | 
			
		||||
        self.loglevel = loglevel
 | 
			
		||||
 | 
			
		||||
        self._arb_addr = arbiter_addr
 | 
			
		||||
        self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None
 | 
			
		||||
 | 
			
		||||
        # marked by the process spawning backend at startup
 | 
			
		||||
        # will be None for the parent most process started manually
 | 
			
		||||
| 
						 | 
				
			
			@ -780,6 +780,7 @@ class Actor:
 | 
			
		|||
 | 
			
		||||
            if self._spawn_method == "trio":
 | 
			
		||||
                # Receive runtime state from our parent
 | 
			
		||||
                parent_data: dict[str, Any]
 | 
			
		||||
                parent_data = await chan.recv()
 | 
			
		||||
                log.debug(
 | 
			
		||||
                    "Received state from parent:\n"
 | 
			
		||||
| 
						 | 
				
			
			@ -797,12 +798,11 @@ class Actor:
 | 
			
		|||
                for attr, value in parent_data.items():
 | 
			
		||||
 | 
			
		||||
                    if attr == '_arb_addr':
 | 
			
		||||
                        # XXX: msgspec doesn't support serializing tuples
 | 
			
		||||
                        # XXX: ``msgspec`` doesn't support serializing tuples
 | 
			
		||||
                        # so just cash manually here since it's what our
 | 
			
		||||
                        # internals expect.
 | 
			
		||||
                        self._arb_addr: Tuple[str, int] = (
 | 
			
		||||
                            tuple(value) if value else value
 | 
			
		||||
                        )
 | 
			
		||||
                        value = tuple(value) if value else None
 | 
			
		||||
                        self._arb_addr = value
 | 
			
		||||
 | 
			
		||||
                    else:
 | 
			
		||||
                        setattr(self, attr, value)
 | 
			
		||||
| 
						 | 
				
			
			@ -1185,12 +1185,13 @@ class Actor:
 | 
			
		|||
        parlance.
 | 
			
		||||
        """
 | 
			
		||||
        await chan.send(self.uid)
 | 
			
		||||
        uid: Tuple[str, str] = tuple(await chan.recv())
 | 
			
		||||
        value = await chan.recv()
 | 
			
		||||
        uid: Tuple[str, str] = (str(value[0]), str(value[1]))
 | 
			
		||||
 | 
			
		||||
        if not isinstance(uid, tuple):
 | 
			
		||||
            raise ValueError(f"{uid} is not a valid uid?!")
 | 
			
		||||
 | 
			
		||||
        chan.uid = uid
 | 
			
		||||
        chan.uid = str(uid[0]), str(uid[1])
 | 
			
		||||
        log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete")
 | 
			
		||||
        return uid
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1208,7 +1209,10 @@ class Arbiter(Actor):
 | 
			
		|||
 | 
			
		||||
    def __init__(self, *args, **kwargs):
 | 
			
		||||
 | 
			
		||||
        self._registry = defaultdict(list)
 | 
			
		||||
        self._registry: Dict[
 | 
			
		||||
            Tuple[str, str],
 | 
			
		||||
            Tuple[str, int],
 | 
			
		||||
        ] = {}
 | 
			
		||||
        self._waiters = {}
 | 
			
		||||
 | 
			
		||||
        super().__init__(*args, **kwargs)
 | 
			
		||||
| 
						 | 
				
			
			@ -1222,7 +1226,7 @@ class Arbiter(Actor):
 | 
			
		|||
 | 
			
		||||
    async def get_registry(
 | 
			
		||||
        self
 | 
			
		||||
    ) -> Dict[str, Tuple[str, str]]:
 | 
			
		||||
    ) -> Dict[Tuple[str, str], Tuple[str, int]]:
 | 
			
		||||
        '''Return current name registry.
 | 
			
		||||
 | 
			
		||||
        This method is async to allow for cross-actor invocation.
 | 
			
		||||
| 
						 | 
				
			
			@ -1231,7 +1235,7 @@ class Arbiter(Actor):
 | 
			
		|||
        # unpacker since we have tuples as keys (not this makes the
 | 
			
		||||
        # arbiter suscetible to hashdos):
 | 
			
		||||
        # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10
 | 
			
		||||
        return dict(self._registry)
 | 
			
		||||
        return self._registry
 | 
			
		||||
 | 
			
		||||
    async def wait_for_actor(
 | 
			
		||||
        self,
 | 
			
		||||
| 
						 | 
				
			
			@ -1260,11 +1264,11 @@ class Arbiter(Actor):
 | 
			
		|||
    async def register_actor(
 | 
			
		||||
        self,
 | 
			
		||||
        uid: Tuple[str, str],
 | 
			
		||||
        sockaddr: Tuple[str, str]
 | 
			
		||||
        sockaddr: Tuple[str, int]
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        name, uuid = uid = tuple(uid)
 | 
			
		||||
        self._registry[uid] = tuple(sockaddr)
 | 
			
		||||
        uid = name, uuid = (str(uid[0]), str(uid[1]))
 | 
			
		||||
        self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1]))
 | 
			
		||||
 | 
			
		||||
        # pop and signal all waiter events
 | 
			
		||||
        events = self._waiters.pop(name, ())
 | 
			
		||||
| 
						 | 
				
			
			@ -1277,4 +1281,5 @@ class Arbiter(Actor):
 | 
			
		|||
        self,
 | 
			
		||||
        uid: Tuple[str, str]
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        self._registry.pop(tuple(uid))
 | 
			
		||||
        uid = (str(uid[0]), str(uid[1]))
 | 
			
		||||
        self._registry.pop(uid)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -185,7 +185,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
 | 
			
		|||
                # ignore decoding errors for now and assume they have to
 | 
			
		||||
                # do with a channel drop - hope that receiving from the
 | 
			
		||||
                # channel will raise an expected error and bubble up.
 | 
			
		||||
                log.error(f'`msgspec` failed to decode!?\n{msg_bytes}')
 | 
			
		||||
                log.error(f'`msgspec` failed to decode!?')
 | 
			
		||||
                last_decode_failed = True
 | 
			
		||||
 | 
			
		||||
    async def send(self, data: Any) -> None:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -21,8 +21,8 @@ from ._exceptions import is_multi_cancelled
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
# set at startup and after forks
 | 
			
		||||
_default_arbiter_host = '127.0.0.1'
 | 
			
		||||
_default_arbiter_port = 1616
 | 
			
		||||
_default_arbiter_host: str = '127.0.0.1'
 | 
			
		||||
_default_arbiter_port: int = 1616
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
logger = log.get_logger('tractor')
 | 
			
		||||
| 
						 | 
				
			
			@ -32,7 +32,7 @@ logger = log.get_logger('tractor')
 | 
			
		|||
async def open_root_actor(
 | 
			
		||||
 | 
			
		||||
    # defaults are above
 | 
			
		||||
    arbiter_addr: Tuple[str, int] = (
 | 
			
		||||
    arbiter_addr: Optional[Tuple[str, int]] = (
 | 
			
		||||
        _default_arbiter_host,
 | 
			
		||||
        _default_arbiter_port,
 | 
			
		||||
    ),
 | 
			
		||||
| 
						 | 
				
			
			@ -95,10 +95,10 @@ async def open_root_actor(
 | 
			
		|||
            "Debug mode is only supported for the `trio` backend!"
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    arbiter_addr = (host, port) = tuple(arbiter_addr or (
 | 
			
		||||
    arbiter_addr = (host, port) = arbiter_addr or (
 | 
			
		||||
        _default_arbiter_host,
 | 
			
		||||
        _default_arbiter_port,
 | 
			
		||||
    ))
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    loglevel = loglevel or log.get_loglevel()
 | 
			
		||||
    if loglevel is not None:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue