forked from goodboy/tractor
Try out `msgspec` in our msgpack stream channel
Can only really use an encoder currently since there is no streaming api in `msgspec` as of currently. See jcrist/msgspec#27. Not sure if any encoding speedups are currently noticeable especially without any validation going on yet XD. First experiments toward #196optional_msgspec_support
parent
4079f02acf
commit
dda0b22870
|
@ -4,10 +4,11 @@ Inter-process comms abstractions
|
|||
"""
|
||||
import platform
|
||||
import typing
|
||||
from typing import Any, Tuple, Optional
|
||||
from typing import Any, Tuple, Optional, Callable
|
||||
from functools import partial
|
||||
|
||||
import msgpack
|
||||
import msgspec
|
||||
import trio
|
||||
from async_generator import asynccontextmanager
|
||||
|
||||
|
@ -27,6 +28,9 @@ except ImportError:
|
|||
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
|
||||
|
||||
|
||||
ms_decode = msgspec.Encoder().encode
|
||||
|
||||
|
||||
class MsgpackTCPStream:
|
||||
'''A ``trio.SocketStream`` delivering ``msgpack`` formatted data
|
||||
using ``msgpack-python``.
|
||||
|
@ -35,15 +39,16 @@ class MsgpackTCPStream:
|
|||
def __init__(
|
||||
self,
|
||||
stream: trio.SocketStream,
|
||||
|
||||
) -> None:
|
||||
|
||||
self.stream = stream
|
||||
assert self.stream.socket
|
||||
|
||||
# should both be IP sockets
|
||||
lsockname = stream.socket.getsockname()
|
||||
assert isinstance(lsockname, tuple)
|
||||
self._laddr = lsockname[:2]
|
||||
|
||||
rsockname = stream.socket.getpeername()
|
||||
assert isinstance(rsockname, tuple)
|
||||
self._raddr = rsockname[:2]
|
||||
|
@ -61,6 +66,7 @@ class MsgpackTCPStream:
|
|||
raw=False,
|
||||
use_list=False,
|
||||
)
|
||||
# decoder = msgspec.Decoder() #dict[str, Any])
|
||||
while True:
|
||||
try:
|
||||
data = await self.stream.receive_some(2**10)
|
||||
|
@ -95,6 +101,7 @@ class MsgpackTCPStream:
|
|||
f'transport {self} was already closed prior ro read'
|
||||
)
|
||||
|
||||
# yield decoder.decode(data)
|
||||
unpacker.feed(data)
|
||||
for packet in unpacker:
|
||||
yield packet
|
||||
|
@ -111,7 +118,9 @@ class MsgpackTCPStream:
|
|||
async def send(self, data: Any) -> None:
|
||||
async with self._send_lock:
|
||||
return await self.stream.send_all(
|
||||
msgpack.dumps(data, use_bin_type=True))
|
||||
# msgpack.dumps(data, use_bin_type=True))
|
||||
ms_decode(data)
|
||||
)
|
||||
|
||||
async def recv(self) -> Any:
|
||||
return await self._agen.asend(None)
|
||||
|
|
Loading…
Reference in New Issue