From 240f5912340bc9bfacef46cb252d6af8619e937e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 31 Jul 2021 12:10:25 -0400 Subject: [PATCH] Add 2-way streaming example to readme and scripts --- docs/README.rst | 95 ++++++++++++++++++++++++++++++++- examples/rpc_bidir_streaming.py | 72 +++++++++++++++++++++++++ tests/test_docs_examples.py | 8 ++- 3 files changed, 172 insertions(+), 3 deletions(-) create mode 100644 examples/rpc_bidir_streaming.py diff --git a/docs/README.rst b/docs/README.rst index bd7b6af..18afd26 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -127,7 +127,8 @@ Zombie safe: self-destruct a process tree print('This process tree will self-destruct in 1 sec...') await trio.sleep(1) - # you could have done this yourself + # raise an error in root actor/process and trigger + # reaping of all minions raise Exception('Self Destructed') @@ -197,6 +198,98 @@ And, yes, there's a built-in crash handling mode B) We're hoping to add a respawn-from-repl system soon! +SC compatible bi-directional streaming +-------------------------------------- +Yes, you saw it here first; we provide 2-way streams +with reliable, transitive setup/teardown semantics. + +Our nascent api is remniscent of ``trio.Nursery.start()`` +style invocation: + +.. code:: python + + import trio + import tractor + + + @tractor.context + async def simple_rpc( + + ctx: tractor.Context, + data: int, + + ) -> None: + '''Test a small ping-pong 2-way streaming server. + + ''' + # signal to parent that we're up much like + # ``trio_typing.TaskStatus.started()`` + await ctx.started(data + 1) + + async with ctx.open_stream() as stream: + + count = 0 + async for msg in stream: + + assert msg == 'ping' + await stream.send('pong') + count += 1 + + else: + assert count == 10 + + + async def main() -> None: + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'rpc_server', + enable_modules=[__name__], + ) + + # XXX: this syntax requires py3.9 + async with ( + + portal.open_context( + simple_rpc, + data=10, + ) as (ctx, sent), + + ctx.open_stream() as stream, + ): + + assert sent == 11 + + count = 0 + # receive msgs using async for style + await stream.send('ping') + + async for msg in stream: + assert msg == 'pong' + await stream.send('ping') + count += 1 + + if count >= 9: + break + + + # explicitly teardown the daemon-actor + await portal.cancel_actor() + + + if __name__ == '__main__': + trio.run(main) + + +See original proposal and discussion in `#53`_ as well +as follow up improvements in `#223`_ that we'd love to +hear your thoughts on! + +.. _#53: https://github.com/goodboy/tractor/issues/53 +.. _#223: https://github.com/goodboy/tractor/issues/223 + + Worker poolz are easy peasy --------------------------- The initial ask from most new users is *"how do I make a worker diff --git a/examples/rpc_bidir_streaming.py b/examples/rpc_bidir_streaming.py new file mode 100644 index 0000000..7320081 --- /dev/null +++ b/examples/rpc_bidir_streaming.py @@ -0,0 +1,72 @@ +import trio +import tractor + + +@tractor.context +async def simple_rpc( + + ctx: tractor.Context, + data: int, + +) -> None: + '''Test a small ping-pong 2-way streaming server. + + ''' + # signal to parent that we're up much like + # ``trio_typing.TaskStatus.started()`` + await ctx.started(data + 1) + + async with ctx.open_stream() as stream: + + count = 0 + async for msg in stream: + + assert msg == 'ping' + await stream.send('pong') + count += 1 + + else: + assert count == 10 + + +async def main() -> None: + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'rpc_server', + enable_modules=[__name__], + ) + + # XXX: syntax requires py3.9 + async with ( + + portal.open_context( + simple_rpc, # taken from pytest parameterization + data=10, + + ) as (ctx, sent), + + ctx.open_stream() as stream, + ): + + assert sent == 11 + + count = 0 + # receive msgs using async for style + await stream.send('ping') + + async for msg in stream: + assert msg == 'pong' + await stream.send('ping') + count += 1 + + if count >= 9: + break + + # explicitly teardown the daemon-actor + await portal.cancel_actor() + + +if __name__ == '__main__': + trio.run(main) diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 632d85c..5f47419 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -84,8 +84,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr): if '__' not in f and f[0] != '_' - and 'debugging' not in p[0] - ], + and 'debugging' not in p[0]], + ids=lambda t: t[1], ) def test_example(run_example_in_subproc, example_script): @@ -98,6 +98,10 @@ def test_example(run_example_in_subproc, example_script): test_example``. """ ex_file = os.path.join(*example_script) + + if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9): + pytest.skip("2-way streaming example requires py3.9 async with syntax") + with open(ex_file, 'r') as ex: code = ex.read()