forked from goodboy/tractor
Pub sub internals type fixes
parent
3df001f3a9
commit
e546ead2ff
|
@ -9,7 +9,7 @@ import importlib.util
|
||||||
import inspect
|
import inspect
|
||||||
import uuid
|
import uuid
|
||||||
import typing
|
import typing
|
||||||
from typing import Dict, List, Tuple, Any, Optional
|
from typing import Dict, List, Tuple, Any, Optional, Union
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
@ -48,7 +48,9 @@ async def _invoke(
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
func: typing.Callable,
|
func: typing.Callable,
|
||||||
kwargs: Dict[str, Any],
|
kwargs: Dict[str, Any],
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[
|
||||||
|
Union[trio.CancelScope, BaseException]
|
||||||
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
):
|
):
|
||||||
"""Invoke local func and deliver result(s) over provided channel.
|
"""Invoke local func and deliver result(s) over provided channel.
|
||||||
"""
|
"""
|
||||||
|
@ -155,6 +157,7 @@ async def _invoke(
|
||||||
if cs is None:
|
if cs is None:
|
||||||
# error is from above code not from rpc invocation
|
# error is from above code not from rpc invocation
|
||||||
task_status.started(err)
|
task_status.started(err)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# RPC task bookeeping
|
# RPC task bookeeping
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -204,8 +204,8 @@ class Portal:
|
||||||
fn_name: Optional[str] = None,
|
fn_name: Optional[str] = None,
|
||||||
**kwargs
|
**kwargs
|
||||||
) -> Any:
|
) -> Any:
|
||||||
"""Submit a remote function to be scheduled and run by actor,
|
"""Submit a remote function to be scheduled and run by actor, in
|
||||||
wrap and return its (stream of) result(s).
|
a new task, wrap and return its (stream of) result(s).
|
||||||
|
|
||||||
This is a blocking call and returns either a value from the
|
This is a blocking call and returns either a value from the
|
||||||
remote rpc task or a local async generator instance.
|
remote rpc task or a local async generator instance.
|
||||||
|
|
|
@ -37,7 +37,7 @@ def current_context():
|
||||||
|
|
||||||
|
|
||||||
def stream(func):
|
def stream(func):
|
||||||
"""Mark an async function as a streaming routine.
|
"""Mark an async function as a streaming routine with ``@stream``.
|
||||||
"""
|
"""
|
||||||
func._tractor_stream_function = True
|
func._tractor_stream_function = True
|
||||||
sig = inspect.signature(func)
|
sig = inspect.signature(func)
|
||||||
|
|
|
@ -3,7 +3,7 @@ Messaging pattern APIs and helpers.
|
||||||
"""
|
"""
|
||||||
import inspect
|
import inspect
|
||||||
import typing
|
import typing
|
||||||
from typing import Dict, Any, Set, Union, Callable
|
from typing import Dict, Any, Set, Callable
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from async_generator import aclosing
|
from async_generator import aclosing
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ def modify_subs(topics2ctxs, topics, ctx):
|
||||||
|
|
||||||
|
|
||||||
_pub_state: Dict[str, dict] = {}
|
_pub_state: Dict[str, dict] = {}
|
||||||
_pubtask2lock: Dict[str, dict] = {}
|
_pubtask2lock: Dict[str, trio.StrictFIFOLock] = {}
|
||||||
|
|
||||||
|
|
||||||
def pub(
|
def pub(
|
||||||
|
@ -184,7 +184,7 @@ def pub(
|
||||||
if wrapped is None:
|
if wrapped is None:
|
||||||
return partial(pub, tasks=tasks)
|
return partial(pub, tasks=tasks)
|
||||||
|
|
||||||
task2lock: Dict[Union[str, None], trio.StrictFIFOLock] = {}
|
task2lock: Dict[str, trio.StrictFIFOLock] = {}
|
||||||
|
|
||||||
for name in tasks:
|
for name in tasks:
|
||||||
task2lock[name] = trio.StrictFIFOLock()
|
task2lock[name] = trio.StrictFIFOLock()
|
||||||
|
|
Loading…
Reference in New Issue