From f6af5c7bf82412b6c25bf7c91d7354929e5e94d5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Jul 2022 20:22:33 -0400 Subject: [PATCH 01/11] Drop `msgpack` dep, ensure `msgspec` as hard dep --- setup.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/setup.py b/setup.py index 6f45356..39732c9 100755 --- a/setup.py +++ b/setup.py @@ -51,9 +51,6 @@ setup( 'tricycle', 'trio_typing', - # serialization - 'msgpack>=1.0.3', - # tooling 'colorlog', 'wrapt', @@ -63,21 +60,19 @@ setup( # https://github.com/pdbpp/fancycompleter/issues/37 'pyreadline3 ; platform_system == "Windows"', - ], - extras_require={ - # serialization - 'msgspec': ['msgspec >= "0.4.0"'], + 'msgspec >= "0.4.0"' - }, + ], tests_require=['pytest'], python_requires=">=3.9", keywords=[ 'trio', - "async", - "concurrency", - "actor model", - "distributed", + 'async', + 'concurrency', + 'structured concurrency', + 'actor model', + 'distributed', 'multiprocessing' ], classifiers=[ @@ -88,7 +83,7 @@ setup( "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", "Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: 3 :: Only", - "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.9", "Intended Audience :: Science/Research", "Intended Audience :: Developers", From f94b7cd991a3313c7b13bba95e12c4f503a33dd0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Jul 2022 20:34:10 -0400 Subject: [PATCH 02/11] Drop `msgpack` lib and use `msgspec` for transport --- tractor/_ipc.py | 144 ++++++++++++------------------------------------ 1 file changed, 34 insertions(+), 110 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 4d62f1d..09e6301 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -29,7 +29,7 @@ from typing import ( ) from tricycle import BufferedReceiveStream -import msgpack +import msgspec import 
trio from async_generator import asynccontextmanager @@ -98,12 +98,13 @@ class MsgTransport(Protocol[MsgType]): class MsgpackTCPStream: ''' A ``trio.SocketStream`` delivering ``msgpack`` formatted data - using ``msgpack-python``. + using the ``msgspec`` codec lib. ''' def __init__( self, stream: trio.SocketStream, + prefix_size: int = 4, ) -> None: @@ -120,105 +121,6 @@ class MsgpackTCPStream: # public i guess? self.drained: list[dict] = [] - async def _iter_packets(self) -> AsyncGenerator[dict, None]: - ''' - Yield packets from the underlying stream. - - ''' - unpacker = msgpack.Unpacker( - raw=False, - ) - while True: - try: - data = await self.stream.receive_some(2**10) - - except trio.BrokenResourceError as err: - msg = err.args[0] - - # XXX: handle connection-reset-by-peer the same as a EOF. - # we're currently remapping this since we allow - # a quick connect then drop for root actors when - # checking to see if there exists an "arbiter" - # on the chosen sockaddr (``_root.py:108`` or thereabouts) - if ( - # nix - '[Errno 104]' in msg or - - # on windows it seems there are a variety of errors - # to handle.. - _is_windows - ): - raise TransportClosed( - f'{self} was broken with {msg}' - ) - - else: - raise - - log.transport(f"received {data}") # type: ignore - - if data == b'': - raise TransportClosed( - f'transport {self} was already closed prior to read' - ) - - unpacker.feed(data) - for packet in unpacker: - yield packet - - @property - def laddr(self) -> Tuple[Any, ...]: - return self._laddr - - @property - def raddr(self) -> Tuple[Any, ...]: - return self._raddr - - async def send(self, msg: Any) -> None: - async with self._send_lock: - return await self.stream.send_all( - msgpack.dumps(msg, use_bin_type=True) - ) - - async def recv(self) -> Any: - return await self._agen.asend(None) - - async def drain(self) -> AsyncIterator[dict]: - ''' - Drain the stream's remaining messages sent from - the far end until the connection is closed by - the peer. 
- - ''' - try: - async for msg in self._iter_packets(): - self.drained.append(msg) - except TransportClosed: - for msg in self.drained: - yield msg - - def __aiter__(self): - return self._agen - - def connected(self) -> bool: - return self.stream.socket.fileno() != -1 - - -class MsgspecTCPStream(MsgpackTCPStream): - ''' - A ``trio.SocketStream`` delivering ``msgpack`` formatted data - using ``msgspec``. - - ''' - def __init__( - self, - stream: trio.SocketStream, - prefix_size: int = 4, - - ) -> None: - import msgspec - - super().__init__(stream) self.recv_stream = BufferedReceiveStream(transport_stream=stream) self.prefix_size = prefix_size @@ -287,6 +189,37 @@ class MsgspecTCPStream(MsgpackTCPStream): return await self.stream.send_all(size + bytes_data) + @property + def laddr(self) -> Tuple[Any, ...]: + return self._laddr + + @property + def raddr(self) -> Tuple[Any, ...]: + return self._raddr + + async def recv(self) -> Any: + return await self._agen.asend(None) + + async def drain(self) -> AsyncIterator[dict]: + ''' + Drain the stream's remaining messages sent from + the far end until the connection is closed by + the peer. + + ''' + try: + async for msg in self._iter_packets(): + self.drained.append(msg) + except TransportClosed: + for msg in self.drained: + yield msg + + def __aiter__(self): + return self._agen + + def connected(self) -> bool: + return self.stream.socket.fileno() != -1 + def get_msg_transport( @@ -296,7 +229,6 @@ def get_msg_transport( return { ('msgpack', 'tcp'): MsgpackTCPStream, - ('msgspec', 'tcp'): MsgspecTCPStream, }[key] @@ -325,14 +257,6 @@ class Channel: # self._recon_seq = on_reconnect # self._autorecon = auto_reconnect - # TODO: maybe expose this through the nursery api? 
- try: - # if installed load the msgspec transport since it's faster - import msgspec # noqa - msg_transport_type_key = ('msgspec', 'tcp') - except ImportError: - pass - self._destaddr = destaddr self._transport_key = msg_transport_type_key From bb3f35cdd00ce49f61f891ea30202d20eabb1331 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Jul 2022 20:35:53 -0400 Subject: [PATCH 03/11] Drop `msgspec` specific CI jobs --- .github/workflows/ci.yml | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ea9d2ba..fcc5260 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -76,35 +76,6 @@ jobs: - name: Run tests run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs - testing-linux-msgspec: - # runs jobs on all OS's but with optional `msgspec` dep installed - name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }} - msgspec' - timeout-minutes: 10 - runs-on: ${{ matrix.os }} - - strategy: - fail-fast: false - matrix: - os: [ubuntu-latest] - python: ['3.9', '3.10'] - spawn_backend: ['trio', 'mp'] - - steps: - - - name: Checkout - uses: actions/checkout@v2 - - - name: Setup python - uses: actions/setup-python@v2 - with: - python-version: '${{ matrix.python }}' - - - name: Install dependencies - run: pip install -U .[msgspec] -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager - - - name: Run tests - run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs - # We skip 3.10 on windows for now due to # https://github.com/pytest-dev/pytest/issues/8733 # some kinda weird `pyreadline` issue.. 
From 4e7ab54452519061c69db630c0dc3514f2849bce Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Jul 2022 11:22:30 -0400 Subject: [PATCH 04/11] Appease `mypy` --- tractor/_ipc.py | 49 +++++++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 09e6301..82ba9ae 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -22,10 +22,17 @@ from __future__ import annotations import platform import struct import typing -from collections.abc import AsyncGenerator, AsyncIterator +from collections.abc import ( + AsyncGenerator, + AsyncIterator, +) from typing import ( - Any, Tuple, Optional, - Type, Protocol, TypeVar, + Any, + runtime_checkable, + Optional, + Protocol, + Type, + TypeVar, ) from tricycle import BufferedReceiveStream @@ -42,7 +49,7 @@ _is_windows = platform.system() == 'Windows' log = get_logger(__name__) -def get_stream_addrs(stream: trio.SocketStream) -> Tuple: +def get_stream_addrs(stream: trio.SocketStream) -> tuple: # should both be IP sockets lsockname = stream.socket.getsockname() rsockname = stream.socket.getpeername() @@ -60,6 +67,7 @@ MsgType = TypeVar("MsgType") # - https://jcristharif.com/msgspec/usage.html#structs +@runtime_checkable class MsgTransport(Protocol[MsgType]): stream: trio.SocketStream @@ -87,15 +95,18 @@ class MsgTransport(Protocol[MsgType]): ... @property - def laddr(self) -> Tuple[str, int]: + def laddr(self) -> tuple[str, int]: ... @property - def raddr(self) -> Tuple[str, int]: + def raddr(self) -> tuple[str, int]: ... -class MsgpackTCPStream: +# TODO: not sure why we have to inherit here, but it seems to be an +# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``; +# probably should make a `mypy` issue? +class MsgpackTCPStream(MsgTransport): ''' A ``trio.SocketStream`` delivering ``msgpack`` formatted data using the ``msgspec`` codec lib. 
@@ -190,11 +201,11 @@ class MsgpackTCPStream: return await self.stream.send_all(size + bytes_data) @property - def laddr(self) -> Tuple[Any, ...]: + def laddr(self) -> tuple[str, int]: return self._laddr @property - def raddr(self) -> Tuple[Any, ...]: + def raddr(self) -> tuple[str, int]: return self._raddr async def recv(self) -> Any: @@ -223,7 +234,7 @@ class MsgpackTCPStream: def get_msg_transport( - key: Tuple[str, str], + key: tuple[str, str], ) -> Type[MsgTransport]: @@ -237,16 +248,18 @@ class Channel: An inter-process channel for communication between (remote) actors. Wraps a ``MsgStream``: transport + encoding IPC connection. + Currently we only support ``trio.SocketStream`` for transport - (aka TCP). + (aka TCP) and the ``msgpack`` interchange format via the ``msgspec`` + codec library. ''' def __init__( self, - destaddr: Optional[Tuple[str, int]], + destaddr: Optional[tuple[str, int]], - msg_transport_type_key: Tuple[str, str] = ('msgpack', 'tcp'), + msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'), # TODO: optional reconnection support? 
# auto_reconnect: bool = False, @@ -266,7 +279,7 @@ class Channel: self.msgstream: Optional[MsgTransport] = None # set after handshake - always uid of far end - self.uid: Optional[Tuple[str, str]] = None + self.uid: Optional[tuple[str, str]] = None self._agen = self._aiter_recv() self._exc: Optional[Exception] = None # set if far end actor errors @@ -294,7 +307,7 @@ class Channel: def set_msg_transport( self, stream: trio.SocketStream, - type_key: Optional[Tuple[str, str]] = None, + type_key: Optional[tuple[str, str]] = None, ) -> MsgTransport: type_key = type_key or self._transport_key @@ -309,16 +322,16 @@ class Channel: return object.__repr__(self) @property - def laddr(self) -> Optional[Tuple[str, int]]: + def laddr(self) -> Optional[tuple[str, int]]: return self.msgstream.laddr if self.msgstream else None @property - def raddr(self) -> Optional[Tuple[str, int]]: + def raddr(self) -> Optional[tuple[str, int]]: return self.msgstream.raddr if self.msgstream else None async def connect( self, - destaddr: Tuple[Any, ...] = None, + destaddr: tuple[Any, ...] = None, **kwargs ) -> MsgTransport: From fc36503f4f7b5ba8f954b991b53aec53aa9c1d71 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Jul 2022 11:43:10 -0400 Subject: [PATCH 05/11] Add nooz file --- nooz/317.misc.rst | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 nooz/317.misc.rst diff --git a/nooz/317.misc.rst b/nooz/317.misc.rst new file mode 100644 index 0000000..724eb93 --- /dev/null +++ b/nooz/317.misc.rst @@ -0,0 +1,8 @@ +Drop use of the ``msgpack`` package and instead move fully to the +``msgspec`` codec library. + +We've now used ``msgspec`` extensively in production and there's no +reason to not use it as default. Further this change preps us for the up +and coming typed messaging semantics (#196), dialog-unprotocol system +(#297), and caps-based messaging-protocols (#299) planned before our +first beta. 
From 53e3648eca9155aebbb517d129efed913b3a2c3c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 12 Jul 2022 11:52:42 -0400 Subject: [PATCH 06/11] Readme bump --- docs/README.rst | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/docs/README.rst b/docs/README.rst index 527f85a..ffe232b 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -24,7 +24,7 @@ Features - Builtin IPC streaming APIs with task fan-out broadcasting - A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_ - Support for a swappable, OS specific, process spawning layer -- A modular transport stack, allowing for custom serialization (eg. +- A modular transport stack, allowing for custom serialization (eg. with `msgspec`_), communications protocols, and environment specific IPC primitives - Support for spawning process-level-SC, inter-loop one-to-one-task oriented @@ -489,12 +489,6 @@ From PyPi:: pip install tractor -To try out the (optionally) faster `msgspec`_ codec instead of the -default ``msgpack`` lib:: - - pip install tractor[msgspec] - - From git:: pip install git+git://github.com/goodboy/tractor.git @@ -563,11 +557,15 @@ properties of the system. What's on the TODO: ------------------- -Help us push toward the future. +Help us push toward the future of distributed `Python`. -- Typed messaging protocols (ex. via ``msgspec``, see `#36 +- Erlang-style supervisors via composed context managers (see `#22 + `_) +- Typed messaging protocols (ex. via ``msgspec.Struct``, see `#36 `_) -- Erlang-style supervisors via composed context managers +- Typed capability-based (dialog) protocols ( see `#196 + `_ with draft work + started in `#311 `_) Feel like saying hi? 
From f594f1bddafcb7959e6665d405697ae926097fca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Mar 2022 06:47:43 -0400 Subject: [PATCH 07/11] Handle a connection reset on `msgspec` transport --- tractor/_ipc.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 82ba9ae..51f84f1 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -152,6 +152,7 @@ class MsgpackTCPStream(MsgTransport): except ( ValueError, + ConnectionResetError, # not sure entirely why we need this but without it we # seem to be getting racy failures here on From 932b8411766740664cb94e9f7d7c4b39134077e8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 31 May 2022 12:19:29 -0400 Subject: [PATCH 08/11] Allow up to 4 `msgspec` decode failures --- tractor/_ipc.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 51f84f1..f0ffb75 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -144,7 +144,7 @@ class MsgpackTCPStream(MsgTransport): ''' import msgspec # noqa - last_decode_failed: bool = False + decodes_failed: int = 0 while True: try: @@ -181,12 +181,16 @@ class MsgpackTCPStream(MsgTransport): msgspec.DecodeError, UnicodeDecodeError, ): - if not last_decode_failed: + if decodes_failed < 4: # ignore decoding errors for now and assume they have to # do with a channel drop - hope that receiving from the # channel will raise an expected error and bubble up. 
- log.error('`msgspec` failed to decode!?') - last_decode_failed = True + log.error( + '`msgspec` failed to decode!?\n' + 'dumping bytes:\n' + f'{msg_bytes}' + ) + decodes_failed += 1 else: raise From 673c4a8c66a256a935c8fb046d745dfcfe586bdb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jun 2022 16:23:38 -0400 Subject: [PATCH 09/11] Decode bytes prior to log msg --- tractor/_ipc.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index f0ffb75..2f51004 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -185,10 +185,11 @@ class MsgpackTCPStream(MsgTransport): # ignore decoding errors for now and assume they have to # do with a channel drop - hope that receiving from the # channel will raise an expected error and bubble up. + decoded_bytes = msg_bytes.decode() log.error( '`msgspec` failed to decode!?\n' 'dumping bytes:\n' - f'{msg_bytes}' + f'{decoded_bytes}' ) decodes_failed += 1 else: From 5168700fbfd73172be940eb80a53fc9e0ab991ad Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 1 Jul 2022 14:37:46 -0400 Subject: [PATCH 10/11] Tolerate non-decode-able bytes --- tractor/_ipc.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 2f51004..6679366 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -185,11 +185,15 @@ class MsgpackTCPStream(MsgTransport): # ignore decoding errors for now and assume they have to # do with a channel drop - hope that receiving from the # channel will raise an expected error and bubble up. 
- decoded_bytes = msg_bytes.decode() + try: + data = msg_bytes.decode() + except UnicodeDecodeError: + data = msg_bytes + log.error( '`msgspec` failed to decode!?\n' 'dumping bytes:\n' - f'{decoded_bytes}' + f'{data}' ) decodes_failed += 1 else: From 41983edc439cb22d912701770d22b4b4756d6fc9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Jul 2022 09:42:26 -0400 Subject: [PATCH 11/11] Use `str` | `bytes` union for typing msg dump --- tractor/_ipc.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 6679366..0ed22bb 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -186,14 +186,14 @@ class MsgpackTCPStream(MsgTransport): # do with a channel drop - hope that receiving from the # channel will raise an expected error and bubble up. try: - data = msg_bytes.decode() + msg_str: str | bytes = msg_bytes.decode() except UnicodeDecodeError: - data = msg_bytes + msg_str = msg_bytes log.error( '`msgspec` failed to decode!?\n' 'dumping bytes:\n' - f'{data}' + f'{msg_str!r}' ) decodes_failed += 1 else: