Merge pull request #317 from goodboy/drop_msgpack

Drop `msgpack`
aio_error_propagation
goodboy 2022-07-12 13:31:45 -04:00 committed by GitHub
commit 80121ed211
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 101 additions and 182 deletions

View File

@ -76,35 +76,6 @@ jobs:
- name: Run tests - name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
testing-linux-msgspec:
# runs jobs on all OS's but with optional `msgspec` dep installed
name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }} - msgspec'
timeout-minutes: 10
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
python: ['3.9', '3.10']
spawn_backend: ['trio', 'mp']
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '${{ matrix.python }}'
- name: Install dependencies
run: pip install -U .[msgspec] -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager
- name: Run tests
run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs
# We skip 3.10 on windows for now due to # We skip 3.10 on windows for now due to
# https://github.com/pytest-dev/pytest/issues/8733 # https://github.com/pytest-dev/pytest/issues/8733
# some kinda weird `pyreadline` issue.. # some kinda weird `pyreadline` issue..

View File

@ -24,7 +24,7 @@ Features
- Builtin IPC streaming APIs with task fan-out broadcasting - Builtin IPC streaming APIs with task fan-out broadcasting
- A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_ - A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_
- Support for a swappable, OS specific, process spawning layer - Support for a swappable, OS specific, process spawning layer
- A modular transport stack, allowing for custom serialization (eg. - A modular transport stack, allowing for custom serialization (eg. with
`msgspec`_), communications protocols, and environment specific IPC `msgspec`_), communications protocols, and environment specific IPC
primitives primitives
- Support for spawning process-level-SC, inter-loop one-to-one-task oriented - Support for spawning process-level-SC, inter-loop one-to-one-task oriented
@ -489,12 +489,6 @@ From PyPi::
pip install tractor pip install tractor
To try out the (optionally) faster `msgspec`_ codec instead of the
default ``msgpack`` lib::
pip install tractor[msgspec]
From git:: From git::
pip install git+git://github.com/goodboy/tractor.git pip install git+git://github.com/goodboy/tractor.git
@ -563,11 +557,15 @@ properties of the system.
What's on the TODO: What's on the TODO:
------------------- -------------------
Help us push toward the future. Help us push toward the future of distributed `Python`.
- Typed messaging protocols (ex. via ``msgspec``, see `#36 - Erlang-style supervisors via composed context managers (see `#22
<https://github.com/goodboy/tractor/issues/22>`_)
- Typed messaging protocols (ex. via ``msgspec.Struct``, see `#36
<https://github.com/goodboy/tractor/issues/36>`_) <https://github.com/goodboy/tractor/issues/36>`_)
- Erlang-style supervisors via composed context managers - Typed capability-based (dialog) protocols ( see `#196
<https://github.com/goodboy/tractor/issues/196>`_ with draft work
started in `#311 <https://github.com/goodboy/tractor/pull/311>`_)
Feel like saying hi? Feel like saying hi?

View File

@ -0,0 +1,8 @@
Drop use of the ``msgpack`` package and instead move fully to the
``msgspec`` codec library.
We've now used ``msgspec`` extensively in production and there's no
reason to not use it as default. Further this change preps us for the up
and coming typed messaging semantics (#196), dialog-unprotocol system
(#297), and caps-based messaging-protocols (#299) planned before our
first beta.

View File

@ -51,9 +51,6 @@ setup(
'tricycle', 'tricycle',
'trio_typing', 'trio_typing',
# serialization
'msgpack>=1.0.3',
# tooling # tooling
'colorlog', 'colorlog',
'wrapt', 'wrapt',
@ -63,21 +60,19 @@ setup(
# https://github.com/pdbpp/fancycompleter/issues/37 # https://github.com/pdbpp/fancycompleter/issues/37
'pyreadline3 ; platform_system == "Windows"', 'pyreadline3 ; platform_system == "Windows"',
],
extras_require={
# serialization # serialization
'msgspec': ['msgspec >= "0.4.0"'], 'msgspec >= "0.4.0"'
}, ],
tests_require=['pytest'], tests_require=['pytest'],
python_requires=">=3.9", python_requires=">=3.9",
keywords=[ keywords=[
'trio', 'trio',
"async", 'async',
"concurrency", 'concurrency',
"actor model", 'structured concurrency',
"distributed", 'actor model',
'distributed',
'multiprocessing' 'multiprocessing'
], ],
classifiers=[ classifiers=[
@ -88,7 +83,7 @@ setup(
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)", "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.9",
"Intended Audience :: Science/Research", "Intended Audience :: Science/Research",
"Intended Audience :: Developers", "Intended Audience :: Developers",

View File

@ -22,14 +22,21 @@ from __future__ import annotations
import platform import platform
import struct import struct
import typing import typing
from collections.abc import AsyncGenerator, AsyncIterator from collections.abc import (
AsyncGenerator,
AsyncIterator,
)
from typing import ( from typing import (
Any, Tuple, Optional, Any,
Type, Protocol, TypeVar, runtime_checkable,
Optional,
Protocol,
Type,
TypeVar,
) )
from tricycle import BufferedReceiveStream from tricycle import BufferedReceiveStream
import msgpack import msgspec
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
@ -42,7 +49,7 @@ _is_windows = platform.system() == 'Windows'
log = get_logger(__name__) log = get_logger(__name__)
def get_stream_addrs(stream: trio.SocketStream) -> Tuple: def get_stream_addrs(stream: trio.SocketStream) -> tuple:
# should both be IP sockets # should both be IP sockets
lsockname = stream.socket.getsockname() lsockname = stream.socket.getsockname()
rsockname = stream.socket.getpeername() rsockname = stream.socket.getpeername()
@ -60,6 +67,7 @@ MsgType = TypeVar("MsgType")
# - https://jcristharif.com/msgspec/usage.html#structs # - https://jcristharif.com/msgspec/usage.html#structs
@runtime_checkable
class MsgTransport(Protocol[MsgType]): class MsgTransport(Protocol[MsgType]):
stream: trio.SocketStream stream: trio.SocketStream
@ -87,23 +95,27 @@ class MsgTransport(Protocol[MsgType]):
... ...
@property @property
def laddr(self) -> Tuple[str, int]: def laddr(self) -> tuple[str, int]:
... ...
@property @property
def raddr(self) -> Tuple[str, int]: def raddr(self) -> tuple[str, int]:
... ...
class MsgpackTCPStream: # TODO: not sure why we have to inherit here, but it seems to be an
# issue with ``get_msg_transport()`` returning a ``Type[Protocol]``;
# probably should make a `mypy` issue?
class MsgpackTCPStream(MsgTransport):
''' '''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgpack-python``. using the ``msgspec`` codec lib.
''' '''
def __init__( def __init__(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
prefix_size: int = 4,
) -> None: ) -> None:
@ -120,105 +132,6 @@ class MsgpackTCPStream:
# public i guess? # public i guess?
self.drained: list[dict] = [] self.drained: list[dict] = []
async def _iter_packets(self) -> AsyncGenerator[dict, None]:
'''
Yield packets from the underlying stream.
'''
unpacker = msgpack.Unpacker(
raw=False,
)
while True:
try:
data = await self.stream.receive_some(2**10)
except trio.BrokenResourceError as err:
msg = err.args[0]
# XXX: handle connection-reset-by-peer the same as a EOF.
# we're currently remapping this since we allow
# a quick connect then drop for root actors when
# checking to see if there exists an "arbiter"
# on the chosen sockaddr (``_root.py:108`` or thereabouts)
if (
# nix
'[Errno 104]' in msg or
# on windows it seems there are a variety of errors
# to handle..
_is_windows
):
raise TransportClosed(
f'{self} was broken with {msg}'
)
else:
raise
log.transport(f"received {data}") # type: ignore
if data == b'':
raise TransportClosed(
f'transport {self} was already closed prior to read'
)
unpacker.feed(data)
for packet in unpacker:
yield packet
@property
def laddr(self) -> Tuple[Any, ...]:
return self._laddr
@property
def raddr(self) -> Tuple[Any, ...]:
return self._raddr
async def send(self, msg: Any) -> None:
async with self._send_lock:
return await self.stream.send_all(
msgpack.dumps(msg, use_bin_type=True)
)
async def recv(self) -> Any:
return await self._agen.asend(None)
async def drain(self) -> AsyncIterator[dict]:
'''
Drain the stream's remaining messages sent from
the far end until the connection is closed by
the peer.
'''
try:
async for msg in self._iter_packets():
self.drained.append(msg)
except TransportClosed:
for msg in self.drained:
yield msg
def __aiter__(self):
return self._agen
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
class MsgspecTCPStream(MsgpackTCPStream):
'''
A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgspec``.
'''
def __init__(
self,
stream: trio.SocketStream,
prefix_size: int = 4,
) -> None:
import msgspec
super().__init__(stream)
self.recv_stream = BufferedReceiveStream(transport_stream=stream) self.recv_stream = BufferedReceiveStream(transport_stream=stream)
self.prefix_size = prefix_size self.prefix_size = prefix_size
@ -231,7 +144,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
''' '''
import msgspec # noqa import msgspec # noqa
last_decode_failed: bool = False decodes_failed: int = 0
while True: while True:
try: try:
@ -239,6 +152,7 @@ class MsgspecTCPStream(MsgpackTCPStream):
except ( except (
ValueError, ValueError,
ConnectionResetError,
# not sure entirely why we need this but without it we # not sure entirely why we need this but without it we
# seem to be getting racy failures here on # seem to be getting racy failures here on
@ -267,12 +181,21 @@ class MsgspecTCPStream(MsgpackTCPStream):
msgspec.DecodeError, msgspec.DecodeError,
UnicodeDecodeError, UnicodeDecodeError,
): ):
if not last_decode_failed: if decodes_failed < 4:
# ignore decoding errors for now and assume they have to # ignore decoding errors for now and assume they have to
# do with a channel drop - hope that receiving from the # do with a channel drop - hope that receiving from the
# channel will raise an expected error and bubble up. # channel will raise an expected error and bubble up.
log.error('`msgspec` failed to decode!?') try:
last_decode_failed = True msg_str: str | bytes = msg_bytes.decode()
except UnicodeDecodeError:
msg_str = msg_bytes
log.error(
'`msgspec` failed to decode!?\n'
'dumping bytes:\n'
f'{msg_str!r}'
)
decodes_failed += 1
else: else:
raise raise
@ -287,16 +210,46 @@ class MsgspecTCPStream(MsgpackTCPStream):
return await self.stream.send_all(size + bytes_data) return await self.stream.send_all(size + bytes_data)
@property
def laddr(self) -> tuple[str, int]:
return self._laddr
@property
def raddr(self) -> tuple[str, int]:
return self._raddr
async def recv(self) -> Any:
return await self._agen.asend(None)
async def drain(self) -> AsyncIterator[dict]:
'''
Drain the stream's remaining messages sent from
the far end until the connection is closed by
the peer.
'''
try:
async for msg in self._iter_packets():
self.drained.append(msg)
except TransportClosed:
for msg in self.drained:
yield msg
def __aiter__(self):
return self._agen
def connected(self) -> bool:
return self.stream.socket.fileno() != -1
def get_msg_transport( def get_msg_transport(
key: Tuple[str, str], key: tuple[str, str],
) -> Type[MsgTransport]: ) -> Type[MsgTransport]:
return { return {
('msgpack', 'tcp'): MsgpackTCPStream, ('msgpack', 'tcp'): MsgpackTCPStream,
('msgspec', 'tcp'): MsgspecTCPStream,
}[key] }[key]
@ -305,16 +258,18 @@ class Channel:
An inter-process channel for communication between (remote) actors. An inter-process channel for communication between (remote) actors.
Wraps a ``MsgStream``: transport + encoding IPC connection. Wraps a ``MsgStream``: transport + encoding IPC connection.
Currently we only support ``trio.SocketStream`` for transport Currently we only support ``trio.SocketStream`` for transport
(aka TCP). (aka TCP) and the ``msgpack`` interchange format via the ``msgspec``
codec libary.
''' '''
def __init__( def __init__(
self, self,
destaddr: Optional[Tuple[str, int]], destaddr: Optional[tuple[str, int]],
msg_transport_type_key: Tuple[str, str] = ('msgpack', 'tcp'), msg_transport_type_key: tuple[str, str] = ('msgpack', 'tcp'),
# TODO: optional reconnection support? # TODO: optional reconnection support?
# auto_reconnect: bool = False, # auto_reconnect: bool = False,
@ -325,14 +280,6 @@ class Channel:
# self._recon_seq = on_reconnect # self._recon_seq = on_reconnect
# self._autorecon = auto_reconnect # self._autorecon = auto_reconnect
# TODO: maybe expose this through the nursery api?
try:
# if installed load the msgspec transport since it's faster
import msgspec # noqa
msg_transport_type_key = ('msgspec', 'tcp')
except ImportError:
pass
self._destaddr = destaddr self._destaddr = destaddr
self._transport_key = msg_transport_type_key self._transport_key = msg_transport_type_key
@ -342,7 +289,7 @@ class Channel:
self.msgstream: Optional[MsgTransport] = None self.msgstream: Optional[MsgTransport] = None
# set after handshake - always uid of far end # set after handshake - always uid of far end
self.uid: Optional[Tuple[str, str]] = None self.uid: Optional[tuple[str, str]] = None
self._agen = self._aiter_recv() self._agen = self._aiter_recv()
self._exc: Optional[Exception] = None # set if far end actor errors self._exc: Optional[Exception] = None # set if far end actor errors
@ -370,7 +317,7 @@ class Channel:
def set_msg_transport( def set_msg_transport(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
type_key: Optional[Tuple[str, str]] = None, type_key: Optional[tuple[str, str]] = None,
) -> MsgTransport: ) -> MsgTransport:
type_key = type_key or self._transport_key type_key = type_key or self._transport_key
@ -385,16 +332,16 @@ class Channel:
return object.__repr__(self) return object.__repr__(self)
@property @property
def laddr(self) -> Optional[Tuple[str, int]]: def laddr(self) -> Optional[tuple[str, int]]:
return self.msgstream.laddr if self.msgstream else None return self.msgstream.laddr if self.msgstream else None
@property @property
def raddr(self) -> Optional[Tuple[str, int]]: def raddr(self) -> Optional[tuple[str, int]]:
return self.msgstream.raddr if self.msgstream else None return self.msgstream.raddr if self.msgstream else None
async def connect( async def connect(
self, self,
destaddr: Tuple[Any, ...] = None, destaddr: tuple[Any, ...] = None,
**kwargs **kwargs
) -> MsgTransport: ) -> MsgTransport: