diff --git a/examples/full_fledged_streaming_service.py b/examples/full_fledged_streaming_service.py index 51978c0..126eed9 100644 --- a/examples/full_fledged_streaming_service.py +++ b/examples/full_fledged_streaming_service.py @@ -29,13 +29,13 @@ async def aggregate(seed): send_chan, recv_chan = trio.open_memory_channel(500) async def push_to_chan(portal, send_chan): - async with ( - send_chan, - portal.open_stream_from(stream_data, seed=seed) as stream, - ): - async for value in stream: - # leverage trio's built-in backpressure - await send_chan.send(value) + + # TODO: https://github.com/goodboy/tractor/issues/207 + async with send_chan: + async with portal.open_stream_from(stream_data, seed=seed) as stream: + async for value in stream: + # leverage trio's built-in backpressure + await send_chan.send(value) print(f"FINISHED ITERATING {portal.channel.uid}") diff --git a/tests/test_discovery.py b/tests/test_discovery.py index d1f8474..af03ce6 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -268,28 +268,30 @@ async def close_chans_before_nursery( portal2 = await tn.start_actor( 'consumer2', enable_modules=[__name__]) - async with ( - portal1.open_stream_from(stream_forever) as agen1, - portal2.open_stream_from(stream_forever) as agen2, - ): - async with trio.open_nursery() as n: - n.start_soon(streamer, agen1) - n.start_soon(cancel, use_signal, .5) - try: - await streamer(agen2) - finally: - # Kill the root nursery thus resulting in - # normal arbiter channel ops to fail during - # teardown. It doesn't seem like this is - # reliably triggered by an external SIGINT. - # tractor.current_actor()._root_nursery.cancel_scope.cancel() + # TODO: compact this back as was in last commit once + # 3.9+, see https://github.com/goodboy/tractor/issues/207 + async with portal1.open_stream_from(stream_forever) as agen1: + async with portal2.open_stream_from( + stream_forever + ) as agen2: + async with trio.open_nursery() as n: + n.start_soon(streamer, agen1) + n.start_soon(cancel, use_signal, .5) + try: + await streamer(agen2) + finally: + # Kill the root nursery thus resulting in + # normal arbiter channel ops to fail during + # teardown. It doesn't seem like this is + # reliably triggered by an external SIGINT. + # tractor.current_actor()._root_nursery.cancel_scope.cancel() - # XXX: THIS IS THE KEY THING that happens - # **before** exiting the actor nursery block + # XXX: THIS IS THE KEY THING that happens + # **before** exiting the actor nursery block - # also kill off channels cuz why not - await agen1.aclose() - await agen2.aclose() + # also kill off channels cuz why not + await agen1.aclose() + await agen2.aclose() finally: with trio.CancelScope(shield=True): await trio.sleep(1) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 48e65b2..0d4c62d 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -60,14 +60,13 @@ async def subs( def pred(i): return isinstance(i, int) + # TODO: https://github.com/goodboy/tractor/issues/207 async with tractor.find_actor(pub_actor_name) as portal: - async with ( - portal.open_stream_from( - pubber, - topics=which, - seed=seed, - ) as stream - ): + async with portal.open_stream_from( + pubber, + topics=which, + seed=seed, + ) as stream: task_status.started(stream) times = 10 count = 0 @@ -81,13 +80,11 @@ async def subs( await stream.aclose() - async with ( - portal.open_stream_from( - pubber, - topics=['odd'], - seed=seed, - ) as stream - ): + async with portal.open_stream_from( + pubber, + topics=['odd'], + seed=seed, + ) as stream: await stream.__anext__() count = 0 # async with aclosing(stream) as stream: