forked from goodboy/tractor
Add 2-way streaming example to readme and scripts
parent
69bbf6a957
commit
240f591234
|
@ -127,7 +127,8 @@ Zombie safe: self-destruct a process tree
|
||||||
print('This process tree will self-destruct in 1 sec...')
|
print('This process tree will self-destruct in 1 sec...')
|
||||||
await trio.sleep(1)
|
await trio.sleep(1)
|
||||||
|
|
||||||
# you could have done this yourself
|
# raise an error in root actor/process and trigger
|
||||||
|
# reaping of all minions
|
||||||
raise Exception('Self Destructed')
|
raise Exception('Self Destructed')
|
||||||
|
|
||||||
|
|
||||||
|
@ -197,6 +198,98 @@ And, yes, there's a built-in crash handling mode B)
|
||||||
We're hoping to add a respawn-from-repl system soon!
|
We're hoping to add a respawn-from-repl system soon!
|
||||||
|
|
||||||
|
|
||||||
|
SC compatible bi-directional streaming
|
||||||
|
--------------------------------------
|
||||||
|
Yes, you saw it here first; we provide 2-way streams
|
||||||
|
with reliable, transitive setup/teardown semantics.
|
||||||
|
|
||||||
|
Our nascent api is remniscent of ``trio.Nursery.start()``
|
||||||
|
style invocation:
|
||||||
|
|
||||||
|
.. code:: python
|
||||||
|
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def simple_rpc(
|
||||||
|
|
||||||
|
ctx: tractor.Context,
|
||||||
|
data: int,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''Test a small ping-pong 2-way streaming server.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# signal to parent that we're up much like
|
||||||
|
# ``trio_typing.TaskStatus.started()``
|
||||||
|
await ctx.started(data + 1)
|
||||||
|
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
async for msg in stream:
|
||||||
|
|
||||||
|
assert msg == 'ping'
|
||||||
|
await stream.send('pong')
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
else:
|
||||||
|
assert count == 10
|
||||||
|
|
||||||
|
|
||||||
|
async def main() -> None:
|
||||||
|
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
|
portal = await n.start_actor(
|
||||||
|
'rpc_server',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX: this syntax requires py3.9
|
||||||
|
async with (
|
||||||
|
|
||||||
|
portal.open_context(
|
||||||
|
simple_rpc,
|
||||||
|
data=10,
|
||||||
|
) as (ctx, sent),
|
||||||
|
|
||||||
|
ctx.open_stream() as stream,
|
||||||
|
):
|
||||||
|
|
||||||
|
assert sent == 11
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
# receive msgs using async for style
|
||||||
|
await stream.send('ping')
|
||||||
|
|
||||||
|
async for msg in stream:
|
||||||
|
assert msg == 'pong'
|
||||||
|
await stream.send('ping')
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
if count >= 9:
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
# explicitly teardown the daemon-actor
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
See original proposal and discussion in `#53`_ as well
|
||||||
|
as follow up improvements in `#223`_ that we'd love to
|
||||||
|
hear your thoughts on!
|
||||||
|
|
||||||
|
.. _#53: https://github.com/goodboy/tractor/issues/53
|
||||||
|
.. _#223: https://github.com/goodboy/tractor/issues/223
|
||||||
|
|
||||||
|
|
||||||
Worker poolz are easy peasy
|
Worker poolz are easy peasy
|
||||||
---------------------------
|
---------------------------
|
||||||
The initial ask from most new users is *"how do I make a worker
|
The initial ask from most new users is *"how do I make a worker
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def simple_rpc(
|
||||||
|
|
||||||
|
ctx: tractor.Context,
|
||||||
|
data: int,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''Test a small ping-pong 2-way streaming server.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# signal to parent that we're up much like
|
||||||
|
# ``trio_typing.TaskStatus.started()``
|
||||||
|
await ctx.started(data + 1)
|
||||||
|
|
||||||
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
async for msg in stream:
|
||||||
|
|
||||||
|
assert msg == 'ping'
|
||||||
|
await stream.send('pong')
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
else:
|
||||||
|
assert count == 10
|
||||||
|
|
||||||
|
|
||||||
|
async def main() -> None:
|
||||||
|
|
||||||
|
async with tractor.open_nursery() as n:
|
||||||
|
|
||||||
|
portal = await n.start_actor(
|
||||||
|
'rpc_server',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
|
||||||
|
# XXX: syntax requires py3.9
|
||||||
|
async with (
|
||||||
|
|
||||||
|
portal.open_context(
|
||||||
|
simple_rpc, # taken from pytest parameterization
|
||||||
|
data=10,
|
||||||
|
|
||||||
|
) as (ctx, sent),
|
||||||
|
|
||||||
|
ctx.open_stream() as stream,
|
||||||
|
):
|
||||||
|
|
||||||
|
assert sent == 11
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
# receive msgs using async for style
|
||||||
|
await stream.send('ping')
|
||||||
|
|
||||||
|
async for msg in stream:
|
||||||
|
assert msg == 'pong'
|
||||||
|
await stream.send('ping')
|
||||||
|
count += 1
|
||||||
|
|
||||||
|
if count >= 9:
|
||||||
|
break
|
||||||
|
|
||||||
|
# explicitly teardown the daemon-actor
|
||||||
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
trio.run(main)
|
|
@ -84,8 +84,8 @@ def run_example_in_subproc(loglevel, testdir, arb_addr):
|
||||||
|
|
||||||
if '__' not in f
|
if '__' not in f
|
||||||
and f[0] != '_'
|
and f[0] != '_'
|
||||||
and 'debugging' not in p[0]
|
and 'debugging' not in p[0]],
|
||||||
],
|
|
||||||
ids=lambda t: t[1],
|
ids=lambda t: t[1],
|
||||||
)
|
)
|
||||||
def test_example(run_example_in_subproc, example_script):
|
def test_example(run_example_in_subproc, example_script):
|
||||||
|
@ -98,6 +98,10 @@ def test_example(run_example_in_subproc, example_script):
|
||||||
test_example``.
|
test_example``.
|
||||||
"""
|
"""
|
||||||
ex_file = os.path.join(*example_script)
|
ex_file = os.path.join(*example_script)
|
||||||
|
|
||||||
|
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
|
||||||
|
pytest.skip("2-way streaming example requires py3.9 async with syntax")
|
||||||
|
|
||||||
with open(ex_file, 'r') as ex:
|
with open(ex_file, 'r') as ex:
|
||||||
code = ex.read()
|
code = ex.read()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue