Finally switch to using address protocol in all runtime
							parent
							
								
									5f50206d84
								
							
						
					
					
						commit
						76cee99fc2
					
				|  | @ -9,7 +9,7 @@ async def main(service_name): | |||
|     async with tractor.open_nursery() as an: | ||||
|         await an.start_actor(service_name) | ||||
| 
 | ||||
|         async with tractor.get_registry('127.0.0.1', 1616) as portal: | ||||
|         async with tractor.get_registry(('127.0.0.1', 1616)) as portal: | ||||
|             print(f"Arbiter is listening on {portal.channel}") | ||||
| 
 | ||||
|         async with tractor.wait_for_actor(service_name) as sockaddr: | ||||
|  |  | |||
|  | @ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr): | |||
|         portal = await n.start_actor('actor', enable_modules=[__name__]) | ||||
|         uid = portal.channel.uid | ||||
| 
 | ||||
|         async with tractor.get_registry(*reg_addr) as aportal: | ||||
|         async with tractor.get_registry(reg_addr) as aportal: | ||||
|             # this local actor should be the arbiter | ||||
|             assert actor is aportal.actor | ||||
| 
 | ||||
|  | @ -160,7 +160,7 @@ async def spawn_and_check_registry( | |||
|     async with tractor.open_root_actor( | ||||
|         registry_addrs=[reg_addr], | ||||
|     ): | ||||
|         async with tractor.get_registry(*reg_addr) as portal: | ||||
|         async with tractor.get_registry(reg_addr) as portal: | ||||
|             # runtime needs to be up to call this | ||||
|             actor = tractor.current_actor() | ||||
| 
 | ||||
|  | @ -300,7 +300,7 @@ async def close_chans_before_nursery( | |||
|     async with tractor.open_root_actor( | ||||
|         registry_addrs=[reg_addr], | ||||
|     ): | ||||
|         async with tractor.get_registry(*reg_addr) as aportal: | ||||
|         async with tractor.get_registry(reg_addr) as aportal: | ||||
|             try: | ||||
|                 get_reg = partial(unpack_reg, aportal) | ||||
| 
 | ||||
|  |  | |||
|  | @ -871,7 +871,7 @@ async def serve_subactors( | |||
|                 ) | ||||
|                 await ipc.send(( | ||||
|                     peer.chan.uid, | ||||
|                     peer.chan.raddr, | ||||
|                     peer.chan.raddr.unwrap(), | ||||
|                 )) | ||||
| 
 | ||||
|         print('Spawner exiting spawn serve loop!') | ||||
|  |  | |||
|  | @ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr): | |||
|     "Verify waiting on the arbiter to register itself using a local portal." | ||||
|     actor = tractor.current_actor() | ||||
|     assert actor.is_arbiter | ||||
|     async with tractor.get_registry(*reg_addr) as portal: | ||||
|     async with tractor.get_registry(reg_addr) as portal: | ||||
|         assert isinstance(portal, tractor._portal.LocalPortal) | ||||
| 
 | ||||
|         with trio.fail_after(0.2): | ||||
|  |  | |||
|  | @ -32,7 +32,7 @@ def test_abort_on_sigint(daemon): | |||
| @tractor_test | ||||
| async def test_cancel_remote_arbiter(daemon, reg_addr): | ||||
|     assert not tractor.current_actor().is_arbiter | ||||
|     async with tractor.get_registry(*reg_addr) as portal: | ||||
|     async with tractor.get_registry(reg_addr) as portal: | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
|     time.sleep(0.1) | ||||
|  | @ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr): | |||
| 
 | ||||
|     # no arbiter socket should exist | ||||
|     with pytest.raises(OSError): | ||||
|         async with tractor.get_registry(*reg_addr) as portal: | ||||
|         async with tractor.get_registry(reg_addr) as portal: | ||||
|             pass | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -77,7 +77,7 @@ async def movie_theatre_question(): | |||
| async def test_movie_theatre_convo(start_method): | ||||
|     """The main ``tractor`` routine. | ||||
|     """ | ||||
|     async with tractor.open_nursery() as n: | ||||
|     async with tractor.open_nursery(debug_mode=True) as n: | ||||
| 
 | ||||
|         portal = await n.start_actor( | ||||
|             'frank', | ||||
|  |  | |||
|  | @ -0,0 +1,301 @@ | |||
| # tractor: structured concurrent "actors". | ||||
| # Copyright 2018-eternity Tyler Goodlet. | ||||
| 
 | ||||
| # This program is free software: you can redistribute it and/or modify | ||||
| # it under the terms of the GNU Affero General Public License as published by | ||||
| # the Free Software Foundation, either version 3 of the License, or | ||||
| # (at your option) any later version. | ||||
| 
 | ||||
| # This program is distributed in the hope that it will be useful, | ||||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | ||||
| # GNU Affero General Public License for more details. | ||||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| from __future__ import annotations | ||||
| import tempfile | ||||
| from uuid import uuid4 | ||||
| from typing import ( | ||||
|     Protocol, | ||||
|     ClassVar, | ||||
|     TypeVar, | ||||
|     Union, | ||||
|     Type | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| from trio import socket | ||||
| 
 | ||||
| 
 | ||||
| NamespaceType = TypeVar('NamespaceType') | ||||
| AddressType = TypeVar('AddressType') | ||||
| StreamType = TypeVar('StreamType') | ||||
| ListenerType = TypeVar('ListenerType') | ||||
| 
 | ||||
| 
 | ||||
| class Address(Protocol[ | ||||
|     NamespaceType, | ||||
|     AddressType, | ||||
|     StreamType, | ||||
|     ListenerType | ||||
| ]): | ||||
| 
 | ||||
|     name_key: ClassVar[str] | ||||
|     address_type: ClassVar[Type[AddressType]] | ||||
| 
 | ||||
|     @property | ||||
|     def is_valid(self) -> bool: | ||||
|         ... | ||||
| 
 | ||||
|     @property | ||||
|     def namespace(self) -> NamespaceType|None: | ||||
|         ... | ||||
| 
 | ||||
|     @classmethod | ||||
|     def from_addr(cls, addr: AddressType) -> Address: | ||||
|         ... | ||||
| 
 | ||||
|     def unwrap(self) -> AddressType: | ||||
|         ... | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_random(cls, namespace: NamespaceType | None = None) -> Address: | ||||
|         ... | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_root(cls) -> Address: | ||||
|         ... | ||||
| 
 | ||||
|     def __repr__(self) -> str: | ||||
|         ... | ||||
| 
 | ||||
|     def __eq__(self, other) -> bool: | ||||
|         ... | ||||
| 
 | ||||
|     async def open_stream(self, **kwargs) -> StreamType: | ||||
|         ... | ||||
| 
 | ||||
|     async def open_listener(self, **kwargs) -> ListenerType: | ||||
|         ... | ||||
| 
 | ||||
| 
 | ||||
| class TCPAddress(Address[ | ||||
|     str, | ||||
|     tuple[str, int], | ||||
|     trio.SocketStream, | ||||
|     trio.SocketListener | ||||
| ]): | ||||
| 
 | ||||
|     name_key: str = 'tcp' | ||||
|     address_type: type = tuple[str, int] | ||||
| 
 | ||||
|     def __init__( | ||||
|         self, | ||||
|         host: str, | ||||
|         port: int | ||||
|     ): | ||||
|         if ( | ||||
|             not isinstance(host, str) | ||||
|             or | ||||
|             not isinstance(port, int) | ||||
|         ): | ||||
|             raise TypeError(f'Expected host {host} to be str and port {port} to be int') | ||||
|         self._host = host | ||||
|         self._port = port | ||||
| 
 | ||||
|     @property | ||||
|     def is_valid(self) -> bool: | ||||
|         return self._port != 0 | ||||
| 
 | ||||
|     @property | ||||
|     def namespace(self) -> str: | ||||
|         return self._host | ||||
| 
 | ||||
|     @classmethod | ||||
|     def from_addr(cls, addr: tuple[str, int]) -> TCPAddress: | ||||
|         return TCPAddress(addr[0], addr[1]) | ||||
| 
 | ||||
|     def unwrap(self) -> tuple[str, int]: | ||||
|         return self._host, self._port | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress: | ||||
|         return TCPAddress(namespace, 0) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_root(cls) -> Address: | ||||
|         return TCPAddress('127.0.0.1', 1616) | ||||
| 
 | ||||
|     def __repr__(self) -> str: | ||||
|         return f'{type(self)} @ {self.unwrap()}' | ||||
| 
 | ||||
|     def __eq__(self, other) -> bool: | ||||
|         if not isinstance(other, TCPAddress): | ||||
|             raise TypeError( | ||||
|                 f'Can not compare {type(other)} with {type(self)}' | ||||
|             ) | ||||
| 
 | ||||
|         return ( | ||||
|             self._host == other._host | ||||
|             and | ||||
|             self._port == other._port | ||||
|         ) | ||||
| 
 | ||||
|     async def open_stream(self, **kwargs) -> trio.SocketStream: | ||||
|         stream = await trio.open_tcp_stream( | ||||
|             self._host, | ||||
|             self._port, | ||||
|             **kwargs | ||||
|         ) | ||||
|         self._host, self._port = stream.socket.getsockname()[:2] | ||||
|         return stream | ||||
| 
 | ||||
|     async def open_listener(self, **kwargs) -> trio.SocketListener: | ||||
|         listeners = await trio.open_tcp_listeners( | ||||
|             host=self._host, | ||||
|             port=self._port, | ||||
|             **kwargs | ||||
|         ) | ||||
|         assert len(listeners) == 1 | ||||
|         listener = listeners[0] | ||||
|         self._host, self._port = listener.socket.getsockname()[:2] | ||||
|         return listener | ||||
| 
 | ||||
| 
 | ||||
| class UDSAddress(Address[ | ||||
|     None, | ||||
|     str, | ||||
|     trio.SocketStream, | ||||
|     trio.SocketListener | ||||
| ]): | ||||
| 
 | ||||
|     name_key: str = 'uds' | ||||
|     address_type: type = str | ||||
| 
 | ||||
|     def __init__( | ||||
|         self, | ||||
|         filepath: str | ||||
|     ): | ||||
|         self._filepath = filepath | ||||
| 
 | ||||
|     @property | ||||
|     def is_valid(self) -> bool: | ||||
|         return True | ||||
| 
 | ||||
|     @property | ||||
|     def namespace(self) -> None: | ||||
|         return | ||||
| 
 | ||||
|     @classmethod | ||||
|     def from_addr(cls, filepath: str) -> UDSAddress: | ||||
|         return UDSAddress(filepath) | ||||
| 
 | ||||
|     def unwrap(self) -> str: | ||||
|         return self._filepath | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_random(cls, _ns: None = None) -> UDSAddress: | ||||
|         return UDSAddress(f'{tempfile.gettempdir()}/{uuid4().sock}') | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_root(cls) -> Address: | ||||
|         return UDSAddress('tractor.sock') | ||||
| 
 | ||||
|     def __repr__(self) -> str: | ||||
|         return f'{type(self)} @ {self._filepath}' | ||||
| 
 | ||||
|     def __eq__(self, other) -> bool: | ||||
|         if not isinstance(other, UDSAddress): | ||||
|             raise TypeError( | ||||
|                 f'Can not compare {type(other)} with {type(self)}' | ||||
|             ) | ||||
| 
 | ||||
|         return self._filepath == other._filepath | ||||
| 
 | ||||
|     async def open_stream(self, **kwargs) -> trio.SocketStream: | ||||
|         stream = await trio.open_tcp_stream( | ||||
|             self._filepath, | ||||
|             **kwargs | ||||
|         ) | ||||
|         self._binded = True | ||||
|         return stream | ||||
| 
 | ||||
|     async def open_listener(self, **kwargs) -> trio.SocketListener: | ||||
|         sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||||
|         sock.bind(self._filepath) | ||||
|         sock.listen() | ||||
|         self._binded = True | ||||
|         return trio.SocketListener(sock) | ||||
| 
 | ||||
| 
 | ||||
| preferred_transport = 'tcp' | ||||
| 
 | ||||
| 
 | ||||
| _address_types = ( | ||||
|     TCPAddress, | ||||
|     UDSAddress | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
| _default_addrs: dict[str, Type[Address]] = { | ||||
|     cls.name_key: cls | ||||
|     for cls in _address_types | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| AddressTypes = Union[ | ||||
|     tuple([ | ||||
|         cls.address_type | ||||
|         for cls in _address_types | ||||
|     ]) | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| _default_lo_addrs: dict[ | ||||
|     str, | ||||
|     AddressTypes | ||||
| ] = { | ||||
|     cls.name_key: cls.get_root().unwrap() | ||||
|     for cls in _address_types | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| def get_address_cls(name: str) -> Type[Address]: | ||||
|     return _default_addrs[name] | ||||
| 
 | ||||
| 
 | ||||
| def is_wrapped_addr(addr: any) -> bool: | ||||
|     return type(addr) in _address_types | ||||
| 
 | ||||
| 
 | ||||
| def wrap_address(addr: AddressTypes) -> Address: | ||||
| 
 | ||||
|     if is_wrapped_addr(addr): | ||||
|         return addr | ||||
| 
 | ||||
|     cls = None | ||||
|     match addr: | ||||
|         case str(): | ||||
|             cls = UDSAddress | ||||
| 
 | ||||
|         case tuple() | list(): | ||||
|             cls = TCPAddress | ||||
| 
 | ||||
|         case None: | ||||
|             cls = get_address_cls(preferred_transport) | ||||
|             addr = cls.get_root().unwrap() | ||||
| 
 | ||||
|         case _: | ||||
|             raise TypeError( | ||||
|                 f'Can not wrap addr {addr} of type {type(addr)}' | ||||
|             ) | ||||
| 
 | ||||
|     return cls.from_addr(addr) | ||||
| 
 | ||||
| 
 | ||||
| def default_lo_addrs(transports: list[str]) -> list[AddressTypes]: | ||||
|     return [ | ||||
|         _default_lo_addrs[transport] | ||||
|         for transport in transports | ||||
|     ] | ||||
|  | @ -31,8 +31,7 @@ def parse_uid(arg): | |||
|     return str(name), str(uuid)  # ensures str encoding | ||||
| 
 | ||||
| def parse_ipaddr(arg): | ||||
|     host, port = literal_eval(arg) | ||||
|     return (str(host), int(port)) | ||||
|     return literal_eval(arg) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == "__main__": | ||||
|  |  | |||
|  | @ -859,19 +859,10 @@ class Context: | |||
|     @property | ||||
|     def dst_maddr(self) -> str: | ||||
|         chan: Channel = self.chan | ||||
|         dst_addr, dst_port = chan.raddr | ||||
|         trans: MsgTransport = chan.transport | ||||
|         # cid: str = self.cid | ||||
|         # cid_head, cid_tail = cid[:6], cid[-6:] | ||||
|         return ( | ||||
|             f'/ipv4/{dst_addr}' | ||||
|             f'/{trans.name_key}/{dst_port}' | ||||
|             # f'/{self.chan.uid[0]}' | ||||
|             # f'/{self.cid}' | ||||
| 
 | ||||
|             # f'/cid={cid_head}..{cid_tail}' | ||||
|             # TODO: ? not use this ^ right ? | ||||
|         ) | ||||
|         return trans.maddr | ||||
| 
 | ||||
|     dmaddr = dst_maddr | ||||
| 
 | ||||
|  |  | |||
|  | @ -30,6 +30,12 @@ from contextlib import asynccontextmanager as acm | |||
| from tractor.log import get_logger | ||||
| from .trionics import gather_contexts | ||||
| from .ipc import _connect_chan, Channel | ||||
| from ._addr import ( | ||||
|     AddressTypes, | ||||
|     Address, | ||||
|     preferred_transport, | ||||
|     wrap_address | ||||
| ) | ||||
| from ._portal import ( | ||||
|     Portal, | ||||
|     open_portal, | ||||
|  | @ -48,11 +54,7 @@ log = get_logger(__name__) | |||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def get_registry( | ||||
|     host: str, | ||||
|     port: int, | ||||
| 
 | ||||
| ) -> AsyncGenerator[ | ||||
| async def get_registry(addr: AddressTypes) -> AsyncGenerator[ | ||||
|     Portal | LocalPortal | None, | ||||
|     None, | ||||
| ]: | ||||
|  | @ -69,13 +71,13 @@ async def get_registry( | |||
|         # (likely a re-entrant call from the arbiter actor) | ||||
|         yield LocalPortal( | ||||
|             actor, | ||||
|             Channel((host, port)) | ||||
|             await Channel.from_addr(addr) | ||||
|         ) | ||||
|     else: | ||||
|         # TODO: try to look pre-existing connection from | ||||
|         # `Actor._peers` and use it instead? | ||||
|         async with ( | ||||
|             _connect_chan((host, port)) as chan, | ||||
|             _connect_chan(addr) as chan, | ||||
|             open_portal(chan) as regstr_ptl, | ||||
|         ): | ||||
|             yield regstr_ptl | ||||
|  | @ -89,11 +91,10 @@ async def get_root( | |||
| 
 | ||||
|     # TODO: rename mailbox to `_root_maddr` when we finally | ||||
|     # add and impl libp2p multi-addrs? | ||||
|     host, port = _runtime_vars['_root_mailbox'] | ||||
|     assert host is not None | ||||
|     addr = _runtime_vars['_root_mailbox'] | ||||
| 
 | ||||
|     async with ( | ||||
|         _connect_chan((host, port)) as chan, | ||||
|         _connect_chan(addr) as chan, | ||||
|         open_portal(chan, **kwargs) as portal, | ||||
|     ): | ||||
|         yield portal | ||||
|  | @ -134,10 +135,10 @@ def get_peer_by_name( | |||
| @acm | ||||
| async def query_actor( | ||||
|     name: str, | ||||
|     regaddr: tuple[str, int]|None = None, | ||||
|     regaddr: AddressTypes|None = None, | ||||
| 
 | ||||
| ) -> AsyncGenerator[ | ||||
|     tuple[str, int]|None, | ||||
|     AddressTypes|None, | ||||
|     None, | ||||
| ]: | ||||
|     ''' | ||||
|  | @ -163,31 +164,31 @@ async def query_actor( | |||
|         return | ||||
| 
 | ||||
|     reg_portal: Portal | ||||
|     regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0] | ||||
|     async with get_registry(*regaddr) as reg_portal: | ||||
|     regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0] | ||||
|     async with get_registry(regaddr) as reg_portal: | ||||
|         # TODO: return portals to all available actors - for now | ||||
|         # just the last one that registered | ||||
|         sockaddr: tuple[str, int] = await reg_portal.run_from_ns( | ||||
|         addr: AddressTypes = await reg_portal.run_from_ns( | ||||
|             'self', | ||||
|             'find_actor', | ||||
|             name=name, | ||||
|         ) | ||||
|         yield sockaddr | ||||
|         yield addr | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def maybe_open_portal( | ||||
|     addr: tuple[str, int], | ||||
|     addr: AddressTypes, | ||||
|     name: str, | ||||
| ): | ||||
|     async with query_actor( | ||||
|         name=name, | ||||
|         regaddr=addr, | ||||
|     ) as sockaddr: | ||||
|     ) as addr: | ||||
|         pass | ||||
| 
 | ||||
|     if sockaddr: | ||||
|         async with _connect_chan(sockaddr) as chan: | ||||
|     if addr: | ||||
|         async with _connect_chan(addr) as chan: | ||||
|             async with open_portal(chan) as portal: | ||||
|                 yield portal | ||||
|     else: | ||||
|  | @ -197,7 +198,8 @@ async def maybe_open_portal( | |||
| @acm | ||||
| async def find_actor( | ||||
|     name: str, | ||||
|     registry_addrs: list[tuple[str, int]]|None = None, | ||||
|     registry_addrs: list[AddressTypes]|None = None, | ||||
|     enable_transports: list[str] = [preferred_transport], | ||||
| 
 | ||||
|     only_first: bool = True, | ||||
|     raise_on_none: bool = False, | ||||
|  | @ -224,15 +226,15 @@ async def find_actor( | |||
|         # XXX NOTE: make sure to dynamically read the value on | ||||
|         # every call since something may change it globally (eg. | ||||
|         # like in our discovery test suite)! | ||||
|         from . import _root | ||||
|         from ._addr import default_lo_addrs | ||||
|         registry_addrs = ( | ||||
|             _runtime_vars['_registry_addrs'] | ||||
|             or | ||||
|             _root._default_lo_addrs | ||||
|             default_lo_addrs(enable_transports) | ||||
|         ) | ||||
| 
 | ||||
|     maybe_portals: list[ | ||||
|         AsyncContextManager[tuple[str, int]] | ||||
|         AsyncContextManager[AddressTypes] | ||||
|     ] = list( | ||||
|         maybe_open_portal( | ||||
|             addr=addr, | ||||
|  | @ -274,7 +276,7 @@ async def find_actor( | |||
| @acm | ||||
| async def wait_for_actor( | ||||
|     name: str, | ||||
|     registry_addr: tuple[str, int] | None = None, | ||||
|     registry_addr: AddressTypes | None = None, | ||||
| 
 | ||||
| ) -> AsyncGenerator[Portal, None]: | ||||
|     ''' | ||||
|  | @ -291,7 +293,7 @@ async def wait_for_actor( | |||
|             yield peer_portal | ||||
|             return | ||||
| 
 | ||||
|     regaddr: tuple[str, int] = ( | ||||
|     regaddr: AddressTypes = ( | ||||
|         registry_addr | ||||
|         or | ||||
|         actor.reg_addrs[0] | ||||
|  | @ -299,8 +301,8 @@ async def wait_for_actor( | |||
|     # TODO: use `.trionics.gather_contexts()` like | ||||
|     # above in `find_actor()` as well? | ||||
|     reg_portal: Portal | ||||
|     async with get_registry(*regaddr) as reg_portal: | ||||
|         sockaddrs = await reg_portal.run_from_ns( | ||||
|     async with get_registry(regaddr) as reg_portal: | ||||
|         addrs = await reg_portal.run_from_ns( | ||||
|             'self', | ||||
|             'wait_for_actor', | ||||
|             name=name, | ||||
|  | @ -308,8 +310,8 @@ async def wait_for_actor( | |||
| 
 | ||||
|         # get latest registered addr by default? | ||||
|         # TODO: offer multi-portal yields in multi-homed case? | ||||
|         sockaddr: tuple[str, int] = sockaddrs[-1] | ||||
|         addr: AddressTypes = addrs[-1] | ||||
| 
 | ||||
|         async with _connect_chan(sockaddr) as chan: | ||||
|         async with _connect_chan(addr) as chan: | ||||
|             async with open_portal(chan) as portal: | ||||
|                 yield portal | ||||
|  |  | |||
|  | @ -37,6 +37,7 @@ from .log import ( | |||
| from . import _state | ||||
| from .devx import _debug | ||||
| from .to_asyncio import run_as_asyncio_guest | ||||
| from ._addr import AddressTypes | ||||
| from ._runtime import ( | ||||
|     async_main, | ||||
|     Actor, | ||||
|  | @ -52,10 +53,10 @@ log = get_logger(__name__) | |||
| def _mp_main( | ||||
| 
 | ||||
|     actor: Actor, | ||||
|     accept_addrs: list[tuple[str, int]], | ||||
|     accept_addrs: list[AddressTypes], | ||||
|     forkserver_info: tuple[Any, Any, Any, Any, Any], | ||||
|     start_method: SpawnMethodKey, | ||||
|     parent_addr: tuple[str, int] | None = None, | ||||
|     parent_addr: AddressTypes | None = None, | ||||
|     infect_asyncio: bool = False, | ||||
| 
 | ||||
| ) -> None: | ||||
|  | @ -206,7 +207,7 @@ def nest_from_op( | |||
| def _trio_main( | ||||
|     actor: Actor, | ||||
|     *, | ||||
|     parent_addr: tuple[str, int] | None = None, | ||||
|     parent_addr: AddressTypes | None = None, | ||||
|     infect_asyncio: bool = False, | ||||
| 
 | ||||
| ) -> None: | ||||
|  |  | |||
|  | @ -43,21 +43,18 @@ from .devx import _debug | |||
| from . import _spawn | ||||
| from . import _state | ||||
| from . import log | ||||
| from .ipc import _connect_chan | ||||
| from .ipc import ( | ||||
|     _connect_chan, | ||||
| ) | ||||
| from ._addr import ( | ||||
|     AddressTypes, | ||||
|     wrap_address, | ||||
|     preferred_transport, | ||||
|     default_lo_addrs | ||||
| ) | ||||
| from ._exceptions import is_multi_cancelled | ||||
| 
 | ||||
| 
 | ||||
| # set at startup and after forks | ||||
| _default_host: str = '127.0.0.1' | ||||
| _default_port: int = 1616 | ||||
| 
 | ||||
| # default registry always on localhost | ||||
| _default_lo_addrs: list[tuple[str, int]] = [( | ||||
|     _default_host, | ||||
|     _default_port, | ||||
| )] | ||||
| 
 | ||||
| 
 | ||||
| logger = log.get_logger('tractor') | ||||
| 
 | ||||
| 
 | ||||
|  | @ -66,10 +63,12 @@ async def open_root_actor( | |||
| 
 | ||||
|     *, | ||||
|     # defaults are above | ||||
|     registry_addrs: list[tuple[str, int]]|None = None, | ||||
|     registry_addrs: list[AddressTypes]|None = None, | ||||
| 
 | ||||
|     # defaults are above | ||||
|     arbiter_addr: tuple[str, int]|None = None, | ||||
|     arbiter_addr: tuple[AddressTypes]|None = None, | ||||
| 
 | ||||
|     enable_transports: list[str] = [preferred_transport], | ||||
| 
 | ||||
|     name: str|None = 'root', | ||||
| 
 | ||||
|  | @ -195,11 +194,9 @@ async def open_root_actor( | |||
|         ) | ||||
|         registry_addrs = [arbiter_addr] | ||||
| 
 | ||||
|     registry_addrs: list[tuple[str, int]] = ( | ||||
|         registry_addrs | ||||
|         or | ||||
|         _default_lo_addrs | ||||
|     ) | ||||
|     if not registry_addrs: | ||||
|         registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports) | ||||
| 
 | ||||
|     assert registry_addrs | ||||
| 
 | ||||
|     loglevel = ( | ||||
|  | @ -248,10 +245,10 @@ async def open_root_actor( | |||
|         enable_stack_on_sig() | ||||
| 
 | ||||
|     # closed into below ping task-func | ||||
|     ponged_addrs: list[tuple[str, int]] = [] | ||||
|     ponged_addrs: list[AddressTypes] = [] | ||||
| 
 | ||||
|     async def ping_tpt_socket( | ||||
|         addr: tuple[str, int], | ||||
|         addr: AddressTypes, | ||||
|         timeout: float = 1, | ||||
|     ) -> None: | ||||
|         ''' | ||||
|  | @ -284,10 +281,10 @@ async def open_root_actor( | |||
|         for addr in registry_addrs: | ||||
|             tn.start_soon( | ||||
|                 ping_tpt_socket, | ||||
|                 tuple(addr),  # TODO: just drop this requirement? | ||||
|                 addr, | ||||
|             ) | ||||
| 
 | ||||
|     trans_bind_addrs: list[tuple[str, int]] = [] | ||||
|     trans_bind_addrs: list[AddressTypes] = [] | ||||
| 
 | ||||
|     # Create a new local root-actor instance which IS NOT THE | ||||
|     # REGISTRAR | ||||
|  | @ -311,9 +308,12 @@ async def open_root_actor( | |||
|         ) | ||||
|         # DO NOT use the registry_addrs as the transport server | ||||
|         # addrs for this new non-registar, root-actor. | ||||
|         for host, port in ponged_addrs: | ||||
|             # NOTE: zero triggers dynamic OS port allocation | ||||
|             trans_bind_addrs.append((host, 0)) | ||||
|         for addr in ponged_addrs: | ||||
|             waddr = wrap_address(addr) | ||||
|             print(waddr) | ||||
|             trans_bind_addrs.append( | ||||
|                 waddr.get_random(namespace=waddr.namespace) | ||||
|             ) | ||||
| 
 | ||||
|     # Start this local actor as the "registrar", aka a regular | ||||
|     # actor who manages the local registry of "mailboxes" of | ||||
|  | @ -322,7 +322,7 @@ async def open_root_actor( | |||
| 
 | ||||
|         # NOTE that if the current actor IS THE REGISTAR, the | ||||
|         # following init steps are taken: | ||||
|         # - the tranport layer server is bound to each (host, port) | ||||
|         # - the tranport layer server is bound to each addr | ||||
|         #   pair defined in provided registry_addrs, or the default. | ||||
|         trans_bind_addrs = registry_addrs | ||||
| 
 | ||||
|  | @ -462,7 +462,7 @@ def run_daemon( | |||
| 
 | ||||
|     # runtime kwargs | ||||
|     name: str | None = 'root', | ||||
|     registry_addrs: list[tuple[str, int]] = _default_lo_addrs, | ||||
|     registry_addrs: list[AddressTypes]|None = None, | ||||
| 
 | ||||
|     start_method: str | None = None, | ||||
|     debug_mode: bool = False, | ||||
|  |  | |||
|  | @ -74,6 +74,12 @@ from tractor.msg import ( | |||
|     types as msgtypes, | ||||
| ) | ||||
| from .ipc import Channel | ||||
| from ._addr import ( | ||||
|     AddressTypes, | ||||
|     Address, | ||||
|     TCPAddress, | ||||
|     wrap_address, | ||||
| ) | ||||
| from ._context import ( | ||||
|     mk_context, | ||||
|     Context, | ||||
|  | @ -179,11 +185,11 @@ class Actor: | |||
|         enable_modules: list[str] = [], | ||||
|         uid: str|None = None, | ||||
|         loglevel: str|None = None, | ||||
|         registry_addrs: list[tuple[str, int]]|None = None, | ||||
|         registry_addrs: list[AddressTypes]|None = None, | ||||
|         spawn_method: str|None = None, | ||||
| 
 | ||||
|         # TODO: remove! | ||||
|         arbiter_addr: tuple[str, int]|None = None, | ||||
|         arbiter_addr: AddressTypes|None = None, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         ''' | ||||
|  | @ -223,7 +229,7 @@ class Actor: | |||
|                 DeprecationWarning, | ||||
|                 stacklevel=2, | ||||
|             ) | ||||
|             registry_addrs: list[tuple[str, int]] = [arbiter_addr] | ||||
|             registry_addrs: list[AddressTypes] = [arbiter_addr] | ||||
| 
 | ||||
|         # marked by the process spawning backend at startup | ||||
|         # will be None for the parent most process started manually | ||||
|  | @ -257,6 +263,7 @@ class Actor: | |||
|         ] = {} | ||||
| 
 | ||||
|         self._listeners: list[trio.abc.Listener] = [] | ||||
|         self._listen_addrs: list[Address] = [] | ||||
|         self._parent_chan: Channel|None = None | ||||
|         self._forkserver_info: tuple|None = None | ||||
| 
 | ||||
|  | @ -269,13 +276,13 @@ class Actor: | |||
| 
 | ||||
|         # when provided, init the registry addresses property from | ||||
|         # input via the validator. | ||||
|         self._reg_addrs: list[tuple[str, int]] = [] | ||||
|         self._reg_addrs: list[AddressTypes] = [] | ||||
|         if registry_addrs: | ||||
|             self.reg_addrs: list[tuple[str, int]] = registry_addrs | ||||
|             self.reg_addrs: list[AddressTypes] = registry_addrs | ||||
|             _state._runtime_vars['_registry_addrs'] = registry_addrs | ||||
| 
 | ||||
|     @property | ||||
|     def reg_addrs(self) -> list[tuple[str, int]]: | ||||
|     def reg_addrs(self) -> list[AddressTypes]: | ||||
|         ''' | ||||
|         List of (socket) addresses for all known (and contactable) | ||||
|         registry actors. | ||||
|  | @ -286,7 +293,7 @@ class Actor: | |||
|     @reg_addrs.setter | ||||
|     def reg_addrs( | ||||
|         self, | ||||
|         addrs: list[tuple[str, int]], | ||||
|         addrs: list[AddressTypes], | ||||
|     ) -> None: | ||||
|         if not addrs: | ||||
|             log.warning( | ||||
|  | @ -295,16 +302,7 @@ class Actor: | |||
|             ) | ||||
|             return | ||||
| 
 | ||||
|         # always sanity check the input list since it's critical | ||||
|         # that addrs are correct for discovery sys operation. | ||||
|         for addr in addrs: | ||||
|             if not isinstance(addr, tuple): | ||||
|                 raise ValueError( | ||||
|                     'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n' | ||||
|                     f'Got {addrs}' | ||||
|                 ) | ||||
| 
 | ||||
|             self._reg_addrs = addrs | ||||
|         self._reg_addrs = addrs | ||||
| 
 | ||||
|     async def wait_for_peer( | ||||
|         self, | ||||
|  | @ -1024,11 +1022,11 @@ class Actor: | |||
| 
 | ||||
|     async def _from_parent( | ||||
|         self, | ||||
|         parent_addr: tuple[str, int]|None, | ||||
|         parent_addr: AddressTypes|None, | ||||
| 
 | ||||
|     ) -> tuple[ | ||||
|         Channel, | ||||
|         list[tuple[str, int]]|None, | ||||
|         list[AddressTypes]|None, | ||||
|     ]: | ||||
|         ''' | ||||
|         Bootstrap this local actor's runtime config from its parent by | ||||
|  | @ -1040,13 +1038,13 @@ class Actor: | |||
|             # Connect back to the parent actor and conduct initial | ||||
|             # handshake. From this point on if we error, we | ||||
|             # attempt to ship the exception back to the parent. | ||||
|             chan = await Channel.from_destaddr(parent_addr) | ||||
|             chan = await Channel.from_addr(wrap_address(parent_addr)) | ||||
| 
 | ||||
|             # TODO: move this into a `Channel.handshake()`? | ||||
|             # Initial handshake: swap names. | ||||
|             await self._do_handshake(chan) | ||||
| 
 | ||||
|             accept_addrs: list[tuple[str, int]]|None = None | ||||
|             accept_addrs: list[AddressTypes]|None = None | ||||
| 
 | ||||
|             if self._spawn_method == "trio": | ||||
| 
 | ||||
|  | @ -1063,7 +1061,7 @@ class Actor: | |||
|                     # if "trace"/"util" mode is enabled? | ||||
|                     f'{pretty_struct.pformat(spawnspec)}\n' | ||||
|                 ) | ||||
|                 accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs | ||||
|                 accept_addrs: list[AddressTypes] = spawnspec.bind_addrs | ||||
| 
 | ||||
|                 # TODO: another `Struct` for rtvs.. | ||||
|                 rvs: dict[str, Any] = spawnspec._runtime_vars | ||||
|  | @ -1170,8 +1168,7 @@ class Actor: | |||
|         self, | ||||
|         handler_nursery: Nursery, | ||||
|         *, | ||||
|         # (host, port) to bind for channel server | ||||
|         listen_sockaddrs: list[tuple[str, int]]|None = None, | ||||
|         listen_addrs: list[AddressTypes]|None = None, | ||||
| 
 | ||||
|         task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, | ||||
|     ) -> None: | ||||
|  | @ -1183,37 +1180,39 @@ class Actor: | |||
|         `.cancel_server()` is called. | ||||
| 
 | ||||
|         ''' | ||||
|         if listen_sockaddrs is None: | ||||
|             listen_sockaddrs = [(None, 0)] | ||||
|         if listen_addrs is None: | ||||
|             listen_addrs = [TCPAddress.get_random()] | ||||
| 
 | ||||
|         else: | ||||
|             listen_addrs: list[Address] = [ | ||||
|                 wrap_address(a) for a in listen_addrs | ||||
|             ] | ||||
| 
 | ||||
|         self._server_down = trio.Event() | ||||
|         try: | ||||
|             async with trio.open_nursery() as server_n: | ||||
|                 listeners: list[trio.abc.Listener] = [ | ||||
|                     await addr.open_listener() | ||||
|                     for addr in listen_addrs | ||||
|                 ] | ||||
|                 await server_n.start( | ||||
|                     partial( | ||||
|                         trio.serve_listeners, | ||||
|                         handler=self._stream_handler, | ||||
|                         listeners=listeners, | ||||
| 
 | ||||
|                 for host, port in listen_sockaddrs: | ||||
|                     listeners: list[trio.abc.Listener] = await server_n.start( | ||||
|                         partial( | ||||
|                             trio.serve_tcp, | ||||
| 
 | ||||
|                             handler=self._stream_handler, | ||||
|                             port=port, | ||||
|                             host=host, | ||||
| 
 | ||||
|                             # NOTE: configured such that new | ||||
|                             # connections will stay alive even if | ||||
|                             # this server is cancelled! | ||||
|                             handler_nursery=handler_nursery, | ||||
|                         ) | ||||
|                         # NOTE: configured such that new | ||||
|                         # connections will stay alive even if | ||||
|                         # this server is cancelled! | ||||
|                         handler_nursery=handler_nursery | ||||
|                     ) | ||||
|                     sockets: list[trio.socket] = [ | ||||
|                         getattr(listener, 'socket', 'unknown socket') | ||||
|                         for listener in listeners | ||||
|                     ] | ||||
|                     log.runtime( | ||||
|                         'Started TCP server(s)\n' | ||||
|                         f'|_{sockets}\n' | ||||
|                     ) | ||||
|                     self._listeners.extend(listeners) | ||||
|                 ) | ||||
|                 log.runtime( | ||||
|                     'Started server(s)\n' | ||||
|                     '\n'.join([f'|_{addr}' for addr in listen_addrs]) | ||||
|                 ) | ||||
|                 self._listen_addrs.extend(listen_addrs) | ||||
|                 self._listeners.extend(listeners) | ||||
| 
 | ||||
|                 task_status.started(server_n) | ||||
| 
 | ||||
|  | @ -1576,26 +1575,21 @@ class Actor: | |||
|         return False | ||||
| 
 | ||||
|     @property | ||||
|     def accept_addrs(self) -> list[tuple[str, int]]: | ||||
|     def accept_addrs(self) -> list[AddressTypes]: | ||||
|         ''' | ||||
|         All addresses to which the transport-channel server binds | ||||
|         and listens for new connections. | ||||
| 
 | ||||
|         ''' | ||||
|         # throws OSError on failure | ||||
|         return [ | ||||
|             listener.socket.getsockname()  | ||||
|             for listener in self._listeners | ||||
|         ]  # type: ignore | ||||
|         return [a.unwrap() for a in self._listen_addrs] | ||||
| 
 | ||||
|     @property | ||||
|     def accept_addr(self) -> tuple[str, int]: | ||||
|     def accept_addr(self) -> AddressTypes: | ||||
|         ''' | ||||
|         Primary address to which the IPC transport server is | ||||
|         bound and listening for new connections. | ||||
| 
 | ||||
|         ''' | ||||
|         # throws OSError on failure | ||||
|         return self.accept_addrs[0] | ||||
| 
 | ||||
|     def get_parent(self) -> Portal: | ||||
|  | @ -1667,7 +1661,7 @@ class Actor: | |||
| 
 | ||||
| async def async_main( | ||||
|     actor: Actor, | ||||
|     accept_addrs: tuple[str, int]|None = None, | ||||
|     accept_addrs: AddressTypes|None = None, | ||||
| 
 | ||||
|     # XXX: currently ``parent_addr`` is only needed for the | ||||
|     # ``multiprocessing`` backend (which pickles state sent to | ||||
|  | @ -1676,7 +1670,7 @@ async def async_main( | |||
|     # change this to a simple ``is_subactor: bool`` which will | ||||
|     # be False when running as root actor and True when as | ||||
|     # a subactor. | ||||
|     parent_addr: tuple[str, int]|None = None, | ||||
|     parent_addr: AddressTypes|None = None, | ||||
|     task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, | ||||
| 
 | ||||
| ) -> None: | ||||
|  | @ -1766,7 +1760,7 @@ async def async_main( | |||
|                         partial( | ||||
|                             actor._serve_forever, | ||||
|                             service_nursery, | ||||
|                             listen_sockaddrs=accept_addrs, | ||||
|                             listen_addrs=accept_addrs, | ||||
|                         ) | ||||
|                     ) | ||||
|                 except OSError as oserr: | ||||
|  | @ -1782,7 +1776,7 @@ async def async_main( | |||
| 
 | ||||
|                     raise | ||||
| 
 | ||||
|                 accept_addrs: list[tuple[str, int]] = actor.accept_addrs | ||||
|                 accept_addrs: list[AddressTypes] = actor.accept_addrs | ||||
| 
 | ||||
|                 # NOTE: only set the loopback addr for the  | ||||
|                 # process-tree-global "root" mailbox since | ||||
|  | @ -1790,9 +1784,8 @@ async def async_main( | |||
|                 # their root actor over that channel. | ||||
|                 if _state._runtime_vars['_is_root']: | ||||
|                     for addr in accept_addrs: | ||||
|                         host, _ = addr | ||||
|                         # TODO: generic 'lo' detector predicate | ||||
|                         if '127.0.0.1' in host: | ||||
|                         waddr = wrap_address(addr) | ||||
|                         if waddr == waddr.get_root(): | ||||
|                             _state._runtime_vars['_root_mailbox'] = addr | ||||
| 
 | ||||
|                 # Register with the arbiter if we're told its addr | ||||
|  | @ -1807,24 +1800,21 @@ async def async_main( | |||
|                 # only on unique actor uids? | ||||
|                 for addr in actor.reg_addrs: | ||||
|                     try: | ||||
|                         assert isinstance(addr, tuple) | ||||
|                         assert addr[1]  # non-zero after bind | ||||
|                         waddr = wrap_address(addr) | ||||
|                         assert waddr.is_valid | ||||
|                     except AssertionError: | ||||
|                         await _debug.pause() | ||||
| 
 | ||||
|                     async with get_registry(*addr) as reg_portal: | ||||
|                     async with get_registry(addr) as reg_portal: | ||||
|                         for accept_addr in accept_addrs: | ||||
| 
 | ||||
|                             if not accept_addr[1]: | ||||
|                                 await _debug.pause() | ||||
| 
 | ||||
|                             assert accept_addr[1] | ||||
|                             accept_addr = wrap_address(accept_addr) | ||||
|                             assert accept_addr.is_valid | ||||
| 
 | ||||
|                             await reg_portal.run_from_ns( | ||||
|                                 'self', | ||||
|                                 'register_actor', | ||||
|                                 uid=actor.uid, | ||||
|                                 sockaddr=accept_addr, | ||||
|                                 addr=accept_addr.unwrap(), | ||||
|                             ) | ||||
| 
 | ||||
|                     is_registered: bool = True | ||||
|  | @ -1951,12 +1941,13 @@ async def async_main( | |||
|         ): | ||||
|             failed: bool = False | ||||
|             for addr in actor.reg_addrs: | ||||
|                 assert isinstance(addr, tuple) | ||||
|                 waddr = wrap_address(addr) | ||||
|                 assert waddr.is_valid | ||||
|                 with trio.move_on_after(0.5) as cs: | ||||
|                     cs.shield = True | ||||
|                     try: | ||||
|                         async with get_registry( | ||||
|                             *addr, | ||||
|                             addr, | ||||
|                         ) as reg_portal: | ||||
|                             await reg_portal.run_from_ns( | ||||
|                                 'self', | ||||
|  | @ -2034,7 +2025,7 @@ class Arbiter(Actor): | |||
| 
 | ||||
|         self._registry: dict[ | ||||
|             tuple[str, str], | ||||
|             tuple[str, int], | ||||
|             AddressTypes, | ||||
|         ] = {} | ||||
|         self._waiters: dict[ | ||||
|             str, | ||||
|  | @ -2050,18 +2041,18 @@ class Arbiter(Actor): | |||
|         self, | ||||
|         name: str, | ||||
| 
 | ||||
|     ) -> tuple[str, int]|None: | ||||
|     ) -> AddressTypes|None: | ||||
| 
 | ||||
|         for uid, sockaddr in self._registry.items(): | ||||
|         for uid, addr in self._registry.items(): | ||||
|             if name in uid: | ||||
|                 return sockaddr | ||||
|                 return addr | ||||
| 
 | ||||
|         return None | ||||
| 
 | ||||
|     async def get_registry( | ||||
|         self | ||||
| 
 | ||||
|     ) -> dict[str, tuple[str, int]]: | ||||
|     ) -> dict[str, AddressTypes]: | ||||
|         ''' | ||||
|         Return current name registry. | ||||
| 
 | ||||
|  | @ -2081,7 +2072,7 @@ class Arbiter(Actor): | |||
|         self, | ||||
|         name: str, | ||||
| 
 | ||||
|     ) -> list[tuple[str, int]]: | ||||
|     ) -> list[AddressTypes]: | ||||
|         ''' | ||||
|         Wait for a particular actor to register. | ||||
| 
 | ||||
|  | @ -2089,44 +2080,41 @@ class Arbiter(Actor): | |||
|         registered. | ||||
| 
 | ||||
|         ''' | ||||
|         sockaddrs: list[tuple[str, int]] = [] | ||||
|         sockaddr: tuple[str, int] | ||||
|         addrs: list[AddressTypes] = [] | ||||
|         addr: AddressTypes | ||||
| 
 | ||||
|         mailbox_info: str = 'Actor registry contact infos:\n' | ||||
|         for uid, sockaddr in self._registry.items(): | ||||
|         for uid, addr in self._registry.items(): | ||||
|             mailbox_info += ( | ||||
|                 f'|_uid: {uid}\n' | ||||
|                 f'|_sockaddr: {sockaddr}\n\n' | ||||
|                 f'|_addr: {addr}\n\n' | ||||
|             ) | ||||
|             if name == uid[0]: | ||||
|                 sockaddrs.append(sockaddr) | ||||
|                 addrs.append(addr) | ||||
| 
 | ||||
|         if not sockaddrs: | ||||
|         if not addrs: | ||||
|             waiter = trio.Event() | ||||
|             self._waiters.setdefault(name, []).append(waiter) | ||||
|             await waiter.wait() | ||||
| 
 | ||||
|             for uid in self._waiters[name]: | ||||
|                 if not isinstance(uid, trio.Event): | ||||
|                     sockaddrs.append(self._registry[uid]) | ||||
|                     addrs.append(self._registry[uid]) | ||||
| 
 | ||||
|         log.runtime(mailbox_info) | ||||
|         return sockaddrs | ||||
|         return addrs | ||||
| 
 | ||||
|     async def register_actor( | ||||
|         self, | ||||
|         uid: tuple[str, str], | ||||
|         sockaddr: tuple[str, int] | ||||
| 
 | ||||
|         addr: AddressTypes | ||||
|     ) -> None: | ||||
|         uid = name, hash = (str(uid[0]), str(uid[1])) | ||||
|         addr = (host, port) = ( | ||||
|             str(sockaddr[0]), | ||||
|             int(sockaddr[1]), | ||||
|         ) | ||||
|         if port == 0: | ||||
|         waddr: Address = wrap_address(addr) | ||||
|         if not waddr.is_valid: | ||||
|             # should never be 0-dynamic-os-alloc | ||||
|             await _debug.pause() | ||||
|         assert port  # should never be 0-dynamic-os-alloc | ||||
| 
 | ||||
|         self._registry[uid] = addr | ||||
| 
 | ||||
|         # pop and signal all waiter events | ||||
|  |  | |||
|  | @ -46,6 +46,7 @@ from tractor._state import ( | |||
|     _runtime_vars, | ||||
| ) | ||||
| from tractor.log import get_logger | ||||
| from tractor._addr import AddressTypes | ||||
| from tractor._portal import Portal | ||||
| from tractor._runtime import Actor | ||||
| from tractor._entry import _mp_main | ||||
|  | @ -392,8 +393,8 @@ async def new_proc( | |||
|     errors: dict[tuple[str, str], Exception], | ||||
| 
 | ||||
|     # passed through to actor main | ||||
|     bind_addrs: list[tuple[str, int]], | ||||
|     parent_addr: tuple[str, int], | ||||
|     bind_addrs: list[AddressTypes], | ||||
|     parent_addr: AddressTypes, | ||||
|     _runtime_vars: dict[str, Any],  # serialized and sent to _child | ||||
| 
 | ||||
|     *, | ||||
|  | @ -431,8 +432,8 @@ async def trio_proc( | |||
|     errors: dict[tuple[str, str], Exception], | ||||
| 
 | ||||
|     # passed through to actor main | ||||
|     bind_addrs: list[tuple[str, int]], | ||||
|     parent_addr: tuple[str, int], | ||||
|     bind_addrs: list[AddressTypes], | ||||
|     parent_addr: AddressTypes, | ||||
|     _runtime_vars: dict[str, Any],  # serialized and sent to _child | ||||
|     *, | ||||
|     infect_asyncio: bool = False, | ||||
|  | @ -520,15 +521,15 @@ async def trio_proc( | |||
| 
 | ||||
|         # send a "spawning specification" which configures the | ||||
|         # initial runtime state of the child. | ||||
|         await chan.send( | ||||
|             SpawnSpec( | ||||
|                 _parent_main_data=subactor._parent_main_data, | ||||
|                 enable_modules=subactor.enable_modules, | ||||
|                 reg_addrs=subactor.reg_addrs, | ||||
|                 bind_addrs=bind_addrs, | ||||
|                 _runtime_vars=_runtime_vars, | ||||
|             ) | ||||
|         sspec = SpawnSpec( | ||||
|             _parent_main_data=subactor._parent_main_data, | ||||
|             enable_modules=subactor.enable_modules, | ||||
|             reg_addrs=subactor.reg_addrs, | ||||
|             bind_addrs=bind_addrs, | ||||
|             _runtime_vars=_runtime_vars, | ||||
|         ) | ||||
|         log.runtime(f'Sending spawn spec: {str(sspec)}') | ||||
|         await chan.send(sspec) | ||||
| 
 | ||||
|         # track subactor in current nursery | ||||
|         curr_actor: Actor = current_actor() | ||||
|  | @ -638,8 +639,8 @@ async def mp_proc( | |||
|     subactor: Actor, | ||||
|     errors: dict[tuple[str, str], Exception], | ||||
|     # passed through to actor main | ||||
|     bind_addrs: list[tuple[str, int]], | ||||
|     parent_addr: tuple[str, int], | ||||
|     bind_addrs: list[AddressTypes], | ||||
|     parent_addr: AddressTypes, | ||||
|     _runtime_vars: dict[str, Any],  # serialized and sent to _child | ||||
|     *, | ||||
|     infect_asyncio: bool = False, | ||||
|  |  | |||
|  | @ -28,7 +28,13 @@ import warnings | |||
| 
 | ||||
| import trio | ||||
| 
 | ||||
| 
 | ||||
| from .devx._debug import maybe_wait_for_debugger | ||||
| from ._addr import ( | ||||
|     AddressTypes, | ||||
|     preferred_transport, | ||||
|     get_address_cls | ||||
| ) | ||||
| from ._state import current_actor, is_main_process | ||||
| from .log import get_logger, get_loglevel | ||||
| from ._runtime import Actor | ||||
|  | @ -47,8 +53,6 @@ if TYPE_CHECKING: | |||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| _default_bind_addr: tuple[str, int] = ('127.0.0.1', 0) | ||||
| 
 | ||||
| 
 | ||||
| class ActorNursery: | ||||
|     ''' | ||||
|  | @ -130,8 +134,9 @@ class ActorNursery: | |||
| 
 | ||||
|         *, | ||||
| 
 | ||||
|         bind_addrs: list[tuple[str, int]] = [_default_bind_addr], | ||||
|         bind_addrs: list[AddressTypes]|None = None, | ||||
|         rpc_module_paths: list[str]|None = None, | ||||
|         enable_transports: list[str] = [preferred_transport], | ||||
|         enable_modules: list[str]|None = None, | ||||
|         loglevel: str|None = None,  # set log level per subactor | ||||
|         debug_mode: bool|None = None, | ||||
|  | @ -156,6 +161,12 @@ class ActorNursery: | |||
|             or get_loglevel() | ||||
|         ) | ||||
| 
 | ||||
|         if not bind_addrs: | ||||
|             bind_addrs: list[AddressTypes] = [ | ||||
|                 get_address_cls(transport).get_random().unwrap() | ||||
|                 for transport in enable_transports | ||||
|             ] | ||||
| 
 | ||||
|         # configure and pass runtime state | ||||
|         _rtv = _state._runtime_vars.copy() | ||||
|         _rtv['_is_root'] = False | ||||
|  | @ -224,7 +235,7 @@ class ActorNursery: | |||
|         *, | ||||
| 
 | ||||
|         name: str | None = None, | ||||
|         bind_addrs: tuple[str, int] = [_default_bind_addr], | ||||
|         bind_addrs: AddressTypes|None = None, | ||||
|         rpc_module_paths: list[str] | None = None, | ||||
|         enable_modules: list[str] | None = None, | ||||
|         loglevel: str | None = None,  # set log level per subactor | ||||
|  |  | |||
|  | @ -17,7 +17,6 @@ import platform | |||
| 
 | ||||
| from ._transport import ( | ||||
|     MsgTransportKey as MsgTransportKey, | ||||
|     AddressType as AddressType, | ||||
|     MsgType as MsgType, | ||||
|     MsgTransport as MsgTransport, | ||||
|     MsgpackTransport as MsgpackTransport | ||||
|  | @ -27,10 +26,8 @@ from ._tcp import MsgpackTCPStream as MsgpackTCPStream | |||
| from ._uds import MsgpackUDSStream as MsgpackUDSStream | ||||
| 
 | ||||
| from ._types import ( | ||||
|     default_lo_addrs as default_lo_addrs, | ||||
|     transport_from_destaddr as transport_from_destaddr, | ||||
|     transport_from_addr as transport_from_addr, | ||||
|     transport_from_stream as transport_from_stream, | ||||
|     AddressTypes as AddressTypes | ||||
| ) | ||||
| 
 | ||||
| from ._chan import ( | ||||
|  |  | |||
|  | @ -35,8 +35,12 @@ import trio | |||
| 
 | ||||
| from tractor.ipc._transport import MsgTransport | ||||
| from tractor.ipc._types import ( | ||||
|     transport_from_destaddr, | ||||
|     transport_from_addr, | ||||
|     transport_from_stream, | ||||
| ) | ||||
| from tractor._addr import ( | ||||
|     wrap_address, | ||||
|     Address, | ||||
|     AddressTypes | ||||
| ) | ||||
| from tractor.log import get_logger | ||||
|  | @ -66,7 +70,6 @@ class Channel: | |||
|     def __init__( | ||||
| 
 | ||||
|         self, | ||||
|         destaddr: AddressTypes|None = None, | ||||
|         transport: MsgTransport|None = None, | ||||
|         # TODO: optional reconnection support? | ||||
|         # auto_reconnect: bool = False, | ||||
|  | @ -81,8 +84,6 @@ class Channel: | |||
|         # user in ``.from_stream()``. | ||||
|         self._transport: MsgTransport|None = transport | ||||
| 
 | ||||
|         self._destaddr = destaddr if destaddr else self._transport.raddr | ||||
| 
 | ||||
|         # set after handshake - always uid of far end | ||||
|         self.uid: tuple[str, str]|None = None | ||||
| 
 | ||||
|  | @ -121,13 +122,14 @@ class Channel: | |||
|         ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     async def from_destaddr( | ||||
|     async def from_addr( | ||||
|         cls, | ||||
|         destaddr: AddressTypes, | ||||
|         addr: AddressTypes, | ||||
|         **kwargs | ||||
|     ) -> Channel: | ||||
|         transport_cls = transport_from_destaddr(destaddr) | ||||
|         transport = await transport_cls.connect_to(destaddr, **kwargs) | ||||
|         addr: Address = wrap_address(addr) | ||||
|         transport_cls = transport_from_addr(addr) | ||||
|         transport = await transport_cls.connect_to(addr, **kwargs) | ||||
| 
 | ||||
|         log.transport( | ||||
|             f'Opened channel[{type(transport)}]: {transport.laddr} -> {transport.raddr}' | ||||
|  | @ -164,11 +166,11 @@ class Channel: | |||
|         ) | ||||
| 
 | ||||
|     @property | ||||
|     def laddr(self) -> tuple[str, int]|None: | ||||
|     def laddr(self) -> Address|None: | ||||
|         return self._transport.laddr if self._transport else None | ||||
| 
 | ||||
|     @property | ||||
|     def raddr(self) -> tuple[str, int]|None: | ||||
|     def raddr(self) -> Address|None: | ||||
|         return self._transport.raddr if self._transport else None | ||||
| 
 | ||||
|     # TODO: something like, | ||||
|  | @ -205,7 +207,11 @@ class Channel: | |||
|                 # assert err | ||||
|                 __tracebackhide__: bool = False | ||||
|             else: | ||||
|                 assert err.cid | ||||
|                 try: | ||||
|                     assert err.cid | ||||
| 
 | ||||
|                 except KeyError: | ||||
|                     raise err | ||||
| 
 | ||||
|             raise | ||||
| 
 | ||||
|  | @ -332,14 +338,14 @@ class Channel: | |||
| 
 | ||||
| @acm | ||||
| async def _connect_chan( | ||||
|     destaddr: AddressTypes | ||||
|     addr: AddressTypes | ||||
| ) -> typing.AsyncGenerator[Channel, None]: | ||||
|     ''' | ||||
|     Create and connect a channel with disconnect on context manager | ||||
|     teardown. | ||||
| 
 | ||||
|     ''' | ||||
|     chan = await Channel.from_destaddr(destaddr) | ||||
|     chan = await Channel.from_addr(addr) | ||||
|     yield chan | ||||
|     with trio.CancelScope(shield=True): | ||||
|         await chan.aclose() | ||||
|  |  | |||
|  | @ -183,6 +183,9 @@ class RingBuffSender(trio.abc.SendStream): | |||
|     def wrap_fd(self) -> int: | ||||
|         return self._wrap_event.fd | ||||
| 
 | ||||
|     async def _wait_wrap(self): | ||||
|         await self._wrap_event.read() | ||||
| 
 | ||||
|     async def send_all(self, data: Buffer): | ||||
|         async with self._send_lock: | ||||
|             # while data is larger than the remaining buf | ||||
|  | @ -193,7 +196,7 @@ class RingBuffSender(trio.abc.SendStream): | |||
|                 self._shm.buf[self.ptr:] = data[:remaining] | ||||
|                 # signal write and wait for reader wrap around | ||||
|                 self._write_event.write(remaining) | ||||
|                 await self._wrap_event.read() | ||||
|                 await self._wait_wrap() | ||||
| 
 | ||||
|                 # wrap around and trim already written bytes | ||||
|                 self._ptr = 0 | ||||
|  |  | |||
|  | @ -23,6 +23,7 @@ import trio | |||
| 
 | ||||
| from tractor.msg import MsgCodec | ||||
| from tractor.log import get_logger | ||||
| from tractor._addr import TCPAddress | ||||
| from tractor.ipc._transport import MsgpackTransport | ||||
| 
 | ||||
| 
 | ||||
|  | @ -38,9 +39,8 @@ class MsgpackTCPStream(MsgpackTransport): | |||
|     using the ``msgspec`` codec lib. | ||||
| 
 | ||||
|     ''' | ||||
|     address_type = tuple[str, int] | ||||
|     address_type = TCPAddress | ||||
|     layer_key: int = 4 | ||||
|     name_key: str = 'tcp' | ||||
| 
 | ||||
|     # def __init__( | ||||
|     #     self, | ||||
|  | @ -55,19 +55,32 @@ class MsgpackTCPStream(MsgpackTransport): | |||
|     #         codec=codec | ||||
|     #     ) | ||||
| 
 | ||||
|     @property | ||||
|     def maddr(self) -> str: | ||||
|         host, port = self.raddr.unwrap() | ||||
|         return ( | ||||
|             f'/ipv4/{host}' | ||||
|             f'/{self.address_type.name_key}/{port}' | ||||
|             # f'/{self.chan.uid[0]}' | ||||
|             # f'/{self.cid}' | ||||
| 
 | ||||
|             # f'/cid={cid_head}..{cid_tail}' | ||||
|             # TODO: ? not use this ^ right ? | ||||
|         ) | ||||
| 
 | ||||
|     def connected(self) -> bool: | ||||
|         return self.stream.socket.fileno() != -1 | ||||
| 
 | ||||
|     @classmethod | ||||
|     async def connect_to( | ||||
|         cls, | ||||
|         destaddr: tuple[str, int], | ||||
|         destaddr: TCPAddress, | ||||
|         prefix_size: int = 4, | ||||
|         codec: MsgCodec|None = None, | ||||
|         **kwargs | ||||
|     ) -> MsgpackTCPStream: | ||||
|         stream = await trio.open_tcp_stream( | ||||
|             *destaddr, | ||||
|             *destaddr.unwrap(), | ||||
|             **kwargs | ||||
|         ) | ||||
|         return MsgpackTCPStream( | ||||
|  | @ -87,14 +100,6 @@ class MsgpackTCPStream(MsgpackTransport): | |||
|         lsockname = stream.socket.getsockname() | ||||
|         rsockname = stream.socket.getpeername() | ||||
|         return ( | ||||
|             tuple(lsockname[:2]), | ||||
|             tuple(rsockname[:2]), | ||||
|             TCPAddress.from_addr(tuple(lsockname[:2])), | ||||
|             TCPAddress.from_addr(tuple(rsockname[:2])), | ||||
|         ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_random_addr(self) -> tuple[str, int]: | ||||
|         return (None, 0) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_root_addr(self) -> tuple[str, int]: | ||||
|         return ('127.0.0.1', 1616) | ||||
|  |  | |||
|  | @ -50,6 +50,7 @@ from tractor.msg import ( | |||
|     types as msgtypes, | ||||
|     pretty_struct, | ||||
| ) | ||||
| from tractor._addr import Address | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
|  | @ -62,12 +63,11 @@ MsgTransportKey = tuple[str, str] | |||
| # ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? | ||||
| # => BLEH, except can't bc prots must inherit typevar or param-spec | ||||
| #   vars.. | ||||
| AddressType = TypeVar('AddressType') | ||||
| MsgType = TypeVar('MsgType') | ||||
| 
 | ||||
| 
 | ||||
| @runtime_checkable | ||||
| class MsgTransport(Protocol[AddressType, MsgType]): | ||||
| class MsgTransport(Protocol[MsgType]): | ||||
| # | ||||
| # ^-TODO-^ consider using a generic def and indexing with our | ||||
| # eventual msg definition/types? | ||||
|  | @ -75,10 +75,9 @@ class MsgTransport(Protocol[AddressType, MsgType]): | |||
| 
 | ||||
|     stream: trio.abc.Stream | ||||
|     drained: list[MsgType] | ||||
|     address_type: ClassVar[Type[AddressType]] | ||||
| 
 | ||||
|     address_type: ClassVar[Type[Address]] | ||||
|     codec_key: ClassVar[str] | ||||
|     name_key: ClassVar[str] | ||||
| 
 | ||||
|     # XXX: should this instead be called `.sendall()`? | ||||
|     async def send(self, msg: MsgType) -> None: | ||||
|  | @ -100,20 +99,24 @@ class MsgTransport(Protocol[AddressType, MsgType]): | |||
| 
 | ||||
|     @classmethod | ||||
|     def key(cls) -> MsgTransportKey: | ||||
|         return cls.codec_key, cls.name_key | ||||
|         return cls.codec_key, cls.address_type.name_key | ||||
| 
 | ||||
|     @property | ||||
|     def laddr(self) -> AddressType: | ||||
|     def laddr(self) -> Address: | ||||
|         ... | ||||
| 
 | ||||
|     @property | ||||
|     def raddr(self) -> AddressType: | ||||
|     def raddr(self) -> Address: | ||||
|         ... | ||||
| 
 | ||||
|     @property | ||||
|     def maddr(self) -> str: | ||||
|         ... | ||||
| 
 | ||||
|     @classmethod | ||||
|     async def connect_to( | ||||
|         cls, | ||||
|         destaddr: AddressType, | ||||
|         addr: Address, | ||||
|         **kwargs | ||||
|     ) -> MsgTransport: | ||||
|         ... | ||||
|  | @ -123,8 +126,8 @@ class MsgTransport(Protocol[AddressType, MsgType]): | |||
|         cls, | ||||
|         stream: trio.abc.Stream | ||||
|     ) -> tuple[ | ||||
|         AddressType,  # local | ||||
|         AddressType   # remote | ||||
|         Address,  # local | ||||
|         Address   # remote | ||||
|     ]: | ||||
|         ''' | ||||
|         Return the `trio` streaming transport prot's addrs for both | ||||
|  | @ -133,14 +136,6 @@ class MsgTransport(Protocol[AddressType, MsgType]): | |||
|         ''' | ||||
|         ... | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_random_addr(self) -> AddressType: | ||||
|         ... | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_root_addr(self) -> AddressType: | ||||
|         ... | ||||
| 
 | ||||
| 
 | ||||
| class MsgpackTransport(MsgTransport): | ||||
| 
 | ||||
|  | @ -447,9 +442,9 @@ class MsgpackTransport(MsgTransport): | |||
|         return self._aiter_pkts | ||||
| 
 | ||||
|     @property | ||||
|     def laddr(self) -> AddressType: | ||||
|     def laddr(self) -> Address: | ||||
|         return self._laddr | ||||
| 
 | ||||
|     @property | ||||
|     def raddr(self) -> AddressType: | ||||
|     def raddr(self) -> Address: | ||||
|         return self._raddr | ||||
|  |  | |||
|  | @ -13,49 +13,42 @@ | |||
| 
 | ||||
| # You should have received a copy of the GNU Affero General Public License | ||||
| # along with this program.  If not, see <https://www.gnu.org/licenses/>. | ||||
| from typing import Type, Union | ||||
| from typing import Type | ||||
| 
 | ||||
| import trio | ||||
| import socket | ||||
| 
 | ||||
| from ._transport import ( | ||||
| from tractor._addr import Address | ||||
| from tractor.ipc._transport import ( | ||||
|     MsgTransportKey, | ||||
|     MsgTransport | ||||
| ) | ||||
| from ._tcp import MsgpackTCPStream | ||||
| from ._uds import MsgpackUDSStream | ||||
| from tractor.ipc._tcp import MsgpackTCPStream | ||||
| from tractor.ipc._uds import MsgpackUDSStream | ||||
| 
 | ||||
| 
 | ||||
| # manually updated list of all supported msg transport types | ||||
| _msg_transports = [ | ||||
|     MsgpackTCPStream, | ||||
|     MsgpackUDSStream | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| # manually updated list of all supported codec+transport types | ||||
| key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = { | ||||
| # convert a MsgTransportKey to the corresponding transport type | ||||
| _key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = { | ||||
|     cls.key(): cls | ||||
|     for cls in _msg_transports | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| # all different address py types we use | ||||
| AddressTypes = Union[ | ||||
|     tuple([ | ||||
|         cls.address_type | ||||
|         for cls in _msg_transports | ||||
|     ]) | ||||
| ] | ||||
| 
 | ||||
| 
 | ||||
| default_lo_addrs: dict[MsgTransportKey, AddressTypes] = { | ||||
|     cls.key(): cls.get_root_addr() | ||||
| # convert an Address wrapper to its corresponding transport type | ||||
| _addr_to_transport: dict[Type[Address], Type[MsgTransport]] = { | ||||
|     cls.address_type: cls | ||||
|     for cls in _msg_transports | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| def transport_from_destaddr( | ||||
|     destaddr: AddressTypes, | ||||
| def transport_from_addr( | ||||
|     addr: Address, | ||||
|     codec_key: str = 'msgpack', | ||||
| ) -> Type[MsgTransport]: | ||||
|     ''' | ||||
|  | @ -63,23 +56,13 @@ def transport_from_destaddr( | |||
|     corresponding `MsgTransport` type. | ||||
| 
 | ||||
|     ''' | ||||
|     match destaddr: | ||||
|         case str(): | ||||
|             return MsgpackUDSStream | ||||
|     try: | ||||
|         return _addr_to_transport[type(addr)] | ||||
| 
 | ||||
|         case tuple(): | ||||
|             if ( | ||||
|                 len(destaddr) == 2 | ||||
|                 and | ||||
|                 isinstance(destaddr[0], str) | ||||
|                 and | ||||
|                 isinstance(destaddr[1], int) | ||||
|             ): | ||||
|                 return MsgpackTCPStream | ||||
| 
 | ||||
|     raise NotImplementedError( | ||||
|         f'No known transport for address {destaddr}' | ||||
|     ) | ||||
|     except KeyError: | ||||
|         raise NotImplementedError( | ||||
|             f'No known transport for address {repr(addr)}' | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| def transport_from_stream( | ||||
|  | @ -113,4 +96,4 @@ def transport_from_stream( | |||
| 
 | ||||
|     key = (codec_key, transport) | ||||
| 
 | ||||
|     return _msg_transports[key] | ||||
|     return _key_to_transport[key] | ||||
|  |  | |||
|  | @ -18,13 +18,12 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco | |||
| 
 | ||||
| ''' | ||||
| from __future__ import annotations | ||||
| import tempfile | ||||
| from uuid import uuid4 | ||||
| 
 | ||||
| import trio | ||||
| 
 | ||||
| from tractor.msg import MsgCodec | ||||
| from tractor.log import get_logger | ||||
| from tractor._addr import UDSAddress | ||||
| from tractor.ipc._transport import MsgpackTransport | ||||
| 
 | ||||
| 
 | ||||
|  | @ -37,9 +36,8 @@ class MsgpackUDSStream(MsgpackTransport): | |||
|     using the ``msgspec`` codec lib. | ||||
| 
 | ||||
|     ''' | ||||
|     address_type = str | ||||
|     address_type = UDSAddress | ||||
|     layer_key: int = 7 | ||||
|     name_key: str = 'uds' | ||||
| 
 | ||||
|     # def __init__( | ||||
|     #     self, | ||||
|  | @ -54,19 +52,32 @@ class MsgpackUDSStream(MsgpackTransport): | |||
|     #         codec=codec | ||||
|     #     ) | ||||
| 
 | ||||
|     @property | ||||
|     def maddr(self) -> str: | ||||
|         filepath = self.raddr.unwrap() | ||||
|         return ( | ||||
|             f'/ipv4/localhost' | ||||
|             f'/{self.address_type.name_key}/{filepath}' | ||||
|             # f'/{self.chan.uid[0]}' | ||||
|             # f'/{self.cid}' | ||||
| 
 | ||||
|             # f'/cid={cid_head}..{cid_tail}' | ||||
|             # TODO: ? not use this ^ right ? | ||||
|         ) | ||||
| 
 | ||||
|     def connected(self) -> bool: | ||||
|         return self.stream.socket.fileno() != -1 | ||||
| 
 | ||||
|     @classmethod | ||||
|     async def connect_to( | ||||
|         cls, | ||||
|         filename: str, | ||||
|         addr: UDSAddress, | ||||
|         prefix_size: int = 4, | ||||
|         codec: MsgCodec|None = None, | ||||
|         **kwargs | ||||
|     ) -> MsgpackUDSStream: | ||||
|         stream = await trio.open_unix_socket( | ||||
|             filename, | ||||
|             addr.unwrap(), | ||||
|             **kwargs | ||||
|         ) | ||||
|         return MsgpackUDSStream( | ||||
|  | @ -79,16 +90,8 @@ class MsgpackUDSStream(MsgpackTransport): | |||
|     def get_stream_addrs( | ||||
|         cls, | ||||
|         stream: trio.SocketStream | ||||
|     ) -> tuple[str, str]: | ||||
|     ) -> tuple[UDSAddress, UDSAddress]: | ||||
|         return ( | ||||
|             stream.socket.getsockname(), | ||||
|             stream.socket.getpeername(), | ||||
|             UDSAddress.from_addr(stream.socket.getsockname()), | ||||
|             UDSAddress.from_addr(stream.socket.getsockname()), | ||||
|         ) | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_random_addr(self) -> str: | ||||
|         return f'{tempfile.gettempdir()}/{uuid4()}.sock' | ||||
| 
 | ||||
|     @classmethod | ||||
|     def get_root_addr(self) -> str: | ||||
|         return 'tractor.sock' | ||||
|  |  | |||
|  | @ -46,8 +46,8 @@ from msgspec import ( | |||
| from tractor.msg import ( | ||||
|     pretty_struct, | ||||
| ) | ||||
| from tractor.ipc import AddressTypes | ||||
| from tractor.log import get_logger | ||||
| from tractor._addr import AddressTypes | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger('tractor.msgspec') | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue