diff --git a/newsfragments/264.bug.rst b/newsfragments/264.bug.rst new file mode 100644 index 0000000..dda3389 --- /dev/null +++ b/newsfragments/264.bug.rst @@ -0,0 +1,8 @@ +Fix ``Portal.run_in_actor()`` returns ``None`` result. + +``None`` was being used as the cached result flag and obviously breaks +on a ``None`` returned from the remote target task. This would cause an +infinite hang if user code ever called ``Portal.result()`` *before* the +nursery exit. The simple fix is to use the *return message* as the +initial "no-result-received-yet" flag value and, once received, the +return value is read from the message to avoid the cache logic error. diff --git a/tests/test_spawning.py b/tests/test_spawning.py index b1110fe..cdeacba 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -1,7 +1,7 @@ """ Spawning basics """ -from typing import Dict, Tuple +from typing import Dict, Tuple, Optional import pytest import trio @@ -93,24 +93,38 @@ async def test_movie_theatre_convo(start_method): await portal.cancel_actor() -async def cellar_door(): - return "Dang that's beautiful" +async def cellar_door(return_value: Optional[str]): + return return_value +@pytest.mark.parametrize( + 'return_value', ["Dang that's beautiful", None], + ids=['return_str', 'return_None'], +) @tractor_test -async def test_most_beautiful_word(start_method): - """The main ``tractor`` routine. - """ - async with tractor.open_nursery() as n: +async def test_most_beautiful_word( + start_method, + return_value +): + ''' + The main ``tractor`` routine. - portal = await n.run_in_actor( - cellar_door, - name='some_linguist', - ) + ''' + with trio.fail_after(1): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + cellar_door, + return_value=return_value, + name='some_linguist', + ) + + print(await portal.result()) # The ``async with`` will unblock here since the 'some_linguist' # actor has completed its main task ``cellar_door``. + # this should pull the cached final result already captured during + # the nursery block exit. print(await portal.result()) diff --git a/tractor/_portal.py b/tractor/_portal.py index 382f8e4..80fc902 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -21,7 +21,7 @@ from .log import get_logger from ._exceptions import ( unpack_error, NoResult, - RemoteActorError, + # RemoteActorError, ContextCancelled, ) from ._streaming import Context, ReceiveMsgStream @@ -54,8 +54,23 @@ def func_deats(func: Callable) -> Tuple[str, str]: ) +def _unwrap_msg( + + msg: dict[str, Any], + channel: Channel + +) -> Any: + try: + return msg['return'] + except KeyError: + # internal error should never get here + assert msg.get('cid'), "Received internal error at portal?" + raise unpack_error(msg, channel) + + class Portal: - """A 'portal' to a(n) (remote) ``Actor``. + ''' + A 'portal' to a(n) (remote) ``Actor``. A portal is "opened" (and eventually closed) by one side of an inter-actor communication context. The side which opens the portal @@ -71,13 +86,14 @@ class Portal: function calling semantics are supported transparently; hence it is like having a "portal" between the seperate actor memory spaces. - """ + ''' def __init__(self, channel: Channel) -> None: self.channel = channel # when this is set to a tuple returned from ``_submit()`` then # it is expected that ``result()`` will be awaited at some point # during the portal's lifetime - self._result: Optional[Any] = None + self._result_msg: Optional[dict] = None + # set when _submit_for_result is called self._expect_result: Optional[ Tuple[str, Any, str, Dict[str, Any]] @@ -128,16 +144,12 @@ class Portal: recv_chan: trio.abc.ReceiveChannel, resptype: str, first_msg: dict - ) -> Any: - assert resptype == 'asyncfunc' # single response + ) -> dict[str, Any]: + + assert resptype == 'asyncfunc' # single response msg = await recv_chan.receive() - try: - return msg['return'] - except KeyError: - # internal error should never get here - assert msg.get('cid'), "Received internal error at portal?" - raise unpack_error(msg, self.channel) + return msg async def result(self) -> Any: """Return the result(s) from the remote actor's "main" task. @@ -158,17 +170,11 @@ class Portal: # expecting a "main" result assert self._expect_result - if self._result is None: - try: - self._result = await self._return_once(*self._expect_result) - except RemoteActorError as err: - self._result = err - # re-raise error on every call - if isinstance(self._result, RemoteActorError): - raise self._result + if self._result_msg is None: + self._result_msg = await self._return_once(*self._expect_result) - return self._result + return _unwrap_msg(self._result_msg, self.channel) async def _cancel_streams(self): # terminate all locally running async generator @@ -243,9 +249,10 @@ class Portal: instance methods in the remote runtime. Currently this should only be used for `tractor` internals. """ - return await self._return_once( + msg = await self._return_once( *(await self._submit(namespace_path, function_name, kwargs)) ) + return _unwrap_msg(msg, self.channel) async def run( self, @@ -253,12 +260,14 @@ class Portal: fn_name: Optional[str] = None, **kwargs ) -> Any: - """Submit a remote function to be scheduled and run by actor, in + ''' + Submit a remote function to be scheduled and run by actor, in a new task, wrap and return its (stream of) result(s). This is a blocking call and returns either a value from the remote rpc task or a local async generator instance. - """ + + ''' if isinstance(func, str): warnings.warn( "`Portal.run(namespace: str, funcname: str)` is now" @@ -284,8 +293,11 @@ class Portal: fn_mod_path, fn_name = func_deats(func) - return await self._return_once( - *(await self._submit(fn_mod_path, fn_name, kwargs)) + return _unwrap_msg( + await self._return_once( + *(await self._submit(fn_mod_path, fn_name, kwargs)), + ), + self.channel, ) @asynccontextmanager