From e546ead2ff88a07ab10da76b67c6144095ebd1fc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 14 Jan 2021 18:18:44 -0500 Subject: [PATCH] Pub sub internals type fixes --- tractor/_actor.py | 7 +++++-- tractor/_portal.py | 4 ++-- tractor/_streaming.py | 2 +- tractor/msg.py | 6 +++--- 4 files changed, 11 insertions(+), 8 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index cb9dc7b..f8f2bdd 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -9,7 +9,7 @@ import importlib.util import inspect import uuid import typing -from typing import Dict, List, Tuple, Any, Optional +from typing import Dict, List, Tuple, Any, Optional, Union from types import ModuleType import sys import os @@ -48,7 +48,9 @@ async def _invoke( chan: Channel, func: typing.Callable, kwargs: Dict[str, Any], - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[ + Union[trio.CancelScope, BaseException] + ] = trio.TASK_STATUS_IGNORED, ): """Invoke local func and deliver result(s) over provided channel. """ @@ -155,6 +157,7 @@ async def _invoke( if cs is None: # error is from above code not from rpc invocation task_status.started(err) + finally: # RPC task bookeeping try: diff --git a/tractor/_portal.py b/tractor/_portal.py index 6c026f8..9ad4c2f 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -204,8 +204,8 @@ class Portal: fn_name: Optional[str] = None, **kwargs ) -> Any: - """Submit a remote function to be scheduled and run by actor, - wrap and return its (stream of) result(s). + """Submit a remote function to be scheduled and run by actor, in + a new task, wrap and return its (stream of) result(s). This is a blocking call and returns either a value from the remote rpc task or a local async generator instance. diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9ed0f14..e683216 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -37,7 +37,7 @@ def current_context(): def stream(func): - """Mark an async function as a streaming routine. + """Mark an async function as a streaming routine with ``@stream``. """ func._tractor_stream_function = True sig = inspect.signature(func) diff --git a/tractor/msg.py b/tractor/msg.py index 4184333..5b343b6 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -3,7 +3,7 @@ Messaging pattern APIs and helpers. """ import inspect import typing -from typing import Dict, Any, Set, Union, Callable +from typing import Dict, Any, Set, Callable from functools import partial from async_generator import aclosing @@ -90,7 +90,7 @@ def modify_subs(topics2ctxs, topics, ctx): _pub_state: Dict[str, dict] = {} -_pubtask2lock: Dict[str, dict] = {} +_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {} def pub( @@ -184,7 +184,7 @@ def pub( if wrapped is None: return partial(pub, tasks=tasks) - task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {} + task2lock: Dict[str, trio.StrictFIFOLock] = {} for name in tasks: task2lock[name] = trio.StrictFIFOLock()