forked from goodboy/tractor
Drop stream exhaustion; no longer needed
parent
3e19fd311b
commit
ad9256bcdb
|
@ -98,17 +98,11 @@ async def exhaust_portal(
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
log.debug(f"Waiting on final result from {actor.uid}")
|
log.debug(f"Waiting on final result from {actor.uid}")
|
||||||
final = res = await portal.result()
|
|
||||||
# if it's an async-gen then alert that we're cancelling it
|
# XXX: streams should never be reaped here since they should
|
||||||
if inspect.isasyncgen(res):
|
# always be established and shutdown using a context manager api
|
||||||
final = []
|
final = await portal.result()
|
||||||
log.warning(
|
|
||||||
f"Blindly consuming asyncgen for {actor.uid}")
|
|
||||||
with trio.fail_after(1):
|
|
||||||
async with aclosing(res) as agen:
|
|
||||||
async for item in agen:
|
|
||||||
log.debug(f"Consuming item {item}")
|
|
||||||
final.append(item)
|
|
||||||
except (Exception, trio.MultiError) as err:
|
except (Exception, trio.MultiError) as err:
|
||||||
# we reraise in the parent task via a ``trio.MultiError``
|
# we reraise in the parent task via a ``trio.MultiError``
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue