From a47a7a39b114380d4ea76e023feee79ec1b27adc Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sat, 22 Mar 2025 15:29:48 -0300 Subject: [PATCH 1/4] Starting to make tractor.ipc.Channel work with multiple MsgTransports --- tractor/_discovery.py | 8 +- tractor/_root.py | 2 +- tractor/_runtime.py | 5 +- tractor/ipc/__init__.py | 20 +- tractor/ipc/_chan.py | 121 +++-------- tractor/ipc/_tcp.py | 409 +++++--------------------------------- tractor/ipc/_transport.py | 382 ++++++++++++++++++++++++++++++++++- tractor/ipc/_types.py | 101 ++++++++++ tractor/ipc/_uds.py | 84 ++++++++ 9 files changed, 657 insertions(+), 475 deletions(-) create mode 100644 tractor/ipc/_types.py create mode 100644 tractor/ipc/_uds.py diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 1c3cbff0..f6f4b9d9 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -75,7 +75,7 @@ async def get_registry( # TODO: try to look pre-existing connection from # `Actor._peers` and use it instead? async with ( - _connect_chan(host, port) as chan, + _connect_chan((host, port)) as chan, open_portal(chan) as regstr_ptl, ): yield regstr_ptl @@ -93,7 +93,7 @@ async def get_root( assert host is not None async with ( - _connect_chan(host, port) as chan, + _connect_chan((host, port)) as chan, open_portal(chan, **kwargs) as portal, ): yield portal @@ -187,7 +187,7 @@ async def maybe_open_portal( pass if sockaddr: - async with _connect_chan(*sockaddr) as chan: + async with _connect_chan(sockaddr) as chan: async with open_portal(chan) as portal: yield portal else: @@ -310,6 +310,6 @@ async def wait_for_actor( # TODO: offer multi-portal yields in multi-homed case? sockaddr: tuple[str, int] = sockaddrs[-1] - async with _connect_chan(*sockaddr) as chan: + async with _connect_chan(sockaddr) as chan: async with open_portal(chan) as portal: yield portal diff --git a/tractor/_root.py b/tractor/_root.py index 35639c15..40682a7a 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -271,7 +271,7 @@ async def open_root_actor( # be better to eventually have a "discovery" protocol # with basic handshake instead? with trio.move_on_after(timeout): - async with _connect_chan(*addr): + async with _connect_chan(addr): ponged_addrs.append(addr) except OSError: diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 2c8dbbd9..eaab31b6 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -1040,10 +1040,7 @@ class Actor: # Connect back to the parent actor and conduct initial # handshake. From this point on if we error, we # attempt to ship the exception back to the parent. - chan = Channel( - destaddr=parent_addr, - ) - await chan.connect() + chan = await Channel.from_destaddr(parent_addr) # TODO: move this into a `Channel.handshake()`? # Initial handshake: swap names. diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py index 4f0cd2b4..0c8e09ca 100644 --- a/tractor/ipc/__init__.py +++ b/tractor/ipc/__init__.py @@ -13,20 +13,26 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . - - import platform -from ._transport import MsgTransport as MsgTransport +from ._transport import ( + AddressType as AddressType, + MsgType as MsgType, + MsgTransport as MsgTransport, + MsgpackTransport as MsgpackTransport +) -from ._tcp import ( - get_stream_addrs as get_stream_addrs, - MsgpackTCPStream as MsgpackTCPStream +from ._tcp import MsgpackTCPStream as MsgpackTCPStream +from ._uds import MsgpackUDSStream as MsgpackUDSStream + +from ._types import ( + transport_from_destaddr as transport_from_destaddr, + transport_from_stream as transport_from_stream, + AddressTypes as AddressTypes ) from ._chan import ( _connect_chan as _connect_chan, - get_msg_transport as get_msg_transport, Channel as Channel ) diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index 1b6ba29f..ee259371 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -29,15 +29,15 @@ from pprint import pformat import typing from typing import ( Any, - Type ) import trio from tractor.ipc._transport import MsgTransport -from tractor.ipc._tcp import ( - MsgpackTCPStream, - get_stream_addrs +from tractor.ipc._types import ( + transport_from_destaddr, + transport_from_stream, + AddressTypes ) from tractor.log import get_logger from tractor._exceptions import ( @@ -52,17 +52,6 @@ log = get_logger(__name__) _is_windows = platform.system() == 'Windows' -def get_msg_transport( - - key: tuple[str, str], - -) -> Type[MsgTransport]: - - return { - ('msgpack', 'tcp'): MsgpackTCPStream, - }[key] - - class Channel: ''' An inter-process channel for communication between (remote) actors. @@ -77,10 +66,8 @@ class Channel: def __init__( self, - destaddr: tuple[str, int]|None, - - msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'), - + destaddr: AddressTypes|None = None, + transport: MsgTransport|None = None, # TODO: optional reconnection support? # auto_reconnect: bool = False, # on_reconnect: typing.Callable[..., typing.Awaitable] = None, @@ -90,13 +77,11 @@ class Channel: # self._recon_seq = on_reconnect # self._autorecon = auto_reconnect - self._destaddr = destaddr - self._transport_key = msg_transport_type_key - # Either created in ``.connect()`` or passed in by # user in ``.from_stream()``. - self._stream: trio.SocketStream|None = None - self._transport: MsgTransport|None = None + self._transport: MsgTransport|None = transport + + self._destaddr = destaddr if destaddr else self._transport.raddr # set after handshake - always uid of far end self.uid: tuple[str, str]|None = None @@ -110,6 +95,10 @@ class Channel: # runtime. self._cancel_called: bool = False + @property + def stream(self) -> trio.abc.Stream | None: + return self._transport.stream if self._transport else None + @property def msgstream(self) -> MsgTransport: log.info( @@ -124,52 +113,31 @@ class Channel: @classmethod def from_stream( cls, - stream: trio.SocketStream, - **kwargs, - + stream: trio.abc.Stream, ) -> Channel: - - src, dst = get_stream_addrs(stream) - chan = Channel( - destaddr=dst, - **kwargs, + transport_cls = transport_from_stream(stream) + return Channel( + transport=transport_cls(stream) ) - # set immediately here from provided instance - chan._stream: trio.SocketStream = stream - chan.set_msg_transport(stream) - return chan + @classmethod + async def from_destaddr( + cls, + destaddr: AddressTypes, + **kwargs + ) -> Channel: + transport_cls = transport_from_destaddr(destaddr) + transport = await transport_cls.connect_to(destaddr, **kwargs) - def set_msg_transport( - self, - stream: trio.SocketStream, - type_key: tuple[str, str]|None = None, - - # XXX optionally provided codec pair for `msgspec`: - # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - codec: MsgCodec|None = None, - - ) -> MsgTransport: - type_key = ( - type_key - or - self._transport_key + log.transport( + f'Opened channel[{type(transport)}]: {transport.laddr} -> {transport.raddr}' ) - # get transport type, then - self._transport = get_msg_transport( - type_key - # instantiate an instance of the msg-transport - )( - stream, - codec=codec, - ) - return self._transport + return Channel(transport=transport) @cm def apply_codec( self, codec: MsgCodec, - ) -> None: ''' Temporarily override the underlying IPC msg codec for @@ -189,7 +157,7 @@ class Channel: return '' return repr( - self._transport.stream.socket._sock + self._transport ).replace( # type: ignore "socket.socket", "Channel", @@ -203,30 +171,6 @@ class Channel: def raddr(self) -> tuple[str, int]|None: return self._transport.raddr if self._transport else None - async def connect( - self, - destaddr: tuple[Any, ...] | None = None, - **kwargs - - ) -> MsgTransport: - - if self.connected(): - raise RuntimeError("channel is already connected?") - - destaddr = destaddr or self._destaddr - assert isinstance(destaddr, tuple) - - stream = await trio.open_tcp_stream( - *destaddr, - **kwargs - ) - transport = self.set_msg_transport(stream) - - log.transport( - f'Opened channel[{type(transport)}]: {self.laddr} -> {self.raddr}' - ) - return transport - # TODO: something like, # `pdbp.hideframe_on(errors=[MsgTypeError])` # instead of the `try/except` hack we have rn.. @@ -388,17 +332,14 @@ class Channel: @acm async def _connect_chan( - host: str, - port: int - + destaddr: AddressTypes ) -> typing.AsyncGenerator[Channel, None]: ''' Create and connect a channel with disconnect on context manager teardown. ''' - chan = Channel((host, port)) - await chan.connect() + chan = await Channel.from_destaddr(destaddr) yield chan with trio.CancelScope(shield=True): await chan.aclose() diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py index 3ce0b4ea..71265f38 100644 --- a/tractor/ipc/_tcp.py +++ b/tractor/ipc/_tcp.py @@ -18,388 +18,75 @@ TCP implementation of tractor.ipc._transport.MsgTransport protocol ''' from __future__ import annotations -from collections.abc import ( - AsyncGenerator, - AsyncIterator, -) -import struct -from typing import ( - Any, - Callable, -) -import msgspec -from tricycle import BufferedReceiveStream import trio +from tractor.msg import MsgCodec from tractor.log import get_logger -from tractor._exceptions import ( - MsgTypeError, - TransportClosed, - _mk_send_mte, - _mk_recv_mte, -) -from tractor.msg import ( - _ctxvar_MsgCodec, - # _codec, XXX see `self._codec` sanity/debug checks - MsgCodec, - types as msgtypes, - pretty_struct, -) -from tractor.ipc import MsgTransport +from tractor.ipc._transport import MsgpackTransport log = get_logger(__name__) -def get_stream_addrs( - stream: trio.SocketStream -) -> tuple[ - tuple[str, int], # local - tuple[str, int], # remote -]: - ''' - Return the `trio` streaming transport prot's socket-addrs for - both the local and remote sides as a pair. - - ''' - # rn, should both be IP sockets - lsockname = stream.socket.getsockname() - rsockname = stream.socket.getpeername() - return ( - tuple(lsockname[:2]), - tuple(rsockname[:2]), - ) - - # TODO: typing oddity.. not sure why we have to inherit here, but it # seems to be an issue with `get_msg_transport()` returning # a `Type[Protocol]`; probably should make a `mypy` issue? -class MsgpackTCPStream(MsgTransport): +class MsgpackTCPStream(MsgpackTransport): ''' A ``trio.SocketStream`` delivering ``msgpack`` formatted data using the ``msgspec`` codec lib. ''' + address_type = tuple[str, int] layer_key: int = 4 name_key: str = 'tcp' - # TODO: better naming for this? - # -[ ] check how libp2p does naming for such things? - codec_key: str = 'msgpack' + # def __init__( + # self, + # stream: trio.SocketStream, + # prefix_size: int = 4, + # codec: CodecType = None, - def __init__( - self, - stream: trio.SocketStream, - prefix_size: int = 4, - - # XXX optionally provided codec pair for `msgspec`: - # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types - # - # TODO: define this as a `Codec` struct which can be - # overriden dynamically by the application/runtime? - codec: tuple[ - Callable[[Any], Any]|None, # coder - Callable[[type, Any], Any]|None, # decoder - ]|None = None, - - ) -> None: - - self.stream = stream - assert self.stream.socket - - # should both be IP sockets - self._laddr, self._raddr = get_stream_addrs(stream) - - # create read loop instance - self._aiter_pkts = self._iter_packets() - self._send_lock = trio.StrictFIFOLock() - - # public i guess? - self.drained: list[dict] = [] - - self.recv_stream = BufferedReceiveStream( - transport_stream=stream - ) - self.prefix_size = prefix_size - - # allow for custom IPC msg interchange format - # dynamic override Bo - self._task = trio.lowlevel.current_task() - - # XXX for ctxvar debug only! - # self._codec: MsgCodec = ( - # codec - # or - # _codec._ctxvar_MsgCodec.get() - # ) - - async def _iter_packets(self) -> AsyncGenerator[dict, None]: - ''' - Yield `bytes`-blob decoded packets from the underlying TCP - stream using the current task's `MsgCodec`. - - This is a streaming routine implemented as an async generator - func (which was the original design, but could be changed?) - and is allocated by a `.__call__()` inside `.__init__()` where - it is assigned to the `._aiter_pkts` attr. - - ''' - decodes_failed: int = 0 - - while True: - try: - header: bytes = await self.recv_stream.receive_exactly(4) - except ( - ValueError, - ConnectionResetError, - - # not sure entirely why we need this but without it we - # seem to be getting racy failures here on - # arbiter/registry name subs.. - trio.BrokenResourceError, - - ) as trans_err: - - loglevel = 'transport' - match trans_err: - # case ( - # ConnectionResetError() - # ): - # loglevel = 'transport' - - # peer actor (graceful??) TCP EOF but `tricycle` - # seems to raise a 0-bytes-read? - case ValueError() if ( - 'unclean EOF' in trans_err.args[0] - ): - pass - - # peer actor (task) prolly shutdown quickly due - # to cancellation - case trio.BrokenResourceError() if ( - 'Connection reset by peer' in trans_err.args[0] - ): - pass - - # unless the disconnect condition falls under "a - # normal operation breakage" we usualy console warn - # about it. - case _: - loglevel: str = 'warning' - - - raise TransportClosed( - message=( - f'IPC transport already closed by peer\n' - f'x)> {type(trans_err)}\n' - f' |_{self}\n' - ), - loglevel=loglevel, - ) from trans_err - - # XXX definitely can happen if transport is closed - # manually by another `trio.lowlevel.Task` in the - # same actor; we use this in some simulated fault - # testing for ex, but generally should never happen - # under normal operation! - # - # NOTE: as such we always re-raise this error from the - # RPC msg loop! - except trio.ClosedResourceError as closure_err: - raise TransportClosed( - message=( - f'IPC transport already manually closed locally?\n' - f'x)> {type(closure_err)} \n' - f' |_{self}\n' - ), - loglevel='error', - raise_on_report=( - closure_err.args[0] == 'another task closed this fd' - or - closure_err.args[0] in ['another task closed this fd'] - ), - ) from closure_err - - # graceful TCP EOF disconnect - if header == b'': - raise TransportClosed( - message=( - f'IPC transport already gracefully closed\n' - f')>\n' - f'|_{self}\n' - ), - loglevel='transport', - # cause=??? # handy or no? - ) - - size: int - size, = struct.unpack(" None: - ''' - Send a msgpack encoded py-object-blob-as-msg over TCP. - - If `strict_types == True` then a `MsgTypeError` will be raised on any - invalid msg type - - ''' - __tracebackhide__: bool = hide_tb - - # XXX see `trio._sync.AsyncContextManagerMixin` for details - # on the `.acquire()`/`.release()` sequencing.. - async with self._send_lock: - - # NOTE: lookup the `trio.Task.context`'s var for - # the current `MsgCodec`. - codec: MsgCodec = _ctxvar_MsgCodec.get() - - # XXX for ctxvar debug only! - # if self._codec.pld_spec != codec.pld_spec: - # self._codec = codec - # log.runtime( - # f'Using new codec in {self}.send()\n' - # f'codec: {self._codec}\n\n' - # f'msg: {msg}\n' - # ) - - if type(msg) not in msgtypes.__msg_types__: - if strict_types: - raise _mk_send_mte( - msg, - codec=codec, - ) - else: - log.warning( - 'Sending non-`Msg`-spec msg?\n\n' - f'{msg}\n' - ) - - try: - bytes_data: bytes = codec.encode(msg) - except TypeError as _err: - typerr = _err - msgtyperr: MsgTypeError = _mk_send_mte( - msg, - codec=codec, - message=( - f'IPC-msg-spec violation in\n\n' - f'{pretty_struct.Struct.pformat(msg)}' - ), - src_type_error=typerr, - ) - raise msgtyperr from typerr - - # supposedly the fastest says, - # https://stackoverflow.com/a/54027962 - size: bytes = struct.pack(" - # except BaseException as _err: - # err = _err - # if not isinstance(err, MsgTypeError): - # __tracebackhide__: bool = False - # raise - - @property - def laddr(self) -> tuple[str, int]: - return self._laddr - - @property - def raddr(self) -> tuple[str, int]: - return self._raddr - - async def recv(self) -> Any: - return await self._aiter_pkts.asend(None) - - async def drain(self) -> AsyncIterator[dict]: - ''' - Drain the stream's remaining messages sent from - the far end until the connection is closed by - the peer. - - ''' - try: - async for msg in self._iter_packets(): - self.drained.append(msg) - except TransportClosed: - for msg in self.drained: - yield msg - - def __aiter__(self): - return self._aiter_pkts + # ) -> None: + # super().__init__( + # stream, + # prefix_size=prefix_size, + # codec=codec + # ) def connected(self) -> bool: return self.stream.socket.fileno() != -1 + + @classmethod + async def connect_to( + cls, + destaddr: tuple[str, int], + prefix_size: int = 4, + codec: MsgCodec|None = None, + **kwargs + ) -> MsgpackTCPStream: + stream = await trio.open_tcp_stream( + *destaddr, + **kwargs + ) + return MsgpackTCPStream( + stream, + prefix_size=prefix_size, + codec=codec + ) + + @classmethod + def get_stream_addrs( + cls, + stream: trio.SocketStream + ) -> tuple[ + tuple[str, int], + tuple[str, int] + ]: + lsockname = stream.socket.getsockname() + rsockname = stream.socket.getpeername() + return ( + tuple(lsockname[:2]), + tuple(rsockname[:2]), + ) diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 64453c89..70ba2088 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -18,24 +18,56 @@ typing.Protocol based generic msg API, implement this class to add backends for tractor.ipc.Channel ''' -import trio +from __future__ import annotations from typing import ( runtime_checkable, + Type, Protocol, TypeVar, + ClassVar ) -from collections.abc import AsyncIterator +from collections.abc import ( + AsyncGenerator, + AsyncIterator, +) +import struct +from typing import ( + Any, + Callable, +) + +import trio +import msgspec +from tricycle import BufferedReceiveStream + +from tractor.log import get_logger +from tractor._exceptions import ( + MsgTypeError, + TransportClosed, + _mk_send_mte, + _mk_recv_mte, +) +from tractor.msg import ( + _ctxvar_MsgCodec, + # _codec, XXX see `self._codec` sanity/debug checks + MsgCodec, + types as msgtypes, + pretty_struct, +) + +log = get_logger(__name__) # from tractor.msg.types import MsgType # ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? # => BLEH, except can't bc prots must inherit typevar or param-spec # vars.. +AddressType = TypeVar('AddressType') MsgType = TypeVar('MsgType') @runtime_checkable -class MsgTransport(Protocol[MsgType]): +class MsgTransport(Protocol[AddressType, MsgType]): # # ^-TODO-^ consider using a generic def and indexing with our # eventual msg definition/types? @@ -43,9 +75,7 @@ class MsgTransport(Protocol[MsgType]): stream: trio.abc.Stream drained: list[MsgType] - - def __init__(self, stream: trio.abc.Stream) -> None: - ... + address_type: ClassVar[Type[AddressType]] # XXX: should this instead be called `.sendall()`? async def send(self, msg: MsgType) -> None: @@ -66,9 +96,345 @@ class MsgTransport(Protocol[MsgType]): ... @property - def laddr(self) -> tuple[str, int]: + def laddr(self) -> AddressType: ... @property - def raddr(self) -> tuple[str, int]: + def raddr(self) -> AddressType: ... + + @classmethod + async def connect_to( + cls, + destaddr: AddressType, + **kwargs + ) -> MsgTransport: + ... + + @classmethod + def get_stream_addrs( + cls, + stream: trio.abc.Stream + ) -> tuple[ + AddressType, # local + AddressType # remote + ]: + ''' + Return the `trio` streaming transport prot's addrs for both + the local and remote sides as a pair. + + ''' + ... + + +class MsgpackTransport(MsgTransport): + + # TODO: better naming for this? + # -[ ] check how libp2p does naming for such things? + codec_key: str = 'msgpack' + + def __init__( + self, + stream: trio.abc.Stream, + prefix_size: int = 4, + + # XXX optionally provided codec pair for `msgspec`: + # https://jcristharif.com/msgspec/extending.html#mapping-to-from-native-types + # + # TODO: define this as a `Codec` struct which can be + # overriden dynamically by the application/runtime? + codec: MsgCodec = None, + + ) -> None: + self.stream = stream + self._laddr, self._raddr = self.get_stream_addrs(stream) + + # create read loop instance + self._aiter_pkts = self._iter_packets() + self._send_lock = trio.StrictFIFOLock() + + # public i guess? + self.drained: list[dict] = [] + + self.recv_stream = BufferedReceiveStream( + transport_stream=stream + ) + self.prefix_size = prefix_size + + # allow for custom IPC msg interchange format + # dynamic override Bo + self._task = trio.lowlevel.current_task() + + # XXX for ctxvar debug only! + # self._codec: MsgCodec = ( + # codec + # or + # _codec._ctxvar_MsgCodec.get() + # ) + + async def _iter_packets(self) -> AsyncGenerator[dict, None]: + ''' + Yield `bytes`-blob decoded packets from the underlying TCP + stream using the current task's `MsgCodec`. + + This is a streaming routine implemented as an async generator + func (which was the original design, but could be changed?) + and is allocated by a `.__call__()` inside `.__init__()` where + it is assigned to the `._aiter_pkts` attr. + + ''' + decodes_failed: int = 0 + + while True: + try: + header: bytes = await self.recv_stream.receive_exactly(4) + except ( + ValueError, + ConnectionResetError, + + # not sure entirely why we need this but without it we + # seem to be getting racy failures here on + # arbiter/registry name subs.. + trio.BrokenResourceError, + + ) as trans_err: + + loglevel = 'transport' + match trans_err: + # case ( + # ConnectionResetError() + # ): + # loglevel = 'transport' + + # peer actor (graceful??) TCP EOF but `tricycle` + # seems to raise a 0-bytes-read? + case ValueError() if ( + 'unclean EOF' in trans_err.args[0] + ): + pass + + # peer actor (task) prolly shutdown quickly due + # to cancellation + case trio.BrokenResourceError() if ( + 'Connection reset by peer' in trans_err.args[0] + ): + pass + + # unless the disconnect condition falls under "a + # normal operation breakage" we usualy console warn + # about it. + case _: + loglevel: str = 'warning' + + + raise TransportClosed( + message=( + f'IPC transport already closed by peer\n' + f'x)> {type(trans_err)}\n' + f' |_{self}\n' + ), + loglevel=loglevel, + ) from trans_err + + # XXX definitely can happen if transport is closed + # manually by another `trio.lowlevel.Task` in the + # same actor; we use this in some simulated fault + # testing for ex, but generally should never happen + # under normal operation! + # + # NOTE: as such we always re-raise this error from the + # RPC msg loop! + except trio.ClosedResourceError as closure_err: + raise TransportClosed( + message=( + f'IPC transport already manually closed locally?\n' + f'x)> {type(closure_err)} \n' + f' |_{self}\n' + ), + loglevel='error', + raise_on_report=( + closure_err.args[0] == 'another task closed this fd' + or + closure_err.args[0] in ['another task closed this fd'] + ), + ) from closure_err + + # graceful TCP EOF disconnect + if header == b'': + raise TransportClosed( + message=( + f'IPC transport already gracefully closed\n' + f')>\n' + f'|_{self}\n' + ), + loglevel='transport', + # cause=??? # handy or no? + ) + + size: int + size, = struct.unpack(" None: + ''' + Send a msgpack encoded py-object-blob-as-msg over TCP. + + If `strict_types == True` then a `MsgTypeError` will be raised on any + invalid msg type + + ''' + __tracebackhide__: bool = hide_tb + + # XXX see `trio._sync.AsyncContextManagerMixin` for details + # on the `.acquire()`/`.release()` sequencing.. + async with self._send_lock: + + # NOTE: lookup the `trio.Task.context`'s var for + # the current `MsgCodec`. + codec: MsgCodec = _ctxvar_MsgCodec.get() + + # XXX for ctxvar debug only! + # if self._codec.pld_spec != codec.pld_spec: + # self._codec = codec + # log.runtime( + # f'Using new codec in {self}.send()\n' + # f'codec: {self._codec}\n\n' + # f'msg: {msg}\n' + # ) + + if type(msg) not in msgtypes.__msg_types__: + if strict_types: + raise _mk_send_mte( + msg, + codec=codec, + ) + else: + log.warning( + 'Sending non-`Msg`-spec msg?\n\n' + f'{msg}\n' + ) + + try: + bytes_data: bytes = codec.encode(msg) + except TypeError as _err: + typerr = _err + msgtyperr: MsgTypeError = _mk_send_mte( + msg, + codec=codec, + message=( + f'IPC-msg-spec violation in\n\n' + f'{pretty_struct.Struct.pformat(msg)}' + ), + src_type_error=typerr, + ) + raise msgtyperr from typerr + + # supposedly the fastest says, + # https://stackoverflow.com/a/54027962 + size: bytes = struct.pack(" + # except BaseException as _err: + # err = _err + # if not isinstance(err, MsgTypeError): + # __tracebackhide__: bool = False + # raise + + async def recv(self) -> Any: + return await self._aiter_pkts.asend(None) + + async def drain(self) -> AsyncIterator[dict]: + ''' + Drain the stream's remaining messages sent from + the far end until the connection is closed by + the peer. + + ''' + try: + async for msg in self._iter_packets(): + self.drained.append(msg) + except TransportClosed: + for msg in self.drained: + yield msg + + def __aiter__(self): + return self._aiter_pkts + + @property + def laddr(self) -> AddressType: + return self._laddr + + @property + def raddr(self) -> AddressType: + return self._raddr diff --git a/tractor/ipc/_types.py b/tractor/ipc/_types.py new file mode 100644 index 00000000..93c5e3c9 --- /dev/null +++ b/tractor/ipc/_types.py @@ -0,0 +1,101 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from typing import Type, Union + +import trio +import socket + +from ._transport import MsgTransport +from ._tcp import MsgpackTCPStream +from ._uds import MsgpackUDSStream + + +# manually updated list of all supported codec+transport types +_msg_transports = { + ('msgpack', 'tcp'): MsgpackTCPStream, + ('msgpack', 'uds'): MsgpackUDSStream +} + + +# all different address py types we use +AddressTypes = Union[ + tuple([ + cls.address_type + for key, cls in _msg_transports.items() + ]) +] + + +def transport_from_destaddr( + destaddr: AddressTypes, + codec_key: str = 'msgpack', +) -> Type[MsgTransport]: + ''' + Given a destination address and a desired codec, find the + corresponding `MsgTransport` type. + + ''' + match destaddr: + case str(): + return MsgpackUDSStream + + case tuple(): + if ( + len(destaddr) == 2 + and + isinstance(destaddr[0], str) + and + isinstance(destaddr[1], int) + ): + return MsgpackTCPStream + + raise NotImplementedError( + f'No known transport for address {destaddr}' + ) + + +def transport_from_stream( + stream: trio.abc.Stream, + codec_key: str = 'msgpack' +) -> Type[MsgTransport]: + ''' + Given an arbitrary `trio.abc.Stream` and a desired codec, + find the corresponding `MsgTransport` type. + + ''' + transport = None + if isinstance(stream, trio.SocketStream): + sock = stream.socket + match sock.family: + case socket.AF_INET | socket.AF_INET6: + transport = 'tcp' + + case socket.AF_UNIX: + transport = 'uds' + + case _: + raise NotImplementedError( + f'Unsupported socket family: {sock.family}' + ) + + if not transport: + raise NotImplementedError( + f'Could not figure out transport type for stream type {type(stream)}' + ) + + key = (codec_key, transport) + + return _msg_transports[key] diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py new file mode 100644 index 00000000..3b848898 --- /dev/null +++ b/tractor/ipc/_uds.py @@ -0,0 +1,84 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +''' +Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protocol + +''' +from __future__ import annotations + +import trio + +from tractor.msg import MsgCodec +from tractor.log import get_logger +from tractor.ipc._transport import MsgpackTransport + + +log = get_logger(__name__) + + +class MsgpackUDSStream(MsgpackTransport): + ''' + A ``trio.SocketStream`` delivering ``msgpack`` formatted data + using the ``msgspec`` codec lib. + + ''' + address_type = str + layer_key: int = 7 + name_key: str = 'uds' + + # def __init__( + # self, + # stream: trio.SocketStream, + # prefix_size: int = 4, + # codec: CodecType = None, + + # ) -> None: + # super().__init__( + # stream, + # prefix_size=prefix_size, + # codec=codec + # ) + + def connected(self) -> bool: + return self.stream.socket.fileno() != -1 + + @classmethod + async def connect_to( + cls, + filename: str, + prefix_size: int = 4, + codec: MsgCodec|None = None, + **kwargs + ) -> MsgpackUDSStream: + stream = await trio.open_unix_socket( + filename, + **kwargs + ) + return MsgpackUDSStream( + stream, + prefix_size=prefix_size, + codec=codec + ) + + @classmethod + def get_stream_addrs( + cls, + stream: trio.SocketStream + ) -> tuple[str, str]: + return ( + stream.socket.getsockname(), + stream.socket.getpeername(), + ) -- 2.34.1 From 5f50206d841360f9acb972fdbf2c93d1251ed201 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sat, 22 Mar 2025 16:17:50 -0300 Subject: [PATCH 2/4] Add root and random addr getters on MsgTransport type --- tractor/ipc/__init__.py | 2 ++ tractor/ipc/_tcp.py | 8 ++++++++ tractor/ipc/_transport.py | 25 ++++++++++++++++++++----- tractor/ipc/_types.py | 25 ++++++++++++++++++++----- tractor/ipc/_uds.py | 10 ++++++++++ tractor/msg/types.py | 5 +++-- 6 files changed, 63 insertions(+), 12 deletions(-) diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py index 0c8e09ca..dd4d9e5a 100644 --- a/tractor/ipc/__init__.py +++ b/tractor/ipc/__init__.py @@ -16,6 +16,7 @@ import platform from ._transport import ( + MsgTransportKey as MsgTransportKey, AddressType as AddressType, MsgType as MsgType, MsgTransport as MsgTransport, @@ -26,6 +27,7 @@ from ._tcp import MsgpackTCPStream as MsgpackTCPStream from ._uds import MsgpackUDSStream as MsgpackUDSStream from ._types import ( + default_lo_addrs as default_lo_addrs, transport_from_destaddr as transport_from_destaddr, transport_from_stream as transport_from_stream, AddressTypes as AddressTypes diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py index 71265f38..4a69ebbd 100644 --- a/tractor/ipc/_tcp.py +++ b/tractor/ipc/_tcp.py @@ -90,3 +90,11 @@ class MsgpackTCPStream(MsgpackTransport): tuple(lsockname[:2]), tuple(rsockname[:2]), ) + + @classmethod + def get_random_addr(self) -> tuple[str, int]: + return (None, 0) + + @classmethod + def get_root_addr(self) -> tuple[str, int]: + return ('127.0.0.1', 1616) diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 70ba2088..55a218f7 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -31,10 +31,6 @@ from collections.abc import ( AsyncIterator, ) import struct -from typing import ( - Any, - Callable, -) import trio import msgspec @@ -58,6 +54,10 @@ from tractor.msg import ( log = get_logger(__name__) +# (codec, transport) +MsgTransportKey = tuple[str, str] + + # from tractor.msg.types import MsgType # ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? # => BLEH, except can't bc prots must inherit typevar or param-spec @@ -77,6 +77,9 @@ class MsgTransport(Protocol[AddressType, MsgType]): drained: list[MsgType] address_type: ClassVar[Type[AddressType]] + codec_key: ClassVar[str] + name_key: ClassVar[str] + # XXX: should this instead be called `.sendall()`? async def send(self, msg: MsgType) -> None: ... @@ -95,6 +98,10 @@ class MsgTransport(Protocol[AddressType, MsgType]): def drain(self) -> AsyncIterator[dict]: ... + @classmethod + def key(cls) -> MsgTransportKey: + return cls.codec_key, cls.name_key + @property def laddr(self) -> AddressType: ... @@ -126,6 +133,14 @@ class MsgTransport(Protocol[AddressType, MsgType]): ''' ... + @classmethod + def get_random_addr(self) -> AddressType: + ... + + @classmethod + def get_root_addr(self) -> AddressType: + ... + class MsgpackTransport(MsgTransport): @@ -411,7 +426,7 @@ class MsgpackTransport(MsgTransport): # __tracebackhide__: bool = False # raise - async def recv(self) -> Any: + async def recv(self) -> msgtypes.MsgType: return await self._aiter_pkts.asend(None) async def drain(self) -> AsyncIterator[dict]: diff --git a/tractor/ipc/_types.py b/tractor/ipc/_types.py index 93c5e3c9..92d3af91 100644 --- a/tractor/ipc/_types.py +++ b/tractor/ipc/_types.py @@ -18,15 +18,24 @@ from typing import Type, Union import trio import socket -from ._transport import MsgTransport +from ._transport import ( + MsgTransportKey, + MsgTransport +) from ._tcp import MsgpackTCPStream from ._uds import MsgpackUDSStream +_msg_transports = [ + MsgpackTCPStream, + MsgpackUDSStream +] + + # manually updated list of all supported codec+transport types -_msg_transports = { - ('msgpack', 'tcp'): MsgpackTCPStream, - ('msgpack', 'uds'): MsgpackUDSStream +key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = { + cls.key(): cls + for cls in _msg_transports } @@ -34,11 +43,17 @@ _msg_transports = { AddressTypes = Union[ tuple([ cls.address_type - for key, cls in _msg_transports.items() + for cls in _msg_transports ]) ] +default_lo_addrs: dict[MsgTransportKey, AddressTypes] = { + cls.key(): cls.get_root_addr() + for cls in _msg_transports +} + + def transport_from_destaddr( destaddr: AddressTypes, codec_key: str = 'msgpack', diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py index 3b848898..eb2e7f32 100644 --- a/tractor/ipc/_uds.py +++ b/tractor/ipc/_uds.py @@ -18,6 +18,8 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco ''' from __future__ import annotations +import tempfile +from uuid import uuid4 import trio @@ -82,3 +84,11 @@ class MsgpackUDSStream(MsgpackTransport): stream.socket.getsockname(), stream.socket.getpeername(), ) + + @classmethod + def get_random_addr(self) -> str: + return f'{tempfile.gettempdir()}/{uuid4()}.sock' + + @classmethod + def get_root_addr(self) -> str: + return 'tractor.sock' diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 1cc8b78e..76d0bad6 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -46,6 +46,7 @@ from msgspec import ( from tractor.msg import ( pretty_struct, ) +from tractor.ipc import AddressTypes from tractor.log import get_logger @@ -167,8 +168,8 @@ class SpawnSpec( # TODO: not just sockaddr pairs? # -[ ] abstract into a `TransportAddr` type? - reg_addrs: list[tuple[str, int]] - bind_addrs: list[tuple[str, int]] + reg_addrs: list[AddressTypes] + bind_addrs: list[AddressTypes] # TODO: caps based RPC support in the payload? -- 2.34.1 From 76cee99fc2a1e54f22e0189222c190e777352fa2 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sun, 23 Mar 2025 00:14:04 -0300 Subject: [PATCH 3/4] Finally switch to using address protocol in all runtime --- examples/service_discovery.py | 2 +- tests/test_discovery.py | 6 +- tests/test_inter_peer_cancellation.py | 2 +- tests/test_local.py | 2 +- tests/test_multi_program.py | 4 +- tests/test_spawning.py | 2 +- tractor/_addr.py | 301 ++++++++++++++++++++++++++ tractor/_child.py | 3 +- tractor/_context.py | 11 +- tractor/_discovery.py | 62 +++--- tractor/_entry.py | 7 +- tractor/_root.py | 56 ++--- tractor/_runtime.py | 182 ++++++++-------- tractor/_spawn.py | 29 +-- tractor/_supervise.py | 19 +- tractor/ipc/__init__.py | 5 +- tractor/ipc/_chan.py | 32 +-- tractor/ipc/_ringbuf.py | 5 +- tractor/ipc/_tcp.py | 33 +-- tractor/ipc/_transport.py | 35 ++- tractor/ipc/_types.py | 57 ++--- tractor/ipc/_uds.py | 37 ++-- tractor/msg/types.py | 2 +- 23 files changed, 590 insertions(+), 304 deletions(-) create mode 100644 tractor/_addr.py diff --git a/examples/service_discovery.py b/examples/service_discovery.py index a0f37b88..64697e5b 100644 --- a/examples/service_discovery.py +++ b/examples/service_discovery.py @@ -9,7 +9,7 @@ async def main(service_name): async with tractor.open_nursery() as an: await an.start_actor(service_name) - async with tractor.get_registry('127.0.0.1', 1616) as portal: + async with tractor.get_registry(('127.0.0.1', 1616)) as portal: print(f"Arbiter is listening on {portal.channel}") async with tractor.wait_for_actor(service_name) as sockaddr: diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 87455983..18b2aa1b 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -26,7 +26,7 @@ async def test_reg_then_unreg(reg_addr): portal = await n.start_actor('actor', enable_modules=[__name__]) uid = portal.channel.uid - async with tractor.get_registry(*reg_addr) as aportal: + async with tractor.get_registry(reg_addr) as aportal: # this local actor should be the arbiter assert actor is aportal.actor @@ -160,7 +160,7 @@ async def spawn_and_check_registry( async with tractor.open_root_actor( registry_addrs=[reg_addr], ): - async with tractor.get_registry(*reg_addr) as portal: + async with tractor.get_registry(reg_addr) as portal: # runtime needs to be up to call this actor = tractor.current_actor() @@ -300,7 +300,7 @@ async def close_chans_before_nursery( async with tractor.open_root_actor( registry_addrs=[reg_addr], ): - async with tractor.get_registry(*reg_addr) as aportal: + async with tractor.get_registry(reg_addr) as aportal: try: get_reg = partial(unpack_reg, aportal) diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index bac9a791..25935df2 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -871,7 +871,7 @@ async def serve_subactors( ) await ipc.send(( peer.chan.uid, - peer.chan.raddr, + peer.chan.raddr.unwrap(), )) print('Spawner exiting spawn serve loop!') diff --git a/tests/test_local.py b/tests/test_local.py index ecdad5fe..c6f5047a 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -38,7 +38,7 @@ async def test_self_is_registered_localportal(reg_addr): "Verify waiting on the arbiter to register itself using a local portal." actor = tractor.current_actor() assert actor.is_arbiter - async with tractor.get_registry(*reg_addr) as portal: + async with tractor.get_registry(reg_addr) as portal: assert isinstance(portal, tractor._portal.LocalPortal) with trio.fail_after(0.2): diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index 860eeebb..b0b145ee 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -32,7 +32,7 @@ def test_abort_on_sigint(daemon): @tractor_test async def test_cancel_remote_arbiter(daemon, reg_addr): assert not tractor.current_actor().is_arbiter - async with tractor.get_registry(*reg_addr) as portal: + async with tractor.get_registry(reg_addr) as portal: await portal.cancel_actor() time.sleep(0.1) @@ -41,7 +41,7 @@ async def test_cancel_remote_arbiter(daemon, reg_addr): # no arbiter socket should exist with pytest.raises(OSError): - async with tractor.get_registry(*reg_addr) as portal: + async with tractor.get_registry(reg_addr) as portal: pass diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 99ec9abc..58aa955a 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -77,7 +77,7 @@ async def movie_theatre_question(): async def test_movie_theatre_convo(start_method): """The main ``tractor`` routine. """ - async with tractor.open_nursery() as n: + async with tractor.open_nursery(debug_mode=True) as n: portal = await n.start_actor( 'frank', diff --git a/tractor/_addr.py b/tractor/_addr.py new file mode 100644 index 00000000..0944c89d --- /dev/null +++ b/tractor/_addr.py @@ -0,0 +1,301 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from __future__ import annotations +import tempfile +from uuid import uuid4 +from typing import ( + Protocol, + ClassVar, + TypeVar, + Union, + Type +) + +import trio +from trio import socket + + +NamespaceType = TypeVar('NamespaceType') +AddressType = TypeVar('AddressType') +StreamType = TypeVar('StreamType') +ListenerType = TypeVar('ListenerType') + + +class Address(Protocol[ + NamespaceType, + AddressType, + StreamType, + ListenerType +]): + + name_key: ClassVar[str] + address_type: ClassVar[Type[AddressType]] + + @property + def is_valid(self) -> bool: + ... + + @property + def namespace(self) -> NamespaceType|None: + ... + + @classmethod + def from_addr(cls, addr: AddressType) -> Address: + ... + + def unwrap(self) -> AddressType: + ... + + @classmethod + def get_random(cls, namespace: NamespaceType | None = None) -> Address: + ... + + @classmethod + def get_root(cls) -> Address: + ... + + def __repr__(self) -> str: + ... + + def __eq__(self, other) -> bool: + ... + + async def open_stream(self, **kwargs) -> StreamType: + ... + + async def open_listener(self, **kwargs) -> ListenerType: + ... + + +class TCPAddress(Address[ + str, + tuple[str, int], + trio.SocketStream, + trio.SocketListener +]): + + name_key: str = 'tcp' + address_type: type = tuple[str, int] + + def __init__( + self, + host: str, + port: int + ): + if ( + not isinstance(host, str) + or + not isinstance(port, int) + ): + raise TypeError(f'Expected host {host} to be str and port {port} to be int') + self._host = host + self._port = port + + @property + def is_valid(self) -> bool: + return self._port != 0 + + @property + def namespace(self) -> str: + return self._host + + @classmethod + def from_addr(cls, addr: tuple[str, int]) -> TCPAddress: + return TCPAddress(addr[0], addr[1]) + + def unwrap(self) -> tuple[str, int]: + return self._host, self._port + + @classmethod + def get_random(cls, namespace: str = '127.0.0.1') -> TCPAddress: + return TCPAddress(namespace, 0) + + @classmethod + def get_root(cls) -> Address: + return TCPAddress('127.0.0.1', 1616) + + def __repr__(self) -> str: + return f'{type(self)} @ {self.unwrap()}' + + def __eq__(self, other) -> bool: + if not isinstance(other, TCPAddress): + raise TypeError( + f'Can not compare {type(other)} with {type(self)}' + ) + + return ( + self._host == other._host + and + self._port == other._port + ) + + async def open_stream(self, **kwargs) -> trio.SocketStream: + stream = await trio.open_tcp_stream( + self._host, + self._port, + **kwargs + ) + self._host, self._port = stream.socket.getsockname()[:2] + return stream + + async def open_listener(self, **kwargs) -> trio.SocketListener: + listeners = await trio.open_tcp_listeners( + host=self._host, + port=self._port, + **kwargs + ) + assert len(listeners) == 1 + listener = listeners[0] + self._host, self._port = listener.socket.getsockname()[:2] + return listener + + +class UDSAddress(Address[ + None, + str, + trio.SocketStream, + trio.SocketListener +]): + + name_key: str = 'uds' + address_type: type = str + + def __init__( + self, + filepath: str + ): + self._filepath = filepath + + @property + def is_valid(self) -> bool: + return True + + @property + def namespace(self) -> None: + return + + @classmethod + def from_addr(cls, filepath: str) -> UDSAddress: + return UDSAddress(filepath) + + def unwrap(self) -> str: + return self._filepath + + @classmethod + def get_random(cls, _ns: None = None) -> UDSAddress: + return UDSAddress(f'{tempfile.gettempdir()}/{uuid4().sock}') + + @classmethod + def get_root(cls) -> Address: + return UDSAddress('tractor.sock') + + def __repr__(self) -> str: + return f'{type(self)} @ {self._filepath}' + + def __eq__(self, other) -> bool: + if not isinstance(other, UDSAddress): + raise TypeError( + f'Can not compare {type(other)} with {type(self)}' + ) + + return self._filepath == other._filepath + + async def open_stream(self, **kwargs) -> trio.SocketStream: + stream = await trio.open_tcp_stream( + self._filepath, + **kwargs + ) + self._binded = True + return stream + + async def open_listener(self, **kwargs) -> trio.SocketListener: + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(self._filepath) + sock.listen() + self._binded = True + return trio.SocketListener(sock) + + +preferred_transport = 'tcp' + + +_address_types = ( + TCPAddress, + UDSAddress +) + + +_default_addrs: dict[str, Type[Address]] = { + cls.name_key: cls + for cls in _address_types +} + + +AddressTypes = Union[ + tuple([ + cls.address_type + for cls in _address_types + ]) +] + + +_default_lo_addrs: dict[ + str, + AddressTypes +] = { + cls.name_key: cls.get_root().unwrap() + for cls in _address_types +} + + +def get_address_cls(name: str) -> Type[Address]: + return _default_addrs[name] + + +def is_wrapped_addr(addr: any) -> bool: + return type(addr) in _address_types + + +def wrap_address(addr: AddressTypes) -> Address: + + if is_wrapped_addr(addr): + return addr + + cls = None + match addr: + case str(): + cls = UDSAddress + + case tuple() | list(): + cls = TCPAddress + + case None: + cls = get_address_cls(preferred_transport) + addr = cls.get_root().unwrap() + + case _: + raise TypeError( + f'Can not wrap addr {addr} of type {type(addr)}' + ) + + return cls.from_addr(addr) + + +def default_lo_addrs(transports: list[str]) -> list[AddressTypes]: + return [ + _default_lo_addrs[transport] + for transport in transports + ] diff --git a/tractor/_child.py b/tractor/_child.py index 4226ae90..69142889 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -31,8 +31,7 @@ def parse_uid(arg): return str(name), str(uuid) # ensures str encoding def parse_ipaddr(arg): - host, port = literal_eval(arg) - return (str(host), int(port)) + return literal_eval(arg) if __name__ == "__main__": diff --git a/tractor/_context.py b/tractor/_context.py index d93d7759..19f3daef 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -859,19 +859,10 @@ class Context: @property def dst_maddr(self) -> str: chan: Channel = self.chan - dst_addr, dst_port = chan.raddr trans: MsgTransport = chan.transport # cid: str = self.cid # cid_head, cid_tail = cid[:6], cid[-6:] - return ( - f'/ipv4/{dst_addr}' - f'/{trans.name_key}/{dst_port}' - # f'/{self.chan.uid[0]}' - # f'/{self.cid}' - - # f'/cid={cid_head}..{cid_tail}' - # TODO: ? not use this ^ right ? - ) + return trans.maddr dmaddr = dst_maddr diff --git a/tractor/_discovery.py b/tractor/_discovery.py index f6f4b9d9..9258b3de 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -30,6 +30,12 @@ from contextlib import asynccontextmanager as acm from tractor.log import get_logger from .trionics import gather_contexts from .ipc import _connect_chan, Channel +from ._addr import ( + AddressTypes, + Address, + preferred_transport, + wrap_address +) from ._portal import ( Portal, open_portal, @@ -48,11 +54,7 @@ log = get_logger(__name__) @acm -async def get_registry( - host: str, - port: int, - -) -> AsyncGenerator[ +async def get_registry(addr: AddressTypes) -> AsyncGenerator[ Portal | LocalPortal | None, None, ]: @@ -69,13 +71,13 @@ async def get_registry( # (likely a re-entrant call from the arbiter actor) yield LocalPortal( actor, - Channel((host, port)) + await Channel.from_addr(addr) ) else: # TODO: try to look pre-existing connection from # `Actor._peers` and use it instead? async with ( - _connect_chan((host, port)) as chan, + _connect_chan(addr) as chan, open_portal(chan) as regstr_ptl, ): yield regstr_ptl @@ -89,11 +91,10 @@ async def get_root( # TODO: rename mailbox to `_root_maddr` when we finally # add and impl libp2p multi-addrs? - host, port = _runtime_vars['_root_mailbox'] - assert host is not None + addr = _runtime_vars['_root_mailbox'] async with ( - _connect_chan((host, port)) as chan, + _connect_chan(addr) as chan, open_portal(chan, **kwargs) as portal, ): yield portal @@ -134,10 +135,10 @@ def get_peer_by_name( @acm async def query_actor( name: str, - regaddr: tuple[str, int]|None = None, + regaddr: AddressTypes|None = None, ) -> AsyncGenerator[ - tuple[str, int]|None, + AddressTypes|None, None, ]: ''' @@ -163,31 +164,31 @@ async def query_actor( return reg_portal: Portal - regaddr: tuple[str, int] = regaddr or actor.reg_addrs[0] - async with get_registry(*regaddr) as reg_portal: + regaddr: Address = wrap_address(regaddr) or actor.reg_addrs[0] + async with get_registry(regaddr) as reg_portal: # TODO: return portals to all available actors - for now # just the last one that registered - sockaddr: tuple[str, int] = await reg_portal.run_from_ns( + addr: AddressTypes = await reg_portal.run_from_ns( 'self', 'find_actor', name=name, ) - yield sockaddr + yield addr @acm async def maybe_open_portal( - addr: tuple[str, int], + addr: AddressTypes, name: str, ): async with query_actor( name=name, regaddr=addr, - ) as sockaddr: + ) as addr: pass - if sockaddr: - async with _connect_chan(sockaddr) as chan: + if addr: + async with _connect_chan(addr) as chan: async with open_portal(chan) as portal: yield portal else: @@ -197,7 +198,8 @@ async def maybe_open_portal( @acm async def find_actor( name: str, - registry_addrs: list[tuple[str, int]]|None = None, + registry_addrs: list[AddressTypes]|None = None, + enable_transports: list[str] = [preferred_transport], only_first: bool = True, raise_on_none: bool = False, @@ -224,15 +226,15 @@ async def find_actor( # XXX NOTE: make sure to dynamically read the value on # every call since something may change it globally (eg. # like in our discovery test suite)! - from . import _root + from ._addr import default_lo_addrs registry_addrs = ( _runtime_vars['_registry_addrs'] or - _root._default_lo_addrs + default_lo_addrs(enable_transports) ) maybe_portals: list[ - AsyncContextManager[tuple[str, int]] + AsyncContextManager[AddressTypes] ] = list( maybe_open_portal( addr=addr, @@ -274,7 +276,7 @@ async def find_actor( @acm async def wait_for_actor( name: str, - registry_addr: tuple[str, int] | None = None, + registry_addr: AddressTypes | None = None, ) -> AsyncGenerator[Portal, None]: ''' @@ -291,7 +293,7 @@ async def wait_for_actor( yield peer_portal return - regaddr: tuple[str, int] = ( + regaddr: AddressTypes = ( registry_addr or actor.reg_addrs[0] @@ -299,8 +301,8 @@ async def wait_for_actor( # TODO: use `.trionics.gather_contexts()` like # above in `find_actor()` as well? reg_portal: Portal - async with get_registry(*regaddr) as reg_portal: - sockaddrs = await reg_portal.run_from_ns( + async with get_registry(regaddr) as reg_portal: + addrs = await reg_portal.run_from_ns( 'self', 'wait_for_actor', name=name, @@ -308,8 +310,8 @@ async def wait_for_actor( # get latest registered addr by default? # TODO: offer multi-portal yields in multi-homed case? - sockaddr: tuple[str, int] = sockaddrs[-1] + addr: AddressTypes = addrs[-1] - async with _connect_chan(sockaddr) as chan: + async with _connect_chan(addr) as chan: async with open_portal(chan) as portal: yield portal diff --git a/tractor/_entry.py b/tractor/_entry.py index 8156d25f..1328aa45 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -37,6 +37,7 @@ from .log import ( from . import _state from .devx import _debug from .to_asyncio import run_as_asyncio_guest +from ._addr import AddressTypes from ._runtime import ( async_main, Actor, @@ -52,10 +53,10 @@ log = get_logger(__name__) def _mp_main( actor: Actor, - accept_addrs: list[tuple[str, int]], + accept_addrs: list[AddressTypes], forkserver_info: tuple[Any, Any, Any, Any, Any], start_method: SpawnMethodKey, - parent_addr: tuple[str, int] | None = None, + parent_addr: AddressTypes | None = None, infect_asyncio: bool = False, ) -> None: @@ -206,7 +207,7 @@ def nest_from_op( def _trio_main( actor: Actor, *, - parent_addr: tuple[str, int] | None = None, + parent_addr: AddressTypes | None = None, infect_asyncio: bool = False, ) -> None: diff --git a/tractor/_root.py b/tractor/_root.py index 40682a7a..e9cac3f2 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -43,21 +43,18 @@ from .devx import _debug from . import _spawn from . import _state from . import log -from .ipc import _connect_chan +from .ipc import ( + _connect_chan, +) +from ._addr import ( + AddressTypes, + wrap_address, + preferred_transport, + default_lo_addrs +) from ._exceptions import is_multi_cancelled -# set at startup and after forks -_default_host: str = '127.0.0.1' -_default_port: int = 1616 - -# default registry always on localhost -_default_lo_addrs: list[tuple[str, int]] = [( - _default_host, - _default_port, -)] - - logger = log.get_logger('tractor') @@ -66,10 +63,12 @@ async def open_root_actor( *, # defaults are above - registry_addrs: list[tuple[str, int]]|None = None, + registry_addrs: list[AddressTypes]|None = None, # defaults are above - arbiter_addr: tuple[str, int]|None = None, + arbiter_addr: tuple[AddressTypes]|None = None, + + enable_transports: list[str] = [preferred_transport], name: str|None = 'root', @@ -195,11 +194,9 @@ async def open_root_actor( ) registry_addrs = [arbiter_addr] - registry_addrs: list[tuple[str, int]] = ( - registry_addrs - or - _default_lo_addrs - ) + if not registry_addrs: + registry_addrs: list[AddressTypes] = default_lo_addrs(enable_transports) + assert registry_addrs loglevel = ( @@ -248,10 +245,10 @@ async def open_root_actor( enable_stack_on_sig() # closed into below ping task-func - ponged_addrs: list[tuple[str, int]] = [] + ponged_addrs: list[AddressTypes] = [] async def ping_tpt_socket( - addr: tuple[str, int], + addr: AddressTypes, timeout: float = 1, ) -> None: ''' @@ -284,10 +281,10 @@ async def open_root_actor( for addr in registry_addrs: tn.start_soon( ping_tpt_socket, - tuple(addr), # TODO: just drop this requirement? + addr, ) - trans_bind_addrs: list[tuple[str, int]] = [] + trans_bind_addrs: list[AddressTypes] = [] # Create a new local root-actor instance which IS NOT THE # REGISTRAR @@ -311,9 +308,12 @@ async def open_root_actor( ) # DO NOT use the registry_addrs as the transport server # addrs for this new non-registar, root-actor. - for host, port in ponged_addrs: - # NOTE: zero triggers dynamic OS port allocation - trans_bind_addrs.append((host, 0)) + for addr in ponged_addrs: + waddr = wrap_address(addr) + print(waddr) + trans_bind_addrs.append( + waddr.get_random(namespace=waddr.namespace) + ) # Start this local actor as the "registrar", aka a regular # actor who manages the local registry of "mailboxes" of @@ -322,7 +322,7 @@ async def open_root_actor( # NOTE that if the current actor IS THE REGISTAR, the # following init steps are taken: - # - the tranport layer server is bound to each (host, port) + # - the tranport layer server is bound to each addr # pair defined in provided registry_addrs, or the default. trans_bind_addrs = registry_addrs @@ -462,7 +462,7 @@ def run_daemon( # runtime kwargs name: str | None = 'root', - registry_addrs: list[tuple[str, int]] = _default_lo_addrs, + registry_addrs: list[AddressTypes]|None = None, start_method: str | None = None, debug_mode: bool = False, diff --git a/tractor/_runtime.py b/tractor/_runtime.py index eaab31b6..e755d5ce 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -74,6 +74,12 @@ from tractor.msg import ( types as msgtypes, ) from .ipc import Channel +from ._addr import ( + AddressTypes, + Address, + TCPAddress, + wrap_address, +) from ._context import ( mk_context, Context, @@ -179,11 +185,11 @@ class Actor: enable_modules: list[str] = [], uid: str|None = None, loglevel: str|None = None, - registry_addrs: list[tuple[str, int]]|None = None, + registry_addrs: list[AddressTypes]|None = None, spawn_method: str|None = None, # TODO: remove! - arbiter_addr: tuple[str, int]|None = None, + arbiter_addr: AddressTypes|None = None, ) -> None: ''' @@ -223,7 +229,7 @@ class Actor: DeprecationWarning, stacklevel=2, ) - registry_addrs: list[tuple[str, int]] = [arbiter_addr] + registry_addrs: list[AddressTypes] = [arbiter_addr] # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -257,6 +263,7 @@ class Actor: ] = {} self._listeners: list[trio.abc.Listener] = [] + self._listen_addrs: list[Address] = [] self._parent_chan: Channel|None = None self._forkserver_info: tuple|None = None @@ -269,13 +276,13 @@ class Actor: # when provided, init the registry addresses property from # input via the validator. - self._reg_addrs: list[tuple[str, int]] = [] + self._reg_addrs: list[AddressTypes] = [] if registry_addrs: - self.reg_addrs: list[tuple[str, int]] = registry_addrs + self.reg_addrs: list[AddressTypes] = registry_addrs _state._runtime_vars['_registry_addrs'] = registry_addrs @property - def reg_addrs(self) -> list[tuple[str, int]]: + def reg_addrs(self) -> list[AddressTypes]: ''' List of (socket) addresses for all known (and contactable) registry actors. @@ -286,7 +293,7 @@ class Actor: @reg_addrs.setter def reg_addrs( self, - addrs: list[tuple[str, int]], + addrs: list[AddressTypes], ) -> None: if not addrs: log.warning( @@ -295,16 +302,7 @@ class Actor: ) return - # always sanity check the input list since it's critical - # that addrs are correct for discovery sys operation. - for addr in addrs: - if not isinstance(addr, tuple): - raise ValueError( - 'Expected `Actor.reg_addrs: list[tuple[str, int]]`\n' - f'Got {addrs}' - ) - - self._reg_addrs = addrs + self._reg_addrs = addrs async def wait_for_peer( self, @@ -1024,11 +1022,11 @@ class Actor: async def _from_parent( self, - parent_addr: tuple[str, int]|None, + parent_addr: AddressTypes|None, ) -> tuple[ Channel, - list[tuple[str, int]]|None, + list[AddressTypes]|None, ]: ''' Bootstrap this local actor's runtime config from its parent by @@ -1040,13 +1038,13 @@ class Actor: # Connect back to the parent actor and conduct initial # handshake. From this point on if we error, we # attempt to ship the exception back to the parent. - chan = await Channel.from_destaddr(parent_addr) + chan = await Channel.from_addr(wrap_address(parent_addr)) # TODO: move this into a `Channel.handshake()`? # Initial handshake: swap names. await self._do_handshake(chan) - accept_addrs: list[tuple[str, int]]|None = None + accept_addrs: list[AddressTypes]|None = None if self._spawn_method == "trio": @@ -1063,7 +1061,7 @@ class Actor: # if "trace"/"util" mode is enabled? f'{pretty_struct.pformat(spawnspec)}\n' ) - accept_addrs: list[tuple[str, int]] = spawnspec.bind_addrs + accept_addrs: list[AddressTypes] = spawnspec.bind_addrs # TODO: another `Struct` for rtvs.. rvs: dict[str, Any] = spawnspec._runtime_vars @@ -1170,8 +1168,7 @@ class Actor: self, handler_nursery: Nursery, *, - # (host, port) to bind for channel server - listen_sockaddrs: list[tuple[str, int]]|None = None, + listen_addrs: list[AddressTypes]|None = None, task_status: TaskStatus[Nursery] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1183,37 +1180,39 @@ class Actor: `.cancel_server()` is called. ''' - if listen_sockaddrs is None: - listen_sockaddrs = [(None, 0)] + if listen_addrs is None: + listen_addrs = [TCPAddress.get_random()] + + else: + listen_addrs: list[Address] = [ + wrap_address(a) for a in listen_addrs + ] self._server_down = trio.Event() try: async with trio.open_nursery() as server_n: + listeners: list[trio.abc.Listener] = [ + await addr.open_listener() + for addr in listen_addrs + ] + await server_n.start( + partial( + trio.serve_listeners, + handler=self._stream_handler, + listeners=listeners, - for host, port in listen_sockaddrs: - listeners: list[trio.abc.Listener] = await server_n.start( - partial( - trio.serve_tcp, - - handler=self._stream_handler, - port=port, - host=host, - - # NOTE: configured such that new - # connections will stay alive even if - # this server is cancelled! - handler_nursery=handler_nursery, - ) + # NOTE: configured such that new + # connections will stay alive even if + # this server is cancelled! + handler_nursery=handler_nursery ) - sockets: list[trio.socket] = [ - getattr(listener, 'socket', 'unknown socket') - for listener in listeners - ] - log.runtime( - 'Started TCP server(s)\n' - f'|_{sockets}\n' - ) - self._listeners.extend(listeners) + ) + log.runtime( + 'Started server(s)\n' + '\n'.join([f'|_{addr}' for addr in listen_addrs]) + ) + self._listen_addrs.extend(listen_addrs) + self._listeners.extend(listeners) task_status.started(server_n) @@ -1576,26 +1575,21 @@ class Actor: return False @property - def accept_addrs(self) -> list[tuple[str, int]]: + def accept_addrs(self) -> list[AddressTypes]: ''' All addresses to which the transport-channel server binds and listens for new connections. ''' - # throws OSError on failure - return [ - listener.socket.getsockname() - for listener in self._listeners - ] # type: ignore + return [a.unwrap() for a in self._listen_addrs] @property - def accept_addr(self) -> tuple[str, int]: + def accept_addr(self) -> AddressTypes: ''' Primary address to which the IPC transport server is bound and listening for new connections. ''' - # throws OSError on failure return self.accept_addrs[0] def get_parent(self) -> Portal: @@ -1667,7 +1661,7 @@ class Actor: async def async_main( actor: Actor, - accept_addrs: tuple[str, int]|None = None, + accept_addrs: AddressTypes|None = None, # XXX: currently ``parent_addr`` is only needed for the # ``multiprocessing`` backend (which pickles state sent to @@ -1676,7 +1670,7 @@ async def async_main( # change this to a simple ``is_subactor: bool`` which will # be False when running as root actor and True when as # a subactor. - parent_addr: tuple[str, int]|None = None, + parent_addr: AddressTypes|None = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, ) -> None: @@ -1766,7 +1760,7 @@ async def async_main( partial( actor._serve_forever, service_nursery, - listen_sockaddrs=accept_addrs, + listen_addrs=accept_addrs, ) ) except OSError as oserr: @@ -1782,7 +1776,7 @@ async def async_main( raise - accept_addrs: list[tuple[str, int]] = actor.accept_addrs + accept_addrs: list[AddressTypes] = actor.accept_addrs # NOTE: only set the loopback addr for the # process-tree-global "root" mailbox since @@ -1790,9 +1784,8 @@ async def async_main( # their root actor over that channel. if _state._runtime_vars['_is_root']: for addr in accept_addrs: - host, _ = addr - # TODO: generic 'lo' detector predicate - if '127.0.0.1' in host: + waddr = wrap_address(addr) + if waddr == waddr.get_root(): _state._runtime_vars['_root_mailbox'] = addr # Register with the arbiter if we're told its addr @@ -1807,24 +1800,21 @@ async def async_main( # only on unique actor uids? for addr in actor.reg_addrs: try: - assert isinstance(addr, tuple) - assert addr[1] # non-zero after bind + waddr = wrap_address(addr) + assert waddr.is_valid except AssertionError: await _debug.pause() - async with get_registry(*addr) as reg_portal: + async with get_registry(addr) as reg_portal: for accept_addr in accept_addrs: - - if not accept_addr[1]: - await _debug.pause() - - assert accept_addr[1] + accept_addr = wrap_address(accept_addr) + assert accept_addr.is_valid await reg_portal.run_from_ns( 'self', 'register_actor', uid=actor.uid, - sockaddr=accept_addr, + addr=accept_addr.unwrap(), ) is_registered: bool = True @@ -1951,12 +1941,13 @@ async def async_main( ): failed: bool = False for addr in actor.reg_addrs: - assert isinstance(addr, tuple) + waddr = wrap_address(addr) + assert waddr.is_valid with trio.move_on_after(0.5) as cs: cs.shield = True try: async with get_registry( - *addr, + addr, ) as reg_portal: await reg_portal.run_from_ns( 'self', @@ -2034,7 +2025,7 @@ class Arbiter(Actor): self._registry: dict[ tuple[str, str], - tuple[str, int], + AddressTypes, ] = {} self._waiters: dict[ str, @@ -2050,18 +2041,18 @@ class Arbiter(Actor): self, name: str, - ) -> tuple[str, int]|None: + ) -> AddressTypes|None: - for uid, sockaddr in self._registry.items(): + for uid, addr in self._registry.items(): if name in uid: - return sockaddr + return addr return None async def get_registry( self - ) -> dict[str, tuple[str, int]]: + ) -> dict[str, AddressTypes]: ''' Return current name registry. @@ -2081,7 +2072,7 @@ class Arbiter(Actor): self, name: str, - ) -> list[tuple[str, int]]: + ) -> list[AddressTypes]: ''' Wait for a particular actor to register. @@ -2089,44 +2080,41 @@ class Arbiter(Actor): registered. ''' - sockaddrs: list[tuple[str, int]] = [] - sockaddr: tuple[str, int] + addrs: list[AddressTypes] = [] + addr: AddressTypes mailbox_info: str = 'Actor registry contact infos:\n' - for uid, sockaddr in self._registry.items(): + for uid, addr in self._registry.items(): mailbox_info += ( f'|_uid: {uid}\n' - f'|_sockaddr: {sockaddr}\n\n' + f'|_addr: {addr}\n\n' ) if name == uid[0]: - sockaddrs.append(sockaddr) + addrs.append(addr) - if not sockaddrs: + if not addrs: waiter = trio.Event() self._waiters.setdefault(name, []).append(waiter) await waiter.wait() for uid in self._waiters[name]: if not isinstance(uid, trio.Event): - sockaddrs.append(self._registry[uid]) + addrs.append(self._registry[uid]) log.runtime(mailbox_info) - return sockaddrs + return addrs async def register_actor( self, uid: tuple[str, str], - sockaddr: tuple[str, int] - + addr: AddressTypes ) -> None: uid = name, hash = (str(uid[0]), str(uid[1])) - addr = (host, port) = ( - str(sockaddr[0]), - int(sockaddr[1]), - ) - if port == 0: + waddr: Address = wrap_address(addr) + if not waddr.is_valid: + # should never be 0-dynamic-os-alloc await _debug.pause() - assert port # should never be 0-dynamic-os-alloc + self._registry[uid] = addr # pop and signal all waiter events diff --git a/tractor/_spawn.py b/tractor/_spawn.py index dc2429d9..d1eb7f37 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -46,6 +46,7 @@ from tractor._state import ( _runtime_vars, ) from tractor.log import get_logger +from tractor._addr import AddressTypes from tractor._portal import Portal from tractor._runtime import Actor from tractor._entry import _mp_main @@ -392,8 +393,8 @@ async def new_proc( errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addrs: list[tuple[str, int]], - parent_addr: tuple[str, int], + bind_addrs: list[AddressTypes], + parent_addr: AddressTypes, _runtime_vars: dict[str, Any], # serialized and sent to _child *, @@ -431,8 +432,8 @@ async def trio_proc( errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addrs: list[tuple[str, int]], - parent_addr: tuple[str, int], + bind_addrs: list[AddressTypes], + parent_addr: AddressTypes, _runtime_vars: dict[str, Any], # serialized and sent to _child *, infect_asyncio: bool = False, @@ -520,15 +521,15 @@ async def trio_proc( # send a "spawning specification" which configures the # initial runtime state of the child. - await chan.send( - SpawnSpec( - _parent_main_data=subactor._parent_main_data, - enable_modules=subactor.enable_modules, - reg_addrs=subactor.reg_addrs, - bind_addrs=bind_addrs, - _runtime_vars=_runtime_vars, - ) + sspec = SpawnSpec( + _parent_main_data=subactor._parent_main_data, + enable_modules=subactor.enable_modules, + reg_addrs=subactor.reg_addrs, + bind_addrs=bind_addrs, + _runtime_vars=_runtime_vars, ) + log.runtime(f'Sending spawn spec: {str(sspec)}') + await chan.send(sspec) # track subactor in current nursery curr_actor: Actor = current_actor() @@ -638,8 +639,8 @@ async def mp_proc( subactor: Actor, errors: dict[tuple[str, str], Exception], # passed through to actor main - bind_addrs: list[tuple[str, int]], - parent_addr: tuple[str, int], + bind_addrs: list[AddressTypes], + parent_addr: AddressTypes, _runtime_vars: dict[str, Any], # serialized and sent to _child *, infect_asyncio: bool = False, diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 052a5f4c..2a3842f7 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -28,7 +28,13 @@ import warnings import trio + from .devx._debug import maybe_wait_for_debugger +from ._addr import ( + AddressTypes, + preferred_transport, + get_address_cls +) from ._state import current_actor, is_main_process from .log import get_logger, get_loglevel from ._runtime import Actor @@ -47,8 +53,6 @@ if TYPE_CHECKING: log = get_logger(__name__) -_default_bind_addr: tuple[str, int] = ('127.0.0.1', 0) - class ActorNursery: ''' @@ -130,8 +134,9 @@ class ActorNursery: *, - bind_addrs: list[tuple[str, int]] = [_default_bind_addr], + bind_addrs: list[AddressTypes]|None = None, rpc_module_paths: list[str]|None = None, + enable_transports: list[str] = [preferred_transport], enable_modules: list[str]|None = None, loglevel: str|None = None, # set log level per subactor debug_mode: bool|None = None, @@ -156,6 +161,12 @@ class ActorNursery: or get_loglevel() ) + if not bind_addrs: + bind_addrs: list[AddressTypes] = [ + get_address_cls(transport).get_random().unwrap() + for transport in enable_transports + ] + # configure and pass runtime state _rtv = _state._runtime_vars.copy() _rtv['_is_root'] = False @@ -224,7 +235,7 @@ class ActorNursery: *, name: str | None = None, - bind_addrs: tuple[str, int] = [_default_bind_addr], + bind_addrs: AddressTypes|None = None, rpc_module_paths: list[str] | None = None, enable_modules: list[str] | None = None, loglevel: str | None = None, # set log level per subactor diff --git a/tractor/ipc/__init__.py b/tractor/ipc/__init__.py index dd4d9e5a..9bd2240f 100644 --- a/tractor/ipc/__init__.py +++ b/tractor/ipc/__init__.py @@ -17,7 +17,6 @@ import platform from ._transport import ( MsgTransportKey as MsgTransportKey, - AddressType as AddressType, MsgType as MsgType, MsgTransport as MsgTransport, MsgpackTransport as MsgpackTransport @@ -27,10 +26,8 @@ from ._tcp import MsgpackTCPStream as MsgpackTCPStream from ._uds import MsgpackUDSStream as MsgpackUDSStream from ._types import ( - default_lo_addrs as default_lo_addrs, - transport_from_destaddr as transport_from_destaddr, + transport_from_addr as transport_from_addr, transport_from_stream as transport_from_stream, - AddressTypes as AddressTypes ) from ._chan import ( diff --git a/tractor/ipc/_chan.py b/tractor/ipc/_chan.py index ee259371..93f17132 100644 --- a/tractor/ipc/_chan.py +++ b/tractor/ipc/_chan.py @@ -35,8 +35,12 @@ import trio from tractor.ipc._transport import MsgTransport from tractor.ipc._types import ( - transport_from_destaddr, + transport_from_addr, transport_from_stream, +) +from tractor._addr import ( + wrap_address, + Address, AddressTypes ) from tractor.log import get_logger @@ -66,7 +70,6 @@ class Channel: def __init__( self, - destaddr: AddressTypes|None = None, transport: MsgTransport|None = None, # TODO: optional reconnection support? # auto_reconnect: bool = False, @@ -81,8 +84,6 @@ class Channel: # user in ``.from_stream()``. self._transport: MsgTransport|None = transport - self._destaddr = destaddr if destaddr else self._transport.raddr - # set after handshake - always uid of far end self.uid: tuple[str, str]|None = None @@ -121,13 +122,14 @@ class Channel: ) @classmethod - async def from_destaddr( + async def from_addr( cls, - destaddr: AddressTypes, + addr: AddressTypes, **kwargs ) -> Channel: - transport_cls = transport_from_destaddr(destaddr) - transport = await transport_cls.connect_to(destaddr, **kwargs) + addr: Address = wrap_address(addr) + transport_cls = transport_from_addr(addr) + transport = await transport_cls.connect_to(addr, **kwargs) log.transport( f'Opened channel[{type(transport)}]: {transport.laddr} -> {transport.raddr}' @@ -164,11 +166,11 @@ class Channel: ) @property - def laddr(self) -> tuple[str, int]|None: + def laddr(self) -> Address|None: return self._transport.laddr if self._transport else None @property - def raddr(self) -> tuple[str, int]|None: + def raddr(self) -> Address|None: return self._transport.raddr if self._transport else None # TODO: something like, @@ -205,7 +207,11 @@ class Channel: # assert err __tracebackhide__: bool = False else: - assert err.cid + try: + assert err.cid + + except KeyError: + raise err raise @@ -332,14 +338,14 @@ class Channel: @acm async def _connect_chan( - destaddr: AddressTypes + addr: AddressTypes ) -> typing.AsyncGenerator[Channel, None]: ''' Create and connect a channel with disconnect on context manager teardown. ''' - chan = await Channel.from_destaddr(destaddr) + chan = await Channel.from_addr(addr) yield chan with trio.CancelScope(shield=True): await chan.aclose() diff --git a/tractor/ipc/_ringbuf.py b/tractor/ipc/_ringbuf.py index 10975b7a..6a71a00a 100644 --- a/tractor/ipc/_ringbuf.py +++ b/tractor/ipc/_ringbuf.py @@ -183,6 +183,9 @@ class RingBuffSender(trio.abc.SendStream): def wrap_fd(self) -> int: return self._wrap_event.fd + async def _wait_wrap(self): + await self._wrap_event.read() + async def send_all(self, data: Buffer): async with self._send_lock: # while data is larger than the remaining buf @@ -193,7 +196,7 @@ class RingBuffSender(trio.abc.SendStream): self._shm.buf[self.ptr:] = data[:remaining] # signal write and wait for reader wrap around self._write_event.write(remaining) - await self._wrap_event.read() + await self._wait_wrap() # wrap around and trim already written bytes self._ptr = 0 diff --git a/tractor/ipc/_tcp.py b/tractor/ipc/_tcp.py index 4a69ebbd..a8008519 100644 --- a/tractor/ipc/_tcp.py +++ b/tractor/ipc/_tcp.py @@ -23,6 +23,7 @@ import trio from tractor.msg import MsgCodec from tractor.log import get_logger +from tractor._addr import TCPAddress from tractor.ipc._transport import MsgpackTransport @@ -38,9 +39,8 @@ class MsgpackTCPStream(MsgpackTransport): using the ``msgspec`` codec lib. ''' - address_type = tuple[str, int] + address_type = TCPAddress layer_key: int = 4 - name_key: str = 'tcp' # def __init__( # self, @@ -55,19 +55,32 @@ class MsgpackTCPStream(MsgpackTransport): # codec=codec # ) + @property + def maddr(self) -> str: + host, port = self.raddr.unwrap() + return ( + f'/ipv4/{host}' + f'/{self.address_type.name_key}/{port}' + # f'/{self.chan.uid[0]}' + # f'/{self.cid}' + + # f'/cid={cid_head}..{cid_tail}' + # TODO: ? not use this ^ right ? + ) + def connected(self) -> bool: return self.stream.socket.fileno() != -1 @classmethod async def connect_to( cls, - destaddr: tuple[str, int], + destaddr: TCPAddress, prefix_size: int = 4, codec: MsgCodec|None = None, **kwargs ) -> MsgpackTCPStream: stream = await trio.open_tcp_stream( - *destaddr, + *destaddr.unwrap(), **kwargs ) return MsgpackTCPStream( @@ -87,14 +100,6 @@ class MsgpackTCPStream(MsgpackTransport): lsockname = stream.socket.getsockname() rsockname = stream.socket.getpeername() return ( - tuple(lsockname[:2]), - tuple(rsockname[:2]), + TCPAddress.from_addr(tuple(lsockname[:2])), + TCPAddress.from_addr(tuple(rsockname[:2])), ) - - @classmethod - def get_random_addr(self) -> tuple[str, int]: - return (None, 0) - - @classmethod - def get_root_addr(self) -> tuple[str, int]: - return ('127.0.0.1', 1616) diff --git a/tractor/ipc/_transport.py b/tractor/ipc/_transport.py index 55a218f7..8c35ffee 100644 --- a/tractor/ipc/_transport.py +++ b/tractor/ipc/_transport.py @@ -50,6 +50,7 @@ from tractor.msg import ( types as msgtypes, pretty_struct, ) +from tractor._addr import Address log = get_logger(__name__) @@ -62,12 +63,11 @@ MsgTransportKey = tuple[str, str] # ?TODO? this should be our `Union[*msgtypes.__spec__]` alias now right..? # => BLEH, except can't bc prots must inherit typevar or param-spec # vars.. -AddressType = TypeVar('AddressType') MsgType = TypeVar('MsgType') @runtime_checkable -class MsgTransport(Protocol[AddressType, MsgType]): +class MsgTransport(Protocol[MsgType]): # # ^-TODO-^ consider using a generic def and indexing with our # eventual msg definition/types? @@ -75,10 +75,9 @@ class MsgTransport(Protocol[AddressType, MsgType]): stream: trio.abc.Stream drained: list[MsgType] - address_type: ClassVar[Type[AddressType]] + address_type: ClassVar[Type[Address]] codec_key: ClassVar[str] - name_key: ClassVar[str] # XXX: should this instead be called `.sendall()`? async def send(self, msg: MsgType) -> None: @@ -100,20 +99,24 @@ class MsgTransport(Protocol[AddressType, MsgType]): @classmethod def key(cls) -> MsgTransportKey: - return cls.codec_key, cls.name_key + return cls.codec_key, cls.address_type.name_key @property - def laddr(self) -> AddressType: + def laddr(self) -> Address: ... @property - def raddr(self) -> AddressType: + def raddr(self) -> Address: + ... + + @property + def maddr(self) -> str: ... @classmethod async def connect_to( cls, - destaddr: AddressType, + addr: Address, **kwargs ) -> MsgTransport: ... @@ -123,8 +126,8 @@ class MsgTransport(Protocol[AddressType, MsgType]): cls, stream: trio.abc.Stream ) -> tuple[ - AddressType, # local - AddressType # remote + Address, # local + Address # remote ]: ''' Return the `trio` streaming transport prot's addrs for both @@ -133,14 +136,6 @@ class MsgTransport(Protocol[AddressType, MsgType]): ''' ... - @classmethod - def get_random_addr(self) -> AddressType: - ... - - @classmethod - def get_root_addr(self) -> AddressType: - ... - class MsgpackTransport(MsgTransport): @@ -447,9 +442,9 @@ class MsgpackTransport(MsgTransport): return self._aiter_pkts @property - def laddr(self) -> AddressType: + def laddr(self) -> Address: return self._laddr @property - def raddr(self) -> AddressType: + def raddr(self) -> Address: return self._raddr diff --git a/tractor/ipc/_types.py b/tractor/ipc/_types.py index 92d3af91..3e0e43e5 100644 --- a/tractor/ipc/_types.py +++ b/tractor/ipc/_types.py @@ -13,49 +13,42 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Type, Union +from typing import Type import trio import socket -from ._transport import ( +from tractor._addr import Address +from tractor.ipc._transport import ( MsgTransportKey, MsgTransport ) -from ._tcp import MsgpackTCPStream -from ._uds import MsgpackUDSStream +from tractor.ipc._tcp import MsgpackTCPStream +from tractor.ipc._uds import MsgpackUDSStream +# manually updated list of all supported msg transport types _msg_transports = [ MsgpackTCPStream, MsgpackUDSStream ] -# manually updated list of all supported codec+transport types -key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = { +# convert a MsgTransportKey to the corresponding transport type +_key_to_transport: dict[MsgTransportKey, Type[MsgTransport]] = { cls.key(): cls for cls in _msg_transports } - -# all different address py types we use -AddressTypes = Union[ - tuple([ - cls.address_type - for cls in _msg_transports - ]) -] - - -default_lo_addrs: dict[MsgTransportKey, AddressTypes] = { - cls.key(): cls.get_root_addr() +# convert an Address wrapper to its corresponding transport type +_addr_to_transport: dict[Type[Address], Type[MsgTransport]] = { + cls.address_type: cls for cls in _msg_transports } -def transport_from_destaddr( - destaddr: AddressTypes, +def transport_from_addr( + addr: Address, codec_key: str = 'msgpack', ) -> Type[MsgTransport]: ''' @@ -63,23 +56,13 @@ def transport_from_destaddr( corresponding `MsgTransport` type. ''' - match destaddr: - case str(): - return MsgpackUDSStream + try: + return _addr_to_transport[type(addr)] - case tuple(): - if ( - len(destaddr) == 2 - and - isinstance(destaddr[0], str) - and - isinstance(destaddr[1], int) - ): - return MsgpackTCPStream - - raise NotImplementedError( - f'No known transport for address {destaddr}' - ) + except KeyError: + raise NotImplementedError( + f'No known transport for address {repr(addr)}' + ) def transport_from_stream( @@ -113,4 +96,4 @@ def transport_from_stream( key = (codec_key, transport) - return _msg_transports[key] + return _key_to_transport[key] diff --git a/tractor/ipc/_uds.py b/tractor/ipc/_uds.py index eb2e7f32..ee147d42 100644 --- a/tractor/ipc/_uds.py +++ b/tractor/ipc/_uds.py @@ -18,13 +18,12 @@ Unix Domain Socket implementation of tractor.ipc._transport.MsgTransport protoco ''' from __future__ import annotations -import tempfile -from uuid import uuid4 import trio from tractor.msg import MsgCodec from tractor.log import get_logger +from tractor._addr import UDSAddress from tractor.ipc._transport import MsgpackTransport @@ -37,9 +36,8 @@ class MsgpackUDSStream(MsgpackTransport): using the ``msgspec`` codec lib. ''' - address_type = str + address_type = UDSAddress layer_key: int = 7 - name_key: str = 'uds' # def __init__( # self, @@ -54,19 +52,32 @@ class MsgpackUDSStream(MsgpackTransport): # codec=codec # ) + @property + def maddr(self) -> str: + filepath = self.raddr.unwrap() + return ( + f'/ipv4/localhost' + f'/{self.address_type.name_key}/{filepath}' + # f'/{self.chan.uid[0]}' + # f'/{self.cid}' + + # f'/cid={cid_head}..{cid_tail}' + # TODO: ? not use this ^ right ? + ) + def connected(self) -> bool: return self.stream.socket.fileno() != -1 @classmethod async def connect_to( cls, - filename: str, + addr: UDSAddress, prefix_size: int = 4, codec: MsgCodec|None = None, **kwargs ) -> MsgpackUDSStream: stream = await trio.open_unix_socket( - filename, + addr.unwrap(), **kwargs ) return MsgpackUDSStream( @@ -79,16 +90,8 @@ class MsgpackUDSStream(MsgpackTransport): def get_stream_addrs( cls, stream: trio.SocketStream - ) -> tuple[str, str]: + ) -> tuple[UDSAddress, UDSAddress]: return ( - stream.socket.getsockname(), - stream.socket.getpeername(), + UDSAddress.from_addr(stream.socket.getsockname()), + UDSAddress.from_addr(stream.socket.getsockname()), ) - - @classmethod - def get_random_addr(self) -> str: - return f'{tempfile.gettempdir()}/{uuid4()}.sock' - - @classmethod - def get_root_addr(self) -> str: - return 'tractor.sock' diff --git a/tractor/msg/types.py b/tractor/msg/types.py index 76d0bad6..3e58ae3a 100644 --- a/tractor/msg/types.py +++ b/tractor/msg/types.py @@ -46,8 +46,8 @@ from msgspec import ( from tractor.msg import ( pretty_struct, ) -from tractor.ipc import AddressTypes from tractor.log import get_logger +from tractor._addr import AddressTypes log = get_logger('tractor.msgspec') -- 2.34.1 From efd11f7d74deb0bad5f7d91fbcfe7fc08f721888 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sun, 23 Mar 2025 02:18:01 -0300 Subject: [PATCH 4/4] Trying to make full suite pass with uds --- default.nix | 1 + examples/service_discovery.py | 2 +- tests/test_docs_examples.py | 15 +++++++++++---- tractor/_addr.py | 29 +++++++++++++++++++---------- tractor/_child.py | 7 ++++++- tractor/_discovery.py | 2 +- tractor/_runtime.py | 7 +++++-- 7 files changed, 44 insertions(+), 19 deletions(-) diff --git a/default.nix b/default.nix index 31615def..1f5559cd 100644 --- a/default.nix +++ b/default.nix @@ -10,6 +10,7 @@ pkgs.mkShell { inherit nativeBuildInputs; LD_LIBRARY_PATH = pkgs.lib.makeLibraryPath nativeBuildInputs; + TMPDIR = "/tmp"; shellHook = '' set -e diff --git a/examples/service_discovery.py b/examples/service_discovery.py index 64697e5b..1219f0c1 100644 --- a/examples/service_discovery.py +++ b/examples/service_discovery.py @@ -9,7 +9,7 @@ async def main(service_name): async with tractor.open_nursery() as an: await an.start_actor(service_name) - async with tractor.get_registry(('127.0.0.1', 1616)) as portal: + async with tractor.get_registry() as portal: print(f"Arbiter is listening on {portal.channel}") async with tractor.wait_for_actor(service_name) as sockaddr: diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index cc4904f8..6250e0aa 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -66,6 +66,9 @@ def run_example_in_subproc( # due to backpressure!!! proc = testdir.popen( cmdargs, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, **kwargs, ) assert not proc.returncode @@ -119,10 +122,14 @@ def test_example( code = ex.read() with run_example_in_subproc(code) as proc: - proc.wait() - err, _ = proc.stderr.read(), proc.stdout.read() - # print(f'STDERR: {err}') - # print(f'STDOUT: {out}') + err = None + try: + if not proc.poll(): + _, err = proc.communicate(timeout=15) + + except subprocess.TimeoutExpired as e: + proc.kill() + err = e.stderr # if we get some gnarly output let's aggregate and raise if err: diff --git a/tractor/_addr.py b/tractor/_addr.py index 0944c89d..f59ad542 100644 --- a/tractor/_addr.py +++ b/tractor/_addr.py @@ -14,6 +14,7 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from __future__ import annotations +import os import tempfile from uuid import uuid4 from typing import ( @@ -79,6 +80,9 @@ class Address(Protocol[ async def open_listener(self, **kwargs) -> ListenerType: ... + async def close_listener(self): + ... + class TCPAddress(Address[ str, @@ -162,6 +166,9 @@ class TCPAddress(Address[ self._host, self._port = listener.socket.getsockname()[:2] return listener + async def close_listener(self): + ... + class UDSAddress(Address[ None, @@ -195,8 +202,8 @@ class UDSAddress(Address[ return self._filepath @classmethod - def get_random(cls, _ns: None = None) -> UDSAddress: - return UDSAddress(f'{tempfile.gettempdir()}/{uuid4().sock}') + def get_random(cls, namespace: None = None) -> UDSAddress: + return UDSAddress(f'{tempfile.gettempdir()}/{uuid4()}.sock') @classmethod def get_root(cls) -> Address: @@ -214,22 +221,24 @@ class UDSAddress(Address[ return self._filepath == other._filepath async def open_stream(self, **kwargs) -> trio.SocketStream: - stream = await trio.open_tcp_stream( + stream = await trio.open_unix_socket( self._filepath, **kwargs ) - self._binded = True return stream async def open_listener(self, **kwargs) -> trio.SocketListener: - sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.bind(self._filepath) - sock.listen() - self._binded = True - return trio.SocketListener(sock) + self._sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + await self._sock.bind(self._filepath) + self._sock.listen(1) + return trio.SocketListener(self._sock) + + async def close_listener(self): + self._sock.close() + os.unlink(self._filepath) -preferred_transport = 'tcp' +preferred_transport = 'uds' _address_types = ( diff --git a/tractor/_child.py b/tractor/_child.py index 69142889..4666e1fa 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -31,7 +31,12 @@ def parse_uid(arg): return str(name), str(uuid) # ensures str encoding def parse_ipaddr(arg): - return literal_eval(arg) + try: + return literal_eval(arg) + + except (ValueError, SyntaxError): + # UDS: try to interpret as a straight up str + return arg if __name__ == "__main__": diff --git a/tractor/_discovery.py b/tractor/_discovery.py index 9258b3de..8973342f 100644 --- a/tractor/_discovery.py +++ b/tractor/_discovery.py @@ -54,7 +54,7 @@ log = get_logger(__name__) @acm -async def get_registry(addr: AddressTypes) -> AsyncGenerator[ +async def get_registry(addr: AddressTypes | None = None) -> AsyncGenerator[ Portal | LocalPortal | None, None, ]: diff --git a/tractor/_runtime.py b/tractor/_runtime.py index e755d5ce..cb46e953 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -77,8 +77,9 @@ from .ipc import Channel from ._addr import ( AddressTypes, Address, - TCPAddress, wrap_address, + preferred_transport, + default_lo_addrs ) from ._context import ( mk_context, @@ -1181,7 +1182,7 @@ class Actor: ''' if listen_addrs is None: - listen_addrs = [TCPAddress.get_random()] + listen_addrs = default_lo_addrs([preferred_transport]) else: listen_addrs: list[Address] = [ @@ -1217,6 +1218,8 @@ class Actor: task_status.started(server_n) finally: + for addr in listen_addrs: + await addr.close_listener() # signal the server is down since nursery above terminated self._server_down.set() -- 2.34.1