Merge pull request #47 from tgoodlet/fix_46

Stop channel based async gen streams on exit
remote_module_errors
goodboy 2018-11-30 08:19:21 -05:00 committed by GitHub
commit 1a81ef286f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 11 additions and 2 deletions

View File

@ -6,7 +6,6 @@ from functools import partial
from itertools import chain from itertools import chain
import importlib import importlib
import inspect import inspect
import traceback
import uuid import uuid
import typing import typing
from typing import Dict, List, Tuple, Any, Optional, Union from typing import Dict, List, Tuple, Any, Optional, Union
@ -115,6 +114,10 @@ async def _invoke(
with trio.open_cancel_scope() as cs: with trio.open_cancel_scope() as cs:
task_status.started(cs) task_status.started(cs)
await coro await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': None, 'cid': cid})
else: else:
await chan.send({'functype': 'asyncfunction', 'cid': cid}) await chan.send({'functype': 'asyncfunction', 'cid': cid})
with trio.open_cancel_scope() as cs: with trio.open_cancel_scope() as cs:
@ -137,7 +140,13 @@ async def _invoke(
# RPC task bookeeping # RPC task bookeeping
tasks = actor._rpc_tasks.get(chan, None) tasks = actor._rpc_tasks.get(chan, None)
if tasks: if tasks:
tasks.remove((cs, func)) try:
tasks.remove((cs, func))
except ValueError:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warn(
f"Task {func} was likely cancelled before it was started")
if not tasks: if not tasks:
actor._rpc_tasks.pop(chan, None) actor._rpc_tasks.pop(chan, None)