From 5a5e6baad1e8396d51038e9fbf0103986aeec912 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 11:54:56 -0400 Subject: [PATCH] Update all examples to new streaming API --- examples/asynchronous_generators.py | 5 +-- examples/debugging/multi_daemon_subactors.py | 4 +-- examples/debugging/subactor_breakpoint.py | 2 +- examples/full_fledged_streaming_service.py | 33 +++++++++++++------- examples/multiple_streams_one_portal.py | 11 ++++--- 5 files changed, 33 insertions(+), 22 deletions(-) diff --git a/examples/asynchronous_generators.py b/examples/asynchronous_generators.py index 102b29a..47ee136 100644 --- a/examples/asynchronous_generators.py +++ b/examples/asynchronous_generators.py @@ -24,8 +24,9 @@ async def main(): # this async for loop streams values from the above # async generator running in a separate process - async for letter in await portal.run(stream_forever): - print(letter) + async with portal.open_stream_from(stream_forever) as stream: + async for letter in stream: + print(letter) # we support trio's cancellation system assert cancel_scope.cancelled_caught diff --git a/examples/debugging/multi_daemon_subactors.py b/examples/debugging/multi_daemon_subactors.py index eadb4c1..c37b879 100644 --- a/examples/debugging/multi_daemon_subactors.py +++ b/examples/debugging/multi_daemon_subactors.py @@ -26,8 +26,8 @@ async def main(): p1 = await n.start_actor('name_error', enable_modules=[__name__]) # retreive results - stream = await p0.run(breakpoint_forever) - await p1.run(name_error) + async with p0.open_stream_from(breakpoint_forever) as stream: + await p1.run(name_error) if __name__ == '__main__': diff --git a/examples/debugging/subactor_breakpoint.py b/examples/debugging/subactor_breakpoint.py index cb16004..d880404 100644 --- a/examples/debugging/subactor_breakpoint.py +++ b/examples/debugging/subactor_breakpoint.py @@ -21,4 +21,4 @@ async def main(): if __name__ == '__main__': - tractor.run(main, debug_mode=True) + tractor.run(main, debug_mode=True, loglevel='debug') diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py index 0ed5f66..51978c0 100644 --- a/examples/full_fledged_streaming_service.py +++ b/examples/full_fledged_streaming_service.py @@ -21,7 +21,7 @@ async def aggregate(seed): # fork point portal = await nursery.start_actor( name=f'streamer_{i}', - rpc_module_paths=[__name__], + enable_modules=[__name__], ) portals.append(portal) @@ -29,8 +29,11 @@ async def aggregate(seed): send_chan, recv_chan = trio.open_memory_channel(500) async def push_to_chan(portal, send_chan): - async with send_chan: - async for value in await portal.run(stream_data, seed=seed): + async with ( + send_chan, + portal.open_stream_from(stream_data, seed=seed) as stream, + ): + async for value in stream: # leverage trio's built-in backpressure await send_chan.send(value) @@ -71,18 +74,24 @@ async def main(): import time pre_start = time.time() - portal = await nursery.run_in_actor( - aggregate, + portal = await nursery.start_actor( name='aggregator', - seed=seed, + enable_modules=[__name__], ) - start = time.time() - # the portal call returns exactly what you'd expect - # as if the remote "aggregate" function was called locally - result_stream = [] - async for value in await portal.result(): - result_stream.append(value) + async with portal.open_stream_from( + aggregate, + seed=seed, + ) as stream: + + start = time.time() + # the portal call returns exactly what you'd expect + # as if the remote "aggregate" function was called locally + result_stream = [] + async for value in stream: + result_stream.append(value) + + await portal.cancel_actor() print(f"STREAM TIME = {time.time() - start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") diff --git a/examples/multiple_streams_one_portal.py b/examples/multiple_streams_one_portal.py index 354ec66..3e592a4 100644 --- a/examples/multiple_streams_one_portal.py +++ b/examples/multiple_streams_one_portal.py @@ -15,11 +15,12 @@ async def stream_data(seed=10): async def stream_from_portal(p, consumed): - async for item in await p.run(stream_data): - if item in consumed: - consumed.remove(item) - else: - consumed.append(item) + async with p.open_stream_from(stream_data) as stream: + async for item in stream: + if item in consumed: + consumed.remove(item) + else: + consumed.append(item) async def main():