Update all examples to new streaming API

stream_contexts
Tyler Goodlet 2021-04-28 11:54:56 -04:00
parent f59346d854
commit 5a5e6baad1
5 changed files with 33 additions and 22 deletions

View File

@ -24,7 +24,8 @@ async def main():
# this async for loop streams values from the above # this async for loop streams values from the above
# async generator running in a separate process # async generator running in a separate process
async for letter in await portal.run(stream_forever): async with portal.open_stream_from(stream_forever) as stream:
async for letter in stream:
print(letter) print(letter)
# we support trio's cancellation system # we support trio's cancellation system

View File

@ -26,7 +26,7 @@ async def main():
p1 = await n.start_actor('name_error', enable_modules=[__name__]) p1 = await n.start_actor('name_error', enable_modules=[__name__])
# retreive results # retreive results
stream = await p0.run(breakpoint_forever) async with p0.open_stream_from(breakpoint_forever) as stream:
await p1.run(name_error) await p1.run(name_error)

View File

@ -21,4 +21,4 @@ async def main():
if __name__ == '__main__': if __name__ == '__main__':
tractor.run(main, debug_mode=True) tractor.run(main, debug_mode=True, loglevel='debug')

View File

@ -21,7 +21,7 @@ async def aggregate(seed):
# fork point # fork point
portal = await nursery.start_actor( portal = await nursery.start_actor(
name=f'streamer_{i}', name=f'streamer_{i}',
rpc_module_paths=[__name__], enable_modules=[__name__],
) )
portals.append(portal) portals.append(portal)
@ -29,8 +29,11 @@ async def aggregate(seed):
send_chan, recv_chan = trio.open_memory_channel(500) send_chan, recv_chan = trio.open_memory_channel(500)
async def push_to_chan(portal, send_chan): async def push_to_chan(portal, send_chan):
async with send_chan: async with (
async for value in await portal.run(stream_data, seed=seed): send_chan,
portal.open_stream_from(stream_data, seed=seed) as stream,
):
async for value in stream:
# leverage trio's built-in backpressure # leverage trio's built-in backpressure
await send_chan.send(value) await send_chan.send(value)
@ -71,19 +74,25 @@ async def main():
import time import time
pre_start = time.time() pre_start = time.time()
portal = await nursery.run_in_actor( portal = await nursery.start_actor(
aggregate,
name='aggregator', name='aggregator',
seed=seed, enable_modules=[__name__],
) )
async with portal.open_stream_from(
aggregate,
seed=seed,
) as stream:
start = time.time() start = time.time()
# the portal call returns exactly what you'd expect # the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally # as if the remote "aggregate" function was called locally
result_stream = [] result_stream = []
async for value in await portal.result(): async for value in stream:
result_stream.append(value) result_stream.append(value)
await portal.cancel_actor()
print(f"STREAM TIME = {time.time() - start}") print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
assert result_stream == list(range(seed)) assert result_stream == list(range(seed))

View File

@ -15,7 +15,8 @@ async def stream_data(seed=10):
async def stream_from_portal(p, consumed): async def stream_from_portal(p, consumed):
async for item in await p.run(stream_data): async with p.open_stream_from(stream_data) as stream:
async for item in stream:
if item in consumed: if item in consumed:
consumed.remove(item) consumed.remove(item)
else: else: