diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 9d34b3a..3d0cac2 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -2,10 +2,11 @@ Inter-process comms abstractions """ import typing -from typing import Any, Tuple, Optional +from typing import Any, Tuple, Optional, Callable from functools import partial import msgpack +import msgspec import trio from async_generator import asynccontextmanager @@ -21,16 +22,32 @@ except ImportError: Unpacker = partial(msgpack.Unpacker, strict_map_key=False) +ms_decode = msgspec.Encoder().encode + + class MsgpackStream: """A ``trio.SocketStream`` delivering ``msgpack`` formatted data. + """ - def __init__(self, stream: trio.SocketStream) -> None: + def __init__( + self, + stream: trio.SocketStream, + serialize: Callable = Unpacker( + raw=False, + use_list=False, + ).feed, + deserialize: Callable = msgpack.dumps, + + ) -> None: + self.stream = stream assert self.stream.socket + # should both be IP sockets lsockname = stream.socket.getsockname() assert isinstance(lsockname, tuple) self._laddr = lsockname[:2] + rsockname = stream.socket.getpeername() assert isinstance(rsockname, tuple) self._raddr = rsockname[:2] @@ -45,6 +62,7 @@ class MsgpackStream: raw=False, use_list=False, ) + # decoder = msgspec.Decoder() #dict[str, Any]) while True: try: data = await self.stream.receive_some(2**10) @@ -57,6 +75,7 @@ class MsgpackStream: log.debug(f"Stream connection {self.raddr} was closed") return + # yield decoder.decode(data) unpacker.feed(data) for packet in unpacker: yield packet @@ -73,7 +92,9 @@ class MsgpackStream: async def send(self, data: Any) -> None: async with self._send_lock: return await self.stream.send_all( - msgpack.dumps(data, use_bin_type=True)) + # msgpack.dumps(data, use_bin_type=True)) + ms_decode(data) + ) async def recv(self) -> Any: return await self._agen.asend(None)