diff --git a/setup.py b/setup.py index e7cc512..e2298ab 100755 --- a/setup.py +++ b/setup.py @@ -45,14 +45,21 @@ setup( 'tricycle', 'trio_typing', + # tooling 'colorlog', 'wrapt', 'pdbpp', # serialization 'msgpack', - 'msgspec', + ], + extras_require={ + + # serialization + 'msgspec': ['msgspec; python_version >= 3.9'], + + }, tests_require=['pytest'], python_requires=">=3.7", keywords=[ diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 13bceb0..88f9d2d 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -8,7 +8,6 @@ from typing import Any, Tuple, Optional from tricycle import BufferedReceiveStream import msgpack -import msgspec import trio from async_generator import asynccontextmanager @@ -103,8 +102,6 @@ class MsgspecTCPStream(MsgpackTCPStream): using ``msgspec``. ''' - ms_encode = msgspec.Encoder().encode - def __init__( self, stream: trio.SocketStream, @@ -115,10 +112,15 @@ class MsgspecTCPStream(MsgpackTCPStream): self.recv_stream = BufferedReceiveStream(transport_stream=stream) self.prefix_size = prefix_size + import msgspec + + # TODO: struct aware messaging coders + self.encode = msgspec.Encoder().encode + self.decode = msgspec.Decoder().decode # dict[str, Any]) + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. """ - decoder = msgspec.Decoder() # dict[str, Any]) while True: try: @@ -141,12 +143,12 @@ class MsgspecTCPStream(MsgpackTCPStream): msg_bytes = await self.recv_stream.receive_exactly(size) log.trace(f"received {msg_bytes}") # type: ignore - yield decoder.decode(msg_bytes) + yield self.decode(msg_bytes) async def send(self, data: Any) -> None: async with self._send_lock: - bytes_data = self.ms_encode(data) + bytes_data = self.encode(data) # supposedly the fastest says, # https://stackoverflow.com/a/54027962 @@ -168,13 +170,19 @@ class Channel: auto_reconnect: bool = False, stream: trio.SocketStream = None, # expected to be active - # stream_serializer_type: type = MsgspecTCPStream, - stream_serializer_type: type = MsgpackTCPStream, - ) -> None: self._recon_seq = on_reconnect self._autorecon = auto_reconnect + + try: + # if installed load the msgspec transport since it's faster + import msgspec # noqa + stream_serializer_type: type = MsgspecTCPStream + + except ImportError: + stream_serializer_type: type = MsgpackTCPStream + self.stream_serializer_type = stream_serializer_type self.msgstream: Optional[type] = stream_serializer_type( stream) if stream else None