Use msgpack for quote-packet serialization
parent
dd5e1e7ea7
commit
51b44cf236
|
@ -3,12 +3,12 @@ Core broker-daemon tasks and API.
|
|||
"""
|
||||
import time
|
||||
import inspect
|
||||
import json
|
||||
from functools import partial
|
||||
import socket
|
||||
from types import ModuleType
|
||||
from typing import Coroutine
|
||||
|
||||
import msgpack
|
||||
import trio
|
||||
|
||||
from ..log import get_logger
|
||||
|
@ -88,6 +88,7 @@ class StreamQueue:
|
|||
"""
|
||||
delim = self._delim
|
||||
buff = b''
|
||||
unpacker = msgpack.Unpacker(raw=False)
|
||||
while True:
|
||||
packets = []
|
||||
try:
|
||||
|
@ -101,23 +102,13 @@ class StreamQueue:
|
|||
log.debug("Stream connection was closed")
|
||||
return
|
||||
|
||||
if buff: # last received packet was segmented
|
||||
data = buff + data
|
||||
|
||||
# if last packet has not fully arrived it will
|
||||
# be a truncated byte-stream
|
||||
packets = data.split(delim)
|
||||
buff = packets.pop()
|
||||
|
||||
for packet in packets:
|
||||
try:
|
||||
yield json.loads(packet)
|
||||
except json.decoder.JSONDecodeError:
|
||||
log.exception(f"Failed to process JSON packet: {buff}")
|
||||
continue
|
||||
unpacker.feed(data)
|
||||
for packet in unpacker:
|
||||
yield packet
|
||||
|
||||
async def put(self, data):
|
||||
return await self.stream.send_all(json.dumps(data).encode() + b'\n')
|
||||
return await self.stream.send_all(
|
||||
msgpack.dumps(data, use_bin_type=True))
|
||||
|
||||
async def get(self):
|
||||
return await self._agen.asend(None)
|
||||
|
|
Loading…
Reference in New Issue