From a588047ad47d88e98c303ad49a9b97d8ace48559 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Nov 2018 01:24:59 -0500 Subject: [PATCH] Stop channel based async gen streams on exit I'm not sure how this ever worked but when a "fake" async gen (i.e. function with special `chan`, `cid` kwargs) is completed we need to signal the end of the stream just like with normal async gens. Also don't fail when trying to remove tasks that were never tracked. Fixes #46 --- tractor/_actor.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 1e7d173..f1beb45 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -6,7 +6,6 @@ from functools import partial from itertools import chain import importlib import inspect -import traceback import uuid import typing from typing import Dict, List, Tuple, Any, Optional, Union @@ -115,6 +114,10 @@ async def _invoke( with trio.open_cancel_scope() as cs: task_status.started(cs) await coro + if not cs.cancelled_caught: + # task was not cancelled so we can instruct the + # far end async gen to tear down + await chan.send({'stop': None, 'cid': cid}) else: await chan.send({'functype': 'asyncfunction', 'cid': cid}) with trio.open_cancel_scope() as cs: @@ -137,7 +140,13 @@ async def _invoke( # RPC task bookeeping tasks = actor._rpc_tasks.get(chan, None) if tasks: - tasks.remove((cs, func)) + try: + tasks.remove((cs, func)) + except ValueError: + # If we're cancelled before the task returns then the + # cancel scope will not have been inserted yet + log.warn( + f"Task {func} was likely cancelled before it was started") if not tasks: actor._rpc_tasks.pop(chan, None)