From 51b44cf236ad882c747b8e94d1fa7919c5cd3492 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Apr 2018 13:56:56 -0400 Subject: [PATCH] Use msgpack for quote-packet serialization --- piker/brokers/core.py | 23 +++++++---------------- setup.py | 2 +- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 5a4c9140..511c216c 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -3,12 +3,12 @@ Core broker-daemon tasks and API. """ import time import inspect -import json from functools import partial import socket from types import ModuleType from typing import Coroutine +import msgpack import trio from ..log import get_logger @@ -88,6 +88,7 @@ class StreamQueue: """ delim = self._delim buff = b'' + unpacker = msgpack.Unpacker(raw=False) while True: packets = [] try: @@ -101,23 +102,13 @@ class StreamQueue: log.debug("Stream connection was closed") return - if buff: # last received packet was segmented - data = buff + data - - # if last packet has not fully arrived it will - # be a truncated byte-stream - packets = data.split(delim) - buff = packets.pop() - - for packet in packets: - try: - yield json.loads(packet) - except json.decoder.JSONDecodeError: - log.exception(f"Failed to process JSON packet: {buff}") - continue + unpacker.feed(data) + for packet in unpacker: + yield packet async def put(self, data): - return await self.stream.send_all(json.dumps(data).encode() + b'\n') + return await self.stream.send_all( + msgpack.dumps(data, use_bin_type=True)) async def get(self): return await self._agen.asend(None) diff --git a/setup.py b/setup.py index 3f9f7416..3ba37c91 100755 --- a/setup.py +++ b/setup.py @@ -36,7 +36,7 @@ setup( }, install_requires=[ 'click', 'colorlog', 'trio', 'attrs', 'async_generator', - 'pygments', 'cython', 'asks', 'pandas', + 'pygments', 'cython', 'asks', 'pandas', 'msgpack', #'kivy', see requirement.txt; using a custom branch atm ], extras_require={