diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py
index 31eff62..1650b58 100644
--- a/examples/full_fledged_streaming_service.py
+++ b/examples/full_fledged_streaming_service.py
@@ -7,7 +7,7 @@ import tractor
 async def stream_data(seed):
     for i in range(seed):
         yield i
-        await trio.sleep(0)  # trigger scheduler
+        await trio.sleep(0.0001)  # trigger scheduler
 
 
 # this is the third actor; the aggregator
diff --git a/newsfragments/261.misc.rst b/newsfragments/261.misc.rst
new file mode 100644
index 0000000..6e4a934
--- /dev/null
+++ b/newsfragments/261.misc.rst
@@ -0,0 +1,37 @@
+Add cross-actor-task ``Context`` oriented error relay, a new
+stream overrun error-signal ``StreamOverrun``, and support for
+disabling ``MsgStream`` backpressure as the default before a stream
+is opened or by choice of the user.
+
+We added stricter semantics around ``tractor.Context.open_stream():``,
+particularly to do with streams which are only opened at one end.
+Previously, if only one end opened a stream there was no way for the
+sender to know whether msgs were being received until the feeder mem
+chan on the receiver side hit a backpressure state, which then delayed
+the msg loop processing task enough to eventually create backpressure
+on the associated IPC transport. This is non-ideal when the receiver
+side never opened a stream by mistake, since it results in silently
+blocking the sender with no adherence to the underlying mem chan
+buffer size settings (which is still unsolved btw).
+
+To solve this we add non-backpressure style message pushing inside
+``Actor._push_result()`` by default and only use the backpressure
+``trio.MemorySendChannel.send()`` call **iff** the local end of the
+context has entered ``Context.open_stream():``. This way if the stream
+was never opened but the mem chan is overrun, we relay back to the
+sender a (new exception) ``StreamOverrun`` error which is raised in the
+sender's scope with a special error message about the stream never
+having been opened. Further, this behaviour (non-backpressure style
+where senders can expect an error on overruns) can now be enabled with
+``.open_stream(backpressure=False)`` and the underlying mem chan size
+can be specified with a kwarg ``msg_buffer_size: int``.
+
+Further bug fixes and enhancements in this changeset include:
+- Fix a race we were ignoring where if the callee task opened a context
+  it could enter ``Context.open_stream()`` before calling
+  ``.started()``.
+- Disallow calling ``Context.started()`` more than once.
+- Enable ``Context`` linked tasks error relaying via the new
+  ``Context._maybe_raise_from_remote_msg()`` which (for now) uses
+  a simple ``trio.Nursery.start_soon()`` to raise the error via closure
+  in the local scope.
diff --git a/tests/test_2way.py b/tests/test_2way.py
index c038ae4..db3be4d 100644
--- a/tests/test_2way.py
+++ b/tests/test_2way.py
@@ -1,417 +1,11 @@
 """
-Bidirectional streaming and context API.
+Bidirectional streaming.
""" -import platform - import pytest import trio import tractor -from conftest import tractor_test - -# the general stream semantics are -# - normal termination: far end relays a stop message which -# terminates an ongoing ``MsgStream`` iteration -# - cancel termination: context is cancelled on either side cancelling -# the "linked" inter-actor task context - - -_state: bool = False - - -@tractor.context -async def simple_setup_teardown( - - ctx: tractor.Context, - data: int, - block_forever: bool = False, - -) -> None: - - # startup phase - global _state - _state = True - - # signal to parent that we're up - await ctx.started(data + 1) - - try: - if block_forever: - # block until cancelled - await trio.sleep_forever() - else: - return 'yo' - finally: - _state = False - - -async def assert_state(value: bool): - global _state - assert _state == value - - -@pytest.mark.parametrize( - 'error_parent', - [False, ValueError, KeyboardInterrupt], -) -@pytest.mark.parametrize( - 'callee_blocks_forever', - [False, True], - ids=lambda item: f'callee_blocks_forever={item}' -) -@pytest.mark.parametrize( - 'pointlessly_open_stream', - [False, True], - ids=lambda item: f'open_stream={item}' -) -def test_simple_context( - error_parent, - callee_blocks_forever, - pointlessly_open_stream, -): - - timeout = 1.5 if not platform.system() == 'Windows' else 3 - - async def main(): - - with trio.fail_after(timeout): - async with tractor.open_nursery() as nursery: - - portal = await nursery.start_actor( - 'simple_context', - enable_modules=[__name__], - ) - - try: - async with portal.open_context( - simple_setup_teardown, - data=10, - block_forever=callee_blocks_forever, - ) as (ctx, sent): - - assert sent == 11 - - if callee_blocks_forever: - await portal.run(assert_state, value=True) - else: - assert await ctx.result() == 'yo' - - if not error_parent: - await ctx.cancel() - - if pointlessly_open_stream: - async with ctx.open_stream(): - if error_parent: - raise error_parent - - if callee_blocks_forever: - await ctx.cancel() - else: - # in this case the stream will send a - # 'stop' msg to the far end which needs - # to be ignored - pass - else: - if error_parent: - raise error_parent - - finally: - - # after cancellation - if not error_parent: - await portal.run(assert_state, value=False) - - # shut down daemon - await portal.cancel_actor() - - if error_parent: - try: - trio.run(main) - except error_parent: - pass - except trio.MultiError as me: - # XXX: on windows it seems we may have to expect the group error - from tractor._exceptions import is_multi_cancelled - assert is_multi_cancelled(me) - else: - trio.run(main) - - -# basic stream terminations: -# - callee context closes without using stream -# - caller context closes without using stream -# - caller context calls `Context.cancel()` while streaming -# is ongoing resulting in callee being cancelled -# - callee calls `Context.cancel()` while streaming and caller -# sees stream terminated in `RemoteActorError` - -# TODO: future possible features -# - restart request: far end raises `ContextRestart` - - -@tractor.context -async def close_ctx_immediately( - - ctx: tractor.Context, - -) -> None: - - await ctx.started() - global _state - - async with ctx.open_stream(): - pass - - -@tractor_test -async def test_callee_closes_ctx_after_stream_open(): - 'callee context closes without using stream' - - async with tractor.open_nursery() as n: - - portal = await n.start_actor( - 'fast_stream_closer', - enable_modules=[__name__], - ) - - async with 
portal.open_context( - close_ctx_immediately, - - # flag to avoid waiting the final result - # cancel_on_exit=True, - - ) as (ctx, sent): - - assert sent is None - - with trio.fail_after(0.5): - async with ctx.open_stream() as stream: - - # should fall through since ``StopAsyncIteration`` - # should be raised through translation of - # a ``trio.EndOfChannel`` by - # ``trio.abc.ReceiveChannel.__anext__()`` - async for _ in stream: - assert 0 - else: - - # verify stream is now closed - try: - await stream.receive() - except trio.EndOfChannel: - pass - - # TODO: should be just raise the closed resource err - # directly here to enforce not allowing a re-open - # of a stream to the context (at least until a time of - # if/when we decide that's a good idea?) - try: - async with ctx.open_stream() as stream: - pass - except trio.ClosedResourceError: - pass - - await portal.cancel_actor() - - -@tractor.context -async def expect_cancelled( - - ctx: tractor.Context, - -) -> None: - global _state - _state = True - - await ctx.started() - - try: - async with ctx.open_stream() as stream: - async for msg in stream: - await stream.send(msg) # echo server - - except trio.Cancelled: - # expected case - _state = False - raise - - else: - assert 0, "Wasn't cancelled!?" - - -@pytest.mark.parametrize( - 'use_ctx_cancel_method', - [False, True], -) -@tractor_test -async def test_caller_closes_ctx_after_callee_opens_stream( - use_ctx_cancel_method: bool, -): - 'caller context closes without using stream' - - async with tractor.open_nursery() as n: - - portal = await n.start_actor( - 'ctx_cancelled', - enable_modules=[__name__], - ) - - async with portal.open_context( - expect_cancelled, - ) as (ctx, sent): - await portal.run(assert_state, value=True) - - assert sent is None - - # call cancel explicitly - if use_ctx_cancel_method: - - await ctx.cancel() - - try: - async with ctx.open_stream() as stream: - async for msg in stream: - pass - - except tractor.ContextCancelled: - raise # XXX: must be propagated to __aexit__ - - else: - assert 0, "Should have context cancelled?" - - # channel should still be up - assert portal.channel.connected() - - # ctx is closed here - await portal.run(assert_state, value=False) - - else: - try: - with trio.fail_after(0.2): - await ctx.result() - assert 0, "Callee should have blocked!?" - except trio.TooSlowError: - await ctx.cancel() - try: - async with ctx.open_stream() as stream: - async for msg in stream: - pass - except tractor.ContextCancelled: - pass - else: - assert 0, "Should have received closed resource error?" 
- - # ctx is closed here - await portal.run(assert_state, value=False) - - # channel should not have been destroyed yet, only the - # inter-actor-task context - assert portal.channel.connected() - - # teardown the actor - await portal.cancel_actor() - - -@tractor_test -async def test_multitask_caller_cancels_from_nonroot_task(): - - async with tractor.open_nursery() as n: - - portal = await n.start_actor( - 'ctx_cancelled', - enable_modules=[__name__], - ) - - async with portal.open_context( - expect_cancelled, - ) as (ctx, sent): - - await portal.run(assert_state, value=True) - assert sent is None - - async with ctx.open_stream() as stream: - - async def send_msg_then_cancel(): - await stream.send('yo') - await portal.run(assert_state, value=True) - await ctx.cancel() - await portal.run(assert_state, value=False) - - async with trio.open_nursery() as n: - n.start_soon(send_msg_then_cancel) - - try: - async for msg in stream: - assert msg == 'yo' - - except tractor.ContextCancelled: - raise # XXX: must be propagated to __aexit__ - - # channel should still be up - assert portal.channel.connected() - - # ctx is closed here - await portal.run(assert_state, value=False) - - # channel should not have been destroyed yet, only the - # inter-actor-task context - assert portal.channel.connected() - - # teardown the actor - await portal.cancel_actor() - - -@tractor.context -async def cancel_self( - - ctx: tractor.Context, - -) -> None: - global _state - _state = True - - await ctx.cancel() - try: - with trio.fail_after(0.1): - await trio.sleep_forever() - - except trio.Cancelled: - raise - - except trio.TooSlowError: - # should never get here - assert 0 - - -@tractor_test -async def test_callee_cancels_before_started(): - '''callee calls `Context.cancel()` while streaming and caller - sees stream terminated in `ContextCancelled`. - - ''' - async with tractor.open_nursery() as n: - - portal = await n.start_actor( - 'cancels_self', - enable_modules=[__name__], - ) - try: - - async with portal.open_context( - cancel_self, - ) as (ctx, sent): - async with ctx.open_stream(): - - await trio.sleep_forever() - - # raises a special cancel signal - except tractor.ContextCancelled as ce: - ce.type == trio.Cancelled - - # teardown the actor - await portal.cancel_actor() - @tractor.context async def simple_rpc( @@ -420,9 +14,10 @@ async def simple_rpc( data: int, ) -> None: - """Test a small ping-pong server. + ''' + Test a small ping-pong server. - """ + ''' # signal to parent that we're up await ctx.started(data + 1) @@ -480,9 +75,10 @@ async def simple_rpc_with_forloop( [simple_rpc, simple_rpc_with_forloop], ) def test_simple_rpc(server_func, use_async_for): - """The simplest request response pattern. + ''' + The simplest request response pattern. 
-    """
+    '''
     async def main():
         async with tractor.open_nursery() as n:
diff --git a/tests/test_clustering.py b/tests/test_clustering.py
index ba8052f..56e629b 100644
--- a/tests/test_clustering.py
+++ b/tests/test_clustering.py
@@ -14,7 +14,7 @@ MESSAGE = 'tractoring at full speed'
 @tractor.context
 async def worker(ctx: tractor.Context) -> None:
     await ctx.started()
-    async with ctx.open_stream() as stream:
+    async with ctx.open_stream(backpressure=True) as stream:
         async for msg in stream:
             # do something with msg
             print(msg)
diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py
new file mode 100644
index 0000000..4c74a9a
--- /dev/null
+++ b/tests/test_context_stream_semantics.py
@@ -0,0 +1,714 @@
+'''
+``async with ():`` inlined context-stream cancellation testing.
+
+Verify that we raise errors when streams are opened prior to sync-opening
+a ``tractor.Context``.
+
+'''
+from itertools import count
+import platform
+from typing import Optional
+
+import pytest
+import trio
+import tractor
+from tractor._exceptions import StreamOverrun
+
+from conftest import tractor_test
+
+# ``Context`` semantics are as follows,
+# ------------------------------------
+
+# - standard setup/teardown:
+#   ``Portal.open_context()`` starts a new
+#   remote task context in another actor. The target actor's task must
+#   call ``Context.started()`` to unblock this entry on the caller side.
+#   The callee task executes until complete and returns a final value
+#   which is delivered to the caller side and retrieved via
+#   ``Context.result()``.
+
+# - cancel termination:
+#   context can be cancelled on either side where either end's task can
+#   call ``Context.cancel()`` which raises a local ``trio.Cancelled``
+#   and sends a task cancel request to the remote task which in turn
+#   raises a ``trio.Cancelled`` in that scope, catches it, and re-raises
+#   as ``ContextCancelled``. This is then caught by
+#   ``Portal.open_context()``'s exit and we get a graceful termination
+#   of the linked tasks.
+
+# - error termination:
+#   error is caught after all context-cancel-scope tasks are cancelled
+#   via regular ``trio`` cancel scope semantics; the error is sent to
+#   the other side and unpacked as a `RemoteActorError`.
+
+
+# ``Context.open_stream() as stream: MsgStream:`` msg semantics are:
+# -----------------------------------------------------------------
+
+# - either side can ``.send()`` which emits a 'yield' msg and delivers
+#   a value to a ``MsgStream.receive()`` call.
+
+# - stream closure: one end relays a 'stop' message which terminates an
+#   ongoing ``MsgStream`` iteration.
+
+# - cancel/error termination: as per the context semantics above but
+#   with implicit stream closure on the cancelling end.
+
+
+_state: bool = False
+
+
+@tractor.context
+async def too_many_starteds(
+    ctx: tractor.Context,
+) -> None:
+    '''
+    Call ``Context.started()`` more than once (an error).
+
+    '''
+    await ctx.started()
+    try:
+        await ctx.started()
+    except RuntimeError:
+        raise
+
+
+@tractor.context
+async def not_started_but_stream_opened(
+    ctx: tractor.Context,
+) -> None:
+    '''
+    Enter ``Context.open_stream()`` without calling ``.started()``.
+ + ''' + try: + async with ctx.open_stream(): + assert 0 + except RuntimeError: + raise + + +@pytest.mark.parametrize( + 'target', + [too_many_starteds, not_started_but_stream_opened], + ids='misuse_type={}'.format, +) +def test_started_misuse(target): + + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + target.__name__, + enable_modules=[__name__], + ) + + async with portal.open_context(target) as (ctx, sent): + await trio.sleep(1) + + with pytest.raises(tractor.RemoteActorError): + trio.run(main) + + +@tractor.context +async def simple_setup_teardown( + + ctx: tractor.Context, + data: int, + block_forever: bool = False, + +) -> None: + + # startup phase + global _state + _state = True + + # signal to parent that we're up + await ctx.started(data + 1) + + try: + if block_forever: + # block until cancelled + await trio.sleep_forever() + else: + return 'yo' + finally: + _state = False + + +async def assert_state(value: bool): + global _state + assert _state == value + + +@pytest.mark.parametrize( + 'error_parent', + [False, ValueError, KeyboardInterrupt], +) +@pytest.mark.parametrize( + 'callee_blocks_forever', + [False, True], + ids=lambda item: f'callee_blocks_forever={item}' +) +@pytest.mark.parametrize( + 'pointlessly_open_stream', + [False, True], + ids=lambda item: f'open_stream={item}' +) +def test_simple_context( + error_parent, + callee_blocks_forever, + pointlessly_open_stream, +): + + timeout = 1.5 if not platform.system() == 'Windows' else 4 + + async def main(): + + with trio.fail_after(timeout): + async with tractor.open_nursery() as nursery: + + portal = await nursery.start_actor( + 'simple_context', + enable_modules=[__name__], + ) + + try: + async with portal.open_context( + simple_setup_teardown, + data=10, + block_forever=callee_blocks_forever, + ) as (ctx, sent): + + assert sent == 11 + + if callee_blocks_forever: + await portal.run(assert_state, value=True) + else: + assert await ctx.result() == 'yo' + + if not error_parent: + await ctx.cancel() + + if pointlessly_open_stream: + async with ctx.open_stream(): + if error_parent: + raise error_parent + + if callee_blocks_forever: + await ctx.cancel() + else: + # in this case the stream will send a + # 'stop' msg to the far end which needs + # to be ignored + pass + else: + if error_parent: + raise error_parent + + finally: + + # after cancellation + if not error_parent: + await portal.run(assert_state, value=False) + + # shut down daemon + await portal.cancel_actor() + + if error_parent: + try: + trio.run(main) + except error_parent: + pass + except trio.MultiError as me: + # XXX: on windows it seems we may have to expect the group error + from tractor._exceptions import is_multi_cancelled + assert is_multi_cancelled(me) + else: + trio.run(main) + + +# basic stream terminations: +# - callee context closes without using stream +# - caller context closes without using stream +# - caller context calls `Context.cancel()` while streaming +# is ongoing resulting in callee being cancelled +# - callee calls `Context.cancel()` while streaming and caller +# sees stream terminated in `RemoteActorError` + +# TODO: future possible features +# - restart request: far end raises `ContextRestart` + + +@tractor.context +async def close_ctx_immediately( + + ctx: tractor.Context, + +) -> None: + + await ctx.started() + global _state + + async with ctx.open_stream(): + pass + + +@tractor_test +async def test_callee_closes_ctx_after_stream_open(): + 'callee context closes without using stream' + + 
async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'fast_stream_closer', + enable_modules=[__name__], + ) + + async with portal.open_context( + close_ctx_immediately, + + # flag to avoid waiting the final result + # cancel_on_exit=True, + + ) as (ctx, sent): + + assert sent is None + + with trio.fail_after(0.5): + async with ctx.open_stream() as stream: + + # should fall through since ``StopAsyncIteration`` + # should be raised through translation of + # a ``trio.EndOfChannel`` by + # ``trio.abc.ReceiveChannel.__anext__()`` + async for _ in stream: + assert 0 + else: + + # verify stream is now closed + try: + await stream.receive() + except trio.EndOfChannel: + pass + + # TODO: should be just raise the closed resource err + # directly here to enforce not allowing a re-open + # of a stream to the context (at least until a time of + # if/when we decide that's a good idea?) + try: + async with ctx.open_stream() as stream: + pass + except trio.ClosedResourceError: + pass + + await portal.cancel_actor() + + +@tractor.context +async def expect_cancelled( + + ctx: tractor.Context, + +) -> None: + global _state + _state = True + + await ctx.started() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + await stream.send(msg) # echo server + + except trio.Cancelled: + # expected case + _state = False + raise + + else: + assert 0, "Wasn't cancelled!?" + + +@pytest.mark.parametrize( + 'use_ctx_cancel_method', + [False, True], +) +@tractor_test +async def test_caller_closes_ctx_after_callee_opens_stream( + use_ctx_cancel_method: bool, +): + 'caller context closes without using stream' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'ctx_cancelled', + enable_modules=[__name__], + ) + + async with portal.open_context( + expect_cancelled, + ) as (ctx, sent): + await portal.run(assert_state, value=True) + + assert sent is None + + # call cancel explicitly + if use_ctx_cancel_method: + + await ctx.cancel() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + + except tractor.ContextCancelled: + raise # XXX: must be propagated to __aexit__ + + else: + assert 0, "Should have context cancelled?" + + # channel should still be up + assert portal.channel.connected() + + # ctx is closed here + await portal.run(assert_state, value=False) + + else: + try: + with trio.fail_after(0.2): + await ctx.result() + assert 0, "Callee should have blocked!?" + except trio.TooSlowError: + await ctx.cancel() + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + except tractor.ContextCancelled: + pass + else: + assert 0, "Should have received closed resource error?" 
+
+                # ctx is closed here
+                await portal.run(assert_state, value=False)
+
+            # channel should not have been destroyed yet, only the
+            # inter-actor-task context
+            assert portal.channel.connected()
+
+            # teardown the actor
+            await portal.cancel_actor()
+
+
+@tractor_test
+async def test_multitask_caller_cancels_from_nonroot_task():
+
+    async with tractor.open_nursery() as n:
+
+        portal = await n.start_actor(
+            'ctx_cancelled',
+            enable_modules=[__name__],
+        )
+
+        async with portal.open_context(
+            expect_cancelled,
+        ) as (ctx, sent):
+
+            await portal.run(assert_state, value=True)
+            assert sent is None
+
+            async with ctx.open_stream() as stream:
+
+                async def send_msg_then_cancel():
+                    await stream.send('yo')
+                    await portal.run(assert_state, value=True)
+                    await ctx.cancel()
+                    await portal.run(assert_state, value=False)
+
+                async with trio.open_nursery() as n:
+                    n.start_soon(send_msg_then_cancel)
+
+                    try:
+                        async for msg in stream:
+                            assert msg == 'yo'
+
+                    except tractor.ContextCancelled:
+                        raise  # XXX: must be propagated to __aexit__
+
+            # channel should still be up
+            assert portal.channel.connected()
+
+            # ctx is closed here
+            await portal.run(assert_state, value=False)
+
+        # channel should not have been destroyed yet, only the
+        # inter-actor-task context
+        assert portal.channel.connected()
+
+        # teardown the actor
+        await portal.cancel_actor()
+
+
+@tractor.context
+async def cancel_self(
+
+    ctx: tractor.Context,
+
+) -> None:
+    global _state
+    _state = True
+
+    await ctx.cancel()
+
+    # should inline raise immediately
+    try:
+        async with ctx.open_stream():
+            pass
+    except tractor.ContextCancelled:
+        pass
+
+    # check a real ``trio.Cancelled`` is raised on a checkpoint
+    try:
+        with trio.fail_after(0.1):
+            await trio.sleep_forever()
+    except trio.Cancelled:
+        raise
+
+    except trio.TooSlowError:
+        # should never get here
+        assert 0
+
+
+@tractor_test
+async def test_callee_cancels_before_started():
+    '''
+    Callee calls `Context.cancel()` while streaming and caller
+    sees stream terminated in `ContextCancelled`.
+
+    '''
+    async with tractor.open_nursery() as n:
+
+        portal = await n.start_actor(
+            'cancels_self',
+            enable_modules=[__name__],
+        )
+        try:
+
+            async with portal.open_context(
+                cancel_self,
+            ) as (ctx, sent):
+                async with ctx.open_stream():
+
+                    await trio.sleep_forever()
+
+        # raises a special cancel signal
+        except tractor.ContextCancelled as ce:
+            assert ce.type == trio.Cancelled
+
+            # teardown the actor
+            await portal.cancel_actor()
+
+
+@tractor.context
+async def never_open_stream(
+
+    ctx: tractor.Context,
+
+) -> None:
+    '''
+    Context which never opens a stream and blocks.
+
+    '''
+    await ctx.started()
+    await trio.sleep_forever()
+
+
+@tractor.context
+async def keep_sending_from_callee(
+
+    ctx: tractor.Context,
+    msg_buffer_size: Optional[int] = None,
+
+) -> None:
+    '''
+    Send endlessly on the callee stream.
+
+    '''
+    await ctx.started()
+    async with ctx.open_stream(
+        msg_buffer_size=msg_buffer_size,
+    ) as stream:
+        for msg in count():
+            print(f'callee sending {msg}')
+            await stream.send(msg)
+            await trio.sleep(0.01)
+
+
+@pytest.mark.parametrize(
+    'overrun_by',
+    [
+        ('caller', 1, never_open_stream),
+        ('cancel_caller_during_overrun', 1, never_open_stream),
+        ('callee', 0, keep_sending_from_callee),
+    ],
+    ids='overrun_condition={}'.format,
+)
+def test_one_end_stream_not_opened(overrun_by):
+    '''
+    This should exemplify the bug from:
+    https://github.com/goodboy/tractor/issues/265
+
+    '''
+    overrunner, buf_size_increase, entrypoint = overrun_by
+    from tractor._actor import Actor
+    buf_size = buf_size_increase + Actor.msg_buffer_size
+
+    async def main():
+        async with tractor.open_nursery() as n:
+            portal = await n.start_actor(
+                entrypoint.__name__,
+                enable_modules=[__name__],
+            )
+
+            async with portal.open_context(
+                entrypoint,
+            ) as (ctx, sent):
+                assert sent is None
+
+                if 'caller' in overrunner:
+
+                    async with ctx.open_stream() as stream:
+                        for i in range(buf_size):
+                            print(f'sending {i}')
+                            await stream.send(i)
+
+                        if 'cancel' in overrunner:
+                            # without this we block waiting on the child side
+                            await ctx.cancel()
+
+                        else:
+                            # expect overrun error to be relayed back
+                            # and this sleep interrupted
+                            await trio.sleep_forever()
+
+                else:
+                    # callee overruns caller case so we do nothing here
+                    await trio.sleep_forever()
+
+            await portal.cancel_actor()
+
+    # 2 overrun cases and the no overrun case (which pushes right up to
+    # the msg limit)
+    if overrunner == 'caller':
+        with pytest.raises(tractor.RemoteActorError) as excinfo:
+            trio.run(main)
+
+        assert excinfo.value.type == StreamOverrun
+
+    elif 'cancel' in overrunner:
+        with pytest.raises(trio.MultiError) as excinfo:
+            trio.run(main)
+
+        multierr = excinfo.value
+
+        for exc in multierr.exceptions:
+            etype = type(exc)
+            if etype == tractor.RemoteActorError:
+                assert exc.type == StreamOverrun
+            else:
+                assert etype == tractor.ContextCancelled
+
+    elif overrunner == 'callee':
+        with pytest.raises(tractor.RemoteActorError) as excinfo:
+            trio.run(main)
+
+        # TODO: embedded remote errors so that we can verify the source
+        # error?
+        # the callee delivers an error which is an overrun wrapped
+        # in a remote actor error.
+        assert excinfo.value.type == tractor.RemoteActorError
+
+    else:
+        trio.run(main)
+
+
+@tractor.context
+async def echo_back_sequence(
+
+    ctx: tractor.Context,
+    seq: list[int],
+    msg_buffer_size: Optional[int] = None,
+
+) -> None:
+    '''
+    Echo a sequence of msgs back to the caller from the callee stream.
+
+    '''
+    await ctx.started()
+    async with ctx.open_stream(
+        msg_buffer_size=msg_buffer_size,
+    ) as stream:
+
+        seq = list(seq)  # bleh, `msgpack`...
+        count = 0
+        while count < 3:
+            batch = []
+            async for msg in stream:
+                batch.append(msg)
+                if batch == seq:
+                    break
+
+            for msg in batch:
+                print(f'callee sending {msg}')
+                await stream.send(msg)
+
+            count += 1
+
+    return 'yo'
+
+
+def test_stream_backpressure():
+    '''
+    Demonstrate small overruns of each task back and forth
+    on a stream not raising any errors by default.
+ + ''' + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + 'callee_sends_forever', + enable_modules=[__name__], + ) + seq = list(range(3)) + async with portal.open_context( + echo_back_sequence, + seq=seq, + msg_buffer_size=1, + ) as (ctx, sent): + assert sent is None + + async with ctx.open_stream(msg_buffer_size=1) as stream: + count = 0 + while count < 3: + for msg in seq: + print(f'caller sending {msg}') + await stream.send(msg) + await trio.sleep(0.1) + + batch = [] + async for msg in stream: + batch.append(msg) + if batch == seq: + break + + count += 1 + + # here the context should return + assert await ctx.result() == 'yo' + + # cancel the daemon + await portal.cancel_actor() + + trio.run(main) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 38fbee4..baee54e 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -132,7 +132,7 @@ async def stream_data(seed): yield i # trigger scheduler to simulate practical usage - await trio.sleep(0) + await trio.sleep(0.0001) # this is the third actor; the aggregator diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index b18a40e..9b4258e 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -83,7 +83,7 @@ async def open_sequence_streamer( ) as (ctx, first): assert first is None - async with ctx.open_stream() as stream: + async with ctx.open_stream(backpressure=True) as stream: yield stream await portal.cancel_actor() @@ -334,7 +334,7 @@ def test_ensure_slow_consumers_lag_out( if task.name == 'sub_1': # trigger checkpoint to clean out other subs - await trio.sleep(0) + await trio.sleep(0.01) # the non-lagger got # a ``trio.EndOfChannel`` @@ -401,7 +401,7 @@ def test_ensure_slow_consumers_lag_out( assert not tx._state.open_send_channels # check that "first" bcaster that we created - # above, never wass iterated and is thus overrun + # above, never was iterated and is thus overrun try: await brx.receive() except Lagged: diff --git a/tractor/_actor.py b/tractor/_actor.py index de86df1..dac2105 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -32,6 +32,7 @@ from ._exceptions import ( is_multi_cancelled, ContextCancelled, TransportClosed, + StreamOverrun, ) from . 
import _debug
 from ._discovery import get_arbiter
@@ -50,6 +51,7 @@ async def _invoke(
     chan: Channel,
     func: typing.Callable,
     kwargs: dict[str, Any],
+    is_rpc: bool = True,
 
     task_status: TaskStatus[
         Union[trio.CancelScope, BaseException]
@@ -68,9 +70,10 @@ async def _invoke(
     tb = None
 
     cancel_scope = trio.CancelScope()
+    # activated cancel scope ref
     cs: Optional[trio.CancelScope] = None
 
-    ctx = Context(chan, cid)
+    ctx = actor.get_context(chan, cid)
     context: bool = False
 
     if getattr(func, '_tractor_stream_function', False):
@@ -159,15 +162,33 @@ async def _invoke(
             # context func with support for bi-dir streaming
             await chan.send({'functype': 'context', 'cid': cid})
 
-            async with trio.open_nursery() as scope_nursery:
-                ctx._scope_nursery = scope_nursery
-                cs = scope_nursery.cancel_scope
-                task_status.started(cs)
-                try:
+            try:
+                async with trio.open_nursery() as scope_nursery:
+                    ctx._scope_nursery = scope_nursery
+                    cs = scope_nursery.cancel_scope
+                    task_status.started(cs)
                     await chan.send({'return': await coro, 'cid': cid})
-                except trio.Cancelled as err:
-                    tb = err.__traceback__
+
+            except trio.Cancelled as err:
+                tb = err.__traceback__
+
+            except trio.MultiError:
+                # if a context error was set then likely
+                # the multierror was raised due to that
+                if ctx._error is not None:
+                    raise ctx._error from None
+
+                raise
+
+            finally:
+                # XXX: only pop the context tracking if
+                # a ``@tractor.context`` entrypoint was called
+                assert chan.uid
+                ctx = actor._contexts.pop((chan.uid, cid))
+                if ctx:
+                    log.runtime(f'Context entrypoint for {func} was terminated:\n{ctx}')
+
+            assert cs
             if cs.cancelled_caught:
 
                 # TODO: pack in ``trio.Cancelled.__traceback__`` here
@@ -246,6 +267,7 @@ async def _invoke(
         try:
             scope, func, is_complete = actor._rpc_tasks.pop((chan, cid))
             is_complete.set()
+
        except KeyError:
            if is_rpc:
                # If we're cancelled before the task returns then the
@@ -312,6 +334,7 @@ class Actor:
     # ugh, we need to get rid of this and replace with a "registry" sys
     # https://github.com/goodboy/tractor/issues/216
     is_arbiter: bool = False
+    msg_buffer_size: int = 2**6
 
     # nursery placeholders filled in by `_async_main()` after fork
     _root_n: Optional[trio.Nursery] = None
@@ -379,19 +402,19 @@ class Actor:
         self._no_more_peers.set()
         self._ongoing_rpc_tasks = trio.Event()
         self._ongoing_rpc_tasks.set()
+
         # (chan, cid) -> (cancel_scope, func)
         self._rpc_tasks: dict[
             Tuple[Channel, str],
             Tuple[trio.CancelScope, typing.Callable, trio.Event]
         ] = {}
-        # map {uids -> {callids -> waiter queues}}
-        self._cids2qs: dict[
+
+        # map {actor uids -> Context}
+        self._contexts: dict[
             Tuple[Tuple[str, str], str],
-            Tuple[
-                trio.abc.SendChannel[Any],
-                trio.abc.ReceiveChannel[Any]
-            ]
+            Context
         ] = {}
+
         self._listeners: List[trio.abc.Listener] = []
         self._parent_chan: Optional[Channel] = None
         self._forkserver_info: Optional[
@@ -546,7 +569,7 @@ class Actor:
             # now in a cancelled condition) when the local runtime here
             # is now cancelled while (presumably) in the middle of msg
             # loop processing.
-            with trio.move_on_after(0.1) as cs:
+            with trio.move_on_after(0.5) as cs:
                 cs.shield = True
                 # Attempt to wait for the far end to close the channel
                 # and bail after timeout (2-generals on closure).
@@ -567,17 +590,7 @@ class Actor:
 
                 await local_nursery.exited.wait()
 
-        # channel cleanup sequence
-
-        # for (channel, cid) in self._rpc_tasks.copy():
-        #     if channel is chan:
-        #         with trio.CancelScope(shield=True):
-        #             await self._cancel_task(cid, channel)
-
-        #         # close all consumer side task mem chans
-        #         send_chan, _ = self._cids2qs[(chan.uid, cid)]
-        #         assert send_chan.cid == cid  # type: ignore
-        #         await send_chan.aclose()
+        # ``Channel`` teardown and closure sequence
 
         # Drop ref to channel so it can be gc-ed and disconnected
         log.runtime(f"Releasing channel {chan} from {chan.uid}")
@@ -588,6 +601,10 @@ class Actor:
             log.runtime(f"No more channels for {chan.uid}")
             self._peers.pop(chan.uid, None)
 
+            # for (uid, cid) in self._contexts.copy():
+            #     if chan.uid == uid:
+            #         self._contexts.pop((uid, cid))
+
         log.runtime(f"Peers is {self._peers}")
 
         if not self._peers:  # no more channels connected
@@ -619,26 +636,32 @@ class Actor:
         cid: str,
         msg: dict[str, Any],
     ) -> None:
-        """Push an RPC result to the local consumer's queue.
-        """
-        # actorid = chan.uid
-        assert chan.uid, f"`chan.uid` can't be {chan.uid}"
-        send_chan, recv_chan = self._cids2qs[(chan.uid, cid)]
-        assert send_chan.cid == cid  # type: ignore
-
-        # if 'error' in msg:
-        #     ctx = getattr(recv_chan, '_ctx', None)
-        #     if ctx:
-        #         ctx._error_from_remote_msg(msg)
-
-        #         log.runtime(f"{send_chan} was terminated at remote end")
-        #         # indicate to consumer that far end has stopped
-        #         return await send_chan.aclose()
+        '''
+        Push an RPC result to the local consumer's queue.
+        '''
+        uid = chan.uid
+        assert uid, f"`chan.uid` can't be {uid}"
 
         try:
-            log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
-            # maintain backpressure
-            await send_chan.send(msg)
+            ctx = self._contexts[(uid, cid)]
+        except KeyError:
+            log.warning(
+                f'Ignoring msg from [no-longer/un]known context with {uid}:'
+                f'\n{msg}')
+            return
+
+        send_chan = ctx._send_chan
+
+        log.runtime(f"Delivering {msg} from {chan.uid} to caller {cid}")
+
+        # XXX: we do **not** maintain backpressure and instead
+        # opt to relay stream overrun errors to the sender.
+        try:
+            send_chan.send_nowait(msg)
+            # if an error is detected we should always
+            # expect it to be raised by any context (stream)
+            # consumer task
+            await ctx._maybe_raise_from_remote_msg(msg)
 
         except trio.BrokenResourceError:
             # TODO: what is the right way to handle the case where the
@@ -650,44 +673,120 @@ class Actor:
             # XXX: local consumer has closed their side
             # so cancel the far end streaming task
             log.warning(f"{send_chan} consumer is already closed")
+            return
 
-    def get_memchans(
+        except trio.WouldBlock:
+            # XXX: always push an error even if the local
+            # receiver is in overrun state.
+            await ctx._maybe_raise_from_remote_msg(msg)
+
+            uid = chan.uid
+            lines = [
+                'Task context stream was overrun',
+                f'local task: {cid} @ {self.uid}',
+                f'remote sender: {uid}',
+            ]
+            if not ctx._stream_opened:
+                lines.insert(
+                    1,
+                    f'\n*** No stream open on `{self.uid[0]}` side!
***\n'
+                )
+            text = '\n'.join(lines)
+
+            if ctx._backpressure:
+                log.warning(text)
+                await send_chan.send(msg)
+            else:
+                try:
+                    raise StreamOverrun(text) from None
+                except StreamOverrun as err:
+                    err_msg = pack_error(err)
+                    err_msg['cid'] = cid
+                    try:
+                        await chan.send(err_msg)
+                    except trio.BrokenResourceError:
+                        # XXX: local consumer has closed their side
+                        # so cancel the far end streaming task
+                        log.warning(f"{chan} is already closed")
+
+    def get_context(
         self,
-        actorid: Tuple[str, str],
-        cid: str
+        chan: Channel,
+        cid: str,
+        msg_buffer_size: Optional[int] = None,
 
-    ) -> Tuple[trio.abc.SendChannel, trio.abc.ReceiveChannel]:
+    ) -> Context:
+        '''
+        Look up or create a new inter-actor-task-IPC-linked task
+        "context" which encapsulates the local task's scheduling
+        environment including a ``trio`` cancel scope, a pair of IPC
+        messaging "feeder" channels, and an RPC id unique to the
+        task-as-function invocation.
 
-        log.runtime(f"Getting result queue for {actorid} cid {cid}")
+        '''
+        log.runtime(f"Getting result queue for {chan.uid} cid {cid}")
+        actor_uid = chan.uid
+        assert actor_uid
         try:
-            send_chan, recv_chan = self._cids2qs[(actorid, cid)]
+            ctx = self._contexts[(actor_uid, cid)]
+
+            # adjust buffer size if specified
+            state = ctx._send_chan._state  # type: ignore
+            if msg_buffer_size and state.max_buffer_size != msg_buffer_size:
+                state.max_buffer_size = msg_buffer_size
+
         except KeyError:
-            send_chan, recv_chan = trio.open_memory_channel(2**6)
-            send_chan.cid = cid  # type: ignore
-            recv_chan.cid = cid  # type: ignore
-            self._cids2qs[(actorid, cid)] = send_chan, recv_chan
+            send_chan: trio.MemorySendChannel
+            recv_chan: trio.MemoryReceiveChannel
+            send_chan, recv_chan = trio.open_memory_channel(
+                msg_buffer_size or self.msg_buffer_size)
+            ctx = Context(
+                chan,
+                cid,
+                _send_chan=send_chan,
+                _recv_chan=recv_chan,
+            )
+            self._contexts[(actor_uid, cid)] = ctx
 
-        return send_chan, recv_chan
+        return ctx
 
-    async def send_cmd(
+    async def start_remote_task(
         self,
         chan: Channel,
         ns: str,
         func: str,
-        kwargs: dict
-    ) -> Tuple[str, trio.abc.ReceiveChannel]:
+        kwargs: dict,
+        msg_buffer_size: Optional[int] = None,
+
+    ) -> Context:
         '''
-        Send a ``'cmd'`` message to a remote actor and return a
-        caller id and a ``trio.Queue`` that can be used to wait for
-        responses delivered by the local message processing loop.
+        Send a ``'cmd'`` message to a remote actor, which starts
+        a remote task-as-function entrypoint.
+
+        Synchronously validates the endpoint type and returns a
+        caller-side task ``Context`` that can be used to wait for
+        responses delivered by the local runtime's message processing loop.
 
         '''
         cid = str(uuid.uuid4())
         assert chan.uid
-        send_chan, recv_chan = self.get_memchans(chan.uid, cid)
+        ctx = self.get_context(chan, cid, msg_buffer_size=msg_buffer_size)
         log.runtime(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})")
         await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)})
-        return cid, recv_chan
+
+        # Wait on first response msg and validate; this should be
+        # immediate.
+        first_msg = await ctx._recv_chan.receive()
+        functype = first_msg.get('functype')
+
+        if 'error' in first_msg:
+            raise unpack_error(first_msg, chan)
+
+        elif functype not in ('asyncfunc', 'asyncgen', 'context'):
+            raise ValueError(f"{first_msg} is an invalid response packet?")
+
+        ctx._remote_func_type = functype
+        return ctx
 
     async def _process_messages(
         self,
@@ -721,7 +820,8 @@ class Actor:
 
                 if msg is None:  # loop terminate sentinel
                     log.cancel(
-                        f"Cancelling all tasks for {chan} from {chan.uid}")
+                        f"Channel to {chan.uid} terminated?\n"
+                        "Cancelling all associated tasks..")
 
                     for (channel, cid) in self._rpc_tasks.copy():
                         if channel is chan:
diff --git a/tractor/_exceptions.py b/tractor/_exceptions.py
index 9c3edac..79bcd3c 100644
--- a/tractor/_exceptions.py
+++ b/tractor/_exceptions.py
@@ -61,6 +61,10 @@ class NoRuntime(RuntimeError):
     "The root actor has not been initialized yet"
 
 
+class StreamOverrun(trio.TooSlowError):
+    "This stream was overrun by the sender"
+
+
 def pack_error(
     exc: BaseException,
     tb=None,
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 70339fa..bdfef3e 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -6,7 +6,7 @@ concurrency linked tasks running in disparate memory domains.
 import importlib
 import inspect
 from typing import (
-    Tuple, Any, Dict, Optional, Set,
+    Any, Optional,
     Callable, AsyncGenerator
 )
 from functools import partial
@@ -49,7 +49,7 @@ async def maybe_open_nursery(
         yield nursery
 
 
-def func_deats(func: Callable) -> Tuple[str, str]:
+def func_deats(func: Callable) -> tuple[str, str]:
     return (
         func.__module__,
         func.__name__,
@@ -98,68 +98,45 @@ class Portal:
         # during the portal's lifetime
         self._result_msg: Optional[dict] = None
 
-        # When this is set to a tuple returned from ``_submit()`` then
+        # When set to a ``Context`` (when _submit_for_result is called)
         # it is expected that ``result()`` will be awaited at some
-        # point. Set when _submit_for_result is called
-        self._expect_result: Optional[
-            Tuple[str, Any, str, Dict[str, Any]]
-        ] = None
-        self._streams: Set[ReceiveMsgStream] = set()
+        # point.
+        self._expect_result: Optional[Context] = None
+        self._streams: set[ReceiveMsgStream] = set()
         self.actor = current_actor()
 
-    async def _submit(
+    async def _submit_for_result(
         self,
         ns: str,
         func: str,
-        kwargs,
-    ) -> Tuple[str, trio.MemoryReceiveChannel, str, Dict[str, Any]]:
-        """Submit a function to be scheduled and run by actor, return the
-        associated caller id, response queue, response type str,
-        first message packet as a tuple.
-
-        This is an async call.
- """ - # ship a function call request to the remote actor - cid, recv_chan = await self.actor.send_cmd( - self.channel, ns, func, kwargs) - - # wait on first response msg and handle (this should be - # in an immediate response) - - first_msg = await recv_chan.receive() - functype = first_msg.get('functype') - - if 'error' in first_msg: - raise unpack_error(first_msg, self.channel) - - elif functype not in ('asyncfunc', 'asyncgen', 'context'): - raise ValueError(f"{first_msg} is an invalid response packet?") - - return cid, recv_chan, functype, first_msg - - async def _submit_for_result(self, ns: str, func: str, **kwargs) -> None: + **kwargs + ) -> None: assert self._expect_result is None, \ "A pending main result has already been submitted" - self._expect_result = await self._submit(ns, func, kwargs) + self._expect_result = await self.actor.start_remote_task( + self.channel, + ns, + func, + kwargs + ) async def _return_once( self, - cid: str, - recv_chan: trio.abc.ReceiveChannel, - resptype: str, - first_msg: dict + ctx: Context, ) -> dict[str, Any]: - assert resptype == 'asyncfunc' # single response - msg = await recv_chan.receive() + assert ctx._remote_func_type == 'asyncfunc' # single response + msg = await ctx._recv_chan.receive() return msg async def result(self) -> Any: - """Return the result(s) from the remote actor's "main" task. - """ + ''' + Return the result(s) from the remote actor's "main" task. + + ''' # Check for non-rpc errors slapped on the # channel for which we always raise exc = self.channel._exc @@ -178,7 +155,9 @@ class Portal: assert self._expect_result if self._result_msg is None: - self._result_msg = await self._return_once(*self._expect_result) + self._result_msg = await self._return_once( + self._expect_result + ) return _unwrap_msg(self._result_msg, self.channel) @@ -244,7 +223,8 @@ class Portal: trio.BrokenResourceError, ): log.cancel( - f"{self.channel} for {self.channel.uid} was already closed or broken?") + f"{self.channel} for {self.channel.uid} was already " + "closed or broken?") return False async def run_from_ns( @@ -269,9 +249,14 @@ class Portal: internals. ''' - msg = await self._return_once( - *(await self._submit(namespace_path, function_name, kwargs)) + ctx = await self.actor.start_remote_task( + self.channel, + namespace_path, + function_name, + kwargs, ) + ctx._portal = self + msg = await self._return_once(ctx) return _unwrap_msg(msg, self.channel) async def run( @@ -313,10 +298,15 @@ class Portal: fn_mod_path, fn_name = func_deats(func) + ctx = await self.actor.start_remote_task( + self.channel, + fn_mod_path, + fn_name, + kwargs, + ) + ctx._portal = self return _unwrap_msg( - await self._return_once( - *(await self._submit(fn_mod_path, fn_name, kwargs)), - ), + await self._return_once(ctx), self.channel, ) @@ -337,27 +327,21 @@ class Portal: f'{async_gen_func} must be an async generator function!') fn_mod_path, fn_name = func_deats(async_gen_func) - ( - cid, - recv_chan, - functype, - first_msg - ) = await self._submit(fn_mod_path, fn_name, kwargs) - - # receive only stream - assert functype == 'asyncgen' - - ctx = Context( + ctx = await self.actor.start_remote_task( self.channel, - cid, - # do we need this to be closed implicitly? 
- # _recv_chan=recv_chan, - _portal=self + fn_mod_path, + fn_name, + kwargs ) + ctx._portal = self + + # ensure receive-only stream entrypoint + assert ctx._remote_func_type == 'asyncgen' + try: # deliver receive only stream async with ReceiveMsgStream( - ctx, recv_chan, + ctx, ctx._recv_chan, ) as rchan: self._streams.add(rchan) yield rchan @@ -391,8 +375,9 @@ class Portal: func: Callable, **kwargs, - ) -> AsyncGenerator[Tuple[Context, Any], None]: - '''Open an inter-actor task context. + ) -> AsyncGenerator[tuple[Context, Any], None]: + ''' + Open an inter-actor task context. This is a synchronous API which allows for deterministic setup/teardown of a remote task. The yielded ``Context`` further @@ -400,7 +385,6 @@ class Portal: and synchronized final result collection. See ``tractor.Context``. ''' - # conduct target func method structural checks if not inspect.iscoroutinefunction(func) and ( getattr(func, '_tractor_contex_function', False) @@ -408,20 +392,25 @@ class Portal: raise TypeError( f'{func} must be an async generator function!') + __tracebackhide__ = True + fn_mod_path, fn_name = func_deats(func) - recv_chan: Optional[trio.MemoryReceiveChannel] = None + ctx = await self.actor.start_remote_task( + self.channel, + fn_mod_path, + fn_name, + kwargs + ) - cid, recv_chan, functype, first_msg = await self._submit( - fn_mod_path, fn_name, kwargs) - - assert functype == 'context' - msg = await recv_chan.receive() + assert ctx._remote_func_type == 'context' + msg = await ctx._recv_chan.receive() try: # the "first" value here is delivered by the callee's # ``Context.started()`` call. first = msg['started'] + ctx._started_called = True except KeyError: assert msg.get('cid'), ("Received internal error at context?") @@ -433,19 +422,14 @@ class Portal: raise _err: Optional[BaseException] = None + ctx._portal = self + # deliver context instance and .started() msg value in open tuple. try: async with trio.open_nursery() as scope_nursery: - ctx = Context( - self.channel, - cid, - _portal=self, - _recv_chan=recv_chan, - _scope_nursery=scope_nursery, - ) + ctx._scope_nursery = scope_nursery - # pairs with handling in ``Actor._push_result()`` - # recv_chan._ctx = ctx + # do we need this? # await trio.lowlevel.checkpoint() yield ctx, first @@ -478,9 +462,10 @@ class Portal: _err = err # the context cancels itself on any cancel # causing error. - log.cancel(f'Context {ctx} sending cancel to far end') - with trio.CancelScope(shield=True): - await ctx.cancel() + log.cancel( + f'Context to {self.channel.uid} sending cancel request..') + + await ctx.cancel() raise finally: @@ -495,8 +480,9 @@ class Portal: # operating *in* this scope to have survived # we tear down the runtime feeder chan last # to avoid premature stream clobbers. - if recv_chan is not None: - await recv_chan.aclose() + if ctx._recv_chan is not None: + # should we encapsulate this in the context api? + await ctx._recv_chan.aclose() if _err: if ctx._cancel_called: @@ -513,6 +499,9 @@ class Portal: f'value from callee `{result}`' ) + # remove the context from runtime tracking + self.actor._contexts.pop((self.channel.uid, ctx.cid)) + @dataclass class LocalPortal: diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 5c22116..05a3073 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -32,9 +32,10 @@ log = get_logger(__name__) class ReceiveMsgStream(trio.abc.ReceiveChannel): - '''A IPC message stream for receiving logically sequenced values - over an inter-actor ``Channel``. 
This is the type returned to
-    a local task which entered either ``Portal.open_stream_from()`` or
+    '''
+    An IPC message stream for receiving logically sequenced values over
+    an inter-actor ``Channel``.  This is the type returned to a local
+    task which entered either ``Portal.open_stream_from()`` or
     ``Context.open_stream()``.
 
     Termination rules:
@@ -177,7 +178,7 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
 
         # In the bidirectional case, `Context.open_stream()` will create
         # the `Actor._cids2qs` entry from a call to
-        # `Actor.get_memchans()` and will send the stop message in
+        # `Actor.get_context()` and will send the stop message in
         # ``__aexit__()`` on teardown so it **does not** need to be
         # called here.
         if not self._ctx._portal:
@@ -189,7 +190,8 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
             # was it shouldn't matter since it's unlikely a user
             # will try to re-use a stream after attemping to close
             # it).
-            await self._ctx.send_stop()
+            with trio.CancelScope(shield=True):
+                await self._ctx.send_stop()
 
         except (
             trio.BrokenResourceError,
@@ -282,11 +284,11 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel):
 
 class MsgStream(ReceiveMsgStream, trio.abc.Channel):
-    """
+    '''
     Bidirectional message stream for use within an inter-actor actor
     ``Context```.
 
-    """
+    '''
     async def send(
         self,
         data: Any
@@ -297,36 +299,58 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel):
         # if self._eoc:
         #     raise trio.ClosedResourceError('This stream is already ded')
 
+        if self._ctx._error:
+            raise self._ctx._error  # from None
+
         await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})
 
 
 @dataclass
 class Context:
-    '''An inter-actor task communication context.
+    '''
+    An inter-actor, ``trio`` task communication context.
+
+    NB: This class should never be instantiated directly; it is delivered
+    either by runtime machinery to a remotely started task or by entering
+    ``Portal.open_context()``.
 
     Allows maintaining task or protocol specific state between
     2 communicating actor tasks. A unique context is created on the
     callee side/end for every request to a remote actor from a portal.
 
     A context can be cancelled and (possibly eventually restarted) from
-    either side of the underlying IPC channel.
-
-    A context can be used to open task oriented message streams and can
-    be thought of as an IPC aware inter-actor cancel scope.
+    either side of the underlying IPC channel, open task oriented
+    message streams and acts as an IPC aware inter-actor-task cancel
+    scope.
 
     '''
     chan: Channel
     cid: str
 
+    # these are the "feeder" channels for delivering
+    # message values to the local task from the runtime
+    # msg processing loop.
+ _recv_chan: trio.MemoryReceiveChannel + _send_chan: trio.MemorySendChannel + + _remote_func_type: Optional[str] = None + # only set on the caller side _portal: Optional['Portal'] = None # type: ignore # noqa - _recv_chan: Optional[trio.MemoryReceiveChannel] = None _result: Optional[Any] = False + _error: Optional[BaseException] = None + + # status flags _cancel_called: bool = False + _started_called: bool = False + _started_received: bool = False + _stream_opened: bool = False # only set on the callee side _scope_nursery: Optional[trio.Nursery] = None + _backpressure: bool = False + async def send_yield(self, data: Any) -> None: warnings.warn( @@ -340,26 +364,59 @@ class Context: async def send_stop(self) -> None: await self.chan.send({'stop': True, 'cid': self.cid}) - def _error_from_remote_msg( + async def _maybe_raise_from_remote_msg( self, msg: Dict[str, Any], ) -> None: - '''Unpack and raise a msg error into the local scope + ''' + (Maybe) unpack and raise a msg error into the local scope nursery for this context. Acts as a form of "relay" for a remote error raised in the corresponding remote callee task. + ''' - assert self._scope_nursery + error = msg.get('error') + if error: + # If this is an error message from a context opened by + # ``Portal.open_context()`` we want to interrupt any ongoing + # (child) tasks within that context to be notified of the remote + # error relayed here. + # + # The reason we may want to raise the remote error immediately + # is that there is no guarantee the associated local task(s) + # will attempt to read from any locally opened stream any time + # soon. + # + # NOTE: this only applies when + # ``Portal.open_context()`` has been called since it is assumed + # (currently) that other portal APIs (``Portal.run()``, + # ``.run_in_actor()``) do their own error checking at the point + # of the call and result processing. + log.error( + f'Remote context error for {self.chan.uid}:{self.cid}:\n' + f'{msg["error"]["tb_str"]}' + ) + # await ctx._maybe_error_from_remote_msg(msg) + self._error = unpack_error(msg, self.chan) - async def raiser(): - raise unpack_error(msg, self.chan) + # TODO: tempted to **not** do this by-reraising in a + # nursery and instead cancel a surrounding scope, detect + # the cancellation, then lookup the error that was set? + if self._scope_nursery: - self._scope_nursery.start_soon(raiser) + async def raiser(): + raise self._error from None + + # from trio.testing import wait_all_tasks_blocked + # await wait_all_tasks_blocked() + if not self._scope_nursery._closed: # type: ignore + self._scope_nursery.start_soon(raiser) async def cancel(self) -> None: - '''Cancel this inter-actor-task context. + ''' + Cancel this inter-actor-task context. Request that the far side cancel it's current linked context, Timeout quickly in an attempt to sidestep 2-generals... @@ -420,9 +477,12 @@ class Context: async def open_stream( self, + backpressure: Optional[bool] = True, + msg_buffer_size: Optional[int] = None, ) -> AsyncGenerator[MsgStream, None]: - '''Open a ``MsgStream``, a bi-directional stream connected to the + ''' + Open a ``MsgStream``, a bi-directional stream connected to the cross-actor (far end) task for this ``Context``. This context manager must be entered on both the caller and @@ -455,34 +515,44 @@ class Context: f'Context around {actor.uid[0]}:{task} was already cancelled!' 
)
 
+        if not self._portal and not self._started_called:
+            raise RuntimeError(
+                '`Context.started()` must be called before opening a stream'
+            )
+
         # NOTE: in one way streaming this only happens on the
-        # caller side inside `Actor.send_cmd()` so if you try
+        # caller side inside `Actor.start_remote_task()` so if you try
         # to send a stop from the caller to the callee in the
         # single-direction-stream case you'll get a lookup error
         # currently.
-        _, recv_chan = actor.get_memchans(
-            self.chan.uid,
-            self.cid
+        ctx = actor.get_context(
+            self.chan,
+            self.cid,
+            msg_buffer_size=msg_buffer_size,
         )
+        ctx._backpressure = backpressure
+        assert ctx is self
 
         # XXX: If the underlying channel feeder receive mem chan has
         # been closed then likely client code has already exited
         # a ``.open_stream()`` block prior or there was some other
         # unanticipated error or cancellation from ``trio``.
 
-        if recv_chan._closed:
+        if ctx._recv_chan._closed:
             raise trio.ClosedResourceError(
                 'The underlying channel for this stream was already closed!?')
 
         async with MsgStream(
             ctx=self,
-            rx_chan=recv_chan,
+            rx_chan=ctx._recv_chan,
         ) as rchan:
 
             if self._portal:
                 self._portal._streams.add(rchan)
 
             try:
+                self._stream_opened = True
+
                 # ensure we aren't cancelled before delivering
                 # the stream
                 # await trio.lowlevel.checkpoint()
@@ -518,7 +588,7 @@ class Context:
             try:
                 self._result = msg['return']
                 break
-            except KeyError:
+            except KeyError as msgerr:
 
                 if 'yield' in msg:
                     # far end task is still streaming to us so discard
@@ -532,17 +602,36 @@ class Context:
                 # internal error should never get here
                 assert msg.get('cid'), (
                     "Received internal error at portal?")
-                raise unpack_error(msg, self._portal.channel)
+
+                raise unpack_error(
+                    msg, self._portal.channel
+                ) from msgerr
 
         return self._result
 
-    async def started(self, value: Optional[Any] = None) -> None:
+    async def started(
+        self,
+        value: Optional[Any] = None
+    ) -> None:
+        '''
+        Indicate to the calling actor's task that this linked context
+        has started and send ``value`` to the other side.
+
+        On the calling side ``value`` is the second item delivered
+        in the tuple returned by ``Portal.open_context()``.
+
+        '''
        if self._portal:
            raise RuntimeError(
                f"Caller side context {self} can not call started!")
+
+        elif self._started_called:
+            raise RuntimeError(
+                f"called 'started' twice on context with {self.chan.uid}")
+
         await self.chan.send({'started': value, 'cid': self.cid})
+        self._started_called = True
 
     # TODO: do we need a restart api?
     # async def restart(self) -> None:
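
For reference, a minimal usage sketch of the overrun semantics added in this
changeset, distilled from the ``test_one_end_stream_not_opened`` 'caller'
case above (the actor name and loop bound here are illustrative, not from
the changeset): the callee mistakenly never opens the stream, so the
caller's sends overrun the callee-side feeder mem chan (``2**6`` slots by
default per ``Actor.msg_buffer_size``) and the resulting ``StreamOverrun``
is relayed back and raised in the caller's scope wrapped in a
``RemoteActorError``:

import trio
import tractor
from tractor._exceptions import StreamOverrun


@tractor.context
async def sleepy_callee(ctx: tractor.Context) -> None:
    # signal the caller that we're up but (mistakenly)
    # never open the stream on this side
    await ctx.started()
    await trio.sleep_forever()


async def main():
    async with tractor.open_nursery() as n:
        portal = await n.start_actor(
            'sleepy_callee',
            enable_modules=[__name__],
        )
        async with portal.open_context(sleepy_callee) as (ctx, sent):
            assert sent is None

            async with ctx.open_stream() as stream:
                # one more msg than the callee's feeder mem chan can
                # hold (``Actor.msg_buffer_size`` defaults to ``2**6``)
                for i in range(2**6 + 1):
                    await stream.send(i)

                # wait for the overrun error to be relayed back and
                # raised in this scope (as in the test above)
                await trio.sleep_forever()


if __name__ == '__main__':
    try:
        trio.run(main)
    except tractor.RemoteActorError as err:
        # the ``StreamOverrun`` raised in the callee's runtime comes
        # back wrapped in a ``RemoteActorError``
        assert err.type == StreamOverrun

Opening the stream on the receiving side with ``backpressure=True`` instead
(as the updated ``test_clustering.py`` and ``test_task_broadcasting.py``
do) restores blocking-style flow control via the
``trio.MemorySendChannel.send()`` path.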