forked from goodboy/tractor
1
0
Fork 0

Try out `msgspec` in our msgpack stream channel

Can only really use an encoder currently since there is no streaming api
in `msgspec` as of currently. See jcrist/msgspec#27.

Not sure if any encoding speedups are currently noticeable especially
without any validation going on yet XD.

First experiments toward #196
msgspec_infect_asyncio
Tyler Goodlet 2021-05-30 17:19:20 -04:00
parent 93a83eab1c
commit adc77861bb
1 changed files with 12 additions and 3 deletions

View File

@ -4,10 +4,11 @@ Inter-process comms abstractions
""" """
import platform import platform
import typing import typing
from typing import Any, Tuple, Optional from typing import Any, Tuple, Optional, Callable
from functools import partial from functools import partial
import msgpack import msgpack
import msgspec
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
@ -27,6 +28,9 @@ except ImportError:
Unpacker = partial(msgpack.Unpacker, strict_map_key=False) Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
ms_decode = msgspec.Encoder().encode
class MsgpackTCPStream: class MsgpackTCPStream:
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
using ``msgpack-python``. using ``msgpack-python``.
@ -35,15 +39,16 @@ class MsgpackTCPStream:
def __init__( def __init__(
self, self,
stream: trio.SocketStream, stream: trio.SocketStream,
) -> None: ) -> None:
self.stream = stream self.stream = stream
assert self.stream.socket assert self.stream.socket
# should both be IP sockets # should both be IP sockets
lsockname = stream.socket.getsockname() lsockname = stream.socket.getsockname()
assert isinstance(lsockname, tuple) assert isinstance(lsockname, tuple)
self._laddr = lsockname[:2] self._laddr = lsockname[:2]
rsockname = stream.socket.getpeername() rsockname = stream.socket.getpeername()
assert isinstance(rsockname, tuple) assert isinstance(rsockname, tuple)
self._raddr = rsockname[:2] self._raddr = rsockname[:2]
@ -61,6 +66,7 @@ class MsgpackTCPStream:
raw=False, raw=False,
use_list=False, use_list=False,
) )
# decoder = msgspec.Decoder() #dict[str, Any])
while True: while True:
try: try:
data = await self.stream.receive_some(2**10) data = await self.stream.receive_some(2**10)
@ -95,6 +101,7 @@ class MsgpackTCPStream:
f'transport {self} was already closed prior ro read' f'transport {self} was already closed prior ro read'
) )
# yield decoder.decode(data)
unpacker.feed(data) unpacker.feed(data)
for packet in unpacker: for packet in unpacker:
yield packet yield packet
@ -111,7 +118,9 @@ class MsgpackTCPStream:
async def send(self, data: Any) -> None: async def send(self, data: Any) -> None:
async with self._send_lock: async with self._send_lock:
return await self.stream.send_all( return await self.stream.send_all(
msgpack.dumps(data, use_bin_type=True)) # msgpack.dumps(data, use_bin_type=True))
ms_decode(data)
)
async def recv(self) -> Any: async def recv(self) -> Any:
return await self._agen.asend(None) return await self._agen.asend(None)