diff --git a/newsfragments/239.bug.rst b/newsfragments/239.bug.rst new file mode 100644 index 0000000..bf9138a --- /dev/null +++ b/newsfragments/239.bug.rst @@ -0,0 +1,6 @@ +Fix keyboard interrupt handling in ``Portal.open_context()`` blocks. + +Previously this was not triggering cancellation of the remote task context +and could result in hangs if a stream was also opened. The fix is to +accept ``BaseException`` since it is likely any other top level exception +other than kbi (even though not expected) should also get this result. diff --git a/tests/test_2way.py b/tests/test_2way.py index 1ef05d2..c038ae4 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -2,6 +2,8 @@ Bidirectional streaming and context API. """ +import platform + import pytest import trio import tractor @@ -51,54 +53,87 @@ async def assert_state(value: bool): @pytest.mark.parametrize( 'error_parent', - [False, True], + [False, ValueError, KeyboardInterrupt], ) @pytest.mark.parametrize( 'callee_blocks_forever', [False, True], + ids=lambda item: f'callee_blocks_forever={item}' +) +@pytest.mark.parametrize( + 'pointlessly_open_stream', + [False, True], + ids=lambda item: f'open_stream={item}' ) def test_simple_context( error_parent, callee_blocks_forever, + pointlessly_open_stream, ): + timeout = 1.5 if not platform.system() == 'Windows' else 3 + async def main(): - async with tractor.open_nursery() as n: + with trio.fail_after(timeout): + async with tractor.open_nursery() as nursery: - portal = await n.start_actor( - 'simple_context', - enable_modules=[__name__], - ) + portal = await nursery.start_actor( + 'simple_context', + enable_modules=[__name__], + ) - async with portal.open_context( - simple_setup_teardown, - data=10, - block_forever=callee_blocks_forever, - ) as (ctx, sent): + try: + async with portal.open_context( + simple_setup_teardown, + data=10, + block_forever=callee_blocks_forever, + ) as (ctx, sent): - assert sent == 11 + assert sent == 11 - if callee_blocks_forever: - await 
portal.run(assert_state, value=True) - await ctx.cancel() - else: - assert await ctx.result() == 'yo' + if callee_blocks_forever: + await portal.run(assert_state, value=True) + else: + assert await ctx.result() == 'yo' - # after cancellation - await portal.run(assert_state, value=False) + if not error_parent: + await ctx.cancel() - if error_parent: - raise ValueError + if pointlessly_open_stream: + async with ctx.open_stream(): + if error_parent: + raise error_parent - # shut down daemon - await portal.cancel_actor() + if callee_blocks_forever: + await ctx.cancel() + else: + # in this case the stream will send a + # 'stop' msg to the far end which needs + # to be ignored + pass + else: + if error_parent: + raise error_parent + + finally: + + # after cancellation + if not error_parent: + await portal.run(assert_state, value=False) + + # shut down daemon + await portal.cancel_actor() if error_parent: try: trio.run(main) - except ValueError: + except error_parent: pass + except trio.MultiError as me: + # XXX: on windows it seems we may have to expect the group error + from tractor._exceptions import is_multi_cancelled + assert is_multi_cancelled(me) else: trio.run(main) diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 4429d25..74c06ca 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -218,3 +218,54 @@ def test_reqresp_ontopof_streaming(): trio.run(main) except trio.TooSlowError: pass + + +async def async_gen_stream(sequence): + for i in sequence: + yield i + await trio.sleep(0.1) + + +@tractor.context +async def echo_ctx_stream( + ctx: tractor.Context, +) -> None: + await ctx.started() + + async with ctx.open_stream() as stream: + async for msg in stream: + await stream.send(msg) + + +def test_sigint_both_stream_types(): + '''Verify that running a bi-directional and recv only stream + side-by-side will cancel correctly from SIGINT. 
+ + ''' + async def main(): + with trio.fail_after(2): + async with tractor.open_nursery() as n: + # name of this actor will be same as target func + portal = await n.start_actor( + '2_way', + enable_modules=[__name__] + ) + + async with portal.open_context(echo_ctx_stream) as (ctx, _): + async with ctx.open_stream() as stream: + async with portal.open_stream_from( + async_gen_stream, + sequence=list(range(1)), + ) as gen_stream: + + msg = await gen_stream.receive() + await stream.send(msg) + resp = await stream.receive() + assert resp == msg + raise KeyboardInterrupt + + try: + trio.run(main) + assert 0, "Didn't receive KBI!?" + except KeyboardInterrupt: + pass diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 8265197..f32d209 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -1,5 +1,6 @@ """ Broadcast channels for fan-out to local tasks. + """ from contextlib import asynccontextmanager from functools import partial @@ -332,6 +333,9 @@ def test_ensure_slow_consumers_lag_out( await trio.sleep(delay) if task.name == 'sub_1': + # trigger checkpoint to clean out other subs + await trio.sleep(0) + # the non-lagger got # a ``trio.EndOfChannel`` # because the ``tx`` below was closed diff --git a/tractor/_portal.py b/tractor/_portal.py index 63c59ed..137761e 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -177,7 +177,6 @@ class Portal: f"Cancelling all streams with {self.channel.uid}") for stream in self._streams.copy(): try: - # with trio.CancelScope(shield=True): await stream.aclose() except trio.ClosedResourceError: # don't error the stream having already been closed @@ -294,7 +293,6 @@ class Portal: async def open_stream_from( self, async_gen_func: Callable, # typing: ignore - shield: bool = False, **kwargs, ) -> AsyncGenerator[ReceiveMsgStream, None]: @@ -318,11 +316,17 @@ class Portal: # receive only stream assert functype == 'asyncgen' - ctx = Context(self.channel, cid, _portal=self) 
+ ctx = Context( + self.channel, + cid, + # do we need this to be closed implicitly? + # _recv_chan=recv_chan, + _portal=self + ) try: # deliver receive only stream async with ReceiveMsgStream( - ctx, recv_chan, shield=shield + ctx, recv_chan, ) as rchan: self._streams.add(rchan) yield rchan @@ -337,13 +341,16 @@ class Portal: # message right now since there shouldn't be a reason to # stop and restart the stream, right? try: - await ctx.cancel() + with trio.CancelScope(shield=True): + await ctx.cancel() except trio.ClosedResourceError: # if the far end terminates before we send a cancel the # underlying transport-channel may already be closed. - log.debug(f'Context {ctx} was already closed?') + log.warning(f'Context {ctx} was already closed?') + # XXX: should this always be done? + # await recv_chan.aclose() self._streams.remove(rchan) @asynccontextmanager @@ -408,8 +415,8 @@ class Portal: # pairs with handling in ``Actor._push_result()`` # recv_chan._ctx = ctx - # await trio.lowlevel.checkpoint() + yield ctx, first except ContextCancelled as err: @@ -427,9 +434,14 @@ class Portal: log.debug(f'Context {ctx} cancelled gracefully') except ( - trio.Cancelled, - trio.MultiError, - Exception, + BaseException, + + # more specifically, we need to handle: + # Exception, + # trio.Cancelled, + # trio.MultiError, + # KeyboardInterrupt, + ) as err: _err = err # the context cancels itself on any cancel @@ -440,6 +452,11 @@ class Portal: raise finally: + # in the case where a runtime nursery (due to internal bug) + # or a remote actor transmits an error we want to be + # sure we get the error from the underlying feeder mem chan. + # if it's not raised here it *should* be raised from the + # msg loop nursery right? 
result = await ctx.result() # though it should be impossible for any tasks diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9d832b2..2477531 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -51,7 +51,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, - shield: bool = False, _broadcaster: Optional[BroadcastReceiver] = None, ) -> None: @@ -295,6 +294,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): '''Send a message over this stream to the far end. ''' + # if self._eoc: + # raise trio.ClosedResourceError('This stream is already ded') + await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -365,7 +367,7 @@ class Context: ''' side = 'caller' if self._portal else 'callee' - log.warning(f'Cancelling {side} side of context to {self.chan}') + log.warning(f'Cancelling {side} side of context to {self.chan.uid}') self._cancel_called = True @@ -396,6 +398,10 @@ class Context: log.warning( "May have failed to cancel remote task " f"{cid} for {self._portal.channel.uid}") + else: + log.warning( + "Timed out on cancelling remote task " + f"{cid} for {self._portal.channel.uid}") else: # callee side remote task @@ -439,16 +445,6 @@ class Context: # here we create a mem chan that corresponds to the # far end caller / callee. - # NOTE: in one way streaming this only happens on the - # caller side inside `Actor.send_cmd()` so if you try - # to send a stop from the caller to the callee in the - # single-direction-stream case you'll get a lookup error - # currently. - _, recv_chan = actor.get_memchans( - self.chan.uid, - self.cid - ) - # Likewise if the surrounding context has been cancelled we error here # since it likely means the surrounding block was exited or # killed @@ -459,6 +455,16 @@ class Context: f'Context around {actor.uid[0]}:{task} was already cancelled!' 
) + # NOTE: in one way streaming this only happens on the + # caller side inside `Actor.send_cmd()` so if you try + # to send a stop from the caller to the callee in the + # single-direction-stream case you'll get a lookup error + # currently. + _, recv_chan = actor.get_memchans( + self.chan.uid, + self.cid + ) + # XXX: If the underlying channel feeder receive mem chan has # been closed then likely client code has already exited # a ``.open_stream()`` block prior or there was some other @@ -482,12 +488,6 @@ class Context: # await trio.lowlevel.checkpoint() yield rchan - except trio.EndOfChannel: - # likely the far end sent us a 'stop' message to - # terminate the stream. - raise - - else: # XXX: Make the stream "one-shot use". On exit, signal # ``trio.EndOfChannel``/``StopAsyncIteration`` to the # far end. diff --git a/tractor/log.py b/tractor/log.py index 667c7c6..4bfc798 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -52,6 +52,8 @@ BOLD_PALETTE = { } +# TODO: this isn't showing the correct '{filename}' +# as it did before.. class StackLevelAdapter(logging.LoggerAdapter): def transport(