diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 08057e9..6051a15 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -4,10 +4,11 @@ Inter-process comms abstractions """ import platform import typing -from typing import Any, Tuple, Optional +from typing import Any, Tuple, Optional, Callable from functools import partial import msgpack +import msgspec import trio from async_generator import asynccontextmanager @@ -27,6 +28,9 @@ except ImportError: Unpacker = partial(msgpack.Unpacker, strict_map_key=False) +ms_decode = msgspec.Encoder().encode + + class MsgpackTCPStream: '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgpack-python``. @@ -35,15 +39,16 @@ class MsgpackTCPStream: def __init__( self, stream: trio.SocketStream, - ) -> None: self.stream = stream assert self.stream.socket + # should both be IP sockets lsockname = stream.socket.getsockname() assert isinstance(lsockname, tuple) self._laddr = lsockname[:2] + rsockname = stream.socket.getpeername() assert isinstance(rsockname, tuple) self._raddr = rsockname[:2] @@ -61,6 +66,7 @@ class MsgpackTCPStream: raw=False, use_list=False, ) + # decoder = msgspec.Decoder() #dict[str, Any]) while True: try: data = await self.stream.receive_some(2**10) @@ -95,6 +101,7 @@ class MsgpackTCPStream: f'transport {self} was already closed prior ro read' ) + # yield decoder.decode(data) unpacker.feed(data) for packet in unpacker: yield packet @@ -111,7 +118,9 @@ class MsgpackTCPStream: async def send(self, data: Any) -> None: async with self._send_lock: return await self.stream.send_all( - msgpack.dumps(data, use_bin_type=True)) + # msgpack.dumps(data, use_bin_type=True)) + ms_decode(data) + ) async def recv(self) -> Any: return await self._agen.asend(None)