forked from goodboy/tractor
1
0
Fork 0

Allow for tuple keys with std `msgpack`

ensure_deregister
Tyler Goodlet 2020-08-03 18:41:21 -04:00
parent a5279f80a7
commit fbd68d2d91
1 changed files with 14 additions and 1 deletions

View File

@ -3,6 +3,8 @@ Inter-process comms abstractions
""" """
import typing import typing
from typing import Any, Tuple, Optional from typing import Any, Tuple, Optional
from functools import partial
import inspect
import msgpack import msgpack
import trio import trio
@ -11,6 +13,14 @@ from async_generator import asynccontextmanager
from .log import get_logger from .log import get_logger
log = get_logger('ipc') log = get_logger('ipc')
# :eyeroll:
try:
import msgpack_numpy
Unpacker = msgpack_numpy.Unpacker
except ImportError:
# just plain ``msgpack`` requires tweaking key settings
Unpacker = partial(msgpack.Unpacker, strict_map_key=False)
class MsgpackStream: class MsgpackStream:
"""A ``trio.SocketStream`` delivering ``msgpack`` formatted data. """A ``trio.SocketStream`` delivering ``msgpack`` formatted data.
@ -32,7 +42,10 @@ class MsgpackStream:
async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]:
"""Yield packets from the underlying stream. """Yield packets from the underlying stream.
""" """
unpacker = msgpack.Unpacker(raw=False, use_list=False) unpacker = Unpacker(
raw=False,
use_list=False,
)
while True: while True:
try: try:
data = await self.stream.receive_some(2**10) data = await self.stream.receive_some(2**10)