diff --git a/tractor/_actor.py b/tractor/_actor.py index cb9dc7b..f8f2bdd 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -9,7 +9,7 @@ import importlib.util import inspect import uuid import typing -from typing import Dict, List, Tuple, Any, Optional +from typing import Dict, List, Tuple, Any, Optional, Union from types import ModuleType import sys import os @@ -48,7 +48,9 @@ async def _invoke( chan: Channel, func: typing.Callable, kwargs: Dict[str, Any], - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[ + Union[trio.CancelScope, BaseException] + ] = trio.TASK_STATUS_IGNORED, ): """Invoke local func and deliver result(s) over provided channel. """ @@ -155,6 +157,7 @@ async def _invoke( if cs is None: # error is from above code not from rpc invocation task_status.started(err) + finally: # RPC task bookeeping try: diff --git a/tractor/_portal.py b/tractor/_portal.py index 6c026f8..9ad4c2f 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -204,8 +204,8 @@ class Portal: fn_name: Optional[str] = None, **kwargs ) -> Any: - """Submit a remote function to be scheduled and run by actor, - wrap and return its (stream of) result(s). + """Submit a remote function to be scheduled and run by actor, in + a new task, wrap and return its (stream of) result(s). This is a blocking call and returns either a value from the remote rpc task or a local async generator instance. diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9ed0f14..e683216 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -37,7 +37,7 @@ def current_context(): def stream(func): - """Mark an async function as a streaming routine. + """Mark an async function as a streaming routine with ``@stream``. """ func._tractor_stream_function = True sig = inspect.signature(func) diff --git a/tractor/msg.py b/tractor/msg.py index 4184333..5b343b6 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -3,7 +3,7 @@ Messaging pattern APIs and helpers. """ import inspect import typing -from typing import Dict, Any, Set, Union, Callable +from typing import Dict, Any, Set, Callable from functools import partial from async_generator import aclosing @@ -90,7 +90,7 @@ def modify_subs(topics2ctxs, topics, ctx): _pub_state: Dict[str, dict] = {} -_pubtask2lock: Dict[str, dict] = {} +_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {} def pub( @@ -184,7 +184,7 @@ def pub( if wrapped is None: return partial(pub, tasks=tasks) - task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {} + task2lock: Dict[str, trio.StrictFIFOLock] = {} for name in tasks: task2lock[name] = trio.StrictFIFOLock()