diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py index 31eff62..1650b58 100644 --- a/examples/full_fledged_streaming_service.py +++ b/examples/full_fledged_streaming_service.py @@ -7,7 +7,7 @@ import tractor async def stream_data(seed): for i in range(seed): yield i - await trio.sleep(0) # trigger scheduler + await trio.sleep(0.0001) # trigger scheduler # this is the third actor; the aggregator diff --git a/tests/test_2way.py b/tests/test_2way.py index c038ae4..410c299 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -386,7 +386,8 @@ async def cancel_self( @tractor_test async def test_callee_cancels_before_started(): - '''callee calls `Context.cancel()` while streaming and caller + ''' + Callee calls `Context.cancel()` while streaming and caller sees stream terminated in `ContextCancelled`. ''' @@ -420,9 +421,10 @@ async def simple_rpc( data: int, ) -> None: - """Test a small ping-pong server. + ''' + Test a small ping-pong server. - """ + ''' # signal to parent that we're up await ctx.started(data + 1) @@ -480,9 +482,10 @@ async def simple_rpc_with_forloop( [simple_rpc, simple_rpc_with_forloop], ) def test_simple_rpc(server_func, use_async_for): - """The simplest request response pattern. + ''' + The simplest request response pattern. - """ + ''' async def main(): async with tractor.open_nursery() as n: diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 38fbee4..baee54e 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -132,7 +132,7 @@ async def stream_data(seed): yield i # trigger scheduler to simulate practical usage - await trio.sleep(0) + await trio.sleep(0.0001) # this is the third actor; the aggregator diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index b18a40e..4c3d1ff 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -83,7 +83,7 @@ async def open_sequence_streamer( ) as (ctx, first): assert first is None - async with ctx.open_stream() as stream: + async with ctx.open_stream(backpressure=True) as stream: yield stream await portal.cancel_actor()