From 4079f02acf19596d8b30d5ab29a499904b7637a8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 30 May 2021 17:16:53 -0400 Subject: [PATCH 01/28] Cast to tuples for all uids explicitly --- tractor/_actor.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index f84a597..e3fa19f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -317,7 +317,7 @@ class Actor: # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = arbiter_addr + self._arb_addr = tuple(arbiter_addr) if arbiter_addr is not None else None # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -615,6 +615,7 @@ class Actor: # ``scope = Nursery.start()`` task_status.started(loop_cs) async for msg in chan: + if msg is None: # loop terminate sentinel log.debug( @@ -1169,10 +1170,10 @@ class Actor: parlance. """ await chan.send(self.uid) - uid: Tuple[str, str] = await chan.recv() + uid: Tuple[str, str] = tuple(await chan.recv()) - if not isinstance(uid, tuple): - raise ValueError(f"{uid} is not a valid uid?!") + # if not isinstance(uid, tuple): + # raise ValueError(f"{uid} is not a valid uid?!") chan.uid = uid log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") @@ -1239,8 +1240,9 @@ class Arbiter(Actor): async def register_actor( self, uid: Tuple[str, str], sockaddr: Tuple[str, int] ) -> None: + uid = tuple(uid) name, uuid = uid - self._registry[uid] = sockaddr + self._registry[uid] = tuple(sockaddr) # pop and signal all waiter events events = self._waiters.pop(name, ()) @@ -1250,4 +1252,4 @@ class Arbiter(Actor): event.set() async def unregister_actor(self, uid: Tuple[str, str]) -> None: - self._registry.pop(uid) + self._registry.pop(tuple(uid)) From dda0b22870a512e86d2bbfc0d82293c8e2d881d6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 30 May 2021 17:19:20 -0400 Subject: 
[PATCH 02/28] Try out `msgspec` in our msgpack stream channel Can only really use an encoder for now since `msgspec` does not yet have a streaming API. See jcrist/msgspec#27. Not sure if any encoding speedups are currently noticeable especially without any validation going on yet XD. First experiments toward #196 --- tractor/_ipc.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 08057e9..6051a15 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -4,10 +4,11 @@ Inter-process comms abstractions """ import platform import typing -from typing import Any, Tuple, Optional +from typing import Any, Tuple, Optional, Callable from functools import partial import msgpack +import msgspec import trio from async_generator import asynccontextmanager @@ -27,6 +28,9 @@ except ImportError: Unpacker = partial(msgpack.Unpacker, strict_map_key=False) +ms_decode = msgspec.Encoder().encode + + class MsgpackTCPStream: '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgpack-python``. 
@@ -35,15 +39,16 @@ class MsgpackTCPStream: def __init__( self, stream: trio.SocketStream, - ) -> None: self.stream = stream assert self.stream.socket + # should both be IP sockets lsockname = stream.socket.getsockname() assert isinstance(lsockname, tuple) self._laddr = lsockname[:2] + rsockname = stream.socket.getpeername() assert isinstance(rsockname, tuple) self._raddr = rsockname[:2] @@ -61,6 +66,7 @@ class MsgpackTCPStream: raw=False, use_list=False, ) + # decoder = msgspec.Decoder() #dict[str, Any]) while True: try: data = await self.stream.receive_some(2**10) @@ -95,6 +101,7 @@ class MsgpackTCPStream: f'transport {self} was already closed prior ro read' ) + # yield decoder.decode(data) unpacker.feed(data) for packet in unpacker: yield packet @@ -111,7 +118,9 @@ class MsgpackTCPStream: async def send(self, data: Any) -> None: async with self._send_lock: return await self.stream.send_all( - msgpack.dumps(data, use_bin_type=True)) + # msgpack.dumps(data, use_bin_type=True)) + ms_decode(data) + ) async def recv(self) -> Any: return await self._agen.asend(None) From 37717343114b4d91031379a308ba0e624096116c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 11 Jun 2021 16:20:35 -0400 Subject: [PATCH 03/28] Add `tricycle` and `msgspec` deps --- setup.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 36294d3..4e5c7f5 100755 --- a/setup.py +++ b/setup.py @@ -45,13 +45,16 @@ setup( 'trio_typing', # tooling + 'tricycle', + 'trio_typing', + 'colorlog', 'wrapt', 'pdbpp', # serialization 'msgpack', - + 'msgspec', ], tests_require=['pytest'], python_requires=">=3.8", From e39ee3a9cc1a64a527a9b2dddf6fcf3ab5425961 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 11 Jun 2021 16:21:26 -0400 Subject: [PATCH 04/28] Always cast arbiter addr to tuple --- tractor/_actor.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index e3fa19f..4f36b44 100644 --- 
a/tractor/_actor.py +++ b/tractor/_actor.py @@ -317,7 +317,7 @@ class Actor: # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = tuple(arbiter_addr) if arbiter_addr is not None else None + self._arb_addr = tuple(arbiter_addr) if arbiter_addr is not None else (None, None) # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -791,7 +791,15 @@ class Actor: _state._runtime_vars.update(rvs) for attr, value in parent_data.items(): - setattr(self, attr, value) + + if attr == '_arb_addr': + # XXX: msgspec doesn't support serializing tuples + # so just cash manually here since it's what our + # internals expect. + self._arb_addr = tuple(value) + + else: + setattr(self, attr, value) return chan, accept_addr From 95e35f3d60968400cfaf39516bf2c5cacbe10d60 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 11 Jun 2021 16:38:25 -0400 Subject: [PATCH 05/28] Add streaming decode support for `msgspec` Add a `tractor._ipc.MsgspecStream` type which can be swapped in for `msgspec` serialization transparently. A small msg-length-prefix framing is implemented as part of the type and we use `tricycle.BufferedReceieveStream` to handle buffering logic for the underlying transport. 
Notes: - had to force cast a few more list -> tuple spots due to no native `tuple`decode-by-default in `msgspec`: https://github.com/jcrist/msgspec/issues/30 - the framing can be understood by this protobuf walkthrough: https://eli.thegreenplace.net/2011/08/02/length-prefix-framing-for-protocol-buffers - `tricycle` becomes a new dependency --- tractor/_ipc.py | 98 +++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 78 insertions(+), 20 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 6051a15..5989a2e 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -3,10 +3,11 @@ Inter-process comms abstractions """ import platform +import struct import typing -from typing import Any, Tuple, Optional, Callable -from functools import partial +from typing import Any, Tuple, Optional +from tricycle import BufferedReceiveStream import msgpack import msgspec import trio @@ -18,20 +19,11 @@ log = get_logger(__name__) _is_windows = platform.system() == 'Windows' - -# :eyeroll: -try: - import msgpack_numpy - Unpacker = msgpack_numpy.Unpacker -except ImportError: - # just plain ``msgpack`` requires tweaking key settings - Unpacker = partial(msgpack.Unpacker, strict_map_key=False) - - +log = get_logger(__name__) ms_decode = msgspec.Encoder().encode -class MsgpackTCPStream: +class MsgpackStream: '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgpack-python``. @@ -39,6 +31,7 @@ class MsgpackTCPStream: def __init__( self, stream: trio.SocketStream, + ) -> None: self.stream = stream @@ -62,11 +55,10 @@ class MsgpackTCPStream: async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. 
""" - unpacker = Unpacker( + unpacker = msgpack.Unpacker( raw=False, use_list=False, ) - # decoder = msgspec.Decoder() #dict[str, Any]) while True: try: data = await self.stream.receive_some(2**10) @@ -101,7 +93,6 @@ class MsgpackTCPStream: f'transport {self} was already closed prior ro read' ) - # yield decoder.decode(data) unpacker.feed(data) for packet in unpacker: yield packet @@ -118,8 +109,7 @@ class MsgpackTCPStream: async def send(self, data: Any) -> None: async with self._send_lock: return await self.stream.send_all( - # msgpack.dumps(data, use_bin_type=True)) - ms_decode(data) + msgpack.dumps(data, use_bin_type=True) ) async def recv(self) -> Any: @@ -132,27 +122,95 @@ class MsgpackTCPStream: return self.stream.socket.fileno() != -1 +class MsgspecStream(MsgpackStream): + '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data + using ``msgspec``. + + ''' + ms_encode = msgspec.Encoder().encode + + def __init__( + self, + stream: trio.SocketStream, + prefix_size: int = 4, + + ) -> None: + super().__init__(stream) + self.recv_stream = BufferedReceiveStream(transport_stream=stream) + self.prefix_size = prefix_size + + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: + """Yield packets from the underlying stream. 
+ """ + decoder = msgspec.Decoder() # dict[str, Any]) + + while True: + try: + header = await self.recv_stream.receive_exactly(4) + if header is None: + continue + + if header == b'': + log.debug(f"Stream connection {self.raddr} was closed") + return + + size, = struct.unpack(" None: + async with self._send_lock: + + bytes_data = self.ms_encode(data) + + # supposedly the fastest says, + # https://stackoverflow.com/a/54027962 + size: int = struct.pack(" None: + self._recon_seq = on_reconnect self._autorecon = auto_reconnect - self.msgstream: Optional[MsgpackTCPStream] = MsgpackTCPStream( + + self.stream_serializer_type = stream_serializer_type + self.msgstream: Optional[type] = stream_serializer_type( stream) if stream else None + if self.msgstream and destaddr: raise ValueError( f"A stream was provided with local addr {self.laddr}" ) + self._destaddr = self.msgstream.raddr if self.msgstream else destaddr # set after handshake - always uid of far end self.uid: Optional[Tuple[str, str]] = None @@ -195,7 +253,7 @@ class Channel: *destaddr, **kwargs ) - self.msgstream = MsgpackTCPStream(stream) + self.msgstream = self.stream_serializer_type(stream) log.transport( f'Opened channel to peer {self.laddr} -> {self.raddr}' From 112117c1fc5ac82415a3072a8dda4ffb08022d94 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Jun 2021 18:49:51 -0400 Subject: [PATCH 06/28] Add our own "transport closed" signal This change some super old (and bad) code from the project's very early days. For some redic reason i must have thought masking `trio`'s internal stream / transport errors and a TCP EOF as `StopAsyncIteration` somehow a good idea. The reality is you probably want to know the difference between an unexpected transport error and a simple EOF lol. This begins to resolve that by adding our own special `TransportClosed` error to signal the "graceful" termination of a channel's underlying transport. 
Oh, and this builds on the `msgspec` integration which helped shed light on the core issues here B) --- tractor/_ipc.py | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 5989a2e..6c47205 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -23,7 +23,7 @@ log = get_logger(__name__) ms_decode = msgspec.Encoder().encode -class MsgpackStream: +class MsgpackTCPStream: '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgpack-python``. @@ -122,7 +122,7 @@ class MsgpackStream: return self.stream.socket.fileno() != -1 -class MsgspecStream(MsgpackStream): +class MsgspecTCPStream(MsgpackTCPStream): '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgspec``. @@ -147,24 +147,22 @@ class MsgspecStream(MsgpackStream): while True: try: header = await self.recv_stream.receive_exactly(4) - if header is None: - continue - if header == b'': - log.debug(f"Stream connection {self.raddr} was closed") - return + except (ValueError): + raise TransportClosed( + f'transport {self} was already closed prior ro read' + ) - size, = struct.unpack(" None: @@ -236,7 +234,6 @@ class Channel: return self.msgstream.raddr if self.msgstream else None async def connect( - self, destaddr: Tuple[Any, ...] 
= None, **kwargs From ecd8c4bc7e3433c9cd7d2244e2a16fe63c8abc74 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 27 Jun 2021 00:47:49 -0400 Subject: [PATCH 07/28] Drop happy eyeballs inf delay --- tractor/_ipc.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 6c47205..ef1d422 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -46,9 +46,8 @@ class MsgpackTCPStream: assert isinstance(rsockname, tuple) self._raddr = rsockname[:2] - # start and seed first entry to read loop + # start first entry to read loop self._agen = self._iter_packets() - # self._agen.asend(None) is None self._send_lock = trio.StrictFIFOLock() From 96b3f94c724b6f15b7cccc291aedc5dd1691e3e9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Jul 2021 07:44:03 -0400 Subject: [PATCH 08/28] Accept transport closed error during handshake and msg loop --- tractor/_actor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 4f36b44..bc723e6 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -287,7 +287,7 @@ class Actor: enable_modules: List[str] = [], uid: str = None, loglevel: str = None, - arbiter_addr: Optional[Tuple[str, int]] = None, + arbiter_addr: Optional[Tuple[str, int]] = (None, None), spawn_method: Optional[str] = None ) -> None: """This constructor is called in the parent actor **before** the spawning @@ -317,7 +317,7 @@ class Actor: # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = tuple(arbiter_addr) if arbiter_addr is not None else (None, None) + self._arb_addr = tuple(arbiter_addr) # marked by the process spawning backend at startup # will be None for the parent most process started manually From b64396f7087b2be7151958e1c4b18bf6fde7af14 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Jul 2021 09:41:23 -0400 Subject: [PATCH 09/28] Pkg `msgpec` as optional dep, 
load transport type if importable --- setup.py | 9 ++++++++- tractor/_ipc.py | 26 ++++++++++++++++---------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/setup.py b/setup.py index 4e5c7f5..08c84ef 100755 --- a/setup.py +++ b/setup.py @@ -48,14 +48,21 @@ setup( 'tricycle', 'trio_typing', + # tooling 'colorlog', 'wrapt', 'pdbpp', # serialization 'msgpack', - 'msgspec', + ], + extras_require={ + + # serialization + 'msgspec': ['msgspec; python_version >= 3.9'], + + }, tests_require=['pytest'], python_requires=">=3.8", keywords=[ diff --git a/tractor/_ipc.py b/tractor/_ipc.py index ef1d422..6af1fc6 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -9,7 +9,6 @@ from typing import Any, Tuple, Optional from tricycle import BufferedReceiveStream import msgpack -import msgspec import trio from async_generator import asynccontextmanager @@ -20,7 +19,6 @@ log = get_logger(__name__) _is_windows = platform.system() == 'Windows' log = get_logger(__name__) -ms_decode = msgspec.Encoder().encode class MsgpackTCPStream: @@ -126,8 +124,6 @@ class MsgspecTCPStream(MsgpackTCPStream): using ``msgspec``. ''' - ms_encode = msgspec.Encoder().encode - def __init__( self, stream: trio.SocketStream, @@ -138,10 +134,15 @@ class MsgspecTCPStream(MsgpackTCPStream): self.recv_stream = BufferedReceiveStream(transport_stream=stream) self.prefix_size = prefix_size + import msgspec + + # TODO: struct aware messaging coders + self.encode = msgspec.Encoder().encode + self.decode = msgspec.Decoder().decode # dict[str, Any]) + async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: """Yield packets from the underlying stream. 
""" - decoder = msgspec.Decoder() # dict[str, Any]) while True: try: @@ -164,12 +165,12 @@ class MsgspecTCPStream(MsgpackTCPStream): msg_bytes = await self.recv_stream.receive_exactly(size) log.trace(f"received {msg_bytes}") # type: ignore - yield decoder.decode(msg_bytes) + yield self.decode(msg_bytes) async def send(self, data: Any) -> None: async with self._send_lock: - bytes_data = self.ms_encode(data) + bytes_data = self.encode(data) # supposedly the fastest says, # https://stackoverflow.com/a/54027962 @@ -191,14 +192,19 @@ class Channel: auto_reconnect: bool = False, stream: trio.SocketStream = None, # expected to be active - # stream_serializer_type: type = MsgspecTCPStream, - stream_serializer_type: type = MsgpackTCPStream, - ) -> None: self._recon_seq = on_reconnect self._autorecon = auto_reconnect + try: + # if installed load the msgspec transport since it's faster + import msgspec # noqa + stream_serializer_type: type = MsgspecTCPStream + + except ImportError: + stream_serializer_type: type = MsgpackTCPStream + self.stream_serializer_type = stream_serializer_type self.msgstream: Optional[type] = stream_serializer_type( stream) if stream else None From 8375002b409e39887e9edbc9bdc6744b59ac9a9b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Jul 2021 09:54:59 -0400 Subject: [PATCH 10/28] Fix py version classifier --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 08c84ef..aeb1730 100755 --- a/setup.py +++ b/setup.py @@ -60,7 +60,7 @@ setup( extras_require={ # serialization - 'msgspec': ['msgspec; python_version >= 3.9'], + 'msgspec': ["msgspec; python_version >= '3.9'"], }, tests_require=['pytest'], From aa080543d0e739950afb7ba350ffabf064f96e01 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 1 Jul 2021 14:52:52 -0400 Subject: [PATCH 11/28] Mypy fixes to enforce uid tuple --- tractor/_actor.py | 23 ++++++++++++++--------- tractor/_ipc.py | 18 ++++++++++-------- 2 files changed, 24 
insertions(+), 17 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index bc723e6..ee7a1d8 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -287,7 +287,7 @@ class Actor: enable_modules: List[str] = [], uid: str = None, loglevel: str = None, - arbiter_addr: Optional[Tuple[str, int]] = (None, None), + arbiter_addr: Optional[Tuple[str, int]] = None, spawn_method: Optional[str] = None ) -> None: """This constructor is called in the parent actor **before** the spawning @@ -317,7 +317,8 @@ class Actor: # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = tuple(arbiter_addr) + + self._arb_addr = arbiter_addr or (None, None) # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -796,7 +797,8 @@ class Actor: # XXX: msgspec doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - self._arb_addr = tuple(value) + address: Tuple[str, int] = value + self._arb_addr = value else: setattr(self, attr, value) @@ -1171,6 +1173,7 @@ class Actor: async def _do_handshake( self, chan: Channel + ) -> Tuple[str, str]: """Exchange (name, UUIDs) identifiers as the first communication step. @@ -1178,10 +1181,10 @@ class Actor: parlance. 
""" await chan.send(self.uid) - uid: Tuple[str, str] = tuple(await chan.recv()) + uid: Tuple[str, str] = await chan.recv() - # if not isinstance(uid, tuple): - # raise ValueError(f"{uid} is not a valid uid?!") + if not isinstance(uid, tuple): + raise ValueError(f"{uid} is not a valid uid?!") chan.uid = uid log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") @@ -1246,10 +1249,12 @@ class Arbiter(Actor): return sockaddrs async def register_actor( - self, uid: Tuple[str, str], sockaddr: Tuple[str, int] + self, + uid: Tuple[str, str], + sockaddr: Tuple[str, str] + ) -> None: - uid = tuple(uid) - name, uuid = uid + name, uuid = tuple(uid) self._registry[uid] = tuple(sockaddr) # pop and signal all waiter events diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 6af1fc6..0ed8b8e 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -160,7 +160,7 @@ class MsgspecTCPStream(MsgpackTCPStream): size, = struct.unpack(" None: async with self._send_lock: - bytes_data = self.encode(data) + bytes_data: bytes = self.encode(data) # supposedly the fastest says, # https://stackoverflow.com/a/54027962 - size: int = struct.pack(" Date: Fri, 2 Jul 2021 11:18:05 -0400 Subject: [PATCH 12/28] Add msgspec installs, drop py3.7 --- .github/workflows/ci.yml | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 447b4f3..b90e070 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,6 +3,7 @@ name: CI on: push jobs: + mypy: name: 'MyPy' runs-on: ubuntu-latest @@ -23,23 +24,59 @@ jobs: run: mypy tractor/ --ignore-missing-imports testing: + name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }}' timeout-minutes: 9 runs-on: ${{ matrix.os }} + strategy: fail-fast: false matrix: os: [ubuntu-latest, windows-latest] python: ['3.8', '3.9'] spawn_backend: ['trio', 'mp'] + steps: + - name: Checkout uses: actions/checkout@v2 + - name: Setup python uses: 
actions/setup-python@v2 with: python-version: '${{ matrix.python }}' + - name: Install dependencies run: pip install -U . -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager + + - name: Run tests + run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs + + testing-msgspec: + # runs py3.9 jobs on all OS's but with optional `msgspec` dep installed + name: '${{ matrix.os }} Python ${{ matrix.python }} - ${{ matrix.spawn_backend }} - msgspec' + timeout-minutes: 10 + runs-on: ${{ matrix.os }} + + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest] + python: ['3.9'] + spawn_backend: ['trio', 'mp'] + + steps: + + - name: Checkout + uses: actions/checkout@v2 + + - name: Setup python + uses: actions/setup-python@v2 + with: + python-version: '${{ matrix.python }}' + + - name: Install dependencies + run: pip install -U .[msgspec] -r requirements-test.txt -r requirements-docs.txt --upgrade-strategy eager + - name: Run tests run: pytest tests/ --spawn-backend=${{ matrix.spawn_backend }} -rs From 3facfb6d4c2f82183bd480aa1d2490a48f8ddfea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Sep 2021 16:29:16 -0400 Subject: [PATCH 13/28] Fix log levels --- tractor/_ipc.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 0ed8b8e..f3b08f6 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -160,11 +160,11 @@ class MsgspecTCPStream(MsgpackTCPStream): size, = struct.unpack(" None: From 562419c9074a2f0a3fa5429c840744b7546eac57 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Sep 2021 16:44:35 -0400 Subject: [PATCH 14/28] Convert actor UIDs to hashable tuples `msgspec` sends python lists over the wire (https://github.com/jcrist/msgspec/issues/30) which is fine and dandy but we use them as lookup keys so we need to be sure we tuple-cast first. 
--- tractor/_actor.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index ee7a1d8..cddfb1f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -318,7 +318,7 @@ class Actor: # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = arbiter_addr or (None, None) + self._arb_addr = arbiter_addr # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -797,8 +797,8 @@ class Actor: # XXX: msgspec doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - address: Tuple[str, int] = value - self._arb_addr = value + address: Tuple[str, int] = tuple(value) + self._arb_addr = address else: setattr(self, attr, value) @@ -1181,7 +1181,7 @@ class Actor: parlance. """ await chan.send(self.uid) - uid: Tuple[str, str] = await chan.recv() + uid: Tuple[str, str] = tuple(await chan.recv()) if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") @@ -1254,7 +1254,7 @@ class Arbiter(Actor): sockaddr: Tuple[str, str] ) -> None: - name, uuid = tuple(uid) + name, uuid = uid = tuple(uid) self._registry[uid] = tuple(sockaddr) # pop and signal all waiter events From 1ab495a64dbe98f3c246472ebcae68fcb590dbe1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Sep 2021 18:48:09 -0400 Subject: [PATCH 15/28] Map broken stream errs to transport closed; msgspec seems to be racy --- tractor/_actor.py | 5 ++++- tractor/_ipc.py | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index cddfb1f..9c89f8e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -430,7 +430,10 @@ class Actor: uid = await self._do_handshake(chan) except ( + # we need this for ``msgspec`` for some reason? + # for now, it's been put in the stream backend. 
# trio.BrokenResourceError, + # trio.ClosedResourceError, TransportClosed, ): @@ -797,7 +800,7 @@ class Actor: # XXX: msgspec doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - address: Tuple[str, int] = tuple(value) + address: Tuple[str, int] = tuple(value) if value else value self._arb_addr = address else: diff --git a/tractor/_ipc.py b/tractor/_ipc.py index f3b08f6..c516189 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -148,7 +148,14 @@ class MsgspecTCPStream(MsgpackTCPStream): try: header = await self.recv_stream.receive_exactly(4) - except (ValueError): + except ( + ValueError, + + # not sure entirely why we need this but without it we + # seem to be getting racy failures here on + # arbiter/registry name subs.. + trio.BrokenResourceError, + ): raise TransportClosed( f'transport {self} was already closed prior ro read' ) From 486e9839644092e9b886dc00e2b61dc2138309e4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Sep 2021 11:41:34 -0400 Subject: [PATCH 16/28] Cast `defaultdict` to `dict` for registry get --- tractor/_actor.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 9c89f8e..deeda73 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -800,8 +800,9 @@ class Actor: # XXX: msgspec doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. 
- address: Tuple[str, int] = tuple(value) if value else value - self._arb_addr = address + self._arb_addr: Tuple[str, int] = ( + tuple(value) if value else value + ) else: setattr(self, attr, value) @@ -1206,8 +1207,10 @@ class Arbiter(Actor): is_arbiter = True def __init__(self, *args, **kwargs): + self._registry = defaultdict(list) self._waiters = {} + super().__init__(*args, **kwargs) async def find_actor(self, name: str) -> Optional[Tuple[str, int]]: @@ -1220,22 +1223,25 @@ class Arbiter(Actor): async def get_registry( self ) -> Dict[str, Tuple[str, str]]: - """Return current name registry. - """ + '''Return current name registry. + + This method is async to allow for cross-actor invocation. + ''' # NOTE: requires ``strict_map_key=False`` to the msgpack # unpacker since we have tuples as keys (not this makes the # arbiter suscetible to hashdos): # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 - return self._registry + return dict(self._registry) async def wait_for_actor( - self, name: str + self, + name: str, ) -> List[Tuple[str, int]]: - """Wait for a particular actor to register. + '''Wait for a particular actor to register. This is a blocking call if no actor by the provided name is currently registered. 
- """ + ''' sockaddrs = [] for (aname, _), sockaddr in self._registry.items(): @@ -1267,5 +1273,8 @@ class Arbiter(Actor): if isinstance(event, trio.Event): event.set() - async def unregister_actor(self, uid: Tuple[str, str]) -> None: + async def unregister_actor( + self, + uid: Tuple[str, str] + ) -> None: self._registry.pop(tuple(uid)) From 93726f139260c4fa4add047e62a2894882c2ec08 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Sep 2021 11:42:49 -0400 Subject: [PATCH 17/28] Call registry getter method in test --- tests/test_discovery.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 383608e..1fa3f5f 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -136,7 +136,7 @@ async def spawn_and_check_registry( if actor.is_arbiter: async def get_reg(): - return actor._registry + return await actor.get_registry() extra = 1 # arbiter is local root actor else: @@ -187,13 +187,12 @@ async def spawn_and_check_registry( await cancel(use_signal) finally: - with trio.CancelScope(shield=True): - await trio.sleep(0.5) + await trio.sleep(0.5) - # all subactors should have de-registered - registry = await get_reg() - assert len(registry) == extra - assert actor.uid in registry + # all subactors should have de-registered + registry = await get_reg() + assert len(registry) == extra + assert actor.uid in registry @pytest.mark.parametrize('use_signal', [False, True]) @@ -277,7 +276,9 @@ async def close_chans_before_nursery( # TODO: compact this back as was in last commit once # 3.9+, see https://github.com/goodboy/tractor/issues/207 - async with portal1.open_stream_from(stream_forever) as agen1: + async with portal1.open_stream_from( + stream_forever + ) as agen1: async with portal2.open_stream_from( stream_forever ) as agen2: @@ -293,8 +294,9 @@ async def close_chans_before_nursery( # reliably triggered by an external SIGINT. 
# tractor.current_actor()._root_nursery.cancel_scope.cancel() - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block + # XXX: THIS IS THE KEY THING that + # happens **before** exiting the + # actor nursery block # also kill off channels cuz why not await agen1.aclose() From dbc4e3dd46615e3cca32b9cabe322301c62383eb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Sep 2021 11:53:18 -0400 Subject: [PATCH 18/28] Pin to latest and greatest `msgspec` --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index aeb1730..d233ecd 100755 --- a/setup.py +++ b/setup.py @@ -60,7 +60,7 @@ setup( extras_require={ # serialization - 'msgspec': ["msgspec; python_version >= '3.9'"], + 'msgspec': ["msgspec >= 0.3.2'; python_version >= '3.9'"], }, tests_require=['pytest'], From 19d68852438753db6dc4f621faac93058dd49cdb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Sep 2021 12:07:09 -0400 Subject: [PATCH 19/28] Ensure tuple for passed in arbiter addr --- tests/test_spawning.py | 7 ++++++- tractor/_root.py | 6 +++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/test_spawning.py b/tests/test_spawning.py index f9eeb16..b1110fe 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -1,6 +1,7 @@ """ Spawning basics """ +from typing import Dict, Tuple import pytest import trio @@ -11,7 +12,11 @@ from conftest import tractor_test data_to_pass_down = {'doggy': 10, 'kitty': 4} -async def spawn(is_arbiter, data, arb_addr): +async def spawn( + is_arbiter: bool, + data: Dict, + arb_addr: Tuple[str, int], +): namespaces = [__name__] await trio.sleep(0.1) diff --git a/tractor/_root.py b/tractor/_root.py index 9eba909..bbf337a 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -95,10 +95,10 @@ async def open_root_actor( "Debug mode is only supported for the `trio` backend!" 
) - arbiter_addr = (host, port) = arbiter_addr or ( + arbiter_addr = (host, port) = tuple(arbiter_addr or ( _default_arbiter_host, - _default_arbiter_port - ) + _default_arbiter_port, + )) loglevel = loglevel or log.get_loglevel() if loglevel is not None: From 076f37c589291b28fdd802843876859d6bcbda57 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 10:43:33 -0400 Subject: [PATCH 20/28] Attempt to gracefully handle channel breakage? --- tractor/_ipc.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index c516189..6296d94 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -141,8 +141,11 @@ class MsgspecTCPStream(MsgpackTCPStream): self.decode = msgspec.Decoder().decode # dict[str, Any]) async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: - """Yield packets from the underlying stream. - """ + '''Yield packets from the underlying stream. + + ''' + import msgspec # noqa + last_decode_failed: bool = False while True: try: @@ -172,7 +175,18 @@ class MsgspecTCPStream(MsgpackTCPStream): msg_bytes = await self.recv_stream.receive_exactly(size) log.transport(f"received {msg_bytes}") # type: ignore - yield self.decode(msg_bytes) + try: + assert not last_decode_failed + yield self.decode(msg_bytes) + except ( + msgspec.DecodingError, + UnicodeDecodeError, + ): + # ignore decoding errors for now and assume they have to + # do with a channel drop - hope that receiving from the + # channel will raise an expected error and bubble up. 
+ log.error(f'`msgspec` failed to decode!?\n{msg_bytes}') + last_decode_failed = True async def send(self, data: Any) -> None: async with self._send_lock: From 1382ad653d163e5b0b4d91adf2661315ea6d4689 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 20:24:02 -0400 Subject: [PATCH 21/28] Ugh, appease mypy yet again --- tractor/_actor.py | 33 +++++++++++++++++++-------------- tractor/_ipc.py | 2 +- tractor/_root.py | 10 +++++----- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index deeda73..43ee2f3 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -318,7 +318,7 @@ class Actor: # @dataclass once we get py3.7 self.loglevel = loglevel - self._arb_addr = arbiter_addr + self._arb_addr = (str(arbiter_addr[0]), int(arbiter_addr[1])) if arbiter_addr else None # marked by the process spawning backend at startup # will be None for the parent most process started manually @@ -780,6 +780,7 @@ class Actor: if self._spawn_method == "trio": # Receive runtime state from our parent + parent_data: dict[str, Any] parent_data = await chan.recv() log.debug( "Received state from parent:\n" @@ -797,12 +798,11 @@ class Actor: for attr, value in parent_data.items(): if attr == '_arb_addr': - # XXX: msgspec doesn't support serializing tuples + # XXX: ``msgspec`` doesn't support serializing tuples # so just cash manually here since it's what our # internals expect. - self._arb_addr: Tuple[str, int] = ( - tuple(value) if value else value - ) + value = tuple(value) if value else None + self._arb_addr = value else: setattr(self, attr, value) @@ -1185,12 +1185,13 @@ class Actor: parlance. 
""" await chan.send(self.uid) - uid: Tuple[str, str] = tuple(await chan.recv()) + value = await chan.recv() + uid: Tuple[str, str] = (str(value[0]), str(value[1])) if not isinstance(uid, tuple): raise ValueError(f"{uid} is not a valid uid?!") - chan.uid = uid + chan.uid = str(uid[0]), str(uid[1]) log.runtime(f"Handshake with actor {uid}@{chan.raddr} complete") return uid @@ -1208,7 +1209,10 @@ class Arbiter(Actor): def __init__(self, *args, **kwargs): - self._registry = defaultdict(list) + self._registry: Dict[ + Tuple[str, str], + Tuple[str, int], + ] = {} self._waiters = {} super().__init__(*args, **kwargs) @@ -1222,7 +1226,7 @@ class Arbiter(Actor): async def get_registry( self - ) -> Dict[str, Tuple[str, str]]: + ) -> Dict[Tuple[str, str], Tuple[str, int]]: '''Return current name registry. This method is async to allow for cross-actor invocation. @@ -1231,7 +1235,7 @@ class Arbiter(Actor): # unpacker since we have tuples as keys (not this makes the # arbiter suscetible to hashdos): # https://github.com/msgpack/msgpack-python#major-breaking-changes-in-msgpack-10 - return dict(self._registry) + return self._registry async def wait_for_actor( self, @@ -1260,11 +1264,11 @@ class Arbiter(Actor): async def register_actor( self, uid: Tuple[str, str], - sockaddr: Tuple[str, str] + sockaddr: Tuple[str, int] ) -> None: - name, uuid = uid = tuple(uid) - self._registry[uid] = tuple(sockaddr) + uid = name, uuid = (str(uid[0]), str(uid[1])) + self._registry[uid] = (str(sockaddr[0]), int(sockaddr[1])) # pop and signal all waiter events events = self._waiters.pop(name, ()) @@ -1277,4 +1281,5 @@ class Arbiter(Actor): self, uid: Tuple[str, str] ) -> None: - self._registry.pop(tuple(uid)) + uid = (str(uid[0]), str(uid[1])) + self._registry.pop(uid) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 6296d94..e420509 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -185,7 +185,7 @@ class MsgspecTCPStream(MsgpackTCPStream): # ignore decoding errors for now and assume they 
have to # do with a channel drop - hope that receiving from the # channel will raise an expected error and bubble up. - log.error(f'`msgspec` failed to decode!?\n{msg_bytes}') + log.error(f'`msgspec` failed to decode!?') last_decode_failed = True async def send(self, data: Any) -> None: diff --git a/tractor/_root.py b/tractor/_root.py index bbf337a..b153755 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -21,8 +21,8 @@ from ._exceptions import is_multi_cancelled # set at startup and after forks -_default_arbiter_host = '127.0.0.1' -_default_arbiter_port = 1616 +_default_arbiter_host: str = '127.0.0.1' +_default_arbiter_port: int = 1616 logger = log.get_logger('tractor') @@ -32,7 +32,7 @@ logger = log.get_logger('tractor') async def open_root_actor( # defaults are above - arbiter_addr: Tuple[str, int] = ( + arbiter_addr: Optional[Tuple[str, int]] = ( _default_arbiter_host, _default_arbiter_port, ), @@ -95,10 +95,10 @@ async def open_root_actor( "Debug mode is only supported for the `trio` backend!" 
) - arbiter_addr = (host, port) = tuple(arbiter_addr or ( + arbiter_addr = (host, port) = arbiter_addr or ( _default_arbiter_host, _default_arbiter_port, - )) + ) loglevel = loglevel or log.get_loglevel() if loglevel is not None: From 5b23a3bc35d241e8952c688a43c073bb0bd5681f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 20:25:40 -0400 Subject: [PATCH 22/28] Don't expect list value from registry --- tests/test_discovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 1fa3f5f..2b2996f 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -42,7 +42,7 @@ async def test_reg_then_unreg(arb_addr): await trio.sleep(0.1) assert uid not in aportal.actor._registry - sockaddrs = actor._registry[uid] + sockaddrs = actor._registry.get(uid) assert not sockaddrs From 07e8821cd5d3b71413c4e3abb46ed37e1c714dfd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 21:07:33 -0400 Subject: [PATCH 23/28] Add a stream type factory --- tractor/_ipc.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index e420509..6f3fffa 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -5,7 +5,7 @@ Inter-process comms abstractions import platform import struct import typing -from typing import Any, Tuple, Optional +from typing import Any, Tuple, Optional, Type from tricycle import BufferedReceiveStream import msgpack @@ -55,6 +55,7 @@ class MsgpackTCPStream: unpacker = msgpack.Unpacker( raw=False, use_list=False, + strict_map_key=False ) while True: try: @@ -130,12 +131,12 @@ class MsgspecTCPStream(MsgpackTCPStream): prefix_size: int = 4, ) -> None: + import msgspec + super().__init__(stream) self.recv_stream = BufferedReceiveStream(transport_stream=stream) self.prefix_size = prefix_size - import msgspec - # TODO: struct aware messaging coders self.encode = msgspec.Encoder().encode 
self.decode = msgspec.Decoder().decode # dict[str, Any]) @@ -185,7 +186,7 @@ class MsgspecTCPStream(MsgpackTCPStream): # ignore decoding errors for now and assume they have to # do with a channel drop - hope that receiving from the # channel will raise an expected error and bubble up. - log.error(f'`msgspec` failed to decode!?') + log.error('`msgspec` failed to decode!?') last_decode_failed = True async def send(self, data: Any) -> None: @@ -200,11 +201,21 @@ class MsgspecTCPStream(MsgpackTCPStream): return await self.stream.send_all(size + bytes_data) +def get_serializer_stream_type( + name: str, +) -> Type: + return { + 'msgpack': MsgpackTCPStream, + 'msgspec': MsgspecTCPStream, + }[name] + + class Channel: - """An inter-process channel for communication between (remote) actors. + '''An inter-process channel for communication between (remote) actors. Currently the only supported transport is a ``trio.SocketStream``. - """ + + ''' def __init__( self, @@ -218,17 +229,17 @@ class Channel: self._recon_seq = on_reconnect self._autorecon = auto_reconnect - stream_serializer_type = MsgpackTCPStream - + # TODO: maybe expose this through the nursery api? 
try: # if installed load the msgspec transport since it's faster import msgspec # noqa - stream_serializer_type = MsgspecTCPStream + serializer = 'msgspec' except ImportError: - pass + serializer = 'msgpack' - self.stream_serializer_type = stream_serializer_type - self.msgstream = stream_serializer_type(stream) if stream else None + self.stream_serializer_type = get_serializer_stream_type(serializer) + self.msgstream = self.stream_serializer_type( + stream) if stream else None if self.msgstream and destaddr: raise ValueError( From f7fc464ce8cc50f3264b35ffc14c4b25b608ad2b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 21:27:05 -0400 Subject: [PATCH 24/28] Add `msgspec` mentions to readme --- docs/README.rst | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/docs/README.rst b/docs/README.rst index 1199d3d..15abc89 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -24,8 +24,9 @@ Features - Builtin IPC streaming APIs with task fan-out broadcasting - A (first ever?) "native" multi-core debugger UX for Python using `pdb++`_ - Support for a swappable, OS specific, process spawning layer -- A modular transport stack, allowing for custom serialization, - communications protocols, and environment specific IPC primitives +- A modular transport stack, allowing for custom serialization (eg. + `msgspec`_), communications protocols, and environment specific IPC + primitives - `structured concurrency`_ from the ground up @@ -322,6 +323,12 @@ From PyPi:: pip install tractor +To try out the (optionally) faster `msgspec`_ codec instead of the +default ``msgpack`` lib:: + + pip install tractor[msgspec] + + From git:: pip install git+git://github.com/goodboy/tractor.git @@ -394,7 +401,8 @@ Help us push toward the future. - (Soon to land) ``asyncio`` support allowing for "infected" actors where `trio` drives the `asyncio` scheduler via the astounding "`guest mode`_" -- Typed messaging protocols (ex. 
via ``msgspec``) +- Typed messaging protocols (ex. via ``msgspec``, see `#36 + <https://github.com/goodboy/tractor/issues/36>`_) - Erlang-style supervisors via composed context managers @@ -427,6 +435,7 @@ channel`_! .. _unrequirements: https://en.wikipedia.org/wiki/Actor_model#Direct_communication_and_asynchrony .. _async generators: https://www.python.org/dev/peps/pep-0525/ .. _trio-parallel: https://github.com/richardsheridan/trio-parallel +.. _msgspec: https://jcristharif.com/msgspec/ .. |gh_actions| image:: https://img.shields.io/endpoint.svg?url=https%3A%2F%2Factions-badge.atrox.dev%2Fgoodboy%2Ftractor%2Fbadge&style=popout-square From ef75883b62e76740f54616a6588b5fc89148927d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 21:37:57 -0400 Subject: [PATCH 25/28] Add fragment --- newsfragments/214.feature.rst | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 newsfragments/214.feature.rst diff --git a/newsfragments/214.feature.rst b/newsfragments/214.feature.rst new file mode 100644 index 0000000..1247536 --- /dev/null +++ b/newsfragments/214.feature.rst @@ -0,0 +1,9 @@ +Add optional `msgspec <https://jcristharif.com/msgspec/>`_ support over +TCP streams as an alternative, faster MessagePack codec. + +This gets us moving toward typed messaging/IPC protocols. Further, +``msgspec`` structs may be a valid tool to start for formalizing our "SC +dialog un-protocol" messages as described in `#36 +<https://github.com/goodboy/tractor/issues/36>`_.
+ + From 135459ca25c70d902e69b3c904ddaf9c81c55f6e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 30 Sep 2021 11:32:28 -0400 Subject: [PATCH 26/28] Tolerate one decode error; may have been a registry ping --- tractor/_ipc.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 6f3fffa..b8d0437 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -177,17 +177,19 @@ class MsgspecTCPStream(MsgpackTCPStream): log.transport(f"received {msg_bytes}") # type: ignore try: - assert not last_decode_failed yield self.decode(msg_bytes) except ( msgspec.DecodingError, UnicodeDecodeError, ): - # ignore decoding errors for now and assume they have to - # do with a channel drop - hope that receiving from the - # channel will raise an expected error and bubble up. - log.error('`msgspec` failed to decode!?') - last_decode_failed = True + if not last_decode_failed: + # ignore decoding errors for now and assume they have to + # do with a channel drop - hope that receiving from the + # channel will raise an expected error and bubble up. + log.error('`msgspec` failed to decode!?') + last_decode_failed = True + else: + raise async def send(self, data: Any) -> None: async with self._send_lock: From c6dc96b08cb38078b2eea4504f0e7b0e6b789abe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Oct 2021 14:52:12 -0400 Subject: [PATCH 27/28] Add "message transport" structured sub-typing In an effort to have some kind of more formal interface around the transport layer, add a `MsgTransport` protocol type and use with the channel composition of message streams. Start a little "key map" of `(, )` to `MsgTransport` types which can be dynamically loaded. Add a `Channel.from_stream()` constructor thus cleaning up the mangled logic that was in the constructor based on inputs. 
Drop all the "auto reconnect" channel logic for now since nothing is using it (internally) and it's likely it will need rework once we bring in a protocol besides TCP. --- tractor/_ipc.py | 248 +++++++++++++++++++++++++++++++----------------- 1 file changed, 159 insertions(+), 89 deletions(-) diff --git a/tractor/_ipc.py b/tractor/_ipc.py index b8d0437..28bef97 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -2,10 +2,14 @@ Inter-process comms abstractions """ +from __future__ import annotations import platform import struct import typing -from typing import Any, Tuple, Optional, Type +from typing import ( + Any, Tuple, Optional, + Type, Protocol, TypeVar +) from tricycle import BufferedReceiveStream import msgpack @@ -21,6 +25,53 @@ _is_windows = platform.system() == 'Windows' log = get_logger(__name__) +def get_stream_addrs(stream: trio.SocketStream) -> Tuple: + # should both be IP sockets + lsockname = stream.socket.getsockname() + rsockname = stream.socket.getpeername() + return ( + tuple(lsockname[:2]), + tuple(rsockname[:2]), + ) + + +MsgType = TypeVar("MsgType") + +# TODO: consider using a generic def and indexing with our eventual +# msg definition/types? +# - https://docs.python.org/3/library/typing.html#typing.Protocol +# - https://jcristharif.com/msgspec/usage.html#structs + + +class MsgTransport(Protocol[MsgType]): + + stream: trio.SocketStream + + def __init__(self, stream: trio.SocketStream) -> None: + ... + + # XXX: should this instead be called `.sendall()`? + async def send(self, msg: MsgType) -> None: + ... + + async def recv(self) -> MsgType: + ... + + def __aiter__(self) -> MsgType: + ... + + def connected(self) -> bool: + ... + + @property + def laddr(self) -> Tuple[str, int]: + ... + + @property + def raddr(self) -> Tuple[str, int]: + ... + + class MsgpackTCPStream: '''A ``trio.SocketStream`` delivering ``msgpack`` formatted data using ``msgpack-python``. 
@@ -36,17 +87,10 @@ class MsgpackTCPStream: assert self.stream.socket # should both be IP sockets - lsockname = stream.socket.getsockname() - assert isinstance(lsockname, tuple) - self._laddr = lsockname[:2] + self._laddr, self._raddr = get_stream_addrs(stream) - rsockname = stream.socket.getpeername() - assert isinstance(rsockname, tuple) - self._raddr = rsockname[:2] - - # start first entry to read loop + # create read loop instance self._agen = self._iter_packets() - self._send_lock = trio.StrictFIFOLock() async def _iter_packets(self) -> typing.AsyncGenerator[dict, None]: @@ -103,11 +147,10 @@ class MsgpackTCPStream: def raddr(self) -> Tuple[Any, ...]: return self._raddr - # XXX: should this instead be called `.sendall()`? - async def send(self, data: Any) -> None: + async def send(self, msg: Any) -> None: async with self._send_lock: return await self.stream.send_all( - msgpack.dumps(data, use_bin_type=True) + msgpack.dumps(msg, use_bin_type=True) ) async def recv(self) -> Any: @@ -191,10 +234,10 @@ class MsgspecTCPStream(MsgpackTCPStream): else: raise - async def send(self, data: Any) -> None: + async def send(self, msg: Any) -> None: async with self._send_lock: - bytes_data: bytes = self.encode(data) + bytes_data: bytes = self.encode(msg) # supposedly the fastest says, # https://stackoverflow.com/a/54027962 @@ -203,13 +246,16 @@ class MsgspecTCPStream(MsgpackTCPStream): return await self.stream.send_all(size + bytes_data) -def get_serializer_stream_type( - name: str, -) -> Type: +def get_msg_transport( + + key: Tuple[str, str], + +) -> Type[MsgTransport]: + return { - 'msgpack': MsgpackTCPStream, - 'msgspec': MsgspecTCPStream, - }[name] + ('msgpack', 'tcp'): MsgpackTCPStream, + ('msgspec', 'tcp'): MsgspecTCPStream, + }[key] class Channel: @@ -221,34 +267,34 @@ class Channel: def __init__( self, - destaddr: Optional[Tuple[str, int]] = None, - on_reconnect: typing.Callable[..., typing.Awaitable] = None, - auto_reconnect: bool = False, - stream: 
trio.SocketStream = None, # expected to be active + destaddr: Optional[Tuple[str, int]], + + msg_transport_type_key: Tuple[str, str] = ('msgpack', 'tcp'), + + # TODO: optional reconnection support? + # auto_reconnect: bool = False, + # on_reconnect: typing.Callable[..., typing.Awaitable] = None, ) -> None: - self._recon_seq = on_reconnect - self._autorecon = auto_reconnect + # self._recon_seq = on_reconnect + # self._autorecon = auto_reconnect # TODO: maybe expose this through the nursery api? try: # if installed load the msgspec transport since it's faster import msgspec # noqa - serializer = 'msgspec' + msg_transport_type_key = ('msgspec', 'tcp') except ImportError: - serializer = 'msgpack' + pass - self.stream_serializer_type = get_serializer_stream_type(serializer) - self.msgstream = self.stream_serializer_type( - stream) if stream else None + self._destaddr = destaddr + self._transport_key = msg_transport_type_key - if self.msgstream and destaddr: - raise ValueError( - f"A stream was provided with local addr {self.laddr}" - ) - - self._destaddr = self.msgstream.raddr if self.msgstream else destaddr + # Either created in ``.connect()`` or passed in by + # user in ``.from_stream()``. 
+ self._stream: Optional[trio.SocketStream] = None + self.msgstream: Optional[MsgTransport] = None # set after handshake - always uid of far end self.uid: Optional[Tuple[str, str]] = None @@ -256,9 +302,34 @@ class Channel: # set if far end actor errors internally self._exc: Optional[Exception] = None self._agen = self._aiter_recv() - self._closed: bool = False + @classmethod + def from_stream( + cls, + stream: trio.SocketStream, + **kwargs, + + ) -> Channel: + + src, dst = get_stream_addrs(stream) + chan = Channel(destaddr=dst, **kwargs) + + # set immediately here from provided instance + chan._stream = stream + chan.set_msg_transport(stream) + return chan + + def set_msg_transport( + self, + stream: trio.SocketStream, + type_key: Optional[Tuple[str, str]] = None, + + ) -> MsgTransport: + type_key = type_key or self._transport_key + self.msgstream = get_msg_transport(type_key)(stream) + return self.msgstream + def __repr__(self) -> str: if self.msgstream: return repr( @@ -267,11 +338,11 @@ class Channel: return object.__repr__(self) @property - def laddr(self) -> Optional[Tuple[Any, ...]]: + def laddr(self) -> Optional[Tuple[str, int]]: return self.msgstream.laddr if self.msgstream else None @property - def raddr(self) -> Optional[Tuple[Any, ...]]: + def raddr(self) -> Optional[Tuple[str, int]]: return self.msgstream.raddr if self.msgstream else None async def connect( @@ -279,7 +350,7 @@ class Channel: destaddr: Tuple[Any, ...] 
= None, **kwargs - ) -> trio.SocketStream: + ) -> MsgTransport: if self.connected(): raise RuntimeError("channel is already connected?") @@ -291,12 +362,12 @@ class Channel: *destaddr, **kwargs ) - self.msgstream = self.stream_serializer_type(stream) + msgstream = self.set_msg_transport(stream) log.transport( - f'Opened channel to peer {self.laddr} -> {self.raddr}' + f'Opened channel[{type(msgstream)}]: {self.laddr} -> {self.raddr}' ) - return stream + return msgstream async def send(self, item: Any) -> None: @@ -307,16 +378,15 @@ class Channel: async def recv(self) -> Any: assert self.msgstream + return await self.msgstream.recv() - try: - return await self.msgstream.recv() - - except trio.BrokenResourceError: - if self._autorecon: - await self._reconnect() - return await self.recv() - - raise + # try: + # return await self.msgstream.recv() + # except trio.BrokenResourceError: + # if self._autorecon: + # await self._reconnect() + # return await self.recv() + # raise async def aclose(self) -> None: @@ -338,34 +408,36 @@ class Channel: def __aiter__(self): return self._agen - async def _reconnect(self) -> None: - """Handle connection failures by polling until a reconnect can be - established. - """ - down = False - while True: - try: - with trio.move_on_after(3) as cancel_scope: - await self.connect() - cancelled = cancel_scope.cancelled_caught - if cancelled: - log.transport( - "Reconnect timed out after 3 seconds, retrying...") - continue - else: - log.transport("Stream connection re-established!") - # run any reconnection sequence - on_recon = self._recon_seq - if on_recon: - await on_recon(self) - break - except (OSError, ConnectionRefusedError): - if not down: - down = True - log.transport( - f"Connection to {self.raddr} went down, waiting" - " for re-establishment") - await trio.sleep(1) + # async def _reconnect(self) -> None: + # """Handle connection failures by polling until a reconnect can be + # established. 
+ # """ + # down = False + # while True: + # try: + # with trio.move_on_after(3) as cancel_scope: + # await self.connect() + # cancelled = cancel_scope.cancelled_caught + # if cancelled: + # log.transport( + # "Reconnect timed out after 3 seconds, retrying...") + # continue + # else: + # log.transport("Stream connection re-established!") + + # # TODO: run any reconnection sequence + # # on_recon = self._recon_seq + # # if on_recon: + # # await on_recon(self) + + # break + # except (OSError, ConnectionRefusedError): + # if not down: + # down = True + # log.transport( + # f"Connection to {self.raddr} went down, waiting" + # " for re-establishment") + # await trio.sleep(1) async def _aiter_recv( self @@ -384,16 +456,14 @@ class Channel: # await self.msgstream.send(sent) except trio.BrokenResourceError: - if not self._autorecon: - raise + # if not self._autorecon: + raise await self.aclose() - if self._autorecon: # attempt reconnect - await self._reconnect() - continue - else: - return + # if self._autorecon: # attempt reconnect + # await self._reconnect() + # continue def connected(self) -> bool: return self.msgstream.connected() if self.msgstream else False From b496e790fed4e8e82c68b36a750c9cae449ed03a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 6 Oct 2021 15:17:28 -0400 Subject: [PATCH 28/28] Use from `.from_stream()` in TCP handler --- tractor/_actor.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 43ee2f3..dcbe541 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -422,7 +422,7 @@ class Actor: """ self._no_more_peers = trio.Event() # unset - chan = Channel(stream=stream) + chan = Channel.from_stream(stream) log.runtime(f"New connection to us {chan}") # send/receive initial handshake response @@ -819,21 +819,25 @@ class Actor: async def _async_main( self, accept_addr: Optional[Tuple[str, int]] = None, + # XXX: currently ``parent_addr`` is only needed for the # 
``multiprocessing`` backend (which pickles state sent to # the child instead of relaying it over the connect-back # channel). Once that backend is removed we can likely just - # change this so a simple ``is_subactor: bool`` which will + # change this to a simple ``is_subactor: bool`` which will # be False when running as root actor and True when as # a subactor. parent_addr: Optional[Tuple[str, int]] = None, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + ) -> None: - """Start the channel server, maybe connect back to the parent, and + """ + Start the channel server, maybe connect back to the parent, and start the main task. A "root-most" (or "top-level") nursery for this actor is opened here and when cancelled effectively cancels the actor. + """ registered_with_arbiter = False try: