From 15ead6b561e0cce87d5bcbaecac308bf8f7f49c2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 Dec 2020 21:42:28 -0500 Subject: [PATCH 1/4] Add a way to shield a stream's underlying channel Add a ``tractor._portal.StreamReceiveChannel.shield_channel()`` context manager which allows for avoiding the closing of an IPC stream's underlying channel for the purposes of task re-spawning. Sometimes you might want to cancel a task consuming a stream but not tear down the IPC between actors (the default). A common use case might be where the task's "setup" work might need to be redone but you want to keep the established portal / channel intact despite the task restart. Includes a test. --- tests/test_streaming.py | 73 +++++++++++++++++++++++++++++++++++++++++ tractor/_portal.py | 20 +++++++++++ 2 files changed, 93 insertions(+) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 919b278..c7ea4e0 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -7,6 +7,7 @@ import platform import trio import tractor +from tractor.testing import tractor_test import pytest @@ -53,6 +54,7 @@ async def stream_from_single_subactor(stream_func_name): """Verify we can spawn a daemon actor and retrieve streamed data. """ async with tractor.find_actor('streamerd') as portals: + if not portals: # only one per host address, spawns an actor if None async with tractor.open_nursery() as nursery: @@ -73,8 +75,10 @@ async def stream_from_single_subactor(stream_func_name): # it'd sure be nice to have an asyncitertools here... 
iseq = iter(seq) ival = next(iseq) + async for val in stream: assert val == ival + try: ival = next(iseq) except StopIteration: @@ -83,6 +87,7 @@ async def stream_from_single_subactor(stream_func_name): await stream.aclose() await trio.sleep(0.3) + try: await stream.__anext__() except StopAsyncIteration: @@ -109,8 +114,11 @@ def test_stream_from_single_subactor(arb_addr, start_method, stream_func): # this is the first 2 actors, streamer_1 and streamer_2 async def stream_data(seed): + for i in range(seed): + yield i + # trigger scheduler to simulate practical usage await trio.sleep(0) @@ -246,3 +254,68 @@ def test_not_fast_enough_quad( else: # should be cancelled mid-streaming assert results is None + + +@tractor_test +async def test_respawn_consumer_task( + arb_addr, + spawn_backend, + loglevel, +): + """Verify that ``._portal.StreamReceiveChannel.shield_channel()`` + sucessfully protects the underlying IPC channel from being closed + when cancelling and respawning a consumer task. + + This also serves to verify that all values from the stream can be + received despite the respawns. 
+ + """ + stream = None + + async with tractor.open_nursery() as n: + + stream = await(await n.run_in_actor( + 'streamer', + stream_data, + seed=11, + )).result() + + expect = set(range(11)) + received = [] + + # this is the re-spawn task routine + async def consume(task_status=trio.TASK_STATUS_IGNORED): + print('starting consume task..') + nonlocal stream + + with trio.CancelScope() as cs: + task_status.started(cs) + + # shield stream's underlying channel from cancellation + with stream.shield_channel(): + + async for v in stream: + print(f'from stream: {v}') + expect.remove(v) + received.append(v) + + print('exited consume') + + async with trio.open_nursery() as ln: + cs = await ln.start(consume) + + while True: + + await trio.sleep(0.1) + + if received[-1] % 2 == 0: + + print('cancelling consume task..') + cs.cancel() + + # respawn + cs = await ln.start(consume) + + if not expect: + print("all values streamed, BREAKING") + break diff --git a/tractor/_portal.py b/tractor/_portal.py index af3c1b5..50036a8 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -7,6 +7,7 @@ import typing from typing import Tuple, Any, Dict, Optional, Set from functools import partial from dataclasses import dataclass +from contextlib import contextmanager import trio from async_generator import asynccontextmanager @@ -59,6 +60,7 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): self._cid = cid self._rx_chan = rx_chan self._portal = portal + self._shielded = False # delegate directly to underlying mem channel def receive_nowait(self): @@ -83,6 +85,18 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): "Received internal error at portal?") raise unpack_error(msg, self._portal.channel) + @contextmanager + def shield_channel( + self + ) -> typing.AsyncGenerator['StreamReceiveChannel', None]: + """Shield this stream's underlying channel such that a local consumer task + can be cancelled (and possibly restarted) using ``trio.Cancelled``. 
+ + """ + self._shielded = True + yield self + self._shielded = False + async def aclose(self): """Cancel associated remote actor task and local memory channel on close. @@ -90,12 +104,18 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): if self._rx_chan._closed: log.warning(f"{self} is already closed") return + + if self._shielded: + log.warning(f"{self} is shielded, portal channel being kept alive") + return + cid = self._cid with trio.move_on_after(0.5) as cs: cs.shield = True log.warning( f"Cancelling stream {cid} to " f"{self._portal.channel.uid}") + # NOTE: we're telling the far end actor to cancel a task # corresponding to *this actor*. The far end local channel # instance is passed to `Actor._cancel_task()` implicitly. From 201771a521b409f7665a356a4dfa68bca8b43151 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 17 Dec 2020 11:58:48 -0500 Subject: [PATCH 2/4] 'Fix mypy, change internal type name to `ReceiveStream`, settle on `.shield()`' --- tests/test_streaming.py | 4 ++-- tractor/_portal.py | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index c7ea4e0..a64515f 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -262,7 +262,7 @@ async def test_respawn_consumer_task( spawn_backend, loglevel, ): - """Verify that ``._portal.StreamReceiveChannel.shield_channel()`` + """Verify that ``._portal.ReceiveStream.shield()`` sucessfully protects the underlying IPC channel from being closed when cancelling and respawning a consumer task. 
@@ -292,7 +292,7 @@ async def test_respawn_consumer_task( task_status.started(cs) # shield stream's underlying channel from cancellation - with stream.shield_channel(): + with stream.shield(): async for v in stream: print(f'from stream: {v}') diff --git a/tractor/_portal.py b/tractor/_portal.py index 50036a8..15d86e3 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -4,7 +4,7 @@ Portal api import importlib import inspect import typing -from typing import Tuple, Any, Dict, Optional, Set +from typing import Tuple, Any, Dict, Optional, Set, Iterator from functools import partial from dataclasses import dataclass from contextlib import contextmanager @@ -38,7 +38,7 @@ async def maybe_open_nursery( yield nursery -class StreamReceiveChannel(trio.abc.ReceiveChannel): +class ReceiveStream(trio.abc.ReceiveChannel): """A wrapper around a ``trio._channel.MemoryReceiveChannel`` with special behaviour for signalling stream termination across an inter-actor ``Channel``. This is the type returned to a local task @@ -86,9 +86,9 @@ class StreamReceiveChannel(trio.abc.ReceiveChannel): raise unpack_error(msg, self._portal.channel) @contextmanager - def shield_channel( + def shield( self - ) -> typing.AsyncGenerator['StreamReceiveChannel', None]: + ) -> Iterator['ReceiveStream']: # noqa """Shield this stream's underlying channel such that a local consumer task can be cancelled (and possibly restarted) using ``trio.Cancelled``. @@ -156,7 +156,7 @@ class Portal: self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] ] = None - self._streams: Set[StreamReceiveChannel] = set() + self._streams: Set[ReceiveStream] = set() self.actor = current_actor() async def _submit( @@ -219,7 +219,7 @@ class Portal: # to make async-generators the fundamental IPC API over channels! 
# (think `yield from`, `gen.send()`, and functional reactive stuff) if resptype == 'yield': # stream response - rchan = StreamReceiveChannel(cid, recv_chan, self) + rchan = ReceiveStream(cid, recv_chan, self) self._streams.add(rchan) return rchan @@ -322,7 +322,7 @@ class LocalPortal: A compatibility shim for normal portals but for invoking functions using an in process actor instance. """ - actor: 'Actor' # type: ignore + actor: 'Actor' # type: ignore # noqa channel: Channel async def run(self, ns: str, func_name: str, **kwargs) -> Any: From 797bcc1df2c33580494f57bc3f3531a28956d852 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 17 Dec 2020 13:35:45 -0500 Subject: [PATCH 3/4] Handle early timeouts on last debugger test --- tests/test_debugger.py | 30 +++++++++++++++++++++++++----- tractor/_ipc.py | 3 +++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 53c3c84..d63a1be 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -6,6 +6,7 @@ All these tests can be understood (somewhat) by running the equivalent TODO: None of these tests have been run successfully on windows yet. """ +import time from os import path import pytest @@ -370,6 +371,8 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method child has unblocked (which can happen when it has the tty lock and is engaged in pdb) it is indeed cancelled after exiting the debugger. 
""" + timed_out_early = False + child = spawn('root_cancelled_but_child_is_in_tty_lock') child.expect(r"\(Pdb\+\+\)") @@ -377,9 +380,13 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method before = str(child.before.decode()) assert "NameError: name 'doggypants' is not defined" in before assert "tractor._exceptions.RemoteActorError: ('name_error'" not in before + time.sleep(0.5) + child.sendline('c') - for _ in range(4): + + for i in range(4): + time.sleep(0.5) try: child.expect(r"\(Pdb\+\+\)") except TimeoutError: @@ -390,13 +397,26 @@ def test_root_nursery_cancels_before_child_releases_tty_lock(spawn, start_method else: raise + except pexpect.exceptions.EOF: + print(f"Failed early on {i}?") + before = str(child.before.decode()) + + timed_out_early = True + + # race conditions on how fast the continue is sent? + break + + before = str(child.before.decode()) assert "NameError: name 'doggypants' is not defined" in before child.sendline('c') child.expect(pexpect.EOF) - before = str(child.before.decode()) - assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before - assert "tractor._exceptions.RemoteActorError: ('name_error'" in before - assert "NameError: name 'doggypants' is not defined" in before + + if not timed_out_early: + + before = str(child.before.decode()) + assert "tractor._exceptions.RemoteActorError: ('spawner0'" in before + assert "tractor._exceptions.RemoteActorError: ('name_error'" in before + assert "NameError: name 'doggypants' is not defined" in before diff --git a/tractor/_ipc.py b/tractor/_ipc.py index a3a271d..7f6a498 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -214,9 +214,12 @@ class Channel: # # time is pointless # await self.msgstream.send(sent) except trio.BrokenResourceError: + if not self._autorecon: raise + await self.aclose() + if self._autorecon: # attempt reconnect await self._reconnect() continue From 47f68a05323993f076cb8647266d9294f75f486c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet 
Date: Thu, 17 Dec 2020 16:37:05 -0500 Subject: [PATCH 4/4] Skip debugger tests on non-trio backends --- tests/test_debugger.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index d63a1be..8a0423b 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -42,10 +42,16 @@ def mk_cmd(ex_name: str) -> str: @pytest.fixture def spawn( + start_method, testdir, arb_addr, ) -> 'pexpect.spawn': + if start_method != 'trio': + pytest.skip( + "Debugger tests are only supported on the trio backend" + ) + def _spawn(cmd): return testdir.spawn( cmd=mk_cmd(cmd),