forked from goodboy/tractor
Try out `msgspec` in our msgpack stream channel
Can only really use an encoder currently since there is no streaming api in `msgspec` as of currently. See jcrist/msgspec#27. Not sure if any encoding speedups are currently noticeable especially without any validation going on yet XD. First experiments toward #196try_msgspec
parent
aef8f0d18c
commit
9da35dc8bd
|
@ -2,10 +2,11 @@
|
|||
Inter-process comms abstractions
|
||||
"""
|
||||
import typing
|
||||
from typing import Any, Tuple, Optional
|
||||
from typing import Any, Tuple, Optional, Callable
|
||||
from functools import partial
|
||||
|
||||
import msgpack
|
||||
import msgspec
|
||||
import trio
|
||||
from async_generator import asynccontextmanager
|
||||
|
||||
|
@ -21,16 +22,32 @@ except ImportError:
|
|||
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
|
||||
|
||||
|
||||
ms_decode = msgspec.Encoder().encode
|
||||
|
||||
|
||||
class MsgpackStream:
|
||||
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data.
|
||||
|
||||
"""
|
||||
def __init__(self, stream: trio.SocketStream) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
stream: trio.SocketStream,
|
||||
serialize: Callable = Unpacker(
|
||||
raw=False,
|
||||
use_list=False,
|
||||
).feed,
|
||||
deserialize: Callable = msgpack.dumps,
|
||||
|
||||
) -> None:
|
||||
|
||||
self.stream = stream
|
||||
assert self.stream.socket
|
||||
|
||||
# should both be IP sockets
|
||||
lsockname = stream.socket.getsockname()
|
||||
assert isinstance(lsockname, tuple)
|
||||
self._laddr = lsockname[:2]
|
||||
|
||||
rsockname = stream.socket.getpeername()
|
||||
assert isinstance(rsockname, tuple)
|
||||
self._raddr = rsockname[:2]
|
||||
|
@ -45,6 +62,7 @@ class MsgpackStream:
|
|||
raw=False,
|
||||
use_list=False,
|
||||
)
|
||||
# decoder = msgspec.Decoder() #dict[str, Any])
|
||||
while True:
|
||||
try:
|
||||
data = await self.stream.receive_some(2**10)
|
||||
|
@ -57,6 +75,7 @@ class MsgpackStream:
|
|||
log.debug(f"Stream connection {self.raddr} was closed")
|
||||
return
|
||||
|
||||
# yield decoder.decode(data)
|
||||
unpacker.feed(data)
|
||||
for packet in unpacker:
|
||||
yield packet
|
||||
|
@ -73,7 +92,9 @@ class MsgpackStream:
|
|||
async def send(self, data: Any) -> None:
|
||||
async with self._send_lock:
|
||||
return await self.stream.send_all(
|
||||
msgpack.dumps(data, use_bin_type=True))
|
||||
# msgpack.dumps(data, use_bin_type=True))
|
||||
ms_decode(data)
|
||||
)
|
||||
|
||||
async def recv(self) -> Any:
|
||||
return await self._agen.asend(None)
|
||||
|
|
Loading…
Reference in New Issue