Modernize streaming example script

- add typing,
- apply multi-line call style,
- use 'cancel' log level,
- enable debug mode.
runtime_to_msgspec
Tyler Goodlet 2024-05-09 16:51:51 -04:00
parent 5cb0cc0f0b
commit d2dee87b36
1 changed files with 31 additions and 14 deletions

View File

@ -1,6 +1,11 @@
import time import time
import trio import trio
import tractor import tractor
from tractor import (
ActorNursery,
MsgStream,
Portal,
)
# this is the first 2 actors, streamer_1 and streamer_2 # this is the first 2 actors, streamer_1 and streamer_2
@ -12,14 +17,18 @@ async def stream_data(seed):
# this is the third actor; the aggregator # this is the third actor; the aggregator
async def aggregate(seed): async def aggregate(seed):
"""Ensure that the two streams we receive match but only stream '''
Ensure that the two streams we receive match but only stream
a single set of values to the parent. a single set of values to the parent.
"""
async with tractor.open_nursery() as nursery: '''
portals = [] an: ActorNursery
async with tractor.open_nursery() as an:
portals: list[Portal] = []
for i in range(1, 3): for i in range(1, 3):
# fork point
portal = await nursery.start_actor( # fork/spawn call
portal = await an.start_actor(
name=f'streamer_{i}', name=f'streamer_{i}',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -43,7 +52,11 @@ async def aggregate(seed):
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for portal in portals: for portal in portals:
n.start_soon(push_to_chan, portal, send_chan.clone()) n.start_soon(
push_to_chan,
portal,
send_chan.clone(),
)
# close this local task's reference to send side # close this local task's reference to send side
await send_chan.aclose() await send_chan.aclose()
@ -60,7 +73,7 @@ async def aggregate(seed):
print("FINISHED ITERATING in aggregator") print("FINISHED ITERATING in aggregator")
await nursery.cancel() await an.cancel()
print("WAITING on `ActorNursery` to finish") print("WAITING on `ActorNursery` to finish")
print("AGGREGATOR COMPLETE!") print("AGGREGATOR COMPLETE!")
@ -75,18 +88,21 @@ async def main() -> list[int]:
''' '''
# yes, a nursery which spawns `trio`-"actors" B) # yes, a nursery which spawns `trio`-"actors" B)
nursery: tractor.ActorNursery an: ActorNursery
async with tractor.open_nursery() as nursery: async with tractor.open_nursery(
loglevel='cancel',
debug_mode=True,
) as an:
seed = int(1e3) seed = int(1e3)
pre_start = time.time() pre_start = time.time()
portal: tractor.Portal = await nursery.start_actor( portal: Portal = await an.start_actor(
name='aggregator', name='aggregator',
enable_modules=[__name__], enable_modules=[__name__],
) )
stream: tractor.MsgStream stream: MsgStream
async with portal.open_stream_from( async with portal.open_stream_from(
aggregate, aggregate,
seed=seed, seed=seed,
@ -95,11 +111,12 @@ async def main() -> list[int]:
start = time.time() start = time.time()
# the portal call returns exactly what you'd expect # the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally # as if the remote "aggregate" function was called locally
result_stream = [] result_stream: list[int] = []
async for value in stream: async for value in stream:
result_stream.append(value) result_stream.append(value)
await portal.cancel_actor() cancelled: bool = await portal.cancel_actor()
assert cancelled
print(f"STREAM TIME = {time.time() - start}") print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}") print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")