diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index b57cec7..4c74a9a 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -16,16 +16,99 @@ from tractor._exceptions import StreamOverrun from conftest import tractor_test -# the general stream semantics are -# - normal termination: far end relays a stop message which -# terminates an ongoing ``MsgStream`` iteration -# - cancel termination: context is cancelled on either side cancelling -# the "linked" inter-actor task context +# ``Context`` semantics are as follows, +# ------------------------------------ + +# - standard setup/teardown: +# ``Portal.open_context()`` starts a new +# remote task context in another actor. The target actor's task must +# call ``Context.started()`` to unblock this entry on the caller side. +# the callee task executes until complete and returns a final value +# which is delivered to the caller side and retreived via +# ``Context.result()``. + +# - cancel termination: +# context can be cancelled on either side where either end's task can +# call ``Context.cancel()`` which raises a local ``trio.Cancelled`` +# and sends a task cancel request to the remote task which in turn +# raises a ``trio.Cancelled`` in that scope, catches it, and re-raises +# as ``ContextCancelled``. This is then caught by +# ``Portal.open_context()``'s exit and we get a graceful termination +# of the linked tasks. + +# - error termination: +# error is caught after all context-cancel-scope tasks are cancelled +# via regular ``trio`` cancel scope semantics, error is sent to other +# side and unpacked as a `RemoteActorError`. + + +# ``Context.open_stream() as stream: MsgStream:`` msg semantics are: +# ----------------------------------------------------------------- + +# - either side can ``.send()`` which emits a 'yield' msgs and delivers +# a value to the a ``MsgStream.receive()`` call. + +# - stream closure: one end relays a 'stop' message which terminates an +# ongoing ``MsgStream`` iteration. + +# - cancel/error termination: as per the context semantics above but +# with implicit stream closure on the cancelling end. _state: bool = False +@tractor.context +async def too_many_starteds( + ctx: tractor.Context, +) -> None: + ''' + Call ``Context.started()`` more then once (an error). + + ''' + await ctx.started() + try: + await ctx.started() + except RuntimeError: + raise + + +@tractor.context +async def not_started_but_stream_opened( + ctx: tractor.Context, +) -> None: + ''' + Enter ``Context.open_stream()`` without calling ``.started()``. + + ''' + try: + async with ctx.open_stream(): + assert 0 + except RuntimeError: + raise + + +@pytest.mark.parametrize( + 'target', + [too_many_starteds, not_started_but_stream_opened], + ids='misuse_type={}'.format, +) +def test_started_misuse(target): + + async def main(): + async with tractor.open_nursery() as n: + portal = await n.start_actor( + target.__name__, + enable_modules=[__name__], + ) + + async with portal.open_context(target) as (ctx, sent): + await trio.sleep(1) + + with pytest.raises(tractor.RemoteActorError): + trio.run(main) + + @tractor.context async def simple_setup_teardown( @@ -378,10 +461,18 @@ async def cancel_self( _state = True await ctx.cancel() + + # should inline raise immediately + try: + async with ctx.open_stream(): + pass + except ContextCancelled: + pass + + # check a real ``trio.Cancelled`` is raised on a checkpoint try: with trio.fail_after(0.1): await trio.sleep_forever() - except trio.Cancelled: raise @@ -420,34 +511,6 @@ async def test_callee_cancels_before_started(): await portal.cancel_actor() -@tractor.context -async def really_started( - ctx: tractor.Context, -) -> None: - await ctx.started() - try: - await ctx.started() - except RuntimeError: - raise - - -def test_started_called_more_then_once(): - - async def main(): - async with tractor.open_nursery() as n: - portal = await n.start_actor( - 'too_much_starteds', - enable_modules=[__name__], - ) - - async with portal.open_context(really_started) as (ctx, sent): - await trio.sleep(1) - # pass - - with pytest.raises(tractor.RemoteActorError): - trio.run(main) - - @tractor.context async def never_open_stream(