diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ea9d2ba..fcc5260 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -76,35 +76,6 @@ jobs: - name: Run tests run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs - testing-linux-msgspec: - # runs jobs on all OS's but with optional `msgspec` dep installed - name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }} - msgspec' - timeout-minutes: 10 - runs-on: ${{ matrix.os }} - - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest] - python: ['3.9', '3.10'] - spawn_backend: ['trio', 'mp'] - - steps: - - - name: Checkout - uses: actions/checkout@v2 - - - name: Setup python - uses: actions/setup-python@v2 - with: - python-version: '${{ matrix.python }}' - - - name: Install dependencies - run: pip install -U .[msgspec] -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager - - - name: Run tests - run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs - # We skip 3.10 on windows for now due to # https://github.com/pytest-dev/pytest/issues/8733 # some kinda weird `pyreadline` issue.. diff --git a/docs/README.rst b/docs/README.rst index 527f85a..ffe232b 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -24,7 +24,7 @@ Features - Builtin IPC streaming APIs with task fan-out broadcasting - A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_ - Support for a swappable, OS specific, process spawning layer -- A modular transport stack, allowing for custom serialization (eg. +- A modular transport stack, allowing for custom serialization (eg. with `msgspec`_), communications protocols, and environment specific IPC primitives - Support for spawning process-level-SC, inter-loop one-to-one-task oriented @@ -489,12 +489,6 @@ From PyPi:: pip install tractor -To try out the (optionally) faster `msgspec`_ codec instead of the -default ``msgpack`` lib:: - - pip install tractor[msgspec] - - From git:: pip install git+git://github.com/goodboy/tractor.git @@ -563,11 +557,15 @@ properties of the system. What's on the TODO: ------------------- -Help us push toward the future. +Help us push toward the future of distributed `Python`. -- Typed messaging protocols (ex. via ``msgspec``, see `#36 +- Erlang-style supervisors via composed context managers (see `#22 + `_) +- Typed messaging protocols (ex. via ``msgspec.Struct``, see `#36 `_) -- Erlang-style supervisors via composed context managers +- Typed capability-based (dialog) protocols ( see `#196 + `_ with draft work + started in `#311 `_) Feel like saying hi? diff --git a/nooz/317.misc.rst b/nooz/317.misc.rst new file mode 100644 index 0000000..724eb93 --- /dev/null +++ b/nooz/317.misc.rst @@ -0,0 +1,8 @@ +Drop use of the ``msgpack`` package and instead move fully to the +``msgspec`` codec library. + +We've now used ``msgspec`` extensively in production and there's no +reason to not use it as default. Further this change preps us for the up +and coming typed messaging semantics (#196), dialog-unprotocol system +(#297), and caps-based messaging-protocols (#299) planned before our +first beta. diff --git a/setup.py b/setup.py index 6f45356..39732c9 100755 --- a/setup.py +++ b/setup.py @@ -51,9 +51,6 @@ setup( 'tricycle', 'trio_typing', - # serialization - 'msgpack>=1.0.3', - # tooling 'colorlog', 'wrapt', @@ -63,21 +60,19 @@ setup( # https://github.com/pdbpp/fancycompleter/issues/37 'pyreadline3 ; platform_system == "Windows"', - ], - extras_require={ - # serialization - 'msgspec': ['msgspec >= "0.4.0"'], + 'msgspec >= "0.4.0"' - }, + ], tests_require=['pytest'], python_requires=">=3.9", keywords=[ 'trio', - "async", - "concurrency", - "actor model", - "distributed", + 'async', + 'concurrency', + 'structured concurrency', + 'actor model', + 'distributed', 'multiprocessing' ], classifiers=[ @@ -88,7 +83,7 @@ setup( "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.9", "Intended Audience :: Science/Research", "Intended Audience :: Developers", diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 4d62f1d..0ed22bb 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -22,14 +22,21 @@ from __future__ import annotations import platform import struct import typing -from collections.abc import AsyncGenerator, AsyncIterator +from collections.abc import ( + AsyncGenerator, + AsyncIterator, +) from typing import ( - Any, Tuple, Optional, - Type, Protocol, TypeVar, + Any, + runtime_checkable, + Optional, + Protocol, + Type, + TypeVar, ) from tricycle import BufferedReceiveStream -import msgpack +import msgspec import trio from async_generator import asynccontextmanager @@ -42,7 +49,7 @@ _is_windows = platform.system() == 'Windows' log = get_logger(__name__) -def get_stream_addrs(stream: trio.SocketStream) -> Tuple: +def get_stream_addrs(stream: trio.SocketStream) -> tuple: # should both be IP sockets lsockname = stream.socket.getsockname() rsockname = stream.socket.getpeername() @@ -60,6 +67,7 @@ MsgType = TypeVar("MsgType") # - https://jcristharif.com/msgspec/usage.html#structs +@runtime_checkable class MsgTransport(Protocol[MsgType]): stream: trio.SocketStream @@ -87,23 +95,27 @@ class MsgTransport(Protocol[MsgType]): ... @property - def laddr(self) -> Tuple[str, int]: + def laddr(self) -> tuple[str, int]: ... @property - def raddr(self) -> Tuple[str, int]: + def raddr(self) -> tuple[str, int]: ... -class MsgpackTCPStream: +# TODO: not sure why we have to inherit here, but it seems to be an +# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; +# probably should make a `mypy` issue? +class MsgpackTCPStream(MsgTransport): ''' A ``trio.SocketStream`` delivering ``msgpack`` formatted data - using ``msgpack-python``. + using the ``msgspec`` codec lib. ''' def __init__( self, stream: trio.SocketStream, + prefix_size: int = 4, ) -> None: @@ -120,105 +132,6 @@ class MsgpackTCPStream: # public i guess? self.drained: list[dict] = [] - async def _iter_packets(self) -> AsyncGenerator[dict, None]: - ''' - Yield packets from the underlying stream. - - ''' - unpacker = msgpack.Unpacker( - raw=False, - ) - while True: - try: - data = await self.stream.receive_some(2**10) - - except trio.BrokenResourceError as err: - msg = err.args[0] - - # XXX: handle connection-reset-by-peer the same as a EOF. - # we're currently remapping this since we allow - # a quick connect then drop for root actors when - # checking to see if there exists an "arbiter" - # on the chosen sockaddr (``_root.py:108`` or thereabouts) - if ( - # nix - '[Errno 104]' in msg or - - # on windows it seems there are a variety of errors - # to handle.. - _is_windows - ): - raise TransportClosed( - f'{self} was broken with {msg}' - ) - - else: - raise - - log.transport(f"received {data}") # type: ignore - - if data == b'': - raise TransportClosed( - f'transport {self} was already closed prior to read' - ) - - unpacker.feed(data) - for packet in unpacker: - yield packet - - @property - def laddr(self) -> Tuple[Any, ...]: - return self._laddr - - @property - def raddr(self) -> Tuple[Any, ...]: - return self._raddr - - async def send(self, msg: Any) -> None: - async with self._send_lock: - return await self.stream.send_all( - msgpack.dumps(msg, use_bin_type=True) - ) - - async def recv(self) -> Any: - return await self._agen.asend(None) - - async def drain(self) -> AsyncIterator[dict]: - ''' - Drain the stream's remaining messages sent from - the far end until the connection is closed by - the peer. - - ''' - try: - async for msg in self._iter_packets(): - self.drained.append(msg) - except TransportClosed: - for msg in self.drained: - yield msg - - def __aiter__(self): - return self._agen - - def connected(self) -> bool: - return self.stream.socket.fileno() != -1 - - -class MsgspecTCPStream(MsgpackTCPStream): - ''' - A ``trio.SocketStream`` delivering ``msgpack`` formatted data - using ``msgspec``. - - ''' - def __init__( - self, - stream: trio.SocketStream, - prefix_size: int = 4, - - ) -> None: - import msgspec - - super().__init__(stream) self.recv_stream = BufferedReceiveStream(transport_stream=stream) self.prefix_size = prefix_size @@ -231,7 +144,7 @@ class MsgspecTCPStream(MsgpackTCPStream): ''' import msgspec # noqa - last_decode_failed: bool = False + decodes_failed: int = 0 while True: try: @@ -239,6 +152,7 @@ class MsgspecTCPStream(MsgpackTCPStream): except ( ValueError, + ConnectionResetError, # not sure entirely why we need this but without it we # seem to be getting racy failures here on @@ -267,12 +181,21 @@ class MsgspecTCPStream(MsgpackTCPStream): msgspec.DecodeError, UnicodeDecodeError, ): - if not last_decode_failed: + if decodes_failed < 4: # ignore decoding errors for now and assume they have to # do with a channel drop - hope that receiving from the # channel will raise an expected error and bubble up. - log.error('`msgspec` failed to decode!?') - last_decode_failed = True + try: + msg_str: str | bytes = msg_bytes.decode() + except UnicodeDecodeError: + msg_str = msg_bytes + + log.error( + '`msgspec` failed to decode!?\n' + 'dumping bytes:\n' + f'{msg_str!r}' + ) + decodes_failed += 1 else: raise @@ -287,16 +210,46 @@ class MsgspecTCPStream(MsgpackTCPStream): return await self.stream.send_all(size + bytes_data) + @property + def laddr(self) -> tuple[str, int]: + return self._laddr + + @property + def raddr(self) -> tuple[str, int]: + return self._raddr + + async def recv(self) -> Any: + return await self._agen.asend(None) + + async def drain(self) -> AsyncIterator[dict]: + ''' + Drain the stream's remaining messages sent from + the far end until the connection is closed by + the peer. + + ''' + try: + async for msg in self._iter_packets(): + self.drained.append(msg) + except TransportClosed: + for msg in self.drained: + yield msg + + def __aiter__(self): + return self._agen + + def connected(self) -> bool: + return self.stream.socket.fileno() != -1 + def get_msg_transport( - key: Tuple[str, str], + key: tuple[str, str], ) -> Type[MsgTransport]: return { ('msgpack', 'tcp'): MsgpackTCPStream, - ('msgspec', 'tcp'): MsgspecTCPStream, }[key] @@ -305,16 +258,18 @@ class Channel: An inter-process channel for communication between (remote) actors. Wraps a ``MsgStream``: transport + encoding IPC connection. + Currently we only support ``trio.SocketStream`` for transport - (aka TCP). + (aka TCP) and the ``msgpack`` interchange format via the ``msgspec`` + codec libary. ''' def __init__( self, - destaddr: Optional[Tuple[str, int]], + destaddr: Optional[tuple[str, int]], - msg_transport_type_key: Tuple[str, str] = ('msgpack', 'tcp'), + msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'), # TODO: optional reconnection support? # auto_reconnect: bool = False, @@ -325,14 +280,6 @@ class Channel: # self._recon_seq = on_reconnect # self._autorecon = auto_reconnect - # TODO: maybe expose this through the nursery api? - try: - # if installed load the msgspec transport since it's faster - import msgspec # noqa - msg_transport_type_key = ('msgspec', 'tcp') - except ImportError: - pass - self._destaddr = destaddr self._transport_key = msg_transport_type_key @@ -342,7 +289,7 @@ class Channel: self.msgstream: Optional[MsgTransport] = None # set after handshake - always uid of far end - self.uid: Optional[Tuple[str, str]] = None + self.uid: Optional[tuple[str, str]] = None self._agen = self._aiter_recv() self._exc: Optional[Exception] = None # set if far end actor errors @@ -370,7 +317,7 @@ class Channel: def set_msg_transport( self, stream: trio.SocketStream, - type_key: Optional[Tuple[str, str]] = None, + type_key: Optional[tuple[str, str]] = None, ) -> MsgTransport: type_key = type_key or self._transport_key @@ -385,16 +332,16 @@ class Channel: return object.__repr__(self) @property - def laddr(self) -> Optional[Tuple[str, int]]: + def laddr(self) -> Optional[tuple[str, int]]: return self.msgstream.laddr if self.msgstream else None @property - def raddr(self) -> Optional[Tuple[str, int]]: + def raddr(self) -> Optional[tuple[str, int]]: return self.msgstream.raddr if self.msgstream else None async def connect( self, - destaddr: Tuple[Any, ...] = None, + destaddr: tuple[Any, ...] = None, **kwargs ) -> MsgTransport: