diff --git a/examples/service_discovery.py b/examples/service_discovery.py
index a0f37b88..64697e5b 100644
--- a/examples/service_discovery.py
+++ b/examples/service_discovery.py
@@ -9,7 +9,7 @@ async def main(service_name):
async with tractor.open_nursery() as an:
await an.start_actor(service_name)
- async with tractor.get_registry('127.0.0.1', 1616) as portal:
+ async with tractor.get_registry(('127.0.0.1', 1616)) as portal:
print(f"Arbiter is listening on {portal.channel}")
async with tractor.wait_for_actor(service_name) as sockaddr:
diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 87455983..18b2aa1b 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr):
portal = await n.start_actor('actor', enable_modules=[__name__])
uid = portal.channel.uid
- async with tractor.get_registry(*reg_addr) as aportal:
+ async with tractor.get_registry(reg_addr) as aportal:
# this local actor should be the arbiter
assert actor is aportal.actor
@@ -160,7 +160,7 @@ async def spawn_and_check_registry(
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
- async with tractor.get_registry(*reg_addr) as portal:
+ async with tractor.get_registry(reg_addr) as portal:
# runtime needs to be up to call this
actor = tractor.current_actor()
@@ -300,7 +300,7 @@ async def close_chans_before_nursery(
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
):
- async with tractor.get_registry(*reg_addr) as aportal:
+ async with tractor.get_registry(reg_addr) as aportal:
try:
get_reg = partial(unpack_reg, aportal)
diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py
index bac9a791..25935df2 100644
--- a/tests/test_inter_peer_cancellation.py
+++ b/tests/test_inter_peer_cancellation.py
@@ -871,7 +871,7 @@ async def serve_subactors(
)
await ipc.send((
peer.chan.uid,
- peer.chan.raddr,
+ peer.chan.raddr.unwrap(),
))
print('Spawner exiting spawn serve loop!')
diff --git a/tests/test_local.py b/tests/test_local.py
index ecdad5fe..c6f5047a 100644
--- a/tests/test_local.py
+++ b/tests/test_local.py
@@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr):
"Verify waiting on the arbiter to register itself using a local portal."
actor = tractor.current_actor()
assert actor.is_arbiter
- async with tractor.get_registry(*reg_addr) as portal:
+ async with tractor.get_registry(reg_addr) as portal:
assert isinstance(portal, tractor._portal.LocalPortal)
with trio.fail_after(0.2):
diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py
index 860eeebb..b0b145ee 100644
--- a/tests/test_multi_program.py
+++ b/tests/test_multi_program.py
@@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon):
@tractor_test
async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter
- async with tractor.get_registry(*reg_addr) as portal:
+ async with tractor.get_registry(reg_addr) as portal:
await portal.cancel_actor()
time.sleep(0.1)
@@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
# no arbiter socket should exist
with pytest.raises(OSError):
- async with tractor.get_registry(*reg_addr) as portal:
+ async with tractor.get_registry(reg_addr) as portal:
pass
diff --git a/tests/test_spawning.py b/tests/test_spawning.py
index 99ec9abc..58aa955a 100644
--- a/tests/test_spawning.py
+++ b/tests/test_spawning.py
@@ -77,7 +77,7 @@ async def movie_theatre_question():
async def test_movie_theatre_convo(start_method):
"""The main ``tractor`` routine.
"""
- async with tractor.open_nursery() as n:
+ async with tractor.open_nursery(debug_mode=True) as n:
portal = await n.start_actor(
'frank',
diff --git a/tractor/_addr.py b/tractor/_addr.py
new file mode 100644
index 00000000..0944c89d
--- /dev/null
+++ b/tractor/_addr.py
@@ -0,0 +1,301 @@
+# tractor: structured concurrent "actors".
+# Copyright 2018-eternity Tyler Goodlet.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+from __future__ import annotations
+import tempfile
+from uuid import uuid4
+from typing import (
+ Protocol,
+ ClassVar,
+ TypeVar,
+ Union,
+ Type
+)
+
+import trio
+from trio import socket
+
+
+NamespaceType = TypeVar('NamespaceType')
+AddressType = TypeVar('AddressType')
+StreamType = TypeVar('StreamType')
+ListenerType = TypeVar('ListenerType')
+
+
+class Address(Protocol[
+ NamespaceType,
+ AddressType,
+ StreamType,
+ ListenerType
+]):
+
+ name_key: ClassVar[str]
+ address_type: ClassVar[Type[AddressType]]
+
+ @property
+ def is_valid(self) -> bool:
+ ...
+
+ @property
+ def namespace(self) -> NamespaceType|None:
+ ...
+
+ @classmethod
+ def from_addr(cls, addr: AddressType) -> Address:
+ ...
+
+ def unwrap(self) -> AddressType:
+ ...
+
+ @classmethod
+ def get_random(cls, namespace: NamespaceType | None = None) -> Address:
+ ...
+
+ @classmethod
+ def get_root(cls) -> Address:
+ ...
+
+ def __repr__(self) -> str:
+ ...
+
+ def __eq__(self, other) -> bool:
+ ...
+
+ async def open_stream(self, **kwargs) -> StreamType:
+ ...
+
+ async def open_listener(self, **kwargs) -> ListenerType:
+ ...
+
+
+class TCPAddress(Address[
+ str,
+ tuple[str, int],
+ trio.SocketStream,
+ trio.SocketListener
+]):
+
+ name_key: str = 'tcp'
+ address_type: type = tuple[str, int]
+
+ def __init__(
+ self,
+ host: str,
+ port: int
+ ):
+ if (
+ not isinstance(host, str)
+ or
+ not isinstance(port, int)
+ ):
+ raise TypeError(f'Expected host {host} to be str and port {port} to be int')
+ self._host = host
+ self._port = port
+
+ @property
+ def is_valid(self) -> bool:
+ return self._port != 0
+
+ @property
+ def namespace(self) -> str:
+ return self._host
+
+ @classmethod
+ def from_addr(cls, addr: tuple[str, int]) -> TCPAddress:
+ return TCPAddress(addr[0], addr[1])
+
+ def unwrap(self) -> tuple[str, int]:
+ return self._host, self._port
+
+ @classmethod
+ def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress:
+ return TCPAddress(namespace, 0)
+
+ @classmethod
+ def get_root(cls) -> Address:
+ return TCPAddress('127.0.0.1', 1616)
+
+ def __repr__(self) -> str:
+ return f'{type(self)} @ {self.unwrap()}'
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, TCPAddress):
+ raise TypeError(
+ f'Can not compare {type(other)} with {type(self)}'
+ )
+
+ return (
+ self._host == other._host
+ and
+ self._port == other._port
+ )
+
+ async def open_stream(self, **kwargs) -> trio.SocketStream:
+ stream = await trio.open_tcp_stream(
+ self._host,
+ self._port,
+ **kwargs
+ )
+ self._host, self._port = stream.socket.getsockname()[:2]
+ return stream
+
+ async def open_listener(self, **kwargs) -> trio.SocketListener:
+ listeners = await trio.open_tcp_listeners(
+ host=self._host,
+ port=self._port,
+ **kwargs
+ )
+ assert len(listeners) == 1
+ listener = listeners[0]
+ self._host, self._port = listener.socket.getsockname()[:2]
+ return listener
+
+
+class UDSAddress(Address[
+ None,
+ str,
+ trio.SocketStream,
+ trio.SocketListener
+]):
+
+ name_key: str = 'uds'
+ address_type: type = str
+
+ def __init__(
+ self,
+ filepath: str
+ ):
+ self._filepath = filepath
+
+ @property
+ def is_valid(self) -> bool:
+ return True
+
+ @property
+ def namespace(self) -> None:
+ return
+
+ @classmethod
+ def from_addr(cls, filepath: str) -> UDSAddress:
+ return UDSAddress(filepath)
+
+ def unwrap(self) -> str:
+ return self._filepath
+
+ @classmethod
+ def get_random(cls, _ns: None = None) -> UDSAddress:
+ return UDSAddress(f'{tempfile.gettempdir()}/{uuid4().sock}')
+
+ @classmethod
+ def get_root(cls) -> Address:
+ return UDSAddress('tractor.sock')
+
+ def __repr__(self) -> str:
+ return f'{type(self)} @ {self._filepath}'
+
+ def __eq__(self, other) -> bool:
+ if not isinstance(other, UDSAddress):
+ raise TypeError(
+ f'Can not compare {type(other)} with {type(self)}'
+ )
+
+ return self._filepath == other._filepath
+
+ async def open_stream(self, **kwargs) -> trio.SocketStream:
+ stream = await trio.open_tcp_stream(
+ self._filepath,
+ **kwargs
+ )
+ self._binded = True
+ return stream
+
+ async def open_listener(self, **kwargs) -> trio.SocketListener:
+ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ sock.bind(self._filepath)
+ sock.listen()
+ self._binded = True
+ return trio.SocketListener(sock)
+
+
+preferred_transport = 'tcp'
+
+
+_address_types = (
+ TCPAddress,
+ UDSAddress
+)
+
+
+_default_addrs: dict[str, Type[Address]] = {
+ cls.name_key: cls
+ for cls in _address_types
+}
+
+
+AddressTypes = Union[
+ tuple([
+ cls.address_type
+ for cls in _address_types
+ ])
+]
+
+
+_default_lo_addrs: dict[
+ str,
+ AddressTypes
+] = {
+ cls.name_key: cls.get_root().unwrap()
+ for cls in _address_types
+}
+
+
+def get_address_cls(name: str) -> Type[Address]:
+ return _default_addrs[name]
+
+
+def is_wrapped_addr(addr: any) -> bool:
+ return type(addr) in _address_types
+
+
+def wrap_address(addr: AddressTypes) -> Address:
+
+ if is_wrapped_addr(addr):
+ return addr
+
+ cls = None
+ match addr:
+ case str():
+ cls = UDSAddress
+
+ case tuple() | list():
+ cls = TCPAddress
+
+ case None:
+ cls = get_address_cls(preferred_transport)
+ addr = cls.get_root().unwrap()
+
+ case _:
+ raise TypeError(
+ f'Can not wrap addr {addr} of type {type(addr)}'
+ )
+
+ return cls.from_addr(addr)
+
+
+def default_lo_addrs(transports: list[str]) -> list[AddressTypes]:
+ return [
+ _default_lo_addrs[transport]
+ for transport in transports
+ ]
diff --git a/tractor/_child.py b/tractor/_child.py
index 4226ae90..69142889 100644
--- a/tractor/_child.py
+++ b/tractor/_child.py
@@ -31,8 +31,7 @@ def parse_uid(arg):
return str(name), str(uuid) # ensures str encoding
def parse_ipaddr(arg):
- host, port = literal_eval(arg)
- return (str(host), int(port))
+ return literal_eval(arg)
if __name__ == "__main__":
diff --git a/tractor/_context.py b/tractor/_context.py
index d93d7759..19f3daef 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -859,19 +859,10 @@ class Context:
@property
def dst_maddr(self) -> str:
chan: Channel = self.chan
- dst_addr, dst_port = chan.raddr
trans: MsgTransport = chan.transport
# cid: str = self.cid
# cid_head, cid_tail = cid[:6], cid[-6:]
- return (
- f'/ipv4/{dst_addr}'
- f'/{trans.name_key}/{dst_port}'
- # f'/{self.chan.uid[0]}'
- # f'/{self.cid}'
-
- # f'/cid={cid_head}..{cid_tail}'
- # TODO: ? not use this ^ right ?
- )
+ return trans.maddr
dmaddr = dst_maddr
diff --git a/tractor/_discovery.py b/tractor/_discovery.py
index f6f4b9d9..9258b3de 100644
--- a/tractor/_discovery.py
+++ b/tractor/_discovery.py
@@ -30,6 +30,12 @@ from contextlib import asynccontextmanager as acm
from tractor.log import get_logger
from .trionics import gather_contexts
from .ipc import _connect_chan, Channel
+from ._addr import (
+ AddressTypes,
+ Address,
+ preferred_transport,
+ wrap_address
+)
from ._portal import (
Portal,
open_portal,
@@ -48,11 +54,7 @@ log = get_logger(__name__)
@acm
-async def get_registry(
- host: str,
- port: int,
-
-) -> AsyncGenerator[
+async def get_registry(addr: AddressTypes) -> AsyncGenerator[
Portal | LocalPortal | None,
None,
]:
@@ -69,13 +71,13 @@ async def get_registry(
# (likely a re-entrant call from the arbiter actor)
yield LocalPortal(
actor,
- Channel((host, port))
+ await Channel.from_addr(addr)
)
else:
# TODO: try to look pre-existing connection from
# `Actor._peers` and use it instead?
async with (
- _connect_chan((host, port)) as chan,
+ _connect_chan(addr) as chan,
open_portal(chan) as regstr_ptl,
):
yield regstr_ptl
@@ -89,11 +91,10 @@ async def get_root(
# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
- host, port = _runtime_vars['_root_mailbox']
- assert host is not None
+ addr = _runtime_vars['_root_mailbox']
async with (
- _connect_chan((host, port)) as chan,
+ _connect_chan(addr) as chan,
open_portal(chan, **kwargs) as portal,
):
yield portal
@@ -134,10 +135,10 @@ def get_peer_by_name(
@acm
async def query_actor(
name: str,
- regaddr: tuple[str, int]|None = None,
+ regaddr: AddressTypes|None = None,
) -> AsyncGenerator[
- tuple[str, int]|None,
+ AddressTypes|None,
None,
]:
'''
@@ -163,31 +164,31 @@ async def query_actor(
return
reg_portal: Portal
- regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0]
- async with get_registry(*regaddr) as reg_portal:
+ regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0]
+ async with get_registry(regaddr) as reg_portal:
# TODO: return portals to all available actors - for now
# just the last one that registered
- sockaddr: tuple[str, int] = await reg_portal.run_from_ns(
+ addr: AddressTypes = await reg_portal.run_from_ns(
'self',
'find_actor',
name=name,
)
- yield sockaddr
+ yield addr
@acm
async def maybe_open_portal(
- addr: tuple[str, int],
+ addr: AddressTypes,
name: str,
):
async with query_actor(
name=name,
regaddr=addr,
- ) as sockaddr:
+ ) as addr:
pass
- if sockaddr:
- async with _connect_chan(sockaddr) as chan:
+ if addr:
+ async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal
else:
@@ -197,7 +198,8 @@ async def maybe_open_portal(
@acm
async def find_actor(
name: str,
- registry_addrs: list[tuple[str, int]]|None = None,
+ registry_addrs: list[AddressTypes]|None = None,
+ enable_transports: list[str] = [preferred_transport],
only_first: bool = True,
raise_on_none: bool = False,
@@ -224,15 +226,15 @@ async def find_actor(
# XXX NOTE: make sure to dynamically read the value on
# every call since something may change it globally (eg.
# like in our discovery test suite)!
- from . import _root
+ from ._addr import default_lo_addrs
registry_addrs = (
_runtime_vars['_registry_addrs']
or
- _root._default_lo_addrs
+ default_lo_addrs(enable_transports)
)
maybe_portals: list[
- AsyncContextManager[tuple[str, int]]
+ AsyncContextManager[AddressTypes]
] = list(
maybe_open_portal(
addr=addr,
@@ -274,7 +276,7 @@ async def find_actor(
@acm
async def wait_for_actor(
name: str,
- registry_addr: tuple[str, int] | None = None,
+ registry_addr: AddressTypes | None = None,
) -> AsyncGenerator[Portal, None]:
'''
@@ -291,7 +293,7 @@ async def wait_for_actor(
yield peer_portal
return
- regaddr: tuple[str, int] = (
+ regaddr: AddressTypes = (
registry_addr
or
actor.reg_addrs[0]
@@ -299,8 +301,8 @@ async def wait_for_actor(
# TODO: use `.trionics.gather_contexts()` like
# above in `find_actor()` as well?
reg_portal: Portal
- async with get_registry(*regaddr) as reg_portal:
- sockaddrs = await reg_portal.run_from_ns(
+ async with get_registry(regaddr) as reg_portal:
+ addrs = await reg_portal.run_from_ns(
'self',
'wait_for_actor',
name=name,
@@ -308,8 +310,8 @@ async def wait_for_actor(
# get latest registered addr by default?
# TODO: offer multi-portal yields in multi-homed case?
- sockaddr: tuple[str, int] = sockaddrs[-1]
+ addr: AddressTypes = addrs[-1]
- async with _connect_chan(sockaddr) as chan:
+ async with _connect_chan(addr) as chan:
async with open_portal(chan) as portal:
yield portal
diff --git a/tractor/_entry.py b/tractor/_entry.py
index 8156d25f..1328aa45 100644
--- a/tractor/_entry.py
+++ b/tractor/_entry.py
@@ -37,6 +37,7 @@ from .log import (
from . import _state
from .devx import _debug
from .to_asyncio import run_as_asyncio_guest
+from ._addr import AddressTypes
from ._runtime import (
async_main,
Actor,
@@ -52,10 +53,10 @@ log = get_logger(__name__)
def _mp_main(
actor: Actor,
- accept_addrs: list[tuple[str, int]],
+ accept_addrs: list[AddressTypes],
forkserver_info: tuple[Any, Any, Any, Any, Any],
start_method: SpawnMethodKey,
- parent_addr: tuple[str, int] | None = None,
+ parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False,
) -> None:
@@ -206,7 +207,7 @@ def nest_from_op(
def _trio_main(
actor: Actor,
*,
- parent_addr: tuple[str, int] | None = None,
+ parent_addr: AddressTypes | None = None,
infect_asyncio: bool = False,
) -> None:
diff --git a/tractor/_root.py b/tractor/_root.py
index 40682a7a..e9cac3f2 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -43,21 +43,18 @@ from .devx import _debug
from . import _spawn
from . import _state
from . import log
-from .ipc import _connect_chan
+from .ipc import (
+ _connect_chan,
+)
+from ._addr import (
+ AddressTypes,
+ wrap_address,
+ preferred_transport,
+ default_lo_addrs
+)
from ._exceptions import is_multi_cancelled
-# set at startup and after forks
-_default_host: str = '127.0.0.1'
-_default_port: int = 1616
-
-# default registry always on localhost
-_default_lo_addrs: list[tuple[str, int]] = [(
- _default_host,
- _default_port,
-)]
-
-
logger = log.get_logger('tractor')
@@ -66,10 +63,12 @@ async def open_root_actor(
*,
# defaults are above
- registry_addrs: list[tuple[str, int]]|None = None,
+ registry_addrs: list[AddressTypes]|None = None,
# defaults are above
- arbiter_addr: tuple[str, int]|None = None,
+ arbiter_addr: tuple[AddressTypes]|None = None,
+
+ enable_transports: list[str] = [preferred_transport],
name: str|None = 'root',
@@ -195,11 +194,9 @@ async def open_root_actor(
)
registry_addrs = [arbiter_addr]
- registry_addrs: list[tuple[str, int]] = (
- registry_addrs
- or
- _default_lo_addrs
- )
+ if not registry_addrs:
+ registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports)
+
assert registry_addrs
loglevel = (
@@ -248,10 +245,10 @@ async def open_root_actor(
enable_stack_on_sig()
# closed into below ping task-func
- ponged_addrs: list[tuple[str, int]] = []
+ ponged_addrs: list[AddressTypes] = []
async def ping_tpt_socket(
- addr: tuple[str, int],
+ addr: AddressTypes,
timeout: float = 1,
) -> None:
'''
@@ -284,10 +281,10 @@ async def open_root_actor(
for addr in registry_addrs:
tn.start_soon(
ping_tpt_socket,
- tuple(addr), # TODO: just drop this requirement?
+ addr,
)
- trans_bind_addrs: list[tuple[str, int]] = []
+ trans_bind_addrs: list[AddressTypes] = []
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
@@ -311,9 +308,12 @@ async def open_root_actor(
)
# DO NOT use the registry_addrs as the transport server
# addrs for this new non-registar, root-actor.
- for host, port in ponged_addrs:
- # NOTE: zero triggers dynamic OS port allocation
- trans_bind_addrs.append((host, 0))
+ for addr in ponged_addrs:
+ waddr = wrap_address(addr)
+ print(waddr)
+ trans_bind_addrs.append(
+ waddr.get_random(namespace=waddr.namespace)
+ )
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
@@ -322,7 +322,7 @@ async def open_root_actor(
# NOTE that if the current actor IS THE REGISTAR, the
# following init steps are taken:
- # - the tranport layer server is bound to each (host, port)
+ # - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default.
trans_bind_addrs = registry_addrs
@@ -462,7 +462,7 @@ def run_daemon(
# runtime kwargs
name: str | None = 'root',
- registry_addrs: list[tuple[str, int]] = _default_lo_addrs,
+ registry_addrs: list[AddressTypes]|None = None,
start_method: str | None = None,
debug_mode: bool = False,
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index eaab31b6..e755d5ce 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -74,6 +74,12 @@ from tractor.msg import (
types as msgtypes,
)
from .ipc import Channel
+from ._addr import (
+ AddressTypes,
+ Address,
+ TCPAddress,
+ wrap_address,
+)
from ._context import (
mk_context,
Context,
@@ -179,11 +185,11 @@ class Actor:
enable_modules: list[str] = [],
uid: str|None = None,
loglevel: str|None = None,
- registry_addrs: list[tuple[str, int]]|None = None,
+ registry_addrs: list[AddressTypes]|None = None,
spawn_method: str|None = None,
# TODO: remove!
- arbiter_addr: tuple[str, int]|None = None,
+ arbiter_addr: AddressTypes|None = None,
) -> None:
'''
@@ -223,7 +229,7 @@ class Actor:
DeprecationWarning,
stacklevel=2,
)
- registry_addrs: list[tuple[str, int]] = [arbiter_addr]
+ registry_addrs: list[AddressTypes] = [arbiter_addr]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@@ -257,6 +263,7 @@ class Actor:
] = {}
self._listeners: list[trio.abc.Listener] = []
+ self._listen_addrs: list[Address] = []
self._parent_chan: Channel|None = None
self._forkserver_info: tuple|None = None
@@ -269,13 +276,13 @@ class Actor:
# when provided, init the registry addresses property from
# input via the validator.
- self._reg_addrs: list[tuple[str, int]] = []
+ self._reg_addrs: list[AddressTypes] = []
if registry_addrs:
- self.reg_addrs: list[tuple[str, int]] = registry_addrs
+ self.reg_addrs: list[AddressTypes] = registry_addrs
_state._runtime_vars['_registry_addrs'] = registry_addrs
@property
- def reg_addrs(self) -> list[tuple[str, int]]:
+ def reg_addrs(self) -> list[AddressTypes]:
'''
List of (socket) addresses for all known (and contactable)
registry actors.
@@ -286,7 +293,7 @@ class Actor:
@reg_addrs.setter
def reg_addrs(
self,
- addrs: list[tuple[str, int]],
+ addrs: list[AddressTypes],
) -> None:
if not addrs:
log.warning(
@@ -295,16 +302,7 @@ class Actor:
)
return
- # always sanity check the input list since it's critical
- # that addrs are correct for discovery sys operation.
- for addr in addrs:
- if not isinstance(addr, tuple):
- raise ValueError(
- 'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n'
- f'Got {addrs}'
- )
-
- self._reg_addrs = addrs
+ self._reg_addrs = addrs
async def wait_for_peer(
self,
@@ -1024,11 +1022,11 @@ class Actor:
async def _from_parent(
self,
- parent_addr: tuple[str, int]|None,
+ parent_addr: AddressTypes|None,
) -> tuple[
Channel,
- list[tuple[str, int]]|None,
+ list[AddressTypes]|None,
]:
'''
Bootstrap this local actor's runtime config from its parent by
@@ -1040,13 +1038,13 @@ class Actor:
# Connect back to the parent actor and conduct initial
# handshake. From this point on if we error, we
# attempt to ship the exception back to the parent.
- chan = await Channel.from_destaddr(parent_addr)
+ chan = await Channel.from_addr(wrap_address(parent_addr))
# TODO: move this into a `Channel.handshake()`?
# Initial handshake: swap names.
await self._do_handshake(chan)
- accept_addrs: list[tuple[str, int]]|None = None
+ accept_addrs: list[AddressTypes]|None = None
if self._spawn_method == "trio":
@@ -1063,7 +1061,7 @@ class Actor:
# if "trace"/"util" mode is enabled?
f'{pretty_struct.pformat(spawnspec)}\n'
)
- accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs
+ accept_addrs: list[AddressTypes] = spawnspec.bind_addrs
# TODO: another `Struct` for rtvs..
rvs: dict[str, Any] = spawnspec._runtime_vars
@@ -1170,8 +1168,7 @@ class Actor:
self,
handler_nursery: Nursery,
*,
- # (host, port) to bind for channel server
- listen_sockaddrs: list[tuple[str, int]]|None = None,
+ listen_addrs: list[AddressTypes]|None = None,
task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED,
) -> None:
@@ -1183,37 +1180,39 @@ class Actor:
`.cancel_server()` is called.
'''
- if listen_sockaddrs is None:
- listen_sockaddrs = [(None, 0)]
+ if listen_addrs is None:
+ listen_addrs = [TCPAddress.get_random()]
+
+ else:
+ listen_addrs: list[Address] = [
+ wrap_address(a) for a in listen_addrs
+ ]
self._server_down = trio.Event()
try:
async with trio.open_nursery() as server_n:
+ listeners: list[trio.abc.Listener] = [
+ await addr.open_listener()
+ for addr in listen_addrs
+ ]
+ await server_n.start(
+ partial(
+ trio.serve_listeners,
+ handler=self._stream_handler,
+ listeners=listeners,
- for host, port in listen_sockaddrs:
- listeners: list[trio.abc.Listener] = await server_n.start(
- partial(
- trio.serve_tcp,
-
- handler=self._stream_handler,
- port=port,
- host=host,
-
- # NOTE: configured such that new
- # connections will stay alive even if
- # this server is cancelled!
- handler_nursery=handler_nursery,
- )
+ # NOTE: configured such that new
+ # connections will stay alive even if
+ # this server is cancelled!
+ handler_nursery=handler_nursery
)
- sockets: list[trio.socket] = [
- getattr(listener, 'socket', 'unknown socket')
- for listener in listeners
- ]
- log.runtime(
- 'Started TCP server(s)\n'
- f'|_{sockets}\n'
- )
- self._listeners.extend(listeners)
+ )
+ log.runtime(
+ 'Started server(s)\n'
+ '\n'.join([f'|_{addr}' for addr in listen_addrs])
+ )
+ self._listen_addrs.extend(listen_addrs)
+ self._listeners.extend(listeners)
task_status.started(server_n)
@@ -1576,26 +1575,21 @@ class Actor:
return False
@property
- def accept_addrs(self) -> list[tuple[str, int]]:
+ def accept_addrs(self) -> list[AddressTypes]:
'''
All addresses to which the transport-channel server binds
and listens for new connections.
'''
- # throws OSError on failure
- return [
- listener.socket.getsockname()
- for listener in self._listeners
- ] # type: ignore
+ return [a.unwrap() for a in self._listen_addrs]
@property
- def accept_addr(self) -> tuple[str, int]:
+ def accept_addr(self) -> AddressTypes:
'''
Primary address to which the IPC transport server is
bound and listening for new connections.
'''
- # throws OSError on failure
return self.accept_addrs[0]
def get_parent(self) -> Portal:
@@ -1667,7 +1661,7 @@ class Actor:
async def async_main(
actor: Actor,
- accept_addrs: tuple[str, int]|None = None,
+ accept_addrs: AddressTypes|None = None,
# XXX: currently ``parent_addr`` is only needed for the
# ``multiprocessing`` backend (which pickles state sent to
@@ -1676,7 +1670,7 @@ async def async_main(
# change this to a simple ``is_subactor: bool`` which will
# be False when running as root actor and True when as
# a subactor.
- parent_addr: tuple[str, int]|None = None,
+ parent_addr: AddressTypes|None = None,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
@@ -1766,7 +1760,7 @@ async def async_main(
partial(
actor._serve_forever,
service_nursery,
- listen_sockaddrs=accept_addrs,
+ listen_addrs=accept_addrs,
)
)
except OSError as oserr:
@@ -1782,7 +1776,7 @@ async def async_main(
raise
- accept_addrs: list[tuple[str, int]] = actor.accept_addrs
+ accept_addrs: list[AddressTypes] = actor.accept_addrs
# NOTE: only set the loopback addr for the
# process-tree-global "root" mailbox since
@@ -1790,9 +1784,8 @@ async def async_main(
# their root actor over that channel.
if _state._runtime_vars['_is_root']:
for addr in accept_addrs:
- host, _ = addr
- # TODO: generic 'lo' detector predicate
- if '127.0.0.1' in host:
+ waddr = wrap_address(addr)
+ if waddr == waddr.get_root():
_state._runtime_vars['_root_mailbox'] = addr
# Register with the arbiter if we're told its addr
@@ -1807,24 +1800,21 @@ async def async_main(
# only on unique actor uids?
for addr in actor.reg_addrs:
try:
- assert isinstance(addr, tuple)
- assert addr[1] # non-zero after bind
+ waddr = wrap_address(addr)
+ assert waddr.is_valid
except AssertionError:
await _debug.pause()
- async with get_registry(*addr) as reg_portal:
+ async with get_registry(addr) as reg_portal:
for accept_addr in accept_addrs:
-
- if not accept_addr[1]:
- await _debug.pause()
-
- assert accept_addr[1]
+ accept_addr = wrap_address(accept_addr)
+ assert accept_addr.is_valid
await reg_portal.run_from_ns(
'self',
'register_actor',
uid=actor.uid,
- sockaddr=accept_addr,
+ addr=accept_addr.unwrap(),
)
is_registered: bool = True
@@ -1951,12 +1941,13 @@ async def async_main(
):
failed: bool = False
for addr in actor.reg_addrs:
- assert isinstance(addr, tuple)
+ waddr = wrap_address(addr)
+ assert waddr.is_valid
with trio.move_on_after(0.5) as cs:
cs.shield = True
try:
async with get_registry(
- *addr,
+ addr,
) as reg_portal:
await reg_portal.run_from_ns(
'self',
@@ -2034,7 +2025,7 @@ class Arbiter(Actor):
self._registry: dict[
tuple[str, str],
- tuple[str, int],
+ AddressTypes,
] = {}
self._waiters: dict[
str,
@@ -2050,18 +2041,18 @@ class Arbiter(Actor):
self,
name: str,
- ) -> tuple[str, int]|None:
+ ) -> AddressTypes|None:
- for uid, sockaddr in self._registry.items():
+ for uid, addr in self._registry.items():
if name in uid:
- return sockaddr
+ return addr
return None
async def get_registry(
self
- ) -> dict[str, tuple[str, int]]:
+ ) -> dict[str, AddressTypes]:
'''
Return current name registry.
@@ -2081,7 +2072,7 @@ class Arbiter(Actor):
self,
name: str,
- ) -> list[tuple[str, int]]:
+ ) -> list[AddressTypes]:
'''
Wait for a particular actor to register.
@@ -2089,44 +2080,41 @@ class Arbiter(Actor):
registered.
'''
- sockaddrs: list[tuple[str, int]] = []
- sockaddr: tuple[str, int]
+ addrs: list[AddressTypes] = []
+ addr: AddressTypes
mailbox_info: str = 'Actor registry contact infos:\n'
- for uid, sockaddr in self._registry.items():
+ for uid, addr in self._registry.items():
mailbox_info += (
f'|_uid: {uid}\n'
- f'|_sockaddr: {sockaddr}\n\n'
+ f'|_addr: {addr}\n\n'
)
if name == uid[0]:
- sockaddrs.append(sockaddr)
+ addrs.append(addr)
- if not sockaddrs:
+ if not addrs:
waiter = trio.Event()
self._waiters.setdefault(name, []).append(waiter)
await waiter.wait()
for uid in self._waiters[name]:
if not isinstance(uid, trio.Event):
- sockaddrs.append(self._registry[uid])
+ addrs.append(self._registry[uid])
log.runtime(mailbox_info)
- return sockaddrs
+ return addrs
async def register_actor(
self,
uid: tuple[str, str],
- sockaddr: tuple[str, int]
-
+ addr: AddressTypes
) -> None:
uid = name, hash = (str(uid[0]), str(uid[1]))
- addr = (host, port) = (
- str(sockaddr[0]),
- int(sockaddr[1]),
- )
- if port == 0:
+ waddr: Address = wrap_address(addr)
+ if not waddr.is_valid:
+ # should never be 0-dynamic-os-alloc
await _debug.pause()
- assert port # should never be 0-dynamic-os-alloc
+
self._registry[uid] = addr
# pop and signal all waiter events
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index dc2429d9..d1eb7f37 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -46,6 +46,7 @@ from tractor._state import (
_runtime_vars,
)
from tractor.log import get_logger
+from tractor._addr import AddressTypes
from tractor._portal import Portal
from tractor._runtime import Actor
from tractor._entry import _mp_main
@@ -392,8 +393,8 @@ async def new_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
- bind_addrs: list[tuple[str, int]],
- parent_addr: tuple[str, int],
+ bind_addrs: list[AddressTypes],
+ parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
@@ -431,8 +432,8 @@ async def trio_proc(
errors: dict[tuple[str, str], Exception],
# passed through to actor main
- bind_addrs: list[tuple[str, int]],
- parent_addr: tuple[str, int],
+ bind_addrs: list[AddressTypes],
+ parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
@@ -520,15 +521,15 @@ async def trio_proc(
# send a "spawning specification" which configures the
# initial runtime state of the child.
- await chan.send(
- SpawnSpec(
- _parent_main_data=subactor._parent_main_data,
- enable_modules=subactor.enable_modules,
- reg_addrs=subactor.reg_addrs,
- bind_addrs=bind_addrs,
- _runtime_vars=_runtime_vars,
- )
+ sspec = SpawnSpec(
+ _parent_main_data=subactor._parent_main_data,
+ enable_modules=subactor.enable_modules,
+ reg_addrs=subactor.reg_addrs,
+ bind_addrs=bind_addrs,
+ _runtime_vars=_runtime_vars,
)
+ log.runtime(f'Sending spawn spec: {str(sspec)}')
+ await chan.send(sspec)
# track subactor in current nursery
curr_actor: Actor = current_actor()
@@ -638,8 +639,8 @@ async def mp_proc(
subactor: Actor,
errors: dict[tuple[str, str], Exception],
# passed through to actor main
- bind_addrs: list[tuple[str, int]],
- parent_addr: tuple[str, int],
+ bind_addrs: list[AddressTypes],
+ parent_addr: AddressTypes,
_runtime_vars: dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index 052a5f4c..2a3842f7 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -28,7 +28,13 @@ import warnings
import trio
+
from .devx._debug import maybe_wait_for_debugger
+from ._addr import (
+ AddressTypes,
+ preferred_transport,
+ get_address_cls
+)
from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
from ._runtime import Actor
@@ -47,8 +53,6 @@ if TYPE_CHECKING:
log = get_logger(__name__)
-_default_bind_addr: tuple[str, int] = ('127.0.0.1', 0)
-
class ActorNursery:
'''
@@ -130,8 +134,9 @@ class ActorNursery:
*,
- bind_addrs: list[tuple[str, int]] = [_default_bind_addr],
+ bind_addrs: list[AddressTypes]|None = None,
rpc_module_paths: list[str]|None = None,
+ enable_transports: list[str] = [preferred_transport],
enable_modules: list[str]|None = None,
loglevel: str|None = None, # set log level per subactor
debug_mode: bool|None = None,
@@ -156,6 +161,12 @@ class ActorNursery:
or get_loglevel()
)
+ if not bind_addrs:
+ bind_addrs: list[AddressTypes] = [
+ get_address_cls(transport).get_random().unwrap()
+ for transport in enable_transports
+ ]
+
# configure and pass runtime state
_rtv = _state._runtime_vars.copy()
_rtv['_is_root'] = False
@@ -224,7 +235,7 @@ class ActorNursery:
*,
name: str | None = None,
- bind_addrs: tuple[str, int] = [_default_bind_addr],
+ bind_addrs: AddressTypes|None = None,
rpc_module_paths: list[str] | None = None,
enable_modules: list[str] | None = None,
loglevel: str | None = None, # set log level per subactor
diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py
index dd4d9e5a..9bd2240f 100644
--- a/tractor/ipc/__init__.py
+++ b/tractor/ipc/__init__.py
@@ -17,7 +17,6 @@ import platform
from ._transport import (
MsgTransportKey as MsgTransportKey,
- AddressType as AddressType,
MsgType as MsgType,
MsgTransport as MsgTransport,
MsgpackTransport as MsgpackTransport
@@ -27,10 +26,8 @@ from ._tcp import MsgpackTCPStream as MsgpackTCPStream
from ._uds import MsgpackUDSStream as MsgpackUDSStream
from ._types import (
- default_lo_addrs as default_lo_addrs,
- transport_from_destaddr as transport_from_destaddr,
+ transport_from_addr as transport_from_addr,
transport_from_stream as transport_from_stream,
- AddressTypes as AddressTypes
)
from ._chan import (
diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py
index ee259371..93f17132 100644
--- a/tractor/ipc/_chan.py
+++ b/tractor/ipc/_chan.py
@@ -35,8 +35,12 @@ import trio
from tractor.ipc._transport import MsgTransport
from tractor.ipc._types import (
- transport_from_destaddr,
+ transport_from_addr,
transport_from_stream,
+)
+from tractor._addr import (
+ wrap_address,
+ Address,
AddressTypes
)
from tractor.log import get_logger
@@ -66,7 +70,6 @@ class Channel:
def __init__(
self,
- destaddr: AddressTypes|None = None,
transport: MsgTransport|None = None,
# TODO: optional reconnection support?
# auto_reconnect: bool = False,
@@ -81,8 +84,6 @@ class Channel:
# user in ``.from_stream()``.
self._transport: MsgTransport|None = transport
- self._destaddr = destaddr if destaddr else self._transport.raddr
-
# set after handshake - always uid of far end
self.uid: tuple[str, str]|None = None
@@ -121,13 +122,14 @@ class Channel:
)
@classmethod
- async def from_destaddr(
+ async def from_addr(
cls,
- destaddr: AddressTypes,
+ addr: AddressTypes,
**kwargs
) -> Channel:
- transport_cls = transport_from_destaddr(destaddr)
- transport = await transport_cls.connect_to(destaddr, **kwargs)
+ addr: Address = wrap_address(addr)
+ transport_cls = transport_from_addr(addr)
+ transport = await transport_cls.connect_to(addr, **kwargs)
log.transport(
f'Opened channel[{type(transport)}]: {transport.laddr} -> {transport.raddr}'
@@ -164,11 +166,11 @@ class Channel:
)
@property
- def laddr(self) -> tuple[str, int]|None:
+ def laddr(self) -> Address|None:
return self._transport.laddr if self._transport else None
@property
- def raddr(self) -> tuple[str, int]|None:
+ def raddr(self) -> Address|None:
return self._transport.raddr if self._transport else None
# TODO: something like,
@@ -205,7 +207,11 @@ class Channel:
# assert err
__tracebackhide__: bool = False
else:
- assert err.cid
+ try:
+ assert err.cid
+
+ except KeyError:
+ raise err
raise
@@ -332,14 +338,14 @@ class Channel:
@acm
async def _connect_chan(
- destaddr: AddressTypes
+ addr: AddressTypes
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a channel with disconnect on context manager
teardown.
'''
- chan = await Channel.from_destaddr(destaddr)
+ chan = await Channel.from_addr(addr)
yield chan
with trio.CancelScope(shield=True):
await chan.aclose()
diff --git a/tractor/ipc/_ringbuf.py b/tractor/ipc/_ringbuf.py
index 10975b7a..6a71a00a 100644
--- a/tractor/ipc/_ringbuf.py
+++ b/tractor/ipc/_ringbuf.py
@@ -183,6 +183,9 @@ class RingBuffSender(trio.abc.SendStream):
def wrap_fd(self) -> int:
return self._wrap_event.fd
+ async def _wait_wrap(self):
+ await self._wrap_event.read()
+
async def send_all(self, data: Buffer):
async with self._send_lock:
# while data is larger than the remaining buf
@@ -193,7 +196,7 @@ class RingBuffSender(trio.abc.SendStream):
self._shm.buf[self.ptr:] = data[:remaining]
# signal write and wait for reader wrap around
self._write_event.write(remaining)
- await self._wrap_event.read()
+ await self._wait_wrap()
# wrap around and trim already written bytes
self._ptr = 0
diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py
index 4a69ebbd..a8008519 100644
--- a/tractor/ipc/_tcp.py
+++ b/tractor/ipc/_tcp.py
@@ -23,6 +23,7 @@ import trio
from tractor.msg import MsgCodec
from tractor.log import get_logger
+from tractor._addr import TCPAddress
from tractor.ipc._transport import MsgpackTransport
@@ -38,9 +39,8 @@ class MsgpackTCPStream(MsgpackTransport):
using the ``msgspec`` codec lib.
'''
- address_type = tuple[str, int]
+ address_type = TCPAddress
layer_key: int = 4
- name_key: str = 'tcp'
# def __init__(
# self,
@@ -55,19 +55,32 @@ class MsgpackTCPStream(MsgpackTransport):
# codec=codec
# )
+ @property
+ def maddr(self) -> str:
+ host, port = self.raddr.unwrap()
+ return (
+ f'/ipv4/{host}'
+ f'/{self.address_type.name_key}/{port}'
+ # f'/{self.chan.uid[0]}'
+ # f'/{self.cid}'
+
+ # f'/cid={cid_head}..{cid_tail}'
+ # TODO: ? not use this ^ right ?
+ )
+
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
@classmethod
async def connect_to(
cls,
- destaddr: tuple[str, int],
+ destaddr: TCPAddress,
prefix_size: int = 4,
codec: MsgCodec|None = None,
**kwargs
) -> MsgpackTCPStream:
stream = await trio.open_tcp_stream(
- *destaddr,
+ *destaddr.unwrap(),
**kwargs
)
return MsgpackTCPStream(
@@ -87,14 +100,6 @@ class MsgpackTCPStream(MsgpackTransport):
lsockname = stream.socket.getsockname()
rsockname = stream.socket.getpeername()
return (
- tuple(lsockname[:2]),
- tuple(rsockname[:2]),
+ TCPAddress.from_addr(tuple(lsockname[:2])),
+ TCPAddress.from_addr(tuple(rsockname[:2])),
)
-
- @classmethod
- def get_random_addr(self) -> tuple[str, int]:
- return (None, 0)
-
- @classmethod
- def get_root_addr(self) -> tuple[str, int]:
- return ('127.0.0.1', 1616)
diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py
index 55a218f7..8c35ffee 100644
--- a/tractor/ipc/_transport.py
+++ b/tractor/ipc/_transport.py
@@ -50,6 +50,7 @@ from tractor.msg import (
types as msgtypes,
pretty_struct,
)
+from tractor._addr import Address
log = get_logger(__name__)
@@ -62,12 +63,11 @@ MsgTransportKey = tuple[str, str]
# ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..?
# => BLEH, except can't bc prots must inherit typevar or param-spec
# vars..
-AddressType = TypeVar('AddressType')
MsgType = TypeVar('MsgType')
@runtime_checkable
-class MsgTransport(Protocol[AddressType, MsgType]):
+class MsgTransport(Protocol[MsgType]):
#
# ^-TODO-^ consider using a generic def and indexing with our
# eventual msg definition/types?
@@ -75,10 +75,9 @@ class MsgTransport(Protocol[AddressType, MsgType]):
stream: trio.abc.Stream
drained: list[MsgType]
- address_type: ClassVar[Type[AddressType]]
+ address_type: ClassVar[Type[Address]]
codec_key: ClassVar[str]
- name_key: ClassVar[str]
# XXX: should this instead be called `.sendall()`?
async def send(self, msg: MsgType) -> None:
@@ -100,20 +99,24 @@ class MsgTransport(Protocol[AddressType, MsgType]):
@classmethod
def key(cls) -> MsgTransportKey:
- return cls.codec_key, cls.name_key
+ return cls.codec_key, cls.address_type.name_key
@property
- def laddr(self) -> AddressType:
+ def laddr(self) -> Address:
...
@property
- def raddr(self) -> AddressType:
+ def raddr(self) -> Address:
+ ...
+
+ @property
+ def maddr(self) -> str:
...
@classmethod
async def connect_to(
cls,
- destaddr: AddressType,
+ addr: Address,
**kwargs
) -> MsgTransport:
...
@@ -123,8 +126,8 @@ class MsgTransport(Protocol[AddressType, MsgType]):
cls,
stream: trio.abc.Stream
) -> tuple[
- AddressType, # local
- AddressType # remote
+ Address, # local
+ Address # remote
]:
'''
Return the `trio` streaming transport prot's addrs for both
@@ -133,14 +136,6 @@ class MsgTransport(Protocol[AddressType, MsgType]):
'''
...
- @classmethod
- def get_random_addr(self) -> AddressType:
- ...
-
- @classmethod
- def get_root_addr(self) -> AddressType:
- ...
-
class MsgpackTransport(MsgTransport):
@@ -447,9 +442,9 @@ class MsgpackTransport(MsgTransport):
return self._aiter_pkts
@property
- def laddr(self) -> AddressType:
+ def laddr(self) -> Address:
return self._laddr
@property
- def raddr(self) -> AddressType:
+ def raddr(self) -> Address:
return self._raddr
diff --git a/tractor/ipc/_types.py b/tractor/ipc/_types.py
index 92d3af91..3e0e43e5 100644
--- a/tractor/ipc/_types.py
+++ b/tractor/ipc/_types.py
@@ -13,49 +13,42 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see .
-from typing import Type, Union
+from typing import Type
import trio
import socket
-from ._transport import (
+from tractor._addr import Address
+from tractor.ipc._transport import (
MsgTransportKey,
MsgTransport
)
-from ._tcp import MsgpackTCPStream
-from ._uds import MsgpackUDSStream
+from tractor.ipc._tcp import MsgpackTCPStream
+from tractor.ipc._uds import MsgpackUDSStream
+# manually updated list of all supported msg transport types
_msg_transports = [
MsgpackTCPStream,
MsgpackUDSStream
]
-# manually updated list of all supported codec+transport types
-key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = {
+# convert a MsgTransportKey to the corresponding transport type
+_key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = {
cls.key(): cls
for cls in _msg_transports
}
-
-# all different address py types we use
-AddressTypes = Union[
- tuple([
- cls.address_type
- for cls in _msg_transports
- ])
-]
-
-
-default_lo_addrs: dict[MsgTransportKey, AddressTypes] = {
- cls.key(): cls.get_root_addr()
+# convert an Address wrapper to its corresponding transport type
+_addr_to_transport: dict[Type[Address], Type[MsgTransport]] = {
+ cls.address_type: cls
for cls in _msg_transports
}
-def transport_from_destaddr(
- destaddr: AddressTypes,
+def transport_from_addr(
+ addr: Address,
codec_key: str = 'msgpack',
) -> Type[MsgTransport]:
'''
@@ -63,23 +56,13 @@ def transport_from_destaddr(
corresponding `MsgTransport` type.
'''
- match destaddr:
- case str():
- return MsgpackUDSStream
+ try:
+ return _addr_to_transport[type(addr)]
- case tuple():
- if (
- len(destaddr) == 2
- and
- isinstance(destaddr[0], str)
- and
- isinstance(destaddr[1], int)
- ):
- return MsgpackTCPStream
-
- raise NotImplementedError(
- f'No known transport for address {destaddr}'
- )
+ except KeyError:
+ raise NotImplementedError(
+ f'No known transport for address {repr(addr)}'
+ )
def transport_from_stream(
@@ -113,4 +96,4 @@ def transport_from_stream(
key = (codec_key, transport)
- return _msg_transports[key]
+ return _key_to_transport[key]
diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py
index eb2e7f32..ee147d42 100644
--- a/tractor/ipc/_uds.py
+++ b/tractor/ipc/_uds.py
@@ -18,13 +18,12 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco
'''
from __future__ import annotations
-import tempfile
-from uuid import uuid4
import trio
from tractor.msg import MsgCodec
from tractor.log import get_logger
+from tractor._addr import UDSAddress
from tractor.ipc._transport import MsgpackTransport
@@ -37,9 +36,8 @@ class MsgpackUDSStream(MsgpackTransport):
using the ``msgspec`` codec lib.
'''
- address_type = str
+ address_type = UDSAddress
layer_key: int = 7
- name_key: str = 'uds'
# def __init__(
# self,
@@ -54,19 +52,32 @@ class MsgpackUDSStream(MsgpackTransport):
# codec=codec
# )
+ @property
+ def maddr(self) -> str:
+ filepath = self.raddr.unwrap()
+ return (
+ f'/ipv4/localhost'
+ f'/{self.address_type.name_key}/{filepath}'
+ # f'/{self.chan.uid[0]}'
+ # f'/{self.cid}'
+
+ # f'/cid={cid_head}..{cid_tail}'
+ # TODO: ? not use this ^ right ?
+ )
+
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
@classmethod
async def connect_to(
cls,
- filename: str,
+ addr: UDSAddress,
prefix_size: int = 4,
codec: MsgCodec|None = None,
**kwargs
) -> MsgpackUDSStream:
stream = await trio.open_unix_socket(
- filename,
+ addr.unwrap(),
**kwargs
)
return MsgpackUDSStream(
@@ -79,16 +90,8 @@ class MsgpackUDSStream(MsgpackTransport):
def get_stream_addrs(
cls,
stream: trio.SocketStream
- ) -> tuple[str, str]:
+ ) -> tuple[UDSAddress, UDSAddress]:
return (
- stream.socket.getsockname(),
- stream.socket.getpeername(),
+ UDSAddress.from_addr(stream.socket.getsockname()),
+ UDSAddress.from_addr(stream.socket.getsockname()),
)
-
- @classmethod
- def get_random_addr(self) -> str:
- return f'{tempfile.gettempdir()}/{uuid4()}.sock'
-
- @classmethod
- def get_root_addr(self) -> str:
- return 'tractor.sock'
diff --git a/tractor/msg/types.py b/tractor/msg/types.py
index 76d0bad6..3e58ae3a 100644
--- a/tractor/msg/types.py
+++ b/tractor/msg/types.py
@@ -46,8 +46,8 @@ from msgspec import (
from tractor.msg import (
pretty_struct,
)
-from tractor.ipc import AddressTypes
from tractor.log import get_logger
+from tractor._addr import AddressTypes
log = get_logger('tractor.msgspec')