forked from goodboy/tractor
1
0
Fork 0

Stop channel based async gen streams on exit

I'm not sure how this ever worked but when a "fake" async gen
(i.e. function with special `chan`, `cid` kwargs) is completed
we need to signal the end of the stream just like with normal
async gens. Also don't fail when trying to remove tasks that were
never tracked.

Fixes #46
fix_46
Tyler Goodlet 2018-11-30 01:24:59 -05:00
parent 58ebacf0f7
commit a588047ad4
1 changed files with 11 additions and 2 deletions

View File

@ -6,7 +6,6 @@ from functools import partial
from itertools import chain from itertools import chain
import importlib import importlib
import inspect import inspect
import traceback
import uuid import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional, Union from typing import Dict, List, Tuple, Any, Optional, Union
@ -115,6 +114,10 @@ async def _invoke(
with trio.open_cancel_scope() as cs: with trio.open_cancel_scope() as cs:
task_status.started(cs) task_status.started(cs)
await coro await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': None, 'cid': cid})
else: else:
await chan.send({'functype': 'asyncfunction', 'cid': cid}) await chan.send({'functype': 'asyncfunction', 'cid': cid})
with trio.open_cancel_scope() as cs: with trio.open_cancel_scope() as cs:
@ -137,7 +140,13 @@ async def _invoke(
# RPC task bookeeping # RPC task bookeeping
tasks = actor._rpc_tasks.get(chan, None) tasks = actor._rpc_tasks.get(chan, None)
if tasks: if tasks:
tasks.remove((cs, func)) try:
tasks.remove((cs, func))
except ValueError:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warn(
f"Task {func} was likely cancelled before it was started")
if not tasks: if not tasks:
actor._rpc_tasks.pop(chan, None) actor._rpc_tasks.pop(chan, None)