forked from goodboy/tractor
1
0
Fork 0

De-compact async with tuple on 3.8-

Turns out can't use the nicer syntax before python 3.9 (even though it
doesn't seem documented anywhere?).

Relates to #207
stream_contexts
Tyler Goodlet 2021-04-28 14:35:25 -04:00
parent 2498a4963b
commit b1f657e246
3 changed files with 40 additions and 41 deletions

View File

@ -29,13 +29,13 @@ async def aggregate(seed):
send_chan, recv_chan = trio.open_memory_channel(500) send_chan, recv_chan = trio.open_memory_channel(500)
async def push_to_chan(portal, send_chan): async def push_to_chan(portal, send_chan):
async with (
send_chan, # TODO: https://github.com/goodboy/tractor/issues/207
portal.open_stream_from(stream_data, seed=seed) as stream, async with send_chan:
): async with portal.open_stream_from(stream_data, seed=seed) as stream:
async for value in stream: async for value in stream:
# leverage trio's built-in backpressure # leverage trio's built-in backpressure
await send_chan.send(value) await send_chan.send(value)
print(f"FINISHED ITERATING {portal.channel.uid}") print(f"FINISHED ITERATING {portal.channel.uid}")

View File

@ -268,28 +268,30 @@ async def close_chans_before_nursery(
portal2 = await tn.start_actor( portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__]) 'consumer2', enable_modules=[__name__])
async with ( # TODO: compact this back as was in last commit once
portal1.open_stream_from(stream_forever) as agen1, # 3.9+, see https://github.com/goodboy/tractor/issues/207
portal2.open_stream_from(stream_forever) as agen2, async with portal1.open_stream_from(stream_forever) as agen1:
): async with portal2.open_stream_from(
async with trio.open_nursery() as n: stream_forever
n.start_soon(streamer, agen1) ) as agen2:
n.start_soon(cancel, use_signal, .5) async with trio.open_nursery() as n:
try: n.start_soon(streamer, agen1)
await streamer(agen2) n.start_soon(cancel, use_signal, .5)
finally: try:
# Kill the root nursery thus resulting in await streamer(agen2)
# normal arbiter channel ops to fail during finally:
# teardown. It doesn't seem like this is # Kill the root nursery thus resulting in
# reliably triggered by an external SIGINT. # normal arbiter channel ops to fail during
# tractor.current_actor()._root_nursery.cancel_scope.cancel() # teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()
# XXX: THIS IS THE KEY THING that happens # XXX: THIS IS THE KEY THING that happens
# **before** exiting the actor nursery block # **before** exiting the actor nursery block
# also kill off channels cuz why not # also kill off channels cuz why not
await agen1.aclose() await agen1.aclose()
await agen2.aclose() await agen2.aclose()
finally: finally:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trio.sleep(1) await trio.sleep(1)

View File

@ -60,14 +60,13 @@ async def subs(
def pred(i): def pred(i):
return isinstance(i, int) return isinstance(i, int)
# TODO: https://github.com/goodboy/tractor/issues/207
async with tractor.find_actor(pub_actor_name) as portal: async with tractor.find_actor(pub_actor_name) as portal:
async with ( async with portal.open_stream_from(
portal.open_stream_from( pubber,
pubber, topics=which,
topics=which, seed=seed,
seed=seed, ) as stream:
) as stream
):
task_status.started(stream) task_status.started(stream)
times = 10 times = 10
count = 0 count = 0
@ -81,13 +80,11 @@ async def subs(
await stream.aclose() await stream.aclose()
async with ( async with portal.open_stream_from(
portal.open_stream_from( pubber,
pubber, topics=['odd'],
topics=['odd'], seed=seed,
seed=seed, ) as stream:
) as stream
):
await stream.__anext__() await stream.__anext__()
count = 0 count = 0
# async with aclosing(stream) as stream: # async with aclosing(stream) as stream: